fix: op converter

This commit is contained in:
leeeon233 2023-09-18 15:21:38 +08:00
parent 7b012dec00
commit c294c61343
2 changed files with 57 additions and 56 deletions

View file

@ -406,26 +406,24 @@ impl OpLog {
changes: RemoteClientChanges,
) -> Result<(), LoroError> {
// check whether we can append the new changes
self.pending_changes.check_changes(&changes)?;
let vv = &self.dag.vv;
let local_changes = self.pending_changes.filter_and_pending_remote_changes(
changes,
&self.arena,
vv.clone(),
)?;
let local_changes =
self.pending_changes
.get_can_be_applied_changes(changes, &self.arena, vv.clone())?;
// TODO: should we check deps here?
self.apply_local_change_from_remote(local_changes);
Ok(())
}
fn apply_local_change_from_remote(&mut self, local_changes: Vec<Change>) {
fn apply_local_change_from_remote(&mut self, mut local_changes: Vec<Change>) {
local_changes.sort_by_key(|x| x.lamport);
if !local_changes.is_empty() {
self.next_lamport = self
.next_lamport
.max(local_changes.last().unwrap().lamport_end());
}
// debug_dbg!(&change_causal_arr);
for change in local_changes {
self.dag.vv.extend_to_include_last_id(change.id_last());

View file

@ -1,5 +1,9 @@
use crate::{
arena::SharedArena, change::Change, encoding::RemoteClientChanges, op::RemoteOp, VersionVector,
arena::{OpConverter, SharedArena},
change::Change,
encoding::RemoteClientChanges,
op::RemoteOp,
VersionVector,
};
use fxhash::FxHashMap;
use itertools::Itertools;
@ -29,55 +33,57 @@ impl PendingChanges {
can_be_applied_changes
}
pub(crate) fn filter_and_pending_remote_changes(
pub(crate) fn get_can_be_applied_changes(
&mut self,
remote_changes: RemoteClientChanges,
arena: &SharedArena,
latest_vv: VersionVector,
) -> Result<Vec<Change>, LoroError> {
self.check_changes(&remote_changes)?;
self.last_pending_vv = latest_vv;
let mut can_be_applied_changes = Vec::new();
let mut peer_to_pending_dep = FxHashMap::default();
for change in remote_changes
.into_values()
.flat_map(|c| c.into_iter())
.sorted_unstable_by_key(|c| c.lamport)
{
let local_change = convert_remote_to_pending_op(change, arena);
if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) {
self.pending_changes
.get_mut(pre_dep)
.unwrap()
.push(local_change);
continue;
}
match remote_change_apply_state(&self.last_pending_vv, &local_change) {
ChangeApplyState::Directly => {
self.last_pending_vv.set_end(local_change.id_end());
let id_last = local_change.id_last();
can_be_applied_changes.push(local_change);
self.try_apply_pending(&id_last, &mut can_be_applied_changes);
}
ChangeApplyState::Existing => {}
ChangeApplyState::Future(id) => {
peer_to_pending_dep.insert(local_change.id.peer, id);
// op_converter is faster than using arena directly
arena.with_op_converter(|converter| {
for change in remote_changes
.into_values()
.flat_map(|c| c.into_iter())
.sorted_unstable_by_key(|c| c.lamport)
{
let local_change = to_local_op(change, converter);
if let Some(pre_dep) = peer_to_pending_dep.get(&local_change.id.peer) {
self.pending_changes
.entry(id)
.or_insert_with(Vec::new)
.get_mut(pre_dep)
.unwrap()
.push(local_change);
continue;
}
match remote_change_apply_state(&self.last_pending_vv, &local_change) {
ChangeApplyState::Directly => {
self.last_pending_vv.set_end(local_change.id_end());
let id_last = local_change.id_last();
can_be_applied_changes.push(local_change);
self.try_apply_pending(&id_last, &mut can_be_applied_changes);
}
ChangeApplyState::Existing => {}
ChangeApplyState::Future(id) => {
peer_to_pending_dep.insert(local_change.id.peer, id);
self.pending_changes
.entry(id)
.or_insert_with(Vec::new)
.push(local_change);
}
}
}
}
});
Ok(can_be_applied_changes)
}
fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> {
pub(super) fn check_changes(&self, changes: &RemoteClientChanges) -> Result<(), LoroError> {
for changes in changes.values() {
if changes.is_empty() {
continue;
}
// detect invalid d
// detect invalid id
let mut last_end_counter = None;
for change in changes.iter() {
if change.id.counter < 0 {
@ -132,24 +138,21 @@ impl PendingChanges {
}
}
fn convert_remote_to_pending_op(change: Change<RemoteOp>, arena: &SharedArena) -> Change {
// op_converter is faster than using arena directly
arena.with_op_converter(|converter| {
let mut ops = RleVec::new();
for op in change.ops {
for content in op.contents.into_iter() {
let op = converter.convert_single_op(&op.container, op.counter, content);
ops.push(op);
}
fn to_local_op(change: Change<RemoteOp>, converter: &mut OpConverter) -> Change {
let mut ops = RleVec::new();
for op in change.ops {
for content in op.contents.into_iter() {
let op = converter.convert_single_op(&op.container, op.counter, content);
ops.push(op);
}
Change {
ops,
id: change.id,
deps: change.deps,
lamport: change.lamport,
timestamp: change.timestamp,
}
})
}
Change {
ops,
id: change.id,
deps: change.deps,
lamport: change.lamport,
timestamp: change.timestamp,
}
}
enum ChangeApplyState {