From b8e27dc011f4b708ca913f5900b08fbc2464bd4c Mon Sep 17 00:00:00 2001 From: leeeon233 Date: Sat, 18 Feb 2023 21:25:29 +0800 Subject: [PATCH] fix: dep counter --- crates/loro-internal/src/fuzz/recursive.rs | 20 +-- crates/loro-internal/src/log_store.rs | 6 + .../src/log_store/encoding/encode_snapshot.rs | 156 ++++++++++++------ 3 files changed, 125 insertions(+), 57 deletions(-) diff --git a/crates/loro-internal/src/fuzz/recursive.rs b/crates/loro-internal/src/fuzz/recursive.rs index 1ad1b186..004a72d3 100644 --- a/crates/loro-internal/src/fuzz/recursive.rs +++ b/crates/loro-internal/src/fuzz/recursive.rs @@ -641,12 +641,14 @@ fn check_synced(sites: &mut [Actor]) { let (a, b) = array_mut_ref!(sites, [i, j]); let a_doc = &mut a.loro; let b_doc = &mut b.loro; - a_doc - .decode(&b_doc.encode_with_cfg(EncodeConfig::rle_update(a_doc.vv_cloned()))) - .unwrap(); - b_doc - .decode(&a_doc.encode_with_cfg(EncodeConfig::rle_update(b_doc.vv_cloned()))) - .unwrap(); + // a_doc + // .decode(&b_doc.encode_with_cfg(EncodeConfig::rle_update(a_doc.vv_cloned()))) + // .unwrap(); + // b_doc + // .decode(&a_doc.encode_with_cfg(EncodeConfig::rle_update(b_doc.vv_cloned()))) + // .unwrap(); + a_doc.decode(&b_doc.encode_all()).unwrap(); + b_doc.decode(&a_doc.encode_all()).unwrap(); check_eq(a, b); debug_log::group_end!(); } @@ -681,7 +683,6 @@ pub fn normalize(site_num: u8, actions: &mut [Action]) -> Vec { } pub fn test_multi_sites(site_num: u8, actions: &mut [Action]) { - // println!("{:?}", actions); let mut sites = Vec::new(); for i in 0..site_num { sites.push(Actor::new(i as u64)); @@ -695,7 +696,6 @@ pub fn test_multi_sites(site_num: u8, actions: &mut [Action]) { sites.apply_action(action); } - // println!("{}", actions.table()); debug_log::group!("check synced"); check_synced(&mut sites); debug_log::group_end!(); @@ -984,7 +984,7 @@ mod failed_tests { #[test] fn list_unknown() { test_multi_sites( - 3, + 5, &mut [ List { site: 139, @@ -1018,7 +1018,7 @@ mod failed_tests { #[test] fn path_issue() { test_multi_sites( - 2, + 5, &mut [ List { site: 1, diff --git a/crates/loro-internal/src/log_store.rs b/crates/loro-internal/src/log_store.rs index ca6b5eb0..7e4d76a1 100644 --- a/crates/loro-internal/src/log_store.rs +++ b/crates/loro-internal/src/log_store.rs @@ -239,6 +239,12 @@ impl LogStore { client_id: self.this_client_id, counter: self.get_next_counter(self.this_client_id), }; + if id.counter > 0 { + let self_dep = id.inc(-1); + if !self.frontiers.contains(&self_dep) { + self.frontiers.push(self_dep); + } + } let last = ops.last().unwrap(); let last_ctr = last.ctr_last(); let last_id = ID::new(self.this_client_id, last_ctr); diff --git a/crates/loro-internal/src/log_store/encoding/encode_snapshot.rs b/crates/loro-internal/src/log_store/encoding/encode_snapshot.rs index a7f7ddb5..912d340e 100644 --- a/crates/loro-internal/src/log_store/encoding/encode_snapshot.rs +++ b/crates/loro-internal/src/log_store/encoding/encode_snapshot.rs @@ -18,7 +18,7 @@ use crate::{ registry::ContainerIdx, Container, ContainerID, }, - dag::remove_included_frontiers, + dag::{remove_included_frontiers, Dag}, event::RawEvent, hierarchy::Hierarchy, id::{ClientID, Counter, ID}, @@ -123,8 +123,8 @@ impl EncodedStateContent { #[columnar(vec, ser, de)] #[derive(Debug, Clone, Serialize, Deserialize)] struct SnapshotOpEncoding { - #[columnar(strategy = "Rle", original_type = "u32")] - container: u32, + #[columnar(strategy = "Rle", original_type = "usize")] + container: usize, /// key index or insert/delete pos #[columnar(strategy = "DeltaRle")] prop: usize, @@ -196,6 +196,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor debug_log::debug_dbg!(&store.vv); debug_log::debug_dbg!(&store.changes); let mut client_id_to_idx: FxHashMap = FxHashMap::default(); + let mut container_id_to_idx: FxHashMap = FxHashMap::default(); let mut clients = Vec::with_capacity(store.changes.len()); let mut change_num = 0; for (key, changes) in store.changes.iter() { @@ -205,7 +206,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor } let (_, containers) = store.reg.export(); - for container_id in containers.iter() { + for (idx, container_id) in containers.iter().enumerate() { let container = store.reg.get(container_id).unwrap(); container .upgrade() @@ -213,6 +214,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor .try_lock() .unwrap() .initialize_pool_mapping(); + container_id_to_idx.insert(container_id.clone(), idx); } let mut changes = Vec::with_capacity(change_num); @@ -239,6 +241,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor for op in change.ops.iter() { let container_idx = op.container; let container_id = store.reg.get_id(container_idx).unwrap(); + let container_idx = container_id_to_idx.get(container_id).unwrap(); let container = store.reg.get(container_id).unwrap(); let new_ops = container .upgrade() @@ -251,7 +254,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result, Lor let (prop, value, value2) = convert_inner_content(&op_content, &mut key_to_idx, &mut keys); ops.push(SnapshotOpEncoding { - container: container_idx.to_u32(), + container: *container_idx, prop, value, value2, @@ -319,15 +322,44 @@ pub(super) fn decode_snapshot( } return Ok(vec![]); } + let mut idx_to_container_type = FxHashMap::default(); + for (idx, container_id) in containers.iter().enumerate() { + idx_to_container_type.insert(idx, container_id.container_type()); + } + + // calc vv + let vv = calc_vv(&change_encodings, &ops, &clients, &idx_to_container_type); + let can_load = match vv.partial_cmp(&store.vv) { + Some(ord) => match ord { + std::cmp::Ordering::Less => { + // TODO warning + println!("[Warning] the vv of encoded snapshot is smaller than self, no change is applied"); + return Ok(vec![]); + } + std::cmp::Ordering::Equal => return Ok(vec![]), + std::cmp::Ordering::Greater => store.vv.is_empty(), + }, + None => false, + }; let mut op_iter = ops.into_iter(); let mut changes_dq = FxHashMap::default(); let mut deps_iter = deps.into_iter(); - let mut container_idx2type = FxHashMap::default(); + let mut idx_to_container_idx = FxHashMap::default(); - for container_id in containers.iter() { - let container_idx = store.reg.get_or_create_container_idx(container_id); - container_idx2type.insert(container_idx, container_id.container_type()); + // the container_idx need to be calculated first + // because the op needs the corresponding container (in new or old store) + let new_loro = LoroCore::default(); + let mut new_store = new_loro.log_store.try_write().unwrap(); + let mut new_hierarchy = new_loro.hierarchy.try_lock().unwrap(); + + for (idx, container_id) in containers.iter().enumerate() { + let container_idx = if can_load { + store.reg.get_or_create_container_idx(container_id) + } else { + new_store.reg.get_or_create_container_idx(container_id) + }; + idx_to_container_idx.insert(idx, container_idx); } for (_, this_changes_encoding) in &change_encodings.into_iter().group_by(|c| c.client_idx) { @@ -359,8 +391,7 @@ pub(super) fn decode_snapshot( value2, } = op; - let container_idx = ContainerIdx::from_u32(container_idx); - let container_type = container_idx2type[&container_idx]; + let container_type = idx_to_container_type[&container_idx]; let content = match container_type { ContainerType::Map => { let key = keys[prop].clone(); @@ -395,7 +426,7 @@ pub(super) fn decode_snapshot( }; let op = Op { counter: counter + delta, - container: container_idx, + container: idx_to_container_idx[&container_idx], content, }; delta += op.content_len() as i32; @@ -411,7 +442,7 @@ pub(super) fn decode_snapshot( .next() .unwrap_or(0); } else { - deps.push(ID::new(client_id, next_change_deps_counter)); + deps.push(ID::new(client_id, next_change_deps_counter - 1)); } next_change_deps_counter += delta; let change = Change { @@ -466,22 +497,6 @@ pub(super) fn decode_snapshot( } } - let vv: VersionVector = changes - .values() - .map(|changes| changes.last().unwrap().id_last()) - .collect(); - - debug_log::debug_dbg!(&vv, &changes_dq); - - let can_load = match vv.partial_cmp(&store.vv) { - Some(ord) => match ord { - std::cmp::Ordering::Less => false, - std::cmp::Ordering::Equal => return Ok(vec![]), - std::cmp::Ordering::Greater => true, - }, - None => false, - }; - if can_load { let mut import_context = load_snapshot( store, @@ -490,15 +505,11 @@ pub(super) fn decode_snapshot( changes, containers, container_states, - container_idx2type, &keys, &clients, ); Ok(store.get_events(hierarchy, &mut import_context)) } else { - let new_loro = LoroCore::default(); - let mut new_store = new_loro.log_store.try_write().unwrap(); - let mut new_hierarchy = new_loro.hierarchy.try_lock().unwrap(); load_snapshot( &mut new_store, &mut new_hierarchy, @@ -506,7 +517,6 @@ pub(super) fn decode_snapshot( changes, containers, container_states, - container_idx2type, &keys, &clients, ); @@ -523,7 +533,6 @@ fn load_snapshot( changes: FxHashMap>, containers: Vec, container_states: Vec, - mut container_idx2type: FxHashMap, keys: &[InternalString], clients: &[u64], ) -> ImportContext { @@ -538,17 +547,18 @@ fn load_snapshot( let mut import_context = ImportContext { old_frontiers: smallvec![], new_frontiers: frontiers.get_frontiers(), - old_vv: VersionVector::new(), + old_vv: new_store.vv(), spans: vv.diff(&new_store.vv).left, new_vv: vv.clone(), diff: Default::default(), patched_old_vv: None, }; for (container_id, pool_mapping) in containers.into_iter().zip(container_states.into_iter()) { - let container_idx = new_store.reg.get_or_create_container_idx(&container_id); - container_idx2type.insert(container_idx, container_id.container_type()); let state = pool_mapping.into_state(keys, clients); - let container = new_store.reg.get_by_idx(container_idx).unwrap(); + let container = new_store + .get_or_create_container(&container_id) + .upgrade() + .unwrap(); let mut container = container.try_lock().unwrap(); container.to_import_snapshot(state, new_hierarchy, &mut import_context); } @@ -571,21 +581,73 @@ fn load_snapshot( import_context } +fn calc_vv( + changes_encoding: &[ChangeEncoding], + ops_encoding: &[SnapshotOpEncoding], + clients: &[ClientID], + idx_to_container_type: &FxHashMap, +) -> VersionVector { + let mut vv = FxHashMap::default(); + let mut op_iter = ops_encoding.iter(); + for (client_idx, this_changes_encoding) in &changes_encoding.iter().group_by(|c| c.client_idx) { + let client_id = clients[client_idx as usize]; + let mut counter = 0; + for change_encoding in this_changes_encoding { + let op_len = change_encoding.op_len; + let mut delta = 0; + for op in op_iter.by_ref().take(op_len as usize) { + let SnapshotOpEncoding { + container: container_idx, + prop: _, + value, + value2, + } = *op; + let container_type = idx_to_container_type[&container_idx]; + let op_content_len = match container_type { + ContainerType::Map => 1, + _ => { + let is_del = value2 == ENCODED_DELETED_CONTENT; + if is_del { + value + } else { + let is_unknown = value2 == ENCODED_UNKNOWN_SLICE; + if is_unknown { + value + } else { + value2 as u64 + } + } + } + }; + delta += op_content_len as i32; + } + counter += delta; + } + vv.insert(client_id, counter); + } + vv.into() +} + #[cfg(test)] mod test { - use crate::{log_store::EncodeConfig, LoroCore}; + use crate::{ContainerType, LoroCore}; #[test] fn cannot_load() { let mut loro = LoroCore::new(Default::default(), Some(1)); - let mut text = loro.get_text("text"); - text.insert(&loro, 0, "abc").unwrap(); - let snapshot = loro.encode_all(); + let mut list = loro.get_list("list"); + list.insert(&loro, 0, ContainerType::Text).unwrap(); let mut loro2 = LoroCore::new(Default::default(), Some(2)); - let mut text2 = loro2.get_text("text"); - text2.insert(&loro2, 0, "efg").unwrap(); - loro2.decode(&snapshot).unwrap(); - assert_eq!(text2.get_value().to_json_pretty(), "\"abcefg\""); + loro2.import(loro.export(loro2.vv_cloned())); + + let mut list2 = loro2.get_list("list"); + list2.delete(&loro2, 0, 1).unwrap(); + loro.import(loro2.export(loro.vv_cloned())); + + let mut list = loro.get_list("list"); + list.insert(&loro, 0, "abc").unwrap(); + loro2.decode(&loro.encode_all()).unwrap(); + loro.decode(&loro2.encode_all()).unwrap(); } }