pref: optimize diff calc cache use (#475)

* pref: optimize diff calc cache use

* fix: type err
This commit is contained in:
Zixuan Chen 2024-09-29 14:01:27 +08:00 committed by GitHub
parent 4414053a82
commit a66dbd6fe6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 98 additions and 76 deletions

View file

@ -13,7 +13,8 @@ use itertools::Itertools;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
use loro_common::{
CompactIdLp, ContainerID, Counter, HasCounterSpan, IdFull, IdLp, IdSpan, LoroValue, PeerID, ID,
CompactIdLp, ContainerID, Counter, HasCounter, HasCounterSpan, HasIdSpan, IdFull, IdLp, IdSpan,
LoroValue, PeerID, ID,
};
use loro_delta::DeltaRope;
use smallvec::SmallVec;
@ -36,7 +37,7 @@ use crate::{
event::{DiffVariant, InternalDiff},
op::{InnerContent, RichOp, SliceRange, SliceWithId},
span::{HasId, HasLamport},
version::Frontiers,
version::{Frontiers, VersionRange},
InternalString, VersionVector,
};
@ -64,10 +65,7 @@ enum DiffCalculatorRetainMode {
/// The diff calculator can only be used once.
Once { used: bool },
/// The diff calculator will be persisted and can be reused after the diff calc is done.
Persist {
has_all: bool,
last_vv: VersionVector,
},
Persist { recorded_ops_range: VersionRange },
}
/// This mode defines how the diff is calculated and how it should be applied on the state.
@ -124,8 +122,7 @@ impl DiffCalculator {
calculators: Default::default(),
retain_mode: if persist {
DiffCalculatorRetainMode::Persist {
has_all: false,
last_vv: Default::default(),
recorded_ops_range: Default::default(),
}
} else {
DiffCalculatorRetainMode::Once { used: false }
@ -155,65 +152,41 @@ impl DiffCalculator {
let _e = s.enter();
let mut use_persisted_shortcut = false;
let mut merged = before.clone();
merged.merge(after);
let (lca, mut diff_mode, iter) =
oplog.iter_from_lca_causally(before, before_frontiers, after, after_frontiers);
match &mut self.retain_mode {
DiffCalculatorRetainMode::Once { used } => {
if *used {
panic!("DiffCalculator with retain_mode Once can only be used once");
}
}
DiffCalculatorRetainMode::Persist { has_all, last_vv } => {
if *has_all {
let include_before = last_vv.includes_vv(before);
let include_after = last_vv.includes_vv(after);
if !include_after || !include_before {
*has_all = false;
*last_vv = Default::default();
}
}
if *has_all {
DiffCalculatorRetainMode::Persist { recorded_ops_range } => {
if recorded_ops_range.contains_ops_between(&lca, &merged)
&& recorded_ops_range.contains_ops_between(before, after)
{
use_persisted_shortcut = true;
} else {
diff_mode = DiffMode::Checkout;
recorded_ops_range.clear();
}
}
}
let affected_set = if !use_persisted_shortcut {
// if we don't have all the ops, we need to calculate the diff by tracing back
let mut merged = before.clone();
merged.merge(after);
let (lca, mut diff_mode, iter) = oplog.iter_from_lca_causally(
before,
Some(before_frontiers),
after,
Some(after_frontiers),
);
if let DiffCalculatorRetainMode::Persist { has_all, last_vv } = &mut self.retain_mode {
if before.is_empty() {
*has_all = true;
*last_vv = Default::default();
}
diff_mode = DiffMode::Checkout;
}
tracing::debug!("LCA: {:?} mode={:?}", &lca, diff_mode);
let mut started_set = FxHashSet::default();
for (change, (start_counter, end_counter), vv) in iter {
if let DiffCalculatorRetainMode::Persist { has_all, last_vv } =
if let DiffCalculatorRetainMode::Persist { recorded_ops_range } =
&mut self.retain_mode
{
if *has_all {
if change.id.counter > 0 {
debug_assert!(
last_vv.includes_id(change.id.inc(-1)),
"{:?} {}",
&last_vv,
change.id
);
}
last_vv.extend_to_include_end_id(ID::new(change.id.peer, end_counter));
if container_filter.is_none() {
recorded_ops_range.extends_to_include_id_span(IdSpan::new(
change.peer(),
start_counter,
end_counter,
));
}
}
@ -287,8 +260,12 @@ impl DiffCalculator {
// Find a set of affected containers idx, if it's relatively cheap
if before.distance_between(after) < self.calculators.len() || cfg!(debug_assertions) {
let mut set = FxHashSet::default();
oplog.for_each_change_within(before, after, |change| {
oplog.for_each_change_within(before, after, |change, (start, end)| {
for op in change.ops.iter() {
if op.ctr_end() <= start || op.ctr_start() >= end {
continue;
}
let idx = op.container;
if let Some(filter) = container_filter {
if !filter(idx) {

View file

@ -28,7 +28,7 @@ use crate::state::GcStore;
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::BlockOpRef;
use loro_common::{IdLp, IdSpan};
use loro_common::{HasCounter, IdLp, IdSpan};
use rle::{HasLength, RleVec, Sliceable};
use smallvec::SmallVec;
@ -413,12 +413,12 @@ impl OpLog {
&self,
a: &VersionVector,
b: &VersionVector,
mut f: impl FnMut(&Change),
mut f: impl FnMut(&Change, (Counter, Counter)),
) {
let spans = b.iter_between(a);
for span in spans {
for c in self.change_store.iter_changes(span) {
f(&c);
f(&c, (span.ctr_start(), span.ctr_end()));
}
}
}
@ -436,9 +436,9 @@ impl OpLog {
pub(crate) fn iter_from_lca_causally(
&self,
from: &VersionVector,
from_frontiers: Option<&Frontiers>,
from_frontiers: &Frontiers,
to: &VersionVector,
to_frontiers: Option<&Frontiers>,
to_frontiers: &Frontiers,
) -> (
VersionVector,
DiffMode,
@ -452,26 +452,6 @@ impl OpLog {
) {
let mut merged_vv = from.clone();
merged_vv.merge(to);
let from_frontiers_inner;
let to_frontiers_inner;
let from_frontiers = match from_frontiers {
Some(f) => f,
None => {
from_frontiers_inner = Some(self.dag.vv_to_frontiers(from));
from_frontiers_inner.as_ref().unwrap()
}
};
let to_frontiers = match to_frontiers {
Some(t) => t,
None => {
to_frontiers_inner = Some(self.dag.vv_to_frontiers(to));
to_frontiers_inner.as_ref().unwrap()
}
};
debug!("from_frontiers={:?} vv={:?}", &from_frontiers, from);
debug!("to_frontiers={:?} vv={:?}", &to_frontiers, to);
trace!("trimmed vv = {:?}", self.dag.trimmed_vv());
let (common_ancestors, mut diff_mode) =

View file

@ -28,6 +28,71 @@ use crate::{
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionVector(FxHashMap<PeerID, Counter>);
#[repr(transparent)]
#[derive(Debug, Clone, Default)]
pub struct VersionRange(pub(crate) FxHashMap<PeerID, (Counter, Counter)>);
impl VersionRange {
pub fn new() -> Self {
Self(Default::default())
}
pub fn clear(&mut self) {
self.0.clear()
}
pub fn get(&self, peer: &PeerID) -> Option<&(Counter, Counter)> {
self.0.get(peer)
}
pub fn insert(&mut self, peer: PeerID, start: Counter, end: Counter) {
self.0.insert(peer, (start, end));
}
pub fn contains_ops_between(&self, vv_a: &VersionVector, vv_b: &VersionVector) -> bool {
for span in vv_a.sub_iter(vv_b) {
if !self.contains_id_span(IdSpan::new(
span.peer,
span.counter.start.saturating_sub(1),
span.counter.end,
)) {
return false;
}
}
for span in vv_b.sub_iter(vv_a) {
if !self.contains_id_span(IdSpan::new(
span.peer,
span.counter.start.saturating_sub(1),
span.counter.end,
)) {
return false;
}
}
true
}
pub fn contains_id_span(&self, mut span: IdSpan) -> bool {
span.normalize_();
if let Some((start, end)) = self.get(&span.peer) {
start <= &span.counter.start && end >= &span.counter.end
} else {
false
}
}
pub fn extends_to_include_id_span(&mut self, mut span: IdSpan) {
span.normalize_();
if let Some((start, end)) = self.0.get_mut(&span.peer) {
*start = (*start).min(span.counter.start);
*end = (*end).max(span.counter.end);
} else {
self.insert(span.peer, span.counter.start, span.counter.end);
}
}
}
/// Immutable version vector
///
/// It has O(1) clone time and O(logN) insert/delete/lookup time.