feat: encode/decode v2

This commit is contained in:
Zixuan Chen 2023-08-29 10:38:48 +08:00
parent 5b6cc28f6b
commit f208744ec1
No known key found for this signature in database
16 changed files with 915 additions and 73 deletions

53
Cargo.lock generated
View file

@ -973,6 +973,7 @@ dependencies = [
"string_cache",
"thiserror",
"wasm-bindgen",
"zerovec",
]
[[package]]
@ -1039,6 +1040,7 @@ dependencies = [
"thiserror",
"tracing",
"wasm-bindgen",
"zerovec",
]
[[package]]
@ -1454,7 +1456,7 @@ version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759"
dependencies = [
"unicode-xid",
"unicode-xid 0.1.0",
]
[[package]]
@ -1987,7 +1989,7 @@ checksum = "9ca4b3b69a77cbe1ffc9e198781b7acb0c7365a883670e8f1c1bc66fba79a5c5"
dependencies = [
"proc-macro2 0.4.30",
"quote 0.6.13",
"unicode-xid",
"unicode-xid 0.1.0",
]
[[package]]
@ -2012,6 +2014,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2 1.0.64",
"quote 1.0.29",
"syn 1.0.107",
"unicode-xid 0.2.4",
]
[[package]]
name = "sys-info"
version = "0.9.1"
@ -2203,6 +2217,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
[[package]]
name = "unicode-xid"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "unindent"
version = "0.1.11"
@ -2464,3 +2484,32 @@ name = "windows_x86_64_msvc"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "zerofrom"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df54d76c3251de27615dfcce21e636c172dafb2549cd7fd93e21c66f6ca6bea2"
[[package]]
name = "zerovec"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "198f54134cd865f437820aa3b43d0ad518af4e68ee161b444cdd15d8e567c8ea"
dependencies = [
"serde",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "486558732d5dde10d0f8cb2936507c1bb21bc539d924c949baf5f36a58e51bac"
dependencies = [
"proc-macro2 1.0.64",
"quote 1.0.29",
"syn 1.0.107",
"synstructure",
]

View file

@ -15,6 +15,7 @@ enum-as-inner = "0.6.0"
string_cache = "0.8.7"
arbitrary = {version = "1.3.0", features=["derive"]}
js-sys = {version="0.3.64", optional=true}
zerovec = "0.9.4"
[features]
wasm = ["wasm-bindgen", "js-sys"]

View file

@ -11,6 +11,7 @@ mod value;
pub use error::{LoroError, LoroResult};
pub use span::*;
pub use value::LoroValue;
use zerovec::ule::AsULE;
pub type PeerID = u64;
pub type Counter = i32;
pub type Lamport = u32;
@ -47,8 +48,11 @@ pub enum ContainerID {
}
pub type InternalString = string_cache::DefaultAtom;
// TODO: add non_exausted
// Note: It will be encoded into binary format, so the order of its fields should not be changed.
#[derive(Arbitrary, Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
#[derive(
Arbitrary, Debug, PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord, Serialize, Deserialize,
)]
pub enum ContainerType {
/// See [`crate::text::TextContent`]
Text,
@ -58,6 +62,27 @@ pub enum ContainerType {
// Custom(u16),
}
impl AsULE for ContainerType {
type ULE = u8;
fn to_unaligned(self) -> Self::ULE {
match self {
ContainerType::Text => 0,
ContainerType::Map => 1,
ContainerType::List => 2,
}
}
fn from_unaligned(unaligned: Self::ULE) -> Self {
match unaligned {
0 => ContainerType::Text,
1 => ContainerType::Map,
2 => ContainerType::List,
_ => unreachable!(),
}
}
}
impl ContainerType {
pub fn default_value(&self) -> LoroValue {
match self {
@ -66,6 +91,23 @@ impl ContainerType {
ContainerType::List => LoroValue::List(Arc::new(Default::default())),
}
}
pub fn to_u8(self) -> u8 {
match self {
ContainerType::Text => 0,
ContainerType::Map => 1,
ContainerType::List => 2,
}
}
pub fn from_u8(v: u8) -> Self {
match v {
0 => ContainerType::Text,
1 => ContainerType::Map,
2 => ContainerType::List,
_ => unreachable!(),
}
}
}
// a weird dependency in Prelim in loro_internal need this convertion to work.

View file

@ -90,6 +90,14 @@ impl CounterSpan {
}
}
pub fn set_min(&mut self, min: Counter) {
if self.start < self.end {
self.start = min;
} else {
self.end = min - 1;
}
}
#[inline(always)]
pub fn max(&self) -> Counter {
if self.start > self.end {

View file

@ -37,6 +37,7 @@ jumprope = { version = "1.1.2", features = ["wchar_conversion"] }
generic-btree = "0.4.1"
miniz_oxide = "0.7.1"
getrandom = "0.2.10"
zerovec = { version = "0.9.4", features = ["serde", "derive"] }
[dev-dependencies]
serde_json = "1.0.87"

View file

@ -385,6 +385,7 @@ dependencies = [
"serde",
"string_cache",
"thiserror",
"zerovec",
]
[[package]]
@ -419,6 +420,7 @@ dependencies = [
"tabled",
"thiserror",
"tracing",
"zerovec",
]
[[package]]
@ -910,6 +912,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.105",
"unicode-xid",
]
[[package]]
name = "tabled"
version = "0.10.0"
@ -1010,6 +1024,12 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]]
name = "unicode-xid"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "version_check"
version = "0.9.4"
@ -1078,3 +1098,32 @@ name = "windows_x86_64_msvc"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
[[package]]
name = "zerofrom"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df54d76c3251de27615dfcce21e636c172dafb2549cd7fd93e21c66f6ca6bea2"
[[package]]
name = "zerovec"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "198f54134cd865f437820aa3b43d0ad518af4e68ee161b444cdd15d8e567c8ea"
dependencies = [
"serde",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "486558732d5dde10d0f8cb2936507c1bb21bc539d924c949baf5f36a58e51bac"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.105",
"synstructure",
]

