diff --git a/crates/loro-internal/examples/encoding.rs b/crates/loro-internal/examples/encoding.rs index 0565d390..a4ee87bf 100644 --- a/crates/loro-internal/examples/encoding.rs +++ b/crates/loro-internal/examples/encoding.rs @@ -62,28 +62,52 @@ fn main() { output.len(), ); - // { - // // Delta encoding + let start = Instant::now(); + let blocks_bytes = loro.export_blocks(); + println!("blocks time {}ms", start.elapsed().as_millis()); + println!("blocks size {}", blocks_bytes.len()); + let blocks_bytes_compressed = miniz_oxide::deflate::compress_to_vec(&blocks_bytes, 6); + println!( + "blocks size after compression {}", + blocks_bytes_compressed.len() + ); + let start = Instant::now(); + let blocks_bytes = loro.export_blocks(); + println!("blocks time {}ms", start.elapsed().as_millis()); + println!("blocks size {}", blocks_bytes.len()); + let _blocks_bytes_compressed = miniz_oxide::deflate::compress_to_vec(&blocks_bytes, 6); + println!( + "blocks time after compression {}ms", + start.elapsed().as_millis() + ); + let start = Instant::now(); + let new_loro = LoroDoc::default(); + new_loro.import_blocks(&blocks_bytes).unwrap(); + println!("blocks decode time {:?}", start.elapsed()); - // // let start = Instant::now(); - // // for _ in 0..10 { - // // loro.export_from(&Default::default()); - // // } + { + // Delta encoding - // // println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0); + // let start = Instant::now(); + // for _ in 0..10 { + // loro.export_from(&Default::default()); + // } - // let data = loro.export_from(&Default::default()); - // let start = Instant::now(); - // for _ in 0..5 { - // let b = LoroDoc::default(); - // b.import(&data).unwrap(); - // } + // println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0); - // println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0); - // println!("size len={}", data.len()); - // let d = miniz_oxide::deflate::compress_to_vec(&data, 10); - // println!("size after compress len={}", d.len()); - // } + let data = loro.export_from(&Default::default()); + let start = Instant::now(); + for _ in 0..5 { + let b = LoroDoc::default(); + b.detach(); + b.import(&data).unwrap(); + } + + println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0); + println!("size len={}", data.len()); + let d = miniz_oxide::deflate::compress_to_vec(&data, 10); + println!("size after compress len={}", d.len()); + } // { // // Snapshot encoding diff --git a/crates/loro-internal/src/change.rs b/crates/loro-internal/src/change.rs index 552fa765..4cc27254 100644 --- a/crates/loro-internal/src/change.rs +++ b/crates/loro-internal/src/change.rs @@ -97,10 +97,10 @@ impl EstimatedSize for Change { /// Estimate the storage size of the change in bytes #[inline] fn estimate_storage_size(&self) -> usize { - let id_size = 5; - let lamport_size = 3; - let timestamp_size = 4; - let deps_size = (self.deps.len().max(1) - 1) * 5; + let id_size = 2; + let lamport_size = 1; + let timestamp_size = 1; + let deps_size = (self.deps.len().max(1) - 1) * 4; let ops_size = self .ops .iter() diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 2575ab47..6e875f43 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -415,6 +415,13 @@ impl LoroDoc { ans } + pub fn export_blocks(&self) -> Vec { + self.commit_then_stop(); + let ans = self.oplog.lock().unwrap().export_blocks(); + self.renew_txn_if_auto_commit(); + ans + } + #[inline(always)] #[instrument(skip_all)] pub fn import(&self, bytes: &[u8]) -> Result<(), LoroError> { @@ -429,6 +436,11 @@ impl LoroDoc { ans } + /// TODO: FIXME: HACK: This is just for experiment + pub fn import_blocks(&self, blocks_bytes: &[u8]) -> LoroResult<()> { + self.oplog.lock().unwrap().import_blocks(blocks_bytes) + } + fn _import_with(&self, bytes: &[u8], origin: InternalString) -> Result<(), LoroError> { let parsed = parse_header_and_body(bytes)?; match parsed.mode.is_snapshot() { diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index bac3e4e7..ba140025 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -177,10 +177,10 @@ impl OpLog { pub(crate) fn new() -> Self { let arena = SharedArena::new(); Self { + change_store: ChangeStore::new(&arena), dag: AppDag::default(), op_groups: OpGroups::new(arena.clone()), changes: ClientChanges::default(), - change_store: ChangeStore::new(), arena, next_lamport: 0, latest_timestamp: Timestamp::default(), @@ -248,6 +248,7 @@ impl OpLog { /// This is the **only** place to update the `OpLog.changes` pub(crate) fn insert_new_change(&mut self, mut change: Change, _: EnsureChangeDepsAreAtTheEnd) { self.op_groups.insert_by_change(&change); + self.change_store.insert_change(change.clone()); self.register_container_and_parent_link(&change); let entry = self.changes.entry(change.id.peer).or_default(); match entry.last_mut() { @@ -974,6 +975,14 @@ impl OpLog { ans } + + pub(crate) fn export_blocks(&mut self) -> Vec { + self.change_store.encode_all() + } + + pub(crate) fn import_blocks(&mut self, blocks: &[u8]) -> Result<(), LoroError> { + self.change_store.decode_all(blocks) + } } #[derive(Debug)] diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 5b10b629..6ba1db14 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -1,7 +1,7 @@ use bytes::Bytes; -use loro_common::{Counter, HasLamportSpan, Lamport, LoroResult, PeerID, ID}; -use rle::{HasLength, RlePush}; -use std::{cmp::Ordering, collections::BTreeMap}; +use loro_common::{Counter, HasLamportSpan, Lamport, LoroError, LoroResult, PeerID, ID}; +use rle::{HasLength, Mergable, RlePush}; +use std::{cmp::Ordering, collections::BTreeMap, io::Read, sync::Arc}; mod block_encode; mod delta_rle_encode; use crate::{ @@ -12,15 +12,71 @@ use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlock #[derive(Debug, Clone)] pub struct ChangeStore { + arena: SharedArena, kv: BTreeMap, } impl ChangeStore { - pub fn new() -> Self { + pub fn new(a: &SharedArena) -> Self { Self { + arena: a.clone(), kv: BTreeMap::new(), } } + + pub fn insert_change(&mut self, mut change: Change) { + let id = change.id; + if let Some((_id, block)) = self.kv.range_mut(..id).next_back() { + match block.push_change(change) { + Ok(_) => { + return; + } + Err(c) => change = c, + } + } + + self.kv.insert(id, ChangesBlock::new(change, &self.arena)); + } + + pub fn insert_block(&mut self, block: ChangesBlock) { + unimplemented!() + } + + pub fn block_num(&self) -> usize { + self.kv.len() + } + + pub(crate) fn iter_bytes(&mut self) -> impl Iterator + '_ { + self.kv + .iter_mut() + .map(|(id, block)| (*id, block.content.bytes(&self.arena))) + } + + pub(crate) fn encode_all(&mut self) -> Vec { + println!("block num {}", self.kv.len()); + let mut bytes = Vec::new(); + for (_, block) in self.iter_bytes() { + // println!("block size {}", block.bytes.len()); + leb128::write::unsigned(&mut bytes, block.bytes.len() as u64).unwrap(); + bytes.extend(&block.bytes); + } + + bytes + } + + pub(crate) fn decode_all(&mut self, blocks: &[u8]) -> Result<(), LoroError> { + assert!(self.kv.is_empty()); + let mut reader = blocks; + while !reader.is_empty() { + let size = leb128::read::unsigned(&mut reader).unwrap(); + let block_bytes = &reader[0..size as usize]; + let block = ChangesBlock::from_bytes(Bytes::copy_from_slice(block_bytes), &self.arena)?; + self.kv.insert(block.id(), block); + reader = &reader[size as usize..]; + } + + Ok(()) + } } #[derive(Debug, Clone)] @@ -37,7 +93,7 @@ pub struct ChangesBlock { const MAX_BLOCK_SIZE: usize = 1024 * 4; impl ChangesBlock { - pub fn from_bytes(bytes: Bytes, arena: SharedArena) -> LoroResult { + pub fn from_bytes(bytes: Bytes, arena: &SharedArena) -> LoroResult { let len = bytes.len(); let bytes = ChangesBlockBytes::new(bytes)?; let peer = bytes.peer(); @@ -45,7 +101,7 @@ impl ChangesBlock { let lamport_range = bytes.lamport_range(); let content = ChangesBlockContent::Bytes(bytes); Ok(Self { - arena, + arena: arena.clone(), peer, estimated_size: len, counter_range, @@ -54,6 +110,23 @@ impl ChangesBlock { }) } + pub fn new(change: Change, a: &SharedArena) -> Self { + let atom_len = change.atom_len(); + let counter_range = (change.id.counter, change.id.counter + atom_len as Counter); + let lamport_range = (change.lamport, change.lamport + atom_len as Lamport); + let estimated_size = change.estimate_storage_size(); + let peer = change.id.peer; + let content = ChangesBlockContent::Changes(vec![change]); + Self { + arena: a.clone(), + peer, + counter_range, + lamport_range, + estimated_size, + content, + } + } + pub fn cmp_id(&self, id: ID) -> Ordering { self.peer.cmp(&id.peer).then_with(|| { if self.counter_range.0 > id.counter { @@ -89,12 +162,24 @@ impl ChangesBlock { let atom_len = change.atom_len(); self.lamport_range.1 = change.lamport + atom_len as Lamport; self.counter_range.1 = change.id.counter + atom_len as Counter; - self.estimated_size += change.estimate_storage_size(); + let changes = self.content.changes_mut().unwrap(); - changes.push_rle_element(change); + match changes.last_mut() { + Some(last) if last.is_mergable(&change, &()) => { + last.merge(&change, &()); + } + _ => { + self.estimated_size += change.estimate_storage_size(); + changes.push(change); + } + } Ok(()) } } + + fn id(&self) -> ID { + ID::new(self.peer, self.counter_range.0) + } } #[derive(Clone)] @@ -117,14 +202,14 @@ impl ChangesBlockContent { } } - pub fn bytes(&mut self) -> &ChangesBlockBytes { + pub fn bytes(&mut self, a: &SharedArena) -> &ChangesBlockBytes { match self { ChangesBlockContent::Bytes(bytes) => bytes, ChangesBlockContent::Both(_, bytes) => bytes, ChangesBlockContent::Changes(changes) => { - let bytes = ChangesBlockBytes::serialize(changes, &SharedArena::new()); + let bytes = ChangesBlockBytes::serialize(changes, a); *self = ChangesBlockContent::Both(std::mem::take(changes), bytes); - self.bytes() + self.bytes(a) } } } @@ -165,7 +250,7 @@ impl std::fmt::Debug for ChangesBlockContent { } #[derive(Clone)] -struct ChangesBlockBytes { +pub(crate) struct ChangesBlockBytes { bytes: Bytes, header: ChangesBlockHeader, } diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index 7e7ce3e9..5f71f545 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -360,7 +360,7 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult { let n_changes = n_changes as usize; let peer_num = peers_bytes.len() / 8; let mut peers = Vec::with_capacity(peer_num as usize); - for i in 0..(n_changes as usize) { + for i in 0..peer_num as usize { let peer_id = PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap()); peers.push(peer_id);