diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index c89ab46f..8b59a3eb 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -35,7 +35,7 @@ use crate::{ handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler}, id::PeerID, op::InnerContent, - oplog::dag::FrontiersNotIncluded, + oplog::loro_dag::FrontiersNotIncluded, undo::DiffBatch, version::Frontiers, HandlerTrait, InternalString, LoroError, VersionVector, diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 8af42d28..b25c8198 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -1,6 +1,6 @@ mod change_store; -pub(crate) mod dag; mod iter; +pub(crate) mod loro_dag; mod pending_changes; use once_cell::sync::OnceCell; @@ -23,7 +23,7 @@ use crate::history_cache::ContainerHistoryCache; use crate::id::{Counter, PeerID, ID}; use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp}; use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; -use crate::version::{Frontiers, ImVersionVector, VersionVector}; +use crate::version::{Frontiers, VersionVector}; use crate::LoroError; use change_store::BlockOpRef; pub use change_store::{BlockChangeRef, ChangeStore}; @@ -31,8 +31,8 @@ use loro_common::{HasId, IdLp, IdSpan}; use rle::{HasLength, Mergable, RleVec, Sliceable}; use smallvec::SmallVec; -pub use self::dag::FrontiersNotIncluded; use self::iter::MergedChangeIter; +pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded}; use self::pending_changes::PendingChanges; use super::arena::SharedArena; @@ -62,124 +62,6 @@ pub struct OpLog { pub(crate) configure: Configure, } -/// [AppDag] maintains the causal graph of the app. -/// It's faster to answer the question like what's the LCA version -#[derive(Debug)] -pub struct AppDag { - change_store: ChangeStore, - pub(crate) map: Mutex>, - pub(crate) frontiers: Frontiers, - pub(crate) vv: VersionVector, - /// Ops included in the version vector but not parsed yet - pub(crate) unparsed_vv: VersionVector, -} - -#[derive(Debug, Clone)] -pub struct AppDagNode { - pub(crate) peer: PeerID, - pub(crate) cnt: Counter, - pub(crate) lamport: Lamport, - pub(crate) deps: Frontiers, - pub(crate) vv: OnceCell, - /// A flag indicating whether any other nodes depend on this node. - /// The calculation of frontiers is based on this value. - pub(crate) has_succ: bool, - pub(crate) len: usize, -} - -impl AppDag { - pub(crate) fn with_node_mut( - &self, - id: ID, - f: impl FnOnce(Option<&mut AppDagNode>) -> R, - ) -> R { - self.ensure_lazy_load_node(id); - let mut map = self.map.lock().unwrap(); - let x = map.range_mut(..=id).next_back(); - if let Some((_, node)) = x { - if node.contains_id(id) { - f(Some(node)) - } else { - f(None) - } - } else { - f(None) - } - } - - /// If the lamport of change can be calculated, return Ok, otherwise, Err - pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> { - for dep in change.deps.iter() { - match self.get_lamport(dep) { - Some(lamport) => { - change.lamport = change.lamport.max(lamport + 1); - } - None => return Err(()), - } - } - Ok(()) - } - - pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers { - self.ensure_lazy_load_node(id); - let Some(node) = self.get(id) else { - return Frontiers::default(); - }; - - let offset = id.counter - node.cnt; - if offset == 0 { - node.deps.clone() - } else { - ID::new(id.peer, node.cnt + offset - 1).into() - } - } - - pub(crate) fn with_last_mut_of_peer( - &mut self, - peer: PeerID, - f: impl FnOnce(Option<&mut AppDagNode>) -> R, - ) -> R { - self.lazy_load_last_of_peer(peer); - let mut binding = self.map.lock().unwrap(); - let last = binding - .range_mut(..=ID::new(peer, Counter::MAX)) - .next_back() - .map(|(_, v)| v); - f(last) - } - - fn update_frontiers(&mut self, id: ID, deps: &Frontiers) { - self.frontiers.update_frontiers_on_new_change(id, deps); - self.vv.extend_to_include_last_id(id); - } - - fn lazy_load_last_of_peer(&mut self, peer: u64) { - if !self.unparsed_vv.contains_key(&peer) { - return; - } - - todo!() - } - - fn ensure_lazy_load_node(&self, id: ID) { - if !self.unparsed_vv.includes_id(id) { - return; - } - - todo!("load dag node from kv store, from id -> unparsed_vv.get(peer)") - } - - fn fork(&self, change_store: ChangeStore) -> AppDag { - AppDag { - change_store: change_store.clone(), - map: Mutex::new(self.map.lock().unwrap().clone()), - frontiers: self.frontiers.clone(), - vv: self.vv.clone(), - unparsed_vv: self.unparsed_vv.clone(), - } - } -} - impl std::fmt::Debug for OpLog { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OpLog") @@ -345,7 +227,6 @@ impl OpLog { self.next_lamport = self.next_lamport.max(change.lamport_end()); self.latest_timestamp = self.latest_timestamp.max(change.timestamp); - self.dag.vv.extend_to_include_last_id(change.id_last()); let mark = self.update_dag_on_new_change(&change); self.insert_new_change(change, mark); Ok(()) diff --git a/crates/loro-internal/src/oplog/dag.rs b/crates/loro-internal/src/oplog/loro_dag.rs similarity index 68% rename from crates/loro-internal/src/oplog/dag.rs rename to crates/loro-internal/src/oplog/loro_dag.rs index c43d0dd4..53237a64 100644 --- a/crates/loro-internal/src/oplog/dag.rs +++ b/crates/loro-internal/src/oplog/loro_dag.rs @@ -1,17 +1,135 @@ -use std::cmp::Ordering; -use std::fmt::Display; -use std::sync::Arc; - -use crate::change::Lamport; +use crate::change::{Change, Lamport}; use crate::dag::{Dag, DagNode}; use crate::id::{Counter, ID}; use crate::span::{HasId, HasLamport}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; -use loro_common::{HasCounter, HasCounterSpan, HasIdSpan}; +use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, PeerID}; use once_cell::sync::OnceCell; use rle::{HasIndex, HasLength, Mergable, Sliceable}; +use std::cmp::Ordering; +use std::collections::BTreeMap; +use std::fmt::Display; +use std::sync::Mutex; -use super::{AppDag, AppDagNode}; +use super::ChangeStore; + +/// [AppDag] maintains the causal graph of the app. +/// It's faster to answer the question like what's the LCA version +#[derive(Debug)] +pub struct AppDag { + pub(super) change_store: ChangeStore, + pub(crate) map: Mutex>, + pub(crate) frontiers: Frontiers, + pub(crate) vv: VersionVector, + /// Ops included in the version vector but not parsed yet + pub(crate) unparsed_vv: VersionVector, +} + +#[derive(Debug, Clone)] +pub struct AppDagNode { + pub(crate) peer: PeerID, + pub(crate) cnt: Counter, + pub(crate) lamport: Lamport, + pub(crate) deps: Frontiers, + pub(crate) vv: OnceCell, + /// A flag indicating whether any other nodes depend on this node. + /// The calculation of frontiers is based on this value. + pub(crate) has_succ: bool, + pub(crate) len: usize, +} + +impl AppDag { + pub(crate) fn with_node_mut( + &self, + id: ID, + f: impl FnOnce(Option<&mut AppDagNode>) -> R, + ) -> R { + self.ensure_lazy_load_node(id); + let mut map = self.map.lock().unwrap(); + let x = map.range_mut(..=id).next_back(); + if let Some((_, node)) = x { + if node.contains_id(id) { + f(Some(node)) + } else { + f(None) + } + } else { + f(None) + } + } + + /// If the lamport of change can be calculated, return Ok, otherwise, Err + pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> { + for dep in change.deps.iter() { + match self.get_lamport(dep) { + Some(lamport) => { + change.lamport = change.lamport.max(lamport + 1); + } + None => return Err(()), + } + } + Ok(()) + } + + pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers { + self.ensure_lazy_load_node(id); + let Some(node) = self.get(id) else { + return Frontiers::default(); + }; + + let offset = id.counter - node.cnt; + if offset == 0 { + node.deps.clone() + } else { + ID::new(id.peer, node.cnt + offset - 1).into() + } + } + + pub(crate) fn with_last_mut_of_peer( + &mut self, + peer: PeerID, + f: impl FnOnce(Option<&mut AppDagNode>) -> R, + ) -> R { + self.lazy_load_last_of_peer(peer); + let mut binding = self.map.lock().unwrap(); + let last = binding + .range_mut(..=ID::new(peer, Counter::MAX)) + .next_back() + .map(|(_, v)| v); + f(last) + } + + pub(super) fn update_frontiers(&mut self, id: ID, deps: &Frontiers) { + self.frontiers.update_frontiers_on_new_change(id, deps); + self.vv.extend_to_include_last_id(id); + } + + pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) { + if !self.unparsed_vv.contains_key(&peer) { + return; + } + + todo!() + } + + pub(super) fn ensure_lazy_load_node(&self, id: ID) { + if !self.unparsed_vv.includes_id(id) { + return; + } + + todo!("load dag node from kv store, from id -> unparsed_vv.get(peer)") + } + + pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag { + AppDag { + change_store: change_store.clone(), + map: Mutex::new(self.map.lock().unwrap().clone()), + frontiers: self.frontiers.clone(), + vv: self.vv.clone(), + unparsed_vv: self.unparsed_vv.clone(), + } + } +} impl HasIndex for AppDagNode { type Int = Counter;