mirror of
https://github.com/loro-dev/loro.git
synced 2025-02-02 11:06:14 +00:00
fix: decode hierarchy for snapshot mode
update columnar version, reduce compression time
This commit is contained in:
parent
ffce7d81eb
commit
4748e1d38c
25 changed files with 295 additions and 87 deletions
4
Cargo.lock
generated
4
Cargo.lock
generated
|
@ -1344,9 +1344,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_columnar"
|
||||
version = "0.2.2"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "774a5997b44f99b17866e559efff9138852415e32a07d04029bafb0e53ceff2a"
|
||||
checksum = "a0ee0b00614e5c6aa133c162d63d75f14ef581856e3ca1e6e88e1374941f1452"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"itertools",
|
||||
|
|
|
@ -28,7 +28,7 @@ js-sys = { version = "0.3.60", optional = true }
|
|||
serde_json = { version = "1.0.87", optional = true }
|
||||
arref = "0.1.0"
|
||||
debug-log = "0.1.4"
|
||||
serde_columnar = { version = "0.2.2" }
|
||||
serde_columnar = { version = "0.2.3" }
|
||||
tracing = { version = "0.1.37" }
|
||||
append-only-bytes = { version = "0.1.4", features = ["u32_range"] }
|
||||
flate2 = "1.0"
|
||||
|
|
|
@ -1,6 +1,107 @@
|
|||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
const RAW_DATA: &[u8; 901823] = include_bytes!("automerge-paper.json.gz");
|
||||
|
||||
#[cfg(feature = "test_utils")]
|
||||
mod sync {
|
||||
use std::io::Read;
|
||||
|
||||
use super::*;
|
||||
use flate2::read::GzDecoder;
|
||||
use loro_core::container::registry::ContainerWrapper;
|
||||
use loro_core::log_store::{EncodeConfig, EncodeMode};
|
||||
use loro_core::LoroCore;
|
||||
use serde_json::Value;
|
||||
|
||||
pub fn b4(c: &mut Criterion) {
|
||||
let mut d = GzDecoder::new(&RAW_DATA[..]);
|
||||
let mut s = String::new();
|
||||
d.read_to_string(&mut s).unwrap();
|
||||
let json: Value = serde_json::from_str(&s).unwrap();
|
||||
let txns = json.as_object().unwrap().get("txns");
|
||||
let mut actions = Vec::new();
|
||||
|
||||
for (i, txn) in txns.unwrap().as_array().unwrap().iter().enumerate() {
|
||||
if i > 1000 {
|
||||
break;
|
||||
}
|
||||
let patches = txn
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.get("patches")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap();
|
||||
for patch in patches {
|
||||
let pos = patch[0].as_u64().unwrap() as usize;
|
||||
let del_here = patch[1].as_u64().unwrap() as usize;
|
||||
let ins_content = patch[2].as_str().unwrap();
|
||||
actions.push((pos, del_here, ins_content));
|
||||
}
|
||||
}
|
||||
let mut b = c.benchmark_group("encode_with_sync");
|
||||
b.sample_size(10);
|
||||
b.bench_function("update", |b| {
|
||||
let mut c1 = LoroCore::new(Default::default(), Some(0));
|
||||
let mut c2 = LoroCore::new(Default::default(), Some(1));
|
||||
let t1 = c1.get_text("text");
|
||||
let t2 = c2.get_text("text");
|
||||
b.iter(|| {
|
||||
for (i, action) in actions.iter().enumerate() {
|
||||
let (pos, del, insert) = action;
|
||||
if i % 2 == 0 {
|
||||
t1.with_container(|text| {
|
||||
text.delete(&c1, *pos, *del);
|
||||
text.insert(&c1, *pos, insert);
|
||||
});
|
||||
let update = c1
|
||||
.encode(EncodeConfig::new(EncodeMode::Updates(c2.vv()), None))
|
||||
.unwrap();
|
||||
c2.decode(&update).unwrap();
|
||||
} else {
|
||||
t2.with_container(|text| {
|
||||
text.delete(&c2, *pos, *del);
|
||||
text.insert(&c2, *pos, insert);
|
||||
});
|
||||
let update = c2
|
||||
.encode(EncodeConfig::new(EncodeMode::Updates(c1.vv()), None))
|
||||
.unwrap();
|
||||
c1.decode(&update).unwrap();
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
b.bench_function("rle update", |b| {
|
||||
let mut c1 = LoroCore::new(Default::default(), Some(0));
|
||||
let mut c2 = LoroCore::new(Default::default(), Some(1));
|
||||
let t1 = c1.get_text("text");
|
||||
let t2 = c2.get_text("text");
|
||||
b.iter(|| {
|
||||
for (i, action) in actions.iter().enumerate() {
|
||||
let (pos, del, insert) = action;
|
||||
if i % 2 == 0 {
|
||||
t1.with_container(|text| {
|
||||
text.delete(&c1, *pos, *del);
|
||||
text.insert(&c1, *pos, insert);
|
||||
});
|
||||
let update = c1
|
||||
.encode(EncodeConfig::new(EncodeMode::RleUpdates(c2.vv()), None))
|
||||
.unwrap();
|
||||
c2.decode(&update).unwrap();
|
||||
} else {
|
||||
t2.with_container(|text| {
|
||||
text.delete(&c2, *pos, *del);
|
||||
text.insert(&c2, *pos, insert);
|
||||
});
|
||||
let update = c2
|
||||
.encode(EncodeConfig::new(EncodeMode::RleUpdates(c1.vv()), None))
|
||||
.unwrap();
|
||||
c1.decode(&update).unwrap();
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
#[cfg(feature = "test_utils")]
|
||||
mod run {
|
||||
use std::io::Read;
|
||||
|
@ -39,7 +140,7 @@ mod run {
|
|||
}
|
||||
});
|
||||
let mut b = c.benchmark_group("encode");
|
||||
b.bench_function("B4_encode_changes_no_compress", |b| {
|
||||
b.bench_function("B4_encode_updates", |b| {
|
||||
b.iter(|| {
|
||||
let _ = loro
|
||||
.encode(EncodeConfig::new(
|
||||
|
@ -49,7 +150,7 @@ mod run {
|
|||
.unwrap();
|
||||
})
|
||||
});
|
||||
b.bench_function("B4_decode_changes_no_compress", |b| {
|
||||
b.bench_function("B4_decode_updates", |b| {
|
||||
let buf = loro
|
||||
.encode(EncodeConfig::new(
|
||||
EncodeMode::Updates(VersionVector::new()),
|
||||
|
@ -57,17 +158,38 @@ mod run {
|
|||
))
|
||||
.unwrap();
|
||||
let mut store2 = LoroCore::default();
|
||||
// store2.get_list("list").insert(&store2, 0, "lll").unwrap();
|
||||
b.iter(|| {
|
||||
store2.decode(&buf).unwrap();
|
||||
})
|
||||
});
|
||||
b.bench_function("B4_encode_snapshot_no_compress", |b| {
|
||||
b.bench_function("B4_encode_rle_updates", |b| {
|
||||
b.iter(|| {
|
||||
let _ = loro
|
||||
.encode(EncodeConfig::new(
|
||||
EncodeMode::RleUpdates(VersionVector::new()),
|
||||
None,
|
||||
))
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
b.bench_function("B4_decode_rle_updates", |b| {
|
||||
let buf = loro
|
||||
.encode(EncodeConfig::new(
|
||||
EncodeMode::RleUpdates(VersionVector::new()),
|
||||
None,
|
||||
))
|
||||
.unwrap();
|
||||
let mut store2 = LoroCore::default();
|
||||
b.iter(|| {
|
||||
store2.decode(&buf).unwrap();
|
||||
})
|
||||
});
|
||||
b.bench_function("B4_encode_snapshot", |b| {
|
||||
b.iter(|| {
|
||||
let _ = loro.encode(EncodeConfig::from_vv(None)).unwrap();
|
||||
})
|
||||
});
|
||||
b.bench_function("B4_decode_snapshot_no_compress", |b| {
|
||||
b.bench_function("B4_decode_snapshot", |b| {
|
||||
let buf = loro.encode(EncodeConfig::from_vv(None)).unwrap();
|
||||
let mut store2 = LoroCore::default();
|
||||
b.iter(|| {
|
||||
|
@ -79,7 +201,7 @@ mod run {
|
|||
pub fn dumb(_c: &mut Criterion) {}
|
||||
|
||||
#[cfg(feature = "test_utils")]
|
||||
criterion_group!(benches, run::b4);
|
||||
criterion_group!(benches, run::b4, sync::b4);
|
||||
#[cfg(not(feature = "test_utils"))]
|
||||
criterion_group!(benches, dumb);
|
||||
criterion_main!(benches);
|
||||
|
|
|
@ -87,7 +87,7 @@ pub trait Container: Debug + Any + Unpin {
|
|||
) -> SmallVec<[InnerContent; 1]>;
|
||||
|
||||
/// Decode the pool mapping from the bytes and apply it to the container.
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent);
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent, hierarchy: &mut Hierarchy);
|
||||
|
||||
/// convert an op content to exported format that includes the raw data
|
||||
fn to_export(&mut self, content: InnerContent, gc: bool) -> SmallVec<[RemoteContent; 1]>;
|
||||
|
|
|
@ -504,10 +504,15 @@ impl Container for ListContainer {
|
|||
self.pool_mapping = Some(pool_mapping);
|
||||
}
|
||||
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent) {
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent, hierarchy: &mut Hierarchy) {
|
||||
if let StateContent::List { pool, state_len } = state_content {
|
||||
for v in pool.iter() {
|
||||
if let LoroValue::Unresolved(child_container_id) = v {
|
||||
hierarchy.add_child(self.id(), child_container_id.as_ref());
|
||||
}
|
||||
}
|
||||
self.raw_data = pool.into();
|
||||
self.state.insert(0, (0..state_len as u32).into());
|
||||
self.state.insert(0, (0..state_len).into());
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
|
@ -588,7 +593,7 @@ impl ContainerWrapper for List {
|
|||
where
|
||||
F: FnOnce(&mut Self::Container) -> R,
|
||||
{
|
||||
let mut container_instance = self.instance.lock().unwrap();
|
||||
let mut container_instance = self.instance.try_lock().unwrap();
|
||||
let list = container_instance.as_list_mut().unwrap();
|
||||
let ans = f(list);
|
||||
drop(container_instance);
|
||||
|
|
|
@ -435,9 +435,15 @@ impl Container for MapContainer {
|
|||
}
|
||||
}
|
||||
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent) {
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent, hierarchy: &mut Hierarchy) {
|
||||
if let StateContent::Map { pool, keys, values } = state_content {
|
||||
for v in pool.iter() {
|
||||
if let LoroValue::Unresolved(child_container_id) = v {
|
||||
hierarchy.add_child(self.id(), child_container_id.as_ref());
|
||||
}
|
||||
}
|
||||
self.pool = pool.into();
|
||||
|
||||
self.state = keys.into_iter().zip(values).collect();
|
||||
} else {
|
||||
unreachable!()
|
||||
|
@ -485,11 +491,22 @@ impl Map {
|
|||
}
|
||||
|
||||
pub fn id(&self) -> ContainerID {
|
||||
self.instance.lock().unwrap().as_map().unwrap().id.clone()
|
||||
self.instance
|
||||
.try_lock()
|
||||
.unwrap()
|
||||
.as_map()
|
||||
.unwrap()
|
||||
.id
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn get_value(&self) -> LoroValue {
|
||||
self.instance.lock().unwrap().as_map().unwrap().get_value()
|
||||
self.instance
|
||||
.try_lock()
|
||||
.unwrap()
|
||||
.as_map()
|
||||
.unwrap()
|
||||
.get_value()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
|
@ -510,7 +527,7 @@ impl ContainerWrapper for Map {
|
|||
where
|
||||
F: FnOnce(&mut Self::Container) -> R,
|
||||
{
|
||||
let mut container_instance = self.instance.lock().unwrap();
|
||||
let mut container_instance = self.instance.try_lock().unwrap();
|
||||
let map = container_instance.as_map_mut().unwrap();
|
||||
let ans = f(map);
|
||||
drop(container_instance);
|
||||
|
|
|
@ -3,7 +3,6 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use fxhash::FxHashMap;
|
||||
use proptest::prelude::*;
|
||||
use proptest::proptest;
|
||||
|
||||
|
|
|
@ -22,8 +22,8 @@ use crate::{
|
|||
};
|
||||
|
||||
use super::{
|
||||
list::ListContainer, map::MapContainer, text::TextContainer, Container, ContainerID,
|
||||
ContainerType,
|
||||
list::ListContainer, map::MapContainer, pool_mapping::StateContent, text::TextContainer,
|
||||
Container, ContainerID, ContainerType,
|
||||
};
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)]
|
||||
|
@ -182,7 +182,7 @@ impl Container for ContainerInstance {
|
|||
}
|
||||
}
|
||||
|
||||
fn encode_and_release_pool_mapping(&mut self) -> super::pool_mapping::StateContent {
|
||||
fn encode_and_release_pool_mapping(&mut self) -> StateContent {
|
||||
match self {
|
||||
ContainerInstance::Map(x) => x.encode_and_release_pool_mapping(),
|
||||
ContainerInstance::Text(x) => x.encode_and_release_pool_mapping(),
|
||||
|
@ -191,12 +191,12 @@ impl Container for ContainerInstance {
|
|||
}
|
||||
}
|
||||
|
||||
fn to_import_snapshot(&mut self, state_content: super::pool_mapping::StateContent) {
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent, hierarchy: &mut Hierarchy) {
|
||||
match self {
|
||||
ContainerInstance::Map(x) => x.to_import_snapshot(state_content),
|
||||
ContainerInstance::Text(x) => x.to_import_snapshot(state_content),
|
||||
ContainerInstance::Dyn(x) => x.to_import_snapshot(state_content),
|
||||
ContainerInstance::List(x) => x.to_import_snapshot(state_content),
|
||||
ContainerInstance::Map(x) => x.to_import_snapshot(state_content, hierarchy),
|
||||
ContainerInstance::Text(x) => x.to_import_snapshot(state_content, hierarchy),
|
||||
ContainerInstance::Dyn(x) => x.to_import_snapshot(state_content, hierarchy),
|
||||
ContainerInstance::List(x) => x.to_import_snapshot(state_content, hierarchy),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -307,7 +307,7 @@ impl ContainerRegistry {
|
|||
#[cfg(feature = "test_utils")]
|
||||
pub fn debug_inspect(&mut self) {
|
||||
for ContainerAndId { container, id: _ } in self.containers.iter_mut() {
|
||||
if let ContainerInstance::Text(x) = container.lock().unwrap().deref_mut() {
|
||||
if let ContainerInstance::Text(x) = container.try_lock().unwrap().deref_mut() {
|
||||
x.debug_inspect()
|
||||
}
|
||||
}
|
||||
|
@ -321,7 +321,7 @@ impl ContainerRegistry {
|
|||
container_type,
|
||||
} = id
|
||||
{
|
||||
let container = container.lock().unwrap();
|
||||
let container = container.try_lock().unwrap();
|
||||
let json = match container.deref() {
|
||||
ContainerInstance::Map(x) => x.to_json(self),
|
||||
ContainerInstance::Text(x) => x.to_json(),
|
||||
|
|
|
@ -445,7 +445,7 @@ impl Container for TextContainer {
|
|||
}
|
||||
}
|
||||
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent) {
|
||||
fn to_import_snapshot(&mut self, state_content: StateContent, _hierarchy: &mut Hierarchy) {
|
||||
if let StateContent::Text { pool, state_len } = state_content {
|
||||
let mut append_only_bytes = AppendOnlyBytes::with_capacity(pool.len());
|
||||
append_only_bytes.push_slice(&pool);
|
||||
|
@ -481,7 +481,13 @@ impl Text {
|
|||
}
|
||||
|
||||
pub fn id(&self) -> ContainerID {
|
||||
self.instance.lock().unwrap().as_text().unwrap().id.clone()
|
||||
self.instance
|
||||
.try_lock()
|
||||
.unwrap()
|
||||
.as_text()
|
||||
.unwrap()
|
||||
.id
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub fn insert<C: Context>(
|
||||
|
@ -503,7 +509,12 @@ impl Text {
|
|||
}
|
||||
|
||||
pub fn get_value(&self) -> LoroValue {
|
||||
self.instance.lock().unwrap().as_text().unwrap().get_value()
|
||||
self.instance
|
||||
.try_lock()
|
||||
.unwrap()
|
||||
.as_text()
|
||||
.unwrap()
|
||||
.get_value()
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
|
@ -523,7 +534,7 @@ impl ContainerWrapper for Text {
|
|||
where
|
||||
F: FnOnce(&mut Self::Container) -> R,
|
||||
{
|
||||
let mut container_instance = self.instance.lock().unwrap();
|
||||
let mut container_instance = self.instance.try_lock().unwrap();
|
||||
let text = container_instance.as_text_mut().unwrap();
|
||||
let ans = f(text);
|
||||
drop(container_instance);
|
||||
|
|
|
@ -38,7 +38,7 @@ pub mod wasm {
|
|||
fn from(v: JsValue) -> Self {
|
||||
Self::JsError(
|
||||
v.as_string()
|
||||
.unwrap_or("unknown error".to_owned())
|
||||
.unwrap_or_else(|| "unknown error".to_owned())
|
||||
.into_boxed_str(),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ pub struct Event {
|
|||
|
||||
pub type Path = Vec<Index>;
|
||||
|
||||
// Note: It will be encoded into binary format, so its order should not be changed.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum Index {
|
||||
Key(InternalString),
|
||||
|
|
|
@ -73,7 +73,7 @@ impl Actor {
|
|||
}));
|
||||
|
||||
let log_store = actor.loro.log_store.write().unwrap();
|
||||
let mut hierarchy = log_store.hierarchy.lock().unwrap();
|
||||
let mut hierarchy = log_store.hierarchy.try_lock().unwrap();
|
||||
let text = Rc::clone(&actor.text_tracker);
|
||||
hierarchy.subscribe(
|
||||
&ContainerID::new_root("text", ContainerType::Text),
|
||||
|
|
|
@ -159,7 +159,11 @@ impl Hierarchy {
|
|||
let parent = &node.parent;
|
||||
if let Some(parent) = parent {
|
||||
let parent_node = reg.get(parent).unwrap();
|
||||
let index = parent_node.lock().unwrap().index_of_child(node_id).unwrap();
|
||||
let index = parent_node
|
||||
.try_lock()
|
||||
.unwrap()
|
||||
.index_of_child(node_id)
|
||||
.unwrap();
|
||||
path.push(index);
|
||||
} else {
|
||||
match node_id {
|
||||
|
|
|
@ -86,7 +86,7 @@ pub struct LogStore {
|
|||
type ContainerGuard<'a> = MutexGuard<'a, ContainerInstance>;
|
||||
|
||||
impl LogStore {
|
||||
pub(crate) fn new(mut cfg: Configure, client_id: Option<ClientID>) -> Arc<RwLock<Self>> {
|
||||
pub(crate) fn new(cfg: Configure, client_id: Option<ClientID>) -> Arc<RwLock<Self>> {
|
||||
let this_client_id = client_id.unwrap_or_else(|| cfg.rand.next_u64());
|
||||
Arc::new(RwLock::new(Self {
|
||||
cfg,
|
||||
|
@ -181,7 +181,7 @@ impl LogStore {
|
|||
|
||||
fn to_remote_op(&self, op: &Op) -> RemoteOp {
|
||||
let container = self.reg.get_by_idx(op.container).unwrap();
|
||||
let mut container = container.lock().unwrap();
|
||||
let mut container = container.try_lock().unwrap();
|
||||
op.clone().convert(&mut container, self.cfg.gc.gc)
|
||||
}
|
||||
|
||||
|
@ -328,6 +328,11 @@ impl LogStore {
|
|||
self.reg.debug_inspect();
|
||||
}
|
||||
|
||||
#[cfg(feature = "test_utils")]
|
||||
pub fn hierarchy(&self) -> &Arc<Mutex<Hierarchy>> {
|
||||
&self.hierarchy
|
||||
}
|
||||
|
||||
// TODO: remove
|
||||
#[inline(always)]
|
||||
pub(crate) fn get_container_idx(&self, container: &ContainerID) -> Option<ContainerIdx> {
|
||||
|
@ -356,7 +361,7 @@ impl LogStore {
|
|||
for<'any> F: FnOnce(&'any mut LogStore, &'any mut Hierarchy) -> R,
|
||||
{
|
||||
let h = self.hierarchy.clone();
|
||||
let mut h = h.lock().unwrap();
|
||||
let mut h = h.try_lock().unwrap();
|
||||
f(self, &mut h)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ use crate::{dag::Dag, event::RawEvent, LogStore, LoroCore, LoroError, VersionVec
|
|||
|
||||
const UPDATE_ENCODE_THRESHOLD: usize = 5;
|
||||
const MAGIC_BYTES: [u8; 4] = [0x6c, 0x6f, 0x72, 0x6f];
|
||||
const ENCODE_SCHEMA_VERSION: &str = "1.0";
|
||||
pub enum EncodeMode {
|
||||
Auto(Option<VersionVector>),
|
||||
Updates(VersionVector),
|
||||
|
@ -54,7 +55,7 @@ pub struct LoroEncoder;
|
|||
|
||||
impl LoroEncoder {
|
||||
pub(crate) fn encode(loro: &LoroCore, config: EncodeConfig) -> Result<Vec<u8>, LoroError> {
|
||||
let version = env!("CARGO_PKG_VERSION");
|
||||
let version = ENCODE_SCHEMA_VERSION;
|
||||
let store = loro
|
||||
.log_store
|
||||
.try_read()
|
||||
|
|
|
@ -113,6 +113,13 @@ pub(super) fn encode_changes(store: &LogStore, vv: &VersionVector) -> Result<Vec
|
|||
idx
|
||||
});
|
||||
change_num += 1;
|
||||
for deps in change.deps.iter() {
|
||||
client_id_to_idx.entry(deps.client_id).or_insert_with(|| {
|
||||
let idx = clients.len() as ClientIdx;
|
||||
clients.push(deps.client_id);
|
||||
idx
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut changes = Vec::with_capacity(change_num);
|
||||
|
@ -198,6 +205,7 @@ pub(super) fn encode_changes(store: &LogStore, vv: &VersionVector) -> Result<Vec
|
|||
};
|
||||
|
||||
to_vec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into()))
|
||||
// postcard::to_allocvec(&encoded).map_err(|e| LoroError::DecodeError(e.to_string().into()))
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
|
@ -207,6 +215,7 @@ pub(super) fn decode_changes(
|
|||
) -> Result<Vec<RawEvent>, LoroError> {
|
||||
let encoded: Encoded =
|
||||
from_bytes(input).map_err(|e| LoroError::DecodeError(e.to_string().into()))?;
|
||||
|
||||
let Encoded {
|
||||
changes: change_encodings,
|
||||
ops,
|
||||
|
|
|
@ -293,6 +293,7 @@ pub(super) fn decode_snapshot(
|
|||
}
|
||||
return Ok(vec![]);
|
||||
}
|
||||
let mut hierarchy = store.hierarchy.try_lock().unwrap();
|
||||
|
||||
let mut op_iter = ops.into_iter();
|
||||
let mut changes = FxHashMap::default();
|
||||
|
@ -305,8 +306,9 @@ pub(super) fn decode_snapshot(
|
|||
let state = pool_mapping.into_state(&keys, &clients);
|
||||
let container = store.reg.get_by_idx(container_idx).unwrap();
|
||||
let mut container = container.try_lock().unwrap();
|
||||
container.to_import_snapshot(state);
|
||||
container.to_import_snapshot(state, &mut hierarchy);
|
||||
}
|
||||
drop(hierarchy);
|
||||
|
||||
for change_encoding in change_encodings {
|
||||
let ChangeEncoding {
|
||||
|
|
|
@ -182,24 +182,3 @@ fn convert_encoded_to_changes(changes: EncodedClientChanges) -> Vec<Change<Remot
|
|||
|
||||
result
|
||||
}
|
||||
|
||||
// impl LoroCore {
|
||||
// #[instrument(skip_all)]
|
||||
// pub fn export_updates(&self, from: &VersionVector) -> Result<Vec<u8>, LoroError> {
|
||||
// match self.log_store.try_read() {
|
||||
// Ok(x) => x.export_updates(from),
|
||||
// Err(_) => Err(LoroError::LockError),
|
||||
// }
|
||||
// }
|
||||
|
||||
// pub fn import_updates(&mut self, input: &[u8]) -> Result<(), LoroError> {
|
||||
// let ans = self.log_store.write().unwrap().import_updates(input);
|
||||
// match ans {
|
||||
// Ok(events) => {
|
||||
// self.notify(events);
|
||||
// Ok(())
|
||||
// }
|
||||
// Err(err) => Err(LoroError::DecodeError(err.to_string().into_boxed_str())),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
|
|
@ -145,14 +145,14 @@ impl LogStore {
|
|||
self.frontiers = next_frontiers.get_frontiers();
|
||||
self.latest_lamport = self
|
||||
.changes
|
||||
.iter()
|
||||
.map(|(_, v)| v.last().unwrap().lamport_last())
|
||||
.values()
|
||||
.map(|v| v.last().unwrap().lamport_last())
|
||||
.max()
|
||||
.unwrap();
|
||||
self.latest_timestamp = self
|
||||
.changes
|
||||
.iter()
|
||||
.map(|(_, v)| v.last().unwrap().timestamp)
|
||||
.values()
|
||||
.map(|v| v.last().unwrap().timestamp)
|
||||
.max()
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -294,7 +294,7 @@ impl LogStore {
|
|||
}
|
||||
});
|
||||
debug_log::group!("apply effects");
|
||||
let mut queue: VecDeque<_> = container_map.into_iter().map(|(_, x)| x).collect();
|
||||
let mut queue: VecDeque<_> = container_map.into_values().collect();
|
||||
let mut retries = 0;
|
||||
let mut h = self.hierarchy.try_lock().unwrap();
|
||||
// only apply the effects of a container when it's registered to the hierarchy
|
||||
|
@ -331,7 +331,7 @@ impl LogStore {
|
|||
for change in changes.iter() {
|
||||
for op in change.ops.iter() {
|
||||
if !container_map.contains_key(&op.container) {
|
||||
let guard = self.reg.get_or_create(&op.container).lock().unwrap();
|
||||
let guard = self.reg.get_or_create(&op.container).try_lock().unwrap();
|
||||
container_map
|
||||
// SAFETY: ignore lifetime issues here, because it's safe for us to store the mutex guard here
|
||||
.insert(op.container.clone(), unsafe { std::mem::transmute(guard) });
|
||||
|
|
|
@ -131,7 +131,7 @@ impl LoroCore {
|
|||
let store = self.log_store.read().unwrap();
|
||||
let hierarchy = store.hierarchy.clone();
|
||||
drop(store);
|
||||
let mut h = hierarchy.lock().unwrap();
|
||||
let mut h = hierarchy.try_lock().unwrap();
|
||||
h.send_notifications(events);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -241,10 +241,8 @@ impl<'a> RichOp<'a> {
|
|||
pub fn new_by_slice_on_change(change: &Change<Op>, start: i32, end: i32, op: &'a Op) -> Self {
|
||||
debug_assert!(end > start);
|
||||
let op_index_in_change = op.counter - change.id.counter;
|
||||
let op_slice_start = (start - op_index_in_change)
|
||||
.max(0)
|
||||
.min(op.atom_len() as i32);
|
||||
let op_slice_end = (end - op_index_in_change).max(0).min(op.atom_len() as i32);
|
||||
let op_slice_start = (start - op_index_in_change).clamp(0, op.atom_len() as i32);
|
||||
let op_slice_end = (end - op_index_in_change).clamp(0, op.atom_len() as i32);
|
||||
RichOp {
|
||||
op,
|
||||
client_id: change.id.client_id,
|
||||
|
|
|
@ -52,7 +52,7 @@ impl LoroValue {
|
|||
self = reg
|
||||
.get(id)
|
||||
.map(|container| {
|
||||
let mut value = container.lock().unwrap().get_value();
|
||||
let mut value = container.try_lock().unwrap().get_value();
|
||||
|
||||
match &mut value {
|
||||
LoroValue::List(list) => {
|
||||
|
|
|
@ -5,10 +5,11 @@ use ctor::ctor;
|
|||
|
||||
use loro_core::container::registry::ContainerWrapper;
|
||||
use loro_core::container::ContainerID;
|
||||
use loro_core::event::Index;
|
||||
use loro_core::context::Context;
|
||||
use loro_core::id::ID;
|
||||
|
||||
use loro_core::{ContainerType, LoroCore, LoroValue};
|
||||
use loro_core::log_store::{EncodeConfig, EncodeMode};
|
||||
use loro_core::{ContainerType, LoroCore, LoroValue, VersionVector};
|
||||
|
||||
#[test]
|
||||
#[cfg(feature = "json")]
|
||||
|
@ -231,13 +232,69 @@ fn fix_fields_order() {
|
|||
postcard::from_bytes::<Vec<ContainerID>>(&container_id_buf).unwrap(),
|
||||
container_id
|
||||
);
|
||||
}
|
||||
|
||||
let index = vec![Index::Key("".into()), Index::Seq(0)];
|
||||
let index_buf = vec![2, 0, 0, 1, 0];
|
||||
assert_eq!(
|
||||
postcard::from_bytes::<Vec<Index>>(&index_buf).unwrap(),
|
||||
index
|
||||
);
|
||||
#[test]
|
||||
fn encode_hierarchy() {
|
||||
fn assert_eq(c1: &LoroCore, c2: &LoroCore) {
|
||||
let s1 = c1.log_store();
|
||||
let s1 = s1.try_read().unwrap();
|
||||
let h1 = s1.hierarchy().try_lock().unwrap();
|
||||
|
||||
let s2 = c2.log_store();
|
||||
let s2 = s2.try_read().unwrap();
|
||||
let h2 = s2.hierarchy().try_lock().unwrap();
|
||||
assert_eq!(format!("{:?}", h1), format!("{:?}", h2));
|
||||
}
|
||||
|
||||
let mut c1 = LoroCore::default();
|
||||
let mut map = c1.get_map("map");
|
||||
let (_, text_id) = {
|
||||
let list_id = map.insert(&c1, "a", ContainerType::List).unwrap();
|
||||
let list = c1.get_container(&list_id.unwrap()).unwrap();
|
||||
let mut list = list.try_lock().unwrap();
|
||||
let list = list.as_list_mut().unwrap();
|
||||
list.insert(&c1, 0, ContainerType::Text)
|
||||
};
|
||||
{
|
||||
let text = c1.get_container(&text_id.unwrap()).unwrap();
|
||||
let mut text = text.try_lock().unwrap();
|
||||
let text = text.as_text_mut().unwrap();
|
||||
text.insert(&c1, 0, "text_text");
|
||||
};
|
||||
|
||||
// updates
|
||||
println!("updates");
|
||||
let input = c1
|
||||
.encode(EncodeConfig::new(
|
||||
EncodeMode::Updates(VersionVector::new()),
|
||||
None,
|
||||
))
|
||||
.unwrap();
|
||||
let mut c2 = LoroCore::default();
|
||||
c2.decode(&input).unwrap();
|
||||
assert_eq(&c1, &c2);
|
||||
|
||||
// rle updates
|
||||
println!("rle updates");
|
||||
let input = c1
|
||||
.encode(EncodeConfig::new(
|
||||
EncodeMode::RleUpdates(VersionVector::new()),
|
||||
None,
|
||||
))
|
||||
.unwrap();
|
||||
let mut c2 = LoroCore::default();
|
||||
c2.decode(&input).unwrap();
|
||||
assert_eq(&c1, &c2);
|
||||
|
||||
// snapshot
|
||||
println!("snapshot");
|
||||
let input = c1
|
||||
.encode(EncodeConfig::new(EncodeMode::Snapshot, None))
|
||||
.unwrap();
|
||||
let mut c2 = LoroCore::default();
|
||||
c2.decode(&input).unwrap();
|
||||
assert_eq(&c1, &c2);
|
||||
}
|
||||
|
||||
#[ctor]
|
||||
|
|
|
@ -149,7 +149,7 @@ impl Prelim for PrelimText {
|
|||
}
|
||||
|
||||
fn integrate<C: Context>(self, ctx: &C, container: &Arc<Mutex<ContainerInstance>>) {
|
||||
let mut text = container.lock().unwrap();
|
||||
let mut text = container.try_lock().unwrap();
|
||||
let text = text.as_text_mut().unwrap();
|
||||
text.insert(ctx, 0, &self.0);
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ impl Prelim for PrelimList {
|
|||
}
|
||||
|
||||
fn integrate<C: Context>(self, ctx: &C, container: &Arc<Mutex<ContainerInstance>>) {
|
||||
let mut list = container.lock().unwrap();
|
||||
let mut list = container.try_lock().unwrap();
|
||||
let list = list.as_list_mut().unwrap();
|
||||
let values: Vec<LoroValue> = self.0.into_iter().map(|v| v.into()).collect();
|
||||
list.insert_batch(ctx, 0, values);
|
||||
|
@ -174,7 +174,7 @@ impl Prelim for PrelimMap {
|
|||
}
|
||||
|
||||
fn integrate<C: Context>(self, ctx: &C, container: &Arc<Mutex<ContainerInstance>>) {
|
||||
let mut map = container.lock().unwrap();
|
||||
let mut map = container.try_lock().unwrap();
|
||||
let map = map.as_map_mut().unwrap();
|
||||
for (key, value) in self.0.into_iter() {
|
||||
let value: LoroValue = value.into();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{cell::RefCell, fmt::Debug, ptr::NonNull, rc::Rc};
|
||||
use std::{fmt::Debug, ptr::NonNull};
|
||||
|
||||
use fxhash::{FxHashMap, FxHashSet};
|
||||
|
||||
|
@ -7,7 +7,7 @@ use crate::{
|
|||
rle_tree::{
|
||||
node::{InternalNode, LeafNode},
|
||||
tree_trait::GlobalTreeTrait,
|
||||
Arena, HeapMode, SafeCursor, UnsafeCursor, VecTrait,
|
||||
Arena, HeapMode, UnsafeCursor, VecTrait,
|
||||
},
|
||||
HasLength, Mergable, Rle, RleTree, Sliceable,
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue