mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-22 21:07:43 +00:00
chore: init apply logic
This commit is contained in:
parent
d924e9fac8
commit
0bcf19038d
10 changed files with 272 additions and 68 deletions
|
@ -1,21 +1,19 @@
|
|||
use rle::{HasLength, RleVec};
|
||||
use rle::HasLength;
|
||||
|
||||
use crate::{
|
||||
container::{list::list_op::ListOp, text::tracker::yata::YataImpl},
|
||||
id::{Counter, ID},
|
||||
op::{Op},
|
||||
op::Op,
|
||||
span::IdSpan,
|
||||
VersionVector,
|
||||
};
|
||||
|
||||
use self::{
|
||||
content_map::ContentMap,
|
||||
cursor_map::{make_notify, CursorMap},
|
||||
cursor_map::{make_notify, CursorMap, IdSpanQueryResult},
|
||||
y_span::{Status, StatusChange, YSpan},
|
||||
};
|
||||
|
||||
|
||||
|
||||
mod content_map;
|
||||
mod cursor_map;
|
||||
mod y_span;
|
||||
|
@ -35,7 +33,10 @@ pub mod yata;
|
|||
pub struct Tracker {
|
||||
// #[cfg(feature = "fuzzing")]
|
||||
client_id: u64,
|
||||
vv: VersionVector,
|
||||
/// all applied operations version vector
|
||||
all_vv: VersionVector,
|
||||
/// current content version vector
|
||||
head_vv: VersionVector,
|
||||
content: ContentMap,
|
||||
id_to_cursor: CursorMap,
|
||||
}
|
||||
|
@ -69,7 +70,8 @@ impl Tracker {
|
|||
content,
|
||||
id_to_cursor,
|
||||
client_id: 0,
|
||||
vv: Default::default(),
|
||||
head_vv: Default::default(),
|
||||
all_vv: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,30 +101,105 @@ impl Tracker {
|
|||
self.id_to_cursor.debug_check();
|
||||
}
|
||||
|
||||
fn turn_on(&mut self, _id: IdSpan) {}
|
||||
fn turn_off(&mut self, _id: IdSpan) {}
|
||||
fn checkout(&mut self, _vv: VersionVector) {}
|
||||
fn checkout(&mut self, vv: VersionVector) {
|
||||
let to_retreat = self.head_vv.clone() - vv.clone();
|
||||
let to_forward = vv.clone() - self.head_vv.clone();
|
||||
self.retreat(&to_retreat);
|
||||
self.forward(&to_forward);
|
||||
self.head_vv = vv;
|
||||
}
|
||||
|
||||
fn forward(&mut self, spans: &[IdSpan]) {
|
||||
let mut to_set_as_applied = Vec::with_capacity(spans.len());
|
||||
let mut to_delete = Vec::with_capacity(spans.len());
|
||||
for span in spans.iter() {
|
||||
let IdSpanQueryResult {
|
||||
mut inserts,
|
||||
deletes,
|
||||
} = self.id_to_cursor.get_cursor_at_id_span(*span);
|
||||
for delete in deletes {
|
||||
for deleted_span in delete.iter() {
|
||||
to_delete.append(
|
||||
&mut self
|
||||
.id_to_cursor
|
||||
.get_cursor_at_id_span(*deleted_span)
|
||||
.inserts,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
to_set_as_applied.append(&mut inserts);
|
||||
}
|
||||
|
||||
self.content.update_at_cursors_twice(
|
||||
&[&to_set_as_applied, &to_delete],
|
||||
&mut |v| {
|
||||
v.status.apply(StatusChange::SetAsFuture);
|
||||
},
|
||||
&mut |v: &mut YSpan| {
|
||||
v.status.apply(StatusChange::UndoDelete);
|
||||
},
|
||||
&mut make_notify(&mut self.id_to_cursor),
|
||||
)
|
||||
}
|
||||
|
||||
fn retreat(&mut self, spans: &[IdSpan]) {
|
||||
let mut to_set_as_future = Vec::with_capacity(spans.len());
|
||||
let mut to_undo_delete = Vec::with_capacity(spans.len());
|
||||
for span in spans.iter() {
|
||||
let IdSpanQueryResult {
|
||||
mut inserts,
|
||||
deletes,
|
||||
} = self.id_to_cursor.get_cursor_at_id_span(*span);
|
||||
for delete in deletes {
|
||||
for deleted_span in delete.iter() {
|
||||
to_undo_delete.append(
|
||||
&mut self
|
||||
.id_to_cursor
|
||||
.get_cursor_at_id_span(*deleted_span)
|
||||
.inserts,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
to_set_as_future.append(&mut inserts);
|
||||
}
|
||||
|
||||
self.content.update_at_cursors_twice(
|
||||
&[&to_set_as_future, &to_undo_delete],
|
||||
&mut |v| {
|
||||
v.status.apply(StatusChange::SetAsFuture);
|
||||
},
|
||||
&mut |v: &mut YSpan| {
|
||||
v.status.apply(StatusChange::UndoDelete);
|
||||
},
|
||||
&mut make_notify(&mut self.id_to_cursor),
|
||||
)
|
||||
}
|
||||
|
||||
/// apply an operation directly to the current tracker
|
||||
fn apply(&mut self, op: &Op) {
|
||||
assert_eq!(*self.vv.get(&op.id.client_id).unwrap_or(&0), op.id.counter);
|
||||
self.vv.set_end(op.id.inc(op.len() as i32));
|
||||
if self.all_vv.includes_id(op.id.inc(op.len() as i32 - 1)) {}
|
||||
assert_eq!(
|
||||
*self.head_vv.get(&op.id.client_id).unwrap_or(&0),
|
||||
op.id.counter
|
||||
);
|
||||
self.head_vv.set_end(op.id.inc(op.len() as i32));
|
||||
let id = op.id;
|
||||
match &op.content {
|
||||
crate::op::OpContent::Normal { content } => {
|
||||
if let Some(text_content) = content.as_list() {
|
||||
match text_content {
|
||||
ListOp::Insert { slice, pos } => {
|
||||
let yspan = self.content.get_yspan_at_pos(id, *pos, slice.len());
|
||||
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
|
||||
unsafe { crdt_list::yata::integrate::<YataImpl>(self, yspan) };
|
||||
}
|
||||
ListOp::Delete { pos, len } => {
|
||||
let spans = self.content.get_id_spans(*pos, *len);
|
||||
self.update_spans(&spans, StatusChange::Delete);
|
||||
self.id_to_cursor
|
||||
.set((id).into(), cursor_map::Marker::Delete(spans));
|
||||
}
|
||||
let text_content = content.as_list().expect("Content is not for list");
|
||||
match text_content {
|
||||
ListOp::Insert { slice, pos } => {
|
||||
let yspan = self.content.get_yspan_at_pos(id, *pos, slice.len());
|
||||
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
|
||||
unsafe { crdt_list::yata::integrate::<YataImpl>(self, yspan) };
|
||||
}
|
||||
ListOp::Delete { pos, len } => {
|
||||
let spans = self.content.get_id_spans(*pos, *len);
|
||||
self.update_spans(&spans, StatusChange::Delete);
|
||||
self.id_to_cursor
|
||||
.set((id).into(), cursor_map::Marker::Delete(spans));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -131,22 +208,11 @@ impl Tracker {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn update_spans(&mut self, spans: &RleVec<IdSpan>, change: StatusChange) {
|
||||
fn update_spans(&mut self, spans: &[IdSpan], change: StatusChange) {
|
||||
let mut cursors = Vec::new();
|
||||
for span in spans.iter() {
|
||||
let mut group = Vec::new();
|
||||
for marker in self
|
||||
.id_to_cursor
|
||||
.get_range(span.min_id().into(), span.end_id().into())
|
||||
{
|
||||
for cursor in marker.get_spans(*span) {
|
||||
if !group.contains(&cursor) {
|
||||
group.push(cursor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cursors.append(&mut group);
|
||||
let mut inserts = self.id_to_cursor.get_cursor_at_id_span(*span).inserts;
|
||||
cursors.append(&mut inserts);
|
||||
}
|
||||
|
||||
self.content.update_at_cursors(
|
||||
|
|
|
@ -84,7 +84,7 @@ impl ContentMap {
|
|||
let mut next_cursor = cursor.next_elem_start();
|
||||
let mut ans = None;
|
||||
while let Some(next_inner) = next_cursor {
|
||||
if next_inner.as_ref().status.unapplied {
|
||||
if next_inner.as_ref().status.future {
|
||||
let mut cursor = next_inner.unwrap();
|
||||
cursor.offset = 0;
|
||||
cursor.pos = Position::Start;
|
||||
|
@ -97,7 +97,7 @@ impl ContentMap {
|
|||
|
||||
(prev, ans)
|
||||
} else {
|
||||
while cursor.as_ref().status.unapplied {
|
||||
while cursor.as_ref().status.future {
|
||||
if let Some(next) = cursor.next_elem_start() {
|
||||
cursor = next;
|
||||
} else {
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
use std::{fmt::Debug, ptr::NonNull};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
ops::{Deref, DerefMut},
|
||||
ptr::NonNull,
|
||||
};
|
||||
|
||||
use enum_as_inner::EnumAsInner;
|
||||
|
||||
use moveit::DerefMove;
|
||||
use rle::{
|
||||
range_map::RangeMap,
|
||||
rle_tree::{node::LeafNode, Position, SafeCursor, SafeCursorMut, UnsafeCursor},
|
||||
|
@ -21,7 +26,7 @@ pub(super) enum Marker {
|
|||
len: usize,
|
||||
},
|
||||
Delete(RleVec<IdSpan>),
|
||||
// TODO: REDO, UNDO
|
||||
// FUTURE: REDO, UNDO
|
||||
}
|
||||
|
||||
impl ZeroElement for Marker {
|
||||
|
@ -172,7 +177,21 @@ impl Mergable for Marker {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) type CursorMap = RangeMap<u128, Marker>;
|
||||
#[derive(Debug, Default)]
|
||||
pub(super) struct CursorMap(RangeMap<u128, Marker>);
|
||||
|
||||
impl Deref for CursorMap {
|
||||
type Target = RangeMap<u128, Marker>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
impl DerefMut for CursorMap {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn make_notify(
|
||||
map: &mut CursorMap,
|
||||
|
@ -190,3 +209,31 @@ pub(super) fn make_notify(
|
|||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct IdSpanQueryResult<'a> {
|
||||
pub inserts: Vec<UnsafeCursor<'static, YSpan, YSpanTreeTrait>>,
|
||||
pub deletes: Vec<&'a RleVec<IdSpan>>,
|
||||
}
|
||||
|
||||
impl CursorMap {
|
||||
pub fn get_cursor_at_id_span(&self, span: IdSpan) -> IdSpanQueryResult {
|
||||
let mut inserts = Vec::new();
|
||||
let mut deletes = Vec::new();
|
||||
for marker in self.get_range(span.min_id().into(), span.end_id().into()) {
|
||||
match marker {
|
||||
Marker::Insert { .. } => {
|
||||
for cursor in marker.get_spans(span) {
|
||||
if !inserts.contains(&cursor) {
|
||||
inserts.push(cursor);
|
||||
}
|
||||
}
|
||||
}
|
||||
Marker::Delete(del) => {
|
||||
deletes.push(del);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
IdSpanQueryResult { inserts, deletes }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,8 @@ use rle::{rle_tree::tree_trait::CumulateTreeTrait, HasLength, Mergable, Sliceabl
|
|||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
|
||||
pub struct Status {
|
||||
pub unapplied: bool,
|
||||
/// is this span from a future operation
|
||||
pub future: bool,
|
||||
pub delete_times: usize,
|
||||
pub undo_times: usize,
|
||||
}
|
||||
|
@ -12,7 +13,7 @@ impl Status {
|
|||
#[inline]
|
||||
pub fn new() -> Self {
|
||||
Status {
|
||||
unapplied: false,
|
||||
future: false,
|
||||
delete_times: 0,
|
||||
undo_times: 0,
|
||||
}
|
||||
|
@ -20,7 +21,7 @@ impl Status {
|
|||
|
||||
#[inline]
|
||||
pub fn is_activated(&self) -> bool {
|
||||
!self.unapplied && self.delete_times == 0 && self.undo_times == 0
|
||||
!self.future && self.delete_times == 0 && self.undo_times == 0
|
||||
}
|
||||
|
||||
/// Return whether the activation changed
|
||||
|
@ -28,8 +29,8 @@ impl Status {
|
|||
pub fn apply(&mut self, change: StatusChange) -> bool {
|
||||
let activated = self.is_activated();
|
||||
match change {
|
||||
StatusChange::Apply => self.unapplied = false,
|
||||
StatusChange::PreApply => self.unapplied = true,
|
||||
StatusChange::SetAsCurrent => self.future = false,
|
||||
StatusChange::SetAsFuture => self.future = true,
|
||||
StatusChange::Redo => self.undo_times -= 1,
|
||||
StatusChange::Undo => self.undo_times += 1,
|
||||
StatusChange::Delete => self.delete_times += 1,
|
||||
|
@ -51,8 +52,8 @@ pub struct YSpan {
|
|||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum StatusChange {
|
||||
Apply,
|
||||
PreApply,
|
||||
SetAsCurrent,
|
||||
SetAsFuture,
|
||||
Redo,
|
||||
Undo,
|
||||
Delete,
|
||||
|
|
|
@ -182,7 +182,7 @@ pub mod fuzz {
|
|||
|
||||
impl TestFramework for YataImpl {
|
||||
fn integrate(container: &mut Self::Container, op: Self::OpUnit) {
|
||||
container.vv.set_end(op.id.inc(op.len as i32));
|
||||
container.head_vv.set_end(op.id.inc(op.len as i32));
|
||||
// SAFETY: we know this is safe because in [YataImpl::insert_after] there is no access to shared elements
|
||||
unsafe { crdt_list::yata::integrate::<Self>(container, op) };
|
||||
}
|
||||
|
@ -190,18 +190,18 @@ pub mod fuzz {
|
|||
#[inline]
|
||||
fn can_integrate(container: &Self::Container, op: &Self::OpUnit) -> bool {
|
||||
if let Some(value) = op.origin_left {
|
||||
if !value.is_unknown() && !container.vv.includes(value) {
|
||||
if !value.is_unknown() && !container.head_vv.includes_id(value) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(value) = op.origin_right {
|
||||
if !value.is_unknown() && !container.vv.includes(value) {
|
||||
if !value.is_unknown() && !container.head_vv.includes_id(value) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if op.id.counter != 0 && !container.vv.includes(op.id.inc(-1)) {
|
||||
if op.id.counter != 0 && !container.head_vv.includes_id(op.id.inc(-1)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -255,7 +255,7 @@ pub mod fuzz {
|
|||
let ans = container.content.get_yspan_at_pos(
|
||||
ID::new(
|
||||
container.client_id,
|
||||
*container.vv.get(&container.client_id).unwrap_or(&0),
|
||||
*container.head_vv.get(&container.client_id).unwrap_or(&0),
|
||||
),
|
||||
pos % container.content.len(),
|
||||
pos % 10 + 1,
|
||||
|
|
|
@ -87,7 +87,7 @@ pub(crate) trait Dag {
|
|||
|
||||
#[inline]
|
||||
fn contains(&self, id: ID) -> bool {
|
||||
self.vv().includes(id)
|
||||
self.vv().includes_id(id)
|
||||
}
|
||||
|
||||
fn frontier(&self) -> &[ID];
|
||||
|
|
|
@ -43,6 +43,14 @@ impl CounterSpan {
|
|||
self.to - 1
|
||||
}
|
||||
}
|
||||
|
||||
fn end(&self) -> i32 {
|
||||
if self.from > self.to {
|
||||
self.from + 1
|
||||
} else {
|
||||
self.to
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HasLength for CounterSpan {
|
||||
|
@ -124,7 +132,7 @@ impl IdSpan {
|
|||
|
||||
#[inline]
|
||||
pub fn end_id(&self) -> ID {
|
||||
ID::new(self.client_id, self.counter.max() + 1)
|
||||
ID::new(self.client_id, self.counter.end())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::{
|
||||
cmp::Ordering,
|
||||
ops::{Deref, DerefMut},
|
||||
ops::{Deref, DerefMut, Sub},
|
||||
};
|
||||
|
||||
use fxhash::FxHashMap;
|
||||
|
@ -37,6 +37,25 @@ impl Deref for VersionVector {
|
|||
}
|
||||
}
|
||||
|
||||
impl Sub for VersionVector {
|
||||
type Output = Vec<IdSpan>;
|
||||
|
||||
fn sub(self, rhs: Self) -> Self::Output {
|
||||
let mut ans = Vec::new();
|
||||
for (client_id, &counter) in self.iter() {
|
||||
if let Some(&rhs_counter) = rhs.get(client_id) {
|
||||
if counter > rhs_counter {
|
||||
ans.push(IdSpan::new(*client_id, rhs_counter, counter));
|
||||
}
|
||||
} else {
|
||||
ans.push(IdSpan::new(*client_id, 0, counter));
|
||||
}
|
||||
}
|
||||
|
||||
ans
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for VersionVector {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
let mut self_greater = true;
|
||||
|
@ -147,7 +166,7 @@ impl VersionVector {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn includes(&self, id: ID) -> bool {
|
||||
pub fn includes_id(&self, id: ID) -> bool {
|
||||
if let Some(end) = self.get(&id.client_id) {
|
||||
if *end > id.counter {
|
||||
return true;
|
||||
|
|
|
@ -277,9 +277,64 @@ impl<T: Rle, A: RleTreeTrait<T>> RleTree<T, A> {
|
|||
}
|
||||
}
|
||||
|
||||
self.update_with_gathered_map(updates_map, notify);
|
||||
}
|
||||
|
||||
pub fn update_at_cursors_twice<U, V, F>(
|
||||
&mut self,
|
||||
cursor_groups: &[&[UnsafeCursor<T, A>]; 2],
|
||||
update_fn_u: &mut U,
|
||||
update_fn_v: &mut V,
|
||||
notify: &mut F,
|
||||
) where
|
||||
U: FnMut(&mut T),
|
||||
V: FnMut(&mut T),
|
||||
F: FnMut(&T, *mut LeafNode<T, A>),
|
||||
{
|
||||
let mut updates_map: HashMap<NonNull<_>, Vec<(usize, Vec<T>)>, _> = FxHashMap::default();
|
||||
for (i, cursors) in cursor_groups.iter().enumerate() {
|
||||
for cursor in cursors.iter() {
|
||||
// SAFETY: we has the exclusive reference to the tree and the cursor is valid
|
||||
let updates = unsafe {
|
||||
if i == 0 {
|
||||
cursor.leaf.as_ref().pure_update(
|
||||
cursor.index,
|
||||
cursor.offset,
|
||||
cursor.len,
|
||||
update_fn_u,
|
||||
)
|
||||
} else {
|
||||
cursor.leaf.as_ref().pure_update(
|
||||
cursor.index,
|
||||
cursor.offset,
|
||||
cursor.len,
|
||||
update_fn_v,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(update) = updates {
|
||||
updates_map
|
||||
.entry(cursor.leaf)
|
||||
.or_default()
|
||||
.push((cursor.index, update));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.update_with_gathered_map(updates_map, notify);
|
||||
}
|
||||
|
||||
fn update_with_gathered_map<F, M>(
|
||||
&mut self,
|
||||
iter: HashMap<NonNull<LeafNode<T, A>>, Vec<(usize, Vec<T>)>, M>,
|
||||
notify: &mut F,
|
||||
) where
|
||||
F: FnMut(&T, *mut LeafNode<T, A>),
|
||||
{
|
||||
let mut internal_updates_map: HashMap<NonNull<_>, Vec<(usize, Vec<_>)>, _> =
|
||||
FxHashMap::default();
|
||||
for (mut leaf, updates) in updates_map {
|
||||
for (mut leaf, updates) in iter {
|
||||
// SAFETY: we has the exclusive reference to the tree and the cursor is valid
|
||||
let leaf = unsafe { leaf.as_mut() };
|
||||
if let Err(new) = leaf.apply_updates(updates, notify) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::ops::Range;
|
||||
use std::ops::{Deref, Range};
|
||||
|
||||
use crate::{HasLength, Mergable, Slice, Sliceable};
|
||||
|
||||
|
@ -174,32 +174,32 @@ impl<T, Conf> RleVec<T, Conf> {
|
|||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[inline(always)]
|
||||
pub fn merged_len(&self) -> usize {
|
||||
self.vec.len()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[inline(always)]
|
||||
pub fn to_vec(self) -> Vec<T> {
|
||||
self.vec
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[inline(always)]
|
||||
pub fn vec(&self) -> &Vec<T> {
|
||||
&self.vec
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[inline(always)]
|
||||
pub fn iter(&self) -> std::slice::Iter<'_, T> {
|
||||
self.vec.iter()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[inline(always)]
|
||||
pub fn vec_mut(&mut self) -> &mut Vec<T> {
|
||||
&mut self.vec
|
||||
}
|
||||
|
||||
#[inline]
|
||||
#[inline(always)]
|
||||
pub fn get_merged(&self, index: usize) -> Option<&T> {
|
||||
self.vec.get(index)
|
||||
}
|
||||
|
@ -285,6 +285,14 @@ impl<T> HasLength for RleVec<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for RleVec<T> {
|
||||
type Target = [T];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.vec()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
mod prime_value {
|
||||
|
|
Loading…
Reference in a new issue