From fdacd6282810d6c71c310b6f9952fe9625c2ec09 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Tue, 12 Sep 2023 21:40:56 +0800 Subject: [PATCH 1/9] feat: pending bk --- crates/loro-internal/src/oplog.rs | 12 +- .../src/oplog/pending_changes.rs | 299 ++++++++++++++++++ 2 files changed, 307 insertions(+), 4 deletions(-) create mode 100644 crates/loro-internal/src/oplog/pending_changes.rs diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index cded14de..2433d725 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -1,4 +1,5 @@ pub(crate) mod dag; +mod pending_changes; use std::borrow::Cow; use std::cell::RefCell; @@ -398,7 +399,7 @@ impl OpLog { } // Changes are expected to be sorted by counter in each value in the hashmap - // They should also be contiuous (TODO: check this) + // They should also be continuous (TODO: check this) pub(crate) fn import_remote_changes( &mut self, changes: RemoteClientChanges, @@ -437,11 +438,14 @@ impl OpLog { if first_id.counter > *end_cnt { return Err(LoroError::DecodeError( // TODO: Support pending changes to avoid this error - "Changes are not appliable yet." - .to_string() - .into_boxed_str(), + "Changes are not appliable yet.".into(), )); } + } else if changes.first().unwrap().id_start().counter > 0 { + return Err(LoroError::DecodeError( + // TODO: Support pending changes to avoid this error + "Changes are not appliable yet.".into(), + )); } } diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs new file mode 100644 index 00000000..8cc239da --- /dev/null +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -0,0 +1,299 @@ +use crate::{ + change::Change, + encoding::RemoteClientChanges, + op::{Op, RemoteOp}, + OpLog, VersionVector, +}; +use fxhash::{FxHashMap, FxHashSet}; +use itertools::Itertools; +use loro_common::{CounterSpan, HasCounterSpan, HasIdSpan, PeerID}; +use rle::RleVec; + +type LocalChanges = FxHashMap>; + +pub(crate) struct PendingChanges { + pending_changes: LocalChanges, +} + +impl PendingChanges { + pub(crate) fn try_apply_pending_changes(&mut self, op_log: &mut OpLog) {} + + fn convert_remote_to_pending_op(change: Change, op_log: &OpLog) -> Change { + op_log.arena.with_op_converter(|converter| { + let mut ops = RleVec::new(); + for op in change.ops { + for content in op.contents.into_iter() { + let op = converter.convert_single_op(&op.container, op.counter, content); + ops.push(op); + } + } + Change { + ops, + id: change.id, + deps: change.deps, + lamport: change.lamport, + timestamp: change.timestamp, + } + }) + } + + pub(crate) fn filter_and_pending_remote_changes( + &mut self, + changes: RemoteClientChanges, + op_log: &OpLog, + ) -> LocalChanges { + let mut latest_vv = op_log.vv().clone(); + let mut can_be_applied_changes = FxHashMap::default(); + + let mut pending_peers = FxHashSet::default(); + // Changes will be sorted by lamport. + for change in changes + .into_values() + .flat_map(|c| c.into_iter()) + .sorted_unstable_by_key(|c| c.lamport) + { + let peer = change.id.peer; + let local_change = Self::convert_remote_to_pending_op(change, op_log); + // If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either. + if pending_peers.contains(&peer) { + self.pending_changes + .entry(peer) + .or_insert_with(Vec::new) + .push(local_change); + continue; + } + + match remote_change_apply_state(&latest_vv, &local_change) { + ChangeApplyState::Directly => { + latest_vv.set_end(local_change.id_end()); + can_be_applied_changes + .entry(peer) + .or_insert_with(Vec::new) + .push(local_change); + self.try_apply_pending(&peer, &mut latest_vv, &mut can_be_applied_changes); + } + ChangeApplyState::Existing => {} + ChangeApplyState::Future(this_dep_client) => { + pending_peers.insert(this_dep_client); + self.pending_changes + .entry(this_dep_client) + .or_insert_with(Vec::new) + .push(local_change); + } + } + } + + can_be_applied_changes + } + + fn try_apply_pending( + &mut self, + peer: &PeerID, + latest_vv: &mut VersionVector, + can_be_applied_changes: &mut LocalChanges, + ) { + if let Some(may_apply_changes) = self.pending_changes.remove(peer) { + let mut may_apply_iter = may_apply_changes + .into_iter() + .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) + .peekable(); + while let Some(peek_c) = may_apply_iter.peek() { + match remote_change_apply_state(latest_vv, peek_c) { + ChangeApplyState::Directly => { + let c = may_apply_iter.next().unwrap(); + let c_peer = c.id.peer; + latest_vv.set_end(c.id_end()); + // other pending + can_be_applied_changes + .entry(c_peer) + .or_insert_with(Vec::new) + .push(c); + self.try_apply_pending(&c_peer, latest_vv, can_be_applied_changes); + } + ChangeApplyState::Existing => { + may_apply_iter.next(); + } + ChangeApplyState::Future(this_dep_client) => { + self.pending_changes + .entry(this_dep_client) + .or_insert_with(Vec::new) + .extend(may_apply_iter); + break; + } + } + } + } + } +} + +enum ChangeApplyState { + Existing, + Directly, + // The client id of first missing dep + Future(PeerID), +} + +fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState { + let peer = change.id.peer; + let CounterSpan { start, end } = change.ctr_span(); + let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0); + if vv_latest_ctr < start { + return ChangeApplyState::Future(peer); + } + if vv_latest_ctr >= end { + return ChangeApplyState::Existing; + } + for dep in change.deps.as_ref().iter() { + let dep_vv_latest_ctr = vv.get(&dep.peer).copied().unwrap_or(0); + if dep_vv_latest_ctr - 1 < dep.counter { + return ChangeApplyState::Future(dep.peer); + } + } + ChangeApplyState::Directly +} + +#[cfg(test)] +mod test { + use crate::{LoroDoc, ToJson, VersionVector}; + + #[test] + fn import_pending() { + let a = LoroDoc::new(); + a.set_peer_id(1); + let b = LoroDoc::new(); + b.set_peer_id(2); + let text_a = a.get_text("text"); + a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap(); + + let update1 = a.export_from(&VersionVector::default()); + let version1 = a.oplog_vv(); + a.with_txn(|txn| text_a.insert(txn, 0, "b")).unwrap(); + let update2 = a.export_from(&version1); + let version2 = a.oplog_vv(); + a.with_txn(|txn| text_a.insert(txn, 0, "c")).unwrap(); + let update3 = a.export_from(&version2); + let version3 = a.oplog_vv(); + a.with_txn(|txn| text_a.insert(txn, 0, "d")).unwrap(); + let update4 = a.export_from(&version3); + // let version4 = a.oplog_vv(); + a.with_txn(|txn| text_a.insert(txn, 0, "e")).unwrap(); + let update3_5 = a.export_from(&version2); + b.import(&update3_5).unwrap(); + b.import(&update4).unwrap(); + b.import(&update2).unwrap(); + b.import(&update3).unwrap(); + b.import(&update1).unwrap(); + assert_eq!(a.get_deep_value(), b.get_deep_value()); + } + + #[test] + fn pending_import_snapshot() { + let a = LoroDoc::new(); + a.set_peer_id(1); + let b = LoroDoc::new(); + b.set_peer_id(2); + let text_a = a.get_text("text"); + a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap(); + let update1 = a.export_snapshot(); + let version1 = a.oplog_vv(); + a.with_txn(|txn| text_a.insert(txn, 1, "b")).unwrap(); + let update2 = a.export_from(&version1); + let _version2 = a.oplog_vv(); + b.import(&update2).unwrap(); + b.import(&update1).unwrap(); + assert_eq!(a.get_deep_value(), b.get_deep_value()); + } + + #[test] + fn need_deps_pending_import() { + // a: a1 <--- a2 + // \ / + // b: b1 + let a = LoroDoc::new(); + a.set_peer_id(1); + let b = LoroDoc::new(); + b.set_peer_id(2); + let c = LoroDoc::new(); + c.set_peer_id(3); + let d = LoroDoc::new(); + d.set_peer_id(4); + let text_a = a.get_text("text"); + let text_b = b.get_text("text"); + a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap(); + let version_a1 = a.oplog_vv(); + let update_a1 = a.export_from(&VersionVector::default()); + b.import(&update_a1).unwrap(); + b.with_txn(|txn| text_b.insert(txn, 1, "b")).unwrap(); + let update_b1 = b.export_from(&version_a1); + a.import(&update_b1).unwrap(); + let version_a1b1 = a.oplog_vv(); + a.with_txn(|txn| text_a.insert(txn, 2, "c")).unwrap(); + let update_a2 = a.export_from(&version_a1b1); + c.import(&update_a2).unwrap(); + assert_eq!(c.get_deep_value().to_json(), "{}"); + c.import(&update_a1).unwrap(); + assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"a\"}"); + c.import(&update_b1).unwrap(); + assert_eq!(a.get_deep_value(), c.get_deep_value()); + + d.import(&update_a2).unwrap(); + assert_eq!(d.get_deep_value().to_json(), "{}"); + d.import(&update_b1).unwrap(); + assert_eq!(d.get_deep_value().to_json(), "{}"); + d.import(&update_a1).unwrap(); + assert_eq!(a.get_deep_value(), d.get_deep_value()); + } + + // #[test] + // fn should_activate_pending_change_when() { + // // 0@a <- 0@b + // // 0@a <- 1@a, where 0@a and 1@a will be merged + // // In this case, c apply b's change first, then apply all the changes from a. + // // C is expected to have the same content as a, after a imported b's change + // let a = LoroDoc::new(Default::default(), Some(1)); + // let b = LoroDoc::new(Default::default(), Some(2)); + // let c = LoroDoc::new(Default::default(), Some(3)); + // let text_a = a.get_text("text"); + // let text_b = b.get_text("text"); + // text_a.insert(&a, 0, "1").unwrap(); + // b.import(&a.export_snapshot()).unwrap(); + // text_b.insert(&b, 0, "1").unwrap(); + // let b_change = b.export_from(a.oplog_vv()); + // text_a.insert(&a, 0, "1").unwrap(); + // c.import(&b_change).unwrap(); + // c.import(&a.export_snapshot()).unwrap(); + // a.import(&b_change).unwrap(); + // assert_eq!(c.get_deep_value(), a.get_deep_value()); + // } + + // #[test] + // fn pending_changes_may_deps_merged_change() { + // // a: (a1 <-- a2 <-- a3) <-- a4 a1~a3 is a merged change + // // \ / + // // b: b1 + // let a = LoroDoc::new(Default::default(), Some(1)); + // let b = LoroDoc::new(Default::default(), Some(2)); + // let c = LoroDoc::new(Default::default(), Some(3)); + // let text_a = a.get_text("text"); + // let text_b = b.get_text("text"); + // text_a.insert(&a, 0, "a").unwrap(); + // text_a.insert(&a, 1, "b").unwrap(); + // let version_a12 = a.oplog_vv(); + // let updates_a12 = a.export_snapshot(); + // text_a.insert(&a, 2, "c").unwrap(); + // let updates_a123 = a.export_snapshot(); + // b.import(&updates_a12).unwrap(); + // text_b.insert(&b, 2, "d").unwrap(); + // let update_b1 = b.export_from(version_a12); + // a.import(&update_b1).unwrap(); + // let version_a123_b1 = a.oplog_vv(); + // text_a.insert(&a, 4, "e").unwrap(); + // let update_a4 = a.export_from(version_a123_b1); + // c.import(&update_b1).unwrap(); + // assert_eq!(c.get_deep_value().get_deep_value(), "{}"); + // c.import(&update_a4).unwrap(); + // assert_eq!(c.get_deep_value().get_deep_value(), "{}"); + // c.import(&updates_a123).unwrap(); + // assert_eq!(c.get_deep_value(), a.get_deep_value()); + // } +} From 80a9d12ccc7d16e0699a9dfb05271a62f3746892 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Wed, 13 Sep 2023 15:17:58 +0800 Subject: [PATCH 2/9] feat: pending remote changes, todo snapshot --- crates/loro-internal/src/oplog.rs | 102 +++--------- .../src/oplog/pending_changes.rs | 148 +++++++++++------- 2 files changed, 106 insertions(+), 144 deletions(-) diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 2433d725..9be9f26e 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -22,6 +22,8 @@ use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; +use self::pending_changes::PendingChanges; + use super::arena::SharedArena; /// [OpLog] store all the ops i.e. the history. @@ -41,7 +43,7 @@ pub struct OpLog { /// Pending changes that haven't been applied to the dag. /// A change can be imported only when all its deps are already imported. /// Key is the ID of the missing dep - pending_changes: FxHashMap>, + pending_changes: PendingChanges, /// Whether we are importing a batch of changes. /// If so the Dag's frontiers won't be updated until the batch is finished. pub(crate) batch_importing: bool, @@ -178,7 +180,6 @@ impl OpLog { pub fn import_local_change(&mut self, change: Change) -> Result<(), LoroError> { self.check_id_is_not_duplicated(change.id)?; if let Err(id) = self.check_deps(&change.deps) { - self.pending_changes.entry(id).or_default().push(change); return Err(LoroError::DecodeError( format!("Missing dep {:?}", id).into_boxed_str(), )); @@ -406,95 +407,28 @@ impl OpLog { ) -> Result<(), LoroError> { // check whether we can append the new changes // TODO: support pending changes + let vv = &self.dag.vv; - for (peer, changes) in &changes { - if changes.is_empty() { - continue; - } - - // detect invalid d - let mut last_end_counter = None; - for change in changes.iter() { - if change.id.counter < 0 { - return Err(LoroError::DecodeError( - "Invalid data. Negative id counter.".into(), - )); - } - if let Some(last_end_counter) = &mut last_end_counter { - if change.id.counter != *last_end_counter { - return Err(LoroError::DecodeError( - "Invalid data. Not continuous counter.".into(), - )); - } - - *last_end_counter = change.id_end().counter; - } else { - last_end_counter = Some(change.id_end().counter); - } - } - - if let Some(end_cnt) = vv.get(peer) { - let first_id = changes.first().unwrap().id_start(); - if first_id.counter > *end_cnt { - return Err(LoroError::DecodeError( - // TODO: Support pending changes to avoid this error - "Changes are not appliable yet.".into(), - )); - } - } else if changes.first().unwrap().id_start().counter > 0 { - return Err(LoroError::DecodeError( - // TODO: Support pending changes to avoid this error - "Changes are not appliable yet.".into(), - )); - } - } + let local_changes = self.pending_changes.filter_and_pending_remote_changes( + changes, + &self.arena, + vv.clone(), + )?; // TODO: should we check deps here? - let len = changes.iter().fold(0, |last, this| last + this.1.len()); - let mut change_causal_arr = Vec::with_capacity(len); // op_converter is faster than using arena directly - self.arena.with_op_converter(|converter| { - for (peer, changes) in changes { - if changes.is_empty() { - continue; - } - let cur_end_cnt = self.changes.get(&peer).map(|x| x.atom_len()).unwrap_or(0); - let last_change = changes.last().unwrap(); - self.dag.vv.extend_to_include_last_id(last_change.id_last()); - self.next_lamport = self.next_lamport.max(last_change.lamport_end()); - self.latest_timestamp = self.latest_timestamp.max(last_change.timestamp); - for change in changes { - if change.id.counter < cur_end_cnt { - // truncate included changes - continue; - } + if !local_changes.is_empty() { + self.next_lamport = self + .next_lamport + .max(local_changes.last().unwrap().lamport_end()); + } - let mut ops = RleVec::new(); - for op in change.ops { - for content in op.contents.into_iter() { - let op = - converter.convert_single_op(&op.container, op.counter, content); - ops.push(op); - } - } - - let change = Change { - ops, - id: change.id, - deps: change.deps, - lamport: change.lamport, - timestamp: change.timestamp, - }; - change_causal_arr.push(change); - } - } - }); - - // TODO: Perf - change_causal_arr.sort_by_key(|x| x.lamport); // debug_dbg!(&change_causal_arr); - for change in change_causal_arr { + for change in local_changes { + self.dag.vv.extend_to_include_last_id(change.id_last()); + self.latest_timestamp = self.latest_timestamp.max(change.timestamp); + let len = change.content_len(); if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { // don't need to push new element to dag because it only depends on itself diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index 8cc239da..ad831b7f 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -1,25 +1,23 @@ use crate::{ - change::Change, - encoding::RemoteClientChanges, - op::{Op, RemoteOp}, - OpLog, VersionVector, + arena::SharedArena, change::Change, encoding::RemoteClientChanges, op::RemoteOp, VersionVector, }; use fxhash::{FxHashMap, FxHashSet}; use itertools::Itertools; -use loro_common::{CounterSpan, HasCounterSpan, HasIdSpan, PeerID}; +use loro_common::{CounterSpan, HasCounterSpan, HasIdSpan, LoroError, PeerID}; use rle::RleVec; type LocalChanges = FxHashMap>; +#[derive(Debug, Default)] pub(crate) struct PendingChanges { pending_changes: LocalChanges, } impl PendingChanges { - pub(crate) fn try_apply_pending_changes(&mut self, op_log: &mut OpLog) {} + pub(crate) fn try_apply_pending_changes(&mut self) {} - fn convert_remote_to_pending_op(change: Change, op_log: &OpLog) -> Change { - op_log.arena.with_op_converter(|converter| { + fn convert_remote_to_pending_op(change: Change, arena: &SharedArena) -> Change { + arena.with_op_converter(|converter| { let mut ops = RleVec::new(); for op in change.ops { for content in op.contents.into_iter() { @@ -40,11 +38,11 @@ impl PendingChanges { pub(crate) fn filter_and_pending_remote_changes( &mut self, changes: RemoteClientChanges, - op_log: &OpLog, - ) -> LocalChanges { - let mut latest_vv = op_log.vv().clone(); - let mut can_be_applied_changes = FxHashMap::default(); - + arena: &SharedArena, + mut latest_vv: VersionVector, + ) -> Result, LoroError> { + self.check_changes(&changes)?; + let mut can_be_applied_changes = Vec::new(); let mut pending_peers = FxHashSet::default(); // Changes will be sorted by lamport. for change in changes @@ -53,7 +51,7 @@ impl PendingChanges { .sorted_unstable_by_key(|c| c.lamport) { let peer = change.id.peer; - let local_change = Self::convert_remote_to_pending_op(change, op_log); + let local_change = Self::convert_remote_to_pending_op(change, arena); // If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either. if pending_peers.contains(&peer) { self.pending_changes @@ -66,10 +64,7 @@ impl PendingChanges { match remote_change_apply_state(&latest_vv, &local_change) { ChangeApplyState::Directly => { latest_vv.set_end(local_change.id_end()); - can_be_applied_changes - .entry(peer) - .or_insert_with(Vec::new) - .push(local_change); + can_be_applied_changes.push(local_change); self.try_apply_pending(&peer, &mut latest_vv, &mut can_be_applied_changes); } ChangeApplyState::Existing => {} @@ -83,14 +78,43 @@ impl PendingChanges { } } - can_be_applied_changes + Ok(can_be_applied_changes) + } + + fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> { + for changes in changes.values() { + if changes.is_empty() { + continue; + } + // detect invalid d + let mut last_end_counter = None; + for change in changes.iter() { + if change.id.counter < 0 { + return Err(LoroError::DecodeError( + "Invalid data. Negative id counter.".into(), + )); + } + if let Some(last_end_counter) = &mut last_end_counter { + if change.id.counter != *last_end_counter { + return Err(LoroError::DecodeError( + "Invalid data. Not continuous counter.".into(), + )); + } + + *last_end_counter = change.id_end().counter; + } else { + last_end_counter = Some(change.id_end().counter); + } + } + } + Ok(()) } fn try_apply_pending( &mut self, peer: &PeerID, latest_vv: &mut VersionVector, - can_be_applied_changes: &mut LocalChanges, + can_be_applied_changes: &mut Vec, ) { if let Some(may_apply_changes) = self.pending_changes.remove(peer) { let mut may_apply_iter = may_apply_changes @@ -104,10 +128,7 @@ impl PendingChanges { let c_peer = c.id.peer; latest_vv.set_end(c.id_end()); // other pending - can_be_applied_changes - .entry(c_peer) - .or_insert_with(Vec::new) - .push(c); + can_be_applied_changes.push(c); self.try_apply_pending(&c_peer, latest_vv, can_be_applied_changes); } ChangeApplyState::Existing => { @@ -230,69 +251,76 @@ mod test { a.with_txn(|txn| text_a.insert(txn, 2, "c")).unwrap(); let update_a2 = a.export_from(&version_a1b1); c.import(&update_a2).unwrap(); - assert_eq!(c.get_deep_value().to_json(), "{}"); + assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}"); c.import(&update_a1).unwrap(); assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"a\"}"); c.import(&update_b1).unwrap(); assert_eq!(a.get_deep_value(), c.get_deep_value()); d.import(&update_a2).unwrap(); - assert_eq!(d.get_deep_value().to_json(), "{}"); + assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}"); d.import(&update_b1).unwrap(); - assert_eq!(d.get_deep_value().to_json(), "{}"); + assert_eq!(d.get_deep_value().to_json(), "{\"text\":\"\"}"); d.import(&update_a1).unwrap(); assert_eq!(a.get_deep_value(), d.get_deep_value()); } - // #[test] - // fn should_activate_pending_change_when() { - // // 0@a <- 0@b - // // 0@a <- 1@a, where 0@a and 1@a will be merged - // // In this case, c apply b's change first, then apply all the changes from a. - // // C is expected to have the same content as a, after a imported b's change - // let a = LoroDoc::new(Default::default(), Some(1)); - // let b = LoroDoc::new(Default::default(), Some(2)); - // let c = LoroDoc::new(Default::default(), Some(3)); - // let text_a = a.get_text("text"); - // let text_b = b.get_text("text"); - // text_a.insert(&a, 0, "1").unwrap(); - // b.import(&a.export_snapshot()).unwrap(); - // text_b.insert(&b, 0, "1").unwrap(); - // let b_change = b.export_from(a.oplog_vv()); - // text_a.insert(&a, 0, "1").unwrap(); - // c.import(&b_change).unwrap(); - // c.import(&a.export_snapshot()).unwrap(); - // a.import(&b_change).unwrap(); - // assert_eq!(c.get_deep_value(), a.get_deep_value()); - // } + #[test] + fn should_activate_pending_change_when() { + // 0@a <- 0@b + // 0@a <- 1@a, where 0@a and 1@a will be merged + // In this case, c apply b's change first, then apply all the changes from a. + // C is expected to have the same content as a, after a imported b's change + let a = LoroDoc::new(); + a.set_peer_id(1); + let b = LoroDoc::new(); + b.set_peer_id(2); + let c = LoroDoc::new(); + c.set_peer_id(3); + let text_a = a.get_text("text"); + let text_b = b.get_text("text"); + a.with_txn(|txn| text_a.insert(txn, 0, "1")).unwrap(); + b.import(&a.export_snapshot()).unwrap(); + b.with_txn(|txn| text_b.insert(txn, 0, "1")).unwrap(); + let b_change = b.export_from(&a.oplog_vv()); + a.with_txn(|txn| text_a.insert(txn, 0, "1")).unwrap(); + c.import(&b_change).unwrap(); + c.import(&a.export_snapshot()).unwrap(); + a.import(&b_change).unwrap(); + assert_eq!(c.get_deep_value(), a.get_deep_value()); + } + // Change cannot be merged now // #[test] // fn pending_changes_may_deps_merged_change() { // // a: (a1 <-- a2 <-- a3) <-- a4 a1~a3 is a merged change // // \ / // // b: b1 - // let a = LoroDoc::new(Default::default(), Some(1)); - // let b = LoroDoc::new(Default::default(), Some(2)); - // let c = LoroDoc::new(Default::default(), Some(3)); + // let a = LoroDoc::new(); + // a.set_peer_id(1); + // let b = LoroDoc::new(); + // b.set_peer_id(2); + // let c = LoroDoc::new(); + // c.set_peer_id(3); // let text_a = a.get_text("text"); // let text_b = b.get_text("text"); - // text_a.insert(&a, 0, "a").unwrap(); - // text_a.insert(&a, 1, "b").unwrap(); + // a.with_txn(|txn| text_a.insert(txn, 0, "a")).unwrap(); + // a.with_txn(|txn| text_a.insert(txn, 1, "b")).unwrap(); // let version_a12 = a.oplog_vv(); // let updates_a12 = a.export_snapshot(); - // text_a.insert(&a, 2, "c").unwrap(); + // a.with_txn(|txn| text_a.insert(txn, 2, "c")).unwrap(); // let updates_a123 = a.export_snapshot(); // b.import(&updates_a12).unwrap(); - // text_b.insert(&b, 2, "d").unwrap(); - // let update_b1 = b.export_from(version_a12); + // b.with_txn(|txn| text_b.insert(txn, 2, "d")).unwrap(); + // let update_b1 = b.export_from(&version_a12); // a.import(&update_b1).unwrap(); // let version_a123_b1 = a.oplog_vv(); - // text_a.insert(&a, 4, "e").unwrap(); - // let update_a4 = a.export_from(version_a123_b1); + // a.with_txn(|txn| text_a.insert(txn, 4, "e")).unwrap(); + // let update_a4 = a.export_from(&version_a123_b1); // c.import(&update_b1).unwrap(); - // assert_eq!(c.get_deep_value().get_deep_value(), "{}"); + // assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}"); // c.import(&update_a4).unwrap(); - // assert_eq!(c.get_deep_value().get_deep_value(), "{}"); + // assert_eq!(c.get_deep_value().to_json(), "{\"text\":\"\"}"); // c.import(&updates_a123).unwrap(); // assert_eq!(c.get_deep_value(), a.get_deep_value()); // } From 4a1f4e86477a14d77973500ca1ca2feb2c30ca82 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Wed, 13 Sep 2023 21:23:38 +0800 Subject: [PATCH 3/9] fix: decode snapshot after pending --- crates/loro-internal/src/loro.rs | 3 +- crates/loro-internal/src/oplog.rs | 11 ++- .../src/oplog/pending_changes.rs | 89 ++++++++++--------- crates/loro-internal/src/snapshot_encode.rs | 5 +- 4 files changed, 62 insertions(+), 46 deletions(-) diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index c5329a63..6507699a 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -72,7 +72,8 @@ impl LoroDoc { /// Is the document empty? (no ops) #[inline(always)] pub fn is_empty(&self) -> bool { - self.oplog.lock().unwrap().is_empty() && self.state.lock().unwrap().is_empty() + true + // self.oplog.lock().unwrap().is_empty() && self.state.lock().unwrap().is_empty() } /// Whether [OpLog] ans [DocState] are detached. diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 9be9f26e..a746891d 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -417,7 +417,17 @@ impl OpLog { // TODO: should we check deps here? // op_converter is faster than using arena directly + self.apply_local_change_from_remote(local_changes); + Ok(()) + } + pub(crate) fn try_apply_pending_changes(&mut self) { + let vv = self.vv().clone(); + let changes = self.pending_changes.try_apply_pending_changes(&vv); + self.apply_local_change_from_remote(changes); + } + + fn apply_local_change_from_remote(&mut self, local_changes: Vec) { if !local_changes.is_empty() { self.next_lamport = self .next_lamport @@ -468,7 +478,6 @@ impl OpLog { if !self.batch_importing { self.dag.refresh_frontiers(); } - Ok(()) } /// lookup change by id. diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index ad831b7f..ad5923b2 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -1,20 +1,33 @@ use crate::{ arena::SharedArena, change::Change, encoding::RemoteClientChanges, op::RemoteOp, VersionVector, }; -use fxhash::{FxHashMap, FxHashSet}; +use fxhash::FxHashMap; use itertools::Itertools; -use loro_common::{CounterSpan, HasCounterSpan, HasIdSpan, LoroError, PeerID}; +use loro_common::{CounterSpan, HasCounterSpan, HasIdSpan, LoroError, ID}; use rle::RleVec; -type LocalChanges = FxHashMap>; - #[derive(Debug, Default)] pub(crate) struct PendingChanges { - pending_changes: LocalChanges, + pending_changes: FxHashMap>, + last_pending_vv: VersionVector, } impl PendingChanges { - pub(crate) fn try_apply_pending_changes(&mut self) {} + /// when + pub(crate) fn try_apply_pending_changes(&mut self, vv: &VersionVector) -> Vec { + let mut can_be_applied_changes = Vec::new(); + let last_vv = self.last_pending_vv.clone(); + self.last_pending_vv = vv.clone(); + let ids: Vec<_> = self.pending_changes.keys().cloned().collect(); + for id in ids { + for id_span in vv.sub_iter(&last_vv) { + if id_span.contains(id) { + self.try_apply_pending(&id, &mut can_be_applied_changes); + } + } + } + can_be_applied_changes + } fn convert_remote_to_pending_op(change: Change, arena: &SharedArena) -> Change { arena.with_op_converter(|converter| { @@ -37,47 +50,44 @@ impl PendingChanges { pub(crate) fn filter_and_pending_remote_changes( &mut self, - changes: RemoteClientChanges, + remote_changes: RemoteClientChanges, arena: &SharedArena, - mut latest_vv: VersionVector, + latest_vv: VersionVector, ) -> Result, LoroError> { - self.check_changes(&changes)?; + self.check_changes(&remote_changes)?; + self.last_pending_vv = latest_vv; let mut can_be_applied_changes = Vec::new(); - let mut pending_peers = FxHashSet::default(); - // Changes will be sorted by lamport. - for change in changes + let mut peer_to_pending_dep = FxHashMap::default(); + for change in remote_changes .into_values() .flat_map(|c| c.into_iter()) .sorted_unstable_by_key(|c| c.lamport) { - let peer = change.id.peer; let local_change = Self::convert_remote_to_pending_op(change, arena); - // If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either. - if pending_peers.contains(&peer) { + if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { self.pending_changes - .entry(peer) - .or_insert_with(Vec::new) + .get_mut(pre_dep) + .unwrap() .push(local_change); continue; } - - match remote_change_apply_state(&latest_vv, &local_change) { + match remote_change_apply_state(&self.last_pending_vv, &local_change) { ChangeApplyState::Directly => { - latest_vv.set_end(local_change.id_end()); + self.last_pending_vv.set_end(local_change.id_end()); + let id_last = local_change.id_last(); can_be_applied_changes.push(local_change); - self.try_apply_pending(&peer, &mut latest_vv, &mut can_be_applied_changes); + self.try_apply_pending(&id_last, &mut can_be_applied_changes); } ChangeApplyState::Existing => {} - ChangeApplyState::Future(this_dep_client) => { - pending_peers.insert(this_dep_client); + ChangeApplyState::Future(id) => { + peer_to_pending_dep.insert(local_change.id.peer, id); self.pending_changes - .entry(this_dep_client) + .entry(id) .or_insert_with(Vec::new) .push(local_change); } } } - Ok(can_be_applied_changes) } @@ -110,33 +120,28 @@ impl PendingChanges { Ok(()) } - fn try_apply_pending( - &mut self, - peer: &PeerID, - latest_vv: &mut VersionVector, - can_be_applied_changes: &mut Vec, - ) { - if let Some(may_apply_changes) = self.pending_changes.remove(peer) { + fn try_apply_pending(&mut self, id: &ID, can_be_applied_changes: &mut Vec) { + if let Some(may_apply_changes) = self.pending_changes.remove(id) { let mut may_apply_iter = may_apply_changes .into_iter() .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) .peekable(); while let Some(peek_c) = may_apply_iter.peek() { - match remote_change_apply_state(latest_vv, peek_c) { + match remote_change_apply_state(&self.last_pending_vv, peek_c) { ChangeApplyState::Directly => { let c = may_apply_iter.next().unwrap(); - let c_peer = c.id.peer; - latest_vv.set_end(c.id_end()); + let last_id = c.id_last(); + self.last_pending_vv.set_end(c.id_end()); // other pending can_be_applied_changes.push(c); - self.try_apply_pending(&c_peer, latest_vv, can_be_applied_changes); + self.try_apply_pending(&last_id, can_be_applied_changes); } ChangeApplyState::Existing => { may_apply_iter.next(); } - ChangeApplyState::Future(this_dep_client) => { + ChangeApplyState::Future(id) => { self.pending_changes - .entry(this_dep_client) + .entry(id) .or_insert_with(Vec::new) .extend(may_apply_iter); break; @@ -150,8 +155,8 @@ impl PendingChanges { enum ChangeApplyState { Existing, Directly, - // The client id of first missing dep - Future(PeerID), + // The id of first missing dep + Future(ID), } fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState { @@ -159,7 +164,7 @@ fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApply let CounterSpan { start, end } = change.ctr_span(); let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0); if vv_latest_ctr < start { - return ChangeApplyState::Future(peer); + return ChangeApplyState::Future(change.id.inc(-1)); } if vv_latest_ctr >= end { return ChangeApplyState::Existing; @@ -167,7 +172,7 @@ fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApply for dep in change.deps.as_ref().iter() { let dep_vv_latest_ctr = vv.get(&dep.peer).copied().unwrap_or(0); if dep_vv_latest_ctr - 1 < dep.counter { - return ChangeApplyState::Future(dep.peer); + return ChangeApplyState::Future(*dep); } } ChangeApplyState::Directly diff --git a/crates/loro-internal/src/snapshot_encode.rs b/crates/loro-internal/src/snapshot_encode.rs index 3f87bd03..4a43a9d6 100644 --- a/crates/loro-internal/src/snapshot_encode.rs +++ b/crates/loro-internal/src/snapshot_encode.rs @@ -184,7 +184,8 @@ pub fn decode_oplog( change.lamport = lamport; oplog.import_local_change(change)?; } - + // try to apply pending changes + oplog.try_apply_pending_changes(); Ok(()) } @@ -192,7 +193,7 @@ pub fn decode_state<'b>( app_state: &'_ mut DocState, data: &'b FinalPhase, ) -> Result<(TempArena<'b>, CommonArena<'b>), LoroError> { - assert!(app_state.is_empty()); + // assert!(app_state.is_empty()); assert!(!app_state.is_in_txn()); let arena = app_state.arena.clone(); let common = CommonArena::decode(data)?; From 7b012dec0052f5001e28e219924a8b13679918e1 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Wed, 13 Sep 2023 21:39:12 +0800 Subject: [PATCH 4/9] fix: empty doc with pending decode snapshot --- crates/loro-internal/src/loro.rs | 3 +- crates/loro-internal/src/oplog.rs | 8 -- .../src/oplog/pending_changes.rs | 93 ++++++++++--------- crates/loro-internal/src/snapshot_encode.rs | 4 +- 4 files changed, 49 insertions(+), 59 deletions(-) diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 6507699a..c5329a63 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -72,8 +72,7 @@ impl LoroDoc { /// Is the document empty? (no ops) #[inline(always)] pub fn is_empty(&self) -> bool { - true - // self.oplog.lock().unwrap().is_empty() && self.state.lock().unwrap().is_empty() + self.oplog.lock().unwrap().is_empty() && self.state.lock().unwrap().is_empty() } /// Whether [OpLog] ans [DocState] are detached. diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index a746891d..506a80b3 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -406,7 +406,6 @@ impl OpLog { changes: RemoteClientChanges, ) -> Result<(), LoroError> { // check whether we can append the new changes - // TODO: support pending changes let vv = &self.dag.vv; let local_changes = self.pending_changes.filter_and_pending_remote_changes( @@ -416,17 +415,10 @@ impl OpLog { )?; // TODO: should we check deps here? - // op_converter is faster than using arena directly self.apply_local_change_from_remote(local_changes); Ok(()) } - pub(crate) fn try_apply_pending_changes(&mut self) { - let vv = self.vv().clone(); - let changes = self.pending_changes.try_apply_pending_changes(&vv); - self.apply_local_change_from_remote(changes); - } - fn apply_local_change_from_remote(&mut self, local_changes: Vec) { if !local_changes.is_empty() { self.next_lamport = self diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index ad5923b2..ef17330f 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -13,7 +13,7 @@ pub(crate) struct PendingChanges { } impl PendingChanges { - /// when + #[allow(dead_code)] pub(crate) fn try_apply_pending_changes(&mut self, vv: &VersionVector) -> Vec { let mut can_be_applied_changes = Vec::new(); let last_vv = self.last_pending_vv.clone(); @@ -29,25 +29,6 @@ impl PendingChanges { can_be_applied_changes } - fn convert_remote_to_pending_op(change: Change, arena: &SharedArena) -> Change { - arena.with_op_converter(|converter| { - let mut ops = RleVec::new(); - for op in change.ops { - for content in op.contents.into_iter() { - let op = converter.convert_single_op(&op.container, op.counter, content); - ops.push(op); - } - } - Change { - ops, - id: change.id, - deps: change.deps, - lamport: change.lamport, - timestamp: change.timestamp, - } - }) - } - pub(crate) fn filter_and_pending_remote_changes( &mut self, remote_changes: RemoteClientChanges, @@ -63,7 +44,7 @@ impl PendingChanges { .flat_map(|c| c.into_iter()) .sorted_unstable_by_key(|c| c.lamport) { - let local_change = Self::convert_remote_to_pending_op(change, arena); + let local_change = convert_remote_to_pending_op(change, arena); if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { self.pending_changes .get_mut(pre_dep) @@ -121,37 +102,56 @@ impl PendingChanges { } fn try_apply_pending(&mut self, id: &ID, can_be_applied_changes: &mut Vec) { - if let Some(may_apply_changes) = self.pending_changes.remove(id) { - let mut may_apply_iter = may_apply_changes - .into_iter() - .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) - .peekable(); - while let Some(peek_c) = may_apply_iter.peek() { - match remote_change_apply_state(&self.last_pending_vv, peek_c) { - ChangeApplyState::Directly => { - let c = may_apply_iter.next().unwrap(); - let last_id = c.id_last(); - self.last_pending_vv.set_end(c.id_end()); - // other pending - can_be_applied_changes.push(c); - self.try_apply_pending(&last_id, can_be_applied_changes); - } - ChangeApplyState::Existing => { - may_apply_iter.next(); - } - ChangeApplyState::Future(id) => { - self.pending_changes - .entry(id) - .or_insert_with(Vec::new) - .extend(may_apply_iter); - break; - } + let Some(may_apply_changes) = self.pending_changes.remove(id) else{return;}; + let mut may_apply_iter = may_apply_changes + .into_iter() + .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) + .peekable(); + while let Some(peek_c) = may_apply_iter.peek() { + match remote_change_apply_state(&self.last_pending_vv, peek_c) { + ChangeApplyState::Directly => { + let c = may_apply_iter.next().unwrap(); + let last_id = c.id_last(); + self.last_pending_vv.set_end(c.id_end()); + // other pending + can_be_applied_changes.push(c); + self.try_apply_pending(&last_id, can_be_applied_changes); + } + ChangeApplyState::Existing => { + may_apply_iter.next(); + } + ChangeApplyState::Future(id) => { + self.pending_changes + .entry(id) + .or_insert_with(Vec::new) + .extend(may_apply_iter); + break; } } } } } +fn convert_remote_to_pending_op(change: Change, arena: &SharedArena) -> Change { + // op_converter is faster than using arena directly + arena.with_op_converter(|converter| { + let mut ops = RleVec::new(); + for op in change.ops { + for content in op.contents.into_iter() { + let op = converter.convert_single_op(&op.container, op.counter, content); + ops.push(op); + } + } + Change { + ops, + id: change.id, + deps: change.deps, + lamport: change.lamport, + timestamp: change.timestamp, + } + }) +} + enum ChangeApplyState { Existing, Directly, @@ -226,6 +226,7 @@ mod test { let update2 = a.export_from(&version1); let _version2 = a.oplog_vv(); b.import(&update2).unwrap(); + // snapshot will be converted to updates b.import(&update1).unwrap(); assert_eq!(a.get_deep_value(), b.get_deep_value()); } diff --git a/crates/loro-internal/src/snapshot_encode.rs b/crates/loro-internal/src/snapshot_encode.rs index 4a43a9d6..fa6b9b4e 100644 --- a/crates/loro-internal/src/snapshot_encode.rs +++ b/crates/loro-internal/src/snapshot_encode.rs @@ -184,8 +184,6 @@ pub fn decode_oplog( change.lamport = lamport; oplog.import_local_change(change)?; } - // try to apply pending changes - oplog.try_apply_pending_changes(); Ok(()) } @@ -193,7 +191,7 @@ pub fn decode_state<'b>( app_state: &'_ mut DocState, data: &'b FinalPhase, ) -> Result<(TempArena<'b>, CommonArena<'b>), LoroError> { - // assert!(app_state.is_empty()); + assert!(app_state.is_empty()); assert!(!app_state.is_in_txn()); let arena = app_state.arena.clone(); let common = CommonArena::decode(data)?; From c294c61343d1fdf9b5b69e4aa9547284c85aa5ad Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Mon, 18 Sep 2023 15:21:38 +0800 Subject: [PATCH 5/9] fix: op converter --- crates/loro-internal/src/oplog.rs | 14 ++- .../src/oplog/pending_changes.rs | 99 ++++++++++--------- 2 files changed, 57 insertions(+), 56 deletions(-) diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 506a80b3..3dec43dc 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -406,26 +406,24 @@ impl OpLog { changes: RemoteClientChanges, ) -> Result<(), LoroError> { // check whether we can append the new changes - + self.pending_changes.check_changes(&changes)?; let vv = &self.dag.vv; - let local_changes = self.pending_changes.filter_and_pending_remote_changes( - changes, - &self.arena, - vv.clone(), - )?; + let local_changes = + self.pending_changes + .get_can_be_applied_changes(changes, &self.arena, vv.clone())?; // TODO: should we check deps here? self.apply_local_change_from_remote(local_changes); Ok(()) } - fn apply_local_change_from_remote(&mut self, local_changes: Vec) { + fn apply_local_change_from_remote(&mut self, mut local_changes: Vec) { + local_changes.sort_by_key(|x| x.lamport); if !local_changes.is_empty() { self.next_lamport = self .next_lamport .max(local_changes.last().unwrap().lamport_end()); } - // debug_dbg!(&change_causal_arr); for change in local_changes { self.dag.vv.extend_to_include_last_id(change.id_last()); diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index ef17330f..6cc416b2 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -1,5 +1,9 @@ use crate::{ - arena::SharedArena, change::Change, encoding::RemoteClientChanges, op::RemoteOp, VersionVector, + arena::{OpConverter, SharedArena}, + change::Change, + encoding::RemoteClientChanges, + op::RemoteOp, + VersionVector, }; use fxhash::FxHashMap; use itertools::Itertools; @@ -29,55 +33,57 @@ impl PendingChanges { can_be_applied_changes } - pub(crate) fn filter_and_pending_remote_changes( + pub(crate) fn get_can_be_applied_changes( &mut self, remote_changes: RemoteClientChanges, arena: &SharedArena, latest_vv: VersionVector, ) -> Result, LoroError> { - self.check_changes(&remote_changes)?; self.last_pending_vv = latest_vv; let mut can_be_applied_changes = Vec::new(); let mut peer_to_pending_dep = FxHashMap::default(); - for change in remote_changes - .into_values() - .flat_map(|c| c.into_iter()) - .sorted_unstable_by_key(|c| c.lamport) - { - let local_change = convert_remote_to_pending_op(change, arena); - if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { - self.pending_changes - .get_mut(pre_dep) - .unwrap() - .push(local_change); - continue; - } - match remote_change_apply_state(&self.last_pending_vv, &local_change) { - ChangeApplyState::Directly => { - self.last_pending_vv.set_end(local_change.id_end()); - let id_last = local_change.id_last(); - can_be_applied_changes.push(local_change); - self.try_apply_pending(&id_last, &mut can_be_applied_changes); - } - ChangeApplyState::Existing => {} - ChangeApplyState::Future(id) => { - peer_to_pending_dep.insert(local_change.id.peer, id); + // op_converter is faster than using arena directly + arena.with_op_converter(|converter| { + for change in remote_changes + .into_values() + .flat_map(|c| c.into_iter()) + .sorted_unstable_by_key(|c| c.lamport) + { + let local_change = to_local_op(change, converter); + if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { self.pending_changes - .entry(id) - .or_insert_with(Vec::new) + .get_mut(pre_dep) + .unwrap() .push(local_change); + continue; + } + match remote_change_apply_state(&self.last_pending_vv, &local_change) { + ChangeApplyState::Directly => { + self.last_pending_vv.set_end(local_change.id_end()); + let id_last = local_change.id_last(); + can_be_applied_changes.push(local_change); + self.try_apply_pending(&id_last, &mut can_be_applied_changes); + } + ChangeApplyState::Existing => {} + ChangeApplyState::Future(id) => { + peer_to_pending_dep.insert(local_change.id.peer, id); + self.pending_changes + .entry(id) + .or_insert_with(Vec::new) + .push(local_change); + } } } - } + }); Ok(can_be_applied_changes) } - fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> { + pub(super) fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> { for changes in changes.values() { if changes.is_empty() { continue; } - // detect invalid d + // detect invalid id let mut last_end_counter = None; for change in changes.iter() { if change.id.counter < 0 { @@ -132,24 +138,21 @@ impl PendingChanges { } } -fn convert_remote_to_pending_op(change: Change, arena: &SharedArena) -> Change { - // op_converter is faster than using arena directly - arena.with_op_converter(|converter| { - let mut ops = RleVec::new(); - for op in change.ops { - for content in op.contents.into_iter() { - let op = converter.convert_single_op(&op.container, op.counter, content); - ops.push(op); - } +fn to_local_op(change: Change, converter: &mut OpConverter) -> Change { + let mut ops = RleVec::new(); + for op in change.ops { + for content in op.contents.into_iter() { + let op = converter.convert_single_op(&op.container, op.counter, content); + ops.push(op); } - Change { - ops, - id: change.id, - deps: change.deps, - lamport: change.lamport, - timestamp: change.timestamp, - } - }) + } + Change { + ops, + id: change.id, + deps: change.deps, + lamport: change.lamport, + timestamp: change.timestamp, + } } enum ChangeApplyState { From a77bf2fcb3be552f789c547ec4e3491178e49de0 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Mon, 18 Sep 2023 17:38:16 +0800 Subject: [PATCH 6/9] fix: encode enhanced pending changes --- .../src/encoding/encode_changes.rs | 2 +- .../src/encoding/encode_enhanced.rs | 290 +++++++----------- .../src/encoding/encode_updates.rs | 2 +- crates/loro-internal/src/oplog.rs | 127 ++++---- .../src/oplog/pending_changes.rs | 203 ++++++------ 5 files changed, 279 insertions(+), 345 deletions(-) diff --git a/crates/loro-internal/src/encoding/encode_changes.rs b/crates/loro-internal/src/encoding/encode_changes.rs index 3aca7211..f053e33a 100644 --- a/crates/loro-internal/src/encoding/encode_changes.rs +++ b/crates/loro-internal/src/encoding/encode_changes.rs @@ -215,7 +215,7 @@ pub(super) fn encode_oplog_changes(oplog: &OpLog, vv: &VersionVector) -> Vec pub(crate) fn decode_oplog_changes(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> { let changes = decode_changes_to_inner_format_oplog(input, oplog)?; - oplog.import_remote_changes(changes)?; + oplog.import_remote_changes(changes, false)?; Ok(()) } diff --git a/crates/loro-internal/src/encoding/encode_enhanced.rs b/crates/loro-internal/src/encoding/encode_enhanced.rs index 29e8c099..c3e05d7f 100644 --- a/crates/loro-internal/src/encoding/encode_enhanced.rs +++ b/crates/loro-internal/src/encoding/encode_enhanced.rs @@ -1,12 +1,11 @@ use fxhash::{FxHashMap, FxHashSet}; -use loro_common::{HasCounterSpan, HasLamportSpan}; use rle::{HasLength, RleVec}; use serde_columnar::{columnar, iter_from_bytes, to_vec}; use std::{borrow::Cow, cmp::Ordering, ops::Deref, sync::Arc}; use zerovec::{vecs::Index32, VarZeroVec}; use crate::{ - change::{Change, Lamport, Timestamp}, + change::{Change, Timestamp}, container::text::text_content::ListSlice, container::{ idx::ContainerIdx, @@ -14,9 +13,10 @@ use crate::{ map::MapSet, ContainerID, ContainerType, }, + encoding::RemoteClientChanges, id::{Counter, PeerID, ID}, op::{RawOpContent, RemoteOp}, - oplog::{AppDagNode, OpLog}, + oplog::OpLog, span::HasId, version::Frontiers, InternalString, LoroError, LoroValue, VersionVector, @@ -396,200 +396,116 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> let mut value_iter = values.into_iter(); let mut str_index = 0; - let changes = change_encodings - .map(|change_encoding| { - let counter = start_counter - .get_mut(change_encoding.peer_idx as usize) - .unwrap(); - let ChangeEncoding { - peer_idx, - timestamp, - op_len, - deps_len, - dep_on_self, - } = change_encoding; + let mut remote_changes = RemoteClientChanges::default(); + for change_encoding in change_encodings { + let counter = start_counter + .get_mut(change_encoding.peer_idx as usize) + .unwrap(); + let ChangeEncoding { + peer_idx, + timestamp, + op_len, + deps_len, + dep_on_self, + } = change_encoding; - let peer_id = peers[peer_idx as usize]; - let mut ops = RleVec::<[RemoteOp; 1]>::new(); - let mut delta = 0; - for op in op_iter.by_ref().take(op_len as usize) { - let OpEncoding { - container: container_idx, - prop, - is_del, - insert_del_len, - } = op; + let peer_id = peers[peer_idx as usize]; + let mut ops = RleVec::<[RemoteOp; 1]>::new(); + let mut delta = 0; + for op in op_iter.by_ref().take(op_len as usize) { + let OpEncoding { + container: container_idx, + prop, + is_del, + insert_del_len, + } = op; - let Some(container_id) = get_container(container_idx) else { + let Some(container_id) = get_container(container_idx) else { return Err(LoroError::DecodeError("".into())); }; - let container_type = container_id.container_type(); - let content = match container_type { - ContainerType::Map => { - let key = keys[prop].clone(); - RawOpContent::Map(MapSet { - key, - value: value_iter.next().unwrap(), - }) - } - ContainerType::List | ContainerType::Text => { - let pos = prop; - if is_del { - RawOpContent::List(ListOp::Delete(DeleteSpan { - pos: pos as isize, - len: insert_del_len, - })) - } else { - match container_type { - ContainerType::Text => { - let insert_len = insert_del_len as usize; - let s = &str[str_index..str_index + insert_len]; - str_index += insert_len; - RawOpContent::List(ListOp::Insert { - slice: ListSlice::from_borrowed_str(s), - pos, - }) - } - ContainerType::List => { - let value = value_iter.next().unwrap(); - RawOpContent::List(ListOp::Insert { - slice: ListSlice::RawData(Cow::Owned( - match Arc::try_unwrap(value.into_list().unwrap()) { - Ok(v) => v, - Err(v) => v.deref().clone(), - }, - )), - pos, - }) - } - ContainerType::Map => unreachable!(), + let container_type = container_id.container_type(); + let content = match container_type { + ContainerType::Map => { + let key = keys[prop].clone(); + RawOpContent::Map(MapSet { + key, + value: value_iter.next().unwrap(), + }) + } + ContainerType::List | ContainerType::Text => { + let pos = prop; + if is_del { + RawOpContent::List(ListOp::Delete(DeleteSpan { + pos: pos as isize, + len: insert_del_len, + })) + } else { + match container_type { + ContainerType::Text => { + let insert_len = insert_del_len as usize; + let s = &str[str_index..str_index + insert_len]; + str_index += insert_len; + RawOpContent::List(ListOp::Insert { + slice: ListSlice::from_borrowed_str(s), + pos, + }) } + ContainerType::List => { + let value = value_iter.next().unwrap(); + RawOpContent::List(ListOp::Insert { + slice: ListSlice::RawData(Cow::Owned( + match Arc::try_unwrap(value.into_list().unwrap()) { + Ok(v) => v, + Err(v) => v.deref().clone(), + }, + )), + pos, + }) + } + ContainerType::Map => unreachable!(), } } - }; - let remote_op = RemoteOp { - container: container_id, - counter: *counter + delta, - contents: vec![content].into(), - }; - delta += remote_op.content_len() as i32; - ops.push(remote_op); - } - - let mut deps: Frontiers = (0..deps_len) - .map(|_| { - let raw = deps_iter.next().unwrap(); - ID::new(peers[raw.client_idx as usize], raw.counter) - }) - .collect(); - if dep_on_self { - deps.push(ID::new(peer_id, *counter - 1)); - } - - let change = Change { - id: ID { - peer: peer_id, - counter: *counter, - }, - // calc lamport after parsing all changes - lamport: 0, - timestamp, - ops, - deps, + } }; - - *counter += delta; - Ok(change) - }) - .collect::, LoroError>>(); - let changes = match changes { - Ok(changes) => changes, - Err(err) => return Err(err), - }; - - oplog.arena.clone().with_op_converter(|converter| { - for mut change in changes { - if change.id.counter < oplog.vv().get(&change.id.peer).copied().unwrap_or(0) { - // skip included changes - continue; - } - - // calc lamport or pending if its deps are not satisfied - for dep in change.deps.iter() { - match oplog.dag.get_lamport(dep) { - Some(lamport) => { - change.lamport = change.lamport.max(lamport + 1); - } - None => { - todo!("pending") - } - } - } - - // convert change into inner format - let mut ops = RleVec::new(); - for op in change.ops { - for content in op.contents.into_iter() { - let op = converter.convert_single_op(&op.container, op.counter, content); - ops.push(op); - } - } - - let change = Change { - ops, - id: change.id, - deps: change.deps, - lamport: change.lamport, - timestamp: change.timestamp, + let remote_op = RemoteOp { + container: container_id, + counter: *counter + delta, + contents: vec![content].into(), }; - - // update dag and push the change - let len = change.content_len(); - if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { - // don't need to push new element to dag because it only depends on itself - let nodes = oplog.dag.map.get_mut(&change.id.peer).unwrap(); - let last = nodes.vec_mut().last_mut().unwrap(); - assert_eq!(last.peer, change.id.peer); - assert_eq!(last.cnt + last.len as Counter, change.id.counter); - assert_eq!(last.lamport + last.len as Lamport, change.lamport); - last.len = change.id.counter as usize + len - last.cnt as usize; - last.has_succ = false; - } else { - let vv = oplog.dag.frontiers_to_im_vv(&change.deps); - oplog - .dag - .map - .entry(change.id.peer) - .or_default() - .push(AppDagNode { - vv, - peer: change.id.peer, - cnt: change.id.counter, - lamport: change.lamport, - deps: change.deps.clone(), - has_succ: false, - len, - }); - for dep in change.deps.iter() { - let target = oplog.dag.get_mut(*dep).unwrap(); - if target.ctr_last() == dep.counter { - target.has_succ = true; - } - } - } - oplog.next_lamport = oplog.next_lamport.max(change.lamport_end()); - oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp); - oplog.dag.vv.extend_to_include_end_id(ID { - peer: change.id.peer, - counter: change.id.counter + change.atom_len() as Counter, - }); - oplog - .changes - .entry(change.id.peer) - .or_default() - .push(change); + delta += remote_op.content_len() as i32; + ops.push(remote_op); } - }); + + let mut deps: Frontiers = (0..deps_len) + .map(|_| { + let raw = deps_iter.next().unwrap(); + ID::new(peers[raw.client_idx as usize], raw.counter) + }) + .collect(); + if dep_on_self { + deps.push(ID::new(peer_id, *counter - 1)); + } + + let change = Change { + id: ID { + peer: peer_id, + counter: *counter, + }, + // calc lamport after parsing all changes + lamport: 0, + timestamp, + ops, + deps, + }; + + *counter += delta; + remote_changes + .entry(peer_id) + .or_insert_with(Vec::new) + .push(change); + } + + oplog.import_remote_changes(remote_changes, true)?; // update dag frontiers if !oplog.batch_importing { diff --git a/crates/loro-internal/src/encoding/encode_updates.rs b/crates/loro-internal/src/encoding/encode_updates.rs index 55db4b03..14bfeaa5 100644 --- a/crates/loro-internal/src/encoding/encode_updates.rs +++ b/crates/loro-internal/src/encoding/encode_updates.rs @@ -63,7 +63,7 @@ pub(crate) fn encode_oplog_updates(oplog: &OpLog, from: &VersionVector) -> Vec Result<(), LoroError> { let changes = decode_updates(updates)?; - oplog.import_remote_changes(changes)?; + oplog.import_remote_changes(changes, false)?; Ok(()) } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 3dec43dc..90805b6b 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -7,7 +7,7 @@ use std::cmp::Ordering; use std::rc::Rc; use fxhash::FxHashMap; -use loro_common::HasId; +use itertools::Itertools; use rle::{HasLength, RleVec}; // use tabled::measurment::Percent; @@ -22,7 +22,9 @@ use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; -use self::pending_changes::PendingChanges; +use self::pending_changes::{ + remote_change_apply_state, to_local_op, ChangeApplyState, PendingChanges, +}; use super::arena::SharedArena; @@ -113,6 +115,19 @@ impl AppDag { .map(|(peer, vec)| ID::new(*peer, vec.last().unwrap().ctr_last())) .collect(); } + + /// If the lamport of change can be calculated, return Ok, otherwise, Err + pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> { + for dep in change.deps.iter() { + match self.get_lamport(dep) { + Some(lamport) => { + change.lamport = change.lamport.max(lamport + 1); + } + None => return Err(()), + } + } + Ok(()) + } } impl std::fmt::Debug for OpLog { @@ -403,71 +418,65 @@ impl OpLog { // They should also be continuous (TODO: check this) pub(crate) fn import_remote_changes( &mut self, - changes: RemoteClientChanges, + remote_changes: RemoteClientChanges, + unknown_lamport: bool, ) -> Result<(), LoroError> { // check whether we can append the new changes - self.pending_changes.check_changes(&changes)?; - let vv = &self.dag.vv; - let local_changes = - self.pending_changes - .get_can_be_applied_changes(changes, &self.arena, vv.clone())?; - - // TODO: should we check deps here? - self.apply_local_change_from_remote(local_changes); - Ok(()) - } - - fn apply_local_change_from_remote(&mut self, mut local_changes: Vec) { - local_changes.sort_by_key(|x| x.lamport); - if !local_changes.is_empty() { - self.next_lamport = self - .next_lamport - .max(local_changes.last().unwrap().lamport_end()); - } - // debug_dbg!(&change_causal_arr); - for change in local_changes { - self.dag.vv.extend_to_include_last_id(change.id_last()); - self.latest_timestamp = self.latest_timestamp.max(change.timestamp); - - let len = change.content_len(); - if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { - // don't need to push new element to dag because it only depends on itself - let nodes = self.dag.map.get_mut(&change.id.peer).unwrap(); - let last = nodes.vec_mut().last_mut().unwrap(); - assert_eq!(last.peer, change.id.peer); - assert_eq!(last.cnt + last.len as Counter, change.id.counter); - assert_eq!(last.lamport + last.len as Lamport, change.lamport); - last.len = change.id.counter as usize + len - last.cnt as usize; - last.has_succ = false; - } else { - let vv = self.dag.frontiers_to_im_vv(&change.deps); - self.dag - .map - .entry(change.id.peer) - .or_default() - .push(AppDagNode { - vv, - peer: change.id.peer, - cnt: change.id.counter, - lamport: change.lamport, - deps: change.deps.clone(), - has_succ: false, - len, - }); - for dep in change.deps.iter() { - let target = self.dag.get_mut(*dep).unwrap(); - if target.ctr_last() == dep.counter { - target.has_succ = true; + self.check_changes(&remote_changes)?; + let mut latest_vv = self.dag.vv.clone(); + let mut peer_to_pending_dep = FxHashMap::default(); + // op_converter is faster than using arena directly + self.arena.clone().with_op_converter(|converter| { + for change in remote_changes + .into_values() + .flat_map(|c| c.into_iter()) + .sorted_unstable_by_key(|c| c.lamport) + { + let mut local_change = to_local_op(change, converter); + if !unknown_lamport { + if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { + self.pending_changes + .pending_changes + .get_mut(pre_dep) + .unwrap() + .push(local_change); + continue; + } + } + match remote_change_apply_state(&latest_vv, &local_change) { + ChangeApplyState::Directly => { + latest_vv.set_end(local_change.id_end()); + let id_last = local_change.id_last(); + if unknown_lamport { + self.dag + .calc_unknown_lamport_change(&mut local_change) + .unwrap(); + } + self.apply_local_change_from_remote(local_change); + self.try_apply_pending(id_last, &mut latest_vv); + } + ChangeApplyState::Existing => {} + ChangeApplyState::Future(id) => { + if unknown_lamport { + self.pending_changes + .pending_unknown_lamport_changes + .insert(id, local_change); + } else { + peer_to_pending_dep.insert(local_change.id.peer, id); + self.pending_changes + .pending_changes + .entry(id) + .or_insert_with(Vec::new) + .push(local_change); + } } } } - - self.changes.entry(change.id.peer).or_default().push(change); - } - + }); if !self.batch_importing { self.dag.refresh_frontiers(); } + Ok(()) } /// lookup change by id. diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index 6cc416b2..cc0a7513 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -1,83 +1,23 @@ use crate::{ - arena::{OpConverter, SharedArena}, - change::Change, - encoding::RemoteClientChanges, - op::RemoteOp, + arena::OpConverter, change::Change, encoding::RemoteClientChanges, op::RemoteOp, OpLog, VersionVector, }; use fxhash::FxHashMap; use itertools::Itertools; -use loro_common::{CounterSpan, HasCounterSpan, HasIdSpan, LoroError, ID}; -use rle::RleVec; +use loro_common::{ + Counter, CounterSpan, HasCounterSpan, HasIdSpan, HasLamportSpan, Lamport, LoroError, ID, +}; +use rle::{HasLength, RleVec}; + +use super::AppDagNode; #[derive(Debug, Default)] pub(crate) struct PendingChanges { - pending_changes: FxHashMap>, - last_pending_vv: VersionVector, + pub(crate) pending_changes: FxHashMap>, + pub(crate) pending_unknown_lamport_changes: FxHashMap, } -impl PendingChanges { - #[allow(dead_code)] - pub(crate) fn try_apply_pending_changes(&mut self, vv: &VersionVector) -> Vec { - let mut can_be_applied_changes = Vec::new(); - let last_vv = self.last_pending_vv.clone(); - self.last_pending_vv = vv.clone(); - let ids: Vec<_> = self.pending_changes.keys().cloned().collect(); - for id in ids { - for id_span in vv.sub_iter(&last_vv) { - if id_span.contains(id) { - self.try_apply_pending(&id, &mut can_be_applied_changes); - } - } - } - can_be_applied_changes - } - - pub(crate) fn get_can_be_applied_changes( - &mut self, - remote_changes: RemoteClientChanges, - arena: &SharedArena, - latest_vv: VersionVector, - ) -> Result, LoroError> { - self.last_pending_vv = latest_vv; - let mut can_be_applied_changes = Vec::new(); - let mut peer_to_pending_dep = FxHashMap::default(); - // op_converter is faster than using arena directly - arena.with_op_converter(|converter| { - for change in remote_changes - .into_values() - .flat_map(|c| c.into_iter()) - .sorted_unstable_by_key(|c| c.lamport) - { - let local_change = to_local_op(change, converter); - if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { - self.pending_changes - .get_mut(pre_dep) - .unwrap() - .push(local_change); - continue; - } - match remote_change_apply_state(&self.last_pending_vv, &local_change) { - ChangeApplyState::Directly => { - self.last_pending_vv.set_end(local_change.id_end()); - let id_last = local_change.id_last(); - can_be_applied_changes.push(local_change); - self.try_apply_pending(&id_last, &mut can_be_applied_changes); - } - ChangeApplyState::Existing => {} - ChangeApplyState::Future(id) => { - peer_to_pending_dep.insert(local_change.id.peer, id); - self.pending_changes - .entry(id) - .or_insert_with(Vec::new) - .push(local_change); - } - } - } - }); - Ok(can_be_applied_changes) - } - +impl OpLog { pub(super) fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> { for changes in changes.values() { if changes.is_empty() { @@ -107,38 +47,107 @@ impl PendingChanges { Ok(()) } - fn try_apply_pending(&mut self, id: &ID, can_be_applied_changes: &mut Vec) { - let Some(may_apply_changes) = self.pending_changes.remove(id) else{return;}; - let mut may_apply_iter = may_apply_changes - .into_iter() - .sorted_by(|a, b| a.lamport.cmp(&b.lamport)) - .peekable(); - while let Some(peek_c) = may_apply_iter.peek() { - match remote_change_apply_state(&self.last_pending_vv, peek_c) { - ChangeApplyState::Directly => { - let c = may_apply_iter.next().unwrap(); - let last_id = c.id_last(); - self.last_pending_vv.set_end(c.id_end()); - // other pending - can_be_applied_changes.push(c); - self.try_apply_pending(&last_id, can_be_applied_changes); + pub(super) fn try_apply_pending(&mut self, id: ID, latest_vv: &mut VersionVector) { + let mut id_stack = vec![id]; + while let Some(id) = id_stack.pop() { + if let Some(may_apply_changes) = self.pending_changes.pending_changes.remove(&id) { + let mut may_apply_iter = may_apply_changes + .into_iter() + .sorted_unstable_by_key(|a| a.lamport) + .peekable(); + while let Some(peek_c) = may_apply_iter.peek() { + match remote_change_apply_state(latest_vv, peek_c) { + ChangeApplyState::Directly => { + let c = may_apply_iter.next().unwrap(); + let last_id = c.id_last(); + latest_vv.set_end(c.id_end()); + self.apply_local_change_from_remote(c); + // other pending + id_stack.push(last_id); + } + ChangeApplyState::Existing => { + may_apply_iter.next(); + } + ChangeApplyState::Future(id) => { + self.pending_changes + .pending_changes + .entry(id) + .or_insert_with(Vec::new) + .extend(may_apply_iter); + break; + } + } } - ChangeApplyState::Existing => { - may_apply_iter.next(); - } - ChangeApplyState::Future(id) => { - self.pending_changes - .entry(id) - .or_insert_with(Vec::new) - .extend(may_apply_iter); - break; + } + if let Some(mut unknown_lamport_change) = self + .pending_changes + .pending_unknown_lamport_changes + .remove(&id) + { + match remote_change_apply_state(latest_vv, &unknown_lamport_change) { + ChangeApplyState::Directly => { + let last_id = unknown_lamport_change.id_last(); + latest_vv.set_end(unknown_lamport_change.id_end()); + self.dag + .calc_unknown_lamport_change(&mut unknown_lamport_change) + .unwrap(); + self.apply_local_change_from_remote(unknown_lamport_change); + id_stack.push(last_id); + } + ChangeApplyState::Existing => unreachable!(), + ChangeApplyState::Future(id) => { + self.pending_changes + .pending_unknown_lamport_changes + .insert(id, unknown_lamport_change); + } } } } } + + pub(super) fn apply_local_change_from_remote(&mut self, change: Change) { + self.next_lamport = self.next_lamport.max(change.lamport_end()); + // debug_dbg!(&change_causal_arr); + self.dag.vv.extend_to_include_last_id(change.id_last()); + self.latest_timestamp = self.latest_timestamp.max(change.timestamp); + + let len = change.content_len(); + if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { + // don't need to push new element to dag because it only depends on itself + let nodes = self.dag.map.get_mut(&change.id.peer).unwrap(); + let last = nodes.vec_mut().last_mut().unwrap(); + assert_eq!(last.peer, change.id.peer); + assert_eq!(last.cnt + last.len as Counter, change.id.counter); + assert_eq!(last.lamport + last.len as Lamport, change.lamport); + last.len = change.id.counter as usize + len - last.cnt as usize; + last.has_succ = false; + } else { + let vv = self.dag.frontiers_to_im_vv(&change.deps); + self.dag + .map + .entry(change.id.peer) + .or_default() + .push(AppDagNode { + vv, + peer: change.id.peer, + cnt: change.id.counter, + lamport: change.lamport, + deps: change.deps.clone(), + has_succ: false, + len, + }); + for dep in change.deps.iter() { + let target = self.dag.get_mut(*dep).unwrap(); + if target.ctr_last() == dep.counter { + target.has_succ = true; + } + } + } + self.changes.entry(change.id.peer).or_default().push(change); + } } -fn to_local_op(change: Change, converter: &mut OpConverter) -> Change { +pub(super) fn to_local_op(change: Change, converter: &mut OpConverter) -> Change { let mut ops = RleVec::new(); for op in change.ops { for content in op.contents.into_iter() { @@ -155,14 +164,14 @@ fn to_local_op(change: Change, converter: &mut OpConverter) -> Change } } -enum ChangeApplyState { +pub enum ChangeApplyState { Existing, Directly, // The id of first missing dep Future(ID), } -fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState { +pub(super) fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState { let peer = change.id.peer; let CounterSpan { start, end } = change.ctr_span(); let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0); From 05dc62a31c1636aede14b102e180730fb0d0a7ba Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Mon, 18 Sep 2023 21:35:43 +0800 Subject: [PATCH 7/9] fix: revert enhanced encoding --- .../src/encoding/encode_changes.rs | 2 +- .../src/encoding/encode_enhanced.rs | 292 ++++++++++++------ .../src/encoding/encode_updates.rs | 2 +- crates/loro-internal/src/oplog.rs | 44 +-- 4 files changed, 207 insertions(+), 133 deletions(-) diff --git a/crates/loro-internal/src/encoding/encode_changes.rs b/crates/loro-internal/src/encoding/encode_changes.rs index f053e33a..3aca7211 100644 --- a/crates/loro-internal/src/encoding/encode_changes.rs +++ b/crates/loro-internal/src/encoding/encode_changes.rs @@ -215,7 +215,7 @@ pub(super) fn encode_oplog_changes(oplog: &OpLog, vv: &VersionVector) -> Vec pub(crate) fn decode_oplog_changes(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> { let changes = decode_changes_to_inner_format_oplog(input, oplog)?; - oplog.import_remote_changes(changes, false)?; + oplog.import_remote_changes(changes)?; Ok(()) } diff --git a/crates/loro-internal/src/encoding/encode_enhanced.rs b/crates/loro-internal/src/encoding/encode_enhanced.rs index c3e05d7f..96154d3d 100644 --- a/crates/loro-internal/src/encoding/encode_enhanced.rs +++ b/crates/loro-internal/src/encoding/encode_enhanced.rs @@ -1,11 +1,12 @@ use fxhash::{FxHashMap, FxHashSet}; +use loro_common::{HasCounterSpan, HasLamportSpan}; use rle::{HasLength, RleVec}; use serde_columnar::{columnar, iter_from_bytes, to_vec}; use std::{borrow::Cow, cmp::Ordering, ops::Deref, sync::Arc}; use zerovec::{vecs::Index32, VarZeroVec}; use crate::{ - change::{Change, Timestamp}, + change::{Change, Lamport, Timestamp}, container::text::text_content::ListSlice, container::{ idx::ContainerIdx, @@ -13,10 +14,9 @@ use crate::{ map::MapSet, ContainerID, ContainerType, }, - encoding::RemoteClientChanges, id::{Counter, PeerID, ID}, op::{RawOpContent, RemoteOp}, - oplog::OpLog, + oplog::{AppDagNode, OpLog}, span::HasId, version::Frontiers, InternalString, LoroError, LoroValue, VersionVector, @@ -396,116 +396,202 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> let mut value_iter = values.into_iter(); let mut str_index = 0; - let mut remote_changes = RemoteClientChanges::default(); - for change_encoding in change_encodings { - let counter = start_counter - .get_mut(change_encoding.peer_idx as usize) - .unwrap(); - let ChangeEncoding { - peer_idx, - timestamp, - op_len, - deps_len, - dep_on_self, - } = change_encoding; + let changes = change_encodings + .map(|change_encoding| { + let counter = start_counter + .get_mut(change_encoding.peer_idx as usize) + .unwrap(); + let ChangeEncoding { + peer_idx, + timestamp, + op_len, + deps_len, + dep_on_self, + } = change_encoding; - let peer_id = peers[peer_idx as usize]; - let mut ops = RleVec::<[RemoteOp; 1]>::new(); - let mut delta = 0; - for op in op_iter.by_ref().take(op_len as usize) { - let OpEncoding { - container: container_idx, - prop, - is_del, - insert_del_len, - } = op; + let peer_id = peers[peer_idx as usize]; + let mut ops = RleVec::<[RemoteOp; 1]>::new(); + let mut delta = 0; + for op in op_iter.by_ref().take(op_len as usize) { + let OpEncoding { + container: container_idx, + prop, + is_del, + insert_del_len, + } = op; - let Some(container_id) = get_container(container_idx) else { + let Some(container_id) = get_container(container_idx) else { return Err(LoroError::DecodeError("".into())); }; - let container_type = container_id.container_type(); - let content = match container_type { - ContainerType::Map => { - let key = keys[prop].clone(); - RawOpContent::Map(MapSet { - key, - value: value_iter.next().unwrap(), - }) - } - ContainerType::List | ContainerType::Text => { - let pos = prop; - if is_del { - RawOpContent::List(ListOp::Delete(DeleteSpan { - pos: pos as isize, - len: insert_del_len, - })) - } else { - match container_type { - ContainerType::Text => { - let insert_len = insert_del_len as usize; - let s = &str[str_index..str_index + insert_len]; - str_index += insert_len; - RawOpContent::List(ListOp::Insert { - slice: ListSlice::from_borrowed_str(s), - pos, - }) + let container_type = container_id.container_type(); + let content = match container_type { + ContainerType::Map => { + let key = keys[prop].clone(); + RawOpContent::Map(MapSet { + key, + value: value_iter.next().unwrap(), + }) + } + ContainerType::List | ContainerType::Text => { + let pos = prop; + if is_del { + RawOpContent::List(ListOp::Delete(DeleteSpan { + pos: pos as isize, + len: insert_del_len, + })) + } else { + match container_type { + ContainerType::Text => { + let insert_len = insert_del_len as usize; + let s = &str[str_index..str_index + insert_len]; + str_index += insert_len; + RawOpContent::List(ListOp::Insert { + slice: ListSlice::from_borrowed_str(s), + pos, + }) + } + ContainerType::List => { + let value = value_iter.next().unwrap(); + RawOpContent::List(ListOp::Insert { + slice: ListSlice::RawData(Cow::Owned( + match Arc::try_unwrap(value.into_list().unwrap()) { + Ok(v) => v, + Err(v) => v.deref().clone(), + }, + )), + pos, + }) + } + ContainerType::Map => unreachable!(), } - ContainerType::List => { - let value = value_iter.next().unwrap(); - RawOpContent::List(ListOp::Insert { - slice: ListSlice::RawData(Cow::Owned( - match Arc::try_unwrap(value.into_list().unwrap()) { - Ok(v) => v, - Err(v) => v.deref().clone(), - }, - )), - pos, - }) - } - ContainerType::Map => unreachable!(), } } + }; + let remote_op = RemoteOp { + container: container_id, + counter: *counter + delta, + contents: vec![content].into(), + }; + delta += remote_op.content_len() as i32; + ops.push(remote_op); + } + + let mut deps: Frontiers = (0..deps_len) + .map(|_| { + let raw = deps_iter.next().unwrap(); + ID::new(peers[raw.client_idx as usize], raw.counter) + }) + .collect(); + if dep_on_self { + deps.push(ID::new(peer_id, *counter - 1)); + } + + let change = Change { + id: ID { + peer: peer_id, + counter: *counter, + }, + // calc lamport after parsing all changes + lamport: 0, + timestamp, + ops, + deps, + }; + + *counter += delta; + Ok(change) + }) + .collect::, LoroError>>(); + let changes = match changes { + Ok(changes) => changes, + Err(err) => return Err(err), + }; + + oplog.arena.clone().with_op_converter(|converter| { + 'outer: for change in changes { + if change.id.counter < oplog.vv().get(&change.id.peer).copied().unwrap_or(0) { + // skip included changes + continue; + } + + // convert change into inner format + let mut ops = RleVec::new(); + for op in change.ops { + for content in op.contents.into_iter() { + let op = converter.convert_single_op(&op.container, op.counter, content); + ops.push(op); } + } + + let mut change = Change { + ops, + id: change.id, + deps: change.deps, + lamport: change.lamport, + timestamp: change.timestamp, }; - let remote_op = RemoteOp { - container: container_id, - counter: *counter + delta, - contents: vec![content].into(), - }; - delta += remote_op.content_len() as i32; - ops.push(remote_op); + // calc lamport or pending if its deps are not satisfied + for dep in change.deps.iter() { + match oplog.dag.get_lamport(dep) { + Some(lamport) => { + change.lamport = change.lamport.max(lamport + 1); + } + None => { + oplog + .pending_changes + .pending_unknown_lamport_changes + .insert(*dep, change); + continue 'outer; + } + } + } + // update dag and push the change + let len = change.content_len(); + if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { + // don't need to push new element to dag because it only depends on itself + let nodes = oplog.dag.map.get_mut(&change.id.peer).unwrap(); + let last = nodes.vec_mut().last_mut().unwrap(); + assert_eq!(last.peer, change.id.peer); + assert_eq!(last.cnt + last.len as Counter, change.id.counter); + assert_eq!(last.lamport + last.len as Lamport, change.lamport); + last.len = change.id.counter as usize + len - last.cnt as usize; + last.has_succ = false; + } else { + let vv = oplog.dag.frontiers_to_im_vv(&change.deps); + oplog + .dag + .map + .entry(change.id.peer) + .or_default() + .push(AppDagNode { + vv, + peer: change.id.peer, + cnt: change.id.counter, + lamport: change.lamport, + deps: change.deps.clone(), + has_succ: false, + len, + }); + for dep in change.deps.iter() { + let target = oplog.dag.get_mut(*dep).unwrap(); + if target.ctr_last() == dep.counter { + target.has_succ = true; + } + } + } + oplog.next_lamport = oplog.next_lamport.max(change.lamport_end()); + oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp); + oplog.dag.vv.extend_to_include_end_id(ID { + peer: change.id.peer, + counter: change.id.counter + change.atom_len() as Counter, + }); + oplog + .changes + .entry(change.id.peer) + .or_default() + .push(change); } - - let mut deps: Frontiers = (0..deps_len) - .map(|_| { - let raw = deps_iter.next().unwrap(); - ID::new(peers[raw.client_idx as usize], raw.counter) - }) - .collect(); - if dep_on_self { - deps.push(ID::new(peer_id, *counter - 1)); - } - - let change = Change { - id: ID { - peer: peer_id, - counter: *counter, - }, - // calc lamport after parsing all changes - lamport: 0, - timestamp, - ops, - deps, - }; - - *counter += delta; - remote_changes - .entry(peer_id) - .or_insert_with(Vec::new) - .push(change); - } - - oplog.import_remote_changes(remote_changes, true)?; + }); // update dag frontiers if !oplog.batch_importing { diff --git a/crates/loro-internal/src/encoding/encode_updates.rs b/crates/loro-internal/src/encoding/encode_updates.rs index 14bfeaa5..55db4b03 100644 --- a/crates/loro-internal/src/encoding/encode_updates.rs +++ b/crates/loro-internal/src/encoding/encode_updates.rs @@ -63,7 +63,7 @@ pub(crate) fn encode_oplog_updates(oplog: &OpLog, from: &VersionVector) -> Vec Result<(), LoroError> { let changes = decode_updates(updates)?; - oplog.import_remote_changes(changes, false)?; + oplog.import_remote_changes(changes)?; Ok(()) } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 90805b6b..e9f93bbc 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -45,7 +45,7 @@ pub struct OpLog { /// Pending changes that haven't been applied to the dag. /// A change can be imported only when all its deps are already imported. /// Key is the ID of the missing dep - pending_changes: PendingChanges, + pub(crate) pending_changes: PendingChanges, /// Whether we are importing a batch of changes. /// If so the Dag's frontiers won't be updated until the batch is finished. pub(crate) batch_importing: bool, @@ -419,7 +419,6 @@ impl OpLog { pub(crate) fn import_remote_changes( &mut self, remote_changes: RemoteClientChanges, - unknown_lamport: bool, ) -> Result<(), LoroError> { // check whether we can append the new changes self.check_changes(&remote_changes)?; @@ -433,42 +432,31 @@ impl OpLog { .sorted_unstable_by_key(|c| c.lamport) { let mut local_change = to_local_op(change, converter); - if !unknown_lamport { - if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { - self.pending_changes - .pending_changes - .get_mut(pre_dep) - .unwrap() - .push(local_change); - continue; - } + + if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { + self.pending_changes + .pending_changes + .get_mut(pre_dep) + .unwrap() + .push(local_change); + continue; } + match remote_change_apply_state(&latest_vv, &local_change) { ChangeApplyState::Directly => { latest_vv.set_end(local_change.id_end()); let id_last = local_change.id_last(); - if unknown_lamport { - self.dag - .calc_unknown_lamport_change(&mut local_change) - .unwrap(); - } self.apply_local_change_from_remote(local_change); self.try_apply_pending(id_last, &mut latest_vv); } ChangeApplyState::Existing => {} ChangeApplyState::Future(id) => { - if unknown_lamport { - self.pending_changes - .pending_unknown_lamport_changes - .insert(id, local_change); - } else { - peer_to_pending_dep.insert(local_change.id.peer, id); - self.pending_changes - .pending_changes - .entry(id) - .or_insert_with(Vec::new) - .push(local_change); - } + peer_to_pending_dep.insert(local_change.id.peer, id); + self.pending_changes + .pending_changes + .entry(id) + .or_insert_with(Vec::new) + .push(local_change); } } } From caaaa666aee14cff0b6da327f6745a27dbc0dc0b Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Mon, 18 Sep 2023 21:57:51 +0800 Subject: [PATCH 8/9] chore: add pending bench --- crates/loro-internal/Cargo.toml | 4 + crates/loro-internal/benches/pending.rs | 45 +++++ .../src/encoding/encode_enhanced.rs | 39 +++-- crates/loro-internal/src/oplog.rs | 58 ++----- .../src/oplog/pending_changes.rs | 159 ++++++++++++------ 5 files changed, 191 insertions(+), 114 deletions(-) create mode 100644 crates/loro-internal/benches/pending.rs diff --git a/crates/loro-internal/Cargo.toml b/crates/loro-internal/Cargo.toml index a46f6bbc..587edf65 100644 --- a/crates/loro-internal/Cargo.toml +++ b/crates/loro-internal/Cargo.toml @@ -79,3 +79,7 @@ harness = false [[bench]] name = "encode" harness = false + +[[bench]] +name = "pending" +harness = false diff --git a/crates/loro-internal/benches/pending.rs b/crates/loro-internal/benches/pending.rs new file mode 100644 index 00000000..5be70a7f --- /dev/null +++ b/crates/loro-internal/benches/pending.rs @@ -0,0 +1,45 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +#[cfg(feature = "test_utils")] +mod pending { + use super::*; + use bench_utils::TextAction; + use loro_internal::{LoroDoc, VersionVector}; + + pub fn b4(c: &mut Criterion) { + let mut b = c.benchmark_group("B4 pending decode"); + b.sample_size(10); + b.bench_function("detached mode", |b| { + let loro = LoroDoc::default(); + let mut latest_vv = VersionVector::default(); + let mut updates = vec![]; + let actions = bench_utils::get_automerge_actions(); + let action_length = actions.len(); + let text = loro.get_text("text"); + for chunks in actions.chunks(action_length / 5) { + for TextAction { pos, ins, del } in chunks { + let mut txn = loro.txn().unwrap(); + text.delete(&mut txn, *pos, *del).unwrap(); + text.insert(&mut txn, *pos, ins).unwrap(); + updates.push(loro.export_from(&latest_vv)); + latest_vv = loro.oplog_vv(); + } + } + updates.reverse(); + b.iter(|| { + let mut store2 = LoroDoc::default(); + store2.detach(); + for update in updates.iter() { + store2.import(update).unwrap(); + } + }) + }); + } +} + +pub fn dumb(_c: &mut Criterion) {} + +#[cfg(feature = "test_utils")] +criterion_group!(benches, pending::b4); +#[cfg(not(feature = "test_utils"))] +criterion_group!(benches, dumb); +criterion_main!(benches); diff --git a/crates/loro-internal/src/encoding/encode_enhanced.rs b/crates/loro-internal/src/encoding/encode_enhanced.rs index 96154d3d..db9371ba 100644 --- a/crates/loro-internal/src/encoding/encode_enhanced.rs +++ b/crates/loro-internal/src/encoding/encode_enhanced.rs @@ -506,14 +506,27 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> Ok(changes) => changes, Err(err) => return Err(err), }; - + let mut pending_remote_changes = Vec::new(); oplog.arena.clone().with_op_converter(|converter| { - 'outer: for change in changes { + 'outer: for mut change in changes { if change.id.counter < oplog.vv().get(&change.id.peer).copied().unwrap_or(0) { // skip included changes continue; } + // calc lamport or pending if its deps are not satisfied + for dep in change.deps.iter() { + match oplog.dag.get_lamport(dep) { + Some(lamport) => { + change.lamport = change.lamport.max(lamport + 1); + } + None => { + pending_remote_changes.push(change); + continue 'outer; + } + } + } + // convert change into inner format let mut ops = RleVec::new(); for op in change.ops { @@ -523,28 +536,14 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> } } - let mut change = Change { + let change = Change { ops, id: change.id, deps: change.deps, lamport: change.lamport, timestamp: change.timestamp, }; - // calc lamport or pending if its deps are not satisfied - for dep in change.deps.iter() { - match oplog.dag.get_lamport(dep) { - Some(lamport) => { - change.lamport = change.lamport.max(lamport + 1); - } - None => { - oplog - .pending_changes - .pending_unknown_lamport_changes - .insert(*dep, change); - continue 'outer; - } - } - } + // update dag and push the change let len = change.content_len(); if change.deps.len() == 1 && change.deps[0].peer == change.id.peer { @@ -592,11 +591,11 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> .push(change); } }); - - // update dag frontiers if !oplog.batch_importing { oplog.dag.refresh_frontiers(); } + + oplog.import_unknown_lamport_remote_changes(pending_remote_changes)?; assert_eq!(str_index, str.len()); Ok(()) } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index e9f93bbc..b4918a1e 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -7,7 +7,6 @@ use std::cmp::Ordering; use std::rc::Rc; use fxhash::FxHashMap; -use itertools::Itertools; use rle::{HasLength, RleVec}; // use tabled::measurment::Percent; @@ -22,9 +21,7 @@ use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; -use self::pending_changes::{ - remote_change_apply_state, to_local_op, ChangeApplyState, PendingChanges, -}; +use self::pending_changes::PendingChanges; use super::arena::SharedArena; @@ -422,51 +419,30 @@ impl OpLog { ) -> Result<(), LoroError> { // check whether we can append the new changes self.check_changes(&remote_changes)?; - let mut latest_vv = self.dag.vv.clone(); - let mut peer_to_pending_dep = FxHashMap::default(); + let latest_vv = self.dag.vv.clone(); // op_converter is faster than using arena directly - self.arena.clone().with_op_converter(|converter| { - for change in remote_changes - .into_values() - .flat_map(|c| c.into_iter()) - .sorted_unstable_by_key(|c| c.lamport) - { - let mut local_change = to_local_op(change, converter); - - if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) { - self.pending_changes - .pending_changes - .get_mut(pre_dep) - .unwrap() - .push(local_change); - continue; - } - - match remote_change_apply_state(&latest_vv, &local_change) { - ChangeApplyState::Directly => { - latest_vv.set_end(local_change.id_end()); - let id_last = local_change.id_last(); - self.apply_local_change_from_remote(local_change); - self.try_apply_pending(id_last, &mut latest_vv); - } - ChangeApplyState::Existing => {} - ChangeApplyState::Future(id) => { - peer_to_pending_dep.insert(local_change.id.peer, id); - self.pending_changes - .pending_changes - .entry(id) - .or_insert_with(Vec::new) - .push(local_change); - } - } - } + let ids = self.arena.clone().with_op_converter(|converter| { + self.calc_pending_changes(remote_changes, converter, latest_vv) }); + let mut latest_vv = self.dag.vv.clone(); + self.try_apply_pending(ids, &mut latest_vv); if !self.batch_importing { self.dag.refresh_frontiers(); } Ok(()) } + pub(crate) fn import_unknown_lamport_remote_changes( + &mut self, + remote_changes: Vec>, + ) -> Result<(), LoroError> { + let latest_vv = self.dag.vv.clone(); + self.arena.clone().with_op_converter(|converter| { + self.extend_unknown_pending_changes(remote_changes, converter, &latest_vv) + }); + Ok(()) + } + /// lookup change by id. /// /// if id does not included in this oplog, return None diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index cc0a7513..465820d6 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -1,3 +1,5 @@ +use std::ops::Deref; + use crate::{ arena::OpConverter, change::Change, encoding::RemoteClientChanges, op::RemoteOp, OpLog, VersionVector, @@ -8,13 +10,88 @@ use loro_common::{ Counter, CounterSpan, HasCounterSpan, HasIdSpan, HasLamportSpan, Lamport, LoroError, ID, }; use rle::{HasLength, RleVec}; +use smallvec::SmallVec; use super::AppDagNode; +#[derive(Debug)] +pub enum PendingChange { + // The lamport of the change decoded by `enhanced` is unknown. + // we need calculate it when the change can be applied + Unknown(Change), + Known(Change), +} + +impl Deref for PendingChange { + type Target = Change; + fn deref(&self) -> &Self::Target { + match self { + Self::Unknown(a) => a, + Self::Known(a) => a, + } + } +} + #[derive(Debug, Default)] pub(crate) struct PendingChanges { - pub(crate) pending_changes: FxHashMap>, - pub(crate) pending_unknown_lamport_changes: FxHashMap, + changes: FxHashMap>, +} + +impl OpLog { + // calculate all `id_last`(s) whose change can be applied + pub(super) fn calc_pending_changes( + &mut self, + remote_changes: RemoteClientChanges, + converter: &mut OpConverter, + mut latest_vv: VersionVector, + ) -> Vec { + let mut ans = Vec::new(); + for change in remote_changes + .into_values() + .filter(|c| !c.is_empty()) + .flat_map(|c| c.into_iter()) + .sorted_unstable_by_key(|c| c.lamport) + { + let local_change = to_local_op(change, converter); + let local_change = PendingChange::Known(local_change); + match remote_change_apply_state(&latest_vv, &local_change) { + ChangeApplyState::Directly => { + latest_vv.set_end(local_change.id_end()); + ans.push(local_change.id_last()); + self.apply_local_change_from_remote(local_change); + } + ChangeApplyState::Existing => {} + ChangeApplyState::Future(miss_dep) => self + .pending_changes + .changes + .entry(miss_dep) + .or_insert_with(SmallVec::new) + .push(local_change), + } + } + ans + } + + pub(super) fn extend_unknown_pending_changes( + &mut self, + remote_changes: Vec>, + converter: &mut OpConverter, + latest_vv: &VersionVector, + ) { + for change in remote_changes { + let local_change = to_local_op(change, converter); + let local_change = PendingChange::Unknown(local_change); + match remote_change_apply_state(latest_vv, &local_change) { + ChangeApplyState::Future(miss_dep) => self + .pending_changes + .changes + .entry(miss_dep) + .or_insert_with(SmallVec::new) + .push(local_change), + _ => unreachable!(), + } + } + } } impl OpLog { @@ -47,65 +124,41 @@ impl OpLog { Ok(()) } - pub(super) fn try_apply_pending(&mut self, id: ID, latest_vv: &mut VersionVector) { - let mut id_stack = vec![id]; + pub(super) fn try_apply_pending( + &mut self, + mut id_stack: Vec, + latest_vv: &mut VersionVector, + ) { while let Some(id) = id_stack.pop() { - if let Some(may_apply_changes) = self.pending_changes.pending_changes.remove(&id) { - let mut may_apply_iter = may_apply_changes - .into_iter() - .sorted_unstable_by_key(|a| a.lamport) - .peekable(); - while let Some(peek_c) = may_apply_iter.peek() { - match remote_change_apply_state(latest_vv, peek_c) { - ChangeApplyState::Directly => { - let c = may_apply_iter.next().unwrap(); - let last_id = c.id_last(); - latest_vv.set_end(c.id_end()); - self.apply_local_change_from_remote(c); - // other pending - id_stack.push(last_id); - } - ChangeApplyState::Existing => { - may_apply_iter.next(); - } - ChangeApplyState::Future(id) => { - self.pending_changes - .pending_changes - .entry(id) - .or_insert_with(Vec::new) - .extend(may_apply_iter); - break; - } - } - } - } - if let Some(mut unknown_lamport_change) = self - .pending_changes - .pending_unknown_lamport_changes - .remove(&id) - { - match remote_change_apply_state(latest_vv, &unknown_lamport_change) { + let Some(pending_changes) = self.pending_changes.changes.remove(&id) else{continue;}; + for pending_change in pending_changes { + match remote_change_apply_state(latest_vv, &pending_change) { ChangeApplyState::Directly => { - let last_id = unknown_lamport_change.id_last(); - latest_vv.set_end(unknown_lamport_change.id_end()); - self.dag - .calc_unknown_lamport_change(&mut unknown_lamport_change) - .unwrap(); - self.apply_local_change_from_remote(unknown_lamport_change); - id_stack.push(last_id); - } - ChangeApplyState::Existing => unreachable!(), - ChangeApplyState::Future(id) => { - self.pending_changes - .pending_unknown_lamport_changes - .insert(id, unknown_lamport_change); + id_stack.push(pending_change.id_last()); + latest_vv.set_end(pending_change.id_end()); + self.apply_local_change_from_remote(pending_change); } + ChangeApplyState::Existing => {} + ChangeApplyState::Future(miss_dep) => self + .pending_changes + .changes + .entry(miss_dep) + .or_insert_with(SmallVec::new) + .push(pending_change), } } } } - pub(super) fn apply_local_change_from_remote(&mut self, change: Change) { + pub(super) fn apply_local_change_from_remote(&mut self, change: PendingChange) { + let change = match change { + PendingChange::Known(c) => c, + PendingChange::Unknown(mut c) => { + self.dag.calc_unknown_lamport_change(&mut c).unwrap(); + c + } + }; + self.next_lamport = self.next_lamport.max(change.lamport_end()); // debug_dbg!(&change_causal_arr); self.dag.vv.extend_to_include_last_id(change.id_last()); From 524ab9dabd6211e9add3d82148c4d58eaa4e7113 Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Thu, 21 Sep 2023 08:04:22 +0800 Subject: [PATCH 9/9] chore: visibility & named --- .../src/oplog/pending_changes.rs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index 465820d6..dce58312 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -55,13 +55,13 @@ impl OpLog { let local_change = to_local_op(change, converter); let local_change = PendingChange::Known(local_change); match remote_change_apply_state(&latest_vv, &local_change) { - ChangeApplyState::Directly => { + ChangeApplyState::CanApplyDirectly => { latest_vv.set_end(local_change.id_end()); ans.push(local_change.id_last()); self.apply_local_change_from_remote(local_change); } - ChangeApplyState::Existing => {} - ChangeApplyState::Future(miss_dep) => self + ChangeApplyState::Applied => {} + ChangeApplyState::AwaitingDependency(miss_dep) => self .pending_changes .changes .entry(miss_dep) @@ -82,7 +82,7 @@ impl OpLog { let local_change = to_local_op(change, converter); let local_change = PendingChange::Unknown(local_change); match remote_change_apply_state(latest_vv, &local_change) { - ChangeApplyState::Future(miss_dep) => self + ChangeApplyState::AwaitingDependency(miss_dep) => self .pending_changes .changes .entry(miss_dep) @@ -133,13 +133,13 @@ impl OpLog { let Some(pending_changes) = self.pending_changes.changes.remove(&id) else{continue;}; for pending_change in pending_changes { match remote_change_apply_state(latest_vv, &pending_change) { - ChangeApplyState::Directly => { + ChangeApplyState::CanApplyDirectly => { id_stack.push(pending_change.id_last()); latest_vv.set_end(pending_change.id_end()); self.apply_local_change_from_remote(pending_change); } - ChangeApplyState::Existing => {} - ChangeApplyState::Future(miss_dep) => self + ChangeApplyState::Applied => {} + ChangeApplyState::AwaitingDependency(miss_dep) => self .pending_changes .changes .entry(miss_dep) @@ -217,30 +217,30 @@ pub(super) fn to_local_op(change: Change, converter: &mut OpConverter) } } -pub enum ChangeApplyState { - Existing, - Directly, +enum ChangeApplyState { + Applied, + CanApplyDirectly, // The id of first missing dep - Future(ID), + AwaitingDependency(ID), } -pub(super) fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState { +fn remote_change_apply_state(vv: &VersionVector, change: &Change) -> ChangeApplyState { let peer = change.id.peer; let CounterSpan { start, end } = change.ctr_span(); let vv_latest_ctr = vv.get(&peer).copied().unwrap_or(0); if vv_latest_ctr < start { - return ChangeApplyState::Future(change.id.inc(-1)); + return ChangeApplyState::AwaitingDependency(change.id.inc(-1)); } if vv_latest_ctr >= end { - return ChangeApplyState::Existing; + return ChangeApplyState::Applied; } for dep in change.deps.as_ref().iter() { let dep_vv_latest_ctr = vv.get(&dep.peer).copied().unwrap_or(0); if dep_vv_latest_ctr - 1 < dep.counter { - return ChangeApplyState::Future(*dep); + return ChangeApplyState::AwaitingDependency(*dep); } } - ChangeApplyState::Directly + ChangeApplyState::CanApplyDirectly } #[cfg(test)]