fix: lazy load dag node

This commit is contained in:
Zixuan Chen 2024-09-13 18:39:44 +08:00
parent 15175a1e5f
commit c18cec15a9
No known key found for this signature in database
7 changed files with 528 additions and 24 deletions

23
Cargo.lock generated
View file

@ -564,6 +564,12 @@ dependencies = [
"thousands",
]
[[package]]
name = "diff"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "either"
version = "1.9.0"
@ -696,6 +702,7 @@ dependencies = [
"loro 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro 0.16.2 (git+https://github.com/loro-dev/loro.git?rev=90470658435ec4c62b5af59ebb82fe9e1f5aa761)",
"num_cpus",
"pretty_assertions",
"rand",
"rayon",
"serde_json",
@ -1726,6 +1733,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "pretty_assertions"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66"
dependencies = [
"diff",
"yansi",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -2816,6 +2833,12 @@ version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984"
[[package]]
name = "yansi"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zerocopy"
version = "0.7.35"

View file

@ -23,6 +23,7 @@ num_cpus = "1.16.0"
rayon = "1.10.0"
bytes = "1"
ensure-cov = "0.1.0"
pretty_assertions = "1.4.0"
[dev-dependencies]
ctor = "0.2"

View file

@ -12,6 +12,7 @@ use loro::{
Container, ContainerID, ContainerType, Frontiers, LoroDoc, LoroError, LoroValue, PeerID,
UndoManager, ID,
};
use pretty_assertions::assert_eq;
use rand::{rngs::StdRng, Rng, SeedableRng};
use tracing::info_span;

View file

@ -8936,6 +8936,473 @@ fn fast_snapshot_4() {
)
}
#[test]
fn fast_snapshot_5() {
test_multi_sites(
5,
vec![FuzzTarget::All],
&mut [
Handle {
site: 25,
target: 25,
container: 193,
action: Generic(GenericAction {
value: I32(1644825),
bool: false,
key: 393216,
pos: 27487790694400,
length: 1808504320951916800,
prop: 18446744073709551385,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(255)),
bool: true,
key: 436207615,
pos: 1808504320951916825,
length: 16131858542891077913,
prop: 18410398479363858399,
}),
},
SyncAll,
Handle {
site: 25,
target: 34,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(255)),
bool: true,
key: 4294967295,
pos: 18446744073709551615,
length: 18446744073709551615,
prop: 18381750949675392991,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: I32(-538976367),
bool: true,
key: 685760479,
pos: 16131858456979185696,
length: 1811037595742312729,
prop: 1849036717598251289,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: I32(421068800),
bool: true,
key: 432014617,
pos: 18446744073694419225,
length: 18446744073709551615,
prop: 16131858542891098079,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(223)),
bool: true,
key: 538978527,
pos: 1808722877991345952,
length: 1808504359606622489,
prop: 1808504939427207449,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: I32(421075225),
bool: true,
key: 421075392,
pos: 18446744073709551513,
length: 16131858541569507327,
prop: 16131858542891098079,
}),
},
Handle {
site: 25,
target: 255,
container: 255,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 4279834905,
pos: 10455415605511192575,
length: 2945318833950285791,
prop: 16131858456979185696,
}),
},
Handle {
site: 223,
target: 40,
container: 32,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 421075225,
pos: 1808504320951916834,
length: 7064470003718569,
prop: 0,
}),
},
Handle {
site: 192,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(255)),
bool: true,
key: 3750828543,
pos: 16131858542891098079,
length: 18446744073170575327,
prop: 18446744073709551615,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(25)),
bool: true,
key: 421075225,
pos: 16131858204548733209,
length: 2314885568395206623,
prop: 1808505174690352095,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 25,
pos: 18446462598732840960,
length: 18446744073709551615,
prop: 18446744073709551615,
}),
},
SyncAll,
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(223)),
bool: true,
key: 538976296,
pos: 1808505174690352095,
length: 1808504321102911769,
prop: 1808504323367835929,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: I32(435231001),
bool: true,
key: 421075392,
pos: 18446744073709551385,
length: 16131893865241116671,
prop: 8319119876378817395,
}),
},
Undo {
site: 115,
op_len: 1936946035,
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: I32(-544138983),
bool: true,
key: 3755991007,
pos: 16126228219801118943,
length: 1808504320951967711,
prop: 1808504320951916834,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: I32(421117957),
bool: true,
key: 4294967065,
pos: 18446744073709551615,
length: 16131858542891106303,
prop: 18446744073170575327,
}),
},
Handle {
site: 25,
target: 255,
container: 255,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 421075225,
pos: 16131858541569448217,
length: 16077885992209473503,
prop: 1808504324286832587,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: I32(421075225),
bool: true,
key: 0,
pos: 1808504213156659200,
length: 18417779746705245465,
prop: 18446744073709551615,
}),
},
Handle {
site: 255,
target: 255,
container: 255,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 421075225,
pos: 16131858542885935385,
length: 14690495831856439263,
prop: 1808504320964943839,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: I32(421075225),
bool: false,
key: 0,
pos: 18446744069414584320,
length: 18446744073706012671,
prop: 18446744073709551615,
}),
},
Handle {
site: 25,
target: 25,
container: 0,
action: Generic(GenericAction {
value: I32(421075225),
bool: true,
key: 4294967065,
pos: 18446744073709551615,
length: 1808758200342674943,
prop: 18446490194318792985,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(25)),
bool: true,
key: 421075225,
pos: 16131858204548733209,
length: 2314885568395206623,
prop: 1808505174690352095,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 25,
pos: 1808476725365964800,
length: 1808505037875966233,
prop: 18446744073709551385,
}),
},
Undo {
site: 115,
op_len: 1936946035,
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(223)),
bool: true,
key: 538976486,
pos: 1808505174690352095,
length: 1808504321102911769,
prop: 1808504323367835929,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: I32(421117957),
bool: true,
key: 4294967065,
pos: 18446744073709551615,
length: 16131858542891106303,
prop: 18446744073170575327,
}),
},
Handle {
site: 25,
target: 255,
container: 255,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 421075225,
pos: 16131858541569448217,
length: 16077885992209473503,
prop: 1808504324286832587,
}),
},
Handle {
site: 255,
target: 255,
container: 255,
action: Generic(GenericAction {
value: Container(Text),
bool: true,
key: 421075225,
pos: 16131858542885935385,
length: 1808504320964943871,
prop: 18446744073709551615,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: Container(Unknown(223)),
bool: true,
key: 538976296,
pos: 1808505174690352095,
length: 1808504321102911769,
prop: 1808504323367835929,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: Container(Unknown(255)),
bool: true,
key: 4294967295,
pos: 65535,
length: 1808476725365964800,
prop: 1801439851369273625,
}),
},
Handle {
site: 25,
target: 25,
container: 255,
action: Generic(GenericAction {
value: Container(Unknown(255)),
bool: true,
key: 4294967295,
pos: 1808504320951975935,
length: 16131858204548733209,
prop: 16131858542891098079,
}),
},
Handle {
site: 25,
target: 25,
container: 34,
action: Generic(GenericAction {
value: I32(-57089),
bool: true,
key: 4294967295,
pos: 18446744073709551615,
length: 18446744073709551615,
prop: 1808504324286840831,
}),
},
Handle {
site: 25,
target: 25,
container: 25,
action: Generic(GenericAction {
value: I32(-544138983),
bool: true,
key: 3755991007,
pos: 16126228219801118943,
length: 1808504320951967711,
prop: 1808504320951916834,
}),
},
Handle {
site: 0,
target: 0,
container: 0,
action: Generic(GenericAction {
value: I32(0),
bool: true,
key: 85530905,
pos: 18446743081993181632,
length: 18446744073709551615,
prop: 8319119878197870591,
}),
},
Undo {
site: 115,
op_len: 1936946035,
},
SyncAll,
Handle {
site: 29,
target: 29,
container: 29,
action: Generic(GenericAction {
value: I32(0),
bool: false,
key: 0,
pos: 0,
length: 0,
prop: 0,
}),
},
],
)
}
#[test]
fn gc_fuzz() {
test_multi_sites_with_gc(

View file

@ -560,9 +560,6 @@ impl LoroDoc {
if !self.detached.load(Acquire) {
debug!("checkout from {:?} to {:?}", old_vv, oplog.vv());
let mut diff = DiffCalculator::new(false);
oplog.change_store().visit_all_changes(&mut |c| {
trace!("change {:#?}", &c);
});
if &old_vv != oplog.vv() {
let diff = diff.calc_diff_internal(
&oplog,

View file

@ -451,9 +451,6 @@ impl OpLog {
debug!("from_frontiers={:?} vv={:?}", &from_frontiers, from);
debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
trace!("trimmed vv = {:?}", self.dag.trimmed_vv());
self.change_store.visit_all_changes(&mut |c| {
trace!("change {:#?}", &c);
});
let (common_ancestors, mut diff_mode) =
self.dag.find_common_ancestor(from_frontiers, to_frontiers);
if diff_mode == DiffMode::Checkout && to > from {

View file

@ -218,7 +218,7 @@ impl AppDag {
}
pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
self.ensure_lazy_load_node(id, None);
self.ensure_lazy_load_node(id);
let Some(node) = self.get(id) else {
return Frontiers::default();
};
@ -283,8 +283,10 @@ impl AppDag {
map_guard.as_mut().unwrap()
});
let new_dag_start_counter_for_the_peer = nodes[0].cnt;
let nodes_cnt_end = nodes.last().unwrap().ctr_end();
let mut unparsed_vv = self.unparsed_vv.try_lock().unwrap();
let end_counter = unparsed_vv[&peer];
assert!(end_counter <= nodes_cnt_end);
let mut deps_on_others = vec![];
let mut break_point_set = self.unhandled_dep_points.try_lock().unwrap();
for mut node in nodes {
@ -389,20 +391,32 @@ impl AppDag {
}
}
fn ensure_lazy_load_node(&self, id: ID, map: Option<&mut BTreeMap<ID, AppDagNode>>) {
if !self.unparsed_vv.try_lock().unwrap().includes_id(id) {
return;
}
fn ensure_lazy_load_node(&self, id: ID) {
if self.trimmed_vv.includes_id(id) {
return;
}
let Some(nodes) = self.change_store.get_dag_nodes_that_contains(id) else {
loop {
// We need to load all the dag nodes that has the same peer and greater counter than the given `id`
// Because we only record the end counter of the unparsed version on `unparsed_vv`
let unparsed_end = {
let unparsed_vv = self.unparsed_vv.try_lock().unwrap();
unparsed_vv.get(&id.peer).copied().unwrap_or(0)
};
if unparsed_end <= id.counter {
return;
}
let last_unparsed_id = ID::new(id.peer, unparsed_end - 1);
let Some(nodes) = self
.change_store
.get_dag_nodes_that_contains(last_unparsed_id)
else {
panic!("unparsed vv don't match with change store. Id:{id} is not in change store")
};
self.lazy_load_nodes_internal(nodes, id.peer, map);
self.lazy_load_nodes_internal(nodes, id.peer, None);
}
}
pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag {
@ -431,7 +445,7 @@ impl AppDag {
if let Some((vv, f)) = v.start_version {
if !f.is_empty() {
assert!(f.len() == 1);
self.ensure_lazy_load_node(f[0], None);
self.ensure_lazy_load_node(f[0]);
let node = self.get(f[0]).unwrap();
assert!(node.cnt == f[0].counter);
self.trimmed_frontiers_deps = node.deps.clone();
@ -464,7 +478,7 @@ impl AppDag {
let init_counter = self.trimmed_vv.get(peer).copied().unwrap_or(0);
while end_cnt > init_counter {
let cnt = end_cnt - 1;
self.ensure_lazy_load_node(ID::new(*peer, cnt), None);
self.ensure_lazy_load_node(ID::new(*peer, cnt));
end_cnt = self
.unparsed_vv
.try_lock()
@ -698,7 +712,7 @@ impl Dag for AppDag {
}
fn get(&self, id: ID) -> Option<Self::Node> {
self.ensure_lazy_load_node(id, None);
self.ensure_lazy_load_node(id);
let binding = self.map.try_lock().unwrap();
let x = binding.range(..=id).next_back()?;
if x.1.contains_id(id) {
@ -733,15 +747,18 @@ impl AppDag {
}
let mut ans_vv = ImVersionVector::default();
trace!("deps={:?}", &node.deps);
trace!("this.trimmed_f_deps={:?}", &self.trimmed_frontiers_deps);
// trace!("deps={:?}", &node.deps);
// trace!("this.trimmed_f_deps={:?}", &self.trimmed_frontiers_deps);
// trace!("this.vv={:?}", &self.vv);
// trace!("this.unparsed_vv={:?}", &self.unparsed_vv);
// trace!("this.trimmed_vv={:?}", &self.trimmed_vv);
if node.deps == self.trimmed_frontiers_deps {
for (&p, &c) in self.trimmed_vv.iter() {
ans_vv.insert(p, c);
}
} else {
for id in node.deps.iter() {
trace!("id {}", id);
// trace!("id {}", id);
let node = self.get(*id).expect("deps should be in the dag");
let dep_vv = self.ensure_vv_for(&node);
if ans_vv.is_empty() {
@ -754,7 +771,7 @@ impl AppDag {
}
}
trace!("ans_vv={:?}", &ans_vv);
// trace!("ans_vv={:?}", &ans_vv);
node.vv.set(ans_vv.clone()).unwrap();
ans_vv
}
@ -772,7 +789,7 @@ impl AppDag {
}
pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
self.ensure_lazy_load_node(*id, None);
self.ensure_lazy_load_node(*id);
self.get(*id).and_then(|node| {
assert!(id.counter >= node.cnt);
if node.cnt + node.len as Counter > id.counter {
@ -813,6 +830,7 @@ impl AppDag {
Some(vv)
}
#[allow(unused)]
pub(crate) fn frontiers_to_im_vv(&self, frontiers: &Frontiers) -> ImVersionVector {
if frontiers.is_empty() {
return Default::default();