mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-23 05:24:51 +00:00
fix: wasm hierarchy notify dead lock
This commit is contained in:
parent
e3a93be6a2
commit
80640ca4e1
4 changed files with 128 additions and 17 deletions
|
@ -435,7 +435,8 @@ pub trait ContainerWrapper {
|
|||
let ans = match event {
|
||||
Some(event) => {
|
||||
debug_log::debug_log!("get event");
|
||||
hierarchy.try_lock().unwrap().notify(event);
|
||||
// let mut hierarchy = hierarchy.try_lock().unwrap();
|
||||
Hierarchy::notify_without_lock(hierarchy, event);
|
||||
Ok(ans)
|
||||
}
|
||||
None => Ok(ans),
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
use std::{fmt::Debug, sync::RwLockWriteGuard};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
sync::{Arc, Mutex, RwLockWriteGuard},
|
||||
};
|
||||
|
||||
use fxhash::{FxHashMap, FxHashSet};
|
||||
|
||||
|
@ -224,21 +227,91 @@ impl Hierarchy {
|
|||
false
|
||||
}
|
||||
|
||||
pub(crate) fn notify_without_path<'a, 'b>(
|
||||
&'a mut self,
|
||||
mut raw_event: RawEvent,
|
||||
store: RwLockWriteGuard<'b, LogStore>,
|
||||
) {
|
||||
let reg = &store.reg;
|
||||
// pub(crate) fn notify_without_path<'a, 'b>(
|
||||
// &'a mut self,
|
||||
// mut raw_event: RawEvent,
|
||||
// store: RwLockWriteGuard<'b, LogStore>,
|
||||
// ) {
|
||||
// let reg = &store.reg;
|
||||
|
||||
if raw_event.abs_path.is_empty() {
|
||||
let Some(absolute_path) = self.get_path(reg, &raw_event.container_id, None) else {
|
||||
return ;
|
||||
// if raw_event.abs_path.is_empty() {
|
||||
// let Some(absolute_path) = self.get_path(reg, &raw_event.container_id, None) else {
|
||||
// return ;
|
||||
// };
|
||||
// raw_event.abs_path = absolute_path;
|
||||
// }
|
||||
// drop(store);
|
||||
// self.notify(raw_event);
|
||||
// }
|
||||
|
||||
pub(crate) fn notify_without_lock(hierarchy: Arc<Mutex<Hierarchy>>, raw_event: RawEvent) {
|
||||
let (mut nodes, mut root_observers) = {
|
||||
let mut hierarchy = hierarchy.try_lock().unwrap();
|
||||
(
|
||||
std::mem::take(&mut hierarchy.nodes),
|
||||
std::mem::take(&mut hierarchy.root_observers),
|
||||
)
|
||||
};
|
||||
raw_event.abs_path = absolute_path;
|
||||
|
||||
// mut nodes: FxHashMap<ContainerID, Node>,
|
||||
// mut root_observers: FxHashMap<SubscriptionID, Observer>,
|
||||
|
||||
let target_id = raw_event.container_id;
|
||||
let mut event = Event {
|
||||
absolute_path: raw_event.abs_path,
|
||||
relative_path: Default::default(),
|
||||
old_version: raw_event.old_version,
|
||||
new_version: raw_event.new_version,
|
||||
current_target: Some(target_id.clone()),
|
||||
target: target_id.clone(),
|
||||
diff: raw_event.diff,
|
||||
local: raw_event.local,
|
||||
};
|
||||
let mut current_target_id = Some(target_id.clone());
|
||||
let mut count = 0;
|
||||
let mut path_to_root = event.absolute_path.clone();
|
||||
path_to_root.reverse();
|
||||
let node = nodes.entry(target_id).or_default();
|
||||
if !node.observers.is_empty() {
|
||||
for (_, observer) in node.observers.iter_mut() {
|
||||
observer(&event);
|
||||
}
|
||||
}
|
||||
while let Some(id) = current_target_id {
|
||||
let Some(node) = nodes.get_mut(&id) else {
|
||||
break;
|
||||
};
|
||||
if !node.deep_observers.is_empty() {
|
||||
let mut relative_path = path_to_root[..count].to_vec();
|
||||
relative_path.reverse();
|
||||
event.relative_path = relative_path;
|
||||
event.current_target = Some(id.clone());
|
||||
for (_, observer) in node.deep_observers.iter_mut() {
|
||||
observer(&event);
|
||||
}
|
||||
}
|
||||
|
||||
count += 1;
|
||||
if node.parent.is_none() {
|
||||
debug_assert!(id.is_root());
|
||||
}
|
||||
|
||||
current_target_id = node.parent.as_ref().cloned();
|
||||
}
|
||||
|
||||
if !root_observers.is_empty() {
|
||||
debug_log::debug_log!("notify root");
|
||||
event.relative_path = event.absolute_path.clone();
|
||||
event.current_target = None;
|
||||
for (_, observer) in root_observers.iter_mut() {
|
||||
observer(&event);
|
||||
}
|
||||
}
|
||||
{
|
||||
let mut hierarchy = hierarchy.try_lock().unwrap();
|
||||
hierarchy.nodes = nodes;
|
||||
hierarchy.root_observers = root_observers;
|
||||
}
|
||||
drop(store);
|
||||
self.notify(raw_event);
|
||||
}
|
||||
|
||||
pub fn notify(&mut self, raw_event: RawEvent) {
|
||||
|
@ -348,4 +421,13 @@ impl Hierarchy {
|
|||
self.notify(event);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_notifications_without_lock(
|
||||
hierarchy: Arc<Mutex<Hierarchy>>,
|
||||
events: Vec<RawEvent>,
|
||||
) {
|
||||
for event in events {
|
||||
Hierarchy::notify_without_lock(hierarchy.clone(), event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ impl LoroCore {
|
|||
#[inline(always)]
|
||||
pub fn get_list<I: Into<ContainerIdRaw>>(&mut self, id: I) -> List {
|
||||
let id: ContainerIdRaw = id.into();
|
||||
let mut store = self.log_store.write().unwrap();
|
||||
let mut store = self.log_store.try_write().unwrap();
|
||||
let instance = store.get_or_create_container(&id.with_type(ContainerType::List));
|
||||
let cid = store.this_client_id();
|
||||
List::from_instance(instance, cid)
|
||||
|
@ -58,7 +58,7 @@ impl LoroCore {
|
|||
#[inline(always)]
|
||||
pub fn get_map<I: Into<ContainerIdRaw>>(&mut self, id: I) -> Map {
|
||||
let id: ContainerIdRaw = id.into();
|
||||
let mut store = self.log_store.write().unwrap();
|
||||
let mut store = self.log_store.try_write().unwrap();
|
||||
let instance = store.get_or_create_container(&id.with_type(ContainerType::Map));
|
||||
let cid = store.this_client_id();
|
||||
Map::from_instance(instance, cid)
|
||||
|
@ -67,7 +67,7 @@ impl LoroCore {
|
|||
#[inline(always)]
|
||||
pub fn get_text<I: Into<ContainerIdRaw>>(&mut self, id: I) -> Text {
|
||||
let id: ContainerIdRaw = id.into();
|
||||
let mut store = self.log_store.write().unwrap();
|
||||
let mut store = self.log_store.try_write().unwrap();
|
||||
let instance = store.get_or_create_container(&id.with_type(ContainerType::Text));
|
||||
let cid = store.this_client_id();
|
||||
Text::from_instance(instance, cid)
|
||||
|
@ -126,4 +126,9 @@ impl LoroCore {
|
|||
let mut h = self.hierarchy.try_lock().unwrap();
|
||||
h.send_notifications(events);
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn notify_without_lock(&self, events: Vec<RawEvent>) {
|
||||
Hierarchy::send_notifications_without_lock(self.hierarchy.clone(), events)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -169,3 +169,26 @@ Deno.test({ name: "test prelim" }, async (t) => {
|
|||
}]]);
|
||||
});
|
||||
});
|
||||
|
||||
Deno.test("subscribe_lock", () => {
|
||||
const loro = new Loro();
|
||||
const text = loro.getText("text");
|
||||
const list = loro.getList("list");
|
||||
let count = 0;
|
||||
let i = 3;
|
||||
const sub = loro.subscribe(() => {
|
||||
if (i >0){
|
||||
list.insert(loro, 0, i);
|
||||
i--;
|
||||
}
|
||||
|
||||
count += 1;
|
||||
});
|
||||
text.insert(loro, 0, "hello world");
|
||||
assertEquals(count, 1);
|
||||
text.insert(loro, 0, "hello world");
|
||||
assertEquals(count, 2);
|
||||
loro.unsubscribe(sub);
|
||||
text.insert(loro, 0, "hello world");
|
||||
assertEquals(count, 2);
|
||||
});
|
Loading…
Reference in a new issue