From 0c59bd2dccca19d1c72214d97967392ce16d1e51 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Sun, 18 Aug 2024 14:34:27 +0800 Subject: [PATCH] refactor: simplify dag and inserting change --- .../src/encoding/encode_reordered.rs | 7 ++--- .../loro-internal/src/encoding/json_schema.rs | 20 +++++++++----- crates/loro-internal/src/loro.rs | 12 ++++++++- crates/loro-internal/src/oplog.rs | 27 +++++++++---------- crates/loro-internal/src/oplog/loro_dag.rs | 25 +++++++++-------- .../src/oplog/pending_changes.rs | 6 ++--- 6 files changed, 52 insertions(+), 45 deletions(-) diff --git a/crates/loro-internal/src/encoding/encode_reordered.rs b/crates/loro-internal/src/encoding/encode_reordered.rs index f36b4327..f9ed23e7 100644 --- a/crates/loro-internal/src/encoding/encode_reordered.rs +++ b/crates/loro-internal/src/encoding/encode_reordered.rs @@ -243,11 +243,8 @@ pub(crate) fn import_changes_to_oplog( let Some(change) = oplog.trim_the_known_part_of_change(change) else { continue; }; - // update dag and push the change - let mark = oplog.update_dag_on_new_change(&change); - oplog.next_lamport = oplog.next_lamport.max(change.lamport_end()); - oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp); - oplog.insert_new_change(change, mark); + + oplog.insert_new_change(change); } Ok((latest_ids, pending_changes)) diff --git a/crates/loro-internal/src/encoding/json_schema.rs b/crates/loro-internal/src/encoding/json_schema.rs index ec91a442..8be1d95e 100644 --- a/crates/loro-internal/src/encoding/json_schema.rs +++ b/crates/loro-internal/src/encoding/json_schema.rs @@ -1,7 +1,9 @@ use std::sync::Arc; use either::Either; -use loro_common::{ContainerID, ContainerType, IdLp, LoroResult, LoroValue, PeerID, TreeID, ID}; +use loro_common::{ + ContainerID, ContainerType, HasCounterSpan, IdLp, LoroResult, LoroValue, PeerID, TreeID, ID, +}; use rle::{HasLength, RleVec, Sliceable}; use crate::{ @@ -26,15 +28,15 @@ const SCHEMA_VERSION: u8 = 1; fn refine_vv(vv: &VersionVector, oplog: &OpLog) -> VersionVector { let mut refined = VersionVector::new(); - for (peer, counter) in vv.iter() { - if counter == &0 { + for (&peer, &counter) in vv.iter() { + if counter == 0 { continue; } - let end = oplog.vv().get(peer).copied().unwrap_or(0); - if end <= *counter { - refined.insert(*peer, end); + let end = oplog.vv().get(&peer).copied().unwrap_or(0); + if end <= counter { + refined.insert(peer, end); } else { - refined.insert(*peer, *counter); + refined.insert(peer, counter); } } refined @@ -79,6 +81,10 @@ fn init_encode<'s, 'a: 's>( let start_cnt = start_vv.get(&change.id.peer).copied().unwrap_or(0); let end_cnt = end_vv.get(&change.id.peer).copied().unwrap_or(0); if change.id.counter < start_cnt { + if change.ctr_end() <= start_cnt { + continue; + } + let offset = start_cnt - change.id.counter; let to = change .atom_len() diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index 9ded591c..a3408613 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -15,7 +15,7 @@ use fxhash::FxHashMap; use itertools::Itertools; use loro_common::{ContainerID, ContainerType, HasIdSpan, IdSpan, LoroResult, LoroValue, ID}; use rle::HasLength; -use tracing::{info, info_span, instrument}; +use tracing::{info, info_span, instrument, trace}; use crate::{ arena::SharedArena, @@ -512,6 +512,11 @@ impl LoroDoc { |oplog| oplog.decode(parsed), origin, )?; + + // let new_doc = LoroDoc::new(); + // new_doc.import(bytes)?; + // let updates = new_doc.export_from(&self.oplog_vv()); + // return self.import_with(updates.as_slice(), origin); } } EncodeMode::Auto => { @@ -1092,6 +1097,11 @@ impl LoroDoc { ); self.commit_then_stop(); let oplog = self.oplog.lock().unwrap(); + trace!( + "oplog: vv={:?} frontiers={:?}", + oplog.vv(), + oplog.frontiers() + ); let mut state = self.state.lock().unwrap(); self.detached.store(true, Release); let mut calc = self.diff_calculator.lock().unwrap(); diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index a74ae18c..d4f58e1a 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -8,7 +8,7 @@ use std::cell::RefCell; use std::cmp::Ordering; use std::rc::Rc; use std::sync::Mutex; -use tracing::debug; +use tracing::{debug, trace_span}; use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp}; use crate::configure::Configure; @@ -30,7 +30,6 @@ use rle::{HasLength, RleVec, Sliceable}; use smallvec::SmallVec; use self::iter::MergedChangeIter; -use self::loro_dag::EnsureDagNodeDepsAreAtTheEnd; pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded}; use self::pending_changes::PendingChanges; @@ -159,7 +158,16 @@ impl OpLog { } /// This is the **only** place to update the `OpLog.changes` - pub(crate) fn insert_new_change(&mut self, change: Change, _: EnsureDagNodeDepsAreAtTheEnd) { + pub(crate) fn insert_new_change(&mut self, change: Change) { + let s = trace_span!( + "insert_new_change", + id = ?change.id, + lamport = change.lamport + ); + let _enter = s.enter(); + self.dag.handle_new_change(&change); + self.next_lamport = self.next_lamport.max(change.lamport_end()); + self.latest_timestamp = self.latest_timestamp.max(change.timestamp); self.history_cache .lock() .unwrap() @@ -216,21 +224,10 @@ impl OpLog { ); } - self.next_lamport = self.next_lamport.max(change.lamport_end()); - self.latest_timestamp = self.latest_timestamp.max(change.timestamp); - let mark = self.update_dag_on_new_change(&change); - self.insert_new_change(change, mark); + self.insert_new_change(change); Ok(()) } - /// Every time we import a new change, it should run this function to update the dag - pub(crate) fn update_dag_on_new_change( - &mut self, - change: &Change, - ) -> EnsureDagNodeDepsAreAtTheEnd { - self.dag.handle_new_change(change) - } - /// Trim the known part of change pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option { let Some(&end) = self.dag.vv().get(&change.id.peer) else { diff --git a/crates/loro-internal/src/oplog/loro_dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs index f27751c0..8b8537b7 100644 --- a/crates/loro-internal/src/oplog/loro_dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -42,10 +42,6 @@ pub struct AppDag { unhandled_dep_points: Mutex>, } -pub(crate) struct EnsureDagNodeDepsAreAtTheEnd { - _private: PhantomData<()>, -} - #[derive(Debug, Clone)] pub struct AppDagNode { pub(crate) peer: PeerID, @@ -84,9 +80,9 @@ impl AppDag { self.vv.is_empty() } - pub(super) fn handle_new_change(&mut self, change: &Change) -> EnsureDagNodeDepsAreAtTheEnd { + pub(super) fn handle_new_change(&mut self, change: &Change) { let len = change.content_len(); - self.update_frontiers(change.id_last(), &change.deps); + self.update_version_on_new_change(change); if change.deps_on_self() { // don't need to push new element to dag because it only depends on itself self.with_last_mut_of_peer(change.id.peer, |last| { @@ -169,10 +165,6 @@ impl AppDag { } } } - - EnsureDagNodeDepsAreAtTheEnd { - _private: PhantomData, - } } fn with_node_mut( @@ -235,9 +227,15 @@ impl AppDag { f(last) } - pub(super) fn update_frontiers(&mut self, id: ID, deps: &Frontiers) { - self.frontiers.update_frontiers_on_new_change(id, deps); - self.vv.extend_to_include_last_id(id); + fn update_version_on_new_change(&mut self, change: &Change) { + self.frontiers + .update_frontiers_on_new_change(change.id_last(), &change.deps); + assert_eq!( + self.vv.get(&change.id.peer).copied().unwrap_or(0), + change.id.counter + ); + + self.vv.extend_to_include_last_id(change.id_last()); } pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) { @@ -365,6 +363,7 @@ impl AppDag { } pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) { + assert!(self.vv.is_empty()); *self.unparsed_vv.try_lock().unwrap() = v.vv.clone(); self.vv = v.vv; self.frontiers = v.frontiers; diff --git a/crates/loro-internal/src/oplog/pending_changes.rs b/crates/loro-internal/src/oplog/pending_changes.rs index 10840ef9..ae6c9581 100644 --- a/crates/loro-internal/src/oplog/pending_changes.rs +++ b/crates/loro-internal/src/oplog/pending_changes.rs @@ -122,10 +122,8 @@ impl OpLog { let Some(change) = self.trim_the_known_part_of_change(change) else { return; }; - self.next_lamport = self.next_lamport.max(change.lamport_end()); - self.latest_timestamp = self.latest_timestamp.max(change.timestamp); - let mark = self.update_dag_on_new_change(&change); - self.insert_new_change(change, mark); + + self.insert_new_change(change); } }