fix: import change merge err

Zixuan Chen 2024-05-28 22:09:15 +08:00
parent c84ef7df99
commit fd6e945d4b
5 changed files with 122 additions and 70 deletions


@@ -73,8 +73,8 @@ fn main() {
     );
     let start = Instant::now();
     let blocks_bytes = loro.export_blocks();
-    println!("blocks time {}ms", start.elapsed().as_millis());
-    println!("blocks size {}", blocks_bytes.len());
+    println!("export blocks time {:?} (2nd time)", start.elapsed());
+    println!("export blocks size {}", blocks_bytes.len());
     let _blocks_bytes_compressed = miniz_oxide::deflate::compress_to_vec(&blocks_bytes, 6);
     println!(
         "blocks time after compression {}ms",
@@ -88,22 +88,26 @@ fn main() {
     {
         // Delta encoding
-        // let start = Instant::now();
-        // for _ in 0..10 {
-        //     loro.export_from(&Default::default());
-        // }
+        let start = Instant::now();
+        for _ in 0..10 {
+            loro.export_from(&Default::default());
+        }
-        // println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0);
+        println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0);
        let data = loro.export_from(&Default::default());
        let start = Instant::now();
-        for _ in 0..5 {
+        let n = 5;
+        for _ in 0..n {
            let b = LoroDoc::default();
+            b.detach();
            b.import(&data).unwrap();
        }
-        println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0);
+        println!(
+            "Avg normal decode {}ms (without applying)",
+            start.elapsed().as_millis() as f64 / (n as f64)
+        );
        println!("size len={}", data.len());
        let d = miniz_oxide::deflate::compress_to_vec(&data, 10);
        println!("size after compress len={}", d.len());


@@ -258,11 +258,13 @@ impl OpLog {
                 last.ctr_end(),
                 "change id is not continuous"
             );
+            let merge_interval = self.configure.merge_interval();
             let timestamp_change = change.timestamp - last.timestamp;
-            // TODO: make this a config
             if !last.has_dependents
                 && change.deps_on_self()
-                && timestamp_change < self.configure.merge_interval()
+                && timestamp_change < merge_interval
             {
                 for op in take(change.ops.vec_mut()) {
                     last.ops.push(op);
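
Note: this hunk reads the merge interval from the existing configuration (dropping the stale `// TODO: make this a config`) and hoists it out of the condition. The rule itself is unchanged: an incoming change is folded into the previous one only when nothing else depends on that change yet, the new change depends solely on it, and the two are close enough in time. A simplified sketch of the predicate, with a hypothetical stand-in for loro-internal's `Change`:

```rust
// Hypothetical, simplified stand-in for the real `Change`.
struct Change {
    deps_on_self: bool,   // depends only on the previous local change
    has_dependents: bool, // another change already depends on this one
    timestamp: i64,
}

/// Whether `new` may be appended into `last` instead of starting a new change.
fn can_merge(last: &Change, new: &Change, merge_interval: i64) -> bool {
    !last.has_dependents && new.deps_on_self && (new.timestamp - last.timestamp) < merge_interval
}

fn main() {
    let last = Change { deps_on_self: true, has_dependents: false, timestamp: 100 };
    let new = Change { deps_on_self: true, has_dependents: false, timestamp: 150 };
    assert!(can_merge(&last, &new, 1000));
}
```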


@@ -10,6 +10,8 @@ use crate::{
 use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
 
+const MAX_BLOCK_SIZE: usize = 1024 * 4;
+
 #[derive(Debug, Clone)]
 pub struct ChangeStore {
     arena: SharedArena,
@@ -56,7 +58,7 @@ impl ChangeStore {
         println!("block num {}", self.kv.len());
         let mut bytes = Vec::new();
         for (_, block) in self.iter_bytes() {
-            // println!("block size {}", block.bytes.len());
+            println!("block size {}", block.bytes.len());
             leb128::write::unsigned(&mut bytes, block.bytes.len() as u64).unwrap();
             bytes.extend(&block.bytes);
         }
@@ -90,12 +92,10 @@ pub struct ChangesBlock {
     content: ChangesBlockContent,
 }
 
-const MAX_BLOCK_SIZE: usize = 1024 * 4;
-
 impl ChangesBlock {
     pub fn from_bytes(bytes: Bytes, arena: &SharedArena) -> LoroResult<Self> {
         let len = bytes.len();
-        let bytes = ChangesBlockBytes::new(bytes)?;
+        let mut bytes = ChangesBlockBytes::new(bytes);
         let peer = bytes.peer();
         let counter_range = bytes.counter_range();
         let lamport_range = bytes.lamport_range();
@@ -156,25 +156,41 @@ impl ChangesBlock {
     }
 
     pub fn push_change(&mut self, change: Change) -> Result<(), Change> {
-        if self.is_full() {
-            Err(change)
-        } else {
         let atom_len = change.atom_len();
-        self.lamport_range.1 = change.lamport + atom_len as Lamport;
-        self.counter_range.1 = change.id.counter + atom_len as Counter;
+        let next_lamport = change.lamport + atom_len as Lamport;
+        let next_counter = change.id.counter + atom_len as Counter;
+        let is_full = self.is_full();
         let changes = self.content.changes_mut().unwrap();
+        let merge_interval = 10000; // TODO: FIXME: Use configure
         match changes.last_mut() {
-            Some(last) if last.is_mergable(&change, &()) => {
-                last.merge(&change, &());
+            Some(last)
+                if change.deps_on_self()
+                    && change.timestamp - last.timestamp < merge_interval
+                    && (!is_full
+                        || (change.ops.len() == 1
+                            && last.ops.last().unwrap().is_mergable(&change.ops[0], &()))) =>
+            {
+                for op in change.ops.into_iter() {
+                    let size = op.estimate_storage_size();
+                    if !last.ops.push(op) {
+                        self.estimated_size += size;
+                    }
+                }
             }
             _ => {
+                if is_full {
+                    return Err(change);
+                } else {
                     self.estimated_size += change.estimate_storage_size();
                     changes.push(change);
+                }
             }
         }
-        Ok(())
-    }
+        self.counter_range.1 = next_counter;
+        self.lamport_range.1 = next_lamport;
+        Ok(())
     }
 
     fn id(&self) -> ID {
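
Note: `push_change` previously rejected any change once the block was full and merged only when the whole `Change` was mergeable. The rewrite merges op-by-op whenever the incoming change extends the last one within the merge interval, and lets even a full block absorb a single op that collapses into its last op; the counter/lamport upper bounds are now written once on the success path. A simplified sketch of the new acceptance guard, with hypothetical stand-in types:

```rust
// Hypothetical, simplified stand-ins for the real `Op`/`Change`.
struct Op;
impl Op {
    fn is_mergable(&self, _other: &Op) -> bool {
        true
    }
}
struct Change {
    deps_on_self: bool,
    timestamp: i64,
    ops: Vec<Op>,
}

/// Mirror of the guard on the `Some(last)` arm above.
fn can_fold(last: &Change, new: &Change, is_full: bool, merge_interval: i64) -> bool {
    new.deps_on_self
        && new.timestamp - last.timestamp < merge_interval
        // A full block may still take the change if it merges into the
        // last op without adding a new one.
        && (!is_full
            || (new.ops.len() == 1
                && last.ops.last().is_some_and(|op| op.is_mergable(&new.ops[0]))))
}

fn main() {
    let last = Change { deps_on_self: true, timestamp: 0, ops: vec![Op] };
    let new = Change { deps_on_self: true, timestamp: 10, ops: vec![Op] };
    // Even a full block can absorb `new`: one op, mergeable into the last.
    assert!(can_fold(&last, &new, true, 10_000));
}
```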
@@ -252,48 +268,65 @@ impl std::fmt::Debug for ChangesBlockContent {
 #[derive(Clone)]
 pub(crate) struct ChangesBlockBytes {
     bytes: Bytes,
-    header: ChangesBlockHeader,
+    header: Option<ChangesBlockHeader>,
 }
 
 impl ChangesBlockBytes {
-    fn new(bytes: Bytes) -> LoroResult<Self> {
-        Ok(Self {
-            header: decode_header(&bytes)?,
+    fn new(bytes: Bytes) -> Self {
+        Self {
+            header: None,
             bytes,
-        })
+        }
     }
 
-    fn parse(&self, a: &SharedArena) -> LoroResult<Vec<Change>> {
-        decode_block(&self.bytes, a, &self.header)
+    fn ensure_header(&mut self) -> LoroResult<()> {
+        if self.header.is_none() {
+            self.header = Some(decode_header(&self.bytes)?);
+        }
+        Ok(())
+    }
+
+    fn parse(&mut self, a: &SharedArena) -> LoroResult<Vec<Change>> {
+        self.ensure_header()?;
+        decode_block(&self.bytes, a, self.header.as_ref())
     }
 
     fn serialize(changes: &[Change], a: &SharedArena) -> Self {
         let bytes = encode_block(changes, a);
         // TODO: Perf we can calculate header directly without parsing the bytes
-        Self::new(Bytes::from(bytes)).unwrap()
+        let mut bytes = ChangesBlockBytes::new(Bytes::from(bytes));
+        bytes.ensure_header().unwrap();
+        bytes
     }
 
-    fn peer(&self) -> PeerID {
-        self.header.peer
+    fn peer(&mut self) -> PeerID {
+        self.ensure_header().unwrap();
+        self.header.as_ref().unwrap().peer
     }
 
-    fn counter_range(&self) -> (Counter, Counter) {
-        (self.header.counter, *self.header.counters.last().unwrap())
-    }
-
-    fn lamport_range(&self) -> (Lamport, Lamport) {
+    fn counter_range(&mut self) -> (Counter, Counter) {
+        self.ensure_header().unwrap();
         (
-            self.header.lamports[0],
-            *self.header.lamports.last().unwrap(),
+            self.header.as_ref().unwrap().counter,
+            *self.header.as_ref().unwrap().counters.last().unwrap(),
+        )
+    }
+
+    fn lamport_range(&mut self) -> (Lamport, Lamport) {
+        self.ensure_header().unwrap();
+        (
+            self.header.as_ref().unwrap().lamports[0],
+            *self.header.as_ref().unwrap().lamports.last().unwrap(),
         )
     }
 
     /// Length of the changes
-    fn len_changes(&self) -> usize {
-        self.header.n_changes
+    fn len_changes(&mut self) -> usize {
+        self.ensure_header().unwrap();
+        self.header.as_ref().unwrap().n_changes
     }
 
-    fn find_deps_for(&self, id: ID) -> Frontiers {
+    fn find_deps_for(&mut self, id: ID) -> Frontiers {
         unimplemented!()
     }
 }
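
Note: the block header is now decoded lazily. `ChangesBlockBytes::new` becomes infallible and stores `header: None`; every accessor calls `ensure_header` to decode and cache the header on first use, which is why the receivers changed from `&self` to `&mut self`. A minimal sketch of the pattern, with hypothetical `Header`/decode stand-ins for `ChangesBlockHeader`/`decode_header`:

```rust
struct Header {
    n_changes: usize,
}

struct BlockBytes {
    bytes: Vec<u8>,
    header: Option<Header>, // decoded on first access, then cached
}

impl BlockBytes {
    fn new(bytes: Vec<u8>) -> Self {
        // Construction can no longer fail; decoding is deferred.
        Self { bytes, header: None }
    }

    fn ensure_header(&mut self) -> Result<(), String> {
        if self.header.is_none() {
            // Stand-in for `decode_header(&self.bytes)?`.
            self.header = Some(Header { n_changes: self.bytes.len() });
        }
        Ok(())
    }

    fn len_changes(&mut self) -> usize {
        self.ensure_header().unwrap();
        self.header.as_ref().unwrap().n_changes
    }
}

fn main() {
    let mut b = BlockBytes::new(vec![0, 1, 2]);
    assert_eq!(b.len_changes(), 3); // header decoded here, cached afterwards
}
```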


@@ -233,6 +233,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
     //  │          │          │       │
     //  └──────────┴──────────┴───────┴────────────────────────────────┘
+    println!("ops num {}", encoded_ops.len());
     let ops_bytes = serde_columnar::to_vec(&EncodedOpsAndDeleteStarts {
         ops: encoded_ops,
         delete_start_ids: del_starts,
@@ -331,8 +332,15 @@ pub(crate) struct ChangesBlockHeader {
 }
 
 pub fn decode_header(m_bytes: &[u8]) -> LoroResult<ChangesBlockHeader> {
+    let doc = postcard::from_bytes(m_bytes).map_err(|e| {
+        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
+    })?;
+    decode_header_from_doc(&doc)
+}
+
+fn decode_header_from_doc(doc: &EncodedDoc) -> Result<ChangesBlockHeader, LoroError> {
     let EncodedDoc {
-        version,
         n_changes,
         first_counter,
         peers: peers_bytes,
@@ -342,22 +350,18 @@
         dep_peer_idxs,
         dep_counters,
         lamports,
-        timestamps,
-        commit_msg_lengths,
-        commit_msgs,
-        cids,
-        keys,
-        ops,
-        values,
-    } = postcard::from_bytes(m_bytes).map_err(|e| {
-        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
-    })?;
-    if version != VERSION {
-        return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
+        version,
+        ..
+    } = doc;
+    if *version != VERSION {
+        return Err(LoroError::IncompatibleFutureEncodingError(
+            *version as usize,
+        ));
     }
-    let first_counter = first_counter as Counter;
-    let n_changes = n_changes as usize;
+    let first_counter = *first_counter as Counter;
+    let n_changes = *n_changes as usize;
     let peer_num = peers_bytes.len() / 8;
     let mut peers = Vec::with_capacity(peer_num as usize);
     for i in 0..peer_num as usize {
@@ -481,15 +485,22 @@ impl<'a> ValueDecodedArenasTrait for ValueDecodeArena<'a> {
 pub fn decode_block(
     m_bytes: &[u8],
     shared_arena: &SharedArena,
-    header: &ChangesBlockHeader,
+    header: Option<&ChangesBlockHeader>,
 ) -> LoroResult<Vec<Change>> {
-    let mut changes = Vec::with_capacity(header.n_changes);
+    let doc = postcard::from_bytes(m_bytes).map_err(|e| {
+        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
+    })?;
+    let mut header_on_stack = None;
+    let header = header.unwrap_or_else(|| {
+        header_on_stack = Some(decode_header_from_doc(&doc).unwrap());
+        &header_on_stack.as_ref().unwrap()
+    });
     let EncodedDoc {
         version,
         n_changes,
         first_counter,
-        peers: peers_bytes,
-        lengths: lengths_bytes,
+        peers,
+        lengths,
         dep_on_self,
         dep_len,
         dep_peer_idxs,
@@ -502,9 +513,8 @@
         keys,
         ops,
         values,
-    } = postcard::from_bytes(m_bytes).map_err(|e| {
-        LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
-    })?;
+    } = doc;
+    let mut changes = Vec::with_capacity(n_changes as usize);
     if version != VERSION {
         return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
     }
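
Note: `decode_header` and `decode_block` now deserialize the `EncodedDoc` once and share `decode_header_from_doc`; `decode_block` takes `Option<&ChangesBlockHeader>` and reconstructs the header itself when the caller has not cached one. One way to express that "borrow the caller's header or build one on the stack" shape is a `match` with a deferred local, sketched here with hypothetical stand-in types:

```rust
struct Header {
    n_changes: usize,
}

// Stand-in for `decode_header_from_doc`.
fn decode_header_from_doc(doc: &[u8]) -> Header {
    Header { n_changes: doc.len() }
}

fn decode_block(doc: &[u8], header: Option<&Header>) -> usize {
    // Deferred initialization keeps the locally built header alive
    // for as long as the borrow is used.
    let header_on_stack;
    let header: &Header = match header {
        Some(h) => h,
        None => {
            header_on_stack = decode_header_from_doc(doc);
            &header_on_stack
        }
    };
    header.n_changes
}

fn main() {
    let bytes = [0u8; 4];
    assert_eq!(decode_block(&bytes, None), 4);
    let h = decode_header_from_doc(&bytes);
    assert_eq!(decode_block(&bytes, Some(&h)), 4);
}
```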


@@ -256,15 +256,18 @@
     A::Item: Mergable + HasLength,
 {
     /// push a new element to the end of the array. It may be merged with last element.
-    pub fn push(&mut self, value: A::Item) {
+    ///
+    /// Return whether merged
+    pub fn push(&mut self, value: A::Item) -> bool {
         if let Some(last) = self.vec.last_mut() {
             if last.is_mergable(&value, &()) {
                 last.merge(&value, &());
-                return;
+                return true;
             }
         }
         self.vec.push(value);
+        false
     }
 }
 
 impl<A: Array> RleVec<A>
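
Note: `RleVec::push` now reports whether the element merged into the previous run, which is what lets `push_change` above grow `estimated_size` only for ops that actually occupy a new slot. A usage sketch against a hypothetical minimal run-length vector:

```rust
// Hypothetical minimal run-length vector over (value, len) runs.
struct Run {
    value: u8,
    len: usize,
}

struct RleVec {
    vec: Vec<Run>,
}

impl RleVec {
    /// Push a run; returns true if it merged with the last one.
    fn push(&mut self, value: u8, len: usize) -> bool {
        if let Some(last) = self.vec.last_mut() {
            if last.value == value {
                last.len += len;
                return true;
            }
        }
        self.vec.push(Run { value, len });
        false
    }
}

fn main() {
    let mut v = RleVec { vec: Vec::new() };
    let mut slots = 0;
    for (value, len) in [(1, 2), (1, 3), (2, 1)] {
        // Count storage only for runs that start a new slot, mirroring
        // how push_change updates estimated_size.
        if !v.push(value, len) {
            slots += 1;
        }
    }
    assert_eq!(slots, 2); // (1,3) merged into (1,2); (2,1) opened a new run
}
```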