fix: subscription convert error (#525)
Some checks are pending
Release WASM / Release (push) Waiting to run
Test All / build (push) Waiting to run

This commit is contained in:
Zixuan Chen 2024-10-22 08:06:37 +08:00 committed by GitHub
parent 484d6db7a1
commit 2622e301d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 71 additions and 18 deletions

View file

@ -28,7 +28,7 @@ use loro_internal::{
};
use rle::HasLength;
use serde::{Deserialize, Serialize};
use std::{cell::RefCell, cmp::Ordering, mem::ManuallyDrop, ops::ControlFlow, rc::Rc, sync::Arc};
use std::{cell::RefCell, cmp::Ordering, ops::ControlFlow, rc::Rc, sync::Arc};
use wasm_bindgen::{__rt::IntoJsResult, prelude::*, throw_val};
use wasm_bindgen_derive::TryFromJsValue;
@ -1362,20 +1362,16 @@ impl LoroDoc {
#[wasm_bindgen(js_name = "subscribeLocalUpdates", skip_typescript)]
pub fn subscribe_local_updates(&self, f: js_sys::Function) -> JsValue {
let observer = observer::Observer::new(f);
let mut sub = Some(self.0.subscribe_local_update(Box::new(move |e| {
let sub = self.0.subscribe_local_update(Box::new(move |e| {
let arr = js_sys::Uint8Array::new_with_length(e.len() as u32);
arr.copy_from(e);
if let Err(e) = observer.call1(&arr.into()) {
console_error!("Error: {:?}", e);
}
true
})));
}));
let closure = Closure::wrap(Box::new(move || {
drop(sub.take());
}) as Box<dyn FnMut()>);
closure.into_js_value()
subscription_to_js_function_callback(sub)
}
/// Debug the size of the history
@ -4359,10 +4355,22 @@ fn js_to_export_mode(js_mode: JsExportMode) -> JsResult<ExportMode<'static>> {
}
fn subscription_to_js_function_callback(sub: Subscription) -> JsValue {
let mut sub: Option<ManuallyDrop<Subscription>> = Some(ManuallyDrop::new(sub));
struct JsSubscription {
sub: Option<Subscription>,
}
impl Drop for JsSubscription {
fn drop(&mut self) {
if let Some(sub) = self.sub.take() {
sub.detach();
}
}
}
let mut sub: JsSubscription = JsSubscription { sub: Some(sub) };
let closure = Closure::wrap(Box::new(move || {
if let Some(sub) = sub.take() {
ManuallyDrop::into_inner(sub).unsubscribe();
if let Some(sub) = sub.sub.take() {
sub.unsubscribe()
}
}) as Box<dyn FnMut()>);

View file

@ -20,7 +20,7 @@
"scripts": {
"build": "rollup -c",
"watch": "rollup -c -w",
"test": "vitest run && npx tsc --noEmit",
"test": "node --expose-gc ./node_modules/vitest/vitest.mjs run && npx tsc --noEmit",
"prepublish": "pnpm run build"
},
"author": "Loro",

View file

@ -201,7 +201,7 @@ describe("event", () => {
expect(times).toBe(3);
// unsubscribe
sub()
sub();
text.insert(0, "123");
loro.commit();
await oneMs();
@ -227,7 +227,7 @@ describe("event", () => {
expect(times).toBe(2);
// unsubscribe
sub()
sub();
text.insert(0, "123");
loro.commit();
await oneMs();
@ -448,13 +448,13 @@ describe("event", () => {
map.set("key", "value");
loro.commit();
expect(updates).toBe(1); // All changes are bundled in one update
expect(updates).toBe(1); // All changes are bundled in one update
text.insert(5, "!");
loro.commit();
expect(updates).toBe(2);
})
});
it("can be used to sync", () => {
const loro1 = new Loro();
@ -489,8 +489,53 @@ describe("event", () => {
// Both documents should converge to the same state
expect(text1.toString()).toBe("1. Hello World!");
expect(text2.toString()).toBe("1. Hello World!");
})
})
});
});
});
it("subscription works after timeout", async () => {
const doc = new LoroDoc();
let times = 0;
doc.subscribe(() => {
times += 1;
});
for (let i = 0; i < 3; i++) {
if ((globalThis as any).gc) {
(globalThis as any).gc();
} else {
throw new Error("No GC");
}
const s = i.toString();
doc.getText("text").insert(0, s);
doc.commit();
await oneMs();
expect(times).toBe(1);
times = 0;
await new Promise((resolve) => setTimeout(resolve, 10));
}
});
it("subscription for local updates works after timeout", async () => {
const doc = new LoroDoc();
let times = 0;
doc.subscribeLocalUpdates(() => {
times += 1;
});
for (let i = 0; i < 3; i++) {
if ((globalThis as any).gc) {
(globalThis as any).gc();
} else {
throw new Error("No GC");
}
doc.getText("text").insert(0, "h");
doc.commit();
await oneMs();
expect(times).toBe(1);
times = 0;
await new Promise((resolve) => setTimeout(resolve, 10));
}
});
function oneMs(): Promise<void> {