fix: common ancestor step 1

This commit is contained in:
Zixuan Chen 2022-10-25 01:33:17 +08:00
parent 4a608cc958
commit 352ddc1c11
11 changed files with 195 additions and 172 deletions

Cargo.lock generated
View file

@ -372,6 +372,7 @@ name = "loro-core"
version = "0.1.0"
dependencies = [
"arbitrary",
"bit-vec",
"color-backtrace",
"colored",
"crdt-list",

View file

@ -24,6 +24,7 @@ rand = { version = "0.8.5", optional = true }
arbitrary = { version = "1.1.7", optional = true }
tabled = { version = "0.10.0", optional = true }
colored = "2.0.0"
bit-vec = "0.6.3"
[dev-dependencies]
rand = { version = "0.8.5" }
@ -41,5 +42,5 @@ doctest = false
[features]
# whether to use list slice instead of raw str in text container
slice = []
fuzzing = ["crdt-list/fuzzing", "slice", "rand", "arbitrary", "tabled"]
fuzzing = ["crdt-list/fuzzing", "rand", "arbitrary", "tabled"]
proptest = ["fuzzing"]
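
The new `bit-vec` dependency backs the bitmaps used by the rewritten common-ancestor search in the dag module: each entry in the search queue carries a `BitVec` recording which of the query frontiers can reach it. A minimal sketch of the operations that code relies on (`from_elem`, `set`, `or`, `all` from bit-vec 0.6):

use bit_vec::BitVec;

fn main() {
    // One bit per query frontier; a DAG node is "shared" once every bit is
    // set, i.e. every frontier can reach it.
    let mut reach = BitVec::from_elem(3, false);
    reach.set(0, true); // reachable from frontier 0

    let mut other = BitVec::from_elem(3, false);
    other.set(1, true);
    other.set(2, true); // reachable from frontiers 1 and 2

    // `or` merges the two reachability sets in place and reports whether
    // `reach` changed; `all` asks "reachable from every frontier?".
    let changed = reach.or(&other);
    assert!(changed && reach.all());
}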

View file

@ -21,9 +21,8 @@ fuzz-yata-long:
cargo fuzz run yata -- -max_total_time=300 -max_len=4000 -jobs=2
quick-fuzz:
cargo fuzz run single_client_text -- -max_total_time=10 -max_len=1000 &
cargo fuzz run yata -- -max_total_time=10 -max_len=1000 &
cargo fuzz run two_client_text -- -max_total_time=10 -max_len=1000
cargo fuzz run text -- -max_total_time=10 -max_len=1000
flame:
cargo flamegraph --example test --features=fuzzing --root

View file

@ -5,7 +5,7 @@ use crate::{
container::{list::list_op::ListOp, Container, ContainerID, ContainerType},
dag::DagUtils,
debug_log,
id::ID,
id::{Counter, ID},
log_store::LogStoreWeakRef,
op::{InsertContent, Op, OpContent},
smstring::SmString,
@ -132,7 +132,6 @@ impl Container for TextContainer {
}
// TODO: move main logic to tracker module
// TODO: we don't need op proxy, only ids are enough
fn apply(&mut self, id_span: IdSpan, store: &LogStore) {
debug_log!("APPLY ENTRY client={}", store.this_client_id);
let new_op_id = id_span.id_last();
@ -145,7 +144,7 @@ impl Container for TextContainer {
latest_head.push(new_op_id);
// println!("{}", store.mermaid());
debug_log!(
"START FROM {:?} new_op_id={} self.head={:?}",
"START FROM HEADS={:?} new_op_id={} self.head={:?}",
&common_ancestors,
new_op_id,
&self.head
@ -155,21 +154,22 @@ impl Container for TextContainer {
let head = if true {
debug_log!("NewTracker");
// FIXME use common ancestors
// self.tracker = Tracker::new(common_ancestors_vv);
// common_ancestors
self.tracker = Tracker::new(Default::default(), 0);
smallvec![]
self.tracker = Tracker::new(common_ancestors_vv, Counter::MAX / 2);
common_ancestors
// self.tracker = Tracker::new(Default::default(), 0);
// smallvec![]
} else {
debug_log!("OldTracker");
let vv = self.tracker.all_vv().clone();
self.tracker.checkout(vv);
self.tracker.checkout_to_latest();
self.tracker.all_vv().get_head()
};
// stage 1
// TODO: need a better mechanism to track the head (KEEP IT IN TRACKER?)
let path = store.find_path(&head, &latest_head);
dbg!(&path);
for iter in store.iter_partial(&head, path.right) {
dbg!(&iter);
self.tracker.retreat(&iter.retreat);
self.tracker.forward(&iter.forward);
// TODO: avoid this clone
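
The `apply` change above stops rebuilding the tracker from the empty version on every incoming op: the tracker is now created at the common ancestors of the new op and the current head, and the following `find_path`/`iter_partial` loop only replays the ops between that version and the latest head. A toy sketch of that shape (the types and names below are illustrative, not Loro's API):

use std::collections::HashSet;

type OpId = (u64, u32); // toy (client, counter) id

// Toy stand-in for Loro's Tracker: it just remembers which ops it has applied.
struct ToyTracker {
    applied: HashSet<OpId>,
}

impl ToyTracker {
    // Start from the common-ancestor version instead of the empty version.
    fn new(start_version: HashSet<OpId>) -> Self {
        ToyTracker { applied: start_version }
    }

    // Replay only the ops on the path from the start version to the head.
    fn forward(&mut self, ops: &[OpId]) {
        self.applied.extend(ops.iter().copied());
    }
}

fn main() {
    let common_ancestor_version: HashSet<OpId> = [(1, 0), (2, 3)].into_iter().collect();
    let path_to_head: [OpId; 3] = [(1, 1), (1, 2), (2, 4)];

    let mut tracker = ToyTracker::new(common_ancestor_version);
    tracker.forward(&path_to_head);
    assert!(tracker.applied.contains(&(2, 4)));
}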

View file

@ -134,7 +134,13 @@ impl Tracker {
let diff = self.head_vv.diff(&vv);
self.retreat(&diff.left);
self.forward(&diff.right);
assert_eq!(self.head_vv, vv);
debug_assert_eq!(self.head_vv, vv);
}
pub fn checkout_to_latest(&mut self) {
let diff = self.head_vv.diff(&self.all_vv);
self.forward(&diff.right);
debug_assert_eq!(self.head_vv, self.all_vv);
}
pub fn forward(&mut self, spans: &IdSpanVector) {
@ -241,6 +247,7 @@ impl Tracker {
slice.clone(),
);
debug_log!("INSERT YSPAN={}", format!("{:#?}", &yspan).red());
dbg!(&self.content);
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
unsafe { crdt_list::yata::integrate::<YataImpl>(self, yspan) };
}
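
`checkout_to_latest` fast-forwards the tracker from its current head version to everything it has already seen (`all_vv`), so only the `forward` half of a version-vector diff is needed. A small sketch of that diff, assuming a plain `HashMap<ClientID, Counter>` version vector (Loro's real `VersionVector::diff` returns both a left and a right side):

use std::collections::HashMap;

type ClientID = u64;
type Counter = i32;

// Spans of ops (per client) that `to` has but `from` does not: this is the
// part the tracker must `forward` over when checking out to `to`.
fn forward_spans(
    from: &HashMap<ClientID, Counter>,
    to: &HashMap<ClientID, Counter>,
) -> HashMap<ClientID, (Counter, Counter)> {
    let mut spans = HashMap::new();
    for (&client, &to_end) in to {
        let from_end = from.get(&client).copied().unwrap_or(0);
        if to_end > from_end {
            spans.insert(client, (from_end, to_end)); // counters from_end..to_end
        }
    }
    spans
}

fn main() {
    let head_vv = HashMap::from([(1, 5), (2, 3)]);
    let all_vv = HashMap::from([(1, 5), (2, 7), (3, 2)]);
    // Fast-forwarding to the latest version only needs the forward spans.
    assert_eq!(
        forward_spans(&head_vv, &all_vv),
        HashMap::from([(2, (3, 7)), (3, (0, 2))])
    );
}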

View file

@ -272,7 +272,14 @@ impl CursorMap {
if cfg!(test) {
let insert_len: usize = inserts.iter().map(|x| x.1.len).sum();
let del_len: usize = deletes.iter().map(|x| x.1.atom_len()).sum();
assert_eq!(insert_len + del_len, span.content_len());
assert_eq!(
insert_len + del_len,
span.content_len(),
"inserts={:#?}\ndeletes={:#?}\nspan={:#?}",
&inserts,
&deletes,
span
);
}
IdSpanQueryResult { inserts, deletes }

View file

@ -9,8 +9,10 @@
use std::{
collections::{BinaryHeap, HashMap},
fmt::Debug,
time::Instant,
};
use bit_vec::BitVec;
use fxhash::{FxHashMap, FxHashSet};
use rle::{HasLength, Sliceable};
use smallvec::SmallVec;
@ -277,7 +279,7 @@ impl<'a> OrdIdSpan<'a> {
OrdIdSpan {
id: self.id,
lamport: self.lamport,
deps: &self.deps[0..0],
deps: &[],
len: 1,
}
}
@ -289,7 +291,15 @@ where
D: DagNode + 'a,
F: Fn(ID) -> Option<&'a D>,
{
_find_common_ancestor(get, a_id, b_id, &mut |_, _| {}, false)
let mut ids = Vec::with_capacity(a_id.len() + b_id.len());
for id in a_id {
ids.push(*id);
}
for id in b_id {
ids.push(*id);
}
_find_common_ancestor_new(get, &ids)
.into_iter()
.map(|x| ID::new(x.0, x.1))
.collect()
@ -301,7 +311,7 @@ fn _find_common_ancestor<'a, F, D, G>(
a_ids: &[ID],
b_ids: &[ID],
notify: &mut G,
deep: bool,
find_path: bool,
) -> FxHashMap<ClientID, Counter>
where
D: DagNode + 'a,
@ -407,7 +417,7 @@ where
if a_count == 0
&& b_count == 0
&& (!deep || min.is_none() || &node <= min.as_ref().unwrap())
&& (!find_path || min.is_none() || &node <= min.as_ref().unwrap())
{
if node_type != NodeType::Shared {
ans.clear();
@ -425,7 +435,7 @@ where
ans.clear();
break;
}
if node.deps.is_empty() && !deep {
if node.deps.is_empty() && !find_path {
if node.len == 1 {
ans.clear();
break;
@ -453,135 +463,6 @@ where
ans
}
fn find_common_ancestor_old<'a, F, G, D>(
get: &'a F,
a_id: ID,
b_id: ID,
mut on_found: G,
) -> Option<ID>
where
D: DagNode + 'a,
F: Fn(ID) -> Option<&'a D>,
G: FnMut(ID, &FxHashMap<ID, ID>, &FxHashMap<ID, ID>),
{
if a_id.client_id == b_id.client_id {
if a_id.counter <= b_id.counter {
Some(a_id)
} else {
Some(b_id)
}
} else {
let mut _a_vv: HashMap<ClientID, Counter, _> = FxHashMap::default();
let mut _b_vv: HashMap<ClientID, Counter, _> = FxHashMap::default();
// Invariant: every op id inserted to the a_heap is a key in a_path map, except for a_id
let mut _a_heap: BinaryHeap<OrdIdSpan> = BinaryHeap::new();
// Likewise
let mut _b_heap: BinaryHeap<OrdIdSpan> = BinaryHeap::new();
// FxHashMap<To, From> is used to track the deps path of each node
let mut _a_path: FxHashMap<ID, ID> = FxHashMap::default();
let mut _b_path: FxHashMap<ID, ID> = FxHashMap::default();
{
let a = get(a_id).unwrap();
let b = get(b_id).unwrap();
_a_heap.push(OrdIdSpan {
id: a_id,
lamport: a.get_lamport_from_counter(a_id.counter),
deps: a.deps(),
len: 1,
});
_b_heap.push(OrdIdSpan {
id: b_id,
lamport: b.get_lamport_from_counter(b_id.counter),
deps: b.deps(),
len: 1,
});
_a_vv.insert(a_id.client_id, a_id.counter + 1);
_b_vv.insert(b_id.client_id, b_id.counter + 1);
}
while !_a_heap.is_empty() || !_b_heap.is_empty() {
let (a_heap, b_heap, a_vv, b_vv, a_path, b_path, _swapped) = if _a_heap.is_empty()
|| (_a_heap.peek().map(|x| x.lamport).unwrap_or(0)
< _b_heap.peek().map(|x| x.lamport).unwrap_or(0))
{
// swap
(
&mut _b_heap,
&mut _a_heap,
&mut _b_vv,
&mut _a_vv,
&mut _b_path,
&mut _a_path,
true,
)
} else {
(
&mut _a_heap,
&mut _b_heap,
&mut _a_vv,
&mut _b_vv,
&mut _a_path,
&mut _b_path,
false,
)
};
while !a_heap.is_empty()
&& a_heap.peek().map(|x| x.lamport).unwrap_or(0)
>= b_heap.peek().map(|x| x.lamport).unwrap_or(0)
{
let a = a_heap.pop().unwrap();
let id = a.id;
if let Some(counter_end) = b_vv.get(&id.client_id) {
if id.counter < *counter_end {
b_path
.entry(id)
.or_insert_with(|| ID::new(id.client_id, counter_end - 1));
on_found(id, &_a_path, &_b_path);
return Some(id);
}
}
#[cfg(debug_assertions)]
{
if let Some(v) = a_vv.get(&a.id.client_id) {
assert!(*v > a.id.counter)
}
}
for &dep_id in a.deps {
if a_path.contains_key(&dep_id) {
continue;
}
let dep = get(dep_id).unwrap();
a_heap.push(OrdIdSpan {
id: dep_id,
lamport: dep.get_lamport_from_counter(dep_id.counter),
deps: dep.deps(),
len: 1,
});
a_path.insert(dep_id, a.id);
if dep.id_start() != dep_id {
a_path.insert(dep.id_start(), dep_id);
}
if let Some(v) = a_vv.get_mut(&dep_id.client_id) {
if *v < dep_id.counter + 1 {
*v = dep_id.counter + 1;
}
} else {
a_vv.insert(dep_id.client_id, dep_id.counter + 1);
}
}
}
}
None
}
}
fn update_frontier(frontier: &mut Vec<ID>, new_node_id: ID, new_node_deps: &[ID]) {
frontier.retain(|x| {
if x.client_id == new_node_id.client_id && x.counter <= new_node_id.counter {
@ -602,3 +483,124 @@ fn update_frontier(frontier: &mut Vec<ID>, new_node_id: ID, new_node_deps: &[ID]
frontier.push(new_node_id);
}
}
// TODO: BitVec may be too slow here
fn _find_common_ancestor_new<'a, F, D>(get: &'a F, ids: &[ID]) -> FxHashMap<ClientID, Counter>
where
D: DagNode + 'a,
F: Fn(ID) -> Option<&'a D>,
{
let mut ans = FxHashMap::default();
if ids.len() <= 1 {
for id in ids {
ans.insert(id.client_id, id.counter);
}
return ans;
}
let mut queue: BinaryHeap<(OrdIdSpan, BitVec)> = BinaryHeap::new();
let mut shared_num = 0;
let mut min = None;
let mut visited: HashMap<ClientID, (Counter, BitVec), _> = FxHashMap::default();
for (i, id) in ids.iter().enumerate() {
let mut bitmap = BitVec::from_elem(ids.len(), false);
bitmap.set(i, true);
queue.push((OrdIdSpan::from_dag_node(*id, get).unwrap(), bitmap));
}
while let Some((this_node, mut this_map)) = queue.pop() {
let is_shared_from_start = this_map.all();
let mut is_shared = is_shared_from_start;
if is_shared_from_start {
shared_num -= 1;
}
// pop the same node in the queue
while let Some((other_node, other_map)) = queue.peek() {
if this_node.id_span() == other_node.id_span() {
if other_map.all() {
shared_num -= 1;
}
if !is_shared && this_map.or(other_map) && this_map.all() {
is_shared = true;
ans.insert(this_node.id.client_id, other_node.ctr_last());
}
queue.pop();
} else {
break;
}
}
// detect whether client is visited by other
if let Some((ctr, visited_map)) = visited.get_mut(&this_node.id.client_id) {
debug_assert!(*ctr >= this_node.id_last().counter);
if visited_map.all() {
is_shared = true;
} else if !is_shared && visited_map.or(&this_map) && visited_map.all() {
ans.insert(this_node.id.client_id, this_node.id_last().counter);
is_shared = true;
}
} else {
visited.insert(
this_node.id.client_id,
(this_node.id_last().counter, this_map.clone()),
);
}
if shared_num == queue.len() && (min.is_none() || &this_node <= min.as_ref().unwrap()) {
if !is_shared {
ans.clear();
}
break;
}
for &dep_id in this_node.deps {
let node = OrdIdSpan::from_dag_node(dep_id, get).unwrap();
if let Some(min) = &mut min {
let node_start = node.get_min();
if node_start < *min {
*min = node_start;
}
} else {
min = Some(node.get_min())
}
queue.push((node, this_map.clone()));
}
if is_shared {
shared_num += this_node.deps.len()
}
if !is_shared {
if queue.is_empty() {
ans.clear();
break;
}
if this_node.deps.is_empty() {
if this_node.len == 1 {
ans.clear();
break;
}
queue.push((
OrdIdSpan {
deps: &[],
id: this_node.id,
len: this_node.len - 1,
lamport: this_node.lamport,
},
this_map,
));
}
}
}
ans
}
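
`_find_common_ancestor_new` replaces the old two-frontier walk with a single max-lamport heap whose entries each carry a `BitVec` of which input ids can reach them; a popped span whose bitmap is full is reachable from every input and is a common-ancestor candidate. Below is a minimal, self-contained sketch of that idea, assuming single-op nodes and illustrative names (`NodeId`, `Node`, `common_ancestor_heads` are not Loro's API); the real function walks `OrdIdSpan` ranges, records per-client counters in `ans`, and terminates early once everything left in the queue is shared.

use std::collections::{BinaryHeap, HashMap, HashSet};

use bit_vec::BitVec;

type NodeId = u32;

struct Node {
    lamport: u32,
    deps: Vec<NodeId>,
}

/// Heads of the common-ancestor set of `frontiers`: nodes reachable from
/// every frontier that are not themselves ancestors of another such node.
fn common_ancestor_heads(dag: &HashMap<NodeId, Node>, frontiers: &[NodeId]) -> Vec<NodeId> {
    let mut heap: BinaryHeap<(u32, NodeId)> = BinaryHeap::new();
    // For each visited node: which frontiers can reach it, one bit per frontier.
    let mut reach: HashMap<NodeId, BitVec> = HashMap::new();
    // Nodes already known to be ancestors of some fully-shared node.
    let mut dominated: HashSet<NodeId> = HashSet::new();
    let mut done: HashSet<NodeId> = HashSet::new();
    let mut heads = Vec::new();

    for (i, &f) in frontiers.iter().enumerate() {
        reach
            .entry(f)
            .or_insert_with(|| BitVec::from_elem(frontiers.len(), false))
            .set(i, true);
        heap.push((dag[&f].lamport, f));
    }

    // A node's lamport is strictly greater than its deps' lamports, so popping
    // in decreasing lamport order means a node is popped only after every
    // visited node that depends on it, i.e. its bitmap is final at pop time.
    while let Some((_, id)) = heap.pop() {
        if !done.insert(id) {
            continue; // duplicate heap entry for a node reached via several parents
        }
        let map = reach[&id].clone();
        let shared = map.all();
        let is_dominated = dominated.contains(&id);
        if shared && !is_dominated {
            heads.push(id);
        }
        for &dep in &dag[&id].deps {
            reach
                .entry(dep)
                .or_insert_with(|| BitVec::from_elem(frontiers.len(), false))
                .or(&map);
            if shared || is_dominated {
                dominated.insert(dep);
            }
            if !done.contains(&dep) {
                heap.push((dag[&dep].lamport, dep));
            }
        }
    }
    heads
}

fn main() {
    // R <- C <- {X, Y}: the common ancestor of the two frontiers X and Y is C.
    let dag = HashMap::from([
        (0, Node { lamport: 0, deps: vec![] }),  // R
        (1, Node { lamport: 1, deps: vec![0] }), // C
        (2, Node { lamport: 2, deps: vec![1] }), // X
        (3, Node { lamport: 2, deps: vec![1] }), // Y
    ]);
    assert_eq!(common_ancestor_heads(&dag, &[2, 3]), vec![1]);
}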

View file

@ -276,6 +276,7 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagPartialIter<'a, D> {
} else {
smallvec::smallvec![node.id_start().inc(slice_from - 1)]
};
dbg!(&self.frontier, &deps);
let path = self.dag.find_path(&self.frontier, &deps);
// NOTE: we expect user to update the tracker, to apply node, after visiting the node
self.frontier = smallvec::smallvec![node.id_start().inc(slice_end - 1)];

View file

@ -725,8 +725,7 @@ mod find_common_ancestors {
let mut a = TestDag::new(0);
let mut b = TestDag::new(1);
a.push(4);
b.push(4);
b.push(5);
b.push(9);
b.merge(&a);
b.frontier.retain(|x| x.client_id == 1);
let k = b.nodes.get_mut(&1).unwrap();
@ -767,7 +766,7 @@ mod find_common_ancestors {
a0.push(1);
a1.merge(&a2);
a1.merge(&a0);
// println!("{}", a1.mermaid());
println!("{}", a1.mermaid());
assert_eq!(
a1.find_common_ancestor(&[ID::new(0, 3)], &[ID::new(1, 4)])
.first()
@ -775,13 +774,10 @@ mod find_common_ancestors {
None
);
assert_eq!(
a1.find_common_ancestor(
&[ID::new(0, 3), ID::new(1, 2), ID::new(2, 2)],
&[ID::new(1, 4)]
)
.into_iter()
.collect::<Vec<_>>(),
vec![ID::new(1, 2), ID::new(2, 2)]
a1.find_common_ancestor(&[ID::new(2, 3)], &[ID::new(1, 3)])
.into_iter()
.collect::<Vec<_>>(),
vec![ID::new(2, 2)]
);
}
}
@ -971,8 +967,16 @@ mod find_common_ancestors_proptest {
let (target, dag): (&mut TestDag, &mut TestDag) =
unsafe_array_mut_ref!(dags, [target, i]);
target.merge(dag);
target.push(1);
}
}
let mut expected = Vec::with_capacity(N);
for i in 0..N {
dags[i].push(1);
expected.push(dags[i].frontier[0]);
}
for target in 0..N {
for i in N..dags.len() {
let (target, dag): (&mut TestDag, &mut TestDag) =
unsafe_array_mut_ref!(dags, [target, i]);
@ -980,13 +984,6 @@ mod find_common_ancestors_proptest {
}
}
let mut expected = Vec::with_capacity(N);
#[allow(clippy::needless_range_loop)]
for i in 0..N {
dags[i].push(1);
expected.push(dags[i].frontier[0]);
}
let mut merged_to_even = [false; N];
let mut merged_to_odd = [false; N];
@ -1069,7 +1066,7 @@ mod find_common_ancestors_proptest {
actual.sort();
let actual = actual.iter().copied().collect::<Vec<_>>();
if actual != expected {
println!("{}", dag_to_mermaid(dag_a));
println!("{}", dag_a.mermaid());
}
prop_assert_eq!(actual, expected);

View file

@ -29,11 +29,11 @@ macro_rules! debug_log {
// }
};
($($arg:tt)*) => {{
// if cfg!(test) {
// print!("{}:{}\t", file!().purple(), line!().to_string().purple());
// println!($($arg)*);
// }
if cfg!(test) {
use colored::Colorize;
print!("{}:{}\t", file!().purple(), line!().to_string().purple());
println!($($arg)*);
}
}};
}

View file

@ -250,6 +250,14 @@ pub trait HasIdSpan: HasId + HasLength {
self.id_start().inc(self.content_len() as i32 - 1)
}
fn ctr_last(&self) -> Counter {
self.id_start().counter + self.content_len() as i32 - 1
}
fn ctr_first(&self) -> Counter {
self.id_start().counter
}
fn contains_id(&self, id: ID) -> bool {
let id_start = self.id_start();
if id.client_id != id_start.client_id {