perf: omit deps with the same client_id from the snapshot encoding

This commit is contained in:
leeeon233 2023-02-18 19:54:58 +08:00
parent 3a75b7005c
commit 697f9708b9
2 changed files with 29 additions and 78 deletions

View file

@ -629,6 +629,7 @@ fn check_eq(a_actor: &mut Actor, b_actor: &mut Actor) {
for (la, lb) in ca.iter().zip(cb.iter()) { for (la, lb) in ca.iter().zip(cb.iter()) {
assert_eq!(la.lamport, lb.lamport); assert_eq!(la.lamport, lb.lamport);
assert_eq!(la.id, lb.id); assert_eq!(la.id, lb.id);
assert!(!la.deps.iter().any(|u| !lb.deps.contains(u)))
} }
} }
} }

View file

@ -2,6 +2,7 @@ use std::collections::VecDeque;
use fxhash::FxHashMap; use fxhash::FxHashMap;
use itertools::Itertools; use itertools::Itertools;
use num::Zero;
use rle::{HasLength, RleVec, RleVecWithIndex}; use rle::{HasLength, RleVec, RleVecWithIndex};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, from_bytes, to_vec}; use serde_columnar::{columnar, from_bytes, to_vec};
@ -220,13 +221,20 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
let mut key_to_idx = FxHashMap::default(); let mut key_to_idx = FxHashMap::default();
let mut deps = Vec::with_capacity(change_num); let mut deps = Vec::with_capacity(change_num);
for (client_idx, (_, change_vec)) in store.changes.iter().enumerate() { for (client_idx, (_, change_vec)) in store.changes.iter().enumerate() {
for change in change_vec.iter() { for (i, change) in change_vec.iter().enumerate() {
let client_id = change.id.client_id;
let mut op_len = 0; let mut op_len = 0;
let mut this_deps_len = 0;
for dep in change.deps.iter() { for dep in change.deps.iter() {
// Only a client's first change encodes its same-client dep; later changes skip it here and the decoder re-adds it.
if !i.is_zero() && dep.client_id == client_id {
continue;
}
deps.push(DepsEncoding::new( deps.push(DepsEncoding::new(
*client_id_to_idx.get(&dep.client_id).unwrap(), *client_id_to_idx.get(&dep.client_id).unwrap(),
dep.counter, dep.counter,
)); ));
this_deps_len += 1;
} }
for op in change.ops.iter() { for op in change.ops.iter() {
let container_idx = op.container; let container_idx = op.container;
@ -254,7 +262,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
changes.push(ChangeEncoding { changes.push(ChangeEncoding {
client_idx: client_idx as ClientIdx, client_idx: client_idx as ClientIdx,
timestamp: change.timestamp, timestamp: change.timestamp,
deps_len: change.deps.len() as u32, deps_len: this_deps_len,
op_len: op_len as u32, op_len: op_len as u32,
}); });
} }
@ -273,24 +281,6 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
.into_encoded(&key_to_idx, &client_id_to_idx) .into_encoded(&key_to_idx, &client_id_to_idx)
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
println!(
"changes len {} ops len {} deps len {} container len {} keys len {}",
changes.len(),
ops.len(),
deps.len(),
containers.len(),
keys.len()
);
// let encoded = SnapshotEncoded {
// changes: Default::default(),
// ops: Default::default(),
// deps: Default::default(),
// clients: Default::default(),
// containers: Default::default(),
// container_states: Default::default(),
// keys: Default::default(),
// };
let encoded = SnapshotEncoded { let encoded = SnapshotEncoded {
changes, changes,
ops, ops,
@ -300,7 +290,6 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
container_states, container_states,
keys, keys,
}; };
println!("{:?}", encoded);
to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into())) to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into()))
} }
@ -341,9 +330,10 @@ pub(super) fn decode_snapshot(
container_idx2type.insert(container_idx, container_id.container_type()); container_idx2type.insert(container_idx, container_id.container_type());
} }
for (_, this_change_encodings) in &change_encodings.into_iter().group_by(|c| c.client_idx) { for (_, this_changes_encoding) in &change_encodings.into_iter().group_by(|c| c.client_idx) {
let mut counter = 0; let mut counter = 0;
for change_encoding in this_change_encodings { let mut next_change_deps_counter = 0;
for (i, change_encoding) in this_changes_encoding.enumerate() {
let ChangeEncoding { let ChangeEncoding {
client_idx, client_idx,
timestamp, timestamp,
@ -353,12 +343,12 @@ pub(super) fn decode_snapshot(
let client_id = clients[client_idx as usize]; let client_id = clients[client_idx as usize];
let mut ops = RleVec::<[Op; 2]>::new(); let mut ops = RleVec::<[Op; 2]>::new();
let deps = (0..deps_len) let mut deps = (0..deps_len)
.map(|_| { .map(|_| {
let raw = deps_iter.next().unwrap(); let raw = deps_iter.next().unwrap();
ID::new(clients[raw.client_idx as usize], raw.counter) ID::new(clients[raw.client_idx as usize], raw.counter)
}) })
.collect(); .collect::<Vec<_>>();
let mut delta = 0; let mut delta = 0;
for op in op_iter.by_ref().take(op_len as usize) { for op in op_iter.by_ref().take(op_len as usize) {
@ -412,13 +402,25 @@ pub(super) fn decode_snapshot(
ops.push(op); ops.push(op);
} }
if i.is_zero() {
// first change
next_change_deps_counter = deps
.iter()
.filter(|&d| d.client_id == client_id)
.map(|d| d.counter)
.next()
.unwrap_or(0);
} else {
deps.push(ID::new(client_id, next_change_deps_counter));
}
next_change_deps_counter += delta;
let change = Change { let change = Change {
id: ID { client_id, counter }, id: ID { client_id, counter },
// cal lamport after parsing all changes // cal lamport after parsing all changes
lamport: 0, lamport: 0,
timestamp, timestamp,
ops, ops,
deps, deps: deps.into(),
}; };
counter += delta; counter += delta;
@ -586,56 +588,4 @@ mod test {
loro2.decode(&snapshot).unwrap(); loro2.decode(&snapshot).unwrap();
assert_eq!(text2.get_value().to_json_pretty(), "\"abcefg\""); assert_eq!(text2.get_value().to_json_pretty(), "\"abcefg\"");
} }
/// Prints the encoded snapshot size of one insert sequence applied two ways:
/// once by a single `LoroCore`, and once split alternately between two
/// `LoroCore` instances that exchange updates after every round.
///
/// This is a size probe rather than a correctness check: it makes no
/// assertion about the sizes and only fails if an insert or a decode
/// returns an error (via `unwrap`).
#[test]
fn analyze_parallel_doc_size_large() {
    // Each action is `(kind, pos, text)`; "i" (insert `text` at `pos`) is the
    // only kind used here.
    let actions = vec![
        ("i", 0, "a"),
        ("i", 0, "b"),
        ("i", 0, "c"),
        ("i", 0, "d"),
        ("i", 0, "e"),
        ("i", 0, "f"),
    ];

    // Scenario 1: a single instance performs every insert.
    let mut loro = LoroCore::new(Default::default(), Some(1));
    let mut text = loro.get_text("text");
    for (action, pos, c) in actions.iter().copied() {
        match action {
            "i" => text.insert(&loro, pos, c).unwrap(),
            _ => unreachable!(),
        }
    }
    let single = loro.encode_with_cfg(EncodeConfig::snapshot().without_compress());
    println!("size {}", single.len());

    // Scenario 2: two instances take turns inserting, then sync.
    let mut loro = LoroCore::new(Default::default(), Some(1));
    let mut loro2 = LoroCore::new(Default::default(), Some(2));
    let mut text = loro.get_text("text");
    let mut text2 = loro2.get_text("text");
    let mut action_iter = actions.into_iter();
    // `while let` rather than `for`: the body pulls a second action from the
    // same iterator, so each round consumes up to two actions.
    while let Some((action, pos, c)) = action_iter.next() {
        match action {
            "i" => text.insert(&loro, pos, c).unwrap(),
            _ => unreachable!(),
        }
        if let Some((action, pos, c)) = action_iter.next() {
            match action {
                "i" => text2.insert(&loro2, pos, c).unwrap(),
                _ => unreachable!(),
            }
        }
        // Exchange updates in both directions; each side encodes starting
        // from the other side's current version vector.
        loro2.decode(&loro.encode_from(loro2.vv_cloned())).unwrap();
        loro.decode(&loro2.encode_from(loro.vv_cloned())).unwrap();
    }
    let parallel = loro.encode_with_cfg(EncodeConfig::snapshot().without_compress());
    println!("size {}", parallel.len());
}
} }