feat: add iter changes and get change to change_store

This commit is contained in:
Zixuan Chen 2024-05-29 19:01:45 +08:00
parent 8e0a4c6cc8
commit 338601308e
No known key found for this signature in database

View file

@ -1,7 +1,10 @@
use bytes::Bytes;
use loro_common::{Counter, HasLamportSpan, Lamport, LoroError, LoroResult, PeerID, ID};
use rle::{HasLength, Mergable, RlePush};
use std::{cmp::Ordering, collections::BTreeMap, io::Read, sync::Arc};
use loro_common::{
Counter, HasId, HasIdSpan, HasLamportSpan, IdSpan, Lamport, LoroError, LoroResult, PeerID, ID,
};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RleCollection, RlePush};
use std::{cmp::Ordering, collections::BTreeMap, io::Read, ops::Deref, sync::Arc};
mod block_encode;
mod delta_rle_encode;
use crate::{
@ -80,6 +83,58 @@ impl ChangeStore {
Ok(())
}
pub fn get_change(&mut self, id: ID) -> Option<BlockChangeRef> {
let (_id, block) = self.kv.range_mut(..=id).next_back()?;
if block.peer == id.peer && block.counter_range.1 > id.counter {
block.ensure_changes().unwrap();
Some(BlockChangeRef {
change_index: block.get_change_index(id.counter).unwrap(),
block: block.clone(),
})
} else {
None
}
}
pub fn iter_changes(&mut self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
self.kv
.range_mut(id_span.id_start()..=id_span.id_end())
.flat_map(move |(_id, block)| {
block.ensure_changes().unwrap();
let changes = block.content.try_changes().unwrap();
let start;
let end;
if id_span.counter.start <= block.counter_range.0
&& id_span.counter.end >= block.counter_range.1
{
start = 0;
end = changes.len();
} else {
start = block.get_change_index(id_span.counter.start).unwrap_or(0);
end = block
.get_change_index(id_span.counter.end)
.unwrap_or(changes.len());
}
(start..end).map(|i| BlockChangeRef {
change_index: i,
block: block.clone(),
})
})
}
}
pub struct BlockChangeRef {
block: Arc<ChangesBlock>,
change_index: usize,
}
impl Deref for BlockChangeRef {
type Target = Change;
fn deref(&self) -> &Change {
&self.block.content.try_changes().unwrap()[self.change_index]
}
}
#[derive(Debug, Clone)]
@ -206,7 +261,7 @@ impl ChangesBlock {
ChangesBlockContent::Both(_, bytes) => bytes.clone(),
ChangesBlockContent::Changes(changes) => {
let bytes = ChangesBlockBytes::serialize(changes, a);
let c = Arc::clone(&changes);
let c = Arc::clone(changes);
let this = Arc::make_mut(self);
this.content = ChangesBlockContent::Both(c, bytes.clone());
bytes
@ -214,6 +269,38 @@ impl ChangesBlock {
}
}
pub fn ensure_changes(self: &mut Arc<Self>) -> LoroResult<()> {
match &self.content {
ChangesBlockContent::Changes(_) => Ok(()),
ChangesBlockContent::Both(_, _) => Ok(()),
ChangesBlockContent::Bytes(bytes) => {
let changes = bytes.parse(&SharedArena::new())?;
let b = bytes.clone();
let this = Arc::make_mut(self);
this.content = ChangesBlockContent::Both(Arc::new(changes), b);
Ok(())
}
}
}
fn get_change_index(&self, counter: Counter) -> Option<usize> {
let changes = self.content.try_changes().unwrap();
let r = changes.binary_search_by(|c| {
if c.id.counter > counter {
Ordering::Greater
} else if (c.id.counter + c.content_len() as Counter) <= counter {
Ordering::Less
} else {
Ordering::Equal
}
});
match r {
Ok(found) => Some(found),
Err(_) => None,
}
}
fn id(&self) -> ID {
ID::new(self.peer, self.counter_range.0)
}
@ -254,6 +341,14 @@ impl ChangesBlockContent {
}
}
}
fn try_changes(&self) -> Option<&Vec<Change>> {
match self {
ChangesBlockContent::Changes(changes) => Some(changes),
ChangesBlockContent::Both(changes, _) => Some(changes),
ChangesBlockContent::Bytes(_) => None,
}
}
}
impl std::fmt::Debug for ChangesBlockContent {
@ -278,27 +373,26 @@ impl std::fmt::Debug for ChangesBlockContent {
#[derive(Clone)]
pub(crate) struct ChangesBlockBytes {
bytes: Bytes,
header: Option<Arc<ChangesBlockHeader>>,
header: OnceCell<Arc<ChangesBlockHeader>>,
}
impl ChangesBlockBytes {
fn new(bytes: Bytes) -> Self {
Self {
header: None,
header: OnceCell::new(),
bytes,
}
}
fn ensure_header(&mut self) -> LoroResult<()> {
if self.header.is_none() {
self.header = Some(Arc::new(decode_header(&self.bytes)?));
}
fn ensure_header(&self) -> LoroResult<()> {
self.header
.get_or_init(|| Arc::new(decode_header(&self.bytes).unwrap()));
Ok(())
}
fn parse(&mut self, a: &SharedArena) -> LoroResult<Vec<Change>> {
fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
self.ensure_header()?;
decode_block(&self.bytes, a, self.header.as_ref().map(|h| h.as_ref()))
decode_block(&self.bytes, a, self.header.get().map(|h| h.as_ref()))
}
fn serialize(changes: &[Change], a: &SharedArena) -> Self {
@ -311,29 +405,29 @@ impl ChangesBlockBytes {
fn peer(&mut self) -> PeerID {
self.ensure_header().unwrap();
self.header.as_ref().unwrap().peer
self.header.get().as_ref().unwrap().peer
}
fn counter_range(&mut self) -> (Counter, Counter) {
self.ensure_header().unwrap();
(
self.header.as_ref().unwrap().counter,
*self.header.as_ref().unwrap().counters.last().unwrap(),
self.header.get().unwrap().counter,
*self.header.get().unwrap().counters.last().unwrap(),
)
}
fn lamport_range(&mut self) -> (Lamport, Lamport) {
self.ensure_header().unwrap();
(
self.header.as_ref().unwrap().lamports[0],
*self.header.as_ref().unwrap().lamports.last().unwrap(),
self.header.get().unwrap().lamports[0],
*self.header.get().unwrap().lamports.last().unwrap(),
)
}
/// Length of the changes
fn len_changes(&mut self) -> usize {
self.ensure_header().unwrap();
self.header.as_ref().unwrap().n_changes
self.header.get().unwrap().n_changes
}
fn find_deps_for(&mut self, id: ID) -> Frontiers {