Fix DiffCalculator error when going back in history (#106)

This commit is contained in:
Zixuan Chen 2023-08-05 16:11:41 +08:00 committed by GitHub
parent ccee201641
commit f9764a2d4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 246 additions and 222 deletions

View file

@ -25,9 +25,6 @@ name = "append-only-bytes"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd736657a12852ffb42ed309ac3409382d93f76f49ae0ad69fae4ca927e584d9"
dependencies = [
"serde",
]
[[package]]
name = "arbitrary"

View file

@ -1,11 +1,12 @@
use std::{cmp::Ordering, collections::BinaryHeap};
use std::collections::BinaryHeap;
use debug_log::debug_log;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
use loro_common::{ContainerType, HasIdSpan, PeerID, ID};
use loro_common::{HasIdSpan, PeerID, ID};
use crate::{
change::{Change, Lamport},
change::Lamport,
container::idx::ContainerIdx,
delta::{MapDelta, MapValue},
event::Diff,
@ -23,15 +24,19 @@ use super::{event::InternalContainerDiff, oplog::OpLog};
/// and [AppState][super::state::AppState].
///
/// TODO: persist diffCalculator and skip processed version
#[derive(Default)]
#[derive(Debug, Default)]
pub struct DiffCalculator {
/// One diff calculator per container, keyed by container index. An entry is
/// created the first time an op targeting that container is processed.
calculators: FxHashMap<ContainerIdx, ContainerDiffCalculator>,
/// Version vector extended to include the end id of every change that has been
/// fed through `calc_diff_internal`'s trace-back loop.
last_vv: VersionVector,
/// Set to `true` when a trace-back had to restart from the empty version. While
/// it is `false`, `calc_diff_internal` always takes the trace-back path instead of
/// reusing the existing calculators.
has_all: bool,
}
impl DiffCalculator {
pub fn new() -> Self {
Self {
calculators: Default::default(),
last_vv: Default::default(),
has_all: false,
}
}
@ -54,48 +59,109 @@ impl DiffCalculator {
after: &crate::VersionVector,
after_frontiers: Option<&Frontiers>,
) -> Vec<InternalContainerDiff> {
let mut diffs = Vec::new();
let (lca, iter) =
oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers);
for (change, vv) in iter {
let mut visited = FxHashSet::default();
for op in change.ops.iter() {
let calculator = self.calculators.entry(op.container).or_insert_with(|| {
let mut new = match op.container.get_type() {
crate::ContainerType::Text => {
ContainerDiffCalculator::Text(TextDiffCalculator::default())
}
crate::ContainerType::Map => {
ContainerDiffCalculator::Map(MapDiffCalculator::new(op.container))
}
crate::ContainerType::List => {
ContainerDiffCalculator::List(ListDiffCalculator::default())
}
};
new.start_tracking(oplog, &lca);
new
});
let affected_set = if !self.has_all
|| !self.last_vv.includes_vv(before)
|| !self.last_vv.includes_vv(after)
{
// if we don't have all the ops, we need to calculate the diff by tracing back
debug_log!("DiffCalculator: calculate diff by tracing back");
let mut after = after;
let mut before = before;
let mut merged = before.clone();
let mut before_frontiers = before_frontiers;
let mut after_frontiers = after_frontiers;
merged.merge(after);
let empty_vv: VersionVector = Default::default();
if !after.includes_vv(before) {
debug_log!("GO BACK TO THE BEGINNING");
// if after is not after before, we need to calculate the diff from the beginning
before = &merged;
after = &empty_vv;
before_frontiers = None;
after_frontiers = None;
self.has_all = true;
}
if visited.contains(&op.container) {
// don't checkout if we have already checked out this container in this round
calculator.apply_change(oplog, RichOp::new_by_change(change, op), None);
} else {
calculator.apply_change(
oplog,
RichOp::new_by_change(change, op),
Some(&vv.borrow()),
);
visited.insert(op.container);
let (lca, iter) =
oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers);
let mut started_set = FxHashSet::default();
for (change, vv) in iter {
self.last_vv.extend_to_include_end_id(change.id_end());
let mut visited = FxHashSet::default();
for op in change.ops.iter() {
let calculator = self.calculators.entry(op.container).or_insert_with(|| {
debug_log::debug_log!("Create new diff calculator");
match op.container.get_type() {
crate::ContainerType::Text => {
ContainerDiffCalculator::Text(TextDiffCalculator::default())
}
crate::ContainerType::Map => {
ContainerDiffCalculator::Map(MapDiffCalculator::new())
}
crate::ContainerType::List => {
ContainerDiffCalculator::List(ListDiffCalculator::default())
}
}
});
if !started_set.contains(&op.container) {
started_set.insert(op.container);
calculator.start_tracking(oplog, &lca);
}
if visited.contains(&op.container) {
// don't checkout if we have already checked out this container in this round
calculator.apply_change(oplog, RichOp::new_by_change(change, op), None);
} else {
calculator.apply_change(
oplog,
RichOp::new_by_change(change, op),
Some(&vv.borrow()),
);
visited.insert(op.container);
}
}
}
}
for (&idx, calculator) in self.calculators.iter_mut() {
calculator.stop_tracking(oplog, after);
diffs.push(InternalContainerDiff {
idx,
diff: calculator.calculate_diff(oplog, before, after),
});
for (_, calculator) in self.calculators.iter_mut() {
calculator.stop_tracking(oplog, after);
}
Some(started_set)
} else {
// We can calculate the diff by the current calculators.
// Find a set of affected containers idx, if it's relatively cheap
if before.distance_to(after) < self.calculators.len() {
let mut set = FxHashSet::default();
oplog.for_each_change_within(before, after, |change| {
for op in change.ops.iter() {
set.insert(op.container);
}
});
Some(set)
} else {
None
}
};
let mut diffs = Vec::with_capacity(self.calculators.len());
if let Some(set) = affected_set {
// only visit the affected containers
for idx in set {
let calc = self.calculators.get_mut(&idx).unwrap();
diffs.push(InternalContainerDiff {
idx,
diff: calc.calculate_diff(oplog, before, after),
});
}
} else {
for (&idx, calculator) in self.calculators.iter_mut() {
diffs.push(InternalContainerDiff {
idx,
diff: calculator.calculate_diff(oplog, before, after),
});
}
}
diffs
@ -128,6 +194,7 @@ pub trait DiffCalculatorTrait {
}
#[enum_dispatch(DiffCalculatorTrait)]
#[derive(Debug)]
enum ContainerDiffCalculator {
Text(TextDiffCalculator),
Map(MapDiffCalculator),
@ -139,53 +206,31 @@ struct TextDiffCalculator {
tracker: Tracker,
}
struct MapDiffCalculator {
idx: ContainerIdx,
grouped: FxHashMap<InternalString, GroupedValues>,
}
#[derive(Default)]
struct GroupedValues {
/// Each value in this set should be included in the current version or
/// "concurrent to the current version it is not at the peak".
applied_or_smaller: BinaryHeap<MapValue>,
/// The values that are guaranteed not in the current version. (they are from the future)
pending: FxHashSet<MapValue>,
}
impl GroupedValues {
fn checkout(&mut self, vv: &VersionVector) {
self.pending.retain(|v| {
if vv.includes_id(v.id_start()) {
self.applied_or_smaller.push(v.clone());
false
} else {
true
}
});
while let Some(top) = self.applied_or_smaller.peek() {
if vv.includes_id(top.id_start()) {
break;
} else {
let top = self.applied_or_smaller.pop().unwrap();
self.pending.insert(top);
}
}
impl std::fmt::Debug for TextDiffCalculator {
    /// Writes only the type name; the `tracker` field is not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Equivalent to a `debug_struct` with no fields: just the struct name.
        f.write_str("TextDiffCalculator")
    }
}
/// Diff calculator for a single map container.
#[derive(Debug, Default)]
struct MapDiffCalculator {
/// Per-key groups of compact map values. `apply_change` pushes one
/// `CompactMapValue` (lamport, peer, counter) into the group of the op's key.
grouped: FxHashMap<InternalString, CompactGroupedValues>,
}
impl MapDiffCalculator {
pub(crate) fn new(idx: ContainerIdx) -> Self {
pub(crate) fn new() -> Self {
Self {
idx,
grouped: Default::default(),
}
}
fn checkout(&mut self, vv: &VersionVector) {
for (_, g) in self.grouped.iter_mut() {
g.checkout(vv)
let a = g.len();
g.checkout(vv);
debug_assert_eq!(a, g.len());
}
}
}
@ -195,17 +240,20 @@ impl DiffCalculatorTrait for MapDiffCalculator {
fn apply_change(
&mut self,
oplog: &crate::OpLog,
_oplog: &crate::OpLog,
op: crate::op::RichOp,
_vv: Option<&crate::VersionVector>,
) {
let map = op.op().content.as_map().unwrap();
let value = oplog.arena.get_value(map.value as usize);
self.grouped
.entry(map.key.clone())
.or_default()
.pending
.insert(MapValue::new(op.id_start(), op.lamport(), value));
.push(CompactMapValue {
lamport: op.lamport(),
peer: op.client_id(),
counter: op.id_start().counter,
});
}
fn stop_tracking(&mut self, _oplog: &super::oplog::OpLog, _vv: &crate::VersionVector) {}
@ -219,16 +267,10 @@ impl DiffCalculatorTrait for MapDiffCalculator {
let mut changed = Vec::new();
self.checkout(from);
for (k, g) in self.grouped.iter_mut() {
let top = g.applied_or_smaller.pop();
let top = g.applied_or_smaller.peek().copied();
g.checkout(to);
if let Some(top) = &top {
if to.includes_id(top.id_start()) {
g.applied_or_smaller.push(top.clone());
}
}
match (&top, g.applied_or_smaller.peek()) {
(None, None) => todo!(),
(None, None) => {}
(None, Some(_)) => changed.push(k.clone()),
(Some(_), None) => changed.push(k.clone()),
(Some(a), Some(b)) => {
@ -240,36 +282,35 @@ impl DiffCalculatorTrait for MapDiffCalculator {
}
let mut updated = FxHashMap::with_capacity_and_hasher(changed.len(), Default::default());
let mut extra_lookup = Vec::new();
for key in changed {
if let Some(value) = self
let value = self
.grouped
.get(&key)
.unwrap()
.applied_or_smaller
.peek()
.cloned()
{
updated.insert(key, value);
} else {
extra_lookup.push(key);
}
}
if !extra_lookup.is_empty() {
// PERF: the first time we do this, it may take a long time:
// it will travel the whole history with O(n) time
let ans = oplog.lookup_map_values_at(self.idx, &extra_lookup, to);
for (k, v) in extra_lookup.into_iter().zip(ans.into_iter()) {
updated.insert(
k,
v.unwrap_or_else(|| MapValue {
counter: 0,
value: None,
lamport: (0, 0),
}),
);
}
.map(|v| {
let value = oplog
.lookup_op(v.id_start())
.unwrap()
.content
.as_map()
.unwrap()
.value;
let value = oplog.arena.get_value(value as usize);
MapValue {
counter: v.counter,
value,
lamport: (v.lamport, v.peer),
}
})
.unwrap_or_else(|| MapValue {
counter: 0,
value: None,
lamport: (0, 0),
});
updated.insert(key, value);
}
Diff::NewMap(MapDelta { updated })
@ -319,61 +360,8 @@ impl CompactGroupedValues {
}
}
fn peek(&self) -> Option<CompactMapValue> {
self.applied_or_smaller.peek().cloned()
}
}
#[derive(Default)]
pub(crate) struct GlobalMapDiffCalculator {
maps: FxHashMap<ContainerIdx, FxHashMap<InternalString, CompactGroupedValues>>,
pub(crate) last_vv: VersionVector,
}
impl GlobalMapDiffCalculator {
pub fn process_change(&mut self, change: &Change) {
if self.last_vv.includes_id(change.id_last()) {
return;
}
for op in change.ops.iter() {
if op.container.get_type() == ContainerType::Map {
let key = op.content.as_map().unwrap().key.clone();
self.maps
.entry(op.container)
.or_default()
.entry(key)
.or_default()
.pending
.push(CompactMapValue {
lamport: (op.counter - change.id.counter) as Lamport + change.lamport,
peer: change.id.peer,
counter: op.counter,
});
}
}
self.last_vv.extend_to_include_end_id(change.id_end());
}
pub fn get_value_at(
&mut self,
container: ContainerIdx,
key: &InternalString,
vv: &VersionVector,
oplog: &OpLog,
) -> Option<MapValue> {
let group = self.maps.get_mut(&container)?.get_mut(key)?;
group.checkout(vv);
let peek = group.peek()?;
let op = oplog.lookup_op(peek.id_start()).unwrap();
let value_idx = op.content.as_map().unwrap().value;
let value = oplog.arena.get_value(value_idx as usize);
Some(MapValue {
counter: peek.counter,
value,
lamport: (peek.lamport, peek.peer),
})
/// Total number of values held by this group, counting both the
/// `applied_or_smaller` heap and the `pending` collection.
fn len(&self) -> usize {
    let applied = self.applied_or_smaller.len();
    let waiting = self.pending.len();
    applied + waiting
}
}
@ -382,12 +370,17 @@ struct ListDiffCalculator {
tracker: Tracker,
}
impl std::fmt::Debug for ListDiffCalculator {
    /// Writes only the type name; the `tracker` field is not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Equivalent to a `debug_struct` with no fields: just the struct name.
        f.write_str("ListDiffCalculator")
    }
}
impl DiffCalculatorTrait for ListDiffCalculator {
fn start_tracking(&mut self, _oplog: &OpLog, vv: &crate::VersionVector) {
if matches!(
self.tracker.start_vv().partial_cmp(vv),
None | Some(Ordering::Less)
) {
if !vv.includes_vv(self.tracker.start_vv()) || !self.tracker.all_vv().includes_vv(vv) {
self.tracker = Tracker::new(vv.clone(), Counter::MAX / 2);
}
@ -420,10 +413,7 @@ impl DiffCalculatorTrait for ListDiffCalculator {
impl DiffCalculatorTrait for TextDiffCalculator {
fn start_tracking(&mut self, _oplog: &super::oplog::OpLog, vv: &crate::VersionVector) {
if matches!(
self.tracker.start_vv().partial_cmp(vv),
None | Some(Ordering::Less)
) {
if !vv.includes_vv(self.tracker.start_vv()) || !self.tracker.all_vv().includes_vv(vv) {
self.tracker = Tracker::new(vv.clone(), Counter::MAX / 2);
}

View file

@ -245,7 +245,15 @@ fn check_eq_refactored(site_a: &mut LoroDoc, site_b: &mut LoroDoc) {
let text_b = b.get_text("text");
let value_a = text_a.get_value();
let value_b = text_b.get_value();
assert_eq!(value_a, value_b);
assert_eq!(
value_a,
value_b,
"peer{}={:?}, peer{}={:?}",
site_a.peer_id(),
value_a,
site_b.peer_id(),
value_b
);
}
pub fn minify_error<T, F, N>(site_num: u8, actions: Vec<T>, f: F, normalize: N)
@ -999,6 +1007,31 @@ mod test {
)
}
/// Four-site scenario: site 2 inserts, every site syncs, then site 0 inserts
/// and site 2 deletes with no further sync between those two edits.
#[test]
fn checkout() {
    let mut actions = [
        Ins {
            content: 53,
            pos: 4,
            site: 2,
        },
        SyncAll,
        Ins {
            content: 0,
            pos: 1,
            site: 0,
        },
        Del {
            pos: 4,
            len: 1,
            site: 2,
        },
    ];
    test_multi_sites_refactored(4, &mut actions)
}
#[test]
fn mini_r() {
minify_error(8, vec![], test_multi_sites_refactored, |_, ans| {

View file

@ -49,6 +49,7 @@ pub struct LoroDoc {
state: Arc<Mutex<DocState>>,
arena: SharedArena,
observer: Arc<Observer>,
diff_calculator: Arc<Mutex<DiffCalculator>>,
detached: bool,
}
@ -63,6 +64,7 @@ impl LoroDoc {
state,
detached: false,
observer: Arc::new(Observer::new(arena.clone())),
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new())),
arena,
}
}
@ -85,6 +87,7 @@ impl LoroDoc {
observer: Arc::new(obs),
oplog: Arc::new(Mutex::new(oplog)),
state: Arc::new(Mutex::new(state)),
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new())),
detached: false,
}
}
@ -186,20 +189,20 @@ impl LoroDoc {
match mode {
ConcreteEncodeMode::Updates | ConcreteEncodeMode::RleUpdates => {
// TODO: need to throw error if state is in transaction
debug_log::group!("import");
debug_log::group!("import to {}", self.peer_id());
let mut oplog = self.oplog.lock().unwrap();
let old_vv = oplog.vv().clone();
let old_frontiers = oplog.frontiers().clone();
oplog.decode(bytes)?;
let mut diff = DiffCalculator::new();
let diff = diff.calc_diff_internal(
&oplog,
&old_vv,
Some(&old_frontiers),
oplog.vv(),
Some(oplog.dag.get_frontiers()),
);
if !self.detached {
let mut diff = self.diff_calculator.lock().unwrap();
let diff = diff.calc_diff_internal(
&oplog,
&old_vv,
Some(&old_frontiers),
oplog.vv(),
Some(oplog.dag.get_frontiers()),
);
let mut state = self.state.lock().unwrap();
state.apply_diff(InternalDocDiff {
origin,
@ -225,9 +228,7 @@ impl LoroDoc {
}
};
debug_dbg!(&self.oplog.lock().unwrap().changes);
self.emit_events();
Ok(())
}
@ -359,7 +360,7 @@ impl LoroDoc {
let oplog = self.oplog.lock().unwrap();
let mut state = self.state.lock().unwrap();
self.detached = true;
let mut calc = DiffCalculator::new();
let mut calc = self.diff_calculator.lock().unwrap();
let before = &oplog.dag.frontiers_to_vv(&state.frontiers);
let after = &oplog.dag.frontiers_to_vv(frontiers);
let diff = calc.calc_diff_internal(

View file

@ -4,25 +4,21 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use fxhash::FxHashMap;
use rle::{HasLength, RleVec};
// use tabled::measurment::Percent;
use crate::change::{Change, Lamport, Timestamp};
use crate::container::idx::ContainerIdx;
use crate::container::list::list_op;
use crate::dag::DagUtils;
use crate::delta::MapValue;
use crate::diff_calc::GlobalMapDiffCalculator;
use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
use crate::encoding::{ClientChanges, RemoteClientChanges};
use crate::id::{Counter, PeerID, ID};
use crate::op::{RawOpContent, RemoteOp};
use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::{InternalString, LoroError};
use crate::LoroError;
use super::arena::SharedArena;
@ -44,7 +40,6 @@ pub struct OpLog {
/// A change can be imported only when all its deps are already imported.
/// Key is the ID of the missing dep
pending_changes: FxHashMap<ID, Vec<Change>>,
map_diff_calc: Arc<Mutex<GlobalMapDiffCalculator>>,
}
/// [AppDag] maintains the causal graph of the app.
@ -75,7 +70,6 @@ impl Clone for OpLog {
next_lamport: self.next_lamport,
latest_timestamp: self.latest_timestamp,
pending_changes: Default::default(),
map_diff_calc: Default::default(),
}
}
}
@ -100,7 +94,6 @@ impl OpLog {
next_lamport: 0,
latest_timestamp: Timestamp::default(),
pending_changes: Default::default(),
map_diff_calc: Default::default(),
}
}
@ -568,28 +561,6 @@ impl OpLog {
println!("total atom ops: {}", total_atom_ops);
println!("total dag node: {}", total_dag_node);
}
/// lookup map values at a specific version
// PERF: this is slow. it needs to traverse all changes to build the cache for now
pub(crate) fn lookup_map_values_at(
&self,
idx: ContainerIdx,
extra_lookup: &[InternalString],
to: &VersionVector,
) -> Vec<Option<MapValue>> {
let mut map_diff_calc = self.map_diff_calc.lock().unwrap();
if to.partial_cmp(&map_diff_calc.last_vv) != Some(Ordering::Less) {
let from = map_diff_calc.last_vv.clone();
self.for_each_change_within(&from, to, |change| map_diff_calc.process_change(change));
}
let ans = extra_lookup
.iter()
.map(|x| map_diff_calc.get_value_at(idx, x, to, self))
.collect();
ans
}
}
impl Default for OpLog {

View file

@ -475,6 +475,21 @@ impl VersionVector {
.collect()
}
/// Sums, over every peer recorded in `self`, how far `self`'s counter is ahead
/// of `other`'s counter for that peer.
///
/// A peer missing from `other` contributes its whole (positive) counter. A peer
/// where `other` is equal or ahead contributes nothing, and peers that exist only
/// in `other` are never visited.
///
/// NOTE(review): the result is one-directional, so `a.distance_to(b)` is 0
/// whenever `b` already covers `a`. Confirm that callers expect this and not a
/// symmetric distance.
pub fn distance_to(&self, other: &Self) -> usize {
    let mut total = 0;
    for (peer, &mine) in self.iter() {
        match other.get(peer) {
            Some(&theirs) => {
                if mine > theirs {
                    total += mine - theirs;
                }
            }
            None => {
                if mine > 0 {
                    total += mine;
                }
            }
        }
    }

    total as usize
}
pub fn to_spans(&self) -> IdSpanVector {
self.iter()
.map(|(client_id, &counter)| {
@ -573,6 +588,17 @@ impl VersionVector {
}
}
/// Returns `true` when `self.partial_cmp(other)` yields `Equal` or `Greater`.
///
/// Returns `false` when the comparison is `Less` or when the two version
/// vectors are not comparable (`partial_cmp` returns `None`).
pub fn includes_vv(&self, other: &VersionVector) -> bool {
    matches!(
        self.partial_cmp(other),
        Some(Ordering::Equal | Ordering::Greater)
    )
}
pub fn includes_id(&self, id: ID) -> bool {
if let Some(end) = self.get(&id.peer) {
if *end > id.counter {

View file

@ -18,7 +18,7 @@ fn test_timestamp() {
}
#[test]
fn test_checkout() {
fn test_text_checkout() {
let mut doc = LoroDoc::new();
let text = doc.get_text("text");
let mut txn = doc.txn().unwrap();
@ -44,6 +44,12 @@ fn test_checkout() {
assert_eq!(text.len_unicode(), 4);
assert_eq!(text.len_utf8(), 12);
assert_eq!(text.len_unicode(), 4);
doc.checkout_to_latest();
doc.with_txn(|txn| text.delete(txn, 3, 1)).unwrap();
assert_eq!(text.get_value().as_string().unwrap().as_str(), "你好世");
doc.checkout(&Frontiers::from([ID::new(doc.peer_id(), 3)].as_slice()));
assert_eq!(text.get_value().as_string().unwrap().as_str(), "你好世界");
}
#[test]