View file

@ -18,6 +18,15 @@ pub enum ListSlice<'a> {
Unknown(usize),
}
impl<'a> ListSlice<'a> {
pub fn from_str(str: &'a str) -> Self {
Self::RawStr {
str: Cow::Borrowed(str),
unicode_len: str.chars().count(),
}
}
}
#[repr(transparent)]
#[derive(PartialEq, Eq, Debug, Clone, Serialize)]
pub struct SliceRange(pub Range<u32>);

View file

@ -180,8 +180,9 @@ pub(crate) struct DagCausalIter<'a, Dag> {
dag: &'a Dag,
frontier: Frontiers,
target: IdSpanVector,
in_degrees: FxHashMap<ID, usize>,
succ: BTreeMap<ID, Frontiers>,
/// how many dependencies are inside target for each id
out_degrees: FxHashMap<ID, usize>,
succ: BTreeMap<ID, Vec<ID>>,
stack: Vec<ID>,
}
@ -200,9 +201,8 @@ pub(crate) struct IterReturn<'a, T> {
impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
pub fn new(dag: &'a D, from: Frontiers, target: IdSpanVector) -> Self {
// debug_dbg!(&from, &target);
let mut in_degrees: FxHashMap<ID, usize> = FxHashMap::default();
let mut succ: BTreeMap<ID, Frontiers> = BTreeMap::default();
let mut out_degrees: FxHashMap<ID, usize> = FxHashMap::default();
let mut succ: BTreeMap<ID, Vec<ID>> = BTreeMap::default();
let mut stack = Vec::new();
let mut q = vec![];
for id in target.iter() {
@ -212,36 +212,36 @@ impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
}
}
// traverse all nodes
// traverse all nodes, calculate the out_degrees
// if out_degree is 0, then it can be iterate directly
while let Some(id) = q.pop() {
let client = id.peer;
let node = dag.get(id).unwrap();
let deps = node.deps();
if deps.len().is_zero() {
in_degrees.insert(id, 0);
}
for dep in deps.iter() {
let filter = if let Some(span) = target.get(&dep.peer) {
dep.counter < span.min()
} else {
true
};
if filter {
in_degrees.entry(id).or_insert(0);
} else {
in_degrees.entry(id).and_modify(|i| *i += 1).or_insert(1);
}
succ.entry(*dep).or_default().push(id);
}
let mut target_span = *target.get(&client).unwrap();
out_degrees.insert(
id,
deps.iter()
.filter(|&dep| {
if let Some(span) = target.get(&dep.peer) {
let ans = dep.counter >= span.min() && dep.counter <= span.max();
if ans {
succ.entry(*dep).or_default().push(id);
}
ans
} else {
false
}
})
.count(),
);
let target_span = target.get(&client).unwrap();
let last_counter = node.id_last().counter;
target_span.set_start(last_counter + 1);
if target_span.content_len() > 0 {
if target_span.max() > last_counter {
q.push(ID::new(client, last_counter + 1))
}
}
in_degrees.retain(|k, v| {
out_degrees.retain(|k, v| {
if v.is_zero() {
stack.push(*k);
return false;
@ -253,7 +253,7 @@ impl<'a, T: DagNode, D: Dag<Node = T>> DagCausalIter<'a, D> {
dag,
frontier: from,
target,
in_degrees,
out_degrees,
succ,
stack,
}
@ -275,7 +275,6 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
return None;
}
let node_id = self.stack.pop().unwrap();
let target_span = self.target.get_mut(&node_id.peer).unwrap();
debug_assert_eq!(
node_id.counter,
@ -311,25 +310,27 @@ impl<'a, T: DagNode + 'a, D: Dag<Node = T>> Iterator for DagCausalIter<'a, D> {
};
let path = self.dag.find_path(&self.frontier, &deps);
debug_log::group!("Dag Causal");
debug_log::debug_dbg!(&deps);
debug_log::debug_dbg!(&path);
debug_log::group_end!();
// debug_log::group!("Dag Causal");
// debug_log::debug_dbg!(&deps);
// debug_log::debug_dbg!(&path);
// debug_log::group_end!();
// NOTE: we expect user to update the tracker, to apply node, after visiting the node
self.frontier = Frontiers::from_id(node.id_start().inc(slice_end - 1));
let current_client = node_id.peer;
let current_peer = node_id.peer;
let mut keys = Vec::new();
let mut heap = BinaryHeap::new();
// The in-degree of the successor node minus 1, and if it becomes 0, it is added to the heap
for (key, succ) in self.succ.range(node.id_start()..node.id_end()) {
keys.push(*key);
for succ_id in succ.iter() {
self.in_degrees.entry(*succ_id).and_modify(|i| *i -= 1);
if let Some(in_degree) = self.in_degrees.get(succ_id) {
self.out_degrees.entry(*succ_id).and_modify(|i| *i -= 1);
if let Some(in_degree) = self.out_degrees.get(succ_id) {
if in_degree.is_zero() {
heap.push((succ_id.peer != current_client, *succ_id));
self.in_degrees.remove(succ_id);
heap.push((succ_id.peer != current_peer, *succ_id));
self.out_degrees.remove(succ_id);
}
}
}

View file

@ -20,6 +20,7 @@ use self::{
encode_updates::decode_oplog_updates,
};
pub(crate) use encode_enhanced::{decode_oplog_v2, encode_oplog_v2};
pub(crate) use encode_updates::encode_oplog_updates;
pub(crate) const COMPRESS_RLE_THRESHOLD: usize = 20 * 1024;
@ -33,6 +34,7 @@ pub enum EncodeMode {
RleUpdates(VersionVector),
Snapshot,
CompressRleUpdates(VersionVector),
RleUpdatesV2(VersionVector),
}
impl EncodeMode {
@ -43,6 +45,7 @@ impl EncodeMode {
EncodeMode::RleUpdates(_) => 1,
EncodeMode::Snapshot => 2,
EncodeMode::CompressRleUpdates(_) => 3,
EncodeMode::RleUpdatesV2(_) => 4,
}
}
}
@ -52,6 +55,7 @@ pub enum ConcreteEncodeMode {
RleUpdates = 1,
Snapshot = 2,
CompressedRleUpdates = 3,
RleUpdatesV2 = 4,
}
impl From<u8> for ConcreteEncodeMode {
@ -61,6 +65,7 @@ impl From<u8> for ConcreteEncodeMode {
1 => ConcreteEncodeMode::RleUpdates,
2 => ConcreteEncodeMode::Snapshot,
3 => ConcreteEncodeMode::CompressedRleUpdates,
4 => ConcreteEncodeMode::RleUpdatesV2,
_ => unreachable!(),
}
}
@ -82,10 +87,8 @@ pub(crate) fn encode_oplog(oplog: &OpLog, mode: EncodeMode) -> Vec<u8> {
.sum::<usize>();
if update_total_len > COMPRESS_RLE_THRESHOLD {
EncodeMode::CompressRleUpdates(vv)
} else if update_total_len > UPDATE_ENCODE_THRESHOLD {
EncodeMode::RleUpdates(vv)
} else {
EncodeMode::Updates(vv)
EncodeMode::RleUpdatesV2(vv)
}
}
mode => mode,
@ -98,6 +101,10 @@ pub(crate) fn encode_oplog(oplog: &OpLog, mode: EncodeMode) -> Vec<u8> {
miniz_oxide::deflate::compress_to_vec(&bytes, 7)
}
EncodeMode::Snapshot => unimplemented!(),
EncodeMode::RleUpdatesV2(vv) => {
let bytes = encode_oplog_v2(oplog, vv);
miniz_oxide::deflate::compress_to_vec(&bytes, 7)
}
_ => unreachable!(),
};
ans.push(mode.to_byte());
@ -127,5 +134,8 @@ pub(crate) fn decode_oplog(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroEr
.and_then(|bytes| decode_oplog_changes(oplog, &bytes))
}
ConcreteEncodeMode::Snapshot => unimplemented!(),
ConcreteEncodeMode::RleUpdatesV2 => miniz_oxide::inflate::decompress_to_vec(decoded)
.map_err(|_| LoroError::DecodeError("Invalid compressed data".into()))
.and_then(|bytes| decode_oplog_v2(oplog, &bytes)),
}
}

View file

@ -0,0 +1,587 @@
use fxhash::{FxHashMap, FxHashSet};
use loro_common::HasLamportSpan;
use rle::{HasLength, RleVec};
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, from_bytes, to_vec};
use std::{borrow::Cow, cmp::Ordering, ops::Deref, sync::Arc};
use zerovec::{vecs::VarZeroVecOwned, VarZeroVec};
use crate::{
change::{Change, Lamport, Timestamp},
container::text::text_content::ListSlice,
container::{
idx::ContainerIdx,
list::list_op::{DeleteSpan, ListOp},
map::MapSet,
ContainerID, ContainerType,
},
id::{Counter, PeerID, ID},
op::{RawOpContent, RemoteOp},
oplog::{AppDagNode, OpLog},
span::HasId,
version::Frontiers,
InternalString, LoroError, LoroValue, VersionVector,
};
type PeerIdx = u32;
#[zerovec::make_varule(RootContainerULE)]
#[zerovec::derive(Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
struct RootContainer<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
type_: ContainerType,
}
#[columnar(vec, ser, de)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NormalContainer {
#[columnar(strategy = "DeltaRle", original_type = "u32")]
peer_idx: PeerIdx,
#[columnar(strategy = "DeltaRle", original_type = "u32")]
counter: Counter,
#[columnar(strategy = "Rle")]
type_: u8,
}
#[columnar(vec, ser, de)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ChangeEncoding {
#[columnar(strategy = "Rle", original_type = "u32")]
pub(super) peer_idx: PeerIdx,
#[columnar(strategy = "DeltaRle", original_type = "i64")]
pub(super) timestamp: Timestamp,
#[columnar(strategy = "DeltaRle", original_type = "i64")]
pub(super) op_len: u32,
/// The length of deps that exclude the dep on the same client
#[columnar(strategy = "Rle")]
pub(super) deps_len: u32,
/// Whether the change has a dep on the same client.
/// It can save lots of space by using this field instead of [`DepsEncoding`]
#[columnar(strategy = "BoolRle")]
pub(super) dep_on_self: bool,
}
#[columnar(vec, ser, de)]
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OpEncoding {
#[columnar(strategy = "DeltaRle", original_type = "usize")]
container: usize,
/// key index or insert/delete pos
#[columnar(strategy = "DeltaRle")]
prop: usize,
#[columnar(strategy = "BoolRle")]
is_del: bool,
// if is_del == true, then the following fields is the length of the deletion
// if is_del != true, then the following fields is the length of unknown insertion
#[columnar(strategy = "Rle", original_type = "usize")]
gc: isize,
}
#[columnar(vec, ser, de)]
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub(super) struct DepsEncoding {
#[columnar(strategy = "DeltaRle", original_type = "u32")]
pub(super) client_idx: PeerIdx,
#[columnar(strategy = "DeltaRle", original_type = "i32")]
pub(super) counter: Counter,
}
impl DepsEncoding {
pub(super) fn new(client_idx: PeerIdx, counter: Counter) -> Self {
Self {
client_idx,
counter,
}
}
}
#[columnar(ser, de)]
#[derive(Serialize, Deserialize)]
struct DocEncoding<'a> {
#[columnar(type = "vec")]
changes: Vec<ChangeEncoding>,
#[columnar(type = "vec")]
ops: Vec<OpEncoding>,
#[columnar(type = "vec")]
deps: Vec<DepsEncoding>,
#[columnar(type = "vec")]
normal_containers: Vec<NormalContainer>,
#[serde(borrow)]
str: VarZeroVec<'a, str>,
#[serde(borrow)]
root_containers: VarZeroVec<'a, RootContainerULE>,
start_counter: Vec<Counter>,
values: Vec<LoroValue>,
clients: Vec<PeerID>,
keys: Vec<InternalString>,
}
pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
let mut peer_id_to_idx: FxHashMap<PeerID, PeerIdx> = FxHashMap::default();
let mut peers = Vec::with_capacity(oplog.changes.len());
let mut diff_changes = Vec::new();
let self_vv = oplog.vv();
let start_vv = vv.trim(&oplog.vv());
let diff = self_vv.diff(&start_vv);
let mut start_counter = Vec::new();
for span in diff.left.iter() {
let change = oplog.get_change_at(span.id_start()).unwrap();
let peer_id = *span.0;
let idx = peers.len() as PeerIdx;
peers.push(peer_id);
peer_id_to_idx.insert(peer_id, idx);
start_counter.push(change.id.counter);
}
debug_log::debug_dbg!(&start_vv, &self_vv);
for (change, _) in oplog.iter_causally(start_vv, self_vv.clone()) {
diff_changes.push(change.clone());
}
let (root_containers, container_idx2index, normal_containers) =
extract_containers(&diff_changes, oplog, &mut peer_id_to_idx, &mut peers);
for change in &diff_changes {
for deps in change.deps.iter() {
peer_id_to_idx.entry(deps.peer).or_insert_with(|| {
let idx = peers.len() as PeerIdx;
peers.push(deps.peer);
idx
});
}
}
let change_num = diff_changes.len();
let mut changes = Vec::with_capacity(change_num);
let mut ops = Vec::with_capacity(change_num);
let mut keys = Vec::new();
let mut key_to_idx = FxHashMap::default();
let mut deps = Vec::with_capacity(change_num);
let mut values = Vec::new();
let mut strings = VarZeroVecOwned::new();
for change in &diff_changes {
let client_idx = peer_id_to_idx[&change.id.peer];
let mut dep_on_self = false;
let mut deps_len = 0;
for dep in change.deps.iter() {
if change.id.peer != dep.peer {
deps.push(DepsEncoding::new(
*peer_id_to_idx.get(&dep.peer).unwrap(),
dep.counter,
));
deps_len += 1;
} else {
dep_on_self = true;
}
}
let mut op_len = 0;
for op in change.ops.iter() {
let container = op.container;
let container_index = *container_idx2index.get(&container).unwrap();
let op = oplog.local_op_to_remote(op);
for content in op.contents.into_iter() {
let (prop, gc, is_del) = match content {
crate::op::RawOpContent::Map(MapSet { key, value }) => {
values.push(value);
(
*key_to_idx.entry(key.clone()).or_insert_with(|| {
keys.push(key.clone());
keys.len() - 1
}),
0,
false, // always insert
)
}
crate::op::RawOpContent::List(list) => match list {
ListOp::Insert { slice, pos } => {
match &slice {
ListSlice::RawData(v) => {
values.push(LoroValue::List(Arc::new(v.to_vec())));
}
ListSlice::RawStr {
str,
unicode_len: _,
} => {
strings.push(str);
}
ListSlice::Unknown(_) => {}
};
(
pos,
match &slice {
ListSlice::Unknown(v) => *v as isize,
_ => 0,
},
false,
)
}
ListOp::Delete(span) => (span.pos as usize, span.len, true),
},
};
op_len += 1;
ops.push(OpEncoding {
container: container_index,
prop,
gc,
is_del,
})
}
}
changes.push(ChangeEncoding {
peer_idx: client_idx as PeerIdx,
timestamp: change.timestamp,
deps_len,
op_len,
dep_on_self,
});
}
let encoded = DocEncoding {
changes,
ops,
deps,
str: VarZeroVec::Owned(strings),
clients: peers,
keys,
start_counter,
root_containers: VarZeroVec::Owned(root_containers),
normal_containers,
values,
};
to_vec(&encoded).unwrap()
}
/// Extract containers from oplog changes.
///
/// Containers are sorted by their peer_id and counter so that
/// they can be compressed by using delta encoding.
fn extract_containers(
diff_changes: &Vec<Change>,
oplog: &OpLog,
peer_id_to_idx: &mut FxHashMap<PeerID, PeerIdx>,
peers: &mut Vec<PeerID>,
) -> (
VarZeroVecOwned<RootContainerULE>,
FxHashMap<ContainerIdx, usize>,
Vec<NormalContainer>,
) {
let mut root_containers: VarZeroVecOwned<RootContainerULE> = VarZeroVecOwned::new();
let mut container_idx2index = FxHashMap::default();
let normal_containers = {
// register containers in sorted order
let mut visited = FxHashSet::default();
let mut normal_container_idx_pairs = Vec::new();
for change in diff_changes {
for op in change.ops.iter() {
let container = op.container;
if visited.contains(&container) {
continue;
}
visited.insert(container);
let id = oplog.arena.get_container_id(container).unwrap();
match id {
ContainerID::Root {
name,
container_type,
} => {
container_idx2index.insert(container, root_containers.len());
root_containers.push(&RootContainer {
name: Cow::Owned(name.to_string()),
type_: container_type,
});
}
ContainerID::Normal {
peer,
counter,
container_type,
} => normal_container_idx_pairs.push((
NormalContainer {
peer_idx: *peer_id_to_idx.entry(peer).or_insert_with(|| {
peers.push(peer);
(peers.len() - 1) as PeerIdx
}),
counter,
type_: container_type.to_u8(),
},
container,
)),
}
}
}
normal_container_idx_pairs.sort_by(|a, b| {
if a.0.peer_idx != b.0.peer_idx {
a.0.peer_idx.cmp(&b.0.peer_idx)
} else {
a.0.counter.cmp(&b.0.counter)
}
});
let mut index = root_containers.len();
normal_container_idx_pairs
.into_iter()
.map(|(container, idx)| {
container_idx2index.insert(idx, index);
index += 1;
container
})
.collect::<Vec<_>>()
};
(root_containers, container_idx2index, normal_containers)
}
pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> {
let encoded: DocEncoding =
from_bytes(input).map_err(|e| LoroError::DecodeError(e.to_string().into()))?;
let DocEncoding {
changes: change_encodings,
ops,
deps,
normal_containers,
mut start_counter,
str,
clients: peers,
keys,
root_containers,
values,
} = encoded;
let start_vv: VersionVector = peers
.iter()
.copied()
.zip(start_counter.iter().map(|x| *x as Counter))
.collect::<FxHashMap<_, _>>()
.into();
let ord = start_vv.partial_cmp(oplog.vv());
if ord.is_none() || ord.unwrap() == Ordering::Greater {
return Err(LoroError::DecodeError(
format!(
"Warning: current Loro version is `{:?}`, but remote changes start at version `{:?}`.
These updates can not be applied",
oplog.vv(),
start_vv
)
.into(),
));
}
let mut op_iter = ops.into_iter();
let mut deps_iter = deps.into_iter();
let get_container = |idx: usize| {
if idx < root_containers.len() {
let container = root_containers.get(idx).unwrap();
ContainerID::Root {
name: container.name.into(),
container_type: ContainerType::from_u8(container.type_),
}
} else {
let container = normal_containers.get(idx - root_containers.len()).unwrap();
ContainerID::Normal {
peer: peers[container.peer_idx as usize],
counter: container.counter,
container_type: ContainerType::from_u8(container.type_),
}
}
};
let mut value_iter = values.into_iter();
let mut str_iter = str.iter();
let change_iter = change_encodings.into_iter().map(|change_encoding| {
let counter = start_counter
.get_mut(change_encoding.peer_idx as usize)
.unwrap();
let ChangeEncoding {
peer_idx,
timestamp,
op_len,
deps_len,
dep_on_self,
} = change_encoding;
let peer_id = peers[peer_idx as usize];
let mut ops = RleVec::<[RemoteOp; 1]>::new();
let mut delta = 0;
for op in op_iter.by_ref().take(op_len as usize) {
let OpEncoding {
container: container_idx,
prop,
gc,
is_del,
} = op;
let container_id = get_container(container_idx);
let container_type = container_id.container_type();
let content = match container_type {
ContainerType::Map => {
let key = keys[prop].clone();
RawOpContent::Map(MapSet {
key,
value: value_iter.next().unwrap(),
})
}
ContainerType::List | ContainerType::Text => {
let pos = prop;
if is_del {
RawOpContent::List(ListOp::Delete(DeleteSpan {
pos: pos as isize,
len: gc,
}))
} else if gc > 0 {
RawOpContent::List(ListOp::Insert {
pos,
slice: ListSlice::Unknown(gc as usize),
})
} else {
match container_type {
ContainerType::Text => {
let s = str_iter.next().unwrap();
RawOpContent::List(ListOp::Insert {
slice: ListSlice::from_str(s),
pos,
})
}
ContainerType::List => {
let value = value_iter.next().unwrap();
RawOpContent::List(ListOp::Insert {
slice: ListSlice::RawData(Cow::Owned(
match Arc::try_unwrap(value.into_list().unwrap()) {
Ok(v) => v,
Err(v) => v.deref().clone(),
},
)),
pos,
})
}
ContainerType::Map => unreachable!(),
}
}
}
};
let remote_op = RemoteOp {
container: container_id,
counter: *counter + delta,
contents: vec![content].into(),
};
delta += remote_op.content_len() as i32;
ops.push(remote_op);
}
let mut deps: Frontiers = (0..deps_len)
.map(|_| {
let raw = deps_iter.next().unwrap();
ID::new(peers[raw.client_idx as usize], raw.counter)
})
.collect();
if dep_on_self {
deps.push(ID::new(peer_id, *counter - 1));
}
let change = Change {
id: ID {
peer: peer_id,
counter: *counter,
},
// calc lamport after parsing all changes
lamport: 0,
timestamp,
ops,
deps,
};
*counter += delta;
change
});
oplog.arena.clone().with_op_converter(|converter| {
for mut change in change_iter {
if change.id.counter < oplog.vv().get(&change.id.peer).copied().unwrap_or(0) {
// skip included changes
continue;
}
// calc lamport or pending if its deps are not satisfied
for dep in change.deps.iter() {
match oplog.dag.get_lamport(dep) {
Some(lamport) => {
change.lamport = change.lamport.max(lamport + 1);
}
None => {
todo!("pending")
}
}
}
// convert change into inner format
let mut ops = RleVec::new();
for op in change.ops {
for content in op.contents.into_iter() {
let op = converter.convert_single_op(&op.container, op.counter, content);
ops.push(op);
}
}
let change = Change {
ops,
id: change.id,
deps: change.deps,
lamport: change.lamport,
timestamp: change.timestamp,
};
// update dag and push the change
let len = change.content_len();
if change.deps.len() == 1 && change.deps[0].peer == change.id.peer {
// don't need to push new element to dag because it only depends on itself
let nodes = oplog.dag.map.get_mut(&change.id.peer).unwrap();
let last = nodes.vec_mut().last_mut().unwrap();
assert_eq!(last.peer, change.id.peer);
assert_eq!(last.cnt + last.len as Counter, change.id.counter);
assert_eq!(last.lamport + last.len as Lamport, change.lamport);
last.len = change.id.counter as usize + len - last.cnt as usize;
} else {
let vv = oplog.dag.frontiers_to_im_vv(&change.deps);
oplog
.dag
.map
.entry(change.id.peer)
.or_default()
.push(AppDagNode {
vv,
peer: change.id.peer,
cnt: change.id.counter,
lamport: change.lamport,
deps: change.deps.clone(),
len,
});
}
oplog.next_lamport = oplog.next_lamport.max(change.lamport_end());
oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp);
oplog.dag.vv.extend_to_include_end_id(ID {
peer: change.id.peer,
counter: change.id.counter + change.atom_len() as Counter,
});
oplog
.changes
.entry(change.id.peer)
.or_default()
.push(change);
}
});
// update dag frontiers
oplog.dag.frontiers = oplog.dag.vv_to_frontiers(&oplog.dag.vv);
Ok(())
}

