refactor: use isomorphic structure for sync/async

This commit is contained in:
Zixuan Chen 2022-10-28 18:22:46 +08:00
parent bd30f675a6
commit c56c286d76
10 changed files with 164 additions and 84 deletions

View file

@ -42,6 +42,7 @@ criterion = "0.4.0"
doctest = false
[features]
parallel = []
# whether to use list slice instead of raw str in text container
slice = []
fuzzing = ["crdt-list/fuzzing", "rand", "slice", "arbitrary", "tabled"]

View file

@ -84,7 +84,7 @@ pub struct ContainerManager {
impl ContainerManager {
#[inline]
pub fn create(
pub(crate) fn create(
&mut self,
id: ContainerID,
container_type: ContainerType,
@ -114,7 +114,7 @@ impl ContainerManager {
self.containers.insert(id, container);
}
pub fn get_or_create(
pub(crate) fn get_or_create(
&mut self,
id: &ContainerID,
log_store: LogStoreWeakRef,

View file

@ -41,10 +41,16 @@ impl MapContainer {
}
}
pub fn insert(&mut self, key: InternalString, value: InsertValue, store: LogStoreWeakRef) {
// FIXME: keep store in the struct
pub(crate) fn insert(
&mut self,
key: InternalString,
value: InsertValue,
store: LogStoreWeakRef,
) {
let self_id = self.id.clone();
let m = store.upgrade().unwrap();
let mut store = m.write().unwrap();
let mut store = m.write();
let client_id = store.this_client_id;
let order = TotalOrderStamp {
client_id,
@ -83,8 +89,9 @@ impl MapContainer {
);
}
// FIXME: keep store in the struct
#[inline]
pub fn delete(&mut self, key: InternalString, store: LogStoreWeakRef) {
pub(crate) fn delete(&mut self, key: InternalString, store: LogStoreWeakRef) {
self.insert(key, InsertValue::Null, store);
}
}

View file

@ -1,13 +1,13 @@
#![cfg(test)]
use std::collections::HashMap;
use std::sync::Arc;
use fxhash::FxHashMap;
use proptest::prelude::*;
use proptest::proptest;
use crate::container::Container;
use crate::isomorph::Irc;
use crate::value::proptest::gen_insert_value;
use crate::{fx_map, value::InsertValue, LoroCore, LoroValue};
@ -15,7 +15,7 @@ use crate::{fx_map, value::InsertValue, LoroCore, LoroValue};
#[test]
fn basic() {
let mut loro = LoroCore::default();
let weak = Arc::downgrade(&loro.log_store);
let weak = Irc::downgrade(&loro.log_store);
let mut a = loro.get_map_container("map".into());
let container = a.as_mut();
container.insert("haha".into(), InsertValue::Int32(1), weak);
@ -38,7 +38,7 @@ mod map_proptest {
value in prop::collection::vec(gen_insert_value(), 0..10 * PROPTEST_FACTOR_10)
) {
let mut loro = LoroCore::default();
let weak = Arc::downgrade(&loro.log_store);
let weak = Irc::downgrade(&loro.log_store);
let mut a = loro.get_map_container("map".into());
let container = a.as_mut();
let mut map: HashMap<String, InsertValue> = HashMap::new();

View file

@ -40,7 +40,7 @@ pub struct TextContainer {
}
impl TextContainer {
pub fn new(id: ContainerID, log_store: LogStoreWeakRef) -> Self {
pub(crate) fn new(id: ContainerID, log_store: LogStoreWeakRef) -> Self {
Self {
id,
log_store,
@ -59,28 +59,25 @@ impl TextContainer {
return None;
}
let id = if let Ok(mut store) = self.log_store.upgrade().unwrap().write() {
let id = store.next_id();
#[cfg(feature = "slice")]
let slice = ListSlice::from_range(self.raw_str.alloc(text));
#[cfg(not(feature = "slice"))]
let slice = ListSlice::from_raw(SmString::from(text));
self.state.insert(pos, slice.clone());
let op = Op::new(
id,
OpContent::Normal {
content: InsertContent::List(ListOp::Insert { slice, pos }),
},
self.id.clone(),
);
let last_id = op.id_last();
store.append_local_ops(vec![op]);
self.head = smallvec![last_id];
self.vv.set_last(last_id);
id
} else {
unimplemented!()
};
let s = self.log_store.upgrade().unwrap();
let mut store = s.write();
let id = store.next_id();
#[cfg(feature = "slice")]
let slice = ListSlice::from_range(self.raw_str.alloc(text));
#[cfg(not(feature = "slice"))]
let slice = ListSlice::from_raw(SmString::from(text));
self.state.insert(pos, slice.clone());
let op = Op::new(
id,
OpContent::Normal {
content: InsertContent::List(ListOp::Insert { slice, pos }),
},
self.id.clone(),
);
let last_id = op.id_last();
store.append_local_ops(vec![op]);
self.head = smallvec![last_id];
self.vv.set_last(last_id);
Some(id)
}
@ -90,26 +87,22 @@ impl TextContainer {
return None;
}
let id = if let Ok(mut store) = self.log_store.upgrade().unwrap().write() {
let id = store.next_id();
let op = Op::new(
id,
OpContent::Normal {
content: InsertContent::List(ListOp::Delete { len, pos }),
},
self.id.clone(),
);
let last_id = op.id_last();
store.append_local_ops(vec![op]);
self.state.delete_range(Some(pos), Some(pos + len));
self.head = smallvec![last_id];
self.vv.set_last(last_id);
id
} else {
unimplemented!()
};
let s = self.log_store.upgrade().unwrap();
let mut store = s.write();
let id = store.next_id();
let op = Op::new(
id,
OpContent::Normal {
content: InsertContent::List(ListOp::Delete { len, pos }),
},
self.id.clone(),
);
let last_id = op.id_last();
store.append_local_ops(vec![op]);
self.state.delete_range(Some(pos), Some(pos + len));
self.head = smallvec![last_id];
self.vv.set_last(last_id);
Some(id)
}

View file

@ -202,7 +202,7 @@ pub mod fuzz {
}
use crdt_list::test::{Action, TestFramework};
use rle::{RleVec, RleVecWithIndex, RleVecWithLen};
use rle::{RleVecWithIndex, RleVecWithLen};
use tabled::TableIteratorExt;
use crate::{

View file

@ -0,0 +1,78 @@
use std::{
cell::{Ref, RefCell, RefMut},
rc::{Rc, Weak as RcWeak},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak as ArcWeak},
};
#[cfg(feature = "parallel")]
pub(crate) type Irc<T> = Arc<T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type Irc<T> = Rc<T>;
#[cfg(feature = "parallel")]
pub(crate) type IsoWeak<T> = ArcWeak<T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type IsoWeak<T> = RcWeak<T>;
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub(crate) struct IsoRw<T>(RwLock<T>);
#[cfg(not(feature = "parallel"))]
#[derive(Debug)]
pub(crate) struct IsoRw<T>(RefCell<T>);
#[cfg(feature = "parallel")]
pub(crate) type IsoRef<'a, T> = RwLockReadGuard<'a, T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type IsoRef<'a, T> = Ref<'a, T>;
#[cfg(feature = "parallel")]
pub(crate) type IsoRefMut<'a, T> = RwLockWriteGuard<'a, T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type IsoRefMut<'a, T> = RefMut<'a, T>;
#[cfg(feature = "parallel")]
mod rw_parallel {
use super::*;
impl<T> IsoRw<T> {
#[inline(always)]
pub fn new(t: T) -> Self {
Self(RwLock::new(t))
}
#[inline(always)]
pub fn read(&self) -> std::sync::RwLockReadGuard<T> {
self.0.read().unwrap()
}
#[inline(always)]
pub fn write(&self) -> std::sync::RwLockWriteGuard<T> {
self.0.write().unwrap()
}
}
}
#[cfg(not(feature = "parallel"))]
mod rw_single {
use std::{cell::RefCell, ops::Deref};
use super::IsoRw;
impl<T> IsoRw<T> {
#[inline(always)]
pub fn new(t: T) -> Self {
IsoRw(RefCell::new(t))
}
#[inline(always)]
pub fn read(&self) -> std::cell::Ref<T> {
self.0.borrow()
}
#[inline(always)]
pub fn write(&self) -> std::cell::RefMut<T> {
self.0.borrow_mut()
}
}
}

View file

@ -18,6 +18,7 @@ pub mod version;
mod error;
#[cfg(feature = "fuzzing")]
pub mod fuzz;
mod isomorph;
mod loro;
mod smstring;
mod snapshot;

View file

@ -20,6 +20,7 @@ use crate::{
dag::Dag,
debug_log,
id::{ClientID, Counter},
isomorph::{Irc, IsoRw, IsoWeak},
span::{HasIdSpan, IdSpan},
Lamport, Op, Timestamp, VersionVector, ID,
};
@ -42,8 +43,8 @@ impl Default for GcConfig {
}
}
pub type LogStoreRef = Arc<RwLock<LogStore>>;
pub type LogStoreWeakRef = Weak<RwLock<LogStore>>;
pub(crate) type LogStoreRef = Irc<IsoRw<LogStore>>;
pub(crate) type LogStoreWeakRef = IsoWeak<IsoRw<LogStore>>;
#[derive(Debug)]
/// LogStore stores the full history of Loro
@ -62,20 +63,20 @@ pub struct LogStore {
pub(crate) this_client_id: ClientID,
frontier: SmallVec<[ID; 2]>,
/// CRDT container manager
pub(crate) container: Weak<RwLock<ContainerManager>>,
to_self: Weak<RwLock<LogStore>>,
pub(crate) container: IsoWeak<IsoRw<ContainerManager>>,
to_self: IsoWeak<IsoRw<LogStore>>,
_pin: PhantomPinned,
}
impl LogStore {
pub fn new(
pub(crate) fn new(
mut cfg: Configure,
client_id: Option<ClientID>,
container: Weak<RwLock<ContainerManager>>,
) -> Arc<RwLock<Self>> {
container: IsoWeak<IsoRw<ContainerManager>>,
) -> Irc<IsoRw<Self>> {
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
Arc::new_cyclic(|x| {
RwLock::new(Self {
Irc::new_cyclic(|x| {
IsoRw::new(Self {
cfg,
this_client_id,
changes: FxHashMap::default(),
@ -156,7 +157,7 @@ impl LogStore {
fn change_to_export_format(&self, change: &mut Change) {
let upgraded = self.container.upgrade().unwrap();
let container_manager = upgraded.read().unwrap();
let container_manager = upgraded.read();
for op in change.ops.vec_mut().iter_mut() {
let container = container_manager.get(&op.container).unwrap();
container.to_export(op);
@ -264,7 +265,7 @@ impl LogStore {
// TODO: find a way to remove this clone? we don't need change in apply method actually
let upgraded = self.container.upgrade().unwrap();
let mut container_manager = upgraded.write().unwrap();
let mut container_manager = upgraded.write();
#[cfg(feature = "slice")]
self.change_to_imported_format(&mut container_manager, &mut change);
let v = self

View file

@ -1,7 +1,4 @@
use std::{
ptr::NonNull,
sync::{Arc, RwLock, RwLockWriteGuard},
};
use std::ptr::NonNull;
use owning_ref::{OwningRef, OwningRefMut};
@ -15,12 +12,13 @@ use crate::{
ContainerID, ContainerType,
},
id::ClientID,
isomorph::{Irc, IsoRefMut, IsoRw},
InternalString, LogStore, VersionVector,
};
pub struct LoroCore {
pub log_store: Arc<RwLock<LogStore>>,
pub container: Arc<RwLock<ContainerManager>>,
pub(crate) log_store: Irc<IsoRw<LogStore>>,
pub(crate) container: Irc<IsoRw<ContainerManager>>,
}
impl Default for LoroCore {
@ -31,30 +29,31 @@ impl Default for LoroCore {
impl LoroCore {
pub fn new(cfg: Configure, client_id: Option<ClientID>) -> Self {
let container = Arc::new(RwLock::new(ContainerManager {
let container = Irc::new(IsoRw::new(ContainerManager {
containers: Default::default(),
store: NonNull::dangling(),
}));
let weak = Irc::downgrade(&container);
Self {
log_store: LogStore::new(cfg, client_id, Arc::downgrade(&container)),
log_store: LogStore::new(cfg, client_id, weak),
container,
}
}
pub fn vv(&self) -> VersionVector {
self.log_store.read().unwrap().get_vv().clone()
self.log_store.read().get_vv().clone()
}
pub fn get_container(
&mut self,
name: InternalString,
container: ContainerType,
) -> OwningRefMut<RwLockWriteGuard<ContainerManager>, ContainerInstance> {
let a = OwningRefMut::new(self.container.write().unwrap());
) -> OwningRefMut<IsoRefMut<ContainerManager>, ContainerInstance> {
let a = OwningRefMut::new(self.container.write());
a.map_mut(|x| {
x.get_or_create(
&ContainerID::new_root(name, container),
Arc::downgrade(&self.log_store),
Irc::downgrade(&self.log_store),
)
})
}
@ -62,12 +61,12 @@ impl LoroCore {
pub fn get_map_container(
&mut self,
name: InternalString,
) -> OwningRefMut<RwLockWriteGuard<ContainerManager>, Box<MapContainer>> {
let a = OwningRefMut::new(self.container.write().unwrap());
) -> OwningRefMut<IsoRefMut<ContainerManager>, Box<MapContainer>> {
let a = OwningRefMut::new(self.container.write());
a.map_mut(|x| {
x.get_or_create(
&ContainerID::new_root(name, ContainerType::Map),
Arc::downgrade(&self.log_store),
Irc::downgrade(&self.log_store),
)
.as_map_mut()
.unwrap()
@ -77,12 +76,12 @@ impl LoroCore {
pub fn get_or_create_text_container_mut(
&mut self,
name: InternalString,
) -> OwningRefMut<RwLockWriteGuard<ContainerManager>, Box<TextContainer>> {
let a = OwningRefMut::new(self.container.write().unwrap());
) -> OwningRefMut<IsoRefMut<ContainerManager>, Box<TextContainer>> {
let a = OwningRefMut::new(self.container.write());
a.map_mut(|x| {
x.get_or_create(
&ContainerID::new_root(name, ContainerType::Text),
Arc::downgrade(&self.log_store),
Irc::downgrade(&self.log_store),
)
.as_text_mut()
.unwrap()
@ -92,8 +91,8 @@ impl LoroCore {
pub fn get_text_container(
&self,
name: InternalString,
) -> OwningRef<RwLockWriteGuard<ContainerManager>, Box<TextContainer>> {
let a = OwningRef::new(self.container.write().unwrap());
) -> OwningRef<IsoRefMut<ContainerManager>, Box<TextContainer>> {
let a = OwningRef::new(self.container.write());
a.map(|x| {
x.get(&ContainerID::new_root(name, ContainerType::Text))
.unwrap()
@ -103,12 +102,12 @@ impl LoroCore {
}
pub fn export(&self, remote_vv: VersionVector) -> Vec<Change> {
let store = self.log_store.read().unwrap();
let store = self.log_store.read();
store.export(&remote_vv)
}
pub fn import(&mut self, changes: Vec<Change>) {
let mut store = self.log_store.write().unwrap();
let mut store = self.log_store.write();
store.import(changes)
}
}