Merge pull request Perf: reduce checkout when importing #45

Perf: reduce checkout when importing
This commit is contained in:
Zixuan Chen 2022-12-18 19:20:35 +08:00 committed by GitHub
commit 02f6653d04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 435 additions and 142 deletions

View file

@ -1,11 +1,11 @@
use loro_core::{LoroCore, LoroValue};
fn main() {
let mut actors: Vec<_> = (0..200).map(|_| LoroCore::default()).collect();
let mut actors: Vec<_> = (0..500).map(|_| LoroCore::default()).collect();
for (i, actor) in actors.iter_mut().enumerate() {
let mut list = actor.get_list("list");
let mut map = actor.get_map("map");
let value: LoroValue = i.to_string().into();
list.insert(actor, 0, value).unwrap();
map.insert(actor, &i.to_string(), value).unwrap();
}
for i in 1..actors.len() {

View file

@ -6,11 +6,10 @@
//!
use crate::{
event::{Observer, SubscriptionID},
fx_map,
hierarchy::Hierarchy,
log_store::ImportContext,
op::{InnerContent, RemoteContent, RichOp},
version::{IdSpanVector, VersionVector},
version::VersionVector,
InternalString, LoroError, LoroValue, ID,
};
@ -96,13 +95,8 @@ pub trait Container: Debug + Any + Unpin {
/// convert an op content to compact imported format
fn to_import(&mut self, content: RemoteContent) -> InnerContent;
/// Tracker need to retreat in order to apply the op.
/// TODO: can be merged into checkout
fn track_retreat(&mut self, spans: &IdSpanVector);
/// Tracker need to forward in order to apply the op.
/// TODO: can be merged into checkout
fn track_forward(&mut self, spans: &IdSpanVector);
/// Initialize tracker at the target version
fn tracker_init(&mut self, vv: &VersionVector);
/// Tracker need to checkout to target version in order to apply the op.
fn tracker_checkout(&mut self, vv: &VersionVector);

View file

@ -28,7 +28,6 @@ use crate::{
op::{InnerContent, Op, RemoteContent, RichOp},
prelim::Prelim,
value::LoroValue,
version::IdSpanVector,
LoroError,
};
@ -387,15 +386,7 @@ impl Container for ListContainer {
}
}
fn track_retreat(&mut self, spans: &IdSpanVector) {
self.tracker.retreat(spans);
}
fn track_forward(&mut self, spans: &IdSpanVector) {
self.tracker.forward(spans);
}
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
fn tracker_init(&mut self, vv: &crate::VersionVector) {
if (!vv.is_empty() || self.tracker.start_vv().is_empty())
&& self.tracker.all_vv() >= vv
&& vv >= self.tracker.start_vv()
@ -406,6 +397,10 @@ impl Container for ListContainer {
}
}
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
self.tracker.checkout(vv)
}
fn track_apply(&mut self, _: &mut Hierarchy, rich_op: &RichOp, _: &mut ImportContext) {
self.tracker.track_apply(rich_op);
}

View file

@ -25,7 +25,7 @@ use crate::{
prelim::Prelim,
span::HasLamport,
value::LoroValue,
version::{Frontiers, IdSpanVector, TotalOrderStamp},
version::{Frontiers, TotalOrderStamp},
InternalString,
};
@ -295,7 +295,9 @@ impl Container for MapContainer {
map.into()
}
fn tracker_checkout(&mut self, _vv: &crate::version::VersionVector) {}
fn tracker_init(&mut self, _vv: &crate::version::VersionVector) {}
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {}
fn to_export(&mut self, content: InnerContent, _gc: bool) -> SmallVec<[RemoteContent; 1]> {
if let Ok(set) = content.into_map() {
@ -382,10 +384,6 @@ impl Container for MapContainer {
}
}
fn track_retreat(&mut self, _: &IdSpanVector) {}
fn track_forward(&mut self, _: &IdSpanVector) {}
fn track_apply(&mut self, _: &mut Hierarchy, op: &RichOp, _: &mut ImportContext) {
self.pending_ops.push(op.as_owned());
}

View file

@ -67,6 +67,16 @@ impl Container for ContainerInstance {
}
}
#[instrument(skip_all)]
fn tracker_init(&mut self, vv: &crate::VersionVector) {
match self {
ContainerInstance::Map(x) => x.tracker_init(vv),
ContainerInstance::Text(x) => x.tracker_init(vv),
ContainerInstance::Dyn(x) => x.tracker_init(vv),
ContainerInstance::List(x) => x.tracker_init(vv),
}
}
#[instrument(skip_all)]
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
match self {
@ -102,26 +112,6 @@ impl Container for ContainerInstance {
}
}
#[instrument(skip_all)]
fn track_retreat(&mut self, op: &IdSpanVector) {
match self {
ContainerInstance::Map(x) => x.track_retreat(op),
ContainerInstance::Text(x) => x.track_retreat(op),
ContainerInstance::Dyn(x) => x.track_retreat(op),
ContainerInstance::List(x) => x.track_retreat(op),
}
}
#[instrument(skip_all)]
fn track_forward(&mut self, op: &IdSpanVector) {
match self {
ContainerInstance::Map(x) => x.track_forward(op),
ContainerInstance::Text(x) => x.track_forward(op),
ContainerInstance::Dyn(x) => x.track_forward(op),
ContainerInstance::List(x) => x.track_forward(op),
}
}
#[instrument(skip_all)]
fn track_apply(&mut self, hierarchy: &mut Hierarchy, op: &RichOp, ctx: &mut ImportContext) {
match self {

View file

@ -319,16 +319,8 @@ impl Container for TextContainer {
}
}
fn track_retreat(&mut self, spans: &IdSpanVector) {
self.tracker.retreat(spans);
}
fn track_forward(&mut self, spans: &IdSpanVector) {
self.tracker.forward(spans);
}
#[instrument(skip_all)]
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
fn tracker_init(&mut self, vv: &crate::VersionVector) {
if (!vv.is_empty() || self.tracker.start_vv().is_empty())
&& self.tracker.all_vv() >= vv
&& vv >= self.tracker.start_vv()
@ -339,6 +331,11 @@ impl Container for TextContainer {
}
}
#[instrument(skip_all)]
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
self.tracker.checkout(vv)
}
#[instrument(skip_all)]
fn track_apply(
&mut self,

View file

@ -54,7 +54,6 @@ pub struct Tracker {
///
/// Because sometimes we don't actually need to checkout to the version.
/// So we may cache the changes then applying them when we really need to.
cached_fake_current_vv: VersionVector,
content: ContentMap,
id_to_cursor: CursorMap,
}
@ -88,7 +87,6 @@ impl Tracker {
#[cfg(feature = "test_utils")]
client_id: 0,
current_vv: start_vv.clone(),
cached_fake_current_vv: start_vv.clone(),
all_vv: start_vv.clone(),
start_vv,
}
@ -104,7 +102,7 @@ impl Tracker {
}
pub fn contains(&self, id: ID) -> bool {
!self.cached_fake_current_vv.includes_id(id) && self.all_vv.includes_id(id)
self.all_vv.includes_id(id)
}
/// check whether id_to_cursor correctly reflect the status of the content
@ -134,23 +132,17 @@ impl Tracker {
}
pub fn checkout(&mut self, vv: &VersionVector) {
self.cached_fake_current_vv = vv.clone();
}
fn real_checkout(&mut self) {
if self.current_vv == self.cached_fake_current_vv {
if &self.current_vv == vv {
return;
}
let diff = self.current_vv.diff(&self.cached_fake_current_vv);
self.real_retreat(&diff.left);
self.real_forward(&diff.right);
debug_assert_eq!(&self.current_vv, &self.cached_fake_current_vv);
}
pub fn forward(&mut self, spans: &IdSpanVector) {
self.cached_fake_current_vv.forward(spans);
self.all_vv.forward(spans);
let self_vv = std::mem::take(&mut self.current_vv);
{
let diff = self_vv.diff_iter(vv);
self.retreat(diff.0);
self.forward(diff.1);
}
self.current_vv = vv.clone();
}
pub fn track_apply(&mut self, rich_op: &RichOp) {
@ -160,14 +152,17 @@ impl Tracker {
.all_vv()
.includes_id(id.inc(content.atom_len() as Counter - 1))
{
self.forward(&id.to_span(content.atom_len()).to_id_span_vec());
self.forward(std::iter::once(id.to_span(content.atom_len())));
return;
}
if self.all_vv().includes_id(id) {
let this_ctr = self.all_vv().get(&id.client_id).unwrap();
let shift = this_ctr - id.counter;
self.forward(&id.to_span(shift as usize).to_id_span_vec());
self.forward(std::iter::once(id.to_span(shift as usize)));
if shift as usize >= content.atom_len() {
unreachable!();
}
self.apply(
id.inc(shift),
&content.slice(shift as usize, content.atom_len()),
@ -177,23 +172,19 @@ impl Tracker {
}
}
fn real_forward(&mut self, spans: &IdSpanVector) {
if spans.is_empty() {
return;
}
let mut cursors = Vec::with_capacity(spans.len());
let mut args = Vec::with_capacity(spans.len());
for span in spans.iter() {
let end_id = ID::new(*span.0, span.1.end);
fn forward(&mut self, spans: impl Iterator<Item = IdSpan>) {
let mut cursors = Vec::new();
let mut args = Vec::new();
for span in spans {
let end_id = ID::new(span.client_id, span.counter.end);
self.current_vv.set_end(end_id);
if let Some(all_end_ctr) = self.all_vv.get_mut(span.0) {
if let Some(all_end_ctr) = self.all_vv.get_mut(&span.client_id) {
let all_end = *all_end_ctr;
if all_end < span.1.end {
if all_end < span.counter.end {
// there may be holes when there are multiple containers
*all_end_ctr = span.1.end;
*all_end_ctr = span.counter.end;
}
if all_end <= span.1.start {
if all_end <= span.counter.start {
continue;
}
} else {
@ -201,9 +192,9 @@ impl Tracker {
continue;
}
let IdSpanQueryResult { inserts, deletes } = self
.id_to_cursor
.get_cursors_at_id_span(IdSpan::new(*span.0, span.1.start, span.1.end));
let IdSpanQueryResult { inserts, deletes } = self.id_to_cursor.get_cursors_at_id_span(
IdSpan::new(span.client_id, span.counter.start, span.counter.end),
);
for (_, delete) in deletes {
for deleted_span in delete.iter() {
for span in self
@ -235,25 +226,16 @@ impl Tracker {
)
}
pub fn retreat(&mut self, spans: &IdSpanVector) {
self.cached_fake_current_vv.retreat(spans);
self.all_vv.forward(spans);
}
fn real_retreat(&mut self, spans: &IdSpanVector) {
if spans.is_empty() {
return;
}
let mut cursors = Vec::with_capacity(spans.len());
let mut args = Vec::with_capacity(spans.len());
for span in spans.iter() {
let span_start = ID::new(*span.0, span.1.start);
fn retreat(&mut self, spans: impl Iterator<Item = IdSpan>) {
let mut cursors = Vec::new();
let mut args = Vec::new();
for span in spans {
let span_start = ID::new(span.client_id, span.counter.start);
self.current_vv.set_end(span_start);
if let Some(all_end_ctr) = self.all_vv.get_mut(span.0) {
if let Some(all_end_ctr) = self.all_vv.get_mut(&span.client_id) {
let all_end = *all_end_ctr;
if all_end < span.1.start {
*all_end_ctr = span.1.end;
if all_end < span.counter.start {
*all_end_ctr = span.counter.end;
continue;
}
} else {
@ -261,9 +243,9 @@ impl Tracker {
continue;
}
let IdSpanQueryResult { inserts, deletes } = self
.id_to_cursor
.get_cursors_at_id_span(IdSpan::new(*span.0, span.1.start, span.1.end));
let IdSpanQueryResult { inserts, deletes } = self.id_to_cursor.get_cursors_at_id_span(
IdSpan::new(span.client_id, span.counter.start, span.counter.end),
);
for (id, delete) in deletes {
assert!(span.contains_id(id));
@ -304,13 +286,10 @@ impl Tracker {
/// apply an operation directly to the current tracker
fn apply(&mut self, id: ID, content: &InnerContent) {
self.real_checkout();
assert!(*self.current_vv.get(&id.client_id).unwrap_or(&0) <= id.counter);
assert!(*self.all_vv.get(&id.client_id).unwrap_or(&0) <= id.counter);
self.current_vv
.set_end(id.inc(content.content_len() as i32));
self.cached_fake_current_vv
.set_end(id.inc(content.content_len() as i32));
self.all_vv.set_end(id.inc(content.content_len() as i32));
let text_content = content.as_list().expect("Content is not for list");
match text_content {
@ -389,7 +368,6 @@ impl Tracker {
pub fn iter_effects(&mut self, from: &VersionVector, target: &IdSpanVector) -> EffectIter<'_> {
self.checkout(from);
self.real_checkout();
EffectIter::new(self, target)
}

View file

@ -9,6 +9,7 @@ use crate::{
use super::{cursor_map::FirstCursorResult, y_span::StatusChange, Tracker};
#[derive(Debug)]
pub struct EffectIter<'a> {
tracker: &'a mut Tracker,
left_spans: Vec<IdSpan>,
@ -78,7 +79,6 @@ impl<'a> Iterator for EffectIter<'a> {
let len = cursor.len;
*delete_op_id = delete_op_id.inc(cursor.len as Counter);
self.tracker.current_vv.set_end(*delete_op_id);
self.tracker.cached_fake_current_vv.set_end(*delete_op_id);
let length = -self.tracker.update_cursors(cursor, StatusChange::Delete);
assert!(length >= 0);
if length > 0 {
@ -111,9 +111,6 @@ impl<'a> Iterator for EffectIter<'a> {
self.tracker
.current_vv
.set_end(id.inc(cursor.len as Counter));
self.tracker
.cached_fake_current_vv
.set_end(id.inc(cursor.len as Counter));
let length_diff = self
.tracker
.update_cursors(cursor, StatusChange::SetAsCurrent);

View file

@ -187,8 +187,10 @@ pub(crate) struct DagCausalIter<'a, Dag> {
pub(crate) struct IterReturn<'a, T> {
pub retreat: IdSpanVector,
pub forward: IdSpanVector,
/// data is a reference, it need to be sliced by the counter_range to get the underlying data
pub data: &'a T,
pub slice: Range<Counter>,
/// data[slice] is the data we want to return
pub slice: Range<i32>,
}
impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
@ -273,6 +275,10 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
smallvec::smallvec![node.id_start().inc(slice_from - 1)]
};
let path = self.dag.find_path(&self.frontier, &deps);
debug_log::group!("Dag Causal");
debug_log::debug_dbg!(&deps);
debug_log::debug_dbg!(&path);
debug_log::group_end!();
// NOTE: we expect user to update the tracker, to apply node, after visiting the node
self.frontier = smallvec::smallvec![node.id_start().inc(slice_end - 1)];
Some(IterReturn {

View file

@ -900,6 +900,36 @@ mod test {
)
}
#[test]
fn simplify_checkout() {
test_multi_sites(
8,
&mut [
Ins {
content: 35368,
pos: 73184288580830345,
site: 16,
},
Ins {
content: 4,
pos: 18446744073693037568,
site: 255,
},
SyncAll,
Del {
pos: 18377562991809527818,
len: 9955211391596233732,
site: 137,
},
Ins {
content: 1028,
pos: 283674009020420,
site: 0,
},
],
)
}
#[test]
fn case_encode() {
test_single_client_encode(vec![Ins {

View file

@ -1106,6 +1106,121 @@ mod failed_tests {
)
}
#[test]
fn checkout_error() {
test_multi_sites(
2,
&mut [
Map {
site: 0,
container_idx: 0,
key: 0,
value: Null,
},
List {
site: 1,
container_idx: 0,
key: 0,
value: I32(1),
},
List {
site: 0,
container_idx: 0,
key: 0,
value: Container(C::List),
},
],
)
}
#[test]
fn unknown() {
test_multi_sites(
5,
&mut [
Text {
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 5,
value: 152,
is_del: false,
},
Sync { from: 2, to: 3 },
Text {
site: 3,
container_idx: 0,
pos: 10,
value: 2,
is_del: true,
},
Text {
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
Sync { from: 2, to: 3 },
Text {
site: 2,
container_idx: 0,
pos: 16,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 8,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 28,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 41,
value: 45232,
is_del: false,
},
Sync { from: 1, to: 2 },
Text {
site: 2,
container_idx: 0,
pos: 48,
value: 39064,
is_del: false,
},
List {
site: 1,
container_idx: 0,
key: 0,
value: I32(-1734829928),
},
],
)
}
#[test]
fn list_slice_err() {
test_multi_sites(
@ -1149,7 +1264,92 @@ mod failed_tests {
use super::ContainerType as C;
#[test]
fn to_minify() {
minify_error(5, vec![], test_multi_sites, normalize)
minify_error(
5,
vec![
Text {
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 5,
value: 152,
is_del: false,
},
Sync { from: 2, to: 3 },
Text {
site: 3,
container_idx: 0,
pos: 10,
value: 2,
is_del: true,
},
Text {
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
Sync { from: 2, to: 3 },
Text {
site: 2,
container_idx: 0,
pos: 16,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 8,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 28,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 41,
value: 45232,
is_del: false,
},
Sync { from: 1, to: 2 },
Text {
site: 2,
container_idx: 0,
pos: 48,
value: 39064,
is_del: false,
},
List {
site: 1,
container_idx: 0,
key: 0,
value: I32(-1734829928),
},
],
test_multi_sites,
normalize,
)
}
#[ctor::ctor]

View file

@ -236,8 +236,6 @@ impl Hierarchy {
}
pub fn notify(&mut self, raw_event: RawEvent) {
debug_log::debug_log!("notify {:#?}", &raw_event);
debug_log::debug_dbg!(&self);
let target_id = raw_event.container_id;
let mut event = Event {
absolute_path: raw_event.abs_path,
@ -311,8 +309,6 @@ impl Hierarchy {
.observers
.insert(id, observer);
}
debug_log::debug_log!("Subscribe {:?}", container);
debug_log::debug_dbg!(&self);
id
}

View file

@ -192,13 +192,16 @@ impl LoroCore {
}
pub fn import_updates(&mut self, input: &[u8]) -> Result<(), LoroError> {
debug_log::group!("Import updates at {}", self.client_id());
let ans = self.log_store.write().unwrap().import_updates(input);
match ans {
let ans = match ans {
Ok(events) => {
self.notify(events);
Ok(())
}
Err(err) => Err(LoroError::DecodeError(err.to_string().into_boxed_str())),
}
};
debug_log::group_end!();
ans
}
}

View file

@ -1,13 +1,15 @@
use crate::id::{Counter, ID};
use crate::LogStore;
use crate::{
container::registry::ContainerIdx,
event::{Diff, RawEvent},
version::{Frontiers, IdSpanVector},
};
use std::thread::current;
use std::{collections::VecDeque, ops::ControlFlow, sync::MutexGuard};
use tracing::instrument;
use fxhash::FxHashMap;
use fxhash::{FxHashMap, FxHashSet};
use rle::{slice_vec_by, HasLength, RleVecWithIndex};
@ -226,7 +228,7 @@ impl LogStore {
let change = iter.data;
for op in change.ops.iter() {
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
let rich_op = RichOp::new_by_slice_on_change(change, start, end, op);
if rich_op.atom_len() == 0 {
continue;
}
@ -249,33 +251,46 @@ impl LogStore {
let mut common_ancestors_vv = self.vv.clone();
common_ancestors_vv.retreat(&self.find_path(&common_ancestors, &self.frontiers).right);
for (_, container) in container_map.iter_mut() {
container.tracker_checkout(&common_ancestors_vv);
container.tracker_init(&common_ancestors_vv);
}
self.with_hierarchy(|store, hierarchy| {
let mut current_vv = common_ancestors_vv.clone();
let mut already_checkout = FxHashSet::default();
for iter in store.iter_causal(
&common_ancestors,
context.new_vv.diff(&common_ancestors_vv).left,
context.new_vv.sub_vec(&common_ancestors_vv),
) {
debug_log::debug_dbg!(&iter);
debug_log::debug_dbg!(&current_vv);
already_checkout.clear();
let start = iter.slice.start;
let end = iter.slice.end;
let change = iter.data;
// TODO: perf: we can make iter_causal returns target vv and only
// checkout the related container to the target vv
for (_, container) in container_map.iter_mut() {
container.track_retreat(&iter.retreat);
container.track_forward(&iter.forward);
}
current_vv.retreat(&iter.retreat);
current_vv.forward(&iter.forward);
debug_log::debug_dbg!(&current_vv);
for op in change.ops.iter() {
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
let rich_op = RichOp::new_by_slice_on_change(change, start, end, op);
if rich_op.atom_len() == 0 {
continue;
}
debug_log::debug_dbg!(&rich_op);
if let Some(container) = container_map.get_mut(&op.container) {
if !already_checkout.contains(&op.container) {
already_checkout.insert(op.container);
container.tracker_checkout(&current_vv);
}
container.track_apply(hierarchy, &rich_op, context);
}
}
current_vv.set_end(ID::new(
change.id.client_id,
end as Counter + change.id.counter,
));
}
});
debug_log::group!("apply effects");

View file

@ -93,9 +93,9 @@ impl<'a> Iterator for OpSpanIter<'a> {
self.op_index += 1;
let op = RichOp::new_by_slice_on_change(
change,
op,
self.span.counter.min() - change.id.counter,
self.span.counter.end() - change.id.counter,
op,
);
if op.atom_len() == 0 {
return None;

View file

@ -235,7 +235,10 @@ impl<'a> RichOp<'a> {
}
}
pub fn new_by_slice_on_change(change: &Change<Op>, op: &'a Op, start: i32, end: i32) -> Self {
/// we want the overlap part of the op and change[start..end]
///
/// op is contained in the change, but it's not necessary overlap with change[start..end]
pub fn new_by_slice_on_change(change: &Change<Op>, start: i32, end: i32, op: &'a Op) -> Self {
debug_assert!(end > start);
let op_index_in_change = op.counter - change.id.counter;
let op_slice_start = (start - op_index_in_change)

View file

@ -31,6 +31,14 @@ use crate::{
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionVector(FxHashMap<ClientID, Counter>);
/// This is a subset of VersionVector, it only includes the counters of a subset of the clients.
/// If the counter is zero, then it means the client is not in the target version
///
/// It's used to represent the changed part of the version vector.
#[repr(transparent)]
#[derive(Debug, Clone)]
pub struct VersionVectorSubset(pub VersionVector);
// TODO: use new type
pub type Frontiers = SmallVec<[ID; 2]>;
@ -236,6 +244,67 @@ impl VersionVector {
ans
}
/// Returns two iterators that cover the differences between two version vectors.
///
/// - The first iterator contains the spans that are in `self` but not in `rhs`
/// - The second iterator contains the spans that are in `rhs` but not in `self`
pub fn diff_iter<'a>(
&'a self,
rhs: &'a Self,
) -> (
impl Iterator<Item = IdSpan> + 'a,
impl Iterator<Item = IdSpan> + 'a,
) {
(self.sub_iter(rhs), rhs.sub_iter(self))
}
/// Returns the spans that are in `self` but not in `rhs`
pub fn sub_iter<'a>(&'a self, rhs: &'a Self) -> impl Iterator<Item = IdSpan> + 'a {
self.iter().filter_map(move |(client_id, &counter)| {
if let Some(&rhs_counter) = rhs.get(client_id) {
if counter > rhs_counter {
Some(IdSpan {
client_id: *client_id,
counter: CounterSpan {
start: rhs_counter,
end: counter,
},
})
} else {
None
}
} else {
Some(IdSpan {
client_id: *client_id,
counter: CounterSpan {
start: 0,
end: counter,
},
})
}
})
}
pub fn sub_vec(&self, rhs: &Self) -> IdSpanVector {
self.sub_iter(rhs)
.map(|x| (x.client_id, x.counter))
.collect()
}
pub fn to_spans(&self) -> IdSpanVector {
self.iter()
.map(|(client_id, &counter)| {
(
*client_id,
CounterSpan {
start: 0,
end: counter,
},
)
})
.collect()
}
#[inline]
pub fn get_frontiers(&self) -> SmallVec<[ID; 2]> {
self.iter()
@ -343,6 +412,28 @@ impl VersionVector {
None
}
pub fn extend_to_include_vv(&mut self, vv: &VersionVector) {
for (&client_id, &counter) in vv.iter() {
if let Some(my_counter) = self.get_mut(&client_id) {
if *my_counter < counter {
*my_counter = counter;
}
} else {
self.0.insert(client_id, counter);
}
}
}
pub fn extend_to_include_last_id(&mut self, id: ID) {
if let Some(counter) = self.get_mut(&id.client_id) {
if *counter <= id.counter {
*counter = id.counter + 1;
}
} else {
self.set_last(id)
}
}
pub fn extend_to_include(&mut self, span: IdSpan) {
if let Some(counter) = self.get_mut(&span.client_id) {
if *counter < span.counter.end() {
@ -407,7 +498,7 @@ impl VersionVector {
#[inline(always)]
pub fn decode(bytes: &[u8]) -> Result<Self, LoroError> {
postcard::from_bytes(bytes).map_err(|_|LoroError::DecodeVersionVectorError)
postcard::from_bytes(bytes).map_err(|_| LoroError::DecodeVersionVectorError)
}
}