refactor: simplify dag and inserting change

This commit is contained in:
Zixuan Chen 2024-08-18 14:34:27 +08:00
parent 4df37ba170
commit 0c59bd2dcc
No known key found for this signature in database
6 changed files with 52 additions and 45 deletions

View file

@ -243,11 +243,8 @@ pub(crate) fn import_changes_to_oplog(
let Some(change) = oplog.trim_the_known_part_of_change(change) else { let Some(change) = oplog.trim_the_known_part_of_change(change) else {
continue; continue;
}; };
// update dag and push the change
let mark = oplog.update_dag_on_new_change(&change); oplog.insert_new_change(change);
oplog.next_lamport = oplog.next_lamport.max(change.lamport_end());
oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp);
oplog.insert_new_change(change, mark);
} }
Ok((latest_ids, pending_changes)) Ok((latest_ids, pending_changes))

View file

@ -1,7 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use either::Either; use either::Either;
use loro_common::{ContainerID, ContainerType, IdLp, LoroResult, LoroValue, PeerID, TreeID, ID}; use loro_common::{
ContainerID, ContainerType, HasCounterSpan, IdLp, LoroResult, LoroValue, PeerID, TreeID, ID,
};
use rle::{HasLength, RleVec, Sliceable}; use rle::{HasLength, RleVec, Sliceable};
use crate::{ use crate::{
@ -26,15 +28,15 @@ const SCHEMA_VERSION: u8 = 1;
fn refine_vv(vv: &VersionVector, oplog: &OpLog) -> VersionVector { fn refine_vv(vv: &VersionVector, oplog: &OpLog) -> VersionVector {
let mut refined = VersionVector::new(); let mut refined = VersionVector::new();
for (peer, counter) in vv.iter() { for (&peer, &counter) in vv.iter() {
if counter == &0 { if counter == 0 {
continue; continue;
} }
let end = oplog.vv().get(peer).copied().unwrap_or(0); let end = oplog.vv().get(&peer).copied().unwrap_or(0);
if end <= *counter { if end <= counter {
refined.insert(*peer, end); refined.insert(peer, end);
} else { } else {
refined.insert(*peer, *counter); refined.insert(peer, counter);
} }
} }
refined refined
@ -79,6 +81,10 @@ fn init_encode<'s, 'a: 's>(
let start_cnt = start_vv.get(&change.id.peer).copied().unwrap_or(0); let start_cnt = start_vv.get(&change.id.peer).copied().unwrap_or(0);
let end_cnt = end_vv.get(&change.id.peer).copied().unwrap_or(0); let end_cnt = end_vv.get(&change.id.peer).copied().unwrap_or(0);
if change.id.counter < start_cnt { if change.id.counter < start_cnt {
if change.ctr_end() <= start_cnt {
continue;
}
let offset = start_cnt - change.id.counter; let offset = start_cnt - change.id.counter;
let to = change let to = change
.atom_len() .atom_len()

View file

@ -15,7 +15,7 @@ use fxhash::FxHashMap;
use itertools::Itertools; use itertools::Itertools;
use loro_common::{ContainerID, ContainerType, HasIdSpan, IdSpan, LoroResult, LoroValue, ID}; use loro_common::{ContainerID, ContainerType, HasIdSpan, IdSpan, LoroResult, LoroValue, ID};
use rle::HasLength; use rle::HasLength;
use tracing::{info, info_span, instrument}; use tracing::{info, info_span, instrument, trace};
use crate::{ use crate::{
arena::SharedArena, arena::SharedArena,
@ -512,6 +512,11 @@ impl LoroDoc {
|oplog| oplog.decode(parsed), |oplog| oplog.decode(parsed),
origin, origin,
)?; )?;
// let new_doc = LoroDoc::new();
// new_doc.import(bytes)?;
// let updates = new_doc.export_from(&self.oplog_vv());
// return self.import_with(updates.as_slice(), origin);
} }
} }
EncodeMode::Auto => { EncodeMode::Auto => {
@ -1092,6 +1097,11 @@ impl LoroDoc {
); );
self.commit_then_stop(); self.commit_then_stop();
let oplog = self.oplog.lock().unwrap(); let oplog = self.oplog.lock().unwrap();
trace!(
"oplog: vv={:?} frontiers={:?}",
oplog.vv(),
oplog.frontiers()
);
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
self.detached.store(true, Release); self.detached.store(true, Release);
let mut calc = self.diff_calculator.lock().unwrap(); let mut calc = self.diff_calculator.lock().unwrap();

View file

@ -8,7 +8,7 @@ use std::cell::RefCell;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::rc::Rc; use std::rc::Rc;
use std::sync::Mutex; use std::sync::Mutex;
use tracing::debug; use tracing::{debug, trace_span};
use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp}; use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
use crate::configure::Configure; use crate::configure::Configure;
@ -30,7 +30,6 @@ use rle::{HasLength, RleVec, Sliceable};
use smallvec::SmallVec; use smallvec::SmallVec;
use self::iter::MergedChangeIter; use self::iter::MergedChangeIter;
use self::loro_dag::EnsureDagNodeDepsAreAtTheEnd;
pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded}; pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
use self::pending_changes::PendingChanges; use self::pending_changes::PendingChanges;
@ -159,7 +158,16 @@ impl OpLog {
} }
/// This is the **only** place to update the `OpLog.changes` /// This is the **only** place to update the `OpLog.changes`
pub(crate) fn insert_new_change(&mut self, change: Change, _: EnsureDagNodeDepsAreAtTheEnd) { pub(crate) fn insert_new_change(&mut self, change: Change) {
let s = trace_span!(
"insert_new_change",
id = ?change.id,
lamport = change.lamport
);
let _enter = s.enter();
self.dag.handle_new_change(&change);
self.next_lamport = self.next_lamport.max(change.lamport_end());
self.latest_timestamp = self.latest_timestamp.max(change.timestamp);
self.history_cache self.history_cache
.lock() .lock()
.unwrap() .unwrap()
@ -216,21 +224,10 @@ impl OpLog {
); );
} }
self.next_lamport = self.next_lamport.max(change.lamport_end()); self.insert_new_change(change);
self.latest_timestamp = self.latest_timestamp.max(change.timestamp);
let mark = self.update_dag_on_new_change(&change);
self.insert_new_change(change, mark);
Ok(()) Ok(())
} }
/// Every time we import a new change, it should run this function to update the dag
pub(crate) fn update_dag_on_new_change(
&mut self,
change: &Change,
) -> EnsureDagNodeDepsAreAtTheEnd {
self.dag.handle_new_change(change)
}
/// Trim the known part of change /// Trim the known part of change
pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> { pub(crate) fn trim_the_known_part_of_change(&self, change: Change) -> Option<Change> {
let Some(&end) = self.dag.vv().get(&change.id.peer) else { let Some(&end) = self.dag.vv().get(&change.id.peer) else {

View file

@ -42,10 +42,6 @@ pub struct AppDag {
unhandled_dep_points: Mutex<BTreeSet<ID>>, unhandled_dep_points: Mutex<BTreeSet<ID>>,
} }
pub(crate) struct EnsureDagNodeDepsAreAtTheEnd {
_private: PhantomData<()>,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AppDagNode { pub struct AppDagNode {
pub(crate) peer: PeerID, pub(crate) peer: PeerID,
@ -84,9 +80,9 @@ impl AppDag {
self.vv.is_empty() self.vv.is_empty()
} }
pub(super) fn handle_new_change(&mut self, change: &Change) -> EnsureDagNodeDepsAreAtTheEnd { pub(super) fn handle_new_change(&mut self, change: &Change) {
let len = change.content_len(); let len = change.content_len();
self.update_frontiers(change.id_last(), &change.deps); self.update_version_on_new_change(change);
if change.deps_on_self() { if change.deps_on_self() {
// don't need to push new element to dag because it only depends on itself // don't need to push new element to dag because it only depends on itself
self.with_last_mut_of_peer(change.id.peer, |last| { self.with_last_mut_of_peer(change.id.peer, |last| {
@ -169,10 +165,6 @@ impl AppDag {
} }
} }
} }
EnsureDagNodeDepsAreAtTheEnd {
_private: PhantomData,
}
} }
fn with_node_mut<R>( fn with_node_mut<R>(
@ -235,9 +227,15 @@ impl AppDag {
f(last) f(last)
} }
pub(super) fn update_frontiers(&mut self, id: ID, deps: &Frontiers) { fn update_version_on_new_change(&mut self, change: &Change) {
self.frontiers.update_frontiers_on_new_change(id, deps); self.frontiers
self.vv.extend_to_include_last_id(id); .update_frontiers_on_new_change(change.id_last(), &change.deps);
assert_eq!(
self.vv.get(&change.id.peer).copied().unwrap_or(0),
change.id.counter
);
self.vv.extend_to_include_last_id(change.id_last());
} }
pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) { pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
@ -365,6 +363,7 @@ impl AppDag {
} }
pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) { pub(crate) fn set_version_by_fast_snapshot_import(&mut self, v: BatchDecodeInfo) {
assert!(self.vv.is_empty());
*self.unparsed_vv.try_lock().unwrap() = v.vv.clone(); *self.unparsed_vv.try_lock().unwrap() = v.vv.clone();
self.vv = v.vv; self.vv = v.vv;
self.frontiers = v.frontiers; self.frontiers = v.frontiers;

View file

@ -122,10 +122,8 @@ impl OpLog {
let Some(change) = self.trim_the_known_part_of_change(change) else { let Some(change) = self.trim_the_known_part_of_change(change) else {
return; return;
}; };
self.next_lamport = self.next_lamport.max(change.lamport_end());
self.latest_timestamp = self.latest_timestamp.max(change.timestamp); self.insert_new_change(change);
let mark = self.update_dag_on_new_change(&change);
self.insert_new_change(change, mark);
} }
} }