fix: make it work

but it may be unsound
This commit is contained in:
Zixuan Chen 2024-08-15 22:21:57 +08:00
parent 6568af86b9
commit ea20f62d31
No known key found for this signature in database
3 changed files with 115 additions and 68 deletions

View file

@ -246,10 +246,6 @@ pub(crate) fn import_changes_to_oplog(
let mark = oplog.update_dag_on_new_change(&change);
oplog.next_lamport = oplog.next_lamport.max(change.lamport_end());
oplog.latest_timestamp = oplog.latest_timestamp.max(change.timestamp);
oplog.dag.vv.extend_to_include_end_id(ID {
peer: change.id.peer,
counter: change.id.counter + change.atom_len() as Counter,
});
oplog.insert_new_change(change, mark);
}

View file

@ -9,8 +9,8 @@ use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::Mutex;
use tracing::{debug, trace};
use std::sync::{Arc, Mutex};
use tracing::debug;
use crate::change::{get_sys_timestamp, Change, Lamport, Timestamp};
use crate::configure::Configure;
@ -64,10 +64,10 @@ pub struct OpLog {
/// [AppDag] maintains the causal graph of the app.
/// It's faster to answer the question like what's the LCA version
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct AppDag {
change_store: ChangeStore,
pub(crate) map: BTreeMap<ID, AppDagNode>,
pub(crate) map: Mutex<BTreeMap<ID, Arc<AppDagNode>>>,
pub(crate) frontiers: Frontiers,
pub(crate) vv: VersionVector,
/// Ops included in the version vector but not parsed yet
@ -88,13 +88,22 @@ pub struct AppDagNode {
}
impl AppDag {
pub fn get_mut(&mut self, id: ID) -> Option<&mut AppDagNode> {
self.lazy_load_node(id);
let x = self.map.range_mut(..=id).next_back()?;
if x.1.contains_id(id) {
Some(x.1)
pub(crate) fn with_node_mut<R>(
&self,
id: ID,
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
) -> R {
self.ensure_lazy_load_node(id);
let mut map = self.map.lock().unwrap();
let x = map.range_mut(..=id).next_back();
if let Some((_, node)) = x {
if node.contains_id(id) {
f(Some(Arc::make_mut(node)))
} else {
f(None)
}
} else {
None
f(None)
}
}
@ -112,7 +121,7 @@ impl AppDag {
}
pub(crate) fn find_deps_of_id(&self, id: ID) -> Frontiers {
self.lazy_load_node(id);
self.ensure_lazy_load_node(id);
let Some(node) = self.get(id) else {
return Frontiers::default();
};
@ -125,29 +134,50 @@ impl AppDag {
}
}
pub(crate) fn get_last_mut_of_peer(&mut self, peer: PeerID) -> Option<&mut AppDagNode> {
pub(crate) fn with_last_mut_of_peer<R>(
&mut self,
peer: PeerID,
f: impl FnOnce(Option<&mut AppDagNode>) -> R,
) -> R {
self.lazy_load_last_of_peer(peer);
self.map
let mut binding = self.map.lock().unwrap();
let last = binding
.range_mut(..=ID::new(peer, Counter::MAX))
.next_back()
.map(|(_, v)| v)
.map(|(_, v)| Arc::make_mut(v));
f(last)
}
fn update_frontiers(&mut self, id: ID, deps: &Frontiers) {
self.frontiers.update_frontiers_on_new_change(id, deps);
self.vv.extend_to_include_last_id(id);
}
fn lazy_load_last_of_peer(&mut self, peer: u64) {
if !self.unparsed_vv.contains_key(&peer) {
return;
}
todo!()
}
fn lazy_load_node(&mut self, id: ID) {
fn ensure_lazy_load_node(&self, id: ID) {
if !self.unparsed_vv.includes_id(id) {
return;
}
todo!("load dag node from kv store, from id -> unparsed_vv.get(peer)")
}
fn fork(&self, change_store: ChangeStore) -> AppDag {
AppDag {
change_store: change_store.clone(),
map: Mutex::new(self.map.lock().unwrap().clone()),
frontiers: self.frontiers.clone(),
vv: self.vv.clone(),
unparsed_vv: self.unparsed_vv.clone(),
}
}
}
impl std::fmt::Debug for OpLog {
@ -176,7 +206,7 @@ impl OpLog {
)),
dag: AppDag {
change_store: change_store.clone(),
map: BTreeMap::new(),
map: Mutex::new(BTreeMap::new()),
frontiers: Frontiers::default(),
vv: VersionVector::default(),
unparsed_vv: VersionVector::default(),
@ -196,15 +226,15 @@ impl OpLog {
.change_store
.fork(arena.clone(), configure.merge_interval.clone());
Self {
change_store: change_store.clone(),
dag: self.dag.clone(),
arena: self.arena.clone(),
history_cache: Mutex::new(
self.history_cache
.lock()
.unwrap()
.fork(arena.clone(), change_store),
.fork(arena.clone(), change_store.clone()),
),
change_store: change_store.clone(),
dag: self.dag.fork(change_store),
arena: self.arena.clone(),
next_lamport: self.next_lamport,
latest_timestamp: self.latest_timestamp,
pending_changes: Default::default(),
@ -252,7 +282,7 @@ impl OpLog {
#[inline]
pub fn is_empty(&self) -> bool {
self.dag.map.is_empty() && self.arena.can_import_snapshot()
self.dag.map.lock().unwrap().is_empty() && self.arena.can_import_snapshot()
}
/// This is the **only** place to update the `OpLog.changes`
@ -330,20 +360,22 @@ impl OpLog {
self.dag.update_frontiers(change.id_last(), &change.deps);
if change.deps_on_self() {
// don't need to push new element to dag because it only depends on itself
let last = self.dag.get_last_mut_of_peer(change.id.peer).unwrap();
assert_eq!(last.peer, change.id.peer, "peer id is not the same");
assert_eq!(
last.cnt + last.len as Counter,
change.id.counter,
"counter is not continuous"
);
assert_eq!(
last.lamport + last.len as Lamport,
change.lamport,
"lamport is not continuous"
);
last.len = (change.id.counter - last.cnt) as usize + len;
last.has_succ = false;
self.dag.with_last_mut_of_peer(change.id.peer, |last| {
let last = last.unwrap();
assert_eq!(last.peer, change.id.peer, "peer id is not the same");
assert_eq!(
last.cnt + last.len as Counter,
change.id.counter,
"counter is not continuous"
);
assert_eq!(
last.lamport + last.len as Lamport,
change.lamport,
"lamport is not continuous"
);
last.len = (change.id.counter - last.cnt) as usize + len;
last.has_succ = false;
});
} else {
let vv = self.dag.frontiers_to_im_vv(&change.deps);
let mut pushed = false;
@ -356,37 +388,53 @@ impl OpLog {
has_succ: false,
len,
};
if let Some(last) = self.dag.get_last_mut_of_peer(change.id.peer) {
if change.id.counter > 0 {
assert_eq!(
last.ctr_end(),
change.id.counter,
"counter is not continuous"
);
}
self.dag.with_last_mut_of_peer(change.id.peer, |last| {
if let Some(last) = last {
if change.id.counter > 0 {
assert_eq!(
last.ctr_end(),
change.id.counter,
"counter is not continuous"
);
}
if last.is_mergable(&node, &()) {
pushed = true;
last.merge(&node, &());
if last.is_mergable(&node, &()) {
pushed = true;
last.merge(&node, &());
}
}
}
});
if !pushed {
self.dag.map.insert(node.id_start(), node);
self.dag
.map
.lock()
.unwrap()
.insert(node.id_start(), Arc::new(node));
}
for dep in change.deps.iter() {
let target = self.dag.get_mut(*dep).unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
} else {
// We need to split the target node into two part
// so that we can ensure the new change depends on the
// last id of a dag node.
let new_node =
target.slice(dep.counter as usize - target.cnt as usize, target.len);
target.len -= new_node.len;
self.dag.map.insert(new_node.id_start(), new_node);
let ans = self.dag.with_node_mut(*dep, |target| {
let target = target.unwrap();
if target.ctr_last() == dep.counter {
target.has_succ = true;
None
} else {
// We need to split the target node into two part
// so that we can ensure the new change depends on the
// last id of a dag node.
let new_node =
target.slice(dep.counter as usize - target.cnt as usize, target.len);
target.len -= new_node.len;
Some(new_node)
}
});
if let Some(new_node) = ans {
self.dag
.map
.lock()
.unwrap()
.insert(new_node.id_start(), Arc::new(new_node));
}
}
}
@ -653,7 +701,7 @@ impl OpLog {
let mut total_changes = 0;
let mut total_ops = 0;
let mut total_atom_ops = 0;
let total_dag_node = self.dag.map.len();
let total_dag_node = self.dag.map.lock().unwrap().len();
self.change_store.visit_all_changes(&mut |change| {
total_changes += 1;
total_ops += change.ops.len();

View file

@ -108,10 +108,13 @@ impl Dag for AppDag {
}
fn get(&self, id: ID) -> Option<&Self::Node> {
self.lazy_load_node(id);
let x = self.map.range(..=id).next_back()?;
self.ensure_lazy_load_node(id);
let binding = self.map.lock().unwrap();
let x = binding.range(..=id).next_back()?;
if x.1.contains_id(id) {
Some(x.1)
let app_node: &AppDagNode = x.1;
// SAFETY: the nodes on app_dag will not be dropped util itself is dropped
Some(unsafe { std::mem::transmute::<&AppDagNode, &AppDagNode>(app_node) })
} else {
None
}
@ -169,7 +172,7 @@ impl AppDag {
}
pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
self.lazy_load_node(id);
self.ensure_lazy_load_node(*id);
self.get(*id).and_then(|node| {
assert!(id.counter >= node.cnt);
if node.cnt + node.len as Counter > id.counter {