From c18cec15a90b2aba6580f924b8eef1a1c7c72cc5 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 13 Sep 2024 18:39:44 +0800 Subject: [PATCH] fix: lazy load dag node --- Cargo.lock | 23 + crates/fuzz/Cargo.toml | 1 + crates/fuzz/src/actor.rs | 1 + crates/fuzz/tests/test.rs | 467 +++++++++++++++++++++ crates/loro-internal/src/loro.rs | 3 - crates/loro-internal/src/oplog.rs | 3 - crates/loro-internal/src/oplog/loro_dag.rs | 54 ++- 7 files changed, 528 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49f093cd..03d8b15a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,6 +564,12 @@ dependencies = [ "thousands", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "either" version = "1.9.0" @@ -696,6 +702,7 @@ dependencies = [ "loro 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)", "loro 0.16.2 (git+https://github.com/loro-dev/loro.git?rev=90470658435ec4c62b5af59ebb82fe9e1f5aa761)", "num_cpus", + "pretty_assertions", "rand", "rayon", "serde_json", @@ -1726,6 +1733,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" +[[package]] +name = "pretty_assertions" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2816,6 +2833,12 @@ version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" +[[package]] +name = "yansi" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/crates/fuzz/Cargo.toml b/crates/fuzz/Cargo.toml index b0ed0e50..0c04eefc 100644 --- a/crates/fuzz/Cargo.toml +++ b/crates/fuzz/Cargo.toml @@ -23,6 +23,7 @@ num_cpus = "1.16.0" rayon = "1.10.0" bytes = "1" ensure-cov = "0.1.0" +pretty_assertions = "1.4.0" [dev-dependencies] ctor = "0.2" diff --git a/crates/fuzz/src/actor.rs b/crates/fuzz/src/actor.rs index 05628ec7..1bc97c4c 100644 --- a/crates/fuzz/src/actor.rs +++ b/crates/fuzz/src/actor.rs @@ -12,6 +12,7 @@ use loro::{ Container, ContainerID, ContainerType, Frontiers, LoroDoc, LoroError, LoroValue, PeerID, UndoManager, ID, }; +use pretty_assertions::assert_eq; use rand::{rngs::StdRng, Rng, SeedableRng}; use tracing::info_span; diff --git a/crates/fuzz/tests/test.rs b/crates/fuzz/tests/test.rs index c98549ea..bb4e45f9 100644 --- a/crates/fuzz/tests/test.rs +++ b/crates/fuzz/tests/test.rs @@ -8936,6 +8936,473 @@ fn fast_snapshot_4() { ) } +#[test] +fn fast_snapshot_5() { + test_multi_sites( + 5, + vec![FuzzTarget::All], + &mut [ + Handle { + site: 25, + target: 25, + container: 193, + action: Generic(GenericAction { + value: I32(1644825), + bool: false, + key: 393216, + pos: 27487790694400, + length: 1808504320951916800, + prop: 18446744073709551385, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(255)), + bool: true, + key: 436207615, + pos: 1808504320951916825, + length: 16131858542891077913, + prop: 18410398479363858399, + }), + }, + SyncAll, + Handle { + site: 25, + target: 34, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(255)), + bool: true, + key: 4294967295, + pos: 18446744073709551615, + length: 18446744073709551615, + prop: 18381750949675392991, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: I32(-538976367), + bool: true, + key: 685760479, + pos: 16131858456979185696, + length: 1811037595742312729, + prop: 1849036717598251289, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: I32(421068800), + bool: true, + key: 432014617, + pos: 18446744073694419225, + length: 18446744073709551615, + prop: 16131858542891098079, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(223)), + bool: true, + key: 538978527, + pos: 1808722877991345952, + length: 1808504359606622489, + prop: 1808504939427207449, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: I32(421075225), + bool: true, + key: 421075392, + pos: 18446744073709551513, + length: 16131858541569507327, + prop: 16131858542891098079, + }), + }, + Handle { + site: 25, + target: 255, + container: 255, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 4279834905, + pos: 10455415605511192575, + length: 2945318833950285791, + prop: 16131858456979185696, + }), + }, + Handle { + site: 223, + target: 40, + container: 32, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 421075225, + pos: 1808504320951916834, + length: 7064470003718569, + prop: 0, + }), + }, + Handle { + site: 192, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(255)), + bool: true, + key: 3750828543, + pos: 16131858542891098079, + length: 18446744073170575327, + prop: 18446744073709551615, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(25)), + bool: true, + key: 421075225, + pos: 16131858204548733209, + length: 2314885568395206623, + prop: 1808505174690352095, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 25, + pos: 18446462598732840960, + length: 18446744073709551615, + prop: 18446744073709551615, + }), + }, + SyncAll, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(223)), + bool: true, + key: 538976296, + pos: 1808505174690352095, + length: 1808504321102911769, + prop: 1808504323367835929, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: I32(435231001), + bool: true, + key: 421075392, + pos: 18446744073709551385, + length: 16131893865241116671, + prop: 8319119876378817395, + }), + }, + Undo { + site: 115, + op_len: 1936946035, + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: I32(-544138983), + bool: true, + key: 3755991007, + pos: 16126228219801118943, + length: 1808504320951967711, + prop: 1808504320951916834, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: I32(421117957), + bool: true, + key: 4294967065, + pos: 18446744073709551615, + length: 16131858542891106303, + prop: 18446744073170575327, + }), + }, + Handle { + site: 25, + target: 255, + container: 255, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 421075225, + pos: 16131858541569448217, + length: 16077885992209473503, + prop: 1808504324286832587, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: I32(421075225), + bool: true, + key: 0, + pos: 1808504213156659200, + length: 18417779746705245465, + prop: 18446744073709551615, + }), + }, + Handle { + site: 255, + target: 255, + container: 255, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 421075225, + pos: 16131858542885935385, + length: 14690495831856439263, + prop: 1808504320964943839, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: I32(421075225), + bool: false, + key: 0, + pos: 18446744069414584320, + length: 18446744073706012671, + prop: 18446744073709551615, + }), + }, + Handle { + site: 25, + target: 25, + container: 0, + action: Generic(GenericAction { + value: I32(421075225), + bool: true, + key: 4294967065, + pos: 18446744073709551615, + length: 1808758200342674943, + prop: 18446490194318792985, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(25)), + bool: true, + key: 421075225, + pos: 16131858204548733209, + length: 2314885568395206623, + prop: 1808505174690352095, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 25, + pos: 1808476725365964800, + length: 1808505037875966233, + prop: 18446744073709551385, + }), + }, + Undo { + site: 115, + op_len: 1936946035, + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(223)), + bool: true, + key: 538976486, + pos: 1808505174690352095, + length: 1808504321102911769, + prop: 1808504323367835929, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: I32(421117957), + bool: true, + key: 4294967065, + pos: 18446744073709551615, + length: 16131858542891106303, + prop: 18446744073170575327, + }), + }, + Handle { + site: 25, + target: 255, + container: 255, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 421075225, + pos: 16131858541569448217, + length: 16077885992209473503, + prop: 1808504324286832587, + }), + }, + Handle { + site: 255, + target: 255, + container: 255, + action: Generic(GenericAction { + value: Container(Text), + bool: true, + key: 421075225, + pos: 16131858542885935385, + length: 1808504320964943871, + prop: 18446744073709551615, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: Container(Unknown(223)), + bool: true, + key: 538976296, + pos: 1808505174690352095, + length: 1808504321102911769, + prop: 1808504323367835929, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: Container(Unknown(255)), + bool: true, + key: 4294967295, + pos: 65535, + length: 1808476725365964800, + prop: 1801439851369273625, + }), + }, + Handle { + site: 25, + target: 25, + container: 255, + action: Generic(GenericAction { + value: Container(Unknown(255)), + bool: true, + key: 4294967295, + pos: 1808504320951975935, + length: 16131858204548733209, + prop: 16131858542891098079, + }), + }, + Handle { + site: 25, + target: 25, + container: 34, + action: Generic(GenericAction { + value: I32(-57089), + bool: true, + key: 4294967295, + pos: 18446744073709551615, + length: 18446744073709551615, + prop: 1808504324286840831, + }), + }, + Handle { + site: 25, + target: 25, + container: 25, + action: Generic(GenericAction { + value: I32(-544138983), + bool: true, + key: 3755991007, + pos: 16126228219801118943, + length: 1808504320951967711, + prop: 1808504320951916834, + }), + }, + Handle { + site: 0, + target: 0, + container: 0, + action: Generic(GenericAction { + value: I32(0), + bool: true, + key: 85530905, + pos: 18446743081993181632, + length: 18446744073709551615, + prop: 8319119878197870591, + }), + }, + Undo { + site: 115, + op_len: 1936946035, + }, + SyncAll, + Handle { + site: 29, + target: 29, + container: 29, + action: Generic(GenericAction { + value: I32(0), + bool: false, + key: 0, + pos: 0, + length: 0, + prop: 0, + }), + }, + ], + ) +} + #[test] fn gc_fuzz() { test_multi_sites_with_gc( diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 7a1f5f12..db35955f 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -560,9 +560,6 @@ impl LoroDoc { if !self.detached.load(Acquire) { debug!("checkout from {:?} to {:?}", old_vv, oplog.vv()); let mut diff = DiffCalculator::new(false); - oplog.change_store().visit_all_changes(&mut |c| { - trace!("change {:#?}", &c); - }); if &old_vv != oplog.vv() { let diff = diff.calc_diff_internal( &oplog, diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 934cb83c..beb53603 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -451,9 +451,6 @@ impl OpLog { debug!("from_frontiers={:?} vv={:?}", &from_frontiers, from); debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to); trace!("trimmed vv = {:?}", self.dag.trimmed_vv()); - self.change_store.visit_all_changes(&mut |c| { - trace!("change {:#?}", &c); - }); let (common_ancestors, mut diff_mode) = self.dag.find_common_ancestor(from_frontiers, to_frontiers); if diff_mode == DiffMode::Checkout && to > from { diff --git a/crates/loro-internal/src/oplog/loro_dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs index 29f2ffed..ad82c983 100644 --- a/crates/loro-internal/src/oplog/loro_dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -218,7 +218,7 @@ impl AppDag { } pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers { - self.ensure_lazy_load_node(id, None); + self.ensure_lazy_load_node(id); let Some(node) = self.get(id) else { return Frontiers::default(); }; @@ -283,8 +283,10 @@ impl AppDag { map_guard.as_mut().unwrap() }); let new_dag_start_counter_for_the_peer = nodes[0].cnt; + let nodes_cnt_end = nodes.last().unwrap().ctr_end(); let mut unparsed_vv = self.unparsed_vv.try_lock().unwrap(); let end_counter = unparsed_vv[&peer]; + assert!(end_counter <= nodes_cnt_end); let mut deps_on_others = vec![]; let mut break_point_set = self.unhandled_dep_points.try_lock().unwrap(); for mut node in nodes { @@ -389,20 +391,32 @@ impl AppDag { } } - fn ensure_lazy_load_node(&self, id: ID, map: Option<&mut BTreeMap>) { - if !self.unparsed_vv.try_lock().unwrap().includes_id(id) { - return; - } - + fn ensure_lazy_load_node(&self, id: ID) { if self.trimmed_vv.includes_id(id) { return; } - let Some(nodes) = self.change_store.get_dag_nodes_that_contains(id) else { - panic!("unparsed vv don't match with change store. Id:{id} is not in change store") - }; + loop { + // We need to load all the dag nodes that has the same peer and greater counter than the given `id` + // Because we only record the end counter of the unparsed version on `unparsed_vv` + let unparsed_end = { + let unparsed_vv = self.unparsed_vv.try_lock().unwrap(); + unparsed_vv.get(&id.peer).copied().unwrap_or(0) + }; + if unparsed_end <= id.counter { + return; + } - self.lazy_load_nodes_internal(nodes, id.peer, map); + let last_unparsed_id = ID::new(id.peer, unparsed_end - 1); + let Some(nodes) = self + .change_store + .get_dag_nodes_that_contains(last_unparsed_id) + else { + panic!("unparsed vv don't match with change store. Id:{id} is not in change store") + }; + + self.lazy_load_nodes_internal(nodes, id.peer, None); + } } pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag { @@ -431,7 +445,7 @@ impl AppDag { if let Some((vv, f)) = v.start_version { if !f.is_empty() { assert!(f.len() == 1); - self.ensure_lazy_load_node(f[0], None); + self.ensure_lazy_load_node(f[0]); let node = self.get(f[0]).unwrap(); assert!(node.cnt == f[0].counter); self.trimmed_frontiers_deps = node.deps.clone(); @@ -464,7 +478,7 @@ impl AppDag { let init_counter = self.trimmed_vv.get(peer).copied().unwrap_or(0); while end_cnt > init_counter { let cnt = end_cnt - 1; - self.ensure_lazy_load_node(ID::new(*peer, cnt), None); + self.ensure_lazy_load_node(ID::new(*peer, cnt)); end_cnt = self .unparsed_vv .try_lock() @@ -698,7 +712,7 @@ impl Dag for AppDag { } fn get(&self, id: ID) -> Option { - self.ensure_lazy_load_node(id, None); + self.ensure_lazy_load_node(id); let binding = self.map.try_lock().unwrap(); let x = binding.range(..=id).next_back()?; if x.1.contains_id(id) { @@ -733,15 +747,18 @@ impl AppDag { } let mut ans_vv = ImVersionVector::default(); - trace!("deps={:?}", &node.deps); - trace!("this.trimmed_f_deps={:?}", &self.trimmed_frontiers_deps); + // trace!("deps={:?}", &node.deps); + // trace!("this.trimmed_f_deps={:?}", &self.trimmed_frontiers_deps); + // trace!("this.vv={:?}", &self.vv); + // trace!("this.unparsed_vv={:?}", &self.unparsed_vv); + // trace!("this.trimmed_vv={:?}", &self.trimmed_vv); if node.deps == self.trimmed_frontiers_deps { for (&p, &c) in self.trimmed_vv.iter() { ans_vv.insert(p, c); } } else { for id in node.deps.iter() { - trace!("id {}", id); + // trace!("id {}", id); let node = self.get(*id).expect("deps should be in the dag"); let dep_vv = self.ensure_vv_for(&node); if ans_vv.is_empty() { @@ -754,7 +771,7 @@ impl AppDag { } } - trace!("ans_vv={:?}", &ans_vv); + // trace!("ans_vv={:?}", &ans_vv); node.vv.set(ans_vv.clone()).unwrap(); ans_vv } @@ -772,7 +789,7 @@ impl AppDag { } pub fn get_lamport(&self, id: &ID) -> Option { - self.ensure_lazy_load_node(*id, None); + self.ensure_lazy_load_node(*id); self.get(*id).and_then(|node| { assert!(id.counter >= node.cnt); if node.cnt + node.len as Counter > id.counter { @@ -813,6 +830,7 @@ impl AppDag { Some(vv) } + #[allow(unused)] pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector { if frontiers.is_empty() { return Default::default();