mirror of
https://github.com/loro-dev/loro.git
synced 2024-11-24 12:20:06 +00:00
refactor: change the first param of travel change from id to ids (#492)
* refactor: change the first param of travel change from id to ids * fix: fix all warnings and refine the impl of subscription * chore: use pnpm
This commit is contained in:
parent
51c9b40022
commit
e3a7757610
13 changed files with 137 additions and 70 deletions
6
.github/workflows/rust.yml
vendored
6
.github/workflows/rust.yml
vendored
|
@ -40,9 +40,9 @@ jobs:
|
|||
with:
|
||||
version: "0.2.92"
|
||||
- uses: Swatinem/rust-cache@v2
|
||||
- name: Check
|
||||
run: cargo clippy --all-features
|
||||
- name: Build
|
||||
run: cargo build --verbose
|
||||
- name: Run rust tests
|
||||
run: deno task test
|
||||
- name: Run wasm tests
|
||||
run: deno task test-wasm
|
||||
run: deno task test-all
|
||||
|
|
|
@ -409,6 +409,7 @@ impl LoroDoc {
|
|||
let s = self.doc.subscribe_local_update(Box::new(move |update| {
|
||||
// TODO: should it be cloned?
|
||||
callback.on_local_update(update.to_vec());
|
||||
true
|
||||
}));
|
||||
Arc::new(Subscription(Arc::new(Mutex::new(s))))
|
||||
}
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
#![allow(dead_code)]
|
||||
#![allow(unused)]
|
||||
|
||||
mod bfs;
|
||||
pub(crate) use bfs::calc_critical_version_bfs as calc_critical_version;
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ pub use state::{TreeNode, TreeNodeWithChildren, TreeParentId};
|
|||
use subscription::{LocalUpdateCallback, Observer, PeerIdUpdateCallback};
|
||||
use txn::Transaction;
|
||||
pub use undo::UndoManager;
|
||||
use utils::subscription::SubscriberSet;
|
||||
use utils::subscription::SubscriberSetWithQueue;
|
||||
pub use utils::subscription::Subscription;
|
||||
pub mod allocation;
|
||||
pub mod awareness;
|
||||
|
@ -120,7 +120,6 @@ pub struct LoroDoc {
|
|||
txn: Arc<Mutex<Option<Transaction>>>,
|
||||
auto_commit: AtomicBool,
|
||||
detached: AtomicBool,
|
||||
|
||||
local_update_subs: SubscriberSet<(), LocalUpdateCallback>,
|
||||
peer_id_change_subs: SubscriberSet<(), PeerIdUpdateCallback>,
|
||||
local_update_subs: SubscriberSetWithQueue<(), LocalUpdateCallback, Vec<u8>>,
|
||||
peer_id_change_subs: SubscriberSetWithQueue<(), PeerIdUpdateCallback, ID>,
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ use crate::{
|
|||
subscription::{LocalUpdateCallback, Observer, Subscriber},
|
||||
txn::Transaction,
|
||||
undo::DiffBatch,
|
||||
utils::subscription::{SubscriberSet, Subscription},
|
||||
utils::subscription::{SubscriberSetWithQueue, Subscription},
|
||||
version::{shrink_frontiers, Frontiers, ImVersionVector},
|
||||
ChangeMeta, DocDiff, HandlerTrait, InternalString, ListHandler, LoroError, MapHandler,
|
||||
VersionVector,
|
||||
|
@ -90,8 +90,8 @@ impl LoroDoc {
|
|||
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
|
||||
txn: global_txn,
|
||||
arena,
|
||||
local_update_subs: SubscriberSet::new(),
|
||||
peer_id_change_subs: SubscriberSet::new(),
|
||||
local_update_subs: SubscriberSetWithQueue::new(),
|
||||
peer_id_change_subs: SubscriberSetWithQueue::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -120,9 +120,8 @@ impl LoroDoc {
|
|||
txn,
|
||||
auto_commit: AtomicBool::new(false),
|
||||
detached: AtomicBool::new(self.is_detached()),
|
||||
|
||||
local_update_subs: SubscriberSet::new(),
|
||||
peer_id_change_subs: SubscriberSet::new(),
|
||||
local_update_subs: SubscriberSetWithQueue::new(),
|
||||
peer_id_change_subs: SubscriberSetWithQueue::new(),
|
||||
};
|
||||
|
||||
if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
|
@ -279,11 +278,7 @@ impl LoroDoc {
|
|||
|
||||
let new_txn = self.txn().unwrap();
|
||||
self.txn.try_lock().unwrap().replace(new_txn);
|
||||
|
||||
self.peer_id_change_subs.retain(&(), &mut |callback| {
|
||||
callback(peer, next_id.counter);
|
||||
true
|
||||
});
|
||||
self.peer_id_change_subs.emit(&(), next_id);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
@ -300,10 +295,7 @@ impl LoroDoc {
|
|||
.peer
|
||||
.store(peer, std::sync::atomic::Ordering::Relaxed);
|
||||
drop(doc_state);
|
||||
self.peer_id_change_subs.retain(&(), &mut |callback| {
|
||||
callback(peer, next_id.counter);
|
||||
true
|
||||
});
|
||||
self.peer_id_change_subs.emit(&(), next_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -981,7 +973,7 @@ impl LoroDoc {
|
|||
}
|
||||
|
||||
pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
|
||||
let (sub, activate) = self.local_update_subs.insert((), callback);
|
||||
let (sub, activate) = self.local_update_subs.inner().insert((), callback);
|
||||
activate();
|
||||
sub
|
||||
}
|
||||
|
@ -1507,24 +1499,36 @@ impl LoroDoc {
|
|||
0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ChangeTravelError {
|
||||
#[error("Target id not found {0:?}")]
|
||||
TargetIdNotFound(ID),
|
||||
#[error("History on the target version is trimmed")]
|
||||
TargetVersionTrimmed,
|
||||
}
|
||||
|
||||
impl LoroDoc {
|
||||
pub fn travel_change_ancestors(
|
||||
&self,
|
||||
id: ID,
|
||||
ids: &[ID],
|
||||
f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
|
||||
) {
|
||||
) -> Result<(), ChangeTravelError> {
|
||||
struct PendingNode(ChangeMeta);
|
||||
impl PartialEq for PendingNode {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.lamport_last() == other.0.lamport_last() && self.0.id.peer == other.0.id.peer
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for PendingNode {}
|
||||
impl PartialOrd for PendingNode {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for PendingNode {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.0
|
||||
|
@ -1534,15 +1538,23 @@ impl LoroDoc {
|
|||
}
|
||||
}
|
||||
|
||||
if !self.oplog().try_lock().unwrap().vv().includes_id(id) {
|
||||
return;
|
||||
for id in ids {
|
||||
let op_log = &self.oplog().try_lock().unwrap();
|
||||
if !op_log.vv().includes_id(*id) {
|
||||
return Err(ChangeTravelError::TargetIdNotFound(*id));
|
||||
}
|
||||
if op_log.dag.trimmed_vv().includes_id(*id) {
|
||||
return Err(ChangeTravelError::TargetVersionTrimmed);
|
||||
}
|
||||
}
|
||||
|
||||
let mut visited = FxHashSet::default();
|
||||
let mut pending: BinaryHeap<PendingNode> = BinaryHeap::new();
|
||||
pending.push(PendingNode(ChangeMeta::from_change(
|
||||
&self.oplog().try_lock().unwrap().get_change_at(id).unwrap(),
|
||||
)));
|
||||
for id in ids {
|
||||
pending.push(PendingNode(ChangeMeta::from_change(
|
||||
&self.oplog().try_lock().unwrap().get_change_at(*id).unwrap(),
|
||||
)));
|
||||
}
|
||||
while let Some(PendingNode(node)) = pending.pop() {
|
||||
let deps = node.deps.clone();
|
||||
if f(node).is_break() {
|
||||
|
@ -1561,6 +1573,8 @@ impl LoroDoc {
|
|||
pending.push(PendingNode(ChangeMeta::from_change(&dep_node)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ use crate::{
|
|||
Subscription,
|
||||
};
|
||||
use fxhash::FxHashMap;
|
||||
use loro_common::{ContainerID, Counter, PeerID};
|
||||
use loro_common::{ContainerID, ID};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
|
@ -15,15 +15,15 @@ use std::{
|
|||
};
|
||||
|
||||
/// The callback of the local update.
|
||||
pub type LocalUpdateCallback = Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
|
||||
pub type LocalUpdateCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
|
||||
/// The callback of the peer id change. The second argument is the next counter for the peer.
|
||||
pub type PeerIdUpdateCallback = Box<dyn Fn(PeerID, Counter) + Send + Sync + 'static>;
|
||||
pub type PeerIdUpdateCallback = Box<dyn Fn(&ID) -> bool + Send + Sync + 'static>;
|
||||
pub type Subscriber = Arc<dyn (for<'a> Fn(DiffEvent<'a>)) + Send + Sync>;
|
||||
|
||||
impl LoroDoc {
|
||||
/// Subscribe to the changes of the peer id.
|
||||
pub fn subscribe_peer_id_change(&self, callback: PeerIdUpdateCallback) -> Subscription {
|
||||
let (s, enable) = self.peer_id_change_subs.insert((), callback);
|
||||
let (s, enable) = self.peer_id_change_subs.inner().insert((), callback);
|
||||
enable();
|
||||
s
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
|
|||
use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
|
||||
use rle::{HasLength, Mergable, RleVec};
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{
|
||||
change::{Change, Lamport, Timestamp},
|
||||
|
@ -70,24 +69,25 @@ impl crate::LoroDoc {
|
|||
);
|
||||
|
||||
let obs = self.observer.clone();
|
||||
let local_update_subs = self.local_update_subs.clone();
|
||||
let local_update_subs_weak = self.local_update_subs.downgrade();
|
||||
txn.set_on_commit(Box::new(move |state, oplog, id_span| {
|
||||
trace!("on_commit!");
|
||||
let mut state = state.try_lock().unwrap();
|
||||
let events = state.take_events();
|
||||
drop(state);
|
||||
for event in events {
|
||||
trace!("on_commit! {:#?}", &event);
|
||||
obs.emit(event);
|
||||
}
|
||||
|
||||
if !local_update_subs.is_empty() {
|
||||
let bytes =
|
||||
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
|
||||
local_update_subs.retain(&(), &mut |callback| {
|
||||
callback(&bytes);
|
||||
true
|
||||
});
|
||||
if id_span.atom_len() == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(local_update_subs) = local_update_subs_weak.upgrade() {
|
||||
if !local_update_subs.inner().is_empty() {
|
||||
let bytes =
|
||||
{ export_fast_updates_in_range(&oplog.try_lock().unwrap(), &[id_span]) };
|
||||
local_update_subs.emit(&(), bytes);
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
|
|
|
@ -505,12 +505,13 @@ impl UndoManager {
|
|||
}
|
||||
}));
|
||||
|
||||
let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, counter| {
|
||||
let sub = doc.subscribe_peer_id_change(Box::new(move |id| {
|
||||
let mut inner = inner_clone2.try_lock().unwrap();
|
||||
inner.undo_stack.clear();
|
||||
inner.redo_stack.clear();
|
||||
inner.next_counter = Some(counter);
|
||||
peer_clone2.store(peer_id, std::sync::atomic::Ordering::Relaxed);
|
||||
inner.next_counter = Some(id.counter);
|
||||
peer_clone2.store(id.peer, std::sync::atomic::Ordering::Relaxed);
|
||||
true
|
||||
}));
|
||||
|
||||
UndoManager {
|
||||
|
|
|
@ -226,14 +226,11 @@ Apache License
|
|||
END OF TERMS AND CONDITIONS
|
||||
|
||||
*/
|
||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
use std::error::Error;
|
||||
use std::ptr::eq;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::{fmt::Debug, mem, sync::Arc};
|
||||
|
||||
use smallvec::SmallVec;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Mutex, Weak};
|
||||
use std::{fmt::Debug, mem, sync::Arc};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SubscriptionError {
|
||||
|
@ -474,11 +471,25 @@ pub(crate) struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
|
|||
queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
|
||||
}
|
||||
|
||||
pub(crate) struct WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
|
||||
subscriber_set: Weak<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
|
||||
queue: Weak<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
|
||||
}
|
||||
|
||||
impl<EmitterKey, Callback, Payload> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
|
||||
pub fn upgrade(self) -> Option<SubscriberSetWithQueue<EmitterKey, Callback, Payload>> {
|
||||
Some(SubscriberSetWithQueue {
|
||||
subscriber_set: SubscriberSet(self.subscriber_set.upgrade()?),
|
||||
queue: self.queue.upgrade()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
|
||||
where
|
||||
EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
|
||||
Callback: 'static + Send + Sync + FnMut(&Payload) -> bool,
|
||||
Payload: Send + Sync,
|
||||
Callback: 'static + Send + Sync + for<'a> FnMut(&'a Payload) -> bool,
|
||||
Payload: Send + Sync + Debug,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
|
@ -486,6 +497,14 @@ where
|
|||
queue: Arc::new(Mutex::new(Default::default())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn downgrade(&self) -> WeakSubscriberSetWithQueue<EmitterKey, Callback, Payload> {
|
||||
WeakSubscriberSetWithQueue {
|
||||
subscriber_set: Arc::downgrade(&self.subscriber_set.0),
|
||||
queue: Arc::downgrade(&self.queue),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
|
||||
&self.subscriber_set
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
#![allow(non_snake_case)]
|
||||
#![allow(clippy::empty_docs)]
|
||||
#![allow(clippy::doc_lazy_continuation)]
|
||||
#![warn(missing_docs)]
|
||||
// #![warn(missing_docs)]
|
||||
|
||||
use convert::{js_to_version_vector, resolved_diff_to_js};
|
||||
use js_sys::{Array, Object, Promise, Reflect, Uint8Array};
|
||||
|
@ -1241,6 +1241,7 @@ impl LoroDoc {
|
|||
if let Err(e) = observer.call1(&arr.into()) {
|
||||
console_error!("Error: {:?}", e);
|
||||
}
|
||||
true
|
||||
})));
|
||||
|
||||
let closure = Closure::wrap(Box::new(move || {
|
||||
|
|
|
@ -8,6 +8,7 @@ use loro_internal::cursor::PosQueryResult;
|
|||
use loro_internal::cursor::Side;
|
||||
use loro_internal::handler::HandlerTrait;
|
||||
use loro_internal::handler::ValueOrHandler;
|
||||
use loro_internal::loro::ChangeTravelError;
|
||||
use loro_internal::undo::{OnPop, OnPush};
|
||||
pub use loro_internal::version::ImVersionVector;
|
||||
use loro_internal::DocState;
|
||||
|
@ -798,14 +799,14 @@ impl LoroDoc {
|
|||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `id` - The ID of the Change to start the traversal from.
|
||||
/// * `ids` - The IDs of the Change to start the traversal from.
|
||||
/// * `f` - A mutable function that is called for each ancestor. It can return `ControlFlow::Break(())` to stop the traversal.
|
||||
pub fn travel_change_ancestors(
|
||||
&self,
|
||||
id: ID,
|
||||
ids: &[ID],
|
||||
f: &mut dyn FnMut(ChangeMeta) -> ControlFlow<()>,
|
||||
) {
|
||||
self.doc.travel_change_ancestors(id, f)
|
||||
) -> Result<(), ChangeTravelError> {
|
||||
self.doc.travel_change_ancestors(ids, f)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1238,8 +1238,9 @@ fn test_loro_export_local_updates() {
|
|||
let updates = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let updates_clone = updates.clone();
|
||||
let subscription = doc.subscribe_local_update(Box::new(move |bytes: &[u8]| {
|
||||
let subscription = doc.subscribe_local_update(Box::new(move |bytes: &Vec<u8>| {
|
||||
updates_clone.try_lock().unwrap().push(bytes.to_vec());
|
||||
true
|
||||
}));
|
||||
|
||||
// Make some changes
|
||||
|
@ -1728,8 +1729,9 @@ fn change_peer_id() {
|
|||
let doc = LoroDoc::new();
|
||||
let received_peer_id = Arc::new(AtomicU64::new(0));
|
||||
let received_peer_id_clone = received_peer_id.clone();
|
||||
let sub = doc.subscribe_peer_id_change(Box::new(move |peer_id, _counter| {
|
||||
received_peer_id_clone.store(peer_id, Ordering::SeqCst);
|
||||
let sub = doc.subscribe_peer_id_change(Box::new(move |id| {
|
||||
received_peer_id_clone.store(id.peer, Ordering::SeqCst);
|
||||
true
|
||||
}));
|
||||
|
||||
doc.set_peer_id(1).unwrap();
|
||||
|
@ -1787,10 +1789,11 @@ fn travel_change_ancestors() {
|
|||
let f = doc.state_frontiers();
|
||||
assert_eq!(f.len(), 1);
|
||||
let mut changes = vec![];
|
||||
doc.travel_change_ancestors(f[0], &mut |meta| {
|
||||
doc.travel_change_ancestors(&[f[0]], &mut |meta| {
|
||||
changes.push(meta.clone());
|
||||
ControlFlow::Continue(())
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let dbg_str = format!("{:#?}", changes);
|
||||
assert_eq!(
|
||||
|
@ -1861,10 +1864,11 @@ fn travel_change_ancestors() {
|
|||
);
|
||||
|
||||
let mut changes = vec![];
|
||||
doc.travel_change_ancestors(ID::new(2, 4), &mut |meta| {
|
||||
doc.travel_change_ancestors(&[ID::new(2, 4)], &mut |meta| {
|
||||
changes.push(meta.clone());
|
||||
ControlFlow::Continue(())
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
let dbg_str = format!("{:#?}", changes);
|
||||
assert_eq!(
|
||||
dbg_str,
|
||||
|
@ -1895,6 +1899,30 @@ fn travel_change_ancestors() {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn no_dead_loop_when_subscribe_local_updates_to_each_other() {
|
||||
let doc1 = Arc::new(LoroDoc::new());
|
||||
let doc2 = Arc::new(LoroDoc::new());
|
||||
|
||||
let doc1_clone = doc1.clone();
|
||||
let doc2_clone = doc2.clone();
|
||||
let _sub1 = doc1.subscribe_local_update(Box::new(move |updates| {
|
||||
doc2_clone.import(updates).unwrap();
|
||||
true
|
||||
}));
|
||||
let _sub2 = doc2.subscribe_local_update(Box::new(move |updates| {
|
||||
doc1_clone.import(updates).unwrap();
|
||||
true
|
||||
}));
|
||||
|
||||
doc1.get_text("text").insert(0, "Hello").unwrap();
|
||||
doc1.commit();
|
||||
doc2.get_text("text").insert(0, "World").unwrap();
|
||||
doc2.commit();
|
||||
|
||||
assert_eq!(doc1.get_deep_value(), doc2.get_deep_value());
|
||||
}
|
||||
|
||||
/// https://github.com/loro-dev/loro/issues/490
|
||||
#[test]
|
||||
fn issue_490() -> anyhow::Result<()> {
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
"check-all": "cargo hack check --each-feature",
|
||||
"build": "cargo build",
|
||||
"test": "cargo nextest run --features=test_utils,jsonpath --no-fail-fast && cargo test --doc",
|
||||
"test-all": "nr test && nr test-wasm",
|
||||
"test-all": "pnpm test && pnpm test-wasm",
|
||||
"test-wasm": "cd crates/loro-wasm && deno task dev && cd ../../loro-js && pnpm i && pnpm run test",
|
||||
"coverage": "mkdir -p coverage && cargo llvm-cov nextest --features test_utils,jsonpath --lcov > coverage/lcov-nextest.info && cargo llvm-cov report",
|
||||
"release-wasm": "cd crates/loro-wasm && deno task release && cd ../../loro-js && pnpm i && pnpm build && pnpm run test",
|
||||
|
|
Loading…
Reference in a new issue