From 5a233501ccbaa6411d78cb10eb1e27ae58712fd8 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 18 Jul 2023 01:23:49 +0800 Subject: [PATCH] perf: speed up import by reducing dag nodes --- Cargo.lock | 7 ++ crates/loro-internal/deno.json | 2 +- .../examples/encoding_refactored.rs | 85 ++++++++----- crates/loro-internal/src/refactor/oplog.rs | 52 +++++--- crates/loro-preload/Cargo.toml | 1 + crates/loro-preload/src/encode.rs | 116 +++++++++++++++++- 6 files changed, 212 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d35983f..9fca0bf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,6 +198,12 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" + [[package]] name = "cast" version = "0.3.0" @@ -1043,6 +1049,7 @@ dependencies = [ name = "loro-preload" version = "0.1.0" dependencies = [ + "bytes", "loro-common", "serde", "serde_columnar", diff --git a/crates/loro-internal/deno.json b/crates/loro-internal/deno.json index d5509cc3..1fb2886f 100644 --- a/crates/loro-internal/deno.json +++ b/crates/loro-internal/deno.json @@ -5,7 +5,7 @@ "fuzz": "cargo +nightly fuzz run", "quick-fuzz": "deno run -A ./scripts/fuzz.ts text text_refactored recursive encoding recursive_txn", "mem": "deno run -A ./scripts/run_mem.ts", - "flame": "CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --example automerge_x100 --root", + "flame": "CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --example encoding_refactored --root", "bench": "cargo bench --features test_utils" } } diff --git a/crates/loro-internal/examples/encoding_refactored.rs b/crates/loro-internal/examples/encoding_refactored.rs index 3bd84661..1d4dbc9a 100644 --- a/crates/loro-internal/examples/encoding_refactored.rs +++ b/crates/loro-internal/examples/encoding_refactored.rs @@ -1,46 +1,75 @@ use bench_utils::TextAction; +use criterion::black_box; use loro_internal::refactor::loro::LoroApp; fn main() { + // let actions = bench_utils::get_automerge_actions(); + // { + // let loro = LoroApp::default(); + // let text = loro.get_text("text"); + // let mut txn = loro.txn().unwrap(); + + // for TextAction { pos, ins, del } in actions.iter() { + // text.delete(&mut txn, *pos, *del); + // text.insert(&mut txn, *pos, ins); + // } + // txn.commit().unwrap(); + // let snapshot = loro.export_snapshot(); + // let updates = loro.export_from(&Default::default()); + // println!("\n"); + // println!("Snapshot size={}", snapshot.len()); + // println!("Updates size={}", updates.len()); + // println!("\n"); + // loro.diagnose_size(); + // } + // println!("\n"); + // println!("\n"); + // println!("\n"); + // { + // println!("One Transaction Per Action"); + // let loro = LoroApp::default(); + // let text = loro.get_text("text"); + + // for TextAction { pos, ins, del } in actions.iter() { + // let mut txn = loro.txn().unwrap(); + // text.delete(&mut txn, *pos, *del); + // text.insert(&mut txn, *pos, ins); + // txn.commit().unwrap(); + // } + // let snapshot = loro.export_snapshot(); + // let updates = loro.export_from(&Default::default()); + // println!("\n"); + // println!("Snapshot size={}", snapshot.len()); + // println!("Updates size={}", updates.len()); + // println!("\n"); + // loro.diagnose_size(); + // } + + bench_decode(); +} + +fn bench_decode() { let actions = bench_utils::get_automerge_actions(); { let loro = LoroApp::default(); let text = loro.get_text("text"); - let mut txn = loro.txn().unwrap(); + let mut txn = loro.txn().unwrap(); + let mut n = 0; for TextAction { pos, ins, del } in actions.iter() { + if n % 10 == 0 { + drop(txn); + txn = loro.txn().unwrap(); + } + n += 1; text.delete(&mut txn, *pos, *del); text.insert(&mut txn, *pos, ins); } txn.commit().unwrap(); let snapshot = loro.export_snapshot(); - let updates = loro.export_from(&Default::default()); - println!("\n"); - println!("Snapshot size={}", snapshot.len()); - println!("Updates size={}", updates.len()); - println!("\n"); - loro.diagnose_size(); - } - println!("\n"); - println!("\n"); - println!("\n"); - { - println!("One Transaction Per Action"); - let loro = LoroApp::default(); - let text = loro.get_text("text"); - - for TextAction { pos, ins, del } in actions.iter() { - let mut txn = loro.txn().unwrap(); - text.delete(&mut txn, *pos, *del); - text.insert(&mut txn, *pos, ins); - txn.commit().unwrap(); + for _ in 0..100 { + let loro = LoroApp::new(); + loro.import(black_box(&snapshot)).unwrap(); } - let snapshot = loro.export_snapshot(); - let updates = loro.export_from(&Default::default()); - println!("\n"); - println!("Snapshot size={}", snapshot.len()); - println!("Updates size={}", updates.len()); - println!("\n"); - loro.diagnose_size(); } } diff --git a/crates/loro-internal/src/refactor/oplog.rs b/crates/loro-internal/src/refactor/oplog.rs index c2114887..ad12bd26 100644 --- a/crates/loro-internal/src/refactor/oplog.rs +++ b/crates/loro-internal/src/refactor/oplog.rs @@ -129,7 +129,6 @@ impl OpLog { )); } - self.dag.vv.extend_to_include_last_id(change.id_last()); if cfg!(debug_assertions) { let lamport = self.dag.frontiers_to_next_lamport(&change.deps); assert_eq!( @@ -141,23 +140,34 @@ impl OpLog { self.next_lamport = self.next_lamport.max(change.lamport_end()); self.latest_timestamp = self.latest_timestamp.max(change.timestamp); + self.dag.vv.extend_to_include_last_id(change.id_last()); self.dag.frontiers.retain_non_included(&change.deps); self.dag.frontiers.filter_peer(change.id.peer); self.dag.frontiers.push(change.id_last()); - let vv = self.dag.frontiers_to_im_vv(&change.deps); let len = change.content_len(); - self.dag - .map - .entry(change.id.peer) - .or_default() - .push(AppDagNode { - vv, - peer: change.id.peer, - cnt: change.id.counter, - lamport: change.lamport, - deps: change.deps.clone(), - len, - }); + if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { + // don't need to push new element to dag because it only depends on itself + let nodes = self.dag.map.get_mut(&change.id.peer).unwrap(); + let last = nodes.vec_mut().last_mut().unwrap(); + assert_eq!(last.peer, change.id.peer); + assert_eq!(last.cnt + last.len as Counter, change.id.counter); + assert_eq!(last.lamport + last.len as Lamport, change.lamport); + last.len = change.id.counter as usize + len - last.cnt as usize; + } else { + let vv = self.dag.frontiers_to_im_vv(&change.deps); + self.dag + .map + .entry(change.id.peer) + .or_default() + .push(AppDagNode { + vv, + peer: change.id.peer, + cnt: change.id.counter, + lamport: change.lamport, + deps: change.deps.clone(), + len, + }); + } self.changes.entry(change.id.peer).or_default().push(change); Ok(()) } @@ -443,18 +453,22 @@ impl OpLog { let mut iter = self.dag.iter_causal(&common_ancestors, diff); let mut node = iter.next(); let mut cur_cnt = 0; - // reuse the allocated memory in merged_vv... - let vv = Rc::new(RefCell::new(merged_vv)); + let vv = Rc::new(RefCell::new(VersionVector::default())); ( - common_ancestors_vv, + common_ancestors_vv.clone(), std::iter::from_fn(move || { if let Some(inner) = &node { let mut inner_vv = vv.borrow_mut(); inner_vv.clear(); inner_vv.extend_to_include_vv(inner.data.vv.iter()); let peer = inner.data.peer; - let cnt = inner.data.cnt.max(cur_cnt); - let end = inner.data.cnt + inner.data.len as Counter; + let cnt = inner + .data + .cnt + .max(cur_cnt) + .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0)); + let end = (inner.data.cnt + inner.data.len as Counter) + .min(merged_vv.get(&peer).copied().unwrap_or(0)); let change = self .changes .get(&peer) diff --git a/crates/loro-preload/Cargo.toml b/crates/loro-preload/Cargo.toml index 36a77008..404d7c39 100644 --- a/crates/loro-preload/Cargo.toml +++ b/crates/loro-preload/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" serde = {version="1.0.171", features=["derive"]} serde_columnar = "0.2.5" loro-common = {path="../loro-common"} +bytes = "1.4.0" diff --git a/crates/loro-preload/src/encode.rs b/crates/loro-preload/src/encode.rs index c10c1c77..7305e7d7 100644 --- a/crates/loro-preload/src/encode.rs +++ b/crates/loro-preload/src/encode.rs @@ -1,3 +1,4 @@ +use bytes::{BufMut, Bytes, BytesMut}; use loro_common::{ContainerID, InternalString, LoroError, LoroValue, ID}; use serde_columnar::{columnar, to_vec}; use std::borrow::Cow; @@ -16,13 +17,57 @@ pub struct FinalPhase<'a> { impl<'a> FinalPhase<'a> { #[inline(always)] pub fn encode(&self) -> Vec { - to_vec(self).unwrap() + let mut bytes = BytesMut::with_capacity( + self.common.len() + + self.app_state.len() + + self.state_arena.len() + + self.additional_arena.len() + + self.oplog.len() + + 10, + ); + + leb::write_unsigned(&mut bytes, self.common.len() as u64); + bytes.put_slice(&self.common); + leb::write_unsigned(&mut bytes, self.app_state.len() as u64); + bytes.put_slice(&self.app_state); + leb::write_unsigned(&mut bytes, self.state_arena.len() as u64); + bytes.put_slice(&self.state_arena); + leb::write_unsigned(&mut bytes, self.additional_arena.len() as u64); + bytes.put_slice(&self.additional_arena); + leb::write_unsigned(&mut bytes, self.oplog.len() as u64); + bytes.put_slice(&self.oplog); + bytes.to_vec() } #[inline(always)] pub fn decode(bytes: &'a [u8]) -> Result { - serde_columnar::from_bytes(bytes) - .map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str())) + let mut index = 0; + let len = leb::read_unsigned(bytes, &mut index) as usize; + let common = &bytes[index..index + len]; + index += len; + + let len = leb::read_unsigned(bytes, &mut index) as usize; + let app_state = &bytes[index..index + len]; + index += len; + + let len = leb::read_unsigned(bytes, &mut index) as usize; + let state_arena = &bytes[index..index + len]; + index += len; + + let len = leb::read_unsigned(bytes, &mut index) as usize; + let additional_arena = &bytes[index..index + len]; + index += len; + + let len = leb::read_unsigned(bytes, &mut index) as usize; + let oplog = &bytes[index..index + len]; + + Ok(FinalPhase { + common: Cow::Borrowed(common), + app_state: Cow::Borrowed(app_state), + state_arena: Cow::Borrowed(state_arena), + additional_arena: Cow::Borrowed(additional_arena), + oplog: Cow::Borrowed(oplog), + }) } pub fn diagnose_size(&self) { @@ -124,3 +169,68 @@ impl<'a> TempArena<'a> { pub fn decode_state(_bytes: &[u8]) -> LoroValue { unimplemented!() } + +mod leb { + use bytes::{BufMut, BytesMut}; + pub const CONTINUATION_BIT: u8 = 1 << 7; + + pub fn write_unsigned(w: &mut BytesMut, mut val: u64) -> usize { + let mut bytes_written = 0; + loop { + let mut byte = low_bits_of_u64(val); + val >>= 7; + if val != 0 { + // More bytes to come, so set the continuation bit. + byte |= CONTINUATION_BIT; + } + + w.put_u8(byte); + bytes_written += 1; + + if val == 0 { + return bytes_written; + } + } + } + + #[doc(hidden)] + #[inline] + pub fn low_bits_of_byte(byte: u8) -> u8 { + byte & !CONTINUATION_BIT + } + + #[doc(hidden)] + #[inline] + pub fn low_bits_of_u64(val: u64) -> u8 { + let byte = val & (std::u8::MAX as u64); + low_bits_of_byte(byte as u8) + } + + pub fn read_unsigned(r: &[u8], index: &mut usize) -> u64 { + let mut result = 0; + let mut shift = 0; + + loop { + let mut buf = [r[*index]]; + *index += 1; + + if shift == 63 && buf[0] != 0x00 && buf[0] != 0x01 { + while buf[0] & CONTINUATION_BIT != 0 { + buf = [r[*index]]; + *index += 1; + } + + panic!("overflow"); + } + + let low_bits = low_bits_of_byte(buf[0]) as u64; + result |= low_bits << shift; + + if buf[0] & CONTINUATION_BIT == 0 { + return result; + } + + shift += 7; + } + } +}