From bdf006ed2e16b2b8df5fa50d79e195b14a7ee431 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 23 Sep 2024 09:51:07 +0800 Subject: [PATCH] feat: fork doc at the target version (#469) --- crates/loro-common/src/error.rs | 6 ++ crates/loro-common/src/lib.rs | 2 +- crates/loro-internal/src/encoding.rs | 13 ++++ .../src/encoding/fast_snapshot.rs | 52 +++++++++++++- crates/loro-internal/src/encoding/gc.rs | 1 - crates/loro-internal/src/fork.rs | 45 ++++++++++++ crates/loro-internal/src/lib.rs | 1 + crates/loro-internal/src/loro.rs | 17 +++-- crates/loro-internal/src/oplog.rs | 8 +++ .../loro-internal/src/oplog/change_store.rs | 34 ++++++++++ crates/loro-internal/src/version.rs | 32 +++++++++ crates/loro/src/lib.rs | 7 ++ crates/loro/tests/integration_test/mod.rs | 1 + .../integration_test/snapshot_at_test.rs | 68 +++++++++++++++++++ crates/loro/tests/loro_rust_test.rs | 30 ++++++++ 15 files changed, 309 insertions(+), 8 deletions(-) create mode 100644 crates/loro-internal/src/fork.rs create mode 100644 crates/loro/tests/integration_test/snapshot_at_test.rs diff --git a/crates/loro-common/src/error.rs b/crates/loro-common/src/error.rs index d107ef92..3997e429 100644 --- a/crates/loro-common/src/error.rs +++ b/crates/loro-common/src/error.rs @@ -102,6 +102,12 @@ pub enum LoroTreeError { TreeNodeDeletedOrNotExist(TreeID), } +#[derive(Error, Debug, PartialEq)] +pub enum LoroEncodeError { + #[error("The frontiers are not found in this doc: {0}")] + FrontiersNotFound(String), +} + #[cfg(feature = "wasm")] pub mod wasm { use wasm_bindgen::JsValue; diff --git a/crates/loro-common/src/lib.rs b/crates/loro-common/src/lib.rs index 3b1bbbfd..747efc95 100644 --- a/crates/loro-common/src/lib.rs +++ b/crates/loro-common/src/lib.rs @@ -13,7 +13,7 @@ mod macros; mod span; mod value; -pub use error::{LoroError, LoroResult, LoroTreeError}; +pub use error::{LoroEncodeError, LoroError, LoroResult, LoroTreeError}; #[doc(hidden)] pub use fxhash::FxHashMap; pub use internal_string::InternalString; diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index bedc8c4e..4b45db53 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -29,6 +29,7 @@ pub enum ExportMode<'a> { UpdatesInRange { spans: Cow<'a, [IdSpan]> }, GcSnapshot(Cow<'a, Frontiers>), StateOnly(Option>), + SnapshotAt { version: Cow<'a, Frontiers> }, } impl<'a> ExportMode<'a> { @@ -76,6 +77,12 @@ impl<'a> ExportMode<'a> { pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self { ExportMode::StateOnly(frontiers.map(Cow::Borrowed)) } + + pub fn snapshot_at(frontiers: &'a Frontiers) -> Self { + ExportMode::SnapshotAt { + version: Cow::Borrowed(frontiers), + } + } } const MAGIC_BYTES: [u8; 4] = *b"loro"; @@ -324,6 +331,12 @@ pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec { }) } +pub(crate) fn export_fast_snapshot_at(doc: &LoroDoc, frontiers: &Frontiers) -> Vec { + encode_with(EncodeMode::FastSnapshot, &mut |ans| { + fast_snapshot::encode_snapshot_at(doc, frontiers, ans).unwrap(); + }) +} + pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec { encode_with(EncodeMode::FastUpdates, &mut |ans| { fast_snapshot::encode_updates(doc, vv, ans); diff --git a/crates/loro-internal/src/encoding/fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs index e97fb289..cfbefa54 100644 --- a/crates/loro-internal/src/encoding/fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -15,9 +15,9 @@ //! use std::io::{Read, Write}; -use crate::{encoding::gc, oplog::ChangeStore, LoroDoc, OpLog, VersionVector}; +use crate::{encoding::gc, oplog::ChangeStore, version::Frontiers, LoroDoc, OpLog, VersionVector}; use bytes::{Buf, Bytes}; -use loro_common::{IdSpan, LoroError, LoroResult}; +use loro_common::{IdSpan, LoroEncodeError, LoroError, LoroResult}; use tracing::trace; use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult}; @@ -163,6 +163,7 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { let oplog = doc.oplog().try_lock().unwrap(); let is_gc = state.store.gc_store().is_some(); if is_gc { + // TODO: PERF: this can be optimized by reusing the bytes of gc store let f = oplog.trimmed_frontiers().clone(); drop(oplog); drop(state); @@ -193,6 +194,53 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { ); } +pub(crate) fn encode_snapshot_at( + doc: &LoroDoc, + frontiers: &Frontiers, + w: &mut W, +) -> Result<(), LoroEncodeError> { + let version_before_start = doc.oplog_frontiers(); + doc.checkout_without_emitting(frontiers).unwrap(); + { + let mut state = doc.app_state().try_lock().unwrap(); + let oplog = doc.oplog().try_lock().unwrap(); + let is_gc = state.store.gc_store().is_some(); + if is_gc { + unimplemented!() + } + + assert!(!state.is_in_txn()); + let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else { + return Err(LoroEncodeError::FrontiersNotFound(format!( + "frontiers: {:?} when export in SnapshotAt mode", + frontiers + ))); + }; + + if oplog.is_trimmed() { + assert_eq!( + oplog.trimmed_frontiers(), + state.store.trimmed_frontiers().unwrap() + ); + } + + state.ensure_all_alive_containers(); + let state_bytes = state.store.encode(); + _encode_snapshot( + Snapshot { + oplog_bytes, + state_bytes: Some(state_bytes), + gc_bytes: Bytes::new(), + }, + w, + ); + } + doc.checkout_without_emitting(&version_before_start) + .unwrap(); + doc.ignore_events(); + Ok(()) +} + pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> { let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()); let oplog_bytes = &bytes[4..4 + oplog_len as usize]; diff --git a/crates/loro-internal/src/encoding/gc.rs b/crates/loro-internal/src/encoding/gc.rs index 0fb87f10..89aab830 100644 --- a/crates/loro-internal/src/encoding/gc.rs +++ b/crates/loro-internal/src/encoding/gc.rs @@ -27,7 +27,6 @@ pub(crate) fn export_gc_snapshot( assert!(!doc.is_detached()); let oplog = doc.oplog().lock().unwrap(); let start_from = calc_gc_doc_start(&oplog, start_from); - trace!("gc_start_from {:?}", &start_from); let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap(); for id in start_from.iter() { // we need to include the ops in start_from, this can make things easier diff --git a/crates/loro-internal/src/fork.rs b/crates/loro-internal/src/fork.rs new file mode 100644 index 00000000..5baaa512 --- /dev/null +++ b/crates/loro-internal/src/fork.rs @@ -0,0 +1,45 @@ +//! The fork module provides functionality to create a new LoroDoc instance at a specified version +//! (Frontiers) with minimal overhead. +//! +//! # Implementation Overview +//! +//! The `fork_at` function in this module allows for the creation of a new document that reflects +//! the state of the original document at a given version. The function achieves this by: +//! +//! ## Exporting Necessary Data: +//! +//! - **Change Store Data**: Collects all changes up to the specified version from the change +//! store's key-value (kv) data store. It includes the version vector and frontiers for accurate +//! identification of the version. +//! +//! - **Container Store Data**: Exports the container store's kv data representing the document's +//! state at the specified version. This involves checking out to the desired version, exporting +//! the state, and efficiently checking back to the latest version. +//! +//! - **GC Store Data**: If applicable, exports the gc store's kv data, ensuring that version +//! identifiers are included. +//! +//! ## Reconstructing the New Document: +//! +//! Imports the exported data into a new LoroDoc instance using optimized import mechanisms +//! similar to those used in fast snapshot imports. +//! +//! By focusing on exporting only the necessary data and optimizing state transitions during +//! version checkout, the `fork_at` function minimizes overhead and efficiently creates new +//! document instances representing past versions. +//! +use std::borrow::Cow; + +use crate::{version::Frontiers, LoroDoc}; + +impl LoroDoc { + /// Creates a new LoroDoc at a specified version (Frontiers) + pub fn fork_at(&self, frontiers: &Frontiers) -> LoroDoc { + let bytes = self.export(crate::loro::ExportMode::SnapshotAt { + version: Cow::Borrowed(frontiers), + }); + let doc = LoroDoc::new(); + doc.import(&bytes).unwrap(); + doc + } +} diff --git a/crates/loro-internal/src/lib.rs b/crates/loro-internal/src/lib.rs index e3fe82ea..179cd420 100644 --- a/crates/loro-internal/src/lib.rs +++ b/crates/loro-internal/src/lib.rs @@ -31,6 +31,7 @@ pub mod container; pub mod cursor; pub mod dag; pub mod encoding; +pub(crate) mod fork; pub mod id; #[cfg(feature = "jsonpath")] pub mod jsonpath; diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 745dfd7e..0e985cb7 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -28,9 +28,10 @@ use crate::{ dag::DagUtils, diff_calc::DiffCalculator, encoding::{ - decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range, - export_gc_snapshot, export_snapshot, export_state_only_snapshot, - json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody, + decode_snapshot, export_fast_snapshot, export_fast_snapshot_at, export_fast_updates, + export_fast_updates_in_range, export_gc_snapshot, export_snapshot, + export_state_only_snapshot, json_schema::json::JsonSchema, parse_header_and_body, + EncodeMode, ParsedHeaderAndBody, }, event::{str_to_path, EventTriggerKind, Index, InternalDocDiff}, handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler}, @@ -644,6 +645,13 @@ impl LoroDoc { } } + pub(crate) fn ignore_events(&self) { + let _events = { + let mut state = self.state.lock().unwrap(); + state.take_events() + }; + } + #[instrument(skip_all)] pub fn export_snapshot(&self) -> Vec { self.commit_then_stop(); @@ -1133,7 +1141,7 @@ impl LoroDoc { } #[instrument(level = "info", skip(self))] - fn checkout_without_emitting(&self, frontiers: &Frontiers) -> Result<(), LoroError> { + pub(crate) fn checkout_without_emitting(&self, frontiers: &Frontiers) -> Result<(), LoroError> { self.commit_then_stop(); let from_frontiers = self.state_frontiers(); info!( @@ -1529,6 +1537,7 @@ impl LoroDoc { Some(f) => export_state_only_snapshot(self, &f), None => export_state_only_snapshot(self, &self.oplog_frontiers()), }, + ExportMode::SnapshotAt { version } => export_fast_snapshot_at(self, &version), }; self.renew_txn_if_auto_commit(); diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 1c5acc21..b5e6bfde 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -392,6 +392,14 @@ impl OpLog { self.change_store.export_blocks_in_range(spans, w) } + pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option { + let vv = self.dag.frontiers_to_vv(frontiers)?; + Some( + self.change_store + .fork_changes_up_to(self.dag.trimmed_vv(), frontiers, &vv), + ) + } + #[inline(always)] pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<(), LoroError> { decode_oplog(self, data) diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 3947ca92..f2c5de36 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -500,6 +500,40 @@ impl ChangeStore { let arena = &self.arena; encode_blocks_in_store(new_store, arena, w); } + + pub(crate) fn fork_changes_up_to( + &self, + start_vv: &ImVersionVector, + frontiers: &Frontiers, + vv: &VersionVector, + ) -> Bytes { + let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone()); + for mut span in vv.sub_iter_im(start_vv) { + let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0); + span.counter.start = span.counter.start.max(counter_lower_bound); + span.counter.end = span.counter.end.max(counter_lower_bound); + if span.counter.start >= span.counter.end { + continue; + } + + // PERF: this can be optimized by reusing the current encoded blocks + // In the current method, it needs to parse and re-encode the blocks + for c in self.iter_changes(span) { + let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0) + as usize) + .min(c.atom_len()); + let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0) + as usize) + .min(c.atom_len()); + + assert_ne!(start, end); + let ch = c.slice(start, end); + new_store.insert_change(ch, false); + } + } + + new_store.encode_all(vv, frontiers) + } } fn encode_blocks_in_store( diff --git a/crates/loro-internal/src/version.rs b/crates/loro-internal/src/version.rs index 7cd840bb..8de8234a 100644 --- a/crates/loro-internal/src/version.rs +++ b/crates/loro-internal/src/version.rs @@ -623,6 +623,38 @@ impl VersionVector { }) } + /// Returns the spans that are in `self` but not in `rhs` + pub fn sub_iter_im<'a>( + &'a self, + rhs: &'a ImVersionVector, + ) -> impl Iterator + 'a { + self.iter().filter_map(move |(peer, &counter)| { + if let Some(&rhs_counter) = rhs.get(peer) { + if counter > rhs_counter { + Some(IdSpan { + peer: *peer, + counter: CounterSpan { + start: rhs_counter, + end: counter, + }, + }) + } else { + None + } + } else if counter > 0 { + Some(IdSpan { + peer: *peer, + counter: CounterSpan { + start: 0, + end: counter, + }, + }) + } else { + None + } + }) + } + /// Iter all span from a -> b and b -> a pub fn iter_between<'a>(&'a self, other: &'a Self) -> impl Iterator + 'a { // PERF: can be optimized a little diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index bfc7895c..91d7990e 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -742,6 +742,13 @@ impl LoroDoc { .collect() }) } + + /// Fork the document at the given frontiers. + pub fn fork_at(&self, frontiers: &Frontiers) -> LoroDoc { + let new_doc = self.doc.fork_at(frontiers); + new_doc.start_auto_commit(); + LoroDoc::_new(new_doc) + } } /// It's used to prevent the user from implementing the trait directly. diff --git a/crates/loro/tests/integration_test/mod.rs b/crates/loro/tests/integration_test/mod.rs index f215a896..f56e1582 100644 --- a/crates/loro/tests/integration_test/mod.rs +++ b/crates/loro/tests/integration_test/mod.rs @@ -3,6 +3,7 @@ use loro::LoroDoc; mod gc_test; #[cfg(feature = "jsonpath")] mod jsonpath_test; +mod snapshot_at_test; mod undo_test; fn gen_action(doc: &LoroDoc, seed: u64, mut ops_len: usize) { diff --git a/crates/loro/tests/integration_test/snapshot_at_test.rs b/crates/loro/tests/integration_test/snapshot_at_test.rs new file mode 100644 index 00000000..63c4e6a1 --- /dev/null +++ b/crates/loro/tests/integration_test/snapshot_at_test.rs @@ -0,0 +1,68 @@ +use std::borrow::Cow; + +use super::gen_action; +use loro::{ExportMode, LoroDoc, ToJson}; +use serde_json::json; + +#[test] +fn test_snapshot_at_with_multiple_actions() -> anyhow::Result<()> { + let doc = LoroDoc::new(); + doc.set_peer_id(1)?; + + // Perform a series of actions + gen_action(&doc, 1, 10); + doc.commit(); + let frontiers_after_first_commit = doc.oplog_frontiers(); + let value_after_first_commit = doc.get_deep_value(); + + gen_action(&doc, 2, 20); + doc.commit(); + let frontiers_after_second_commit = doc.oplog_frontiers(); + let value_after_second_commit = doc.get_deep_value(); + // Export snapshot at the first frontiers + let snapshot_at_first = doc.export(ExportMode::SnapshotAt { + version: Cow::Borrowed(&frontiers_after_first_commit), + }); + let new_doc_first = LoroDoc::new(); + new_doc_first.import(&snapshot_at_first)?; + + // Verify the state of the new document matches the expected state + assert_eq!(new_doc_first.get_deep_value(), value_after_first_commit); + + // Export snapshot at the second frontiers + let snapshot_at_second = doc.export(ExportMode::SnapshotAt { + version: Cow::Borrowed(&frontiers_after_second_commit), + }); + let new_doc_second = LoroDoc::new(); + new_doc_second.import(&snapshot_at_second)?; + + // Verify the state of the new document matches the expected state + assert_eq!(new_doc_second.get_deep_value(), value_after_second_commit); + + Ok(()) +} + +#[test] +fn test_fork_at_target_frontiers() -> anyhow::Result<()> { + let doc = LoroDoc::new(); + doc.set_peer_id(1)?; + + // Perform initial actions + gen_action(&doc, 1, 10); + doc.commit(); + let frontiers = doc.oplog_frontiers(); + let value_after_first_commit = doc.get_deep_value(); + + // Perform more actions + gen_action(&doc, 2, 20); + doc.commit(); + + let new_doc = doc.fork_at(&frontiers); + assert_eq!(new_doc.get_deep_value(), value_after_first_commit); + + // Import all updates to the new document + new_doc.import(&doc.export(ExportMode::all_updates()))?; + assert_eq!(new_doc.get_deep_value(), doc.get_deep_value()); + + Ok(()) +} diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 91334475..88a919d7 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -1675,3 +1675,33 @@ fn checkout_should_reset_container_deleted_cache() { doc.checkout(&f).unwrap(); assert!(!text.is_deleted()); } + +#[test] +fn test_fork_at_target_frontiers() { + let doc = LoroDoc::new(); + let list = doc.get_movable_list("list"); + let _text = list.push_container(LoroText::new()).unwrap(); + doc.commit(); + let f = doc.state_frontiers(); + list.set(0, 1).unwrap(); + doc.commit(); + let snapshot = doc.export(loro::ExportMode::snapshot_at(&f)); + let new_doc = LoroDoc::new(); + new_doc.import(&snapshot).unwrap(); + assert_eq!(new_doc.state_frontiers(), f); + assert_eq!( + new_doc.get_deep_value().to_json_value(), + json!({ + "list": [""] + }) + ); + new_doc + .import(&doc.export(loro::ExportMode::all_updates())) + .unwrap(); + assert_eq!( + new_doc.get_deep_value().to_json_value(), + json!({ + "list": [1] + }) + ); +}