Merge pull request #112 from loro-dev/feat-pending-changes

Feat pending changes
This commit is contained in:
Leon zhao 2023-09-21 11:17:18 +08:00 committed by GitHub
commit 74cbd42391
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 489 additions and 132 deletions

View file

@ -79,3 +79,7 @@ harness = false
[[bench]]
name = "encode"
harness = false
[[bench]]
name = "pending"
harness = false

View file

@ -0,0 +1,45 @@
use criterion::{criterion_group, criterion_main, Criterion};
// Benchmarks live behind the `test_utils` feature because they rely on
// internal APIs (`oplog_vv`, `txn`, …) exposed only with that feature.
#[cfg(feature = "test_utils")]
mod pending {
    use super::*;
    use bench_utils::TextAction;
    use loro_internal::{LoroDoc, VersionVector};

    /// B4: measure decoding a stream of updates delivered in reverse order,
    /// so nearly every imported update is "pending" until its dependencies
    /// arrive later in the stream.
    pub fn b4(c: &mut Criterion) {
        let mut b = c.benchmark_group("B4 pending decode");
        // Decoding the whole stream is slow; keep the sample count small.
        b.sample_size(10);
        b.bench_function("detached mode", |b| {
            let loro = LoroDoc::default();
            let mut latest_vv = VersionVector::default();
            // One encoded update per transaction, each exported relative to
            // the version vector captured on the previous iteration.
            let mut updates = vec![];
            let actions = bench_utils::get_automerge_actions();
            let action_length = actions.len();
            let text = loro.get_text("text");
            for chunks in actions.chunks(action_length / 5) {
                for TextAction { pos, ins, del } in chunks {
                    let mut txn = loro.txn().unwrap();
                    text.delete(&mut txn, *pos, *del).unwrap();
                    text.insert(&mut txn, *pos, ins).unwrap();
                    // NOTE(review): `txn` is still alive here, so this export
                    // presumably reflects ops committed up to the previous
                    // iteration — confirm this is the intended granularity.
                    updates.push(loro.export_from(&latest_vv));
                    latest_vv = loro.oplog_vv();
                }
            }
            // Reverse the delivery order: each import (except the last) must
            // wait in the pending queue for its dependencies.
            updates.reverse();
            b.iter(|| {
                let mut store2 = LoroDoc::default();
                store2.detach();
                for update in updates.iter() {
                    store2.import(update).unwrap();
                }
            })
        });
    }
}
// No-op stand-in so the bench target still builds when `test_utils` is off.
pub fn dumb(_c: &mut Criterion) {}

// Register the real benchmark only when its required feature is enabled;
// otherwise register the no-op so `criterion_main!` always has a group.
#[cfg(feature = "test_utils")]
criterion_group!(benches, pending::b4);
#[cfg(not(feature = "test_utils"))]
criterion_group!(benches, dumb);
criterion_main!(benches);

View file

@ -506,9 +506,9 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
Ok(changes) => changes,
Err(err) => return Err(err),
};
let mut pending_remote_changes = Vec::new();
oplog.arena.clone().with_op_converter(|converter| {
for mut change in changes {
'outer: for mut change in changes {
if change.id.counter < oplog.vv().get(&change.id.peer).copied().unwrap_or(0) {
// skip included changes
continue;
@ -521,7 +521,8 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
change.lamport = change.lamport.max(lamport + 1);
}
None => {
todo!("pending")
pending_remote_changes.push(change);
continue 'outer;
}
}
}
@ -590,11 +591,11 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
.push(change);
}
});
// update dag frontiers
if !oplog.batch_importing {
oplog.dag.refresh_frontiers();
}
oplog.import_unknown_lamport_remote_changes(pending_remote_changes)?;
assert_eq!(str_index, str.len());
Ok(())
}

View file

