From ca9325f5ed0a1da04805ba68c881754be5ae7b0e Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Tue, 29 Aug 2023 19:43:35 +0800 Subject: [PATCH] perf: refine encode size (can be better) --- crates/loro-internal/examples/encoding.rs | 6 +- .../src/encoding/encode_enhanced.rs | 61 ++++++++++++------- crates/loro-internal/src/snapshot_encode.rs | 4 +- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/crates/loro-internal/examples/encoding.rs b/crates/loro-internal/examples/encoding.rs index f8e38a57..de98b778 100644 --- a/crates/loro-internal/examples/encoding.rs +++ b/crates/loro-internal/examples/encoding.rs @@ -24,14 +24,14 @@ fn main() { let data = loro.export_from(&Default::default()); let start = Instant::now(); - for _ in 0..100 { + for _ in 0..10 { let mut b = LoroDoc::default(); b.detach(); b.import(&data).unwrap(); } - println!("Avg {}ms", start.elapsed().as_millis() as f64 / 100.0); + println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0); println!("size len={}", data.len()); let d = miniz_oxide::deflate::compress_to_vec(&data, 10); - println!("size len={}", d.len()); + println!("size after compress len={}", d.len()); } diff --git a/crates/loro-internal/src/encoding/encode_enhanced.rs b/crates/loro-internal/src/encoding/encode_enhanced.rs index 184e4321..9e6dc2be 100644 --- a/crates/loro-internal/src/encoding/encode_enhanced.rs +++ b/crates/loro-internal/src/encoding/encode_enhanced.rs @@ -52,8 +52,10 @@ struct ChangeEncoding { pub(super) peer_idx: PeerIdx, #[columnar(strategy = "DeltaRle", original_type = "i64")] pub(super) timestamp: Timestamp, - #[columnar(strategy = "DeltaRle", original_type = "i64")] + #[columnar(strategy = "DeltaRle")] pub(super) op_len: u32, + #[columnar(strategy = "DeltaRle")] + pub(super) lamport: u32, /// The length of deps that exclude the dep on the same client #[columnar(strategy = "Rle")] pub(super) deps_len: u32, @@ -77,6 +79,8 @@ struct OpEncoding { // if is_del != true, then the following fields is the length of unknown insertion #[columnar(strategy = "Rle", original_type = "usize")] gc: isize, + #[columnar(strategy = "Rle", original_type = "usize")] + insert_len: usize, } #[columnar(vec, ser, de)] @@ -110,7 +114,7 @@ struct DocEncoding<'a> { normal_containers: Vec, #[serde(borrow)] - str: VarZeroVec<'a, str, Index32>, + str: Cow<'a, str>, #[serde(borrow)] root_containers: VarZeroVec<'a, RootContainerULE, Index32>, @@ -131,19 +135,24 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { let mut start_counter = Vec::new(); for span in diff.left.iter() { - let change = oplog.get_change_at(span.id_start()).unwrap(); + let id = span.id_start(); + let changes = oplog.get_change_at(id).unwrap(); let peer_id = *span.0; let idx = peers.len() as PeerIdx; peers.push(peer_id); peer_id_to_idx.insert(peer_id, idx); - start_counter.push(change.id.counter); + start_counter.push(changes.id.counter); + + if let Some(peer_changes) = oplog.changes.get(&id.peer) { + if let Some(result) = peer_changes.get_by_atom_index(id.counter) { + for change in &peer_changes.vec()[result.merged_index..] { + diff_changes.push(change); + } + } + } } debug_log::debug_dbg!(&start_vv, &self_vv); - for (change, _) in oplog.iter_causally(start_vv, self_vv.clone()) { - diff_changes.push(change.clone()); - } - let (root_containers, container_idx2index, normal_containers) = extract_containers(&diff_changes, oplog, &mut peer_id_to_idx, &mut peers); @@ -164,7 +173,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { let mut key_to_idx = FxHashMap::default(); let mut deps = Vec::with_capacity(change_num); let mut values = Vec::new(); - let mut strings: Vec = Vec::new(); + let mut string: String = String::new(); for change in &diff_changes { let client_idx = peer_id_to_idx[&change.id.peer]; @@ -188,7 +197,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { let container_index = *container_idx2index.get(&container).unwrap(); let op = oplog.local_op_to_remote(op); for content in op.contents.into_iter() { - let (prop, gc, is_del) = match content { + let (prop, gc, is_del, insert_len) = match content { crate::op::RawOpContent::Map(MapSet { key, value }) => { values.push(value.clone()); ( @@ -198,6 +207,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { }), 0, false, // always insert + 0, ) } crate::op::RawOpContent::List(list) => match list { @@ -206,6 +216,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { ListSlice::Unknown(v) => *v as isize, _ => 0, }; + let mut len = 0; match slice { ListSlice::RawData(v) => { values.push(LoroValue::List(Arc::new(v.to_vec()))); @@ -214,25 +225,24 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { str, unicode_len: _, } => { - strings.push(match str { - Cow::Borrowed(s) => s.to_string(), - Cow::Owned(s) => s, - }); + len = str.len(); + string.push_str(str.deref()); } ListSlice::Unknown(_) => {} }; - (pos, gc, false) + (pos, gc, false, len) } - ListOp::Delete(span) => (span.pos as usize, span.len, true), + ListOp::Delete(span) => (span.pos as usize, span.len, true, 0), }, }; op_len += 1; ops.push(OpEncoding { - container: container_index, - prop, gc, + prop, is_del, + insert_len, + container: container_index, }) } } @@ -240,6 +250,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { changes.push(ChangeEncoding { peer_idx: client_idx as PeerIdx, timestamp: change.timestamp, + lamport: change.lamport, deps_len, op_len, dep_on_self, @@ -250,7 +261,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { changes, ops, deps, - str: VarZeroVec::from(&strings), + str: Cow::Owned(string), clients: peers, keys, start_counter, @@ -267,7 +278,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec { /// Containers are sorted by their peer_id and counter so that /// they can be compressed by using delta encoding. fn extract_containers( - diff_changes: &Vec, + diff_changes: &Vec<&Change>, oplog: &OpLog, peer_id_to_idx: &mut FxHashMap, peers: &mut Vec, @@ -348,7 +359,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> from_bytes(input).map_err(|e| LoroError::DecodeError(e.to_string().into()))?; let DocEncoding { - changes: change_encodings, + changes: mut change_encodings, ops, deps, normal_containers, @@ -399,7 +410,8 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> }; let mut value_iter = values.into_iter(); - let mut str_iter = str.iter(); + let mut str_index = 0; + change_encodings.sort_by_key(|x| x.lamport); let change_iter = change_encodings.into_iter().map(|change_encoding| { let counter = start_counter .get_mut(change_encoding.peer_idx as usize) @@ -410,6 +422,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> op_len, deps_len, dep_on_self, + lamport: _, } = change_encoding; let peer_id = peers[peer_idx as usize]; @@ -418,6 +431,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> for op in op_iter.by_ref().take(op_len as usize) { let OpEncoding { container: container_idx, + insert_len, prop, gc, is_del, @@ -448,7 +462,8 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError> } else { match container_type { ContainerType::Text => { - let s = str_iter.next().unwrap(); + let s = &str[str_index..str_index + insert_len]; + str_index += insert_len; RawOpContent::List(ListOp::Insert { slice: ListSlice::from_str(s), pos, diff --git a/crates/loro-internal/src/snapshot_encode.rs b/crates/loro-internal/src/snapshot_encode.rs index 98f39d2b..2af01b57 100644 --- a/crates/loro-internal/src/snapshot_encode.rs +++ b/crates/loro-internal/src/snapshot_encode.rs @@ -603,7 +603,9 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option) -> FinalPhase idx }; - let Cow::Owned(mut peers) = std::mem::take(&mut common.peer_ids) else {unreachable!()}; + let Cow::Owned(mut peers) = std::mem::take(&mut common.peer_ids) else { + unreachable!() + }; let mut record_peer = |peer: PeerID| { if let Some(idx) = peer_lookup.get(&peer) { return *idx as u32;