fix: dep is in merged pending change

This commit is contained in:
leeeon233 2023-03-26 11:53:21 +08:00 committed by Leonzhao
parent 87ae95ef02
commit d4f786e64a
2 changed files with 59 additions and 29 deletions

View file

@ -9,7 +9,6 @@ use crate::{version::Frontiers, LoroValue};
pub use encoding::{EncodeMode, LoroEncoder};
pub(crate) use import::ImportContext;
use std::{
collections::BinaryHeap,
marker::PhantomPinned,
sync::{Arc, Mutex, MutexGuard, RwLock, Weak},
};
@ -80,7 +79,7 @@ pub struct LogStore {
pub(crate) this_client_id: ClientID,
/// CRDT container manager
pub(crate) reg: ContainerRegistry,
pending_changes: FxHashMap<ID, Vec<Change<RemoteOp>>>,
pending_changes: RemoteClientChanges,
_pin: PhantomPinned,
}

View file

@ -1,4 +1,4 @@
use crate::change::{Change, Lamport};
use crate::change::Change;
use crate::hierarchy::Hierarchy;
use crate::id::{ClientID, Counter, ID};
use crate::op::RemoteOp;
@ -12,20 +12,19 @@ use crate::{
};
use itertools::Itertools;
use smallvec::{smallvec, SmallVec};
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::{collections::VecDeque, sync::MutexGuard};
use tracing::instrument;
use fxhash::{FxHashMap, FxHashSet};
use rle::{slice_vec_by, HasLength, RleVecWithIndex};
use rle::{HasLength, RleVecWithIndex};
use crate::{
container::{registry::ContainerInstance, ContainerID, ContainerTrait},
dag::{remove_included_frontiers, DagUtils},
op::RichOp,
span::{HasCounter, HasIdSpan, HasLamportSpan, IdSpan},
span::{HasIdSpan, HasLamportSpan, IdSpan},
version::are_frontiers_eq,
VersionVector,
};
@ -91,7 +90,7 @@ impl LogStore {
hierarchy: &mut Hierarchy,
changes: RemoteClientChanges,
) -> Vec<RawEvent> {
let changes = self.tailor_changes(changes);
let changes = self.process_and_queue_changes(changes);
if changes.is_empty() {
return vec![];
}
@ -360,7 +359,7 @@ impl LogStore {
}
}
fn tailor_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges {
fn process_and_queue_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges {
let mut latest_vv = self.get_vv().clone();
let mut retain_changes = FxHashMap::default();
@ -368,39 +367,40 @@ impl LogStore {
// snapshot
let client_ids: Vec<_> = latest_vv.keys().copied().collect();
for client_id in client_ids {
let counter = latest_vv.get_last(client_id).unwrap();
self.try_apply_pending(
&ID { client_id, counter },
&mut latest_vv,
&mut retain_changes,
)
// let counter = latest_vv.get_last(client_id).unwrap();
self.try_apply_pending(&client_id, &mut latest_vv, &mut retain_changes)
}
} else {
let mut client_to_pending_dep = FxHashMap::default();
let mut pending_clients = FxHashSet::default();
changes
.into_values()
.flat_map(|c| c.into_iter())
.sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport))
.for_each(|c| {
if let Some(pre_dep) = client_to_pending_dep.get(&c.id.client_id) {
self.pending_changes.get_mut(pre_dep).unwrap().push(c);
let c_client_id = c.id.client_id;
if pending_clients.contains(&c_client_id) {
self.pending_changes.get_mut(&c_client_id).unwrap().push(c);
return;
}
match can_remote_change_be_applied(&latest_vv, &c) {
ChangeApplyState::Directly => {
latest_vv.set_end(c.id_end());
let last_id = c.id_last();
// let last_id = c.id_last();
retain_changes
.entry(c.id.client_id)
.entry(c_client_id)
.or_insert_with(Vec::new)
.push(c);
self.try_apply_pending(&last_id, &mut latest_vv, &mut retain_changes);
self.try_apply_pending(
&c_client_id,
&mut latest_vv,
&mut retain_changes,
);
}
ChangeApplyState::Existing => {}
ChangeApplyState::Future(dep) => {
client_to_pending_dep.insert(c.id.client_id, dep);
pending_clients.insert(c_client_id);
self.pending_changes
.entry(dep)
.entry(dep.client_id)
.or_insert_with(Vec::new)
.push(c);
}
@ -413,11 +413,11 @@ impl LogStore {
fn try_apply_pending(
&mut self,
dep: &ID,
client_id: &ClientID,
latest_vv: &mut VersionVector,
retain_changes: &mut RemoteClientChanges,
) {
if let Some(may_apply_changes) = self.pending_changes.remove(dep) {
if let Some(may_apply_changes) = self.pending_changes.remove(client_id) {
let mut may_apply_iter = may_apply_changes
.into_iter()
.sorted_by(|a, b| a.lamport.cmp(&b.lamport))
@ -426,21 +426,21 @@ impl LogStore {
match can_remote_change_be_applied(latest_vv, peek_c) {
ChangeApplyState::Directly => {
let c = may_apply_iter.next().unwrap();
let c_client_id = c.id.client_id;
latest_vv.set_end(c.id_end());
let last_id = c.id_last();
// other pending
retain_changes
.entry(c.id.client_id)
.entry(c_client_id)
.or_insert_with(Vec::new)
.push(c);
self.try_apply_pending(&last_id, latest_vv, retain_changes);
self.try_apply_pending(&c_client_id, latest_vv, retain_changes);
}
ChangeApplyState::Existing => {
may_apply_iter.next();
}
ChangeApplyState::Future(this_dep) => {
self.pending_changes
.entry(this_dep)
.entry(this_dep.client_id)
.or_insert_with(Vec::new)
.extend(may_apply_iter);
break;
@ -454,7 +454,7 @@ impl LogStore {
enum ChangeApplyState {
Existing,
Directly,
// The first dissatisfied deps: the previous change or it's deps
/// The id of first missing dep
Future(ID),
}
@ -582,4 +582,35 @@ mod test {
a.decode(&b_change).unwrap();
assert_eq!(c.to_json(), a.to_json());
}
#[test]
fn pending_changes_may_deps_merged_change() {
// a: (a1 <-- a2 <-- a3) <-- a4 a1~a3 is a merged change
// \ /
// b: b1
let mut a = LoroCore::new(Default::default(), Some(1));
let mut b = LoroCore::new(Default::default(), Some(2));
let mut c = LoroCore::new(Default::default(), Some(3));
let mut text_a = a.get_text("text");
let mut text_b = b.get_text("text");
text_a.insert(&a, 0, "a").unwrap();
text_a.insert(&a, 1, "b").unwrap();
let version_a12 = a.vv_cloned();
let updates_a12 = a.encode_all();
text_a.insert(&a, 2, "c").unwrap();
let updates_a123 = a.encode_all();
b.decode(&updates_a12).unwrap();
text_b.insert(&b, 2, "d").unwrap();
let update_b1 = b.encode_from(version_a12);
a.decode(&update_b1).unwrap();
let version_a123_b1 = a.vv_cloned();
text_a.insert(&a, 4, "e").unwrap();
let update_a4 = a.encode_from(version_a123_b1);
c.decode(&update_b1).unwrap();
assert_eq!(c.to_json().to_json(), "{}");
c.decode(&update_a4).unwrap();
assert_eq!(c.to_json().to_json(), "{}");
c.decode(&updates_a123).unwrap();
assert_eq!(c.to_json(), a.to_json());
}
}