mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-22 12:57:20 +00:00
feat: travel change's ancestors (#483)
This commit is contained in:
parent
95931ba238
commit
8252a5ae97
5 changed files with 237 additions and 7 deletions
|
@ -1,6 +1,9 @@
|
|||
use std::{cmp::Ordering, sync::Arc};
|
||||
|
||||
use loro_internal::{
|
||||
use loro_common::HasLamport;
|
||||
use rle::HasLength;
|
||||
|
||||
use crate::{
|
||||
change::{Change, Lamport, Timestamp},
|
||||
id::ID,
|
||||
version::Frontiers,
|
||||
|
@ -56,8 +59,20 @@ impl Ord for ChangeMeta {
|
|||
}
|
||||
}
|
||||
|
||||
impl HasLamport for ChangeMeta {
|
||||
fn lamport(&self) -> loro_common::Lamport {
|
||||
self.lamport
|
||||
}
|
||||
}
|
||||
|
||||
impl HasLength for ChangeMeta {
|
||||
fn content_len(&self) -> usize {
|
||||
self.len
|
||||
}
|
||||
}
|
||||
|
||||
impl ChangeMeta {
|
||||
pub(super) fn from_change(c: &Change) -> Self {
|
||||
pub fn from_change(c: &Change) -> Self {
|
||||
Self {
|
||||
id: c.id(),
|
||||
lamport: c.lamport(),
|
|
@ -8,6 +8,7 @@
|
|||
#![warn(missing_debug_implementations)]
|
||||
|
||||
pub mod arena;
|
||||
mod change_meta;
|
||||
pub mod diff;
|
||||
pub mod diff_calc;
|
||||
pub mod handler;
|
||||
|
@ -17,6 +18,8 @@ use std::sync::{Arc, Mutex};
|
|||
use arena::SharedArena;
|
||||
use configure::Configure;
|
||||
use diff_calc::DiffCalculator;
|
||||
|
||||
pub use change_meta::ChangeMeta;
|
||||
pub use event::{ContainerDiff, DiffEvent, DocDiff, ListDiff, ListDiffInsertItem, ListDiffItem};
|
||||
pub use fxhash::FxHashMap;
|
||||
pub use handler::{
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
use either::Either;
|
||||
use fxhash::FxHashMap;
|
||||
use fxhash::{FxHashMap, FxHashSet};
|
||||
use itertools::Itertools;
|
||||
use loro_common::{ContainerID, ContainerType, HasIdSpan, IdSpan, LoroResult, LoroValue, ID};
|
||||
use loro_common::{
|
||||
ContainerID, ContainerType, HasIdSpan, HasLamportSpan, IdSpan, LoroResult, LoroValue, ID,
|
||||
};
|
||||
use rle::HasLength;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cmp::Ordering,
|
||||
collections::BinaryHeap,
|
||||
ops::ControlFlow,
|
||||
sync::{
|
||||
atomic::{
|
||||
AtomicBool,
|
||||
|
@ -44,7 +48,8 @@ use crate::{
|
|||
undo::DiffBatch,
|
||||
utils::subscription::{SubscriberSet, Subscription},
|
||||
version::{shrink_frontiers, Frontiers, ImVersionVector},
|
||||
DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler, VersionVector,
|
||||
ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler,
|
||||
VersionVector,
|
||||
};
|
||||
|
||||
pub use crate::encoding::ExportMode;
|
||||
|
@ -1490,6 +1495,61 @@ impl LoroDoc {
|
|||
0
|
||||
}
|
||||
}
|
||||
|
||||
pub fn travel_change_ancestors(
|
||||
&self,
|
||||
id: ID,
|
||||
f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
|
||||
) {
|
||||
struct PendingNode(ChangeMeta);
|
||||
impl PartialEq for PendingNode {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
|
||||
}
|
||||
}
|
||||
impl Eq for PendingNode {}
|
||||
impl PartialOrd for PendingNode {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
impl Ord for PendingNode {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.0
|
||||
.lamport_last()
|
||||
.cmp(&other.0.lamport_last())
|
||||
.then_with(|| self.0.id.peer.cmp(&other.0.id.peer))
|
||||
}
|
||||
}
|
||||
|
||||
if !self.oplog().try_lock().unwrap().vv().includes_id(id) {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut visited = FxHashSet::default();
|
||||
let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
|
||||
pending.push(PendingNode(ChangeMeta::from_change(
|
||||
&self.oplog().lock().unwrap().get_change_at(id).unwrap(),
|
||||
)));
|
||||
while let Some(PendingNode(node)) = pending.pop() {
|
||||
let deps = node.deps.clone();
|
||||
if f(node).is_break() {
|
||||
break;
|
||||
}
|
||||
|
||||
for &dep in deps.iter() {
|
||||
let Some(dep_node) = self.oplog().lock().unwrap().get_change_at(dep) else {
|
||||
continue;
|
||||
};
|
||||
if visited.contains(&dep_node.id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
visited.insert(dep_node.id);
|
||||
pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: PERF: This method is quite slow because it iterates all the changes
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
#![doc = include_str!("../README.md")]
|
||||
#![warn(missing_docs)]
|
||||
#![warn(missing_debug_implementations)]
|
||||
pub use change_meta::ChangeMeta;
|
||||
use either::Either;
|
||||
use event::{DiffEvent, Subscriber};
|
||||
use loro_internal::container::IntoContainerId;
|
||||
|
@ -26,13 +25,14 @@ use loro_internal::{
|
|||
UnknownHandler as InnerUnknownHandler,
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
|
||||
mod change_meta;
|
||||
pub use loro_internal::subscription::LocalUpdateCallback;
|
||||
pub use loro_internal::subscription::PeerIdUpdateCallback;
|
||||
pub use loro_internal::ChangeMeta;
|
||||
pub mod event;
|
||||
pub use loro_internal::awareness;
|
||||
pub use loro_internal::configure::Configure;
|
||||
|
@ -793,6 +793,23 @@ impl LoroDoc {
|
|||
pub fn get_pending_txn_len(&self) -> usize {
|
||||
self.doc.get_pending_txn_len()
|
||||
}
|
||||
|
||||
/// Traverses the ancestors of the Change containing the given ID, including itself.
|
||||
///
|
||||
/// This method visits all ancestors in causal order, from the latest to the oldest,
|
||||
/// based on their Lamport timestamps.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `id` - The ID of the Change to start the traversal from.
|
||||
/// * `f` - A mutable function that is called for each ancestor. It can return `ControlFlow::Break(())` to stop the traversal.
|
||||
pub fn travel_change_ancestors(
|
||||
&self,
|
||||
id: ID,
|
||||
f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
|
||||
) {
|
||||
self.doc.travel_change_ancestors(id, f)
|
||||
}
|
||||
}
|
||||
|
||||
/// It's used to prevent the user from implementing the trait directly.
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use std::{
|
||||
cmp::Ordering,
|
||||
ops::ControlFlow,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicU64},
|
||||
Arc,
|
||||
|
@ -1747,3 +1748,137 @@ fn test_encode_snapshot_when_checkout() {
|
|||
json!({"text": "Hello World"})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn travel_change_ancestors() {
|
||||
let doc = LoroDoc::new();
|
||||
doc.set_peer_id(1).unwrap();
|
||||
doc.get_text("text").insert(0, "Hello").unwrap();
|
||||
doc.commit();
|
||||
let doc2 = doc.fork();
|
||||
doc2.set_peer_id(2).unwrap();
|
||||
doc2.get_text("text").insert(5, " World").unwrap();
|
||||
doc.get_text("text").insert(5, " Alice").unwrap();
|
||||
doc.import(&doc2.export(loro::ExportMode::all_updates()))
|
||||
.unwrap();
|
||||
doc2.import(&doc.export(loro::ExportMode::all_updates()))
|
||||
.unwrap();
|
||||
|
||||
doc.get_text("text").insert(0, "Y").unwrap();
|
||||
doc2.get_text("text").insert(0, "N").unwrap();
|
||||
doc.commit();
|
||||
doc2.commit();
|
||||
doc.import(&doc2.export(loro::ExportMode::all_updates()))
|
||||
.unwrap();
|
||||
doc.get_text("text").insert(0, "X").unwrap();
|
||||
doc.commit();
|
||||
let f = doc.state_frontiers();
|
||||
assert_eq!(f.len(), 1);
|
||||
let mut changes = vec![];
|
||||
doc.travel_change_ancestors(f[0], &mut |meta| {
|
||||
changes.push(meta.clone());
|
||||
ControlFlow::Continue(())
|
||||
});
|
||||
|
||||
let dbg_str = format!("{:#?}", changes);
|
||||
assert_eq!(
|
||||
dbg_str,
|
||||
r#"[
|
||||
ChangeMeta {
|
||||
lamport: 12,
|
||||
id: 12@1,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[
|
||||
11@1,
|
||||
6@2,
|
||||
],
|
||||
),
|
||||
len: 1,
|
||||
},
|
||||
ChangeMeta {
|
||||
lamport: 11,
|
||||
id: 6@2,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[
|
||||
5@2,
|
||||
10@1,
|
||||
],
|
||||
),
|
||||
len: 1,
|
||||
},
|
||||
ChangeMeta {
|
||||
lamport: 11,
|
||||
id: 11@1,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[
|
||||
10@1,
|
||||
5@2,
|
||||
],
|
||||
),
|
||||
len: 1,
|
||||
},
|
||||
ChangeMeta {
|
||||
lamport: 5,
|
||||
id: 0@2,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[
|
||||
4@1,
|
||||
],
|
||||
),
|
||||
len: 6,
|
||||
},
|
||||
ChangeMeta {
|
||||
lamport: 0,
|
||||
id: 0@1,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[],
|
||||
),
|
||||
len: 11,
|
||||
},
|
||||
]"#
|
||||
);
|
||||
|
||||
let mut changes = vec![];
|
||||
doc.travel_change_ancestors(ID::new(2, 4), &mut |meta| {
|
||||
changes.push(meta.clone());
|
||||
ControlFlow::Continue(())
|
||||
});
|
||||
let dbg_str = format!("{:#?}", changes);
|
||||
assert_eq!(
|
||||
dbg_str,
|
||||
r#"[
|
||||
ChangeMeta {
|
||||
lamport: 5,
|
||||
id: 0@2,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[
|
||||
4@1,
|
||||
],
|
||||
),
|
||||
len: 6,
|
||||
},
|
||||
ChangeMeta {
|
||||
lamport: 0,
|
||||
id: 0@1,
|
||||
timestamp: 0,
|
||||
message: None,
|
||||
deps: Frontiers(
|
||||
[],
|
||||
),
|
||||
len: 11,
|
||||
},
|
||||
]"#
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue