fix: make LoroCore: Send + Sync (#61)

This commit is contained in:
Zixuan Chen 2023-01-06 21:03:11 +08:00 committed by GitHub
parent 6a02ce1568
commit a03c68a993
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 90 additions and 37 deletions

View file

@ -20,7 +20,7 @@ impl Debug for Configure {
}
}
pub trait SecureRandomGenerator {
pub trait SecureRandomGenerator: Send + Sync {
fn fill_byte(&self, dest: &mut [u8]);
fn next_u64(&self) -> u64 {
let mut buf = [0u8; 8];

View file

@ -68,7 +68,7 @@ impl TryFrom<&str> for ContainerType {
}
}
pub trait Container: Debug + Any + Unpin {
pub trait Container: Debug + Any + Unpin + Send + Sync {
fn id(&self) -> &ContainerID;
fn type_(&self) -> ContainerType;
fn get_value(&self) -> LoroValue;

View file

@ -311,7 +311,7 @@ impl Sliceable for InnerListOp {
}
}
#[cfg(test)]
#[cfg(all(test, feature = "test_utils"))]
mod test {
use rle::{Mergable, Sliceable};

View file

@ -57,6 +57,11 @@ pub struct Tracker {
id_to_cursor: CursorMap,
}
// SAFETY: Tracker is safe to be sent to another thread
unsafe impl Send for Tracker {}
// SAFETY: &Tracker is safe to be shared by threads
unsafe impl Sync for Tracker {}
impl From<ID> for u128 {
fn from(id: ID) -> Self {
((id.client_id as u128) << 64) | id.counter as u128

View file

@ -22,7 +22,7 @@ use super::y_span::{YSpan, YSpanTreeTrait};
// marker can only live while the bumpalo is alive. So we are safe to use 'static here
#[non_exhaustive]
#[derive(Debug, Clone, EnumAsInner, PartialEq, Eq)]
pub enum Marker {
pub(super) enum Marker {
Insert {
ptr: NonNull<LeafNode<'static, YSpan, YSpanTreeTrait>>,
len: usize,
@ -195,7 +195,7 @@ impl Mergable for Marker {
}
#[derive(Debug, Default)]
pub struct CursorMap(RangeMap<u128, Marker, BumpMode>);
pub(super) struct CursorMap(RangeMap<u128, Marker, BumpMode>);
impl Deref for CursorMap {
type Target = RangeMap<u128, Marker, BumpMode>;

View file

@ -73,6 +73,6 @@ pub struct MapDiff {
pub deleted: FxHashSet<InternalString>,
}
pub type Observer = Box<dyn FnMut(&Event)>;
pub type Observer = Box<dyn FnMut(&Event) + Send>;
pub type SubscriptionID = u32;

View file

@ -1,4 +1,9 @@
use std::{cell::RefCell, collections::HashSet, fmt::Debug, rc::Rc};
use std::{
collections::HashSet,
fmt::Debug,
rc::Rc,
sync::{Arc, Mutex},
};
use arbitrary::Arbitrary;
use enum_as_inner::EnumAsInner;
@ -45,10 +50,10 @@ pub enum Action {
struct Actor {
loro: LoroCore,
value_tracker: Rc<RefCell<LoroValue>>,
map_tracker: Rc<RefCell<FxHashMap<String, LoroValue>>>,
list_tracker: Rc<RefCell<Vec<LoroValue>>>,
text_tracker: Rc<RefCell<String>>,
value_tracker: Arc<Mutex<LoroValue>>,
map_tracker: Arc<Mutex<FxHashMap<String, LoroValue>>>,
list_tracker: Arc<Mutex<Vec<LoroValue>>>,
text_tracker: Arc<Mutex<String>>,
map_containers: Vec<Map>,
list_containers: Vec<List>,
text_containers: Vec<Text>,
@ -58,7 +63,7 @@ impl Actor {
fn new(id: ClientID) -> Self {
let mut actor = Actor {
loro: LoroCore::new(Default::default(), Some(id)),
value_tracker: Rc::new(RefCell::new(LoroValue::Map(Default::default()))),
value_tracker: Arc::new(Mutex::new(LoroValue::Map(Default::default()))),
map_tracker: Default::default(),
list_tracker: Default::default(),
text_tracker: Default::default(),
@ -67,19 +72,19 @@ impl Actor {
text_containers: Default::default(),
};
let root_value = Rc::clone(&actor.value_tracker);
let root_value = Arc::clone(&actor.value_tracker);
actor.loro.subscribe_deep(Box::new(move |event| {
let mut root_value = root_value.borrow_mut();
let mut root_value = root_value.lock().unwrap();
root_value.apply(&event.relative_path, &event.diff);
}));
let log_store = actor.loro.log_store.write().unwrap();
let mut hierarchy = actor.loro.hierarchy.try_lock().unwrap();
let text = Rc::clone(&actor.text_tracker);
let text = Arc::clone(&actor.text_tracker);
hierarchy.subscribe(
&ContainerID::new_root("text", ContainerType::Text),
Box::new(move |event| {
let mut text = text.borrow_mut();
let mut text = text.lock().unwrap();
for diff in event.diff.iter() {
match diff {
Diff::Text(delta) => {
@ -106,11 +111,11 @@ impl Actor {
false,
);
let map = Rc::clone(&actor.map_tracker);
let map = Arc::clone(&actor.map_tracker);
hierarchy.subscribe(
&ContainerID::new_root("map", ContainerType::Map),
Box::new(move |event| {
let mut map = map.borrow_mut();
let mut map = map.lock().unwrap();
for diff in event.diff.iter() {
match diff {
Diff::Map(map_diff) => {
@ -131,11 +136,11 @@ impl Actor {
false,
);
let list = Rc::clone(&actor.list_tracker);
let list = Arc::clone(&actor.list_tracker);
hierarchy.subscribe(
&ContainerID::new_root("list", ContainerType::List),
Box::new(move |event| {
let mut list = list.borrow_mut();
let mut list = list.lock().unwrap();
for diff in event.diff.iter() {
match diff {
Diff::List(delta) => {
@ -590,24 +595,27 @@ fn check_eq(a_actor: &mut Actor, b_actor: &mut Actor) {
let a_result = a_doc.to_json();
debug_log::debug_log!("{}", a_result.to_json_pretty());
assert_eq!(&a_result, &b_doc.to_json());
assert_value_eq(&a_result, &a_actor.value_tracker.borrow());
assert_value_eq(&a_result, &a_actor.value_tracker.lock().unwrap());
let a = a_doc.get_text("text");
let value_a = a.get_value();
assert_eq!(
&**value_a.as_string().unwrap(),
&*a_actor.text_tracker.borrow(),
&*a_actor.text_tracker.lock().unwrap(),
);
let a = a_doc.get_map("map");
let value_a = a.get_value();
assert_eq!(&**value_a.as_map().unwrap(), &*a_actor.map_tracker.borrow());
assert_eq!(
&**value_a.as_map().unwrap(),
&*a_actor.map_tracker.lock().unwrap()
);
let a = a_doc.get_list("list");
let value_a = a.get_value();
assert_eq!(
&**value_a.as_list().unwrap(),
&*a_actor.list_tracker.borrow(),
&*a_actor.list_tracker.lock().unwrap(),
);
}

View file

@ -1,17 +1,20 @@
use std::cell::RefCell;
use std::rc::Rc;
use ctor::ctor;
use loro_core::container::registry::ContainerWrapper;
use loro_core::container::ContainerID;
use loro_core::context::Context;
use loro_core::event::Index;
use loro_core::id::ID;
use loro_core::log_store::{EncodeConfig, EncodeMode};
use loro_core::{ContainerType, LoroCore, LoroValue, VersionVector};
#[test]
fn send_sync() {
fn example<T: Send + Sync + 'static>(l: T) {}
let loro = LoroCore::default();
example(loro);
}
#[test]
fn example_list() {
let mut doc = LoroCore::default();
@ -57,11 +60,13 @@ fn subscribe_deep() {
#[test]
#[cfg(feature = "json")]
fn text_observe() {
use std::sync::{Arc, Mutex};
let mut doc = LoroCore::default();
let track_value = Rc::new(RefCell::new(LoroValue::Map(Default::default())));
let moved_value = Rc::clone(&track_value);
let track_value = Arc::new(Mutex::new(LoroValue::Map(Default::default())));
let moved_value = Arc::clone(&track_value);
doc.subscribe_deep(Box::new(move |event| {
let mut v = RefCell::borrow_mut(&*moved_value);
let mut v = moved_value.lock().unwrap();
v.apply(&event.relative_path, &event.diff);
}));
let mut map = doc.get_map("meta");
@ -74,14 +79,14 @@ fn text_observe() {
let todo_item = list.insert(&doc, 0, ContainerType::Map).unwrap().unwrap();
let mut todo_item = doc.get_map(todo_item);
todo_item.insert(&doc, "todo", "coding").unwrap();
assert_eq!(&doc.to_json(), &*RefCell::borrow(&track_value));
assert_eq!(&doc.to_json(), &*track_value.lock().unwrap());
let mut text = doc.get_text("text");
text.insert(&doc, 0, "hello ").unwrap();
let mut doc_b = LoroCore::default();
let mut text_b = doc_b.get_text("text");
text_b.insert(&doc_b, 0, "world").unwrap();
doc.import(doc_b.export(Default::default()));
assert_eq!(&doc.to_json(), &*RefCell::borrow(&track_value));
assert_eq!(&doc.to_json(), &*track_value.lock().unwrap());
println!("{}", doc.to_json().to_json());
}

View file

@ -65,6 +65,38 @@ impl SecureRandomGenerator for MathRandom {
}
}
mod observer {
use std::thread::ThreadId;
use wasm_bindgen::JsValue;
/// We need to wrap the observer function in a struct so that we can implement Send for it.
/// But it's not Send essentially, so we need to check it manually in runtime.
pub(crate) struct Observer {
f: js_sys::Function,
thread: ThreadId,
}
impl Observer {
pub fn new(f: js_sys::Function) -> Self {
Self {
f,
thread: std::thread::current().id(),
}
}
pub fn call1(&self, arg: &JsValue) {
if std::thread::current().id() == self.thread {
self.f.call1(&JsValue::NULL, arg).unwrap();
} else {
panic!("Observer called from different thread")
}
}
}
unsafe impl Send for Observer {}
}
#[wasm_bindgen]
impl Loro {
#[wasm_bindgen(constructor)]
@ -192,9 +224,9 @@ impl Loro {
// TODO: convert event and event sub config
pub fn subscribe(&self, f: js_sys::Function) -> u32 {
let observer = observer::Observer::new(f);
self.0.borrow_mut().subscribe_deep(Box::new(move |e| {
f.call1(&JsValue::NULL, &JsValue::from_bool(e.local))
.unwrap();
observer.call1(&JsValue::from_bool(e.local));
}))
}

View file

@ -650,6 +650,4 @@ mod test {
vec![&V::new(0, 3, "a"), &V::new(3, 6, "c")]
);
}
static_assertions::assert_not_impl_any!(RangeMap<usize, Range<usize>>: Sync, Send);
}

View file

@ -30,6 +30,11 @@ pub struct RleTree<T: Rle + 'static, A: RleTreeTrait<T> + 'static> {
pub node: <A::Arena as arena::Arena>::Boxed<'this, Node<'this, T, A>>,
}
// SAFETY: tree is safe to send to another thread
unsafe impl<T: Rle + 'static + Send, A: RleTreeTrait<T> + 'static> Send for RleTree<T, A> {}
// SAFETY: &tree is safe to be shared between threads
unsafe impl<T: Rle + 'static + Send + Sync, A: RleTreeTrait<T> + 'static> Sync for RleTree<T, A> {}
impl<T: Rle + 'static, A: RleTreeTrait<T> + 'static> Default for RleTree<T, A> {
fn default() -> Self {
assert!(