refactor!: better event api

- (js) Refactor subscription mechanism to return unsubscribe function
- (rust) Implement must-use struct for subscriptions in Rust
- Enhance API ergonomics for JavaScript-like environments
- Improve resource cleanup and prevent potential memory leaks
This commit is contained in:
Zixuan Chen 2024-10-03 00:42:21 +08:00
parent 09a004e365
commit 5630e742f6
No known key found for this signature in database
31 changed files with 409 additions and 546 deletions

View file

@ -57,7 +57,8 @@ impl Actor {
let mut tracker = cb_tracker.try_lock().unwrap();
tracker.apply_diff(e)
});
}));
}))
.detach();
let mut default_history = FxHashMap::default();
default_history.insert(Vec::new(), loro.get_deep_value());
Actor {

View file

@ -40,7 +40,7 @@ impl CounterActor {
let mut counter = counter.try_lock().unwrap();
counter.apply_diff(event);
}),
);
).detach();
let root = loro.get_counter("counter");
Self {

View file

@ -44,7 +44,7 @@ impl ListActor {
let mut list = list.try_lock().unwrap();
list.apply_diff(event);
}),
);
).detach();
let root = loro.get_list("list");
Self {

View file

@ -36,7 +36,7 @@ impl MapActor {
let mut map = map.try_lock().unwrap();
map.apply_diff(event);
}),
);
).detach();
let root = loro.get_map("map");
MapActor {

View file

@ -42,7 +42,8 @@ impl MovableListActor {
let mut list = list.try_lock().unwrap();
list.apply_diff(event);
}),
);
)
.detach();
let root = loro.get_movable_list("movable_list");
Self {

View file

@ -42,13 +42,13 @@ impl TextActor {
);
let tracker = Arc::new(Mutex::new(ContainerTracker::Map(tracker)));
let text = tracker.clone();
loro.subscribe(
&ContainerID::new_root("text", ContainerType::Text),
Arc::new(move |event| {
text.try_lock().unwrap().apply_diff(event);
}),
);
)
.detach();
let root = loro.get_text("text");
TextActor {
loro,

View file

@ -113,7 +113,8 @@ impl TreeActor {
tree.try_lock().unwrap().apply_diff(event);
// println!("after {:?}\n", tree.try_lock().unwrap().as_map().unwrap());
}),
);
)
.detach();
let root = loro.get_tree("tree");
root.enable_fractional_index(0);

View file

