From 3638e3d0ed2b321689243d97955eeb7571c0f465 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Wed, 12 Jul 2023 18:17:57 +0800 Subject: [PATCH] fix: fix a encode/decode issue exposed by fuzzing test --- crates/loro-internal/src/fuzz.rs | 248 +++++++++++++++++- .../src/log_store/encoding/encode_changes.rs | 4 +- crates/loro-internal/src/refactor/loro.rs | 2 + crates/loro-internal/src/refactor/oplog.rs | 20 +- crates/loro-internal/src/refactor/state.rs | 17 +- crates/loro-internal/src/refactor/txn.rs | 10 +- crates/rle/src/rle_vec.rs | 3 +- 7 files changed, 288 insertions(+), 16 deletions(-) diff --git a/crates/loro-internal/src/fuzz.rs b/crates/loro-internal/src/fuzz.rs index 304244ca..fdabca5b 100644 --- a/crates/loro-internal/src/fuzz.rs +++ b/crates/loro-internal/src/fuzz.rs @@ -552,7 +552,9 @@ pub fn test_multi_sites_refactored(site_num: u8, actions: &mut [Action]) { sites.preprocess(action); applied.push(action.clone()); debug_log!("\n{}", (&applied).table()); + debug_log::group!("ApplyAction {:?}", &action); sites.apply_action(action); + debug_log::group_end!(); } debug_log::group!("CheckSynced"); @@ -941,20 +943,250 @@ mod test { 8, &mut [ Ins { - content: 9728, + content: 5225, pos: 0, - site: 57, + site: 4, }, Ins { - content: 205, + content: 53, + pos: 4, + site: 4, + }, + Ins { + content: 10284, pos: 0, - site: 37, + site: 2, + }, + Ins { + content: 10794, + pos: 0, + site: 2, + }, + Ins { + content: 10794, + pos: 6, + site: 2, + }, + Ins { + content: 10794, + pos: 6, + site: 2, + }, + Ins { + content: 8234, + pos: 0, + site: 6, + }, + Ins { + content: 7710, + pos: 1, + site: 6, + }, + Ins { + content: 0, + pos: 7, + site: 2, + }, + Ins { + content: 127, + pos: 0, + site: 7, + }, + Ins { + content: 2560, + pos: 0, + site: 0, + }, + Ins { + content: 10794, + pos: 4, + site: 2, + }, + Ins { + content: 10794, + pos: 1, + site: 2, + }, + Ins { + content: 10794, + pos: 30, + site: 2, + }, + Ins { + content: 10794, + pos: 29, + site: 2, + }, + Ins { + content: 10794, + pos: 4, + site: 6, + }, + Ins { + content: 10794, + pos: 0, + site: 2, + }, + Ins { + content: 4626, + pos: 6, + site: 2, + }, + Ins { + content: 4626, + pos: 2, + site: 2, + }, + Ins { + content: 10794, + pos: 6, + site: 2, + }, + Ins { + content: 54826, + pos: 0, + site: 0, + }, + Ins { + content: 12800, + pos: 9, + site: 6, + }, + Ins { + content: 3598, + pos: 0, + site: 4, + }, + Ins { + content: 11308, + pos: 2, + site: 4, + }, + Ins { + content: 10284, + pos: 3, + site: 4, + }, + Ins { + content: 11308, + pos: 10, + site: 4, + }, + Ins { + content: 11308, + pos: 24, + site: 4, + }, + Ins { + content: 11308, + pos: 28, + site: 4, + }, + Ins { + content: 11312, + pos: 16, + site: 4, + }, + Ins { + content: 11308, + pos: 5, + site: 4, + }, + Ins { + content: 15420, + pos: 9, + site: 2, + }, + Ins { + content: 12800, + pos: 0, + site: 5, + }, + Ins { + content: 10794, + pos: 6, + site: 2, + }, + Ins { + content: 10794, + pos: 21, + site: 2, + }, + Ins { + content: 10794, + pos: 34, + site: 2, + }, + Ins { + content: 12850, + pos: 10, + site: 2, + }, + Ins { + content: 12850, + pos: 0, + site: 2, + }, + Ins { + content: 10794, + pos: 21, + site: 2, + }, + Ins { + content: 10794, + pos: 6, + site: 2, + }, + Ins { + content: 10794, + pos: 56, + site: 2, + }, + Ins { + content: 10794, + pos: 2, + site: 6, + }, + Ins { + content: 7710, + pos: 2, + site: 6, + }, + Ins { + content: 10794, + pos: 27, + site: 2, + }, + Ins { + content: 10794, + pos: 70, + site: 2, + }, + Ins { + content: 10794, + pos: 69, + site: 2, }, SyncAll, Ins { - content: 52487, - pos: 5, - site: 54, + content: 0, + pos: 184, + site: 0, + }, + Del { + pos: 18, + len: 191, + site: 0, + }, + Del { + pos: 4, + len: 204, + site: 4, + }, + Del { + pos: 90, + len: 118, + site: 5, }, ], ); @@ -962,7 +1194,7 @@ mod test { #[test] fn mini_r() { - minify_error(2, vec![], test_multi_sites_refactored, normalize) + minify_error(8, vec![], test_multi_sites_refactored, normalize) } #[test] diff --git a/crates/loro-internal/src/log_store/encoding/encode_changes.rs b/crates/loro-internal/src/log_store/encoding/encode_changes.rs index 867d6718..9cb472b8 100644 --- a/crates/loro-internal/src/log_store/encoding/encode_changes.rs +++ b/crates/loro-internal/src/log_store/encoding/encode_changes.rs @@ -720,11 +720,11 @@ pub(super) fn decode_changes_to_inner_format( pub(crate) fn get_lamport_by_deps_oplog( deps: &Frontiers, lamport_map: &FxHashMap, Lamport)>>, - store: Option<&OpLog>, + oplog: Option<&OpLog>, ) -> Result { let mut ans = Vec::new(); for id in deps.iter() { - if let Some(c) = store.and_then(|x| x.lookup_change(*id)) { + if let Some(c) = oplog.and_then(|x| x.lookup_change(*id)) { let offset = id.counter - c.id.counter; ans.push(c.lamport + offset as u32); } else if let Some(v) = lamport_map.get(&id.peer) { diff --git a/crates/loro-internal/src/refactor/loro.rs b/crates/loro-internal/src/refactor/loro.rs index 1b6856eb..cd8819aa 100644 --- a/crates/loro-internal/src/refactor/loro.rs +++ b/crates/loro-internal/src/refactor/loro.rs @@ -59,6 +59,7 @@ impl LoroApp { state.apply_diff(AppStateDiff { diff: &diff, frontiers: oplog.frontiers(), + next_lamport: oplog.latest_lamport + 1, }); } @@ -95,6 +96,7 @@ impl LoroApp { state.apply_diff(AppStateDiff { diff: &diff, frontiers: oplog.frontiers(), + next_lamport: oplog.latest_lamport + 1, }); } diff --git a/crates/loro-internal/src/refactor/oplog.rs b/crates/loro-internal/src/refactor/oplog.rs index 3197bc2a..c87ee7ce 100644 --- a/crates/loro-internal/src/refactor/oplog.rs +++ b/crates/loro-internal/src/refactor/oplog.rs @@ -220,6 +220,8 @@ impl OpLog { } } + debug_dbg!(&changes); + changes } @@ -326,17 +328,29 @@ impl OpLog { // TODO: Perf change_causal_arr.sort_by_key(|x| x.lamport); + debug_dbg!(&change_causal_arr); for change in change_causal_arr { + debug_dbg!(&change); self.import_local_change(change)?; } Ok(()) } + /// lookup change by id. + /// + /// if id does not included in this oplog, return None pub(crate) fn lookup_change(&self, id: ID) -> Option<&Change> { - self.changes - .get(&id.peer) - .and_then(|x| x.get_by_atom_index(id.counter).map(|x| x.element)) + self.changes.get(&id.peer).and_then(|changes| { + // Because get_by_atom_index would return Some if counter is at the end, + // we cannot use it directly. + // TODO: maybe we should refactor this + if id.counter <= changes.last().unwrap().id_last().counter { + Some(changes.get_by_atom_index(id.counter).unwrap().element) + } else { + None + } + }) } pub fn export_from(&self, vv: &VersionVector) -> Vec { diff --git a/crates/loro-internal/src/refactor/state.rs b/crates/loro-internal/src/refactor/state.rs index cb22b6ce..e9b66e6e 100644 --- a/crates/loro-internal/src/refactor/state.rs +++ b/crates/loro-internal/src/refactor/state.rs @@ -1,3 +1,4 @@ +use debug_log::debug_dbg; use enum_as_inner::EnumAsInner; use enum_dispatch::enum_dispatch; use fxhash::{FxHashMap, FxHashSet}; @@ -85,6 +86,7 @@ pub struct ContainerStateDiff { pub struct AppStateDiff<'a> { pub(crate) diff: &'a [ContainerStateDiff], pub(crate) frontiers: &'a Frontiers, + pub(crate) next_lamport: Lamport, } impl AppState { @@ -107,7 +109,18 @@ impl AppState { self.peer = peer; } - pub fn apply_diff(&mut self, AppStateDiff { diff, frontiers }: AppStateDiff) { + pub fn apply_diff( + &mut self, + AppStateDiff { + diff, + frontiers, + next_lamport, + }: AppStateDiff, + ) { + if self.in_txn { + panic!("apply_diff should not be called in a transaction"); + } + for diff in diff { let state = self.states.entry(diff.idx).or_insert_with(|| { let id = self.arena.get_container_id(diff.idx).unwrap(); @@ -122,6 +135,7 @@ impl AppState { state.apply_diff(&diff.diff, &self.arena); } + self.next_lamport = next_lamport.max(self.next_lamport); self.frontiers = frontiers.clone(); } @@ -157,6 +171,7 @@ impl AppState { next_lamport: Lamport, next_counter: Counter, ) { + debug_dbg!(&self.next_lamport, next_lamport); for container_idx in std::mem::take(&mut self.changed_in_txn) { self.states.get_mut(&container_idx).unwrap().commit_txn(); } diff --git a/crates/loro-internal/src/refactor/txn.rs b/crates/loro-internal/src/refactor/txn.rs index 339397cc..110c2b47 100644 --- a/crates/loro-internal/src/refactor/txn.rs +++ b/crates/loro-internal/src/refactor/txn.rs @@ -57,6 +57,10 @@ impl Transaction { } fn _abort(&mut self) { + if self.finished { + return; + } + self.finished = true; self.state.lock().unwrap().abort_txn(); self.local_ops.clear(); @@ -67,6 +71,10 @@ impl Transaction { } fn _commit(&mut self) -> Result<(), LoroError> { + if self.finished { + return Ok(()); + } + self.finished = true; let mut state = self.state.lock().unwrap(); if self.local_ops.is_empty() { @@ -81,7 +89,7 @@ impl Transaction { ops, deps, id: oplog.next_id(state.peer), - lamport: oplog.next_lamport(), + lamport: self.next_lamport, timestamp: oplog.get_timestamp(), }; diff --git a/crates/rle/src/rle_vec.rs b/crates/rle/src/rle_vec.rs index 0343bb32..f6fde4b2 100644 --- a/crates/rle/src/rle_vec.rs +++ b/crates/rle/src/rle_vec.rs @@ -325,8 +325,9 @@ where } /// Get the element at the given atom index. + /// /// If the index is larger than the end index, return None. - /// return: (element, merged_index, offset) + /// Otherwise, return: Some((element, merged_index, offset)) pub fn get_by_atom_index( &self, index: ::Int,