perf: refine encode size (can be better)

This commit is contained in:
Zixuan Chen 2023-08-29 19:43:35 +08:00
parent 60201989ec
commit ca9325f5ed
No known key found for this signature in database
3 changed files with 44 additions and 27 deletions

View file

@ -24,14 +24,14 @@ fn main() {
let data = loro.export_from(&Default::default()); let data = loro.export_from(&Default::default());
let start = Instant::now(); let start = Instant::now();
for _ in 0..100 { for _ in 0..10 {
let mut b = LoroDoc::default(); let mut b = LoroDoc::default();
b.detach(); b.detach();
b.import(&data).unwrap(); b.import(&data).unwrap();
} }
println!("Avg {}ms", start.elapsed().as_millis() as f64 / 100.0); println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0);
println!("size len={}", data.len()); println!("size len={}", data.len());
let d = miniz_oxide::deflate::compress_to_vec(&data, 10); let d = miniz_oxide::deflate::compress_to_vec(&data, 10);
println!("size len={}", d.len()); println!("size after compress len={}", d.len());
} }

View file

@ -52,8 +52,10 @@ struct ChangeEncoding {
pub(super) peer_idx: PeerIdx, pub(super) peer_idx: PeerIdx,
#[columnar(strategy = "DeltaRle", original_type = "i64")] #[columnar(strategy = "DeltaRle", original_type = "i64")]
pub(super) timestamp: Timestamp, pub(super) timestamp: Timestamp,
#[columnar(strategy = "DeltaRle", original_type = "i64")] #[columnar(strategy = "DeltaRle")]
pub(super) op_len: u32, pub(super) op_len: u32,
#[columnar(strategy = "DeltaRle")]
pub(super) lamport: u32,
/// The length of deps that exclude the dep on the same client /// The length of deps that exclude the dep on the same client
#[columnar(strategy = "Rle")] #[columnar(strategy = "Rle")]
pub(super) deps_len: u32, pub(super) deps_len: u32,
@ -77,6 +79,8 @@ struct OpEncoding {
// if is_del != true, then the following fields is the length of unknown insertion // if is_del != true, then the following fields is the length of unknown insertion
#[columnar(strategy = "Rle", original_type = "usize")] #[columnar(strategy = "Rle", original_type = "usize")]
gc: isize, gc: isize,
#[columnar(strategy = "Rle", original_type = "usize")]
insert_len: usize,
} }
#[columnar(vec, ser, de)] #[columnar(vec, ser, de)]
@ -110,7 +114,7 @@ struct DocEncoding<'a> {
normal_containers: Vec<NormalContainer>, normal_containers: Vec<NormalContainer>,
#[serde(borrow)] #[serde(borrow)]
str: VarZeroVec<'a, str, Index32>, str: Cow<'a, str>,
#[serde(borrow)] #[serde(borrow)]
root_containers: VarZeroVec<'a, RootContainerULE, Index32>, root_containers: VarZeroVec<'a, RootContainerULE, Index32>,
@ -131,19 +135,24 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
let mut start_counter = Vec::new(); let mut start_counter = Vec::new();
for span in diff.left.iter() { for span in diff.left.iter() {
let change = oplog.get_change_at(span.id_start()).unwrap(); let id = span.id_start();
let changes = oplog.get_change_at(id).unwrap();
let peer_id = *span.0; let peer_id = *span.0;
let idx = peers.len() as PeerIdx; let idx = peers.len() as PeerIdx;
peers.push(peer_id); peers.push(peer_id);
peer_id_to_idx.insert(peer_id, idx); peer_id_to_idx.insert(peer_id, idx);
start_counter.push(change.id.counter); start_counter.push(changes.id.counter);
if let Some(peer_changes) = oplog.changes.get(&id.peer) {
if let Some(result) = peer_changes.get_by_atom_index(id.counter) {
for change in &peer_changes.vec()[result.merged_index..] {
diff_changes.push(change);
}
}
}
} }
debug_log::debug_dbg!(&start_vv, &self_vv); debug_log::debug_dbg!(&start_vv, &self_vv);
for (change, _) in oplog.iter_causally(start_vv, self_vv.clone()) {
diff_changes.push(change.clone());
}
let (root_containers, container_idx2index, normal_containers) = let (root_containers, container_idx2index, normal_containers) =
extract_containers(&diff_changes, oplog, &mut peer_id_to_idx, &mut peers); extract_containers(&diff_changes, oplog, &mut peer_id_to_idx, &mut peers);
@ -164,7 +173,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
let mut key_to_idx = FxHashMap::default(); let mut key_to_idx = FxHashMap::default();
let mut deps = Vec::with_capacity(change_num); let mut deps = Vec::with_capacity(change_num);
let mut values = Vec::new(); let mut values = Vec::new();
let mut strings: Vec<String> = Vec::new(); let mut string: String = String::new();
for change in &diff_changes { for change in &diff_changes {
let client_idx = peer_id_to_idx[&change.id.peer]; let client_idx = peer_id_to_idx[&change.id.peer];
@ -188,7 +197,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
let container_index = *container_idx2index.get(&container).unwrap(); let container_index = *container_idx2index.get(&container).unwrap();
let op = oplog.local_op_to_remote(op); let op = oplog.local_op_to_remote(op);
for content in op.contents.into_iter() { for content in op.contents.into_iter() {
let (prop, gc, is_del) = match content { let (prop, gc, is_del, insert_len) = match content {
crate::op::RawOpContent::Map(MapSet { key, value }) => { crate::op::RawOpContent::Map(MapSet { key, value }) => {
values.push(value.clone()); values.push(value.clone());
( (
@ -198,6 +207,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
}), }),
0, 0,
false, // always insert false, // always insert
0,
) )
} }
crate::op::RawOpContent::List(list) => match list { crate::op::RawOpContent::List(list) => match list {
@ -206,6 +216,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
ListSlice::Unknown(v) => *v as isize, ListSlice::Unknown(v) => *v as isize,
_ => 0, _ => 0,
}; };
let mut len = 0;
match slice { match slice {
ListSlice::RawData(v) => { ListSlice::RawData(v) => {
values.push(LoroValue::List(Arc::new(v.to_vec()))); values.push(LoroValue::List(Arc::new(v.to_vec())));
@ -214,25 +225,24 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
str, str,
unicode_len: _, unicode_len: _,
} => { } => {
strings.push(match str { len = str.len();
Cow::Borrowed(s) => s.to_string(), string.push_str(str.deref());
Cow::Owned(s) => s,
});
} }
ListSlice::Unknown(_) => {} ListSlice::Unknown(_) => {}
}; };
(pos, gc, false) (pos, gc, false, len)
} }
ListOp::Delete(span) => (span.pos as usize, span.len, true), ListOp::Delete(span) => (span.pos as usize, span.len, true, 0),
}, },
}; };
op_len += 1; op_len += 1;
ops.push(OpEncoding { ops.push(OpEncoding {
container: container_index,
prop,
gc, gc,
prop,
is_del, is_del,
insert_len,
container: container_index,
}) })
} }
} }
@ -240,6 +250,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
changes.push(ChangeEncoding { changes.push(ChangeEncoding {
peer_idx: client_idx as PeerIdx, peer_idx: client_idx as PeerIdx,
timestamp: change.timestamp, timestamp: change.timestamp,
lamport: change.lamport,
deps_len, deps_len,
op_len, op_len,
dep_on_self, dep_on_self,
@ -250,7 +261,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
changes, changes,
ops, ops,
deps, deps,
str: VarZeroVec::from(&strings), str: Cow::Owned(string),
clients: peers, clients: peers,
keys, keys,
start_counter, start_counter,
@ -267,7 +278,7 @@ pub fn encode_oplog_v2(oplog: &OpLog, vv: &VersionVector) -> Vec<u8> {
/// Containers are sorted by their peer_id and counter so that /// Containers are sorted by their peer_id and counter so that
/// they can be compressed by using delta encoding. /// they can be compressed by using delta encoding.
fn extract_containers( fn extract_containers(
diff_changes: &Vec<Change>, diff_changes: &Vec<&Change>,
oplog: &OpLog, oplog: &OpLog,
peer_id_to_idx: &mut FxHashMap<PeerID, PeerIdx>, peer_id_to_idx: &mut FxHashMap<PeerID, PeerIdx>,
peers: &mut Vec<PeerID>, peers: &mut Vec<PeerID>,
@ -348,7 +359,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
from_bytes(input).map_err(|e| LoroError::DecodeError(e.to_string().into()))?; from_bytes(input).map_err(|e| LoroError::DecodeError(e.to_string().into()))?;
let DocEncoding { let DocEncoding {
changes: change_encodings, changes: mut change_encodings,
ops, ops,
deps, deps,
normal_containers, normal_containers,
@ -399,7 +410,8 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
}; };
let mut value_iter = values.into_iter(); let mut value_iter = values.into_iter();
let mut str_iter = str.iter(); let mut str_index = 0;
change_encodings.sort_by_key(|x| x.lamport);
let change_iter = change_encodings.into_iter().map(|change_encoding| { let change_iter = change_encodings.into_iter().map(|change_encoding| {
let counter = start_counter let counter = start_counter
.get_mut(change_encoding.peer_idx as usize) .get_mut(change_encoding.peer_idx as usize)
@ -410,6 +422,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
op_len, op_len,
deps_len, deps_len,
dep_on_self, dep_on_self,
lamport: _,
} = change_encoding; } = change_encoding;
let peer_id = peers[peer_idx as usize]; let peer_id = peers[peer_idx as usize];
@ -418,6 +431,7 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
for op in op_iter.by_ref().take(op_len as usize) { for op in op_iter.by_ref().take(op_len as usize) {
let OpEncoding { let OpEncoding {
container: container_idx, container: container_idx,
insert_len,
prop, prop,
gc, gc,
is_del, is_del,
@ -448,7 +462,8 @@ pub fn decode_oplog_v2(oplog: &mut OpLog, input: &[u8]) -> Result<(), LoroError>
} else { } else {
match container_type { match container_type {
ContainerType::Text => { ContainerType::Text => {
let s = str_iter.next().unwrap(); let s = &str[str_index..str_index + insert_len];
str_index += insert_len;
RawOpContent::List(ListOp::Insert { RawOpContent::List(ListOp::Insert {
slice: ListSlice::from_str(s), slice: ListSlice::from_str(s),
pos, pos,

View file

@ -603,7 +603,9 @@ fn encode_oplog(oplog: &OpLog, state_ref: Option<PreEncodedState>) -> FinalPhase
idx idx
}; };
let Cow::Owned(mut peers) = std::mem::take(&mut common.peer_ids) else {unreachable!()}; let Cow::Owned(mut peers) = std::mem::take(&mut common.peer_ids) else {
unreachable!()
};
let mut record_peer = |peer: PeerID| { let mut record_peer = |peer: PeerID| {
if let Some(idx) = peer_lookup.get(&peer) { if let Some(idx) = peer_lookup.get(&peer) {
return *idx as u32; return *idx as u32;