From 6950e42cae05614951d4c111f14a5f3d73319709 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Sun, 4 Feb 2024 15:28:08 +0800 Subject: [PATCH] perf: optimize snapshot encoding (#264) * perf: optimize snapshot encoding * perf: rm id_int_map and boost speed 1.4x Co-authored-by: Leon Zhao --------- Co-authored-by: Leon Zhao --- .../loro-internal/examples/automerge_x100.rs | 18 +- .../src/container/list/list_op.rs | 7 +- .../src/encoding/encode_reordered.rs | 192 ++++++++-- crates/loro-internal/src/utils/id_int_map.rs | 341 ------------------ crates/loro-internal/src/utils/mod.rs | 1 - .../loro-internal/src/utils/string_slice.rs | 5 +- 6 files changed, 175 insertions(+), 389 deletions(-) delete mode 100644 crates/loro-internal/src/utils/id_int_map.rs diff --git a/crates/loro-internal/examples/automerge_x100.rs b/crates/loro-internal/examples/automerge_x100.rs index 1ac3ef6f..5a4ebfbf 100644 --- a/crates/loro-internal/examples/automerge_x100.rs +++ b/crates/loro-internal/examples/automerge_x100.rs @@ -1,5 +1,4 @@ -use loro_common::ID; -use loro_internal::{version::Frontiers, LoroDoc}; +use loro_internal::LoroDoc; fn main() { use bench_utils::TextAction; @@ -10,15 +9,20 @@ fn main() { let start = Instant::now(); // loro.subscribe_deep(Box::new(|_| ())); let text = loro.get_text("text"); - for _ in 0..1 { + let n = 100; + for _ in 0..n { let mut txn = loro.txn().unwrap(); for TextAction { del, ins, pos } in actions.iter() { text.delete_with_txn(&mut txn, *pos, *del).unwrap(); text.insert_with_txn(&mut txn, *pos, ins).unwrap(); } } - loro.checkout(&Frontiers::from(ID::new(loro.peer_id(), 100))) - .unwrap(); - // loro.diagnose(); - println!("{}", start.elapsed().as_millis()); + println!("Apply time {}", start.elapsed().as_millis()); + loro.diagnose_size(); + drop(actions); + let start = Instant::now(); + for _ in 0..1 { + loro.export_snapshot(); + } + println!("Snapshot encoding time {}", start.elapsed().as_millis()); } diff --git a/crates/loro-internal/src/container/list/list_op.rs b/crates/loro-internal/src/container/list/list_op.rs index 42204870..641209b0 100644 --- a/crates/loro-internal/src/container/list/list_op.rs +++ b/crates/loro-internal/src/container/list/list_op.rs @@ -412,7 +412,12 @@ impl Sliceable for InnerListOp { pos, } => InnerListOp::InsertText { slice: { - let (a, b) = unicode_range_to_byte_range(slice, from, to); + let (a, b) = unicode_range_to_byte_range( + // SAFETY: we know it's a valid utf8 string + unsafe { std::str::from_utf8_unchecked(slice) }, + from, + to, + ); slice.slice(a, b) }, unicode_start: *unicode_start + from as u32, diff --git a/crates/loro-internal/src/encoding/encode_reordered.rs b/crates/loro-internal/src/encoding/encode_reordered.rs index f58a49e6..eaa4dfe5 100644 --- a/crates/loro-internal/src/encoding/encode_reordered.rs +++ b/crates/loro-internal/src/encoding/encode_reordered.rs @@ -1,9 +1,10 @@ use std::{borrow::Cow, cmp::Ordering, mem::take, sync::Arc}; use fxhash::{FxHashMap, FxHashSet}; +use generic_btree::rle::Sliceable; use itertools::Itertools; use loro_common::{ - ContainerID, ContainerType, Counter, HasCounterSpan, HasIdSpan, HasLamportSpan, IdSpan, + ContainerID, ContainerType, Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdSpan, InternalString, LoroError, LoroResult, PeerID, ID, }; use num_traits::FromPrimitive; @@ -20,7 +21,6 @@ use crate::{ }, op::{Op, OpWithId, SliceRange}, state::ContainerState, - utils::id_int_map::IdIntMap, version::Frontiers, DocState, LoroDoc, OpLog, VersionVector, }; @@ -355,12 +355,14 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto let mut cid_register: ValueRegister = ValueRegister::from_existing(containers); let mut dep_arena = arena::DepsArena::default(); let mut value_writer = ValueWriter::new(); - let mut ops: Vec = Vec::new(); // This stores the required op positions of each container state. // The states can be encoded in these positions in the next step. // This data structure stores that mapping from op id to the required total order. - let mut map_op_to_pos = IdIntMap::new(); + let mut origin_ops: Vec> = Vec::new(); + let mut pos_mapping_heap: Vec = Vec::new(); + let mut pos_target_value = 0; + let mut states = Vec::new(); let mut state_bytes = Vec::new(); for (_, c_idx) in c_pairs.iter() { @@ -390,7 +392,7 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto } }, encoder_by_op: &mut |op| { - ops.push(TempOp { + origin_ops.push(TempOp { op: Cow::Owned(op.op), peer_idx: peer_register.register(&op.peer) as u32, peer_id: op.peer, @@ -401,8 +403,14 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto }); }, record_idspan: &mut |id_span| { - op_len += id_span.atom_len(); - map_op_to_pos.insert(id_span); + let len = id_span.atom_len(); + op_len += len; + pos_mapping_heap.push(PosMappingItem { + start_id: id_span.id_start(), + len, + target_value: pos_target_value, + }); + pos_target_value += len as i32; }, mode: super::EncodeMode::Snapshot, }); @@ -420,32 +428,13 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto &mut dep_arena, &mut peer_register, &mut |op| { - let mut count = 0; - let o_len = op.atom_len(); - ops.extend(map_op_to_pos.split(op).map(|x| { - count += x.atom_len(); - x - })); - - debug_assert_eq!(count, o_len); + origin_ops.push(op); }, &mut key_register, &container_idx2index, ); - ops.sort_by(move |a, b| { - a.container_index.cmp(&b.container_index).then_with(|| { - match (map_op_to_pos.get(a.id()), map_op_to_pos.get(b.id())) { - (None, None) => a - .prop_that_used_for_sort - .cmp(&b.prop_that_used_for_sort) - .then_with(|| a.peer_idx.cmp(&b.peer_idx)) - .then_with(|| a.lamport.cmp(&b.lamport)), - (None, Some(_)) => Ordering::Greater, - (Some(_), None) => Ordering::Less, - (Some(a), Some(b)) => a.0.cmp(&b.0), - } - }) - }); + + let ops: Vec = calc_sorted_ops_for_snapshot(origin_ops, pos_mapping_heap); let encoded_ops = encode_ops( ops, @@ -486,6 +475,123 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto serde_columnar::to_vec(&doc).unwrap() } +#[derive(Clone, Copy, PartialEq, Debug, Eq)] +struct PosMappingItem { + start_id: ID, + len: usize, + target_value: i32, +} + +impl Ord for PosMappingItem { + fn cmp(&self, other: &Self) -> Ordering { + // this is reversed so that the BinaryHeap will be a min-heap + other.start_id.cmp(&self.start_id) + } +} + +impl PartialOrd for PosMappingItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl PosMappingItem { + fn split(&mut self, pos: usize) -> Self { + let new_len = self.len - pos; + self.len = pos; + PosMappingItem { + start_id: self.start_id.inc(pos as i32), + len: new_len, + target_value: self.target_value + pos as i32, + } + } +} + +fn calc_sorted_ops_for_snapshot<'a>( + mut origin_ops: Vec>, + mut pos_mapping_heap: Vec, +) -> Vec> { + origin_ops.sort_unstable(); + pos_mapping_heap.sort_unstable(); + let mut ops: Vec> = Vec::with_capacity(origin_ops.len()); + let ops_len: usize = origin_ops.iter().map(|x| x.atom_len()).sum(); + let mut origin_top = origin_ops.pop(); + let mut pos_top = pos_mapping_heap.pop(); + + while origin_top.is_some() || pos_top.is_some() { + let Some(mut inner_origin_top) = origin_top else { + unreachable!() + }; + + let Some(mut inner_pos_top) = pos_top else { + ops.push(inner_origin_top); + origin_top = origin_ops.pop(); + continue; + }; + + match inner_origin_top.id_start().cmp(&inner_pos_top.start_id) { + std::cmp::Ordering::Less => { + if inner_origin_top.id_end() <= inner_pos_top.start_id { + ops.push(inner_origin_top); + origin_top = origin_ops.pop(); + } else { + let delta = + inner_pos_top.start_id.counter - inner_origin_top.id_start().counter; + let right = inner_origin_top.split(delta as usize); + ops.push(inner_origin_top); + origin_top = Some(right); + } + } + std::cmp::Ordering::Equal => { + match inner_origin_top.atom_len().cmp(&inner_pos_top.len) { + std::cmp::Ordering::Less => { + // origin top is shorter than pos mapping, + // need to split the pos mapping + let len = inner_origin_top.atom_len(); + inner_origin_top.prop_that_used_for_sort = + i32::MIN + inner_pos_top.target_value; + ops.push(inner_origin_top); + let next = inner_pos_top.split(len); + origin_top = origin_ops.pop(); + pos_top = Some(next); + } + std::cmp::Ordering::Equal => { + // origin op's length equal to pos mapping's length + inner_origin_top.prop_that_used_for_sort = + i32::MIN + inner_pos_top.target_value; + ops.push(inner_origin_top.clone()); + origin_top = origin_ops.pop(); + pos_top = pos_mapping_heap.pop(); + } + std::cmp::Ordering::Greater => { + // origin top is longer than pos mapping, + // need to split the origin top + let right = inner_origin_top.split(inner_pos_top.len); + inner_origin_top.prop_that_used_for_sort = + i32::MIN + inner_pos_top.target_value; + ops.push(inner_origin_top); + origin_top = Some(right); + pos_top = pos_mapping_heap.pop(); + } + } + } + std::cmp::Ordering::Greater => unreachable!(), + } + } + + ops.sort_unstable_by(|a, b| { + a.container_index.cmp(&b.container_index).then({ + a.prop_that_used_for_sort + .cmp(&b.prop_that_used_for_sort) + .then_with(|| a.peer_idx.cmp(&b.peer_idx)) + .then_with(|| a.lamport.cmp(&b.lamport)) + }) + }); + + debug_assert_eq!(ops.iter().map(|x| x.atom_len()).sum::(), ops_len); + ops +} + pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> { let mut state = doc.app_state().try_lock().map_err(|_| { LoroError::DecodeError( @@ -661,7 +767,7 @@ mod encode { InternalString, }; - #[derive(Debug)] + #[derive(Debug, Clone)] pub(super) struct TempOp<'a> { pub op: Cow<'a, Op>, pub lamport: Lamport, @@ -673,12 +779,26 @@ mod encode { pub prop_that_used_for_sort: i32, } - impl TempOp<'_> { - pub(crate) fn id(&self) -> loro_common::ID { - loro_common::ID { - peer: self.peer_id, - counter: self.op.counter, - } + impl PartialEq for TempOp<'_> { + fn eq(&self, other: &Self) -> bool { + self.peer_id == other.peer_id && self.lamport == other.lamport + } + } + + impl Eq for TempOp<'_> {} + impl Ord for TempOp<'_> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.peer_id + .cmp(&other.peer_id) + .then(self.lamport.cmp(&other.lamport)) + // we need reverse because we'll need to use binary heap to get the smallest one + .reverse() + } + } + + impl PartialOrd for TempOp<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } diff --git a/crates/loro-internal/src/utils/id_int_map.rs b/crates/loro-internal/src/utils/id_int_map.rs deleted file mode 100644 index ff1f2783..00000000 --- a/crates/loro-internal/src/utils/id_int_map.rs +++ /dev/null @@ -1,341 +0,0 @@ -use itertools::Either; -use loro_common::{HasCounter, HasCounterSpan, HasId, HasIdSpan, IdSpan, ID}; -use rle::HasLength; -use std::collections::BTreeMap; - -/// A map that maps spans of continuous [ID]s to spans of continuous integers. -/// -/// It can merge spans that are adjacent to each other. -/// The value is automatically incremented by the length of the inserted span. -#[derive(Debug)] -pub struct IdIntMap { - inner: Either, Vec<(IdSpan, i32)>>, - next_value: i32, -} - -const MAX_VEC_LEN: usize = 16; - -#[derive(Debug)] -struct Value { - len: i32, - value: i32, -} - -impl IdIntMap { - pub fn new() -> Self { - Self { - inner: Either::Right(Default::default()), - next_value: 0, - } - } - - pub fn insert(&mut self, id_span: IdSpan) { - if cfg!(debug_assertions) { - let target = self.get(id_span.id_start()); - assert!( - target.is_none(), - "ID already exists {id_span:?} {target:?} {:#?}", - self - ); - } - - match &mut self.inner { - Either::Left(map) => { - let value = self.next_value; - let len = id_span.atom_len() as i32; - self.next_value += len; - - let id = id_span.id_start(); - match map.range_mut(..&id).last() { - Some(last) - if last.0.peer == id.peer - && last.0.counter + last.1.len == id.counter - && last.1.value + last.1.len == value => - { - // merge - last.1.len += len; - } - _ => { - map.insert(id, Value { len, value }); - } - } - } - Either::Right(vec) => { - if vec.len() == MAX_VEC_LEN { - // convert to map and insert - self.escalate_to_map(); - self.insert(id_span); - return; - } - - let value = self.next_value; - let len = id_span.atom_len() as i32; - self.next_value += len; - - let pos = match vec.binary_search_by(|x| x.0.id_start().cmp(&id_span.id_start())) { - Ok(_) => unreachable!("ID already exists"), - Err(i) => i, - }; - - if pos > 0 { - if let Some(last) = vec.get_mut(pos - 1) { - if last.0.id_end() == id_span.id_start() - && last.1 + last.0.atom_len() as i32 == value - { - // can merge - last.0.counter.end += len; - return; - } - } - } - - vec.insert(pos, (id_span, value)); - } - } - } - - fn escalate_to_map(&mut self) { - let Either::Right(vec) = &mut self.inner else { - return; - }; - let mut map = BTreeMap::new(); - for (id_span, value) in vec.drain(..) { - map.insert( - id_span.id_start(), - Value { - len: id_span.atom_len() as i32, - value, - }, - ); - } - - self.inner = Either::Left(map); - } - - /// Return (value, length) that starts at the given ID. - pub fn get(&self, target: ID) -> Option<(i32, usize)> { - let ans = match &self.inner { - Either::Left(map) => map.range(..=&target).last().and_then(|(entry_key, value)| { - if entry_key.peer != target.peer { - None - } else if entry_key.counter + value.len > target.counter { - Some(( - value.value + target.counter - entry_key.counter, - (entry_key.counter + value.len - target.counter) as usize, - )) - } else { - None - } - }), - Either::Right(vec) => vec - .iter() - .rev() - .find(|(id_span, _)| id_span.contains(target)) - .map(|(id_span, value)| { - ( - *value + target.counter - id_span.ctr_start(), - (id_span.ctr_end() - target.counter) as usize, - ) - }), - }; - ans - } - - /// Call `next` for each key-value pair that is in the given span. - /// It's guaranteed that the keys are in ascending order. - pub fn get_values_in_span(&self, target: IdSpan, mut next: impl FnMut(IdSpan, i32)) { - let target_peer = target.client_id; - match &self.inner { - Either::Left(map) => { - let last = map - .range(..&target.id_start()) - .next_back() - .and_then(|(id, v)| { - if id.peer != target_peer { - None - } else if id.counter + v.len > target.ctr_start() { - Some((id, v)) - } else { - None - } - }); - - let iter = map.range(&target.id_start()..); - for (entry_key, value) in last.into_iter().chain(iter) { - if entry_key.peer > target_peer { - break; - } - - if entry_key.counter >= target.ctr_end() { - break; - } - - assert_eq!(entry_key.peer, target_peer); - let cur_span = &IdSpan::new( - target_peer, - entry_key.counter, - entry_key.counter + value.len, - ); - - let next_span = cur_span.get_intersection(&target).unwrap(); - (next)( - next_span, - value.value + next_span.counter.start - entry_key.counter, - ); - } - } - Either::Right(vec) => { - for (id_span, value) in vec.iter() { - if id_span.client_id < target_peer { - continue; - } - - if id_span.client_id > target_peer { - break; - } - - if target.ctr_start() >= id_span.ctr_end() { - continue; - } - - if target.ctr_end() <= id_span.counter.start { - break; - } - - assert_eq!(id_span.client_id, target_peer); - let next_span = id_span.get_intersection(&target).unwrap(); - (next)( - next_span, - *value + next_span.counter.start - id_span.counter.start, - ); - } - } - } - } - - /// If the given item has overlapped section with the content in the map, - /// split the item into pieces where each piece maps to a continuous series of values or maps to none. - pub(crate) fn split<'a, T: HasIdSpan + generic_btree::rle::Sliceable + 'a>( - &'a self, - item: T, - ) -> impl Iterator + 'a { - let len = item.rle_len(); - let span = item.id_span(); - // PERF: we may avoid this alloc if get_values_in_span returns an iter - let mut ans = Vec::new(); - let mut ctr_start = span.ctr_start(); - let mut index = 0; - let ctr_end = span.ctr_end(); - self.get_values_in_span(span, |id_span: IdSpan, _| { - if id_span.counter.start == ctr_start && id_span.counter.end == ctr_end { - return; - } - - if id_span.counter.start > ctr_start { - ans.push( - item.slice( - index as usize..(index + id_span.counter.start - ctr_start) as usize, - ), - ); - index += id_span.counter.start - ctr_start; - } - - ans.push(item.slice( - index as usize..(index + id_span.counter.end - id_span.counter.start) as usize, - )); - index += id_span.counter.end - id_span.counter.start; - ctr_start = id_span.ctr_end(); - }); - - if ans.is_empty() && len > 0 { - ans.push(item); - } else if index as usize != len { - ans.push(item.slice(index as usize..len)); - } - - ans.into_iter() - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_basic() { - let mut map = IdIntMap::new(); - map.insert(IdSpan::new(0, 0, 10)); - map.insert(IdSpan::new(0, 10, 100)); - map.insert(IdSpan::new(1, 0, 100)); - map.insert(IdSpan::new(2, 0, 100)); - map.insert(IdSpan::new(999, 0, 100)); - assert!(map.inner.is_right()); - assert_eq!(map.get(ID::new(0, 10)).unwrap().0, 10); - assert_eq!(map.get(ID::new(1, 10)).unwrap().0, 110); - assert_eq!(map.get(ID::new(2, 10)).unwrap().0, 210); - assert_eq!(map.get(ID::new(0, 0)).unwrap().0, 0); - assert_eq!(map.get(ID::new(1, 0)).unwrap().0, 100); - assert_eq!(map.get(ID::new(2, 0)).unwrap().0, 200); - assert_eq!(map.get(ID::new(999, 99)).unwrap().0, 399); - - for i in 0..100 { - map.insert(IdSpan::new(3, i * 2, i * 2 + 1)); - } - - assert!(map.inner.is_left()); - assert_eq!(map.get(ID::new(0, 10)).unwrap().0, 10); - assert_eq!(map.get(ID::new(1, 10)).unwrap().0, 110); - assert_eq!(map.get(ID::new(2, 10)).unwrap().0, 210); - assert_eq!(map.get(ID::new(0, 0)).unwrap().0, 0); - assert_eq!(map.get(ID::new(1, 0)).unwrap().0, 100); - assert_eq!(map.get(ID::new(2, 0)).unwrap().0, 200); - assert_eq!(map.get(ID::new(999, 99)).unwrap().0, 399); - for i in 0..100 { - assert_eq!(map.get(ID::new(3, i * 2)).unwrap().0, i + 400, "i = {i}"); - } - - let mut called = 0; - map.get_values_in_span(IdSpan::new(0, 3, 66), |id_span, value| { - called += 1; - assert_eq!(id_span, IdSpan::new(0, 3, 66)); - assert_eq!(value, 3); - }); - assert_eq!(called, 1); - - let mut called = Vec::new(); - map.get_values_in_span(IdSpan::new(3, 0, 10), |id_span, value| { - called.push((id_span, value)); - }); - assert_eq!( - called, - vec![ - (IdSpan::new(3, 0, 1), 400), - (IdSpan::new(3, 2, 3), 401), - (IdSpan::new(3, 4, 5), 402), - (IdSpan::new(3, 6, 7), 403), - (IdSpan::new(3, 8, 9), 404), - ] - ); - } - - #[test] - fn test_get_values() { - let mut map = IdIntMap::new(); - map.insert(IdSpan::new(0, 3, 5)); - map.insert(IdSpan::new(0, 0, 1)); - map.insert(IdSpan::new(0, 2, 3)); - - let mut called = Vec::new(); - map.get_values_in_span(IdSpan::new(0, 0, 10), |id_span, value| { - called.push((id_span, value)); - }); - assert_eq!( - called, - vec![ - (IdSpan::new(0, 0, 1), 2), - (IdSpan::new(0, 2, 3), 3), - (IdSpan::new(0, 3, 5), 0), - ] - ); - } -} diff --git a/crates/loro-internal/src/utils/mod.rs b/crates/loro-internal/src/utils/mod.rs index 9b4d6a49..9c14288f 100644 --- a/crates/loro-internal/src/utils/mod.rs +++ b/crates/loro-internal/src/utils/mod.rs @@ -1,5 +1,4 @@ pub(crate) mod delta_rle_encoded_num; -pub(crate) mod id_int_map; pub(crate) mod lazy; pub mod string_slice; pub(crate) mod utf16; diff --git a/crates/loro-internal/src/utils/string_slice.rs b/crates/loro-internal/src/utils/string_slice.rs index 85136f28..2c1012f8 100644 --- a/crates/loro-internal/src/utils/string_slice.rs +++ b/crates/loro-internal/src/utils/string_slice.rs @@ -192,14 +192,13 @@ impl DeltaValue for StringSlice { } } -pub fn unicode_range_to_byte_range(bytes: &[u8], start: usize, end: usize) -> (usize, usize) { +pub fn unicode_range_to_byte_range(s: &str, start: usize, end: usize) -> (usize, usize) { debug_assert!(start <= end); - let s = std::str::from_utf8(bytes).unwrap(); let start_unicode_index = start; let end_unicode_index = end; let mut current_utf8_index = 0; let mut start_byte = 0; - let mut end_byte = bytes.len(); + let mut end_byte = s.len(); for (current_unicode_index, c) in s.chars().enumerate() { if current_unicode_index == start_unicode_index { start_byte = current_utf8_index;