fix: missing event when checkout (#275)

* fix: refresh depth in diff_calc when checkout

* refactor: replace u16 with Option<NonZero>

---------

Co-authored-by: Zixuan Chen <remch183@outlook.com>
This commit is contained in:
Leon Zhao 2024-02-29 11:05:25 +08:00 committed by GitHub
parent dbf128959d
commit d3844ce04c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 91 additions and 46 deletions

View file

@ -1,6 +1,7 @@
mod str_arena;
use std::{
num::NonZeroU16,
ops::{Range, RangeBounds},
sync::{Arc, Mutex, MutexGuard},
};
@ -30,7 +31,7 @@ struct InnerSharedArena {
// It might be better to use RwLock in the future
container_idx_to_id: Mutex<Vec<ContainerID>>,
// 0 stands for unknown, 1 stands for root containers
depth: Mutex<Vec<u16>>,
depth: Mutex<Vec<Option<NonZeroU16>>>,
container_id_to_idx: Mutex<FxHashMap<ContainerID, ContainerIdx>>,
/// The parent of each container.
parents: Mutex<FxHashMap<ContainerIdx, Option<ContainerIdx>>>,
@ -70,9 +71,9 @@ impl SharedArena {
if id.is_root() {
self.inner.root_c_idx.lock().unwrap().push(idx);
self.inner.parents.lock().unwrap().insert(idx, None);
self.inner.depth.lock().unwrap().push(1);
self.inner.depth.lock().unwrap().push(NonZeroU16::new(1));
} else {
self.inner.depth.lock().unwrap().push(0);
self.inner.depth.lock().unwrap().push(None);
}
idx
}
@ -146,13 +147,13 @@ impl SharedArena {
match parent {
Some(p) => {
if let Some(d) = get_depth(p, &mut depth, parents) {
depth[child.to_index() as usize] = d + 1;
depth[child.to_index() as usize] = NonZeroU16::new(d.get() + 1);
} else {
depth[child.to_index() as usize] = 0;
depth[child.to_index() as usize] = None;
}
}
None => {
depth[child.to_index() as usize] = 1;
depth[child.to_index() as usize] = NonZeroU16::new(1);
}
}
}
@ -355,7 +356,7 @@ impl SharedArena {
self.inner.root_c_idx.lock().unwrap().clone()
}
pub(crate) fn get_depth(&self, container: ContainerIdx) -> Option<u16> {
pub(crate) fn get_depth(&self, container: ContainerIdx) -> Option<NonZeroU16> {
get_depth(
container,
&mut self.inner.depth.lock().unwrap(),
@ -410,25 +411,25 @@ fn _slice_str(range: Range<usize>, s: &mut StrArena) -> String {
fn get_depth(
target: ContainerIdx,
depth: &mut Vec<u16>,
depth: &mut Vec<Option<NonZeroU16>>,
parents: &FxHashMap<ContainerIdx, Option<ContainerIdx>>,
) -> Option<u16> {
) -> Option<NonZeroU16> {
let mut d = depth[target.to_index() as usize];
if d != 0 {
return Some(d);
if d.is_some() {
return d;
}
let parent = parents.get(&target)?;
match parent {
Some(p) => {
d = get_depth(*p, depth, parents)? + 1;
d = NonZeroU16::new(get_depth(*p, depth, parents)?.get() + 1);
depth[target.to_index() as usize] = d;
}
None => {
depth[target.to_index() as usize] = 1;
d = 1;
depth[target.to_index() as usize] = NonZeroU16::new(1);
d = NonZeroU16::new(1);
}
}
Some(d)
d
}

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{num::NonZeroU16, sync::Arc};
pub(super) mod tree;
use itertools::Itertools;
@ -36,8 +36,8 @@ use super::{event::InternalContainerDiff, oplog::OpLog};
pub struct DiffCalculator {
/// ContainerIdx -> (depth, calculator)
///
/// if depth == u16::MAX, we need to calculate it again
calculators: FxHashMap<ContainerIdx, (u16, ContainerDiffCalculator)>,
/// if depth is None, we need to calculate it again
calculators: FxHashMap<ContainerIdx, (Option<NonZeroU16>, ContainerDiffCalculator)>,
last_vv: VersionVector,
has_all: bool,
}
@ -141,8 +141,8 @@ impl DiffCalculator {
}
let vv = &mut vv.borrow_mut();
vv.extend_to_include_end_id(ID::new(change.peer(), op.counter));
let depth = oplog.arena.get_depth(op.container).unwrap_or(u16::MAX);
let (_, calculator) =
let depth = oplog.arena.get_depth(op.container);
let (old_depth, calculator) =
self.calculators.entry(op.container).or_insert_with(|| {
match op.container.get_type() {
crate::ContainerType::Text => (
@ -169,6 +169,11 @@ impl DiffCalculator {
),
}
});
// checkout use the same diff_calculator, the depth of calculator is not updated
// That may cause the container to be considered deleted
if *old_depth != depth {
*old_depth = depth;
}
if !started_set.contains(&op.container) {
started_set.insert(op.container);
@ -210,7 +215,7 @@ impl DiffCalculator {
// we need to iterate from parents to children. i.e. from smaller depth to larger depth.
let mut new_containers = FxHashSet::default();
let mut container_id_to_depth = FxHashMap::default();
let mut all: Vec<(u16, ContainerIdx)> = if let Some(set) = affected_set {
let mut all: Vec<(Option<NonZeroU16>, ContainerIdx)> = if let Some(set) = affected_set {
// only visit the affected containers
set.into_iter()
.map(|x| {
@ -224,35 +229,30 @@ impl DiffCalculator {
.map(|(x, (depth, _))| (*depth, *x))
.collect()
};
let mut are_rest_containers_deleted = false;
let mut ans = FxHashMap::default();
while !all.is_empty() {
// sort by depth and lamport, ensure we iterate from top to bottom
all.sort_by_key(|x| x.0);
let len = all.len();
for (_, idx) in std::mem::take(&mut all) {
if ans.contains_key(&idx) {
continue;
}
let (depth, calc) = self.calculators.get_mut(&idx).unwrap();
if *depth == u16::MAX && !are_rest_containers_deleted {
if let Some(d) = oplog.arena.get_depth(idx) {
if d != *depth {
*depth = d;
all.push((*depth, idx));
continue;
}
if depth.is_none() {
let d = oplog.arena.get_depth(idx);
if d != *depth {
*depth = d;
all.push((*depth, idx));
continue;
}
}
let id = oplog.arena.idx_to_id(idx).unwrap();
let bring_back = new_containers.remove(&id);
let diff = calc.calculate_diff(oplog, before, after, |c| {
if !are_rest_containers_deleted {
new_containers.insert(c.clone());
container_id_to_depth.insert(c.clone(), depth.saturating_add(1));
oplog.arena.register_container(c);
}
new_containers.insert(c.clone());
container_id_to_depth.insert(c.clone(), depth.and_then(|d| d.checked_add(1)));
oplog.arena.register_container(c);
});
if !diff.is_empty() || bring_back {
ans.insert(
@ -262,24 +262,15 @@ impl DiffCalculator {
InternalContainerDiff {
idx,
bring_back,
is_container_deleted: are_rest_containers_deleted,
is_container_deleted: false,
diff: Some(diff.into()),
},
),
);
}
}
if len == all.len() {
debug_log::debug_log!("Container might be deleted");
debug_log::debug_dbg!(&all);
for (_, idx) in all.iter() {
debug_log::debug_dbg!(oplog.arena.get_container_id(*idx));
}
// we still emit the event of deleted container
are_rest_containers_deleted = true;
}
}
while !new_containers.is_empty() {
for id in std::mem::take(&mut new_containers) {
let Some(idx) = oplog.arena.id_to_idx(&id) else {

View file

@ -1,7 +1,10 @@
use std::sync::{atomic::AtomicBool, Arc, Mutex};
use fxhash::FxHashMap;
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue, ID};
use loro_internal::{
delta::ResolvedMapValue,
event::Diff,
handler::{Handler, TextDelta, ValueOrContainer},
version::Frontiers,
ApplyDiff, LoroDoc, ToJson,
@ -829,3 +832,53 @@ fn state_may_deadlock_when_import() {
doc.import(&doc.export_snapshot()).unwrap();
})
}
#[test]
fn missing_event_when_checkout() {
let doc = LoroDoc::new_auto_commit();
doc.checkout(&doc.oplog_frontiers()).unwrap();
let value = Arc::new(Mutex::new(FxHashMap::default()));
let map = value.clone();
doc.subscribe(
&ContainerID::new_root("tree", ContainerType::Tree),
Arc::new(move |e| {
let mut v = map.lock().unwrap();
for container_diff in e.events.iter() {
let from_children =
container_diff.id != ContainerID::new_root("tree", ContainerType::Tree);
if from_children {
if let Diff::Map(map) = &container_diff.diff {
for (k, ResolvedMapValue { value, .. }) in map.updated.iter() {
match value {
Some(value) => {
v.insert(
k.to_string(),
*value.as_value().unwrap().as_i64().unwrap(),
);
}
None => {
v.remove(&k.to_string());
}
}
}
}
}
}
}),
);
let doc2 = LoroDoc::new_auto_commit();
let tree = doc2.get_tree("tree");
let node = tree.create(None).unwrap();
let _ = tree.create(None).unwrap();
let meta = tree.get_meta(node).unwrap();
meta.insert("a", 0).unwrap();
doc.import(&doc2.export_from(&doc.oplog_vv())).unwrap();
doc.attach();
meta.insert("b", 1).unwrap();
doc.checkout(&doc.oplog_frontiers()).unwrap();
doc.import(&doc2.export_from(&doc.oplog_vv())).unwrap();
// checkout use the same diff_calculator, the depth of calculator is not updated
doc.attach();
assert!(value.lock().unwrap().contains_key("b"));
}