refactor: make map import independent from history cache

This commit is contained in:
Zixuan Chen 2024-08-06 23:22:24 +08:00
parent 06ea79b900
commit b791157494
No known key found for this signature in database
10 changed files with 234 additions and 104 deletions

View file

@ -19,7 +19,6 @@ enum_dispatch = "0.3.11"
enum-as-inner = "0.5.1"
fxhash = "0.2.1"
tracing = { version = "0.1", features = [
"max_level_debug",
"release_max_level_warn",
] }
serde_columnar = { version = "0.3.5" }

View file

@ -23,6 +23,7 @@ mod test;
use crate::{
change::Lamport,
diff_calc::DiffMode,
id::{Counter, PeerID, ID},
span::{CounterSpan, HasId, HasIdSpan, HasLamport, HasLamportSpan, IdSpan},
version::{Frontiers, VersionVector, VersionVectorDiff},
@ -57,7 +58,7 @@ pub(crate) trait Dag: Debug {
}
pub(crate) trait DagUtils: Dag {
fn find_common_ancestor(&self, a_id: &[ID], b_id: &[ID]) -> Frontiers;
fn find_common_ancestor(&self, a_id: &[ID], b_id: &[ID]) -> (Frontiers, DiffMode);
/// Slow, should probably only use on dev
#[allow(unused)]
fn get_vv(&self, id: ID) -> VersionVector;
@ -82,7 +83,7 @@ pub(crate) trait DagUtils: Dag {
impl<T: Dag + ?Sized> DagUtils for T {
#[inline]
fn find_common_ancestor(&self, a_id: &[ID], b_id: &[ID]) -> Frontiers {
fn find_common_ancestor(&self, a_id: &[ID], b_id: &[ID]) -> (Frontiers, DiffMode) {
// TODO: perf: make it also return the spans to reach common_ancestors
find_common_ancestor(&|id| self.get(id), a_id, b_id)
}
@ -267,7 +268,7 @@ impl<'a> Ord for OrdIdSpan<'a> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.lamport_last()
.cmp(&other.lamport_last())
.then(self.id_last().cmp(&other.id_last()))
.then(self.id.peer.cmp(&other.id.peer))
}
}
@ -307,13 +308,17 @@ impl<'a> OrdIdSpan<'a> {
}
#[inline(always)]
fn find_common_ancestor<'a, F, D>(get: &'a F, a_id: &[ID], b_id: &[ID]) -> Frontiers
fn find_common_ancestor<'a, F, D>(get: &'a F, a_id: &[ID], b_id: &[ID]) -> (Frontiers, DiffMode)
where
D: DagNode + 'a,
F: Fn(ID) -> Option<&'a D>,
{
if a_id.is_empty() || b_id.is_empty() {
return Default::default();
if a_id.is_empty() {
return (Default::default(), DiffMode::Import);
}
if b_id.is_empty() {
return (Default::default(), DiffMode::Checkout);
}
_find_common_ancestor_new(get, a_id, b_id)
@ -472,7 +477,11 @@ where
ans
}
fn _find_common_ancestor_new<'a, F, D>(get: &'a F, left: &[ID], right: &[ID]) -> Frontiers
fn _find_common_ancestor_new<'a, F, D>(
get: &'a F,
left: &[ID],
right: &[ID],
) -> (Frontiers, DiffMode)
where
D: DagNode + 'a,
F: Fn(ID) -> Option<&'a D>,
@ -485,22 +494,24 @@ where
let right_span = get(right).unwrap();
if std::ptr::eq(left_span, right_span) {
if left.counter < right.counter {
return smallvec![left].into();
return (smallvec![left].into(), DiffMode::Linear);
} else {
return smallvec![right].into();
return (smallvec![right].into(), DiffMode::Checkout);
}
}
if left_span.deps().len() == 1 && right_span.contains_id(left_span.deps()[0]) {
return smallvec![right].into();
return (smallvec![right].into(), DiffMode::Checkout);
}
if right_span.deps().len() == 1 && left_span.contains_id(right_span.deps()[0]) {
return smallvec![left].into();
return (smallvec![left].into(), DiffMode::Linear);
}
}
}
let mut is_linear = left.len() == 1 && right.len() == 1;
let mut is_right_greater = true;
let mut ans: Frontiers = Default::default();
let mut queue: BinaryHeap<(SmallVec<[OrdIdSpan; 1]>, NodeType)> = BinaryHeap::new();
@ -513,8 +524,7 @@ where
.map(|&id| OrdIdSpan::from_dag_node(id, get).unwrap())
.collect();
if ans.len() > 1 {
ans.sort();
ans.reverse();
ans.sort_unstable_by(|a, b| b.cmp(a));
}
ans
@ -540,9 +550,17 @@ where
ans = node.into_iter().map(|x| x.id_last()).collect();
}
// Iteration is done and no common ancestor is found
// So the ans is empty
is_right_greater = false;
break;
}
// if node_type is A, then the left side is greater or parallel to the right side
if node_type == NodeType::A {
is_right_greater = false;
}
if node.len() > 1 {
for node in node.drain(1..node.len()) {
queue.push((smallvec::smallvec![node], node_type));
@ -575,12 +593,29 @@ where
if node[0].deps.len() > 0 {
queue.push((ids_to_ord_id_spans(node[0].deps.as_ref(), get), node_type));
is_linear = false;
} else {
is_right_greater = false;
break;
}
}
ans
let mode = if is_right_greater {
if ans.len() <= 1 {
debug_assert_eq!(ans.as_slice(), left);
}
if is_linear {
debug_assert!(ans.len() <= 1);
DiffMode::Linear
} else {
DiffMode::Import
}
} else {
DiffMode::Checkout
};
(ans, mode)
}
pub fn remove_included_frontiers(frontiers: &mut VersionVector, new_change_deps: &[ID]) {

View file

@ -279,6 +279,7 @@ fn test_dag() {
// println!("{}", b.mermaid());
assert_eq!(
b.find_common_ancestor(&[ID::new(0, 2)], &[ID::new(1, 1)])
.0
.first()
.copied(),
None,
@ -687,9 +688,14 @@ mod find_common_ancestors {
a.push(5);
let actual = a
.find_common_ancestor(&[ID::new(0, 2)], &[ID::new(0, 4)])
.0
.first()
.copied();
assert_eq!(actual, Some(ID::new(0, 2)));
assert_eq!(
a.find_common_ancestor(&[ID::new(0, 2)], &[ID::new(0, 4)]).1,
DiffMode::Linear
);
}
#[test]
@ -701,6 +707,7 @@ mod find_common_ancestors {
a.merge(&b);
let actual = a
.find_common_ancestor(&[ID::new(0, 0)], &[ID::new(1, 0)])
.0
.first()
.copied();
assert_eq!(actual, None);
@ -715,9 +722,14 @@ mod find_common_ancestors {
// should no exist any common ancestor between a and b
let actual = a
.find_common_ancestor(&[ID::new(0, 0)], &[ID::new(1, 0)])
.0
.first()
.copied();
assert_eq!(actual, None);
assert_eq!(
a.find_common_ancestor(&[ID::new(0, 0)], &[ID::new(1, 0)]).1,
DiffMode::Checkout
)
}
#[test]
@ -733,11 +745,13 @@ mod find_common_ancestors {
println!("{}", a.mermaid());
let actual = a
.find_common_ancestor(&[ID::new(0, 4)], &[ID::new(0, 1), ID::new(1, 1)])
.0
.first()
.copied();
assert_eq!(actual, None);
let actual = a
.find_common_ancestor(&[ID::new(0, 4)], &[ID::new(1, 1)])
.0
.first()
.copied();
assert_eq!(actual, None);
@ -755,6 +769,7 @@ mod find_common_ancestors {
println!("{}", b.mermaid());
assert_eq!(
b.find_common_ancestor(&[ID::new(0, 3)], &[ID::new(1, 8)])
.0
.first()
.copied(),
Some(ID::new(0, 2))
@ -785,12 +800,14 @@ mod find_common_ancestors {
println!("{}", a1.mermaid());
assert_eq!(
a1.find_common_ancestor(&[ID::new(0, 3)], &[ID::new(1, 4)])
.0
.first()
.copied(),
Some(ID::new(0, 2))
);
assert_eq!(
a1.find_common_ancestor(&[ID::new(2, 3)], &[ID::new(1, 3)])
.0
.iter()
.copied()
.collect::<Vec<_>>(),
@ -953,7 +970,7 @@ mod find_common_ancestors_proptest {
let a = dags[0].nodes.get(&0).unwrap().last().unwrap().id_last();
let b = dags[1].nodes.get(&1).unwrap().last().unwrap().id_last();
let actual = dags[0].find_common_ancestor(&[a], &[b]);
prop_assert_eq!(&**actual, &[expected]);
prop_assert_eq!(&**actual.0, &[expected]);
Ok(())
}
@ -1101,7 +1118,7 @@ mod find_common_ancestors_proptest {
dag_a.merge(dag_b);
let a = dag_a.get_last_node().id;
let b = dag_b.get_last_node().id;
let mut actual = dag_a.find_common_ancestor(&[a], &[b]);
let mut actual = dag_a.find_common_ancestor(&[a], &[b]).0;
actual.sort();
let actual = actual.iter().copied().collect::<Vec<_>>();
if actual != expected {

View file

@ -15,7 +15,7 @@ use loro_common::{
PeerID, ID,
};
use smallvec::SmallVec;
use tracing::instrument;
use tracing::{instrument, trace, warn};
use crate::{
container::{
@ -142,7 +142,7 @@ impl DiffCalculator {
self.has_all = true;
self.last_vv = Default::default();
}
let (lca, iter) =
let (lca, diff_mode, iter) =
oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers);
tracing::info!("LCA: {:?}", &lca);
let mut started_set = FxHashSet::default();
@ -156,6 +156,7 @@ impl DiffCalculator {
);
}
let end_counter = merged.get(&change.id.peer).unwrap();
if self.has_all {
self.last_vv.extend_to_include_end_id(change.id_end());
}
@ -166,6 +167,10 @@ impl DiffCalculator {
.unwrap_or_else(|e| e);
let mut visited = FxHashSet::default();
for mut op in &change.ops.vec()[iter_start..] {
if op.counter >= *end_counter {
break;
}
let idx = op.container;
if let Some(filter) = container_filter {
if !filter(idx) {
@ -174,14 +179,17 @@ impl DiffCalculator {
}
// slice the op if needed
// PERF: we can skip the slice by using the RichOp::new_slice
let stack_sliced_op;
if op.counter < start_counter {
if op.ctr_last() < start_counter {
continue;
}
if op.ctr_last() < start_counter {
continue;
}
stack_sliced_op =
Some(op.slice((start_counter - op.counter) as usize, op.atom_len()));
if op.counter < start_counter || op.ctr_end() > *end_counter {
stack_sliced_op = Some(op.slice(
((start_counter as usize).saturating_sub(op.counter as usize)),
op.atom_len().min((end_counter - op.counter) as usize),
));
op = stack_sliced_op.as_ref().unwrap();
}
let vv = &mut vv.borrow_mut();
@ -197,7 +205,7 @@ impl DiffCalculator {
if !started_set.contains(&op.container) {
started_set.insert(container);
calculator.start_tracking(oplog, &lca);
calculator.start_tracking(oplog, &lca, diff_mode);
}
if visited.contains(&op.container) {
@ -213,9 +221,6 @@ impl DiffCalculator {
}
}
}
for (_, (_, calculator)) in self.calculators.iter_mut() {
calculator.stop_tracking(oplog, after);
}
Some(started_set)
} else {
@ -274,6 +279,7 @@ impl DiffCalculator {
if d != *depth {
*depth = d;
all.push((*depth, container_idx));
calc.finish_this_round();
continue;
}
}
@ -285,6 +291,7 @@ impl DiffCalculator {
container_id_to_depth.insert(c.clone(), depth.and_then(|d| d.checked_add(1)));
oplog.arena.register_container(c);
});
calc.finish_this_round();
if !diff.is_empty() || bring_back {
ans.insert(
container_idx,
@ -383,14 +390,13 @@ impl DiffCalculator {
///
#[enum_dispatch]
pub(crate) trait DiffCalculatorTrait {
fn start_tracking(&mut self, oplog: &OpLog, vv: &crate::VersionVector);
fn start_tracking(&mut self, oplog: &OpLog, vv: &crate::VersionVector, mode: DiffMode);
fn apply_change(
&mut self,
oplog: &OpLog,
op: crate::op::RichOp,
vv: Option<&crate::VersionVector>,
);
fn stop_tracking(&mut self, oplog: &OpLog, vv: &crate::VersionVector);
fn calculate_diff(
&mut self,
oplog: &OpLog,
@ -398,6 +404,8 @@ pub(crate) trait DiffCalculatorTrait {
to: &crate::VersionVector,
on_new_container: impl FnMut(&ContainerID),
) -> (InternalDiff, DiffMode);
/// This round of diff calc is finished, we can clear the cache
fn finish_this_round(&mut self);
}
#[enum_dispatch(DiffCalculatorTrait)]
@ -416,20 +424,30 @@ pub(crate) enum ContainerDiffCalculator {
#[derive(Debug)]
pub(crate) struct MapDiffCalculator {
container_idx: ContainerIdx,
changed_key: FxHashSet<InternalString>,
changed: FxHashMap<InternalString, MapValue>,
current_mode: DiffMode,
}
impl MapDiffCalculator {
pub(crate) fn new(container_idx: ContainerIdx) -> Self {
Self {
container_idx,
changed_key: Default::default(),
changed: Default::default(),
current_mode: DiffMode::Checkout,
}
}
}
impl DiffCalculatorTrait for MapDiffCalculator {
fn start_tracking(&mut self, _oplog: &crate::OpLog, _vv: &crate::VersionVector) {}
fn start_tracking(
&mut self,
_oplog: &crate::OpLog,
_vv: &crate::VersionVector,
mode: DiffMode,
) {
self.changed.clear();
self.current_mode = mode;
}
fn apply_change(
&mut self,
@ -437,11 +455,29 @@ impl DiffCalculatorTrait for MapDiffCalculator {
op: crate::op::RichOp,
_vv: Option<&crate::VersionVector>,
) {
if matches!(self.current_mode, DiffMode::Checkout) {
// We need to use history cache anyway
return;
}
let map = op.raw_op().content.as_map().unwrap();
self.changed_key.insert(map.key.clone());
let new_value = MapValue {
value: map.value.clone(),
peer: op.peer,
lamp: op.lamport(),
};
match self.changed.get(&map.key) {
Some(old_value) if old_value > &new_value => {}
_ => {
self.changed.insert(map.key.clone(), new_value);
}
}
}
fn stop_tracking(&mut self, _oplog: &super::oplog::OpLog, _vv: &crate::VersionVector) {}
fn finish_this_round(&mut self) {
self.changed.clear();
self.current_mode = DiffMode::Checkout;
}
fn calculate_diff(
&mut self,
@ -450,53 +486,66 @@ impl DiffCalculatorTrait for MapDiffCalculator {
to: &crate::VersionVector,
mut on_new_container: impl FnMut(&ContainerID),
) -> (InternalDiff, DiffMode) {
let mut changed = Vec::new();
let group = oplog
.op_groups
.get(&self.container_idx)
.unwrap()
.as_map()
.unwrap();
for k in self.changed_key.iter() {
let peek_from = group.last_op(k, from);
let peek_to = group.last_op(k, to);
match (peek_from, peek_to) {
(None, None) => {}
(None, Some(_)) => changed.push((k.clone(), peek_to)),
(Some(_), None) => changed.push((k.clone(), peek_to)),
(Some(a), Some(b)) => {
if a != b {
changed.push((k.clone(), peek_to))
match self.current_mode {
DiffMode::Checkout => {
let mut changed = Vec::new();
let group = oplog
.op_groups
.get(&self.container_idx)
.unwrap()
.as_map()
.unwrap();
for k in group.keys() {
let peek_from = group.last_op(k, from);
let peek_to = group.last_op(k, to);
match (peek_from, peek_to) {
(None, None) => {}
(None, Some(_)) => changed.push((k.clone(), peek_to)),
(Some(_), None) => changed.push((k.clone(), peek_to)),
(Some(a), Some(b)) => {
if a != b {
changed.push((k.clone(), peek_to))
}
}
}
}
let mut updated =
FxHashMap::with_capacity_and_hasher(changed.len(), Default::default());
for (key, value) in changed {
let value = value
.map(|v| {
let value = v.value.clone();
if let Some(LoroValue::Container(c)) = &value {
on_new_container(c);
}
MapValue {
value,
lamp: v.lamport,
peer: v.peer,
}
})
.unwrap_or_else(|| MapValue {
value: None,
lamp: 0,
peer: 0,
});
updated.insert(key, value);
}
(InternalDiff::Map(MapDelta { updated }), DiffMode::Checkout)
}
DiffMode::Import | DiffMode::Linear => {
let changed = std::mem::take(&mut self.changed);
let mode = self.current_mode;
// Reset this field to avoid we use `has_all` to cache the diff calc and use it next round
// (In the next round we need to use the checkout mode)
self.current_mode = DiffMode::Checkout;
(InternalDiff::Map(MapDelta { updated: changed }), mode)
}
}
let mut updated = FxHashMap::with_capacity_and_hasher(changed.len(), Default::default());
for (key, value) in changed {
let value = value
.map(|v| {
let value = v.value.clone();
if let Some(LoroValue::Container(c)) = &value {
on_new_container(c);
}
MapValue {
value,
lamp: v.lamport,
peer: v.peer,
}
})
.unwrap_or_else(|| MapValue {
value: None,
lamp: 0,
peer: 0,
});
updated.insert(key, value);
}
(InternalDiff::Map(MapDelta { updated }), DiffMode::Checkout)
}
}
@ -531,7 +580,7 @@ impl std::fmt::Debug for ListDiffCalculator {
}
impl DiffCalculatorTrait for ListDiffCalculator {
fn start_tracking(&mut self, _oplog: &OpLog, vv: &crate::VersionVector) {
fn start_tracking(&mut self, _oplog: &OpLog, vv: &crate::VersionVector, mode: DiffMode) {
if !vv.includes_vv(&self.start_vv) || !self.tracker.all_vv().includes_vv(vv) {
self.tracker = Box::new(RichtextTracker::new_with_unknown());
self.start_vv = vv.clone();
@ -574,7 +623,7 @@ impl DiffCalculatorTrait for ListDiffCalculator {
}
}
fn stop_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn finish_this_round(&mut self) {}
fn calculate_diff(
&mut self,
@ -690,7 +739,12 @@ impl RichtextDiffCalculator {
}
impl DiffCalculatorTrait for RichtextDiffCalculator {
fn start_tracking(&mut self, _oplog: &super::oplog::OpLog, vv: &crate::VersionVector) {
fn start_tracking(
&mut self,
_oplog: &super::oplog::OpLog,
vv: &crate::VersionVector,
mode: DiffMode,
) {
if !vv.includes_vv(&self.start_vv) || !self.tracker.all_vv().includes_vv(vv) {
self.tracker = Box::new(RichtextTracker::new_with_unknown());
self.styles.clear();
@ -800,7 +854,7 @@ impl DiffCalculatorTrait for RichtextDiffCalculator {
}
}
fn stop_tracking(&mut self, _oplog: &super::oplog::OpLog, _vv: &crate::VersionVector) {}
fn finish_this_round(&mut self) {}
fn calculate_diff(
&mut self,
@ -888,7 +942,7 @@ pub(crate) struct MovableListDiffCalculator {
}
impl DiffCalculatorTrait for MovableListDiffCalculator {
fn start_tracking(&mut self, _oplog: &OpLog, vv: &crate::VersionVector) {
fn start_tracking(&mut self, _oplog: &OpLog, vv: &crate::VersionVector, mode: DiffMode) {
if !vv.includes_vv(&self.list.start_vv) || !self.list.tracker.all_vv().includes_vv(vv) {
self.list.tracker = Box::new(RichtextTracker::new_with_unknown());
self.list.start_vv = vv.clone();
@ -993,8 +1047,8 @@ impl DiffCalculatorTrait for MovableListDiffCalculator {
};
}
fn stop_tracking(&mut self, oplog: &OpLog, vv: &crate::VersionVector) {
self.list.stop_tracking(oplog, vv)
fn finish_this_round(&mut self) {
self.list.finish_this_round()
}
#[instrument(skip(self, oplog, on_new_container))]

View file

@ -22,7 +22,7 @@ impl CounterDiffCalculator {
}
impl DiffCalculatorTrait for CounterDiffCalculator {
fn start_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn start_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector, mode: DiffMode) {}
fn apply_change(
&mut self,
@ -37,7 +37,7 @@ impl DiffCalculatorTrait for CounterDiffCalculator {
);
}
fn stop_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn finish_this_round(&mut self) {}
fn calculate_diff(
&mut self,

View file

@ -23,7 +23,7 @@ pub(crate) struct TreeDiffCalculator {
}
impl DiffCalculatorTrait for TreeDiffCalculator {
fn start_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn start_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector, mode: DiffMode) {}
fn apply_change(
&mut self,
@ -33,7 +33,7 @@ impl DiffCalculatorTrait for TreeDiffCalculator {
) {
}
fn stop_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn finish_this_round(&mut self) {}
fn calculate_diff(
&mut self,
@ -148,7 +148,7 @@ impl TreeDiffCalculator {
let _e = s.enter();
let to_frontiers = to.to_frontiers(&oplog.dag);
let from_frontiers = from.to_frontiers(&oplog.dag);
let common_ancestors = oplog
let (common_ancestors, _mode) = oplog
.dag
.find_common_ancestor(&from_frontiers, &to_frontiers);
let lca_vv = oplog.dag.frontiers_to_vv(&common_ancestors).unwrap();

View file

@ -8,7 +8,7 @@ use super::{DiffCalculatorTrait, DiffMode};
pub struct UnknownDiffCalculator;
impl DiffCalculatorTrait for UnknownDiffCalculator {
fn start_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn start_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector, mode: DiffMode) {}
fn apply_change(
&mut self,
@ -18,7 +18,7 @@ impl DiffCalculatorTrait for UnknownDiffCalculator {
) {
}
fn stop_tracking(&mut self, _oplog: &OpLog, _vv: &crate::VersionVector) {}
fn finish_this_round(&mut self) {}
fn calculate_diff(
&mut self,

View file

@ -201,6 +201,10 @@ impl MapOpGroup {
.find(|op| vv.get(&op.peer).copied().unwrap_or(0) > op.counter)
})
}
pub(crate) fn keys(&self) -> impl Iterator<Item = &InternalString> {
self.ops.keys()
}
}
impl OpGroupTrait for MapOpGroup {

View file

@ -14,6 +14,7 @@ use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
use crate::configure::Configure;
use crate::container::list::list_op;
use crate::dag::{Dag, DagUtils};
use crate::diff_calc::DiffMode;
use crate::encoding::ParsedHeaderAndBody;
use crate::encoding::{decode_oplog, encode_oplog, EncodeMode};
use crate::history_cache::ContainerHistoryCache;
@ -529,6 +530,7 @@ impl OpLog {
to_frontiers: Option<&Frontiers>,
) -> (
VersionVector,
DiffMode,
impl Iterator<Item = (BlockChangeRef, Counter, Rc<RefCell<VersionVector>>)> + '_,
) {
let mut merged_vv = from.clone();
@ -552,7 +554,8 @@ impl OpLog {
}
};
let common_ancestors = self.dag.find_common_ancestor(from_frontiers, to_frontiers);
let (common_ancestors, diff_mode) =
self.dag.find_common_ancestor(from_frontiers, to_frontiers);
let common_ancestors_vv = self.dag.frontiers_to_vv(&common_ancestors).unwrap();
// go from lca to merged_vv
let diff = common_ancestors_vv.diff(&merged_vv).right;
@ -562,6 +565,7 @@ impl OpLog {
let vv = Rc::new(RefCell::new(VersionVector::default()));
(
common_ancestors_vv.clone(),
diff_mode,
std::iter::from_fn(move || {
if let Some(inner) = &node {
let mut inner_vv = vv.borrow_mut();

View file

@ -11,6 +11,7 @@ use crate::{
arena::SharedArena,
container::{idx::ContainerIdx, map::MapSet},
delta::{MapValue, ResolvedMapDelta, ResolvedMapValue},
diff_calc::DiffMode,
encoding::{EncodeMode, StateSnapshotDecodeContext, StateSnapshotEncoder},
event::{Diff, Index, InternalDiff},
handler::ValueOrHandler,
@ -53,18 +54,34 @@ impl ContainerState for MapState {
let InternalDiff::Map(delta) = diff else {
unreachable!()
};
let force = matches!(mode, DiffMode::Checkout | DiffMode::Linear);
let mut resolved_delta = ResolvedMapDelta::new();
for (key, value) in delta.updated.into_iter() {
self.map.insert(key.clone(), value.clone());
resolved_delta = resolved_delta.with_entry(
key,
ResolvedMapValue {
idlp: IdLp::new(value.peer, value.lamp),
value: value
.value
.map(|v| ValueOrHandler::from_value(v, arena, txn, state)),
},
)
let mut changed = false;
if force {
self.map.insert(key.clone(), value.clone());
changed = true;
} else {
match self.map.get(&key) {
Some(old_value) if old_value > &value => {}
_ => {
self.map.insert(key.clone(), value.clone());
changed = true;
}
}
}
if changed {
resolved_delta = resolved_delta.with_entry(
key,
ResolvedMapValue {
idlp: IdLp::new(value.peer, value.lamp),
value: value
.value
.map(|v| ValueOrHandler::from_value(v, arena, txn, state)),
},
)
}
}
Diff::Map(resolved_delta)