mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-22 12:57:20 +00:00
perf: optimize mem layout of history cache of map and movable list (experimental)
This change has not been benchmarked yet. Maybe need to be reverted.
This commit is contained in:
parent
e00337d7d8
commit
870230c3e1
8 changed files with 389 additions and 144 deletions
|
@ -1,4 +1,4 @@
|
|||
use loro::{LoroDoc, LoroMap};
|
||||
use loro::{LoroDoc, LoroMap, LoroValue};
|
||||
|
||||
pub fn init_large_sheet(size: usize) -> LoroDoc {
|
||||
assert!(size >= 100);
|
||||
|
@ -8,7 +8,11 @@ pub fn init_large_sheet(size: usize) -> LoroDoc {
|
|||
for _ in 0..size / 100 {
|
||||
let map = rows.push_container(LoroMap::new()).unwrap();
|
||||
for i in 0..100 {
|
||||
map.insert(&i.to_string(), i).unwrap();
|
||||
let sub_map = map
|
||||
.insert_container(&i.to_string(), LoroMap::new())
|
||||
.unwrap();
|
||||
sub_map.insert("value", i).unwrap();
|
||||
sub_map.insert("meta", LoroValue::Null).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ use smallvec::SmallVec;
|
|||
use tracing::{instrument, trace, warn};
|
||||
|
||||
use crate::{
|
||||
change::Lamport,
|
||||
container::{
|
||||
idx::ContainerIdx,
|
||||
list::list_op::InnerListOp,
|
||||
|
@ -537,28 +538,37 @@ impl DiffCalculatorTrait for MapDiffCalculator {
|
|||
) -> (InternalDiff, DiffMode) {
|
||||
match self.current_mode {
|
||||
DiffMode::Checkout | DiffMode::Import => oplog.with_history_cache(|h| {
|
||||
let group = h
|
||||
.get_checkout_cache(&self.container_idx)
|
||||
.unwrap()
|
||||
.as_map()
|
||||
.unwrap();
|
||||
let checkout_index = &h.get_checkout_index().map;
|
||||
let mut changed = Vec::new();
|
||||
let from_map = checkout_index.get_container_latest_op_at_vv(
|
||||
self.container_idx,
|
||||
from,
|
||||
Lamport::MAX,
|
||||
oplog,
|
||||
);
|
||||
let mut to_map = checkout_index.get_container_latest_op_at_vv(
|
||||
self.container_idx,
|
||||
to,
|
||||
Lamport::MAX,
|
||||
oplog,
|
||||
);
|
||||
|
||||
for k in group.keys() {
|
||||
let peek_from = group.last_op(k, from);
|
||||
let peek_to = group.last_op(k, to);
|
||||
match (peek_from, peek_to) {
|
||||
(None, None) => {}
|
||||
(None, Some(_)) => changed.push((k.clone(), peek_to)),
|
||||
(Some(_), None) => changed.push((k.clone(), peek_to)),
|
||||
(Some(a), Some(b)) => {
|
||||
if a != b {
|
||||
changed.push((k.clone(), peek_to))
|
||||
for (k, peek_from) in from_map.iter() {
|
||||
let peek_to = to_map.remove(k);
|
||||
match peek_to {
|
||||
None => changed.push((k.clone(), None)),
|
||||
Some(b) => {
|
||||
if peek_from.value != b.value {
|
||||
changed.push((k.clone(), Some(b)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (k, peek_to) in to_map.into_iter() {
|
||||
changed.push((k, Some(peek_to)));
|
||||
}
|
||||
|
||||
let mut updated =
|
||||
FxHashMap::with_capacity_and_hasher(changed.len(), Default::default());
|
||||
for (key, value) in changed {
|
||||
|
@ -566,7 +576,7 @@ impl DiffCalculatorTrait for MapDiffCalculator {
|
|||
.map(|v| {
|
||||
let value = v.value.clone();
|
||||
if let Some(LoroValue::Container(c)) = &value {
|
||||
on_new_container(c);
|
||||
on_new_container(&c);
|
||||
}
|
||||
|
||||
MapValue {
|
||||
|
@ -986,7 +996,7 @@ impl DiffCalculatorTrait for RichtextDiffCalculator {
|
|||
crate::container::list::list_op::InnerListOp::StyleEnd => {
|
||||
let id = op.id();
|
||||
// PERF: this can be sped up by caching the last style op
|
||||
let start_op = oplog.get_op(op.id().inc(-1)).unwrap();
|
||||
let start_op = oplog.get_op_that_includes(op.id().inc(-1)).unwrap();
|
||||
let InnerListOp::StyleStart {
|
||||
start: _,
|
||||
end,
|
||||
|
@ -1277,14 +1287,16 @@ impl DiffCalculatorTrait for MovableListDiffCalculator {
|
|||
let last_pos = if is_checkout {
|
||||
// TODO: PERF: this lookup can be optimized
|
||||
oplog.with_history_cache(|h| {
|
||||
let list = h
|
||||
.get_checkout_cache(&real_op.container)
|
||||
.unwrap()
|
||||
.as_movable_list()
|
||||
.unwrap();
|
||||
list.last_pos(elem_id, this.tracker.current_vv())
|
||||
.unwrap()
|
||||
.id()
|
||||
let list = &h.get_checkout_index().movable_list;
|
||||
list.last_pos(
|
||||
*elem_id,
|
||||
this.tracker.current_vv(),
|
||||
// TODO: PERF: Provide the lamport of to version
|
||||
Lamport::MAX,
|
||||
oplog,
|
||||
)
|
||||
.unwrap()
|
||||
.id()
|
||||
})
|
||||
} else {
|
||||
// When it's import or linear mode, we need to use a fake id
|
||||
|
@ -1365,7 +1377,7 @@ impl DiffCalculatorTrait for MovableListDiffCalculator {
|
|||
let mut new_insert = SmallVec::with_capacity(len);
|
||||
for i in 0..len {
|
||||
let id = id.inc(i as i32);
|
||||
let op = oplog.get_op(id.id()).unwrap();
|
||||
let op = oplog.get_op_that_includes(id.id()).unwrap();
|
||||
let elem_id = match op.content.as_list().unwrap() {
|
||||
InnerListOp::Insert { .. } => id.idlp().compact(),
|
||||
InnerListOp::Move { elem_id, .. } => elem_id.compact(),
|
||||
|
@ -1394,18 +1406,24 @@ impl DiffCalculatorTrait for MovableListDiffCalculator {
|
|||
|
||||
if is_checkout {
|
||||
oplog.with_history_cache(|history_cache| {
|
||||
let group = history_cache.get_movable_list(&self.container_idx).unwrap();
|
||||
let checkout_index = &history_cache.get_checkout_index().movable_list;
|
||||
element_changes.retain(|id, change| {
|
||||
let id = id.to_id();
|
||||
// It can be None if the target does not exist before the `to` version
|
||||
// But we don't need to calc from, because the deletion is handled by the diff from list items
|
||||
let Some(pos) = group.last_pos(&id, to) else {
|
||||
|
||||
// TODO: PERF: Provide the lamport of to version
|
||||
let Some(pos) = checkout_index.last_pos(id, to, Lamport::MAX, oplog) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let value = group.last_value(&id, to).unwrap();
|
||||
let old_pos = group.last_pos(&id, from);
|
||||
let old_value = group.last_value(&id, from);
|
||||
// TODO: PERF: Provide the lamport of to version
|
||||
let value = checkout_index
|
||||
.last_value(id, to, Lamport::MAX, oplog)
|
||||
.unwrap();
|
||||
// TODO: PERF: Provide the lamport of to version
|
||||
let old_pos = checkout_index.last_pos(id, from, Lamport::MAX, oplog);
|
||||
// TODO: PERF: Provide the lamport of to version
|
||||
let old_value = checkout_index.last_value(id, from, Lamport::MAX, oplog);
|
||||
if old_pos.is_none() && old_value.is_none() {
|
||||
if let LoroValue::Container(c) = &value.value {
|
||||
on_new_container(c);
|
||||
|
|
|
@ -56,4 +56,8 @@ impl<T: std::hash::Hash + Clone + PartialEq + Eq> ValueRegister<T> {
|
|||
pub fn unwrap_vec(self) -> Vec<T> {
|
||||
self.vec
|
||||
}
|
||||
|
||||
pub fn get_value(&self, index: usize) -> Option<&T> {
|
||||
self.vec.get(index)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
ops::Bound,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
|
@ -7,17 +8,20 @@ use enum_as_inner::EnumAsInner;
|
|||
use enum_dispatch::enum_dispatch;
|
||||
use fxhash::FxHashMap;
|
||||
use loro_common::{
|
||||
ContainerType, Counter, HasId, HasLamport, IdLp, InternalString, LoroValue, PeerID, ID,
|
||||
ContainerType, Counter, HasId, HasLamport, IdFull, IdLp, InternalString, LoroValue, PeerID, ID,
|
||||
};
|
||||
use rle::HasLength;
|
||||
|
||||
use crate::{
|
||||
arena::SharedArena,
|
||||
change::{Change, Lamport},
|
||||
container::{idx::ContainerIdx, tree::tree_op::TreeOp},
|
||||
container::{idx::ContainerIdx, list::list_op::InnerListOp, tree::tree_op::TreeOp},
|
||||
delta::MovableListInnerDelta,
|
||||
diff_calc::tree::TreeCacheForDiff,
|
||||
encoding::value_register::ValueRegister,
|
||||
op::{InnerContent, RichOp},
|
||||
oplog::ChangeStore,
|
||||
VersionVector,
|
||||
OpLog, VersionVector,
|
||||
};
|
||||
|
||||
/// A cache for the history of a container.
|
||||
|
@ -29,40 +33,34 @@ use crate::{
|
|||
pub(crate) struct ContainerHistoryCache {
|
||||
arena: SharedArena,
|
||||
change_store: ChangeStore,
|
||||
for_checkout: Option<FxHashMap<ContainerIdx, HistoryCacheForCheckout>>,
|
||||
for_checkout: Option<ForCheckout>,
|
||||
for_importing: Option<FxHashMap<ContainerIdx, HistoryCacheForImporting>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct ForCheckout {
|
||||
pub(crate) map: MapHistoryCache,
|
||||
pub(crate) movable_list: MovableListHistoryCache,
|
||||
}
|
||||
|
||||
impl HistoryCacheTrait for ForCheckout {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
match op.raw_op().container.get_type() {
|
||||
ContainerType::Map => self.map.insert(op),
|
||||
ContainerType::MovableList => self.movable_list.insert(op),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ContainerHistoryCache {
|
||||
pub(crate) fn fork(&self, arena: SharedArena, change_store: ChangeStore) -> Self {
|
||||
let mut ans = Self {
|
||||
Self {
|
||||
arena,
|
||||
change_store,
|
||||
for_checkout: None,
|
||||
for_importing: None,
|
||||
};
|
||||
|
||||
if let Some(old_for_checkout) = &self.for_checkout {
|
||||
let mut for_checkout =
|
||||
FxHashMap::with_capacity_and_hasher(old_for_checkout.len(), Default::default());
|
||||
for (container_idx, group) in old_for_checkout.iter() {
|
||||
for_checkout.insert(*container_idx, group.fork(&ans.arena));
|
||||
}
|
||||
|
||||
ans.for_checkout = Some(for_checkout);
|
||||
}
|
||||
|
||||
if let Some(old_for_importing) = &self.for_importing {
|
||||
let mut for_importing =
|
||||
FxHashMap::with_capacity_and_hasher(old_for_importing.len(), Default::default());
|
||||
for (container_idx, group) in old_for_importing.iter() {
|
||||
for_importing.insert(*container_idx, group.fork(&ans.arena));
|
||||
}
|
||||
|
||||
ans.for_importing = Some(for_importing);
|
||||
}
|
||||
|
||||
ans
|
||||
}
|
||||
|
||||
pub(crate) fn new(arena: SharedArena, change_store: ChangeStore) -> Self {
|
||||
|
@ -89,23 +87,8 @@ impl ContainerHistoryCache {
|
|||
ContainerType::Map | ContainerType::MovableList
|
||||
if self.for_checkout.is_some() && for_checkout =>
|
||||
{
|
||||
let container_idx = op.container;
|
||||
let rich_op = RichOp::new_by_change(change, op);
|
||||
let manager = self
|
||||
.for_checkout
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.entry(container_idx)
|
||||
.or_insert_with(|| match op.container.get_type() {
|
||||
ContainerType::Map => {
|
||||
HistoryCacheForCheckout::Map(MapOpGroup::default())
|
||||
}
|
||||
ContainerType::MovableList => HistoryCacheForCheckout::MovableList(
|
||||
MovableListOpGroup::new(self.arena.clone()),
|
||||
),
|
||||
_ => unreachable!(),
|
||||
});
|
||||
manager.insert(&rich_op)
|
||||
self.for_checkout.as_mut().unwrap().insert(&rich_op)
|
||||
}
|
||||
ContainerType::Tree if self.for_importing.is_some() && for_importing => {
|
||||
let container_idx = op.container;
|
||||
|
@ -128,11 +111,16 @@ impl ContainerHistoryCache {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_checkout_index(&mut self) -> &ForCheckout {
|
||||
self.ensure_all_caches_exist();
|
||||
self.for_checkout.as_ref().unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn ensure_all_caches_exist(&mut self) {
|
||||
let mut record_for_checkout = false;
|
||||
let mut record_for_importing = false;
|
||||
if self.for_checkout.is_none() {
|
||||
self.for_checkout = Some(FxHashMap::default());
|
||||
self.for_checkout = Some(ForCheckout::default());
|
||||
record_for_checkout = true;
|
||||
}
|
||||
|
||||
|
@ -168,23 +156,8 @@ impl ContainerHistoryCache {
|
|||
ContainerType::Map | ContainerType::MovableList
|
||||
if self.for_checkout.is_some() && for_checkout =>
|
||||
{
|
||||
let container_idx = op.container;
|
||||
let rich_op = RichOp::new_by_change(c, op);
|
||||
let manager = self
|
||||
.for_checkout
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.entry(container_idx)
|
||||
.or_insert_with(|| match op.container.get_type() {
|
||||
ContainerType::Map => {
|
||||
HistoryCacheForCheckout::Map(MapOpGroup::default())
|
||||
}
|
||||
ContainerType::MovableList => HistoryCacheForCheckout::MovableList(
|
||||
MovableListOpGroup::new(self.arena.clone()),
|
||||
),
|
||||
_ => unreachable!(),
|
||||
});
|
||||
manager.insert(&rich_op)
|
||||
self.for_checkout.as_mut().unwrap().insert(&rich_op)
|
||||
}
|
||||
ContainerType::Tree if self.for_importing.is_some() && for_importing => {
|
||||
let container_idx = op.container;
|
||||
|
@ -208,14 +181,6 @@ impl ContainerHistoryCache {
|
|||
});
|
||||
}
|
||||
|
||||
pub(crate) fn get_checkout_cache(
|
||||
&mut self,
|
||||
container_idx: &ContainerIdx,
|
||||
) -> Option<&HistoryCacheForCheckout> {
|
||||
self.ensure_all_caches_exist();
|
||||
self.for_checkout.as_ref().unwrap().get(container_idx)
|
||||
}
|
||||
|
||||
pub(crate) fn get_importing_cache_unsafe(
|
||||
&self,
|
||||
container_idx: &ContainerIdx,
|
||||
|
@ -223,21 +188,6 @@ impl ContainerHistoryCache {
|
|||
self.for_importing.as_ref().unwrap().get(container_idx)
|
||||
}
|
||||
|
||||
pub(crate) fn get_movable_list(
|
||||
&mut self,
|
||||
container_idx: &ContainerIdx,
|
||||
) -> Option<&MovableListOpGroup> {
|
||||
self.ensure_all_caches_exist();
|
||||
self.for_checkout
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get(container_idx)
|
||||
.and_then(|group| match group {
|
||||
HistoryCacheForCheckout::MovableList(movable_list) => Some(movable_list),
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn get_tree(&mut self, container_idx: &ContainerIdx) -> Option<&TreeOpGroup> {
|
||||
self.ensure_importing_caches_exist();
|
||||
self.for_importing
|
||||
|
@ -259,19 +209,6 @@ impl ContainerHistoryCache {
|
|||
})
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn get_map(&mut self, container_idx: &ContainerIdx) -> Option<&MapOpGroup> {
|
||||
self.ensure_all_caches_exist();
|
||||
self.for_checkout
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.get(container_idx)
|
||||
.and_then(|group| match group {
|
||||
HistoryCacheForCheckout::Map(map) => Some(map),
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn has_cache(&self) -> bool {
|
||||
self.for_checkout.is_some()
|
||||
}
|
||||
|
@ -309,18 +246,15 @@ impl HistoryCacheForCheckout {
|
|||
}
|
||||
|
||||
impl HistoryCacheForImporting {
|
||||
fn fork(&self, a: &SharedArena) -> Self {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
match self {
|
||||
HistoryCacheForImporting::Tree(t) => HistoryCacheForImporting::Tree(TreeOpGroup {
|
||||
ops: t.ops.clone(),
|
||||
tree_for_diff: Arc::new(Mutex::new(Default::default())),
|
||||
}),
|
||||
HistoryCacheForImporting::Tree(t) => t.insert(op),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[enum_dispatch]
|
||||
trait OpGroupTrait {
|
||||
trait HistoryCacheTrait {
|
||||
fn insert(&mut self, op: &RichOp);
|
||||
}
|
||||
|
||||
|
@ -364,6 +298,101 @@ impl<T> Ord for GroupedMapOpInfo<T> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
|
||||
struct MapHistoryCacheEntry {
|
||||
container: ContainerIdx,
|
||||
key: u32,
|
||||
lamport: Lamport,
|
||||
peer: PeerID,
|
||||
counter: Counter,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct MapHistoryCache {
|
||||
keys: ValueRegister<InternalString>,
|
||||
map: BTreeSet<MapHistoryCacheEntry>,
|
||||
}
|
||||
|
||||
impl HistoryCacheTrait for MapHistoryCache {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
let container = op.raw_op().container;
|
||||
let key = match &op.raw_op().content {
|
||||
InnerContent::Map(map) => map.key.clone(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let key_idx = self.keys.register(&key);
|
||||
self.map.insert(MapHistoryCacheEntry {
|
||||
container,
|
||||
key: key_idx as u32,
|
||||
lamport: op.lamport(),
|
||||
peer: op.peer,
|
||||
counter: op.counter(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl MapHistoryCache {
|
||||
pub fn get_container_latest_op_at_vv(
|
||||
&self,
|
||||
container: ContainerIdx,
|
||||
vv: &VersionVector,
|
||||
// PERF: utilize this lamport
|
||||
_max_lamport: Lamport,
|
||||
oplog: &OpLog,
|
||||
) -> FxHashMap<InternalString, GroupedMapOpInfo> {
|
||||
let mut ans = FxHashMap::default();
|
||||
let mut last_key = u32::MAX;
|
||||
|
||||
'outer: loop {
|
||||
let range = (
|
||||
Bound::Included(MapHistoryCacheEntry {
|
||||
container,
|
||||
key: 0,
|
||||
lamport: 0,
|
||||
peer: 0,
|
||||
counter: 0,
|
||||
}),
|
||||
Bound::Excluded(MapHistoryCacheEntry {
|
||||
container,
|
||||
key: last_key,
|
||||
lamport: 0,
|
||||
peer: 0,
|
||||
counter: 0,
|
||||
}),
|
||||
);
|
||||
|
||||
for entry in self.map.range(range).rev() {
|
||||
if vv.get(&entry.peer).copied().unwrap_or(0) > entry.counter {
|
||||
let id = ID::new(entry.peer, entry.counter);
|
||||
let op = oplog.get_op_that_includes(id).unwrap();
|
||||
debug_assert_eq!(op.atom_len(), 1);
|
||||
match &op.content {
|
||||
InnerContent::Map(map) => {
|
||||
ans.insert(
|
||||
self.keys.get_value(entry.key as usize).unwrap().clone(),
|
||||
GroupedMapOpInfo {
|
||||
value: map.value.clone(),
|
||||
counter: id.counter,
|
||||
lamport: entry.lamport,
|
||||
peer: entry.peer,
|
||||
},
|
||||
);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
last_key = entry.key;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
ans
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(crate) struct MapOpGroup {
|
||||
ops: FxHashMap<InternalString, SmallSet<GroupedMapOpInfo>>,
|
||||
|
@ -387,7 +416,7 @@ impl MapOpGroup {
|
|||
}
|
||||
}
|
||||
|
||||
impl OpGroupTrait for MapOpGroup {
|
||||
impl HistoryCacheTrait for MapOpGroup {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
let key = match &op.raw_op().content {
|
||||
InnerContent::Map(map) => map.key.clone(),
|
||||
|
@ -442,7 +471,7 @@ pub(crate) struct TreeOpGroup {
|
|||
pub(crate) tree_for_diff: Arc<Mutex<TreeCacheForDiff>>,
|
||||
}
|
||||
|
||||
impl OpGroupTrait for TreeOpGroup {
|
||||
impl HistoryCacheTrait for TreeOpGroup {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
let tree_op = op.raw_op().content.as_tree().unwrap();
|
||||
let entry = self.ops.entry(op.lamport()).or_default();
|
||||
|
@ -454,6 +483,171 @@ impl OpGroupTrait for TreeOpGroup {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct MovableListHistoryCache {
|
||||
pub(crate) move_set: BTreeSet<MovableListInnerDeltaEntry>,
|
||||
pub(crate) set_set: BTreeSet<MovableListInnerDeltaEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
|
||||
struct MovableListInnerDeltaEntry {
|
||||
element_lamport: Lamport,
|
||||
element_peer: PeerID,
|
||||
lamport: Lamport,
|
||||
peer: PeerID,
|
||||
counter: Counter,
|
||||
}
|
||||
|
||||
impl HistoryCacheTrait for MovableListHistoryCache {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
let cur_id = op.id_full();
|
||||
match &op.op().content {
|
||||
InnerContent::List(l) => match l {
|
||||
crate::container::list::list_op::InnerListOp::Move { from, elem_id, to } => {
|
||||
self.move_set.insert(MovableListInnerDeltaEntry {
|
||||
element_lamport: elem_id.lamport,
|
||||
element_peer: elem_id.peer,
|
||||
lamport: cur_id.lamport,
|
||||
peer: cur_id.peer,
|
||||
counter: cur_id.counter,
|
||||
});
|
||||
}
|
||||
crate::container::list::list_op::InnerListOp::Set { elem_id, value } => {
|
||||
self.set_set.insert(MovableListInnerDeltaEntry {
|
||||
element_lamport: elem_id.lamport,
|
||||
element_peer: elem_id.peer,
|
||||
lamport: cur_id.lamport,
|
||||
peer: cur_id.peer,
|
||||
counter: cur_id.counter,
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MovableListHistoryCache {
|
||||
pub(crate) fn last_value(
|
||||
&self,
|
||||
key: IdLp,
|
||||
vv: &VersionVector,
|
||||
max_lamport: Lamport,
|
||||
oplog: &OpLog,
|
||||
) -> Option<GroupedMapOpInfo<LoroValue>> {
|
||||
self.set_set
|
||||
.range((
|
||||
Bound::Included(MovableListInnerDeltaEntry {
|
||||
element_lamport: key.lamport,
|
||||
element_peer: key.peer,
|
||||
lamport: 0,
|
||||
peer: 0,
|
||||
counter: 0,
|
||||
}),
|
||||
Bound::Excluded(MovableListInnerDeltaEntry {
|
||||
element_lamport: key.lamport,
|
||||
element_peer: key.peer,
|
||||
lamport: max_lamport,
|
||||
peer: PeerID::MAX,
|
||||
counter: Counter::MAX,
|
||||
}),
|
||||
))
|
||||
.rev()
|
||||
.find(|e| vv.get(&e.peer).copied().unwrap_or(0) > e.counter)
|
||||
.map_or_else(
|
||||
|| {
|
||||
let id = oplog.idlp_to_id(key).unwrap();
|
||||
if vv.get(&id.peer).copied().unwrap_or(0) <= id.counter {
|
||||
return None;
|
||||
}
|
||||
|
||||
let op = oplog.get_op_that_includes(id).unwrap();
|
||||
let offset = id.counter - op.counter;
|
||||
match &op.content {
|
||||
InnerContent::List(InnerListOp::Insert { slice, .. }) => {
|
||||
let value = oplog
|
||||
.arena
|
||||
.get_value(slice.0.start as usize + offset as usize)
|
||||
.unwrap();
|
||||
Some(GroupedMapOpInfo {
|
||||
value,
|
||||
counter: id.counter,
|
||||
lamport: key.lamport,
|
||||
peer: id.peer,
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
},
|
||||
|e| {
|
||||
let id = ID::new(e.peer, e.counter);
|
||||
let op = oplog.get_op_that_includes(id).unwrap();
|
||||
debug_assert_eq!(op.atom_len(), 1);
|
||||
let lamport = op.lamport();
|
||||
match &op.content {
|
||||
InnerContent::List(InnerListOp::Set { value, .. }) => {
|
||||
Some(GroupedMapOpInfo {
|
||||
value: value.clone(),
|
||||
counter: id.counter,
|
||||
lamport,
|
||||
peer: id.peer,
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn last_pos(
|
||||
&self,
|
||||
key: IdLp,
|
||||
vv: &VersionVector,
|
||||
max_lamport: Lamport,
|
||||
oplog: &OpLog,
|
||||
) -> Option<IdFull> {
|
||||
self.move_set
|
||||
.range((
|
||||
Bound::Included(MovableListInnerDeltaEntry {
|
||||
element_lamport: key.lamport,
|
||||
element_peer: key.peer,
|
||||
lamport: 0,
|
||||
peer: 0,
|
||||
counter: 0,
|
||||
}),
|
||||
Bound::Excluded(MovableListInnerDeltaEntry {
|
||||
element_lamport: key.lamport,
|
||||
element_peer: key.peer,
|
||||
lamport: max_lamport,
|
||||
peer: PeerID::MAX,
|
||||
counter: Counter::MAX,
|
||||
}),
|
||||
))
|
||||
.rev()
|
||||
.find(|e| vv.get(&e.peer).copied().unwrap_or(0) > e.counter)
|
||||
.map_or_else(
|
||||
|| {
|
||||
let id = oplog.idlp_to_id(key).unwrap();
|
||||
if vv.get(&id.peer).copied().unwrap_or(0) > id.counter {
|
||||
Some(IdFull::new(id.peer, id.counter, key.lamport))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
|e| {
|
||||
let id = ID::new(e.peer, e.counter);
|
||||
let lamport = oplog.get_lamport_at(id).unwrap();
|
||||
Some(IdFull::new(e.peer, e.counter, lamport))
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MovableListOpGroup {
|
||||
arena: SharedArena,
|
||||
|
@ -504,7 +698,7 @@ impl MovableListTarget {
|
|||
}
|
||||
}
|
||||
|
||||
impl OpGroupTrait for MovableListOpGroup {
|
||||
impl HistoryCacheTrait for MovableListOpGroup {
|
||||
fn insert(&mut self, op: &RichOp) {
|
||||
let start_id = op.id_full().idlp();
|
||||
match &op.op().content {
|
||||
|
|
|
@ -313,6 +313,10 @@ impl<'a> RichOp<'a> {
|
|||
self.end
|
||||
}
|
||||
|
||||
pub fn counter(&self) -> Counter {
|
||||
self.op.counter + self.start as Counter
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn id(&self) -> ID {
|
||||
ID {
|
||||
|
|
|
@ -715,7 +715,8 @@ impl OpLog {
|
|||
loro_common::IdLp { peer, lamport }
|
||||
}
|
||||
|
||||
pub(crate) fn get_op(&self, id: ID) -> Option<BlockOpRef> {
|
||||
/// NOTE: This may return a op that includes the given id, not necessarily start with the given id
|
||||
pub(crate) fn get_op_that_includes(&self, id: ID) -> Option<BlockOpRef> {
|
||||
let change = self.get_change_at(id)?;
|
||||
change.get_op_with_counter(id.counter)
|
||||
}
|
||||
|
|
|
@ -271,8 +271,13 @@ impl ChangeStore {
|
|||
pub fn get_change_by_idlp(&self, idlp: IdLp) -> Option<BlockChangeRef> {
|
||||
let mut kv = self.mem_parsed_kv.lock().unwrap();
|
||||
let mut iter = kv.range_mut(ID::new(idlp.peer, 0)..ID::new(idlp.peer, i32::MAX));
|
||||
// FIX: PERF: this is super slow
|
||||
while let Some((_id, block)) = iter.next_back() {
|
||||
if block.lamport_range.0 <= idlp.lamport {
|
||||
if block.lamport_range.1 < idlp.lamport {
|
||||
break;
|
||||
}
|
||||
|
||||
block
|
||||
.ensure_changes(&self.arena)
|
||||
.expect("Parse block error");
|
||||
|
@ -489,6 +494,14 @@ impl Deref for BlockOpRef {
|
|||
}
|
||||
}
|
||||
|
||||
impl BlockOpRef {
|
||||
pub fn lamport(&self) -> Lamport {
|
||||
let change = &self.block.content.try_changes().unwrap()[self.change_index];
|
||||
let op = &change.ops[self.op_index];
|
||||
(op.counter - change.id.counter) as Lamport + change.lamport
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChangesBlock {
|
||||
peer: PeerID,
|
||||
|
@ -683,6 +696,8 @@ impl ChangesBlock {
|
|||
if not_found == 0 {
|
||||
None
|
||||
} else {
|
||||
// NOTE: we somehow need to return it even if we cannot find the perfect match
|
||||
// need to check which part relies on this behavior
|
||||
Some(not_found - 1)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -584,10 +584,15 @@ mod inner {
|
|||
};
|
||||
if let Some(leaf) = self.id_to_list_leaf.get(&new_pos) {
|
||||
ans.new_list_item_leaf = Some(*leaf);
|
||||
self.list.update_leaf(*leaf, |elem| {
|
||||
ans.activate_new_list_item = elem.pointed_by.is_none();
|
||||
debug_assert!(ans.activate_new_list_item);
|
||||
elem.pointed_by = Some(elem_id);
|
||||
self.list.update_leaf(*leaf, |list_item| {
|
||||
debug_assert!(
|
||||
list_item.pointed_by.is_none(),
|
||||
"list_item was pointed by {:?} but need to be changed to {:?}",
|
||||
list_item.pointed_by,
|
||||
elem_id
|
||||
);
|
||||
ans.activate_new_list_item = list_item.pointed_by.is_none();
|
||||
list_item.pointed_by = Some(elem_id);
|
||||
(true, None, None)
|
||||
});
|
||||
} else {
|
||||
|
|
Loading…
Reference in a new issue