Merge pull request #19 from loro-dev/refactor-list

Refactor list
This commit is contained in:
Zixuan Chen 2022-11-18 21:03:30 +08:00 committed by GitHub
commit fb711d22a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 1353 additions and 1413 deletions

View file

@ -1,23 +1,23 @@
{
"name": "Rust",
"image": "mcr.microsoft.com/devcontainers/rust:1-bullseye",
"features": {
"ghcr.io/devcontainers/features/node:1": {},
"ghcr.io/devcontainers-contrib/features/deno:1": {}
},
"name": "Rust",
"image": "mcr.microsoft.com/devcontainers/rust:1-bullseye",
"features": {
"ghcr.io/devcontainers/features/node:1": {},
"ghcr.io/devcontainers-contrib/features/deno:1": {}
},
// Features to add to the dev container. More info: https://containers.dev/implementors/features.
// "features": {},
// Features to add to the dev container. More info: https://containers.dev/implementors/features.
// "features": {},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "bash scripts/install-dev-tools.sh"
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "bash scripts/install-dev-tools.sh"
// Configure tool-specific properties.
// "customizations": {},
// Configure tool-specific properties.
// "customizations": {},
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

View file

@ -28,8 +28,8 @@ bit-vec = "0.6.3"
wasm-bindgen = { version = "0.2.83", optional = true }
js-sys = { version = "0.3.60", optional = true }
serde_json = { version = "1.0.87", optional = true }
serde_columnar = { version = "0.1.0" }
arref = "0.1.0"
serde_columnar = { version = "0.1.0" }
[dev-dependencies]
serde_json = "1.0.87"
@ -61,9 +61,9 @@ name = "text"
harness = false
[[bench]]
name = "encode"
name = "list"
harness = false
[[bench]]
name = "list"
name = "encode"
harness = false

View file

@ -39,14 +39,14 @@ mod run {
let mut text = actor.get_text(container.to_string().as_str());
text.insert(
actor,
(action.pos as usize) % text.text_len().max(1),
(action.pos as usize) % text.len().max(1),
action.value.to_string().as_str(),
);
} else {
let mut list = actor.get_list(container.to_string().as_str());
list.insert(
actor,
(action.pos as usize) % list.values_len().max(1),
(action.pos as usize) % list.len().max(1),
action.value.to_string().as_str(),
);
}

View file

@ -85,15 +85,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8fe8f5a8a398345e52358e18ff07cc17a568fbca5c6f73873d3a62056309603"
[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
"serde",
]
[[package]]
name = "bit-vec"
version = "0.6.3"
@ -177,30 +168,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "columnar"
version = "0.1.0"
dependencies = [
"bincode",
"columnar_derive",
"flate2",
"itertools",
"lazy_static",
"postcard",
"serde",
"thiserror",
]
[[package]]
name = "columnar_derive"
version = "0.1.0"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "cortex-m"
version = "0.7.6"
@ -493,9 +460,9 @@ name = "loro-core"
version = "0.1.0"
dependencies = [
"arbitrary",
"arref",
"bit-vec",
"colored",
"columnar",
"crdt-list",
"enum-as-inner",
"fxhash",
@ -507,6 +474,7 @@ dependencies = [
"ring",
"rle",
"serde",
"serde_columnar",
"serde_json",
"smallvec",
"smartstring",
@ -977,6 +945,32 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_columnar"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "520222b6f07ee150126e79842687c567c2eeb9043648065c9f0d2e22adeeaa7c"
dependencies = [
"flate2",
"itertools",
"postcard",
"serde",
"serde_columnar_derive",
"thiserror",
]
[[package]]
name = "serde_columnar_derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbbc55f793dde5998bba28398fab7440edcddf02e7b60665dbd60cfa1bb5e039"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_derive"
version = "1.0.145"

View file

@ -12,7 +12,7 @@ use crate::{
span::{HasId, HasIdSpan, HasLamport},
};
use num::traits::AsPrimitive;
use rle::{HasIndex, HasLength, Mergable, RleVec, Sliceable};
use rle::{HasIndex, HasLength, Mergable, Rle, RleVec, Sliceable};
use smallvec::SmallVec;
pub type Timestamp = i64;
@ -68,7 +68,7 @@ impl<O: Mergable + HasLength + HasIndex> HasLength for Change<O> {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ChangeMergeCfg {
pub max_change_length: usize,
pub max_change_interval: usize,
@ -92,7 +92,7 @@ impl Default for ChangeMergeCfg {
}
}
impl Mergable<ChangeMergeCfg> for Change {
impl<O: Rle + HasIndex> Mergable<ChangeMergeCfg> for Change<O> {
fn merge(&mut self, other: &Self, _: &ChangeMergeCfg) {
self.ops.merge(&other.ops, &());
}

View file

@ -5,34 +5,69 @@
//! Every [Container] can take a [Snapshot], which contains [crate::LoroValue] that describes the state.
//!
use crate::{
op::RemoteOp, span::IdSpan, version::VersionVector, InternalString, LogStore, LoroValue, ID,
op::{RemoteOp, RichOp},
version::{IdSpanVector, VersionVector},
InternalString, LoroValue, ID,
};
use serde::{Deserialize, Serialize};
use std::{any::Any, fmt::Debug};
mod container_content;
pub mod registry;
pub mod list;
pub mod map;
pub mod text;
pub use container_content::*;
#[cfg_attr(feature = "test_utils", derive(arbitrary::Arbitrary))]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
pub enum ContainerType {
/// See [`crate::text::TextContent`]
Text,
Map,
List,
// TODO: Users can define their own container types.
// Custom(u16),
}
pub trait Container: Debug + Any + Unpin {
fn id(&self) -> &ContainerID;
fn type_(&self) -> ContainerType;
/// NOTE: this method expect that [LogStore] has store the Change
fn apply(&mut self, id_span: IdSpan, log: &LogStore);
fn checkout_version(&mut self, vv: &VersionVector);
fn get_value(&self) -> LoroValue;
// TODO: need a custom serializer
// fn serialize(&self) -> Vec<u8>;
/// convert an op to export format. for example [ListSlice] should be convert to str before export
/// convert an op content to exported format that includes the raw data
fn to_export(&mut self, op: &mut RemoteOp, gc: bool);
/// convert an op content to compact imported format
fn to_import(&mut self, op: &mut RemoteOp);
/// Apply the effect of the op directly to the state.
fn update_state_directly(&mut self, op: &RichOp);
/// Tracker need to retreat in order to apply the op.
/// TODO: can be merged into checkout
fn track_retreat(&mut self, spans: &IdSpanVector);
/// Tracker need to forward in order to apply the op.
/// TODO: can be merged into checkout
fn track_forward(&mut self, spans: &IdSpanVector);
/// Tracker need to checkout to target version in order to apply the op.
fn tracker_checkout(&mut self, vv: &VersionVector);
/// Apply the op to the tracker.
///
/// Here we have not updated the container state yet. Because we
/// need to calculate the effect of the op for [crate::List] and
/// [crate::Text] by using tracker.
fn track_apply(&mut self, op: &RichOp);
/// Make tracker iterate over the target spans and apply the calculated
/// effects to the container state
fn apply_tracked_effects_from(&mut self, from: &VersionVector, effect_spans: &IdSpanVector);
}
/// [ContainerID] includes the Op's [ID] and the type. So it's impossible to have

View file

@ -1,62 +0,0 @@
use crate::{InsertContentTrait, ID};
use rle::{HasLength, Mergable, Sliceable};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone)]
pub(crate) enum Slot {}
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "test_utils", derive(arbitrary::Arbitrary))]
pub enum ContainerType {
/// See [`crate::text::TextContent`]
Text,
Map,
List,
// TODO: Users can define their own container types.
// Custom(u16),
}
/// Container is a special kind of op content. Each container has its own CRDT implementation.
/// Each [Op] must be associated with a container.
///
#[derive(Debug, Clone)]
pub struct ContainerContent {
parent: ID,
container_type: ContainerType,
}
impl HasLength for ContainerContent {
fn content_len(&self) -> usize {
1
}
}
impl Mergable for ContainerContent {
fn is_mergable(&self, _: &Self, _: &()) -> bool
where
Self: Sized,
{
false
}
fn merge(&mut self, _: &Self, _: &())
where
Self: Sized,
{
unreachable!()
}
}
impl Sliceable for ContainerContent {
fn slice(&self, from: usize, to: usize) -> Self {
assert!(from == 0 && to == 1);
self.clone()
}
}
impl InsertContentTrait for ContainerContent {
fn id(&self) -> crate::ContentType {
crate::ContentType::Container
}
}

View file

@ -6,37 +6,32 @@ use std::{
use rle::{
rle_tree::{tree_trait::CumulateTreeTrait, HeapMode},
HasLength, RleTree, Sliceable,
RleTree,
};
use smallvec::{smallvec, SmallVec};
use crate::{
container::{
list::list_op::ListOp,
registry::{ContainerInstance, ContainerWrapper},
text::{
text_content::ListSlice,
text_content::{ListSlice, SliceRange},
tracker::{Effect, Tracker},
},
Container, ContainerID, ContainerType,
},
context::Context,
dag::DagUtils,
debug_log,
id::{Counter, ID},
op::{Content, Op, OpContent, RemoteOp},
span::{HasCounterSpan, HasIdSpan, IdSpan},
op::{Content, Op, RemoteOp, RichOp},
value::LoroValue,
LogStore,
version::IdSpanVector,
};
#[derive(Debug)]
pub struct ListContainer {
id: ContainerID,
state: RleTree<Range<u32>, CumulateTreeTrait<Range<u32>, 8, HeapMode>>,
state: RleTree<SliceRange, CumulateTreeTrait<SliceRange, 8, HeapMode>>,
raw_data: Pool,
tracker: Tracker,
head: SmallVec<[ID; 2]>,
}
#[derive(Debug, Default)]
@ -76,8 +71,6 @@ impl ListContainer {
raw_data: Pool::default(),
tracker: Tracker::new(Default::default(), 0),
state: Default::default(),
// TODO: should be eq to log_store frontier?
head: Default::default(),
}
}
@ -90,23 +83,16 @@ impl ListContainer {
let mut store = store.write().unwrap();
let id = store.next_id();
let slice = self.raw_data.alloc_arr(values);
self.state.insert(pos, slice.clone());
self.state.insert(pos, slice.clone().into());
let op = Op::new(
id,
OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: slice.into(),
pos,
}),
},
Content::List(ListOp::Insert {
slice: slice.into(),
pos,
}),
store.get_or_create_container_idx(&self.id),
);
let last_id = ID::new(
store.this_client_id,
op.counter + op.atom_len() as Counter - 1,
);
store.append_local_ops(&[op]);
self.head = smallvec![last_id];
}
pub fn insert<C: Context, V: Into<LoroValue>>(
@ -119,23 +105,16 @@ impl ListContainer {
let mut store = store.write().unwrap();
let id = store.next_id();
let slice = self.raw_data.alloc(value);
self.state.insert(pos, slice.clone());
self.state.insert(pos, slice.clone().into());
let op = Op::new(
id,
OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: slice.into(),
pos,
}),
},
Content::List(ListOp::Insert {
slice: slice.into(),
pos,
}),
store.get_or_create_container_idx(&self.id),
);
let last_id = ID::new(
store.this_client_id,
op.counter + op.atom_len() as Counter - 1,
);
store.append_local_ops(&[op]);
self.head = smallvec![last_id];
Some(id)
}
@ -154,16 +133,12 @@ impl ListContainer {
let id = store.next_id();
let op = Op::new(
id,
OpContent::Normal {
content: Content::List(ListOp::new_del(pos, len)),
},
Content::List(ListOp::new_del(pos, len)),
store.get_or_create_container_idx(&self.id),
);
let last_id = ID::new(store.this_client_id, op.ctr_last());
store.append_local_ops(&[op]);
self.state.delete_range(Some(pos), Some(pos + len));
self.head = smallvec![last_id];
Some(id)
}
@ -175,7 +150,7 @@ impl ListContainer {
) -> ContainerID {
let m = ctx.log_store();
let mut store = m.write().unwrap();
let container_id = store.create_container(obj, self.id.clone());
let container_id = store.create_container(obj);
// TODO: we can avoid this lock
drop(store);
self.insert(
@ -214,196 +189,12 @@ impl Container for ListContainer {
ContainerType::Text
}
// TODO: move main logic to tracker module
fn apply(&mut self, id_span: IdSpan, store: &LogStore) {
debug_log!("APPLY ENTRY client={}", store.this_client_id);
let self_idx = store.get_container_idx(&self.id).unwrap();
let new_op_id = id_span.id_last();
// TODO: may reduce following two into one op
let common_ancestors = store.find_common_ancestor(&[new_op_id], &self.head);
let vv = store.get_vv();
if common_ancestors == self.head {
let latest_head = smallvec![new_op_id];
let path = store.find_path(&self.head, &latest_head);
if path.right.len() == 1 {
// linear updates, we can apply them directly
let start = vv.get(&new_op_id.client_id).copied().unwrap_or(0);
for op in store.iter_ops_at_id_span(
IdSpan::new(new_op_id.client_id, start, new_op_id.counter + 1),
self.id.clone(),
) {
let op = op.get_sliced();
debug_log!("APPLY {:?}", &op);
match &op.content {
OpContent::Normal {
content: Content::List(op),
} => match op {
ListOp::Insert { slice, pos } => {
self.state.insert(*pos, slice.as_slice().unwrap().clone().0)
}
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
OpContent::Normal {
content: Content::Container(_),
} => {}
_ => unreachable!(),
}
}
self.head = latest_head;
return;
} else {
let path: Vec<_> = store.iter_partial(&self.head, path.right).collect();
if path
.iter()
.all(|x| x.forward.is_empty() && x.retreat.is_empty())
{
// if we don't need to retreat or forward, we can update the state directly
for iter in path {
let change = iter
.data
.slice(iter.slice.start as usize, iter.slice.end as usize);
for op in change.ops.iter() {
if op.container == self_idx {
debug_log!("APPLY 1 {:?}", &op);
match &op.content {
OpContent::Normal {
content: Content::List(op),
} => match op {
ListOp::Insert { slice, pos } => self
.state
.insert(*pos, slice.as_slice().unwrap().clone().0),
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
OpContent::Normal {
content: Content::Container(_),
} => {}
_ => unreachable!(),
}
}
}
}
self.head = latest_head;
return;
}
}
}
let path_to_head = store.find_path(&common_ancestors, &self.head);
let mut common_ancestors_vv = vv.clone();
common_ancestors_vv.retreat(&path_to_head.right);
let mut latest_head: SmallVec<[ID; 2]> = self.head.clone();
latest_head.retain(|x| !common_ancestors_vv.includes_id(*x));
latest_head.push(new_op_id);
// println!("{}", store.mermaid());
debug_log!(
"START FROM HEADS={:?} new_op_id={} self.head={:?}",
&common_ancestors,
new_op_id,
&self.head
);
let tracker_head = if (common_ancestors.is_empty() && !self.tracker.start_vv().is_empty())
|| !common_ancestors.iter().all(|x| self.tracker.contains(*x))
{
debug_log!("NewTracker");
self.tracker = Tracker::new(common_ancestors_vv, Counter::MAX / 2);
common_ancestors
} else {
debug_log!("OldTracker");
self.tracker.checkout_to_latest();
self.tracker.all_vv().get_head()
};
// stage 1
let path = store.find_path(&tracker_head, &latest_head);
debug_log!("path={:?}", &path);
for iter in store.iter_partial(&tracker_head, path.right) {
// TODO: avoid this clone
let change = iter
.data
.slice(iter.slice.start as usize, iter.slice.end as usize);
debug_log!(
"Stage1 retreat:{} forward:{}\n{}",
format!("{:?}", &iter.retreat).red(),
format!("{:?}", &iter.forward).red(),
format!("{:#?}", &change).blue(),
);
self.tracker.retreat(&iter.retreat);
self.tracker.forward(&iter.forward);
for op in change.ops.iter() {
if op.container == self_idx
&& op
.content
.as_normal()
.map(|x| x.as_list().is_some())
.unwrap_or(false)
{
// TODO: convert op to local
self.tracker.apply(
ID {
client_id: change.id.client_id,
counter: op.counter,
},
&op.content,
)
}
}
}
// stage 2
// TODO: reduce computations
let path = store.find_path(&self.head, &latest_head);
debug_log!("BEFORE CHECKOUT");
self.tracker.checkout(vv);
debug_log!("AFTER CHECKOUT");
debug_log!(
"[Stage 2]: Iterate path: {} from {} => {}",
format!("{:?}", path.right).red(),
format!("{:?}", self.head).red(),
format!("{:?}", latest_head).red(),
);
debug_log!(
"BEFORE EFFECT STATE={:?}",
self.get_value().as_list().unwrap()
);
for effect in self.tracker.iter_effects(path.right) {
debug_log!("EFFECT: {:?}", &effect);
match effect {
Effect::Del { pos, len } => self.state.delete_range(Some(pos), Some(pos + len)),
Effect::Ins { pos, content } => {
self.state
.insert(pos, content.as_slice().unwrap().clone().0);
}
}
debug_log!("AFTER EFFECT");
}
debug_log!(
"AFTER EFFECT STATE={:?}",
self.get_value().as_list().unwrap()
);
self.head = latest_head;
debug_log!("--------------------------------");
}
fn checkout_version(&mut self, _vv: &crate::VersionVector) {
todo!()
}
// TODO: maybe we need to let this return Cow
fn get_value(&self) -> LoroValue {
let mut values = Vec::new();
for range in self.state.iter() {
let content = range.as_ref();
for value in self.raw_data.slice(content) {
for value in self.raw_data.slice(&content.0) {
values.push(value.clone());
}
}
@ -413,11 +204,7 @@ impl Container for ListContainer {
fn to_export(&mut self, op: &mut RemoteOp, _gc: bool) {
for content in op.contents.iter_mut() {
if let Some((slice, _pos)) = content
.as_normal_mut()
.and_then(|c| c.as_list_mut())
.and_then(|x| x.as_insert_mut())
{
if let Some((slice, _pos)) = content.as_list_mut().and_then(|x| x.as_insert_mut()) {
if let Some(change) = if let ListSlice::Slice(ranges) = slice {
Some(self.raw_data.slice(&ranges.0))
} else {
@ -431,11 +218,7 @@ impl Container for ListContainer {
fn to_import(&mut self, op: &mut RemoteOp) {
for content in op.contents.iter_mut() {
if let Some((slice, _pos)) = content
.as_normal_mut()
.and_then(|c| c.as_list_mut())
.and_then(|x| x.as_insert_mut())
{
if let Some((slice, _pos)) = content.as_list_mut().and_then(|x| x.as_insert_mut()) {
if let Some(slice_range) = match std::mem::take(slice) {
ListSlice::RawData(data) => Some(self.raw_data.alloc_arr(data)),
_ => unreachable!(),
@ -445,6 +228,64 @@ impl Container for ListContainer {
}
}
}
fn update_state_directly(&mut self, op: &RichOp) {
match &op.get_sliced().content {
Content::List(op) => match op {
ListOp::Insert { slice, pos } => {
self.state.insert(*pos, slice.as_slice().unwrap().clone())
}
ListOp::Delete(span) => self
.state
.delete_range(Some(span.start() as usize), Some(span.end() as usize)),
},
_ => unreachable!(),
}
}
fn track_retreat(&mut self, spans: &IdSpanVector) {
self.tracker.retreat(spans);
}
fn track_forward(&mut self, spans: &IdSpanVector) {
self.tracker.forward(spans);
}
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
if (!vv.is_empty() || self.tracker.start_vv().is_empty())
&& self.tracker.all_vv() >= vv
&& vv >= self.tracker.start_vv()
{
self.tracker.checkout(vv);
} else {
self.tracker = Tracker::new(vv.clone(), Counter::MAX / 2);
}
}
fn track_apply(&mut self, rich_op: &RichOp) {
self.tracker.track_apply(rich_op);
}
fn apply_tracked_effects_from(
&mut self,
from: &crate::VersionVector,
effect_spans: &IdSpanVector,
) {
for effect in self.tracker.iter_effects(from, effect_spans) {
match effect {
Effect::Del { pos, len } => self.state.delete_range(Some(pos), Some(pos + len)),
Effect::Ins { pos, content } => {
let v = match content {
ListSlice::Slice(slice) => slice.clone(),
ListSlice::Unknown(u) => ListSlice::unknown_range(u),
_ => unreachable!(),
};
self.state.insert(pos, v)
}
}
}
}
}
pub struct List {
@ -486,9 +327,14 @@ impl List {
self.with_container(|text| text.delete(ctx, pos, len))
}
pub fn values_len(&self) -> usize {
pub fn len(&self) -> usize {
self.with_container(|text| text.values_len())
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl ContainerWrapper for List {

View file

@ -6,7 +6,7 @@ use rle::{HasLength, Mergable, Sliceable};
use crate::container::text::text_content::ListSlice;
#[derive(EnumAsInner, Debug, Clone)]
pub(crate) enum ListOp {
pub enum ListOp {
Insert { slice: ListSlice, pos: usize },
Delete(DeleteSpan),
}
@ -18,7 +18,7 @@ pub(crate) enum ListOp {
///
/// pos: 5, len: -3 eq a range of (2, 5]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct DeleteSpan {
pub struct DeleteSpan {
pub pos: isize,
pub len: isize,
}

View file

@ -3,18 +3,17 @@ use std::sync::{Arc, Mutex};
use fxhash::FxHashMap;
use crate::{
change::Lamport,
container::{
registry::{ContainerInstance, ContainerWrapper},
Container, ContainerID, ContainerType,
},
context::Context,
op::RemoteOp,
op::{Content, Op, RichOp},
op::{OpContent, RemoteOp},
span::IdSpan,
span::HasLamport,
value::LoroValue,
version::TotalOrderStamp,
InternalString, LogStore,
version::{IdSpanVector, TotalOrderStamp},
InternalString,
};
use super::MapSet;
@ -34,6 +33,7 @@ struct ValueSlot {
order: TotalOrderStamp,
}
// FIXME: make map container support checkout to certain version
impl MapContainer {
#[inline]
pub(crate) fn new(id: ContainerID) -> Self {
@ -65,12 +65,10 @@ impl MapContainer {
store.append_local_ops(&[Op {
counter: id.counter,
container,
content: OpContent::Normal {
content: Content::Map(MapSet {
key: key.clone(),
value: value.clone(),
}),
},
content: Content::Map(MapSet {
key: key.clone(),
value: value.clone(),
}),
}]);
self.state.insert(key, ValueSlot { value, order });
@ -86,7 +84,7 @@ impl MapContainer {
let m = ctx.log_store();
let mut store = m.write().unwrap();
let client_id = store.this_client_id;
let container_id = store.create_container(obj, self_id.clone());
let container_id = store.create_container(obj);
// TODO: store this value?
let id = store.next_id_for(client_id);
let container = store.get_container_idx(self_id).unwrap();
@ -98,12 +96,10 @@ impl MapContainer {
store.append_local_ops(&[Op {
counter: id.counter,
container,
content: OpContent::Normal {
content: Content::Map(MapSet {
key: key.clone(),
value: container_id.clone().into(),
}),
},
content: Content::Map(MapSet {
key: key.clone(),
value: container_id.clone().into(),
}),
}]);
self.state.insert(
key,
@ -131,43 +127,6 @@ impl Container for MapContainer {
ContainerType::Map
}
fn apply(&mut self, id_span: IdSpan, log: &LogStore) {
for RichOp {
op, lamport, start, ..
} in log.iter_ops_at_id_span(id_span, self.id.clone())
{
match &op.content {
OpContent::Normal { content } => {
if content.as_container().is_some() {
continue;
}
let v: &MapSet = content.as_map().unwrap();
let order = TotalOrderStamp {
lamport: lamport + start as Lamport,
client_id: id_span.client_id,
};
if let Some(slot) = self.state.get_mut(&v.key) {
if slot.order < order {
// TODO: can avoid this clone
slot.value = v.value.clone();
slot.order = order;
}
} else {
self.state.insert(
v.key.to_owned(),
ValueSlot {
value: v.value.clone(),
order,
},
);
}
}
_ => unreachable!(),
}
}
}
fn get_value(&self) -> LoroValue {
let mut map = FxHashMap::default();
for (key, value) in self.state.iter() {
@ -185,13 +144,44 @@ impl Container for MapContainer {
map.into()
}
fn checkout_version(&mut self, _vv: &crate::version::VersionVector) {
todo!()
}
fn tracker_checkout(&mut self, _vv: &crate::version::VersionVector) {}
fn to_export(&mut self, _op: &mut RemoteOp, _gc: bool) {}
fn to_import(&mut self, _op: &mut RemoteOp) {}
fn update_state_directly(&mut self, op: &RichOp) {
let content = op.get_sliced().content;
let v: &MapSet = content.as_map().unwrap();
let order = TotalOrderStamp {
lamport: op.lamport(),
client_id: op.client_id(),
};
if let Some(slot) = self.state.get_mut(&v.key) {
if slot.order < order {
slot.value = v.value.clone();
slot.order = order;
}
} else {
self.state.insert(
v.key.to_owned(),
ValueSlot {
value: v.value.clone(),
order,
},
);
}
}
fn track_retreat(&mut self, _: &IdSpanVector) {}
fn track_forward(&mut self, _: &IdSpanVector) {}
fn apply_tracked_effects_from(&mut self, _: &crate::VersionVector, _: &IdSpanVector) {}
fn track_apply(&mut self, op: &RichOp) {
self.update_state_directly(op);
}
}
pub struct Map {
@ -227,6 +217,23 @@ impl Map {
map.delete(ctx, key.into());
})
}
pub fn id(&self) -> ContainerID {
self.instance.lock().unwrap().as_map().unwrap().id.clone()
}
pub fn get_value(&self) -> LoroValue {
self.instance.lock().unwrap().as_map().unwrap().get_value()
}
pub fn len(&self) -> usize {
self.with_container(|map| map.state.len())
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl ContainerWrapper for Map {

View file

@ -4,7 +4,7 @@ use crate::{ContentType, InsertContentTrait, InternalString, LoroValue};
// TODO: use imported and exported format to save the space
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct MapSet {
pub struct MapSet {
pub(crate) key: InternalString,
pub(crate) value: LoroValue,
}

View file

@ -7,8 +7,6 @@ use fxhash::FxHashMap;
use proptest::prelude::*;
use proptest::proptest;
use crate::container::registry::ContainerWrapper;
use crate::{fx_map, LoroCore, LoroValue};
#[test]

View file

@ -1,20 +1,27 @@
use std::{
ops::{Deref, DerefMut},
sync::{Arc, Mutex, RwLockReadGuard, RwLockWriteGuard},
sync::{Arc, Mutex, RwLockWriteGuard},
};
use enum_as_inner::EnumAsInner;
use fxhash::FxHashMap;
use owning_ref::{OwningRef, OwningRefMut};
use owning_ref::OwningRefMut;
use crate::{context::Context, id::ContainerIdx, op::RemoteOp, span::IdSpan, LogStore, LoroValue};
use crate::{
context::Context,
id::ContainerIdx,
op::{RemoteOp, RichOp},
version::IdSpanVector,
LoroValue, VersionVector,
};
use super::{
list::ListContainer, map::MapContainer, text::TextContainer, Container, ContainerID,
ContainerType,
};
// TODO: replace this with a fat pointer?
#[derive(Debug, EnumAsInner)]
pub enum ContainerInstance {
Map(Box<MapContainer>),
@ -37,26 +44,17 @@ impl Container for ContainerInstance {
match self {
ContainerInstance::Map(_) => ContainerType::Map,
ContainerInstance::Text(_) => ContainerType::Text,
ContainerInstance::Dyn(x) => x.type_(),
ContainerInstance::List(_) => ContainerType::List,
ContainerInstance::Dyn(x) => x.type_(),
}
}
fn apply(&mut self, id_span: IdSpan, log: &LogStore) {
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
match self {
ContainerInstance::Map(x) => x.apply(id_span, log),
ContainerInstance::Text(x) => x.apply(id_span, log),
ContainerInstance::Dyn(x) => x.apply(id_span, log),
ContainerInstance::List(x) => x.apply(id_span, log),
}
}
fn checkout_version(&mut self, vv: &crate::VersionVector) {
match self {
ContainerInstance::Map(x) => x.checkout_version(vv),
ContainerInstance::Text(x) => x.checkout_version(vv),
ContainerInstance::Dyn(x) => x.checkout_version(vv),
ContainerInstance::List(x) => x.checkout_version(vv),
ContainerInstance::Map(x) => x.tracker_checkout(vv),
ContainerInstance::Text(x) => x.tracker_checkout(vv),
ContainerInstance::Dyn(x) => x.tracker_checkout(vv),
ContainerInstance::List(x) => x.tracker_checkout(vv),
}
}
@ -85,6 +83,51 @@ impl Container for ContainerInstance {
ContainerInstance::List(x) => x.to_import(op),
}
}
fn update_state_directly(&mut self, op: &RichOp) {
match self {
ContainerInstance::Map(x) => x.update_state_directly(op),
ContainerInstance::Text(x) => x.update_state_directly(op),
ContainerInstance::Dyn(x) => x.update_state_directly(op),
ContainerInstance::List(x) => x.update_state_directly(op),
}
}
fn track_retreat(&mut self, op: &IdSpanVector) {
match self {
ContainerInstance::Map(x) => x.track_retreat(op),
ContainerInstance::Text(x) => x.track_retreat(op),
ContainerInstance::Dyn(x) => x.track_retreat(op),
ContainerInstance::List(x) => x.track_retreat(op),
}
}
fn track_forward(&mut self, op: &IdSpanVector) {
match self {
ContainerInstance::Map(x) => x.track_forward(op),
ContainerInstance::Text(x) => x.track_forward(op),
ContainerInstance::Dyn(x) => x.track_forward(op),
ContainerInstance::List(x) => x.track_forward(op),
}
}
fn track_apply(&mut self, op: &RichOp) {
match self {
ContainerInstance::Map(x) => x.track_apply(op),
ContainerInstance::Text(x) => x.track_apply(op),
ContainerInstance::Dyn(x) => x.track_apply(op),
ContainerInstance::List(x) => x.track_apply(op),
}
}
fn apply_tracked_effects_from(&mut self, from: &VersionVector, effect_spans: &IdSpanVector) {
match self {
ContainerInstance::Map(x) => x.apply_tracked_effects_from(from, effect_spans),
ContainerInstance::Text(x) => x.apply_tracked_effects_from(from, effect_spans),
ContainerInstance::Dyn(x) => x.apply_tracked_effects_from(from, effect_spans),
ContainerInstance::List(x) => x.apply_tracked_effects_from(from, effect_spans),
}
}
}
// TODO: containers snapshot: we need to resolve each container's parent even
@ -140,13 +183,15 @@ impl ContainerRegistry {
}
#[inline(always)]
fn insert(&mut self, id: ContainerID, container: ContainerInstance) {
fn insert(&mut self, id: ContainerID, container: ContainerInstance) -> ContainerIdx {
let idx = self.next_idx();
self.container_to_idx.insert(id.clone(), idx);
self.containers.push(ContainerAndId {
container: Arc::new(Mutex::new(container)),
id,
});
idx
}
#[inline(always)]
@ -154,6 +199,11 @@ impl ContainerRegistry {
self.containers.len() as ContainerIdx
}
pub(crate) fn register(&mut self, id: &ContainerID) {
let container = self.create(id.clone());
self.insert(id.clone(), container);
}
pub(crate) fn get_or_create(&mut self, id: &ContainerID) -> &Arc<Mutex<ContainerInstance>> {
if !self.container_to_idx.contains_key(id) {
let container = self.create(id.clone());
@ -165,12 +215,12 @@ impl ContainerRegistry {
}
pub(crate) fn get_or_create_container_idx(&mut self, id: &ContainerID) -> ContainerIdx {
if !self.container_to_idx.contains_key(id) {
if let Some(idx) = self.container_to_idx.get(id) {
*idx
} else {
let container = self.create(id.clone());
self.insert(id.clone(), container);
self.insert(id.clone(), container)
}
self.get_idx(id).unwrap()
}
#[cfg(feature = "test_utils")]
@ -200,10 +250,6 @@ pub struct ContainerRefMut<'a, T> {
value: OwningRefMut<RwLockWriteGuard<'a, ContainerRegistry>, Box<T>>,
}
pub struct ContainerRef<'a, T> {
value: OwningRef<RwLockReadGuard<'a, ContainerRegistry>, Box<T>>,
}
impl<'a, T> From<OwningRefMut<RwLockWriteGuard<'a, ContainerRegistry>, Box<T>>>
for ContainerRefMut<'a, T>
{
@ -212,14 +258,6 @@ impl<'a, T> From<OwningRefMut<RwLockWriteGuard<'a, ContainerRegistry>, Box<T>>>
}
}
impl<'a, T> From<OwningRef<RwLockReadGuard<'a, ContainerRegistry>, Box<T>>>
for ContainerRef<'a, T>
{
fn from(value: OwningRef<RwLockReadGuard<'a, ContainerRegistry>, Box<T>>) -> Self {
ContainerRef { value }
}
}
impl<'a, T> Deref for ContainerRefMut<'a, T> {
type Target = T;

View file

@ -2,9 +2,8 @@ use std::sync::{Arc, Mutex};
use rle::{
rle_tree::{tree_trait::CumulateTreeTrait, HeapMode},
HasLength, RleTree, RleVec, Sliceable,
HasLength, RleTree, RleVec,
};
use smallvec::{smallvec, SmallVec};
use crate::{
container::{
@ -13,13 +12,11 @@ use crate::{
Container, ContainerID, ContainerType,
},
context::Context,
dag::DagUtils,
debug_log,
id::{Counter, ID},
op::{Content, Op, OpContent, RemoteOp},
span::{HasCounterSpan, HasIdSpan, IdSpan},
op::{Content, Op, RemoteOp, RichOp},
value::LoroValue,
LogStore,
version::IdSpanVector,
};
use super::{
@ -28,19 +25,12 @@ use super::{
tracker::{Effect, Tracker},
};
#[derive(Clone, Debug)]
struct DagNode {
id: IdSpan,
deps: SmallVec<[ID; 2]>,
}
#[derive(Debug)]
pub struct TextContainer {
id: ContainerID,
state: RleTree<SliceRange, CumulateTreeTrait<SliceRange, 8, HeapMode>>,
raw_str: StringPool,
tracker: Tracker,
head: SmallVec<[ID; 2]>,
}
impl TextContainer {
@ -50,8 +40,6 @@ impl TextContainer {
raw_str: StringPool::default(),
tracker: Tracker::new(Default::default(), 0),
state: Default::default(),
// TODO: should be eq to log_store frontier?
head: Default::default(),
}
}
@ -71,20 +59,13 @@ impl TextContainer {
self.state.insert(pos, slice.clone().into());
let op = Op::new(
id,
OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: slice.into(),
pos,
}),
},
Content::List(ListOp::Insert {
slice: slice.into(),
pos,
}),
store.get_or_create_container_idx(&self.id),
);
let last_id = ID::new(
store.this_client_id,
op.counter + op.atom_len() as Counter - 1,
);
store.append_local_ops(&[op]);
self.head = smallvec![last_id];
Some(id)
}
@ -103,16 +84,12 @@ impl TextContainer {
let id = store.next_id();
let op = Op::new(
id,
OpContent::Normal {
content: Content::List(ListOp::new_del(pos, len)),
},
Content::List(ListOp::new_del(pos, len)),
store.get_or_create_container_idx(&self.id),
);
let last_id = ID::new(store.this_client_id, op.ctr_last());
store.append_local_ops(&[op]);
self.state.delete_range(Some(pos), Some(pos + len));
self.head = smallvec![last_id];
Some(id)
}
@ -144,197 +121,6 @@ impl Container for TextContainer {
ContainerType::Text
}
// TODO: move main logic to tracker module
fn apply(&mut self, id_span: IdSpan, store: &LogStore) {
debug_log!("APPLY ENTRY client={}", store.this_client_id);
let self_idx = store.get_container_idx(&self.id).unwrap();
let new_op_id = id_span.id_last();
// TODO: may reduce following two into one op
let common_ancestors = store.find_common_ancestor(&[new_op_id], &self.head);
let vv = store.get_vv();
if common_ancestors == self.head {
let latest_head = smallvec![new_op_id];
let path = store.find_path(&self.head, &latest_head);
if path.right.len() == 1 {
// linear updates, we can apply them directly
let start = vv.get(&new_op_id.client_id).copied().unwrap_or(0);
for op in store.iter_ops_at_id_span(
IdSpan::new(new_op_id.client_id, start, new_op_id.counter + 1),
self.id.clone(),
) {
let op = op.get_sliced();
match &op.content {
OpContent::Normal {
content: Content::List(op),
} => match op {
ListOp::Insert { slice, pos } => {
let v = match slice {
ListSlice::Slice(slice) => slice.clone(),
ListSlice::Unknown(u) => ListSlice::unknown_range(*u),
_ => unreachable!(),
};
self.state.insert(*pos, v)
}
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
_ => unreachable!(),
}
}
self.head = latest_head;
return;
} else {
let path: Vec<_> = store.iter_partial(&self.head, path.right).collect();
if path
.iter()
.all(|x| x.forward.is_empty() && x.retreat.is_empty())
{
// if we don't need to retreat or forward, we can update the state directly
for iter in path {
let change = iter
.data
.slice(iter.slice.start as usize, iter.slice.end as usize);
for op in change.ops.iter() {
if op.container == self_idx {
match &op.content {
OpContent::Normal {
content: Content::List(op),
} => match op {
ListOp::Insert { slice, pos } => {
let v = match slice {
ListSlice::Slice(slice) => slice.clone(),
ListSlice::Unknown(u) => {
ListSlice::unknown_range(*u)
}
_ => unreachable!(),
};
self.state.insert(*pos, v)
}
ListOp::Delete(span) => self.state.delete_range(
Some(span.start() as usize),
Some(span.end() as usize),
),
},
_ => unreachable!(),
}
}
}
}
self.head = latest_head;
return;
}
}
}
let path_to_head = store.find_path(&common_ancestors, &self.head);
let mut common_ancestors_vv = vv.clone();
common_ancestors_vv.retreat(&path_to_head.right);
let mut latest_head: SmallVec<[ID; 2]> = self.head.clone();
latest_head.retain(|x| !common_ancestors_vv.includes_id(*x));
latest_head.push(new_op_id);
// println!("{}", store.mermaid());
debug_log!(
"START FROM HEADS={:?} new_op_id={} self.head={:?}",
&common_ancestors,
new_op_id,
&self.head
);
let tracker_head = if (common_ancestors.is_empty() && !self.tracker.start_vv().is_empty())
|| !common_ancestors.iter().all(|x| self.tracker.contains(*x))
{
debug_log!("NewTracker");
self.tracker = Tracker::new(common_ancestors_vv, Counter::MAX / 2);
common_ancestors
} else {
debug_log!("OldTracker");
self.tracker.checkout_to_latest();
self.tracker.all_vv().get_head()
};
// stage 1
let path = store.find_path(&tracker_head, &latest_head);
debug_log!("path={:?}", &path.right);
for iter in store.iter_partial(&tracker_head, path.right) {
// TODO: avoid this clone
let change = iter
.data
.slice(iter.slice.start as usize, iter.slice.end as usize);
debug_log!(
"Stage1 retreat:{} forward:{}\n{}",
format!("{:?}", &iter.retreat).red(),
format!("{:?}", &iter.forward).red(),
format!("{:#?}", &change).blue(),
);
self.tracker.retreat(&iter.retreat);
self.tracker.forward(&iter.forward);
for op in change.ops.iter() {
if op.container == self_idx {
// TODO: convert op to local
self.tracker.apply(
ID {
client_id: change.id.client_id,
counter: op.counter,
},
&op.content,
)
}
}
}
// stage 2
// TODO: reduce computations
let path = store.find_path(&self.head, &latest_head);
debug_log!("BEFORE CHECKOUT");
// dbg!(&self.tracker);
self.tracker.checkout(vv);
debug_log!("AFTER CHECKOUT");
// dbg!(&self.tracker);
debug_log!(
"[Stage 2]: Iterate path: {} from {} => {}",
format!("{:?}", path.right).red(),
format!("{:?}", self.head).red(),
format!("{:?}", latest_head).red(),
);
debug_log!(
"BEFORE EFFECT STATE={}",
self.get_value().as_string().unwrap()
);
for effect in self.tracker.iter_effects(path.right) {
debug_log!("EFFECT: {:?}", &effect);
match effect {
Effect::Del { pos, len } => self.state.delete_range(Some(pos), Some(pos + len)),
Effect::Ins { pos, content } => {
let v = match content {
ListSlice::Slice(slice) => slice.clone(),
ListSlice::Unknown(u) => ListSlice::unknown_range(u),
_ => unreachable!(),
};
self.state.insert(pos, v)
}
}
debug_log!("AFTER EFFECT");
}
debug_log!(
"AFTER EFFECT STATE={}",
self.get_value().as_string().unwrap()
);
self.head = latest_head;
debug_log!("--------------------------------");
}
fn checkout_version(&mut self, _vv: &crate::VersionVector) {
todo!()
}
// TODO: maybe we need to let this return Cow
fn get_value(&self) -> LoroValue {
let mut ans_str = String::new();
@ -356,13 +142,9 @@ impl Container for TextContainer {
.update_aliveness(self.state.iter().map(|x| x.as_ref().0.clone()))
}
let mut contents: RleVec<[OpContent; 1]> = RleVec::new();
let mut contents: RleVec<[Content; 1]> = RleVec::new();
for content in op.contents.iter_mut() {
if let Some((slice, pos)) = content
.as_normal_mut()
.and_then(|c| c.as_list_mut())
.and_then(|x| x.as_insert_mut())
{
if let Some((slice, pos)) = content.as_list_mut().and_then(|x| x.as_insert_mut()) {
match slice {
ListSlice::Slice(r) => {
if r.is_unknown() {
@ -376,22 +158,16 @@ impl Container for TextContainer {
for span in self.raw_str.get_aliveness(&r.0) {
match span {
Alive::True(span) => {
contents.push(OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: ListSlice::RawStr(
s[start..start + span].into(),
),
pos: pos_start,
}),
});
contents.push(Content::List(ListOp::Insert {
slice: ListSlice::RawStr(s[start..start + span].into()),
pos: pos_start,
}));
}
Alive::False(span) => {
let v = OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: ListSlice::Unknown(span),
pos: pos_start,
}),
};
let v = Content::List(ListOp::Insert {
slice: ListSlice::Unknown(span),
pos: pos_start,
});
contents.push(v);
}
}
@ -401,21 +177,17 @@ impl Container for TextContainer {
}
assert_eq!(start, r.atom_len());
} else {
contents.push(OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: ListSlice::RawStr(s),
pos: *pos,
}),
});
contents.push(Content::List(ListOp::Insert {
slice: ListSlice::RawStr(s),
pos: *pos,
}));
}
}
this => {
contents.push(OpContent::Normal {
content: Content::List(ListOp::Insert {
slice: this.clone(),
pos: *pos,
}),
});
contents.push(Content::List(ListOp::Insert {
slice: this.clone(),
pos: *pos,
}));
}
}
} else {
@ -427,12 +199,9 @@ impl Container for TextContainer {
}
fn to_import(&mut self, op: &mut RemoteOp) {
debug_log!("IMPORT {:#?}", &op);
for content in op.contents.iter_mut() {
if let Some((slice, _pos)) = content
.as_normal_mut()
.and_then(|c| c.as_list_mut())
.and_then(|x| x.as_insert_mut())
{
if let Some((slice, _pos)) = content.as_list_mut().and_then(|x| x.as_insert_mut()) {
if let Some(slice_range) = match slice {
ListSlice::RawStr(s) => {
let range = self.raw_str.alloc(s);
@ -446,6 +215,79 @@ impl Container for TextContainer {
}
}
}
debug_log!("IMPORTED {:#?}", &op);
}
fn update_state_directly(&mut self, op: &RichOp) {
match &op.get_sliced().content {
Content::List(op) => match op {
ListOp::Insert { slice, pos } => {
let v = match slice {
ListSlice::Slice(slice) => slice.clone(),
ListSlice::Unknown(u) => ListSlice::unknown_range(*u),
_ => unreachable!(),
};
self.state.insert(*pos, v)
}
ListOp::Delete(span) => self
.state
.delete_range(Some(span.start() as usize), Some(span.end() as usize)),
},
_ => unreachable!(),
}
}
fn track_retreat(&mut self, spans: &IdSpanVector) {
debug_log!("TRACKER RETREAT {:#?}", &spans);
self.tracker.retreat(spans);
}
fn track_forward(&mut self, spans: &IdSpanVector) {
debug_log!("TRACKER FORWARD {:#?}", &spans);
self.tracker.forward(spans);
}
fn tracker_checkout(&mut self, vv: &crate::VersionVector) {
debug_log!("Tracker checkout {:?}", vv);
if (!vv.is_empty() || self.tracker.start_vv().is_empty())
&& self.tracker.all_vv() >= vv
&& vv >= self.tracker.start_vv()
{
debug_log!("OLD Tracker");
self.tracker.checkout(vv);
} else {
debug_log!("NEW Tracker");
self.tracker = Tracker::new(vv.clone(), Counter::MAX / 2);
}
}
fn track_apply(&mut self, rich_op: &RichOp) {
self.tracker.track_apply(rich_op);
}
fn apply_tracked_effects_from(
&mut self,
from: &crate::VersionVector,
effect_spans: &IdSpanVector,
) {
debug_log!("BEFORE APPLY EFFECT {:?}", self.get_value());
for effect in self.tracker.iter_effects(from, effect_spans) {
debug_log!("APPLY EFFECT {:?}", &effect);
match effect {
Effect::Del { pos, len } => self.state.delete_range(Some(pos), Some(pos + len)),
Effect::Ins { pos, content } => {
let v = match content {
ListSlice::Slice(slice) => slice.clone(),
ListSlice::Unknown(u) => ListSlice::unknown_range(u),
_ => unreachable!(),
};
self.state.insert(pos, v)
}
}
}
debug_log!("AFTER APPLY EFFECT {:?}", self.get_value());
}
}
@ -462,6 +304,10 @@ impl Clone for Text {
}
impl Text {
pub fn id(&self) -> ContainerID {
self.instance.lock().unwrap().as_text().unwrap().id.clone()
}
pub fn insert<C: Context>(&mut self, ctx: &C, pos: usize, text: &str) -> Option<ID> {
self.with_container(|x| x.insert(ctx, pos, text))
}
@ -470,10 +316,18 @@ impl Text {
self.with_container(|text| text.delete(ctx, pos, len))
}
// TODO: can be len?
pub fn text_len(&self) -> usize {
pub fn get_value(&self) -> LoroValue {
self.instance.lock().unwrap().as_text().unwrap().get_value()
}
pub fn len(&self) -> usize {
self.with_container(|text| text.text_len())
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl ContainerWrapper for Text {

View file

@ -1,7 +1,7 @@
use std::ops::Range;
use enum_as_inner::EnumAsInner;
use rle::{rle_tree::tree_trait::CumulateTreeTrait, HasLength, Mergable, Sliceable};
use rle::{HasLength, Mergable, Sliceable};
use crate::{smstring::SmString, LoroValue};
@ -160,5 +160,3 @@ impl Mergable for ListSlice {
}
}
}
pub(super) type ListSliceTreeTrait = CumulateTreeTrait<ListSlice, 8>;

View file

@ -1,12 +1,12 @@
use rle::{rle_tree::UnsafeCursor, HasLength};
use rle::{rle_tree::UnsafeCursor, HasLength, Sliceable};
use smallvec::SmallVec;
use crate::{
container::{list::list_op::ListOp, text::tracker::yata_impl::YataImpl},
debug_log,
id::{Counter, ID},
op::OpContent,
span::{HasIdSpan, IdSpan},
op::{Content, RichOp},
span::{HasId, HasIdSpan, IdSpan},
version::IdSpanVector,
VersionVector,
};
@ -49,7 +49,12 @@ pub struct Tracker {
/// latest applied ops version vector
all_vv: VersionVector,
/// current content version vector
head_vv: VersionVector,
current_vv: VersionVector,
/// The pretend current content version vector.
///
/// Because sometimes we don't actually need to checkout to the version.
/// So we may cache the changes then applying them when we really need to.
cached_fake_current_vv: VersionVector,
content: ContentMap,
id_to_cursor: CursorMap,
}
@ -82,7 +87,8 @@ impl Tracker {
id_to_cursor,
#[cfg(feature = "test_utils")]
client_id: 0,
head_vv: start_vv.clone(),
current_vv: start_vv.clone(),
cached_fake_current_vv: start_vv.clone(),
all_vv: start_vv.clone(),
start_vv,
}
@ -97,13 +103,8 @@ impl Tracker {
&self.all_vv
}
#[inline]
pub fn head_vv(&self) -> &VersionVector {
&self.head_vv
}
pub fn contains(&self, id: ID) -> bool {
!self.start_vv.includes_id(id) && self.all_vv.includes_id(id)
!self.cached_fake_current_vv.includes_id(id) && self.all_vv.includes_id(id)
}
/// check whether id_to_cursor correctly reflect the status of the content
@ -133,19 +134,50 @@ impl Tracker {
}
pub fn checkout(&mut self, vv: &VersionVector) {
let diff = self.head_vv.diff(vv);
self.retreat(&diff.left);
self.forward(&diff.right);
debug_assert_eq!(&self.head_vv, vv);
self.cached_fake_current_vv = vv.clone();
}
pub fn checkout_to_latest(&mut self) {
let diff = self.head_vv.diff(&self.all_vv);
self.forward(&diff.right);
debug_assert_eq!(self.head_vv, self.all_vv);
fn real_checkout(&mut self) {
if self.current_vv == self.cached_fake_current_vv {
return;
}
let diff = self.current_vv.diff(&self.cached_fake_current_vv);
self.real_retreat(&diff.left);
self.real_forward(&diff.right);
debug_assert_eq!(&self.current_vv, &self.cached_fake_current_vv);
}
pub fn forward(&mut self, spans: &IdSpanVector) {
self.cached_fake_current_vv.forward(spans);
self.all_vv.forward(spans);
}
pub fn track_apply(&mut self, rich_op: &RichOp) {
let content = rich_op.get_sliced().content;
let id = rich_op.id_start();
if self
.all_vv()
.includes_id(id.inc(content.atom_len() as Counter - 1))
{
self.forward(&id.to_span(content.atom_len()).to_id_span_vec());
return;
}
if self.all_vv().includes_id(id) {
let this_ctr = self.all_vv().get(&id.client_id).unwrap();
let shift = this_ctr - id.counter;
self.forward(&id.to_span(shift as usize).to_id_span_vec());
self.apply(
id.inc(shift),
&content.slice(shift as usize, content.atom_len()),
);
} else {
self.apply(id, &content)
}
}
fn real_forward(&mut self, spans: &IdSpanVector) {
if spans.is_empty() {
return;
}
@ -154,7 +186,7 @@ impl Tracker {
let mut args = Vec::with_capacity(spans.len());
for span in spans.iter() {
let end_id = ID::new(*span.0, span.1.end);
self.head_vv.set_end(end_id);
self.current_vv.set_end(end_id);
if let Some(all_end_ctr) = self.all_vv.get_mut(span.0) {
let all_end = *all_end_ctr;
if all_end < span.1.end {
@ -204,6 +236,11 @@ impl Tracker {
}
pub fn retreat(&mut self, spans: &IdSpanVector) {
self.cached_fake_current_vv.retreat(spans);
self.all_vv.forward(spans);
}
fn real_retreat(&mut self, spans: &IdSpanVector) {
if spans.is_empty() {
return;
}
@ -212,7 +249,7 @@ impl Tracker {
let mut args = Vec::with_capacity(spans.len());
for span in spans.iter() {
let span_start = ID::new(*span.0, span.1.start);
self.head_vv.set_end(span_start);
self.current_vv.set_end(span_start);
if let Some(all_end_ctr) = self.all_vv.get_mut(span.0) {
let all_end = *all_end_ctr;
if all_end < span.1.start {
@ -266,52 +303,44 @@ impl Tracker {
}
/// apply an operation directly to the current tracker
pub(crate) fn apply(&mut self, id: ID, content: &OpContent) {
assert!(*self.head_vv.get(&id.client_id).unwrap_or(&0) <= id.counter);
fn apply(&mut self, id: ID, content: &Content) {
self.real_checkout();
assert!(*self.current_vv.get(&id.client_id).unwrap_or(&0) <= id.counter);
assert!(*self.all_vv.get(&id.client_id).unwrap_or(&0) <= id.counter);
self.head_vv.set_end(id.inc(content.content_len() as i32));
self.current_vv
.set_end(id.inc(content.content_len() as i32));
self.cached_fake_current_vv
.set_end(id.inc(content.content_len() as i32));
self.all_vv.set_end(id.inc(content.content_len() as i32));
match &content {
crate::op::OpContent::Normal { content } => {
let text_content = content.as_list().expect("Content is not for list");
match text_content {
ListOp::Insert { slice, pos } => {
let yspan = self.content.get_yspan_at_pos(
id,
*pos,
slice.content_len(),
slice.to_range(),
);
debug_log!("INSERT YSPAN={}", format!("{:#?}", &yspan).red());
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
unsafe { crdt_list::yata::integrate::<YataImpl>(self, yspan) };
}
ListOp::Delete(span) => {
let mut spans = self
.content
.get_active_id_spans(span.start() as usize, span.atom_len());
debug_log!("DELETED SPANS={}", format!("{:#?}", &spans).red());
self.update_spans(&spans, StatusChange::Delete);
let text_content = content.as_list().expect("Content is not for list");
match text_content {
ListOp::Insert { slice, pos } => {
let yspan =
self.content
.get_yspan_at_pos(id, *pos, slice.content_len(), slice.to_range());
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
unsafe { crdt_list::yata::integrate::<YataImpl>(self, yspan) };
}
ListOp::Delete(span) => {
let mut spans = self
.content
.get_active_id_spans(span.start() as usize, span.atom_len());
debug_log!("DELETED SPANS={}", format!("{:#?}", &spans).red());
self.update_spans(&spans, StatusChange::Delete);
if span.is_reversed() && span.atom_len() > 1 {
spans.reverse();
// SAFETY: we don't change the size of the span
unsafe {
for span in spans.iter_mut() {
span.reverse();
}
}
if span.is_reversed() && span.atom_len() > 1 {
spans.reverse();
// SAFETY: we don't change the size of the span
unsafe {
for span in spans.iter_mut() {
span.reverse();
}
self.id_to_cursor.set_small_range(
(id).into(),
cursor_map::Marker::Delete(Box::new(spans)),
);
}
}
self.id_to_cursor
.set_small_range((id).into(), cursor_map::Marker::Delete(Box::new(spans)));
}
crate::op::OpContent::Undo { .. } => todo!(),
crate::op::OpContent::Redo { .. } => todo!(),
}
}
@ -357,7 +386,9 @@ impl Tracker {
)
}
pub fn iter_effects(&mut self, target: IdSpanVector) -> EffectIter<'_> {
pub fn iter_effects(&mut self, from: &VersionVector, target: &IdSpanVector) -> EffectIter<'_> {
self.checkout(from);
self.real_checkout();
EffectIter::new(self, target)
}

View file

@ -17,7 +17,7 @@ pub struct EffectIter<'a> {
}
impl<'a> EffectIter<'a> {
pub fn new(tracker: &'a mut Tracker, target: IdSpanVector) -> Self {
pub fn new(tracker: &'a mut Tracker, target: &IdSpanVector) -> Self {
let spans = target
.iter()
.map(|(client, ctr)| IdSpan::new(*client, ctr.start, ctr.end))
@ -77,7 +77,8 @@ impl<'a> Iterator for EffectIter<'a> {
let pos = unsafe { cursor.get_index() };
let len = cursor.len;
*delete_op_id = delete_op_id.inc(cursor.len as Counter);
self.tracker.head_vv.set_end(*delete_op_id);
self.tracker.current_vv.set_end(*delete_op_id);
self.tracker.cached_fake_current_vv.set_end(*delete_op_id);
let length = -self.tracker.update_cursors(cursor, StatusChange::Delete);
assert!(length >= 0);
if length > 0 {
@ -107,7 +108,12 @@ impl<'a> Iterator for EffectIter<'a> {
// SAFETY: cursor is valid here
let content = unsafe { cursor.get_sliced().slice };
let len = cursor.len;
self.tracker.head_vv.set_end(id.inc(cursor.len as Counter));
self.tracker
.current_vv
.set_end(id.inc(cursor.len as Counter));
self.tracker
.cached_fake_current_vv
.set_end(id.inc(cursor.len as Counter));
let length_diff = self
.tracker
.update_cursors(cursor, StatusChange::SetAsCurrent);

View file

@ -192,11 +192,7 @@ impl HasLength for YSpan {
#[cfg(any(test, features = "test_utils"))]
pub mod test {
use crate::{
container::text::text_content::ListSlice,
op::{Content, OpContent},
ContentType, Op, ID,
};
use crate::{container::text::text_content::ListSlice, op::Content, ContentType, Op, ID};
use rle::{HasLength, RleVecWithIndex};
use super::YSpan;
@ -206,34 +202,30 @@ pub mod test {
let mut vec: RleVecWithIndex<Op> = RleVecWithIndex::new();
vec.push(Op::new(
ID::new(0, 1),
OpContent::Normal {
content: Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 0)),
origin_right: None,
id: ID::new(0, 1),
status: Default::default(),
slice: ListSlice::unknown_range(1),
})),
},
Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 0)),
origin_right: None,
id: ID::new(0, 1),
status: Default::default(),
slice: ListSlice::unknown_range(1),
})),
5,
));
vec.push(Op::new(
ID::new(0, 2),
OpContent::Normal {
content: Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 1)),
origin_right: None,
id: ID::new(0, 2),
status: Default::default(),
slice: ListSlice::unknown_range(1),
})),
},
Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 1)),
origin_right: None,
id: ID::new(0, 2),
status: Default::default(),
slice: ListSlice::unknown_range(1),
})),
5,
));
assert_eq!(vec.merged_len(), 1);
let merged = vec.get_merged(0).unwrap();
assert_eq!(merged.content.as_normal().unwrap().id(), ContentType::Text);
let text_content = merged.content.as_normal().unwrap().as_dyn().unwrap();
assert_eq!(merged.content.id(), ContentType::Text);
let text_content = merged.content.as_dyn().unwrap();
dbg!(&merged);
assert_eq!(text_content.content_len(), 2);
}
@ -243,28 +235,24 @@ pub mod test {
let mut vec: RleVecWithIndex<Op> = RleVecWithIndex::new();
vec.push(Op::new(
ID::new(0, 1),
OpContent::Normal {
content: Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 0)),
origin_right: None,
id: ID::new(0, 1),
status: Default::default(),
slice: ListSlice::unknown_range(4),
})),
},
Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 0)),
origin_right: None,
id: ID::new(0, 1),
status: Default::default(),
slice: ListSlice::unknown_range(4),
})),
5,
));
vec.push(Op::new(
ID::new(0, 2),
OpContent::Normal {
content: Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 0)),
origin_right: Some(ID::new(0, 1)),
id: ID::new(0, 5),
status: Default::default(),
slice: ListSlice::unknown_range(4),
})),
},
Content::Dyn(Box::new(YSpan {
origin_left: Some(ID::new(0, 0)),
origin_right: Some(ID::new(0, 1)),
id: ID::new(0, 5),
status: Default::default(),
slice: ListSlice::unknown_range(4),
})),
5,
));
assert_eq!(vec.merged_len(), 2);

View file

@ -226,7 +226,9 @@ pub mod fuzz {
impl TestFramework for YataImpl {
fn integrate(container: &mut Self::Container, op: Self::OpUnit) {
container.head_vv.set_end(op.id.inc(op.atom_len() as i32));
container
.current_vv
.set_end(op.id.inc(op.atom_len() as i32));
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
unsafe { crdt_list::yata::integrate::<Self>(container, op) };
}
@ -234,18 +236,18 @@ pub mod fuzz {
#[inline]
fn can_integrate(container: &Self::Container, op: &Self::OpUnit) -> bool {
if let Some(value) = op.origin_left {
if !value.is_unknown() && !container.head_vv.includes_id(value) {
if !value.is_unknown() && !container.current_vv.includes_id(value) {
return false;
}
}
if let Some(value) = op.origin_right {
if !value.is_unknown() && !container.head_vv.includes_id(value) {
if !value.is_unknown() && !container.current_vv.includes_id(value) {
return false;
}
}
if op.id.counter != 0 && !container.head_vv.includes_id(op.id.inc(-1)) {
if op.id.counter != 0 && !container.current_vv.includes_id(op.id.inc(-1)) {
return false;
}
@ -300,7 +302,7 @@ pub mod fuzz {
let ans = container.content.get_yspan_at_pos(
ID::new(
container.client_id,
*container.head_vv.get(&container.client_id).unwrap_or(&0),
*container.current_vv.get(&container.client_id).unwrap_or(&0),
),
pos % container.content.len(),
len,

View file

@ -24,18 +24,16 @@ mod test;
use crate::{
change::Lamport,
debug_log,
id::{ClientID, Counter, ID},
span::{CounterSpan, HasId, HasIdSpan, HasLamport, HasLamportSpan, IdSpan},
version::{IdSpanVector, VersionVector, VersionVectorDiff},
};
use self::{
iter::{iter_dag, iter_dag_with_vv, DagIterator, DagIteratorVV, DagPartialIter},
iter::{iter_dag, iter_dag_with_vv, DagCausalIter, DagIterator, DagIteratorVV},
mermaid::dag_to_mermaid,
};
// TODO: use HasId, HasLength
pub(crate) trait DagNode: HasLamport + HasId + HasLength + Debug + Sliceable {
fn deps(&self) -> &[ID];
@ -45,14 +43,6 @@ pub(crate) trait DagNode: HasLamport + HasId + HasLength + Debug + Sliceable {
}
}
#[allow(clippy::ptr_arg)]
fn reverse_path(path: &mut Vec<IdSpan>) {
path.reverse();
for span in path.iter_mut() {
span.counter.reverse();
}
}
/// Dag (Directed Acyclic Graph).
///
/// We have following invariance in DAG
@ -72,7 +62,7 @@ pub(crate) trait DagUtils: Dag {
fn get_vv(&self, id: ID) -> VersionVector;
fn find_path(&self, from: &[ID], to: &[ID]) -> VersionVectorDiff;
fn contains(&self, id: ID) -> bool;
fn iter_partial(&self, from: &[ID], target: IdSpanVector) -> DagPartialIter<'_, Self>
fn iter_causal(&self, from: &[ID], target: IdSpanVector) -> DagCausalIter<'_, Self>
where
Self: Sized;
fn iter(&self) -> DagIterator<'_, Self::Node>
@ -89,6 +79,7 @@ pub(crate) trait DagUtils: Dag {
impl<T: Dag + ?Sized> DagUtils for T {
#[inline]
fn find_common_ancestor(&self, a_id: &[ID], b_id: &[ID]) -> SmallVec<[ID; 2]> {
// TODO: perf: make it also return the spans to reach common_ancestors
find_common_ancestor(&|id| self.get(id), a_id, b_id)
}
@ -104,10 +95,6 @@ impl<T: Dag + ?Sized> DagUtils for T {
fn find_path(&self, from: &[ID], to: &[ID]) -> VersionVectorDiff {
let mut ans = VersionVectorDiff::default();
debug_log!(
"{}",
format!("FINDPATH from={:?} to={:?}", from, to).green()
);
if from == to {
return ans;
}
@ -165,7 +152,6 @@ impl<T: Dag + ?Sized> DagUtils for T {
true,
);
// dbg!(from, to, &ans);
ans
}
@ -178,11 +164,11 @@ impl<T: Dag + ?Sized> DagUtils for T {
}
#[inline(always)]
fn iter_partial(&self, from: &[ID], target: IdSpanVector) -> DagPartialIter<'_, Self>
fn iter_causal(&self, from: &[ID], target: IdSpanVector) -> DagCausalIter<'_, Self>
where
Self: Sized,
{
DagPartialIter::new(self, from.into(), target)
DagCausalIter::new(self, from.into(), target)
}
#[inline(always)]
@ -309,31 +295,6 @@ impl<'a> OrdIdSpan<'a> {
})
}
#[inline]
fn from_dag_node_conservatively<D, F>(id: ID, get: &'a F) -> Option<OrdIdSpan>
where
D: DagNode + 'a,
F: Fn(ID) -> Option<&'a D>,
{
let span = get(id)?;
let span_id = span.id_start();
if span_id == id {
Some(OrdIdSpan {
id: span_id,
lamport: span.lamport(),
deps: Cow::Borrowed(span.deps()),
len: 1,
})
} else {
Some(OrdIdSpan {
id: span_id.inc(1),
lamport: span.lamport() + 1,
deps: Cow::Owned(vec![span_id]),
len: (id.counter - span_id.counter) as usize,
})
}
}
#[inline]
fn get_min(&self) -> OrdIdSpan<'a> {
OrdIdSpan {
@ -468,10 +429,7 @@ where
NodeType::Shared => {}
}
if a_count == 0
&& b_count == 0
&& (!find_path || min.is_none() || &node <= min.as_ref().unwrap())
{
if a_count == 0 && b_count == 0 && (min.is_none() || &node <= min.as_ref().unwrap()) {
if node_type != NodeType::Shared {
ans.clear();
}
@ -516,27 +474,6 @@ where
ans
}
fn update_frontier(frontier: &mut Vec<ID>, new_node_id: ID, new_node_deps: &[ID]) {
frontier.retain(|x| {
if x.client_id == new_node_id.client_id && x.counter <= new_node_id.counter {
return false;
}
!new_node_deps
.iter()
.any(|y| y.client_id == x.client_id && y.counter >= x.counter)
});
// nodes from the same client with `counter < new_node_id.counter`
// are filtered out from frontier.
if frontier
.iter()
.all(|x| x.client_id != new_node_id.client_id)
{
frontier.push(new_node_id);
}
}
fn _find_common_ancestor_new<'a, F, D>(get: &'a F, left: &[ID], right: &[ID]) -> SmallVec<[ID; 2]>
where
D: DagNode + 'a,
@ -647,3 +584,13 @@ where
ans
}
pub fn remove_included_frontiers(frontiers: &mut VersionVector, new_change_deps: &[ID]) {
for dep in new_change_deps.iter() {
if let Some(last) = frontiers.get_last(dep.client_id) {
if last <= dep.counter {
frontiers.remove(&dep.client_id);
}
}
}
}

View file

@ -176,7 +176,7 @@ impl<'a, T: DagNode> Iterator for DagIteratorVV<'a, T> {
/// Visit every span in the target IdSpanVector.
/// It's guaranteed that the spans are visited in causal order, and each span is visited only once.
/// When visiting a span, we will checkout to the version where the span was created
pub(crate) struct DagPartialIter<'a, Dag> {
pub(crate) struct DagCausalIter<'a, Dag> {
dag: &'a Dag,
frontier: SmallVec<[ID; 2]>,
target: IdSpanVector,
@ -191,7 +191,7 @@ pub(crate) struct IterReturn<'a, T> {
pub slice: Range<Counter>,
}
impl<'a, T: DagNode, D: Dag<Node = T>> DagPartialIter<'a, D> {
impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
pub fn new(dag: &'a D, from: SmallVec<[ID; 2]>, target: IdSpanVector) -> Self {
let mut heap = BinaryHeap::new();
for id in target.iter() {
@ -215,7 +215,7 @@ impl<'a, T: DagNode, D: Dag<Node = T>> DagPartialIter<'a, D> {
}
}
impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagPartialIter<'a, D> {
impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
type Item = IterReturn<'a, T>;
fn next(&mut self) -> Option<Self::Item> {
@ -230,8 +230,7 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagPartialIter<'a, D> {
return None;
}
let node_id = self.heap.pop().unwrap();
let node_id = node_id.id;
let node_id = self.heap.pop().unwrap().id;
let target_span = self.target.get_mut(&node_id.client_id).unwrap();
debug_assert_eq!(
node_id.counter,

View file

@ -1,6 +1,5 @@
use super::*;
struct BreakPoints {
vv: VersionVector,
break_points: FxHashMap<ClientID, FxHashSet<Counter>>,
/// start ID to ID. The target ID may be in the middle of an op.
///
@ -115,7 +114,6 @@ fn break_points_to_output(input: BreakPoints) -> Output {
fn get_dag_break_points<T: DagNode>(dag: &impl Dag<Node = T>) -> BreakPoints {
let mut break_points = BreakPoints {
vv: dag.vv(),
break_points: FxHashMap::default(),
links: FxHashMap::default(),
};

View file

@ -180,6 +180,27 @@ impl TestDag {
}
}
fn update_frontier(frontier: &mut Vec<ID>, new_node_id: ID, new_node_deps: &[ID]) {
frontier.retain(|x| {
if x.client_id == new_node_id.client_id && x.counter <= new_node_id.counter {
return false;
}
!new_node_deps
.iter()
.any(|y| y.client_id == x.client_id && y.counter >= x.counter)
});
// nodes from the same client with `counter < new_node_id.counter`
// are filtered out from frontier.
if frontier
.iter()
.all(|x| x.client_id != new_node_id.client_id)
{
frontier.push(new_node_id);
}
}
fn _try_push_node(
&mut self,
node: &TestNode,
@ -194,7 +215,7 @@ impl TestDag {
pending.push((client_id, i));
return true;
}
update_frontier(&mut self.frontier, node.id_last(), &node.deps);
Self::update_frontier(&mut self.frontier, node.id_last(), &node.deps);
let contains_start = self.contains(node.id_start());
let arr = self.nodes.entry(client_id).or_default();
if contains_start {
@ -647,7 +668,7 @@ mod find_path {
#[test]
fn proptest_path_large(
interactions in prop::collection::vec(gen_interaction(10), 0..1 * PROPTEST_FACTOR_10 * PROPTEST_FACTOR_10 + 10),
interactions in prop::collection::vec(gen_interaction(10), 0..PROPTEST_FACTOR_10 * PROPTEST_FACTOR_10 + 10),
) {
test_find_path(10, interactions)?;
}
@ -1152,7 +1173,7 @@ mod dag_partial_iter {
forward,
retreat,
slice,
} in a.iter_partial(&[node.id], diff_spans.clone())
} in a.iter_causal(&[node.id], diff_spans.clone())
{
let sliced = data.slice(slice.start as usize, slice.end as usize);
{

View file

@ -176,18 +176,18 @@ impl Actionable for Vec<LoroCore> {
Action::Ins { pos, site, .. } => {
*site %= self.len() as u8;
let text = self[*site as usize].get_text("text");
change_pos_to_char_boundary(pos, text.text_len());
change_pos_to_char_boundary(pos, text.len());
}
Action::Del { pos, len, site } => {
*site %= self.len() as u8;
let text = self[*site as usize].get_text("text");
if text.text_len() == 0 {
if text.len() == 0 {
*len = 0;
*pos = 0;
return;
}
change_delete_to_char_boundary(pos, len, text.text_len());
change_delete_to_char_boundary(pos, len, text.len());
}
Action::Sync { from, to } => {
*from %= self.len() as u8;
@ -248,7 +248,7 @@ pub fn test_single_client(mut actions: Vec<Action>) {
text_container.insert(&store, *pos, &content.to_string());
}
Action::Del { pos, len, .. } => {
if text_container.text_len() == 0 {
if text_container.len() == 0 {
return;
}
@ -283,7 +283,7 @@ pub fn test_single_client_encode(mut actions: Vec<Action>) {
text_container.insert(&store, *pos, &content.to_string());
}
Action::Del { pos, len, .. } => {
if text_container.text_len() == 0 {
if text_container.is_empty() {
return;
}
@ -363,10 +363,8 @@ where
candidates.drain(0..30);
}
}
if start.elapsed().as_secs() > 10 {
if minified.len() <= 4 {
break;
}
if start.elapsed().as_secs() > 10 && minified.len() <= 4 {
break;
}
if start.elapsed().as_secs() > 60 {
break;
@ -625,7 +623,7 @@ mod test {
fn case0() {
test_multi_sites(
4,
&mut vec![
&mut [
Ins {
content: 31800,
pos: 723390690148040714,
@ -646,9 +644,217 @@ mod test {
)
}
#[test]
fn case_two() {
test_multi_sites(
3,
&mut [
Ins {
content: 35108,
pos: 0,
site: 2,
},
Ins {
content: 18218,
pos: 0,
site: 7,
},
Ins {
content: 65280,
pos: 2,
site: 7,
},
],
)
}
#[test]
fn mini() {
minify_error(8, vec![], test_multi_sites, normalize)
minify_error(
8,
vec![
Ins {
content: 35108,
pos: 0,
site: 2,
},
Ins {
content: 18218,
pos: 0,
site: 7,
},
Ins {
content: 35624,
pos: 0,
site: 0,
},
Ins {
content: 38400,
pos: 0,
site: 6,
},
Ins {
content: 65280,
pos: 2,
site: 7,
},
Ins {
content: 4626,
pos: 5,
site: 0,
},
Ins {
content: 60672,
pos: 0,
site: 1,
},
Ins {
content: 35072,
pos: 1,
site: 2,
},
Ins {
content: 15035,
pos: 3,
site: 0,
},
Ins {
content: 65280,
pos: 0,
site: 7,
},
Ins {
content: 4626,
pos: 0,
site: 0,
},
Ins {
content: 201,
pos: 2,
site: 2,
},
Ins {
content: 65377,
pos: 3,
site: 1,
},
Ins {
content: 9988,
pos: 0,
site: 0,
},
Ins {
content: 4626,
pos: 14,
site: 0,
},
Ins {
content: 4626,
pos: 11,
site: 7,
},
Ins {
content: 1070,
pos: 0,
site: 5,
},
Ins {
content: 27421,
pos: 7,
site: 1,
},
Ins {
content: 65121,
pos: 22,
site: 0,
},
Ins {
content: 65462,
pos: 1,
site: 0,
},
Ins {
content: 4626,
pos: 0,
site: 4,
},
Ins {
content: 4626,
pos: 16,
site: 0,
},
Ins {
content: 65462,
pos: 11,
site: 2,
},
Ins {
content: 48009,
pos: 10,
site: 0,
},
Ins {
content: 23277,
pos: 7,
site: 0,
},
Ins {
content: 60672,
pos: 13,
site: 1,
},
Ins {
content: 4626,
pos: 2,
site: 7,
},
Ins {
content: 4626,
pos: 2,
site: 0,
},
Ins {
content: 2606,
pos: 0,
site: 3,
},
Ins {
content: 65270,
pos: 10,
site: 0,
},
SyncAll,
Ins {
content: 65462,
pos: 107,
site: 4,
},
SyncAll,
Ins {
content: 4626,
pos: 98,
site: 0,
},
SyncAll,
Ins {
content: 0,
pos: 0,
site: 0,
},
Del {
pos: 0,
len: 147,
site: 0,
},
Ins {
content: 0,
pos: 146,
site: 4,
},
],
test_multi_sites,
normalize,
)
}
#[test]

View file

@ -41,7 +41,6 @@ pub enum Action {
}
struct Actor {
site: ClientID,
loro: LoroCore,
map_containers: Vec<Map>,
list_containers: Vec<List>,
@ -176,8 +175,8 @@ impl Actionable for Vec<Actor> {
.list_containers
.get(*container_idx as usize)
{
*key %= (list.values_len() as u8).max(1);
if *value == FuzzValue::Null && list.values_len() == 0 {
*key %= (list.len() as u8).max(1);
if *value == FuzzValue::Null && list.len() == 0 {
// no value, cannot delete
*value = FuzzValue::I32(1);
}
@ -201,10 +200,10 @@ impl Actionable for Vec<Actor> {
.text_containers
.get(*container_idx as usize)
{
*pos %= (text.text_len() as u8).max(1);
*pos %= (text.len() as u8).max(1);
if *is_del {
*value &= 0x1f;
*value = (*value).min(text.text_len() as u16 - (*pos) as u16);
*value = (*value).min(text.len() as u16 - (*pos) as u16);
}
} else {
*is_del = false;
@ -452,7 +451,6 @@ pub fn normalize(site_num: u8, actions: &mut [Action]) -> Vec<Action> {
let mut sites = Vec::new();
for i in 0..site_num {
sites.push(Actor {
site: i as u64,
loro: LoroCore::new(Default::default(), Some(i as u64)),
map_containers: Default::default(),
list_containers: Default::default(),
@ -484,7 +482,6 @@ pub fn test_multi_sites(site_num: u8, actions: &mut [Action]) {
let mut sites = Vec::new();
for i in 0..site_num {
sites.push(Actor {
site: i as u64,
loro: LoroCore::new(Default::default(), Some(i as u64)),
map_containers: Default::default(),
list_containers: Default::default(),
@ -549,34 +546,34 @@ mod failed_tests {
test_multi_sites(
3,
&mut [
Map {
site: 139,
container_idx: 198,
key: 190,
value: I32(533294902),
},
Text {
site: 167,
container_idx: 182,
pos: 106,
value: 20544,
site: 2,
container_idx: 0,
pos: 0,
value: 39064,
is_del: false,
},
SyncAll,
Text {
site: 0,
container_idx: 0,
pos: 0,
value: 0,
is_del: false,
},
Text {
site: 99,
container_idx: 33,
pos: 126,
value: 35453,
site: 2,
container_idx: 0,
pos: 5,
value: 39064,
is_del: false,
},
Text {
site: 87,
container_idx: 1,
pos: 84,
value: 5821,
is_del: true,
List {
site: 1,
container_idx: 0,
key: 0,
value: I32(1616928864),
},
Sync { from: 85, to: 226 },
],
)
}
@ -648,57 +645,33 @@ mod failed_tests {
minify_error(
5,
vec![
List {
site: 1,
Text {
site: 2,
container_idx: 0,
key: 0,
value: Container(C::List),
},
List {
site: 4,
container_idx: 0,
key: 0,
value: Container(C::List),
pos: 0,
value: 39064,
is_del: false,
},
SyncAll,
List {
site: 1,
container_idx: 1,
key: 0,
value: Container(C::List),
Text {
site: 0,
container_idx: 0,
pos: 0,
value: 0,
is_del: false,
},
Text {
site: 2,
container_idx: 0,
pos: 5,
value: 39064,
is_del: false,
},
List {
site: 1,
container_idx: 0,
key: 0,
value: Container(C::List),
},
Sync { from: 1, to: 0 },
List {
site: 4,
container_idx: 0,
key: 0,
value: I32(1),
},
List {
site: 1,
container_idx: 0,
key: 0,
value: Container(C::List),
},
Sync { from: 4, to: 0 },
Sync { from: 1, to: 0 },
List {
site: 4,
container_idx: 1,
key: 0,
value: Null,
},
List {
site: 1,
container_idx: 1,
key: 0,
value: Container(C::List),
value: I32(1616928864),
},
],
test_multi_sites,

View file

@ -98,6 +98,7 @@ impl ID {
}
#[inline]
#[allow(dead_code)]
pub(crate) fn is_connected_id(&self, other: &Self, self_len: usize) -> bool {
self.client_id == other.client_id && self.counter + self_len as Counter == other.counter
}

View file

@ -3,7 +3,6 @@
//!
//!
//!
#![allow(dead_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
pub mod change;
@ -32,11 +31,12 @@ pub use error::LoroError;
pub(crate) mod macros;
pub(crate) use change::{Lamport, Timestamp};
pub(crate) use id::{ClientID, ID};
pub(crate) use op::{ContentType, InsertContentTrait, Op, OpType};
pub(crate) use op::{ContentType, InsertContentTrait, Op};
pub(crate) type InternalString = DefaultAtom;
pub(crate) use container::Container;
pub use container::{list::List, map::Map, text::Text, Container, ContainerType};
pub use container::{list::List, map::Map, text::Text, ContainerType};
pub use log_store::LogStore;
pub use loro::LoroCore;
pub use value::LoroValue;

View file

@ -2,13 +2,14 @@
//!
//!
mod encoding;
mod import;
mod iter;
use std::{
marker::PhantomPinned,
sync::{Arc, Mutex, RwLock, Weak},
sync::{Arc, Mutex, MutexGuard, RwLock},
};
use fxhash::{FxHashMap, FxHashSet};
use fxhash::FxHashMap;
use rle::{HasLength, RleVec, RleVecWithIndex, Sliceable};
@ -19,14 +20,13 @@ use crate::{
configure::Configure,
container::{
registry::{ContainerInstance, ContainerRegistry},
text::text_content::ListSlice,
Container, ContainerID,
},
dag::Dag,
debug_log,
id::{ClientID, ContainerIdx, Counter},
op::{Content, OpContent, RemoteOp},
span::{HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan},
op::RemoteOp,
span::{HasCounterSpan, HasIdSpan, IdSpan},
ContainerType, Lamport, Op, Timestamp, VersionVector, ID,
};
@ -48,8 +48,8 @@ impl Default for GcConfig {
}
}
pub(crate) type LogStoreRef = Arc<RwLock<LogStore>>;
pub(crate) type LogStoreWeakRef = Weak<RwLock<LogStore>>;
type ClientChanges = FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>;
type RemoteClientChanges = FxHashMap<ClientID, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>>;
#[derive(Debug)]
/// LogStore stores the full history of Loro
@ -60,18 +60,20 @@ pub(crate) type LogStoreWeakRef = Weak<RwLock<LogStore>>;
///
/// TODO: Refactor we need to move the things about the current state out of LogStore (container, latest_lamport, ..)
pub struct LogStore {
changes: FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>,
changes: ClientChanges,
vv: VersionVector,
cfg: Configure,
latest_lamport: Lamport,
latest_timestamp: Timestamp,
pub(crate) this_client_id: ClientID,
frontier: SmallVec<[ID; 2]>,
frontiers: SmallVec<[ID; 2]>,
/// CRDT container manager
pub(crate) reg: ContainerRegistry,
_pin: PhantomPinned,
}
type ContainerGuard<'a> = MutexGuard<'a, ContainerInstance>;
impl LogStore {
pub(crate) fn new(mut cfg: Configure, client_id: Option<ClientID>) -> Arc<RwLock<Self>> {
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
@ -81,7 +83,7 @@ impl LogStore {
changes: FxHashMap::default(),
latest_lamport: 0,
latest_timestamp: 0,
frontier: Default::default(),
frontiers: Default::default(),
vv: Default::default(),
reg: ContainerRegistry::new(),
_pin: PhantomPinned,
@ -95,32 +97,26 @@ impl LogStore {
.map(|changes| changes.get(id.counter as usize).unwrap().element)
}
pub fn import(&mut self, mut changes: Vec<Change<RemoteOp>>) {
let self_vv = self.vv();
// guarantee that changes are applied in causal order
changes.sort_by_cached_key(|x| x.lamport);
for change in changes
.into_iter()
.filter(|x| !self_vv.includes_id(x.id_last()))
{
check_import_change_valid(&change);
// TODO: cache pending changes
assert!(change.deps.iter().all(|x| self.vv().includes_id(*x)));
self.apply_remote_change(change)
}
}
pub fn export(&self, remote_vv: &VersionVector) -> Vec<Change<RemoteOp>> {
let mut ans = Vec::default();
pub fn export(
&self,
remote_vv: &VersionVector,
) -> FxHashMap<ClientID, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>> {
let mut ans: FxHashMap<ClientID, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>> =
Default::default();
let self_vv = self.vv();
let diff = self_vv.diff(remote_vv);
for span in diff.left.iter() {
let changes = self.get_changes_slice(span.id_span());
for change in changes.iter() {
ans.push(self.change_to_export_format(change))
let vec = ans
.entry(change.id.client_id)
.or_insert_with(|| RleVecWithIndex::new_cfg(self.get_change_merge_cfg()));
vec.push(self.change_to_export_format(change));
}
}
debug_log!("export {:#?}", &ans);
ans
}
@ -141,13 +137,17 @@ impl LogStore {
}
}
fn change_to_imported_format(&mut self, change: Change<RemoteOp>) -> Change {
fn change_to_imported_format(
&mut self,
change: &Change<RemoteOp>,
containers: &mut FxHashMap<ContainerID, ContainerGuard>,
) -> Change {
let mut new_ops = RleVec::new();
for mut op in change.ops.into_iter() {
let container = self.reg.get_or_create(&op.container);
let mut container = container.lock().unwrap();
for op in change.ops.iter() {
let container = containers.get_mut(&op.container).unwrap();
// TODO: avoid this clone
let mut op = op.clone();
container.to_import(&mut op);
drop(container);
for op in op.convert(self) {
new_ops.push(op);
}
@ -155,7 +155,7 @@ impl LogStore {
Change {
ops: new_ops,
deps: change.deps,
deps: change.deps.clone(),
id: change.id,
lamport: change.lamport,
timestamp: change.timestamp,
@ -185,22 +185,10 @@ impl LogStore {
op
}
pub(crate) fn create_container(
&mut self,
container_type: ContainerType,
parent: ContainerID,
) -> ContainerID {
pub(crate) fn create_container(&mut self, container_type: ContainerType) -> ContainerID {
let id = self.next_id();
let container_id = ContainerID::new_normal(id, container_type);
let parent_idx = self.get_container_idx(&parent).unwrap();
self.append_local_ops(&[Op::new(
id,
OpContent::Normal {
content: Content::Container(container_id.clone()),
},
parent_idx,
)]);
self.reg.get_or_create(&container_id);
self.reg.register(&container_id);
container_id
}
@ -231,29 +219,14 @@ impl LogStore {
}
#[inline(always)]
pub fn frontier(&self) -> &[ID] {
&self.frontier
pub fn frontiers(&self) -> &[ID] {
&self.frontiers
}
fn update_frontier(&mut self, clear: &[ID], new: &[ID]) {
self.frontier.retain(|x| {
!clear
.iter()
.any(|y| x.client_id == y.client_id && x.counter <= y.counter)
&& !new
.iter()
.any(|y| x.client_id == y.client_id && x.counter <= y.counter)
});
for next in new.iter() {
if self
.frontier
.iter()
.any(|x| x.client_id == next.client_id && x.counter >= next.counter)
{
continue;
}
self.frontier.push(*next);
fn get_change_merge_cfg(&self) -> ChangeMergeCfg {
ChangeMergeCfg {
max_change_length: self.cfg.change.max_change_length,
max_change_interval: self.cfg.change.max_change_interval,
}
}
@ -274,7 +247,7 @@ impl LogStore {
let last_id = ID::new(self.this_client_id, last_ctr);
let change = Change {
id,
deps: std::mem::replace(&mut self.frontier, smallvec::smallvec![last_id]),
deps: std::mem::replace(&mut self.frontiers, smallvec::smallvec![last_id]),
ops: ops.into(),
lamport,
timestamp,
@ -283,57 +256,15 @@ impl LogStore {
self.latest_lamport = lamport + change.content_len() as u32 - 1;
self.latest_timestamp = timestamp;
self.vv.set_end(change.id_end());
let cfg = self.get_change_merge_cfg();
self.changes
.entry(self.this_client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new()))
.or_insert_with(|| RleVecWithIndex::new_with_conf(cfg))
.push(change);
debug_log!("CHANGES---------------- site {}", self.this_client_id);
}
pub fn apply_remote_change(&mut self, change: Change<RemoteOp>) {
if self.contains(change.id_last()) {
return;
}
debug_log!("Client {} Apply {:#?}", self.this_client_id, &change);
for dep in &change.deps {
if !self.contains(*dep) {
unimplemented!("need impl pending changes");
}
}
// TODO: find a way to remove this clone? we don't need change in apply method actually
let change = self.change_to_imported_format(change);
let change_id_span = change.id_span();
let change_deps = change.deps.clone();
let change_last_lamport = change.lamport_last();
let change_time = change.timestamp;
let changes = self
.changes
.entry(change.id.client_id)
.or_insert_with(RleVecWithIndex::new);
let mut set = FxHashSet::default();
for op in change.ops.iter() {
set.insert(op.container);
}
changes.push(change);
// Apply ops.
// NOTE: applying expects that log_store has store the Change, and updated self vv
for container in set {
let mut container = self.reg.get_by_idx(container).unwrap().lock().unwrap();
container.apply(change_id_span, self);
}
self.vv.set_end(change_id_span.id_end());
self.update_frontier(&change_deps, &[change_id_span.id_last()]);
self.latest_lamport = self.latest_lamport.max(change_last_lamport);
self.latest_timestamp = self.latest_timestamp.max(change_time);
}
#[inline]
pub fn contains(&self, id: ID) -> bool {
self.changes
@ -351,6 +282,7 @@ impl LogStore {
}
#[inline]
#[allow(dead_code)]
pub(crate) fn iter_client_op(&self, client_id: ClientID) -> iter::ClientOpIter<'_> {
iter::ClientOpIter {
change_index: 0,
@ -359,13 +291,8 @@ impl LogStore {
}
}
pub(crate) fn iter_ops_at_id_span(
&self,
id_span: IdSpan,
container: ContainerID,
) -> iter::OpSpanIter<'_> {
let idx = self.get_container_idx(&container).unwrap();
iter::OpSpanIter::new(&self.changes, id_span, idx)
pub(crate) fn iter_ops_at_id_span(&self, id_span: IdSpan) -> iter::OpSpanIter<'_> {
iter::OpSpanIter::new(&self.changes, id_span)
}
#[inline(always)]
@ -404,7 +331,6 @@ impl LogStore {
self.reg.get_idx(container)
}
#[inline(always)]
pub fn get_or_create_container(
&mut self,
container: &ContainerID,
@ -432,29 +358,10 @@ impl Dag for LogStore {
}
fn frontier(&self) -> &[ID] {
&self.frontier
&self.frontiers
}
fn vv(&self) -> crate::VersionVector {
self.vv.clone()
}
}
fn check_import_change_valid(change: &Change<RemoteOp>) {
if cfg!(test) {
for op in change.ops.iter() {
for content in op.contents.iter() {
if let Some((slice, _)) = content
.as_normal()
.and_then(|x| x.as_list())
.and_then(|x| x.as_insert())
{
assert!(matches!(
slice,
ListSlice::RawData(_) | ListSlice::RawStr(_) | ListSlice::Unknown(_)
))
}
}
}
}
}

View file

@ -18,8 +18,9 @@ use crate::{
text::text_content::ListSlice,
ContainerID,
},
dag::remove_included_frontiers,
id::{ClientID, ContainerIdx, Counter, ID},
op::{Content, Op, OpContent, RemoteOp},
op::{Content, Op, RemoteOp},
smstring::SmString,
span::{HasIdSpan, HasLamportSpan},
ContainerType, InternalString, LogStore, LoroValue, VersionVector,
@ -127,11 +128,7 @@ fn encode_changes(store: &LogStore) -> Encoded {
let mut op_len = 0;
for (op, container) in remote_ops.into_iter().zip(containers.into_iter()) {
for content in op.contents.into_iter() {
let content = content.into_normal().unwrap();
let (prop, gc, value) = match content {
crate::op::Content::Container(_) => {
todo!();
}
crate::op::Content::Map(MapSet { key, value }) => (
*key_to_idx.entry(key.clone()).or_insert_with(|| {
keys.push(key);
@ -223,7 +220,7 @@ fn decode_changes(
let mut changes = FxHashMap::default();
let mut deps_iter = deps.into_iter();
for container in containers.iter() {
container_reg.get_or_create(container);
container_reg.register(container);
}
for change_encoding in change_encodings {
@ -288,7 +285,7 @@ fn decode_changes(
let op = Op {
counter: op_counter,
container,
content: OpContent::Normal { content },
content,
};
op_counter += op.content_len() as i32;
@ -317,7 +314,7 @@ fn decode_changes(
let mut frontier = vv.clone();
for (_, changes) in changes.iter() {
for change in changes.iter() {
update_frontiers(&mut frontier, &change.deps);
remove_included_frontiers(&mut frontier, &change.deps);
}
}
@ -338,7 +335,7 @@ fn decode_changes(
latest_lamport,
latest_timestamp,
this_client_id,
frontier: frontier.get_head(),
frontiers: frontier.get_frontiers(),
reg: container_reg,
_pin: PhantomPinned,
}))
@ -360,13 +357,3 @@ impl LogStore {
decode_changes(encoded, client_id, cfg)
}
}
fn update_frontiers(frontiers: &mut VersionVector, new_change_deps: &[ID]) {
for dep in new_change_deps.iter() {
if let Some(last) = frontiers.get_last(dep.client_id) {
if last <= dep.counter {
frontiers.remove(&dep.client_id);
}
}
}
}

View file

@ -0,0 +1,237 @@
use crate::LogStore;
use std::{ops::ControlFlow, sync::MutexGuard};
use fxhash::FxHashMap;
use rle::{HasLength, RleVecWithIndex, Sliceable};
use crate::{
container::{registry::ContainerInstance, Container, ContainerID},
dag::{remove_included_frontiers, DagUtils},
debug_log,
id::ContainerIdx,
op::RichOp,
span::{HasCounter, HasIdSpan, HasLamportSpan, IdSpan},
version::are_frontiers_eq,
VersionVector,
};
use super::{ContainerGuard, RemoteClientChanges};
impl LogStore {
/// Import remote clients' changes into the local log store.
///
/// # How does it work
///
/// > The core algorithm is in the [`LogStore::apply`] method.
///
/// - First, we remove all the changes that are already included in the local log store.
/// And cache the changes whose dependencies are not included.
/// - Then, we append the changes to the local log store.
/// - Apply
/// - Check whether we can apply the change directly by testing whether self.frontiers == common ancestors.
/// If so, we apply the change directly and return.
/// - Otherwise
/// - Stage 1: we iterate over the new changes by causal order, and record them to the tracker.
/// - Stage 2: we calculate the effects of the new changes, and apply them to the state.
/// - Update the rest of the log store state.
pub fn import(&mut self, mut changes: RemoteClientChanges) {
if let ControlFlow::Break(_) = self.tailor_changes(&mut changes) {
return;
}
let mut container_map: FxHashMap<ContainerID, ContainerGuard> = Default::default();
self.lock_related_containers(&changes, &mut container_map);
let (next_vv, next_frontiers) = self.push_changes(changes, &mut container_map);
let container_map: FxHashMap<ContainerIdx, ContainerGuard> = container_map
.into_iter()
.map(|(k, v)| (self.reg.get_idx(&k).unwrap(), v))
.collect();
self.apply(&next_frontiers, &next_vv, container_map);
self.update_version_info(next_vv, next_frontiers);
}
fn update_version_info(&mut self, next_vv: VersionVector, next_frontiers: VersionVector) {
self.vv = next_vv;
self.frontiers = next_frontiers.get_frontiers();
self.latest_lamport = self
.changes
.iter()
.map(|(_, v)| v.last().unwrap().lamport_last())
.max()
.unwrap();
self.latest_timestamp = self
.changes
.iter()
.map(|(_, v)| v.last().unwrap().timestamp)
.max()
.unwrap();
}
fn push_changes(
&mut self,
changes: RemoteClientChanges,
container_map: &mut FxHashMap<ContainerID, MutexGuard<ContainerInstance>>,
) -> (VersionVector, VersionVector) {
let mut next_vv: VersionVector = self.vv.clone();
let mut next_frontiers: VersionVector = self.frontiers.iter().copied().collect();
for (_, changes) in changes.iter() {
next_frontiers.set_end(changes.last().unwrap().id_end());
next_vv.set_end(changes.last().unwrap().id_end());
}
// push changes to log stores
let cfg = self.get_change_merge_cfg();
for (client_id, changes) in changes.iter() {
let mut inner_changes = Vec::with_capacity(changes.len());
for change in changes.iter() {
remove_included_frontiers(&mut next_frontiers, &change.deps);
let change = self.change_to_imported_format(change, container_map);
inner_changes.push(change);
}
let rle = self
.changes
.entry(*client_id)
.or_insert_with(|| RleVecWithIndex::new_cfg(cfg.clone()));
for change in inner_changes {
rle.push(change);
}
}
(next_vv, next_frontiers)
}
fn apply(
&mut self,
next_frontiers: &VersionVector,
next_vv: &VersionVector,
mut container_map: FxHashMap<u32, MutexGuard<ContainerInstance>>,
) {
let latest_frontiers = next_frontiers.get_frontiers();
debug_log!(
"FIND COMMON ANCESTORS self={:?} latest={:?}",
&self.frontiers,
&latest_frontiers
);
let common_ancestors = self.find_common_ancestor(&self.frontiers, &latest_frontiers);
if are_frontiers_eq(&common_ancestors, &self.frontiers) {
// we may apply changes directly into state
let target_spans = next_vv.diff(&self.vv).left;
if target_spans.len() == 1 {
let (client_id, span) = target_spans.iter().next().unwrap();
for op in self.iter_ops_at_id_span(IdSpan::new(*client_id, span.start, span.end)) {
let container = container_map.get_mut(&op.op().container).unwrap();
container.update_state_directly(&op);
}
return;
}
// TODO: can reuse this path
let causal_visit_path: Vec<_> =
self.iter_causal(&common_ancestors, target_spans).collect();
if causal_visit_path
.iter()
.all(|x| x.retreat.is_empty() && x.forward.is_empty())
{
// can update containers state directly without consulting CRDT
for iter in causal_visit_path {
let start = iter.slice.start;
let end = iter.slice.end;
let change = iter.data;
for op in change.ops.iter() {
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
if rich_op.atom_len() == 0 {
continue;
}
let container = container_map.get_mut(&op.container).unwrap();
container.update_state_directly(&rich_op);
}
}
return;
}
}
let mut common_ancestors_vv = self.vv.clone();
common_ancestors_vv.retreat(&self.find_path(&common_ancestors, &self.frontiers).right);
for (_, container) in container_map.iter_mut() {
container.tracker_checkout(&common_ancestors_vv);
}
for iter in self.iter_causal(&common_ancestors, next_vv.diff(&common_ancestors_vv).left) {
let start = iter.slice.start;
let end = iter.slice.end;
let change = iter.data;
debug_log!("iter {:#?}", &iter);
// TODO: perf: we can make iter_causal returns target vv and only
// checkout the related container to the target vv
for (_, container) in container_map.iter_mut() {
container.track_retreat(&iter.retreat);
container.track_forward(&iter.forward);
}
for op in change.ops.iter() {
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
if rich_op.atom_len() == 0 {
continue;
}
if let Some(container) = container_map.get_mut(&op.container) {
container.track_apply(&rich_op);
}
}
}
debug_log!("LOGSTORE STAGE 2",);
let path = next_vv.diff(&self.vv).left;
for (_, container) in container_map.iter_mut() {
container.apply_tracked_effects_from(&self.vv, &path);
}
}
/// get the locks of the containers to avoid repeated acquiring and releasing the locks
fn lock_related_containers(
&mut self,
changes: &RemoteClientChanges,
container_map: &mut FxHashMap<ContainerID, MutexGuard<ContainerInstance>>,
) {
for (_, changes) in changes.iter() {
for change in changes.iter() {
for op in change.ops.iter() {
if !container_map.contains_key(&op.container) {
let guard = self.reg.get_or_create(&op.container).lock().unwrap();
container_map
// SAFETY: ignore lifetime issues here, because it's safe for us to store the mutex guard here
.insert(op.container.clone(), unsafe { std::mem::transmute(guard) });
}
}
}
}
}
fn tailor_changes(&mut self, changes: &mut RemoteClientChanges) -> ControlFlow<()> {
changes.retain(|_, v| !v.is_empty());
if changes.is_empty() {
return ControlFlow::Break(());
}
for (client_id, changes) in changes.iter_mut() {
let self_end_ctr = self.vv.get(client_id).copied().unwrap_or(0);
let other_start_ctr = changes.first().unwrap().ctr_start();
match other_start_ctr.cmp(&self_end_ctr) {
std::cmp::Ordering::Less => {
*changes = changes.slice(
(self_end_ctr - other_start_ctr) as usize,
changes.atom_len(),
);
}
std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => {
unimplemented!("cache pending changes");
}
}
}
changes.retain(|_, v| !v.is_empty());
ControlFlow::Continue(())
}
}

View file

@ -1,9 +1,7 @@
use crate::Op;
use crate::change::Lamport;
use crate::id::ClientID;
use crate::id::ContainerIdx;
use crate::op::RichOp;
use crate::span::HasId;
@ -48,7 +46,6 @@ pub struct OpSpanIter<'a> {
changes: &'a [Change],
change_index: usize,
op_index: usize,
container: ContainerIdx,
span: IdSpan,
}
@ -56,7 +53,6 @@ impl<'a> OpSpanIter<'a> {
pub fn new(
changes: &'a FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>,
target_span: IdSpan,
container: ContainerIdx,
) -> Self {
let rle_changes = changes.get(&target_span.client_id).unwrap();
let changes = rle_changes.vec();
@ -67,7 +63,6 @@ impl<'a> OpSpanIter<'a> {
Self {
span: target_span,
container,
changes,
change_index,
op_index: rle_changes[change_index]
@ -96,20 +91,17 @@ impl<'a> Iterator for OpSpanIter<'a> {
}
self.op_index += 1;
if op.container != self.container {
continue;
}
let start = (self.span.counter.min() - op.counter).max(0) as usize;
let end = ((self.span.counter.end() - op.counter) as usize).min(op.atom_len());
assert!(start < end, "{:?} {:#?}", self.span, op);
return Some(RichOp {
let op = RichOp::new_by_slice_on_change(
change,
op,
start,
end,
lamport: (op.counter - change.id.counter) as Lamport + change.lamport,
timestamp: change.timestamp,
});
self.span.counter.min() - change.id.counter,
self.span.counter.end() - change.id.counter,
);
if op.atom_len() == 0 {
return None;
} else {
return Some(op);
}
} else {
self.op_index = 0;
self.change_index += 1;

View file

@ -1,12 +1,12 @@
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use fxhash::FxHashMap;
use rle::RleVecWithIndex;
use crate::{
change::Change,
change::{Change, ChangeMergeCfg},
configure::Configure,
container::{
list::List, map::Map, registry::ContainerInstance, text::Text, ContainerID, ContainerIdRaw,
ContainerType,
},
container::{list::List, map::Map, text::Text, ContainerIdRaw, ContainerType},
id::ClientID,
op::RemoteOp,
LogStore, VersionVector,
@ -66,23 +66,18 @@ impl LoroCore {
.into()
}
#[inline(always)]
pub fn get_container(&self, id: &ContainerID) -> Option<Arc<Mutex<ContainerInstance>>> {
self.log_store
.read()
.unwrap()
.get_container(id)
.unwrap()
.clone()
.into()
}
pub fn export(&self, remote_vv: VersionVector) -> Vec<Change<RemoteOp>> {
pub fn export(
&self,
remote_vv: VersionVector,
) -> FxHashMap<u64, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>> {
let store = self.log_store.read().unwrap();
store.export(&remote_vv)
}
pub fn import(&mut self, changes: Vec<Change<RemoteOp>>) {
pub fn import(
&mut self,
changes: FxHashMap<u64, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>>,
) {
let mut store = self.log_store.write().unwrap();
store.import(changes)
}

View file

@ -23,11 +23,6 @@ macro_rules! fx_map {
#[macro_export]
macro_rules! debug_log {
() => {
// if cfg!(test) {
// $crate::print!("\n")
// }
};
($($arg:tt)*) => {{
if cfg!(test) {
use ::colored::Colorize;
@ -36,6 +31,11 @@ macro_rules! debug_log {
println!($($arg)*);
}
}};
() => {
// if cfg!(test) {
// $crate::print!("\n")
// }
};
}
#[macro_export]

View file

@ -1,19 +1,16 @@
use crate::{
change::{Lamport, Timestamp},
change::{Change, Lamport, Timestamp},
container::ContainerID,
id::{ContainerIdx, Counter, ID},
span::HasCounter,
id::{ClientID, ContainerIdx, Counter, ID},
span::{HasCounter, HasId, HasLamport},
LogStore,
};
use rle::{HasIndex, HasLength, Mergable, RleVec, Sliceable};
mod insert_content;
mod op_content;
mod content;
pub use insert_content::*;
pub use content::*;
use smallvec::{smallvec, SmallVec};
pub(crate) use self::op_content::OpContent;
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpType {
@ -34,19 +31,30 @@ pub enum OpType {
pub struct Op {
pub(crate) counter: Counter,
pub(crate) container: ContainerIdx,
pub(crate) content: OpContent,
pub(crate) content: Content,
}
#[derive(Debug, Clone)]
pub struct RemoteOp {
pub(crate) counter: Counter,
pub(crate) container: ContainerID,
pub(crate) contents: RleVec<[OpContent; 1]>,
pub(crate) contents: RleVec<[Content; 1]>,
}
/// RichOp includes lamport and timestamp info, which is used for conflict resolution.
#[derive(Debug, Clone)]
pub struct RichOp<'a> {
op: &'a Op,
client_id: ClientID,
lamport: Lamport,
timestamp: Timestamp,
start: usize,
end: usize,
}
impl Op {
#[inline]
pub(crate) fn new(id: ID, content: OpContent, container: u32) -> Self {
pub(crate) fn new(id: ID, content: Content, container: u32) -> Self {
Op {
counter: id.counter,
content,
@ -54,19 +62,6 @@ impl Op {
}
}
#[inline]
pub(crate) fn new_insert_op(id: ID, container: u32, content: Content) -> Self {
Op::new(id, OpContent::Normal { content }, container)
}
pub fn op_type(&self) -> OpType {
match self.content {
OpContent::Normal { .. } => OpType::Normal,
OpContent::Undo { .. } => OpType::Undo,
OpContent::Redo { .. } => OpType::Redo,
}
}
pub(crate) fn convert(self, log: &LogStore) -> RemoteOp {
let container = log.reg.get_id(self.container).unwrap().clone();
RemoteOp {
@ -99,35 +94,12 @@ impl RemoteOp {
impl Mergable for Op {
fn is_mergable(&self, other: &Self, cfg: &()) -> bool {
self.counter + self.content_len() as Counter == other.counter
&& self.content.is_mergable(&other.content, cfg)
&& self.container == other.container
&& self.content.is_mergable(&other.content, cfg)
}
fn merge(&mut self, other: &Self, cfg: &()) {
match &mut self.content {
OpContent::Normal { content } => match &other.content {
OpContent::Normal {
content: other_content,
} => {
content.merge(other_content, cfg);
}
_ => unreachable!(),
},
OpContent::Undo { target, .. } => match &other.content {
OpContent::Undo {
target: other_target,
..
} => target.merge(other_target, cfg),
_ => unreachable!(),
},
OpContent::Redo { target, .. } => match &other.content {
OpContent::Redo {
target: other_target,
..
} => target.merge(other_target, cfg),
_ => unreachable!(),
},
}
self.content.merge(&other.content, cfg)
}
}
@ -140,7 +112,7 @@ impl HasLength for Op {
impl Sliceable for Op {
fn slice(&self, from: usize, to: usize) -> Self {
assert!(to > from);
let content: OpContent = self.content.slice(from, to);
let content: Content = self.content.slice(from, to);
Op {
counter: (self.counter + from as Counter),
content,
@ -185,24 +157,6 @@ impl Sliceable for RemoteOp {
}
}
/// RichOp includes lamport and timestamp info, which is used for conflict resolution.
///
/// `lamport` is the lamport of the returned op, to get the lamport of the sliced op, you need to use `lamport + start`
///
pub struct RichOp<'a> {
pub op: &'a Op,
pub lamport: Lamport,
pub timestamp: Timestamp,
pub start: usize,
pub end: usize,
}
impl<'a> RichOp<'a> {
pub fn get_sliced(&self) -> Op {
self.op.slice(self.start, self.end)
}
}
impl HasIndex for Op {
type Int = Counter;
@ -230,3 +184,90 @@ impl HasCounter for RemoteOp {
self.counter
}
}
impl<'a> HasId for RichOp<'a> {
fn id_start(&self) -> ID {
ID {
client_id: self.client_id,
counter: self.op.counter + self.start as Counter,
}
}
}
impl<'a> HasLength for RichOp<'a> {
fn content_len(&self) -> usize {
self.end - self.start
}
}
impl<'a> HasLamport for RichOp<'a> {
fn lamport(&self) -> Lamport {
self.lamport + self.start as Lamport
}
}
impl<'a> RichOp<'a> {
pub fn new(op: &'a Op, client_id: ClientID, lamport: Lamport, timestamp: Timestamp) -> Self {
RichOp {
op,
client_id,
lamport,
timestamp,
start: 0,
end: op.content_len(),
}
}
pub fn new_by_change(change: &Change<Op>, op: &'a Op) -> Self {
let diff = op.counter - change.id.counter;
RichOp {
op,
client_id: change.id.client_id,
lamport: change.lamport + diff as Lamport,
timestamp: change.timestamp,
start: 0,
end: op.atom_len(),
}
}
pub fn new_by_slice_on_change(change: &Change<Op>, op: &'a Op, start: i32, end: i32) -> Self {
debug_assert!(end > start);
let op_index_in_change = op.counter - change.id.counter;
let op_slice_start = (start - op_index_in_change)
.max(0)
.min(op.atom_len() as i32);
let op_slice_end = (end - op_index_in_change).max(0).min(op.atom_len() as i32);
RichOp {
op,
client_id: change.id.client_id,
lamport: change.lamport + op_index_in_change as Lamport,
timestamp: change.timestamp,
start: op_slice_start as usize,
end: op_slice_end as usize,
}
}
pub fn get_sliced(&self) -> Op {
self.op.slice(self.start, self.end)
}
pub fn op(&self) -> &Op {
self.op
}
pub fn client_id(&self) -> u64 {
self.client_id
}
pub fn timestamp(&self) -> i64 {
self.timestamp
}
pub fn start(&self) -> usize {
self.start
}
pub fn end(&self) -> usize {
self.end
}
}

View file

@ -3,12 +3,10 @@ use std::any::{Any, TypeId};
use enum_as_inner::EnumAsInner;
use rle::{HasLength, Mergable, Sliceable};
use crate::container::{list::list_op::ListOp, map::MapSet, ContainerID};
use crate::container::{list::list_op::ListOp, map::MapSet};
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub enum ContentType {
/// See [`crate::container::ContainerContent`]
Container,
/// See [`crate::container::text::TextContent`]
Text,
/// See [`crate::container::map::MapInsertContent`]
@ -18,8 +16,7 @@ pub enum ContentType {
}
#[derive(EnumAsInner, Debug)]
pub(crate) enum Content {
Container(ContainerID),
pub enum Content {
Map(MapSet),
List(ListOp),
Dyn(Box<dyn InsertContentTrait>),
@ -31,7 +28,6 @@ impl Clone for Content {
Self::Map(arg0) => Self::Map(arg0.clone()),
Self::List(arg0) => Self::List(arg0.clone()),
Self::Dyn(arg0) => Self::Dyn(arg0.clone_content()),
Content::Container(arg0) => Self::Container(arg0.clone()),
}
}
}
@ -42,7 +38,6 @@ impl Content {
Self::Map(_) => ContentType::Map,
Self::List(_) => ContentType::Text,
Self::Dyn(arg0) => arg0.id(),
Self::Container(_) => ContentType::Container,
}
}
}
@ -101,7 +96,6 @@ impl HasLength for Content {
Content::Map(x) => x.content_len(),
Content::Dyn(x) => x.content_len(),
Content::List(x) => x.content_len(),
Content::Container(_) => 1,
}
}
}
@ -112,7 +106,6 @@ impl Sliceable for Content {
Content::Map(x) => Content::Map(x.slice(from, to)),
Content::Dyn(x) => Content::Dyn(x.slice_content(from, to)),
Content::List(x) => Content::List(x.slice(from, to)),
Content::Container(x) => Content::Container(x.clone()),
}
}
}
@ -126,7 +119,6 @@ impl Mergable for Content {
(Content::Map(x), Content::Map(y)) => x.is_mergable(y, &()),
(Content::List(x), Content::List(y)) => x.is_mergable(y, &()),
(Content::Dyn(x), Content::Dyn(y)) => x.is_mergable_content(&**y),
(Content::Container(_), _) => false,
_ => false,
}
}
@ -145,7 +137,6 @@ impl Mergable for Content {
_ => unreachable!(),
},
Content::Dyn(x) => x.merge_content(&**_other.as_dyn().unwrap()),
Content::Container(_) => unreachable!(),
}
}
}

View file

@ -1,116 +0,0 @@
use enum_as_inner::EnumAsInner;
use rle::{HasLength, Mergable, Sliceable};
use crate::{span::IdSpan, OpType};
use super::Content;
#[derive(Debug, EnumAsInner)]
pub(crate) enum OpContent {
Normal { content: Content },
Undo { target: IdSpan },
Redo { target: IdSpan },
}
impl OpContent {
#[inline]
pub fn op_type(&self) -> OpType {
match self {
OpContent::Normal { .. } => OpType::Normal,
OpContent::Undo { .. } => OpType::Undo,
OpContent::Redo { .. } => OpType::Redo,
}
}
}
impl HasLength for OpContent {
fn content_len(&self) -> usize {
match self {
OpContent::Normal { content, .. } => content.content_len(),
OpContent::Undo { target, .. } => target.atom_len(),
OpContent::Redo { target, .. } => target.atom_len(),
}
}
}
impl Clone for OpContent {
fn clone(&self) -> Self {
match self {
OpContent::Normal { content } => OpContent::Normal {
content: content.clone(),
},
OpContent::Undo { target } => OpContent::Undo { target: *target },
OpContent::Redo { target } => OpContent::Redo { target: *target },
}
}
}
impl Sliceable for OpContent {
fn slice(&self, from: usize, to: usize) -> Self {
match self {
OpContent::Normal { content } => OpContent::Normal {
content: content.slice(from, to),
},
OpContent::Undo { target } => OpContent::Undo {
target: target.slice(from, to),
},
OpContent::Redo { target } => OpContent::Redo {
target: target.slice(from, to),
},
}
}
}
impl Mergable for OpContent {
fn is_mergable(&self, other: &Self, cfg: &()) -> bool
where
Self: Sized,
{
match &self {
OpContent::Normal { content } => match other {
OpContent::Normal {
content: ref other_content,
} => content.is_mergable(other_content, cfg),
_ => false,
},
OpContent::Undo { target } => match other {
OpContent::Undo {
target: ref other_target,
} => target.is_mergable(other_target, cfg),
_ => false,
},
OpContent::Redo { target } => match other {
OpContent::Redo {
target: ref other_target,
} => target.is_mergable(other_target, cfg),
_ => false,
},
}
}
fn merge(&mut self, _other: &Self, _conf: &())
where
Self: Sized,
{
match self {
OpContent::Normal { content } => match _other {
OpContent::Normal {
content: ref other_content,
} => content.merge(other_content, _conf),
_ => unreachable!(),
},
OpContent::Undo { target } => match _other {
OpContent::Undo {
target: ref other_target,
} => target.merge(other_target, _conf),
_ => unreachable!(),
},
OpContent::Redo { target } => match _other {
OpContent::Redo {
target: ref other_target,
} => target.merge(other_target, _conf),
_ => unreachable!(),
},
}
}
}

View file

@ -3,6 +3,7 @@ use std::fmt::Debug;
use crate::{
change::Lamport,
id::{ClientID, Counter, ID},
version::IdSpanVector,
};
use rle::{HasLength, Mergable, Slice, Sliceable};
@ -213,6 +214,12 @@ impl IdSpan {
pub fn end_id(&self) -> ID {
ID::new(self.client_id, self.counter.end())
}
pub fn to_id_span_vec(self) -> IdSpanVector {
let mut out = IdSpanVector::default();
out.insert(self.client_id, self.counter);
out
}
}
impl HasLength for IdSpan {

View file

@ -233,7 +233,7 @@ impl VersionVector {
}
#[inline]
pub fn get_head(&self) -> SmallVec<[ID; 2]> {
pub fn get_frontiers(&self) -> SmallVec<[ID; 2]> {
self.iter()
.filter_map(|(client_id, &counter)| {
if counter > 0 {
@ -350,6 +350,11 @@ impl VersionVector {
}
pub fn shrink_to_exclude(&mut self, span: IdSpan) {
if span.counter.min() == 0 {
self.remove(&span.client_id);
return;
}
if let Some(counter) = self.get_mut(&span.client_id) {
if *counter > span.counter.min() {
*counter = span.counter.min();
@ -436,6 +441,20 @@ pub(crate) struct TotalOrderStamp {
pub(crate) client_id: ClientID,
}
pub fn are_frontiers_eq(a: &[ID], b: &[ID]) -> bool {
if a.len() != b.len() {
return false;
}
let mut a: SmallVec<[ID; 10]> = a.into();
let mut b: SmallVec<[ID; 10]> = b.into();
a.sort();
b.sort();
a == b
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -4,10 +4,7 @@ use std::{
rc::{Rc, Weak},
};
use loro_core::{
container::{registry::ContainerWrapper, ContainerID},
LoroCore,
};
use loro_core::{container::ContainerID, LoroCore};
use wasm_bindgen::prelude::*;
#[wasm_bindgen]

View file

@ -219,9 +219,9 @@ impl<T> Default for RleVecWithIndex<T> {
}
}
impl<T: Mergable + HasLength> FromIterator<T> for RleVecWithIndex<T> {
impl<T: Mergable<Cfg> + HasLength, Cfg: Default> FromIterator<T> for RleVecWithIndex<T, Cfg> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
let mut vec = RleVecWithIndex::new();
let mut vec = RleVecWithIndex::new_with_conf(Default::default());
for item in iter {
vec.push(item);
}
@ -300,11 +300,14 @@ impl<T: Mergable<Cfg> + HasLength + Sliceable + Clone, Cfg> Mergable<Cfg>
}
}
impl<T: Mergable + HasLength + Sliceable + Clone> Sliceable for RleVecWithIndex<T> {
impl<T: Mergable<Cfg> + HasLength + Sliceable, Cfg: Clone> Sliceable for RleVecWithIndex<T, Cfg> {
fn slice(&self, start: usize, end: usize) -> Self {
self.slice_iter(start, end)
.map(|x| x.into_inner())
.collect()
let mut ans = RleVecWithIndex::new_with_conf(self.cfg.clone());
for value in self.slice_iter(start, end).map(|x| x.into_inner()) {
ans.push(value);
}
ans
}
}