mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-23 05:24:51 +00:00
feat: record hierarchical info
This commit is contained in:
parent
aeb935455e
commit
9bdb6b9fd4
10 changed files with 353 additions and 147 deletions
|
@ -63,3 +63,9 @@ impl Default for Configure {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rand_u64() -> u64 {
|
||||
let mut buf = [0u8; 8];
|
||||
SystemRandom::new().fill(&mut buf).unwrap();
|
||||
u64::from_le_bytes(buf)
|
||||
}
|
||||
|
|
|
@ -5,9 +5,10 @@
|
|||
//! Every [Container] can take a [Snapshot], which contains [crate::LoroValue] that describes the state.
|
||||
//!
|
||||
use crate::{
|
||||
hierarchy::Hierarchy,
|
||||
op::{InnerContent, RemoteContent, RichOp},
|
||||
version::{IdSpanVector, VersionVector},
|
||||
InternalString, LoroValue, ID,
|
||||
InternalString, LogStore, LoroValue, ID,
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -47,7 +48,7 @@ pub trait Container: Debug + Any + Unpin {
|
|||
fn to_import(&mut self, content: RemoteContent) -> InnerContent;
|
||||
|
||||
/// Apply the effect of the op directly to the state.
|
||||
fn update_state_directly(&mut self, op: &RichOp);
|
||||
fn update_state_directly(&mut self, hierarchy: &mut Hierarchy, op: &RichOp);
|
||||
|
||||
/// Tracker need to retreat in order to apply the op.
|
||||
/// TODO: can be merged into checkout
|
||||
|
@ -65,11 +66,16 @@ pub trait Container: Debug + Any + Unpin {
|
|||
/// Here we have not updated the container state yet. Because we
|
||||
/// need to calculate the effect of the op for [crate::List] and
|
||||
/// [crate::Text] by using tracker.
|
||||
fn track_apply(&mut self, op: &RichOp);
|
||||
fn track_apply(&mut self, hierarchy: &mut Hierarchy, op: &RichOp);
|
||||
|
||||
/// Make tracker iterate over the target spans and apply the calculated
|
||||
/// effects to the container state
|
||||
fn apply_tracked_effects_from(&mut self, from: &VersionVector, effect_spans: &IdSpanVector);
|
||||
fn apply_tracked_effects_from(
|
||||
&mut self,
|
||||
store: &mut LogStore,
|
||||
from: &VersionVector,
|
||||
effect_spans: &IdSpanVector,
|
||||
);
|
||||
}
|
||||
|
||||
/// [ContainerID] includes the Op's [ID] and the type. So it's impossible to have
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex};
|
|||
|
||||
use rle::{
|
||||
rle_tree::{tree_trait::CumulateTreeTrait, HeapMode},
|
||||
RleTree,
|
||||
HasLength, RleTree,
|
||||
};
|
||||
use smallvec::SmallVec;
|
||||
|
||||
|
@ -20,12 +20,13 @@ use crate::{
|
|||
},
|
||||
context::Context,
|
||||
event::Index,
|
||||
hierarchy::Hierarchy,
|
||||
id::{ClientID, Counter, ID},
|
||||
op::{InnerContent, Op, RemoteContent, RichOp},
|
||||
prelim::Prelim,
|
||||
value::LoroValue,
|
||||
version::IdSpanVector,
|
||||
LoroError,
|
||||
LogStore, LoroError,
|
||||
};
|
||||
|
||||
use super::list_op::InnerListOp;
|
||||
|
@ -52,6 +53,7 @@ impl ListContainer {
|
|||
if values.is_empty() {
|
||||
return;
|
||||
}
|
||||
assert!(!values.iter().any(|x|x.as_unresolved().is_some()), "Cannot have containers in insert_batch method. If you want to create sub container, please use insert_obj or insert method");
|
||||
let store = ctx.log_store();
|
||||
let mut store = store.try_write().unwrap();
|
||||
|
||||
|
@ -93,6 +95,10 @@ impl ListContainer {
|
|||
}
|
||||
|
||||
fn insert_value<C: Context>(&mut self, ctx: &C, pos: usize, value: LoroValue) -> Option<ID> {
|
||||
assert!(
|
||||
value.as_unresolved().is_none(),
|
||||
"To insert a container to list, you should use insert_obj method or insert a prelim type to the list"
|
||||
);
|
||||
let store = ctx.log_store();
|
||||
let mut store = store.write().unwrap();
|
||||
let id = store.next_id();
|
||||
|
@ -114,7 +120,10 @@ impl ListContainer {
|
|||
fn insert_obj<C: Context>(&mut self, ctx: &C, pos: usize, obj: ContainerType) -> ContainerID {
|
||||
let m = ctx.log_store();
|
||||
let mut store = m.write().unwrap();
|
||||
let container_id = store.create_container(obj);
|
||||
let (container_id, _) = store.create_container(obj);
|
||||
// Update hierarchy info
|
||||
store.hierarchy.add_child(&self.id, &container_id);
|
||||
|
||||
// TODO: we can avoid this lock
|
||||
drop(store);
|
||||
self.insert(
|
||||
|
@ -122,6 +131,7 @@ impl ListContainer {
|
|||
pos,
|
||||
LoroValue::Unresolved(Box::new(container_id.clone())),
|
||||
);
|
||||
|
||||
container_id
|
||||
}
|
||||
|
||||
|
@ -151,10 +161,28 @@ impl ListContainer {
|
|||
);
|
||||
|
||||
store.append_local_ops(&[op]);
|
||||
// Update hierarchy info
|
||||
self.update_hierarchy_on_delete(&mut store.hierarchy, pos, len);
|
||||
|
||||
self.state.delete_range(Some(pos), Some(pos + len));
|
||||
Some(id)
|
||||
}
|
||||
|
||||
fn update_hierarchy_on_delete(&mut self, hierarchy: &mut Hierarchy, pos: usize, len: usize) {
|
||||
if !hierarchy.has_children(&self.id) {
|
||||
return;
|
||||
}
|
||||
|
||||
for state in self.state.iter_range(pos, Some(pos + len)) {
|
||||
let range = &state.as_ref().0;
|
||||
for value in self.raw_data.slice(range).iter() {
|
||||
if let LoroValue::Unresolved(container_id) = value {
|
||||
hierarchy.remove_child(&self.id, container_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn values_len(&self) -> usize {
|
||||
self.state.len()
|
||||
}
|
||||
|
@ -189,6 +217,14 @@ impl ListContainer {
|
|||
|
||||
None
|
||||
}
|
||||
|
||||
fn update_hierarchy_on_insert(&mut self, hierarchy: &mut Hierarchy, content: &SliceRange) {
|
||||
for value in self.raw_data.slice(&content.0).iter() {
|
||||
if let LoroValue::Unresolved(container_id) = value {
|
||||
hierarchy.add_child(&self.id, container_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Container for ListContainer {
|
||||
|
@ -248,13 +284,22 @@ impl Container for ListContainer {
|
|||
}
|
||||
}
|
||||
|
||||
fn update_state_directly(&mut self, op: &RichOp) {
|
||||
fn update_state_directly(&mut self, hierarchy: &mut Hierarchy, op: &RichOp) {
|
||||
match &op.get_sliced().content {
|
||||
InnerContent::List(op) => match op {
|
||||
InnerListOp::Insert { slice, pos } => self.state.insert(*pos, slice.clone()),
|
||||
InnerListOp::Delete(span) => self
|
||||
.state
|
||||
.delete_range(Some(span.start() as usize), Some(span.end() as usize)),
|
||||
InnerListOp::Insert { slice, pos } => {
|
||||
self.update_hierarchy_on_insert(hierarchy, slice);
|
||||
self.state.insert(*pos, slice.clone());
|
||||
}
|
||||
InnerListOp::Delete(span) => {
|
||||
self.update_hierarchy_on_delete(
|
||||
hierarchy,
|
||||
span.start() as usize,
|
||||
span.atom_len(),
|
||||
);
|
||||
self.state
|
||||
.delete_range(Some(span.start() as usize), Some(span.end() as usize));
|
||||
}
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
@ -279,19 +324,46 @@ impl Container for ListContainer {
|
|||
}
|
||||
}
|
||||
|
||||
fn track_apply(&mut self, rich_op: &RichOp) {
|
||||
fn track_apply(&mut self, _: &mut Hierarchy, rich_op: &RichOp) {
|
||||
self.tracker.track_apply(rich_op);
|
||||
}
|
||||
|
||||
fn apply_tracked_effects_from(
|
||||
&mut self,
|
||||
store: &mut LogStore,
|
||||
from: &crate::VersionVector,
|
||||
effect_spans: &IdSpanVector,
|
||||
) {
|
||||
for effect in self.tracker.iter_effects(from, effect_spans) {
|
||||
match effect {
|
||||
Effect::Del { pos, len } => self.state.delete_range(Some(pos), Some(pos + len)),
|
||||
Effect::Ins { pos, content } => self.state.insert(pos, content),
|
||||
Effect::Del { pos, len } => {
|
||||
// Update hierarchy info
|
||||
if store.hierarchy.has_children(&self.id) {
|
||||
for state in self.state.iter_range(pos, Some(pos + len)) {
|
||||
let range = &state.as_ref().0;
|
||||
for value in self.raw_data.slice(range).iter() {
|
||||
if let LoroValue::Unresolved(container_id) = value {
|
||||
store.hierarchy.remove_child(&self.id, container_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.state.delete_range(Some(pos), Some(pos + len));
|
||||
}
|
||||
Effect::Ins { pos, content } => {
|
||||
// Update hierarchy info
|
||||
{
|
||||
let content = &content;
|
||||
for value in self.raw_data.slice(&content.0).iter() {
|
||||
if let LoroValue::Unresolved(container_id) = value {
|
||||
store.hierarchy.add_child(&self.id, container_id);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.state.insert(pos, content);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,13 +11,14 @@ use crate::{
|
|||
},
|
||||
context::Context,
|
||||
event::Index,
|
||||
hierarchy::Hierarchy,
|
||||
id::ClientID,
|
||||
op::{InnerContent, Op, RemoteContent, RichOp},
|
||||
prelim::Prelim,
|
||||
span::HasLamport,
|
||||
value::LoroValue,
|
||||
version::{IdSpanVector, TotalOrderStamp},
|
||||
InternalString,
|
||||
InternalString, LogStore,
|
||||
};
|
||||
|
||||
use super::MapSet;
|
||||
|
@ -72,6 +73,7 @@ impl MapContainer {
|
|||
}
|
||||
|
||||
fn insert_value<C: Context>(&mut self, ctx: &C, key: InternalString, value: LoroValue) {
|
||||
assert!(value.as_unresolved().is_none(), "To insert a container to map, you should use insert_obj method or insert with a Prelim container value");
|
||||
let value_index = self.pool.alloc(value).start;
|
||||
let value = value_index;
|
||||
let self_id = &self.id;
|
||||
|
@ -84,7 +86,6 @@ impl MapContainer {
|
|||
};
|
||||
|
||||
let id = store.next_id_for(client_id);
|
||||
// // TODO: store this value?
|
||||
let container = store.get_container_idx(self_id).unwrap();
|
||||
store.append_local_ops(&[Op {
|
||||
counter: id.counter,
|
||||
|
@ -94,6 +95,7 @@ impl MapContainer {
|
|||
value,
|
||||
}),
|
||||
}]);
|
||||
self.update_hierarchy_if_container_is_overwritten(&key, &mut store);
|
||||
self.state.insert(key, ValueSlot { value, order });
|
||||
}
|
||||
|
||||
|
@ -107,12 +109,10 @@ impl MapContainer {
|
|||
let m = ctx.log_store();
|
||||
let mut store = m.write().unwrap();
|
||||
let client_id = store.this_client_id;
|
||||
let container_id = store.create_container(obj);
|
||||
let value_index = self.pool.alloc(container_id.clone()).start;
|
||||
let value = value_index;
|
||||
// TODO: store this value?
|
||||
let (container_id, _) = store.create_container(obj);
|
||||
let value = self.pool.alloc(container_id.clone()).start;
|
||||
let id = store.next_id_for(client_id);
|
||||
let container = store.get_container_idx(self_id).unwrap();
|
||||
let self_idx = store.get_container_idx(self_id).unwrap();
|
||||
let order = TotalOrderStamp {
|
||||
client_id,
|
||||
lamport: store.next_lamport(),
|
||||
|
@ -120,19 +120,35 @@ impl MapContainer {
|
|||
|
||||
store.append_local_ops(&[Op {
|
||||
counter: id.counter,
|
||||
container,
|
||||
container: self_idx,
|
||||
content: InnerContent::Map(InnerMapSet {
|
||||
value,
|
||||
key: key.clone(),
|
||||
}),
|
||||
}]);
|
||||
store.hierarchy.add_child(&self.id, &container_id);
|
||||
self.update_hierarchy_if_container_is_overwritten(&key, &mut store);
|
||||
|
||||
self.state.insert(key, ValueSlot { value, order });
|
||||
container_id
|
||||
}
|
||||
|
||||
fn update_hierarchy_if_container_is_overwritten(
|
||||
&mut self,
|
||||
key: &InternalString,
|
||||
store: &mut LogStore,
|
||||
) {
|
||||
if let Some(old_value) = self.state.get(key) {
|
||||
let v = &self.pool[old_value.value];
|
||||
if let Some(container) = v.as_unresolved() {
|
||||
store.hierarchy.remove_child(&self.id, container);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn delete<C: Context>(&mut self, ctx: &C, key: InternalString) {
|
||||
self.insert(ctx, key, LoroValue::Null);
|
||||
self.insert_value(ctx, key, LoroValue::Null);
|
||||
}
|
||||
|
||||
pub fn index_of_child(&self, child: &ContainerID) -> Option<Index> {
|
||||
|
@ -213,23 +229,27 @@ impl Container for MapContainer {
|
|||
unreachable!()
|
||||
}
|
||||
|
||||
fn update_state_directly(&mut self, op: &RichOp) {
|
||||
fn update_state_directly(&mut self, hierarchy: &mut Hierarchy, op: &RichOp) {
|
||||
let content = op.get_sliced().content;
|
||||
let v: &InnerMapSet = content.as_map().unwrap();
|
||||
let new_val: &InnerMapSet = content.as_map().unwrap();
|
||||
let order = TotalOrderStamp {
|
||||
lamport: op.lamport(),
|
||||
client_id: op.client_id(),
|
||||
};
|
||||
if let Some(slot) = self.state.get_mut(&v.key) {
|
||||
if let Some(slot) = self.state.get_mut(&new_val.key) {
|
||||
if slot.order < order {
|
||||
slot.value = v.value;
|
||||
let old_val = &self.pool[slot.value];
|
||||
if let Some(container) = old_val.as_unresolved() {
|
||||
hierarchy.remove_child(&self.id, container);
|
||||
}
|
||||
slot.value = new_val.value;
|
||||
slot.order = order;
|
||||
}
|
||||
} else {
|
||||
self.state.insert(
|
||||
v.key.to_owned(),
|
||||
new_val.key.to_owned(),
|
||||
ValueSlot {
|
||||
value: v.value,
|
||||
value: new_val.value,
|
||||
order,
|
||||
},
|
||||
);
|
||||
|
@ -240,10 +260,16 @@ impl Container for MapContainer {
|
|||
|
||||
fn track_forward(&mut self, _: &IdSpanVector) {}
|
||||
|
||||
fn apply_tracked_effects_from(&mut self, _: &crate::VersionVector, _: &IdSpanVector) {}
|
||||
fn track_apply(&mut self, hierarchy: &mut Hierarchy, op: &RichOp) {
|
||||
self.update_state_directly(hierarchy, op);
|
||||
}
|
||||
|
||||
fn track_apply(&mut self, op: &RichOp) {
|
||||
self.update_state_directly(op);
|
||||
fn apply_tracked_effects_from(
|
||||
&mut self,
|
||||
_store: &mut crate::LogStore,
|
||||
_from: &crate::VersionVector,
|
||||
_effect_spans: &IdSpanVector,
|
||||
) {
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,10 +12,11 @@ use smallvec::SmallVec;
|
|||
use crate::{
|
||||
context::Context,
|
||||
event::Index,
|
||||
hierarchy::Hierarchy,
|
||||
id::ClientID,
|
||||
op::{RemoteContent, RichOp},
|
||||
version::IdSpanVector,
|
||||
LoroError, LoroValue, VersionVector,
|
||||
LogStore, LoroError, LoroValue, VersionVector,
|
||||
};
|
||||
|
||||
use super::{
|
||||
|
@ -78,12 +79,12 @@ impl Container for ContainerInstance {
|
|||
}
|
||||
}
|
||||
|
||||
fn update_state_directly(&mut self, op: &RichOp) {
|
||||
fn update_state_directly(&mut self, hierarchy: &mut Hierarchy, op: &RichOp) {
|
||||
match self {
|
||||
ContainerInstance::Map(x) => x.update_state_directly(op),
|
||||
ContainerInstance::Text(x) => x.update_state_directly(op),
|
||||
ContainerInstance::Dyn(x) => x.update_state_directly(op),
|
||||
ContainerInstance::List(x) => x.update_state_directly(op),
|
||||
ContainerInstance::Map(x) => x.update_state_directly(hierarchy, op),
|
||||
ContainerInstance::Text(x) => x.update_state_directly(hierarchy, op),
|
||||
ContainerInstance::Dyn(x) => x.update_state_directly(hierarchy, op),
|
||||
ContainerInstance::List(x) => x.update_state_directly(hierarchy, op),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,21 +106,26 @@ impl Container for ContainerInstance {
|
|||
}
|
||||
}
|
||||
|
||||
fn track_apply(&mut self, op: &RichOp) {
|
||||
fn track_apply(&mut self, hierarchy: &mut Hierarchy, op: &RichOp) {
|
||||
match self {
|
||||
ContainerInstance::Map(x) => x.track_apply(op),
|
||||
ContainerInstance::Text(x) => x.track_apply(op),
|
||||
ContainerInstance::Dyn(x) => x.track_apply(op),
|
||||
ContainerInstance::List(x) => x.track_apply(op),
|
||||
ContainerInstance::Map(x) => x.track_apply(hierarchy, op),
|
||||
ContainerInstance::Text(x) => x.track_apply(hierarchy, op),
|
||||
ContainerInstance::Dyn(x) => x.track_apply(hierarchy, op),
|
||||
ContainerInstance::List(x) => x.track_apply(hierarchy, op),
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_tracked_effects_from(&mut self, from: &VersionVector, effect_spans: &IdSpanVector) {
|
||||
fn apply_tracked_effects_from(
|
||||
&mut self,
|
||||
store: &mut LogStore,
|
||||
from: &VersionVector,
|
||||
effect_spans: &IdSpanVector,
|
||||
) {
|
||||
match self {
|
||||
ContainerInstance::Map(x) => x.apply_tracked_effects_from(from, effect_spans),
|
||||
ContainerInstance::Text(x) => x.apply_tracked_effects_from(from, effect_spans),
|
||||
ContainerInstance::Dyn(x) => x.apply_tracked_effects_from(from, effect_spans),
|
||||
ContainerInstance::List(x) => x.apply_tracked_effects_from(from, effect_spans),
|
||||
ContainerInstance::Map(x) => x.apply_tracked_effects_from(store, from, effect_spans),
|
||||
ContainerInstance::Text(x) => x.apply_tracked_effects_from(store, from, effect_spans),
|
||||
ContainerInstance::Dyn(x) => x.apply_tracked_effects_from(store, from, effect_spans),
|
||||
ContainerInstance::List(x) => x.apply_tracked_effects_from(store, from, effect_spans),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -225,9 +231,9 @@ impl ContainerRegistry {
|
|||
ContainerIdx(self.containers.len() as u32)
|
||||
}
|
||||
|
||||
pub(crate) fn register(&mut self, id: &ContainerID) {
|
||||
pub(crate) fn register(&mut self, id: &ContainerID) -> ContainerIdx {
|
||||
let container = self.create(id.clone());
|
||||
self.insert(id.clone(), container);
|
||||
self.insert(id.clone(), container)
|
||||
}
|
||||
|
||||
pub(crate) fn get_or_create(&mut self, id: &ContainerID) -> &Arc<Mutex<ContainerInstance>> {
|
||||
|
|
|
@ -14,10 +14,12 @@ use crate::{
|
|||
},
|
||||
context::Context,
|
||||
debug_log,
|
||||
hierarchy::Hierarchy,
|
||||
id::{ClientID, Counter, ID},
|
||||
op::{InnerContent, Op, RemoteContent, RichOp},
|
||||
value::LoroValue,
|
||||
version::IdSpanVector,
|
||||
LogStore,
|
||||
};
|
||||
|
||||
use super::{
|
||||
|
@ -216,7 +218,7 @@ impl Container for TextContainer {
|
|||
}
|
||||
}
|
||||
|
||||
fn update_state_directly(&mut self, op: &RichOp) {
|
||||
fn update_state_directly(&mut self, _: &mut Hierarchy, op: &RichOp) {
|
||||
match &op.get_sliced().content {
|
||||
InnerContent::List(op) => match op {
|
||||
InnerListOp::Insert { slice, pos } => self.state.insert(*pos, slice.clone()),
|
||||
|
@ -252,12 +254,13 @@ impl Container for TextContainer {
|
|||
}
|
||||
}
|
||||
|
||||
fn track_apply(&mut self, rich_op: &RichOp) {
|
||||
fn track_apply(&mut self, _: &mut Hierarchy, rich_op: &RichOp) {
|
||||
self.tracker.track_apply(rich_op);
|
||||
}
|
||||
|
||||
fn apply_tracked_effects_from(
|
||||
&mut self,
|
||||
_: &mut LogStore,
|
||||
from: &crate::VersionVector,
|
||||
effect_spans: &IdSpanVector,
|
||||
) {
|
||||
|
|
|
@ -1,17 +1,12 @@
|
|||
use fxhash::{FxHashMap, FxHashSet};
|
||||
|
||||
use crate::{
|
||||
container::{registry::ContainerIdx, ContainerID},
|
||||
delta::Delta,
|
||||
version::Frontiers,
|
||||
InternalString, LoroValue,
|
||||
};
|
||||
use crate::{container::ContainerID, delta::Delta, version::Frontiers, InternalString, LoroValue};
|
||||
|
||||
pub(crate) struct RawEvent {
|
||||
container_idx: ContainerIdx,
|
||||
old_version: Frontiers,
|
||||
new_version: Frontiers,
|
||||
diff: Diff,
|
||||
pub struct RawEvent {
|
||||
pub container_id: ContainerID,
|
||||
pub old_version: Frontiers,
|
||||
pub new_version: Frontiers,
|
||||
pub diff: Vec<Diff>,
|
||||
}
|
||||
|
||||
pub struct Event {
|
||||
|
@ -26,22 +21,26 @@ pub struct Event {
|
|||
|
||||
pub type Path = Vec<Index>;
|
||||
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
pub enum Index {
|
||||
Key(InternalString),
|
||||
Seq(usize),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Diff {
|
||||
List(Delta<Vec<LoroValue>>),
|
||||
Text(Delta<String>),
|
||||
Map(MapDiff),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ValuePair {
|
||||
pub old: LoroValue,
|
||||
pub new: LoroValue,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct MapDiff {
|
||||
pub added: FxHashMap<InternalString, LoroValue>,
|
||||
pub updated: FxHashMap<InternalString, ValuePair>,
|
||||
|
|
|
@ -3,24 +3,23 @@ use std::fmt::Debug;
|
|||
use fxhash::{FxHashMap, FxHashSet};
|
||||
|
||||
use crate::{
|
||||
container::{
|
||||
registry::{ContainerIdx, ContainerRegistry},
|
||||
ContainerID,
|
||||
},
|
||||
event::{Index, Observer, Path, RawEvent},
|
||||
configure::rand_u64,
|
||||
container::{registry::ContainerRegistry, ContainerID},
|
||||
event::{Event, Index, Observer, Path, RawEvent},
|
||||
};
|
||||
|
||||
/// [`Hierarchy`] stores the hierarchical relationship between containers
|
||||
#[derive(Default, Debug)]
|
||||
pub(crate) struct Hierarchy {
|
||||
nodes: FxHashMap<ContainerIdx, Node>,
|
||||
pub struct Hierarchy {
|
||||
nodes: FxHashMap<ContainerID, Node>,
|
||||
}
|
||||
|
||||
type SubscriptionID = u64;
|
||||
#[derive(Default)]
|
||||
struct Node {
|
||||
parent: Option<ContainerIdx>,
|
||||
children: FxHashSet<ContainerIdx>,
|
||||
observers: Vec<Box<Observer>>,
|
||||
parent: Option<ContainerID>,
|
||||
children: FxHashSet<ContainerID>,
|
||||
observers: FxHashMap<SubscriptionID, Box<Observer>>,
|
||||
}
|
||||
|
||||
impl Debug for Node {
|
||||
|
@ -33,23 +32,35 @@ impl Debug for Node {
|
|||
}
|
||||
|
||||
impl Hierarchy {
|
||||
pub fn add_child(&mut self, parent: ContainerIdx, child: ContainerIdx) {
|
||||
let parent_node = self.nodes.entry(parent).or_default();
|
||||
parent_node.children.insert(child);
|
||||
let child_node = self.nodes.entry(child).or_default();
|
||||
child_node.parent = Some(parent);
|
||||
#[inline(always)]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.nodes.is_empty()
|
||||
}
|
||||
|
||||
pub fn remove_child(&mut self, parent: ContainerIdx, child: ContainerIdx) {
|
||||
let parent_node = self.nodes.get_mut(&parent).unwrap();
|
||||
parent_node.children.remove(&child);
|
||||
pub fn add_child(&mut self, parent: &ContainerID, child: &ContainerID) {
|
||||
let parent_node = self.nodes.entry(parent.clone()).or_default();
|
||||
parent_node.children.insert(child.clone());
|
||||
let child_node = self.nodes.entry(child.clone()).or_default();
|
||||
child_node.parent = Some(parent.clone());
|
||||
}
|
||||
|
||||
pub fn has_children(&self, id: &ContainerID) -> bool {
|
||||
self.nodes
|
||||
.get(id)
|
||||
.map(|node| !node.children.is_empty())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
pub fn remove_child(&mut self, parent: &ContainerID, child: &ContainerID) {
|
||||
let parent_node = self.nodes.get_mut(parent).unwrap();
|
||||
parent_node.children.remove(child);
|
||||
let mut visited_descendants = FxHashSet::default();
|
||||
let mut stack = vec![child];
|
||||
while let Some(child) = stack.pop() {
|
||||
visited_descendants.insert(child);
|
||||
let child_node = self.nodes.get(&child).unwrap();
|
||||
visited_descendants.insert(child.clone());
|
||||
let child_node = self.nodes.get(child).unwrap();
|
||||
for child in child_node.children.iter() {
|
||||
stack.push(*child);
|
||||
stack.push(child);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,24 +72,23 @@ impl Hierarchy {
|
|||
pub fn get_path(
|
||||
&mut self,
|
||||
reg: &ContainerRegistry,
|
||||
descendant: &ContainerIdx,
|
||||
current_target: Option<&ContainerIdx>,
|
||||
descendant: &ContainerID,
|
||||
current_target: Option<&ContainerID>,
|
||||
) -> Path {
|
||||
let mut path = Path::default();
|
||||
let node = Some(descendant);
|
||||
while let Some(node_idx) = node {
|
||||
let node = self.nodes.get(node_idx).unwrap();
|
||||
let node_id = reg.get_id(*node_idx).unwrap();
|
||||
while let Some(node_id) = node {
|
||||
let node = self.nodes.get(node_id).unwrap();
|
||||
let parent = &node.parent;
|
||||
if let Some(parent) = parent {
|
||||
let parent_node = reg.get_by_idx(*parent).unwrap();
|
||||
let parent_node = reg.get(parent).unwrap();
|
||||
let index = parent_node.lock().unwrap().index_of_child(node_id).unwrap();
|
||||
path.push(index);
|
||||
} else {
|
||||
match node_id {
|
||||
ContainerID::Root {
|
||||
name,
|
||||
container_type,
|
||||
container_type: _,
|
||||
} => path.push(Index::Key(name.clone())),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
@ -93,21 +103,68 @@ impl Hierarchy {
|
|||
path
|
||||
}
|
||||
|
||||
pub fn should_notify(&self, container_idx: ContainerIdx) -> bool {
|
||||
let mut node_idx = Some(container_idx);
|
||||
while let Some(inner_node_idx) = node_idx {
|
||||
let node = self.nodes.get(&inner_node_idx).unwrap();
|
||||
pub fn should_notify(&self, container_id: ContainerID) -> bool {
|
||||
let mut node_id = Some(&container_id);
|
||||
while let Some(inner_node_id) = node_id {
|
||||
let node = self.nodes.get(inner_node_id).unwrap();
|
||||
if !node.observers.is_empty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
node_idx = node.parent;
|
||||
node_id = node.parent.as_ref();
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
pub fn notify(&mut self, event: RawEvent, reg: &ContainerRegistry) {
|
||||
todo!()
|
||||
pub fn notify(&mut self, raw_event: RawEvent, reg: &ContainerRegistry) {
|
||||
let target_id = raw_event.container_id;
|
||||
let mut absolute_path = self.get_path(reg, &target_id, None);
|
||||
absolute_path.reverse();
|
||||
let path_to_root = absolute_path;
|
||||
let mut current_target_id = Some(&target_id);
|
||||
let mut count = 0;
|
||||
let mut event = Event {
|
||||
relative_path: Default::default(),
|
||||
old_version: raw_event.old_version,
|
||||
new_version: raw_event.new_version,
|
||||
current_target: target_id.clone(),
|
||||
target: target_id.clone(),
|
||||
diff: raw_event.diff,
|
||||
};
|
||||
|
||||
while let Some(id) = current_target_id {
|
||||
let node = self.nodes.get(id).unwrap();
|
||||
if !node.observers.is_empty() {
|
||||
let mut relative_path = path_to_root[..count].to_vec();
|
||||
relative_path.reverse();
|
||||
event.relative_path = relative_path;
|
||||
event.current_target = id.clone();
|
||||
for (_, observer) in node.observers.iter() {
|
||||
observer(&event);
|
||||
}
|
||||
}
|
||||
|
||||
count += 1;
|
||||
current_target_id = node.parent.as_ref();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn observe(&mut self, container: ContainerID, observer: Box<Observer>) -> SubscriptionID {
|
||||
let id = rand_u64();
|
||||
self.nodes
|
||||
.entry(container)
|
||||
.or_default()
|
||||
.observers
|
||||
.insert(id, observer);
|
||||
id
|
||||
}
|
||||
|
||||
pub fn cancel_observe(&mut self, container: ContainerID, id: SubscriptionID) {
|
||||
self.nodes
|
||||
.entry(container)
|
||||
.or_default()
|
||||
.observers
|
||||
.remove(&id);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -184,11 +184,14 @@ impl LogStore {
|
|||
op.clone().convert(&mut container, self.cfg.gc.gc)
|
||||
}
|
||||
|
||||
pub(crate) fn create_container(&mut self, container_type: ContainerType) -> ContainerID {
|
||||
pub(crate) fn create_container(
|
||||
&mut self,
|
||||
container_type: ContainerType,
|
||||
) -> (ContainerID, ContainerIdx) {
|
||||
let id = self.next_id();
|
||||
let container_id = ContainerID::new_normal(id, container_type);
|
||||
self.reg.register(&container_id);
|
||||
container_id
|
||||
let idx = self.reg.register(&container_id);
|
||||
(container_id, idx)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::{container::registry::ContainerIdx, LogStore};
|
||||
use crate::{container::registry::ContainerIdx, hierarchy::Hierarchy, LogStore};
|
||||
use std::{ops::ControlFlow, sync::MutexGuard};
|
||||
|
||||
use fxhash::FxHashMap;
|
||||
|
@ -118,38 +118,48 @@ impl LogStore {
|
|||
let target_spans = next_vv.diff(&self.vv).left;
|
||||
if target_spans.len() == 1 {
|
||||
let (client_id, span) = target_spans.iter().next().unwrap();
|
||||
for op in self.iter_ops_at_id_span(IdSpan::new(*client_id, span.start, span.end)) {
|
||||
let container = container_map.get_mut(&op.op().container).unwrap();
|
||||
container.update_state_directly(&op);
|
||||
}
|
||||
|
||||
self.with_hierarchy(|store, hierarchy| {
|
||||
for op in
|
||||
store.iter_ops_at_id_span(IdSpan::new(*client_id, span.start, span.end))
|
||||
{
|
||||
let container = container_map.get_mut(&op.op().container).unwrap();
|
||||
container.update_state_directly(hierarchy, &op);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: can reuse this path
|
||||
let causal_visit_path: Vec<_> =
|
||||
self.iter_causal(&common_ancestors, target_spans).collect();
|
||||
if causal_visit_path
|
||||
.iter()
|
||||
.all(|x| x.retreat.is_empty() && x.forward.is_empty())
|
||||
{
|
||||
// can update containers state directly without consulting CRDT
|
||||
for iter in causal_visit_path {
|
||||
let start = iter.slice.start;
|
||||
let end = iter.slice.end;
|
||||
let change = iter.data;
|
||||
let can_skip = self.with_hierarchy(|store, hierarchy| {
|
||||
// TODO: can reuse this path
|
||||
let causal_visit_path: Vec<_> =
|
||||
store.iter_causal(&common_ancestors, target_spans).collect();
|
||||
if causal_visit_path
|
||||
.iter()
|
||||
.all(|x| x.retreat.is_empty() && x.forward.is_empty())
|
||||
{
|
||||
// can update containers state directly without consulting CRDT
|
||||
for iter in causal_visit_path {
|
||||
let start = iter.slice.start;
|
||||
let end = iter.slice.end;
|
||||
let change = iter.data;
|
||||
|
||||
for op in change.ops.iter() {
|
||||
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
|
||||
if rich_op.atom_len() == 0 {
|
||||
continue;
|
||||
for op in change.ops.iter() {
|
||||
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
|
||||
if rich_op.atom_len() == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let container = container_map.get_mut(&op.container).unwrap();
|
||||
container.update_state_directly(hierarchy, &rich_op);
|
||||
}
|
||||
|
||||
let container = container_map.get_mut(&op.container).unwrap();
|
||||
container.update_state_directly(&rich_op);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
});
|
||||
|
||||
if can_skip {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -159,33 +169,38 @@ impl LogStore {
|
|||
for (_, container) in container_map.iter_mut() {
|
||||
container.tracker_checkout(&common_ancestors_vv);
|
||||
}
|
||||
for iter in self.iter_causal(&common_ancestors, next_vv.diff(&common_ancestors_vv).left) {
|
||||
let start = iter.slice.start;
|
||||
let end = iter.slice.end;
|
||||
let change = iter.data;
|
||||
debug_log!("iter {:#?}", &iter);
|
||||
// TODO: perf: we can make iter_causal returns target vv and only
|
||||
// checkout the related container to the target vv
|
||||
for (_, container) in container_map.iter_mut() {
|
||||
container.track_retreat(&iter.retreat);
|
||||
container.track_forward(&iter.forward);
|
||||
}
|
||||
|
||||
for op in change.ops.iter() {
|
||||
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
|
||||
if rich_op.atom_len() == 0 {
|
||||
continue;
|
||||
self.with_hierarchy(|store, hierarchy| {
|
||||
for iter in
|
||||
store.iter_causal(&common_ancestors, next_vv.diff(&common_ancestors_vv).left)
|
||||
{
|
||||
let start = iter.slice.start;
|
||||
let end = iter.slice.end;
|
||||
let change = iter.data;
|
||||
debug_log!("iter {:#?}", &iter);
|
||||
// TODO: perf: we can make iter_causal returns target vv and only
|
||||
// checkout the related container to the target vv
|
||||
for (_, container) in container_map.iter_mut() {
|
||||
container.track_retreat(&iter.retreat);
|
||||
container.track_forward(&iter.forward);
|
||||
}
|
||||
|
||||
if let Some(container) = container_map.get_mut(&op.container) {
|
||||
container.track_apply(&rich_op);
|
||||
for op in change.ops.iter() {
|
||||
let rich_op = RichOp::new_by_slice_on_change(change, op, start, end);
|
||||
if rich_op.atom_len() == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(container) = container_map.get_mut(&op.container) {
|
||||
container.track_apply(hierarchy, &rich_op);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
debug_log!("LOGSTORE STAGE 2",);
|
||||
let path = next_vv.diff(&self.vv).left;
|
||||
let vv = self.vv.clone();
|
||||
for (_, container) in container_map.iter_mut() {
|
||||
container.apply_tracked_effects_from(&self.vv, &path);
|
||||
container.apply_tracked_effects_from(self, &vv, &path);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -233,4 +248,17 @@ impl LogStore {
|
|||
changes.retain(|_, v| !v.is_empty());
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
|
||||
// TODO: this feels like a dumb way to bypass lifetime issue, but I don't have better idea right now :(
|
||||
#[inline(always)]
|
||||
fn with_hierarchy<F, R>(&mut self, f: F) -> R
|
||||
where
|
||||
for<'any> F: FnOnce(&'any mut LogStore, &'any mut Hierarchy) -> R,
|
||||
{
|
||||
let mut hierarchy = std::mem::take(&mut self.hierarchy);
|
||||
let result = f(self, &mut hierarchy);
|
||||
assert!(self.hierarchy.is_empty());
|
||||
self.hierarchy = hierarchy;
|
||||
result
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue