mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-22 12:57:20 +00:00
feat: fork doc at the target version (#469)
This commit is contained in:
parent
3fe619bc81
commit
bdf006ed2e
15 changed files with 309 additions and 8 deletions
|
@ -102,6 +102,12 @@ pub enum LoroTreeError {
|
|||
TreeNodeDeletedOrNotExist(TreeID),
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, PartialEq)]
|
||||
pub enum LoroEncodeError {
|
||||
#[error("The frontiers are not found in this doc: {0}")]
|
||||
FrontiersNotFound(String),
|
||||
}
|
||||
|
||||
#[cfg(feature = "wasm")]
|
||||
pub mod wasm {
|
||||
use wasm_bindgen::JsValue;
|
||||
|
|
|
@ -13,7 +13,7 @@ mod macros;
|
|||
mod span;
|
||||
mod value;
|
||||
|
||||
pub use error::{LoroError, LoroResult, LoroTreeError};
|
||||
pub use error::{LoroEncodeError, LoroError, LoroResult, LoroTreeError};
|
||||
#[doc(hidden)]
|
||||
pub use fxhash::FxHashMap;
|
||||
pub use internal_string::InternalString;
|
||||
|
|
|
@ -29,6 +29,7 @@ pub enum ExportMode<'a> {
|
|||
UpdatesInRange { spans: Cow<'a, [IdSpan]> },
|
||||
GcSnapshot(Cow<'a, Frontiers>),
|
||||
StateOnly(Option<Cow<'a, Frontiers>>),
|
||||
SnapshotAt { version: Cow<'a, Frontiers> },
|
||||
}
|
||||
|
||||
impl<'a> ExportMode<'a> {
|
||||
|
@ -76,6 +77,12 @@ impl<'a> ExportMode<'a> {
|
|||
pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
|
||||
ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
|
||||
}
|
||||
|
||||
pub fn snapshot_at(frontiers: &'a Frontiers) -> Self {
|
||||
ExportMode::SnapshotAt {
|
||||
version: Cow::Borrowed(frontiers),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const MAGIC_BYTES: [u8; 4] = *b"loro";
|
||||
|
@ -324,6 +331,12 @@ pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
|
|||
})
|
||||
}
|
||||
|
||||
pub(crate) fn export_fast_snapshot_at(doc: &LoroDoc, frontiers: &Frontiers) -> Vec<u8> {
|
||||
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
|
||||
fast_snapshot::encode_snapshot_at(doc, frontiers, ans).unwrap();
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
|
||||
encode_with(EncodeMode::FastUpdates, &mut |ans| {
|
||||
fast_snapshot::encode_updates(doc, vv, ans);
|
||||
|
|
|
@ -15,9 +15,9 @@
|
|||
//!
|
||||
use std::io::{Read, Write};
|
||||
|
||||
use crate::{encoding::gc, oplog::ChangeStore, LoroDoc, OpLog, VersionVector};
|
||||
use crate::{encoding::gc, oplog::ChangeStore, version::Frontiers, LoroDoc, OpLog, VersionVector};
|
||||
use bytes::{Buf, Bytes};
|
||||
use loro_common::{IdSpan, LoroError, LoroResult};
|
||||
use loro_common::{IdSpan, LoroEncodeError, LoroError, LoroResult};
|
||||
use tracing::trace;
|
||||
|
||||
use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult};
|
||||
|
@ -163,6 +163,7 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
|
|||
let oplog = doc.oplog().try_lock().unwrap();
|
||||
let is_gc = state.store.gc_store().is_some();
|
||||
if is_gc {
|
||||
// TODO: PERF: this can be optimized by reusing the bytes of gc store
|
||||
let f = oplog.trimmed_frontiers().clone();
|
||||
drop(oplog);
|
||||
drop(state);
|
||||
|
@ -193,6 +194,53 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
|
|||
);
|
||||
}
|
||||
|
||||
pub(crate) fn encode_snapshot_at<W: std::io::Write>(
|
||||
doc: &LoroDoc,
|
||||
frontiers: &Frontiers,
|
||||
w: &mut W,
|
||||
) -> Result<(), LoroEncodeError> {
|
||||
let version_before_start = doc.oplog_frontiers();
|
||||
doc.checkout_without_emitting(frontiers).unwrap();
|
||||
{
|
||||
let mut state = doc.app_state().try_lock().unwrap();
|
||||
let oplog = doc.oplog().try_lock().unwrap();
|
||||
let is_gc = state.store.gc_store().is_some();
|
||||
if is_gc {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
assert!(!state.is_in_txn());
|
||||
let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else {
|
||||
return Err(LoroEncodeError::FrontiersNotFound(format!(
|
||||
"frontiers: {:?} when export in SnapshotAt mode",
|
||||
frontiers
|
||||
)));
|
||||
};
|
||||
|
||||
if oplog.is_trimmed() {
|
||||
assert_eq!(
|
||||
oplog.trimmed_frontiers(),
|
||||
state.store.trimmed_frontiers().unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
state.ensure_all_alive_containers();
|
||||
let state_bytes = state.store.encode();
|
||||
_encode_snapshot(
|
||||
Snapshot {
|
||||
oplog_bytes,
|
||||
state_bytes: Some(state_bytes),
|
||||
gc_bytes: Bytes::new(),
|
||||
},
|
||||
w,
|
||||
);
|
||||
}
|
||||
doc.checkout_without_emitting(&version_before_start)
|
||||
.unwrap();
|
||||
doc.ignore_events();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> {
|
||||
let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
|
||||
let oplog_bytes = &bytes[4..4 + oplog_len as usize];
|
||||
|
|
|
@ -27,7 +27,6 @@ pub(crate) fn export_gc_snapshot<W: std::io::Write>(
|
|||
assert!(!doc.is_detached());
|
||||
let oplog = doc.oplog().lock().unwrap();
|
||||
let start_from = calc_gc_doc_start(&oplog, start_from);
|
||||
trace!("gc_start_from {:?}", &start_from);
|
||||
let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap();
|
||||
for id in start_from.iter() {
|
||||
// we need to include the ops in start_from, this can make things easier
|
||||
|
|
45
crates/loro-internal/src/fork.rs
Normal file
45
crates/loro-internal/src/fork.rs
Normal file
|
@ -0,0 +1,45 @@
|
|||
//! The fork module provides functionality to create a new LoroDoc instance at a specified version
|
||||
//! (Frontiers) with minimal overhead.
|
||||
//!
|
||||
//! # Implementation Overview
|
||||
//!
|
||||
//! The `fork_at` function in this module allows for the creation of a new document that reflects
|
||||
//! the state of the original document at a given version. The function achieves this by:
|
||||
//!
|
||||
//! ## Exporting Necessary Data:
|
||||
//!
|
||||
//! - **Change Store Data**: Collects all changes up to the specified version from the change
|
||||
//! store's key-value (kv) data store. It includes the version vector and frontiers for accurate
|
||||
//! identification of the version.
|
||||
//!
|
||||
//! - **Container Store Data**: Exports the container store's kv data representing the document's
|
||||
//! state at the specified version. This involves checking out to the desired version, exporting
|
||||
//! the state, and efficiently checking back to the latest version.
|
||||
//!
|
||||
//! - **GC Store Data**: If applicable, exports the gc store's kv data, ensuring that version
|
||||
//! identifiers are included.
|
||||
//!
|
||||
//! ## Reconstructing the New Document:
|
||||
//!
|
||||
//! Imports the exported data into a new LoroDoc instance using optimized import mechanisms
|
||||
//! similar to those used in fast snapshot imports.
|
||||
//!
|
||||
//! By focusing on exporting only the necessary data and optimizing state transitions during
|
||||
//! version checkout, the `fork_at` function minimizes overhead and efficiently creates new
|
||||
//! document instances representing past versions.
|
||||
//!
|
||||
use std::borrow::Cow;
|
||||
|
||||
use crate::{version::Frontiers, LoroDoc};
|
||||
|
||||
impl LoroDoc {
|
||||
/// Creates a new LoroDoc at a specified version (Frontiers)
|
||||
pub fn fork_at(&self, frontiers: &Frontiers) -> LoroDoc {
|
||||
let bytes = self.export(crate::loro::ExportMode::SnapshotAt {
|
||||
version: Cow::Borrowed(frontiers),
|
||||
});
|
||||
let doc = LoroDoc::new();
|
||||
doc.import(&bytes).unwrap();
|
||||
doc
|
||||
}
|
||||
}
|
|
@ -31,6 +31,7 @@ pub mod container;
|
|||
pub mod cursor;
|
||||
pub mod dag;
|
||||
pub mod encoding;
|
||||
pub(crate) mod fork;
|
||||
pub mod id;
|
||||
#[cfg(feature = "jsonpath")]
|
||||
pub mod jsonpath;
|
||||
|
|
|
@ -28,9 +28,10 @@ use crate::{
|
|||
dag::DagUtils,
|
||||
diff_calc::DiffCalculator,
|
||||
encoding::{
|
||||
decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range,
|
||||
export_gc_snapshot, export_snapshot, export_state_only_snapshot,
|
||||
json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody,
|
||||
decode_snapshot, export_fast_snapshot, export_fast_snapshot_at, export_fast_updates,
|
||||
export_fast_updates_in_range, export_gc_snapshot, export_snapshot,
|
||||
export_state_only_snapshot, json_schema::json::JsonSchema, parse_header_and_body,
|
||||
EncodeMode, ParsedHeaderAndBody,
|
||||
},
|
||||
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
|
||||
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
|
||||
|
@ -644,6 +645,13 @@ impl LoroDoc {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn ignore_events(&self) {
|
||||
let _events = {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.take_events()
|
||||
};
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn export_snapshot(&self) -> Vec<u8> {
|
||||
self.commit_then_stop();
|
||||
|
@ -1133,7 +1141,7 @@ impl LoroDoc {
|
|||
}
|
||||
|
||||
#[instrument(level = "info", skip(self))]
|
||||
fn checkout_without_emitting(&self, frontiers: &Frontiers) -> Result<(), LoroError> {
|
||||
pub(crate) fn checkout_without_emitting(&self, frontiers: &Frontiers) -> Result<(), LoroError> {
|
||||
self.commit_then_stop();
|
||||
let from_frontiers = self.state_frontiers();
|
||||
info!(
|
||||
|
@ -1529,6 +1537,7 @@ impl LoroDoc {
|
|||
Some(f) => export_state_only_snapshot(self, &f),
|
||||
None => export_state_only_snapshot(self, &self.oplog_frontiers()),
|
||||
},
|
||||
ExportMode::SnapshotAt { version } => export_fast_snapshot_at(self, &version),
|
||||
};
|
||||
|
||||
self.renew_txn_if_auto_commit();
|
||||
|
|
|
@ -392,6 +392,14 @@ impl OpLog {
|
|||
self.change_store.export_blocks_in_range(spans, w)
|
||||
}
|
||||
|
||||
pub(crate) fn fork_changes_up_to(&self, frontiers: &Frontiers) -> Option<Bytes> {
|
||||
let vv = self.dag.frontiers_to_vv(frontiers)?;
|
||||
Some(
|
||||
self.change_store
|
||||
.fork_changes_up_to(self.dag.trimmed_vv(), frontiers, &vv),
|
||||
)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn decode(&mut self, data: ParsedHeaderAndBody) -> Result<(), LoroError> {
|
||||
decode_oplog(self, data)
|
||||
|
|
|
@ -500,6 +500,40 @@ impl ChangeStore {
|
|||
let arena = &self.arena;
|
||||
encode_blocks_in_store(new_store, arena, w);
|
||||
}
|
||||
|
||||
pub(crate) fn fork_changes_up_to(
|
||||
&self,
|
||||
start_vv: &ImVersionVector,
|
||||
frontiers: &Frontiers,
|
||||
vv: &VersionVector,
|
||||
) -> Bytes {
|
||||
let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
|
||||
for mut span in vv.sub_iter_im(start_vv) {
|
||||
let counter_lower_bound = start_vv.get(&span.peer).copied().unwrap_or(0);
|
||||
span.counter.start = span.counter.start.max(counter_lower_bound);
|
||||
span.counter.end = span.counter.end.max(counter_lower_bound);
|
||||
if span.counter.start >= span.counter.end {
|
||||
continue;
|
||||
}
|
||||
|
||||
// PERF: this can be optimized by reusing the current encoded blocks
|
||||
// In the current method, it needs to parse and re-encode the blocks
|
||||
for c in self.iter_changes(span) {
|
||||
let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
|
||||
as usize)
|
||||
.min(c.atom_len());
|
||||
let end = ((vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
|
||||
as usize)
|
||||
.min(c.atom_len());
|
||||
|
||||
assert_ne!(start, end);
|
||||
let ch = c.slice(start, end);
|
||||
new_store.insert_change(ch, false);
|
||||
}
|
||||
}
|
||||
|
||||
new_store.encode_all(vv, frontiers)
|
||||
}
|
||||
}
|
||||
|
||||
fn encode_blocks_in_store<W: std::io::Write>(
|
||||
|
|
|
@ -623,6 +623,38 @@ impl VersionVector {
|
|||
})
|
||||
}
|
||||
|
||||
/// Returns the spans that are in `self` but not in `rhs`
|
||||
pub fn sub_iter_im<'a>(
|
||||
&'a self,
|
||||
rhs: &'a ImVersionVector,
|
||||
) -> impl Iterator<Item = IdSpan> + 'a {
|
||||
self.iter().filter_map(move |(peer, &counter)| {
|
||||
if let Some(&rhs_counter) = rhs.get(peer) {
|
||||
if counter > rhs_counter {
|
||||
Some(IdSpan {
|
||||
peer: *peer,
|
||||
counter: CounterSpan {
|
||||
start: rhs_counter,
|
||||
end: counter,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else if counter > 0 {
|
||||
Some(IdSpan {
|
||||
peer: *peer,
|
||||
counter: CounterSpan {
|
||||
start: 0,
|
||||
end: counter,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Iter all span from a -> b and b -> a
|
||||
pub fn iter_between<'a>(&'a self, other: &'a Self) -> impl Iterator<Item = IdSpan> + 'a {
|
||||
// PERF: can be optimized a little
|
||||
|
|
|
@ -742,6 +742,13 @@ impl LoroDoc {
|
|||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
/// Fork the document at the given frontiers.
|
||||
pub fn fork_at(&self, frontiers: &Frontiers) -> LoroDoc {
|
||||
let new_doc = self.doc.fork_at(frontiers);
|
||||
new_doc.start_auto_commit();
|
||||
LoroDoc::_new(new_doc)
|
||||
}
|
||||
}
|
||||
|
||||
/// It's used to prevent the user from implementing the trait directly.
|
||||
|
|
|
@ -3,6 +3,7 @@ use loro::LoroDoc;
|
|||
mod gc_test;
|
||||
#[cfg(feature = "jsonpath")]
|
||||
mod jsonpath_test;
|
||||
mod snapshot_at_test;
|
||||
mod undo_test;
|
||||
|
||||
fn gen_action(doc: &LoroDoc, seed: u64, mut ops_len: usize) {
|
||||
|
|
68
crates/loro/tests/integration_test/snapshot_at_test.rs
Normal file
68
crates/loro/tests/integration_test/snapshot_at_test.rs
Normal file
|
@ -0,0 +1,68 @@
|
|||
use std::borrow::Cow;
|
||||
|
||||
use super::gen_action;
|
||||
use loro::{ExportMode, LoroDoc, ToJson};
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn test_snapshot_at_with_multiple_actions() -> anyhow::Result<()> {
|
||||
let doc = LoroDoc::new();
|
||||
doc.set_peer_id(1)?;
|
||||
|
||||
// Perform a series of actions
|
||||
gen_action(&doc, 1, 10);
|
||||
doc.commit();
|
||||
let frontiers_after_first_commit = doc.oplog_frontiers();
|
||||
let value_after_first_commit = doc.get_deep_value();
|
||||
|
||||
gen_action(&doc, 2, 20);
|
||||
doc.commit();
|
||||
let frontiers_after_second_commit = doc.oplog_frontiers();
|
||||
let value_after_second_commit = doc.get_deep_value();
|
||||
// Export snapshot at the first frontiers
|
||||
let snapshot_at_first = doc.export(ExportMode::SnapshotAt {
|
||||
version: Cow::Borrowed(&frontiers_after_first_commit),
|
||||
});
|
||||
let new_doc_first = LoroDoc::new();
|
||||
new_doc_first.import(&snapshot_at_first)?;
|
||||
|
||||
// Verify the state of the new document matches the expected state
|
||||
assert_eq!(new_doc_first.get_deep_value(), value_after_first_commit);
|
||||
|
||||
// Export snapshot at the second frontiers
|
||||
let snapshot_at_second = doc.export(ExportMode::SnapshotAt {
|
||||
version: Cow::Borrowed(&frontiers_after_second_commit),
|
||||
});
|
||||
let new_doc_second = LoroDoc::new();
|
||||
new_doc_second.import(&snapshot_at_second)?;
|
||||
|
||||
// Verify the state of the new document matches the expected state
|
||||
assert_eq!(new_doc_second.get_deep_value(), value_after_second_commit);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fork_at_target_frontiers() -> anyhow::Result<()> {
|
||||
let doc = LoroDoc::new();
|
||||
doc.set_peer_id(1)?;
|
||||
|
||||
// Perform initial actions
|
||||
gen_action(&doc, 1, 10);
|
||||
doc.commit();
|
||||
let frontiers = doc.oplog_frontiers();
|
||||
let value_after_first_commit = doc.get_deep_value();
|
||||
|
||||
// Perform more actions
|
||||
gen_action(&doc, 2, 20);
|
||||
doc.commit();
|
||||
|
||||
let new_doc = doc.fork_at(&frontiers);
|
||||
assert_eq!(new_doc.get_deep_value(), value_after_first_commit);
|
||||
|
||||
// Import all updates to the new document
|
||||
new_doc.import(&doc.export(ExportMode::all_updates()))?;
|
||||
assert_eq!(new_doc.get_deep_value(), doc.get_deep_value());
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1675,3 +1675,33 @@ fn checkout_should_reset_container_deleted_cache() {
|
|||
doc.checkout(&f).unwrap();
|
||||
assert!(!text.is_deleted());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fork_at_target_frontiers() {
|
||||
let doc = LoroDoc::new();
|
||||
let list = doc.get_movable_list("list");
|
||||
let _text = list.push_container(LoroText::new()).unwrap();
|
||||
doc.commit();
|
||||
let f = doc.state_frontiers();
|
||||
list.set(0, 1).unwrap();
|
||||
doc.commit();
|
||||
let snapshot = doc.export(loro::ExportMode::snapshot_at(&f));
|
||||
let new_doc = LoroDoc::new();
|
||||
new_doc.import(&snapshot).unwrap();
|
||||
assert_eq!(new_doc.state_frontiers(), f);
|
||||
assert_eq!(
|
||||
new_doc.get_deep_value().to_json_value(),
|
||||
json!({
|
||||
"list": [""]
|
||||
})
|
||||
);
|
||||
new_doc
|
||||
.import(&doc.export(loro::ExportMode::all_updates()))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
new_doc.get_deep_value().to_json_value(),
|
||||
json!({
|
||||
"list": [1]
|
||||
})
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue