Merge branch 'refactor-parallel' into feat-wasm

This commit is contained in:
Zixuan Chen 2022-10-28 18:49:01 +08:00
commit 87227ad39a
14 changed files with 194 additions and 107 deletions

View file

@ -45,6 +45,7 @@ doctest = false
[features] [features]
wasm = ["wasm-bindgen", "js-sys"] wasm = ["wasm-bindgen", "js-sys"]
parallel = []
# whether to use list slice instead of raw str in text container # whether to use list slice instead of raw str in text container
slice = [] slice = []
fuzzing = ["crdt-list/fuzzing", "rand", "slice", "arbitrary", "tabled"] fuzzing = ["crdt-list/fuzzing", "rand", "slice", "arbitrary", "tabled"]

View file

@ -1,5 +1,8 @@
use arbitrary::Unstructured;
use criterion::{criterion_group, criterion_main, Criterion}; use criterion::{criterion_group, criterion_main, Criterion};
#[cfg(feature = "fuzzing")]
mod run {
use super::*;
use arbitrary::Unstructured;
use loro_core::fuzz::test_multi_sites; use loro_core::fuzz::test_multi_sites;
use loro_core::fuzz::Action; use loro_core::fuzz::Action;
use rand::Rng; use rand::Rng;
@ -18,6 +21,11 @@ pub fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| test_multi_sites(2, actions.clone().into())) b.iter(|| test_multi_sites(2, actions.clone().into()))
}); });
} }
}
pub fn dumb(_c: &mut Criterion) {}
criterion_group!(benches, criterion_benchmark); #[cfg(feature = "fuzzing")]
criterion_group!(benches, run::criterion_benchmark);
#[cfg(not(feature = "fuzzing"))]
criterion_group!(benches, dumb);
criterion_main!(benches); criterion_main!(benches);

View file

