feat: apply change

This commit is contained in:
Zixuan Chen 2022-07-18 13:53:16 +08:00
parent aa9590b540
commit 8c0f033950
15 changed files with 363 additions and 111 deletions

View file

@ -11,3 +11,4 @@ rle = {path = "../rle"}
smallvec = "1.8.0"
smartstring = "1.0.1"
fxhash = "0.2.1"
ring = "0.16.20"

View file

@ -1,6 +1,9 @@
use std::char::MAX;
use crate::{id::ID, op::Op};
use crate::{
id::{Counter, ID},
op::Op,
};
use rle::{HasLength, Mergable, RleVec};
use smallvec::SmallVec;
@ -43,6 +46,14 @@ impl Change {
freezed,
}
}
pub fn last_id(&self) -> ID {
self.id.inc(self.len() as Counter - 1)
}
pub fn last_lamport(&self) -> Lamport {
self.lamport + self.len() as Lamport - 1
}
}
impl HasLength for Change {
@ -75,7 +86,9 @@ impl Mergable<ChangeMergeCfg> for Change {
return false;
}
if !other.deps.is_empty() {
if other.deps.is_empty()
|| (other.deps.len() == 1 && self.id.is_connected_id(&other.deps[0], self.len() as u32))
{
return false;
}

View file

@ -0,0 +1,42 @@
use crate::{change::ChangeMergeCfg, log_store::GcConfig, Timestamp};
use ring::rand::{SecureRandom, SystemRandom};
pub struct Configure {
pub change: ChangeMergeCfg,
pub gc: GcConfig,
pub(crate) get_time: fn() -> Timestamp,
pub(crate) rand: Box<dyn SecureRandomGenerator>,
}
pub trait SecureRandomGenerator {
fn fill_byte(&mut self, dest: &mut [u8]);
fn next_u64(&mut self) -> u64 {
let mut buf = [0u8; 8];
self.fill_byte(&mut buf);
u64::from_le_bytes(buf)
}
fn next_u32(&mut self) -> u32 {
let mut buf = [0u8; 4];
self.fill_byte(&mut buf);
u32::from_le_bytes(buf)
}
fn next_i64(&mut self) -> i64 {
let mut buf = [0u8; 8];
self.fill_byte(&mut buf);
i64::from_le_bytes(buf)
}
fn next_i32(&mut self) -> i32 {
let mut buf = [0u8; 4];
self.fill_byte(&mut buf);
i32::from_le_bytes(buf)
}
}
impl SecureRandomGenerator for SystemRandom {
fn fill_byte(&mut self, dest: &mut [u8]) {
self.fill(dest).unwrap();
}
}

View file

@ -1,16 +1,25 @@
//! CRDT [Container]. Each container may have different CRDT type [ContainerType].
//! Each [Op] has an associated container. It's the [Container]'s responsibility to
//! calculate the state from the [Op]s.
//!
//! Every [Container] can take a [Snapshot], which contains [crate::LoroValue] that describes the state.
//!
use crate::{
op::OpProxy, snapshot::Snapshot, version::VersionVector, InsertContent, InternalString,
LogStore, Op, SmString, ID,
};
use rle::{HasLength, Mergable, Sliceable};
use std::alloc::Layout;
use std::{alloc::Layout, fmt::Debug};
mod container_content;
mod manager;
pub mod map;
pub mod text;
pub use container_content::*;
pub use manager::*;
pub trait Container {
pub trait Container: Debug {
fn id(&self) -> &ContainerID;
fn type_id(&self) -> ContainerType;
fn apply(&mut self, op: &OpProxy);

View file

@ -0,0 +1,56 @@
use std::pin::Pin;
use fxhash::FxHashMap;
use crate::LogStore;
use super::{map::MapContainer, Container, ContainerID, ContainerType};
#[derive(Debug, Default)]
pub(crate) struct ContainerManager {
containers: FxHashMap<ContainerID, Box<dyn Container>>,
}
impl ContainerManager {
#[inline]
pub fn create(
&mut self,
id: ContainerID,
container_type: ContainerType,
store: Pin<&mut LogStore>,
) -> Box<dyn Container> {
match container_type {
ContainerType::Map => Box::new(MapContainer::new(id, store)),
_ => unimplemented!(),
}
}
#[inline]
pub fn get(&self, id: ContainerID) -> Option<&dyn Container> {
self.containers.get(&id).map(|c| c.as_ref())
}
#[inline]
pub fn get_mut(&mut self, id: ContainerID) -> Option<&mut Box<dyn Container>> {
self.containers.get_mut(&id)
}
#[inline]
fn insert(&mut self, id: ContainerID, container: Box<dyn Container>) {
self.containers.insert(id, container);
}
pub fn get_or_create(
&mut self,
id: ContainerID,
container_type: ContainerType,
store: Pin<&mut LogStore>,
) -> &mut Box<dyn Container> {
if !self.containers.contains_key(&id) {
let container = self.create(id.clone(), container_type, store);
self.insert(id.clone(), container);
}
self.get_mut(id).unwrap()
}
}

View file

@ -4,9 +4,9 @@ use fxhash::FxHashMap;
use crate::{
container::{Container, ContainerID, ContainerType},
content::downcast_ref,
op::content::downcast_ref,
op::OpProxy,
value::{InsertValue, SnapshotValue},
value::{InsertValue, LoroValue},
ClientID, InternalString, Lamport, LogStore, OpType, Snapshot,
};
@ -14,6 +14,8 @@ use super::MapInsertContent;
/// we can only insert to Map
/// delete = set null
///
#[derive(Debug)]
pub struct MapContainer {
id: ContainerID,
state: FxHashMap<InternalString, ValueSlot>,
@ -27,12 +29,14 @@ struct TotalOrder {
client_id: ClientID,
}
#[derive(Debug)]
struct ValueSlot {
value: InsertValue,
order: TotalOrder,
}
impl MapContainer {
#[inline]
pub fn new(id: ContainerID, store: Pin<&mut LogStore>) -> Self {
MapContainer {
id,
@ -57,6 +61,7 @@ impl MapContainer {
}
impl Container for MapContainer {
#[inline(always)]
fn id(&self) -> &ContainerID {
&self.id
}
@ -101,13 +106,13 @@ impl Container for MapContainer {
map.insert(key.clone(), value.value.clone().into());
}
self.snapshot = Some(Snapshot::new(SnapshotValue::Map(map)));
self.snapshot = Some(Snapshot::new(LoroValue::Map(map)));
}
self.snapshot.as_ref().unwrap()
}
fn checkout_version(&mut self, vv: &crate::version::VersionVector, log: &crate::LogStore) {
fn checkout_version(&mut self, _vv: &crate::version::VersionVector, _log: &crate::LogStore) {
todo!()
}
}

View file

@ -1,4 +1,4 @@
use crate::{content, ContentType, InsertContent, ID};
use crate::{ContentType, InsertContent, ID};
use rle::{HasLength, Mergable, Sliceable};
#[derive(Debug, Clone)]
@ -63,7 +63,7 @@ impl HasLength for TextContent {
#[cfg(test)]
mod test {
use crate::{container::ContainerID, content, id::ROOT_ID, ContentType, Op, OpContent, ID};
use crate::{container::ContainerID, id::ROOT_ID, ContentType, Op, OpContent, ID};
use rle::RleVec;
use super::TextContent;
@ -97,8 +97,9 @@ mod test {
));
assert_eq!(vec.merged_len(), 1);
let merged = vec.get_merged(0);
assert_eq!(merged.content().id(), ContentType::Text);
let text_content = content::downcast_ref::<TextContent>(&**merged.content()).unwrap();
assert_eq!(merged.insert_content().id(), ContentType::Text);
let text_content =
crate::op::content::downcast_ref::<TextContent>(&**merged.insert_content()).unwrap();
assert_eq!(text_content.text, "ab");
}
@ -132,12 +133,12 @@ mod test {
assert_eq!(vec.merged_len(), 2);
assert_eq!(
vec.slice_iter(2, 6)
.map(
|x| content::downcast_ref::<TextContent>(&**x.into_inner().content())
.unwrap()
.text
.clone()
.map(|x| crate::op::content::downcast_ref::<TextContent>(
&**x.into_inner().insert_content()
)
.unwrap()
.text
.clone())
.collect::<Vec<String>>(),
vec!["34", "56"]
)

View file

@ -1,4 +1,5 @@
pub type ClientID = u64;
pub type Counter = u32;
#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, PartialOrd, Ord)]
pub struct ID {
@ -23,4 +24,16 @@ impl ID {
pub fn is_null(&self) -> bool {
self.client_id == u64::MAX
}
#[inline]
pub(crate) fn is_connected_id(&self, other: &Self, self_len: u32) -> bool {
self.client_id == other.client_id && self.counter + self_len == other.counter
}
pub fn inc(&self, inc: u32) -> Self {
ID {
client_id: self.client_id,
counter: self.counter + inc,
}
}
}

View file

@ -1,22 +1,27 @@
//! # Loro
//!
//!
#![allow(dead_code, unused_imports)]
mod change;
mod id;
pub mod change;
pub mod configure;
pub mod id;
mod id_span;
mod log_store;
mod op;
pub mod op;
mod snapshot;
mod value;
mod version;
pub mod container;
pub use change::{Change, Lamport, Timestamp};
pub use id::{ClientID, ID};
pub(crate) use change::{Change, Lamport, Timestamp};
pub(crate) use id::{ClientID, ID};
pub use log_store::LogStore;
pub use op::{content, ContentType, InsertContent, Op, OpContent, OpType};
pub(crate) use op::{ContentType, InsertContent, Op, OpContent, OpType};
use smartstring::{LazyCompact, SmartString};
pub use snapshot::Snapshot;
pub(crate) use snapshot::Snapshot;
pub(crate) type SmString = SmartString<LazyCompact>;
use string_cache::DefaultAtom;
pub(crate) type InternalString = DefaultAtom;
pub use value::LoroValue;

View file

@ -1,12 +1,15 @@
use fxhash::FxHashMap;
use rle::RleVec;
use ring::rand::SystemRandom;
use rle::{HasLength, RleVec};
use smallvec::SmallVec;
use string_cache::{Atom, DefaultAtom, EmptyStaticAtomSet};
use crate::{
change::{Change, ChangeMergeCfg},
container::{Container, ContainerID},
id::ClientID,
configure::Configure,
container::{Container, ContainerID, ContainerManager},
id::{ClientID, Counter},
id_span::IdSpan,
Lamport, Op, Timestamp, ID,
};
const YEAR: u64 = 365 * 24 * 60 * 60;
@ -26,39 +29,35 @@ impl Default for GcConfig {
}
}
pub struct Configure {
pub change: ChangeMergeCfg,
pub gc: GcConfig,
get_time: fn() -> Timestamp,
}
/// Entry of the loro inner state.
pub struct LogStore {
ops: FxHashMap<ClientID, RleVec<Change, ChangeMergeCfg>>,
changes: FxHashMap<ClientID, RleVec<Change, ChangeMergeCfg>>,
cfg: Configure,
latest_lamport: Lamport,
latest_timestamp: Lamport,
latest_timestamp: Timestamp,
pub(crate) this_client_id: ClientID,
frontier: SmallVec<[ID; 2]>,
containers: FxHashMap<ContainerID, Box<dyn Container>>,
/// CRDT container manager
container: ContainerManager,
}
impl LogStore {
pub fn new(cfg: Configure, client_id: Option<ClientID>) -> Self {
pub fn new(mut cfg: Configure, client_id: Option<ClientID>) -> Self {
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
Self {
cfg,
ops: FxHashMap::default(),
this_client_id,
changes: FxHashMap::default(),
latest_lamport: 0,
latest_timestamp: 0,
containers: Default::default(),
container: Default::default(),
frontier: Default::default(),
// TODO: or else random id
this_client_id: client_id.unwrap_or_else(|| 0),
}
}
pub fn lookup_change(&self, id: ID) -> Option<&Change> {
self.ops
self.changes
.get(&id.client_id)
.map(|changes| changes.get(id.counter as usize).unwrap().element)
}
@ -67,26 +66,94 @@ impl LogStore {
self.latest_lamport + 1
}
pub fn append_local_change(&mut self, change: Change) {
self.ops
.entry(change.id.client_id)
.or_insert(RleVec::new())
.push(change);
todo!("set frontier timestamp and lamport");
todo!("frontier of the same client can be dropped, if only itself is included");
}
pub fn append_local_op(&mut self, op: Op) {
// TODO: we can check change mergeable before append
let change = Change {
id: op.id,
ops: vec![op].into(),
deps: self.frontier.clone(),
lamport: self.next_lamport(),
timestamp: (self.cfg.get_time)(),
pub fn append_local_ops(&mut self, ops: Vec<Op>) {
let lamport = self.next_lamport();
let timestamp = (self.cfg.get_time)();
let id = ID {
client_id: self.this_client_id,
counter: self.get_next_counter(self.this_client_id),
};
let mut change = Change {
id,
ops: ops.into(),
deps: std::mem::take(&mut self.frontier),
lamport,
timestamp,
freezed: false,
};
self.append_local_change(change);
change.deps.push(ID::new(
self.this_client_id,
id.counter + change.len() as Counter - 1,
));
self.latest_lamport = lamport + change.len() as u32 - 1;
self.latest_timestamp = timestamp;
self.changes
.entry(self.this_client_id)
.or_insert_with(RleVec::new)
.push(change);
}
pub fn apply_remote_change(&mut self, mut change: Change) {
change.freezed = true;
if self.includes(change.last_id()) {
return;
}
for dep in &change.deps {
if !self.includes(*dep) {
unimplemented!("need impl pending changes");
}
}
self.frontier = self
.frontier
.iter()
.filter(|x| !change.deps.contains(x))
.copied()
.collect();
self.frontier.push(change.last_id());
if change.last_lamport() > self.latest_lamport {
self.latest_lamport = change.last_lamport();
}
if change.timestamp > self.latest_timestamp {
self.latest_timestamp = change.timestamp;
}
for op in change.ops.iter() {
self.apply_remote_op(op);
}
self.push_change(change);
}
#[inline]
fn push_change(&mut self, change: Change) {
self.changes
.entry(change.id.client_id)
.or_insert_with(RleVec::new)
.push(change);
}
/// this function assume op is not included in the log, and its deps are included.
fn apply_remote_op(&mut self, op: &Op) {
todo!()
}
pub fn includes(&self, id: ID) -> bool {
self.changes
.get(&id.client_id)
.map_or(0, |changes| changes.len())
> id.counter as usize
}
fn get_next_counter(&self, client_id: ClientID) -> Counter {
self.changes
.get(&client_id)
.map(|changes| changes.len())
.unwrap_or(0) as Counter
}
}
@ -97,6 +164,7 @@ impl Default for LogStore {
change: Default::default(),
gc: Default::default(),
get_time: || 0,
rand: Box::new(SystemRandom::new()),
},
None,
)

View file

@ -16,7 +16,6 @@ pub enum OpType {
Restore,
}
#[derive(Debug, Clone)]
/// Operation is a unit of change.
///
/// It has 3 types:
@ -25,12 +24,14 @@ pub enum OpType {
/// - Restore
///
/// A Op may have multiple atomic operations, since Op can be merged.
#[derive(Debug, Clone)]
pub struct Op {
pub(crate) id: ID,
pub(crate) content: OpContent,
}
impl Op {
#[inline]
pub fn new(id: ID, content: OpContent) -> Self {
Op { id, content }
}
@ -43,17 +44,17 @@ impl Op {
}
}
#[allow(clippy::borrowed_box)]
pub fn content(&self) -> &Box<dyn InsertContent> {
pub fn container(&self) -> &ContainerID {
match &self.content {
OpContent::Insert { content, .. } => content,
OpContent::Insert { container, .. } => container,
_ => unreachable!(),
}
}
pub fn container(&self) -> &ContainerID {
#[allow(clippy::borrowed_box)]
pub fn insert_content(&self) -> &Box<dyn InsertContent> {
match &self.content {
OpContent::Insert { container, .. } => container,
OpContent::Insert { content, .. } => content,
_ => unreachable!(),
}
}
@ -61,27 +62,8 @@ impl Op {
impl Mergable for Op {
fn is_mergable(&self, other: &Self, cfg: &()) -> bool {
match &self.content {
OpContent::Insert { container, content } => match other.content {
OpContent::Insert {
container: ref other_container,
content: ref other_content,
} => container == other_container && content.is_mergable_content(&**other_content),
_ => false,
},
OpContent::Delete { target } => match other.content {
OpContent::Delete {
target: ref other_target,
} => target.is_mergable(other_target, cfg),
_ => false,
},
OpContent::Restore { target } => match other.content {
OpContent::Restore {
target: ref other_target,
} => target.is_mergable(other_target, cfg),
_ => false,
},
}
self.id.is_connected_id(&other.id, self.len() as u32)
&& self.content.is_mergable(&other.content, cfg)
}
fn merge(&mut self, other: &Self, cfg: &()) {

View file

@ -1,8 +1,8 @@
use rle::{HasLength, RleVec, Sliceable};
use rle::{HasLength, Mergable, RleVec, Sliceable};
use crate::{container::ContainerID, id::ID, id_span::IdSpan};
use crate::{container::ContainerID, id::ID, id_span::IdSpan, OpType};
use super::InsertContent;
use super::{InsertContent, MergeableContent};
#[derive(Debug)]
pub enum OpContent {
@ -11,13 +11,25 @@ pub enum OpContent {
content: Box<dyn InsertContent>,
},
Delete {
container: ContainerID,
target: RleVec<IdSpan>,
},
Restore {
container: ContainerID,
target: RleVec<IdSpan>,
},
}
impl OpContent {
pub fn op_type(&self) -> OpType {
match self {
OpContent::Insert { .. } => OpType::Insert,
OpContent::Delete { .. } => OpType::Delete,
OpContent::Restore { .. } => OpType::Restore,
}
}
}
impl HasLength for OpContent {
fn len(&self) -> usize {
match self {
@ -35,10 +47,12 @@ impl Clone for OpContent {
container: container.clone(),
content: content.clone_content(),
},
OpContent::Delete { target } => OpContent::Delete {
OpContent::Delete { target, container } => OpContent::Delete {
container: container.clone(),
target: target.clone(),
},
OpContent::Restore { target } => OpContent::Restore {
OpContent::Restore { target, container } => OpContent::Restore {
container: container.clone(),
target: target.clone(),
},
}
@ -52,12 +66,51 @@ impl Sliceable for OpContent {
container: container.clone(),
content: content.slice_content(from, to),
},
OpContent::Delete { target } => OpContent::Delete {
OpContent::Delete { target, container } => OpContent::Delete {
container: container.clone(),
target: target.slice(from, to),
},
OpContent::Restore { target } => OpContent::Restore {
OpContent::Restore { target, container } => OpContent::Restore {
container: container.clone(),
target: target.slice(from, to),
},
}
}
}
impl Mergable for OpContent {
fn is_mergable(&self, other: &Self, cfg: &()) -> bool
where
Self: Sized,
{
match &self {
OpContent::Insert { container, content } => match other {
OpContent::Insert {
container: ref other_container,
content: ref other_content,
} => container == other_container && content.is_mergable_content(&**other_content),
_ => false,
},
OpContent::Delete { target, container } => match other {
OpContent::Delete {
target: ref other_target,
container: other_container,
} => container == other_container && target.is_mergable(other_target, cfg),
_ => false,
},
OpContent::Restore { target, container } => match other {
OpContent::Restore {
target: ref other_target,
container: ref other_container,
} => container == other_container && target.is_mergable(other_target, cfg),
_ => false,
},
}
}
fn merge(&mut self, _other: &Self, _conf: &())
where
Self: Sized,
{
}
}

View file

@ -1,16 +1,16 @@
use crate::value::SnapshotValue;
use crate::value::LoroValue;
#[derive(Debug, PartialEq, Clone)]
pub struct Snapshot {
value: SnapshotValue,
value: LoroValue,
}
impl Snapshot {
pub fn new(value: SnapshotValue) -> Self {
pub fn new(value: LoroValue) -> Self {
Snapshot { value }
}
pub fn value(&self) -> &SnapshotValue {
pub fn value(&self) -> &LoroValue {
&self.value
}
}

View file

@ -2,34 +2,34 @@ use fxhash::FxHashMap;
use crate::{container::ContainerID, InternalString, SmString};
/// [SnapshotValue] is used to represents the state at a given time
/// [LoroValue] is used to represents the state of CRDT at a given version
#[derive(Debug, PartialEq, Clone)]
pub enum SnapshotValue {
pub enum LoroValue {
Null,
Bool(bool),
Double(f64),
Integer(i32),
String(SmString),
List(Vec<SnapshotValue>),
Map(FxHashMap<InternalString, SnapshotValue>),
List(Vec<LoroValue>),
Map(FxHashMap<InternalString, LoroValue>),
Unresolved(ContainerID),
}
impl Default for SnapshotValue {
impl Default for LoroValue {
fn default() -> Self {
SnapshotValue::Null
LoroValue::Null
}
}
impl From<InsertValue> for SnapshotValue {
impl From<InsertValue> for LoroValue {
fn from(v: InsertValue) -> Self {
match v {
InsertValue::Null => SnapshotValue::Null,
InsertValue::Bool(b) => SnapshotValue::Bool(b),
InsertValue::Double(d) => SnapshotValue::Double(d),
InsertValue::Integer(i) => SnapshotValue::Integer(i),
InsertValue::String(s) => SnapshotValue::String(s),
InsertValue::Container(c) => SnapshotValue::Unresolved(c),
InsertValue::Null => LoroValue::Null,
InsertValue::Bool(b) => LoroValue::Bool(b),
InsertValue::Double(d) => LoroValue::Double(d),
InsertValue::Integer(i) => LoroValue::Integer(i),
InsertValue::String(s) => LoroValue::String(s),
InsertValue::Container(c) => LoroValue::Unresolved(c),
}
}
}

View file

@ -209,6 +209,10 @@ impl<T, Conf> RleVec<T, Conf> {
&self.vec
}
pub fn iter(&self) -> std::slice::Iter<'_, T> {
self.vec.iter()
}
pub fn vec_mut(&mut self) -> &mut Vec<T> {
&mut self.vec
}