mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-08 21:47:41 +00:00
refactor: impl dag for log store
This commit is contained in:
parent
f3b36680f9
commit
d8bb107a56
4 changed files with 66 additions and 21 deletions
|
@ -6,11 +6,12 @@
|
||||||
//! In future, we may also use [Change] to represent a transaction. But this decision is postponed.
|
//! In future, we may also use [Change] to represent a transaction. But this decision is postponed.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
dag::DagNode,
|
||||||
id::{Counter, ID},
|
id::{Counter, ID},
|
||||||
op::Op,
|
op::Op,
|
||||||
span::HasId,
|
span::{HasId, HasLamport},
|
||||||
};
|
};
|
||||||
use rle::{HasLength, Mergable, RleVec};
|
use rle::{HasLength, Mergable, RleVec, Sliceable};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
|
|
||||||
pub type Timestamp = i64;
|
pub type Timestamp = i64;
|
||||||
|
@ -128,3 +129,34 @@ impl HasId for Change {
|
||||||
self.id
|
self.id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl HasLamport for Change {
|
||||||
|
fn lamport(&self) -> Lamport {
|
||||||
|
self.lamport
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sliceable for Change {
|
||||||
|
// TODO: feels slow, need to confirm whether this affects performance
|
||||||
|
fn slice(&self, from: usize, to: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
ops: self.ops.slice(from, to),
|
||||||
|
deps: if from > 0 {
|
||||||
|
smallvec::smallvec![self.id.inc(from as Counter - 1)]
|
||||||
|
} else {
|
||||||
|
self.deps.clone()
|
||||||
|
},
|
||||||
|
id: self.id.inc(from as Counter),
|
||||||
|
lamport: self.lamport + from as Lamport,
|
||||||
|
timestamp: self.timestamp,
|
||||||
|
freezed: self.freezed,
|
||||||
|
break_points: self.break_points.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DagNode for Change {
|
||||||
|
fn deps(&self) -> &[ID] {
|
||||||
|
&self.deps
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -35,16 +35,6 @@ use self::{
|
||||||
pub(crate) trait DagNode: HasLamport + HasId + HasLength + Debug + Sliceable {
|
pub(crate) trait DagNode: HasLamport + HasId + HasLength + Debug + Sliceable {
|
||||||
fn deps(&self) -> &[ID];
|
fn deps(&self) -> &[ID];
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn is_empty(&self) -> bool {
|
|
||||||
self.len() == 0
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn get_offset(&self, c: Counter) -> Counter {
|
|
||||||
c - self.id_start().counter
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn get_lamport_from_counter(&self, c: Counter) -> Lamport {
|
fn get_lamport_from_counter(&self, c: Counter) -> Lamport {
|
||||||
self.lamport() + c as Lamport - self.id_start().counter as Lamport
|
self.lamport() + c as Lamport - self.id_start().counter as Lamport
|
||||||
|
@ -69,8 +59,7 @@ pub(crate) trait Dag {
|
||||||
fn get(&self, id: ID) -> Option<&Self::Node>;
|
fn get(&self, id: ID) -> Option<&Self::Node>;
|
||||||
|
|
||||||
fn frontier(&self) -> &[ID];
|
fn frontier(&self) -> &[ID];
|
||||||
fn roots(&self) -> Vec<&Self::Node>;
|
fn vv(&self) -> VersionVector;
|
||||||
fn vv(&self) -> &VersionVector;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) trait DagUtils: Dag {
|
pub(crate) trait DagUtils: Dag {
|
||||||
|
|
|
@ -105,12 +105,8 @@ impl Dag for TestDag {
|
||||||
&self.frontier
|
&self.frontier
|
||||||
}
|
}
|
||||||
|
|
||||||
fn roots(&self) -> Vec<&Self::Node> {
|
fn vv(&self) -> VersionVector {
|
||||||
self.nodes.values().map(|v| &v[0]).collect()
|
self.version_vec.clone()
|
||||||
}
|
|
||||||
|
|
||||||
fn vv(&self) -> &VersionVector {
|
|
||||||
&self.version_vec
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,10 @@ use crate::{
|
||||||
change::{Change, ChangeMergeCfg},
|
change::{Change, ChangeMergeCfg},
|
||||||
configure::Configure,
|
configure::Configure,
|
||||||
container::{manager::ContainerManager, Container},
|
container::{manager::ContainerManager, Container},
|
||||||
|
dag::Dag,
|
||||||
id::{ClientID, Counter},
|
id::{ClientID, Counter},
|
||||||
op::OpProxy,
|
op::OpProxy,
|
||||||
|
span::HasIdSpan,
|
||||||
Lamport, Op, Timestamp, ID,
|
Lamport, Op, Timestamp, ID,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -70,7 +72,6 @@ impl LogStore {
|
||||||
container: Arc<RwLock<ContainerManager>>,
|
container: Arc<RwLock<ContainerManager>>,
|
||||||
) -> Arc<RwLock<Self>> {
|
) -> Arc<RwLock<Self>> {
|
||||||
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
|
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
|
||||||
|
|
||||||
|
|
||||||
Arc::new(RwLock::new(Self {
|
Arc::new(RwLock::new(Self {
|
||||||
cfg,
|
cfg,
|
||||||
|
@ -228,3 +229,30 @@ impl LogStore {
|
||||||
iter::OpIter::new(&self.changes)
|
iter::OpIter::new(&self.changes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Dag for LogStore {
|
||||||
|
type Node = Change;
|
||||||
|
|
||||||
|
fn get(&self, id: ID) -> Option<&Self::Node> {
|
||||||
|
self.changes
|
||||||
|
.get(&id.client_id)
|
||||||
|
.and_then(|x| x.get(id.counter as usize).map(|x| x.element))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn frontier(&self) -> &[ID] {
|
||||||
|
&self.frontier
|
||||||
|
}
|
||||||
|
|
||||||
|
fn vv(&self) -> crate::VersionVector {
|
||||||
|
self.changes
|
||||||
|
.iter()
|
||||||
|
.map(|(client, changes)| {
|
||||||
|
changes
|
||||||
|
.vec()
|
||||||
|
.last()
|
||||||
|
.map(|x| x.id_last())
|
||||||
|
.unwrap_or_else(|| ID::new(*client, 0))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue