Merge pull request #89 from loro-dev/fix-change-counter

fix: import change slice
This commit is contained in:
Zixuan Chen 2023-03-28 23:06:44 +08:00 committed by GitHub
commit 357fd8499a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,7 +2,7 @@ use crate::change::Change;
use crate::hierarchy::Hierarchy;
use crate::id::{ClientID, Counter, ID};
use crate::op::RemoteOp;
use crate::span::{CounterSpan, HasCounterSpan};
use crate::span::{CounterSpan, HasCounter, HasCounterSpan};
use crate::version::PatchedVersionVector;
use crate::LogStore;
use crate::{
@ -18,7 +18,7 @@ use tracing::instrument;
use fxhash::{FxHashMap, FxHashSet};
use rle::{HasLength, RleVecWithIndex};
use rle::{slice_vec_by, HasLength, RleVecWithIndex, Sliceable};
use crate::{
container::{registry::ContainerInstance, ContainerID, ContainerTrait},
@ -201,6 +201,12 @@ impl LogStore {
.entry(*client_id)
.or_insert_with(|| RleVecWithIndex::new_cfg(cfg.clone()));
for change in inner_changes {
// if let Some(last) = rle.last() {
// assert_eq!(
// last.id.counter + last.atom_len() as Counter,
// change.id.counter
// )
// }
rle.push(change);
}
}
@ -359,7 +365,10 @@ impl LogStore {
}
}
fn process_and_queue_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges {
fn process_and_queue_changes(
&mut self,
mut changes: RemoteClientChanges,
) -> RemoteClientChanges {
let mut latest_vv = self.get_vv().clone();
let mut retain_changes = FxHashMap::default();
@ -373,19 +382,20 @@ impl LogStore {
} else {
// Changes will be sorted by lamport. If the first change cannot be applied, then all subsequent changes with the same client id cannot be applied either.
// we cache these client id.
// self.tailor_changes(&mut changes);
let mut pending_clients = FxHashSet::default();
changes
.into_values()
.flat_map(|c| c.into_iter())
// sort changes by lamport from small to large
.sorted_by(|a, b| a.lamport.cmp(&b.lamport))
.for_each(|c| {
.for_each(|mut c| {
let c_client_id = c.id.client_id;
if pending_clients.contains(&c_client_id) {
self.pending_changes.get_mut(&c_client_id).unwrap().push(c);
return;
}
match can_remote_change_be_applied(&latest_vv, &c) {
match can_remote_change_be_applied(&latest_vv, &mut c) {
ChangeApplyState::Directly => {
latest_vv.set_end(c.id_end());
retain_changes
@ -409,7 +419,6 @@ impl LogStore {
}
});
}
retain_changes
}
@ -424,7 +433,7 @@ impl LogStore {
.into_iter()
.sorted_by(|a, b| a.lamport.cmp(&b.lamport))
.peekable();
while let Some(peek_c) = may_apply_iter.peek() {
while let Some(peek_c) = may_apply_iter.peek_mut() {
match can_remote_change_be_applied(latest_vv, peek_c) {
ChangeApplyState::Directly => {
let c = may_apply_iter.next().unwrap();
@ -451,8 +460,30 @@ impl LogStore {
}
}
}
fn tailor_changes(&mut self, changes: &mut RemoteClientChanges) {
changes.retain(|_, v| !v.is_empty());
for (client_id, changes) in changes.iter_mut() {
let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0);
let other_start_ctr = changes.first().unwrap().ctr_start();
match other_start_ctr.cmp(&self_end_ctr) {
std::cmp::Ordering::Less => {
*changes = slice_vec_by(
changes,
|x| x.id.counter as usize,
self_end_ctr as usize,
usize::MAX,
);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {}
}
}
changes.retain(|_, v| !v.is_empty());
}
}
#[derive(Debug)]
enum ChangeApplyState {
Existing,
Directly,
@ -460,14 +491,17 @@ enum ChangeApplyState {
Future(ClientID),
}
fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -> ChangeApplyState {
fn can_remote_change_be_applied(
vv: &VersionVector,
change: &mut Change<RemoteOp>,
) -> ChangeApplyState {
let change_client_id = change.id.client_id;
let CounterSpan { start, end } = change.ctr_span();
let vv_latest_ctr = vv.get(&change_client_id).copied().unwrap_or(0);
if vv_latest_ctr < start {
return ChangeApplyState::Future(change_client_id);
}
if vv_latest_ctr >= end {
if vv_latest_ctr >= end || start == end {
return ChangeApplyState::Existing;
}
for dep in &change.deps {
@ -476,12 +510,17 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -
return ChangeApplyState::Future(dep.client_id);
}
}
if start < vv_latest_ctr {
*change = change.slice((vv_latest_ctr - start) as usize, (end - start) as usize);
}
ChangeApplyState::Directly
}
#[cfg(test)]
mod test {
use crate::{LoroCore, VersionVector};
use crate::{LoroCore, Transact, VersionVector};
#[test]
fn import_pending() {
@ -615,4 +654,24 @@ mod test {
c.decode(&updates_a123).unwrap();
assert_eq!(c.to_json(), a.to_json());
}
#[test]
fn applied_change_filter() {
let mut a = LoroCore::new(Default::default(), Some(1));
let mut b = LoroCore::new(Default::default(), Some(2));
let mut list_a = a.get_list("list");
let mut list_b = b.get_list("list");
{
let txn = a.transact();
list_a.insert(&txn, 0, "1").unwrap();
list_a.insert(&txn, 1, "1").unwrap();
}
b.decode(&a.encode_from(Default::default())).unwrap();
{
let txn = a.transact();
list_a.insert(&txn, 2, "1").unwrap();
list_a.insert(&txn, 3, "1").unwrap();
}
b.decode(&a.encode_from(Default::default())).unwrap();
}
}