refactor: make gc store accessible for history cache

This commit is contained in:
Zixuan Chen 2024-08-26 23:01:42 +08:00
parent 87bc464f3b
commit ea2beae48e
No known key found for this signature in database
8 changed files with 52 additions and 58 deletions

View file

@ -79,7 +79,7 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
})?;
if !oplog.is_empty() {
unimplemented!("You can only import snapshot to a empty loro doc now");
panic!("InternalError importing snapshot to an non-empty doc");
}
assert!(state.frontiers.is_empty());
@ -98,6 +98,10 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
state_bytes,
oplog.dag().trimmed_frontiers().clone(),
)?;
let gc_store = state.gc_store().cloned();
oplog.with_history_cache(|h| {
h.set_gc_store(gc_store);
});
}
// FIXME: we may need to extract the unknown containers here?
// Or we should lazy load it when the time comes?

View file

@ -1,4 +1,3 @@
use bytes::Bytes;
use loro_common::LoroResult;
use tracing::debug;
@ -9,8 +8,6 @@ use crate::{
LoroDoc,
};
use super::fast_snapshot::_decode_snapshot_bytes;
#[tracing::instrument(skip_all)]
pub(crate) fn export_gc_snapshot<W: std::io::Write>(
doc: &LoroDoc,
@ -68,31 +65,3 @@ fn calc_actual_start(oplog: &crate::OpLog, frontiers: &Frontiers) -> Frontiers {
let cur_f = oplog.frontiers();
oplog.dag.find_common_ancestor(&start, cur_f).0
}
pub(crate) fn import_gc_snapshot(doc: &LoroDoc, bytes: Bytes) -> LoroResult<()> {
let mut oplog = doc.oplog().lock().unwrap();
let mut state = doc.app_state().lock().unwrap();
if !oplog.is_empty() || !state.is_empty() {
panic!()
}
let Snapshot {
oplog_bytes,
state_bytes,
gc_bytes,
} = _decode_snapshot_bytes(bytes)?;
oplog.decode_change_store(oplog_bytes)?;
if !gc_bytes.is_empty() {
state.store.decode_gc(
gc_bytes,
state_bytes,
oplog.dag().trimmed_frontiers().clone(),
)?;
} else {
state.store.decode(state_bytes)?;
}
// FIXME: we may need to extract the unknown containers here?
// Or we should lazy load it when the time comes?
state.init_with_states_and_version(oplog.frontiers().clone(), &oplog, vec![], false);
Ok(())
}

View file

@ -14,13 +14,13 @@ use loro_common::{
use rle::HasLength;
use crate::{
arena::SharedArena,
change::{Change, Lamport},
container::{idx::ContainerIdx, list::list_op::InnerListOp, tree::tree_op::TreeOp},
diff_calc::tree::TreeCacheForDiff,
encoding::value_register::ValueRegister,
op::{InnerContent, RichOp},
oplog::ChangeStore,
state::GcStore,
OpLog, VersionVector,
};
@ -32,6 +32,7 @@ use crate::{
#[derive(Debug)]
pub(crate) struct ContainerHistoryCache {
change_store: ChangeStore,
gc: Option<Arc<GcStore>>,
for_checkout: Option<ForCheckout>,
for_importing: Option<FxHashMap<ContainerIdx, HistoryCacheForImporting>>,
}
@ -58,19 +59,21 @@ impl HistoryCacheTrait for ForCheckout {
}
impl ContainerHistoryCache {
pub(crate) fn fork(&self, _arena: SharedArena, change_store: ChangeStore) -> Self {
pub(crate) fn fork(&self, change_store: ChangeStore, gc: Option<Arc<GcStore>>) -> Self {
Self {
change_store,
for_checkout: None,
for_importing: None,
gc,
}
}
pub(crate) fn new(_arena: SharedArena, change_store: ChangeStore) -> Self {
pub(crate) fn new(change_store: ChangeStore, gc: Option<Arc<GcStore>>) -> Self {
Self {
change_store,
for_checkout: Default::default(),
for_importing: Default::default(),
gc,
}
}
@ -217,6 +220,10 @@ impl ContainerHistoryCache {
pub(crate) fn free(&mut self) {
self.for_checkout = None;
}
pub(crate) fn set_gc_store(&mut self, gc_store: Option<Arc<GcStore>>) {
self.gc = gc_store;
}
}
#[enum_dispatch(OpGroupTrait)]

View file

@ -125,13 +125,13 @@ impl LoroDoc {
.lock()
.unwrap()
.fork(arena.clone(), Arc::downgrade(&txn), config.clone());
let gc = new_state.try_lock().unwrap().gc_store().cloned();
let doc = LoroDoc {
oplog: Arc::new(Mutex::new(
self.oplog()
.lock()
.unwrap()
.fork(arena.clone(), config.clone()),
)),
oplog: Arc::new(Mutex::new(self.oplog().lock().unwrap().fork(
arena.clone(),
config.clone(),
gc,
))),
state: new_state,
arena,
config,

View file

@ -7,7 +7,7 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use tracing::{debug, instrument, trace, trace_span};
use self::change_store::iter::MergedChangeIter;
@ -24,6 +24,7 @@ use crate::history_cache::ContainerHistoryCache;
use crate::id::{Counter, PeerID, ID};
use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
use crate::span::{HasCounterSpan, HasLamportSpan};
use crate::state::GcStore;
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
@ -77,10 +78,7 @@ impl OpLog {
let cfg = Configure::default();
let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval.clone());
Self {
history_cache: Mutex::new(ContainerHistoryCache::new(
arena.clone(),
change_store.clone(),
)),
history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
dag: AppDag::new(change_store.clone()),
change_store,
arena,
@ -92,7 +90,12 @@ impl OpLog {
}
}
pub(crate) fn fork(&self, arena: SharedArena, configure: Configure) -> Self {
pub(crate) fn fork(
&self,
arena: SharedArena,
configure: Configure,
gc: Option<Arc<GcStore>>,
) -> Self {
let change_store = self
.change_store
.fork(arena.clone(), configure.merge_interval.clone());
@ -101,7 +104,7 @@ impl OpLog {
self.history_cache
.lock()
.unwrap()
.fork(arena.clone(), change_store.clone()),
.fork(change_store.clone(), gc),
),
change_store: change_store.clone(),
dag: self.dag.fork(change_store),

View file

@ -46,6 +46,7 @@ mod tree_state;
mod unknown_state;
pub(crate) use self::movable_list_state::{IndexType, MovableListState};
pub(crate) use container_store::GcStore;
pub(crate) use list_state::ListState;
pub(crate) use map_state::MapState;
pub(crate) use richtext_state::RichtextState;
@ -1462,6 +1463,10 @@ impl DocState {
Some(value)
}
pub fn gc_store(&self) -> Option<&Arc<GcStore>> {
self.store.gc_store()
}
}
fn create_state_(idx: ContainerIdx, config: &Configure, peer: u64) -> State {

View file

@ -14,7 +14,7 @@ use bytes::Bytes;
use fxhash::FxHashMap;
use inner_store::InnerStore;
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
use std::sync::{atomic::AtomicU64, Arc};
use std::sync::{atomic::AtomicU64, Arc, Mutex};
pub(crate) use container_wrapper::ContainerWrapper;
@ -57,7 +57,7 @@ mod inner_store;
pub(crate) struct ContainerStore {
arena: SharedArena,
store: InnerStore,
gc_store: Option<Box<GcStore>>,
gc_store: Option<Arc<GcStore>>,
conf: Configure,
peer: Arc<AtomicU64>,
}
@ -70,9 +70,10 @@ impl std::fmt::Debug for ContainerStore {
}
}
struct GcStore {
trimmed_frontiers: Frontiers,
store: InnerStore,
#[derive(Debug)]
pub(crate) struct GcStore {
pub trimmed_frontiers: Frontiers,
pub store: Mutex<InnerStore>,
}
macro_rules! ctx {
@ -108,6 +109,10 @@ impl ContainerStore {
.map(|x| x.get_state(idx, ctx!(self)))
}
pub fn gc_store(&self) -> Option<&Arc<GcStore>> {
self.gc_store.as_ref()
}
pub fn get_value(&mut self, idx: ContainerIdx) -> Option<LoroValue> {
self.store
.get_mut(idx)
@ -120,7 +125,7 @@ impl ContainerStore {
pub fn encode_gc(&mut self) -> Bytes {
if let Some(gc) = self.gc_store.as_mut() {
gc.store.encode()
gc.store.try_lock().unwrap().get_kv().export()
} else {
Bytes::new()
}
@ -143,11 +148,12 @@ impl ContainerStore {
assert!(self.gc_store.is_none());
self.store.decode_twice(gc_bytes.clone(), state_bytes)?;
if !start_frontiers.is_empty() {
self.gc_store = Some(Box::new(GcStore {
let mut inner = InnerStore::new(self.arena.clone());
inner.decode(gc_bytes)?;
self.gc_store = Some(Arc::new(GcStore {
trimmed_frontiers: start_frontiers,
store: InnerStore::new(self.arena.clone()),
store: Mutex::new(inner),
}));
self.gc_store.as_mut().unwrap().store.decode(gc_bytes)?;
}
Ok(())
}

View file

@ -39,7 +39,7 @@ impl KvWrapper {
pub fn import(&self, bytes: Bytes) {
let mut kv = self.kv.lock().unwrap();
kv.import_all(bytes);
kv.import_all(bytes).unwrap();
}
pub fn export(&self) -> Bytes {