feat: convert frontiers to version vector

This commit is contained in:
Zixuan Chen 2023-07-04 18:25:33 +08:00
parent 4a8ce16ff1
commit 336bd1e497
3 changed files with 212 additions and 7 deletions

View file

@ -14,7 +14,7 @@ use crate::event::Diff;
use crate::id::{Counter, PeerID, ID};
use crate::log_store::ClientChanges;
use crate::span::{HasId, HasLamport};
use crate::version::{Frontiers, VersionVector};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use super::diff_calc::DiffCalculator;
@ -47,6 +47,7 @@ pub struct AppDagNode {
cnt: Counter,
lamport: Lamport,
parents: SmallVec<[ID; 2]>,
vv: ImVersionVector,
len: usize,
}

View file

@ -1,3 +1,5 @@
use std::cmp::Ordering;
use super::*;
impl HasIndex for AppDagNode {
type Int = Counter;
@ -17,6 +19,7 @@ impl Sliceable for AppDagNode {
cnt: self.cnt + from as Counter,
lamport: self.lamport + from as Lamport,
parents: Default::default(),
vv: Default::default(),
len: to - from,
}
}
@ -76,3 +79,36 @@ impl Dag for AppDag {
self.vv.clone()
}
}
impl AppDag {
/// get the version vector for a certain op.
/// It's the version when the op is applied
pub fn get_vv(&self, id: ID) -> Option<ImVersionVector> {
self.map.get(&id.peer).and_then(|rle| {
rle.get(id.counter).map(|x| {
let mut vv = x.element.vv.clone();
vv.insert(id.peer, id.counter);
vv
})
})
}
/// Compare the causal order of two versions.
/// If None, two versions are concurrent to each other
pub fn cmp_version(&self, a: ID, b: ID) -> Option<Ordering> {
if a.peer == b.peer {
return Some(a.counter.cmp(&b.counter));
}
let a = self.get_vv(a).unwrap();
let b = self.get_vv(b).unwrap();
a.partial_cmp(&b)
}
pub fn get_lamport(&self, id: &ID) -> Option<Lamport> {
self.map.get(&id.peer).and_then(|rle| {
rle.get(id.counter)
.map(|x| x.element.lamport + (id.counter - x.element.cnt) as Lamport)
})
}
}

View file

