From 3d2d9d9c183ab3fdffa67fc33b713a8ede71bff2 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 6 Sep 2024 16:26:50 +0800 Subject: [PATCH 1/5] refactor: optimize block encoder --- crates/loro-internal/src/encoding/arena.rs | 16 ++ .../loro-internal/src/oplog/change_store.rs | 14 ++ .../src/oplog/change_store/block_encode.rs | 142 +++++++++++++----- 3 files changed, 131 insertions(+), 41 deletions(-) diff --git a/crates/loro-internal/src/encoding/arena.rs b/crates/loro-internal/src/encoding/arena.rs index bb3e303f..6597256e 100644 --- a/crates/loro-internal/src/encoding/arena.rs +++ b/crates/loro-internal/src/encoding/arena.rs @@ -499,6 +499,22 @@ impl<'a> PositionArena<'a> { pub fn decode<'de: 'a>(bytes: &'de [u8]) -> LoroResult { Ok(serde_columnar::from_bytes(bytes)?) } + + pub fn encode_v2(&self) -> Vec { + if self.positions.is_empty() { + return Vec::new(); + } + + serde_columnar::to_vec(&self).unwrap() + } + + pub fn decode_v2<'de: 'a>(bytes: &'de [u8]) -> LoroResult { + if bytes.is_empty() { + return Ok(Self::default()); + } + + Ok(serde_columnar::from_bytes(bytes)?) + } } fn longest_common_prefix_length(a: &[u8], b: &[u8]) -> usize { diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index a77441e6..29f9bddd 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -44,6 +44,20 @@ const MAX_BLOCK_SIZE: usize = 128; /// - We don't allow holes in a block or between two blocks with the same peer id. /// The [Change] should be continuous for each peer. /// - However, the first block of a peer can have counter > 0 so that we can trim the history. +/// +/// # Encoding Schema +/// +/// It's based on the underlying KV store. +/// +/// The entries of the KV store is made up of the following fields +/// +/// |Key |Value | +/// |:-- |:---- | +/// |b"vv" |VersionVector | +/// |b"fr" |Frontiers | +/// |b"sv" |Trimmed VV | +/// |b"sf" |Trimmed Frontiers | +/// |12 bytes PeerID + Counter |Encoded Block | #[derive(Debug, Clone)] pub struct ChangeStore { inner: Arc>, diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index eb7750c5..855c747f 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -8,9 +8,9 @@ //! Peer_1 = This Peer //! //! -//! ┌──────────┬─────────────────────────┬─────────────────────────┐ -//! │2B Version│ LEB Counter Start&Len │ LEB Lamport Start&Len │◁───┐ -//! └──────────┴─────────────────────────┴─────────────────────────┘ │ +//! ┌────────────────────────┬─────────────────────────────────────┐ +//! │ LEB Counter Start&Len │ LEB Lamport Start&Len │◁───┐ +//! └────────────────────────┴─────────────────────────────────────┘ │ //! ┌──────────────┬──────────────┬────────────────────────────────┐ │ //! │ LEB N │ LEB Peer Num │ 8B Peer_1,...,Peer_x │◁───┤ //! └──────────────┴──────────────┴────────────────────────────────┘ │ @@ -52,19 +52,17 @@ //! │ │ │ │ │ │ //! │ │ │ │ │ │ //! └────────┴──────────┴──────────┴───────┴───────────────────────┘ +//! ┌──────────────────────────────────────────────────────────────┐ +//! │ (Encoded with Ops By serde_columnar) │ +//! │ Delete Start IDs │ +//! │ │ +//! └──────────────────────────────────────────────────────────────┘ //! ┌────────────────┬─────────────────────────────────────────────┐ //! │ │ │ //! │Value Bytes Size│ Value Bytes │ //! │ │ │ //! └────────────────┴─────────────────────────────────────────────┘ -//! ┌──────────────────────────────────────────────────────────────┐ -//! │ │ -//! │ Delete Start IDs │ -//! │ │ -//! └──────────────────────────────────────────────────────────────┘ //! ``` -//! -//! use std::borrow::Cow; use std::collections::BTreeSet; @@ -80,6 +78,7 @@ use once_cell::sync::OnceCell; use rle::HasLength; use serde::{Deserialize, Serialize}; use serde_columnar::{columnar, DeltaRleDecoder, Itertools}; +use tracing::info; use super::delta_rle_encode::{UnsignedDeltaDecoder, UnsignedDeltaEncoder}; use crate::arena::SharedArena; @@ -97,13 +96,11 @@ use serde_columnar::{ #[derive(Debug, Serialize, Deserialize)] struct EncodedBlock<'a> { - version: u16, counter_start: u32, counter_len: u32, lamport_start: u32, lamport_len: u32, n_changes: u32, - first_counter: u32, #[serde(borrow)] peers: Cow<'a, [u8]>, #[serde(borrow)] @@ -134,9 +131,36 @@ struct EncodedBlock<'a> { #[serde(borrow)] ops: Cow<'a, [u8]>, #[serde(borrow)] + delete_start_ids: Cow<'a, [u8]>, + #[serde(borrow)] values: Cow<'a, [u8]>, } +fn diagnose_block(block: &EncodedBlock) { + use std::mem; + + info!("Diagnosing EncodedBlock:"); + info!(" peers: {} bytes", block.peers.len()); + info!(" lengths: {} bytes", block.lengths.len()); + info!(" dep_on_self: {} bytes", block.dep_on_self.len()); + info!(" dep_len: {} bytes", block.dep_len.len()); + info!(" dep_peer_idxs: {} bytes", block.dep_peer_idxs.len()); + info!(" dep_counters: {} bytes", block.dep_counters.len()); + info!(" lamports: {} bytes", block.lamports.len()); + info!(" timestamps: {} bytes", block.timestamps.len()); + info!( + " commit_msg_lengths: {} bytes", + block.commit_msg_lengths.len() + ); + info!(" commit_msgs: {} bytes", block.commit_msgs.len()); + info!(" cids: {} bytes", block.cids.len()); + info!(" keys: {} bytes", block.keys.len()); + info!(" positions: {} bytes", block.positions.len()); + info!(" ops: {} bytes", block.ops.len()); + info!(" delete_id_starts: {} bytes", block.delete_start_ids.len()); + info!(" values: {} bytes", block.values.len()); +} + const VERSION: u16 = 0; // MARK: encode_block @@ -161,6 +185,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { peer_idx: AnyRleEncoder::new(), counter: AnyRleEncoder::new(), }; + for c in block { timestamp_encoder.append(c.timestamp()).unwrap(); lamport_encoder.push(c.lamport() as u64); @@ -277,7 +302,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { // └────────────────────┴─────────────────────────────────────────┘ let position_vec = registers.position_register.unwrap_vec(); let positions = PositionArena::from_positions(position_vec.iter().map(|p| p.as_bytes())); - let position_bytes = positions.encode(); + let position_bytes = positions.encode_v2(); // ┌──────────┬──────────┬───────┬────────────────────────────────┐ // │ │ │ │ │ @@ -291,24 +316,27 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { // │ │ │ │ │ // └──────────┴──────────┴───────┴────────────────────────────────┘ - let ops_bytes = serde_columnar::to_vec(&EncodedOpsAndDeleteStarts { - ops: encoded_ops, - delete_start_ids: del_starts, - }) - .unwrap(); + let ops_bytes = serde_columnar::to_vec(&EncodedOps { ops: encoded_ops }).unwrap(); + + let delete_id_starts_bytes = if del_starts.is_empty() { + Vec::new() + } else { + serde_columnar::to_vec(&EncodedDeleteStartIds { + delete_start_ids: del_starts, + }) + .unwrap() + }; // ┌────────────────┬─────────────────────────────────────────────┐ // │Value Bytes Size│ Value Bytes │ // └────────────────┴─────────────────────────────────────────────┘ let value_bytes = value_writer.finish(); let out = EncodedBlock { - version: VERSION, counter_start: block[0].id.counter as u32, counter_len: (block.last().unwrap().ctr_end() - block[0].id.counter) as u32, lamport_start: block[0].lamport(), lamport_len: block.last().unwrap().lamport_end() - block[0].lamport(), n_changes: block.len() as u32, - first_counter: block[0].id.counter as u32, peers: Cow::Owned(peer_bytes), lengths: Cow::Owned(lengths_bytes), dep_on_self: dep_self_encoder.finish().unwrap().into(), @@ -323,9 +351,14 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { keys: keys_bytes.into(), positions: position_bytes.into(), ops: ops_bytes.into(), + delete_start_ids: delete_id_starts_bytes.into(), values: value_bytes.into(), }; - postcard::to_allocvec(&out).unwrap() + + // diagnose_block(&out); + let ans = postcard::to_allocvec(&out).unwrap(); + // info!("block size = {}", ans.len()); + ans } fn encode_keys(keys: Vec) -> Vec { @@ -442,7 +475,6 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult { fn decode_header_from_doc(doc: &EncodedBlock) -> Result { let EncodedBlock { n_changes, - first_counter, peers: peers_bytes, lengths: lengths_bytes, dep_on_self, @@ -450,7 +482,6 @@ fn decode_header_from_doc(doc: &EncodedBlock) -> Result Result, +} + +#[columnar(ser, de)] +struct EncodedDeleteStartIds { #[columnar(class = "vec", iter = "EncodedDeleteStartId")] delete_start_ids: Vec, } @@ -679,23 +708,20 @@ pub fn decode_block( header_on_stack.as_ref().unwrap() }); let EncodedBlock { - version, n_changes, - first_counter, timestamps, + counter_start: first_counter, commit_msg_lengths, commit_msgs, cids, keys, ops, + delete_start_ids, values, positions, .. } = doc; let mut changes = Vec::with_capacity(n_changes as usize); - if version != VERSION { - return Err(LoroError::IncompatibleFutureEncodingError(version as usize)); - } let mut timestamp_decoder: DeltaRleDecoder = DeltaRleDecoder::new(×tamps); let mut commit_msg_len_decoder = AnyRleDecoder::::new(&commit_msg_lengths); let mut commit_msg_index = 0; @@ -704,7 +730,7 @@ pub fn decode_block( peers: &header.peers, keys, }; - let positions = PositionArena::decode(&positions)?; + let positions = PositionArena::decode_v2(&positions)?; let positions = positions.parse_to_positions(); let cids: &Vec = header.cids.get_or_init(|| { ContainerArena::decode(&cids) @@ -715,10 +741,19 @@ pub fn decode_block( .unwrap() }); let mut value_reader = ValueReader::new(&values); - let encoded_ops_iters = - serde_columnar::iter_from_bytes::(&ops).unwrap(); + let encoded_ops_iters = serde_columnar::iter_from_bytes::(&ops).unwrap(); let op_iter = encoded_ops_iters.ops; - let mut del_iter = encoded_ops_iters.delete_start_ids; + let encoded_delete_id_starts: EncodedDeleteStartIds = if delete_start_ids.is_empty() { + EncodedDeleteStartIds { + delete_start_ids: Vec::new(), + } + } else { + serde_columnar::from_bytes(&delete_start_ids).unwrap() + }; + let mut del_iter = encoded_delete_id_starts + .delete_start_ids + .into_iter() + .map(Ok); for i in 0..(n_changes as usize) { let commit_msg: Option> = { let len = commit_msg_len_decoder.next().unwrap().unwrap(); @@ -793,3 +828,28 @@ pub fn decode_block( Ok(changes) } + +#[cfg(test)] +mod test { + use crate::{delta::DeltaValue, LoroDoc}; + + #[test] + pub fn encode_single_text_edit() { + let doc = LoroDoc::new(); + doc.start_auto_commit(); + doc.get_text("text").insert(0, "Hi").unwrap(); + + let bytes = doc.export_from(&Default::default()); + println!("Old Update bytes {:?}", dev_utils::ByteSize(bytes.length())); + + let bytes = doc.export(crate::loro::ExportMode::Updates { + from: &Default::default(), + }); + println!("Update bytes {:?}", dev_utils::ByteSize(bytes.length())); + // assert!(bytes.len() < 30); + + let bytes = doc.export(crate::loro::ExportMode::Snapshot); + println!("Snapshot bytes {:?}", dev_utils::ByteSize(bytes.length())); + // assert!(bytes.len() < 30); + } +} From 1c4d7e218280b58e9ee9e13c1b948e8b25e4e3ca Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 6 Sep 2024 22:57:19 +0800 Subject: [PATCH 2/5] chore: add some debug log --- .../src/oplog/change_store/block_encode.rs | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index 855c747f..b1e5374a 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -355,9 +355,10 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { values: value_bytes.into(), }; - // diagnose_block(&out); + diagnose_block(&out); let ans = postcard::to_allocvec(&out).unwrap(); // info!("block size = {}", ans.len()); + println!("BLOCK SIZE = {}", ans.len()); ans } @@ -837,8 +838,26 @@ mod test { pub fn encode_single_text_edit() { let doc = LoroDoc::new(); doc.start_auto_commit(); - doc.get_text("text").insert(0, "Hi").unwrap(); + doc.get_map("map").insert("x", 100).unwrap(); + // doc.get_text("text").insert(0, "Hi").unwrap(); + // let node = doc.get_tree("tree").create(None).unwrap(); + // doc.get_tree("tree").create(node).unwrap(); + diagnose(&doc); + doc.get_map("map").insert("y", 20).unwrap(); + diagnose(&doc); + doc.get_map("map").insert("z", 1000).unwrap(); + diagnose(&doc); + doc.get_text("text").insert(0, "Hello").unwrap(); + diagnose(&doc); + doc.get_text("text").insert(2, "He").unwrap(); + diagnose(&doc); + doc.get_text("text").delete(1, 4).unwrap(); + diagnose(&doc); + doc.get_text("text").delete(0, 2).unwrap(); + diagnose(&doc); + } + fn diagnose(doc: &LoroDoc) { let bytes = doc.export_from(&Default::default()); println!("Old Update bytes {:?}", dev_utils::ByteSize(bytes.length())); @@ -851,5 +870,15 @@ mod test { let bytes = doc.export(crate::loro::ExportMode::Snapshot); println!("Snapshot bytes {:?}", dev_utils::ByteSize(bytes.length())); // assert!(bytes.len() < 30); + + let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv()); + let json_string = serde_json::to_string(&json.changes).unwrap(); + println!( + "JSON string bytes {:?}", + dev_utils::ByteSize(json_string.len()) + ); + let bytes = postcard::to_allocvec(&json.changes).unwrap(); + println!("JSON update bytes {:?}", dev_utils::ByteSize(bytes.len())); + println!("\n\n") } } From 71e46f65f1878e8ae34dbdf364fa3b65dfe95531 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 10 Sep 2024 13:09:45 +0800 Subject: [PATCH 3/5] perf: export block directly for updates It can reduce the overhead for small updates --- crates/loro-internal/src/change.rs | 6 +- crates/loro-internal/src/encoding.rs | 69 +++++----------- .../src/encoding/fast_snapshot.rs | 68 +++++++++------- crates/loro-internal/src/encoding/gc.rs | 2 +- crates/loro-internal/src/oplog.rs | 13 ++- .../loro-internal/src/oplog/change_store.rs | 79 ++++++++++++++++--- crates/loro/tests/loro_rust_test.rs | 12 +++ loro-js/tests/basic.test.ts | 2 +- 8 files changed, 154 insertions(+), 97 deletions(-) diff --git a/crates/loro-internal/src/change.rs b/crates/loro-internal/src/change.rs index 7333cb0a..7a3d8d3f 100644 --- a/crates/loro-internal/src/change.rs +++ b/crates/loro-internal/src/change.rs @@ -257,14 +257,14 @@ impl Change { } /// [Unix time](https://en.wikipedia.org/wiki/Unix_time) -/// It is the number of seconds that have elapsed since 00:00:00 UTC on 1 January 1970. +/// It is the number of milliseconds that have elapsed since 00:00:00 UTC on 1 January 1970. #[cfg(not(all(feature = "wasm", target_arch = "wasm32")))] pub(crate) fn get_sys_timestamp() -> Timestamp { use std::time::{SystemTime, UNIX_EPOCH}; SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_secs() + .as_millis() .as_() } @@ -281,7 +281,7 @@ pub fn get_sys_timestamp() -> Timestamp { pub fn now() -> f64; } - now() as Timestamp / 1000 + now() as Timestamp } #[cfg(test)] diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index e432b22b..3fa138fd 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -178,9 +178,8 @@ pub(crate) fn decode_oplog( let ParsedHeaderAndBody { mode, body, .. } = parsed; match mode { EncodeMode::Rle | EncodeMode::Snapshot => encode_reordered::decode_updates(oplog, body), - EncodeMode::FastSnapshot | EncodeMode::FastUpdates => { - fast_snapshot::decode_oplog(oplog, body) - } + EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body), + EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()), EncodeMode::Auto => unreachable!(), } } @@ -268,69 +267,39 @@ pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec { } pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec { - // HEADER - let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); - ans.extend(MAGIC_BYTES); - let checksum = [0; 16]; - ans.extend(checksum); - ans.extend(EncodeMode::FastSnapshot.to_bytes()); - - // BODY - fast_snapshot::encode_snapshot(doc, &mut ans); - - // CHECKSUM in HEADER - let checksum_body = &ans[20..]; - let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED); - ans[16..20].copy_from_slice(&checksum.to_le_bytes()); - ans + encode_with(EncodeMode::FastSnapshot, &mut |ans| { + fast_snapshot::encode_snapshot(doc, ans); + }) } pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec { - // HEADER - let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); - ans.extend(MAGIC_BYTES); - let checksum = [0; 16]; - ans.extend(checksum); - ans.extend(EncodeMode::FastUpdates.to_bytes()); - - // BODY - fast_snapshot::encode_updates(doc, vv, &mut ans); - - // CHECKSUM in HEADER - let checksum_body = &ans[20..]; - let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED); - ans[16..20].copy_from_slice(&checksum.to_le_bytes()); - ans + encode_with(EncodeMode::FastUpdates, &mut |ans| { + fast_snapshot::encode_updates(doc, vv, ans); + }) } pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec { - // HEADER - let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); - ans.extend(MAGIC_BYTES); - let checksum = [0; 16]; - ans.extend(checksum); - ans.extend(EncodeMode::FastUpdates.to_bytes()); - - // BODY - fast_snapshot::encode_updates_in_range(oplog, spans, &mut ans); - - // CHECKSUM in HEADER - let checksum_body = &ans[20..]; - let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED); - ans[16..20].copy_from_slice(&checksum.to_le_bytes()); - ans + encode_with(EncodeMode::FastUpdates, &mut |ans| { + fast_snapshot::encode_updates_in_range(oplog, spans, ans); + }) } pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec { + encode_with(EncodeMode::FastSnapshot, &mut |ans| { + gc::export_gc_snapshot(doc, f, ans).unwrap(); + }) +} + +fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec)) -> Vec { // HEADER let mut ans = Vec::with_capacity(MIN_HEADER_SIZE); ans.extend(MAGIC_BYTES); let checksum = [0; 16]; ans.extend(checksum); - ans.extend(EncodeMode::FastSnapshot.to_bytes()); + ans.extend(mode.to_bytes()); // BODY - gc::export_gc_snapshot(doc, f, &mut ans).unwrap(); + f(&mut ans); // CHECKSUM in HEADER let checksum_body = &ans[20..]; diff --git a/crates/loro-internal/src/encoding/fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs index 1a60c8a6..c82aaf14 100644 --- a/crates/loro-internal/src/encoding/fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -179,35 +179,6 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { ); } -pub(crate) fn encode_updates(doc: &LoroDoc, vv: &VersionVector, w: &mut W) { - let oplog = doc.oplog().try_lock().unwrap(); - let bytes = oplog.export_from_fast(vv); - _encode_snapshot( - Snapshot { - oplog_bytes: bytes, - state_bytes: None, - gc_bytes: Bytes::new(), - }, - w, - ); -} - -pub(crate) fn encode_updates_in_range( - oplog: &OpLog, - spans: &[IdSpan], - w: &mut W, -) { - let bytes = oplog.export_from_fast_in_range(spans); - _encode_snapshot( - Snapshot { - oplog_bytes: bytes, - state_bytes: None, - gc_bytes: Bytes::new(), - }, - w, - ); -} - pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> { let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()); let oplog_bytes = &bytes[4..4 + oplog_len as usize]; @@ -223,3 +194,42 @@ pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroEr oplog.import_unknown_lamport_pending_changes(pending_changes)?; Ok(()) } + +pub(crate) fn encode_updates(doc: &LoroDoc, vv: &VersionVector, w: &mut W) { + let oplog = doc.oplog().try_lock().unwrap(); + oplog.export_blocks_from(vv, w); +} + +pub(crate) fn encode_updates_in_range( + oplog: &OpLog, + spans: &[IdSpan], + w: &mut W, +) { + oplog.export_blocks_in_range(spans, w); +} + +pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroError> { + let mut reader: &[u8] = body.as_ref(); + let mut index = 0; + let self_vv = oplog.vv(); + let mut changes = Vec::new(); + while !reader.is_empty() { + let old_reader_len = reader.len(); + let len = leb128::read::unsigned(&mut reader).unwrap() as usize; + index += old_reader_len - reader.len(); + trace!("index={}", index); + let block_bytes = body.slice(index..index + len); + trace!("decoded block_bytes = {:?}", &block_bytes); + let new_changes = ChangeStore::decode_block_bytes(block_bytes, &oplog.arena, self_vv)?; + changes.extend(new_changes); + index += len; + reader = &reader[len..]; + } + + changes.sort_unstable_by_key(|x| x.lamport); + let (latest_ids, pending_changes) = import_changes_to_oplog(changes, oplog)?; + // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first? + oplog.try_apply_pending(latest_ids); + oplog.import_unknown_lamport_pending_changes(pending_changes)?; + Ok(()) +} diff --git a/crates/loro-internal/src/encoding/gc.rs b/crates/loro-internal/src/encoding/gc.rs index 2016ac7c..29c6fe9c 100644 --- a/crates/loro-internal/src/encoding/gc.rs +++ b/crates/loro-internal/src/encoding/gc.rs @@ -37,7 +37,7 @@ pub(crate) fn export_gc_snapshot( &start_vv, &start_from, ); - let oplog_bytes = oplog.export_from_fast(&start_vv); + let oplog_bytes = oplog.export_oplog_from(&start_vv); let latest_vv = oplog.vv(); let ops_num: usize = latest_vv.sub_iter(&start_vv).map(|x| x.atom_len()).sum(); drop(oplog); diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index df1dd43e..aba2579e 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -366,14 +366,19 @@ impl OpLog { } #[inline(always)] - pub(crate) fn export_from_fast(&self, vv: &VersionVector) -> Bytes { + pub(crate) fn export_oplog_from(&self, vv: &VersionVector) -> Bytes { self.change_store .export_from(vv, self.vv(), self.frontiers()) } #[inline(always)] - pub(crate) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes { - self.change_store.export_from_fast_in_range(spans) + pub(crate) fn export_blocks_from(&self, vv: &VersionVector, w: &mut W) { + self.change_store.export_blocks_from(vv, self.vv(), w) + } + + #[inline(always)] + pub(crate) fn export_blocks_in_range(&self, spans: &[IdSpan], w: &mut W) { + self.change_store.export_blocks_in_range(spans, w) } #[inline(always)] @@ -545,7 +550,7 @@ impl OpLog { pub fn get_timestamp_for_next_txn(&self) -> Timestamp { if self.configure.record_timestamp() { - get_sys_timestamp() + (get_sys_timestamp() + 500) / 1000 } else { 0 } diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 29f9bddd..a523ce1b 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -197,10 +197,8 @@ impl ChangeStore { new_store.encode_from(start_vv, &start_frontiers, latest_vv, latest_frontiers) } - pub(super) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes { + pub(super) fn export_blocks_in_range(&self, spans: &[IdSpan], w: &mut W) { let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone()); - let latest_vv: VersionVector = spans.iter().map(|x| x.id_last()).collect(); - let start_vv: VersionVector = spans.iter().map(|x| x.id_start()).collect(); for span in spans { let mut span = *span; span.normalize_(); @@ -216,12 +214,7 @@ impl ChangeStore { } } - new_store.encode_from( - &start_vv, - &Default::default(), - &latest_vv, - &Default::default(), - ) + encode_blocks_in_store(new_store, &self.arena, w); } fn encode_from( @@ -266,6 +259,32 @@ impl ChangeStore { Ok(changes) } + + pub(crate) fn decode_block_bytes( + bytes: Bytes, + arena: &SharedArena, + self_vv: &VersionVector, + ) -> LoroResult> { + let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?; + if ans.is_empty() { + return Ok(ans); + } + + let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0); + ans.retain_mut(|c| { + if c.id.counter >= start { + true + } else if c.ctr_end() > start { + *c = c.slice((start - c.id.counter) as usize, c.atom_len()); + true + } else { + false + } + }); + + Ok(ans) + } + pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option> { let block = self.get_block_that_contains(id)?; Some(block.content.iter_dag_nodes()) @@ -466,6 +485,48 @@ impl ChangeStore { .map(|(k, v)| k.len() + v.len()) .sum() } + + pub(crate) fn export_blocks_from( + &self, + start_vv: &VersionVector, + latest_vv: &VersionVector, + w: &mut W, + ) { + let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone()); + for span in latest_vv.sub_iter(start_vv) { + // PERF: this can be optimized by reusing the current encoded blocks + // In the current method, it needs to parse and re-encode the blocks + for c in self.iter_changes(span) { + let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0) + as usize) + .min(c.atom_len()); + let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0) + as usize) + .min(c.atom_len()); + + assert_ne!(start, end); + let ch = c.slice(start, end); + new_store.insert_change(ch, false); + } + } + + let arena = &self.arena; + encode_blocks_in_store(new_store, arena, w); + } +} + +fn encode_blocks_in_store( + new_store: ChangeStore, + arena: &SharedArena, + w: &mut W, +) { + let mut inner = new_store.inner.lock().unwrap(); + for (_id, block) in inner.mem_parsed_kv.iter_mut() { + let bytes = block.to_bytes(&arena); + leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap(); + trace!("encoded block_bytes = {:?}", &bytes.bytes); + w.write_all(&bytes.bytes).unwrap(); + } } mod mut_external_kv { diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 99946739..9eacf3df 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -1566,3 +1566,15 @@ fn test_gc_can_remove_unreachable_states() -> LoroResult<()> { Ok(()) } + +#[test] +fn small_update_size() { + let doc = LoroDoc::new(); + let text = doc.get_text("text"); + text.insert(0, "h").unwrap(); + let bytes = doc.export(loro::ExportMode::Updates { + from: &Default::default(), + }); + println!("Update bytes {:?}", dev_utils::ByteSize(bytes.len())); + assert!(bytes.len() < 90, "Large update size {}", bytes.len()); +} diff --git a/loro-js/tests/basic.test.ts b/loro-js/tests/basic.test.ts index ccca652d..1c903fa9 100644 --- a/loro-js/tests/basic.test.ts +++ b/loro-js/tests/basic.test.ts @@ -360,7 +360,7 @@ it("enable timestamp", () => { doc.commit(); { const c = doc.getChangeAt({ peer: "1", counter: 4 }); - expect(c.timestamp).toBeCloseTo(Date.now(), -1); + expect(c.timestamp).toBeCloseTo(Date.now()/1000, -1); } }); From dfc99c1746c3fb691d69734987c04a5fa7f2f3d0 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Wed, 11 Sep 2024 22:53:10 +0800 Subject: [PATCH 4/5] feat: block encode header (#458) * bk * feat: encode header * chore: use columnar 0.3.9 * fix: not need encode last lamport and length * chore: cargo fix * fix: print * bk * chore: columnar 0.3.10 --- Cargo.lock | 24 +- Cargo.toml | 2 +- crates/examples/examples/time_tracker.rs | 1 - crates/examples/src/utils.rs | 2 +- crates/fuzz/src/container/tree.rs | 1 - crates/loro-common/src/lib.rs | 2 +- .../src/container/richtext/tracker.rs | 2 +- crates/loro-internal/src/op.rs | 1 - crates/loro-internal/src/oplog.rs | 3 +- .../loro-internal/src/oplog/change_store.rs | 8 +- .../src/oplog/change_store/block_encode.rs | 245 +++--------------- .../oplog/change_store/block_meta_encode.rs | 186 +++++++++++++ crates/loro-internal/src/state.rs | 1 - .../src/state/container_store.rs | 9 +- .../src/state/container_store/inner_store.rs | 5 +- .../src/state/movable_list_state.rs | 3 +- crates/loro-internal/src/utils/kv_wrapper.rs | 2 +- crates/loro/src/change_meta.rs | 2 +- crates/loro/src/lib.rs | 1 - crates/loro/tests/integration_test/mod.rs | 3 +- 20 files changed, 255 insertions(+), 248 deletions(-) create mode 100644 crates/loro-internal/src/oplog/change_store/block_meta_encode.rs diff --git a/Cargo.lock b/Cargo.lock index b83a6e9a..c38d4fc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,9 +491,9 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" dependencies = [ "darling_core", "darling_macro", @@ -501,9 +501,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" dependencies = [ "fnv", "ident_case", @@ -515,9 +515,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote 1.0.35", @@ -2024,9 +2024,9 @@ dependencies = [ [[package]] name = "serde_columnar" -version = "0.3.7" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3748cbf2b43a15ee9627881cabd7820d50508781cdebd3bb54cea49215d367a1" +checksum = "6d4e3c0e46450edf7da174b610b9143eb8ca22059ace5016741fc9e20b88d1e7" dependencies = [ "itertools 0.11.0", "postcard", @@ -2037,9 +2037,9 @@ dependencies = [ [[package]] name = "serde_columnar_derive" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e5eaacabbc55a397ffbb1ee32523f40f86fdefea8a8d9db19630d8b7c00edd1" +checksum = "42c5d47942b2a7e76118b697fc0f94516a5d8366a3c0fee8d0e2b713e952e306" dependencies = [ "darling", "proc-macro2 1.0.75", @@ -2151,9 +2151,9 @@ dependencies = [ [[package]] name = "strsim" -version = "0.10.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" diff --git a/Cargo.toml b/Cargo.toml index ef34f4e6..aa59ed6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ enum_dispatch = "0.3.11" enum-as-inner = "0.5.1" fxhash = "0.2.1" tracing = { version = "0.1" } -serde_columnar = { version = "0.3.7" } +serde_columnar = { version = "0.3.10" } serde_json = "1.0" thiserror = "1" smallvec = { version = "1.8.0", features = ["serde"] } diff --git a/crates/examples/examples/time_tracker.rs b/crates/examples/examples/time_tracker.rs index fd2bf63e..a29e124f 100644 --- a/crates/examples/examples/time_tracker.rs +++ b/crates/examples/examples/time_tracker.rs @@ -1,4 +1,3 @@ -use std::time::Instant; use dev_utils::{get_mem_usage, ByteSize}; use loro::{CommitOptions, LoroCounter, LoroDoc, LoroMap}; diff --git a/crates/examples/src/utils.rs b/crates/examples/src/utils.rs index ba10b087..0ea51c7e 100644 --- a/crates/examples/src/utils.rs +++ b/crates/examples/src/utils.rs @@ -1,6 +1,6 @@ use dev_utils::ByteSize; use loro::LoroDoc; -use std::time::{Duration, Instant}; +use std::time::Instant; pub fn bench_fast_snapshot(doc: &LoroDoc) { let old_v; diff --git a/crates/fuzz/src/container/tree.rs b/crates/fuzz/src/container/tree.rs index b6c50dd2..7bc8dc0e 100644 --- a/crates/fuzz/src/container/tree.rs +++ b/crates/fuzz/src/container/tree.rs @@ -10,7 +10,6 @@ use loro::{ event::Diff, Container, ContainerID, ContainerType, LoroDoc, LoroError, LoroTree, LoroValue, TreeExternalDiff, TreeID, }; -use tracing::{debug, trace}; use crate::{ actions::{Actionable, FromGenericAction, GenericAction}, diff --git a/crates/loro-common/src/lib.rs b/crates/loro-common/src/lib.rs index 891657cf..3b1bbbfd 100644 --- a/crates/loro-common/src/lib.rs +++ b/crates/loro-common/src/lib.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, io::Write, str::Bytes, sync::Arc}; +use std::{fmt::Display, io::Write, sync::Arc}; use arbitrary::Arbitrary; use enum_as_inner::EnumAsInner; diff --git a/crates/loro-internal/src/container/richtext/tracker.rs b/crates/loro-internal/src/container/richtext/tracker.rs index 05abb6cf..6349cc25 100644 --- a/crates/loro-internal/src/container/richtext/tracker.rs +++ b/crates/loro-internal/src/container/richtext/tracker.rs @@ -6,7 +6,7 @@ use generic_btree::{ }; use loro_common::{Counter, HasId, HasIdSpan, IdFull, IdSpan, Lamport, PeerID, ID}; use rle::HasLength as _; -use tracing::{instrument, trace}; +use tracing::instrument; use crate::{cursor::AbsolutePosition, VersionVector}; diff --git a/crates/loro-internal/src/op.rs b/crates/loro-internal/src/op.rs index c23f5da9..f830266c 100644 --- a/crates/loro-internal/src/op.rs +++ b/crates/loro-internal/src/op.rs @@ -12,7 +12,6 @@ use enum_as_inner::EnumAsInner; use loro_common::{CompactIdLp, ContainerType, CounterSpan, IdFull, IdLp, IdSpan}; use rle::{HasIndex, HasLength, Mergable, Sliceable}; use serde::{ser::SerializeSeq, Deserialize, Serialize}; -use smallvec::SmallVec; use std::{borrow::Cow, ops::Range}; mod content; diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index aba2579e..91e89b70 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -8,7 +8,7 @@ use std::cell::RefCell; use std::cmp::Ordering; use std::rc::Rc; use std::sync::{Arc, Mutex}; -use tracing::{debug, instrument, trace, trace_span}; +use tracing::{debug, trace, trace_span}; use self::change_store::iter::MergedChangeIter; use self::pending_changes::PendingChanges; @@ -559,6 +559,7 @@ impl OpLog { #[inline(never)] pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option { let change = self.change_store.get_change_by_lamport_lte(id)?; + if change.lamport > id.lamport || change.lamport_end() <= id.lamport { return None; } diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index a523ce1b..2e46064f 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -1,5 +1,5 @@ use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader}; -use super::{loro_dag::AppDagNodeInner, AppDag, AppDagNode}; +use super::{loro_dag::AppDagNodeInner, AppDagNode}; use crate::{ arena::SharedArena, change::{Change, Timestamp}, @@ -7,12 +7,12 @@ use crate::{ kv_store::KvStore, op::Op, parent::register_container_and_parent_link, - version::{shrink_frontiers, Frontiers, ImVersionVector}, + version::{Frontiers, ImVersionVector}, VersionVector, }; use block_encode::decode_block_range; use bytes::Bytes; -use fxhash::{FxHashMap, FxHashSet}; +use fxhash::FxHashMap; use itertools::Itertools; use loro_common::{ Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError, @@ -22,7 +22,6 @@ use loro_kv_store::{mem_store::MemKvConfig, MemKvStore}; use once_cell::sync::OnceCell; use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable}; use std::{ - borrow::BorrowMut, cmp::Ordering, collections::{BTreeMap, VecDeque}, ops::{Bound, Deref}, @@ -31,6 +30,7 @@ use std::{ use tracing::{debug, info_span, trace, warn}; mod block_encode; +mod block_meta_encode; mod delta_rle_encode; pub(super) mod iter; diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index b1e5374a..51858429 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -24,7 +24,7 @@ //! │ N LEB128 Delta Lamports │◁───┘ //! └──────────────────────────────────────────────────────────────┘ //! ┌──────────────────────────────────────────────────────────────┐ -//! │ N LEB128 Delta Rle Timestamps │ +//! │ N LEB128 DeltaOfDelta Timestamps │ //! └──────────────────────────────────────────────────────────────┘ //! ┌────────────────────────────────┬─────────────────────────────┐ //! │ N Rle Commit Msg Lengths │ Commit Messages │ @@ -77,10 +77,10 @@ use loro_common::{ use once_cell::sync::OnceCell; use rle::HasLength; use serde::{Deserialize, Serialize}; -use serde_columnar::{columnar, DeltaRleDecoder, Itertools}; +use serde_columnar::{columnar, AnyRleDecoder, DeltaOfDeltaDecoder, Itertools}; use tracing::info; -use super::delta_rle_encode::{UnsignedDeltaDecoder, UnsignedDeltaEncoder}; +use super::block_meta_encode::decode_changes_header; use crate::arena::SharedArena; use crate::change::{Change, Timestamp}; use crate::container::tree::tree_op; @@ -90,9 +90,6 @@ use crate::encoding::{ self, decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId, }; use crate::op::Op; -use serde_columnar::{ - AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleEncoder, -}; #[derive(Debug, Serialize, Deserialize)] struct EncodedBlock<'a> { @@ -102,25 +99,10 @@ struct EncodedBlock<'a> { lamport_len: u32, n_changes: u32, #[serde(borrow)] - peers: Cow<'a, [u8]>, + header: Cow<'a, [u8]>, + // timestamp and commit messages #[serde(borrow)] - lengths: Cow<'a, [u8]>, - #[serde(borrow)] - dep_on_self: Cow<'a, [u8]>, - #[serde(borrow)] - dep_len: Cow<'a, [u8]>, - #[serde(borrow)] - dep_peer_idxs: Cow<'a, [u8]>, - #[serde(borrow)] - dep_counters: Cow<'a, [u8]>, - #[serde(borrow)] - lamports: Cow<'a, [u8]>, - #[serde(borrow)] - timestamps: Cow<'a, [u8]>, - #[serde(borrow)] - commit_msg_lengths: Cow<'a, [u8]>, - #[serde(borrow)] - commit_msgs: Cow<'a, [u8]>, + change_meta: Cow<'a, [u8]>, // ---------------------- Ops ---------------------- #[serde(borrow)] cids: Cow<'a, [u8]>, @@ -137,22 +119,9 @@ struct EncodedBlock<'a> { } fn diagnose_block(block: &EncodedBlock) { - use std::mem; - info!("Diagnosing EncodedBlock:"); - info!(" peers: {} bytes", block.peers.len()); - info!(" lengths: {} bytes", block.lengths.len()); - info!(" dep_on_self: {} bytes", block.dep_on_self.len()); - info!(" dep_len: {} bytes", block.dep_len.len()); - info!(" dep_peer_idxs: {} bytes", block.dep_peer_idxs.len()); - info!(" dep_counters: {} bytes", block.dep_counters.len()); - info!(" lamports: {} bytes", block.lamports.len()); - info!(" timestamps: {} bytes", block.timestamps.len()); - info!( - " commit_msg_lengths: {} bytes", - block.commit_msg_lengths.len() - ); - info!(" commit_msgs: {} bytes", block.commit_msgs.len()); + info!(" header {} bytes", block.header.len()); + info!(" change_meta {} bytes", block.change_meta.len()); info!(" cids: {} bytes", block.cids.len()); info!(" keys: {} bytes", block.keys.len()); info!(" positions: {} bytes", block.positions.len()); @@ -175,47 +144,6 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { peer_register.register(&peer); let cid_register: ValueRegister = ValueRegister::new(); - let mut timestamp_encoder = DeltaRleEncoder::new(); - let mut lamport_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4); - let mut commit_msg_len_encoder = AnyRleEncoder::::new(); - let mut commit_msgs = String::new(); - let mut dep_self_encoder = BoolRleEncoder::new(); - let mut dep_len_encoder = AnyRleEncoder::::new(); - let mut encoded_deps = EncodedDeps { - peer_idx: AnyRleEncoder::new(), - counter: AnyRleEncoder::new(), - }; - - for c in block { - timestamp_encoder.append(c.timestamp()).unwrap(); - lamport_encoder.push(c.lamport() as u64); - if let Some(msg) = c.commit_msg.as_ref() { - commit_msg_len_encoder.append(msg.len() as u32).unwrap(); - commit_msgs.push_str(msg); - } else { - commit_msg_len_encoder.append(0).unwrap(); - } - - let mut dep_on_self = false; - for dep in c.deps().iter() { - if dep.peer == peer { - dep_on_self = true; - } else { - let peer_idx = peer_register.register(&dep.peer); - encoded_deps.peer_idx.append(peer_idx as u32).unwrap(); - encoded_deps.counter.append(dep.counter as u32).unwrap(); - } - } - - dep_self_encoder.append(dep_on_self).unwrap(); - dep_len_encoder - .append(if dep_on_self { - c.deps().len() as u64 - 1 - } else { - c.deps().len() as u64 - }) - .unwrap(); - } let mut encoded_ops = Vec::new(); let mut registers = Registers { @@ -281,16 +209,6 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { // Write to output - // PeerIDs - let peers = registers.peer_register.unwrap_vec(); - let peer_bytes: Vec = peers.iter().flat_map(|p| p.to_le_bytes()).collect(); - - // First Counter + Change Len - let mut lengths_bytes = Vec::new(); - for c in block { - leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap(); - } - // ┌────────────────────┬─────────────────────────────────────────┐ // │ Key Strings Size │ Key Strings │ // └────────────────────┴─────────────────────────────────────────┘ @@ -330,6 +248,14 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { // │Value Bytes Size│ Value Bytes │ // └────────────────┴─────────────────────────────────────────────┘ + // PeerIDs + let mut peer_register = registers.peer_register; + // .unwrap_vec(); + // let peer_bytes: Vec = peers.iter().flat_map(|p| p.to_le_bytes()).collect(); + + // Change meta + let (header, change_meta) = encode_changes(block, &mut peer_register); + let value_bytes = value_writer.finish(); let out = EncodedBlock { counter_start: block[0].id.counter as u32, @@ -337,16 +263,8 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { lamport_start: block[0].lamport(), lamport_len: block.last().unwrap().lamport_end() - block[0].lamport(), n_changes: block.len() as u32, - peers: Cow::Owned(peer_bytes), - lengths: Cow::Owned(lengths_bytes), - dep_on_self: dep_self_encoder.finish().unwrap().into(), - dep_len: dep_len_encoder.finish().unwrap().into(), - dep_peer_idxs: encoded_deps.peer_idx.finish().unwrap().into(), - dep_counters: encoded_deps.counter.finish().unwrap().into(), - lamports: lamport_encoder.finish().0.into(), - timestamps: timestamp_encoder.finish().unwrap().into(), - commit_msg_lengths: commit_msg_len_encoder.finish().unwrap().into(), - commit_msgs: Cow::Owned(commit_msgs.into_bytes()), + header: header.into(), + change_meta: change_meta.into(), cids: container_arena.encode().into(), keys: keys_bytes.into(), positions: position_bytes.into(), @@ -357,8 +275,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec { diagnose_block(&out); let ans = postcard::to_allocvec(&out).unwrap(); - // info!("block size = {}", ans.len()); - println!("BLOCK SIZE = {}", ans.len()); + info!("block size = {}", ans.len()); ans } @@ -395,6 +312,7 @@ use crate::encoding::value::{ RawTreeMove, Value, ValueDecodedArenasTrait, ValueEncodeRegister, ValueKind, ValueReader, ValueWriter, }; +use crate::oplog::change_store::block_meta_encode::encode_changes; use crate::version::Frontiers; impl ValueEncodeRegister for Registers { fn key_mut(&mut self) -> &mut ValueRegister { @@ -476,107 +394,22 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult { fn decode_header_from_doc(doc: &EncodedBlock) -> Result { let EncodedBlock { n_changes, - peers: peers_bytes, - lengths: lengths_bytes, - dep_on_self, - dep_len, - dep_peer_idxs, - dep_counters, - lamports, + header, counter_len, counter_start, lamport_len, lamport_start, .. } = doc; - - let first_counter = *counter_start as Counter; - let n_changes = *n_changes as usize; - let peer_num = peers_bytes.len() / 8; - let mut peers = Vec::with_capacity(peer_num); - for i in 0..peer_num { - let peer_id = - PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap()); - peers.push(peer_id); - } - - // ┌───────────────────┬──────────────────────────────────────────┐ │ - // │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata - // └───────────────────┴──────────────────────────────────────────┘ │ - let mut lengths = Vec::with_capacity(n_changes); - let mut lengths_bytes: &[u8] = lengths_bytes; - for _ in 0..n_changes { - lengths.push(leb128::read::unsigned(&mut lengths_bytes).unwrap() as Counter); - } - - // ┌───────────────────┬────────────────────────┬─────────────────┐ │ - // │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘ - // └───────────────────┴────────────────────────┴─────────────────┘ - - let mut dep_self_decoder = BoolRleDecoder::new(dep_on_self); - let mut this_counter = first_counter; - let mut deps: Vec = Vec::with_capacity(n_changes); - let n = n_changes; - let mut deps_len = AnyRleDecoder::::new(dep_len); - let deps_peers_decoder = AnyRleDecoder::::new(dep_peer_idxs); - let deps_counters_decoder = AnyRleDecoder::::new(dep_counters); - let mut deps_peers_iter = deps_peers_decoder; - let mut deps_counters_iter = deps_counters_decoder; - for i in 0..n { - let mut f = Frontiers::default(); - - if dep_self_decoder.next().unwrap().unwrap() { - f.push(ID::new(peers[0], this_counter - 1)) - } - - let len = deps_len.next().unwrap().unwrap() as usize; - for _ in 0..len { - let peer_idx = deps_peers_iter.next().unwrap().unwrap() as usize; - let peer = peers[peer_idx]; - let counter = deps_counters_iter.next().unwrap().unwrap() as Counter; - f.push(ID::new(peer, counter)); - } - - deps.push(f); - this_counter += lengths[i]; - } - - let mut counters = Vec::with_capacity(n + 1); - let mut last = first_counter; - for i in 0..n { - counters.push(last); - last += lengths[i]; - } - counters.push(last); - assert_eq!(last, (counter_start + counter_len) as Counter); - let mut lamport_decoder = UnsignedDeltaDecoder::new(lamports, n_changes); - let mut lamports = Vec::with_capacity(n + 1); - for _ in 0..n { - lamports.push(lamport_decoder.next().unwrap() as Lamport); - } - - let last_lamport = *lamports.last().unwrap(); - lamports.push(last_lamport + lengths.last().copied().unwrap() as Lamport); - assert_eq!( - *lamports.last().unwrap(), - (lamport_start + lamport_len) as Lamport + let ans: ChangesBlockHeader = decode_changes_header( + &header, + *n_changes as usize, + *counter_start as Counter, + *counter_len as Counter, + *lamport_start, + *lamport_len, ); - Ok(ChangesBlockHeader { - peer: peers[0], - counter: first_counter, - n_changes, - peers, - counters, - deps_groups: deps, - lamports, - keys: OnceCell::new(), - cids: OnceCell::new(), - }) -} - -struct EncodedDeps { - peer_idx: AnyRleEncoder, - counter: AnyRleEncoder, + Ok(ans) } #[columnar(vec, ser, de, iterable)] @@ -710,10 +543,8 @@ pub fn decode_block( }); let EncodedBlock { n_changes, - timestamps, counter_start: first_counter, - commit_msg_lengths, - commit_msgs, + change_meta, cids, keys, ops, @@ -722,9 +553,12 @@ pub fn decode_block( positions, .. } = doc; - let mut changes = Vec::with_capacity(n_changes as usize); - let mut timestamp_decoder: DeltaRleDecoder = DeltaRleDecoder::new(×tamps); - let mut commit_msg_len_decoder = AnyRleDecoder::::new(&commit_msg_lengths); + let n_changes = n_changes as usize; + let mut changes = Vec::with_capacity(n_changes); + let timestamp_decoder = DeltaOfDeltaDecoder::::new(&change_meta).unwrap(); + let (timestamps, bytes) = timestamp_decoder.take_n_finalize(n_changes).unwrap(); + let commit_msg_len_decoder = AnyRleDecoder::::new(bytes); + let (commit_msg_lens, commit_msgs) = commit_msg_len_decoder.take_n_finalize(n_changes).unwrap(); let mut commit_msg_index = 0; let keys = header.keys.get_or_init(|| decode_keys(&keys)); let decode_arena = ValueDecodeArena { @@ -755,9 +589,9 @@ pub fn decode_block( .delete_start_ids .into_iter() .map(Ok); - for i in 0..(n_changes as usize) { + for i in 0..n_changes { let commit_msg: Option> = { - let len = commit_msg_len_decoder.next().unwrap().unwrap(); + let len = commit_msg_lens[i]; if len == 0 { None } else { @@ -779,7 +613,7 @@ pub fn decode_block( deps: header.deps_groups[i].clone(), id: ID::new(header.peer, header.counters[i]), lamport: header.lamports[i], - timestamp: timestamp_decoder.next().unwrap().unwrap() as Timestamp, + timestamp: timestamps[i] as Timestamp, commit_msg, }) } @@ -826,7 +660,6 @@ pub fn decode_block( change_index += 1; } } - Ok(changes) } diff --git a/crates/loro-internal/src/oplog/change_store/block_meta_encode.rs b/crates/loro-internal/src/oplog/change_store/block_meta_encode.rs new file mode 100644 index 00000000..85ee92d6 --- /dev/null +++ b/crates/loro-internal/src/oplog/change_store/block_meta_encode.rs @@ -0,0 +1,186 @@ +use loro_common::{Counter, Lamport, PeerID, ID}; +use once_cell::sync::OnceCell; +use rle::HasLength; +use serde_columnar::{ + AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaOfDeltaDecoder, + DeltaOfDeltaEncoder, +}; + +use crate::{change::Change, encoding::value_register::ValueRegister, version::Frontiers}; + +use super::block_encode::ChangesBlockHeader; + +pub(crate) fn encode_changes( + block: &[Change], + peer_register: &mut ValueRegister, +) -> (Vec, Vec) { + let peer = block[0].peer(); + let mut timestamp_encoder = DeltaOfDeltaEncoder::new(); + let mut lamport_encoder = DeltaOfDeltaEncoder::new(); + let mut commit_msg_len_encoder = AnyRleEncoder::::new(); + let mut commit_msgs = String::new(); + let mut dep_self_encoder = BoolRleEncoder::new(); + let mut dep_len_encoder = AnyRleEncoder::::new(); + let mut encoded_deps = EncodedDeps { + peer_idx: AnyRleEncoder::new(), + counter: DeltaOfDeltaEncoder::new(), + }; + // First Counter + Change Len + let mut lengths_bytes = Vec::new(); + let mut counter = vec![]; + let mut n = 0; + + for (i, c) in block.iter().enumerate() { + counter.push(c.id.counter); + let is_last = i == block.len() - 1; + if !is_last { + leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap(); + lamport_encoder.append(c.lamport() as i64).unwrap(); + } + timestamp_encoder.append(c.timestamp()).unwrap(); + if let Some(msg) = c.commit_msg.as_ref() { + commit_msg_len_encoder.append(msg.len() as u32).unwrap(); + commit_msgs.push_str(msg); + } else { + commit_msg_len_encoder.append(0).unwrap(); + } + + let mut dep_on_self = false; + for dep in c.deps().iter() { + if dep.peer == peer { + dep_on_self = true; + } else { + let peer_idx = peer_register.register(&dep.peer); + encoded_deps.peer_idx.append(peer_idx as u32).unwrap(); + encoded_deps.counter.append(dep.counter as i64).unwrap(); + n += 1; + } + } + + dep_self_encoder.append(dep_on_self).unwrap(); + dep_len_encoder + .append(if dep_on_self { + c.deps().len() - 1 + } else { + c.deps().len() + }) + .unwrap(); + } + + // TODO: capacity + let mut ans = Vec::with_capacity(block.len() * 15); + let _ = leb128::write::unsigned(&mut ans, peer_register.vec().len() as u64); + ans.extend(peer_register.vec().iter().flat_map(|p| p.to_le_bytes())); + ans.append(&mut lengths_bytes); + ans.append(&mut dep_self_encoder.finish().unwrap()); + ans.append(&mut dep_len_encoder.finish().unwrap()); + ans.append(&mut encoded_deps.peer_idx.finish().unwrap()); + ans.append(&mut encoded_deps.counter.finish().unwrap()); + ans.append(&mut lamport_encoder.finish().unwrap()); + + let mut t = timestamp_encoder.finish().unwrap(); + let mut cml = commit_msg_len_encoder.finish().unwrap(); + let mut cms = commit_msgs.into_bytes(); + let mut meta = Vec::with_capacity(t.len() + cml.len() + cms.len()); + meta.append(&mut t); + meta.append(&mut cml); + meta.append(&mut cms); + + (ans, meta) +} + +pub(crate) fn decode_changes_header( + mut bytes: &[u8], + n_changes: usize, + first_counter: Counter, + counter_len: Counter, + lamport_start: Lamport, + lamport_len: Lamport, +) -> ChangesBlockHeader { + let mut this_counter = first_counter; + let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize; + let mut peers = Vec::with_capacity(peer_num); + for i in 0..peer_num { + let peer_id = PeerID::from_le_bytes((&bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap()); + peers.push(peer_id); + } + let mut bytes = &bytes[8 * peer_num..]; + + // ┌───────────────────┬──────────────────────────────────────────┐ │ + // │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata + // └───────────────────┴──────────────────────────────────────────┘ │ + + let mut lengths = Vec::with_capacity(n_changes); + for _ in 0..n_changes - 1 { + lengths.push(leb128::read::unsigned(&mut bytes).unwrap() as Counter); + } + lengths.push(counter_len - lengths.iter().sum::()); + + // ┌───────────────────┬────────────────────────┬─────────────────┐ │ + // │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘ + // └───────────────────┴────────────────────────┴─────────────────┘ + + let dep_self_decoder = BoolRleDecoder::new(bytes); + let (dep_self, bytes) = dep_self_decoder.take_n_finalize(n_changes).unwrap(); + let dep_len_decoder = AnyRleDecoder::::new(bytes); + let (deps_len, bytes) = dep_len_decoder.take_n_finalize(n_changes).unwrap(); + let other_dep_num = deps_len.iter().sum::(); + let dep_peer_decoder = AnyRleDecoder::::new(bytes); + let (dep_peers, bytes) = dep_peer_decoder.take_n_finalize(other_dep_num).unwrap(); + let mut deps_peers_iter = dep_peers.into_iter(); + let dep_counter_decoder = DeltaOfDeltaDecoder::::new(bytes).unwrap(); + let (dep_counters, bytes) = dep_counter_decoder.take_n_finalize(other_dep_num).unwrap(); + let mut deps_counters_iter = dep_counters.into_iter(); + let mut deps = Vec::with_capacity(n_changes); + for i in 0..n_changes { + let mut f = Frontiers::default(); + if dep_self[i] { + f.push(ID::new(peers[0], this_counter - 1)) + } + + let len = deps_len[i]; + for _ in 0..len { + let peer_idx = deps_peers_iter.next().unwrap(); + let peer = peers[peer_idx]; + let counter = deps_counters_iter.next().unwrap() as Counter; + f.push(ID::new(peer, counter)); + } + + deps.push(f); + this_counter += lengths[i]; + } + let mut counters = Vec::with_capacity(n_changes); + let mut last = first_counter; + for i in 0..n_changes { + counters.push(last); + last += lengths[i]; + } + + let lamport_decoder = DeltaOfDeltaDecoder::new(bytes).unwrap(); + let (mut lamports, rest) = lamport_decoder + .take_n_finalize(n_changes.saturating_sub(1)) + .unwrap(); + // the last lamport + lamports.push((lamport_start + lamport_len - *lengths.last().unwrap_or(&0) as u32) as Lamport); + + // we need counter range, so encode + counters.push(first_counter + counter_len); + debug_assert!(rest.is_empty()); + + ChangesBlockHeader { + peer: peers[0], + counter: first_counter, + n_changes, + peers, + counters, + deps_groups: deps, + lamports, + keys: OnceCell::new(), + cids: OnceCell::new(), + } +} + +struct EncodedDeps { + peer_idx: AnyRleEncoder, + counter: DeltaOfDeltaEncoder, +} diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index 071c14e0..ae1e2ce1 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -1,6 +1,5 @@ use std::{ borrow::Cow, - collections::BTreeMap, io::Write, sync::{ atomic::{AtomicU64, AtomicU8, Ordering}, diff --git a/crates/loro-internal/src/state/container_store.rs b/crates/loro-internal/src/state/container_store.rs index 813a65fe..d4d33da7 100644 --- a/crates/loro-internal/src/state/container_store.rs +++ b/crates/loro-internal/src/state/container_store.rs @@ -1,19 +1,14 @@ -#[cfg(feature = "counter")] -use super::counter_state::CounterState; -use super::{ContainerCreationContext, MovableListState, State, TreeState}; +use super::{ContainerCreationContext, State}; use crate::{ arena::SharedArena, configure::Configure, container::idx::ContainerIdx, - state::{FastStateSnapshot, RichtextState}, utils::kv_wrapper::KvWrapper, version::Frontiers, - VersionVector, }; use bytes::Bytes; -use fxhash::{FxHashMap, FxHashSet}; use inner_store::InnerStore; -use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue}; +use loro_common::{LoroResult, LoroValue}; use std::sync::{atomic::AtomicU64, Arc, Mutex}; pub(crate) use container_wrapper::ContainerWrapper; diff --git a/crates/loro-internal/src/state/container_store/inner_store.rs b/crates/loro-internal/src/state/container_store/inner_store.rs index e1691db3..edfa3de9 100644 --- a/crates/loro-internal/src/state/container_store/inner_store.rs +++ b/crates/loro-internal/src/state/container_store/inner_store.rs @@ -1,14 +1,13 @@ use std::ops::Bound; use bytes::Bytes; -use fxhash::{FxHashMap, FxHashSet}; +use fxhash::FxHashMap; use loro_common::ContainerID; -use tracing::trace; use crate::{ arena::SharedArena, container::idx::ContainerIdx, - state::{container_store::FRONTIERS_KEY, ContainerCreationContext}, + state::container_store::FRONTIERS_KEY, utils::kv_wrapper::KvWrapper, version::Frontiers, }; diff --git a/crates/loro-internal/src/state/movable_list_state.rs b/crates/loro-internal/src/state/movable_list_state.rs index 8f73f834..f5375ada 100644 --- a/crates/loro-internal/src/state/movable_list_state.rs +++ b/crates/loro-internal/src/state/movable_list_state.rs @@ -18,8 +18,7 @@ use crate::{ handler::ValueOrHandler, op::{ListSlice, Op, RawOp}, state::movable_list_state::inner::PushElemInfo, - txn::Transaction, - ApplyDiff, DocState, ListDiff, + txn::Transaction, DocState, ListDiff, }; use self::{ diff --git a/crates/loro-internal/src/utils/kv_wrapper.rs b/crates/loro-internal/src/utils/kv_wrapper.rs index a4215dc4..0212d661 100644 --- a/crates/loro-internal/src/utils/kv_wrapper.rs +++ b/crates/loro-internal/src/utils/kv_wrapper.rs @@ -5,7 +5,7 @@ use std::{ }; use bytes::Bytes; -use loro_kv_store::{compress::CompressionType, mem_store::MemKvConfig, MemKvStore}; +use loro_kv_store::{mem_store::MemKvConfig, MemKvStore}; use crate::kv_store::KvStore; diff --git a/crates/loro/src/change_meta.rs b/crates/loro/src/change_meta.rs index 15e597a4..ccd66b7b 100644 --- a/crates/loro/src/change_meta.rs +++ b/crates/loro/src/change_meta.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, sync::Arc, time::Instant}; +use std::{cmp::Ordering, sync::Arc}; use loro_internal::{ change::{Change, Lamport, Timestamp}, diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index 0b05cc48..ce226407 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -12,7 +12,6 @@ use loro_internal::cursor::Side; use loro_internal::encoding::ImportBlobMetadata; use loro_internal::handler::HandlerTrait; use loro_internal::handler::ValueOrHandler; -use loro_internal::json::JsonChange; use loro_internal::obs::LocalUpdateCallback; use loro_internal::undo::{OnPop, OnPush}; use loro_internal::version::ImVersionVector; diff --git a/crates/loro/tests/integration_test/mod.rs b/crates/loro/tests/integration_test/mod.rs index cb43b95a..0b6d70f4 100644 --- a/crates/loro/tests/integration_test/mod.rs +++ b/crates/loro/tests/integration_test/mod.rs @@ -1,4 +1,3 @@ -use dev_utils::setup_test_log; use loro::LoroDoc; mod gc_test; @@ -6,7 +5,7 @@ mod undo_test; fn gen_action(doc: &LoroDoc, seed: u64, mut ops_len: usize) { let mut rng = StdRng::seed_from_u64(seed); - use loro::{ContainerType, LoroValue}; + use loro::LoroValue; use rand::prelude::*; let root_map = doc.get_map("root"); From 5450e511d01e270e14aa6ddd0922e1aa75198705 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Thu, 12 Sep 2024 17:24:40 +0800 Subject: [PATCH 5/5] chore: merge gc --- crates/fuzz/fuzz/Cargo.lock | 24 +++++++++---------- .../src/encoding/fast_snapshot.rs | 7 +++++- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/crates/fuzz/fuzz/Cargo.lock b/crates/fuzz/fuzz/Cargo.lock index a26afd65..b87e9664 100644 --- a/crates/fuzz/fuzz/Cargo.lock +++ b/crates/fuzz/fuzz/Cargo.lock @@ -152,9 +152,9 @@ checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "darling" -version = "0.20.7" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a5d17510e4a1a87f323de70b7b1eaac1ee0e37866c6720b2d279452d0edf389" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" dependencies = [ "darling_core", "darling_macro", @@ -162,9 +162,9 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.7" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a98eea36a7ff910fa751413d0895551143a8ea41d695d9798ec7d665df7f7f5e" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" dependencies = [ "fnv", "ident_case", @@ -176,9 +176,9 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.7" +version = "0.20.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6a366a3f90c5d59a4b91169775f88e52e8f71a0e7804cc98a8db2932cf4ed57" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", @@ -1180,9 +1180,9 @@ dependencies = [ [[package]] name = "serde_columnar" -version = "0.3.7" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3748cbf2b43a15ee9627881cabd7820d50508781cdebd3bb54cea49215d367a1" +checksum = "6d4e3c0e46450edf7da174b610b9143eb8ca22059ace5016741fc9e20b88d1e7" dependencies = [ "itertools 0.11.0", "postcard", @@ -1193,9 +1193,9 @@ dependencies = [ [[package]] name = "serde_columnar_derive" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e5eaacabbc55a397ffbb1ee32523f40f86fdefea8a8d9db19630d8b7c00edd1" +checksum = "42c5d47942b2a7e76118b697fc0f94516a5d8366a3c0fee8d0e2b713e952e306" dependencies = [ "darling", "proc-macro2", @@ -1287,9 +1287,9 @@ dependencies = [ [[package]] name = "strsim" -version = "0.10.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "syn" diff --git a/crates/loro-internal/src/encoding/fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs index 3ee4f7a5..55e40a52 100644 --- a/crates/loro-internal/src/encoding/fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -18,6 +18,7 @@ use std::io::{Read, Write}; use crate::{oplog::ChangeStore, LoroDoc, OpLog, VersionVector}; use bytes::{Buf, Bytes}; use loro_common::{IdSpan, LoroError, LoroResult}; +use tracing::trace; use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult}; @@ -237,7 +238,11 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroE } changes.sort_unstable_by_key(|x| x.lamport); - let (latest_ids, pending_changes) = import_changes_to_oplog(changes, oplog)?; + let ImportChangesResult { + latest_ids, + pending_changes, + changes_that_deps_on_trimmed_history: _, + } = import_changes_to_oplog(changes, oplog); // TODO: PERF: should we use hashmap to filter latest_ids with the same peer first? oplog.try_apply_pending(latest_ids); oplog.import_unknown_lamport_pending_changes(pending_changes)?;