perf: optimize snapshot encoding (#264)

* perf: optimize snapshot encoding

* perf: rm id_int_map and boost speed 1.4x

Co-authored-by: Leon Zhao <leeeon233@gmail.com>

---------

Co-authored-by: Leon Zhao <leeeon233@gmail.com>
Zixuan Chen 2024-02-04 15:28:08 +08:00 committed by GitHub
parent dcbdd55195
commit 6950e42cae
6 changed files with 175 additions and 389 deletions

View file

@@ -1,5 +1,4 @@
use loro_common::ID;
use loro_internal::{version::Frontiers, LoroDoc};
use loro_internal::LoroDoc;
fn main() {
use bench_utils::TextAction;
@@ -10,15 +9,20 @@ fn main() {
let start = Instant::now();
// loro.subscribe_deep(Box::new(|_| ()));
let text = loro.get_text("text");
for _ in 0..1 {
let n = 100;
for _ in 0..n {
let mut txn = loro.txn().unwrap();
for TextAction { del, ins, pos } in actions.iter() {
text.delete_with_txn(&mut txn, *pos, *del).unwrap();
text.insert_with_txn(&mut txn, *pos, ins).unwrap();
}
}
loro.checkout(&Frontiers::from(ID::new(loro.peer_id(), 100)))
.unwrap();
// loro.diagnose();
println!("{}", start.elapsed().as_millis());
println!("Apply time {}", start.elapsed().as_millis());
loro.diagnose_size();
drop(actions);
let start = Instant::now();
for _ in 0..1 {
loro.export_snapshot();
}
println!("Snapshot encoding time {}", start.elapsed().as_millis());
}

View file

@@ -412,7 +412,12 @@ impl Sliceable for InnerListOp {
pos,
} => InnerListOp::InsertText {
slice: {
let (a, b) = unicode_range_to_byte_range(slice, from, to);
let (a, b) = unicode_range_to_byte_range(
// SAFETY: we know it's a valid utf8 string
unsafe { std::str::from_utf8_unchecked(slice) },
from,
to,
);
slice.slice(a, b)
},
unicode_start: *unicode_start + from as u32,

View file

@@ -1,9 +1,10 @@
use std::{borrow::Cow, cmp::Ordering, mem::take, sync::Arc};
use fxhash::{FxHashMap, FxHashSet};
use generic_btree::rle::Sliceable;
use itertools::Itertools;
use loro_common::{
ContainerID, ContainerType, Counter, HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan,
ContainerID, ContainerType, Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdSpan,
InternalString, LoroError, LoroResult, PeerID, ID,
};
use num_traits::FromPrimitive;
@@ -20,7 +21,6 @@ use crate::{
},
op::{Op, OpWithId, SliceRange},
state::ContainerState,
utils::id_int_map::IdIntMap,
version::Frontiers,
DocState, LoroDoc, OpLog, VersionVector,
};
@@ -355,12 +355,14 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
let mut cid_register: ValueRegister<ContainerID> = ValueRegister::from_existing(containers);
let mut dep_arena = arena::DepsArena::default();
let mut value_writer = ValueWriter::new();
let mut ops: Vec<TempOp> = Vec::new();
// This stores the op positions required by each container state,
// so the states can be encoded at these positions in the next step.
// It maps each op id to its required position in the total order.
let mut map_op_to_pos = IdIntMap::new();
let mut origin_ops: Vec<TempOp<'_>> = Vec::new();
let mut pos_mapping_heap: Vec<PosMappingItem> = Vec::new();
let mut pos_target_value = 0;
let mut states = Vec::new();
let mut state_bytes = Vec::new();
for (_, c_idx) in c_pairs.iter() {
@@ -390,7 +392,7 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
}
},
encoder_by_op: &mut |op| {
ops.push(TempOp {
origin_ops.push(TempOp {
op: Cow::Owned(op.op),
peer_idx: peer_register.register(&op.peer) as u32,
peer_id: op.peer,
@@ -401,8 +403,14 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
});
},
record_idspan: &mut |id_span| {
op_len += id_span.atom_len();
map_op_to_pos.insert(id_span);
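// Remember the snapshot position this id span must map to, so the ops the
// container states reference can be placed in a stable total order later.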
let len = id_span.atom_len();
op_len += len;
pos_mapping_heap.push(PosMappingItem {
start_id: id_span.id_start(),
len,
target_value: pos_target_value,
});
pos_target_value += len as i32;
},
mode: super::EncodeMode::Snapshot,
});
@@ -420,32 +428,13 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
&mut dep_arena,
&mut peer_register,
&mut |op| {
let mut count = 0;
let o_len = op.atom_len();
ops.extend(map_op_to_pos.split(op).map(|x| {
count += x.atom_len();
x
}));
debug_assert_eq!(count, o_len);
origin_ops.push(op);
},
&mut key_register,
&container_idx2index,
);
ops.sort_by(move |a, b| {
a.container_index.cmp(&b.container_index).then_with(|| {
match (map_op_to_pos.get(a.id()), map_op_to_pos.get(b.id())) {
(None, None) => a
.prop_that_used_for_sort
.cmp(&b.prop_that_used_for_sort)
.then_with(|| a.peer_idx.cmp(&b.peer_idx))
.then_with(|| a.lamport.cmp(&b.lamport)),
(None, Some(_)) => Ordering::Greater,
(Some(_), None) => Ordering::Less,
(Some(a), Some(b)) => a.0.cmp(&b.0),
}
})
});
let ops: Vec<TempOp> = calc_sorted_ops_for_snapshot(origin_ops, pos_mapping_heap);
let encoded_ops = encode_ops(
ops,
@@ -486,6 +475,123 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
serde_columnar::to_vec(&doc).unwrap()
}
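/// Maps a span of op ids (starting at `start_id`, `len` atoms long) to the
/// position (`target_value`) those ops must occupy in the snapshot encoding.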
#[derive(Clone, Copy, PartialEq, Debug, Eq)]
struct PosMappingItem {
start_id: ID,
len: usize,
target_value: i32,
}
impl Ord for PosMappingItem {
fn cmp(&self, other: &Self) -> Ordering {
// reversed so that popping from the sorted vec yields the smallest start_id first
other.start_id.cmp(&self.start_id)
}
}
impl PartialOrd for PosMappingItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PosMappingItem {
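/// Split at `pos`: keep the first `pos` atoms in `self` and return the rest
/// as a new item with its start id and target value advanced accordingly.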
fn split(&mut self, pos: usize) -> Self {
let new_len = self.len - pos;
self.len = pos;
PosMappingItem {
start_id: self.start_id.inc(pos as i32),
len: new_len,
target_value: self.target_value + pos as i32,
}
}
}
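/// Merge `origin_ops` (history ops) with `pos_mapping_heap` (positions required
/// by container states): ops covered by a mapping are split as needed and take
/// the mapped position as their sort key, then everything is sorted into the
/// final encoding order.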
fn calc_sorted_ops_for_snapshot<'a>(
mut origin_ops: Vec<TempOp<'a>>,
mut pos_mapping_heap: Vec<PosMappingItem>,
) -> Vec<TempOp<'a>> {
origin_ops.sort_unstable();
pos_mapping_heap.sort_unstable();
let mut ops: Vec<TempOp<'a>> = Vec::with_capacity(origin_ops.len());
let ops_len: usize = origin_ops.iter().map(|x| x.atom_len()).sum();
let mut origin_top = origin_ops.pop();
let mut pos_top = pos_mapping_heap.pop();
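// Both vecs use a reversed ordering, so popping from the back walks ops and
// position mappings in ascending id order, like a merge of two sorted lists.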
while origin_top.is_some() || pos_top.is_some() {
let Some(mut inner_origin_top) = origin_top else {
unreachable!()
};
let Some(mut inner_pos_top) = pos_top else {
ops.push(inner_origin_top);
origin_top = origin_ops.pop();
continue;
};
match inner_origin_top.id_start().cmp(&inner_pos_top.start_id) {
std::cmp::Ordering::Less => {
if inner_origin_top.id_end() <= inner_pos_top.start_id {
ops.push(inner_origin_top);
origin_top = origin_ops.pop();
} else {
let delta =
inner_pos_top.start_id.counter - inner_origin_top.id_start().counter;
let right = inner_origin_top.split(delta as usize);
ops.push(inner_origin_top);
origin_top = Some(right);
}
}
std::cmp::Ordering::Equal => {
match inner_origin_top.atom_len().cmp(&inner_pos_top.len) {
std::cmp::Ordering::Less => {
// origin top is shorter than pos mapping,
// need to split the pos mapping
let len = inner_origin_top.atom_len();
inner_origin_top.prop_that_used_for_sort =
i32::MIN + inner_pos_top.target_value;
ops.push(inner_origin_top);
let next = inner_pos_top.split(len);
origin_top = origin_ops.pop();
pos_top = Some(next);
}
std::cmp::Ordering::Equal => {
// origin op's length equals the pos mapping's length
inner_origin_top.prop_that_used_for_sort =
i32::MIN + inner_pos_top.target_value;
ops.push(inner_origin_top.clone());
origin_top = origin_ops.pop();
pos_top = pos_mapping_heap.pop();
}
std::cmp::Ordering::Greater => {
// origin top is longer than pos mapping,
// need to split the origin top
let right = inner_origin_top.split(inner_pos_top.len);
inner_origin_top.prop_that_used_for_sort =
i32::MIN + inner_pos_top.target_value;
ops.push(inner_origin_top);
origin_top = Some(right);
pos_top = pos_mapping_heap.pop();
}
}
}
std::cmp::Ordering::Greater => unreachable!(),
}
}
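// Final order: by container, then by the assigned sort key. Ops pinned by a
// state mapping received `i32::MIN + target_value`, so they come first in the
// order the states recorded; the rest keep their original key, peer and lamport.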
ops.sort_unstable_by(|a, b| {
a.container_index.cmp(&b.container_index).then({
a.prop_that_used_for_sort
.cmp(&b.prop_that_used_for_sort)
.then_with(|| a.peer_idx.cmp(&b.peer_idx))
.then_with(|| a.lamport.cmp(&b.lamport))
})
});
debug_assert_eq!(ops.iter().map(|x| x.atom_len()).sum::<usize>(), ops_len);
ops
}
pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> {
let mut state = doc.app_state().try_lock().map_err(|_| {
LoroError::DecodeError(
@@ -661,7 +767,7 @@ mod encode {
InternalString,
};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(super) struct TempOp<'a> {
pub op: Cow<'a, Op>,
pub lamport: Lamport,
@@ -673,12 +779,26 @@ mod encode {
pub prop_that_used_for_sort: i32,
}
impl TempOp<'_> {
pub(crate) fn id(&self) -> loro_common::ID {
loro_common::ID {
peer: self.peer_id,
counter: self.op.counter,
}
impl PartialEq for TempOp<'_> {
fn eq(&self, other: &Self) -> bool {
self.peer_id == other.peer_id && self.lamport == other.lamport
}
}
impl Eq for TempOp<'_> {}
impl Ord for TempOp<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.peer_id
.cmp(&other.peer_id)
.then(self.lamport.cmp(&other.lamport))
// reversed so that popping from the sorted vec yields the smallest (peer, lamport) first
.reverse()
}
}
impl PartialOrd for TempOp<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

View file

@@ -1,341 +0,0 @@
use itertools::Either;
use loro_common::{HasCounter, HasCounterSpan, HasId, HasIdSpan, IdSpan, ID};
use rle::HasLength;
use std::collections::BTreeMap;
/// A map that maps spans of continuous [ID]s to spans of continuous integers.
///
/// It can merge spans that are adjacent to each other.
/// The value is automatically incremented by the length of the inserted span.
#[derive(Debug)]
pub struct IdIntMap {
inner: Either<BTreeMap<ID, Value>, Vec<(IdSpan, i32)>>,
next_value: i32,
}
const MAX_VEC_LEN: usize = 16;
#[derive(Debug)]
struct Value {
len: i32,
value: i32,
}
impl IdIntMap {
pub fn new() -> Self {
Self {
inner: Either::Right(Default::default()),
next_value: 0,
}
}
pub fn insert(&mut self, id_span: IdSpan) {
if cfg!(debug_assertions) {
let target = self.get(id_span.id_start());
assert!(
target.is_none(),
"ID already exists {id_span:?} {target:?} {:#?}",
self
);
}
match &mut self.inner {
Either::Left(map) => {
let value = self.next_value;
let len = id_span.atom_len() as i32;
self.next_value += len;
let id = id_span.id_start();
match map.range_mut(..&id).last() {
Some(last)
if last.0.peer == id.peer
&& last.0.counter + last.1.len == id.counter
&& last.1.value + last.1.len == value =>
{
// merge
last.1.len += len;
}
_ => {
map.insert(id, Value { len, value });
}
}
}
Either::Right(vec) => {
if vec.len() == MAX_VEC_LEN {
// convert to map and insert
self.escalate_to_map();
self.insert(id_span);
return;
}
let value = self.next_value;
let len = id_span.atom_len() as i32;
self.next_value += len;
let pos = match vec.binary_search_by(|x| x.0.id_start().cmp(&id_span.id_start())) {
Ok(_) => unreachable!("ID already exists"),
Err(i) => i,
};
if pos > 0 {
if let Some(last) = vec.get_mut(pos - 1) {
if last.0.id_end() == id_span.id_start()
&& last.1 + last.0.atom_len() as i32 == value
{
// can merge
last.0.counter.end += len;
return;
}
}
}
vec.insert(pos, (id_span, value));
}
}
}
fn escalate_to_map(&mut self) {
let Either::Right(vec) = &mut self.inner else {
return;
};
let mut map = BTreeMap::new();
for (id_span, value) in vec.drain(..) {
map.insert(
id_span.id_start(),
Value {
len: id_span.atom_len() as i32,
value,
},
);
}
self.inner = Either::Left(map);
}
/// Return (value, length) that starts at the given ID.
pub fn get(&self, target: ID) -> Option<(i32, usize)> {
let ans = match &self.inner {
Either::Left(map) => map.range(..=&target).last().and_then(|(entry_key, value)| {
if entry_key.peer != target.peer {
None
} else if entry_key.counter + value.len > target.counter {
Some((
value.value + target.counter - entry_key.counter,
(entry_key.counter + value.len - target.counter) as usize,
))
} else {
None
}
}),
Either::Right(vec) => vec
.iter()
.rev()
.find(|(id_span, _)| id_span.contains(target))
.map(|(id_span, value)| {
(
*value + target.counter - id_span.ctr_start(),
(id_span.ctr_end() - target.counter) as usize,
)
}),
};
ans
}
/// Call `next` for each key-value pair that is in the given span.
/// It's guaranteed that the keys are in ascending order.
pub fn get_values_in_span(&self, target: IdSpan, mut next: impl FnMut(IdSpan, i32)) {
let target_peer = target.client_id;
match &self.inner {
Either::Left(map) => {
let last = map
.range(..&target.id_start())
.next_back()
.and_then(|(id, v)| {
if id.peer != target_peer {
None
} else if id.counter + v.len > target.ctr_start() {
Some((id, v))
} else {
None
}
});
let iter = map.range(&target.id_start()..);
for (entry_key, value) in last.into_iter().chain(iter) {
if entry_key.peer > target_peer {
break;
}
if entry_key.counter >= target.ctr_end() {
break;
}
assert_eq!(entry_key.peer, target_peer);
let cur_span = &IdSpan::new(
target_peer,
entry_key.counter,
entry_key.counter + value.len,
);
let next_span = cur_span.get_intersection(&target).unwrap();
(next)(
next_span,
value.value + next_span.counter.start - entry_key.counter,
);
}
}
Either::Right(vec) => {
for (id_span, value) in vec.iter() {
if id_span.client_id < target_peer {
continue;
}
if id_span.client_id > target_peer {
break;
}
if target.ctr_start() >= id_span.ctr_end() {
continue;
}
if target.ctr_end() <= id_span.counter.start {
break;
}
assert_eq!(id_span.client_id, target_peer);
let next_span = id_span.get_intersection(&target).unwrap();
(next)(
next_span,
*value + next_span.counter.start - id_span.counter.start,
);
}
}
}
}
/// If the given item has overlapped section with the content in the map,
/// split the item into pieces where each piece maps to a continuous series of values or maps to none.
pub(crate) fn split<'a, T: HasIdSpan + generic_btree::rle::Sliceable + 'a>(
&'a self,
item: T,
) -> impl Iterator<Item = T> + 'a {
let len = item.rle_len();
let span = item.id_span();
// PERF: we may avoid this alloc if get_values_in_span returns an iter
let mut ans = Vec::new();
let mut ctr_start = span.ctr_start();
let mut index = 0;
let ctr_end = span.ctr_end();
self.get_values_in_span(span, |id_span: IdSpan, _| {
if id_span.counter.start == ctr_start && id_span.counter.end == ctr_end {
return;
}
if id_span.counter.start > ctr_start {
ans.push(
item.slice(
index as usize..(index + id_span.counter.start - ctr_start) as usize,
),
);
index += id_span.counter.start - ctr_start;
}
ans.push(item.slice(
index as usize..(index + id_span.counter.end - id_span.counter.start) as usize,
));
index += id_span.counter.end - id_span.counter.start;
ctr_start = id_span.ctr_end();
});
if ans.is_empty() && len > 0 {
ans.push(item);
} else if index as usize != len {
ans.push(item.slice(index as usize..len));
}
ans.into_iter()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_basic() {
let mut map = IdIntMap::new();
map.insert(IdSpan::new(0, 0, 10));
map.insert(IdSpan::new(0, 10, 100));
map.insert(IdSpan::new(1, 0, 100));
map.insert(IdSpan::new(2, 0, 100));
map.insert(IdSpan::new(999, 0, 100));
assert!(map.inner.is_right());
assert_eq!(map.get(ID::new(0, 10)).unwrap().0, 10);
assert_eq!(map.get(ID::new(1, 10)).unwrap().0, 110);
assert_eq!(map.get(ID::new(2, 10)).unwrap().0, 210);
assert_eq!(map.get(ID::new(0, 0)).unwrap().0, 0);
assert_eq!(map.get(ID::new(1, 0)).unwrap().0, 100);
assert_eq!(map.get(ID::new(2, 0)).unwrap().0, 200);
assert_eq!(map.get(ID::new(999, 99)).unwrap().0, 399);
for i in 0..100 {
map.insert(IdSpan::new(3, i * 2, i * 2 + 1));
}
assert!(map.inner.is_left());
assert_eq!(map.get(ID::new(0, 10)).unwrap().0, 10);
assert_eq!(map.get(ID::new(1, 10)).unwrap().0, 110);
assert_eq!(map.get(ID::new(2, 10)).unwrap().0, 210);
assert_eq!(map.get(ID::new(0, 0)).unwrap().0, 0);
assert_eq!(map.get(ID::new(1, 0)).unwrap().0, 100);
assert_eq!(map.get(ID::new(2, 0)).unwrap().0, 200);
assert_eq!(map.get(ID::new(999, 99)).unwrap().0, 399);
for i in 0..100 {
assert_eq!(map.get(ID::new(3, i * 2)).unwrap().0, i + 400, "i = {i}");
}
let mut called = 0;
map.get_values_in_span(IdSpan::new(0, 3, 66), |id_span, value| {
called += 1;
assert_eq!(id_span, IdSpan::new(0, 3, 66));
assert_eq!(value, 3);
});
assert_eq!(called, 1);
let mut called = Vec::new();
map.get_values_in_span(IdSpan::new(3, 0, 10), |id_span, value| {
called.push((id_span, value));
});
assert_eq!(
called,
vec![
(IdSpan::new(3, 0, 1), 400),
(IdSpan::new(3, 2, 3), 401),
(IdSpan::new(3, 4, 5), 402),
(IdSpan::new(3, 6, 7), 403),
(IdSpan::new(3, 8, 9), 404),
]
);
}
#[test]
fn test_get_values() {
let mut map = IdIntMap::new();
map.insert(IdSpan::new(0, 3, 5));
map.insert(IdSpan::new(0, 0, 1));
map.insert(IdSpan::new(0, 2, 3));
let mut called = Vec::new();
map.get_values_in_span(IdSpan::new(0, 0, 10), |id_span, value| {
called.push((id_span, value));
});
assert_eq!(
called,
vec![
(IdSpan::new(0, 0, 1), 2),
(IdSpan::new(0, 2, 3), 3),
(IdSpan::new(0, 3, 5), 0),
]
);
}
}

View file

@@ -1,5 +1,4 @@
pub(crate) mod delta_rle_encoded_num;
pub(crate) mod id_int_map;
pub(crate) mod lazy;
pub mod string_slice;
pub(crate) mod utf16;

View file

@@ -192,14 +192,13 @@ impl DeltaValue for StringSlice {
}
}
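/// Convert a range of unicode char indices `[start, end)` within the string
/// into the corresponding UTF-8 byte range.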
pub fn unicode_range_to_byte_range(bytes: &[u8], start: usize, end: usize) -> (usize, usize) {
pub fn unicode_range_to_byte_range(s: &str, start: usize, end: usize) -> (usize, usize) {
debug_assert!(start <= end);
let s = std::str::from_utf8(bytes).unwrap();
let start_unicode_index = start;
let end_unicode_index = end;
let mut current_utf8_index = 0;
let mut start_byte = 0;
let mut end_byte = bytes.len();
let mut end_byte = s.len();
for (current_unicode_index, c) in s.chars().enumerate() {
if current_unicode_index == start_unicode_index {
start_byte = current_utf8_index;