@ -14,6 +14,7 @@ use tracing::instrument;
use crate::{
change::Lamport,
id::{Counter, ID},
refactor::oplog::AppDag,
span::{CounterSpan, HasId, HasIdSpan, IdSpan},
LoroError, PeerID,
};
@ -24,16 +25,73 @@ use crate::{
/// i.e. a [VersionVector] of `{A: 1, B: 2}` means that A has 1 atomic op and B has 2 atomic ops,
/// thus ID of `{client: A, counter: 1}` is out of the range.
///
/// In implementation, it's a immutable hash map with O(1) clone. Because
/// - we want a cheap clone op on vv;
/// - neighbor op's VersionVectors are very similar, most of the memory can be shared in
/// immutable hashmap
///
/// see also [im].
#[repr(transparent)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionVector(FxHashMap<PeerID, Counter>);
/// Immutable version vector
///
/// It has O(1) clone time and O(logN) insert/delete/lookup time.
///
/// It's more memory efficient than [VersionVector] when the version vector
/// can be created from cloning and modifying other similar version vectors.
#[repr(transparent)]
#[derive(Debug, Clone, Default)]
pub struct ImVersionVector(im::HashMap<PeerID, Counter, fxhash::FxBuildHasher>);
impl ImVersionVector {
pub fn clear(&mut self) {
self.0.clear()
}
pub fn get(&self, key: &PeerID) -> Option<&Counter> {
self.0.get(key)
}
pub fn insert(&mut self, k: PeerID, v: Counter) {
self.0.insert(k, v);
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn iter(&self) -> im::hashmap::Iter<'_, PeerID, Counter> {
self.0.iter()
}
pub fn remove(&mut self, k: &PeerID) -> Option<Counter> {
self.0.remove(k)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn contains_key(&self, k: &PeerID) -> bool {
self.0.contains_key(k)
}
/// Convert to a [Frontiers]
///
/// # Panic
///
/// When self is greater than dag.vv
pub fn to_frontiers(&self, dag: &AppDag) -> Frontiers {
let last_ids: Vec<ID> = self
.iter()
.filter_map(|(client_id, cnt)| {
if *cnt == 0 {
return None;
}
Some(ID::new(*client_id, cnt - 1))
})
.collect();
shrink_frontiers(last_ids, dag)
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Frontiers(SmallVec<[ID; 2]>);
@ -119,6 +177,20 @@ impl PartialEq for VersionVector {
impl Eq for VersionVector {}
impl PartialEq for ImVersionVector {
fn eq(&self, other: &Self) -> bool {
self.0
.iter()
.all(|(client, counter)| other.0.get(client).unwrap_or(&0) == counter)
&& other
.0
.iter()
.all(|(client, counter)| self.0.get(client).unwrap_or(&0) == counter)
}
}
impl Eq for ImVersionVector {}
impl Deref for VersionVector {
type Target = FxHashMap<PeerID, Counter>;
@ -252,6 +324,48 @@ impl PartialOrd for VersionVector {
}
}
impl PartialOrd for ImVersionVector {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
let mut self_greater = true;
let mut other_greater = true;
let mut eq = true;
for (client_id, other_end) in other.iter() {
if let Some(self_end) = self.get(client_id) {
if self_end < other_end {
self_greater = false;
eq = false;
}
if self_end > other_end {
other_greater = false;
eq = false;
}
} else if *other_end > 0 {
self_greater = false;
eq = false;
}
}
for (client_id, self_end) in self.iter() {
if other.contains_key(client_id) {
continue;
} else if *self_end > 0 {
other_greater = false;
eq = false;
}
}
if eq {
Some(Ordering::Equal)
} else if self_greater {
Some(Ordering::Greater)
} else if other_greater {
Some(Ordering::Less)
} else {
None
}
}
}
impl DerefMut for VersionVector {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
@ -569,6 +683,60 @@ impl VersionVector {
pub fn decode(bytes: &[u8]) -> Result<Self, LoroError> {
postcard::from_bytes(bytes).map_err(|_| LoroError::DecodeVersionVectorError)
}
/// Convert to a [Frontiers]
///
/// # Panic
///
/// When self is greater than dag.vv
pub fn to_frontiers(&self, dag: &AppDag) -> Frontiers {
let last_ids: Vec<ID> = self
.iter()
.filter_map(|(client_id, cnt)| {
if *cnt == 0 {
return None;
}
Some(ID::new(*client_id, cnt - 1))
})
.collect();
shrink_frontiers(last_ids, dag)
}
}
/// Use minimal set of ids to represent the frontiers
fn shrink_frontiers(mut last_ids: Vec<ID>, dag: &AppDag) -> Frontiers {
// it only keep the ids of ops that are concurrent to each other
let mut frontiers = Frontiers::default();
let mut frontiers_vv = Vec::new();
if last_ids.len() == 1 {
frontiers.push(last_ids[0]);
return frontiers;
}
// sort by lamport, ascending
last_ids.sort_by_cached_key(|x| ((dag.get_lamport(x).unwrap() as isize), x.peer));
for id in last_ids.iter().rev() {
let vv = dag.get_vv(*id).unwrap();
let mut should_insert = true;
for f_vv in frontiers_vv.iter() {
if vv.partial_cmp(f_vv).is_some() {
// This is not concurrent op, should be ignored in frontiers
should_insert = false;
break;
}
}
if should_insert {
frontiers.push(*id);
frontiers_vv.push(vv);
}
}
frontiers
}
impl Default for VersionVector {