fix: fix a encode/decode issue

exposed by fuzzing test
This commit is contained in:
Zixuan Chen 2023-07-12 18:17:57 +08:00
parent 2d47387882
commit 3638e3d0ed
7 changed files with 288 additions and 16 deletions

View file

@ -552,7 +552,9 @@ pub fn test_multi_sites_refactored(site_num: u8, actions: &mut [Action]) {
sites.preprocess(action);
applied.push(action.clone());
debug_log!("\n{}", (&applied).table());
debug_log::group!("ApplyAction {:?}", &action);
sites.apply_action(action);
debug_log::group_end!();
}
debug_log::group!("CheckSynced");
@ -941,20 +943,250 @@ mod test {
8,
&mut [
Ins {
content: 9728,
content: 5225,
pos: 0,
site: 57,
site: 4,
},
Ins {
content: 205,
content: 53,
pos: 4,
site: 4,
},
Ins {
content: 10284,
pos: 0,
site: 37,
site: 2,
},
Ins {
content: 10794,
pos: 0,
site: 2,
},
Ins {
content: 10794,
pos: 6,
site: 2,
},
Ins {
content: 10794,
pos: 6,
site: 2,
},
Ins {
content: 8234,
pos: 0,
site: 6,
},
Ins {
content: 7710,
pos: 1,
site: 6,
},
Ins {
content: 0,
pos: 7,
site: 2,
},
Ins {
content: 127,
pos: 0,
site: 7,
},
Ins {
content: 2560,
pos: 0,
site: 0,
},
Ins {
content: 10794,
pos: 4,
site: 2,
},
Ins {
content: 10794,
pos: 1,
site: 2,
},
Ins {
content: 10794,
pos: 30,
site: 2,
},
Ins {
content: 10794,
pos: 29,
site: 2,
},
Ins {
content: 10794,
pos: 4,
site: 6,
},
Ins {
content: 10794,
pos: 0,
site: 2,
},
Ins {
content: 4626,
pos: 6,
site: 2,
},
Ins {
content: 4626,
pos: 2,
site: 2,
},
Ins {
content: 10794,
pos: 6,
site: 2,
},
Ins {
content: 54826,
pos: 0,
site: 0,
},
Ins {
content: 12800,
pos: 9,
site: 6,
},
Ins {
content: 3598,
pos: 0,
site: 4,
},
Ins {
content: 11308,
pos: 2,
site: 4,
},
Ins {
content: 10284,
pos: 3,
site: 4,
},
Ins {
content: 11308,
pos: 10,
site: 4,
},
Ins {
content: 11308,
pos: 24,
site: 4,
},
Ins {
content: 11308,
pos: 28,
site: 4,
},
Ins {
content: 11312,
pos: 16,
site: 4,
},
Ins {
content: 11308,
pos: 5,
site: 4,
},
Ins {
content: 15420,
pos: 9,
site: 2,
},
Ins {
content: 12800,
pos: 0,
site: 5,
},
Ins {
content: 10794,
pos: 6,
site: 2,
},
Ins {
content: 10794,
pos: 21,
site: 2,
},
Ins {
content: 10794,
pos: 34,
site: 2,
},
Ins {
content: 12850,
pos: 10,
site: 2,
},
Ins {
content: 12850,
pos: 0,
site: 2,
},
Ins {
content: 10794,
pos: 21,
site: 2,
},
Ins {
content: 10794,
pos: 6,
site: 2,
},
Ins {
content: 10794,
pos: 56,
site: 2,
},
Ins {
content: 10794,
pos: 2,
site: 6,
},
Ins {
content: 7710,
pos: 2,
site: 6,
},
Ins {
content: 10794,
pos: 27,
site: 2,
},
Ins {
content: 10794,
pos: 70,
site: 2,
},
Ins {
content: 10794,
pos: 69,
site: 2,
},
SyncAll,
Ins {
content: 52487,
pos: 5,
site: 54,
content: 0,
pos: 184,
site: 0,
},
Del {
pos: 18,
len: 191,
site: 0,
},
Del {
pos: 4,
len: 204,
site: 4,
},
Del {
pos: 90,
len: 118,
site: 5,
},
],
);
@ -962,7 +1194,7 @@ mod test {
#[test]
fn mini_r() {
minify_error(2, vec![], test_multi_sites_refactored, normalize)
minify_error(8, vec![], test_multi_sites_refactored, normalize)
}
#[test]

View file

@ -720,11 +720,11 @@ pub(super) fn decode_changes_to_inner_format(
pub(crate) fn get_lamport_by_deps_oplog(
deps: &Frontiers,
lamport_map: &FxHashMap<PeerID, Vec<(Range<Counter>, Lamport)>>,
store: Option<&OpLog>,
oplog: Option<&OpLog>,
) -> Result<Lamport, PeerID> {
let mut ans = Vec::new();
for id in deps.iter() {
if let Some(c) = store.and_then(|x| x.lookup_change(*id)) {
if let Some(c) = oplog.and_then(|x| x.lookup_change(*id)) {
let offset = id.counter - c.id.counter;
ans.push(c.lamport + offset as u32);
} else if let Some(v) = lamport_map.get(&id.peer) {

View file

@ -59,6 +59,7 @@ impl LoroApp {
state.apply_diff(AppStateDiff {
diff: &diff,
frontiers: oplog.frontiers(),
next_lamport: oplog.latest_lamport + 1,
});
}
@ -95,6 +96,7 @@ impl LoroApp {
state.apply_diff(AppStateDiff {
diff: &diff,
frontiers: oplog.frontiers(),
next_lamport: oplog.latest_lamport + 1,
});
}

View file

@ -220,6 +220,8 @@ impl OpLog {
}
}
debug_dbg!(&changes);
changes
}
@ -326,17 +328,29 @@ impl OpLog {
// TODO: Perf
change_causal_arr.sort_by_key(|x| x.lamport);
debug_dbg!(&change_causal_arr);
for change in change_causal_arr {
debug_dbg!(&change);
self.import_local_change(change)?;
}
Ok(())
}
/// lookup change by id.
///
/// if id does not included in this oplog, return None
pub(crate) fn lookup_change(&self, id: ID) -> Option<&Change> {
self.changes
.get(&id.peer)
.and_then(|x| x.get_by_atom_index(id.counter).map(|x| x.element))
self.changes.get(&id.peer).and_then(|changes| {
// Because get_by_atom_index would return Some if counter is at the end,
// we cannot use it directly.
// TODO: maybe we should refactor this
if id.counter <= changes.last().unwrap().id_last().counter {
Some(changes.get_by_atom_index(id.counter).unwrap().element)
} else {
None
}
})
}
pub fn export_from(&self, vv: &VersionVector) -> Vec<u8> {

View file

@ -1,3 +1,4 @@
use debug_log::debug_dbg;
use enum_as_inner::EnumAsInner;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
@ -85,6 +86,7 @@ pub struct ContainerStateDiff {
pub struct AppStateDiff<'a> {
pub(crate) diff: &'a [ContainerStateDiff],
pub(crate) frontiers: &'a Frontiers,
pub(crate) next_lamport: Lamport,
}
impl AppState {
@ -107,7 +109,18 @@ impl AppState {
self.peer = peer;
}
pub fn apply_diff(&mut self, AppStateDiff { diff, frontiers }: AppStateDiff) {
pub fn apply_diff(
&mut self,
AppStateDiff {
diff,
frontiers,
next_lamport,
}: AppStateDiff,
) {
if self.in_txn {
panic!("apply_diff should not be called in a transaction");
}
for diff in diff {
let state = self.states.entry(diff.idx).or_insert_with(|| {
let id = self.arena.get_container_id(diff.idx).unwrap();
@ -122,6 +135,7 @@ impl AppState {
state.apply_diff(&diff.diff, &self.arena);
}
self.next_lamport = next_lamport.max(self.next_lamport);
self.frontiers = frontiers.clone();
}
@ -157,6 +171,7 @@ impl AppState {
next_lamport: Lamport,
next_counter: Counter,
) {
debug_dbg!(&self.next_lamport, next_lamport);
for container_idx in std::mem::take(&mut self.changed_in_txn) {
self.states.get_mut(&container_idx).unwrap().commit_txn();
}

View file

@ -57,6 +57,10 @@ impl Transaction {
}
fn _abort(&mut self) {
if self.finished {
return;
}
self.finished = true;
self.state.lock().unwrap().abort_txn();
self.local_ops.clear();
@ -67,6 +71,10 @@ impl Transaction {
}
fn _commit(&mut self) -> Result<(), LoroError> {
if self.finished {
return Ok(());
}
self.finished = true;
let mut state = self.state.lock().unwrap();
if self.local_ops.is_empty() {
@ -81,7 +89,7 @@ impl Transaction {
ops,
deps,
id: oplog.next_id(state.peer),
lamport: oplog.next_lamport(),
lamport: self.next_lamport,
timestamp: oplog.get_timestamp(),
};

View file

@ -325,8 +325,9 @@ where
}
/// Get the element at the given atom index.
///
/// If the index is larger than the end index, return None.
/// return: (element, merged_index, offset)
/// Otherwise, return: Some((element, merged_index, offset))
pub fn get_by_atom_index(
&self,
index: <A::Item as HasIndex>::Int,