refactor: rm more usage of changes

This commit is contained in:
Zixuan Chen 2024-06-04 23:08:38 +08:00
parent 2ff4a88a49
commit 3a842a90ee
No known key found for this signature in database
5 changed files with 113 additions and 115 deletions

View file

@ -174,9 +174,13 @@ impl DiffCalculator {
if visited.contains(&op.container) {
// don't checkout if we have already checked out this container in this round
calculator.apply_change(oplog, RichOp::new_by_change(change, op), None);
calculator.apply_change(oplog, RichOp::new_by_change(&change, op), None);
} else {
calculator.apply_change(oplog, RichOp::new_by_change(change, op), Some(vv));
calculator.apply_change(
oplog,
RichOp::new_by_change(&change, op),
Some(vv),
);
visited.insert(container);
}
}
@ -726,14 +730,14 @@ impl DiffCalculatorTrait for RichtextDiffCalculator {
crate::container::list::list_op::InnerListOp::StyleEnd => {
let id = op.id();
// PERF: this can be sped up by caching the last style op
let start_op = oplog.get_op(op.id().inc(-1));
let start_op = oplog.get_op(op.id().inc(-1)).unwrap();
let InnerListOp::StyleStart {
start: _,
end,
key,
value,
info,
} = start_op.unwrap().content.as_list().unwrap()
} = start_op.content.as_list().unwrap()
else {
unreachable!()
};

View file

@ -1,5 +1,6 @@
use std::{borrow::Cow, cell::RefCell, cmp::Ordering, mem::take, rc::Rc};
use either::Either;
pub(crate) use encode::{encode_op, get_op_prop};
use fxhash::{FxHashMap, FxHashSet};
use generic_btree::rle::Sliceable;
@ -67,7 +68,10 @@ pub(crate) fn encode_updates(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
} = extract_containers_in_order(
&mut diff_changes
.iter()
.flat_map(|x| x.ops.iter())
.flat_map(|x| match x {
Either::Left(c) => c.ops.iter(),
Either::Right(c) => c.ops.iter(),
})
.map(|x| x.container),
&oplog.arena,
);
@ -418,7 +422,13 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
&mut state.iter().map(|x| x.container_idx()).chain(
diff_changes
.iter()
.flat_map(|x| x.ops.iter())
.flat_map(|x| {
let c = match x {
Either::Left(c) => c,
Either::Right(c) => c,
};
c.ops.iter()
})
.map(|x| x.container),
),
&oplog.arena,
@ -877,6 +887,7 @@ fn decode_snapshot_states(
mod encode {
#[allow(unused_imports)]
use crate::encoding::value::FutureValue;
use either::Either;
use fxhash::FxHashMap;
use loro_common::{ContainerType, HasId, PeerID, ID};
use rle::{HasLength, Sliceable};
@ -893,6 +904,7 @@ mod encode {
value_register::ValueRegister,
},
op::{FutureInnerContent, Op},
oplog::BlockChangeRef,
};
#[derive(Debug, Clone)]
@ -1006,7 +1018,7 @@ mod encode {
}
pub(super) fn encode_changes<'p, 'a: 'p>(
diff_changes: &'a [Cow<'a, Change>],
diff_changes: &'a [Either<Change, BlockChangeRef>],
dep_arena: &mut super::DepsArena,
push_op: &mut impl FnMut(TempOp<'a>),
container_idx2index: &FxHashMap<ContainerIdx, usize>,
@ -1016,6 +1028,10 @@ mod encode {
for change in diff_changes.iter() {
let mut dep_on_self = false;
let mut deps_len = 0;
let change = match change {
Either::Left(c) => c,
Either::Right(c) => c,
};
for dep in change.deps.iter() {
if dep.peer == change.id.peer {
dep_on_self = true;
@ -1058,12 +1074,12 @@ mod encode {
oplog: &'a OpLog,
vv: &'_ VersionVector,
peer_register: &mut ValueRegister<PeerID>,
) -> (Vec<i32>, Vec<Cow<'a, Change>>) {
) -> (Vec<i32>, Vec<Either<Change, BlockChangeRef>>) {
let self_vv = oplog.vv();
let start_vv = vv.trim(oplog.vv());
let mut start_counters = Vec::new();
let mut diff_changes: Vec<Cow<'a, Change>> = Vec::new();
let mut diff_changes: Vec<Either<Change, BlockChangeRef>> = Vec::new();
for change in oplog.iter_changes_peer_by_peer(&start_vv, self_vv) {
let start_cnt = start_vv.get(&change.id.peer).copied().unwrap_or(0);
if !peer_register.contains(&change.id.peer) {
@ -1072,13 +1088,18 @@ mod encode {
}
if change.id.counter < start_cnt {
let offset = start_cnt - change.id.counter;
diff_changes.push(Cow::Owned(change.slice(offset as usize, change.atom_len())));
diff_changes.push(Either::Left(
change.slice(offset as usize, change.atom_len()),
));
} else {
diff_changes.push(Cow::Borrowed(change));
diff_changes.push(Either::Right(change));
}
}
diff_changes.sort_by_key(|x| x.lamport);
diff_changes.sort_by_key(|x| match x {
Either::Left(c) => c.lamport,
Either::Right(c) => c.lamport,
});
(start_counters, diff_changes)
}

View file

@ -21,6 +21,7 @@ use crate::op::{FutureInnerContent, ListSlice, Op, RawOpContent, RemoteOp, RichO
use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
pub(crate) use change_store::{BlockChangeRef, ChangeStore};
use fxhash::FxHashMap;
use itertools::Itertools;
@ -404,10 +405,6 @@ impl OpLog {
ID::new(peer, cnt)
}
pub fn get_peer_changes(&self, peer: PeerID) -> Option<&Vec<Change>> {
self.changes.get(&peer)
}
pub(crate) fn vv(&self) -> &VersionVector {
&self.dag.vv
}
@ -458,14 +455,8 @@ impl OpLog {
.unwrap_or(Lamport::MAX)
}
pub fn get_change_at(&self, id: ID) -> Option<&Change> {
if let Some(peer_changes) = self.changes.get(&id.peer) {
if let Some(result) = peer_changes.get_by_atom_index(id.counter) {
return Some(&peer_changes[result.merged_index]);
}
}
None
pub fn get_change_at(&self, id: ID) -> Option<BlockChangeRef> {
self.change_store.get_change(id)
}
pub fn get_deps_of(&self, id: ID) -> Option<Frontiers> {
@ -480,7 +471,7 @@ impl OpLog {
pub fn get_remote_change_at(&self, id: ID) -> Option<Change<RemoteOp>> {
let change = self.get_change_at(id)?;
Some(self.convert_change_to_remote(change))
Some(self.convert_change_to_remote(&change))
}
fn convert_change_to_remote(&self, change: &Change) -> Change<RemoteOp> {
@ -624,23 +615,8 @@ impl OpLog {
/// Look up the change that contains `id`.
///
/// Returns `None` if `id` is not included in this oplog.
pub(crate) fn lookup_change(&self, id: ID) -> Option<&Change> {
self.changes.get(&id.peer).and_then(|changes| {
// Because get_by_atom_index would return Some if counter is at the end,
// we cannot use it directly.
// TODO: maybe we should refactor this
if id.counter <= changes.last().unwrap().id_last().counter {
Some(changes.get_by_atom_index(id.counter).unwrap().element)
} else {
None
}
})
}
#[allow(unused)]
pub(crate) fn lookup_op(&self, id: ID) -> Option<&crate::op::Op> {
self.lookup_change(id)
.and_then(|change| change.ops.get_by_atom_index(id.counter).map(|x| x.element))
pub(crate) fn lookup_change(&self, id: ID) -> Option<BlockChangeRef> {
self.change_store.get_change(id)
}
#[inline(always)]
@ -660,27 +636,10 @@ impl OpLog {
b: &VersionVector,
mut f: impl FnMut(&Change),
) {
for (peer, changes) in self.changes.iter() {
let mut from_cnt = a.get(peer).copied().unwrap_or(0);
let mut to_cnt = b.get(peer).copied().unwrap_or(0);
if from_cnt == to_cnt {
continue;
}
if to_cnt < from_cnt {
std::mem::swap(&mut from_cnt, &mut to_cnt);
}
let Some(result) = changes.get_by_atom_index(from_cnt) else {
continue;
};
for change in &changes[result.merged_index..changes.len()] {
if change.id.counter >= to_cnt {
break;
}
f(change)
let spans = b.sub_iter(a);
for span in spans {
for c in self.change_store.iter_changes(span) {
f(&c);
}
}
}
@ -703,7 +662,7 @@ impl OpLog {
to_frontiers: Option<&Frontiers>,
) -> (
VersionVector,
impl Iterator<Item = (&Change, Counter, Rc<RefCell<VersionVector>>)>,
impl Iterator<Item = (BlockChangeRef, Counter, Rc<RefCell<VersionVector>>)> + '_,
) {
let mut merged_vv = from.clone();
merged_vv.merge(to);
@ -749,11 +708,7 @@ impl OpLog {
.max(common_ancestors_vv.get(&peer).copied().unwrap_or(0));
let end = (inner.data.cnt + inner.data.len as Counter)
.min(merged_vv.get(&peer).copied().unwrap_or(0));
let change = self
.changes
.get(&peer)
.and_then(|x| x.get_by_atom_index(cnt).map(|x| x.element))
.unwrap();
let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap();
if change.ctr_end() < end {
cur_cnt = change.ctr_end();
@ -773,7 +728,7 @@ impl OpLog {
}
pub fn len_changes(&self) -> usize {
self.changes.values().map(|x| x.len()).sum()
self.change_store.change_num()
}
pub fn diagnose_size(&self) -> SizeInfo {
@ -781,13 +736,11 @@ impl OpLog {
let mut total_ops = 0;
let mut total_atom_ops = 0;
let total_dag_node = self.dag.map.len();
for changes in self.changes.values() {
total_changes += changes.len();
for change in changes.iter() {
total_ops += change.ops.len();
total_atom_ops += change.atom_len();
}
}
self.change_store.visit_all_changes(&mut |change| {
total_changes += 1;
total_ops += change.ops.len();
total_atom_ops += change.atom_len();
});
println!("total changes: {}", total_changes);
println!("total ops: {}", total_ops);
@ -814,18 +767,11 @@ impl OpLog {
&'a self,
from: &VersionVector,
to: &VersionVector,
) -> impl Iterator<Item = &'a Change> + 'a {
) -> impl Iterator<Item = BlockChangeRef> + 'a {
let spans: Vec<_> = from.diff_iter(to).1.collect();
spans.into_iter().flat_map(move |span| {
let peer = span.peer;
let cnt = span.counter.start;
let end_cnt = span.counter.end;
let peer_changes = self.changes.get(&peer).unwrap();
let index = peer_changes.search_atom_index(cnt);
peer_changes[index..]
.iter()
.take_while(move |x| x.ctr_start() < end_cnt)
})
spans
.into_iter()
.flat_map(move |span| self.change_store.iter_changes(span))
}
pub(crate) fn iter_changes_causally_rev<'a>(
@ -845,28 +791,15 @@ impl OpLog {
}
pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
if let Some(peer_changes) = self.changes.get(&id.peer) {
let ans = peer_changes.binary_search_by(|c| {
if c.lamport > id.lamport {
Ordering::Greater
} else if (c.lamport + c.atom_len() as Lamport) <= id.lamport {
Ordering::Less
} else {
Ordering::Equal
}
});
match ans {
Ok(index) => {
let change = &peer_changes[index];
let counter = (id.lamport - change.lamport) as Counter + change.id.counter;
Some(ID::new(id.peer, counter))
}
Err(_) => None,
}
} else {
None
let change = self.change_store.get_change_by_idlp(id)?;
if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
return None;
}
Some(ID::new(
change.id.peer,
(id.lamport - change.lamport) as Counter + change.id.counter,
))
}
#[allow(unused)]
@ -877,9 +810,9 @@ impl OpLog {
loro_common::IdLp { peer, lamport }
}
pub(crate) fn get_op(&self, id: ID) -> Option<&Op> {
pub(crate) fn get_op(&self, id: ID) -> Option<BlockOpRef> {
let change = self.get_change_at(id)?;
change.ops.get_by_atom_index(id.counter).map(|x| x.element)
change.get_op_with_counter(id.counter)
}
pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> {

View file

@ -17,7 +17,7 @@ use tracing::trace;
mod block_encode;
mod delta_rle_encode;
use crate::{
arena::SharedArena, change::Change, estimated_size::EstimatedSize, version::Frontiers,
arena::SharedArena, change::Change, estimated_size::EstimatedSize, op::Op, version::Frontiers,
};
use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
@ -64,7 +64,6 @@ impl ChangeStore {
pub(crate) fn encode_all(&self) -> Vec<u8> {
let mut kv = self.kv.lock().unwrap();
println!("block num {}", kv.len());
let mut bytes = Vec::new();
let iter = kv
.iter_mut()
@ -128,6 +127,16 @@ impl ChangeStore {
None
}
/// Invokes `f` once for every change of every block stored in `kv`.
///
/// The `kv` lock is acquired up front and held until the traversal ends.
/// Each block has `ensure_changes` called on it before its changes are read.
///
/// # Panics
///
/// Panics if the `kv` mutex is poisoned, if `ensure_changes` fails, or if
/// `try_changes` yields no change list for a block.
pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) {
    let mut blocks = self.kv.lock().unwrap();
    for (_, blk) in blocks.iter_mut() {
        // Must run before `try_changes`, which is unwrapped right below.
        blk.ensure_changes().unwrap();
        let changes = blk.content.try_changes().unwrap();
        changes.iter().for_each(|change| f(change));
    }
}
pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator<Item = BlockChangeRef> + '_ {
let mut kv = self.kv.lock().unwrap();
let start_counter = kv
@ -218,6 +227,36 @@ impl Deref for BlockChangeRef {
}
}
impl BlockChangeRef {
    /// Returns a handle to the op of this change located via `counter`.
    ///
    /// Yields `None` when `counter` is at or past `self.id_end().counter`.
    /// No lower-bound check is performed here; presumably callers only pass
    /// counters that are not before the start of this change (TODO confirm).
    ///
    /// The returned [`BlockOpRef`] shares this reference's block handle and
    /// records the change index plus the op index found by
    /// `search_atom_index`.
    pub fn get_op_with_counter(&self, counter: Counter) -> Option<BlockOpRef> {
        let within_change = counter < self.id_end().counter;
        // The closure only runs for in-range counters, so the op search is
        // skipped entirely when the guard fails.
        within_change.then(|| {
            let op_index = self.ops.search_atom_index(counter);
            BlockOpRef {
                block: self.block.clone(),
                change_index: self.change_index,
                op_index,
            }
        })
    }
}
/// A handle to a single op, addressed by position inside a shared
/// `ChangesBlock`.
///
/// The op itself is not stored here; it is located through `block` using
/// `change_index` and `op_index`.
#[derive(Clone)]
pub struct BlockOpRef {
    /// Shared handle to the block that holds the op.
    pub block: Arc<ChangesBlock>,
    /// Index of the owning change within the block's change list.
    pub change_index: usize,
    /// Index of the op within that change's `ops`.
    pub op_index: usize,
}
impl Deref for BlockOpRef {
    type Target = Op;

    /// Resolves the handle to the op it points at.
    ///
    /// # Panics
    ///
    /// Panics if `try_changes` yields no change list for the block, or if
    /// `change_index` / `op_index` are out of bounds.
    fn deref(&self) -> &Op {
        let changes = self.block.content.try_changes().unwrap();
        let owning_change = &changes[self.change_index];
        &owning_change.ops[self.op_index]
    }
}
#[derive(Debug, Clone)]
pub struct ChangesBlock {
arena: SharedArena,

View file

@ -407,8 +407,9 @@ fn get_counter_end(doc: &LoroDoc, peer: PeerID) -> Counter {
doc.oplog()
.lock()
.unwrap()
.get_peer_changes(peer)
.and_then(|x| x.last().map(|x| x.ctr_end()))
.vv()
.get(&peer)
.cloned()
.unwrap_or(0)
}