fix: remove lamport from snapshot

This commit is contained in:
leeeon233 2023-02-17 09:55:57 +08:00 committed by Leonzhao
parent 7c8aa72969
commit d87b3b960d
3 changed files with 68 additions and 28 deletions

View file

@@ -1,10 +1,13 @@
use bench_utils::{get_automerge_actions, TextAction}; use bench_utils::{get_automerge_actions, TextAction};
use loro_internal::{log_store::EncodeConfig, LoroCore, VersionVector}; use loro_internal::{log_store::EncodeConfig, LoroCore, VersionVector};
use rand::{rngs::StdRng, Rng, SeedableRng};
fn main() { fn main() {
let mut rng: StdRng = SeedableRng::seed_from_u64(1);
let actions = get_automerge_actions(); let actions = get_automerge_actions();
let mut loro = LoroCore::default(); let mut loro = LoroCore::new(Default::default(), Some(1));
let mut loro_b = LoroCore::default(); let mut loro_b = LoroCore::new(Default::default(), Some(2));
let mut i = 0; let mut i = 0;
for TextAction { pos, ins, del } in actions.iter() { for TextAction { pos, ins, del } in actions.iter() {
i += 1; i += 1;
@@ -17,11 +20,17 @@ fn main() {
text.delete(&loro, pos, del).unwrap(); text.delete(&loro, pos, del).unwrap();
text.insert(&loro, pos, ins).unwrap(); text.insert(&loro, pos, ins).unwrap();
let mut text = loro_b.get_text("text"); let mut text = loro_b.get_text("text");
text.delete(&loro_b, pos, del).unwrap(); let r = rng.gen_range(1..11);
text.insert(&loro_b, pos, ins).unwrap(); for _ in 0..r {
text.delete(&loro_b, pos, del).unwrap();
text.insert(&loro_b, pos, ins).unwrap();
}
loro_b.import(loro.export(loro_b.vv_cloned())); loro_b.import(loro.export(loro_b.vv_cloned()));
loro.import(loro_b.export(loro.vv_cloned())); loro.import(loro_b.export(loro.vv_cloned()));
} }
let encoded = loro.encode_with_cfg(EncodeConfig::rle_update(VersionVector::new())); let encoded =
loro.encode_with_cfg(EncodeConfig::rle_update(VersionVector::new()).without_compress());
println!("parallel doc size {} bytes", encoded.len());
let encoded = loro.encode_with_cfg(EncodeConfig::snapshot().without_compress());
println!("parallel doc size {} bytes", encoded.len()); println!("parallel doc size {} bytes", encoded.len());
} }

View file

@@ -331,12 +331,12 @@ pub(super) fn decode_changes_to_inner_format(
let mut lamport_map = FxHashMap::default(); let mut lamport_map = FxHashMap::default();
let mut changes_ans = FxHashMap::default(); let mut changes_ans = FxHashMap::default();
// calculate lamport // calculate lamport
let mut q: VecDeque<_> = changes.keys().copied().collect(); let mut client_ids: VecDeque<_> = changes.keys().copied().collect();
let len = q.len(); let len = client_ids.len();
let mut loop_time = len; let mut loop_time = len;
while let Some(client_id) = q.pop_front() { while let Some(client_id) = client_ids.pop_front() {
let dq = changes.get_mut(&client_id).unwrap(); let this_client_changes = changes.get_mut(&client_id).unwrap();
while let Some(mut change) = dq.pop_front() { while let Some(mut change) = this_client_changes.pop_front() {
match get_lamport_by_deps(&change.deps, &lamport_map, Some(store)) { match get_lamport_by_deps(&change.deps, &lamport_map, Some(store)) {
Ok(lamport) => { Ok(lamport) => {
change.lamport = lamport; change.lamport = lamport;
@@ -351,8 +351,8 @@ pub(super) fn decode_changes_to_inner_format(
loop_time = len; loop_time = len;
} }
Err(_not_found_client) => { Err(_not_found_client) => {
dq.push_front(change); this_client_changes.push_front(change);
q.push_back(client_id); client_ids.push_back(client_id);
loop_time -= 1; loop_time -= 1;
if loop_time == 0 { if loop_time == 0 {
unreachable!(); unreachable!();
@@ -368,7 +368,7 @@ pub(super) fn decode_changes_to_inner_format(
} }
// snapshot store is None // snapshot store is None
pub(super) fn get_lamport_by_deps( pub(crate) fn get_lamport_by_deps(
deps: &SmallVec<[ID; 2]>, deps: &SmallVec<[ID; 2]>,
lamport_map: &FxHashMap<ClientID, Vec<(Range<Counter>, Lamport)>>, lamport_map: &FxHashMap<ClientID, Vec<(Range<Counter>, Lamport)>>,
store: Option<&LogStore>, store: Option<&LogStore>,

View file

@@ -1,3 +1,5 @@
use std::collections::VecDeque;
use fxhash::FxHashMap; use fxhash::FxHashMap;
use itertools::Itertools; use itertools::Itertools;
use rle::{HasLength, RleVec, RleVecWithIndex}; use rle::{HasLength, RleVec, RleVecWithIndex};
@@ -6,7 +8,7 @@ use serde_columnar::{columnar, from_bytes, to_vec};
use smallvec::smallvec; use smallvec::smallvec;
use crate::{ use crate::{
change::{Change, ChangeMergeCfg, Lamport}, change::{Change, ChangeMergeCfg},
container::text::text_content::SliceRange, container::text::text_content::SliceRange,
container::{ container::{
list::list_op::{DeleteSpan, InnerListOp}, list::list_op::{DeleteSpan, InnerListOp},
@@ -19,7 +21,7 @@ use crate::{
event::RawEvent, event::RawEvent,
hierarchy::Hierarchy, hierarchy::Hierarchy,
id::{ClientID, Counter, ID}, id::{ClientID, Counter, ID},
log_store::ImportContext, log_store::{encoding::encode_changes::get_lamport_by_deps, ImportContext},
op::{InnerContent, Op}, op::{InnerContent, Op},
span::{HasIdSpan, HasLamportSpan}, span::{HasIdSpan, HasLamportSpan},
version::TotalOrderStamp, version::TotalOrderStamp,
@@ -145,7 +147,6 @@ pub(super) struct SnapshotEncoded {
container_states: Vec<EncodedStateContent>, container_states: Vec<EncodedStateContent>,
keys: Vec<InternalString>, keys: Vec<InternalString>,
start_counter: Vec<Counter>, start_counter: Vec<Counter>,
start_lamport: Vec<Lamport>,
} }
const ENCODED_UNKNOWN_SLICE: i64 = -2; const ENCODED_UNKNOWN_SLICE: i64 = -2;
@@ -196,14 +197,12 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
let mut clients = Vec::with_capacity(store.changes.len()); let mut clients = Vec::with_capacity(store.changes.len());
let mut change_num = 0; let mut change_num = 0;
let mut start_counter = Vec::new(); let mut start_counter = Vec::new();
let mut start_lamport = Vec::new();
for (key, changes) in store.changes.iter() { for (key, changes) in store.changes.iter() {
client_id_to_idx.insert(*key, clients.len() as ClientIdx); client_id_to_idx.insert(*key, clients.len() as ClientIdx);
clients.push(*key); clients.push(*key);
change_num += changes.merged_len(); change_num += changes.merged_len();
start_counter.push(changes.first().unwrap().id.counter); start_counter.push(changes.first().unwrap().id.counter);
start_lamport.push(changes.first().unwrap().lamport);
} }
let (_, containers) = store.reg.export(); let (_, containers) = store.reg.export();
@@ -286,7 +285,6 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
container_states, container_states,
keys, keys,
start_counter, start_counter,
start_lamport,
}; };
to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into())) to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into()))
} }
@@ -307,7 +305,6 @@ pub(super) fn decode_snapshot(
container_states, container_states,
keys, keys,
start_counter, start_counter,
start_lamport,
} = encoded; } = encoded;
if change_encodings.is_empty() { if change_encodings.is_empty() {
@@ -321,7 +318,7 @@ pub(super) fn decode_snapshot(
} }
let mut op_iter = ops.into_iter(); let mut op_iter = ops.into_iter();
let mut changes = FxHashMap::default(); let mut changes_dq = FxHashMap::default();
let mut deps_iter = deps.into_iter(); let mut deps_iter = deps.into_iter();
let mut container_idx2type = FxHashMap::default(); let mut container_idx2type = FxHashMap::default();
@@ -334,7 +331,6 @@ pub(super) fn decode_snapshot(
&change_encodings.into_iter().group_by(|c| c.client_idx) &change_encodings.into_iter().group_by(|c| c.client_idx)
{ {
let mut counter = start_counter[client_idx as usize]; let mut counter = start_counter[client_idx as usize];
let mut lamport = start_lamport[client_idx as usize];
for change_encoding in this_change_encodings { for change_encoding in this_change_encodings {
let ChangeEncoding { let ChangeEncoding {
client_idx, client_idx,
@@ -406,19 +402,54 @@ pub(super) fn decode_snapshot(
let change = Change { let change = Change {
id: ID { client_id, counter }, id: ID { client_id, counter },
lamport, // cal lamport after parsing all changes
lamport: 0,
timestamp, timestamp,
ops, ops,
deps, deps,
}; };
counter += delta; counter += delta;
lamport += delta as u32;
changes changes_dq
.entry(client_id) .entry(client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new())) .or_insert_with(VecDeque::new)
.push(change); .push_back(change)
}
}
// calculate lamport
let mut lamport_map = FxHashMap::default();
let mut changes = FxHashMap::default();
// calculate lamport
let mut client_ids: VecDeque<_> = changes_dq.keys().copied().collect();
let len = client_ids.len();
let mut loop_time = len;
while let Some(client_id) = client_ids.pop_front() {
let this_client_changes = changes_dq.get_mut(&client_id).unwrap();
while let Some(mut change) = this_client_changes.pop_front() {
match get_lamport_by_deps(&change.deps, &lamport_map, None) {
Ok(lamport) => {
change.lamport = lamport;
lamport_map.entry(client_id).or_insert_with(Vec::new).push((
change.id.counter..change.id.counter + change.content_len() as Counter,
lamport,
));
changes
.entry(client_id)
.or_insert_with(|| RleVecWithIndex::new_with_conf(ChangeMergeCfg::new()))
.push(change);
loop_time = len;
}
Err(_not_found_client) => {
this_client_changes.push_front(change);
client_ids.push_back(client_id);
loop_time -= 1;
if loop_time == 0 {
unreachable!();
}
break;
}
}
} }
} }
@@ -427,7 +458,7 @@ pub(super) fn decode_snapshot(
.map(|changes| changes.last().unwrap().id_last()) .map(|changes| changes.last().unwrap().id_last())
.collect(); .collect();
debug_log::debug_dbg!(&vv, &changes); debug_log::debug_dbg!(&vv, &changes_dq);
let can_load = match vv.partial_cmp(&store.vv) { let can_load = match vv.partial_cmp(&store.vv) {
Some(ord) => match ord { Some(ord) => match ord {