perf: speed up import by reducing dag nodes

Zixuan Chen 2023-07-18 01:23:49 +08:00
parent d03617ca26
commit 5a233501cc
6 changed files with 212 additions and 51 deletions

Cargo.lock generated
View file

@@ -198,6 +198,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"
[[package]]
name = "cast"
version = "0.3.0"
@@ -1043,6 +1049,7 @@ dependencies = [
name = "loro-preload"
version = "0.1.0"
dependencies = [
"bytes",
"loro-common",
"serde",
"serde_columnar",

View file

@@ -5,7 +5,7 @@
"fuzz": "cargo +nightly fuzz run",
"quick-fuzz": "deno run -A ./scripts/fuzz.ts text text_refactored recursive encoding recursive_txn",
"mem": "deno run -A ./scripts/run_mem.ts",
"flame": "CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --example automerge_x100 --root",
"flame": "CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --example encoding_refactored --root",
"bench": "cargo bench --features test_utils"
}
}

View file

@@ -1,46 +1,75 @@
use bench_utils::TextAction;
use criterion::black_box;
use loro_internal::refactor::loro::LoroApp;
fn main() {
// let actions = bench_utils::get_automerge_actions();
// {
// let loro = LoroApp::default();
// let text = loro.get_text("text");
// let mut txn = loro.txn().unwrap();
// for TextAction { pos, ins, del } in actions.iter() {
// text.delete(&mut txn, *pos, *del);
// text.insert(&mut txn, *pos, ins);
// }
// txn.commit().unwrap();
// let snapshot = loro.export_snapshot();
// let updates = loro.export_from(&Default::default());
// println!("\n");
// println!("Snapshot size={}", snapshot.len());
// println!("Updates size={}", updates.len());
// println!("\n");
// loro.diagnose_size();
// }
// println!("\n");
// println!("\n");
// println!("\n");
// {
// println!("One Transaction Per Action");
// let loro = LoroApp::default();
// let text = loro.get_text("text");
// for TextAction { pos, ins, del } in actions.iter() {
// let mut txn = loro.txn().unwrap();
// text.delete(&mut txn, *pos, *del);
// text.insert(&mut txn, *pos, ins);
// txn.commit().unwrap();
// }
// let snapshot = loro.export_snapshot();
// let updates = loro.export_from(&Default::default());
// println!("\n");
// println!("Snapshot size={}", snapshot.len());
// println!("Updates size={}", updates.len());
// println!("\n");
// loro.diagnose_size();
// }
bench_decode();
}
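// Builds a document from the automerge editing trace, exports a snapshot,
// then imports that snapshot 100 times into fresh documents.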
fn bench_decode() {
let actions = bench_utils::get_automerge_actions();
{
let loro = LoroApp::default();
let text = loro.get_text("text");
let mut txn = loro.txn().unwrap();
let mut txn = loro.txn().unwrap();
let mut n = 0;
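// Start a fresh transaction every 10 actions so the history is split into many changes.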
for TextAction { pos, ins, del } in actions.iter() {
if n % 10 == 0 {
drop(txn);
txn = loro.txn().unwrap();
}
n += 1;
text.delete(&mut txn, *pos, *del);
text.insert(&mut txn, *pos, ins);
}
txn.commit().unwrap();
let snapshot = loro.export_snapshot();
let updates = loro.export_from(&Default::default());
println!("\n");
println!("Snapshot size={}", snapshot.len());
println!("Updates size={}", updates.len());
println!("\n");
loro.diagnose_size();
}
println!("\n");
println!("\n");
println!("\n");
{
println!("One Transaction Per Action");
let loro = LoroApp::default();
let text = loro.get_text("text");
for TextAction { pos, ins, del } in actions.iter() {
let mut txn = loro.txn().unwrap();
text.delete(&mut txn, *pos, *del);
text.insert(&mut txn, *pos, ins);
txn.commit().unwrap();
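// Import the exported snapshot into 100 fresh documents; black_box keeps the
// compiler from optimizing the repeated imports away.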
for _ in 0..100 {
let loro = LoroApp::new();
loro.import(black_box(&snapshot)).unwrap();
}
let snapshot = loro.export_snapshot();
let updates = loro.export_from(&Default::default());
println!("\n");
println!("Snapshot size={}", snapshot.len());
println!("Updates size={}", updates.len());
println!("\n");
loro.diagnose_size();
}
}

View file

@@ -129,7 +129,6 @@ impl OpLog {
));
}
self.dag.vv.extend_to_include_last_id(change.id_last());
if cfg!(debug_assertions) {
let lamport = self.dag.frontiers_to_next_lamport(&change.deps);
assert_eq!(
@@ -141,23 +140,34 @@
self.next_lamport = self.next_lamport.max(change.lamport_end());
self.latest_timestamp = self.latest_timestamp.max(change.timestamp);
self.dag.vv.extend_to_include_last_id(change.id_last());
self.dag.frontiers.retain_non_included(&change.deps);
self.dag.frontiers.filter_peer(change.id.peer);
self.dag.frontiers.push(change.id_last());
let vv = self.dag.frontiers_to_im_vv(&change.deps);
let len = change.content_len();
self.dag
.map
.entry(change.id.peer)
.or_default()
.push(AppDagNode {
vv,
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
len,
});
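// If the change's only dependency is the tail of this peer's own history, merge it into
// the last dag node by extending that node's length; otherwise push a new AppDagNode.
// Merging is what reduces the number of dag nodes and speeds up import.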
if change.deps.len() == 1 && change.deps[0].peer == change.id.peer {
// don't need to push new element to dag because it only depends on itself
let nodes = self.dag.map.get_mut(&change.id.peer).unwrap();
let last = nodes.vec_mut().last_mut().unwrap();
assert_eq!(last.peer, change.id.peer);
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
self.dag
.map
.entry(change.id.peer)
.or_default()
.push(AppDagNode {
vv,
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
len,
});
}
self.changes.entry(change.id.peer).or_default().push(change);
Ok(())
}
@@ -443,18 +453,22 @@ impl OpLog {
let mut iter = self.dag.iter_causal(&common_ancestors, diff);
let mut node = iter.next();
let mut cur_cnt = 0;
// reuse the allocated memory in merged_vv...
let vv = Rc::new(RefCell::new(merged_vv));
let vv = Rc::new(RefCell::new(VersionVector::default()));
(
common_ancestors_vv,
common_ancestors_vv.clone(),
std::iter::from_fn(move || {
if let Some(inner) = &node {
let mut inner_vv = vv.borrow_mut();
inner_vv.clear();
inner_vv.extend_to_include_vv(inner.data.vv.iter());
let peer = inner.data.peer;
let cnt = inner.data.cnt.max(cur_cnt);
let end = inner.data.cnt + inner.data.len as Counter;
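// A merged dag node may cover more counters than this diff needs, so clamp the start to
// the common ancestors' version and the end to the merged version vector.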
let cnt = inner
.data
.cnt
.max(cur_cnt)
.max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
let end = (inner.data.cnt + inner.data.len as Counter)
.min(merged_vv.get(&peer).copied().unwrap_or(0));
let change = self
.changes
.get(&peer)

View file

@@ -9,3 +9,4 @@ edition = "2021"
serde = {version="1.0.171", features=["derive"]}
serde_columnar = "0.2.5"
loro-common = {path="../loro-common"}
bytes = "1.4.0"

View file

@@ -1,3 +1,4 @@
use bytes::{BufMut, Bytes, BytesMut};
use loro_common::{ContainerID, InternalString, LoroError, LoroValue, ID};
use serde_columnar::{columnar, to_vec};
use std::borrow::Cow;
@@ -16,13 +17,57 @@ pub struct FinalPhase<'a> {
impl<'a> FinalPhase<'a> {
#[inline(always)]
pub fn encode(&self) -> Vec<u8> {
to_vec(self).unwrap()
let mut bytes = BytesMut::with_capacity(
self.common.len()
+ self.app_state.len()
+ self.state_arena.len()
+ self.additional_arena.len()
+ self.oplog.len()
+ 10,
);
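// Write each section as a LEB128-encoded length followed by its raw bytes.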
leb::write_unsigned(&mut bytes, self.common.len() as u64);
bytes.put_slice(&self.common);
leb::write_unsigned(&mut bytes, self.app_state.len() as u64);
bytes.put_slice(&self.app_state);
leb::write_unsigned(&mut bytes, self.state_arena.len() as u64);
bytes.put_slice(&self.state_arena);
leb::write_unsigned(&mut bytes, self.additional_arena.len() as u64);
bytes.put_slice(&self.additional_arena);
leb::write_unsigned(&mut bytes, self.oplog.len() as u64);
bytes.put_slice(&self.oplog);
bytes.to_vec()
}
#[inline(always)]
pub fn decode(bytes: &'a [u8]) -> Result<Self, LoroError> {
serde_columnar::from_bytes(bytes)
.map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
let mut index = 0;
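// Read the length-prefixed sections back in the order they were encoded,
// borrowing each slice directly from the input buffer.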
let len = leb::read_unsigned(bytes, &mut index) as usize;
let common = &bytes[index..index + len];
index += len;
let len = leb::read_unsigned(bytes, &mut index) as usize;
let app_state = &bytes[index..index + len];
index += len;
let len = leb::read_unsigned(bytes, &mut index) as usize;
let state_arena = &bytes[index..index + len];
index += len;
let len = leb::read_unsigned(bytes, &mut index) as usize;
let additional_arena = &bytes[index..index + len];
index += len;
let len = leb::read_unsigned(bytes, &mut index) as usize;
let oplog = &bytes[index..index + len];
Ok(FinalPhase {
common: Cow::Borrowed(common),
app_state: Cow::Borrowed(app_state),
state_arena: Cow::Borrowed(state_arena),
additional_arena: Cow::Borrowed(additional_arena),
oplog: Cow::Borrowed(oplog),
})
}
pub fn diagnose_size(&self) {
@@ -124,3 +169,68 @@ impl<'a> TempArena<'a> {
pub fn decode_state(_bytes: &[u8]) -> LoroValue {
unimplemented!()
}
mod leb {
use bytes::{BufMut, BytesMut};
pub const CONTINUATION_BIT: u8 = 1 << 7;
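// Writes `val` as an unsigned LEB128 varint: 7 bits per byte, with the high bit set
// while more bytes follow. Returns the number of bytes written.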
pub fn write_unsigned(w: &mut BytesMut, mut val: u64) -> usize {
let mut bytes_written = 0;
loop {
let mut byte = low_bits_of_u64(val);
val >>= 7;
if val != 0 {
// More bytes to come, so set the continuation bit.
byte |= CONTINUATION_BIT;
}
w.put_u8(byte);
bytes_written += 1;
if val == 0 {
return bytes_written;
}
}
}
#[doc(hidden)]
#[inline]
pub fn low_bits_of_byte(byte: u8) -> u8 {
byte & !CONTINUATION_BIT
}
#[doc(hidden)]
#[inline]
pub fn low_bits_of_u64(val: u64) -> u8 {
let byte = val & (std::u8::MAX as u64);
low_bits_of_byte(byte as u8)
}
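// Reads an unsigned LEB128 varint from `r` starting at `*index`, advancing the index
// past the bytes consumed; panics if the value would overflow a u64.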
pub fn read_unsigned(r: &[u8], index: &mut usize) -> u64 {
let mut result = 0;
let mut shift = 0;
loop {
let mut buf = [r[*index]];
*index += 1;
if shift == 63 && buf[0] != 0x00 && buf[0] != 0x01 {
while buf[0] & CONTINUATION_BIT != 0 {
buf = [r[*index]];
*index += 1;
}
panic!("overflow");
}
let low_bits = low_bits_of_byte(buf[0]) as u64;
result |= low_bits << shift;
if buf[0] & CONTINUATION_BIT == 0 {
return result;
}
shift += 7;
}
}
}