@ -8,7 +8,7 @@ use std::{
use loro::{
cursor::CannotFindRelativePosition, DocAnalysis, FrontiersNotIncluded, IdSpan, JsonPathError,
JsonSchema, Lamport, LoroDoc as InnerLoroDoc, LoroEncodeError, LoroError, LoroResult, PeerID,
SubID, Timestamp, ID,
Timestamp, ID,
};
use crate::{
@ -377,25 +377,28 @@ impl LoroDoc {
self.doc.set_peer_id(peer)
}
pub fn subscribe(&self, container_id: &ContainerID, subscriber: Arc<dyn Subscriber>) -> SubID {
self.doc.subscribe(
&(container_id.into()),
Arc::new(move |e| {
subscriber.on_diff(DiffEvent::from(e));
}),
)
pub fn subscribe(
&self,
container_id: &ContainerID,
subscriber: Arc<dyn Subscriber>,
) -> Subscription {
self.doc
.subscribe(
&(container_id.into()),
Arc::new(move |e| {
subscriber.on_diff(DiffEvent::from(e));
}),
)
.into()
}
pub fn subscribe_root(&self, subscriber: Arc<dyn Subscriber>) -> SubID {
pub fn subscribe_root(&self, subscriber: Arc<dyn Subscriber>) -> Subscription {
// self.doc.subscribe_root(callback)
self.doc.subscribe_root(Arc::new(move |e| {
subscriber.on_diff(DiffEvent::from(e));
}))
}
/// Remove a subscription by subscription id.
pub fn unsubscribe(&self, id: SubID) {
self.doc.unsubscribe(id)
self.doc
.subscribe_root(Arc::new(move |e| {
subscriber.on_diff(DiffEvent::from(e));
}))
.into()
}
/// Subscribe the local update of the document.
@ -702,6 +705,12 @@ impl std::fmt::Debug for Subscription {
}
}
impl From<loro::Subscription> for Subscription {
fn from(value: loro::Subscription) -> Self {
Self(Arc::new(Mutex::new(value)))
}
}
pub struct PosQueryResult {
pub update: Option<Arc<Cursor>>,
pub current: AbsolutePosition,

View file

@ -6,7 +6,7 @@ pub use loro::{
EventTriggerKind, ExpandType, FractionalIndex, IdLp, IdSpan, JsonChange, JsonFutureOp,
JsonFutureOpWrapper, JsonListOp, JsonMapOp, JsonMovableListOp, JsonOp, JsonOpContent,
JsonPathError, JsonSchema, JsonTextOp, JsonTreeOp, Lamport, LoroError, PeerID, StyleConfig,
SubID, TreeID, ID,
TreeID, ID,
};
pub use std::cmp::Ordering;
use std::sync::Arc;

View file

@ -26,7 +26,7 @@ mod event {
b.iter(|| {
let loro = LoroDoc::default();
loro.start_auto_commit();
loro.subscribe_root(Arc::new(|_e| {}));
let _g = loro.subscribe_root(Arc::new(|_e| {}));
let mut handlers = vec![loro.get_list("list")];
for _ in 0..deep {
handlers = handlers

View file

@ -63,7 +63,7 @@ mod run {
b.iter(|| {
let loro = LoroDoc::default();
let text = loro.get_text("text");
loro.subscribe_root(Arc::new(move |event| {
let _g = loro.subscribe_root(Arc::new(move |event| {
black_box(event);
}));
let mut txn = loro.txn().unwrap();
@ -220,7 +220,7 @@ mod run {
b.iter(|| {
let loro = LoroDoc::default();
let text = loro.get_text("text");
loro.subscribe_root(Arc::new(move |event| {
let _g = loro.subscribe_root(Arc::new(move |event| {
black_box(event);
}));
{

View file

@ -10,7 +10,7 @@ fn main() {
let doc = LoroDoc::new();
doc.start_auto_commit();
let list = doc.get_list("list");
doc.subscribe_root(Arc::new(|e| {
let _g = doc.subscribe_root(Arc::new(|e| {
for container_diff in e.events {
match &container_diff.diff {
Diff::List(list) => {

View file

@ -503,7 +503,7 @@ mod test {
#[test]
fn test_text_event() {
let loro = LoroDoc::new();
loro.subscribe_root(Arc::new(|event| {
let _g = loro.subscribe_root(Arc::new(|event| {
let mut value = LoroValue::String(Default::default());
value.apply_diff(&event.events.iter().map(|x| x.diff.clone()).collect_vec());
assert_eq!(value, "h223ello".into());

View file

@ -4078,7 +4078,7 @@ mod test {
.unwrap();
let loro2 = LoroDoc::new();
loro2.subscribe_root(Arc::new(|e| {
let _g = loro2.subscribe_root(Arc::new(|e| {
println!("{} {:?} ", e.event_meta.by, e.event_meta.diff)
}));
loro2.import(&loro.export_from(&loro2.oplog_vv())).unwrap();

View file

@ -44,7 +44,7 @@ use crate::{
op::InnerContent,
oplog::{loro_dag::FrontiersNotIncluded, OpLog},
state::DocState,
subscription::{LocalUpdateCallback, Observer, SubID, Subscriber},
subscription::{LocalUpdateCallback, Observer, Subscriber},
txn::Transaction,
undo::DiffBatch,
utils::subscription::{SubscriberSet, Subscription},
@ -960,7 +960,7 @@ impl LoroDoc {
self.oplog().try_lock().unwrap().cmp_frontiers(a, b)
}
pub fn subscribe_root(&self, callback: Subscriber) -> SubID {
pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
let mut state = self.state.try_lock().unwrap();
if !state.is_recording() {
state.start_recording();
@ -969,7 +969,7 @@ impl LoroDoc {
self.observer.subscribe_root(callback)
}
pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> SubID {
pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
let mut state = self.state.try_lock().unwrap();
if !state.is_recording() {
state.start_recording();
@ -978,11 +978,6 @@ impl LoroDoc {
self.observer.subscribe(container_id, callback)
}
#[inline]
pub fn unsubscribe(&self, id: SubID) {
self.observer.unsubscribe(id);
}
pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
let (sub, activate) = self.local_update_subs.insert((), callback);
activate();

View file

@ -1,19 +1,18 @@
use std::sync::{
atomic::{AtomicU32, AtomicUsize, Ordering},
Arc, Mutex,
};
use fxhash::{FxHashMap, FxHashSet};
use itertools::Itertools;
use loro_common::{ContainerID, Counter, PeerID};
use smallvec::SmallVec;
use crate::{container::idx::ContainerIdx, ContainerDiff, LoroDoc, Subscription};
use super::{
arena::SharedArena,
event::{DiffEvent, DocDiff},
};
use crate::{
container::idx::ContainerIdx, utils::subscription::SubscriberSet, ContainerDiff, LoroDoc,
Subscription,
};
use fxhash::FxHashMap;
use loro_common::{ContainerID, Counter, PeerID};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
/// The callback of the local update.
pub type LocalUpdateCallback = Box<dyn Fn(&[u8]) + Send + Sync + 'static>;
@ -30,41 +29,28 @@ impl LoroDoc {
}
}
#[derive(Default)]
struct ObserverInner {
subscribers: FxHashMap<SubID, Subscriber>,
containers: FxHashMap<ContainerIdx, FxHashSet<SubID>>,
root: FxHashSet<SubID>,
deleted: FxHashSet<SubID>,
event_queue: Vec<DocDiff>,
subscriber_set: SubscriberSet<Option<ContainerIdx>, Subscriber>,
queue: Arc<Mutex<VecDeque<DocDiff>>>,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct SubID(u32);
impl SubID {
pub fn into_u32(self) -> u32 {
self.0
}
pub fn from_u32(id: u32) -> Self {
Self(id)
impl Default for ObserverInner {
fn default() -> Self {
Self {
subscriber_set: SubscriberSet::new(),
queue: Arc::new(Mutex::new(VecDeque::new())),
}
}
}
pub struct Observer {
inner: Mutex<ObserverInner>,
inner: ObserverInner,
arena: SharedArena,
next_sub_id: AtomicU32,
taken_times: AtomicUsize,
}
impl std::fmt::Debug for Observer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Observer")
.field("next_sub_id", &self.next_sub_id)
.field("taken_times", &self.taken_times)
.finish()
f.debug_struct("Observer").finish()
}
}
@ -72,194 +58,106 @@ impl Observer {
pub fn new(arena: SharedArena) -> Self {
Self {
arena,
next_sub_id: AtomicU32::new(0),
taken_times: AtomicUsize::new(0),
inner: Mutex::new(ObserverInner {
subscribers: Default::default(),
containers: Default::default(),
root: Default::default(),
deleted: Default::default(),
event_queue: Default::default(),
}),
inner: ObserverInner::default(),
}
}
pub fn subscribe(&self, id: &ContainerID, callback: Subscriber) -> SubID {
pub fn subscribe(&self, id: &ContainerID, callback: Subscriber) -> Subscription {
let idx = self.arena.register_container(id);
let sub_id = self.fetch_add_next_id();
let mut inner = self.inner.try_lock().unwrap();
inner.subscribers.insert(sub_id, callback);
inner.containers.entry(idx).or_default().insert(sub_id);
sub_id
let inner = &self.inner;
let (sub, enable) = inner.subscriber_set.insert(Some(idx), callback);
enable();
sub
}
pub fn subscribe_root(&self, callback: Subscriber) -> SubID {
let sub_id = self.fetch_add_next_id();
let mut inner = self.inner.try_lock().unwrap();
inner.subscribers.insert(sub_id, callback);
inner.root.insert(sub_id);
sub_id
}
fn fetch_add_next_id(&self) -> SubID {
SubID(
self.next_sub_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
)
pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
let inner = &self.inner;
let (sub, enable) = inner.subscriber_set.insert(None, callback);
enable();
sub
}
pub(crate) fn emit(&self, doc_diff: DocDiff) {
if self.taken_times.load(Ordering::Relaxed) > 0 {
self.inner.try_lock().unwrap().event_queue.push(doc_diff);
return;
let success = self.emit_inner(doc_diff);
if success {
let mut e = self.inner.queue.try_lock().unwrap().pop_front();
while let Some(event) = e {
self.emit_inner(event);
e = self.inner.queue.try_lock().unwrap().pop_front();
}
}
let mut inner = self.take_inner();
self.emit_inner(&doc_diff, &mut inner);
self.reset_inner(inner);
}
// When emitting changes, we need to make sure that the observer is not locked.
fn emit_inner(&self, doc_diff: &DocDiff, inner: &mut ObserverInner) {
fn emit_inner(&self, doc_diff: DocDiff) -> bool {
let inner = &self.inner;
let mut container_events_map: FxHashMap<ContainerIdx, SmallVec<[&ContainerDiff; 1]>> =
Default::default();
for container_diff in doc_diff.diff.iter() {
self.arena
.with_ancestors(container_diff.idx, |ancestor, _| {
if let Some(subs) = inner.containers.get_mut(&ancestor) {
// update subscriber set on ancestors' listener entries
subs.retain(|sub| match inner.subscribers.contains_key(sub) {
true => {
container_events_map
.entry(ancestor)
.or_default()
.push(container_diff);
true
}
false => false,
});
if inner.subscriber_set.may_include(&Some(ancestor)) {
container_events_map
.entry(ancestor)
.or_default()
.push(container_diff);
}
});
}
for (container_idx, container_diffs) in container_events_map {
let subs = inner.containers.get_mut(&container_idx).unwrap();
for sub in subs.iter() {
let f = inner.subscribers.get_mut(sub).unwrap();
(f)(DiffEvent {
current_target: Some(self.arena.get_container_id(container_idx).unwrap()),
events: &container_diffs,
event_meta: doc_diff,
})
}
}
if !inner.root.is_empty() {
let events = doc_diff.diff.iter().collect_vec();
inner
.root
// use `.retain` to update subscriber set on ancestors' listener entries
.retain(|sub| match inner.subscribers.get_mut(sub) {
Some(f) => {
(f)(DiffEvent {
current_target: None,
events: &events,
event_meta: doc_diff,
});
true
}
None => false,
})
}
}
fn take_inner(&self) -> ObserverInner {
self.taken_times
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut inner_guard = self.inner.try_lock().unwrap();
std::mem::take(&mut *inner_guard)
}
fn reset_inner(&self, mut inner: ObserverInner) {
let mut count = 0;
loop {
let mut inner_guard = self.inner.try_lock().unwrap();
// need to merge the old and new sets
if !inner_guard.containers.is_empty() {
for (key, set) in inner_guard.containers.iter() {
let old_set = inner.containers.get_mut(key).unwrap();
for value in set {
old_set.insert(*value);
}
}
}
if !inner_guard.root.is_empty() {
for value in inner_guard.root.iter() {
inner.root.insert(*value);
}
}
if !inner_guard.subscribers.is_empty() {
for (key, value) in std::mem::take(&mut inner_guard.subscribers) {
inner.subscribers.insert(key, value);
}
}
if !inner_guard.deleted.is_empty() {
let is_taken = self.is_taken();
for value in inner_guard.deleted.iter() {
inner.subscribers.remove(value);
if is_taken {
inner.deleted.insert(*value);
}
}
}
if 1 == self
.taken_times
.fetch_sub(1, std::sync::atomic::Ordering::Release)
&& !inner_guard.event_queue.is_empty()
{
// Check whether we are calling events recursively.
// If so, push the event to the queue
if inner.subscriber_set.is_recursive_calling(&None)
|| container_events_map
.keys()
.any(|x| inner.subscriber_set.is_recursive_calling(&Some(*x)))
{
// emit the queued events
let events = std::mem::take(&mut inner_guard.event_queue);
*inner_guard = Default::default();
self.taken_times
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
drop(inner_guard);
for event in events {
self.emit_inner(&event, &mut inner);
}
count += 1;
if count >= 1024 {
panic!("Too many recursive events.");
}
} else {
inner.event_queue.append(&mut inner_guard.event_queue);
*inner_guard = inner;
return;
drop(container_events_map);
inner.queue.try_lock().unwrap().push_back(doc_diff);
return false;
}
}
}
pub fn unsubscribe(&self, sub_id: SubID) {
let mut inner = self.inner.try_lock().unwrap();
inner.subscribers.remove(&sub_id);
if self.is_taken() {
inner.deleted.insert(sub_id);
for (container_idx, container_diffs) in container_events_map {
inner
.subscriber_set
.retain(&Some(container_idx), &mut |callback| {
(callback)(DiffEvent {
current_target: Some(self.arena.get_container_id(container_idx).unwrap()),
events: &container_diffs,
event_meta: &doc_diff,
});
true
})
.unwrap();
}
}
fn is_taken(&self) -> bool {
self.taken_times.load(std::sync::atomic::Ordering::Acquire) != 0
let events: Vec<_> = doc_diff.diff.iter().collect();
inner
.subscriber_set
.retain(&None, &mut |callback| {
(callback)(DiffEvent {
current_target: None,
events: &events,
event_meta: &doc_diff,
});
true
})
.unwrap();
true
}
}
#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::{handler::HandlerTrait, loro::LoroDoc};
use tracing::trace;
use super::*;
use crate::{handler::HandlerTrait, loro::LoroDoc};
#[test]
fn test_recursive_events() {
@ -267,15 +165,18 @@ mod test {
let loro_cp = loro.clone();
let count = Arc::new(AtomicUsize::new(0));
let count_cp = Arc::clone(&count);
loro_cp.subscribe_root(Arc::new(move |_| {
let _g = loro_cp.subscribe_root(Arc::new(move |_| {
count_cp.fetch_add(1, Ordering::SeqCst);
let mut txn = loro.txn().unwrap();
let text = loro.get_text("id");
if text.get_value().as_string().unwrap().len() > 10 {
trace!("Skip");
return;
}
text.insert_with_txn(&mut txn, 0, "123").unwrap();
trace!("PRE Another commit");
txn.commit().unwrap();
trace!("AFTER Another commit");
}));
let loro = loro_cp;
@ -311,7 +212,7 @@ mod test {
txn.commit().unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 2);
loro.unsubscribe(sub);
sub.unsubscribe();
{
let mut txn = loro.txn().unwrap();
text.insert_with_txn(&mut txn, 0, "123").unwrap();

View file

@ -11,6 +11,7 @@ use loro_common::{ContainerType, IdLp, IdSpan, LoroResult};
use loro_delta::{array_vec::ArrayVec, DeltaRopeBuilder};
use rle::{HasLength, Mergable, RleVec};
use smallvec::{smallvec, SmallVec};
use tracing::trace;
use crate::{
change::{Change, Lamport, Timestamp},
@ -71,10 +72,12 @@ impl crate::LoroDoc {
let obs = self.observer.clone();
let local_update_subs = self.local_update_subs.clone();
txn.set_on_commit(Box::new(move |state, oplog, id_span| {
trace!("on_commit!");
let mut state = state.try_lock().unwrap();
let events = state.take_events();
drop(state);
for event in events {
trace!("on_commit! {:#?}", &event);
obs.emit(event);
}

View file

@ -125,6 +125,7 @@ pub struct UndoManager {
container_remap: Arc<Mutex<FxHashMap<ContainerID, ContainerID>>>,
inner: Arc<Mutex<UndoManagerInner>>,
_peer_id_change_sub: Subscription,
_undo_sub: Subscription,
}
impl std::fmt::Debug for UndoManager {
@ -442,7 +443,7 @@ impl UndoManager {
let inner_clone2 = inner.clone();
let remap_containers = Arc::new(Mutex::new(FxHashMap::default()));
let remap_containers_clone = remap_containers.clone();
doc.subscribe_root(Arc::new(move |event| match event.event_meta.by {
let undo_sub = doc.subscribe_root(Arc::new(move |event| match event.event_meta.by {
EventTriggerKind::Local => {
// TODO: PERF undo can be significantly faster if we can get
// the DiffBatch for undo here
@ -517,6 +518,7 @@ impl UndoManager {
container_remap: remap_containers,
inner,
_peer_id_change_sub: sub,
_undo_sub: undo_sub,
}
}

View file

@ -226,11 +226,26 @@ Apache License
END OF TERMS AND CONDITIONS
*/
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::error::Error;
use std::ptr::eq;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::{fmt::Debug, mem, sync::Arc};
use smallvec::SmallVec;
#[derive(Debug)]
pub enum SubscriptionError {
CannotEmitEventDueToRecursiveCall,
}
impl std::fmt::Display for Subscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SubscriptionError")
}
}
pub(crate) struct SubscriberSet<EmitterKey, Callback>(
Arc<Mutex<SubscriberSetState<EmitterKey, Callback>>>,
);
@ -334,18 +349,30 @@ where
})
}
pub fn is_recursive_calling(&self, emitter: &EmitterKey) -> bool {
if let Some(set) = self.0.try_lock().unwrap().subscribers.get(emitter) {
set.is_none()
} else {
false
}
}
/// Call the given callback for each subscriber to the given emitter.
/// If the callback returns false, the subscriber is removed.
pub fn retain(&self, emitter: &EmitterKey, f: &mut dyn FnMut(&mut Callback) -> bool) {
let Some(mut subscribers) = self
.0
.try_lock()
.unwrap()
.subscribers
.get_mut(emitter)
.and_then(|s| s.take())
else {
return;
pub fn retain(
&self,
emitter: &EmitterKey,
f: &mut dyn FnMut(&mut Callback) -> bool,
) -> Result<(), SubscriptionError> {
let mut subscribers = {
let mut subscriber_set_state = self.0.try_lock().unwrap();
let Some(set) = subscriber_set_state.subscribers.get_mut(emitter) else {
return Ok(());
};
let Some(inner) = set.take() else {
return Err(SubscriptionError::CannotEmitEventDueToRecursiveCall);
};
inner
};
subscribers.retain(|_, subscriber| {
@ -371,18 +398,16 @@ where
if !subscribers.is_empty() {
lock.subscribers.insert(emitter.clone(), Some(subscribers));
}
Ok(())
}
pub fn is_empty(&self) -> bool {
self.0
.try_lock()
.unwrap()
.subscribers
.iter()
.all(|x| match &x.1 {
Some(x) => x.is_empty(),
None => true,
})
self.0.try_lock().unwrap().subscribers.is_empty()
}
pub fn may_include(&self, emitter: &EmitterKey) -> bool {
self.0.try_lock().unwrap().subscribers.contains_key(emitter)
}
}
@ -434,3 +459,56 @@ impl Drop for Subscription {
}
}
}
/// A wrapper around `SubscriberSet` that automatically handles recursive event emission.
///
/// This struct differs from `SubscriberSet` in the following ways:
/// 1. It automatically handles the `CannotEmitEventDueToRecursiveCall` error that can occur in `SubscriberSet`.
/// 2. When a recursive event emission is detected, it queues the event instead of throwing an error.
/// 3. After the current event processing is complete, it automatically processes the queued events.
///
/// This behavior ensures that all events are processed in the order they were emitted, even in cases
/// where recursive event emission would normally cause an error.
pub(crate) struct SubscriberSetWithQueue<EmitterKey, Callback, Payload> {
subscriber_set: SubscriberSet<EmitterKey, Callback>,
queue: Arc<Mutex<BTreeMap<EmitterKey, Vec<Payload>>>>,
}
impl<EmitterKey, Callback, Payload> SubscriberSetWithQueue<EmitterKey, Callback, Payload>
where
EmitterKey: 'static + Ord + Clone + Debug + Send + Sync,
Callback: 'static + Send + Sync + FnMut(&Payload) -> bool,
Payload: Send + Sync,
{
pub fn new() -> Self {
Self {
subscriber_set: SubscriberSet::new(),
queue: Arc::new(Mutex::new(Default::default())),
}
}
pub fn inner(&self) -> &SubscriberSet<EmitterKey, Callback> {
&self.subscriber_set
}
pub(crate) fn emit(&self, key: &EmitterKey, payload: Payload) {
let mut pending_events: SmallVec<[Payload; 1]> = SmallVec::new();
pending_events.push(payload);
while let Some(payload) = pending_events.pop() {
let result = self
.subscriber_set
.retain(key, &mut |callback| (callback)(&payload));
match result {
Ok(_) => {
let mut queue = self.queue.try_lock().unwrap();
if let Some(new_pending_events) = queue.remove(key) {
pending_events.extend(new_pending_events);
}
}
Err(SubscriptionError::CannotEmitEventDueToRecursiveCall) => {
let mut queue = self.queue.try_lock().unwrap();
queue.entry(key.clone()).or_default().push(payload);
}
}
}
}
}

View file

@ -94,7 +94,7 @@ fn mark_with_the_same_key_value_should_be_skipped() {
#[test]
fn event_from_checkout() {
let a = LoroDoc::new_auto_commit();
let sub_id = a.subscribe_root(Arc::new(|event| {
let sub = a.subscribe_root(Arc::new(|event| {
assert!(matches!(
event.event_meta.by,
EventTriggerKind::Checkout | EventTriggerKind::Local
@ -105,10 +105,10 @@ fn event_from_checkout() {
let version = a.oplog_frontiers();
a.get_text("text").insert(0, "hello").unwrap();
a.commit_then_renew();
a.unsubscribe(sub_id);
sub.unsubscribe();
let ran = Arc::new(AtomicBool::new(false));
let ran_cloned = ran.clone();
a.subscribe_root(Arc::new(move |event| {
let _g = a.subscribe_root(Arc::new(move |event| {
assert!(event.event_meta.by.is_checkout());
ran.store(true, std::sync::atomic::Ordering::Relaxed);
}));
@ -119,7 +119,7 @@ fn event_from_checkout() {
#[test]
fn handler_in_event() {
let doc = LoroDoc::new_auto_commit();
doc.subscribe_root(Arc::new(|e| {
let _g = doc.subscribe_root(Arc::new(|e| {
dbg!(&e);
let value = e.events[0]
.diff
@ -200,7 +200,7 @@ fn list() {
#[test]
fn richtext_mark_event() {
let a = LoroDoc::new_auto_commit();
a.subscribe(
let _g = a.subscribe(
&a.get_text("text").id(),
Arc::new(|e| {
let delta = e.events[0].diff.as_text().unwrap();
@ -221,7 +221,7 @@ fn richtext_mark_event() {
.unwrap();
a.commit_then_stop();
let b = LoroDoc::new_auto_commit();
b.subscribe(
let _g = b.subscribe(
&a.get_text("text").id(),
Arc::new(|e| {
let delta = e.events[0].diff.as_text().unwrap();
@ -265,7 +265,7 @@ fn concurrent_richtext_mark_event() {
);
a.merge(&b).unwrap();
a.unsubscribe(sub_id);
sub_id.unsubscribe();
let sub_id = a.subscribe(
&a.get_text("text").id(),
@ -290,8 +290,8 @@ fn concurrent_richtext_mark_event() {
.mark(2, 3, "bold", LoroValue::Null)
.unwrap();
a.merge(&b).unwrap();
a.unsubscribe(sub_id);
a.subscribe(
sub_id.unsubscribe();
let _g = a.subscribe(
&a.get_text("text").id(),
Arc::new(|e| {
for container_diff in e.events {
@ -322,7 +322,7 @@ fn insert_richtext_event() {
a.get_text("text").mark(0, 5, "bold", true.into()).unwrap();
a.commit_then_renew();
let text = a.get_text("text");
a.subscribe(
let _g = a.subscribe(
&text.id(),
Arc::new(|e| {
let delta = e.events[0].diff.as_text().unwrap();
@ -342,7 +342,7 @@ fn insert_richtext_event() {
#[test]
fn import_after_init_handlers() {
let a = LoroDoc::new_auto_commit();
a.subscribe(
let _g = a.subscribe(
&ContainerID::new_root("text", ContainerType::Text),
Arc::new(|event| {
assert!(matches!(
@ -351,7 +351,7 @@ fn import_after_init_handlers() {
))
}),
);
a.subscribe(
let _g = a.subscribe(
&ContainerID::new_root("map", ContainerType::Map),
Arc::new(|event| {
assert!(matches!(
@ -360,7 +360,7 @@ fn import_after_init_handlers() {
))
}),
);
a.subscribe(
let _g = a.subscribe(
&ContainerID::new_root("list", ContainerType::List),
Arc::new(|event| {
assert!(matches!(
@ -429,7 +429,7 @@ fn test_checkout() {
let value: Arc<Mutex<LoroValue>> = Arc::new(Mutex::new(LoroValue::Map(Default::default())));
let root_value = value.clone();
doc_0.subscribe_root(Arc::new(move |event| {
let _g = doc_0.subscribe_root(Arc::new(move |event| {
dbg!(&event);
let mut root_value = root_value.try_lock().unwrap();
for container_diff in event.events {
@ -722,7 +722,7 @@ fn map_concurrent_checkout() {
#[test]
fn tree_checkout() {
let doc_a = LoroDoc::new_auto_commit();
doc_a.subscribe_root(Arc::new(|_e| {}));
let _g = doc_a.subscribe_root(Arc::new(|_e| {}));
doc_a.set_peer_id(1).unwrap();
let tree = doc_a.get_tree("root");
let id1 = tree.create(TreeParentId::Root).unwrap();
@ -803,7 +803,7 @@ fn state_may_deadlock_when_import() {
panic_after(Duration::from_millis(100), || {
let doc = LoroDoc::new_auto_commit();
let map = doc.get_map("map");
doc.subscribe_root(Arc::new(move |_e| {
let _g = doc.subscribe_root(Arc::new(move |_e| {
map.id();
}));
@ -824,7 +824,7 @@ fn missing_event_when_checkout() {
doc.checkout(&doc.oplog_frontiers()).unwrap();
let value = Arc::new(Mutex::new(FxHashMap::default()));
let map = value.clone();
doc.subscribe(
let _g = doc.subscribe(
&ContainerID::new_root("tree", ContainerType::Tree),
Arc::new(move |e| {
let mut v = map.try_lock().unwrap();
@ -875,7 +875,7 @@ fn empty_event() {
doc.commit_then_renew();
let fire = Arc::new(AtomicBool::new(false));
let fire_clone = Arc::clone(&fire);
doc.subscribe_root(Arc::new(move |_e| {
let _g = doc.subscribe_root(Arc::new(move |_e| {
fire_clone.store(true, std::sync::atomic::Ordering::Relaxed);
}));
doc.import(&doc.export_snapshot().unwrap()).unwrap();

View file

@ -1,10 +1,9 @@
use std::sync::Arc;
use super::subscription_to_js_function_callback;
use loro_internal::{
handler::{counter::CounterHandler, Handler},
subscription::SubID,
HandlerTrait, LoroDoc,
};
use std::sync::Arc;
use wasm_bindgen::prelude::*;
use crate::{
@ -56,29 +55,20 @@ impl LoroCounter {
}
/// Subscribe to the changes of the counter.
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<u32> {
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<JsValue> {
let observer = observer::Observer::new(f);
let doc = self
.doc
.clone()
.ok_or_else(|| JsError::new("Document is not attached"))?;
let doc_clone = doc.clone();
let ans = doc.subscribe(
let sub = doc.subscribe(
&self.handler.id(),
Arc::new(move |e| {
call_after_micro_task(observer.clone(), e, &doc_clone);
}),
);
Ok(ans.into_u32())
}
/// Unsubscribe by the subscription id.
pub fn unsubscribe(&self, subscription: u32) -> JsResult<()> {
self.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?
.unsubscribe(SubID::from_u32(subscription));
Ok(())
Ok(subscription_to_js_function_callback(sub))
}
/// Get the parent container of the counter container.

View file

@ -20,12 +20,11 @@ use loro_internal::{
json::JsonSchema,
loro::{CommitOptions, ExportMode},
loro_common::check_root_container_name,
subscription::SubID,
undo::{UndoItemMeta, UndoOrRedo},
version::Frontiers,
ContainerType, DiffEvent, FxHashMap, HandlerTrait, LoroDoc as LoroDocInner, LoroValue,
MovableListHandler, TreeNodeWithChildren, TreeParentId, UndoManager as InnerUndoManager,
VersionVector as InternalVersionVector,
MovableListHandler, Subscription, TreeNodeWithChildren, TreeParentId,
UndoManager as InnerUndoManager, VersionVector as InternalVersionVector,
};
use rle::HasLength;
use serde::{Deserialize, Serialize};
@ -1222,35 +1221,14 @@ impl LoroDoc {
/// doc.commit();
/// ```
// TODO: convert event and event sub config
pub fn subscribe(&self, f: js_sys::Function) -> u32 {
pub fn subscribe(&self, f: js_sys::Function) -> JsValue {
let observer = observer::Observer::new(f);
let doc = self.0.clone();
self.0
.subscribe_root(Arc::new(move |e| {
call_after_micro_task(observer.clone(), e, &doc)
// call_subscriber(observer.clone(), e);
}))
.into_u32()
}
/// Unsubscribe by the subscription id.
///
/// @example
/// ```ts
/// import { LoroDoc } from "loro-crdt";
///
/// const doc = new LoroDoc();
/// const text = doc.getText("text");
/// const subscription = doc.subscribe((event)=>{
/// console.log(event);
/// });
/// text.insert(0, "Hello");
/// // the events will be emitted when `commit()` is called.
/// doc.commit();
/// doc.unsubscribe(subscription);
/// ```
pub fn unsubscribe(&self, subscription: u32) {
self.0.unsubscribe(SubID::from_u32(subscription))
let sub = self.0.subscribe_root(Arc::new(move |e| {
call_after_micro_task(observer.clone(), e, &doc)
// call_subscriber(observer.clone(), e);
}));
subscription_to_js_function_callback(sub)
}
/// Subscribe the updates from local edits
@ -1962,7 +1940,7 @@ impl LoroText {
/// - `doc.checkout(version)` is called.
///
/// returns a subscription id, which can be used to unsubscribe.
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<u32> {
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<JsValue> {
let observer = observer::Observer::new(f);
let doc = self
.doc
@ -1976,16 +1954,7 @@ impl LoroText {
}),
);
Ok(ans.into_u32())
}
/// Unsubscribe by the subscription id.
pub fn unsubscribe(&self, subscription: u32) -> JsResult<()> {
self.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?
.unsubscribe(SubID::from_u32(subscription));
Ok(())
Ok(subscription_to_js_function_callback(ans))
}
/// Change the state of this text by delta.
@ -2329,44 +2298,21 @@ impl LoroMap {
/// map.set("foo", "bar");
/// doc.commit();
/// ```
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<u32> {
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<JsValue> {
let observer = observer::Observer::new(f);
let doc = self
.doc
.clone()
.ok_or_else(|| JsError::new("Document is not attached"))?;
let doc_clone = doc.clone();
let id = doc.subscribe(
let sub = doc.subscribe(
&self.handler.id(),
Arc::new(move |e| {
call_after_micro_task(observer.clone(), e, &doc_clone);
}),
);
Ok(id.into_u32())
}
/// Unsubscribe by the subscription.
///
/// @example
/// ```ts
/// import { LoroDoc } from "loro-crdt";
///
/// const doc = new LoroDoc();
/// const map = doc.getMap("map");
/// const subscription = map.subscribe((event)=>{
/// console.log(event);
/// });
/// map.set("foo", "bar");
/// doc.commit();
/// map.unsubscribe(subscription);
/// ```
pub fn unsubscribe(&self, subscription: u32) -> JsResult<()> {
self.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?
.unsubscribe(SubID::from_u32(subscription));
Ok(())
Ok(subscription_to_js_function_callback(sub))
}
/// Get the size of the map.
@ -2629,43 +2575,20 @@ impl LoroList {
/// list.insert(0, 100);
/// doc.commit();
/// ```
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<u32> {
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<JsValue> {
let observer = observer::Observer::new(f);
let doc = self
.doc
.clone()
.ok_or_else(|| JsError::new("Document is not attached"))?;
let doc_clone = doc.clone();
let ans = doc.subscribe(
let sub = doc.subscribe(
&self.handler.id(),
Arc::new(move |e| {
call_after_micro_task(observer.clone(), e, &doc_clone);
}),
);
Ok(ans.into_u32())
}
/// Unsubscribe by the subscription id.
///
/// @example
/// ```ts
/// import { LoroDoc } from "loro-crdt";
///
/// const doc = new LoroDoc();
/// const list = doc.getList("list");
/// const subscription = list.subscribe((event)=>{
/// console.log(event);
/// });
/// list.insert(0, 100);
/// doc.commit();
/// list.unsubscribe(subscription);
/// ```
pub fn unsubscribe(&self, subscription: u32) -> JsResult<()> {
self.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?
.unsubscribe(SubID::from_u32(subscription));
Ok(())
Ok(subscription_to_js_function_callback(sub))
}
/// Get the length of list.
@ -2969,43 +2892,20 @@ impl LoroMovableList {
/// list.insert(0, 100);
/// doc.commit();
/// ```
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<u32> {
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<JsValue> {
let observer = observer::Observer::new(f);
let loro = self
.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?;
let doc_clone = loro.clone();
let ans = loro.subscribe(
let sub = loro.subscribe(
&self.handler.id(),
Arc::new(move |e| {
call_after_micro_task(observer.clone(), e, &doc_clone);
}),
);
Ok(ans.into_u32())
}
/// Unsubscribe by the subscription id.
///
/// @example
/// ```ts
/// import { LoroDoc } from "loro-crdt";
///
/// const doc = new LoroDoc();
/// const list = doc.getList("list");
/// const subscription = list.subscribe((event)=>{
/// console.log(event);
/// });
/// list.insert(0, 100);
/// doc.commit();
/// list.unsubscribe(subscription);
/// ```
pub fn unsubscribe(&self, subscription: u32) -> JsResult<()> {
self.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?
.unsubscribe(SubID::from_u32(subscription));
Ok(())
Ok(subscription_to_js_function_callback(sub))
}
/// Get the length of list.
@ -3683,7 +3583,7 @@ impl LoroTree {
/// const node = root.createNode();
/// doc.commit();
/// ```
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<u32> {
pub fn subscribe(&self, f: js_sys::Function) -> JsResult<JsValue> {
let observer = observer::Observer::new(f);
let doc = self
.doc
@ -3696,31 +3596,7 @@ impl LoroTree {
call_after_micro_task(observer.clone(), e, &doc_clone);
}),
);
Ok(ans.into_u32())
}
/// Unsubscribe by the subscription id.
///
/// @example
/// ```ts
/// import { LoroDoc } from "loro-crdt";
///
/// const doc = new LoroDoc();
/// const tree = doc.getTree("tree");
/// const subscription = tree.subscribe((event)=>{
/// console.log(event);
/// });
/// const root = tree.createNode();
/// const node = root.createNode();
/// doc.commit();
/// tree.unsubscribe(subscription);
/// ```
pub fn unsubscribe(&self, subscription: u32) -> JsResult<()> {
self.doc
.as_ref()
.ok_or_else(|| JsError::new("Document is not attached"))?
.unsubscribe(SubID::from_u32(subscription));
Ok(())
Ok(subscription_to_js_function_callback(ans))
}
/// Get the parent container of the tree container.
@ -4335,6 +4211,17 @@ fn js_to_export_mode(js_mode: JsExportMode) -> JsResult<ExportMode<'static>> {
}
}
fn subscription_to_js_function_callback(sub: Subscription) -> JsValue {
let mut sub = Some(sub);
let closure = Closure::wrap(Box::new(move || {
if let Some(sub) = sub.take() {
sub.unsubscribe();
}
}) as Box<dyn FnMut()>);
closure.into_js_value()
}
#[wasm_bindgen(typescript_custom_section)]
const TYPES: &'static str = r#"
/**

View file

@ -51,7 +51,6 @@ pub use loro_internal::kv_store::{KvStore, MemKvStore};
pub use loro_internal::loro::CommitOptions;
pub use loro_internal::loro::DocAnalysis;
pub use loro_internal::oplog::FrontiersNotIncluded;
pub use loro_internal::subscription::SubID;
pub use loro_internal::undo;
pub use loro_internal::version::{Frontiers, VersionVector, VersionVectorDiff};
pub use loro_internal::ApplyDiff;
@ -573,7 +572,7 @@ impl LoroDoc {
/// let text = doc.get_text("text");
/// let ran = Arc::new(AtomicBool::new(false));
/// let ran2 = ran.clone();
/// doc.subscribe(
/// let sub = doc.subscribe(
/// &text.id(),
/// Arc::new(move |event| {
/// assert!(event.triggered_by.is_local());
@ -593,7 +592,7 @@ impl LoroDoc {
/// assert!(ran.load(std::sync::atomic::Ordering::Relaxed));
/// ```
#[inline]
pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> SubID {
pub fn subscribe(&self, container_id: &ContainerID, callback: Subscriber) -> Subscription {
self.doc.subscribe(
container_id,
Arc::new(move |e| {
@ -614,18 +613,13 @@ impl LoroDoc {
/// - `doc.import(data)` is called.
/// - `doc.checkout(version)` is called.
#[inline]
pub fn subscribe_root(&self, callback: Subscriber) -> SubID {
pub fn subscribe_root(&self, callback: Subscriber) -> Subscription {
// self.doc.subscribe_root(callback)
self.doc.subscribe_root(Arc::new(move |e| {
callback(DiffEvent::from(e));
}))
}
/// Remove a subscription by subscription id.
pub fn unsubscribe(&self, id: SubID) {
self.doc.unsubscribe(id)
}
/// Subscribe the local update of the document.
pub fn subscribe_local_update(&self, callback: LocalUpdateCallback) -> Subscription {
self.doc.subscribe_local_update(callback)

View file

@ -23,7 +23,7 @@ fn allow_editing_on_detached_mode_when_detached_editing_is_enabled() {
doc.set_peer_id(1).unwrap();
let string = Arc::new(Mutex::new(String::new()));
let string_clone = string.clone();
doc.subscribe_root(Arc::new(move |batch| {
let _g = doc.subscribe_root(Arc::new(move |batch| {
for e in batch.events {
match e.diff {
loro::event::Diff::Text(vec) => {
@ -177,7 +177,7 @@ fn allow_editing_on_detached_mode_when_detached_editing_is_enabled_2() {
doc.set_peer_id(1).unwrap();
let string = Arc::new(Mutex::new(String::new()));
let string_clone = string.clone();
doc.subscribe_root(Arc::new(move |batch| {
let _g = doc.subscribe_root(Arc::new(move |batch| {
for e in batch.events {
match e.diff {
loro::event::Diff::Text(vec) => {

View file

@ -504,7 +504,7 @@ fn test_richtext_checkout() -> LoroResult<()> {
text.delete(0, 5)?;
doc.commit();
doc.subscribe_root(Arc::new(|event| {
let _g = doc.subscribe_root(Arc::new(|event| {
dbg!(&event);
let t = event.events[0].diff.as_text().unwrap();
let i = t[0].as_insert().unwrap();

View file

@ -59,7 +59,7 @@ fn fork_doc() -> anyhow::Result<()> {
let triggered = Arc::new(AtomicBool::new(false));
let trigger_cloned = triggered.clone();
doc0.commit();
doc0.subscribe_root(Arc::new(move |e| {
let _g = doc0.subscribe_root(Arc::new(move |e| {
for e in e.events {
let _t = e.diff.as_text().unwrap();
triggered.store(true, std::sync::atomic::Ordering::Release);
@ -256,7 +256,7 @@ fn time_travel() {
let doc2 = LoroDoc::new();
let text = doc.get_text("text");
let text2 = doc2.get_text("text");
doc.subscribe(
let _g = doc.subscribe(
&text.id(),
Arc::new(move |x| {
for event in x.events {
@ -289,7 +289,7 @@ fn travel_back_should_remove_styles() {
let doc2 = LoroDoc::new();
let text = doc.get_text("text");
let text2 = doc2.get_text("text");
doc.subscribe(
let _g = doc.subscribe(
&text.id(),
Arc::new(move |x| {
for event in x.events {
@ -487,7 +487,7 @@ fn subscribe() {
let ran = Arc::new(AtomicBool::new(false));
let ran2 = ran.clone();
doc.subscribe(
let _g = doc.subscribe(
&text.id(),
Arc::new(move |event| {
assert!(matches!(

View file

@ -194,9 +194,10 @@ export function getType<T>(
return "Json" as any;
}
export type Subscription = () => void;
declare module "loro-wasm" {
interface LoroDoc {
subscribe(listener: Listener): number;
subscribe(listener: Listener): Subscription;
}
interface UndoManager {
@ -362,7 +363,7 @@ declare module "loro-wasm" {
insert<V extends T>(pos: number, value: Exclude<V, Container>): void;
delete(pos: number, len: number): void;
push<V extends T>(value: Exclude<V, Container>): void;
subscribe(listener: Listener): number;
subscribe(listener: Listener): Subscription;
getAttached(): undefined | LoroList<T>;
}
@ -438,7 +439,7 @@ declare module "loro-wasm" {
insert<V extends T>(pos: number, value: Exclude<V, Container>): void;
delete(pos: number, len: number): void;
push<V extends T>(value: Exclude<V, Container>): void;
subscribe(listener: Listener): number;
subscribe(listener: Listener): Subscription;
getAttached(): undefined | LoroMovableList<T>;
/**
* Set the value at the given position.
@ -563,14 +564,14 @@ declare module "loro-wasm" {
value: Exclude<V, Container>,
): void;
delete(key: string): void;
subscribe(listener: Listener): number;
subscribe(listener: Listener): Subscription;
}
interface LoroText {
new(): LoroText;
insert(pos: number, text: string): void;
delete(pos: number, len: number): void;
subscribe(listener: Listener): number;
subscribe(listener: Listener): Subscription;
}
interface LoroTree<
@ -605,7 +606,7 @@ declare module "loro-wasm" {
* Get LoroTreeNode by the TreeID.
*/
getNodeByID(target: TreeID): LoroTreeNode<T>;
subscribe(listener: Listener): number;
subscribe(listener: Listener): Subscription;
}
interface LoroTreeNode<

View file

@ -172,7 +172,7 @@ describe("event", () => {
// unsubscribe
const oldRan = ran;
text.unsubscribe(sub);
sub();
text.insert(0, "789");
loro.commit();
await oneMs();
@ -201,7 +201,7 @@ describe("event", () => {
expect(times).toBe(3);
// unsubscribe
loro.unsubscribe(sub);
sub()
text.insert(0, "123");
loro.commit();
await oneMs();
@ -227,7 +227,7 @@ describe("event", () => {
expect(times).toBe(2);
// unsubscribe
loro.unsubscribe(sub);
sub()
text.insert(0, "123");
loro.commit();
await oneMs();

View file

@ -13,7 +13,7 @@ describe("transaction", () => {
let count = 0;
const sub = loro.subscribe(() => {
count += 1;
loro.unsubscribe(sub);
sub();
});
expect(count).toBe(0);
text.insert(0, "hello world");
@ -31,7 +31,7 @@ describe("transaction", () => {
let count = 0;
const sub = loro.subscribe((event: { origin: string }) => {
count += 1;
loro.unsubscribe(sub);
sub();
assertEquals(event.origin, "origin");
});
@ -72,7 +72,7 @@ describe("subscribe", () => {
loro.commit();
await one_ms();
assertEquals(count, 3);
loro.unsubscribe(sub);
sub();
text.insert(0, "hello world");
loro.commit();
await one_ms();
@ -85,7 +85,7 @@ describe("subscribe", () => {
let count = 0;
const sub = loro.subscribe(() => {
count += 1;
loro.unsubscribe(sub);
sub()
});
assertEquals(count, 0);
text.insert(0, "hello world");
@ -115,7 +115,7 @@ describe("subscribe", () => {
loro.commit();
await one_ms();
assertEquals(count, 2);
loro.unsubscribe(sub);
sub();
text.insert(0, "hello world");
loro.commit();
await one_ms();

View file

@ -165,7 +165,7 @@ describe("movable list", () => {
list.push("c");
let called = false;
let calledTimes = 0;
const id = list.subscribe((event) => {
const unsub = list.subscribe((event) => {
expect(event.by).toBe("local");
for (const e of event.events) {
expect(e.target).toBe(list.id);
@ -190,7 +190,7 @@ describe("movable list", () => {
await new Promise((r) => setTimeout(r, 1));
expect(called).toBeTruthy();
expect(calledTimes).toBe(1);
list.unsubscribe(id);
unsub();
list.push("d");
doc.commit();
await new Promise((r) => setTimeout(r, 1));

View file

@ -1,4 +1,4 @@
import { assert, describe, expect, it} from "vitest";
import { assert, describe, expect, it } from "vitest";
import { LoroDoc, LoroTree, LoroTreeNode, TreeDiff } from "../src";
function assertEquals(a: any, b: any) {
@ -36,7 +36,7 @@ describe("loro tree", () => {
tree.move(child2.id, child.id);
assertEquals(child2.parent()!.id, child.id);
assertEquals(child.children()![0].id, child2.id);
expect(()=>tree.move(child2.id, child.id, 1)).toThrowError();
expect(() => tree.move(child2.id, child.id, 1)).toThrowError();
});
it("delete", () => {
@ -80,7 +80,7 @@ describe("loro tree", () => {
assertEquals(root.children()![1].id, child2.id);
});
it("toArray", ()=>{
it("toArray", () => {
const loro2 = new LoroDoc();
const tree2 = loro2.getTree("root");
const root = tree2.createNode();
@ -98,18 +98,18 @@ describe("loro tree", () => {
assert(keys.includes("children"));
});
it("getNodes", ()=>{
it("getNodes", () => {
const loro2 = new LoroDoc();
const tree2 = loro2.getTree("root");
const root = tree2.createNode();
const child = root.createNode();
const nodes = tree2.getNodes({withDeleted: false});
const nodes = tree2.getNodes({ withDeleted: false });
assertEquals(nodes.length, 2);
assertEquals(nodes.map((n)=>{return n.id}), [root.id, child.id])
assertEquals(nodes.map((n) => { return n.id }), [root.id, child.id])
tree2.delete(child.id);
const nodesWithDeleted = tree2.getNodes({withDeleted:true});
assertEquals(nodesWithDeleted.map((n)=>{return n.id}), [root.id, child.id]);
assertEquals(tree2.getNodes({withDeleted: false}).map((n)=>{return n.id}), [root.id]);
const nodesWithDeleted = tree2.getNodes({ withDeleted: true });
assertEquals(nodesWithDeleted.map((n) => { return n.id }), [root.id, child.id]);
assertEquals(tree2.getNodes({ withDeleted: false }).map((n) => { return n.id }), [root.id]);
});
it("subscribe", async () => {
@ -125,7 +125,7 @@ describe("loro tree", () => {
loro.commit();
await one_ms();
assertEquals(count, 1);
loro.unsubscribe(sub);
sub();
child.data.set("a", 123);
loro.commit();
await one_ms();
@ -139,93 +139,93 @@ describe("loro tree", () => {
});
});
describe("loro tree node", ()=>{
const loro = new LoroDoc();
const tree = loro.getTree("root");
tree.enableFractionalIndex(0);
describe("loro tree node", () => {
const loro = new LoroDoc();
const tree = loro.getTree("root");
tree.enableFractionalIndex(0);
it("create", () => {
const root = tree.createNode();
const child = root.createNode();
assertEquals(child.parent()!.id, root.id);
const child2 = root.createNode();
assertEquals(child.index(), 0);
assertEquals(child2.index(), 1);
});
it("create", () => {
const root = tree.createNode();
const child = root.createNode();
assertEquals(child.parent()!.id, root.id);
const child2 = root.createNode();
assertEquals(child.index(), 0);
assertEquals(child2.index(), 1);
});
it("create with index", () => {
const root = tree.createNode();
const child = root.createNode();
assertEquals(child.parent()!.id, root.id);
const child2 = root.createNode(0);
assertEquals(child.index(), 1);
assertEquals(child2.index(), 0);
});
it("create with index", () => {
const root = tree.createNode();
const child = root.createNode();
assertEquals(child.parent()!.id, root.id);
const child2 = root.createNode(0);
assertEquals(child.index(), 1);
assertEquals(child2.index(), 0);
});
it("moveTo", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
assertEquals(child2.parent()!.id, root.id);
child2.move(child);
assertEquals(child2.parent()!.id, child.id);
assertEquals(child.children()![0].id, child2.id);
expect(()=>child2.move(child, 1)).toThrowError();
});
it("moveTo", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
assertEquals(child2.parent()!.id, root.id);
child2.move(child);
assertEquals(child2.parent()!.id, child.id);
assertEquals(child.children()![0].id, child2.id);
expect(() => child2.move(child, 1)).toThrowError();
});
it("moveAfter", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
assertEquals(child2.parent()!.id, root.id);
child2.moveAfter(child);
assertEquals(child2.parent()!.id, root.id);
assertEquals(child.index(), 0);
assertEquals(child2.index(), 1);
});
it("moveAfter", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
assertEquals(child2.parent()!.id, root.id);
child2.moveAfter(child);
assertEquals(child2.parent()!.id, root.id);
assertEquals(child.index(), 0);
assertEquals(child2.index(), 1);
});
it("moveBefore", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
assertEquals(child2.parent()!.id, root.id);
child2.moveBefore(child);
assertEquals(child2.parent()!.id, root.id);
assertEquals(child.index(), 1);
assertEquals(child2.index(), 0);
});
it("moveBefore", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
assertEquals(child2.parent()!.id, root.id);
child2.moveBefore(child);
assertEquals(child2.parent()!.id, root.id);
assertEquals(child.index(), 1);
assertEquals(child2.index(), 0);
});
it("index", () => {
const root = tree.createNode();
const child = tree.createNode(root.id);
const child2 = tree.createNode(root.id, 0);
assertEquals(child.index(), 1);
assertEquals(child2.index(), 0);
});
it("index", () => {
const root = tree.createNode();
const child = tree.createNode(root.id);
const child2 = tree.createNode(root.id, 0);
assertEquals(child.index(), 1);
assertEquals(child2.index(), 0);
});
it("old parent", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
loro.commit();
const subID = tree.subscribe((e)=>{
if(e.events[0].diff.type == "tree"){
const diff = e.events[0].diff as TreeDiff;
if (diff.diff[0].action == "move"){
assertEquals(diff.diff[0].old_parent, root.id);
assertEquals(diff.diff[0].old_index, 1);
}
}
});
child2.move(child);
loro.commit();
tree.unsubscribe(subID);
assertEquals(child2.parent()!.id, child.id);
it("old parent", () => {
const root = tree.createNode();
const child = root.createNode();
const child2 = root.createNode();
loro.commit();
const unsub = tree.subscribe((e) => {
if (e.events[0].diff.type == "tree") {
const diff = e.events[0].diff as TreeDiff;
if (diff.diff[0].action == "move") {
assertEquals(diff.diff[0].old_parent, root.id);
assertEquals(diff.diff[0].old_index, 1);
}
}
});
child2.move(child);
loro.commit();
unsub()
assertEquals(child2.parent()!.id, child.id);
});
});
describe("LoroTree", () => {
it ("move", () => {
it("move", () => {
const loro = new LoroDoc();
const tree = loro.getTree("root");
const root = tree.createNode();