From 3a842a90eee1e1be0fbfb06d77d64ec2c13993f0 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 4 Jun 2024 23:08:38 +0800 Subject: [PATCH] refactor: rm more usage of changes --- crates/loro-internal/src/diff_calc.rs | 12 +- .../src/encoding/encode_reordered.rs | 37 +++-- crates/loro-internal/src/oplog.rs | 131 +++++------------- .../loro-internal/src/oplog/change_store.rs | 43 +++++- crates/loro-internal/src/undo.rs | 5 +- 5 files changed, 113 insertions(+), 115 deletions(-) diff --git a/crates/loro-internal/src/diff_calc.rs b/crates/loro-internal/src/diff_calc.rs index a0dfdaeb..5602d62c 100644 --- a/crates/loro-internal/src/diff_calc.rs +++ b/crates/loro-internal/src/diff_calc.rs @@ -174,9 +174,13 @@ impl DiffCalculator { if visited.contains(&op.container) { // don't checkout if we have already checked out this container in this round - calculator.apply_change(oplog, RichOp::new_by_change(change, op), None); + calculator.apply_change(oplog, RichOp::new_by_change(&change, op), None); } else { - calculator.apply_change(oplog, RichOp::new_by_change(change, op), Some(vv)); + calculator.apply_change( + oplog, + RichOp::new_by_change(&change, op), + Some(vv), + ); visited.insert(container); } } @@ -726,14 +730,14 @@ impl DiffCalculatorTrait for RichtextDiffCalculator { crate::container::list::list_op::InnerListOp::StyleEnd => { let id = op.id(); // PERF: this can be sped up by caching the last style op - let start_op = oplog.get_op(op.id().inc(-1)); + let start_op = oplog.get_op(op.id().inc(-1)).unwrap(); let InnerListOp::StyleStart { start: _, end, key, value, info, - } = start_op.unwrap().content.as_list().unwrap() + } = start_op.content.as_list().unwrap() else { unreachable!() }; diff --git a/crates/loro-internal/src/encoding/encode_reordered.rs b/crates/loro-internal/src/encoding/encode_reordered.rs index 99564c3c..7f3178ec 100644 --- a/crates/loro-internal/src/encoding/encode_reordered.rs +++ b/crates/loro-internal/src/encoding/encode_reordered.rs @@ -1,5 +1,6 @@ use std::{borrow::Cow, cell::RefCell, cmp::Ordering, mem::take, rc::Rc}; +use either::Either; pub(crate) use encode::{encode_op, get_op_prop}; use fxhash::{FxHashMap, FxHashSet}; use generic_btree::rle::Sliceable; @@ -67,7 +68,10 @@ pub(crate) fn encode_updates(oplog: &OpLog, vv: &VersionVector) -> Vec { } = extract_containers_in_order( &mut diff_changes .iter() - .flat_map(|x| x.ops.iter()) + .flat_map(|x| match x { + Either::Left(c) => c.ops.iter(), + Either::Right(c) => c.ops.iter(), + }) .map(|x| x.container), &oplog.arena, ); @@ -418,7 +422,13 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto &mut state.iter().map(|x| x.container_idx()).chain( diff_changes .iter() - .flat_map(|x| x.ops.iter()) + .flat_map(|x| { + let c = match x { + Either::Left(c) => c, + Either::Right(c) => c, + }; + c.ops.iter() + }) .map(|x| x.container), ), &oplog.arena, @@ -877,6 +887,7 @@ fn decode_snapshot_states( mod encode { #[allow(unused_imports)] use crate::encoding::value::FutureValue; + use either::Either; use fxhash::FxHashMap; use loro_common::{ContainerType, HasId, PeerID, ID}; use rle::{HasLength, Sliceable}; @@ -893,6 +904,7 @@ mod encode { value_register::ValueRegister, }, op::{FutureInnerContent, Op}, + oplog::BlockChangeRef, }; #[derive(Debug, Clone)] @@ -1006,7 +1018,7 @@ mod encode { } pub(super) fn encode_changes<'p, 'a: 'p>( - diff_changes: &'a [Cow<'a, Change>], + diff_changes: &'a [Either], dep_arena: &mut super::DepsArena, push_op: &mut impl FnMut(TempOp<'a>), container_idx2index: &FxHashMap, @@ -1016,6 +1028,10 @@ mod encode { for change in diff_changes.iter() { let mut dep_on_self = false; let mut deps_len = 0; + let change = match change { + Either::Left(c) => c, + Either::Right(c) => c, + }; for dep in change.deps.iter() { if dep.peer == change.id.peer { dep_on_self = true; @@ -1058,12 +1074,12 @@ mod encode { oplog: &'a OpLog, vv: &'_ VersionVector, peer_register: &mut ValueRegister, - ) -> (Vec, Vec>) { + ) -> (Vec, Vec>) { let self_vv = oplog.vv(); let start_vv = vv.trim(oplog.vv()); let mut start_counters = Vec::new(); - let mut diff_changes: Vec> = Vec::new(); + let mut diff_changes: Vec> = Vec::new(); for change in oplog.iter_changes_peer_by_peer(&start_vv, self_vv) { let start_cnt = start_vv.get(&change.id.peer).copied().unwrap_or(0); if !peer_register.contains(&change.id.peer) { @@ -1072,13 +1088,18 @@ mod encode { } if change.id.counter < start_cnt { let offset = start_cnt - change.id.counter; - diff_changes.push(Cow::Owned(change.slice(offset as usize, change.atom_len()))); + diff_changes.push(Either::Left( + change.slice(offset as usize, change.atom_len()), + )); } else { - diff_changes.push(Cow::Borrowed(change)); + diff_changes.push(Either::Right(change)); } } - diff_changes.sort_by_key(|x| x.lamport); + diff_changes.sort_by_key(|x| match x { + Either::Left(c) => c.lamport, + Either::Right(c) => c.lamport, + }); (start_counters, diff_changes) } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 2ca7a282..c1027f44 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -21,6 +21,7 @@ use crate::op::{FutureInnerContent, ListSlice, Op, RawOpContent, RemoteOp, RichO use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; +use change_store::BlockOpRef; pub(crate) use change_store::{BlockChangeRef, ChangeStore}; use fxhash::FxHashMap; use itertools::Itertools; @@ -404,10 +405,6 @@ impl OpLog { ID::new(peer, cnt) } - pub fn get_peer_changes(&self, peer: PeerID) -> Option<&Vec> { - self.changes.get(&peer) - } - pub(crate) fn vv(&self) -> &VersionVector { &self.dag.vv } @@ -458,14 +455,8 @@ impl OpLog { .unwrap_or(Lamport::MAX) } - pub fn get_change_at(&self, id: ID) -> Option<&Change> { - if let Some(peer_changes) = self.changes.get(&id.peer) { - if let Some(result) = peer_changes.get_by_atom_index(id.counter) { - return Some(&peer_changes[result.merged_index]); - } - } - - None + pub fn get_change_at(&self, id: ID) -> Option { + self.change_store.get_change(id) } pub fn get_deps_of(&self, id: ID) -> Option { @@ -480,7 +471,7 @@ impl OpLog { pub fn get_remote_change_at(&self, id: ID) -> Option> { let change = self.get_change_at(id)?; - Some(self.convert_change_to_remote(change)) + Some(self.convert_change_to_remote(&change)) } fn convert_change_to_remote(&self, change: &Change) -> Change { @@ -624,23 +615,8 @@ impl OpLog { /// lookup change by id. /// /// if id does not included in this oplog, return None - pub(crate) fn lookup_change(&self, id: ID) -> Option<&Change> { - self.changes.get(&id.peer).and_then(|changes| { - // Because get_by_atom_index would return Some if counter is at the end, - // we cannot use it directly. - // TODO: maybe we should refactor this - if id.counter <= changes.last().unwrap().id_last().counter { - Some(changes.get_by_atom_index(id.counter).unwrap().element) - } else { - None - } - }) - } - - #[allow(unused)] - pub(crate) fn lookup_op(&self, id: ID) -> Option<&crate::op::Op> { - self.lookup_change(id) - .and_then(|change| change.ops.get_by_atom_index(id.counter).map(|x| x.element)) + pub(crate) fn lookup_change(&self, id: ID) -> Option { + self.change_store.get_change(id) } #[inline(always)] @@ -660,27 +636,10 @@ impl OpLog { b: &VersionVector, mut f: impl FnMut(&Change), ) { - for (peer, changes) in self.changes.iter() { - let mut from_cnt = a.get(peer).copied().unwrap_or(0); - let mut to_cnt = b.get(peer).copied().unwrap_or(0); - if from_cnt == to_cnt { - continue; - } - - if to_cnt < from_cnt { - std::mem::swap(&mut from_cnt, &mut to_cnt); - } - - let Some(result) = changes.get_by_atom_index(from_cnt) else { - continue; - }; - - for change in &changes[result.merged_index..changes.len()] { - if change.id.counter >= to_cnt { - break; - } - - f(change) + let spans = b.sub_iter(a); + for span in spans { + for c in self.change_store.iter_changes(span) { + f(&c); } } } @@ -703,7 +662,7 @@ impl OpLog { to_frontiers: Option<&Frontiers>, ) -> ( VersionVector, - impl Iterator>)>, + impl Iterator>)> + '_, ) { let mut merged_vv = from.clone(); merged_vv.merge(to); @@ -749,11 +708,7 @@ impl OpLog { .max(common_ancestors_vv.get(&peer).copied().unwrap_or(0)); let end = (inner.data.cnt + inner.data.len as Counter) .min(merged_vv.get(&peer).copied().unwrap_or(0)); - let change = self - .changes - .get(&peer) - .and_then(|x| x.get_by_atom_index(cnt).map(|x| x.element)) - .unwrap(); + let change = self.change_store.get_change(ID::new(peer, cnt)).unwrap(); if change.ctr_end() < end { cur_cnt = change.ctr_end(); @@ -773,7 +728,7 @@ impl OpLog { } pub fn len_changes(&self) -> usize { - self.changes.values().map(|x| x.len()).sum() + self.change_store.change_num() } pub fn diagnose_size(&self) -> SizeInfo { @@ -781,13 +736,11 @@ impl OpLog { let mut total_ops = 0; let mut total_atom_ops = 0; let total_dag_node = self.dag.map.len(); - for changes in self.changes.values() { - total_changes += changes.len(); - for change in changes.iter() { - total_ops += change.ops.len(); - total_atom_ops += change.atom_len(); - } - } + self.change_store.visit_all_changes(&mut |change| { + total_changes += 1; + total_ops += change.ops.len(); + total_atom_ops += change.atom_len(); + }); println!("total changes: {}", total_changes); println!("total ops: {}", total_ops); @@ -814,18 +767,11 @@ impl OpLog { &'a self, from: &VersionVector, to: &VersionVector, - ) -> impl Iterator + 'a { + ) -> impl Iterator + 'a { let spans: Vec<_> = from.diff_iter(to).1.collect(); - spans.into_iter().flat_map(move |span| { - let peer = span.peer; - let cnt = span.counter.start; - let end_cnt = span.counter.end; - let peer_changes = self.changes.get(&peer).unwrap(); - let index = peer_changes.search_atom_index(cnt); - peer_changes[index..] - .iter() - .take_while(move |x| x.ctr_start() < end_cnt) - }) + spans + .into_iter() + .flat_map(move |span| self.change_store.iter_changes(span)) } pub(crate) fn iter_changes_causally_rev<'a>( @@ -845,28 +791,15 @@ impl OpLog { } pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option { - if let Some(peer_changes) = self.changes.get(&id.peer) { - let ans = peer_changes.binary_search_by(|c| { - if c.lamport > id.lamport { - Ordering::Greater - } else if (c.lamport + c.atom_len() as Lamport) <= id.lamport { - Ordering::Less - } else { - Ordering::Equal - } - }); - - match ans { - Ok(index) => { - let change = &peer_changes[index]; - let counter = (id.lamport - change.lamport) as Counter + change.id.counter; - Some(ID::new(id.peer, counter)) - } - Err(_) => None, - } - } else { - None + let change = self.change_store.get_change_by_idlp(id)?; + if change.lamport > id.lamport || change.lamport_end() <= id.lamport { + return None; } + + Some(ID::new( + change.id.peer, + (id.lamport - change.lamport) as Counter + change.id.counter, + )) } #[allow(unused)] @@ -877,9 +810,9 @@ impl OpLog { loro_common::IdLp { peer, lamport } } - pub(crate) fn get_op(&self, id: ID) -> Option<&Op> { + pub(crate) fn get_op(&self, id: ID) -> Option { let change = self.get_change_at(id)?; - change.ops.get_by_atom_index(id.counter).map(|x| x.element) + change.get_op_with_counter(id.counter) } pub(crate) fn split_span_based_on_deps(&self, id_span: IdSpan) -> Vec<(IdSpan, Frontiers)> { diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 1f5693f6..42e81386 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -17,7 +17,7 @@ use tracing::trace; mod block_encode; mod delta_rle_encode; use crate::{ - arena::SharedArena, change::Change, estimated_size::EstimatedSize, version::Frontiers, + arena::SharedArena, change::Change, estimated_size::EstimatedSize, op::Op, version::Frontiers, }; use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader}; @@ -64,7 +64,6 @@ impl ChangeStore { pub(crate) fn encode_all(&self) -> Vec { let mut kv = self.kv.lock().unwrap(); - println!("block num {}", kv.len()); let mut bytes = Vec::new(); let iter = kv .iter_mut() @@ -128,6 +127,16 @@ impl ChangeStore { None } + pub fn visit_all_changes(&self, f: &mut dyn FnMut(&Change)) { + let mut kv = self.kv.lock().unwrap(); + for (_, block) in kv.iter_mut() { + block.ensure_changes().unwrap(); + for c in block.content.try_changes().unwrap() { + f(c); + } + } + } + pub fn iter_changes(&self, id_span: IdSpan) -> impl Iterator + '_ { let mut kv = self.kv.lock().unwrap(); let start_counter = kv @@ -218,6 +227,36 @@ impl Deref for BlockChangeRef { } } +impl BlockChangeRef { + pub fn get_op_with_counter(&self, counter: Counter) -> Option { + if counter >= self.id_end().counter { + return None; + } + + let index = self.ops.search_atom_index(counter); + Some(BlockOpRef { + block: self.block.clone(), + change_index: self.change_index, + op_index: index, + }) + } +} + +#[derive(Clone)] +pub struct BlockOpRef { + pub block: Arc, + pub change_index: usize, + pub op_index: usize, +} + +impl Deref for BlockOpRef { + type Target = Op; + + fn deref(&self) -> &Op { + &self.block.content.try_changes().unwrap()[self.change_index].ops[self.op_index] + } +} + #[derive(Debug, Clone)] pub struct ChangesBlock { arena: SharedArena, diff --git a/crates/loro-internal/src/undo.rs b/crates/loro-internal/src/undo.rs index 14dc32ca..efee0061 100644 --- a/crates/loro-internal/src/undo.rs +++ b/crates/loro-internal/src/undo.rs @@ -407,8 +407,9 @@ fn get_counter_end(doc: &LoroDoc, peer: PeerID) -> Counter { doc.oplog() .lock() .unwrap() - .get_peer_changes(peer) - .and_then(|x| x.last().map(|x| x.ctr_end())) + .vv() + .get(&peer) + .cloned() .unwrap_or(0) }