@ -89,7 +89,7 @@ pub struct ContainerManager {
impl ContainerManager { impl ContainerManager {
#[inline] #[inline]
pub fn create( pub(crate) fn create(
&mut self, &mut self,
id: ContainerID, id: ContainerID,
container_type: ContainerType, container_type: ContainerType,
@ -119,7 +119,7 @@ impl ContainerManager {
self.containers.insert(id, container); self.containers.insert(id, container);
} }
pub fn get_or_create( pub(crate) fn get_or_create(
&mut self, &mut self,
id: &ContainerID, id: &ContainerID,
log_store: LogStoreWeakRef, log_store: LogStoreWeakRef,

View file

@ -41,10 +41,16 @@ impl MapContainer {
} }
} }
pub fn insert(&mut self, key: InternalString, value: InsertValue, store: LogStoreWeakRef) { // FIXME: keep store in the struct
pub(crate) fn insert(
&mut self,
key: InternalString,
value: InsertValue,
store: LogStoreWeakRef,
) {
let self_id = self.id.clone(); let self_id = self.id.clone();
let m = store.upgrade().unwrap(); let m = store.upgrade().unwrap();
let mut store = m.write().unwrap(); let mut store = m.write();
let client_id = store.this_client_id; let client_id = store.this_client_id;
let order = TotalOrderStamp { let order = TotalOrderStamp {
client_id, client_id,
@ -83,8 +89,9 @@ impl MapContainer {
); );
} }
// FIXME: keep store in the struct
#[inline] #[inline]
pub fn delete(&mut self, key: InternalString, store: LogStoreWeakRef) { pub(crate) fn delete(&mut self, key: InternalString, store: LogStoreWeakRef) {
self.insert(key, InsertValue::Null, store); self.insert(key, InsertValue::Null, store);
} }
} }

View file

@ -1,13 +1,13 @@
#![cfg(test)] #![cfg(test)]
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use fxhash::FxHashMap; use fxhash::FxHashMap;
use proptest::prelude::*; use proptest::prelude::*;
use proptest::proptest; use proptest::proptest;
use crate::container::Container; use crate::container::Container;
use crate::isomorph::Irc;
use crate::value::proptest::gen_insert_value; use crate::value::proptest::gen_insert_value;
use crate::{fx_map, value::InsertValue, LoroCore, LoroValue}; use crate::{fx_map, value::InsertValue, LoroCore, LoroValue};
@ -15,7 +15,7 @@ use crate::{fx_map, value::InsertValue, LoroCore, LoroValue};
#[test] #[test]
fn basic() { fn basic() {
let mut loro = LoroCore::default(); let mut loro = LoroCore::default();
let weak = Arc::downgrade(&loro.log_store); let weak = Irc::downgrade(&loro.log_store);
let mut a = loro.get_map_container("map".into()); let mut a = loro.get_map_container("map".into());
let container = a.as_mut(); let container = a.as_mut();
container.insert("haha".into(), InsertValue::Int32(1), weak); container.insert("haha".into(), InsertValue::Int32(1), weak);
@ -38,7 +38,7 @@ mod map_proptest {
value in prop::collection::vec(gen_insert_value(), 0..10 * PROPTEST_FACTOR_10) value in prop::collection::vec(gen_insert_value(), 0..10 * PROPTEST_FACTOR_10)
) { ) {
let mut loro = LoroCore::default(); let mut loro = LoroCore::default();
let weak = Arc::downgrade(&loro.log_store); let weak = Irc::downgrade(&loro.log_store);
let mut a = loro.get_map_container("map".into()); let mut a = loro.get_map_container("map".into());
let container = a.as_mut(); let container = a.as_mut();
let mut map: HashMap<String, InsertValue> = HashMap::new(); let mut map: HashMap<String, InsertValue> = HashMap::new();

View file

@ -40,7 +40,7 @@ pub struct TextContainer {
} }
impl TextContainer { impl TextContainer {
pub fn new(id: ContainerID, log_store: LogStoreWeakRef) -> Self { pub(crate) fn new(id: ContainerID, log_store: LogStoreWeakRef) -> Self {
Self { Self {
id, id,
log_store, log_store,
@ -59,7 +59,8 @@ impl TextContainer {
return None; return None;
} }
let id = if let Ok(mut store) = self.log_store.upgrade().unwrap().write() { let s = self.log_store.upgrade().unwrap();
let mut store = s.write();
let id = store.next_id(); let id = store.next_id();
#[cfg(feature = "slice")] #[cfg(feature = "slice")]
let slice = ListSlice::from_range(self.raw_str.alloc(text)); let slice = ListSlice::from_range(self.raw_str.alloc(text));
@ -77,10 +78,6 @@ impl TextContainer {
store.append_local_ops(vec![op]); store.append_local_ops(vec![op]);
self.head = smallvec![last_id]; self.head = smallvec![last_id];
self.vv.set_last(last_id); self.vv.set_last(last_id);
id
} else {
unimplemented!()
};
Some(id) Some(id)
} }
@ -90,7 +87,8 @@ impl TextContainer {
return None; return None;
} }
let id = if let Ok(mut store) = self.log_store.upgrade().unwrap().write() { let s = self.log_store.upgrade().unwrap();
let mut store = s.write();
let id = store.next_id(); let id = store.next_id();
let op = Op::new( let op = Op::new(
id, id,
@ -105,11 +103,6 @@ impl TextContainer {
self.state.delete_range(Some(pos), Some(pos + len)); self.state.delete_range(Some(pos), Some(pos + len));
self.head = smallvec![last_id]; self.head = smallvec![last_id];
self.vv.set_last(last_id); self.vv.set_last(last_id);
id
} else {
unimplemented!()
};
Some(id) Some(id)
} }

View file

@ -202,7 +202,7 @@ pub mod fuzz {
} }
use crdt_list::test::{Action, TestFramework}; use crdt_list::test::{Action, TestFramework};
use rle::{RleVec, RleVecWithIndex, RleVecWithLen}; use rle::{RleVecWithIndex, RleVecWithLen};
use tabled::TableIteratorExt; use tabled::TableIteratorExt;
use crate::{ use crate::{

View file

@ -0,0 +1,79 @@
#![allow(unused)]
use std::{
cell::{Ref, RefCell, RefMut},
rc::{Rc, Weak as RcWeak},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak as ArcWeak},
};
#[cfg(feature = "parallel")]
pub(crate) type Irc<T> = Arc<T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type Irc<T> = Rc<T>;
#[cfg(feature = "parallel")]
pub(crate) type IsoWeak<T> = ArcWeak<T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type IsoWeak<T> = RcWeak<T>;
#[cfg(feature = "parallel")]
#[derive(Debug)]
pub(crate) struct IsoRw<T>(RwLock<T>);
#[cfg(not(feature = "parallel"))]
#[derive(Debug)]
pub(crate) struct IsoRw<T>(RefCell<T>);
#[cfg(feature = "parallel")]
pub(crate) type IsoRef<'a, T> = RwLockReadGuard<'a, T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type IsoRef<'a, T> = Ref<'a, T>;
#[cfg(feature = "parallel")]
pub(crate) type IsoRefMut<'a, T> = RwLockWriteGuard<'a, T>;
#[cfg(not(feature = "parallel"))]
pub(crate) type IsoRefMut<'a, T> = RefMut<'a, T>;
#[cfg(feature = "parallel")]
mod rw_parallel {
use super::*;
impl<T> IsoRw<T> {
#[inline(always)]
pub fn new(t: T) -> Self {
Self(RwLock::new(t))
}
#[inline(always)]
pub fn read(&self) -> std::sync::RwLockReadGuard<T> {
self.0.read().unwrap()
}
#[inline(always)]
pub fn write(&self) -> std::sync::RwLockWriteGuard<T> {
self.0.write().unwrap()
}
}
}
#[cfg(not(feature = "parallel"))]
mod rw_single {
use std::{cell::RefCell, ops::Deref};
use super::IsoRw;
impl<T> IsoRw<T> {
#[inline(always)]
pub fn new(t: T) -> Self {
IsoRw(RefCell::new(t))
}
#[inline(always)]
pub fn read(&self) -> std::cell::Ref<T> {
self.0.borrow()
}
#[inline(always)]
pub fn write(&self) -> std::cell::RefMut<T> {
self.0.borrow_mut()
}
}
}

View file

@ -18,6 +18,7 @@ pub mod version;
mod error; mod error;
#[cfg(feature = "fuzzing")] #[cfg(feature = "fuzzing")]
pub mod fuzz; pub mod fuzz;
mod isomorph;
mod loro; mod loro;
mod smstring; mod smstring;
mod snapshot; mod snapshot;

View file

@ -4,7 +4,6 @@
mod iter; mod iter;
use std::{ use std::{
marker::PhantomPinned, marker::PhantomPinned,
sync::{Arc, RwLock, Weak},
}; };
use fxhash::{FxHashMap, FxHashSet}; use fxhash::{FxHashMap, FxHashSet};
@ -20,6 +19,7 @@ use crate::{
dag::Dag, dag::Dag,
debug_log, debug_log,
id::{ClientID, Counter}, id::{ClientID, Counter},
isomorph::{Irc, IsoRw, IsoWeak},
span::{HasIdSpan, IdSpan}, span::{HasIdSpan, IdSpan},
Lamport, Op, Timestamp, VersionVector, ID, Lamport, Op, Timestamp, VersionVector, ID,
}; };
@ -42,8 +42,8 @@ impl Default for GcConfig {
} }
} }
pub type LogStoreRef = Arc<RwLock<LogStore>>; pub(crate) type LogStoreRef = Irc<IsoRw<LogStore>>;
pub type LogStoreWeakRef = Weak<RwLock<LogStore>>; pub(crate) type LogStoreWeakRef = IsoWeak<IsoRw<LogStore>>;
#[derive(Debug)] #[derive(Debug)]
/// LogStore stores the full history of Loro /// LogStore stores the full history of Loro
@ -62,20 +62,20 @@ pub struct LogStore {
pub(crate) this_client_id: ClientID, pub(crate) this_client_id: ClientID,
frontier: SmallVec<[ID; 2]>, frontier: SmallVec<[ID; 2]>,
/// CRDT container manager /// CRDT container manager
pub(crate) container: Weak<RwLock<ContainerManager>>, pub(crate) container: IsoWeak<IsoRw<ContainerManager>>,
to_self: Weak<RwLock<LogStore>>, to_self: IsoWeak<IsoRw<LogStore>>,
_pin: PhantomPinned, _pin: PhantomPinned,
} }
impl LogStore { impl LogStore {
pub fn new( pub(crate) fn new(
mut cfg: Configure, mut cfg: Configure,
client_id: Option<ClientID>, client_id: Option<ClientID>,
container: Weak<RwLock<ContainerManager>>, container: IsoWeak<IsoRw<ContainerManager>>,
) -> Arc<RwLock<Self>> { ) -> Irc<IsoRw<Self>> {
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64()); let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
Arc::new_cyclic(|x| { Irc::new_cyclic(|x| {
RwLock::new(Self { IsoRw::new(Self {
cfg, cfg,
this_client_id, this_client_id,
changes: FxHashMap::default(), changes: FxHashMap::default(),
@ -156,7 +156,7 @@ impl LogStore {
fn change_to_export_format(&self, change: &mut Change) { fn change_to_export_format(&self, change: &mut Change) {
let upgraded = self.container.upgrade().unwrap(); let upgraded = self.container.upgrade().unwrap();
let container_manager = upgraded.read().unwrap(); let container_manager = upgraded.read();
for op in change.ops.vec_mut().iter_mut() { for op in change.ops.vec_mut().iter_mut() {
let container = container_manager.get(&op.container).unwrap(); let container = container_manager.get(&op.container).unwrap();
container.to_export(op); container.to_export(op);
@ -264,7 +264,7 @@ impl LogStore {
// TODO: find a way to remove this clone? we don't need change in apply method actually // TODO: find a way to remove this clone? we don't need change in apply method actually
let upgraded = self.container.upgrade().unwrap(); let upgraded = self.container.upgrade().unwrap();
let mut container_manager = upgraded.write().unwrap(); let mut container_manager = upgraded.write();
#[cfg(feature = "slice")] #[cfg(feature = "slice")]
self.change_to_imported_format(&mut container_manager, &mut change); self.change_to_imported_format(&mut container_manager, &mut change);
let v = self let v = self

View file

@ -1,7 +1,4 @@
use std::{ use std::ptr::NonNull;
ptr::NonNull,
sync::{Arc, RwLock, RwLockWriteGuard},
};
use owning_ref::{OwningRef, OwningRefMut}; use owning_ref::{OwningRef, OwningRefMut};
@ -15,12 +12,13 @@ use crate::{
ContainerID, ContainerType, ContainerID, ContainerType,
}, },
id::ClientID, id::ClientID,
LogStore, VersionVector, isomorph::{Irc, IsoRefMut, IsoRw},
InternalString, LogStore, LogStore, VersionVector, VersionVector,
}; };
pub struct LoroCore { pub struct LoroCore {
pub log_store: Arc<RwLock<LogStore>>, pub(crate) log_store: Irc<IsoRw<LogStore>>,
pub container: Arc<RwLock<ContainerManager>>, pub(crate) container: Irc<IsoRw<ContainerManager>>,
} }
impl Default for LoroCore { impl Default for LoroCore {
@ -31,30 +29,31 @@ impl Default for LoroCore {
impl LoroCore { impl LoroCore {
pub fn new(cfg: Configure, client_id: Option<ClientID>) -> Self { pub fn new(cfg: Configure, client_id: Option<ClientID>) -> Self {
let container = Arc::new(RwLock::new(ContainerManager { let container = Irc::new(IsoRw::new(ContainerManager {
containers: Default::default(), containers: Default::default(),
store: NonNull::dangling(), store: NonNull::dangling(),
})); }));
let weak = Irc::downgrade(&container);
Self { Self {
log_store: LogStore::new(cfg, client_id, Arc::downgrade(&container)), log_store: LogStore::new(cfg, client_id, weak),
container, container,
} }
} }
pub fn vv(&self) -> VersionVector { pub fn vv(&self) -> VersionVector {
self.log_store.read().unwrap().get_vv().clone() self.log_store.read().get_vv().clone()
} }
pub fn get_container( pub fn get_container(
&mut self, &mut self,
name: &str, name: &str,
container: ContainerType, container: ContainerType,
) -> OwningRefMut<RwLockWriteGuard<ContainerManager>, ContainerInstance> { ) -> OwningRefMut<IsoRefMut<ContainerManager>, ContainerInstance> {
let a = OwningRefMut::new(self.container.write().unwrap()); let a = OwningRefMut::new(self.container.write());
a.map_mut(|x| { a.map_mut(|x| {
x.get_or_create( x.get_or_create(
&ContainerID::new_root(name, container), &ContainerID::new_root(name, container),
Arc::downgrade(&self.log_store), Irc::downgrade(&self.log_store),
) )
}) })
} }
@ -63,11 +62,11 @@ impl LoroCore {
&mut self, &mut self,
name: &str, name: &str,
) -> OwningRefMut<RwLockWriteGuard<ContainerManager>, Box<MapContainer>> { ) -> OwningRefMut<RwLockWriteGuard<ContainerManager>, Box<MapContainer>> {
let a = OwningRefMut::new(self.container.write().unwrap()); let a = OwningRefMut::new(self.container.write());
a.map_mut(|x| { a.map_mut(|x| {
x.get_or_create( x.get_or_create(
&ContainerID::new_root(name, ContainerType::Map), &ContainerID::new_root(name, ContainerType::Map),
Arc::downgrade(&self.log_store), Irc::downgrade(&self.log_store),
) )
.as_map_mut() .as_map_mut()
.unwrap() .unwrap()
@ -75,11 +74,11 @@ impl LoroCore {
} }
pub fn get_or_create_text_container_mut(&mut self, name: &str) -> ContainerRef<TextContainer> { pub fn get_or_create_text_container_mut(&mut self, name: &str) -> ContainerRef<TextContainer> {
let a = OwningRefMut::new(self.container.write().unwrap()); let a = OwningRefMut::new(self.container.write());
a.map_mut(|x| { a.map_mut(|x| {
x.get_or_create( x.get_or_create(
&ContainerID::new_root(name, ContainerType::Text), &ContainerID::new_root(name, ContainerType::Text),
Arc::downgrade(&self.log_store), Irc::downgrade(&self.log_store),
) )
.as_text_mut() .as_text_mut()
.unwrap() .unwrap()
@ -91,7 +90,7 @@ impl LoroCore {
&self, &self,
name: &str, name: &str,
) -> OwningRef<RwLockWriteGuard<ContainerManager>, Box<TextContainer>> { ) -> OwningRef<RwLockWriteGuard<ContainerManager>, Box<TextContainer>> {
let a = OwningRef::new(self.container.write().unwrap()); let a = OwningRef::new(self.container.write());
a.map(|x| { a.map(|x| {
x.get(&ContainerID::new_root(name, ContainerType::Text)) x.get(&ContainerID::new_root(name, ContainerType::Text))
.unwrap() .unwrap()
@ -101,12 +100,12 @@ impl LoroCore {
} }
pub fn export(&self, remote_vv: VersionVector) -> Vec<Change> { pub fn export(&self, remote_vv: VersionVector) -> Vec<Change> {
let store = self.log_store.read().unwrap(); let store = self.log_store.read();
store.export(&remote_vv) store.export(&remote_vv)
} }
pub fn import(&mut self, changes: Vec<Change>) { pub fn import(&mut self, changes: Vec<Change>) {
let mut store = self.log_store.write().unwrap(); let mut store = self.log_store.write();
store.import(changes) store.import(changes)
} }
} }

View file

@ -1,5 +1,5 @@
use bumpalo::boxed::Box as BumpBox;
use std::{cell::UnsafeCell, fmt::Debug, ptr::NonNull}; use std::{fmt::Debug, ptr::NonNull};
use fxhash::FxHashSet; use fxhash::FxHashSet;
@ -7,8 +7,7 @@ use crate::{
rle_trait::{GlobalIndex, HasIndex, ZeroElement}, rle_trait::{GlobalIndex, HasIndex, ZeroElement},
rle_tree::{ rle_tree::{
node::{InternalNode, LeafNode}, node::{InternalNode, LeafNode},
tree_trait::GlobalTreeTrait, tree_trait::GlobalTreeTrait, UnsafeCursor,
Position, UnsafeCursor,
}, },
HasLength, Mergable, Rle, RleTree, Sliceable, HasLength, Mergable, Rle, RleTree, Sliceable,
}; };

View file

@ -3,7 +3,7 @@ use std::{
fmt::{Debug, Error, Formatter}, fmt::{Debug, Error, Formatter},
}; };
use fxhash::{FxBuildHasher, FxHashSet, FxHasher}; use fxhash::{FxBuildHasher, FxHashSet};
use smallvec::SmallVec; use smallvec::SmallVec;
use crate::rle_tree::{ use crate::rle_tree::{

View file

@ -20,8 +20,8 @@ check:
check-unsafe: check-unsafe:
env RUSTFLAGS="-Funsafe-code --cap-lints=warn" cargo check env RUSTFLAGS="-Funsafe-code --cap-lints=warn" cargo check
fix: fix *FLAGS:
cargo clippy --fix cargo clippy --fix {{FLAGS}}
deny: deny:
cargo deny check cargo deny check