Implement anchors using an offset + a version vector

This commit is contained in:
Antonio Scandurra 2021-06-01 15:28:20 +02:00
parent 311e1b0f5e
commit da7e3c8cd8
4 changed files with 273 additions and 293 deletions

View file

@ -351,6 +351,8 @@ pub struct FragmentSummary {
text: FragmentTextSummary,
max_fragment_id: FragmentId,
max_version: time::Global,
min_insertion_version: time::Global,
max_insertion_version: time::Global,
}
#[derive(Default, Clone, Debug, PartialEq, Eq)]
@ -360,7 +362,7 @@ struct FragmentTextSummary {
}
impl<'a> sum_tree::Dimension<'a, FragmentSummary> for FragmentTextSummary {
fn add_summary(&mut self, summary: &'a FragmentSummary, _: &()) {
fn add_summary(&mut self, summary: &'a FragmentSummary, _: &Option<time::Global>) {
self.visible += summary.text.visible;
self.deleted += summary.text.deleted;
}
@ -502,7 +504,7 @@ impl Buffer {
base_insertion.clone(),
0..0,
),
&(),
&None,
);
if base_text.len() > 0 {
@ -520,7 +522,7 @@ impl Buffer {
);
fragments.push(
Fragment::new(base_fragment_id, base_insertion, range_in_insertion.clone()),
&(),
&None,
);
}
@ -825,7 +827,7 @@ impl Buffer {
}
pub fn len(&self) -> usize {
self.fragments.extent::<usize>(&())
self.fragments.extent::<usize>(&None)
}
pub fn line_len(&self, row: u32) -> u32 {
@ -873,7 +875,7 @@ impl Buffer {
let since_2 = since.clone();
let cursor = self.fragments.filter(
move |summary| summary.max_version.changed_since(&since_2),
&(),
&None,
);
Edits {
@ -1191,8 +1193,11 @@ impl Buffer {
let mut fragments_cursor = old_fragments.cursor::<FragmentIdRef, FragmentTextSummary>();
let mut new_fragments =
fragments_cursor.slice(&FragmentIdRef::new(&start_fragment_id), SeekBias::Left, &());
let mut new_fragments = fragments_cursor.slice(
&FragmentIdRef::new(&start_fragment_id),
SeekBias::Left,
&None,
);
let mut new_ropes =
RopeBuilder::new(old_visible_text.cursor(0), old_deleted_text.cursor(0));
new_ropes.push_tree(new_fragments.summary().text);
@ -1201,8 +1206,8 @@ impl Buffer {
if start_offset == start_fragment.range_in_insertion.end {
let fragment = fragments_cursor.item().unwrap().clone();
new_ropes.push_fragment(&fragment, fragment.visible);
new_fragments.push(fragment, &());
fragments_cursor.next(&());
new_fragments.push(fragment, &None);
fragments_cursor.next(&None);
}
while let Some(fragment) = fragments_cursor.item() {
@ -1242,11 +1247,11 @@ impl Buffer {
};
if let Some(fragment) = before_range {
new_ropes.push_fragment(&fragment, fragment.visible);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
if let Some(fragment) = insertion {
new_ropes.push_str(new_text.take().unwrap());
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
if let Some(mut fragment) = within_range {
let fragment_was_visible = fragment.visible;
@ -1258,11 +1263,11 @@ impl Buffer {
}
new_ropes.push_fragment(&fragment, fragment_was_visible);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
if let Some(fragment) = after_range {
new_ropes.push_fragment(&fragment, fragment.visible);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
} else {
if new_text.is_some() && lamport_timestamp > fragment.insertion.lamport_timestamp {
@ -1275,7 +1280,7 @@ impl Buffer {
lamport_timestamp,
);
new_ropes.push_str(new_text);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
let fragment_was_visible = fragment.visible;
@ -1289,10 +1294,10 @@ impl Buffer {
}
new_ropes.push_fragment(&fragment, fragment_was_visible);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
fragments_cursor.next(&());
fragments_cursor.next(&None);
}
if let Some(new_text) = new_text {
@ -1304,11 +1309,11 @@ impl Buffer {
lamport_timestamp,
);
new_ropes.push_str(new_text);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
}
let (visible_text, deleted_text) = new_ropes.finish();
new_fragments.push_tree(fragments_cursor.suffix(&()), &());
new_fragments.push_tree(fragments_cursor.suffix(&None), &None);
self.fragments = new_fragments;
self.visible_text = visible_text;
@ -1409,7 +1414,7 @@ impl Buffer {
let first_split_id = insertion_splits.next().unwrap();
new_fragments =
fragments_cursor.slice(&FragmentIdRef::new(first_split_id), SeekBias::Left, &());
fragments_cursor.slice(&FragmentIdRef::new(first_split_id), SeekBias::Left, &None);
new_ropes.push_tree(new_fragments.summary().text);
loop {
@ -1419,14 +1424,17 @@ impl Buffer {
fragment.max_undos.observe(undo.id);
new_ropes.push_fragment(&fragment, was_visible);
new_fragments.push(fragment.clone(), &());
new_fragments.push(fragment.clone(), &None);
fragments_cursor.next(&());
fragments_cursor.next(&None);
if let Some(split_id) = insertion_splits.next() {
let slice =
fragments_cursor.slice(&FragmentIdRef::new(split_id), SeekBias::Left, &());
let slice = fragments_cursor.slice(
&FragmentIdRef::new(split_id),
SeekBias::Left,
&None,
);
new_ropes.push_tree(slice.summary().text);
new_fragments.push_tree(slice, &());
new_fragments.push_tree(slice, &None);
} else {
break;
}
@ -1435,7 +1443,7 @@ impl Buffer {
new_fragments = fragments_cursor.slice(
&FragmentIdRef::new(&start_fragment_id),
SeekBias::Left,
&(),
&None,
);
new_ropes.push_tree(new_fragments.summary().text);
@ -1453,13 +1461,13 @@ impl Buffer {
}
new_ropes.push_fragment(&fragment, fragment_was_visible);
new_fragments.push(fragment, &());
fragments_cursor.next(&());
new_fragments.push(fragment, &None);
fragments_cursor.next(&None);
}
}
}
new_fragments.push_tree(fragments_cursor.suffix(&()), &());
new_fragments.push_tree(fragments_cursor.suffix(&None), &None);
let (visible_text, deleted_text) = new_ropes.finish();
drop(fragments_cursor);
@ -1551,7 +1559,7 @@ impl Buffer {
let mut fragments_cursor = old_fragments.cursor::<usize, usize>();
let mut new_fragments =
fragments_cursor.slice(&cur_range.as_ref().unwrap().start, SeekBias::Right, &());
fragments_cursor.slice(&cur_range.as_ref().unwrap().start, SeekBias::Right, &None);
let mut new_ropes =
RopeBuilder::new(old_visible_text.cursor(0), old_deleted_text.cursor(0));
@ -1595,7 +1603,7 @@ impl Buffer {
fragment.range_in_insertion.start = prefix.range_in_insertion.end;
new_ropes.push_fragment(&prefix, prefix.visible);
new_fragments.push(prefix.clone(), &());
new_fragments.push(prefix.clone(), &None);
new_split_tree.push(
InsertionSplit {
extent: prefix.range_in_insertion.end - prefix.range_in_insertion.start,
@ -1628,7 +1636,7 @@ impl Buffer {
);
new_ropes.push_str(&new_text);
new_fragments.push(new_fragment, &());
new_fragments.push(new_fragment, &None);
}
}
@ -1639,14 +1647,14 @@ impl Buffer {
prefix.range_in_insertion.start + (range.end - fragment_start);
prefix.id =
FragmentId::between(&new_fragments.last().unwrap().id, &fragment.id);
version_in_range.observe_all(&fragment_summary.max_version);
version_in_range.join(&fragment_summary.max_version);
if prefix.visible {
prefix.deletions.insert(local_timestamp);
prefix.visible = false;
}
fragment.range_in_insertion.start = prefix.range_in_insertion.end;
new_ropes.push_fragment(&prefix, fragment_was_visible);
new_fragments.push(prefix.clone(), &());
new_fragments.push(prefix.clone(), &None);
new_split_tree.push(
InsertionSplit {
extent: prefix.range_in_insertion.end
@ -1660,7 +1668,7 @@ impl Buffer {
end_offset = Some(fragment.range_in_insertion.start);
}
} else {
version_in_range.observe_all(&fragment_summary.max_version);
version_in_range.join(&fragment_summary.max_version);
if fragment.visible {
fragment.deletions.insert(local_timestamp);
fragment.visible = false;
@ -1714,10 +1722,10 @@ impl Buffer {
.insert(fragment.insertion.id, new_split_tree);
new_ropes.push_fragment(&fragment, fragment_was_visible);
new_fragments.push(fragment, &());
new_fragments.push(fragment, &None);
// Scan forward until we find a fragment that is not fully contained by the current splice.
fragments_cursor.next(&());
fragments_cursor.next(&None);
if let Some(range) = cur_range.clone() {
while let Some(fragment) = fragments_cursor.item() {
let fragment_summary = fragments_cursor.item_summary().unwrap();
@ -1726,15 +1734,15 @@ impl Buffer {
fragment_end = fragment_start + fragment.visible_len();
if range.start < fragment_start && range.end >= fragment_end {
let mut new_fragment = fragment.clone();
version_in_range.observe_all(&fragment_summary.max_version);
version_in_range.join(&fragment_summary.max_version);
if new_fragment.visible {
new_fragment.deletions.insert(local_timestamp);
new_fragment.visible = false;
}
new_ropes.push_fragment(&new_fragment, fragment_was_visible);
new_fragments.push(new_fragment, &());
fragments_cursor.next(&());
new_fragments.push(new_fragment, &None);
fragments_cursor.next(&None);
if range.end == fragment_end {
end_id = Some(fragment.insertion.id);
@ -1777,10 +1785,10 @@ impl Buffer {
let slice = fragments_cursor.slice(
&cur_range.as_ref().unwrap().start,
SeekBias::Right,
&(),
&None,
);
new_ropes.push_tree(slice.summary().text);
new_fragments.push_tree(slice, &());
new_fragments.push_tree(slice, &None);
}
}
}
@ -1814,11 +1822,11 @@ impl Buffer {
);
new_ropes.push_str(&new_text);
new_fragments.push(new_fragment, &());
new_fragments.push(new_fragment, &None);
}
}
new_fragments.push_tree(fragments_cursor.suffix(&()), &());
new_fragments.push_tree(fragments_cursor.suffix(&None), &None);
let (visible_text, deleted_text) = new_ropes.finish();
self.fragments = new_fragments;
@ -1982,8 +1990,10 @@ impl Buffer {
} else if offset == max_offset && bias == AnchorBias::Right {
Anchor::End
} else {
let mut cursor = self.fragments.cursor::<usize, FragmentTextSummary>();
cursor.seek(&offset, bias.to_seek_bias(), &None);
Anchor::Middle {
offset,
offset: offset + cursor.start().deleted,
bias,
version: self.version(),
}
@ -1991,6 +2001,60 @@ impl Buffer {
}
fn summary_for_anchor(&self, anchor: &Anchor) -> TextSummary {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum VersionedOffset {
Offset(usize),
InvalidVersion,
}
impl VersionedOffset {
fn offset(&self) -> usize {
if let Self::Offset(offset) = self {
*offset
} else {
panic!("invalid version")
}
}
}
impl Default for VersionedOffset {
fn default() -> Self {
Self::Offset(0)
}
}
impl<'a> sum_tree::Dimension<'a, FragmentSummary> for VersionedOffset {
fn add_summary(&mut self, summary: &'a FragmentSummary, cx: &Option<time::Global>) {
if let Self::Offset(offset) = self {
let version = cx.as_ref().unwrap();
if *version >= summary.max_insertion_version {
*offset += summary.text.visible + summary.text.deleted;
} else if *version < summary.min_insertion_version {
// Every insertion in this subtree is causally after the context's version.
} else {
*self = VersionedOffset::InvalidVersion;
}
}
}
}
impl<'a> sum_tree::SeekDimension<'a, FragmentSummary> for VersionedOffset {
fn cmp(&self, other: &Self, _: &Option<time::Global>) -> cmp::Ordering {
match (self, other) {
(Self::Offset(a), Self::Offset(b)) => Ord::cmp(a, b),
(Self::Offset(_), Self::InvalidVersion) => cmp::Ordering::Less,
(Self::InvalidVersion, _) => unreachable!(),
}
}
}
impl<'a> sum_tree::Dimension<'a, FragmentSummary> for (VersionedOffset, usize) {
fn add_summary(&mut self, summary: &'a FragmentSummary, cx: &Option<time::Global>) {
self.0.add_summary(summary, cx);
self.1 += summary.text.visible;
}
}
match anchor {
Anchor::Start => TextSummary::default(),
Anchor::End => self.text_summary(),
@ -2001,39 +2065,20 @@ impl Buffer {
} => {
let mut cursor = self
.fragments
.filter::<_, usize>(|summary| !(*version >= summary.max_version), &());
.cursor::<VersionedOffset, (VersionedOffset, usize)>();
cursor.seek(
&VersionedOffset::Offset(*offset),
bias.to_seek_bias(),
&Some(version.clone()),
);
let fragment = cursor.item().unwrap();
let overshoot = if fragment.visible {
offset - cursor.start().0.offset()
} else {
0
};
let mut old_offset = 0;
let mut new_offset = 0;
while let Some(fragment) = cursor.item() {
let bytes_since_last_fragment = *cursor.start() - new_offset;
let comparison = offset.cmp(&(old_offset + bytes_since_last_fragment));
if comparison == cmp::Ordering::Greater
|| (comparison == cmp::Ordering::Equal && *bias == AnchorBias::Right)
{
old_offset += bytes_since_last_fragment;
new_offset += bytes_since_last_fragment;
if fragment.was_visible(version, &self.undo_map) {
let comparison = offset.cmp(&(old_offset + fragment.visible_len()));
if comparison == cmp::Ordering::Greater
|| (comparison == cmp::Ordering::Equal
&& *bias == AnchorBias::Right)
{
old_offset += fragment.len();
} else {
break;
}
}
new_offset += fragment.visible_len();
cursor.next(&());
} else {
break;
}
}
let ix = new_offset + offset.saturating_sub(old_offset);
self.text_summary_for_range(0..ix)
self.text_summary_for_range(0..cursor.start().1 + overshoot)
}
}
}
@ -2262,7 +2307,7 @@ impl<'a, F: Fn(&FragmentSummary) -> bool> Iterator for Edits<'a, F> {
}
}
self.cursor.next(&());
self.cursor.next(&None);
}
change
@ -2446,7 +2491,7 @@ impl<'a> FragmentIdRef<'a> {
}
impl<'a> sum_tree::Dimension<'a, FragmentSummary> for FragmentIdRef<'a> {
fn add_summary(&mut self, summary: &'a FragmentSummary, _: &()) {
fn add_summary(&mut self, summary: &'a FragmentSummary, _: &Option<time::Global>) {
self.0 = Some(&summary.max_fragment_id)
}
}
@ -2497,8 +2542,11 @@ impl sum_tree::Item for Fragment {
for deletion in &self.deletions {
max_version.observe(*deletion);
}
max_version.observe_all(&self.max_undos);
max_version.join(&self.max_undos);
let mut min_insertion_version = time::Global::new();
min_insertion_version.observe(self.insertion.id);
let max_insertion_version = min_insertion_version.clone();
if self.visible {
FragmentSummary {
text: FragmentTextSummary {
@ -2507,6 +2555,8 @@ impl sum_tree::Item for Fragment {
},
max_fragment_id: self.id.clone(),
max_version,
min_insertion_version,
max_insertion_version,
}
} else {
FragmentSummary {
@ -2516,20 +2566,26 @@ impl sum_tree::Item for Fragment {
},
max_fragment_id: self.id.clone(),
max_version,
min_insertion_version,
max_insertion_version,
}
}
}
}
impl sum_tree::Summary for FragmentSummary {
type Context = ();
type Context = Option<time::Global>;
fn add_summary(&mut self, other: &Self, _: &Self::Context) {
self.text.visible += &other.text.visible;
self.text.deleted += &other.text.deleted;
debug_assert!(self.max_fragment_id <= other.max_fragment_id);
self.max_fragment_id = other.max_fragment_id.clone();
self.max_version.observe_all(&other.max_version);
self.max_version.join(&other.max_version);
self.min_insertion_version
.meet(&other.min_insertion_version);
self.max_insertion_version
.join(&other.max_insertion_version);
}
}
@ -2539,12 +2595,14 @@ impl Default for FragmentSummary {
text: FragmentTextSummary::default(),
max_fragment_id: FragmentId::min_value().clone(),
max_version: time::Global::new(),
min_insertion_version: time::Global::new(),
max_insertion_version: time::Global::new(),
}
}
}
impl<'a> sum_tree::Dimension<'a, FragmentSummary> for usize {
fn add_summary(&mut self, summary: &FragmentSummary, _: &()) {
fn add_summary(&mut self, summary: &FragmentSummary, _: &Option<time::Global>) {
*self += summary.text.visible;
}
}

View file

@ -1,5 +1,5 @@
use super::{Buffer, ToOffset};
use crate::time;
use crate::{sum_tree, time};
use anyhow::Result;
use std::{cmp::Ordering, ops::Range};
@ -14,12 +14,21 @@ pub enum Anchor {
},
}
#[derive(Clone, Eq, PartialEq, Debug, Hash)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
pub enum AnchorBias {
Left,
Right,
}
impl AnchorBias {
pub fn to_seek_bias(self) -> sum_tree::SeekBias {
match self {
AnchorBias::Left => sum_tree::SeekBias::Left,
AnchorBias::Right => sum_tree::SeekBias::Right,
}
}
}
impl PartialOrd for AnchorBias {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))

View file

@ -415,220 +415,108 @@ where
D: Dimension<'a, T::Summary>,
{
debug_assert!(target.cmp(&self.seek_dimension, cx) >= Ordering::Equal);
let mut containing_subtree = None;
if self.did_seek {
'outer: while let Some(entry) = self.stack.last_mut() {
{
match *entry.tree.0 {
Node::Internal {
ref child_summaries,
ref child_trees,
..
} => {
if !self.did_seek {
self.did_seek = true;
self.stack.push(StackEntry {
tree: self.tree,
index: 0,
seek_dimension: Default::default(),
sum_dimension: Default::default(),
});
}
let mut ascending = false;
'outer: while let Some(entry) = self.stack.last_mut() {
match *entry.tree.0 {
Node::Internal {
ref child_summaries,
ref child_trees,
..
} => {
if ascending {
entry.index += 1;
}
for (child_tree, child_summary) in child_trees[entry.index..]
.iter()
.zip(&child_summaries[entry.index..])
{
let mut child_end = self.seek_dimension.clone();
child_end.add_summary(&child_summary, cx);
let comparison = target.cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == SeekBias::Right)
{
self.seek_dimension = child_end;
self.sum_dimension.add_summary(child_summary, cx);
match aggregate {
SeekAggregate::None => {}
SeekAggregate::Slice(slice) => {
slice.push_tree(child_tree.clone(), cx);
}
SeekAggregate::Summary(summary) => {
summary.add_summary(child_summary, cx);
}
}
entry.index += 1;
for (child_tree, child_summary) in child_trees[entry.index..]
.iter()
.zip(&child_summaries[entry.index..])
{
let mut child_end = self.seek_dimension.clone();
child_end.add_summary(&child_summary, cx);
let comparison = target.cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == SeekBias::Right)
{
self.seek_dimension = child_end;
self.sum_dimension.add_summary(child_summary, cx);
match aggregate {
SeekAggregate::None => {}
SeekAggregate::Slice(slice) => {
slice.push_tree(child_tree.clone(), cx);
}
SeekAggregate::Summary(summary) => {
summary.add_summary(child_summary, cx);
}
}
entry.index += 1;
} else {
containing_subtree = Some(child_tree);
break 'outer;
}
}
}
Node::Leaf {
ref items,
ref item_summaries,
..
} => {
let mut slice_items = ArrayVec::<[T; 2 * TREE_BASE]>::new();
let mut slice_item_summaries =
ArrayVec::<[T::Summary; 2 * TREE_BASE]>::new();
let mut slice_items_summary = match aggregate {
SeekAggregate::Slice(_) => Some(T::Summary::default()),
_ => None,
};
for (item, item_summary) in items[entry.index..]
.iter()
.zip(&item_summaries[entry.index..])
{
let mut child_end = self.seek_dimension.clone();
child_end.add_summary(item_summary, cx);
let comparison = target.cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == SeekBias::Right)
{
self.seek_dimension = child_end;
self.sum_dimension.add_summary(item_summary, cx);
match aggregate {
SeekAggregate::None => {}
SeekAggregate::Slice(_) => {
slice_items.push(item.clone());
slice_item_summaries.push(item_summary.clone());
slice_items_summary
.as_mut()
.unwrap()
.add_summary(item_summary, cx);
}
SeekAggregate::Summary(summary) => {
summary.add_summary(item_summary, cx);
}
}
entry.index += 1;
} else {
if let SeekAggregate::Slice(slice) = aggregate {
slice.push_tree(
SumTree(Arc::new(Node::Leaf {
summary: slice_items_summary.unwrap(),
items: slice_items,
item_summaries: slice_item_summaries,
})),
cx,
);
}
break 'outer;
}
}
if let SeekAggregate::Slice(slice) = aggregate {
if !slice_items.is_empty() {
slice.push_tree(
SumTree(Arc::new(Node::Leaf {
summary: slice_items_summary.unwrap(),
items: slice_items,
item_summaries: slice_item_summaries,
})),
cx,
);
}
}
entry.seek_dimension = self.seek_dimension.clone();
entry.sum_dimension = self.sum_dimension.clone();
} else {
self.stack.push(StackEntry {
tree: child_tree,
index: 0,
seek_dimension: self.seek_dimension.clone(),
sum_dimension: self.sum_dimension.clone(),
});
ascending = false;
continue 'outer;
}
}
}
Node::Leaf {
ref items,
ref item_summaries,
..
} => {
let mut slice_items = ArrayVec::<[T; 2 * TREE_BASE]>::new();
let mut slice_item_summaries = ArrayVec::<[T::Summary; 2 * TREE_BASE]>::new();
let mut slice_items_summary = match aggregate {
SeekAggregate::Slice(_) => Some(T::Summary::default()),
_ => None,
};
self.stack.pop();
}
} else {
self.did_seek = true;
containing_subtree = Some(self.tree);
}
for (item, item_summary) in items[entry.index..]
.iter()
.zip(&item_summaries[entry.index..])
{
let mut child_end = self.seek_dimension.clone();
child_end.add_summary(item_summary, cx);
if let Some(mut subtree) = containing_subtree {
loop {
let mut next_subtree = None;
match *subtree.0 {
Node::Internal {
ref child_summaries,
ref child_trees,
..
} => {
for (index, (child_tree, child_summary)) in
child_trees.iter().zip(child_summaries).enumerate()
let comparison = target.cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == SeekBias::Right)
{
let mut child_end = self.seek_dimension.clone();
child_end.add_summary(child_summary, cx);
let comparison = target.cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == SeekBias::Right)
{
self.seek_dimension = child_end;
self.sum_dimension.add_summary(child_summary, cx);
match aggregate {
SeekAggregate::None => {}
SeekAggregate::Slice(slice) => {
slice.push_tree(child_trees[index].clone(), cx);
}
SeekAggregate::Summary(summary) => {
summary.add_summary(child_summary, cx);
}
self.seek_dimension = child_end;
self.sum_dimension.add_summary(item_summary, cx);
match aggregate {
SeekAggregate::None => {}
SeekAggregate::Slice(_) => {
slice_items.push(item.clone());
slice_item_summaries.push(item_summary.clone());
slice_items_summary
.as_mut()
.unwrap()
.add_summary(item_summary, cx);
}
} else {
self.stack.push(StackEntry {
tree: subtree,
index,
seek_dimension: self.seek_dimension.clone(),
sum_dimension: self.sum_dimension.clone(),
});
next_subtree = Some(child_tree);
break;
}
}
}
Node::Leaf {
ref items,
ref item_summaries,
..
} => {
let mut slice_items = ArrayVec::<[T; 2 * TREE_BASE]>::new();
let mut slice_item_summaries =
ArrayVec::<[T::Summary; 2 * TREE_BASE]>::new();
let mut slice_items_summary = match aggregate {
SeekAggregate::Slice(_) => Some(T::Summary::default()),
_ => None,
};
for (index, (item, item_summary)) in
items.iter().zip(item_summaries).enumerate()
{
let mut child_end = self.seek_dimension.clone();
child_end.add_summary(item_summary, cx);
let comparison = target.cmp(&child_end, cx);
if comparison == Ordering::Greater
|| (comparison == Ordering::Equal && bias == SeekBias::Right)
{
self.seek_dimension = child_end;
self.sum_dimension.add_summary(item_summary, cx);
match aggregate {
SeekAggregate::None => {}
SeekAggregate::Slice(_) => {
slice_items.push(item.clone());
slice_items_summary
.as_mut()
.unwrap()
.add_summary(item_summary, cx);
slice_item_summaries.push(item_summary.clone());
}
SeekAggregate::Summary(summary) => {
summary.add_summary(item_summary, cx);
}
SeekAggregate::Summary(summary) => {
summary.add_summary(item_summary, cx);
}
} else {
self.stack.push(StackEntry {
tree: subtree,
index,
seek_dimension: self.seek_dimension.clone(),
sum_dimension: self.sum_dimension.clone(),
});
break;
}
}
if let SeekAggregate::Slice(slice) = aggregate {
if !slice_items.is_empty() {
entry.index += 1;
} else {
if let SeekAggregate::Slice(slice) = aggregate {
slice.push_tree(
SumTree(Arc::new(Node::Leaf {
summary: slice_items_summary.unwrap(),
@ -638,16 +526,27 @@ where
cx,
);
}
break 'outer;
}
}
};
if let Some(next_subtree) = next_subtree {
subtree = next_subtree;
} else {
break;
if let SeekAggregate::Slice(slice) = aggregate {
if !slice_items.is_empty() {
slice.push_tree(
SumTree(Arc::new(Node::Leaf {
summary: slice_items_summary.unwrap(),
items: slice_items,
item_summaries: slice_item_summaries,
})),
cx,
);
}
}
}
}
self.stack.pop();
ascending = true;
}
self.at_end = self.stack.is_empty();

View file

@ -81,12 +81,26 @@ impl Global {
}
}
pub fn observe_all(&mut self, other: &Self) {
pub fn join(&mut self, other: &Self) {
for timestamp in other.0.iter() {
self.observe(*timestamp);
}
}
pub fn meet(&mut self, other: &Self) {
for timestamp in other.0.iter() {
if let Some(entry) = self
.0
.iter_mut()
.find(|t| t.replica_id == timestamp.replica_id)
{
entry.value = cmp::min(entry.value, timestamp.value);
} else {
self.0.push(*timestamp);
}
}
}
pub fn observed(&self, timestamp: Local) -> bool {
self.get(timestamp.replica_id) >= timestamp.value
}