fix: snapshot pending import

This commit is contained in:
leeeon233 2023-03-25 17:23:11 +08:00 committed by Leonzhao
parent 9db061ed36
commit ac80775b17
2 changed files with 49 additions and 87 deletions

View file

@ -362,72 +362,53 @@ impl LogStore {
fn tailor_changes(&mut self, changes: RemoteClientChanges) -> RemoteClientChanges {
let mut latest_vv = self.get_vv().clone();
println!("#####Decode 开始 changes: {:?}", changes);
println!("latest vv {:?}", latest_vv);
self.debug_pending();
let mut retain_changes = FxHashMap::default();
let mut client_to_pending_dep = FxHashMap::default();
changes
.into_values()
.flat_map(|c| c.into_iter())
.sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport))
.for_each(|c| {
print!("当前 ");
debug_remote_change(&c);
println!("");
if let Some(pre_dep) = client_to_pending_dep.get(&c.id.client_id) {
self.pending_changes.get_mut(pre_dep).unwrap().push(c);
return;
}
match can_remote_change_be_applied(&latest_vv, &c) {
ChangeApplyState::Directly => {
println!("apply");
latest_vv.set_end(c.id_end());
let last_id = c.id_last();
retain_changes
.entry(c.id.client_id)
.or_insert_with(Vec::new)
.push(c);
self.try_apply_pending(&last_id, &mut latest_vv, &mut retain_changes);
if changes.values().map(|c| c.len()).sum::<usize>() == 0 {
// snapshot
let client_ids: Vec<_> = latest_vv.keys().copied().collect();
for client_id in client_ids {
let counter = latest_vv.get_last(client_id).unwrap();
self.try_apply_pending(
&ID { client_id, counter },
&mut latest_vv,
&mut retain_changes,
)
}
} else {
let mut client_to_pending_dep = FxHashMap::default();
changes
.into_values()
.flat_map(|c| c.into_iter())
.sorted_by(|a, b| Ord::cmp(&b.lamport, &a.lamport))
.for_each(|c| {
if let Some(pre_dep) = client_to_pending_dep.get(&c.id.client_id) {
self.pending_changes.get_mut(pre_dep).unwrap().push(c);
return;
}
ChangeApplyState::Existing => {
println!("exist")
match can_remote_change_be_applied(&latest_vv, &c) {
ChangeApplyState::Directly => {
latest_vv.set_end(c.id_end());
let last_id = c.id_last();
retain_changes
.entry(c.id.client_id)
.or_insert_with(Vec::new)
.push(c);
self.try_apply_pending(&last_id, &mut latest_vv, &mut retain_changes);
}
ChangeApplyState::Existing => {}
ChangeApplyState::Future(dep) => {
client_to_pending_dep.insert(c.id.client_id, dep);
self.pending_changes
.entry(dep)
.or_insert_with(Vec::new)
.push(c);
}
}
ChangeApplyState::Future(dep) => {
println!("future dep {:?}", dep);
client_to_pending_dep.insert(c.id.client_id, dep);
self.pending_changes
.entry(dep)
.or_insert_with(Vec::new)
.push(c);
}
}
});
self.debug_pending();
// retain_changes
// .values_mut()
// .for_each(|v| v.sort_by(|a, b| Ord::cmp(&a.lamport, &b.lamport)));
println!("!!!!!Decode 结束 \n{:?}", retain_changes);
});
}
retain_changes
// cancel filter empty changes, snapshot can use empty changes to check pending changes
// changes.retain(|_, v| !v.is_empty());
// for (client_id, changes) in changes.iter_mut() {
// self.filter_changes(client_id, changes);
// }
// changes.retain(|_, v| !v.is_empty());
// changes
}
fn debug_pending(&self) {
println!("pending:");
for (k, v) in self.pending_changes.iter() {
print!(" {:?}: ", k);
v.iter().for_each(debug_remote_change);
println!("");
}
println!("")
}
fn try_apply_pending(
@ -437,7 +418,6 @@ impl LogStore {
retain_changes: &mut RemoteClientChanges,
) {
if let Some(may_apply_changes) = self.pending_changes.remove(dep) {
println!(" 有此依赖 {:?} ", dep);
let mut may_apply_iter = may_apply_changes
.into_iter()
.sorted_by(|a, b| a.lamport.cmp(&b.lamport))
@ -446,9 +426,6 @@ impl LogStore {
match can_remote_change_be_applied(latest_vv, peek_c) {
ChangeApplyState::Directly => {
let c = may_apply_iter.next().unwrap();
print!("apply ");
debug_remote_change(&c);
println!("");
latest_vv.set_end(c.id_end());
let last_id = c.id_last();
// other pending
@ -500,21 +477,6 @@ fn can_remote_change_be_applied(vv: &VersionVector, change: &Change<RemoteOp>) -
ChangeApplyState::Directly
}
fn debug_remote_change(change: &Change<RemoteOp>) {
print!(
"Change: id_span: {:?} deps: {:?} lamport {}, ",
change.id_span(),
change.deps,
change.lamport
);
}
// #[derive(Debug)]
// pub(crate) struct ChangeWithLamport {
// change: Change<RemoteOp>,
// lamport: Lamport,
// }
#[cfg(test)]
mod test {
use crate::{LoroCore, VersionVector};
@ -584,12 +546,12 @@ mod test {
let version_a1b1 = a.vv_cloned();
text_a.insert(&a, 2, "c").unwrap();
let update_a2 = a.encode_from(version_a1b1);
// c.decode(&update_a2).unwrap();
// assert_eq!(c.to_json().to_json(), "{}");
// c.decode(&update_a1).unwrap();
// assert_eq!(c.to_json().to_json(), "{\"text\":\"a\"}");
// c.decode(&update_b1).unwrap();
// assert_eq!(a.to_json(), c.to_json());
c.decode(&update_a2).unwrap();
assert_eq!(c.to_json().to_json(), "{}");
c.decode(&update_a1).unwrap();
assert_eq!(c.to_json().to_json(), "{\"text\":\"a\"}");
c.decode(&update_b1).unwrap();
assert_eq!(a.to_json(), c.to_json());
d.decode(&update_a2).unwrap();
assert_eq!(d.to_json().to_json(), "{}");

View file

@ -327,7 +327,7 @@ impl VersionVector {
}
#[inline]
pub fn get_last(&mut self, client_id: ClientID) -> Option<Counter> {
pub fn get_last(&self, client_id: ClientID) -> Option<Counter> {
self.0
.get(&client_id)
.and_then(|&x| if x == 0 { None } else { Some(x - 1) })