@ -1,4 +1,5 @@
pub(crate) mod dag;
mod pending_changes;
use std::borrow::Cow;
use std::cell::RefCell;
@ -6,7 +7,6 @@ use std::cmp::Ordering;
use std::rc::Rc;
use fxhash::FxHashMap;
use loro_common::HasId;
use rle::{HasLength, RleVec};
// use tabled::measurment::Percent;
@ -21,6 +21,8 @@ use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use self::pending_changes::PendingChanges;
use super::arena::SharedArena;
/// [OpLog] store all the ops i.e. the history.
@ -40,7 +42,7 @@ pub struct OpLog {
/// Pending changes that haven't been applied to the dag.
/// A change can be imported only when all its deps are already imported.
/// Key is the ID of the missing dep
pending_changes: FxHashMap<ID, Vec<Change>>,
pub(crate) pending_changes: PendingChanges,
/// Whether we are importing a batch of changes.
/// If so the Dag's frontiers won't be updated until the batch is finished.
pub(crate) batch_importing: bool,
@ -110,6 +112,19 @@ impl AppDag {
.map(|(peer, vec)| ID::new(*peer, vec.last().unwrap().ctr_last()))
.collect();
}
/// If the lamport of change can be calculated, return Ok, otherwise, Err
pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
for dep in change.deps.iter() {
match self.get_lamport(dep) {
Some(lamport) => {
change.lamport = change.lamport.max(lamport + 1);
}
None => return Err(()),
}
}
Ok(())
}
}
impl std::fmt::Debug for OpLog {
@ -177,7 +192,6 @@ impl OpLog {
pub fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> {
self.check_id_is_not_duplicated(change.id)?;
if let Err(id) = self.check_deps(&change.deps) {
self.pending_changes.entry(id).or_default().push(change);
return Err(LoroError::DecodeError(
format!("Missing dep {:?}", id).into_boxed_str(),
));
@ -398,141 +412,37 @@ impl OpLog {
}
// Changes are expected to be sorted by counter in each value in the hashmap
// They should also be contiuous (TODO: check this)
// They should also be continuous (TODO: check this)
pub(crate) fn import_remote_changes(
&mut self,
changes: RemoteClientChanges,
remote_changes: RemoteClientChanges,
) -> Result<(), LoroError> {
// check whether we can append the new changes
// TODO: support pending changes
let vv = &self.dag.vv;
for (peer, changes) in &changes {
if changes.is_empty() {
continue;
}
// detect invalid d
let mut last_end_counter = None;
for change in changes.iter() {
if change.id.counter < 0 {
return Err(LoroError::DecodeError(
"Invalid data. Negative id counter.".into(),
));
}
if let Some(last_end_counter) = &mut last_end_counter {
if change.id.counter != *last_end_counter {
return Err(LoroError::DecodeError(
"Invalid data. Not continuous counter.".into(),
));
}
*last_end_counter = change.id_end().counter;
} else {
last_end_counter = Some(change.id_end().counter);
}
}
if let Some(end_cnt) = vv.get(peer) {
let first_id = changes.first().unwrap().id_start();
if first_id.counter > *end_cnt {
return Err(LoroError::DecodeError(
// TODO: Support pending changes to avoid this error
"Changes are not appliable yet."
.to_string()
.into_boxed_str(),
));
}
}
}
// TODO: should we check deps here?
let len = changes.iter().fold(0, |last, this| last + this.1.len());
let mut change_causal_arr = Vec::with_capacity(len);
self.check_changes(&remote_changes)?;
let latest_vv = self.dag.vv.clone();
// op_converter is faster than using arena directly
self.arena.with_op_converter(|converter| {
for (peer, changes) in changes {
if changes.is_empty() {
continue;
}
let cur_end_cnt = self.changes.get(&peer).map(|x| x.atom_len()).unwrap_or(0);
let last_change = changes.last().unwrap();
self.dag.vv.extend_to_include_last_id(last_change.id_last());
self.next_lamport = self.next_lamport.max(last_change.lamport_end());
self.latest_timestamp = self.latest_timestamp.max(last_change.timestamp);
for change in changes {
if change.id.counter < cur_end_cnt {
// truncate included changes
continue;
}
let mut ops = RleVec::new();
for op in change.ops {
for content in op.contents.into_iter() {
let op =
converter.convert_single_op(&op.container, op.counter, content);
ops.push(op);
}
}
let change = Change {
ops,
id: change.id,
deps: change.deps,
lamport: change.lamport,
timestamp: change.timestamp,
};
change_causal_arr.push(change);
}
}
let ids = self.arena.clone().with_op_converter(|converter| {
self.calc_pending_changes(remote_changes, converter, latest_vv)
});
// TODO: Perf
change_causal_arr.sort_by_key(|x| x.lamport);
// debug_dbg!(&change_causal_arr);
for change in change_causal_arr {
let len = change.content_len();
if change.deps.len() == 1 && change.deps[0].peer == change.id.peer {
// don't need to push new element to dag because it only depends on itself
let nodes = self.dag.map.get_mut(&change.id.peer).unwrap();
let last = nodes.vec_mut().last_mut().unwrap();
assert_eq!(last.peer, change.id.peer);
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
last.has_succ = false;
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
self.dag
.map
.entry(change.id.peer)
.or_default()
.push(AppDagNode {
vv,
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
has_succ: false,
len,
});
for dep in change.deps.iter() {
let target = self.dag.get_mut(*dep).unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
}
}
}
self.changes.entry(change.id.peer).or_default().push(change);
}
let mut latest_vv = self.dag.vv.clone();
self.try_apply_pending(ids, &mut latest_vv);
if !self.batch_importing {
self.dag.refresh_frontiers();
}
Ok(())
}
pub(crate) fn import_unknown_lamport_remote_changes(
&mut self,
remote_changes: Vec<Change<RemoteOp>>,
) -> Result<(), LoroError> {
let latest_vv = self.dag.vv.clone();
self.arena.clone().with_op_converter(|converter| {
self.extend_unknown_pending_changes(remote_changes, converter, &latest_vv)
});
Ok(())
}
/// lookup change by id.
///
/// if id does not included in this oplog, return None

View file

@ -0,0 +1,398 @@
use std::ops::Deref;
use crate::{
arena::OpConverter, change::Change, encoding::RemoteClientChanges, op::RemoteOp, OpLog,
VersionVector,
};
use fxhash::FxHashMap;
use itertools::Itertools;
use loro_common::{
Counter, CounterSpan, HasCounterSpan, HasIdSpan, HasLamportSpan, Lamport, LoroError, ID,
};
use rle::{HasLength, RleVec};
use smallvec::SmallVec;
use super::AppDagNode;
/// A change that cannot be applied yet because some of its dependencies
/// have not been imported.
#[derive(Debug)]
pub enum PendingChange {
    // The lamport of the change decoded by `enhanced` is unknown.
    // We need to calculate it when the change can be applied.
    Unknown(Change),
    // The lamport is already known and final.
    Known(Change),
}

// Both variants wrap a `Change`; deref to it so callers can read id/deps/etc.
// without matching on the variant.
impl Deref for PendingChange {
    type Target = Change;
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Unknown(a) => a,
            Self::Known(a) => a,
        }
    }
}
/// Queue of changes waiting for missing dependencies.
/// Key: the ID of the missing dependency op; value: the changes blocked on it.
#[derive(Debug, Default)]
pub(crate) struct PendingChanges {
    // SmallVec because a missing dep usually blocks just one change.
    changes: FxHashMap<ID, SmallVec<[PendingChange; 1]>>,
}
impl OpLog {
    /// Convert `remote_changes` into local changes and apply every change
    /// whose dependencies are already satisfied by `latest_vv`; the rest are
    /// parked in `self.pending_changes`, keyed by their first missing dep.
    ///
    /// Returns the `id_last` of every change applied directly so the caller
    /// can use them as a work list to unblock further pending changes
    /// (see `try_apply_pending`).
    pub(super) fn calc_pending_changes(
        &mut self,
        remote_changes: RemoteClientChanges,
        converter: &mut OpConverter,
        mut latest_vv: VersionVector,
    ) -> Vec<ID> {
        let mut ans = Vec::new();
        // Apply in lamport order so dependencies tend to be seen first.
        for change in remote_changes
            .into_values()
            .filter(|c| !c.is_empty())
            .flat_map(|c| c.into_iter())
            .sorted_unstable_by_key(|c| c.lamport)
        {
            let local_change = to_local_op(change, converter);
            let local_change = PendingChange::Known(local_change);
            match remote_change_apply_state(&latest_vv, &local_change) {
                ChangeApplyState::CanApplyDirectly => {
                    latest_vv.set_end(local_change.id_end());
                    ans.push(local_change.id_last());
                    self.apply_local_change_from_remote(local_change);
                }
                // Already covered by the version vector: drop it.
                ChangeApplyState::Applied => {}
                ChangeApplyState::AwaitingDependency(miss_dep) => self
                    .pending_changes
                    .changes
                    .entry(miss_dep)
                    // `or_default` over `or_insert_with(SmallVec::new)`:
                    // same behavior, idiomatic (clippy `or_fun_call`).
                    .or_default()
                    .push(local_change),
            }
        }
        ans
    }

    /// Park changes whose lamport could not be decoded
    /// (`PendingChange::Unknown`). The caller guarantees none of them is
    /// applicable against `latest_vv`, so any other apply-state is a bug
    /// (hence `unreachable!`).
    pub(super) fn extend_unknown_pending_changes(
        &mut self,
        remote_changes: Vec<Change<RemoteOp>>,
        converter: &mut OpConverter,
        latest_vv: &VersionVector,
    ) {
        for change in remote_changes {
            let local_change = to_local_op(change, converter);
            let local_change = PendingChange::Unknown(local_change);
            match remote_change_apply_state(latest_vv, &local_change) {
                ChangeApplyState::AwaitingDependency(miss_dep) => self
                    .pending_changes
                    .changes
                    .entry(miss_dep)
                    .or_default()
                    .push(local_change),
                _ => unreachable!(),
            }
        }
    }
}
impl OpLog {
    /// Validate decoded remote changes before importing: per peer, every
    /// counter must be non-negative and the changes must form one continuous
    /// counter range (each change starts where the previous one ended).
    pub(super) fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> {
        for changes in changes.values() {
            if changes.is_empty() {
                continue;
            }
            // detect invalid id
            let mut last_end_counter = None;
            for change in changes.iter() {
                if change.id.counter < 0 {
                    return Err(LoroError::DecodeError(
                        "Invalid data. Negative id counter.".into(),
                    ));
                }
                if let Some(last_end_counter) = &mut last_end_counter {
                    if change.id.counter != *last_end_counter {
                        return Err(LoroError::DecodeError(
                            "Invalid data. Not continuous counter.".into(),
                        ));
                    }
                    *last_end_counter = change.id_end().counter;
                } else {
                    last_end_counter = Some(change.id_end().counter);
                }
            }
        }
        Ok(())
    }

    /// Drain pending changes transitively unblocked by the applied IDs in
    /// `id_stack`: each applied change's `id_last` is pushed back on the
    /// stack, since it may in turn unblock more pending changes.
    pub(super) fn try_apply_pending(
        &mut self,
        mut id_stack: Vec<ID>,
        latest_vv: &mut VersionVector,
    ) {
        while let Some(id) = id_stack.pop() {
            let Some(pending_changes) = self.pending_changes.changes.remove(&id) else {
                continue;
            };
            for pending_change in pending_changes {
                match remote_change_apply_state(latest_vv, &pending_change) {
                    ChangeApplyState::CanApplyDirectly => {
                        id_stack.push(pending_change.id_last());
                        latest_vv.set_end(pending_change.id_end());
                        self.apply_local_change_from_remote(pending_change);
                    }
                    ChangeApplyState::Applied => {}
                    // Still blocked on a different dep: re-queue under it.
                    ChangeApplyState::AwaitingDependency(miss_dep) => self
                        .pending_changes
                        .changes
                        .entry(miss_dep)
                        .or_default()
                        .push(pending_change),
                }
            }
        }
    }

    /// Apply one ready change: resolve an unknown lamport if needed, then
    /// update `next_lamport`, the dag's version vector, `latest_timestamp`,
    /// the dag node map, and finally the per-peer changes list.
    pub(super) fn apply_local_change_from_remote(&mut self, change: PendingChange) {
        let change = match change {
            PendingChange::Known(c) => c,
            PendingChange::Unknown(mut c) => {
                // All deps are applied at this point, so the lamport is
                // computable; failure here would be a logic error.
                self.dag.calc_unknown_lamport_change(&mut c).unwrap();
                c
            }
        };
        self.next_lamport = self.next_lamport.max(change.lamport_end());
        self.dag.vv.extend_to_include_last_id(change.id_last());
        self.latest_timestamp = self.latest_timestamp.max(change.timestamp);
        let len = change.content_len();
        if change.deps.len() == 1 && change.deps[0].peer == change.id.peer {
            // don't need to push new element to dag because it only depends on itself
            let nodes = self.dag.map.get_mut(&change.id.peer).unwrap();
            let last = nodes.vec_mut().last_mut().unwrap();
            assert_eq!(last.peer, change.id.peer);
            assert_eq!(last.cnt + last.len as Counter, change.id.counter);
            assert_eq!(last.lamport + last.len as Lamport, change.lamport);
            last.len = change.id.counter as usize + len - last.cnt as usize;
            last.has_succ = false;
        } else {
            let vv = self.dag.frontiers_to_im_vv(&change.deps);
            self.dag
                .map
                .entry(change.id.peer)
                .or_default()
                .push(AppDagNode {
                    vv,
                    peer: change.id.peer,
                    cnt: change.id.counter,
                    lamport: change.lamport,
                    deps: change.deps.clone(),
                    has_succ: false,
                    len,
                });
            // Mark each dep's node as having a successor when this change
            // attaches to the dep's last counter.
            for dep in change.deps.iter() {
                let target = self.dag.get_mut(*dep).unwrap();
                if target.ctr_last() == dep.counter {
                    target.has_succ = true;
                }
            }
        }
        self.changes.entry(change.id.peer).or_default().push(change);
    }
}
/// Translate a remote change into its local representation, converting each
/// remote op content into a local op through `converter`.
pub(super) fn to_local_op(change: Change<RemoteOp>, converter: &mut OpConverter) -> Change {
    let mut converted_ops = RleVec::new();
    for remote_op in change.ops {
        let container = remote_op.container;
        let counter = remote_op.counter;
        // A single remote op may carry several contents; each becomes one
        // local op.
        for content in remote_op.contents.into_iter() {
            converted_ops.push(converter.convert_single_op(&container, counter, content));
        }
    }
    Change {
        id: change.id,
        timestamp: change.timestamp,
        lamport: change.lamport,
        deps: change.deps,
        ops: converted_ops,
    }
}
/// Relation between a remote change and the current version vector.
enum ChangeApplyState {
    /// Every op of the change is already covered by the version vector.
    Applied,
    /// All dependencies are satisfied; the change can be applied now.
    CanApplyDirectly,
    // The id of first missing dep
    AwaitingDependency(ID),
}
/// Classify `change` against `vv`:
/// - `AwaitingDependency(change.id.inc(-1))` when the peer's own sequence has
///   a gap before this change's start;
/// - `Applied` when the change's whole counter span is already covered;
/// - `AwaitingDependency(*dep)` for the first dep not yet covered by `vv`;
/// - `CanApplyDirectly` otherwise.
fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState {
    let peer = change.id.peer;
    let CounterSpan { start, end } = change.ctr_span();
    let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0);
    if vv_latest_ctr < start {
        // Missing the op immediately before this change on its own peer.
        return ChangeApplyState::AwaitingDependency(change.id.inc(-1));
    }
    if vv_latest_ctr >= end {
        return ChangeApplyState::Applied;
    }
    for dep in change.deps.as_ref().iter() {
        let dep_vv_latest_ctr = vv.get(&dep.peer).copied().unwrap_or(0);
        // The dep op at `dep.counter` must already be applied, i.e. the vv
        // must exceed it. Equivalent to the former
        // `dep_vv_latest_ctr - 1 < dep.counter`, rewritten without the
        // subtraction for clarity.
        if dep_vv_latest_ctr <= dep.counter {
            return ChangeApplyState::AwaitingDependency(*dep);
        }
    }
    ChangeApplyState::CanApplyDirectly
}
#[cfg(test)]
mod test {
    use crate::{LoroDoc, ToJson, VersionVector};

    /// Import a chain of updates in scrambled (mostly reversed) order; the
    /// pending queue must hold each one until its deps arrive, and both docs
    /// must converge to the same deep value.
    #[test]
    fn import_pending() {
        let a = LoroDoc::new();
        a.set_peer_id(1);
        let b = LoroDoc::new();
        b.set_peer_id(2);
        let text_a = a.get_text("text");
        a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap();
        let update1 = a.export_from(&VersionVector::default());
        let version1 = a.oplog_vv();
        a.with_txn(|txn| text_a.insert(txn, 0, "b")).unwrap();
        let update2 = a.export_from(&version1);
        let version2 = a.oplog_vv();
        a.with_txn(|txn| text_a.insert(txn, 0, "c")).unwrap();
        let update3 = a.export_from(&version2);
        let version3 = a.oplog_vv();
        a.with_txn(|txn| text_a.insert(txn, 0, "d")).unwrap();
        let update4 = a.export_from(&version3);
        // let version4 = a.oplog_vv();
        a.with_txn(|txn| text_a.insert(txn, 0, "e")).unwrap();
        // Exported from version2, so it overlaps update3 and update4.
        let update3_5 = a.export_from(&version2);
        b.import(&update3_5).unwrap();
        b.import(&update4).unwrap();
        b.import(&update2).unwrap();
        b.import(&update3).unwrap();
        b.import(&update1).unwrap();
        assert_eq!(a.get_deep_value(), b.get_deep_value());
    }

    /// A snapshot imported after a newer update must still work: the
    /// snapshot goes through the update/pending path in that case.
    #[test]
    fn pending_import_snapshot() {
        let a = LoroDoc::new();
        a.set_peer_id(1);
        let b = LoroDoc::new();
        b.set_peer_id(2);
        let text_a = a.get_text("text");
        a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap();
        let update1 = a.export_snapshot();
        let version1 = a.oplog_vv();
        a.with_txn(|txn| text_a.insert(txn, 1, "b")).unwrap();
        let update2 = a.export_from(&version1);
        let _version2 = a.oplog_vv();
        b.import(&update2).unwrap();
        // snapshot will be converted to updates
        b.import(&update1).unwrap();
        assert_eq!(a.get_deep_value(), b.get_deep_value());
    }

    /// Cross-peer dependencies: a2 depends on both a1 and b1, so it must
    /// stay pending until BOTH are imported, whatever the arrival order
    /// (doc `c` and doc `d` exercise two different orders).
    #[test]
    fn need_deps_pending_import() {
        // a: a1 <--- a2
        //     \     /
        // b:    b1
        let a = LoroDoc::new();
        a.set_peer_id(1);
        let b = LoroDoc::new();
        b.set_peer_id(2);
        let c = LoroDoc::new();
        c.set_peer_id(3);
        let d = LoroDoc::new();
        d.set_peer_id(4);
        let text_a = a.get_text("text");
        let text_b = b.get_text("text");
        a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap();
        let version_a1 = a.oplog_vv();
        let update_a1 = a.export_from(&VersionVector::default());
        b.import(&update_a1).unwrap();
        b.with_txn(|txn| text_b.insert(txn, 1, "b")).unwrap();
        let update_b1 = b.export_from(&version_a1);
        a.import(&update_b1).unwrap();
        let version_a1b1 = a.oplog_vv();
        a.with_txn(|txn| text_a.insert(txn, 2, "c")).unwrap();
        let update_a2 = a.export_from(&version_a1b1);
        // Order 1: a2, a1, b1 — a2 applies only after b1 arrives.
        c.import(&update_a2).unwrap();
        assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}");
        c.import(&update_a1).unwrap();
        assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"a\"}");
        c.import(&update_b1).unwrap();
        assert_eq!(a.get_deep_value(), c.get_deep_value());
        // Order 2: a2, b1, a1 — nothing applies until a1 arrives.
        d.import(&update_a2).unwrap();
        assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}");
        d.import(&update_b1).unwrap();
        assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}");
        d.import(&update_a1).unwrap();
        assert_eq!(a.get_deep_value(), d.get_deep_value());
    }

    /// A pending change must be activated even when the dep it waits on is
    /// later delivered merged with other ops in a single change.
    #[test]
    fn should_activate_pending_change_when() {
        // 0@a <- 0@b
        // 0@a <- 1@a, where 0@a and 1@a will be merged
        // In this case, c apply b's change first, then apply all the changes from a.
        // C is expected to have the same content as a, after a imported b's change
        let a = LoroDoc::new();
        a.set_peer_id(1);
        let b = LoroDoc::new();
        b.set_peer_id(2);
        let c = LoroDoc::new();
        c.set_peer_id(3);
        let text_a = a.get_text("text");
        let text_b = b.get_text("text");
        a.with_txn(|txn| text_a.insert(txn, 0, "1")).unwrap();
        b.import(&a.export_snapshot()).unwrap();
        b.with_txn(|txn| text_b.insert(txn, 0, "1")).unwrap();
        let b_change = b.export_from(&a.oplog_vv());
        a.with_txn(|txn| text_a.insert(txn, 0, "1")).unwrap();
        c.import(&b_change).unwrap();
        c.import(&a.export_snapshot()).unwrap();
        a.import(&b_change).unwrap();
        assert_eq!(c.get_deep_value(), a.get_deep_value());
    }

    // Change cannot be merged now
    // #[test]
    // fn pending_changes_may_deps_merged_change() {
    //     // a: (a1 <-- a2 <-- a3) <-- a4       a1~a3 is a merged change
    //     //                \         /
    //     // b:                b1
    //     let a = LoroDoc::new();
    //     a.set_peer_id(1);
    //     let b = LoroDoc::new();
    //     b.set_peer_id(2);
    //     let c = LoroDoc::new();
    //     c.set_peer_id(3);
    //     let text_a = a.get_text("text");
    //     let text_b = b.get_text("text");
    //     a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap();
    //     a.with_txn(|txn| text_a.insert(txn, 1, "b")).unwrap();
    //     let version_a12 = a.oplog_vv();
    //     let updates_a12 = a.export_snapshot();
    //     a.with_txn(|txn| text_a.insert(txn, 2, "c")).unwrap();
    //     let updates_a123 = a.export_snapshot();
    //     b.import(&updates_a12).unwrap();
    //     b.with_txn(|txn| text_b.insert(txn, 2, "d")).unwrap();
    //     let update_b1 = b.export_from(&version_a12);
    //     a.import(&update_b1).unwrap();
    //     let version_a123_b1 = a.oplog_vv();
    //     a.with_txn(|txn| text_a.insert(txn, 4, "e")).unwrap();
    //     let update_a4 = a.export_from(&version_a123_b1);
    //     c.import(&update_b1).unwrap();
    //     assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}");
    //     c.import(&update_a4).unwrap();
    //     assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}");
    //     c.import(&updates_a123).unwrap();
    //     assert_eq!(c.get_deep_value(), a.get_deep_value());
    // }
}

View file

@ -184,7 +184,6 @@ pub fn decode_oplog(
change.lamport = lamport;
oplog.import_local_change(change)?;
}
Ok(())
}