feat: add necessary query entry for change_store

This commit is contained in:
Zixuan Chen 2024-06-04 15:45:24 +08:00
parent 1f3290ef80
commit 6dcf801dd3
No known key found for this signature in database

View file

@ -1,6 +1,7 @@
use bytes::Bytes;
use loro_common::{
Counter, HasId, HasIdSpan, HasLamportSpan, IdSpan, Lamport, LoroError, LoroResult, PeerID, ID,
Counter, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError, LoroResult,
PeerID, ID,
};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RleCollection, RlePush};
@ -89,7 +90,7 @@ impl ChangeStore {
if block.peer == id.peer && block.counter_range.1 > id.counter {
block.ensure_changes().unwrap();
Some(BlockChangeRef {
change_index: block.get_change_index(id.counter).unwrap(),
change_index: block.get_change_index_by_counter(id.counter).unwrap(),
block: block.clone(),
})
} else {
@ -97,6 +98,29 @@ impl ChangeStore {
}
}
pub fn get_change_by_idlp(&mut self, idlp: IdLp) -> Option<BlockChangeRef> {
// TODO: this can be optimized if we use a more customized tree structure
let mut iter = self
.kv
.range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));
while let Some((_id, block)) = iter.next_back() {
if block.lamport_range.1 <= idlp.lamport {
return None;
}
if block.lamport_range.0 <= idlp.lamport {
block.ensure_changes().unwrap();
let index = block.get_change_index_by_lamport(idlp.lamport).unwrap();
return Some(BlockChangeRef {
change_index: index,
block: block.clone(),
});
}
}
None
}
pub fn iter_changes(&mut self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
self.kv
.range_mut(id_span.id_start()..=id_span.id_end())
@ -111,9 +135,11 @@ impl ChangeStore {
start = 0;
end = changes.len();
} else {
start = block.get_change_index(id_span.counter.start).unwrap_or(0);
start = block
.get_change_index_by_counter(id_span.counter.start)
.unwrap_or(0);
end = block
.get_change_index(id_span.counter.end)
.get_change_index_by_counter(id_span.counter.end)
.unwrap_or(changes.len());
}
@ -123,6 +149,13 @@ impl ChangeStore {
})
})
}
pub fn change_num(&mut self) -> usize {
self.kv
.iter_mut()
.map(|(_, block)| block.change_num())
.sum()
}
}
pub struct BlockChangeRef {
@ -283,7 +316,7 @@ impl ChangesBlock {
}
}
fn get_change_index(&self, counter: Counter) -> Option<usize> {
fn get_change_index_by_counter(&self, counter: Counter) -> Option<usize> {
let changes = self.content.try_changes().unwrap();
let r = changes.binary_search_by(|c| {
if c.id.counter > counter {
@ -301,9 +334,39 @@ impl ChangesBlock {
}
}
fn get_change_index_by_lamport(&self, lamport: Lamport) -> Option<usize> {
let changes = self.content.try_changes().unwrap();
let r = changes.binary_search_by(|c| {
if c.lamport > lamport {
Ordering::Greater
} else if (c.lamport + c.content_len() as Lamport) <= lamport {
Ordering::Less
} else {
Ordering::Equal
}
});
match r {
Ok(found) => Some(found),
Err(_) => None,
}
}
fn get_changes(&mut self) -> LoroResult<&Vec<Change>> {
self.content.changes()
}
fn id(&self) -> ID {
ID::new(self.peer, self.counter_range.0)
}
pub fn change_num(&self) -> usize {
match &self.content {
ChangesBlockContent::Changes(c) => c.len(),
ChangesBlockContent::Bytes(b) => b.len_changes(),
ChangesBlockContent::Both(c, _) => c.len(),
}
}
}
#[derive(Clone)]
@ -398,7 +461,7 @@ impl ChangesBlockBytes {
fn serialize(changes: &[Change], a: &SharedArena) -> Self {
let bytes = encode_block(changes, a);
// TODO: Perf we can calculate header directly without parsing the bytes
let mut bytes = ChangesBlockBytes::new(Bytes::from(bytes));
let bytes = ChangesBlockBytes::new(Bytes::from(bytes));
bytes.ensure_header().unwrap();
bytes
}
@ -425,7 +488,7 @@ impl ChangesBlockBytes {
}
/// Length of the changes
fn len_changes(&mut self) -> usize {
fn len_changes(&self) -> usize {
self.ensure_header().unwrap();
self.header.get().unwrap().n_changes
}