perf: avoid memory leak when forking repeatedly (#500)

* perf: avoid memory leak when forking repeatedly

* fix: use the safest way to fork
This commit is contained in:
Zixuan Chen 2024-10-08 17:24:08 +08:00 committed by GitHub
parent c8f776f018
commit afbcee99b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 115 additions and 169 deletions

View file

@ -30,3 +30,7 @@ harness = false
[[bench]]
name = "bench_text"
harness = false
[[bench]]
name = "fork"
harness = false

View file

@ -0,0 +1,21 @@
use criterion::{criterion_group, criterion_main, Criterion};
use loro::LoroDoc;
/// Benchmarks repeated forking of a document.
///
/// Every measured iteration starts from a fresh `LoroDoc` and then, 1000 times
/// in a row, inserts `"Hi"` at the start of the `"text"` container and replaces
/// the document with its own fork.
fn bench_fork(c: &mut Criterion) {
    const FORKS_PER_ITER: usize = 1000;

    let mut group = c.benchmark_group("fork");
    group.bench_function("fork 1000 times with text edit at each fork", |bencher| {
        bencher.iter(|| {
            let mut current = LoroDoc::new();
            for _ in 0..FORKS_PER_ITER {
                // Keep the handle bound for the whole round so it is dropped
                // only after the document has been replaced by its fork.
                let handle = current.get_text("text");
                handle.insert(0, "Hi").unwrap();
                current = current.fork();
            }
        });
    });
    // Release the group before returning from the benchmark function.
    drop(group);
}
criterion_group!(benches, bench_fork);
criterion_main!(benches);

View file

@ -0,0 +1,10 @@
use loro::LoroDoc;
/// Stress example: edits a text container and forks the document 10 000 times
/// in a row, always continuing from the newest fork.
fn main() {
    const ROUNDS: usize = 10_000;

    let mut current = LoroDoc::new();
    for _ in 0..ROUNDS {
        // The handle stays bound until the end of the round, i.e. until after
        // the previous document has been replaced by its fork.
        let handle = current.get_text("text");
        handle.insert(0, "Hi").unwrap();
        current = current.fork();
    }
}

View file

@ -37,7 +37,7 @@ struct InnerSharedArena {
parents: Mutex<FxHashMap<ContainerIdx, Option<ContainerIdx>>>,
values: Mutex<Vec<LoroValue>>,
root_c_idx: Mutex<Vec<ContainerIdx>>,
str: Mutex<StrArena>,
str: Arc<Mutex<StrArena>>,
}
/// This is shared between [OpLog] and [AppState].
@ -76,7 +76,7 @@ impl SharedArena {
parents: Mutex::new(self.inner.parents.try_lock().unwrap().clone()),
values: Mutex::new(self.inner.values.try_lock().unwrap().clone()),
root_c_idx: Mutex::new(self.inner.root_c_idx.try_lock().unwrap().clone()),
str: Mutex::new(self.inner.str.try_lock().unwrap().clone()),
str: self.inner.str.clone(),
}),
}
}

View file

