perf: make shrink frontiers faster when the peer num is large

This commit is contained in:
Zixuan Chen 2024-09-26 11:29:56 +08:00
parent fe85805327
commit 1054c1804b
No known key found for this signature in database
5 changed files with 114 additions and 50 deletions

View file

@ -239,11 +239,10 @@ impl TreeDiffCalculator {
let _e = s.enter();
let to_frontiers = info.to_frontiers;
let from_frontiers = info.from_frontiers;
let (common_ancestors, _mode) = oplog
.dag
.find_common_ancestor(&from_frontiers, &to_frontiers);
let (common_ancestors, _mode) =
oplog.dag.find_common_ancestor(from_frontiers, to_frontiers);
let lca_vv = oplog.dag.frontiers_to_vv(&common_ancestors).unwrap();
let lca_frontiers = lca_vv.to_frontiers(&oplog.dag);
let lca_frontiers = common_ancestors;
tracing::info!(
"from vv {:?} to vv {:?} current vv {:?} lca vv {:?}",
info.from_vv,
@ -252,7 +251,7 @@ impl TreeDiffCalculator {
lca_vv
);
let to_max_lamport = self.get_max_lamport_by_frontiers(&to_frontiers, oplog);
let to_max_lamport = self.get_max_lamport_by_frontiers(to_frontiers, oplog);
let lca_min_lamport = self.get_min_lamport_by_frontiers(&lca_frontiers, oplog);
// retreat for diff

View file

@ -1067,7 +1067,7 @@ impl LoroDoc {
return Ok(());
}
let oplog = self.oplog.lock().unwrap();
let oplog = self.oplog.try_lock().unwrap();
if oplog.dag.is_on_trimmed_history(frontiers) {
drop(oplog);
self.renew_txn_if_auto_commit();
@ -1081,8 +1081,8 @@ impl LoroDoc {
return Ok(());
}
let mut state = self.state.lock().unwrap();
let mut calc = self.diff_calculator.lock().unwrap();
let mut state = self.state.try_lock().unwrap();
let mut calc = self.diff_calculator.try_lock().unwrap();
for &i in frontiers.iter() {
if !oplog.dag.contains(i) {
drop(oplog);
@ -1123,12 +1123,16 @@ impl LoroDoc {
#[inline]
pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
self.oplog.lock().unwrap().dag.vv_to_frontiers(vv)
self.oplog.try_lock().unwrap().dag.vv_to_frontiers(vv)
}
#[inline]
pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
self.oplog.lock().unwrap().dag.frontiers_to_vv(frontiers)
self.oplog
.try_lock()
.unwrap()
.dag
.frontiers_to_vv(frontiers)
}
/// Import ops from other doc.
@ -1150,7 +1154,7 @@ impl LoroDoc {
#[inline]
pub fn len_changes(&self) -> usize {
let oplog = self.oplog.lock().unwrap();
let oplog = self.oplog.try_lock().unwrap();
oplog.len_changes()
}

View file

@ -478,7 +478,7 @@ impl OpLog {
let from_frontiers = match from_frontiers {
Some(f) => f,
None => {
from_frontiers_inner = Some(from.to_frontiers(&self.dag));
from_frontiers_inner = Some(self.dag.vv_to_frontiers(from));
from_frontiers_inner.as_ref().unwrap()
}
};
@ -486,7 +486,7 @@ impl OpLog {
let to_frontiers = match to_frontiers {
Some(t) => t,
None => {
to_frontiers_inner = Some(to.to_frontiers(&self.dag));
to_frontiers_inner = Some(self.dag.vv_to_frontiers(to));
to_frontiers_inner.as_ref().unwrap()
}
};

View file

@ -8,9 +8,9 @@ use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, PeerID}
use once_cell::sync::OnceCell;
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, BinaryHeap};
use std::fmt::Display;
use std::ops::Deref;
use std::ops::{ControlFlow, Deref};
use std::sync::{Arc, Mutex};
use tracing::{instrument, trace};
@ -590,6 +590,57 @@ impl AppDag {
false
}
/// Travel the ancestors of the given id, and call the callback for each node
///
/// It will travel the ancestors in the reverse order (from the greatest lamport to the smallest)
pub(crate) fn travel_ancestors(
&self,
id: ID,
f: &mut dyn FnMut(&AppDagNode) -> ControlFlow<()>,
) {
struct PendingNode(AppDagNode);
impl PartialEq for PendingNode {
fn eq(&self, other: &Self) -> bool {
self.0.lamport_last() == other.0.lamport_last() && self.0.peer == other.0.peer
}
}
impl Eq for PendingNode {}
impl PartialOrd for PendingNode {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PendingNode {
fn cmp(&self, other: &Self) -> Ordering {
self.0
.lamport_last()
.cmp(&other.0.lamport_last())
.then_with(|| self.0.peer.cmp(&other.0.peer))
}
}
let mut visited = FxHashSet::default();
let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
pending.push(PendingNode(self.get(id).unwrap()));
while let Some(PendingNode(node)) = pending.pop() {
if f(&node).is_break() {
break;
}
for &dep in node.deps.iter() {
let Some(dep_node) = self.get(dep) else {
continue;
};
if visited.contains(&dep_node.id_start()) {
continue;
}
visited.insert(dep_node.id_start());
pending.push(PendingNode(dep_node));
}
}
}
}
fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {

View file

@ -1,8 +1,9 @@
use loro_common::{HasCounter, HasCounterSpan, IdSpanVector};
use itertools::Itertools;
use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, HasLamportSpan, IdFull, IdSpanVector};
use smallvec::smallvec;
use std::{
cmp::Ordering,
ops::{Deref, DerefMut},
ops::{ControlFlow, Deref, DerefMut},
};
use fxhash::{FxHashMap, FxHashSet};
@ -78,15 +79,6 @@ impl ImVersionVector {
self.0.contains_key(k)
}
/// Convert to a [Frontiers]
///
/// # Panic
///
/// When self is greater than dag.vv
pub fn to_frontiers(&self, dag: &AppDag) -> Frontiers {
dag.im_vv_to_frontiers(self)
}
pub fn encode(&self) -> Vec<u8> {
postcard::to_allocvec(self).unwrap()
}
@ -918,15 +910,6 @@ impl VersionVector {
postcard::from_bytes(bytes).map_err(|_| LoroError::DecodeVersionVectorError)
}
/// Convert to a [Frontiers]
///
/// # Panic
///
/// When self is greater than dag.vv
pub fn to_frontiers(&self, dag: &AppDag) -> Frontiers {
dag.vv_to_frontiers(self)
}
pub(crate) fn trim(&self, vv: &VersionVector) -> VersionVector {
let mut ans = VersionVector::new();
for (client_id, &counter) in self.iter() {
@ -949,10 +932,7 @@ impl VersionVector {
/// Use minimal set of ids to represent the frontiers
pub fn shrink_frontiers(last_ids: &[ID], dag: &AppDag) -> Frontiers {
// it only keep the ids of ops that are concurrent to each other
let mut frontiers = Frontiers::default();
let mut frontiers_vv = Vec::new();
if last_ids.is_empty() {
return frontiers;
}
@ -962,30 +942,60 @@ pub fn shrink_frontiers(last_ids: &[ID], dag: &AppDag) -> Frontiers {
return frontiers;
}
let mut last_ids = last_ids.to_vec();
// sort by lamport, ascending
last_ids.sort_by_cached_key(|x| dag.get_lamport(x).unwrap() as isize);
let mut last_ids = filter_duplicated_peer_id(last_ids)
.into_iter()
.map(|x| IdFull::new(x.peer, x.counter, dag.get_lamport(&x).unwrap()))
.collect_vec();
if last_ids.len() == 1 {
frontiers.push(last_ids[0].id());
return frontiers;
}
// Iterate from the greatest lamport to the smallest
last_ids.sort_by_key(|x| x.lamport);
for id in last_ids.iter().rev() {
let vv = dag.get_vv(*id).unwrap();
let mut should_insert = true;
for f_vv in frontiers_vv.iter() {
if vv.partial_cmp(f_vv).is_some() {
// This is not concurrent op, should be ignored in frontiers
should_insert = false;
break;
}
let mut len = 0;
// travel backward because they have more similar lamport
for f_id in frontiers.iter().rev() {
dag.travel_ancestors(*f_id, &mut |x| {
len += 1;
if x.contains_id(id.id()) {
should_insert = false;
ControlFlow::Break(())
} else if x.lamport_last() < id.lamport {
// Already travel to a node with smaller lamport, no need to continue, we are sure two ops are concurrent now
ControlFlow::Break(())
} else {
ControlFlow::Continue(())
}
});
}
if should_insert {
frontiers.push(*id);
frontiers_vv.push(vv);
frontiers.push(id.id());
}
}
frontiers
}
fn filter_duplicated_peer_id(last_ids: &[ID]) -> Vec<ID> {
let mut peer_max_counters = FxHashMap::default();
for &id in last_ids {
let counter = peer_max_counters.entry(id.peer).or_insert(id.counter);
if id.counter > *counter {
*counter = id.counter;
}
}
peer_max_counters
.into_iter()
.map(|(peer, counter)| ID::new(peer, counter))
.collect()
}
impl Default for VersionVector {
fn default() -> Self {
Self::new()