From f9764a2d4cd619186860d703cf48481a29caff02 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Sat, 5 Aug 2023 16:11:41 +0800 Subject: [PATCH] Fix DiffCalculator error when going back in history(#106) --- crates/loro-internal/fuzz/Cargo.lock | 3 - crates/loro-internal/src/diff_calc.rs | 340 +++++++++++++------------- crates/loro-internal/src/fuzz.rs | 35 ++- crates/loro-internal/src/loro.rs | 25 +- crates/loro-internal/src/oplog.rs | 31 +-- crates/loro-internal/src/version.rs | 26 ++ crates/loro-internal/tests/test.rs | 8 +- 7 files changed, 246 insertions(+), 222 deletions(-) diff --git a/crates/loro-internal/fuzz/Cargo.lock b/crates/loro-internal/fuzz/Cargo.lock index 8e3e04fd..593c721d 100644 --- a/crates/loro-internal/fuzz/Cargo.lock +++ b/crates/loro-internal/fuzz/Cargo.lock @@ -25,9 +25,6 @@ name = "append-only-bytes" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd736657a12852ffb42ed309ac3409382d93f76f49ae0ad69fae4ca927e584d9" -dependencies = [ - "serde", -] [[package]] name = "arbitrary" diff --git a/crates/loro-internal/src/diff_calc.rs b/crates/loro-internal/src/diff_calc.rs index af207add..1d35260e 100644 --- a/crates/loro-internal/src/diff_calc.rs +++ b/crates/loro-internal/src/diff_calc.rs @@ -1,11 +1,12 @@ -use std::{cmp::Ordering, collections::BinaryHeap}; +use std::collections::BinaryHeap; +use debug_log::debug_log; use enum_dispatch::enum_dispatch; use fxhash::{FxHashMap, FxHashSet}; -use loro_common::{ContainerType, HasIdSpan, PeerID, ID}; +use loro_common::{HasIdSpan, PeerID, ID}; use crate::{ - change::{Change, Lamport}, + change::Lamport, container::idx::ContainerIdx, delta::{MapDelta, MapValue}, event::Diff, @@ -23,15 +24,19 @@ use super::{event::InternalContainerDiff, oplog::OpLog}; /// and [AppState][super::state::AppState]. /// /// TODO: persist diffCalculator and skip processed version -#[derive(Default)] +#[derive(Debug, Default)] pub struct DiffCalculator { calculators: FxHashMap, + last_vv: VersionVector, + has_all: bool, } impl DiffCalculator { pub fn new() -> Self { Self { calculators: Default::default(), + last_vv: Default::default(), + has_all: false, } } @@ -54,48 +59,109 @@ impl DiffCalculator { after: &crate::VersionVector, after_frontiers: Option<&Frontiers>, ) -> Vec { - let mut diffs = Vec::new(); - let (lca, iter) = - oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers); - for (change, vv) in iter { - let mut visited = FxHashSet::default(); - for op in change.ops.iter() { - let calculator = self.calculators.entry(op.container).or_insert_with(|| { - let mut new = match op.container.get_type() { - crate::ContainerType::Text => { - ContainerDiffCalculator::Text(TextDiffCalculator::default()) - } - crate::ContainerType::Map => { - ContainerDiffCalculator::Map(MapDiffCalculator::new(op.container)) - } - crate::ContainerType::List => { - ContainerDiffCalculator::List(ListDiffCalculator::default()) - } - }; - new.start_tracking(oplog, &lca); - new - }); + let affected_set = if !self.has_all + || !self.last_vv.includes_vv(before) + || !self.last_vv.includes_vv(after) + { + // if we don't have all the ops, we need to calculate the diff by tracing back + debug_log!("DiffCalculator: calculate diff by tracing back"); + let mut after = after; + let mut before = before; + let mut merged = before.clone(); + let mut before_frontiers = before_frontiers; + let mut after_frontiers = after_frontiers; + merged.merge(after); + let empty_vv: VersionVector = Default::default(); + if !after.includes_vv(before) { + debug_log!("GO BACK TO THE BEGINNING"); + // if after is not after before, we need to calculate the diff from the beginning + before = &merged; + after = &empty_vv; + before_frontiers = None; + after_frontiers = None; + self.has_all = true; + } - if visited.contains(&op.container) { - // don't checkout if we have already checked out this container in this round - calculator.apply_change(oplog, RichOp::new_by_change(change, op), None); - } else { - calculator.apply_change( - oplog, - RichOp::new_by_change(change, op), - Some(&vv.borrow()), - ); - visited.insert(op.container); + let (lca, iter) = + oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers); + let mut started_set = FxHashSet::default(); + for (change, vv) in iter { + self.last_vv.extend_to_include_end_id(change.id_end()); + let mut visited = FxHashSet::default(); + for op in change.ops.iter() { + let calculator = self.calculators.entry(op.container).or_insert_with(|| { + debug_log::debug_log!("Create new diff calculator"); + match op.container.get_type() { + crate::ContainerType::Text => { + ContainerDiffCalculator::Text(TextDiffCalculator::default()) + } + crate::ContainerType::Map => { + ContainerDiffCalculator::Map(MapDiffCalculator::new()) + } + crate::ContainerType::List => { + ContainerDiffCalculator::List(ListDiffCalculator::default()) + } + } + }); + + if !started_set.contains(&op.container) { + started_set.insert(op.container); + calculator.start_tracking(oplog, &lca); + } + + if visited.contains(&op.container) { + // don't checkout if we have already checked out this container in this round + calculator.apply_change(oplog, RichOp::new_by_change(change, op), None); + } else { + calculator.apply_change( + oplog, + RichOp::new_by_change(change, op), + Some(&vv.borrow()), + ); + visited.insert(op.container); + } } } - } - for (&idx, calculator) in self.calculators.iter_mut() { - calculator.stop_tracking(oplog, after); - diffs.push(InternalContainerDiff { - idx, - diff: calculator.calculate_diff(oplog, before, after), - }); + for (_, calculator) in self.calculators.iter_mut() { + calculator.stop_tracking(oplog, after); + } + + Some(started_set) + } else { + // We can calculate the diff by the current calculators. + + // Find a set of affected containers idx, if it's relatively cheap + if before.distance_to(after) < self.calculators.len() { + let mut set = FxHashSet::default(); + oplog.for_each_change_within(before, after, |change| { + for op in change.ops.iter() { + set.insert(op.container); + } + }); + Some(set) + } else { + None + } + }; + + let mut diffs = Vec::with_capacity(self.calculators.len()); + if let Some(set) = affected_set { + // only visit the affected containers + for idx in set { + let calc = self.calculators.get_mut(&idx).unwrap(); + diffs.push(InternalContainerDiff { + idx, + diff: calc.calculate_diff(oplog, before, after), + }); + } + } else { + for (&idx, calculator) in self.calculators.iter_mut() { + diffs.push(InternalContainerDiff { + idx, + diff: calculator.calculate_diff(oplog, before, after), + }); + } } diffs @@ -128,6 +194,7 @@ pub trait DiffCalculatorTrait { } #[enum_dispatch(DiffCalculatorTrait)] +#[derive(Debug)] enum ContainerDiffCalculator { Text(TextDiffCalculator), Map(MapDiffCalculator), @@ -139,53 +206,31 @@ struct TextDiffCalculator { tracker: Tracker, } -struct MapDiffCalculator { - idx: ContainerIdx, - grouped: FxHashMap, -} - -#[derive(Default)] -struct GroupedValues { - /// Each value in this set should be included in the current version or - /// "concurrent to the current version it is not at the peak". - applied_or_smaller: BinaryHeap, - /// The values that are guaranteed not in the current version. (they are from the future) - pending: FxHashSet, -} - -impl GroupedValues { - fn checkout(&mut self, vv: &VersionVector) { - self.pending.retain(|v| { - if vv.includes_id(v.id_start()) { - self.applied_or_smaller.push(v.clone()); - false - } else { - true - } - }); - - while let Some(top) = self.applied_or_smaller.peek() { - if vv.includes_id(top.id_start()) { - break; - } else { - let top = self.applied_or_smaller.pop().unwrap(); - self.pending.insert(top); - } - } +impl std::fmt::Debug for TextDiffCalculator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TextDiffCalculator") + // .field("tracker", &self.tracker) + .finish() } } +#[derive(Debug, Default)] +struct MapDiffCalculator { + grouped: FxHashMap, +} + impl MapDiffCalculator { - pub(crate) fn new(idx: ContainerIdx) -> Self { + pub(crate) fn new() -> Self { Self { - idx, grouped: Default::default(), } } fn checkout(&mut self, vv: &VersionVector) { for (_, g) in self.grouped.iter_mut() { - g.checkout(vv) + let a = g.len(); + g.checkout(vv); + debug_assert_eq!(a, g.len()); } } } @@ -195,17 +240,20 @@ impl DiffCalculatorTrait for MapDiffCalculator { fn apply_change( &mut self, - oplog: &crate::OpLog, + _oplog: &crate::OpLog, op: crate::op::RichOp, _vv: Option<&crate::VersionVector>, ) { let map = op.op().content.as_map().unwrap(); - let value = oplog.arena.get_value(map.value as usize); self.grouped .entry(map.key.clone()) .or_default() .pending - .insert(MapValue::new(op.id_start(), op.lamport(), value)); + .push(CompactMapValue { + lamport: op.lamport(), + peer: op.client_id(), + counter: op.id_start().counter, + }); } fn stop_tracking(&mut self, _oplog: &super::oplog::OpLog, _vv: &crate::VersionVector) {} @@ -219,16 +267,10 @@ impl DiffCalculatorTrait for MapDiffCalculator { let mut changed = Vec::new(); self.checkout(from); for (k, g) in self.grouped.iter_mut() { - let top = g.applied_or_smaller.pop(); + let top = g.applied_or_smaller.peek().copied(); g.checkout(to); - if let Some(top) = &top { - if to.includes_id(top.id_start()) { - g.applied_or_smaller.push(top.clone()); - } - } - match (&top, g.applied_or_smaller.peek()) { - (None, None) => todo!(), + (None, None) => {} (None, Some(_)) => changed.push(k.clone()), (Some(_), None) => changed.push(k.clone()), (Some(a), Some(b)) => { @@ -240,36 +282,35 @@ impl DiffCalculatorTrait for MapDiffCalculator { } let mut updated = FxHashMap::with_capacity_and_hasher(changed.len(), Default::default()); - let mut extra_lookup = Vec::new(); for key in changed { - if let Some(value) = self + let value = self .grouped .get(&key) .unwrap() .applied_or_smaller .peek() .cloned() - { - updated.insert(key, value); - } else { - extra_lookup.push(key); - } - } - - if !extra_lookup.is_empty() { - // PERF: the first time we do this, it may take a long time: - // it will travel the whole history with O(n) time - let ans = oplog.lookup_map_values_at(self.idx, &extra_lookup, to); - for (k, v) in extra_lookup.into_iter().zip(ans.into_iter()) { - updated.insert( - k, - v.unwrap_or_else(|| MapValue { - counter: 0, - value: None, - lamport: (0, 0), - }), - ); - } + .map(|v| { + let value = oplog + .lookup_op(v.id_start()) + .unwrap() + .content + .as_map() + .unwrap() + .value; + let value = oplog.arena.get_value(value as usize); + MapValue { + counter: v.counter, + value, + lamport: (v.lamport, v.peer), + } + }) + .unwrap_or_else(|| MapValue { + counter: 0, + value: None, + lamport: (0, 0), + }); + updated.insert(key, value); } Diff::NewMap(MapDelta { updated }) @@ -319,61 +360,8 @@ impl CompactGroupedValues { } } - fn peek(&self) -> Option { - self.applied_or_smaller.peek().cloned() - } -} - -#[derive(Default)] -pub(crate) struct GlobalMapDiffCalculator { - maps: FxHashMap>, - pub(crate) last_vv: VersionVector, -} - -impl GlobalMapDiffCalculator { - pub fn process_change(&mut self, change: &Change) { - if self.last_vv.includes_id(change.id_last()) { - return; - } - - for op in change.ops.iter() { - if op.container.get_type() == ContainerType::Map { - let key = op.content.as_map().unwrap().key.clone(); - self.maps - .entry(op.container) - .or_default() - .entry(key) - .or_default() - .pending - .push(CompactMapValue { - lamport: (op.counter - change.id.counter) as Lamport + change.lamport, - peer: change.id.peer, - counter: op.counter, - }); - } - } - - self.last_vv.extend_to_include_end_id(change.id_end()); - } - - pub fn get_value_at( - &mut self, - container: ContainerIdx, - key: &InternalString, - vv: &VersionVector, - oplog: &OpLog, - ) -> Option { - let group = self.maps.get_mut(&container)?.get_mut(key)?; - group.checkout(vv); - let peek = group.peek()?; - let op = oplog.lookup_op(peek.id_start()).unwrap(); - let value_idx = op.content.as_map().unwrap().value; - let value = oplog.arena.get_value(value_idx as usize); - Some(MapValue { - counter: peek.counter, - value, - lamport: (peek.lamport, peek.peer), - }) + fn len(&self) -> usize { + self.applied_or_smaller.len() + self.pending.len() } } @@ -382,12 +370,17 @@ struct ListDiffCalculator { tracker: Tracker, } +impl std::fmt::Debug for ListDiffCalculator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ListDiffCalculator") + // .field("tracker", &self.tracker) + .finish() + } +} + impl DiffCalculatorTrait for ListDiffCalculator { fn start_tracking(&mut self, _oplog: &OpLog, vv: &crate::VersionVector) { - if matches!( - self.tracker.start_vv().partial_cmp(vv), - None | Some(Ordering::Less) - ) { + if !vv.includes_vv(self.tracker.start_vv()) || !self.tracker.all_vv().includes_vv(vv) { self.tracker = Tracker::new(vv.clone(), Counter::MAX / 2); } @@ -420,10 +413,7 @@ impl DiffCalculatorTrait for ListDiffCalculator { impl DiffCalculatorTrait for TextDiffCalculator { fn start_tracking(&mut self, _oplog: &super::oplog::OpLog, vv: &crate::VersionVector) { - if matches!( - self.tracker.start_vv().partial_cmp(vv), - None | Some(Ordering::Less) - ) { + if !vv.includes_vv(self.tracker.start_vv()) || !self.tracker.all_vv().includes_vv(vv) { self.tracker = Tracker::new(vv.clone(), Counter::MAX / 2); } diff --git a/crates/loro-internal/src/fuzz.rs b/crates/loro-internal/src/fuzz.rs index 7c635b8f..d7bc925b 100644 --- a/crates/loro-internal/src/fuzz.rs +++ b/crates/loro-internal/src/fuzz.rs @@ -245,7 +245,15 @@ fn check_eq_refactored(site_a: &mut LoroDoc, site_b: &mut LoroDoc) { let text_b = b.get_text("text"); let value_a = text_a.get_value(); let value_b = text_b.get_value(); - assert_eq!(value_a, value_b); + assert_eq!( + value_a, + value_b, + "peer{}={:?}, peer{}={:?}", + site_a.peer_id(), + value_a, + site_b.peer_id(), + value_b + ); } pub fn minify_error(site_num: u8, actions: Vec, f: F, normalize: N) @@ -999,6 +1007,31 @@ mod test { ) } + #[test] + fn checkout() { + test_multi_sites_refactored( + 4, + &mut [ + Ins { + content: 53, + pos: 4, + site: 2, + }, + SyncAll, + Ins { + content: 0, + pos: 1, + site: 0, + }, + Del { + pos: 4, + len: 1, + site: 2, + }, + ], + ) + } + #[test] fn mini_r() { minify_error(8, vec![], test_multi_sites_refactored, |_, ans| { diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 2a4542c5..eb1aaf96 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -49,6 +49,7 @@ pub struct LoroDoc { state: Arc>, arena: SharedArena, observer: Arc, + diff_calculator: Arc>, detached: bool, } @@ -63,6 +64,7 @@ impl LoroDoc { state, detached: false, observer: Arc::new(Observer::new(arena.clone())), + diff_calculator: Arc::new(Mutex::new(DiffCalculator::new())), arena, } } @@ -85,6 +87,7 @@ impl LoroDoc { observer: Arc::new(obs), oplog: Arc::new(Mutex::new(oplog)), state: Arc::new(Mutex::new(state)), + diff_calculator: Arc::new(Mutex::new(DiffCalculator::new())), detached: false, } } @@ -186,20 +189,20 @@ impl LoroDoc { match mode { ConcreteEncodeMode::Updates | ConcreteEncodeMode::RleUpdates => { // TODO: need to throw error if state is in transaction - debug_log::group!("import"); + debug_log::group!("import to {}", self.peer_id()); let mut oplog = self.oplog.lock().unwrap(); let old_vv = oplog.vv().clone(); let old_frontiers = oplog.frontiers().clone(); oplog.decode(bytes)?; - let mut diff = DiffCalculator::new(); - let diff = diff.calc_diff_internal( - &oplog, - &old_vv, - Some(&old_frontiers), - oplog.vv(), - Some(oplog.dag.get_frontiers()), - ); if !self.detached { + let mut diff = self.diff_calculator.lock().unwrap(); + let diff = diff.calc_diff_internal( + &oplog, + &old_vv, + Some(&old_frontiers), + oplog.vv(), + Some(oplog.dag.get_frontiers()), + ); let mut state = self.state.lock().unwrap(); state.apply_diff(InternalDocDiff { origin, @@ -225,9 +228,7 @@ impl LoroDoc { } }; - debug_dbg!(&self.oplog.lock().unwrap().changes); self.emit_events(); - Ok(()) } @@ -359,7 +360,7 @@ impl LoroDoc { let oplog = self.oplog.lock().unwrap(); let mut state = self.state.lock().unwrap(); self.detached = true; - let mut calc = DiffCalculator::new(); + let mut calc = self.diff_calculator.lock().unwrap(); let before = &oplog.dag.frontiers_to_vv(&state.frontiers); let after = &oplog.dag.frontiers_to_vv(frontiers); let diff = calc.calc_diff_internal( diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index fe8186ed..2f2ebbf3 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -4,25 +4,21 @@ use std::borrow::Cow; use std::cell::RefCell; use std::cmp::Ordering; use std::rc::Rc; -use std::sync::{Arc, Mutex}; use fxhash::FxHashMap; use rle::{HasLength, RleVec}; // use tabled::measurment::Percent; use crate::change::{Change, Lamport, Timestamp}; -use crate::container::idx::ContainerIdx; use crate::container::list::list_op; use crate::dag::DagUtils; -use crate::delta::MapValue; -use crate::diff_calc::GlobalMapDiffCalculator; use crate::encoding::{decode_oplog, encode_oplog, EncodeMode}; use crate::encoding::{ClientChanges, RemoteClientChanges}; use crate::id::{Counter, PeerID, ID}; use crate::op::{RawOpContent, RemoteOp}; use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; -use crate::{InternalString, LoroError}; +use crate::LoroError; use super::arena::SharedArena; @@ -44,7 +40,6 @@ pub struct OpLog { /// A change can be imported only when all its deps are already imported. /// Key is the ID of the missing dep pending_changes: FxHashMap>, - map_diff_calc: Arc>, } /// [AppDag] maintains the causal graph of the app. @@ -75,7 +70,6 @@ impl Clone for OpLog { next_lamport: self.next_lamport, latest_timestamp: self.latest_timestamp, pending_changes: Default::default(), - map_diff_calc: Default::default(), } } } @@ -100,7 +94,6 @@ impl OpLog { next_lamport: 0, latest_timestamp: Timestamp::default(), pending_changes: Default::default(), - map_diff_calc: Default::default(), } } @@ -568,28 +561,6 @@ impl OpLog { println!("total atom ops: {}", total_atom_ops); println!("total dag node: {}", total_dag_node); } - - /// lookup map values at a specific version - // PERF: this is slow. it needs to traverse all changes to build the cache for now - pub(crate) fn lookup_map_values_at( - &self, - idx: ContainerIdx, - extra_lookup: &[InternalString], - to: &VersionVector, - ) -> Vec> { - let mut map_diff_calc = self.map_diff_calc.lock().unwrap(); - if to.partial_cmp(&map_diff_calc.last_vv) != Some(Ordering::Less) { - let from = map_diff_calc.last_vv.clone(); - self.for_each_change_within(&from, to, |change| map_diff_calc.process_change(change)); - } - - let ans = extra_lookup - .iter() - .map(|x| map_diff_calc.get_value_at(idx, x, to, self)) - .collect(); - - ans - } } impl Default for OpLog { diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 047d0f05..c480a104 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -475,6 +475,21 @@ impl VersionVector { .collect() } + pub fn distance_to(&self, other: &Self) -> usize { + let mut ans = 0; + for (client_id, &counter) in self.iter() { + if let Some(&other_counter) = other.get(client_id) { + if counter > other_counter { + ans += counter - other_counter; + } + } else if counter > 0 { + ans += counter; + } + } + + ans as usize + } + pub fn to_spans(&self) -> IdSpanVector { self.iter() .map(|(client_id, &counter)| { @@ -573,6 +588,17 @@ impl VersionVector { } } + pub fn includes_vv(&self, other: &VersionVector) -> bool { + match self.partial_cmp(other) { + Some(ord) => match ord { + Ordering::Less => false, + Ordering::Equal => true, + Ordering::Greater => true, + }, + None => false, + } + } + pub fn includes_id(&self, id: ID) -> bool { if let Some(end) = self.get(&id.peer) { if *end > id.counter { diff --git a/crates/loro-internal/tests/test.rs b/crates/loro-internal/tests/test.rs index 18cee753..06f00591 100644 --- a/crates/loro-internal/tests/test.rs +++ b/crates/loro-internal/tests/test.rs @@ -18,7 +18,7 @@ fn test_timestamp() { } #[test] -fn test_checkout() { +fn test_text_checkout() { let mut doc = LoroDoc::new(); let text = doc.get_text("text"); let mut txn = doc.txn().unwrap(); @@ -44,6 +44,12 @@ fn test_checkout() { assert_eq!(text.len_unicode(), 4); assert_eq!(text.len_utf8(), 12); assert_eq!(text.len_unicode(), 4); + + doc.checkout_to_latest(); + doc.with_txn(|txn| text.delete(txn, 3, 1)).unwrap(); + assert_eq!(text.get_value().as_string().unwrap().as_str(), "你好世"); + doc.checkout(&Frontiers::from([ID::new(doc.peer_id(), 3)].as_slice())); + assert_eq!(text.get_value().as_string().unwrap().as_str(), "你好世界"); } #[test]