From 6dcf801dd3e6e1cc3f8117e57f55c06bd75e14fa Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 4 Jun 2024 15:45:24 +0800 Subject: [PATCH] feat: add necessary query entry for change_store --- .../loro-internal/src/oplog/change_store.rs | 77 +++++++++++++++++-- 1 file changed, 70 insertions(+), 7 deletions(-) diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 27eb027c..8accc189 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -1,6 +1,7 @@ use bytes::Bytes; use loro_common::{ - Counter, HasId, HasIdSpan, HasLamportSpan, IdSpan, Lamport, LoroError, LoroResult, PeerID, ID, + Counter, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError, LoroResult, + PeerID, ID, }; use once_cell::sync::OnceCell; use rle::{HasLength, Mergable, RleCollection, RlePush}; @@ -89,7 +90,7 @@ impl ChangeStore { if block.peer == id.peer && block.counter_range.1 > id.counter { block.ensure_changes().unwrap(); Some(BlockChangeRef { - change_index: block.get_change_index(id.counter).unwrap(), + change_index: block.get_change_index_by_counter(id.counter).unwrap(), block: block.clone(), }) } else { @@ -97,6 +98,29 @@ impl ChangeStore { } } + pub fn get_change_by_idlp(&mut self, idlp: IdLp) -> Option { + // TODO: this can be optimized if we use a more customized tree structure + let mut iter = self + .kv + .range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX)); + while let Some((_id, block)) = iter.next_back() { + if block.lamport_range.1 <= idlp.lamport { + return None; + } + + if block.lamport_range.0 <= idlp.lamport { + block.ensure_changes().unwrap(); + let index = block.get_change_index_by_lamport(idlp.lamport).unwrap(); + return Some(BlockChangeRef { + change_index: index, + block: block.clone(), + }); + } + } + + None + } + pub fn iter_changes(&mut self, id_span: IdSpan) -> impl Iterator + '_ { self.kv .range_mut(id_span.id_start()..=id_span.id_end()) @@ -111,9 +135,11 @@ impl ChangeStore { start = 0; end = changes.len(); } else { - start = block.get_change_index(id_span.counter.start).unwrap_or(0); + start = block + .get_change_index_by_counter(id_span.counter.start) + .unwrap_or(0); end = block - .get_change_index(id_span.counter.end) + .get_change_index_by_counter(id_span.counter.end) .unwrap_or(changes.len()); } @@ -123,6 +149,13 @@ impl ChangeStore { }) }) } + + pub fn change_num(&mut self) -> usize { + self.kv + .iter_mut() + .map(|(_, block)| block.change_num()) + .sum() + } } pub struct BlockChangeRef { @@ -283,7 +316,7 @@ impl ChangesBlock { } } - fn get_change_index(&self, counter: Counter) -> Option { + fn get_change_index_by_counter(&self, counter: Counter) -> Option { let changes = self.content.try_changes().unwrap(); let r = changes.binary_search_by(|c| { if c.id.counter > counter { @@ -301,9 +334,39 @@ impl ChangesBlock { } } + fn get_change_index_by_lamport(&self, lamport: Lamport) -> Option { + let changes = self.content.try_changes().unwrap(); + let r = changes.binary_search_by(|c| { + if c.lamport > lamport { + Ordering::Greater + } else if (c.lamport + c.content_len() as Lamport) <= lamport { + Ordering::Less + } else { + Ordering::Equal + } + }); + + match r { + Ok(found) => Some(found), + Err(_) => None, + } + } + + fn get_changes(&mut self) -> LoroResult<&Vec> { + self.content.changes() + } + fn id(&self) -> ID { ID::new(self.peer, self.counter_range.0) } + + pub fn change_num(&self) -> usize { + match &self.content { + ChangesBlockContent::Changes(c) => c.len(), + ChangesBlockContent::Bytes(b) => b.len_changes(), + ChangesBlockContent::Both(c, _) => c.len(), + } + } } #[derive(Clone)] @@ -398,7 +461,7 @@ impl ChangesBlockBytes { fn serialize(changes: &[Change], a: &SharedArena) -> Self { let bytes = encode_block(changes, a); // TODO: Perf we can calculate header directly without parsing the bytes - let mut bytes = ChangesBlockBytes::new(Bytes::from(bytes)); + let bytes = ChangesBlockBytes::new(Bytes::from(bytes)); bytes.ensure_header().unwrap(); bytes } @@ -425,7 +488,7 @@ impl ChangesBlockBytes { } /// Length of the changes - fn len_changes(&mut self) -> usize { + fn len_changes(&self) -> usize { self.ensure_header().unwrap(); self.header.get().unwrap().n_changes }