From 352ddc1c11a633cdbcffd6353b630f738d6e7fef Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 25 Oct 2022 01:33:17 +0800 Subject: [PATCH] fix: common ancestor step 1 --- Cargo.lock | 1 + crates/loro-core/Cargo.toml | 3 +- crates/loro-core/justfile | 3 +- .../src/container/text/text_container.rs | 18 +- .../loro-core/src/container/text/tracker.rs | 9 +- .../src/container/text/tracker/cursor_map.rs | 9 +- crates/loro-core/src/dag.rs | 270 +++++++++--------- crates/loro-core/src/dag/iter.rs | 1 + crates/loro-core/src/dag/test.rs | 35 ++- crates/loro-core/src/macros.rs | 10 +- crates/loro-core/src/span.rs | 8 + 11 files changed, 195 insertions(+), 172 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53176bbe..7e9aa99d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,6 +372,7 @@ name = "loro-core" version = "0.1.0" dependencies = [ "arbitrary", + "bit-vec", "color-backtrace", "colored", "crdt-list", diff --git a/crates/loro-core/Cargo.toml b/crates/loro-core/Cargo.toml index 8314c436..0a9447f5 100644 --- a/crates/loro-core/Cargo.toml +++ b/crates/loro-core/Cargo.toml @@ -24,6 +24,7 @@ rand = { version = "0.8.5", optional = true } arbitrary = { version = "1.1.7", optional = true } tabled = { version = "0.10.0", optional = true } colored = "2.0.0" +bit-vec = "0.6.3" [dev-dependencies] rand = { version = "0.8.5" } @@ -41,5 +42,5 @@ doctest = false [features] # whether to use list slice instead of raw str in text container slice = [] -fuzzing = ["crdt-list/fuzzing", "slice", "rand", "arbitrary", "tabled"] +fuzzing = ["crdt-list/fuzzing", "rand", "arbitrary", "tabled"] proptest = ["fuzzing"] diff --git a/crates/loro-core/justfile b/crates/loro-core/justfile index ca88379d..4b6fbe94 100644 --- a/crates/loro-core/justfile +++ b/crates/loro-core/justfile @@ -21,9 +21,8 @@ fuzz-yata-long: cargo fuzz run yata -- -max_total_time=300 -max_len=4000 -jobs=2 quick-fuzz: - cargo fuzz run single_client_text -- -max_total_time=10 -max_len=1000 & cargo fuzz run yata -- -max_total_time=10 -max_len=1000 & - cargo fuzz run two_client_text -- -max_total_time=10 -max_len=1000 + cargo fuzz run text -- -max_total_time=10 -max_len=1000 flame: cargo flamegraph --example test --features=fuzzing --root diff --git a/crates/loro-core/src/container/text/text_container.rs b/crates/loro-core/src/container/text/text_container.rs index 8790e540..bca927b1 100644 --- a/crates/loro-core/src/container/text/text_container.rs +++ b/crates/loro-core/src/container/text/text_container.rs @@ -5,7 +5,7 @@ use crate::{ container::{list::list_op::ListOp, Container, ContainerID, ContainerType}, dag::DagUtils, debug_log, - id::ID, + id::{Counter, ID}, log_store::LogStoreWeakRef, op::{InsertContent, Op, OpContent}, smstring::SmString, @@ -132,7 +132,6 @@ impl Container for TextContainer { } // TODO: move main logic to tracker module - // TODO: we don't need op proxy, only ids are enough fn apply(&mut self, id_span: IdSpan, store: &LogStore) { debug_log!("APPLY ENTRY client={}", store.this_client_id); let new_op_id = id_span.id_last(); @@ -145,7 +144,7 @@ impl Container for TextContainer { latest_head.push(new_op_id); // println!("{}", store.mermaid()); debug_log!( - "START FROM {:?} new_op_id={} self.head={:?}", + "START FROM HEADS={:?} new_op_id={} self.head={:?}", &common_ancestors, new_op_id, &self.head @@ -155,21 +154,22 @@ impl Container for TextContainer { let head = if true { debug_log!("NewTracker"); // FIXME use common ancestors - // self.tracker = Tracker::new(common_ancestors_vv); - // common_ancestors - self.tracker = Tracker::new(Default::default(), 0); - smallvec![] + self.tracker = Tracker::new(common_ancestors_vv, Counter::MAX / 2); + common_ancestors + // self.tracker = Tracker::new(Default::default(), 0); + // smallvec![] } else { debug_log!("OldTracker"); - let vv = self.tracker.all_vv().clone(); - self.tracker.checkout(vv); + self.tracker.checkout_to_latest(); self.tracker.all_vv().get_head() }; // stage 1 // TODO: need a better mechanism to track the head (KEEP IT IN TRACKER?) let path = store.find_path(&head, &latest_head); + dbg!(&path); for iter in store.iter_partial(&head, path.right) { + dbg!(&iter); self.tracker.retreat(&iter.retreat); self.tracker.forward(&iter.forward); // TODO: avoid this clone diff --git a/crates/loro-core/src/container/text/tracker.rs b/crates/loro-core/src/container/text/tracker.rs index a8322e98..995c7b49 100644 --- a/crates/loro-core/src/container/text/tracker.rs +++ b/crates/loro-core/src/container/text/tracker.rs @@ -134,7 +134,13 @@ impl Tracker { let diff = self.head_vv.diff(&vv); self.retreat(&diff.left); self.forward(&diff.right); - assert_eq!(self.head_vv, vv); + debug_assert_eq!(self.head_vv, vv); + } + + pub fn checkout_to_latest(&mut self) { + let diff = self.head_vv.diff(&self.all_vv); + self.forward(&diff.right); + debug_assert_eq!(self.head_vv, self.all_vv); } pub fn forward(&mut self, spans: &IdSpanVector) { @@ -241,6 +247,7 @@ impl Tracker { slice.clone(), ); debug_log!("INSERT YSPAN={}", format!("{:#?}", &yspan).red()); + dbg!(&self.content); // SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements unsafe { crdt_list::yata::integrate::(self, yspan) }; } diff --git a/crates/loro-core/src/container/text/tracker/cursor_map.rs b/crates/loro-core/src/container/text/tracker/cursor_map.rs index 47765c75..89eb3d9c 100644 --- a/crates/loro-core/src/container/text/tracker/cursor_map.rs +++ b/crates/loro-core/src/container/text/tracker/cursor_map.rs @@ -272,7 +272,14 @@ impl CursorMap { if cfg!(test) { let insert_len: usize = inserts.iter().map(|x| x.1.len).sum(); let del_len: usize = deletes.iter().map(|x| x.1.atom_len()).sum(); - assert_eq!(insert_len + del_len, span.content_len()); + assert_eq!( + insert_len + del_len, + span.content_len(), + "inserts={:#?}\ndeletes={:#?}\nspan={:#?}", + &inserts, + &deletes, + span + ); } IdSpanQueryResult { inserts, deletes } diff --git a/crates/loro-core/src/dag.rs b/crates/loro-core/src/dag.rs index fc4bd083..2dac567f 100644 --- a/crates/loro-core/src/dag.rs +++ b/crates/loro-core/src/dag.rs @@ -9,8 +9,10 @@ use std::{ collections::{BinaryHeap, HashMap}, fmt::Debug, + time::Instant, }; +use bit_vec::BitVec; use fxhash::{FxHashMap, FxHashSet}; use rle::{HasLength, Sliceable}; use smallvec::SmallVec; @@ -277,7 +279,7 @@ impl<'a> OrdIdSpan<'a> { OrdIdSpan { id: self.id, lamport: self.lamport, - deps: &self.deps[0..0], + deps: &[], len: 1, } } @@ -289,7 +291,15 @@ where D: DagNode + 'a, F: Fn(ID) -> Option<&'a D>, { - _find_common_ancestor(get, a_id, b_id, &mut |_, _| {}, false) + let mut ids = Vec::with_capacity(a_id.len() + b_id.len()); + for id in a_id { + ids.push(*id); + } + for id in b_id { + ids.push(*id); + } + + _find_common_ancestor_new(get, &ids) .into_iter() .map(|x| ID::new(x.0, x.1)) .collect() @@ -301,7 +311,7 @@ fn _find_common_ancestor<'a, F, D, G>( a_ids: &[ID], b_ids: &[ID], notify: &mut G, - deep: bool, + find_path: bool, ) -> FxHashMap where D: DagNode + 'a, @@ -407,7 +417,7 @@ where if a_count == 0 && b_count == 0 - && (!deep || min.is_none() || &node <= min.as_ref().unwrap()) + && (!find_path || min.is_none() || &node <= min.as_ref().unwrap()) { if node_type != NodeType::Shared { ans.clear(); @@ -425,7 +435,7 @@ where ans.clear(); break; } - if node.deps.is_empty() && !deep { + if node.deps.is_empty() && !find_path { if node.len == 1 { ans.clear(); break; @@ -453,135 +463,6 @@ where ans } -fn find_common_ancestor_old<'a, F, G, D>( - get: &'a F, - a_id: ID, - b_id: ID, - mut on_found: G, -) -> Option -where - D: DagNode + 'a, - F: Fn(ID) -> Option<&'a D>, - G: FnMut(ID, &FxHashMap, &FxHashMap), -{ - if a_id.client_id == b_id.client_id { - if a_id.counter <= b_id.counter { - Some(a_id) - } else { - Some(b_id) - } - } else { - let mut _a_vv: HashMap = FxHashMap::default(); - let mut _b_vv: HashMap = FxHashMap::default(); - // Invariant: every op id inserted to the a_heap is a key in a_path map, except for a_id - let mut _a_heap: BinaryHeap = BinaryHeap::new(); - // Likewise - let mut _b_heap: BinaryHeap = BinaryHeap::new(); - // FxHashMap is used to track the deps path of each node - let mut _a_path: FxHashMap = FxHashMap::default(); - let mut _b_path: FxHashMap = FxHashMap::default(); - { - let a = get(a_id).unwrap(); - let b = get(b_id).unwrap(); - _a_heap.push(OrdIdSpan { - id: a_id, - lamport: a.get_lamport_from_counter(a_id.counter), - deps: a.deps(), - len: 1, - }); - _b_heap.push(OrdIdSpan { - id: b_id, - lamport: b.get_lamport_from_counter(b_id.counter), - deps: b.deps(), - len: 1, - }); - _a_vv.insert(a_id.client_id, a_id.counter + 1); - _b_vv.insert(b_id.client_id, b_id.counter + 1); - } - - while !_a_heap.is_empty() || !_b_heap.is_empty() { - let (a_heap, b_heap, a_vv, b_vv, a_path, b_path, _swapped) = if _a_heap.is_empty() - || (_a_heap.peek().map(|x| x.lamport).unwrap_or(0) - < _b_heap.peek().map(|x| x.lamport).unwrap_or(0)) - { - // swap - ( - &mut _b_heap, - &mut _a_heap, - &mut _b_vv, - &mut _a_vv, - &mut _b_path, - &mut _a_path, - true, - ) - } else { - ( - &mut _a_heap, - &mut _b_heap, - &mut _a_vv, - &mut _b_vv, - &mut _a_path, - &mut _b_path, - false, - ) - }; - - while !a_heap.is_empty() - && a_heap.peek().map(|x| x.lamport).unwrap_or(0) - >= b_heap.peek().map(|x| x.lamport).unwrap_or(0) - { - let a = a_heap.pop().unwrap(); - let id = a.id; - if let Some(counter_end) = b_vv.get(&id.client_id) { - if id.counter < *counter_end { - b_path - .entry(id) - .or_insert_with(|| ID::new(id.client_id, counter_end - 1)); - - on_found(id, &_a_path, &_b_path); - return Some(id); - } - } - - #[cfg(debug_assertions)] - { - if let Some(v) = a_vv.get(&a.id.client_id) { - assert!(*v > a.id.counter) - } - } - - for &dep_id in a.deps { - if a_path.contains_key(&dep_id) { - continue; - } - - let dep = get(dep_id).unwrap(); - a_heap.push(OrdIdSpan { - id: dep_id, - lamport: dep.get_lamport_from_counter(dep_id.counter), - deps: dep.deps(), - len: 1, - }); - a_path.insert(dep_id, a.id); - if dep.id_start() != dep_id { - a_path.insert(dep.id_start(), dep_id); - } - - if let Some(v) = a_vv.get_mut(&dep_id.client_id) { - if *v < dep_id.counter + 1 { - *v = dep_id.counter + 1; - } - } else { - a_vv.insert(dep_id.client_id, dep_id.counter + 1); - } - } - } - } - - None - } -} - fn update_frontier(frontier: &mut Vec, new_node_id: ID, new_node_deps: &[ID]) { frontier.retain(|x| { if x.client_id == new_node_id.client_id && x.counter <= new_node_id.counter { @@ -602,3 +483,124 @@ fn update_frontier(frontier: &mut Vec, new_node_id: ID, new_node_deps: &[ID] frontier.push(new_node_id); } } + +// TODO: BitVec may be too slow here +fn _find_common_ancestor_new<'a, F, D>(get: &'a F, ids: &[ID]) -> FxHashMap +where + D: DagNode + 'a, + F: Fn(ID) -> Option<&'a D>, +{ + let mut ans = FxHashMap::default(); + if ids.len() <= 1 { + for id in ids { + ans.insert(id.client_id, id.counter); + } + + return ans; + } + + let mut queue: BinaryHeap<(OrdIdSpan, BitVec)> = BinaryHeap::new(); + let mut shared_num = 0; + let mut min = None; + let mut visited: HashMap = FxHashMap::default(); + for (i, id) in ids.iter().enumerate() { + let mut bitmap = BitVec::from_elem(ids.len(), false); + bitmap.set(i, true); + queue.push((OrdIdSpan::from_dag_node(*id, get).unwrap(), bitmap)); + } + + while let Some((this_node, mut this_map)) = queue.pop() { + let is_shared_from_start = this_map.all(); + let mut is_shared = is_shared_from_start; + + if is_shared_from_start { + shared_num -= 1; + } + + // pop the same node in the queue + while let Some((other_node, other_map)) = queue.peek() { + if this_node.id_span() == other_node.id_span() { + if other_map.all() { + shared_num -= 1; + } + + if !is_shared && this_map.or(other_map) && this_map.all() { + is_shared = true; + ans.insert(this_node.id.client_id, other_node.ctr_last()); + } + + queue.pop(); + } else { + break; + } + } + + // detect whether client is visited by other + if let Some((ctr, visited_map)) = visited.get_mut(&this_node.id.client_id) { + debug_assert!(*ctr >= this_node.id_last().counter); + if visited_map.all() { + is_shared = true; + } else if !is_shared && visited_map.or(&this_map) && visited_map.all() { + ans.insert(this_node.id.client_id, this_node.id_last().counter); + is_shared = true; + } + } else { + visited.insert( + this_node.id.client_id, + (this_node.id_last().counter, this_map.clone()), + ); + } + + if shared_num == queue.len() && (min.is_none() || &this_node <= min.as_ref().unwrap()) { + if !is_shared { + ans.clear(); + } + + break; + } + + for &dep_id in this_node.deps { + let node = OrdIdSpan::from_dag_node(dep_id, get).unwrap(); + if let Some(min) = &mut min { + let node_start = node.get_min(); + if node_start < *min { + *min = node_start; + } + } else { + min = Some(node.get_min()) + } + + queue.push((node, this_map.clone())); + } + + if is_shared { + shared_num += this_node.deps.len() + } + + if !is_shared { + if queue.is_empty() { + ans.clear(); + break; + } + + if this_node.deps.is_empty() { + if this_node.len == 1 { + ans.clear(); + break; + } + + queue.push(( + OrdIdSpan { + deps: &[], + id: this_node.id, + len: this_node.len - 1, + lamport: this_node.lamport, + }, + this_map, + )); + } + } + } + + ans +} diff --git a/crates/loro-core/src/dag/iter.rs b/crates/loro-core/src/dag/iter.rs index 6453ec27..4f793c90 100644 --- a/crates/loro-core/src/dag/iter.rs +++ b/crates/loro-core/src/dag/iter.rs @@ -276,6 +276,7 @@ impl<'a, T: DagNode + 'a, D: Dag> Iterator for DagPartialIter<'a, D> { } else { smallvec::smallvec![node.id_start().inc(slice_from - 1)] }; + dbg!(&self.frontier, &deps); let path = self.dag.find_path(&self.frontier, &deps); // NOTE: we expect user to update the tracker, to apply node, after visiting the node self.frontier = smallvec::smallvec![node.id_start().inc(slice_end - 1)]; diff --git a/crates/loro-core/src/dag/test.rs b/crates/loro-core/src/dag/test.rs index a7c3bb24..22050bae 100644 --- a/crates/loro-core/src/dag/test.rs +++ b/crates/loro-core/src/dag/test.rs @@ -725,8 +725,7 @@ mod find_common_ancestors { let mut a = TestDag::new(0); let mut b = TestDag::new(1); a.push(4); - b.push(4); - b.push(5); + b.push(9); b.merge(&a); b.frontier.retain(|x| x.client_id == 1); let k = b.nodes.get_mut(&1).unwrap(); @@ -767,7 +766,7 @@ mod find_common_ancestors { a0.push(1); a1.merge(&a2); a1.merge(&a0); - // println!("{}", a1.mermaid()); + println!("{}", a1.mermaid()); assert_eq!( a1.find_common_ancestor(&[ID::new(0, 3)], &[ID::new(1, 4)]) .first() @@ -775,13 +774,10 @@ mod find_common_ancestors { None ); assert_eq!( - a1.find_common_ancestor( - &[ID::new(0, 3), ID::new(1, 2), ID::new(2, 2)], - &[ID::new(1, 4)] - ) - .into_iter() - .collect::>(), - vec![ID::new(1, 2), ID::new(2, 2)] + a1.find_common_ancestor(&[ID::new(2, 3)], &[ID::new(1, 3)]) + .into_iter() + .collect::>(), + vec![ID::new(2, 2)] ); } } @@ -971,8 +967,16 @@ mod find_common_ancestors_proptest { let (target, dag): (&mut TestDag, &mut TestDag) = unsafe_array_mut_ref!(dags, [target, i]); target.merge(dag); - target.push(1); } + } + + let mut expected = Vec::with_capacity(N); + for i in 0..N { + dags[i].push(1); + expected.push(dags[i].frontier[0]); + } + + for target in 0..N { for i in N..dags.len() { let (target, dag): (&mut TestDag, &mut TestDag) = unsafe_array_mut_ref!(dags, [target, i]); @@ -980,13 +984,6 @@ mod find_common_ancestors_proptest { } } - let mut expected = Vec::with_capacity(N); - #[allow(clippy::needless_range_loop)] - for i in 0..N { - dags[i].push(1); - expected.push(dags[i].frontier[0]); - } - let mut merged_to_even = [false; N]; let mut merged_to_odd = [false; N]; @@ -1069,7 +1066,7 @@ mod find_common_ancestors_proptest { actual.sort(); let actual = actual.iter().copied().collect::>(); if actual != expected { - println!("{}", dag_to_mermaid(dag_a)); + println!("{}", dag_a.mermaid()); } prop_assert_eq!(actual, expected); diff --git a/crates/loro-core/src/macros.rs b/crates/loro-core/src/macros.rs index b34e6a00..119534c3 100644 --- a/crates/loro-core/src/macros.rs +++ b/crates/loro-core/src/macros.rs @@ -29,11 +29,11 @@ macro_rules! debug_log { // } }; ($($arg:tt)*) => {{ - // if cfg!(test) { - - // print!("{}:{}\t", file!().purple(), line!().to_string().purple()); - // println!($($arg)*); - // } + if cfg!(test) { + use colored::Colorize; + print!("{}:{}\t", file!().purple(), line!().to_string().purple()); + println!($($arg)*); + } }}; } diff --git a/crates/loro-core/src/span.rs b/crates/loro-core/src/span.rs index 90b6c723..8a8c4466 100644 --- a/crates/loro-core/src/span.rs +++ b/crates/loro-core/src/span.rs @@ -250,6 +250,14 @@ pub trait HasIdSpan: HasId + HasLength { self.id_start().inc(self.content_len() as i32 - 1) } + fn ctr_last(&self) -> Counter { + self.id_start().counter + self.content_len() as i32 - 1 + } + + fn ctr_first(&self) -> Counter { + self.id_start().counter + } + fn contains_id(&self, id: ID) -> bool { let id_start = self.id_start(); if id.client_id != id_start.client_id {