@ -1,4 +1,5 @@
pub use crate::container::richtext::config::{StyleConfig, StyleConfigMap};
use crate::LoroDoc;
#[derive(Clone, Debug)]
pub struct Configure {
@ -8,6 +9,15 @@ pub struct Configure {
pub(crate) editable_detached_mode: Arc<AtomicBool>,
}
impl LoroDoc {
/// Applies the settings stored in `config` to this document.
///
/// The values are forwarded, in this order, to the matching setters on
/// `self`: text style config, record-timestamp flag, change merge interval
/// and detached-editing flag.
///
/// # Panics
///
/// Panics if `config.text_style_config.read()` returns an error, because the
/// result is unwrapped.
pub(crate) fn set_config(&self, config: &Configure) {
// A clone of the style config is handed over, so `config` keeps its own copy.
self.config_text_style(config.text_style_config.read().unwrap().clone());
self.set_record_timestamp(config.record_timestamp());
self.set_change_merge_interval(config.merge_interval());
self.set_detached_editing(config.detached_editing());
}
}
impl Default for Configure {
fn default() -> Self {
Self {

View file

@ -1,5 +1,5 @@
pub(crate) mod arena;
mod fast_snapshot;
pub(crate) mod fast_snapshot;
pub(crate) mod json_schema;
mod outdated_encode_reordered;
mod shallow_snapshot;
@ -213,7 +213,10 @@ impl EncodeMode {
}
pub fn is_snapshot(self) -> bool {
matches!(self, EncodeMode::OutdatedSnapshot)
matches!(
self,
EncodeMode::OutdatedSnapshot | EncodeMode::FastSnapshot
)
}
}

View file

@ -22,7 +22,7 @@ use bytes::{Buf, Bytes};
use loro_common::{IdSpan, LoroError, LoroResult};
use tracing::trace;
pub(crate) const EMPTY_MARK: &[u8] = b"E";
pub(super) struct Snapshot {
pub(crate) struct Snapshot {
pub oplog_bytes: Bytes,
pub state_bytes: Option<Bytes>,
pub shallow_root_state_bytes: Bytes,
@ -70,6 +70,16 @@ fn read_u32_le(r: &mut bytes::buf::Reader<Bytes>) -> u32 {
}
/// Parses `bytes` into a [`Snapshot`] and applies it to `doc`.
///
/// This is a thin wrapper: parsing is done by `_decode_snapshot_bytes` and the
/// actual import by [`decode_snapshot_inner`]. An error from either step is
/// returned to the caller.
pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
    let parsed = _decode_snapshot_bytes(bytes)?;
    decode_snapshot_inner(parsed, doc)
}
pub(crate) fn decode_snapshot_inner(snapshot: Snapshot, doc: &LoroDoc) -> Result<(), LoroError> {
let Snapshot {
oplog_bytes,
state_bytes,
shallow_root_state_bytes,
} = snapshot;
ensure_cov::notify_cov("loro_internal::import::fast_snapshot::decode_snapshot");
let mut state = doc.app_state().try_lock().map_err(|_| {
LoroError::DecodeError(
@ -94,11 +104,6 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
assert!(state.frontiers.is_empty());
assert!(oplog.frontiers().is_empty());
let Snapshot {
oplog_bytes,
state_bytes,
shallow_root_state_bytes,
} = _decode_snapshot_bytes(bytes)?;
oplog.decode_change_store(oplog_bytes)?;
let need_calc = state_bytes.is_none();
let state_frontiers;
@ -157,7 +162,11 @@ impl OpLog {
}
/// Builds a snapshot of `doc` and writes it to `w`.
///
/// The snapshot value is produced by `encode_snapshot_inner` and then handed,
/// together with `w`, to `_encode_snapshot`.
pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
// Events should be emitted before encoding a snapshot: `encode_snapshot_inner`
// asserts that the document has no pending events.
let snapshot = encode_snapshot_inner(doc);
_encode_snapshot(snapshot, w);
}
pub(crate) fn encode_snapshot_inner(doc: &LoroDoc) -> Snapshot {
assert!(doc.drop_pending_events().is_empty());
let old_state_frontiers = doc.state_frontiers();
let was_detached = doc.is_detached();
@ -169,9 +178,10 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
let f = oplog.shallow_since_frontiers().clone();
drop(oplog);
drop(state);
shallow_snapshot::export_shallow_snapshot(doc, &f, w).unwrap();
return;
let (snapshot, _) = shallow_snapshot::export_shallow_snapshot_inner(doc, &f).unwrap();
return snapshot;
}
assert!(!state.is_in_txn());
let oplog_bytes = oplog.encode_change_store();
if oplog.is_shallow() {
@ -180,7 +190,6 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
state.store.shallow_root_frontiers().unwrap()
);
}
if was_detached {
let latest = oplog.frontiers().clone();
drop(oplog);
@ -188,23 +197,20 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
doc.checkout_without_emitting(&latest).unwrap();
state = doc.app_state().try_lock().unwrap();
}
state.ensure_all_alive_containers();
let state_bytes = state.store.encode();
_encode_snapshot(
Snapshot {
oplog_bytes,
state_bytes: Some(state_bytes),
shallow_root_state_bytes: Bytes::new(),
},
w,
);
let snapshot = Snapshot {
oplog_bytes,
state_bytes: Some(state_bytes),
shallow_root_state_bytes: Bytes::new(),
};
if was_detached {
drop(state);
doc.checkout_without_emitting(&old_state_frontiers).unwrap();
doc.drop_pending_events();
}
snapshot
}
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<Vec<Change>, LoroError> {

View file

@ -25,6 +25,15 @@ pub(crate) fn export_shallow_snapshot<W: std::io::Write>(
start_from: &Frontiers,
w: &mut W,
) -> Result<Frontiers, LoroEncodeError> {
let (snapshot, start_from) = export_shallow_snapshot_inner(doc, start_from)?;
_encode_snapshot(snapshot, w);
Ok(start_from)
}
pub(crate) fn export_shallow_snapshot_inner(
doc: &LoroDoc,
start_from: &Frontiers,
) -> Result<(Snapshot, Frontiers), LoroEncodeError> {
let oplog = doc.oplog().try_lock().unwrap();
let start_from = calc_shallow_doc_start(&oplog, start_from);
let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap();
@ -110,7 +119,6 @@ pub(crate) fn export_shallow_snapshot<W: std::io::Write>(
shallow_root_state_bytes,
};
_encode_snapshot(snapshot, w);
if state_frontiers != latest_frontiers {
doc.checkout_without_emitting(&state_frontiers).unwrap();
}
@ -120,7 +128,7 @@ pub(crate) fn export_shallow_snapshot<W: std::io::Write>(
}
doc.drop_pending_events();
Ok(start_from)
Ok((snapshot, start_from))
}
fn has_unknown_container<'a>(mut cids: impl Iterator<Item = &'a ContainerID>) -> bool {

View file

@ -66,15 +66,6 @@ impl HistoryCacheTrait for ForCheckout {
}
impl ContainerHistoryCache {
pub(crate) fn fork(&self, change_store: ChangeStore, gc: Option<Arc<GcStore>>) -> Self {
Self {
change_store,
for_checkout: None,
for_importing: None,
shallow_root_state: gc,
}
}
pub(crate) fn new(change_store: ChangeStore, gc: Option<Arc<GcStore>>) -> Self {
Self {
change_store,

View file

@ -33,10 +33,10 @@ use crate::{
dag::DagUtils,
diff_calc::DiffCalculator,
encoding::{
decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range,
export_shallow_snapshot, export_snapshot, export_snapshot_at, export_state_only_snapshot,
json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ImportStatus,
ParsedHeaderAndBody,
self, decode_snapshot, export_fast_snapshot, export_fast_updates,
export_fast_updates_in_range, export_shallow_snapshot, export_snapshot, export_snapshot_at,
export_state_only_snapshot, json_schema::json::JsonSchema, parse_header_and_body,
EncodeMode, ImportStatus, ParsedHeaderAndBody,
},
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
@ -98,37 +98,13 @@ impl LoroDoc {
pub fn fork(&self) -> Self {
self.commit_then_stop();
let arena = self.arena.fork();
let config = self.config.fork();
let txn = Arc::new(Mutex::new(None));
let new_state = self.state.try_lock().unwrap().fork_with_new_peer_id(
arena.clone(),
Arc::downgrade(&txn),
config.clone(),
);
let gc = new_state.try_lock().unwrap().shallow_root_store().cloned();
let doc = LoroDoc {
oplog: Arc::new(Mutex::new(self.oplog().try_lock().unwrap().fork(
arena.clone(),
config.clone(),
gc,
))),
state: new_state,
observer: Arc::new(Observer::new(arena.clone())),
arena,
config,
diff_calculator: Arc::new(Mutex::new(DiffCalculator::new(true))),
txn,
auto_commit: AtomicBool::new(false),
detached: AtomicBool::new(self.is_detached()),
local_update_subs: SubscriberSetWithQueue::new(),
peer_id_change_subs: SubscriberSetWithQueue::new(),
};
let snapshot = encoding::fast_snapshot::encode_snapshot_inner(self);
let doc = Self::new();
encoding::fast_snapshot::decode_snapshot_inner(snapshot, &doc).unwrap();
doc.set_config(&self.config);
if self.auto_commit.load(std::sync::atomic::Ordering::Relaxed) {
doc.start_auto_commit();
}
self.renew_txn_if_auto_commit();
doc
}

View file

@ -7,7 +7,7 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tracing::{debug, trace, trace_span};
use self::change_store::iter::MergedChangeIter;
@ -24,7 +24,6 @@ use crate::history_cache::ContainerHistoryCache;
use crate::id::{Counter, PeerID, ID};
use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
use crate::span::{HasCounterSpan, HasLamportSpan};
use crate::state::GcStore;
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
@ -83,34 +82,6 @@ impl OpLog {
}
}
pub(crate) fn fork(
&self,
arena: SharedArena,
configure: Configure,
gc: Option<Arc<GcStore>>,
) -> Self {
let change_store = self.change_store.fork(
arena.clone(),
configure.merge_interval.clone(),
self.vv(),
self.frontiers(),
);
Self {
history_cache: Mutex::new(
self.history_cache
.try_lock()
.unwrap()
.fork(change_store.clone(), gc),
),
change_store: change_store.clone(),
dag: self.dag.fork(change_store),
arena,
pending_changes: Default::default(),
batch_importing: false,
configure,
}
}
#[inline]
pub fn dag(&self) -> &AppDag {
&self.dag

View file

@ -429,21 +429,6 @@ impl AppDag {
}
}
pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag {
AppDag {
change_store: change_store.clone(),
map: Mutex::new(self.map.try_lock().unwrap().clone()),
frontiers: self.frontiers.clone(),
vv: self.vv.clone(),
unparsed_vv: Mutex::new(self.unparsed_vv.try_lock().unwrap().clone()),
unhandled_dep_points: Mutex::new(self.unhandled_dep_points.try_lock().unwrap().clone()),
shallow_since_frontiers: self.shallow_since_frontiers.clone(),
shallow_since_vv: self.shallow_since_vv.clone(),
shallow_root_frontiers_deps: self.shallow_root_frontiers_deps.clone(),
pending_txn_node: self.pending_txn_node.clone(),
}
}
pub fn total_parsed_dag_node(&self) -> usize {
self.map.try_lock().unwrap().len()
}

View file

@ -385,17 +385,18 @@ impl DocState {
}
pub fn fork_with_new_peer_id(
&self,
&mut self,
arena: SharedArena,
global_txn: Weak<Mutex<Option<Transaction>>>,
config: Configure,
) -> Arc<Mutex<Self>> {
Arc::new_cyclic(|weak| {
let peer = Arc::new(AtomicU64::new(DefaultRandom.next_u64()));
let peer = Arc::new(AtomicU64::new(DefaultRandom.next_u64()));
let store = self.store.fork(arena.clone(), peer.clone(), config.clone());
Arc::new_cyclic(move |weak| {
Mutex::new(Self {
peer: peer.clone(),
peer,
frontiers: self.frontiers.clone(),
store: self.store.fork(arena.clone(), peer, config.clone()),
store,
arena,
config,
weak_state: weak.clone(),

View file

@ -229,7 +229,7 @@ impl ContainerStore {
}
pub(crate) fn fork(
&self,
&mut self,
arena: SharedArena,
peer: Arc<AtomicU64>,
config: Configure,

View file

@ -5,7 +5,6 @@ use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
use crate::state::counter_state::CounterState;
use crate::{
arena::SharedArena,
configure::Configure,
container::idx::ContainerIdx,
state::{
unknown_state::UnknownState, ContainerCreationContext, ContainerState, FastStateSnapshot,
@ -52,34 +51,6 @@ impl ContainerWrapper {
}
}
pub fn fork(&self, config: &Configure) -> Self {
if self.flushed {
Self {
depth: self.depth,
kind: self.kind,
parent: self.parent.clone(),
bytes: self.bytes.clone(),
value: self.value.clone(),
bytes_offset_for_value: self.bytes_offset_for_value,
bytes_offset_for_state: self.bytes_offset_for_state,
state: None,
flushed: true,
}
} else {
Self {
depth: self.depth,
kind: self.kind,
parent: self.parent.clone(),
bytes: None,
value: None,
bytes_offset_for_value: None,
bytes_offset_for_state: None,
state: Some(self.state.as_ref().unwrap().fork(config)),
flushed: false,
}
}
}
pub fn depth(&self) -> usize {
self.depth
}

View file

@ -1,14 +1,11 @@
use std::ops::Bound;
use bytes::Bytes;
use fxhash::FxHashMap;
use loro_common::ContainerID;
use tracing::trace;
use crate::{
arena::SharedArena, configure::Configure, container::idx::ContainerIdx,
state::container_store::FRONTIERS_KEY, utils::kv_wrapper::KvWrapper, version::Frontiers,
};
use bytes::Bytes;
use fxhash::FxHashMap;
use loro_common::ContainerID;
use std::ops::Bound;
use super::ContainerWrapper;
@ -145,7 +142,6 @@ impl InnerStore {
count += 1;
let cid = ContainerID::from_bytes(&k);
let parent = ContainerWrapper::decode_parent(&v);
trace!("decode register parent {:?} parent = {:?}", &cid, &parent);
let idx = self.arena.register_container(&cid);
let p = parent.as_ref().map(|p| self.arena.register_container(p));
self.arena.set_parent(idx, p);
@ -238,19 +234,12 @@ impl InnerStore {
}
}
pub(crate) fn fork(&self, arena: SharedArena, config: &Configure) -> InnerStore {
// PERF: we can try flushing before forking
InnerStore {
arena,
store: self
.store
.iter()
.map(|(idx, c)| (*idx, c.fork(config)))
.collect(),
kv: self.kv.clone(),
len: self.len,
all_loaded: self.all_loaded,
}
pub(crate) fn fork(&mut self, arena: SharedArena, _config: &Configure) -> InnerStore {
// PERF: we can try to reuse
let bytes = self.encode();
let mut new_store = Self::new(arena);
new_store.decode(bytes).unwrap();
new_store
}
pub(crate) fn len(&self) -> usize {

View file

@ -1768,7 +1768,7 @@ fn test_encode_snapshot_when_checkout() {
}
#[test]
fn travel_change_ancestors() {
fn test_travel_change_ancestors() {
let doc = LoroDoc::new();
doc.set_peer_id(1).unwrap();
doc.get_text("text").insert(0, "Hello").unwrap();