perf: speed up when there are many peers

commit 345b5bbcb9
parent 4ecd850632
Author: Zixuan Chen
Date: 2023-09-02 18:19:34 +08:00
7 changed files with 153 additions and 34 deletions


@@ -1,25 +1,29 @@
use std::time::Instant;
use loro_internal::{LoroDoc, LoroValue};
// #[global_allocator]
// static ALLOC: dhat::Alloc = dhat::Alloc;
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;
fn main() {
let start = Instant::now();
// let profiler = dhat::Profiler::builder().trim_backtraces(None).build();
let mut actors: Vec<_> = (0..1540).map(|_| LoroDoc::default()).collect();
let mut actors: Vec<_> = (0..1000).map(|_| LoroDoc::default()).collect();
let mut updates: Vec<Vec<u8>> = Vec::new();
for (i, actor) in actors.iter_mut().enumerate() {
let list = actor.get_list("list");
let value: LoroValue = i.to_string().into();
let mut txn = actor.txn().unwrap();
list.insert(&mut txn, 0, value).unwrap();
txn.commit().unwrap();
updates.push(actor.export_from(&Default::default()));
}
// drop(profiler);
println!("{}", start.elapsed().as_millis());
drop(actors);
todo!();
// actors[0].decode_batch(&updates).unwrap();
let profiler = dhat::Profiler::builder().trim_backtraces(None).build();
let start = Instant::now();
let mut actor = LoroDoc::default();
actor.import_batch(&updates).unwrap();
println!("{} bytes", updates.iter().map(|x| x.len()).sum::<usize>());
// dbg!(actor.get_state_deep_value());
println!("{} ms", start.elapsed().as_millis());
drop(profiler);
}
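
A note on the benchmark above: it is the motivating workload for this commit. Each of the 1000 peers contributes a single change, and every exported update is merged into one fresh document. Before this change, each individual import recomputed the frontiers from the full version vector (vv_to_frontiers), which is O(peers) work per import and therefore quadratic over the whole batch; deferring the refresh to the end of the batch makes it linear. A rough cost model of the difference (illustrative arithmetic only, not library code):

// Per-import refresh scans every peer seen so far: 1 + 2 + ... + n ~ n^2/2.
fn per_import_refresh_cost(n: u64) -> u64 {
    (1..=n).sum()
}

// Batched refresh: n imports plus a single O(n) frontier refresh at the end.
fn batched_refresh_cost(n: u64) -> u64 {
    n + n
}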


@@ -1,5 +1,5 @@
use fxhash::{FxHashMap, FxHashSet};
use loro_common::HasLamportSpan;
use loro_common::{HasCounterSpan, HasLamportSpan};
use rle::{HasLength, RleVec};
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, from_bytes, to_vec};
@@ -570,6 +570,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
last.has_succ = false;
} else {
let vv = oplog.dag.frontiers_to_im_vv(&change.deps);
oplog
@@ -583,8 +584,15 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
});
for dep in change.deps.iter() {
let target = oplog.dag.get_mut(*dep).unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
}
}
}
oplog.next_lamport = oplog.next_lamport.max(change.lamport_end());
oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp);
@@ -601,7 +609,9 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
});
// update dag frontiers
oplog.dag.frontiers = oplog.dag.vv_to_frontiers(&oplog.dag.vv);
if !oplog.batch_importing {
oplog.dag.refresh_frontiers();
}
assert_eq!(str_index, str.len());
Ok(())
}
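
The has_succ flag introduced above is the heart of the speedup. The invariant: a dag node's flag is true exactly when some later change depends on the node's last op, so the frontiers are the per-peer last ops whose flag is false. A minimal standalone sketch of the bookkeeping, with simplified types (Id, Node, and Dag here are illustrative stand-ins, not loro-internal's real AppDagNode/AppDag):

use std::collections::HashMap;

#[derive(Clone, Copy)]
struct Id {
    peer: u64,
    counter: i32,
}

struct Node {
    start: i32,     // first counter covered by this merged run of ops
    len: i32,       // number of ops in the run
    has_succ: bool, // does any later change depend on the run's last op?
}

impl Node {
    fn ctr_last(&self) -> i32 {
        self.start + self.len - 1
    }
}

#[derive(Default)]
struct Dag {
    map: HashMap<u64, Vec<Node>>,
}

impl Dag {
    // Rule 1: merging a change into the peer's last run moves the run's last
    // op forward, so the new last op has no successor yet.
    fn extend_last(&mut self, peer: u64, extra_len: i32) {
        let last = self.map.get_mut(&peer).unwrap().last_mut().unwrap();
        last.len += extra_len;
        last.has_succ = false;
    }

    // Rule 2: a change that starts a new run marks each dep that points at
    // the last op of its peer's last run. (The real code flags whichever run
    // contains the dep, but only the last run matters for the frontiers.)
    fn push_node(&mut self, peer: u64, node: Node, deps: &[Id]) {
        for dep in deps {
            if let Some(target) = self.map.get_mut(&dep.peer).and_then(|v| v.last_mut()) {
                if target.ctr_last() == dep.counter {
                    target.has_succ = true;
                }
            }
        }
        self.map.entry(peer).or_default().push(node);
    }

    // Frontiers are exactly the per-peer last ops that nothing depends on:
    // O(peers), with no scan of the whole version vector.
    fn frontiers(&self) -> Vec<Id> {
        self.map
            .iter()
            .filter_map(|(peer, nodes)| {
                let last = nodes.last()?;
                (!last.has_succ).then(|| Id {
                    peer: *peer,
                    counter: last.ctr_last(),
                })
            })
            .collect()
    }
}

fn main() {
    let mut dag = Dag::default();
    // Peer 1 writes one op; peer 2 then writes an op depending on it.
    dag.push_node(1, Node { start: 0, len: 1, has_succ: false }, &[]);
    let dep = Id { peer: 1, counter: 0 };
    dag.push_node(2, Node { start: 0, len: 1, has_succ: false }, &[dep]);
    // Peer 1's last op now has a successor, so only peer 2's op is a frontier.
    assert_eq!(dag.frontiers().len(), 1);
    dag.extend_last(2, 1); // peer 2 appends; its run still has no successor
    assert_eq!(dag.frontiers().len(), 1);
}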


@@ -363,9 +363,31 @@ impl LoroDoc {
}
// PERF: opt
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<()> {
pub fn import_batch(&mut self, bytes: &[Vec<u8>]) -> LoroResult<()> {
let is_detached = self.is_detached();
self.detach();
self.oplog.lock().unwrap().batch_importing = true;
let mut err = None;
for data in bytes.iter() {
self.import(data)?;
match self.import(data) {
Ok(_) => {}
Err(e) => {
err = Some(e);
}
}
}
let mut oplog = self.oplog.lock().unwrap();
oplog.batch_importing = false;
oplog.dag.refresh_frontiers();
drop(oplog);
if !is_detached {
self.checkout_to_latest();
}
if let Some(err) = err {
return Err(err);
}
Ok(())
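
A usage sketch of the batched path, built only from calls that already appear in this commit (LoroDoc::default, export_from, import_batch), rather than the crate's documented example. Note that the loop above keeps importing after a failure and surfaces the recorded error only once the whole batch has been applied and the frontiers refreshed:

use loro_internal::LoroDoc;

fn merge_all(updates: &[Vec<u8>]) -> LoroDoc {
    let mut doc = LoroDoc::default();
    // Sets `batch_importing`, imports every update (recording an error rather
    // than aborting the batch), then refreshes the frontiers exactly once.
    doc.import_batch(updates).unwrap();
    doc
}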


@@ -6,13 +6,13 @@ use std::cmp::Ordering;
use std::rc::Rc;
use fxhash::FxHashMap;
use loro_common::HasId;
use loro_common::{HasCounter, HasId};
use rle::{HasLength, RleVec};
// use tabled::measurment::Percent;
use crate::change::{Change, Lamport, Timestamp};
use crate::container::list::list_op;
use crate::dag::DagUtils;
use crate::dag::{Dag, DagUtils};
use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
use crate::encoding::{ClientChanges, RemoteClientChanges};
use crate::id::{Counter, PeerID, ID};
@@ -41,6 +41,9 @@ pub struct OpLog {
/// A change can be imported only when all its deps are already imported.
/// Key is the ID of the missing dep
pending_changes: FxHashMap<ID, Vec<Change>>,
/// Whether we are importing a batch of changes.
/// If so, the Dag's frontiers won't be updated until the batch is finished.
pub(crate) batch_importing: bool,
}
/// [AppDag] maintains the causal graph of the app.
@@ -59,6 +62,7 @@ pub struct AppDagNode {
pub(crate) lamport: Lamport,
pub(crate) deps: Frontiers,
pub(crate) vv: ImVersionVector,
pub(crate) has_succ: bool,
pub(crate) len: usize,
}
@@ -71,10 +75,43 @@ impl Clone for OpLog {
next_lamport: self.next_lamport,
latest_timestamp: self.latest_timestamp,
pending_changes: Default::default(),
batch_importing: false,
}
}
}
impl AppDag {
pub fn get_mut(&mut self, id: ID) -> Option<&mut AppDagNode> {
let ID {
peer: client_id,
counter,
} = id;
self.map.get_mut(&client_id).and_then(|rle| {
if counter >= rle.atom_len() {
return None;
}
let index = rle.search_atom_index(counter);
Some(&mut rle.vec_mut()[index])
})
}
pub(crate) fn refresh_frontiers(&mut self) {
self.frontiers = self
.map
.iter()
.filter(|(_, vec)| {
if vec.is_empty() {
return false;
}
!vec.last().unwrap().has_succ
})
.map(|(peer, vec)| ID::new(*peer, vec.last().unwrap().ctr_last()))
.collect();
}
}
impl std::fmt::Debug for OpLog {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpLog")
@@ -95,6 +132,7 @@ impl OpLog {
next_lamport: 0,
latest_timestamp: Timestamp::default(),
pending_changes: Default::default(),
batch_importing: false,
}
}
@@ -169,6 +207,7 @@ impl OpLog {
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
last.has_succ = false;
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
self.dag
@@ -181,8 +220,16 @@ impl OpLog {
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
});
for dep in change.deps.iter() {
let target = self.dag.get_mut(*dep).unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
}
}
}
self.changes.entry(change.id.peer).or_default().push(change);
Ok(())
@@ -453,6 +500,7 @@ impl OpLog {
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
last.has_succ = false;
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
self.dag
@@ -465,13 +513,23 @@ impl OpLog {
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
});
for dep in change.deps.iter() {
let target = self.dag.get_mut(*dep).unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
}
}
}
self.changes.entry(change.id.peer).or_default().push(change);
}
self.dag.frontiers = self.dag.vv_to_frontiers(&self.dag.vv);
if !self.batch_importing {
self.dag.refresh_frontiers();
}
Ok(())
}
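
Why both copies of this logic guard with target.ctr_last() == dep.counter: inside a merged run, every op except the last already has an implicit successor (the next op of the same run), so only a dep on a run's final op can affect frontier candidacy, and the flag never needs to be tracked per op. The guard as a standalone predicate (illustrative only, not library code):

// A run covering counters 0..=4: deps on counters 0..=3 are interior and
// change nothing; only a dep on counter 4 sets `has_succ`.
fn dep_sets_flag(run_last_counter: i32, dep_counter: i32) -> bool {
    run_last_counter == dep_counter
}

fn main() {
    assert!(!dep_sets_flag(4, 2)); // interior dep: flag untouched
    assert!(dep_sets_flag(4, 4)); // dep on the run's last op
}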


@@ -28,6 +28,7 @@ impl Sliceable for AppDagNode {
lamport: self.lamport + from as Lamport,
deps: Default::default(),
vv: Default::default(),
has_succ: if to == self.len { self.has_succ } else { true },
len: to - from,
}
}
@@ -141,8 +142,12 @@ impl AppDag {
pub fn frontiers_to_vv(&self, frontiers: &Frontiers) -> Option<VersionVector> {
let mut vv: VersionVector = Default::default();
for id in frontiers.iter() {
let Some(rle) = self.map.get(&id.peer) else { return None };
let Some(x) = rle.get_by_atom_index(id.counter) else { return None };
let Some(rle) = self.map.get(&id.peer) else {
return None;
};
let Some(x) = rle.get_by_atom_index(id.counter) else {
return None;
};
vv.extend_to_include_vv(x.element.vv.iter());
vv.extend_to_include_last_id(*id);
}
@@ -157,16 +162,24 @@ impl AppDag {
let mut vv = {
let id = frontiers[0];
let Some(rle) = self.map.get(&id.peer) else { unreachable!() };
let Some(x) = rle.get_by_atom_index(id.counter) else { unreachable!() };
let Some(rle) = self.map.get(&id.peer) else {
unreachable!()
};
let Some(x) = rle.get_by_atom_index(id.counter) else {
unreachable!()
};
let mut vv = x.element.vv.clone();
vv.extend_to_include_last_id(id);
vv
};
for id in frontiers[1..].iter() {
let Some(rle) = self.map.get(&id.peer) else { unreachable!() };
let Some(x) = rle.get_by_atom_index(id.counter) else { unreachable!() };
let Some(rle) = self.map.get(&id.peer) else {
unreachable!()
};
let Some(x) = rle.get_by_atom_index(id.counter) else {
unreachable!()
};
vv.extend_to_include_vv(x.element.vv.iter());
vv.extend_to_include_last_id(*id);
}
@@ -186,14 +199,22 @@ impl AppDag {
let mut lamport = {
let id = frontiers[0];
let Some(rle) = self.map.get(&id.peer) else { unreachable!() };
let Some(x) = rle.get_by_atom_index(id.counter) else { unreachable!("{} not found", id) };
let Some(rle) = self.map.get(&id.peer) else {
unreachable!()
};
let Some(x) = rle.get_by_atom_index(id.counter) else {
unreachable!("{} not found", id)
};
(id.counter - x.element.cnt) as Lamport + x.element.lamport + 1
};
for id in frontiers[1..].iter() {
let Some(rle) = self.map.get(&id.peer) else { unreachable!() };
let Some(x) = rle.get_by_atom_index(id.counter) else { unreachable!() };
let Some(rle) = self.map.get(&id.peer) else {
unreachable!()
};
let Some(x) = rle.get_by_atom_index(id.counter) else {
unreachable!()
};
lamport = lamport.max((id.counter - x.element.cnt) as Lamport + x.element.lamport + 1);
}
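
The Sliceable change above carries the same invariant through slicing: a slice that keeps the node's original end (to == self.len) inherits the old flag, while any shorter prefix must report has_succ: true, because the sliced-off tail is itself a successor of the kept ops. Restated as a standalone function (illustrative only):

fn sliced_has_succ(len: usize, to: usize, has_succ: bool) -> bool {
    // Keeping the original end preserves the flag; a shorter cut leaves the
    // truncated tail as the kept prefix's successor.
    if to == len {
        has_succ
    } else {
        true
    }
}

fn main() {
    assert!(!sliced_has_succ(5, 5, false)); // full-length slice inherits
    assert!(sliced_has_succ(5, 3, false)); // prefix: the tail succeeds it
}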


@@ -266,7 +266,7 @@ impl Loro {
}
#[wasm_bindgen(js_name = "importUpdateBatch")]
pub fn import_update_batch(&self, data: Array) -> JsResult<()> {
pub fn import_update_batch(&mut self, data: Array) -> JsResult<()> {
let data = data
.iter()
.map(|x| {


@@ -336,6 +336,7 @@ where
return None;
}
let merged_index = self.search_atom_index(index);
let value = &self.vec[merged_index];
Some(SearchResult {
merged_index,
element: value,
offset: index - self[merged_index].get_start_index(),
})
}
pub fn search_atom_index(&self, index: <<A as Array>::Item as HasIndex>::Int) -> usize {
let mut start = 0;
let mut end = self.vec.len() - 1;
while start < end {
@@ -357,13 +367,7 @@ where
if index < self[start].get_start_index() {
start -= 1;
}
let value = &self.vec[start];
Some(SearchResult {
element: value,
merged_index: start,
offset: index - self[start].get_start_index(),
})
start
}
pub fn slice_iter(
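
Finally, the binary search that used to live inside get_by_atom_index is factored out as search_atom_index, so AppDag::get_mut can reuse it to locate a node for in-place mutation without building a SearchResult. A standalone sketch of the same search, phrased over a plain slice of run start offsets (hypothetical data, not the real RleVec API):

/// Index of the contiguous run containing `index`, given each run's start
/// offset in ascending order. Assumes `index >= starts[0]`.
fn search_runs(starts: &[usize], index: usize) -> usize {
    // The first run starting after `index` is one past the containing run.
    starts.partition_point(|&s| s <= index) - 1
}

fn main() {
    let starts = [0, 5, 9]; // runs cover [0..5), [5..9), [9..)
    assert_eq!(search_runs(&starts, 4), 0);
    assert_eq!(search_runs(&starts, 5), 1);
    assert_eq!(search_runs(&starts, 12), 2);
}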