diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index ce07ff50..fecd3293 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -56,6 +56,9 @@ struct ChangeStoreInner { mem_parsed_kv: BTreeMap>, } +pub const VV_KEY: &[u8] = b"vv"; +pub const FRONTIERS_KEY: &[u8] = b"fr"; + impl ChangeStore { pub fn new_mem(a: &SharedArena, merge_interval: Arc) -> Self { Self { @@ -229,14 +232,26 @@ impl ChangeStore { } loop { - let old_vv_bytes = store.get(b"vv"); + let old_vv_bytes = store.get(VV_KEY); let new_vv = old_vv_bytes .as_ref() .map(|bytes| VersionVector::decode(bytes).unwrap()) .unwrap_or_default(); inner.vv.merge_vv(&new_vv); let vv_bytes = inner.vv.encode(); - if store.compare_and_swap(b"vv", old_vv_bytes, vv_bytes.into()) { + if store.compare_and_swap(VV_KEY, old_vv_bytes, vv_bytes.into()) { + } else { + continue; + } + + let old_frontiers_bytes = store.get(FRONTIERS_KEY); + let new_frontiers = old_frontiers_bytes + .as_ref() + .map(|bytes| Frontiers::decode(bytes).unwrap()) + .unwrap_or_default(); + inner.frontiers.merge_frontiers(&new_frontiers); + let frontiers_bytes = inner.frontiers.encode(); + if store.compare_and_swap(FRONTIERS_KEY, old_frontiers_bytes, frontiers_bytes.into()) { break; } } diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 03a85467..d32c8a8f 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -203,6 +203,58 @@ impl Frontiers { pub(crate) fn with_capacity(cap: usize) -> Frontiers { Self(SmallVec::with_capacity(cap)) } + + pub(crate) fn merge_frontiers(&mut self, new_frontiers: &Frontiers) { + if self.len() <= 1 { + if self == new_frontiers { + return; + } + + if new_frontiers.len() == 0 { + return; + } + + if self.len() == 0 { + *self = new_frontiers.clone(); + return; + } + + if new_frontiers.len() == 1 { + let new_id = new_frontiers[0]; + if self[0].peer == new_id.peer { + if self[0].counter < new_id.counter { + self[0].counter = new_id.counter; + } else { + return; + } + } else { + self.push(new_id); + return; + } + } + } + + let mut map = self + .0 + .iter() + .map(|id| (id.peer, id.counter)) + .collect::>(); + + for id in new_frontiers.0.iter() { + if let Some(counter) = map.get_mut(&id.peer) { + if *counter < id.counter { + *counter = id.counter; + } + } else { + self.0.push(*id); + } + } + + self.0 = map + .into_iter() + .map(|(peer, counter)| ID::new(peer, counter)) + .collect(); + } } impl Deref for Frontiers {