fix: encode use dep_on_self

Co-authored-by: Zixuan Chen <me@zxch3n.com>
leeeon233 2023-02-22 11:56:54 +08:00
parent 0328c11a8e
commit 5b22a1e9aa
11 changed files with 267 additions and 247 deletions

View file

@@ -235,6 +235,10 @@ impl ListContainer {
for state in self.state.iter_range(pos, Some(pos + len)) {
let range = &state.get_sliced().0;
if SliceRange::from(range.start..range.end).is_unknown() {
continue;
}
for value in self.raw_data.slice(range).iter() {
if let LoroValue::Unresolved(container_id) = value {
debug_log::debug_log!("Deleted {:?}", container_id);

View file

@@ -29,15 +29,15 @@ use super::{
#[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)]
pub(crate) struct ContainerIdx(u32);
impl ContainerIdx {
pub(crate) fn to_u32(self) -> u32 {
self.0
}
// impl ContainerIdx {
// pub(crate) fn to_u32(self) -> u32 {
// self.0
// }
pub(crate) fn from_u32(idx: u32) -> Self {
Self(idx)
}
}
// pub(crate) fn from_u32(idx: u32) -> Self {
// Self(idx)
// }
// }
// TODO: replace this with a fat pointer?
#[derive(Debug, EnumAsInner)]

View file

@@ -40,7 +40,7 @@ impl Rope {
)
}
pub fn utf8_to_utf16(&self, index: usize) -> usize {
pub fn _utf8_to_utf16(&self, index: usize) -> usize {
self.process_cursor_at(
index,
|x| x.utf8 as usize,

View file

@@ -9,6 +9,7 @@ use enum_as_inner::EnumAsInner;
use fxhash::FxHashMap;
use tabled::{TableIteratorExt, Tabled};
#[allow(unused_imports)]
use crate::{
array_mut_ref,
container::ContainerID,
@@ -641,14 +642,18 @@ fn check_synced(sites: &mut [Actor]) {
let (a, b) = array_mut_ref!(sites, [i, j]);
let a_doc = &mut a.loro;
let b_doc = &mut b.loro;
// a_doc
// .decode(&b_doc.encode_with_cfg(EncodeConfig::rle_update(a_doc.vv_cloned())))
// .unwrap();
// b_doc
// .decode(&a_doc.encode_with_cfg(EncodeConfig::rle_update(b_doc.vv_cloned())))
// .unwrap();
a_doc.decode(&b_doc.encode_all()).unwrap();
b_doc.decode(&a_doc.encode_all()).unwrap();
if i % 2 == 0 {
a_doc
.decode(&b_doc.encode_with_cfg(EncodeConfig::rle_update(a_doc.vv_cloned())))
.unwrap();
b_doc
.decode(&a_doc.encode_with_cfg(EncodeConfig::update(b_doc.vv_cloned())))
.unwrap();
} else {
a_doc.decode(&b_doc.encode_all()).unwrap();
b_doc.decode(&a_doc.encode_all()).unwrap();
}
check_eq(a, b);
debug_log::group_end!();
}
@@ -1286,27 +1291,30 @@ mod failed_tests {
test_multi_sites(
5,
&mut [
Text {
site: 0,
List {
site: 4,
container_idx: 0,
pos: 0,
value: 32125,
is_del: false,
key: 0,
value: I32(1),
},
SyncAll,
Text {
site: 2,
List {
site: 0,
container_idx: 0,
pos: 1,
value: 1,
is_del: true,
key: 0,
value: Container(C::Text),
},
Text {
site: 2,
List {
site: 0,
container_idx: 0,
pos: 0,
value: 1,
is_del: true,
key: 1,
value: I32(2105376125),
},
List {
site: 0,
container_idx: 0,
key: 1,
value: Null,
},
],
)
@@ -1318,12 +1326,6 @@ mod failed_tests {
minify_error(
5,
vec![
Map {
site: 0,
container_idx: 255,
key: 255,
value: Null,
},
SyncAll,
SyncAll,
SyncAll,
@@ -1331,114 +1333,6 @@ mod failed_tests {
SyncAll,
SyncAll,
SyncAll,
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 21,
container_idx: 21,
key: 21,
value: I32(2105376125),
},
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: false,
},
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 21,
container_idx: 21,
key: 21,
value: Container(C::List),
},
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
SyncAll,
List {
site: 64,
@@ -1446,9 +1340,16 @@ mod failed_tests {
key: 64,
value: Null,
},
List {
site: 64,
container_idx: 64,
SyncAll,
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 21,
container_idx: 125,
key: 125,
value: I32(2105376125),
},
@@ -1463,63 +1364,129 @@ mod failed_tests {
site: 125,
container_idx: 125,
pos: 125,
value: 27517,
value: 32125,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
SyncAll,
SyncAll,
Map {
site: 0,
container_idx: 255,
key: 64,
value: Null,
},
List {
site: 64,
container_idx: 64,
key: 64,
value: Null,
},
List {
site: 70,
container_idx: 255,
key: 255,
value: Container(C::Text),
},
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Map {
site: 21,
container_idx: 21,
key: 21,
value: Null,
},
Map {
site: 3,
container_idx: 3,
key: 3,
value: Null,
},
Map {
site: 3,
container_idx: 3,
key: 3,
value: Null,
},
Map {
site: 3,
container_idx: 3,
key: 3,
value: I32(353703189),
},
Map {
site: 1,
container_idx: 0,
key: 0,
value: I32(2105376125),
},
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Sync { from: 155, to: 155 },
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
List {
site: 64,
container_idx: 64,
key: 64,
value: Null,
},
Text {
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
site: 125,
container_idx: 125,
pos: 125,
value: 32125,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 107,
value: 27499,
is_del: true,
},
Text {
site: 107,
container_idx: 107,
pos: 64,
value: 16448,
site: 125,
container_idx: 64,
pos: 70,
value: 17990,
is_del: false,
},
List {
@@ -1534,10 +1501,58 @@ mod failed_tests {
key: 64,
value: Null,
},
List {
site: 125,
container_idx: 125,
key: 125,
value: I32(2105376125),
},
List {
site: 70,
container_idx: 70,
key: 70,
value: Null,
},
List {
site: 64,
container_idx: 64,
key: 255,
key: 64,
value: Null,
},
List {
site: 64,
container_idx: 64,
key: 64,
value: Null,
},
Map {
site: 0,
container_idx: 0,
key: 0,
value: Null,
},
Map {
site: 0,
container_idx: 0,
key: 0,
value: Null,
},
Map {
site: 0,
container_idx: 0,
key: 0,
value: Null,
},
Map {
site: 0,
container_idx: 0,
key: 0,
value: Null,
},
Map {
site: 0,
container_idx: 0,
key: 0,
value: Null,
},
Map {

View file

@@ -239,12 +239,6 @@ impl LogStore {
client_id: self.this_client_id,
counter: self.get_next_counter(self.this_client_id),
};
if id.counter > 0 {
let self_dep = id.inc(-1);
if !self.frontiers.contains(&self_dep) {
self.frontiers.push(self_dep);
}
}
let last = ops.last().unwrap();
let last_ctr = last.ctr_last();
let last_id = ID::new(self.this_client_id, last_ctr);

View file

@@ -39,8 +39,13 @@ pub(super) struct ChangeEncoding {
#[columnar(strategy = "DeltaRle", original_type = "i64")]
pub(super) timestamp: Timestamp,
pub(super) op_len: u32,
/// The number of deps, excluding any dep on the same client
#[columnar(strategy = "Rle")]
pub(super) deps_len: u32,
/// Whether the change has a dep on the same client.
/// Encoding this as a boolean instead of a full [`DepsEncoding`] entry saves a lot of space.
#[columnar(strategy = "BoolRle")]
pub(super) dep_on_self: bool,
}
#[columnar(vec, ser, de)]
@@ -135,11 +140,18 @@ pub(super) fn encode_changes(store: &LogStore, vv: &VersionVector) -> Result<Vec
for change in diff_changes {
let client_idx = client_id_to_idx[&change.id.client_id];
let mut dep_on_self = false;
let mut deps_len = 0;
for dep in change.deps.iter() {
deps.push(DepsEncoding::new(
*client_id_to_idx.get(&dep.client_id).unwrap(),
dep.counter,
));
if change.id.client_id != dep.client_id {
deps.push(DepsEncoding::new(
*client_id_to_idx.get(&dep.client_id).unwrap(),
dep.counter,
));
deps_len += 1;
} else {
dep_on_self = true;
}
}
let mut op_len = 0;
@@ -193,8 +205,9 @@ pub(super) fn encode_changes(store: &LogStore, vv: &VersionVector) -> Result<Vec
changes.push(ChangeEncoding {
client_idx: client_idx as ClientIdx,
timestamp: change.timestamp,
deps_len: change.deps.len() as u32,
deps_len,
op_len,
dep_on_self,
});
}
@@ -252,6 +265,7 @@ pub(super) fn decode_changes_to_inner_format(
timestamp,
op_len,
deps_len,
dep_on_self,
} = change_encoding;
let client_id = clients[client_idx as usize];
@@ -304,12 +318,15 @@ pub(super) fn decode_changes_to_inner_format(
ops.push(remote_op);
}
let deps: SmallVec<[ID; 2]> = (0..deps_len)
let mut deps: SmallVec<[ID; 2]> = (0..deps_len)
.map(|_| {
let raw = deps_iter.next().unwrap();
ID::new(clients[raw.client_idx as usize], raw.counter)
})
.collect();
if dep_on_self {
deps.push(ID::new(client_id, counter - 1));
}
let change = Change {
id: ID { client_id, counter },
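
The two hunks above are the heart of the commit: instead of storing the same-client dependency as a full `DepsEncoding` row, the encoder records a `dep_on_self` flag, and the decoder reconstructs the implied `ID::new(client_id, counter - 1)`. The following is a minimal, self-contained sketch of that round trip; `ID`, `encode_deps`, and `decode_deps` are simplified stand-ins assumed here for illustration, not the actual loro-internal definitions.

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ID {
    client_id: u64,
    counter: i32,
}

/// Encode: cross-client deps are kept as (client_id, counter) rows, while a dep
/// on the same client collapses into a single boolean, because its ID is always
/// `(change_id.client_id, change_id.counter - 1)` and never needs to be stored.
fn encode_deps(change_id: ID, deps: &[ID]) -> (Vec<(u64, i32)>, bool) {
    let mut rows = Vec::new();
    let mut dep_on_self = false;
    for dep in deps {
        if dep.client_id == change_id.client_id {
            dep_on_self = true;
        } else {
            rows.push((dep.client_id, dep.counter));
        }
    }
    (rows, dep_on_self)
}

/// Decode: rebuild the dep list and, if the flag is set, append the implied
/// self-dependency at `counter - 1`.
fn decode_deps(change_id: ID, rows: &[(u64, i32)], dep_on_self: bool) -> Vec<ID> {
    let mut deps: Vec<ID> = rows
        .iter()
        .map(|&(client_id, counter)| ID { client_id, counter })
        .collect();
    if dep_on_self {
        deps.push(ID {
            client_id: change_id.client_id,
            counter: change_id.counter - 1,
        });
    }
    deps
}

fn main() {
    let change_id = ID { client_id: 7, counter: 10 };
    let deps = [
        ID { client_id: 7, counter: 9 },  // dep on the same client
        ID { client_id: 3, counter: 42 }, // cross-client dep
    ];
    let (rows, dep_on_self) = encode_deps(change_id, &deps);
    assert_eq!(rows.len(), 1); // only the cross-client dep is materialized
    assert!(dep_on_self);
    let decoded = decode_deps(change_id, &rows, dep_on_self);
    assert!(decoded.contains(&ID { client_id: 7, counter: 9 }));
    assert!(decoded.contains(&ID { client_id: 3, counter: 42 }));
    println!("round-trip ok: {decoded:?}");
}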

View file

@@ -2,11 +2,10 @@ use std::collections::VecDeque;
use fxhash::FxHashMap;
use itertools::Itertools;
use num::Zero;
use rle::{HasLength, RleVec, RleVecWithIndex};
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, from_bytes, to_vec};
use smallvec::smallvec;
use smallvec::{smallvec, SmallVec};
use crate::{
change::{Change, ChangeMergeCfg},
@@ -222,20 +221,22 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
let mut key_to_idx = FxHashMap::default();
let mut deps = Vec::with_capacity(change_num);
for (client_idx, (_, change_vec)) in store.changes.iter().enumerate() {
for (i, change) in change_vec.iter().enumerate() {
for change in change_vec.iter() {
let client_id = change.id.client_id;
let mut op_len = 0;
let mut this_deps_len = 0;
let mut deps_len = 0;
let mut dep_on_self = false;
for dep in change.deps.iter() {
// the first change will encode the self-client deps
if !i.is_zero() && dep.client_id == client_id {
continue;
if dep.client_id == client_id {
dep_on_self = true;
} else {
deps.push(DepsEncoding::new(
*client_id_to_idx.get(&dep.client_id).unwrap(),
dep.counter,
));
deps_len += 1;
}
deps.push(DepsEncoding::new(
*client_id_to_idx.get(&dep.client_id).unwrap(),
dep.counter,
));
this_deps_len += 1;
}
for op in change.ops.iter() {
let container_idx = op.container;
@@ -264,7 +265,8 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
changes.push(ChangeEncoding {
client_idx: client_idx as ClientIdx,
timestamp: change.timestamp,
deps_len: this_deps_len,
deps_len,
dep_on_self,
op_len: op_len as u32,
});
}
@@ -366,24 +368,17 @@ pub(super) fn decode_snapshot(
for (_, this_changes_encoding) in &change_encodings.into_iter().group_by(|c| c.client_idx) {
let mut counter = 0;
let mut next_change_deps_counter = 0;
for (i, change_encoding) in this_changes_encoding.enumerate() {
for change_encoding in this_changes_encoding {
let ChangeEncoding {
client_idx,
timestamp,
op_len,
deps_len,
dep_on_self,
} = change_encoding;
let client_id = clients[client_idx as usize];
let mut ops = RleVec::<[Op; 2]>::new();
let mut deps = (0..deps_len)
.map(|_| {
let raw = deps_iter.next().unwrap();
ID::new(clients[raw.client_idx as usize], raw.counter)
})
.collect::<Vec<_>>();
let mut delta = 0;
for op in op_iter.by_ref().take(op_len as usize) {
let SnapshotOpEncoding {
@@ -435,25 +430,23 @@ pub(super) fn decode_snapshot(
ops.push(op);
}
if i.is_zero() {
// first change
next_change_deps_counter = deps
.iter()
.filter(|&d| d.client_id == client_id)
.map(|d| d.counter)
.next()
.unwrap_or(0);
} else {
deps.push(ID::new(client_id, next_change_deps_counter - 1));
let mut deps = (0..deps_len)
.map(|_| {
let raw = deps_iter.next().unwrap();
ID::new(clients[raw.client_idx as usize], raw.counter)
})
.collect::<SmallVec<_>>();
if dep_on_self {
deps.push(ID::new(client_id, counter - 1));
}
next_change_deps_counter += delta;
let change = Change {
id: ID { client_id, counter },
// cal lamport after parsing all changes
lamport: 0,
timestamp,
ops,
deps: deps.into(),
deps,
};
counter += delta;
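
With the flag in place, the snapshot decoder above no longer needs the first-change special case or the `next_change_deps_counter` bookkeeping: the starting counter of each change is already tracked while iterating a client's changes, so a set `dep_on_self` flag always resolves to `counter - 1`. A tiny sketch of that bookkeeping follows; the `op_lens` and `dep_on_self` arrays are assumed example inputs, not real decoder output.

// Sketch only: resolve the implied self-dependency from the change's start counter.
fn resolve_self_dep(counter_start: i32, dep_on_self: bool) -> Option<i32> {
    // The implied self-dependency is the last counter of the previous change.
    dep_on_self.then(|| counter_start - 1)
}

fn main() {
    let op_lens = [3, 2, 4];               // op counts of three consecutive changes
    let dep_on_self = [false, true, true]; // flags decoded for those changes
    let mut counter = 0;                   // starting counter of the current change
    for (len, &flag) in op_lens.iter().zip(&dep_on_self) {
        println!("change at counter {counter}: self dep = {:?}", resolve_self_dep(counter, flag));
        counter += len;
    }
}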

View file

@@ -2,14 +2,12 @@ use std::sync::{Arc, Mutex, RwLock};
use crate::{
container::ContainerID,
dag::Dag,
event::{ObserverHandler, RawEvent},
hierarchy::Hierarchy,
log_store::{EncodeConfig, LoroEncoder},
LoroError, LoroValue,
};
use fxhash::{FxHashMap, FxHashSet};
use rle::HasLength;
use tracing::instrument;
use crate::{

View file

@@ -1,7 +1,6 @@
use std::{
collections::HashSet,
fmt::{Debug, Error, Formatter},
mem::transmute,
ops::DerefMut,
};
@@ -498,7 +497,7 @@ impl<'a, T: Rle, A: RleTreeTrait<T>> InternalNode<'a, T, A> {
if result.is_err() && self.is_root() {
#[allow(clippy::unnecessary_unwrap)]
let (update, new_vec) = result.unwrap_err();
let (_update, new_vec) = result.unwrap_err();
{
// create level
let mut origin_root = self.bump.allocate(Node::Internal(InternalNode::new(

View file

@@ -337,7 +337,7 @@ impl<'bump, T: Rle, A: RleTreeTrait<T>> LeafNode<'bump, T, A> {
notify,
true,
)
.map(|x| ())
.map(|_x| ())
.map_err(|(_, new)| new)
.unwrap();
Err((x + update, new))

View file

@@ -423,7 +423,7 @@ impl<T: Rle + HasIndex, const MAX_CHILD: usize, TreeArena: Arena> RleTreeTrait<T
Zero
}
fn update_cache_internal(node: &mut InternalNode<'_, T, Self>, hint: Option<Zero>) -> Zero {
fn update_cache_internal(node: &mut InternalNode<'_, T, Self>, _hint: Option<Zero>) -> Zero {
if node.children.is_empty() {
node.cache.end = node.cache.start;
return Zero;
@@ -608,7 +608,7 @@ impl<T: Rle + HasIndex, const MAX_CHILD: usize, TreeArena: Arena> RleTreeTrait<T
node.children[child_index].get_start_index()
}
fn value_to_update(x: &T) -> Self::CacheInParent {
fn value_to_update(_x: &T) -> Self::CacheInParent {
Zero
}
}