fix: detect importing outdated updates on gc doc

This commit is contained in:
Zixuan Chen 2024-08-26 22:14:23 +08:00
parent cfe8652415
commit 4e64051c58
No known key found for this signature in database
6 changed files with 78 additions and 16 deletions

View file

@ -5,7 +5,7 @@ use crate::{InternalString, PeerID, TreeID, ID};
pub type LoroResult<T> = Result<T, LoroError>;
#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq)]
pub enum LoroError {
#[error("Context's client_id({found:?}) does not match Container's client_id({expected:?})")]
UnmatchedContext { expected: PeerID, found: PeerID },
@ -76,9 +76,11 @@ pub enum LoroError {
EndIndexLessThanStartIndex { start: usize, end: usize },
#[error("Invalid root container name! Don't include '/' or '\\0'")]
InvalidRootContainerName,
#[error("Import Failed: The dependencies of the importing updates are trimmed from the doc.")]
ImportUpdatesThatDependsOnOutdatedVersion,
}
#[derive(Error, Debug)]
#[derive(Error, Debug, PartialEq)]
pub enum LoroTreeError {
#[error("`Cycle move` occurs when moving tree nodes.")]
CyclicMoveError,

View file

@ -239,6 +239,10 @@ pub(crate) fn import_changes_to_oplog(
continue;
}
if oplog.dag.is_dep_on_trimmed_history(&change.deps) {
return Err(LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
latest_ids.push(change.id_last());
// calc lamport or pending if its deps are not satisfied
match oplog.dag.get_change_lamport_from_deps(&change.deps) {

View file

@ -572,7 +572,7 @@ impl LoroDoc {
let mut oplog = self.oplog.lock().unwrap();
let old_vv = oplog.vv().clone();
let old_frontiers = oplog.frontiers().clone();
f(&mut oplog)?;
let result = f(&mut oplog);
if !self.detached.load(Acquire) {
debug!("checkout from {:?} to {:?}", old_vv, oplog.vv());
let mut diff = DiffCalculator::new(false);
@ -594,7 +594,7 @@ impl LoroDoc {
} else {
tracing::info!("Detached");
}
Ok(())
result
}
/// For fuzzing tests

View file

@ -32,9 +32,9 @@ pub struct AppDag {
/// The latest known version vectorG
vv: VersionVector,
/// The latest known frontiers
start_frontiers: Frontiers,
trimmed_frontiers: Frontiers,
/// The latest known version vectorG
start_vv: ImVersionVector,
trimmed_vv: ImVersionVector,
/// Ops included in the version vector but not parsed yet
///
/// # Invariants
@ -99,8 +99,8 @@ impl AppDag {
vv: VersionVector::default(),
unparsed_vv: Mutex::new(VersionVector::default()),
unhandled_dep_points: Mutex::new(BTreeSet::new()),
start_frontiers: Default::default(),
start_vv: Default::default(),
trimmed_frontiers: Default::default(),
trimmed_vv: Default::default(),
}
}
@ -113,11 +113,11 @@ impl AppDag {
}
pub fn start_vv(&self) -> &ImVersionVector {
&self.start_vv
&self.trimmed_vv
}
pub fn start_frontiers(&self) -> &Frontiers {
&self.start_frontiers
&self.trimmed_frontiers
}
pub fn is_empty(&self) -> bool {
@ -407,8 +407,8 @@ impl AppDag {
vv: self.vv.clone(),
unparsed_vv: Mutex::new(self.unparsed_vv.try_lock().unwrap().clone()),
unhandled_dep_points: Mutex::new(self.unhandled_dep_points.try_lock().unwrap().clone()),
start_frontiers: self.start_frontiers.clone(),
start_vv: self.start_vv.clone(),
trimmed_frontiers: self.trimmed_frontiers.clone(),
trimmed_vv: self.trimmed_vv.clone(),
}
}
@ -422,8 +422,8 @@ impl AppDag {
self.vv = v.vv;
self.frontiers = v.frontiers;
if let Some((vv, f)) = v.start_version {
self.start_frontiers = f;
self.start_vv = ImVersionVector::from_vv(&vv);
self.trimmed_frontiers = f;
self.trimmed_vv = ImVersionVector::from_vv(&vv);
}
}
@ -526,6 +526,18 @@ impl AppDag {
assert_eq!(maybe_frontiers, frontiers);
}
}
pub(crate) fn is_dep_on_trimmed_history(&self, deps: &Frontiers) -> bool {
if self.trimmed_vv.is_empty() {
return false;
}
if deps.is_empty() {
return true;
}
deps.iter().any(|x| self.trimmed_vv.includes_id(*x))
}
}
fn check_always_dep_on_last_id(map: &BTreeMap<ID, AppDagNode>) {
@ -680,8 +692,8 @@ impl AppDag {
}
let mut ans_vv = ImVersionVector::default();
if node.deps == self.start_frontiers {
for (&p, &c) in self.start_vv.iter() {
if node.deps == self.trimmed_frontiers {
for (&p, &c) in self.trimmed_vv.iter() {
ans_vv.insert(p, c);
}
} else {

View file

@ -153,6 +153,10 @@ impl ImVersionVector {
self.set_last(id)
}
}
pub(crate) fn includes_id(&self, x: ID) -> bool {
self.get(&x.peer).copied().unwrap_or(0) > x.counter
}
}
// TODO: use a better data structure that is Array when small

View file

@ -1044,3 +1044,43 @@ fn test_gc_empty() {
doc_c.import(&bytes).unwrap();
assert_eq!(doc_c.get_deep_value(), new_doc.get_deep_value());
}
#[test]
fn test_gc_import_outdated_updates() {
let doc = LoroDoc::new();
apply_random_ops(&doc, 123, 11);
let bytes = doc.export(loro::ExportMode::GcSnapshot(
&ID::new(doc.peer_id(), 5).into(),
));
let new_doc = LoroDoc::new();
new_doc.import(&bytes).unwrap();
let other_doc = LoroDoc::new();
apply_random_ops(&other_doc, 123, 11);
let err = new_doc
.import(&other_doc.export_from(&Default::default()))
.unwrap_err();
assert_eq!(err, LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}
#[test]
fn test_gc_import_pending_updates_that_is_outdated() {
let doc = LoroDoc::new();
apply_random_ops(&doc, 123, 11);
let bytes = doc.export(loro::ExportMode::GcSnapshot(
&ID::new(doc.peer_id(), 5).into(),
));
let new_doc = LoroDoc::new();
new_doc.import(&bytes).unwrap();
let other_doc = LoroDoc::new();
apply_random_ops(&other_doc, 123, 5);
let bytes_a = other_doc.export_from(&Default::default());
let vv = other_doc.oplog_vv();
apply_random_ops(&other_doc, 123, 5);
let bytes_b = other_doc.export_from(&vv);
// pending
new_doc.import(&bytes_b).unwrap();
let err = new_doc.import(&bytes_a).unwrap_err();
assert_eq!(err, LoroError::ImportUpdatesThatDependsOnOutdatedVersion);
}