View file

@ -658,6 +658,47 @@ mod test {
);
}
#[test]
fn new_encode() {
test_multi_sites_refactored(
8,
&mut [
Ins {
content: 3871,
pos: 2755657778,
site: 0,
},
Sync { from: 0, to: 31 },
Ins {
content: 3840,
pos: 55040529478965,
site: 212,
},
Ins {
content: 0,
pos: 17381979229574397952,
site: 15,
},
Ins {
content: 12815,
pos: 2248762090699358208,
site: 15,
},
Sync { from: 0, to: 212 },
Ins {
content: 25896,
pos: 14090375187464448,
site: 64,
},
Ins {
content: 0,
pos: 6067790159959556096,
site: 212,
},
],
)
}
#[test]
fn snapshot() {
test_multi_sites_refactored(

View file

@ -210,6 +210,7 @@ impl LoroDoc {
match mode {
ConcreteEncodeMode::Updates
| ConcreteEncodeMode::RleUpdates
| ConcreteEncodeMode::RleUpdatesV2
| ConcreteEncodeMode::CompressedRleUpdates => {
// TODO: need to throw error if state is in transaction
debug_log::group!("import to {}", self.peer_id());

View file

@ -109,21 +109,13 @@ impl Sliceable for Op {
}
impl<'a> Mergable for RemoteOp<'a> {
fn is_mergable(&self, other: &Self, cfg: &()) -> bool {
self.counter + self.content_len() as Counter == other.counter
&& other.contents.len() == 1
&& self
.contents
.last()
.unwrap()
.is_mergable(other.contents.first().unwrap(), cfg)
&& self.container == other.container
fn is_mergable(&self, _other: &Self, _cfg: &()) -> bool {
// don't merge remote op, because it's already merged.
false
}
fn merge(&mut self, other: &Self, _: &()) {
for content in other.contents.iter() {
self.contents.push(content.clone())
}
fn merge(&mut self, _other: &Self, _: &()) {
unreachable!()
}
}

View file

@ -47,19 +47,19 @@ pub struct OpLog {
/// It's faster to answer the question like what's the LCA version
#[derive(Debug, Clone, Default)]
pub struct AppDag {
map: FxHashMap<PeerID, RleVec<[AppDagNode; 0]>>,
frontiers: Frontiers,
vv: VersionVector,
pub(crate) map: FxHashMap<PeerID, RleVec<[AppDagNode; 0]>>,
pub(crate) frontiers: Frontiers,
pub(crate) vv: VersionVector,
}
#[derive(Debug, Clone)]
pub struct AppDagNode {
peer: PeerID,
cnt: Counter,
lamport: Lamport,
deps: Frontiers,
vv: ImVersionVector,
len: usize,
pub(crate) peer: PeerID,
pub(crate) cnt: Counter,
pub(crate) lamport: Lamport,
pub(crate) deps: Frontiers,
pub(crate) vv: ImVersionVector,
pub(crate) len: usize,
}
impl Clone for OpLog {
@ -273,10 +273,10 @@ impl OpLog {
changes
}
pub fn get_change_at(&self, id: ID) -> Option<Change> {
pub fn get_change_at(&self, id: ID) -> Option<&Change> {
if let Some(peer_changes) = self.changes.get(&id.peer) {
if let Some(result) = peer_changes.get_by_atom_index(id.counter) {
return Some(peer_changes.vec()[result.merged_index].clone());
return Some(&peer_changes.vec()[result.merged_index]);
}
}
@ -604,6 +604,51 @@ impl OpLog {
)
}
pub(crate) fn iter_causally(
&self,
from: VersionVector,
to: VersionVector,
) -> impl Iterator<Item = (&Change, Rc<RefCell<VersionVector>>)> {
let from_frontiers = from.to_frontiers(&self.dag);
let diff = from.diff(&to).right;
let mut iter = self.dag.iter_causal(&from_frontiers, diff);
let mut node = iter.next();
let mut cur_cnt = 0;
let vv = Rc::new(RefCell::new(VersionVector::default()));
std::iter::from_fn(move || {
if let Some(inner) = &node {
let mut inner_vv = vv.borrow_mut();
inner_vv.clear();
inner_vv.extend_to_include_vv(inner.data.vv.iter());
let peer = inner.data.peer;
let cnt = inner
.data
.cnt
.max(cur_cnt)
.max(from.get(&peer).copied().unwrap_or(0));
let end = (inner.data.cnt + inner.data.len as Counter)
.min(to.get(&peer).copied().unwrap_or(0));
let change = self
.changes
.get(&peer)
.and_then(|x| x.get_by_atom_index(cnt).map(|x| x.element))
.unwrap();
if change.ctr_end() < end {
cur_cnt = change.ctr_end();
} else {
node = iter.next();
cur_cnt = 0;
}
inner_vv.extend_to_include_end_id(change.id);
Some((change, vv.clone()))
} else {
None
}
})
}
pub(crate) fn len_changes(&self) -> usize {
self.changes.values().map(|x| x.len()).sum()
}

View file

@ -744,6 +744,16 @@ impl VersionVector {
shrink_frontiers(last_ids, dag)
}
pub(crate) fn trim(&self, vv: &&VersionVector) -> VersionVector {
let mut ans = VersionVector::new();
for (client_id, &counter) in self.iter() {
if let Some(&other_counter) = vv.get(client_id) {
ans.insert(*client_id, counter.min(other_counter));
}
}
ans
}
}
impl ImVersionVector {

View file

@ -8,12 +8,8 @@ fn test_timestamp() {
let mut txn = doc.txn().unwrap();
text.insert(&mut txn, 0, "123").unwrap();
txn.commit().unwrap();
let change = doc
.oplog()
.lock()
.unwrap()
.get_change_at(ID::new(doc.peer_id(), 0))
.unwrap();
let op_log = &doc.oplog().lock().unwrap();
let change = op_log.get_change_at(ID::new(doc.peer_id(), 0)).unwrap();
assert!(change.timestamp() > 1690966970);
}