diff --git a/crates/loro-internal/src/diff_calc/tree.rs b/crates/loro-internal/src/diff_calc/tree.rs index 554cff0e..cfbb97f2 100644 --- a/crates/loro-internal/src/diff_calc/tree.rs +++ b/crates/loro-internal/src/diff_calc/tree.rs @@ -239,11 +239,10 @@ impl TreeDiffCalculator { let _e = s.enter(); let to_frontiers = info.to_frontiers; let from_frontiers = info.from_frontiers; - let (common_ancestors, _mode) = oplog - .dag - .find_common_ancestor(&from_frontiers, &to_frontiers); + let (common_ancestors, _mode) = + oplog.dag.find_common_ancestor(from_frontiers, to_frontiers); let lca_vv = oplog.dag.frontiers_to_vv(&common_ancestors).unwrap(); - let lca_frontiers = lca_vv.to_frontiers(&oplog.dag); + let lca_frontiers = common_ancestors; tracing::info!( "from vv {:?} to vv {:?} current vv {:?} lca vv {:?}", info.from_vv, @@ -252,7 +251,7 @@ impl TreeDiffCalculator { lca_vv ); - let to_max_lamport = self.get_max_lamport_by_frontiers(&to_frontiers, oplog); + let to_max_lamport = self.get_max_lamport_by_frontiers(to_frontiers, oplog); let lca_min_lamport = self.get_min_lamport_by_frontiers(&lca_frontiers, oplog); // retreat for diff diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 8decbf1e..aac4af79 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -1067,7 +1067,7 @@ impl LoroDoc { return Ok(()); } - let oplog = self.oplog.lock().unwrap(); + let oplog = self.oplog.try_lock().unwrap(); if oplog.dag.is_on_trimmed_history(frontiers) { drop(oplog); self.renew_txn_if_auto_commit(); @@ -1081,8 +1081,8 @@ impl LoroDoc { return Ok(()); } - let mut state = self.state.lock().unwrap(); - let mut calc = self.diff_calculator.lock().unwrap(); + let mut state = self.state.try_lock().unwrap(); + let mut calc = self.diff_calculator.try_lock().unwrap(); for &i in frontiers.iter() { if !oplog.dag.contains(i) { drop(oplog); @@ -1123,12 +1123,16 @@ impl LoroDoc { #[inline] pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers { - self.oplog.lock().unwrap().dag.vv_to_frontiers(vv) + self.oplog.try_lock().unwrap().dag.vv_to_frontiers(vv) } #[inline] pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option { - self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers) + self.oplog + .try_lock() + .unwrap() + .dag + .frontiers_to_vv(frontiers) } /// Import ops from other doc. @@ -1150,7 +1154,7 @@ impl LoroDoc { #[inline] pub fn len_changes(&self) -> usize { - let oplog = self.oplog.lock().unwrap(); + let oplog = self.oplog.try_lock().unwrap(); oplog.len_changes() } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index be7bccdc..30d934ae 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -478,7 +478,7 @@ impl OpLog { let from_frontiers = match from_frontiers { Some(f) => f, None => { - from_frontiers_inner = Some(from.to_frontiers(&self.dag)); + from_frontiers_inner = Some(self.dag.vv_to_frontiers(from)); from_frontiers_inner.as_ref().unwrap() } }; @@ -486,7 +486,7 @@ impl OpLog { let to_frontiers = match to_frontiers { Some(t) => t, None => { - to_frontiers_inner = Some(to.to_frontiers(&self.dag)); + to_frontiers_inner = Some(self.dag.vv_to_frontiers(to)); to_frontiers_inner.as_ref().unwrap() } }; diff --git a/crates/loro-internal/src/oplog/loro_dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs index 37d1cb00..6c5b80cd 100644 --- a/crates/loro-internal/src/oplog/loro_dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -8,9 +8,9 @@ use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID} use once_cell::sync::OnceCell; use rle::{HasIndex, HasLength, Mergable, Sliceable}; use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, BinaryHeap}; use std::fmt::Display; -use std::ops::Deref; +use std::ops::{ControlFlow, Deref}; use std::sync::{Arc, Mutex}; use tracing::{instrument, trace}; @@ -590,6 +590,57 @@ impl AppDag { false } + + /// Travel the ancestors of the given id, and call the callback for each node + /// + /// It will travel the ancestors in the reverse order (from the greatest lamport to the smallest) + pub(crate) fn travel_ancestors( + &self, + id: ID, + f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>, + ) { + struct PendingNode(AppDagNode); + impl PartialEq for PendingNode { + fn eq(&self, other: &Self) -> bool { + self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer + } + } + impl Eq for PendingNode {} + impl PartialOrd for PendingNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Ord for PendingNode { + fn cmp(&self, other: &Self) -> Ordering { + self.0 + .lamport_last() + .cmp(&other.0.lamport_last()) + .then_with(|| self.0.peer.cmp(&other.0.peer)) + } + } + + let mut visited = FxHashSet::default(); + let mut pending: BinaryHeap = BinaryHeap::new(); + pending.push(PendingNode(self.get(id).unwrap())); + while let Some(PendingNode(node)) = pending.pop() { + if f(&node).is_break() { + break; + } + + for &dep in node.deps.iter() { + let Some(dep_node) = self.get(dep) else { + continue; + }; + if visited.contains(&dep_node.id_start()) { + continue; + } + + visited.insert(dep_node.id_start()); + pending.push(PendingNode(dep_node)); + } + } + } } fn check_always_dep_on_last_id(map: &BTreeMap) { diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 4ad89605..5a68d62b 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -1,8 +1,9 @@ -use loro_common::{HasCounter, HasCounterSpan, IdSpanVector}; +use itertools::Itertools; +use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, IdFull, IdSpanVector}; use smallvec::smallvec; use std::{ cmp::Ordering, - ops::{Deref, DerefMut}, + ops::{ControlFlow, Deref, DerefMut}, }; use fxhash::{FxHashMap, FxHashSet}; @@ -78,15 +79,6 @@ impl ImVersionVector { self.0.contains_key(k) } - /// Convert to a [Frontiers] - /// - /// # Panic - /// - /// When self is greater than dag.vv - pub fn to_frontiers(&self, dag: &AppDag) -> Frontiers { - dag.im_vv_to_frontiers(self) - } - pub fn encode(&self) -> Vec { postcard::to_allocvec(self).unwrap() } @@ -918,15 +910,6 @@ impl VersionVector { postcard::from_bytes(bytes).map_err(|_| LoroError::DecodeVersionVectorError) } - /// Convert to a [Frontiers] - /// - /// # Panic - /// - /// When self is greater than dag.vv - pub fn to_frontiers(&self, dag: &AppDag) -> Frontiers { - dag.vv_to_frontiers(self) - } - pub(crate) fn trim(&self, vv: &VersionVector) -> VersionVector { let mut ans = VersionVector::new(); for (client_id, &counter) in self.iter() { @@ -949,10 +932,7 @@ impl VersionVector { /// Use minimal set of ids to represent the frontiers pub fn shrink_frontiers(last_ids: &[ID], dag: &AppDag) -> Frontiers { // it only keep the ids of ops that are concurrent to each other - let mut frontiers = Frontiers::default(); - let mut frontiers_vv = Vec::new(); - if last_ids.is_empty() { return frontiers; } @@ -962,30 +942,60 @@ pub fn shrink_frontiers(last_ids: &[ID], dag: &AppDag) -> Frontiers { return frontiers; } - let mut last_ids = last_ids.to_vec(); - // sort by lamport, ascending - last_ids.sort_by_cached_key(|x| dag.get_lamport(x).unwrap() as isize); + let mut last_ids = filter_duplicated_peer_id(last_ids) + .into_iter() + .map(|x| IdFull::new(x.peer, x.counter, dag.get_lamport(&x).unwrap())) + .collect_vec(); + if last_ids.len() == 1 { + frontiers.push(last_ids[0].id()); + return frontiers; + } + + // Iterate from the greatest lamport to the smallest + last_ids.sort_by_key(|x| x.lamport); for id in last_ids.iter().rev() { - let vv = dag.get_vv(*id).unwrap(); let mut should_insert = true; - for f_vv in frontiers_vv.iter() { - if vv.partial_cmp(f_vv).is_some() { - // This is not concurrent op, should be ignored in frontiers - should_insert = false; - break; - } + let mut len = 0; + // travel backward because they have more similar lamport + for f_id in frontiers.iter().rev() { + dag.travel_ancestors(*f_id, &mut |x| { + len += 1; + if x.contains_id(id.id()) { + should_insert = false; + ControlFlow::Break(()) + } else if x.lamport_last() < id.lamport { + // Already travel to a node with smaller lamport, no need to continue, we are sure two ops are concurrent now + ControlFlow::Break(()) + } else { + ControlFlow::Continue(()) + } + }); } if should_insert { - frontiers.push(*id); - frontiers_vv.push(vv); + frontiers.push(id.id()); } } frontiers } +fn filter_duplicated_peer_id(last_ids: &[ID]) -> Vec { + let mut peer_max_counters = FxHashMap::default(); + for &id in last_ids { + let counter = peer_max_counters.entry(id.peer).or_insert(id.counter); + if id.counter > *counter { + *counter = id.counter; + } + } + + peer_max_counters + .into_iter() + .map(|(peer, counter)| ID::new(peer, counter)) + .collect() +} + impl Default for VersionVector { fn default() -> Self { Self::new()