mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-02 02:59:51 +00:00
feat: add state only snapshot & refine check slow test
This commit is contained in:
parent
d678114681
commit
0f1df49b9e
8 changed files with 120 additions and 19 deletions
|
@ -427,6 +427,10 @@ impl ApplyDiff for TreeTracker {
|
|||
index,
|
||||
position,
|
||||
} => {
|
||||
if self.find_node_by_id(target).is_some() {
|
||||
panic!("{:?} node already exists", target);
|
||||
}
|
||||
|
||||
self.create_node(target, &parent.tree_id(), position.to_string(), index);
|
||||
}
|
||||
TreeExternalDiff::Delete { .. } => {
|
||||
|
|
|
@ -28,6 +28,7 @@ pub enum ExportMode<'a> {
|
|||
Updates { from: Cow<'a, VersionVector> },
|
||||
UpdatesInRange { spans: Cow<'a, [IdSpan]> },
|
||||
GcSnapshot(Cow<'a, Frontiers>),
|
||||
StateOnly(Option<Cow<'a, Frontiers>>),
|
||||
}
|
||||
|
||||
impl<'a> ExportMode<'a> {
|
||||
|
@ -71,6 +72,10 @@ impl<'a> ExportMode<'a> {
|
|||
let frontiers = Frontiers::from_id(id);
|
||||
ExportMode::GcSnapshot(Cow::Owned(frontiers))
|
||||
}
|
||||
|
||||
pub fn state_only(frontiers: Option<&'a Frontiers>) -> Self {
|
||||
ExportMode::StateOnly(frontiers.map(Cow::Borrowed))
|
||||
}
|
||||
}
|
||||
|
||||
const MAGIC_BYTES: [u8; 4] = *b"loro";
|
||||
|
@ -337,6 +342,12 @@ pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec<u8> {
|
|||
})
|
||||
}
|
||||
|
||||
pub(crate) fn export_state_only_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec<u8> {
|
||||
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
|
||||
gc::export_state_only_snapshot(doc, f, ans).unwrap();
|
||||
})
|
||||
}
|
||||
|
||||
fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec<u8>)) -> Vec<u8> {
|
||||
// HEADER
|
||||
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
|
||||
|
|
|
@ -57,7 +57,7 @@ pub(crate) fn export_gc_snapshot<W: std::io::Write>(
|
|||
&start_vv, &start_from,
|
||||
);
|
||||
|
||||
let oplog_bytes = oplog.export_from_fast(&start_vv, &start_from);
|
||||
let oplog_bytes = oplog.export_change_store_from(&start_vv, &start_from);
|
||||
let latest_vv = oplog.vv();
|
||||
let ops_num: usize = latest_vv.sub_iter(&start_vv).map(|x| x.atom_len()).sum();
|
||||
drop(oplog);
|
||||
|
@ -95,6 +95,56 @@ pub(crate) fn export_gc_snapshot<W: std::io::Write>(
|
|||
Ok(start_from)
|
||||
}
|
||||
|
||||
pub(crate) fn export_state_only_snapshot<W: std::io::Write>(
|
||||
doc: &LoroDoc,
|
||||
start_from: &Frontiers,
|
||||
w: &mut W,
|
||||
) -> LoroResult<Frontiers> {
|
||||
assert!(!doc.is_detached());
|
||||
let oplog = doc.oplog().lock().unwrap();
|
||||
let start_from = calc_gc_doc_start(&oplog, start_from);
|
||||
trace!("gc_start_from {:?}", &start_from);
|
||||
let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap();
|
||||
for id in start_from.iter() {
|
||||
// we need to include the ops in start_from, this can make things easier
|
||||
start_vv.insert(id.peer, id.counter);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"start version vv={:?} frontiers={:?}",
|
||||
&start_vv, &start_from,
|
||||
);
|
||||
|
||||
let mut to_vv = start_vv.clone();
|
||||
for id in start_from.iter() {
|
||||
to_vv.insert(id.peer, id.counter + 1);
|
||||
}
|
||||
|
||||
let oplog_bytes =
|
||||
oplog.export_change_store_in_range(&start_vv, &start_from, &to_vv, &start_from);
|
||||
drop(oplog);
|
||||
doc.checkout(&start_from)?;
|
||||
let mut state = doc.app_state().lock().unwrap();
|
||||
let alive_containers = state.ensure_all_alive_containers();
|
||||
let alive_c_bytes: BTreeSet<Vec<u8>> = alive_containers.iter().map(|x| x.to_bytes()).collect();
|
||||
state.store.flush();
|
||||
let gc_state_kv = state.store.get_kv().clone();
|
||||
drop(state);
|
||||
doc.checkout_to_latest();
|
||||
let state_bytes = None;
|
||||
gc_state_kv.retain_keys(&alive_c_bytes);
|
||||
gc_state_kv.insert(FRONTIERS_KEY, start_from.encode().into());
|
||||
let gc_state_bytes = gc_state_kv.export();
|
||||
let snapshot = Snapshot {
|
||||
oplog_bytes,
|
||||
state_bytes,
|
||||
gc_bytes: gc_state_bytes,
|
||||
};
|
||||
|
||||
_encode_snapshot(snapshot, w);
|
||||
Ok(start_from)
|
||||
}
|
||||
|
||||
/// Calculates optimal starting version for the trimmed doc
|
||||
///
|
||||
/// It should be the LCA of the user given version and the latest version.
|
||||
|
|
|
@ -29,8 +29,8 @@ use crate::{
|
|||
diff_calc::DiffCalculator,
|
||||
encoding::{
|
||||
decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range,
|
||||
export_gc_snapshot, export_snapshot, json_schema::json::JsonSchema, parse_header_and_body,
|
||||
EncodeMode, ParsedHeaderAndBody,
|
||||
export_gc_snapshot, export_snapshot, export_state_only_snapshot,
|
||||
json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody,
|
||||
},
|
||||
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
|
||||
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
|
||||
|
@ -1257,25 +1257,25 @@ impl LoroDoc {
|
|||
// 5. Compare the states of the new document and the current document.
|
||||
|
||||
// Step 1: Export the initial state from the GC snapshot.
|
||||
let initial_snapshot = self.export(ExportMode::GcSnapshot(Cow::Borrowed(
|
||||
&self.trimmed_frontiers(),
|
||||
)));
|
||||
let initial_snapshot =
|
||||
self.export(ExportMode::state_only(Some(&self.trimmed_frontiers())));
|
||||
|
||||
// Step 2: Create a new document and import the initial snapshot.
|
||||
let doc = LoroDoc::new();
|
||||
doc.detach();
|
||||
doc.import(&initial_snapshot).unwrap();
|
||||
self.checkout(&self.trimmed_frontiers()).unwrap();
|
||||
assert_eq!(self.get_deep_value(), doc.get_deep_value());
|
||||
|
||||
// Step 3: Export updates from the trimmed version vector to the current version.
|
||||
let updates = self.export(ExportMode::updates_owned(VersionVector::from_im_vv(
|
||||
&self.trimmed_vv(),
|
||||
)));
|
||||
let updates = self.export(ExportMode::all_updates());
|
||||
|
||||
// Step 4: Import these updates into the new document.
|
||||
doc.import(&updates).unwrap();
|
||||
self.checkout_to_latest();
|
||||
|
||||
// Step 5: Checkout to the current state's frontiers and compare the states.
|
||||
doc.checkout(&self.state_frontiers()).unwrap();
|
||||
// doc.checkout(&self.state_frontiers()).unwrap();
|
||||
assert_eq!(doc.get_deep_value(), self.get_deep_value());
|
||||
let mut calculated_state = doc.app_state().try_lock().unwrap();
|
||||
let mut current_state = self.app_state().try_lock().unwrap();
|
||||
current_state.check_is_the_same(&mut calculated_state);
|
||||
|
@ -1516,6 +1516,10 @@ impl LoroDoc {
|
|||
export_fast_updates_in_range(&self.oplog.lock().unwrap(), &spans)
|
||||
}
|
||||
ExportMode::GcSnapshot(f) => export_gc_snapshot(self, &f),
|
||||
ExportMode::StateOnly(f) => match f {
|
||||
Some(f) => export_state_only_snapshot(self, &f),
|
||||
None => export_state_only_snapshot(self, &self.oplog_frontiers()),
|
||||
},
|
||||
};
|
||||
|
||||
self.renew_txn_if_auto_commit();
|
||||
|
|
|
@ -365,14 +365,26 @@ impl OpLog {
|
|||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn export_from_fast(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
|
||||
pub(crate) fn export_change_store_from(&self, vv: &VersionVector, f: &Frontiers) -> Bytes {
|
||||
self.change_store
|
||||
.export_from(vv, f, self.vv(), self.frontiers())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn export_change_store_in_range(
|
||||
&self,
|
||||
vv: &VersionVector,
|
||||
f: &Frontiers,
|
||||
to_vv: &VersionVector,
|
||||
to_frontiers: &Frontiers,
|
||||
) -> Bytes {
|
||||
self.change_store.export_from(vv, f, to_vv, to_frontiers)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
|
||||
self.change_store.export_blocks_from(vv, self.vv(), w)
|
||||
self.change_store
|
||||
.export_blocks_from(vv, self.trimmed_vv(), self.vv(), w)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
|
|
@ -144,7 +144,7 @@ impl ChangeStore {
|
|||
pub(super) fn export_from(
|
||||
&self,
|
||||
start_vv: &VersionVector,
|
||||
f: &Frontiers,
|
||||
start_frontiers: &Frontiers,
|
||||
latest_vv: &VersionVector,
|
||||
latest_frontiers: &Frontiers,
|
||||
) -> Bytes {
|
||||
|
@ -166,8 +166,11 @@ impl ChangeStore {
|
|||
}
|
||||
}
|
||||
|
||||
debug!("start_vv={:?} start_frontiers={:?}", &start_vv, f);
|
||||
new_store.encode_from(start_vv, f, latest_vv, latest_frontiers)
|
||||
debug!(
|
||||
"start_vv={:?} start_frontiers={:?}",
|
||||
&start_vv, start_frontiers
|
||||
);
|
||||
new_store.encode_from(start_vv, start_frontiers, latest_vv, latest_frontiers)
|
||||
}
|
||||
|
||||
pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
|
||||
|
@ -462,11 +465,19 @@ impl ChangeStore {
|
|||
pub(crate) fn export_blocks_from<W: std::io::Write>(
|
||||
&self,
|
||||
start_vv: &VersionVector,
|
||||
trimmed_vv: &ImVersionVector,
|
||||
latest_vv: &VersionVector,
|
||||
w: &mut W,
|
||||
) {
|
||||
let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
|
||||
for span in latest_vv.sub_iter(start_vv) {
|
||||
for mut span in latest_vv.sub_iter(start_vv) {
|
||||
let counter_lower_bound = trimmed_vv.get(&span.peer).copied().unwrap_or(0);
|
||||
span.counter.start = span.counter.start.max(counter_lower_bound);
|
||||
span.counter.end = span.counter.end.max(counter_lower_bound);
|
||||
if span.counter.start >= span.counter.end {
|
||||
continue;
|
||||
}
|
||||
|
||||
// PERF: this can be optimized by reusing the current encoded blocks
|
||||
// In the current method, it needs to parse and re-encode the blocks
|
||||
for c in self.iter_changes(span) {
|
||||
|
@ -497,7 +508,6 @@ fn encode_blocks_in_store<W: std::io::Write>(
|
|||
for (_id, block) in inner.mem_parsed_kv.iter_mut() {
|
||||
let bytes = block.to_bytes(&arena);
|
||||
leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
|
||||
trace!("encoded block_bytes = {:?}", &bytes.bytes);
|
||||
w.write_all(&bytes.bytes).unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1323,6 +1323,10 @@ impl DocState {
|
|||
arena: &SharedArena,
|
||||
state: &mut State,
|
||||
) -> Option<(ContainerID, (ContainerIdx, LoroValue))> {
|
||||
if state.is_state_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let id = arena.idx_to_id(state.container_idx()).unwrap();
|
||||
let value = match state {
|
||||
State::RichtextState(s) => s.get_richtext_value(),
|
||||
|
@ -1367,7 +1371,12 @@ impl DocState {
|
|||
let (_, other_value) = match other_id_to_states.remove(&id) {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
panic!("id: {:?}, path: {:?} is missing", id, self.get_path(idx));
|
||||
panic!(
|
||||
"id: {:?}, path: {:?} is missing, value={:?}",
|
||||
id,
|
||||
self.get_path(idx),
|
||||
&this_value
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ anyhow = "1.0.83"
|
|||
ctor = "0.2"
|
||||
dev-utils = { path = "../dev-utils" }
|
||||
rand = "0.8.5"
|
||||
pretty_assertions = "1.4.0"
|
||||
|
||||
[features]
|
||||
counter = ["loro-internal/counter"]
|
||||
|
|
Loading…
Reference in a new issue