diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index d19e3a43..db550647 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -30,3 +30,7 @@ harness = false [[bench]] name = "bench_text" harness = false + +[[bench]] +name = "fork" +harness = false diff --git a/crates/examples/benches/fork.rs b/crates/examples/benches/fork.rs new file mode 100644 index 00000000..84587ba2 --- /dev/null +++ b/crates/examples/benches/fork.rs @@ -0,0 +1,21 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use loro::LoroDoc; + +fn bench_fork(c: &mut Criterion) { + { + let mut b = c.benchmark_group("fork"); + b.bench_function("fork 1000 times with text edit at each fork", |b| { + b.iter(|| { + let mut doc = LoroDoc::new(); + for _ in 0..1000 { + let text = doc.get_text("text"); + text.insert(0, "Hi").unwrap(); + doc = doc.fork(); + } + }); + }); + } +} + +criterion_group!(benches, bench_fork); +criterion_main!(benches); diff --git a/crates/examples/examples/fork.rs b/crates/examples/examples/fork.rs new file mode 100644 index 00000000..9e48caa6 --- /dev/null +++ b/crates/examples/examples/fork.rs @@ -0,0 +1,10 @@ +use loro::LoroDoc; + +fn main() { + let mut doc = LoroDoc::new(); + for _ in 0..10_000 { + let text = doc.get_text("text"); + text.insert(0, "Hi").unwrap(); + doc = doc.fork(); + } +} diff --git a/crates/loro-internal/src/arena.rs b/crates/loro-internal/src/arena.rs index 3c2149c5..6a7b44b7 100644 --- a/crates/loro-internal/src/arena.rs +++ b/crates/loro-internal/src/arena.rs @@ -37,7 +37,7 @@ struct InnerSharedArena { parents: Mutex>>, values: Mutex>, root_c_idx: Mutex>, - str: Mutex, + str: Arc>, } /// This is shared between [OpLog] and [AppState]. @@ -76,7 +76,7 @@ impl SharedArena { parents: Mutex::new(self.inner.parents.try_lock().unwrap().clone()), values: Mutex::new(self.inner.values.try_lock().unwrap().clone()), root_c_idx: Mutex::new(self.inner.root_c_idx.try_lock().unwrap().clone()), - str: Mutex::new(self.inner.str.try_lock().unwrap().clone()), + str: self.inner.str.clone(), }), } } diff --git a/crates/loro-internal/src/configure.rs b/crates/loro-internal/src/configure.rs index 12027dc7..7fd5bca7 100644 --- a/crates/loro-internal/src/configure.rs +++ b/crates/loro-internal/src/configure.rs @@ -1,4 +1,5 @@ pub use crate::container::richtext::config::{StyleConfig, StyleConfigMap}; +use crate::LoroDoc; #[derive(Clone, Debug)] pub struct Configure { @@ -8,6 +9,15 @@ pub struct Configure { pub(crate) editable_detached_mode: Arc, } +impl LoroDoc { + pub(crate) fn set_config(&self, config: &Configure) { + self.config_text_style(config.text_style_config.read().unwrap().clone()); + self.set_record_timestamp(config.record_timestamp()); + self.set_change_merge_interval(config.merge_interval()); + self.set_detached_editing(config.detached_editing()); + } +} + impl Default for Configure { fn default() -> Self { Self { diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index 819a2d8f..a937fc16 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -1,5 +1,5 @@ pub(crate) mod arena; -mod fast_snapshot; +pub(crate) mod fast_snapshot; pub(crate) mod json_schema; mod outdated_encode_reordered; mod shallow_snapshot; @@ -213,7 +213,10 @@ impl EncodeMode { } pub fn is_snapshot(self) -> bool { - matches!(self, EncodeMode::OutdatedSnapshot) + matches!( + self, + EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot + ) } } diff --git a/crates/loro-internal/src/encoding/fast_snapshot.rs b/crates/loro-internal/src/encoding/fast_snapshot.rs index 6e3ea46b..ea46a2b8 100644 --- a/crates/loro-internal/src/encoding/fast_snapshot.rs +++ b/crates/loro-internal/src/encoding/fast_snapshot.rs @@ -22,7 +22,7 @@ use bytes::{Buf, Bytes}; use loro_common::{IdSpan, LoroError, LoroResult}; use tracing::trace; pub(crate) const EMPTY_MARK: &[u8] = b"E"; -pub(super) struct Snapshot { +pub(crate) struct Snapshot { pub oplog_bytes: Bytes, pub state_bytes: Option, pub shallow_root_state_bytes: Bytes, @@ -70,6 +70,16 @@ fn read_u32_le(r: &mut bytes::buf::Reader) -> u32 { } pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> { + let snapshot = _decode_snapshot_bytes(bytes)?; + decode_snapshot_inner(snapshot, doc) +} + +pub(crate) fn decode_snapshot_inner(snapshot: Snapshot, doc: &LoroDoc) -> Result<(), LoroError> { + let Snapshot { + oplog_bytes, + state_bytes, + shallow_root_state_bytes, + } = snapshot; ensure_cov::notify_cov("loro_internal::import::fast_snapshot::decode_snapshot"); let mut state = doc.app_state().try_lock().map_err(|_| { LoroError::DecodeError( @@ -94,11 +104,6 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> { assert!(state.frontiers.is_empty()); assert!(oplog.frontiers().is_empty()); - let Snapshot { - oplog_bytes, - state_bytes, - shallow_root_state_bytes, - } = _decode_snapshot_bytes(bytes)?; oplog.decode_change_store(oplog_bytes)?; let need_calc = state_bytes.is_none(); let state_frontiers; @@ -157,7 +162,11 @@ impl OpLog { } pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { - // events should be emitted before encode snapshot + let snapshot = encode_snapshot_inner(doc); + _encode_snapshot(snapshot, w); +} + +pub(crate) fn encode_snapshot_inner(doc: &LoroDoc) -> Snapshot { assert!(doc.drop_pending_events().is_empty()); let old_state_frontiers = doc.state_frontiers(); let was_detached = doc.is_detached(); @@ -169,9 +178,10 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { let f = oplog.shallow_since_frontiers().clone(); drop(oplog); drop(state); - shallow_snapshot::export_shallow_snapshot(doc, &f, w).unwrap(); - return; + let (snapshot, _) = shallow_snapshot::export_shallow_snapshot_inner(doc, &f).unwrap(); + return snapshot; } + assert!(!state.is_in_txn()); let oplog_bytes = oplog.encode_change_store(); if oplog.is_shallow() { @@ -180,7 +190,6 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { state.store.shallow_root_frontiers().unwrap() ); } - if was_detached { let latest = oplog.frontiers().clone(); drop(oplog); @@ -188,23 +197,20 @@ pub(crate) fn encode_snapshot(doc: &LoroDoc, w: &mut W) { doc.checkout_without_emitting(&latest).unwrap(); state = doc.app_state().try_lock().unwrap(); } - state.ensure_all_alive_containers(); let state_bytes = state.store.encode(); - _encode_snapshot( - Snapshot { - oplog_bytes, - state_bytes: Some(state_bytes), - shallow_root_state_bytes: Bytes::new(), - }, - w, - ); - + let snapshot = Snapshot { + oplog_bytes, + state_bytes: Some(state_bytes), + shallow_root_state_bytes: Bytes::new(), + }; if was_detached { drop(state); doc.checkout_without_emitting(&old_state_frontiers).unwrap(); doc.drop_pending_events(); } + + snapshot } pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result, LoroError> { diff --git a/crates/loro-internal/src/encoding/shallow_snapshot.rs b/crates/loro-internal/src/encoding/shallow_snapshot.rs index 6632023d..989a1a54 100644 --- a/crates/loro-internal/src/encoding/shallow_snapshot.rs +++ b/crates/loro-internal/src/encoding/shallow_snapshot.rs @@ -25,6 +25,15 @@ pub(crate) fn export_shallow_snapshot( start_from: &Frontiers, w: &mut W, ) -> Result { + let (snapshot, start_from) = export_shallow_snapshot_inner(doc, start_from)?; + _encode_snapshot(snapshot, w); + Ok(start_from) +} + +pub(crate) fn export_shallow_snapshot_inner( + doc: &LoroDoc, + start_from: &Frontiers, +) -> Result<(Snapshot, Frontiers), LoroEncodeError> { let oplog = doc.oplog().try_lock().unwrap(); let start_from = calc_shallow_doc_start(&oplog, start_from); let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap(); @@ -110,7 +119,6 @@ pub(crate) fn export_shallow_snapshot( shallow_root_state_bytes, }; - _encode_snapshot(snapshot, w); if state_frontiers != latest_frontiers { doc.checkout_without_emitting(&state_frontiers).unwrap(); } @@ -120,7 +128,7 @@ pub(crate) fn export_shallow_snapshot( } doc.drop_pending_events(); - Ok(start_from) + Ok((snapshot, start_from)) } fn has_unknown_container<'a>(mut cids: impl Iterator) -> bool { diff --git a/crates/loro-internal/src/history_cache.rs b/crates/loro-internal/src/history_cache.rs index c79eca3d..7b419b10 100644 --- a/crates/loro-internal/src/history_cache.rs +++ b/crates/loro-internal/src/history_cache.rs @@ -66,15 +66,6 @@ impl HistoryCacheTrait for ForCheckout { } impl ContainerHistoryCache { - pub(crate) fn fork(&self, change_store: ChangeStore, gc: Option>) -> Self { - Self { - change_store, - for_checkout: None, - for_importing: None, - shallow_root_state: gc, - } - } - pub(crate) fn new(change_store: ChangeStore, gc: Option>) -> Self { Self { change_store, diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 1ccee0de..4917b838 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -33,10 +33,10 @@ use crate::{ dag::DagUtils, diff_calc::DiffCalculator, encoding::{ - decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range, - export_shallow_snapshot, export_snapshot, export_snapshot_at, export_state_only_snapshot, - json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ImportStatus, - ParsedHeaderAndBody, + self, decode_snapshot, export_fast_snapshot, export_fast_updates, + export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at, + export_state_only_snapshot, json_schema::json::JsonSchema, parse_header_and_body, + EncodeMode, ImportStatus, ParsedHeaderAndBody, }, event::{str_to_path, EventTriggerKind, Index, InternalDocDiff}, handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler}, @@ -98,37 +98,13 @@ impl LoroDoc { pub fn fork(&self) -> Self { self.commit_then_stop(); - let arena = self.arena.fork(); - let config = self.config.fork(); - let txn = Arc::new(Mutex::new(None)); - let new_state = self.state.try_lock().unwrap().fork_with_new_peer_id( - arena.clone(), - Arc::downgrade(&txn), - config.clone(), - ); - let gc = new_state.try_lock().unwrap().shallow_root_store().cloned(); - let doc = LoroDoc { - oplog: Arc::new(Mutex::new(self.oplog().try_lock().unwrap().fork( - arena.clone(), - config.clone(), - gc, - ))), - state: new_state, - observer: Arc::new(Observer::new(arena.clone())), - arena, - config, - diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))), - txn, - auto_commit: AtomicBool::new(false), - detached: AtomicBool::new(self.is_detached()), - local_update_subs: SubscriberSetWithQueue::new(), - peer_id_change_subs: SubscriberSetWithQueue::new(), - }; - + let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self); + let doc = Self::new(); + encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc).unwrap(); + doc.set_config(&self.config); if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) { doc.start_auto_commit(); } - self.renew_txn_if_auto_commit(); doc } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index ac058b01..5f8bac0c 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -7,7 +7,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::cmp::Ordering; use std::rc::Rc; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use tracing::{debug, trace, trace_span}; use self::change_store::iter::MergedChangeIter; @@ -24,7 +24,6 @@ use crate::history_cache::ContainerHistoryCache; use crate::id::{Counter, PeerID, ID}; use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp}; use crate::span::{HasCounterSpan, HasLamportSpan}; -use crate::state::GcStore; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; use change_store::BlockOpRef; @@ -83,34 +82,6 @@ impl OpLog { } } - pub(crate) fn fork( - &self, - arena: SharedArena, - configure: Configure, - gc: Option>, - ) -> Self { - let change_store = self.change_store.fork( - arena.clone(), - configure.merge_interval.clone(), - self.vv(), - self.frontiers(), - ); - Self { - history_cache: Mutex::new( - self.history_cache - .try_lock() - .unwrap() - .fork(change_store.clone(), gc), - ), - change_store: change_store.clone(), - dag: self.dag.fork(change_store), - arena, - pending_changes: Default::default(), - batch_importing: false, - configure, - } - } - #[inline] pub fn dag(&self) -> &AppDag { &self.dag diff --git a/crates/loro-internal/src/oplog/loro_dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs index f678aa80..6ead3919 100644 --- a/crates/loro-internal/src/oplog/loro_dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -429,21 +429,6 @@ impl AppDag { } } - pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag { - AppDag { - change_store: change_store.clone(), - map: Mutex::new(self.map.try_lock().unwrap().clone()), - frontiers: self.frontiers.clone(), - vv: self.vv.clone(), - unparsed_vv: Mutex::new(self.unparsed_vv.try_lock().unwrap().clone()), - unhandled_dep_points: Mutex::new(self.unhandled_dep_points.try_lock().unwrap().clone()), - shallow_since_frontiers: self.shallow_since_frontiers.clone(), - shallow_since_vv: self.shallow_since_vv.clone(), - shallow_root_frontiers_deps: self.shallow_root_frontiers_deps.clone(), - pending_txn_node: self.pending_txn_node.clone(), - } - } - pub fn total_parsed_dag_node(&self) -> usize { self.map.try_lock().unwrap().len() } diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index ecd25186..57f4fd1b 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -385,17 +385,18 @@ impl DocState { } pub fn fork_with_new_peer_id( - &self, + &mut self, arena: SharedArena, global_txn: Weak>>, config: Configure, ) -> Arc> { - Arc::new_cyclic(|weak| { - let peer = Arc::new(AtomicU64::new(DefaultRandom.next_u64())); + let peer = Arc::new(AtomicU64::new(DefaultRandom.next_u64())); + let store = self.store.fork(arena.clone(), peer.clone(), config.clone()); + Arc::new_cyclic(move |weak| { Mutex::new(Self { - peer: peer.clone(), + peer, frontiers: self.frontiers.clone(), - store: self.store.fork(arena.clone(), peer, config.clone()), + store, arena, config, weak_state: weak.clone(), diff --git a/crates/loro-internal/src/state/container_store.rs b/crates/loro-internal/src/state/container_store.rs index 3388a254..2e9925ab 100644 --- a/crates/loro-internal/src/state/container_store.rs +++ b/crates/loro-internal/src/state/container_store.rs @@ -229,7 +229,7 @@ impl ContainerStore { } pub(crate) fn fork( - &self, + &mut self, arena: SharedArena, peer: Arc, config: Configure, diff --git a/crates/loro-internal/src/state/container_store/container_wrapper.rs b/crates/loro-internal/src/state/container_store/container_wrapper.rs index 08d57d0a..1daa684f 100644 --- a/crates/loro-internal/src/state/container_store/container_wrapper.rs +++ b/crates/loro-internal/src/state/container_store/container_wrapper.rs @@ -5,7 +5,6 @@ use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue}; use crate::state::counter_state::CounterState; use crate::{ arena::SharedArena, - configure::Configure, container::idx::ContainerIdx, state::{ unknown_state::UnknownState, ContainerCreationContext, ContainerState, FastStateSnapshot, @@ -52,34 +51,6 @@ impl ContainerWrapper { } } - pub fn fork(&self, config: &Configure) -> Self { - if self.flushed { - Self { - depth: self.depth, - kind: self.kind, - parent: self.parent.clone(), - bytes: self.bytes.clone(), - value: self.value.clone(), - bytes_offset_for_value: self.bytes_offset_for_value, - bytes_offset_for_state: self.bytes_offset_for_state, - state: None, - flushed: true, - } - } else { - Self { - depth: self.depth, - kind: self.kind, - parent: self.parent.clone(), - bytes: None, - value: None, - bytes_offset_for_value: None, - bytes_offset_for_state: None, - state: Some(self.state.as_ref().unwrap().fork(config)), - flushed: false, - } - } - } - pub fn depth(&self) -> usize { self.depth } diff --git a/crates/loro-internal/src/state/container_store/inner_store.rs b/crates/loro-internal/src/state/container_store/inner_store.rs index 0f4bc501..7dca6ea9 100644 --- a/crates/loro-internal/src/state/container_store/inner_store.rs +++ b/crates/loro-internal/src/state/container_store/inner_store.rs @@ -1,14 +1,11 @@ -use std::ops::Bound; - -use bytes::Bytes; -use fxhash::FxHashMap; -use loro_common::ContainerID; -use tracing::trace; - use crate::{ arena::SharedArena, configure::Configure, container::idx::ContainerIdx, state::container_store::FRONTIERS_KEY, utils::kv_wrapper::KvWrapper, version::Frontiers, }; +use bytes::Bytes; +use fxhash::FxHashMap; +use loro_common::ContainerID; +use std::ops::Bound; use super::ContainerWrapper; @@ -145,7 +142,6 @@ impl InnerStore { count += 1; let cid = ContainerID::from_bytes(&k); let parent = ContainerWrapper::decode_parent(&v); - trace!("decode register parent {:?} parent = {:?}", &cid, &parent); let idx = self.arena.register_container(&cid); let p = parent.as_ref().map(|p| self.arena.register_container(p)); self.arena.set_parent(idx, p); @@ -238,19 +234,12 @@ impl InnerStore { } } - pub(crate) fn fork(&self, arena: SharedArena, config: &Configure) -> InnerStore { - // PERF: we can try flushing before forking - InnerStore { - arena, - store: self - .store - .iter() - .map(|(idx, c)| (*idx, c.fork(config))) - .collect(), - kv: self.kv.clone(), - len: self.len, - all_loaded: self.all_loaded, - } + pub(crate) fn fork(&mut self, arena: SharedArena, _config: &Configure) -> InnerStore { + // PERF: we can try to reuse + let bytes = self.encode(); + let mut new_store = Self::new(arena); + new_store.decode(bytes).unwrap(); + new_store } pub(crate) fn len(&self) -> usize { diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index e3a2b16d..c103860d 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -1768,7 +1768,7 @@ fn test_encode_snapshot_when_checkout() { } #[test] -fn travel_change_ancestors() { +fn test_travel_change_ancestors() { let doc = LoroDoc::new(); doc.set_peer_id(1).unwrap(); doc.get_text("text").insert(0, "Hello").unwrap();