diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 94bd7feb..27eb027c 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -1,7 +1,10 @@ use bytes::Bytes; -use loro_common::{Counter, HasLamportSpan, Lamport, LoroError, LoroResult, PeerID, ID}; -use rle::{HasLength, Mergable, RlePush}; -use std::{cmp::Ordering, collections::BTreeMap, io::Read, sync::Arc}; +use loro_common::{ + Counter, HasId, HasIdSpan, HasLamportSpan, IdSpan, Lamport, LoroError, LoroResult, PeerID, ID, +}; +use once_cell::sync::OnceCell; +use rle::{HasLength, Mergable, RleCollection, RlePush}; +use std::{cmp::Ordering, collections::BTreeMap, io::Read, ops::Deref, sync::Arc}; mod block_encode; mod delta_rle_encode; use crate::{ @@ -80,6 +83,58 @@ impl ChangeStore { Ok(()) } + + pub fn get_change(&mut self, id: ID) -> Option { + let (_id, block) = self.kv.range_mut(..=id).next_back()?; + if block.peer == id.peer && block.counter_range.1 > id.counter { + block.ensure_changes().unwrap(); + Some(BlockChangeRef { + change_index: block.get_change_index(id.counter).unwrap(), + block: block.clone(), + }) + } else { + None + } + } + + pub fn iter_changes(&mut self, id_span: IdSpan) -> impl Iterator + '_ { + self.kv + .range_mut(id_span.id_start()..=id_span.id_end()) + .flat_map(move |(_id, block)| { + block.ensure_changes().unwrap(); + let changes = block.content.try_changes().unwrap(); + let start; + let end; + if id_span.counter.start <= block.counter_range.0 + && id_span.counter.end >= block.counter_range.1 + { + start = 0; + end = changes.len(); + } else { + start = block.get_change_index(id_span.counter.start).unwrap_or(0); + end = block + .get_change_index(id_span.counter.end) + .unwrap_or(changes.len()); + } + + (start..end).map(|i| BlockChangeRef { + change_index: i, + block: block.clone(), + }) + }) + } +} + +pub struct BlockChangeRef { + block: Arc, + change_index: usize, +} + +impl Deref for BlockChangeRef { + type Target = Change; + fn deref(&self) -> &Change { + &self.block.content.try_changes().unwrap()[self.change_index] + } } #[derive(Debug, Clone)] @@ -206,7 +261,7 @@ impl ChangesBlock { ChangesBlockContent::Both(_, bytes) => bytes.clone(), ChangesBlockContent::Changes(changes) => { let bytes = ChangesBlockBytes::serialize(changes, a); - let c = Arc::clone(&changes); + let c = Arc::clone(changes); let this = Arc::make_mut(self); this.content = ChangesBlockContent::Both(c, bytes.clone()); bytes @@ -214,6 +269,38 @@ impl ChangesBlock { } } + pub fn ensure_changes(self: &mut Arc) -> LoroResult<()> { + match &self.content { + ChangesBlockContent::Changes(_) => Ok(()), + ChangesBlockContent::Both(_, _) => Ok(()), + ChangesBlockContent::Bytes(bytes) => { + let changes = bytes.parse(&SharedArena::new())?; + let b = bytes.clone(); + let this = Arc::make_mut(self); + this.content = ChangesBlockContent::Both(Arc::new(changes), b); + Ok(()) + } + } + } + + fn get_change_index(&self, counter: Counter) -> Option { + let changes = self.content.try_changes().unwrap(); + let r = changes.binary_search_by(|c| { + if c.id.counter > counter { + Ordering::Greater + } else if (c.id.counter + c.content_len() as Counter) <= counter { + Ordering::Less + } else { + Ordering::Equal + } + }); + + match r { + Ok(found) => Some(found), + Err(_) => None, + } + } + fn id(&self) -> ID { ID::new(self.peer, self.counter_range.0) } @@ -254,6 +341,14 @@ impl ChangesBlockContent { } } } + + fn try_changes(&self) -> Option<&Vec> { + match self { + ChangesBlockContent::Changes(changes) => Some(changes), + ChangesBlockContent::Both(changes, _) => Some(changes), + ChangesBlockContent::Bytes(_) => None, + } + } } impl std::fmt::Debug for ChangesBlockContent { @@ -278,27 +373,26 @@ impl std::fmt::Debug for ChangesBlockContent { #[derive(Clone)] pub(crate) struct ChangesBlockBytes { bytes: Bytes, - header: Option>, + header: OnceCell>, } impl ChangesBlockBytes { fn new(bytes: Bytes) -> Self { Self { - header: None, + header: OnceCell::new(), bytes, } } - fn ensure_header(&mut self) -> LoroResult<()> { - if self.header.is_none() { - self.header = Some(Arc::new(decode_header(&self.bytes)?)); - } + fn ensure_header(&self) -> LoroResult<()> { + self.header + .get_or_init(|| Arc::new(decode_header(&self.bytes).unwrap())); Ok(()) } - fn parse(&mut self, a: &SharedArena) -> LoroResult> { + fn parse(&self, a: &SharedArena) -> LoroResult> { self.ensure_header()?; - decode_block(&self.bytes, a, self.header.as_ref().map(|h| h.as_ref())) + decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref())) } fn serialize(changes: &[Change], a: &SharedArena) -> Self { @@ -311,29 +405,29 @@ impl ChangesBlockBytes { fn peer(&mut self) -> PeerID { self.ensure_header().unwrap(); - self.header.as_ref().unwrap().peer + self.header.get().as_ref().unwrap().peer } fn counter_range(&mut self) -> (Counter, Counter) { self.ensure_header().unwrap(); ( - self.header.as_ref().unwrap().counter, - *self.header.as_ref().unwrap().counters.last().unwrap(), + self.header.get().unwrap().counter, + *self.header.get().unwrap().counters.last().unwrap(), ) } fn lamport_range(&mut self) -> (Lamport, Lamport) { self.ensure_header().unwrap(); ( - self.header.as_ref().unwrap().lamports[0], - *self.header.as_ref().unwrap().lamports.last().unwrap(), + self.header.get().unwrap().lamports[0], + *self.header.get().unwrap().lamports.last().unwrap(), ) } /// Length of the changes fn len_changes(&mut self) -> usize { self.ensure_header().unwrap(); - self.header.as_ref().unwrap().n_changes + self.header.get().unwrap().n_changes } fn find_deps_for(&mut self, id: ID) -> Frontiers {