fix: lamport issue

This commit is contained in:
Zixuan Chen 2023-07-12 18:47:04 +08:00
parent 3638e3d0ed
commit 2a0f842fc5
7 changed files with 65 additions and 23 deletions

View file

@ -217,7 +217,7 @@ mod test {
ListSlice::RawStr("".into()),
ListSlice::Unknown(0),
];
let list_slice_buf = vec![3, 0, 1, 1, 1, 1, 0, 2, 0];
let list_slice_buf = vec![3, 0, 1, 1, 1, 1, 0, 3, 0];
assert_eq!(
postcard::from_bytes::<Vec<ListSlice>>(&list_slice_buf).unwrap(),
list_slice

View file

@ -12,6 +12,7 @@ use std::{
fmt::Debug,
};
use debug_log::debug_dbg;
use fxhash::{FxHashMap, FxHashSet};
use rle::{HasLength, Sliceable};
use smallvec::{smallvec, SmallVec};
@ -352,6 +353,7 @@ where
let mut b_count = b_ids.len();
let mut min = None;
while let Some((node, mut node_type)) = queue.pop() {
debug_dbg!(&node);
match node_type {
NodeType::A => a_count -= 1,
NodeType::B => b_count -= 1,

View file

@ -937,6 +937,36 @@ mod test {
)
}
#[test]
fn fuzz_r1() {
test_multi_sites_refactored(
8,
&mut [
Ins {
content: 3871,
pos: 20971570,
site: 0,
},
Sync { from: 0, to: 31 },
Ins {
content: 0,
pos: 0,
site: 0,
},
Ins {
content: 0,
pos: 58502001197056,
site: 0,
},
Ins {
content: 13599,
pos: 36261893487333151,
site: 31,
},
],
)
}
#[test]
fn fuzz_r() {
test_multi_sites_refactored(

View file

@ -3,7 +3,7 @@ use std::cmp::Ordering;
use crate::change::Lamport;
use crate::dag::{Dag, DagNode};
use crate::id::{Counter, ID};
use crate::span::{HasId, HasLamport};
use crate::span::{HasId, HasLamport, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use rle::{HasIndex, HasLength, Mergable, Sliceable};
@ -160,4 +160,25 @@ impl AppDag {
pub fn vv_to_frontiers(&self, vv: &VersionVector) -> Frontiers {
vv.to_frontiers(self)
}
pub(crate) fn frontiers_to_next_lamport(&self, frontiers: &Frontiers) -> Lamport {
if frontiers.is_empty() {
return 0;
}
let mut lamport = {
let id = frontiers[0];
let Some(rle) = self.map.get(&id.peer) else { unreachable!() };
let Some(x) = rle.get_by_atom_index(id.counter) else { unreachable!() };
x.element.lamport_end()
};
for id in frontiers[1..].iter() {
let Some(rle) = self.map.get(&id.peer) else { unreachable!() };
let Some(x) = rle.get_by_atom_index(id.counter) else { unreachable!() };
lamport = lamport.max(x.element.lamport_end());
}
lamport
}
}

View file

@ -1,4 +1,3 @@
use debug_log::debug_dbg;
use enum_as_inner::EnumAsInner;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
@ -28,7 +27,6 @@ use super::{arena::SharedArena, oplog::OpLog};
#[derive(Clone)]
pub struct AppState {
pub(super) peer: PeerID,
pub(super) next_lamport: Lamport,
pub(super) next_counter: Counter,
pub(super) frontiers: Frontiers,
@ -96,7 +94,6 @@ impl AppState {
Self {
peer,
next_counter: 0,
next_lamport: oplog.latest_lamport + 1,
frontiers: Frontiers::default(),
states: FxHashMap::default(),
arena: oplog.arena.clone(),
@ -135,7 +132,6 @@ impl AppState {
state.apply_diff(&diff.diff, &self.arena);
}
self.next_lamport = next_lamport.max(self.next_lamport);
self.frontiers = frontiers.clone();
}
@ -165,13 +161,7 @@ impl AppState {
self.in_txn = false;
}
pub(crate) fn commit_txn(
&mut self,
new_frontiers: Frontiers,
next_lamport: Lamport,
next_counter: Counter,
) {
debug_dbg!(&self.next_lamport, next_lamport);
pub(crate) fn commit_txn(&mut self, new_frontiers: Frontiers, next_counter: Counter) {
for container_idx in std::mem::take(&mut self.changed_in_txn) {
self.states.get_mut(&container_idx).unwrap().commit_txn();
}
@ -179,7 +169,6 @@ impl AppState {
self.in_txn = false;
self.frontiers = new_frontiers;
self.next_counter = next_counter;
self.next_lamport = next_lamport;
}
pub(super) fn get_state_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> {

View file

@ -37,15 +37,19 @@ impl Transaction {
let frontiers = state_lock.frontiers.clone();
let peer = state_lock.peer;
let next_counter = state_lock.next_counter;
let next_lamport = state_lock.next_lamport;
let next_lamport = oplog
.lock()
.unwrap()
.dag
.frontiers_to_next_lamport(&frontiers);
drop(state_lock);
Self {
peer,
next_lamport,
next_counter,
state,
arena,
oplog,
next_lamport,
frontiers,
local_ops: RleVec::new(),
finished: false,
@ -86,10 +90,10 @@ impl Transaction {
let mut oplog = self.oplog.lock().unwrap();
let deps = take(&mut self.frontiers);
let change = Change {
lamport: self.next_lamport - ops.atom_len() as Lamport,
ops,
deps,
id: oplog.next_id(state.peer),
lamport: self.next_lamport,
timestamp: oplog.get_timestamp(),
};
@ -100,11 +104,7 @@ impl Transaction {
self._abort();
return Err(err);
}
state.commit_txn(
Frontiers::from_id(last_id),
self.next_lamport,
self.next_counter,
);
state.commit_txn(Frontiers::from_id(last_id), self.next_counter);
Ok(())
}

View file

@ -413,7 +413,7 @@ where
pub fn atom_len(&self) -> <A::Item as HasIndex>::Int {
self.vec
.last()
.map(|x| x.get_end_index())
.map(|x| x.get_end_index() - self.vec.first().unwrap().get_start_index())
.unwrap_or(<A::Item as HasIndex>::Int::from_usize(0).unwrap())
}
}