From 8252a5ae979111ee36070fd006a9b3b6c8ff2b67 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 26 Sep 2024 12:15:42 +0800 Subject: [PATCH] feat: travel change's ancestors (#483) --- .../src/change_meta.rs | 19 ++- crates/loro-internal/src/lib.rs | 3 + crates/loro-internal/src/loro.rs | 66 ++++++++- crates/loro/src/lib.rs | 21 ++- crates/loro/tests/loro_rust_test.rs | 135 ++++++++++++++++++ 5 files changed, 237 insertions(+), 7 deletions(-) rename crates/{loro => loro-internal}/src/change_meta.rs (87%) diff --git a/crates/loro/src/change_meta.rs b/crates/loro-internal/src/change_meta.rs similarity index 87% rename from crates/loro/src/change_meta.rs rename to crates/loro-internal/src/change_meta.rs index ccd66b7b..5555e247 100644 --- a/crates/loro/src/change_meta.rs +++ b/crates/loro-internal/src/change_meta.rs @@ -1,6 +1,9 @@ use std::{cmp::Ordering, sync::Arc}; -use loro_internal::{ +use loro_common::HasLamport; +use rle::HasLength; + +use crate::{ change::{Change, Lamport, Timestamp}, id::ID, version::Frontiers, @@ -56,8 +59,20 @@ impl Ord for ChangeMeta { } } +impl HasLamport for ChangeMeta { + fn lamport(&self) -> loro_common::Lamport { + self.lamport + } +} + +impl HasLength for ChangeMeta { + fn content_len(&self) -> usize { + self.len + } +} + impl ChangeMeta { - pub(super) fn from_change(c: &Change) -> Self { + pub fn from_change(c: &Change) -> Self { Self { id: c.id(), lamport: c.lamport(), diff --git a/crates/loro-internal/src/lib.rs b/crates/loro-internal/src/lib.rs index bdf09817..5c39f5c7 100644 --- a/crates/loro-internal/src/lib.rs +++ b/crates/loro-internal/src/lib.rs @@ -8,6 +8,7 @@ #![warn(missing_debug_implementations)] pub mod arena; +mod change_meta; pub mod diff; pub mod diff_calc; pub mod handler; @@ -17,6 +18,8 @@ use std::sync::{Arc, Mutex}; use arena::SharedArena; use configure::Configure; use diff_calc::DiffCalculator; + +pub use change_meta::ChangeMeta; pub use event::{ContainerDiff, DiffEvent, DocDiff, ListDiff, ListDiffInsertItem, ListDiffItem}; pub use fxhash::FxHashMap; pub use handler::{ diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index aac4af79..424f2b80 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -1,11 +1,15 @@ use either::Either; -use fxhash::FxHashMap; +use fxhash::{FxHashMap, FxHashSet}; use itertools::Itertools; -use loro_common::{ContainerID, ContainerType, HasIdSpan, IdSpan, LoroResult, LoroValue, ID}; +use loro_common::{ + ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroResult, LoroValue, ID, +}; use rle::HasLength; use std::{ borrow::Cow, cmp::Ordering, + collections::BinaryHeap, + ops::ControlFlow, sync::{ atomic::{ AtomicBool, @@ -44,7 +48,8 @@ use crate::{ undo::DiffBatch, utils::subscription::{SubscriberSet, Subscription}, version::{shrink_frontiers, Frontiers, ImVersionVector}, - DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector, + ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, + VersionVector, }; pub use crate::encoding::ExportMode; @@ -1490,6 +1495,61 @@ impl LoroDoc { 0 } } + + pub fn travel_change_ancestors( + &self, + id: ID, + f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>, + ) { + struct PendingNode(ChangeMeta); + impl PartialEq for PendingNode { + fn eq(&self, other: &Self) -> bool { + self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer + } + } + impl Eq for PendingNode {} + impl PartialOrd for PendingNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Ord for PendingNode { + fn cmp(&self, other: &Self) -> Ordering { + self.0 + .lamport_last() + .cmp(&other.0.lamport_last()) + .then_with(|| self.0.id.peer.cmp(&other.0.id.peer)) + } + } + + if !self.oplog().try_lock().unwrap().vv().includes_id(id) { + return; + } + + let mut visited = FxHashSet::default(); + let mut pending: BinaryHeap = BinaryHeap::new(); + pending.push(PendingNode(ChangeMeta::from_change( + &self.oplog().lock().unwrap().get_change_at(id).unwrap(), + ))); + while let Some(PendingNode(node)) = pending.pop() { + let deps = node.deps.clone(); + if f(node).is_break() { + break; + } + + for &dep in deps.iter() { + let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else { + continue; + }; + if visited.contains(&dep_node.id) { + continue; + } + + visited.insert(dep_node.id); + pending.push(PendingNode(ChangeMeta::from_change(&dep_node))); + } + } + } } // FIXME: PERF: This method is quite slow because it iterates all the changes diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index 675cdaf6..a8a346c7 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -1,7 +1,6 @@ #![doc = include_str!("../README.md")] #![warn(missing_docs)] #![warn(missing_debug_implementations)] -pub use change_meta::ChangeMeta; use either::Either; use event::{DiffEvent, Subscriber}; use loro_internal::container::IntoContainerId; @@ -26,13 +25,14 @@ use loro_internal::{ UnknownHandler as InnerUnknownHandler, }; use std::cmp::Ordering; +use std::ops::ControlFlow; use std::ops::Range; use std::sync::Arc; use tracing::info; -mod change_meta; pub use loro_internal::subscription::LocalUpdateCallback; pub use loro_internal::subscription::PeerIdUpdateCallback; +pub use loro_internal::ChangeMeta; pub mod event; pub use loro_internal::awareness; pub use loro_internal::configure::Configure; @@ -793,6 +793,23 @@ impl LoroDoc { pub fn get_pending_txn_len(&self) -> usize { self.doc.get_pending_txn_len() } + + /// Traverses the ancestors of the Change containing the given ID, including itself. + /// + /// This method visits all ancestors in causal order, from the latest to the oldest, + /// based on their Lamport timestamps. + /// + /// # Arguments + /// + /// * `id` - The ID of the Change to start the traversal from. + /// * `f` - A mutable function that is called for each ancestor. It can return `ControlFlow::Break(())` to stop the traversal. + pub fn travel_change_ancestors( + &self, + id: ID, + f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>, + ) { + self.doc.travel_change_ancestors(id, f) + } } /// It's used to prevent the user from implementing the trait directly. diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index b4ca5256..469bd029 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -1,5 +1,6 @@ use std::{ cmp::Ordering, + ops::ControlFlow, sync::{ atomic::{AtomicBool, AtomicU64}, Arc, @@ -1747,3 +1748,137 @@ fn test_encode_snapshot_when_checkout() { json!({"text": "Hello World"}) ); } + +#[test] +fn travel_change_ancestors() { + let doc = LoroDoc::new(); + doc.set_peer_id(1).unwrap(); + doc.get_text("text").insert(0, "Hello").unwrap(); + doc.commit(); + let doc2 = doc.fork(); + doc2.set_peer_id(2).unwrap(); + doc2.get_text("text").insert(5, " World").unwrap(); + doc.get_text("text").insert(5, " Alice").unwrap(); + doc.import(&doc2.export(loro::ExportMode::all_updates())) + .unwrap(); + doc2.import(&doc.export(loro::ExportMode::all_updates())) + .unwrap(); + + doc.get_text("text").insert(0, "Y").unwrap(); + doc2.get_text("text").insert(0, "N").unwrap(); + doc.commit(); + doc2.commit(); + doc.import(&doc2.export(loro::ExportMode::all_updates())) + .unwrap(); + doc.get_text("text").insert(0, "X").unwrap(); + doc.commit(); + let f = doc.state_frontiers(); + assert_eq!(f.len(), 1); + let mut changes = vec![]; + doc.travel_change_ancestors(f[0], &mut |meta| { + changes.push(meta.clone()); + ControlFlow::Continue(()) + }); + + let dbg_str = format!("{:#?}", changes); + assert_eq!( + dbg_str, + r#"[ + ChangeMeta { + lamport: 12, + id: 12@1, + timestamp: 0, + message: None, + deps: Frontiers( + [ + 11@1, + 6@2, + ], + ), + len: 1, + }, + ChangeMeta { + lamport: 11, + id: 6@2, + timestamp: 0, + message: None, + deps: Frontiers( + [ + 5@2, + 10@1, + ], + ), + len: 1, + }, + ChangeMeta { + lamport: 11, + id: 11@1, + timestamp: 0, + message: None, + deps: Frontiers( + [ + 10@1, + 5@2, + ], + ), + len: 1, + }, + ChangeMeta { + lamport: 5, + id: 0@2, + timestamp: 0, + message: None, + deps: Frontiers( + [ + 4@1, + ], + ), + len: 6, + }, + ChangeMeta { + lamport: 0, + id: 0@1, + timestamp: 0, + message: None, + deps: Frontiers( + [], + ), + len: 11, + }, +]"# + ); + + let mut changes = vec![]; + doc.travel_change_ancestors(ID::new(2, 4), &mut |meta| { + changes.push(meta.clone()); + ControlFlow::Continue(()) + }); + let dbg_str = format!("{:#?}", changes); + assert_eq!( + dbg_str, + r#"[ + ChangeMeta { + lamport: 5, + id: 0@2, + timestamp: 0, + message: None, + deps: Frontiers( + [ + 4@1, + ], + ), + len: 6, + }, + ChangeMeta { + lamport: 0, + id: 0@1, + timestamp: 0, + message: None, + deps: Frontiers( + [], + ), + len: 11, + }, +]"# + ); +}