perf: make import remote changes faster

This commit is contained in:
Zixuan Chen 2023-08-06 01:41:22 +08:00
parent dc5159bf67
commit 7d5ee5f6d0
2 changed files with 39 additions and 3 deletions

View file

@ -261,6 +261,7 @@ impl SharedArena {
(self.inner.values.lock().unwrap()[range]).to_vec()
}
#[inline(always)]
pub(crate) fn with_op_converter(&self, f: impl FnOnce(&mut OpConverter)) {
let mut op_converter = OpConverter {
container_idx_to_id: self.inner.container_idx_to_id.lock().unwrap(),

View file

@ -121,7 +121,7 @@ impl OpLog {
/// - Return Err(LoroError::UsedOpID) when the change's id is occupied
/// - Return Err(LoroError::DecodeError) when the change's deps are missing
pub fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
self.check_id_valid(change.id)?;
self.check_id_is_not_duplicated(change.id)?;
if let Err(id) = self.check_deps(&change.deps) {
self.pending_changes.entry(id).or_default().push(change);
return Err(LoroError::DecodeError(
@ -172,7 +172,7 @@ impl OpLog {
Ok(())
}
fn check_id_valid(&self, id: ID) -> Result<(), LoroError> {
fn check_id_is_not_duplicated(&self, id: ID) -> Result<(), LoroError> {
let cur_end = self.dag.vv.get(&id.peer).cloned().unwrap_or(0);
if cur_end > id.counter {
return Err(LoroError::UsedOpID { id });
@ -340,11 +340,21 @@ impl OpLog {
) -> Result<(), LoroError> {
let len = changes.iter().fold(0, |last, this| last + this.1.len());
let mut change_causal_arr = Vec::with_capacity(len);
// op_converter is faster than using arena directly
self.arena.with_op_converter(|converter| {
for (peer, changes) in changes {
if changes.is_empty() {
continue;
}
let cur_end_cnt = self.changes.get(&peer).map(|x| x.atom_len()).unwrap_or(0);
let last_change = changes.last().unwrap();
self.dag.vv.extend_to_include_last_id(last_change.id_last());
self.next_lamport = self.next_lamport.max(last_change.lamport_end());
self.latest_timestamp = self.latest_timestamp.max(last_change.timestamp);
for change in changes {
if change.id.counter < cur_end_cnt {
// truncate included changes
continue;
}
@ -373,9 +383,34 @@ impl OpLog {
change_causal_arr.sort_by_key(|x| x.lamport);
// debug_dbg!(&change_causal_arr);
for change in change_causal_arr {
self.import_local_change(change)?;
let len = change.content_len();
if change.deps.len() == 1 && change.deps[0].peer == change.id.peer {
// don't need to push new element to dag because it only depends on itself
let nodes = self.dag.map.get_mut(&change.id.peer).unwrap();
let last = nodes.vec_mut().last_mut().unwrap();
assert_eq!(last.peer, change.id.peer);
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
self.dag
.map
.entry(change.id.peer)
.or_default()
.push(AppDagNode {
vv,
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
len,
});
}
self.changes.entry(change.id.peer).or_default().push(change);
}
self.dag.frontiers = self.dag.vv_to_frontiers(&self.dag.vv);
Ok(())
}