From 2622e301d03d4821c190a7fccc9fe57835c57c3c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 22 Oct 2024 08:06:37 +0800 Subject: [PATCH] fix: subscription convert error (#525) --- crates/loro-wasm/src/lib.rs | 30 ++++++++++++------- loro-js/package.json | 2 +- loro-js/tests/event.test.ts | 57 +++++++++++++++++++++++++++++++++---- 3 files changed, 71 insertions(+), 18 deletions(-) diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index f22d3542..08c37882 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -28,7 +28,7 @@ use loro_internal::{ }; use rle::HasLength; use serde::{Deserialize, Serialize}; -use std::{cell::RefCell, cmp::Ordering, mem::ManuallyDrop, ops::ControlFlow, rc::Rc, sync::Arc}; +use std::{cell::RefCell, cmp::Ordering, ops::ControlFlow, rc::Rc, sync::Arc}; use wasm_bindgen::{__rt::IntoJsResult, prelude::*, throw_val}; use wasm_bindgen_derive::TryFromJsValue; @@ -1362,20 +1362,16 @@ impl LoroDoc { #[wasm_bindgen(js_name = "subscribeLocalUpdates", skip_typescript)] pub fn subscribe_local_updates(&self, f: js_sys::Function) -> JsValue { let observer = observer::Observer::new(f); - let mut sub = Some(self.0.subscribe_local_update(Box::new(move |e| { + let sub = self.0.subscribe_local_update(Box::new(move |e| { let arr = js_sys::Uint8Array::new_with_length(e.len() as u32); arr.copy_from(e); if let Err(e) = observer.call1(&arr.into()) { console_error!("Error: {:?}", e); } true - }))); + })); - let closure = Closure::wrap(Box::new(move || { - drop(sub.take()); - }) as Box); - - closure.into_js_value() + subscription_to_js_function_callback(sub) } /// Debug the size of the history @@ -4359,10 +4355,22 @@ fn js_to_export_mode(js_mode: JsExportMode) -> JsResult> { } fn subscription_to_js_function_callback(sub: Subscription) -> JsValue { - let mut sub: Option> = Some(ManuallyDrop::new(sub)); + struct JsSubscription { + sub: Option, + } + + impl Drop for JsSubscription { + fn drop(&mut self) { + if let Some(sub) = self.sub.take() { + sub.detach(); + } + } + } + + let mut sub: JsSubscription = JsSubscription { sub: Some(sub) }; let closure = Closure::wrap(Box::new(move || { - if let Some(sub) = sub.take() { - ManuallyDrop::into_inner(sub).unsubscribe(); + if let Some(sub) = sub.sub.take() { + sub.unsubscribe() } }) as Box); diff --git a/loro-js/package.json b/loro-js/package.json index 532c749c..4fdb6bbe 100644 --- a/loro-js/package.json +++ b/loro-js/package.json @@ -20,7 +20,7 @@ "scripts": { "build": "rollup -c", "watch": "rollup -c -w", - "test": "vitest run && npx tsc --noEmit", + "test": "node --expose-gc ./node_modules/vitest/vitest.mjs run && npx tsc --noEmit", "prepublish": "pnpm run build" }, "author": "Loro", diff --git a/loro-js/tests/event.test.ts b/loro-js/tests/event.test.ts index 2e64d939..2af292c8 100644 --- a/loro-js/tests/event.test.ts +++ b/loro-js/tests/event.test.ts @@ -201,7 +201,7 @@ describe("event", () => { expect(times).toBe(3); // unsubscribe - sub() + sub(); text.insert(0, "123"); loro.commit(); await oneMs(); @@ -227,7 +227,7 @@ describe("event", () => { expect(times).toBe(2); // unsubscribe - sub() + sub(); text.insert(0, "123"); loro.commit(); await oneMs(); @@ -448,13 +448,13 @@ describe("event", () => { map.set("key", "value"); loro.commit(); - expect(updates).toBe(1); // All changes are bundled in one update + expect(updates).toBe(1); // All changes are bundled in one update text.insert(5, "!"); loro.commit(); expect(updates).toBe(2); - }) + }); it("can be used to sync", () => { const loro1 = new Loro(); @@ -489,8 +489,53 @@ describe("event", () => { // Both documents should converge to the same state expect(text1.toString()).toBe("1. Hello World!"); expect(text2.toString()).toBe("1. Hello World!"); - }) - }) + }); + }); +}); + +it("subscription works after timeout", async () => { + const doc = new LoroDoc(); + let times = 0; + doc.subscribe(() => { + times += 1; + }); + + for (let i = 0; i < 3; i++) { + if ((globalThis as any).gc) { + (globalThis as any).gc(); + } else { + throw new Error("No GC"); + } + const s = i.toString(); + doc.getText("text").insert(0, s); + doc.commit(); + await oneMs(); + expect(times).toBe(1); + times = 0; + await new Promise((resolve) => setTimeout(resolve, 10)); + } +}); + +it("subscription for local updates works after timeout", async () => { + const doc = new LoroDoc(); + let times = 0; + doc.subscribeLocalUpdates(() => { + times += 1; + }); + + for (let i = 0; i < 3; i++) { + if ((globalThis as any).gc) { + (globalThis as any).gc(); + } else { + throw new Error("No GC"); + } + doc.getText("text").insert(0, "h"); + doc.commit(); + await oneMs(); + expect(times).toBe(1); + times = 0; + await new Promise((resolve) => setTimeout(resolve, 10)); + } }); function oneMs(): Promise {