chore: impl basic framework of text applying

This commit is contained in:
Zixuan Chen 2022-10-20 12:31:50 +08:00
parent ee8bee54ab
commit d1e1143c58
12 changed files with 320 additions and 53 deletions

View file

@ -18,7 +18,7 @@ pub type Timestamp = i64;
pub type Lamport = u32;
/// A `Change` contains a list of [Op]s.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Change {
pub(crate) ops: RleVec<Op>,
pub(crate) deps: SmallVec<[ID; 2]>,

View file

@ -15,7 +15,7 @@ use crate::{
use super::{
string_pool::StringPool,
text_content::{ListSlice, ListSliceTreeTrait},
tracker::Tracker,
tracker::{Effect, Tracker},
};
#[derive(Clone, Debug)]
@ -98,9 +98,9 @@ impl Container for TextContainer {
let mut latest_head: SmallVec<[ID; 2]> = store.frontier().into();
latest_head.push(new_op_id);
if common.len() == 0 || !common.iter().all(|x| self.tracker.contains(new_op_id)) {
if common.len() == 0 || !common.iter().all(|x| self.tracker.contains(*x)) {
// stage 1
self.tracker = Tracker::new(common.clone(), common_vv);
self.tracker = Tracker::new(common_vv);
let path = store.find_path(&common, &latest_head);
for iter in store.iter_partial(&common, path.right) {
self.tracker.retreat(&iter.retreat);
@ -114,9 +114,16 @@ impl Container for TextContainer {
}
// stage 2
let path = store.find_path(&latest_head, &store.frontier());
let path = store.find_path(&latest_head, store.frontier());
self.tracker.retreat(&path.left);
todo!("turn on tracker and calculate effects")
for effect in self.tracker.iter_effects(path.left) {
match effect {
Effect::Del { pos, len } => self.state.delete_range(Some(pos), Some(pos + len)),
Effect::Ins { pos, content } => {
todo!("need to find the span from store / keep the list slice in the tracker");
}
}
}
}
fn checkout_version(&mut self, _vv: &crate::VersionVector) {

View file

@ -1,10 +1,10 @@
use rle::HasLength;
use rle::{rle_tree::UnsafeCursor, HasLength};
use smallvec::SmallVec;
use crate::{
container::{list::list_op::ListOp, text::tracker::yata_impl::YataImpl},
id::{Counter, ID},
op::{Op, OpContent},
op::OpContent,
span::IdSpan,
version::IdSpanVector,
VersionVector,
@ -13,11 +13,14 @@ use crate::{
use self::{
content_map::ContentMap,
cursor_map::{make_notify, CursorMap, IdSpanQueryResult},
y_span::{Status, StatusChange, YSpan},
effects_iter::EffectIter,
y_span::{Status, StatusChange, YSpan, YSpanTreeTrait},
};
pub use effects_iter::Effect;
mod content_map;
mod cursor_map;
mod effects_iter;
mod y_span;
#[cfg(not(feature = "fuzzing"))]
mod yata_impl;
@ -42,7 +45,6 @@ pub struct Tracker {
all_vv: VersionVector,
/// current content version vector
head_vv: VersionVector,
start_head: SmallVec<[ID; 2]>,
content: ContentMap,
id_to_cursor: CursorMap,
}
@ -54,7 +56,7 @@ impl From<ID> for u128 {
}
impl Tracker {
pub fn new(head: SmallVec<[ID; 2]>, start_vv: VersionVector) -> Self {
pub fn new(start_vv: VersionVector) -> Self {
let min = ID::unknown(0);
let max = ID::unknown(Counter::MAX / 2);
let len = (max.counter - min.counter) as usize;
@ -76,7 +78,6 @@ impl Tracker {
content,
id_to_cursor,
start_vv,
start_head: head,
client_id: 0,
head_vv: Default::default(),
all_vv: Default::default(),
@ -144,18 +145,22 @@ impl Tracker {
span.1.start,
span.1.end,
));
for delete in deletes {
for (id, delete) in deletes {
for deleted_span in delete.iter() {
to_delete.append(
&mut self
.id_to_cursor
.get_cursors_at_id_span(*deleted_span)
.inserts,
.inserts
.iter()
.map(|x| x.1)
.collect(),
);
}
}
to_set_as_applied.append(&mut inserts);
// TODO: maybe we can skip this collect
to_set_as_applied.append(&mut inserts.iter().map(|x| x.1).collect());
}
self.content.update_at_cursors_twice(
@ -183,18 +188,22 @@ impl Tracker {
span.1.start,
span.1.end,
));
for delete in deletes {
for (id, delete) in deletes {
for deleted_span in delete.iter() {
to_undo_delete.append(
&mut self
.id_to_cursor
.get_cursors_at_id_span(*deleted_span)
.inserts,
.inserts
.iter()
.map(|x| x.1)
.collect(),
);
}
}
to_set_as_future.append(&mut inserts);
// TODO: maybe we can skip this collect
to_set_as_future.append(&mut inserts.iter().map(|x| x.1).collect());
}
self.content.update_at_cursors_twice(
@ -210,7 +219,7 @@ impl Tracker {
}
/// apply an operation directly to the current tracker
pub fn apply(&mut self, id: ID, content: &OpContent) {
pub(crate) fn apply(&mut self, id: ID, content: &OpContent) {
assert_eq!(*self.head_vv.get(&id.client_id).unwrap_or(&0), id.counter);
self.head_vv.set_end(id.inc(content.len() as i32));
match &content {
@ -235,19 +244,48 @@ impl Tracker {
}
}
fn update_cursors(
&mut self,
mut cursors: SmallVec<[UnsafeCursor<'_, YSpan, YSpanTreeTrait>; 2]>,
change: StatusChange,
) -> i32 {
let mut changed: i32 = 0;
self.content.update_at_cursors(
&mut cursors,
&mut |v| {
let before = v.len() as i32;
v.status.apply(change);
let after = v.len() as i32;
changed += after - before;
},
&mut make_notify(&mut self.id_to_cursor),
);
changed
}
fn update_spans(&mut self, spans: &[IdSpan], change: StatusChange) {
let mut cursors = Vec::new();
let mut cursors: SmallVec<
[UnsafeCursor<YSpan, rle::rle_tree::tree_trait::CumulateTreeTrait<YSpan, 4>>; 2],
> = SmallVec::with_capacity(spans.len());
for span in spans.iter() {
let mut inserts = self.id_to_cursor.get_cursors_at_id_span(*span).inserts;
cursors.append(&mut inserts);
let inserts = self.id_to_cursor.get_cursors_at_id_span(*span).inserts;
// TODO: maybe we can skip this collect
for x in inserts.iter() {
cursors.push(x.1);
}
}
self.content.update_at_cursors(
cursors,
&mut cursors,
&mut |v| {
v.status.apply(change);
},
&mut make_notify(&mut self.id_to_cursor),
)
}
pub fn iter_effects(&mut self, target: IdSpanVector) -> EffectIter<'_> {
EffectIter::new(self, target)
}
}

View file

@ -210,29 +210,58 @@ pub(super) fn make_notify(
}
pub(super) struct IdSpanQueryResult<'a> {
pub inserts: Vec<UnsafeCursor<'static, YSpan, YSpanTreeTrait>>,
pub deletes: Vec<&'a RleVec<IdSpan>>,
pub inserts: Vec<(ID, UnsafeCursor<'static, YSpan, YSpanTreeTrait>)>,
pub deletes: Vec<(ID, &'a RleVec<IdSpan>)>,
}
#[derive(EnumAsInner)]
pub enum FirstCursorResult<'a> {
Ins(ID, UnsafeCursor<'static, YSpan, YSpanTreeTrait>),
Del(ID, &'a RleVec<IdSpan>),
}
impl CursorMap {
pub fn get_cursors_at_id_span(&self, span: IdSpan) -> IdSpanQueryResult {
let mut inserts = Vec::new();
let mut inserts: Vec<(ID, UnsafeCursor<'static, YSpan, YSpanTreeTrait>)> = Vec::new();
let mut deletes = Vec::new();
for marker in self.get_range(span.min_id().into(), span.end_id().into()) {
let mut inserted_set = fxhash::FxHashSet::default();
for (id, marker) in self.get_range_with_index(span.min_id().into(), span.end_id().into()) {
let id: ID = id.into();
match marker {
Marker::Insert { .. } => {
let mut offset = 0;
for cursor in marker.get_spans(span) {
if !inserts.contains(&cursor) {
inserts.push(cursor);
let new_id = id.inc(offset);
if !inserted_set.contains(&cursor) {
inserted_set.insert(cursor);
offset += cursor.len as i32;
inserts.push((new_id, cursor));
}
}
}
Marker::Delete(del) => {
deletes.push(del);
deletes.push((id, del));
}
}
}
IdSpanQueryResult { inserts, deletes }
}
pub fn get_first_cursors_at_id_span(&self, span: IdSpan) -> Option<FirstCursorResult> {
for (id, marker) in self.get_range_with_index(span.min_id().into(), span.end_id().into()) {
let id: ID = id.into();
match marker {
Marker::Insert { .. } => {
let spans = marker.get_spans(span);
if !spans.is_empty() {
return Some(FirstCursorResult::Ins(id, spans[0]));
}
}
Marker::Delete(del) => return Some(FirstCursorResult::Del(id, del)),
}
}
None
}
}

View file

@ -0,0 +1,150 @@
use rle::HasLength;
use smallvec::smallvec;
use crate::{
id::Counter,
span::{CounterSpan, HasId, IdSpan},
version::IdSpanVector,
};
use super::{
cursor_map::{FirstCursorResult, IdSpanQueryResult},
y_span::StatusChange,
Tracker,
};
pub struct EffectIter<'a> {
tracker: &'a mut Tracker,
left_spans: Vec<IdSpan>,
current_span: Option<IdSpan>,
current_delete_targets: Option<Vec<IdSpan>>,
}
impl<'a> EffectIter<'a> {
pub fn new(tracker: &'a mut Tracker, target: IdSpanVector) -> Self {
let spans = target
.iter()
.map(|(client, ctr)| IdSpan::new(*client, ctr.start, ctr.end))
.collect();
Self {
tracker,
left_spans: spans,
current_span: None,
current_delete_targets: None,
}
}
fn run(tracker: &mut Tracker, spans: &IdSpanVector) {
let mut to_set_as_applied = Vec::with_capacity(spans.len());
let mut to_delete = Vec::with_capacity(spans.len());
for span in spans.iter() {
let IdSpanQueryResult {
mut inserts,
deletes,
} = tracker.id_to_cursor.get_cursors_at_id_span(IdSpan::new(
*span.0,
span.1.start,
span.1.end,
));
for (id, delete) in deletes {
for deleted_span in delete.iter() {
to_delete.append(
&mut tracker
.id_to_cursor
.get_cursors_at_id_span(*deleted_span)
.inserts,
);
}
}
to_set_as_applied.append(&mut inserts);
}
}
}
pub enum Effect {
Del { pos: usize, len: usize },
Ins { pos: usize, content: IdSpan },
}
impl<'a> Iterator for EffectIter<'a> {
type Item = Effect;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(ref mut delete_targets) = self.current_delete_targets {
let target = delete_targets.pop().unwrap();
let result = self
.tracker
.id_to_cursor
.get_first_cursors_at_id_span(target)
.unwrap();
let (id, cursor) = result.as_ins().unwrap();
assert_eq!(*id, target.id_start());
if cursor.len != target.len() {
delete_targets.push(IdSpan {
client_id: target.client_id,
counter: CounterSpan::new(
id.counter + cursor.len as Counter,
target.counter.end,
),
});
}
// SAFETY: we know that the cursor is valid here
let pos = unsafe { cursor.get_index() };
let changed_len = self
.tracker
.update_cursors(smallvec![*cursor], StatusChange::Delete);
return Some(Effect::Del {
pos,
len: (-changed_len) as usize,
});
}
if let Some(ref mut current) = self.current_span {
let cursor = self
.tracker
.id_to_cursor
.get_first_cursors_at_id_span(*current);
if let Some(cursor) = cursor {
match cursor {
FirstCursorResult::Ins(id, cursor) => {
current
.counter
.set_start(id.counter + cursor.len as Counter);
// SAFETY: we know that the cursor is valid here
let index = unsafe { cursor.get_index() };
let span = IdSpan::new(
id.client_id,
id.counter,
id.counter + cursor.len as Counter,
);
let changed = self
.tracker
.update_cursors(smallvec![cursor], StatusChange::SetAsCurrent);
assert_eq!(changed as usize, span.len());
return Some(Effect::Ins {
pos: index,
content: span,
});
}
FirstCursorResult::Del(id, del) => {
current.counter.set_start(id.counter + del.len() as Counter);
self.current_delete_targets = Some(del.iter().cloned().collect());
}
}
} else {
self.current_span = None;
}
} else {
if self.left_spans.is_empty() {
return None;
}
self.current_span = self.left_spans.pop();
}
}
}
}

View file

@ -237,7 +237,7 @@ pub mod fuzz {
}
fn new_container(client_id: usize) -> Self::Container {
let mut tracker = Tracker::new();
let mut tracker = Tracker::new(Default::default());
#[cfg(feature = "fuzzing")]
{
tracker.client_id = client_id as ClientID;

View file

@ -165,14 +165,13 @@ impl LogStore {
}
}
let change = self.push_change(change);
// TODO: find a way to remove this clone?
let change = self.push_change(change).clone();
// Apply ops.
// NOTE: applying expects that log_store has store the Change, but has not updated its vv yet
let mut offset = 0;
for op in change.ops.iter() {
self.apply_remote_op(change, op, offset);
offset += op.len() as u32;
self.apply_remote_op(&change, op);
}
self.frontier = self
@ -206,10 +205,10 @@ impl LogStore {
/// this function assume op is not included in the log, and its deps are included.
#[inline]
fn apply_remote_op(&mut self, change: &Change, op: &Op, offset: u32) {
fn apply_remote_op(&mut self, change: &Change, op: &Op) {
let mut container = self.container.write().unwrap();
let container = container.get_or_create(op.container());
container.apply(&OpProxy::new(change, op, offset, None), self);
container.apply(&OpProxy::new(change, op, None), self);
}
#[inline]

View file

@ -13,9 +13,6 @@ use crate::{
pub struct OpProxy<'a> {
change: &'a Change,
op: &'a Op,
/// offset of op in change.
/// i.e. change.id.inc(op_offset) == op.start
op_offset: u32,
/// slice range of the op, op[slice_range]
slice_range: Range<Counter>,
}
@ -43,7 +40,7 @@ impl<'a> HasLength for OpProxy<'a> {
impl<'a> HasLamport for OpProxy<'a> {
fn lamport(&self) -> Lamport {
self.change.lamport + self.op_offset + self.slice_range.start as u32
self.change.lamport + self.offset() as u32 + self.slice_range.start as u32
}
}
@ -72,12 +69,7 @@ impl Ord for OpProxy<'_> {
}
impl<'a> OpProxy<'a> {
pub fn new(
change: &'a Change,
op: &'a Op,
op_offset: u32,
range: Option<Range<Counter>>,
) -> Self {
pub fn new(change: &'a Change, op: &'a Op, range: Option<Range<Counter>>) -> Self {
OpProxy {
change,
op,
@ -86,12 +78,11 @@ impl<'a> OpProxy<'a> {
} else {
0..op.len() as Counter
},
op_offset,
}
}
pub fn offset(&self) -> u32 {
self.op_offset
pub fn offset(&self) -> i32 {
self.op.id.counter - self.change.id.counter
}
pub fn lamport(&self) -> Lamport {

View file

@ -97,6 +97,17 @@ impl<Index: GlobalIndex + 'static, Value: Rle + ZeroElement + 'static> RangeMap<
ans
}
#[inline]
pub fn get_range_with_index(&self, start: Index, end: Index) -> Vec<(Index, &Value)> {
let mut ans = Vec::new();
for value in self.tree.iter_range(start, Some(end)) {
let value = value.as_tree_ref();
ans.push((value.index, &value.value));
}
ans
}
#[inline]
pub fn get(&self, index: Index) -> Option<&Value> {
let cursor = self.tree.get(index);

View file

@ -252,7 +252,7 @@ impl<T: Rle, A: RleTreeTrait<T>> RleTree<T, A> {
pub fn update_at_cursors<U, F>(
&mut self,
cursors: Vec<UnsafeCursor<T, A>>,
cursors: &mut [UnsafeCursor<T, A>],
update_fn: &mut U,
notify: &mut F,
) where
@ -387,7 +387,8 @@ impl<T: Rle, A: RleTreeTrait<T>> RleTree<T, A> {
}
// SAFETY: it's perfectly safe here because we know what we are doing in the update_at_cursors
self.update_at_cursors(unsafe { std::mem::transmute(cursors) }, update_fn, notify);
let mut cursors: Vec<_> = unsafe { std::mem::transmute(cursors) };
self.update_at_cursors(&mut cursors, update_fn, notify);
}
pub fn debug_check(&mut self) {

View file

@ -1,6 +1,7 @@
use std::{hash::Hash, marker::PhantomData, ptr::NonNull};
use crdt_list::crdt::GetOp;
use num::FromPrimitive;
use crate::{HasLength, Rle, RleTreeTrait};
@ -175,6 +176,14 @@ impl<'tree, T: Rle, A: RleTreeTrait<T>> UnsafeCursor<'tree, T, A> {
})
}
/// # Safety
///
/// we need to make sure that the leaf is still valid
pub unsafe fn get_index(&self) -> A::Int {
let index = A::get_index(self.leaf.as_ref(), self.index);
index + A::Int::from_usize(self.offset).unwrap()
}
/// move cursor forward
///
/// # Safety

View file

@ -91,7 +91,8 @@ pub trait RleTreeTrait<T: Rle>: Sized + Debug {
/// - If index is at the end of an element, `found` should be true
/// - If target index is after last child, then `child_index` = children.len().wrapping_sub(1), `offset` = children.last().unwrap().len()
fn find_pos_leaf(node: &LeafNode<'_, T, Self>, index: Self::Int) -> FindPosResult<usize>;
/// calculate the index of the child element of a leaf node
fn get_index(node: &LeafNode<'_, T, Self>, child_index: usize) -> Self::Int;
fn len_leaf(node: &LeafNode<'_, T, Self>) -> Self::Int;
fn len_internal(node: &InternalNode<'_, T, Self>) -> Self::Int;
fn check_cache_leaf(_node: &LeafNode<'_, T, Self>) {}
@ -197,6 +198,33 @@ impl<T: Rle, const MAX_CHILD: usize> RleTreeTrait<T> for CumulateTreeTrait<T, MA
fn check_cache_leaf(node: &LeafNode<'_, T, Self>) {
assert_eq!(node.cache, node.children().iter().map(|x| x.len()).sum());
}
fn get_index(node: &LeafNode<'_, T, Self>, mut child_index: usize) -> Self::Int {
debug_assert!(!node.is_deleted());
let mut index = 0;
for i in 0..child_index {
index += node.children[i].len();
}
child_index = node.get_index_in_parent().unwrap();
// SAFETY: parent is valid if node is valid
let mut node = unsafe { node.parent.as_ref() };
loop {
for i in 0..child_index {
index += node.children[i].len();
}
if let Some(parent) = node.parent {
child_index = node.get_index_in_parent().unwrap();
// SAFETY: parent is valid if node is valid
node = unsafe { parent.as_ref() };
} else {
break;
}
}
index
}
}
impl Position {
@ -382,6 +410,10 @@ impl<T: Rle + HasGlobalIndex, const MAX_CHILD: usize> RleTreeTrait<T>
);
assert_eq!(node.cache.start, get_cache(node.children()[0]).start);
}
fn get_index(node: &LeafNode<'_, T, Self>, child_index: usize) -> Self::Int {
node.children[child_index].get_global_start()
}
}
#[inline]