From fd6e945d4bded1ba64e55ac000677b0118604c3a Mon Sep 17 00:00:00 2001
From: Zixuan Chen
Date: Tue, 28 May 2024 22:09:15 +0800
Subject: [PATCH] fix: import change merge err

---
 crates/loro-internal/examples/encoding.rs     |  22 ++--
 crates/loro-internal/src/oplog.rs             |   4 +-
 .../loro-internal/src/oplog/change_store.rs   | 105 ++++++++++++------
 .../src/oplog/change_store/block_encode.rs    |  54 +++++----
 crates/rle/src/rle_vec.rs                     |   7 +-
 5 files changed, 122 insertions(+), 70 deletions(-)

diff --git a/crates/loro-internal/examples/encoding.rs b/crates/loro-internal/examples/encoding.rs
index a4ee87bf..6d991815 100644
--- a/crates/loro-internal/examples/encoding.rs
+++ b/crates/loro-internal/examples/encoding.rs
@@ -73,8 +73,8 @@ fn main() {
     );
     let start = Instant::now();
     let blocks_bytes = loro.export_blocks();
-    println!("blocks time {}ms", start.elapsed().as_millis());
-    println!("blocks size {}", blocks_bytes.len());
+    println!("export blocks time {:?} (2nd time)", start.elapsed());
+    println!("export blocks size {}", blocks_bytes.len());
     let _blocks_bytes_compressed = miniz_oxide::deflate::compress_to_vec(&blocks_bytes, 6);
     println!(
         "blocks time after compression {}ms",
@@ -88,22 +88,26 @@ fn main() {
     {
         // Delta encoding
-        // let start = Instant::now();
-        // for _ in 0..10 {
-        //     loro.export_from(&Default::default());
-        // }
+        let start = Instant::now();
+        for _ in 0..10 {
+            loro.export_from(&Default::default());
+        }
 
-        // println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0);
+        println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0);
 
         let data = loro.export_from(&Default::default());
         let start = Instant::now();
-        for _ in 0..5 {
+        let n = 5;
+        for _ in 0..n {
             let b = LoroDoc::default();
             b.detach();
             b.import(&data).unwrap();
         }
 
-        println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0);
+        println!(
+            "Avg normal decode {}ms (without applying)",
+            start.elapsed().as_millis() as f64 / (n as f64)
+        );
         println!("size len={}", data.len());
         let d = miniz_oxide::deflate::compress_to_vec(&data, 10);
         println!("size after compress len={}", d.len());
diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs
index ba140025..1443eb0b 100644
--- a/crates/loro-internal/src/oplog.rs
+++ b/crates/loro-internal/src/oplog.rs
@@ -258,11 +258,13 @@ impl OpLog {
                 last.ctr_end(),
                 "change id is not continuous"
             );
+            let merge_interval = self.configure.merge_interval();
+
             let timestamp_change = change.timestamp - last.timestamp;
             // TODO: make this a config
             if !last.has_dependents
                 && change.deps_on_self()
-                && timestamp_change < self.configure.merge_interval()
+                && timestamp_change < merge_interval
             {
                 for op in take(change.ops.vec_mut()) {
                     last.ops.push(op);
diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs
index 6ba1db14..50276797 100644
--- a/crates/loro-internal/src/oplog/change_store.rs
+++ b/crates/loro-internal/src/oplog/change_store.rs
@@ -10,6 +10,8 @@ use crate::{
 
 use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
 
+const MAX_BLOCK_SIZE: usize = 1024 * 4;
+
 #[derive(Debug, Clone)]
 pub struct ChangeStore {
     arena: SharedArena,
@@ -56,7 +58,7 @@ impl ChangeStore {
         println!("block num {}", self.kv.len());
         let mut bytes = Vec::new();
         for (_, block) in self.iter_bytes() {
-            // println!("block size {}", block.bytes.len());
+            println!("block size {}", block.bytes.len());
             leb128::write::unsigned(&mut bytes, block.bytes.len() as u64).unwrap();
             bytes.extend(&block.bytes);
         }
@@ -90,12 +92,10 @@ pub struct ChangesBlock {
     content: ChangesBlockContent,
 }
 
-const MAX_BLOCK_SIZE: usize = 1024 * 4;
-
 impl ChangesBlock {
     pub fn from_bytes(bytes: Bytes, arena: &SharedArena) -> LoroResult<Self> {
         let len = bytes.len();
-        let bytes = ChangesBlockBytes::new(bytes)?;
+        let mut bytes = ChangesBlockBytes::new(bytes);
         let peer = bytes.peer();
         let counter_range = bytes.counter_range();
         let lamport_range = bytes.lamport_range();
@@ -156,25 +156,41 @@ impl ChangesBlock {
     }
 
     pub fn push_change(&mut self, change: Change) -> Result<(), Change> {
-        if self.is_full() {
-            Err(change)
-        } else {
-            let atom_len = change.atom_len();
-            self.lamport_range.1 = change.lamport + atom_len as Lamport;
-            self.counter_range.1 = change.id.counter + atom_len as Counter;
+        let atom_len = change.atom_len();
+        let next_lamport = change.lamport + atom_len as Lamport;
+        let next_counter = change.id.counter + atom_len as Counter;
 
-            let changes = self.content.changes_mut().unwrap();
-            match changes.last_mut() {
-                Some(last) if last.is_mergable(&change, &()) => {
-                    last.merge(&change, &());
+        let is_full = self.is_full();
+        let changes = self.content.changes_mut().unwrap();
+        let merge_interval = 10000; // TODO: FIXME: Use configure
+        match changes.last_mut() {
+            Some(last)
+                if change.deps_on_self()
+                    && change.timestamp - last.timestamp < merge_interval
+                    && (!is_full
+                        || (change.ops.len() == 1
+                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
+            {
+                for op in change.ops.into_iter() {
+                    let size = op.estimate_storage_size();
+                    if !last.ops.push(op) {
+                        self.estimated_size += size;
+                    }
                 }
-                _ => {
+            }
+            _ => {
+                if is_full {
+                    return Err(change);
+                } else {
                     self.estimated_size += change.estimate_storage_size();
                     changes.push(change);
                 }
             }
-            Ok(())
         }
+
+        self.counter_range.1 = next_counter;
+        self.lamport_range.1 = next_lamport;
+        Ok(())
     }
 
     fn id(&self) -> ID {
@@ -252,48 +268,65 @@ impl std::fmt::Debug for ChangesBlockContent {
 #[derive(Clone)]
 pub(crate) struct ChangesBlockBytes {
     bytes: Bytes,
-    header: ChangesBlockHeader,
+    header: Option<ChangesBlockHeader>,
 }
 
 impl ChangesBlockBytes {
-    fn new(bytes: Bytes) -> LoroResult<Self> {
-        Ok(Self {
-            header: decode_header(&bytes)?,
+    fn new(bytes: Bytes) -> Self {
+        Self {
+            header: None,
             bytes,
-        })
+        }
     }
 
-    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
-        decode_block(&self.bytes, a, &self.header)
+    fn ensure_header(&mut self) -> LoroResult<()> {
+        if self.header.is_none() {
+            self.header = Some(decode_header(&self.bytes)?);
+        }
+        Ok(())
+    }
+
+    fn parse(&mut self, a: &SharedArena) -> LoroResult<Vec<Change>> {
+        self.ensure_header()?;
+        decode_block(&self.bytes, a, self.header.as_ref())
     }
 
     fn serialize(changes: &[Change], a: &SharedArena) -> Self {
        let bytes = encode_block(changes, a);
         // TODO: Perf we can calculate header directly without parsing the bytes
-        Self::new(Bytes::from(bytes)).unwrap()
+        let mut bytes = ChangesBlockBytes::new(Bytes::from(bytes));
+        bytes.ensure_header().unwrap();
+        bytes
     }
 
-    fn peer(&self) -> PeerID {
-        self.header.peer
+    fn peer(&mut self) -> PeerID {
+        self.ensure_header().unwrap();
+        self.header.as_ref().unwrap().peer
     }
 
-    fn counter_range(&self) -> (Counter, Counter) {
-        (self.header.counter, *self.header.counters.last().unwrap())
-    }
-
-    fn lamport_range(&self) -> (Lamport, Lamport) {
+    fn counter_range(&mut self) -> (Counter, Counter) {
+        self.ensure_header().unwrap();
         (
-            self.header.lamports[0],
-            *self.header.lamports.last().unwrap(),
+            self.header.as_ref().unwrap().counter,
+            *self.header.as_ref().unwrap().counters.last().unwrap(),
+        )
+    }
+
+    fn lamport_range(&mut self) -> (Lamport, Lamport) {
+        self.ensure_header().unwrap();
+        (
+            self.header.as_ref().unwrap().lamports[0],
+            *self.header.as_ref().unwrap().lamports.last().unwrap(),
         )
     }
 
     /// Length of the changes
-    fn len_changes(&self) -> usize {
-        self.header.n_changes
+    fn len_changes(&mut self) -> usize {
+        self.ensure_header().unwrap();
+        self.header.as_ref().unwrap().n_changes
     }
 
-    fn find_deps_for(&self, id: ID) -> Frontiers {
+    fn find_deps_for(&mut self, id: ID) -> Frontiers {
         unimplemented!()
     }
 }
diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs
index 5f71f545..bf20bf13 100644
--- a/crates/loro-internal/src/oplog/change_store/block_encode.rs
+++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs
@@ -233,6 +233,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
     //  │          │          │       │                                │
     //  └──────────┴──────────┴───────┴────────────────────────────────┘
+    println!("ops num {}", encoded_ops.len());
     let ops_bytes = serde_columnar::to_vec(&EncodedOpsAndDeleteStarts {
         ops: encoded_ops,
         delete_start_ids: del_starts,
     })
@@ -331,8 +332,15 @@ pub(crate) struct ChangesBlockHeader {
 }
 
 pub fn decode_header(m_bytes: &[u8]) -> LoroResult<ChangesBlockHeader> {
+    let doc = postcard::from_bytes(m_bytes).map_err(|e| {
+        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
+    })?;
+
+    decode_header_from_doc(&doc)
+}
+
+fn decode_header_from_doc(doc: &EncodedDoc) -> Result<ChangesBlockHeader, LoroError> {
     let EncodedDoc {
-        version,
         n_changes,
         first_counter,
         peers: peers_bytes,
@@ -342,22 +350,18 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult<ChangesBlockHeader> {
         dep_peer_idxs,
         dep_counters,
         lamports,
-        timestamps,
-        commit_msg_lengths,
-        commit_msgs,
-        cids,
-        keys,
-        ops,
-        values,
-    } = postcard::from_bytes(m_bytes).map_err(|e| {
-        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
-    })?;
-    if version != VERSION {
-        return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
+        version,
+        ..
+    } = doc;
+
+    if *version != VERSION {
+        return Err(LoroError::IncompatibleFutureEncodingError(
+            *version as usize,
+        ));
     }
 
-    let first_counter = first_counter as Counter;
-    let n_changes = n_changes as usize;
+    let first_counter = *first_counter as Counter;
+    let n_changes = *n_changes as usize;
     let peer_num = peers_bytes.len() / 8;
     let mut peers = Vec::with_capacity(peer_num as usize);
     for i in 0..peer_num as usize {
@@ -481,15 +485,22 @@ impl<'a> ValueDecodedArenasTrait for ValueDecodeArena<'a> {
 pub fn decode_block(
     m_bytes: &[u8],
     shared_arena: &SharedArena,
-    header: &ChangesBlockHeader,
+    header: Option<&ChangesBlockHeader>,
 ) -> LoroResult<Vec<Change>> {
-    let mut changes = Vec::with_capacity(header.n_changes);
+    let doc = postcard::from_bytes(m_bytes).map_err(|e| {
+        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
+    })?;
+    let mut header_on_stack = None;
+    let header = header.unwrap_or_else(|| {
+        header_on_stack = Some(decode_header_from_doc(&doc).unwrap());
+        &header_on_stack.as_ref().unwrap()
+    });
     let EncodedDoc {
         version,
         n_changes,
         first_counter,
-        peers: peers_bytes,
-        lengths: lengths_bytes,
+        peers,
+        lengths,
         dep_on_self,
         dep_len,
         dep_peer_idxs,
@@ -502,9 +513,8 @@ pub fn decode_block(
         keys,
         ops,
         values,
-    } = postcard::from_bytes(m_bytes).map_err(|e| {
-        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
-    })?;
+    } = doc;
+    let mut changes = Vec::with_capacity(n_changes as usize);
     if version != VERSION {
         return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
     }
diff --git a/crates/rle/src/rle_vec.rs b/crates/rle/src/rle_vec.rs
index 40d792ad..544491c8 100644
--- a/crates/rle/src/rle_vec.rs
+++ b/crates/rle/src/rle_vec.rs
@@ -256,15 +256,18 @@ where
     A::Item: Mergable + HasLength,
 {
     /// push a new element to the end of the array. It may be merged with last element.
-    pub fn push(&mut self, value: A::Item) {
+    ///
+    /// Return whether merged
+    pub fn push(&mut self, value: A::Item) -> bool {
         if let Some(last) = self.vec.last_mut() {
             if last.is_mergable(&value, &()) {
                 last.merge(&value, &());
-                return;
+                return true;
             }
         }
         self.vec.push(value);
+        false
     }
 }
 
 impl<A: Array> RleVec<A>
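
For reference, below the patch is a minimal, self-contained Rust sketch (illustrative only, not part of the diff) of the pattern the change relies on: `RleVec::push` now reports whether the pushed value was merged into the last element, and `ChangesBlock::push_change` uses that flag to grow `estimated_size` only when a new element is actually appended. The `Run`/`Runs` types here are hypothetical stand-ins for the crate's `Mergable`/`HasLength` items.

// Illustrative sketch only -- not part of the patch above.
struct Run {
    value: u8,
    len: usize,
}

struct Runs(Vec<Run>);

impl Runs {
    /// Push a run, merging it into the last run when the values match.
    /// Returns `true` if it merged (no new element was appended).
    fn push(&mut self, run: Run) -> bool {
        if let Some(last) = self.0.last_mut() {
            if last.value == run.value {
                last.len += run.len;
                return true;
            }
        }
        self.0.push(run);
        false
    }
}

fn main() {
    let mut runs = Runs(Vec::new());
    let mut estimated_size = 0usize;
    let items = [
        Run { value: 1, len: 2 },
        Run { value: 1, len: 3 }, // merges into the previous run
        Run { value: 2, len: 1 },
    ];
    for run in items {
        let size = std::mem::size_of::<Run>();
        // As in `push_change`: only grow the size estimate when a new
        // element is actually appended, not when it merges into the last one.
        if !runs.push(run) {
            estimated_size += size;
        }
    }
    assert_eq!(runs.0.len(), 2);
    assert_eq!(estimated_size, 2 * std::mem::size_of::<Run>());
}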