fix: dag issues

This commit is contained in:
Zixuan Chen 2022-07-29 18:10:06 +08:00
parent 1ca2b4226e
commit 8db47780b9
2 changed files with 169 additions and 74 deletions

View file

@ -18,10 +18,12 @@ pub trait DagNode {
fn len(&self) -> usize;
fn deps(&self) -> &Vec<ID>;
#[inline]
fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
fn dag_id_span(&self) -> IdSpan {
let id = self.dag_id_start();
IdSpan {
@ -29,6 +31,16 @@ pub trait DagNode {
counter: CounterSpan::new(id.counter, id.counter + self.len() as Counter),
}
}
/// inclusive end
#[inline]
fn dag_id_end(&self) -> ID {
let id = self.dag_id_start();
ID {
client_id: id.client_id,
counter: id.counter + self.len() as Counter - 1,
}
}
}
pub(crate) trait Dag {
@ -39,83 +51,98 @@ pub(crate) trait Dag {
fn frontier(&self) -> &[ID];
fn roots(&self) -> Vec<&Self::Node>;
fn get_common_ancestor(&self, a: ID, b: ID) -> Option<ID> {
if a.client_id == b.client_id {
if a.counter <= b.counter {
Some(a)
//
// TODO: Maybe use Result return type
// TODO: benchmark
// TODO: visited
// how to test better?
// - converge through other nodes
//
/// only returns a single root.
/// but the least common ancestor may be more than one root.
/// But that is a rare case.
fn find_common_ancestor(&self, a_id: ID, b_id: ID) -> Option<ID> {
if a_id.client_id == b_id.client_id {
if a_id.counter <= b_id.counter {
Some(a_id)
} else {
Some(b)
Some(b_id)
}
} else {
#[derive(Debug, PartialEq, Eq)]
struct OrdId {
struct OrdId<'a> {
id: ID,
lamport: Lamport,
deps: &'a [ID],
}
impl PartialOrd for OrdId {
impl<'a> PartialOrd for OrdId<'a> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.lamport.cmp(&other.lamport))
}
}
impl Ord for OrdId {
impl<'a> Ord for OrdId<'a> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.lamport.cmp(&other.lamport)
}
}
let mut a_map: HashMap<ClientID, Range<Counter>, _> = FxHashMap::default();
let mut b_map: HashMap<ClientID, Range<Counter>, _> = FxHashMap::default();
let mut a_heap: BinaryHeap<OrdId> = BinaryHeap::new();
let mut b_heap: BinaryHeap<OrdId> = BinaryHeap::new();
let mut _a_vv: HashMap<ClientID, Counter, _> = FxHashMap::default();
let mut _b_vv: HashMap<ClientID, Counter, _> = FxHashMap::default();
let mut _a_heap: BinaryHeap<OrdId> = BinaryHeap::new();
let mut _b_heap: BinaryHeap<OrdId> = BinaryHeap::new();
{
let a = self.get(a).unwrap();
let b = self.get(b).unwrap();
a_heap.push(OrdId {
id: a.dag_id_start(),
lamport: a.lamport_start() + a.len() as Lamport,
let a = self.get(a_id).unwrap();
let b = self.get(b_id).unwrap();
_a_heap.push(OrdId {
id: a_id,
lamport: a_id.counter + a.lamport_start() - a.dag_id_start().counter,
deps: a.deps(),
});
b_heap.push(OrdId {
id: b.dag_id_start(),
lamport: b.lamport_start() + b.len() as Lamport,
_b_heap.push(OrdId {
id: b_id,
lamport: b_id.counter + b.lamport_start() - b.dag_id_start().counter,
deps: b.deps(),
});
_a_vv.insert(a_id.client_id, a_id.counter + 1);
_b_vv.insert(b_id.client_id, b_id.counter + 1);
}
while !a_heap.is_empty() || !b_heap.is_empty() {
let (a_heap, b_heap, a_map, b_map) =
if a_heap.peek().map(|x| x.lamport).unwrap_or(0)
< b_heap.peek().map(|x| x.lamport).unwrap_or(0)
{
// swap
(&mut b_heap, &mut a_heap, &mut b_map, &mut a_map)
} else {
(&mut a_heap, &mut b_heap, &mut a_map, &mut b_map)
};
while !_a_heap.is_empty() || !_b_heap.is_empty() {
let (a_heap, b_heap, a_vv, b_vv) = if _a_heap.is_empty()
|| (_a_heap.peek().map(|x| x.lamport).unwrap_or(0)
< _b_heap.peek().map(|x| x.lamport).unwrap_or(0))
{
// swap
(&mut _b_heap, &mut _a_heap, &mut _b_vv, &mut _a_vv)
} else {
(&mut _a_heap, &mut _b_heap, &mut _a_vv, &mut _b_vv)
};
while !a_heap.is_empty()
&& a_heap.peek().map(|x| x.lamport).unwrap_or(0)
>= b_heap.peek().map(|x| x.lamport).unwrap_or(0)
{
let id = a_heap.pop().unwrap().id;
if let Some(range) = b_map.get(&id.client_id) {
if range.contains(&id.counter) {
let a = a_heap.pop().unwrap();
let id = a.id;
if let Some(counter_end) = b_vv.get(&id.client_id) {
if id.counter < *counter_end {
return Some(id);
}
}
let a = self.get(id).unwrap();
for dep in a.deps() {
for dep_id in a.deps {
let dep = self.get(*dep_id).unwrap();
a_heap.push(OrdId {
id: *dep,
lamport: a.lamport_start() + a.len() as Lamport,
id: *dep_id,
lamport: dep_id.counter + dep.lamport_start()
- dep.dag_id_start().counter,
deps: dep.deps(),
});
}
if let Some(range) = a_map.get_mut(&id.client_id) {
range.start = a.dag_id_start().counter;
} else {
let span = a.dag_id_span();
a_map.insert(id.client_id, span.counter.from..span.counter.to);
a_vv.entry(dep_id.client_id)
.or_insert_with(|| dep_id.counter + 1);
}
}
}

View file

@ -145,7 +145,11 @@ impl TestDag {
pending.push((client_id, i));
return true;
}
update_frontier(&mut self.frontier, node.id, &node.deps);
update_frontier(
&mut self.frontier,
node.id.inc((node.len() - 1) as u32),
&node.deps,
);
self.nodes
.entry(client_id)
.or_insert(vec![])
@ -191,28 +195,109 @@ fn test_dag() {
assert_eq!(b.next_lamport, 3);
assert_eq!(b.frontier().len(), 2);
assert_eq!(
b.get_common_ancestor(ID::new(0, 2), ID::new(1, 1)),
b.find_common_ancestor(ID::new(0, 2), ID::new(1, 1)),
Some(ID::new(1, 0))
);
}
#[cfg(not(no_proptest))]
#[derive(Debug, Clone, Copy)]
struct Interaction {
dag_idx: usize,
merge_with: Option<usize>,
len: usize,
}
mod find_common_ancestors {
use super::*;
#[test]
fn no_common_ancestors() {
let mut a = TestDag::new(0);
let mut b = TestDag::new(1);
a.push(1);
b.push(1);
a.merge(&b);
let actual = a.find_common_ancestor(ID::new(0, 0), ID::new(1, 0));
assert_eq!(actual, None);
// interactions between b and c
let mut c = TestDag::new(2);
c.merge(&b);
c.push(2);
b.merge(&c);
b.push(3);
// should no exist any common ancestor between a and b
let actual = a.find_common_ancestor(ID::new(0, 0), ID::new(1, 0));
assert_eq!(actual, None);
}
#[test]
fn dep_in_middle() {
let mut a = TestDag::new(0);
let mut b = TestDag::new(1);
a.push(4);
b.push(4);
b.push(5);
b.merge(&a);
b.frontier.retain(|x| x.client_id == 1);
let k = b.nodes.get_mut(&1).unwrap();
k[1].deps.push(ID::new(0, 2));
assert_eq!(
b.find_common_ancestor(ID::new(0, 3), ID::new(1, 8)),
Some(ID::new(0, 2))
);
}
/// ![](https://i.ibb.co/C5xLG53/image.png)
#[test]
fn large_lamport_with_longer_path() {
let mut a0 = TestDag::new(0);
let mut a1 = TestDag::new(1);
let mut a2 = TestDag::new(2);
a0.push(3);
a1.push(3);
a2.push(2);
a2.merge(&a0);
a2.push(1);
a1.merge(&a2);
a2.push(1);
a1.push(1);
a1.merge(&a2);
a1.push(1);
a1.nodes
.get_mut(&1)
.unwrap()
.last_mut()
.unwrap()
.deps
.push(ID::new(0, 1));
a0.push(1);
a1.merge(&a2);
a1.merge(&a0);
assert_eq!(
a1.find_common_ancestor(ID::new(0, 3), ID::new(1, 4)),
Some(ID::new(0, 2))
);
}
}
#[cfg(not(no_proptest))]
mod find_common_ancestors_proptest {
use proptest::prelude::*;
use crate::{array_mut_ref, unsafe_array_mut_ref};
use super::*;
#[derive(Debug, Clone, Copy)]
struct Interaction {
dag_idx: usize,
merge_with: Option<usize>,
len: usize,
}
prop_compose! {
fn gen_interaction(num: usize)(dag_idx in 0..num, merge_with in 0..num, length in 1..10, should_merge in 0..2) -> Interaction {
fn gen_interaction(num: usize) (
dag_idx in 0..num,
merge_with in 0..num,
length in 1..10,
should_merge in 0..2
) -> Interaction {
Interaction {
dag_idx,
merge_with: if should_merge == 1 && merge_with != dag_idx { Some(merge_with) } else { None },
@ -238,14 +323,6 @@ mod find_common_ancestors {
test(3, before_merged_insertions, after_merged_insertions)?;
}
#[test]
fn test_4dags(
before_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300),
after_merged_insertions in prop::collection::vec(gen_interaction(4), 0..300)
) {
test(4, before_merged_insertions, after_merged_insertions)?;
}
#[test]
fn test_10dags(
before_merged_insertions in prop::collection::vec(gen_interaction(10), 0..300),
@ -253,14 +330,6 @@ mod find_common_ancestors {
) {
test(10, before_merged_insertions, after_merged_insertions)?;
}
#[test]
fn test_100dags(
before_merged_insertions in prop::collection::vec(gen_interaction(100), 0..2000),
after_merged_insertions in prop::collection::vec(gen_interaction(100), 0..2000)
) {
test(100, before_merged_insertions, after_merged_insertions)?;
}
}
fn preprocess(interactions: &mut [Interaction], num: i32) {
@ -317,10 +386,9 @@ mod find_common_ancestors {
dag1.push(1);
dag0.merge(dag1);
// dbg!(dag0, dag1, expected);
let actual = dags[0].get_common_ancestor(
dags[0].nodes.get(&0).unwrap().last().unwrap().id,
dags[1].nodes.get(&1).unwrap().last().unwrap().id,
);
let a = dags[0].nodes.get(&0).unwrap().last().unwrap().id;
let b = dags[1].nodes.get(&1).unwrap().last().unwrap().id;
let actual = dags[0].find_common_ancestor(a, b);
prop_assert_eq!(actual.unwrap(), expected);
Ok(())
}