test: setup baseline bench

Zixuan Chen 2024-05-28 20:27:41 +08:00
parent d5de22008e
commit c84ef7df99
6 changed files with 166 additions and 36 deletions

View file

@@ -62,28 +62,52 @@ fn main() {
output.len(),
);
// {
// // Delta encoding
let start = Instant::now();
let blocks_bytes = loro.export_blocks();
println!("blocks time {}ms", start.elapsed().as_millis());
println!("blocks size {}", blocks_bytes.len());
let blocks_bytes_compressed = miniz_oxide::deflate::compress_to_vec(&blocks_bytes, 6);
println!(
"blocks size after compression {}",
blocks_bytes_compressed.len()
);
let start = Instant::now();
let blocks_bytes = loro.export_blocks();
println!("blocks time {}ms", start.elapsed().as_millis());
println!("blocks size {}", blocks_bytes.len());
let _blocks_bytes_compressed = miniz_oxide::deflate::compress_to_vec(&blocks_bytes, 6);
println!(
"blocks time after compression {}ms",
start.elapsed().as_millis()
);
let start = Instant::now();
let new_loro = LoroDoc::default();
new_loro.import_blocks(&blocks_bytes).unwrap();
println!("blocks decode time {:?}", start.elapsed());
// // let start = Instant::now();
// // for _ in 0..10 {
// // loro.export_from(&Default::default());
// // }
{
// Delta encoding
// // println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0);
// let start = Instant::now();
// for _ in 0..10 {
// loro.export_from(&Default::default());
// }
// let data = loro.export_from(&Default::default());
// let start = Instant::now();
// for _ in 0..5 {
// let b = LoroDoc::default();
// b.import(&data).unwrap();
// }
// println!("Avg encode {}ms", start.elapsed().as_millis() as f64 / 10.0);
// println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0);
// println!("size len={}", data.len());
// let d = miniz_oxide::deflate::compress_to_vec(&data, 10);
// println!("size after compress len={}", d.len());
// }
let data = loro.export_from(&Default::default());
let start = Instant::now();
for _ in 0..5 {
let b = LoroDoc::default();
b.detach();
b.import(&data).unwrap();
}
println!("Avg decode {}ms", start.elapsed().as_millis() as f64 / 10.0);
println!("size len={}", data.len());
let d = miniz_oxide::deflate::compress_to_vec(&data, 10);
println!("size after compress len={}", d.len());
}
// {
// // Snapshot encoding
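
Note (a sketch, not part of the committed diff): in outline, the new baseline bench measures a block-export round trip roughly as below. `export_blocks`/`import_blocks` are the experimental APIs added in this commit; `LoroDoc` and `miniz_oxide` are assumed to be in scope as in the bench, and deflate level 6 mirrors the bench.

    use std::time::Instant;

    fn bench_blocks(loro: &LoroDoc) {
        // Encode all changes as length-prefixed blocks.
        let start = Instant::now();
        let blocks = loro.export_blocks();
        println!("export_blocks: {}ms, {} bytes", start.elapsed().as_millis(), blocks.len());

        // Compress with the same deflate level the bench uses.
        let compressed = miniz_oxide::deflate::compress_to_vec(&blocks, 6);
        println!("after deflate(6): {} bytes", compressed.len());

        // Decode into a fresh document.
        let start = Instant::now();
        let new_doc = LoroDoc::default();
        new_doc.import_blocks(&blocks).unwrap();
        println!("import_blocks: {:?}", start.elapsed());
    }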

View file

@@ -97,10 +97,10 @@ impl<O: EstimatedSize> EstimatedSize for Change<O> {
/// Estimate the storage size of the change in bytes
#[inline]
fn estimate_storage_size(&self) -> usize {
let id_size = 5;
let lamport_size = 3;
let timestamp_size = 4;
let deps_size = (self.deps.len().max(1) - 1) * 5;
let id_size = 2;
let lamport_size = 1;
let timestamp_size = 1;
let deps_size = (self.deps.len().max(1) - 1) * 4;
let ops_size = self
.ops
.iter()
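
For reference (not part of the diff): with the new constants, a change with 3 deps is estimated at 2 (id) + 1 (lamport) + 1 (timestamp) + (3 - 1) * 4 = 12 bytes plus the ops estimate (assuming the truncated fold over self.ops above sums each op's estimated size); the old constants would give 5 + 3 + 4 + (3 - 1) * 5 = 22 bytes plus ops. The smaller per-change overhead presumably lets a block pack more changes before reaching MAX_BLOCK_SIZE in the ChangeStore file below.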

View file

@@ -415,6 +415,13 @@ impl LoroDoc {
ans
}
pub fn export_blocks(&self) -> Vec<u8> {
self.commit_then_stop();
let ans = self.oplog.lock().unwrap().export_blocks();
self.renew_txn_if_auto_commit();
ans
}
#[inline(always)]
#[instrument(skip_all)]
pub fn import(&self, bytes: &[u8]) -> Result<(), LoroError> {
@@ -429,6 +436,11 @@ impl LoroDoc {
ans
}
/// TODO: FIXME: HACK: This is just for experimentation
pub fn import_blocks(&self, blocks_bytes: &[u8]) -> LoroResult<()> {
self.oplog.lock().unwrap().import_blocks(blocks_bytes)
}
fn _import_with(&self, bytes: &[u8], origin: InternalString) -> Result<(), LoroError> {
let parsed = parse_header_and_body(bytes)?;
match parsed.mode.is_snapshot() {

View file

@@ -177,10 +177,10 @@ impl OpLog {
pub(crate) fn new() -> Self {
let arena = SharedArena::new();
Self {
change_store: ChangeStore::new(&arena),
dag: AppDag::default(),
op_groups: OpGroups::new(arena.clone()),
changes: ClientChanges::default(),
change_store: ChangeStore::new(),
arena,
next_lamport: 0,
latest_timestamp: Timestamp::default(),
@@ -248,6 +248,7 @@ impl OpLog {
/// This is the **only** place to update the `OpLog.changes`
pub(crate) fn insert_new_change(&mut self, mut change: Change, _: EnsureChangeDepsAreAtTheEnd) {
self.op_groups.insert_by_change(&change);
self.change_store.insert_change(change.clone());
self.register_container_and_parent_link(&change);
let entry = self.changes.entry(change.id.peer).or_default();
match entry.last_mut() {
@@ -974,6 +975,14 @@ impl OpLog {
ans
}
pub(crate) fn export_blocks(&mut self) -> Vec<u8> {
self.change_store.encode_all()
}
pub(crate) fn import_blocks(&mut self, blocks: &[u8]) -> Result<(), LoroError> {
self.change_store.decode_all(blocks)
}
}
#[derive(Debug)]
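
A sketch of how the blocks path is layered across this commit (names taken from the hunks in this commit; the ChangeStore methods appear in the next file):

    // LoroDoc::export_blocks()          public entry, bracketed by commit_then_stop() /
    //   -> OpLog::export_blocks()       renew_txn_if_auto_commit(); takes the oplog lock
    //     -> ChangeStore::encode_all()  emits LEB128 length-prefixed block bytes
    //
    // LoroDoc::import_blocks()          experimental (see the TODO/FIXME above)
    //   -> OpLog::import_blocks()
    //     -> ChangeStore::decode_all()  rebuilds the BTreeMap<ID, ChangesBlock>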

View file

@@ -1,7 +1,7 @@
use bytes::Bytes;
use loro_common::{Counter, HasLamportSpan, Lamport, LoroResult, PeerID, ID};
use rle::{HasLength, RlePush};
use std::{cmp::Ordering, collections::BTreeMap};
use loro_common::{Counter, HasLamportSpan, Lamport, LoroError, LoroResult, PeerID, ID};
use rle::{HasLength, Mergable, RlePush};
use std::{cmp::Ordering, collections::BTreeMap, io::Read, sync::Arc};
mod block_encode;
mod delta_rle_encode;
use crate::{
@@ -12,15 +12,71 @@ use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlock
#[derive(Debug, Clone)]
pub struct ChangeStore {
arena: SharedArena,
kv: BTreeMap<ID, ChangesBlock>,
}
impl ChangeStore {
pub fn new() -> Self {
pub fn new(a: &SharedArena) -> Self {
Self {
arena: a.clone(),
kv: BTreeMap::new(),
}
}
pub fn insert_change(&mut self, mut change: Change) {
let id = change.id;
if let Some((_id, block)) = self.kv.range_mut(..id).next_back() {
match block.push_change(change) {
Ok(_) => {
return;
}
Err(c) => change = c,
}
}
self.kv.insert(id, ChangesBlock::new(change, &self.arena));
}
pub fn insert_block(&mut self, block: ChangesBlock) {
unimplemented!()
}
pub fn block_num(&self) -> usize {
self.kv.len()
}
pub(crate) fn iter_bytes(&mut self) -> impl Iterator<Item = (ID, &'_ ChangesBlockBytes)> + '_ {
self.kv
.iter_mut()
.map(|(id, block)| (*id, block.content.bytes(&self.arena)))
}
pub(crate) fn encode_all(&mut self) -> Vec<u8> {
println!("block num {}", self.kv.len());
let mut bytes = Vec::new();
for (_, block) in self.iter_bytes() {
// println!("block size {}", block.bytes.len());
leb128::write::unsigned(&mut bytes, block.bytes.len() as u64).unwrap();
bytes.extend(&block.bytes);
}
bytes
}
pub(crate) fn decode_all(&mut self, blocks: &[u8]) -> Result<(), LoroError> {
assert!(self.kv.is_empty());
let mut reader = blocks;
while !reader.is_empty() {
let size = leb128::read::unsigned(&mut reader).unwrap();
let block_bytes = &reader[0..size as usize];
let block = ChangesBlock::from_bytes(Bytes::copy_from_slice(block_bytes), &self.arena)?;
self.kv.insert(block.id(), block);
reader = &reader[size as usize..];
}
Ok(())
}
}
#[derive(Debug, Clone)]
@@ -37,7 +93,7 @@ pub struct ChangesBlock {
const MAX_BLOCK_SIZE: usize = 1024 * 4;
impl ChangesBlock {
pub fn from_bytes(bytes: Bytes, arena: SharedArena) -> LoroResult<Self> {
pub fn from_bytes(bytes: Bytes, arena: &SharedArena) -> LoroResult<Self> {
let len = bytes.len();
let bytes = ChangesBlockBytes::new(bytes)?;
let peer = bytes.peer();
@@ -45,7 +101,7 @@ impl ChangesBlock {
let lamport_range = bytes.lamport_range();
let content = ChangesBlockContent::Bytes(bytes);
Ok(Self {
arena,
arena: arena.clone(),
peer,
estimated_size: len,
counter_range,
@@ -54,6 +110,23 @@ impl ChangesBlock {
})
}
pub fn new(change: Change, a: &SharedArena) -> Self {
let atom_len = change.atom_len();
let counter_range = (change.id.counter, change.id.counter + atom_len as Counter);
let lamport_range = (change.lamport, change.lamport + atom_len as Lamport);
let estimated_size = change.estimate_storage_size();
let peer = change.id.peer;
let content = ChangesBlockContent::Changes(vec![change]);
Self {
arena: a.clone(),
peer,
counter_range,
lamport_range,
estimated_size,
content,
}
}
pub fn cmp_id(&self, id: ID) -> Ordering {
self.peer.cmp(&id.peer).then_with(|| {
if self.counter_range.0 > id.counter {
@@ -89,12 +162,24 @@ impl ChangesBlock {
let atom_len = change.atom_len();
self.lamport_range.1 = change.lamport + atom_len as Lamport;
self.counter_range.1 = change.id.counter + atom_len as Counter;
self.estimated_size += change.estimate_storage_size();
let changes = self.content.changes_mut().unwrap();
changes.push_rle_element(change);
match changes.last_mut() {
Some(last) if last.is_mergable(&change, &()) => {
last.merge(&change, &());
}
_ => {
self.estimated_size += change.estimate_storage_size();
changes.push(change);
}
}
Ok(())
}
}
fn id(&self) -> ID {
ID::new(self.peer, self.counter_range.0)
}
}
#[derive(Clone)]
@@ -117,14 +202,14 @@ impl ChangesBlockContent {
}
}
pub fn bytes(&mut self) -> &ChangesBlockBytes {
pub fn bytes(&mut self, a: &SharedArena) -> &ChangesBlockBytes {
match self {
ChangesBlockContent::Bytes(bytes) => bytes,
ChangesBlockContent::Both(_, bytes) => bytes,
ChangesBlockContent::Changes(changes) => {
let bytes = ChangesBlockBytes::serialize(changes, &SharedArena::new());
let bytes = ChangesBlockBytes::serialize(changes, a);
*self = ChangesBlockContent::Both(std::mem::take(changes), bytes);
self.bytes()
self.bytes(a)
}
}
}
@@ -165,7 +250,7 @@ impl std::fmt::Debug for ChangesBlockContent {
}
#[derive(Clone)]
struct ChangesBlockBytes {
pub(crate) struct ChangesBlockBytes {
bytes: Bytes,
header: ChangesBlockHeader,
}
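
The wire format produced by encode_all/decode_all above is a flat concatenation of blocks, each prefixed with its LEB128-encoded byte length. A self-contained sketch of that framing (using the leb128 crate as the diff does; the payloads here are opaque byte vectors standing in for real ChangesBlock encodings):

    fn frame_blocks(blocks: &[Vec<u8>]) -> Vec<u8> {
        let mut out = Vec::new();
        for block in blocks {
            // Length prefix, then the raw block bytes.
            leb128::write::unsigned(&mut out, block.len() as u64).unwrap();
            out.extend_from_slice(block);
        }
        out
    }

    fn unframe_blocks(mut bytes: &[u8]) -> Vec<Vec<u8>> {
        let mut blocks = Vec::new();
        while !bytes.is_empty() {
            // Read the length, then split off that many payload bytes.
            let len = leb128::read::unsigned(&mut bytes).unwrap() as usize;
            blocks.push(bytes[..len].to_vec());
            bytes = &bytes[len..];
        }
        blocks
    }

Note that decode_all asserts the store is empty before importing, so this path currently only targets loading into a fresh document.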

View file

@@ -360,7 +360,7 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult<ChangesBlockHeader> {
let n_changes = n_changes as usize;
let peer_num = peers_bytes.len() / 8;
let mut peers = Vec::with_capacity(peer_num as usize);
for i in 0..(n_changes as usize) {
for i in 0..peer_num as usize {
let peer_id =
PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap());
peers.push(peer_id);
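
This last hunk fixes the loop bound when reading the peer table: the header stores peer_num = peers_bytes.len() / 8 peer IDs, one per distinct peer, not one per change. With the old bound, any block containing more changes than distinct peers would read past the table; for example (hypothetical numbers), a block of 100 changes from 2 peers has a 16-byte peer table, and at i = 2 the slice peers_bytes[16..24] would already panic.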