refactor: move dag logic to dag mod

This commit is contained in:
Zixuan Chen 2024-08-16 10:56:35 +08:00
parent cb3458b862
commit 35f0f811eb
No known key found for this signature in database
3 changed files with 129 additions and 130 deletions

View file

@ -35,7 +35,7 @@ use crate::{
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
id::PeerID,
op::InnerContent,
oplog::dag::FrontiersNotIncluded,
oplog::loro_dag::FrontiersNotIncluded,
undo::DiffBatch,
version::Frontiers,
HandlerTrait, InternalString, LoroError, VersionVector,

View file

@ -1,6 +1,6 @@
mod change_store;
pub(crate) mod dag;
mod iter;
pub(crate) mod loro_dag;
mod pending_changes;
use once_cell::sync::OnceCell;
@ -23,7 +23,7 @@ use crate::history_cache::ContainerHistoryCache;
use crate::id::{Counter, PeerID, ID};
use crate::op::{FutureInnerContent, ListSlice, RawOpContent, RemoteOp, RichOp};
use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::version::{Frontiers, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
pub use change_store::{BlockChangeRef, ChangeStore};
@ -31,8 +31,8 @@ use loro_common::{HasId, IdLp, IdSpan};
use rle::{HasLength, Mergable, RleVec, Sliceable};
use smallvec::SmallVec;
pub use self::dag::FrontiersNotIncluded;
use self::iter::MergedChangeIter;
pub use self::loro_dag::{AppDag, AppDagNode, FrontiersNotIncluded};
use self::pending_changes::PendingChanges;
use super::arena::SharedArena;
@ -62,124 +62,6 @@ pub struct OpLog {
pub(crate) configure: Configure,
}
/// [AppDag] maintains the causal graph of the app.
/// It's faster to answer the question like what's the LCA version
#[derive(Debug)]
pub struct AppDag {
change_store: ChangeStore,
pub(crate) map: Mutex<BTreeMap<ID, AppDagNode>>,
pub(crate) frontiers: Frontiers,
pub(crate) vv: VersionVector,
/// Ops included in the version vector but not parsed yet
pub(crate) unparsed_vv: VersionVector,
}
#[derive(Debug, Clone)]
pub struct AppDagNode {
pub(crate) peer: PeerID,
pub(crate) cnt: Counter,
pub(crate) lamport: Lamport,
pub(crate) deps: Frontiers,
pub(crate) vv: OnceCell<ImVersionVector>,
/// A flag indicating whether any other nodes depend on this node.
/// The calculation of frontiers is based on this value.
pub(crate) has_succ: bool,
pub(crate) len: usize,
}
impl AppDag {
pub(crate) fn with_node_mut<R>(
&self,
id: ID,
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
) -> R {
self.ensure_lazy_load_node(id);
let mut map = self.map.lock().unwrap();
let x = map.range_mut(..=id).next_back();
if let Some((_, node)) = x {
if node.contains_id(id) {
f(Some(node))
} else {
f(None)
}
} else {
f(None)
}
}
/// If the lamport of change can be calculated, return Ok, otherwise, Err
pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
for dep in change.deps.iter() {
match self.get_lamport(dep) {
Some(lamport) => {
change.lamport = change.lamport.max(lamport + 1);
}
None => return Err(()),
}
}
Ok(())
}
pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
self.ensure_lazy_load_node(id);
let Some(node) = self.get(id) else {
return Frontiers::default();
};
let offset = id.counter - node.cnt;
if offset == 0 {
node.deps.clone()
} else {
ID::new(id.peer, node.cnt + offset - 1).into()
}
}
pub(crate) fn with_last_mut_of_peer<R>(
&mut self,
peer: PeerID,
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
) -> R {
self.lazy_load_last_of_peer(peer);
let mut binding = self.map.lock().unwrap();
let last = binding
.range_mut(..=ID::new(peer, Counter::MAX))
.next_back()
.map(|(_, v)| v);
f(last)
}
fn update_frontiers(&mut self, id: ID, deps: &Frontiers) {
self.frontiers.update_frontiers_on_new_change(id, deps);
self.vv.extend_to_include_last_id(id);
}
fn lazy_load_last_of_peer(&mut self, peer: u64) {
if !self.unparsed_vv.contains_key(&peer) {
return;
}
todo!()
}
fn ensure_lazy_load_node(&self, id: ID) {
if !self.unparsed_vv.includes_id(id) {
return;
}
todo!("load dag node from kv store, from id -> unparsed_vv.get(peer)")
}
fn fork(&self, change_store: ChangeStore) -> AppDag {
AppDag {
change_store: change_store.clone(),
map: Mutex::new(self.map.lock().unwrap().clone()),
frontiers: self.frontiers.clone(),
vv: self.vv.clone(),
unparsed_vv: self.unparsed_vv.clone(),
}
}
}
impl std::fmt::Debug for OpLog {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpLog")
@ -345,7 +227,6 @@ impl OpLog {
self.next_lamport = self.next_lamport.max(change.lamport_end());
self.latest_timestamp = self.latest_timestamp.max(change.timestamp);
self.dag.vv.extend_to_include_last_id(change.id_last());
let mark = self.update_dag_on_new_change(&change);
self.insert_new_change(change, mark);
Ok(())

View file

@ -1,17 +1,135 @@
use std::cmp::Ordering;
use std::fmt::Display;
use std::sync::Arc;
use crate::change::Lamport;
use crate::change::{Change, Lamport};
use crate::dag::{Dag, DagNode};
use crate::id::{Counter, ID};
use crate::span::{HasId, HasLamport};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use loro_common::{HasCounter, HasCounterSpan, HasIdSpan};
use loro_common::{HasCounter, HasCounterSpan, HasIdSpan, PeerID};
use once_cell::sync::OnceCell;
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Display;
use std::sync::Mutex;
use super::{AppDag, AppDagNode};
use super::ChangeStore;
/// [AppDag] maintains the causal graph of the app.
/// It's faster to answer the question like what's the LCA version
#[derive(Debug)]
pub struct AppDag {
pub(super) change_store: ChangeStore,
pub(crate) map: Mutex<BTreeMap<ID, AppDagNode>>,
pub(crate) frontiers: Frontiers,
pub(crate) vv: VersionVector,
/// Ops included in the version vector but not parsed yet
pub(crate) unparsed_vv: VersionVector,
}
#[derive(Debug, Clone)]
pub struct AppDagNode {
pub(crate) peer: PeerID,
pub(crate) cnt: Counter,
pub(crate) lamport: Lamport,
pub(crate) deps: Frontiers,
pub(crate) vv: OnceCell<ImVersionVector>,
/// A flag indicating whether any other nodes depend on this node.
/// The calculation of frontiers is based on this value.
pub(crate) has_succ: bool,
pub(crate) len: usize,
}
impl AppDag {
pub(crate) fn with_node_mut<R>(
&self,
id: ID,
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
) -> R {
self.ensure_lazy_load_node(id);
let mut map = self.map.lock().unwrap();
let x = map.range_mut(..=id).next_back();
if let Some((_, node)) = x {
if node.contains_id(id) {
f(Some(node))
} else {
f(None)
}
} else {
f(None)
}
}
/// If the lamport of change can be calculated, return Ok, otherwise, Err
pub(crate) fn calc_unknown_lamport_change(&self, change: &mut Change) -> Result<(), ()> {
for dep in change.deps.iter() {
match self.get_lamport(dep) {
Some(lamport) => {
change.lamport = change.lamport.max(lamport + 1);
}
None => return Err(()),
}
}
Ok(())
}
pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
self.ensure_lazy_load_node(id);
let Some(node) = self.get(id) else {
return Frontiers::default();
};
let offset = id.counter - node.cnt;
if offset == 0 {
node.deps.clone()
} else {
ID::new(id.peer, node.cnt + offset - 1).into()
}
}
pub(crate) fn with_last_mut_of_peer<R>(
&mut self,
peer: PeerID,
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
) -> R {
self.lazy_load_last_of_peer(peer);
let mut binding = self.map.lock().unwrap();
let last = binding
.range_mut(..=ID::new(peer, Counter::MAX))
.next_back()
.map(|(_, v)| v);
f(last)
}
pub(super) fn update_frontiers(&mut self, id: ID, deps: &Frontiers) {
self.frontiers.update_frontiers_on_new_change(id, deps);
self.vv.extend_to_include_last_id(id);
}
pub(super) fn lazy_load_last_of_peer(&mut self, peer: u64) {
if !self.unparsed_vv.contains_key(&peer) {
return;
}
todo!()
}
pub(super) fn ensure_lazy_load_node(&self, id: ID) {
if !self.unparsed_vv.includes_id(id) {
return;
}
todo!("load dag node from kv store, from id -> unparsed_vv.get(peer)")
}
pub(super) fn fork(&self, change_store: ChangeStore) -> AppDag {
AppDag {
change_store: change_store.clone(),
map: Mutex::new(self.map.lock().unwrap().clone()),
frontiers: self.frontiers.clone(),
vv: self.vv.clone(),
unparsed_vv: self.unparsed_vv.clone(),
}
}
}
impl HasIndex for AppDagNode {
type Int = Counter;