feat: encode updates

This commit is contained in:
Zixuan Chen 2022-11-29 18:31:57 +08:00
parent a79083c05e
commit d3a0d10b12
19 changed files with 246 additions and 126 deletions

78
Cargo.lock generated
View file

@ -157,15 +157,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitmaps"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2"
dependencies = [
"typenum",
]
[[package]]
name = "block-buffer"
version = "0.10.3"
@ -276,17 +267,6 @@ dependencies = [
"termcolor",
]
[[package]]
name = "colored"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd"
dependencies = [
"atty",
"lazy_static",
"winapi",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.7"
@ -671,20 +651,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "im"
version = "15.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0acd33ff0285af998aaf9b57342af478078f53492322fafc47450e09397e0e9"
dependencies = [
"bitmaps",
"rand_core",
"rand_xoshiro",
"sized-chunks",
"typenum",
"version_check",
]
[[package]]
name = "indexmap"
version = "1.9.1"
@ -766,9 +732,7 @@ dependencies = [
"arbitrary",
"arbtest",
"arref",
"bit-vec",
"color-backtrace",
"colored",
"crdt-list",
"criterion",
"ctor",
@ -777,11 +741,10 @@ dependencies = [
"enum-as-inner",
"flate2",
"fxhash",
"im",
"js-sys",
"num",
"owning_ref",
"pin-project",
"postcard",
"proptest",
"proptest-derive",
"rand",
@ -1055,26 +1018,6 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2 1.0.47",
"quote 1.0.21",
"syn 1.0.103",
]
[[package]]
name = "plotters"
version = "0.3.4"
@ -1268,15 +1211,6 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rand_xoshiro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa"
dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.5.3"
@ -1561,16 +1495,6 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sized-chunks"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d69225bde7a69b235da73377861095455d298f2b970996eec25ddbb42b3d1e"
dependencies = [
"bitmaps",
"typenum",
]
[[package]]
name = "smallvec"
version = "1.10.0"

View file

@ -9,22 +9,19 @@ edition = "2021"
string_cache = "0.8.3"
rle = { path = "../rle" }
smallvec = "1.8.0"
smartstring = "1.0.1"
smartstring = { version = "1.0.1" }
fxhash = "0.2.1"
ring = "0.16.20"
pin-project = "1.0.10"
serde = { version = "1.0.140", features = ["derive"] }
thiserror = "1.0.31"
im = "15.1.0"
enum-as-inner = "0.5.1"
num = "0.4.0"
crdt-list = { version = "0.3.0" }
owning_ref = "0.4.1"
postcard = "1.0.2"
rand = { version = "0.8.5", optional = true }
arbitrary = { version = "1.1.7", optional = true }
tabled = { version = "0.10.0", optional = true }
colored = "2.0.0"
bit-vec = "0.6.3"
wasm-bindgen = { version = "0.2.83", optional = true }
serde-wasm-bindgen = { version = "0.4.5", optional = true }
js-sys = { version = "0.3.60", optional = true }

View file

@ -10,7 +10,7 @@ use crate::{
log_store::ImportContext,
op::{InnerContent, RemoteContent, RichOp},
version::{IdSpanVector, VersionVector},
InternalString, LogStore, LoroValue, ID,
InternalString, LoroValue, ID,
};
use serde::{Deserialize, Serialize};

View file

@ -2,10 +2,11 @@ use std::ops::Range;
use enum_as_inner::EnumAsInner;
use rle::{HasLength, Mergable, Sliceable};
use serde::{Deserialize, Serialize};
use crate::container::text::text_content::{ListSlice, SliceRange};
#[derive(EnumAsInner, Debug, Clone)]
#[derive(EnumAsInner, Debug, Clone, Serialize, Deserialize)]
pub enum ListOp {
Insert { slice: ListSlice, pos: usize },
Delete(DeleteSpan),
@ -23,7 +24,7 @@ pub enum InnerListOp {
/// len cannot be zero;
///
/// pos: 5, len: -3 eq a range of (2, 5]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteSpan {
pub pos: isize,
pub len: isize,

View file

@ -1,8 +1,9 @@
use rle::{HasLength, Mergable, Sliceable};
use serde::{Deserialize, Serialize};
use crate::{ContentType, InsertContentTrait, InternalString, LoroValue};
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct MapSet {
pub(crate) key: InternalString,
pub(crate) value: LoroValue,

View file

@ -17,7 +17,7 @@ use crate::{
log_store::ImportContext,
op::{RemoteContent, RichOp},
version::IdSpanVector,
LogStore, LoroError, LoroValue,
LoroError, LoroValue,
};
use super::{

View file

@ -2,10 +2,11 @@ use std::ops::Range;
use enum_as_inner::EnumAsInner;
use rle::{HasLength, Mergable, Sliceable};
use serde::{Deserialize, Serialize};
use crate::{smstring::SmString, LoroValue};
#[derive(PartialEq, Debug, EnumAsInner, Clone)]
#[derive(PartialEq, Debug, EnumAsInner, Clone, Serialize, Deserialize)]
pub enum ListSlice {
// TODO: use Box<[LoroValue]> ?
RawData(Vec<LoroValue>),
@ -101,7 +102,7 @@ impl HasLength for ListSlice {
impl Sliceable for ListSlice {
fn slice(&self, from: usize, to: usize) -> Self {
match self {
ListSlice::RawStr(s) => ListSlice::RawStr(s.0[from..to].into()),
ListSlice::RawStr(s) => ListSlice::RawStr(s[from..to].into()),
ListSlice::Unknown(_) => ListSlice::Unknown(to - from),
ListSlice::RawData(x) => ListSlice::RawData(x[from..to].to_vec()),
}

View file

@ -1,4 +1,3 @@
use colored::Colorize;
use debug_log::debug_log;
use rle::{rle_tree::UnsafeCursor, HasLength, Sliceable};
use smallvec::SmallVec;
@ -326,7 +325,7 @@ impl Tracker {
let mut spans = self
.content
.get_active_id_spans(span.start() as usize, span.atom_len());
debug_log!("DELETED SPANS={}", format!("{:#?}", &spans).red());
debug_log!("DELETED SPANS={}", format!("{:#?}", &spans));
self.update_spans(&spans, StatusChange::Delete);
if span.is_reversed() && span.atom_len() > 1 {

View file

@ -12,8 +12,6 @@ use std::{
fmt::Debug,
};
#[allow(unused)]
use colored::Colorize;
use fxhash::{FxHashMap, FxHashSet};
use rle::{HasLength, Sliceable};
use smallvec::{smallvec, SmallVec};

View file

@ -1,5 +1,6 @@
use enum_as_inner::EnumAsInner;
use fxhash::{FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize};
use crate::{container::ContainerID, delta::Delta, version::Frontiers, InternalString, LoroValue};
@ -27,7 +28,7 @@ pub struct Event {
pub type Path = Vec<Index>;
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Index {
Key(InternalString),
Seq(usize),

View file

@ -1,9 +1,11 @@
//! [LogStore] stores all the [Change]s and [Op]s. It's also a [DAG][crate::dag];
//!
//!
mod encode_updates;
mod encoding;
mod import;
mod iter;
use crate::LoroValue;
pub(crate) use import::ImportContext;
use std::{
@ -13,7 +15,6 @@ use std::{
use fxhash::FxHashMap;
use crate::context::Context;
use rle::{HasLength, RleVec, RleVecWithIndex, Sliceable};
use smallvec::SmallVec;
@ -51,7 +52,7 @@ impl Default for GcConfig {
}
type ClientChanges = FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>;
type RemoteClientChanges = FxHashMap<ClientID, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>>;
type RemoteClientChanges = FxHashMap<ClientID, Vec<Change<RemoteOp>>>;
#[derive(Debug)]
/// LogStore stores the full history of Loro
@ -101,21 +102,14 @@ impl LogStore {
.map(|changes| changes.get(id.counter as usize).unwrap().element)
}
pub fn export(
&self,
remote_vv: &VersionVector,
) -> FxHashMap<ClientID, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>> {
let mut ans: FxHashMap<ClientID, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>> =
Default::default();
pub fn export(&self, remote_vv: &VersionVector) -> FxHashMap<ClientID, Vec<Change<RemoteOp>>> {
let mut ans: FxHashMap<ClientID, Vec<Change<RemoteOp>>> = Default::default();
let self_vv = self.vv();
let diff = self_vv.diff(remote_vv);
for span in diff.left.iter() {
let changes = self.get_changes_slice(span.id_span());
for change in changes.iter() {
let vec = ans
.entry(change.id.client_id)
.or_insert_with(|| RleVecWithIndex::new_cfg(self.get_change_merge_cfg()));
let vec = ans.entry(change.id.client_id).or_insert_with(|| Vec::new());
vec.push(self.change_to_export_format(change));
}
}

View file

@ -0,0 +1,162 @@
use rle::{HasLength, RleVec};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use crate::{
change::{Change, Lamport, Timestamp},
container::ContainerID,
id::{ClientID, Counter, ID},
op::{RemoteContent, RemoteOp},
LogStore, VersionVector,
};
use super::RemoteClientChanges;
#[derive(Serialize, Deserialize)]
struct Updates {
changes: Vec<EncodedClientChanges>,
}
/// the continuous changes from the same client
#[derive(Serialize, Deserialize)]
struct EncodedClientChanges {
meta: FirstChangeInfo,
data: Vec<EncodedChange>,
}
#[derive(Serialize, Deserialize)]
struct FirstChangeInfo {
pub(crate) client: ClientID,
pub(crate) counter: Counter,
pub(crate) lamport: Lamport,
pub(crate) timestamp: Timestamp,
}
#[derive(Serialize, Deserialize)]
struct EncodedOp {
pub(crate) container: ContainerID,
pub(crate) contents: Vec<RemoteContent>,
}
#[derive(Serialize, Deserialize)]
struct EncodedChange {
pub(crate) ops: Vec<EncodedOp>,
pub(crate) deps_except_self: Vec<ID>,
pub(crate) lamport_delta: u32,
pub(crate) timestamp_delta: i64,
}
impl LogStore {
pub fn encode_updates(&self, from: &VersionVector) -> Result<Vec<u8>, postcard::Error> {
let changes = self.export(from);
let mut updates = Updates {
changes: Vec::with_capacity(changes.len()),
};
for (_, changes) in changes {
let encoded = convert_changes_to_encoded(changes.into_iter());
updates.changes.push(encoded);
}
postcard::to_allocvec(&updates)
}
pub fn decode_updates(&mut self, input: &[u8]) -> Result<(), postcard::Error> {
let updates: Updates = postcard::from_bytes(input)?;
let mut changes: RemoteClientChanges = Default::default();
for encoded in updates.changes {
changes.insert(encoded.meta.client, convert_encoded_to_changes(encoded));
}
self.import(changes);
Ok(())
}
}
fn convert_changes_to_encoded<I>(mut changes: I) -> EncodedClientChanges
where
I: Iterator<Item = Change<RemoteOp>>,
{
let first_change = changes.next().unwrap();
let this_client_id = first_change.id.client_id;
let mut data = Vec::with_capacity(changes.size_hint().0 + 1);
let mut last_change = first_change.clone();
for change in changes {
data.push(EncodedChange {
ops: change
.ops
.iter()
.map(|op| EncodedOp {
container: op.container.clone(),
contents: op.contents.iter().cloned().collect(),
})
.collect(),
deps_except_self: change
.deps
.iter()
.filter(|x| x.client_id != this_client_id)
.copied()
.collect(),
lamport_delta: change.lamport - last_change.lamport,
timestamp_delta: change.timestamp - last_change.timestamp,
});
last_change = change;
}
EncodedClientChanges {
meta: FirstChangeInfo {
client: this_client_id,
counter: first_change.id.counter,
lamport: first_change.lamport,
timestamp: first_change.timestamp,
},
data,
}
}
fn convert_encoded_to_changes(changes: EncodedClientChanges) -> Vec<Change<RemoteOp>> {
let mut result = Vec::with_capacity(changes.data.len() + 1);
let mut last_lamport = changes.meta.lamport;
let mut last_timestamp = changes.meta.timestamp;
let mut counter: Counter = changes.meta.counter;
for encoded in changes.data {
let start_counter = counter;
let mut deps = SmallVec::with_capacity(encoded.deps_except_self.len() + 1);
if counter > 0 {
deps.push(ID {
client_id: changes.meta.client,
counter: changes.meta.counter - 1,
});
}
for dep in encoded.deps_except_self {
deps.push(dep);
}
let mut ops = RleVec::with_capacity(encoded.ops.len());
for op in encoded.ops {
let len: usize = op.contents.iter().map(|x| x.atom_len()).sum();
ops.push(RemoteOp {
counter,
container: op.container,
contents: op.contents.into_iter().collect(),
});
counter += len as Counter;
}
let change = Change {
id: ID {
client_id: changes.meta.client,
counter: start_counter,
},
lamport: last_lamport + encoded.lamport_delta,
timestamp: last_timestamp + encoded.timestamp_delta,
ops,
deps,
};
last_lamport = change.lamport;
last_timestamp = change.timestamp;
result.push(change);
}
result
}

View file

@ -19,7 +19,6 @@ use crate::{
dag::remove_included_frontiers,
id::{ClientID, Counter, ID},
op::{Op, RemoteContent, RemoteOp},
smstring::SmString,
span::{HasIdSpan, HasLamportSpan},
ContainerType, InternalString, LogStore, LoroValue, VersionVector,
};
@ -154,7 +153,6 @@ fn encode_changes(store: &LogStore) -> Encoded {
(span.pos as usize, 0, LoroValue::I32(span.len as i32))
}
},
crate::op::RemoteContent::Dyn(_) => unreachable!(),
};
op_len += 1;
ops.push(OpEncoding {
@ -271,7 +269,7 @@ fn decode_changes(
},
_ => {
let slice = match value {
LoroValue::String(s) => ListSlice::RawStr(SmString::from(&*s)),
LoroValue::String(s) => ListSlice::RawStr(s.into()),
LoroValue::List(v) => ListSlice::RawData(*v),
_ => unreachable!(),
};

View file

@ -72,18 +72,12 @@ impl LoroCore {
Text::from_instance(instance, cid)
}
pub fn export(
&self,
remote_vv: VersionVector,
) -> FxHashMap<u64, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>> {
pub fn export(&self, remote_vv: VersionVector) -> FxHashMap<u64, Vec<Change<RemoteOp>>> {
let store = self.log_store.read().unwrap();
store.export(&remote_vv)
}
pub fn import(
&mut self,
changes: FxHashMap<u64, RleVecWithIndex<Change<RemoteOp>, ChangeMergeCfg>>,
) {
pub fn import(&mut self, changes: FxHashMap<u64, Vec<Change<RemoteOp>>>) {
let mut store = self.log_store.write().unwrap();
store.import(changes)
}

View file

@ -2,6 +2,7 @@ use std::any::{Any, TypeId};
use enum_as_inner::EnumAsInner;
use rle::{HasLength, Mergable, Sliceable};
use serde::{Deserialize, Serialize};
use crate::container::{
list::list_op::{InnerListOp, ListOp},
@ -24,11 +25,10 @@ pub enum InnerContent {
Map(InnerMapSet),
}
#[derive(EnumAsInner, Debug)]
#[derive(EnumAsInner, Debug, Serialize, Deserialize)]
pub enum RemoteContent {
Map(MapSet),
List(ListOp),
Dyn(Box<dyn InsertContentTrait>),
}
impl Clone for RemoteContent {
@ -36,7 +36,6 @@ impl Clone for RemoteContent {
match self {
Self::Map(arg0) => Self::Map(arg0.clone()),
Self::List(arg0) => Self::List(arg0.clone()),
Self::Dyn(arg0) => Self::Dyn(arg0.clone_content()),
}
}
}
@ -93,7 +92,6 @@ impl HasLength for RemoteContent {
fn content_len(&self) -> usize {
match self {
RemoteContent::Map(x) => x.content_len(),
RemoteContent::Dyn(x) => x.content_len(),
RemoteContent::List(x) => x.content_len(),
}
}
@ -103,7 +101,6 @@ impl Sliceable for RemoteContent {
fn slice(&self, from: usize, to: usize) -> Self {
match self {
RemoteContent::Map(x) => RemoteContent::Map(x.slice(from, to)),
RemoteContent::Dyn(x) => RemoteContent::Dyn(x.slice_content(from, to)),
RemoteContent::List(x) => RemoteContent::List(x.slice(from, to)),
}
}
@ -117,7 +114,6 @@ impl Mergable for RemoteContent {
match (self, other) {
(RemoteContent::Map(x), RemoteContent::Map(y)) => x.is_mergable(y, &()),
(RemoteContent::List(x), RemoteContent::List(y)) => x.is_mergable(y, &()),
(RemoteContent::Dyn(x), RemoteContent::Dyn(y)) => x.is_mergable_content(&**y),
_ => false,
}
}
@ -135,7 +131,6 @@ impl Mergable for RemoteContent {
RemoteContent::List(y) => x.merge(y, &()),
_ => unreachable!(),
},
RemoteContent::Dyn(x) => x.merge_content(&**_other.as_dyn().unwrap()),
}
}
}

View file

@ -3,6 +3,7 @@ use std::ops::DerefMut;
use std::ops::Deref;
use rle::Mergable;
use serde::Deserialize;
use serde::Serialize;
use smartstring::LazyCompact;
@ -63,6 +64,13 @@ impl From<String> for SmString {
}
}
impl From<Box<str>> for SmString {
fn from(s: Box<str>) -> Self {
let s: &str = &s;
SmString(s.into())
}
}
impl From<&str> for SmString {
fn from(s: &str) -> Self {
SmString(s.into())
@ -77,3 +85,13 @@ impl Serialize for SmString {
serializer.serialize_str(&self.0)
}
}
impl<'de> Deserialize<'de> for SmString {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Ok(s.into())
}
}

View file

@ -1,4 +1,3 @@
use std::borrow::{Borrow, BorrowMut};
use std::cell::RefCell;
use std::rc::Rc;

View file

@ -555,6 +555,31 @@ impl<A: Array> Deref for RleVecWithLen<A> {
}
}
impl<T: Sliceable + HasLength> Sliceable for Vec<T> {
fn slice(&self, start: usize, end: usize) -> Self {
if start >= end || self.is_empty() {
return Vec::new();
}
let mut ans = Vec::new();
let mut index = 0;
for item in self {
if index >= end {
break;
}
let len = item.atom_len();
if start < index + len {
ans.push(item.slice(start.saturating_sub(index), (end - index).min(len)))
}
index += len;
}
ans
}
}
#[cfg(test)]
mod test {
use super::*;

View file

@ -1,4 +1,7 @@
use std::ops::{Deref, Range};
use std::{
ops::{Deref, Range},
vec,
};
use num::Integer;
@ -213,6 +216,16 @@ impl<T, Conf> RleVecWithIndex<T, Conf> {
}
}
impl<T, Cfg> IntoIterator for RleVecWithIndex<T, Cfg> {
type Item = T;
type IntoIter = vec::IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
self.vec.into_iter()
}
}
impl<T> Default for RleVecWithIndex<T> {
fn default() -> Self {
Self::new()