Merge pull request #449 from loro-dev/zxch3n/loro-912-opt-export-updates-block-encode

Optimize block encoding schema & new encoding schema for updates
This commit is contained in:
Zixuan Chen 2024-09-12 19:45:27 +08:00 committed by GitHub
commit f39b34e854
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 559 additions and 365 deletions

24
Cargo.lock generated
View file

@ -491,9 +491,9 @@ dependencies = [
[[package]]
name = "darling"
version = "0.20.3"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e"
checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989"
dependencies = [
"darling_core",
"darling_macro",
@ -501,9 +501,9 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.20.3"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621"
checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5"
dependencies = [
"fnv",
"ident_case",
@ -515,9 +515,9 @@ dependencies = [
[[package]]
name = "darling_macro"
version = "0.20.3"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote 1.0.35",
@ -2035,9 +2035,9 @@ dependencies = [
[[package]]
name = "serde_columnar"
version = "0.3.7"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3748cbf2b43a15ee9627881cabd7820d50508781cdebd3bb54cea49215d367a1"
checksum = "6d4e3c0e46450edf7da174b610b9143eb8ca22059ace5016741fc9e20b88d1e7"
dependencies = [
"itertools 0.11.0",
"postcard",
@ -2048,9 +2048,9 @@ dependencies = [
[[package]]
name = "serde_columnar_derive"
version = "0.3.4"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e5eaacabbc55a397ffbb1ee32523f40f86fdefea8a8d9db19630d8b7c00edd1"
checksum = "42c5d47942b2a7e76118b697fc0f94516a5d8366a3c0fee8d0e2b713e952e306"
dependencies = [
"darling",
"proc-macro2 1.0.75",
@ -2162,9 +2162,9 @@ dependencies = [
[[package]]
name = "strsim"
version = "0.10.0"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"

View file

@ -20,7 +20,7 @@ enum_dispatch = "0.3.11"
enum-as-inner = "0.5.1"
fxhash = "0.2.1"
tracing = { version = "0.1" }
serde_columnar = { version = "0.3.7" }
serde_columnar = { version = "0.3.10" }
serde_json = "1.0"
thiserror = "1"
smallvec = { version = "1.8.0", features = ["serde"] }

View file

@ -1,4 +1,3 @@
use std::time::Instant;
use dev_utils::{get_mem_usage, ByteSize};
use loro::{CommitOptions, LoroCounter, LoroDoc, LoroMap};

View file

@ -1,6 +1,6 @@
use dev_utils::ByteSize;
use loro::LoroDoc;
use std::time::{Duration, Instant};
use std::time::Instant;
pub fn bench_fast_snapshot(doc: &LoroDoc) {
let old_v;

View file

@ -152,9 +152,9 @@ checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "darling"
version = "0.20.7"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a5d17510e4a1a87f323de70b7b1eaac1ee0e37866c6720b2d279452d0edf389"
checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989"
dependencies = [
"darling_core",
"darling_macro",
@ -162,9 +162,9 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.20.7"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a98eea36a7ff910fa751413d0895551143a8ea41d695d9798ec7d665df7f7f5e"
checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5"
dependencies = [
"fnv",
"ident_case",
@ -176,9 +176,9 @@ dependencies = [
[[package]]
name = "darling_macro"
version = "0.20.7"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6a366a3f90c5d59a4b91169775f88e52e8f71a0e7804cc98a8db2932cf4ed57"
checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote",
@ -1180,9 +1180,9 @@ dependencies = [
[[package]]
name = "serde_columnar"
version = "0.3.7"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3748cbf2b43a15ee9627881cabd7820d50508781cdebd3bb54cea49215d367a1"
checksum = "6d4e3c0e46450edf7da174b610b9143eb8ca22059ace5016741fc9e20b88d1e7"
dependencies = [
"itertools 0.11.0",
"postcard",
@ -1193,9 +1193,9 @@ dependencies = [
[[package]]
name = "serde_columnar_derive"
version = "0.3.4"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e5eaacabbc55a397ffbb1ee32523f40f86fdefea8a8d9db19630d8b7c00edd1"
checksum = "42c5d47942b2a7e76118b697fc0f94516a5d8366a3c0fee8d0e2b713e952e306"
dependencies = [
"darling",
"proc-macro2",
@ -1287,9 +1287,9 @@ dependencies = [
[[package]]
name = "strsim"
version = "0.10.0"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"

View file

@ -257,7 +257,7 @@ impl Change {
}
/// [Unix time](https://en.wikipedia.org/wiki/Unix_time)
/// It is the number of seconds that have elapsed since 00:00:00 UTC on 1 January 1970.
/// It is the number of milliseconds that have elapsed since 00:00:00 UTC on 1 January 1970.
#[cfg(not(all(feature = "wasm", target_arch = "wasm32")))]
pub(crate) fn get_sys_timestamp() -> Timestamp {
use std::time::{SystemTime, UNIX_EPOCH};

View file

@ -6,7 +6,7 @@ use generic_btree::{
};
use loro_common::{Counter, HasId, HasIdSpan, IdFull, IdSpan, Lamport, PeerID, ID};
use rle::HasLength as _;
use tracing::{instrument, trace};
use tracing::instrument;
use crate::{cursor::AbsolutePosition, VersionVector};

View file

@ -178,9 +178,8 @@ pub(crate) fn decode_oplog(
let ParsedHeaderAndBody { mode, body, .. } = parsed;
match mode {
EncodeMode::Rle | EncodeMode::Snapshot => encode_reordered::decode_updates(oplog, body),
EncodeMode::FastSnapshot | EncodeMode::FastUpdates => {
fast_snapshot::decode_oplog(oplog, body)
}
EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
EncodeMode::Auto => unreachable!(),
}
}
@ -268,69 +267,39 @@ pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
}
pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
// HEADER
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
ans.extend(MAGIC_BYTES);
let checksum = [0; 16];
ans.extend(checksum);
ans.extend(EncodeMode::FastSnapshot.to_bytes());
// BODY
fast_snapshot::encode_snapshot(doc, &mut ans);
// CHECKSUM in HEADER
let checksum_body = &ans[20..];
let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
ans[16..20].copy_from_slice(&checksum.to_le_bytes());
ans
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
fast_snapshot::encode_snapshot(doc, ans);
})
}
pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
// HEADER
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
ans.extend(MAGIC_BYTES);
let checksum = [0; 16];
ans.extend(checksum);
ans.extend(EncodeMode::FastUpdates.to_bytes());
// BODY
fast_snapshot::encode_updates(doc, vv, &mut ans);
// CHECKSUM in HEADER
let checksum_body = &ans[20..];
let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
ans[16..20].copy_from_slice(&checksum.to_le_bytes());
ans
encode_with(EncodeMode::FastUpdates, &mut |ans| {
fast_snapshot::encode_updates(doc, vv, ans);
})
}
pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
// HEADER
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
ans.extend(MAGIC_BYTES);
let checksum = [0; 16];
ans.extend(checksum);
ans.extend(EncodeMode::FastUpdates.to_bytes());
// BODY
fast_snapshot::encode_updates_in_range(oplog, spans, &mut ans);
// CHECKSUM in HEADER
let checksum_body = &ans[20..];
let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
ans[16..20].copy_from_slice(&checksum.to_le_bytes());
ans
encode_with(EncodeMode::FastUpdates, &mut |ans| {
fast_snapshot::encode_updates_in_range(oplog, spans, ans);
})
}
pub(crate) fn export_gc_snapshot(doc: &LoroDoc, f: &Frontiers) -> Vec<u8> {
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
gc::export_gc_snapshot(doc, f, ans).unwrap();
})
}
fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec<u8>)) -> Vec<u8> {
// HEADER
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
ans.extend(MAGIC_BYTES);
let checksum = [0; 16];
ans.extend(checksum);
ans.extend(EncodeMode::FastSnapshot.to_bytes());
ans.extend(mode.to_bytes());
// BODY
gc::export_gc_snapshot(doc, f, &mut ans).unwrap();
f(&mut ans);
// CHECKSUM in HEADER
let checksum_body = &ans[20..];

View file

@ -499,6 +499,22 @@ impl<'a> PositionArena<'a> {
pub fn decode<'de: 'a>(bytes: &'de [u8]) -> LoroResult<Self> {
Ok(serde_columnar::from_bytes(bytes)?)
}
pub fn encode_v2(&self) -> Vec<u8> {
if self.positions.is_empty() {
return Vec::new();
}
serde_columnar::to_vec(&self).unwrap()
}
pub fn decode_v2<'de: 'a>(bytes: &'de [u8]) -> LoroResult<Self> {
if bytes.is_empty() {
return Ok(Self::default());
}
Ok(serde_columnar::from_bytes(bytes)?)
}
}
fn longest_common_prefix_length(a: &[u8], b: &[u8]) -> usize {

View file

@ -18,6 +18,7 @@ use std::io::{Read, Write};
use crate::{oplog::ChangeStore, LoroDoc, OpLog, VersionVector};
use bytes::{Buf, Bytes};
use loro_common::{IdSpan, LoroError, LoroResult};
use tracing::trace;
use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult};
@ -182,35 +183,6 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
);
}
pub(crate) fn encode_updates<W: std::io::Write>(doc: &LoroDoc, vv: &VersionVector, w: &mut W) {
let oplog = doc.oplog().try_lock().unwrap();
let bytes = oplog.export_from_fast(vv);
_encode_snapshot(
Snapshot {
oplog_bytes: bytes,
state_bytes: None,
gc_bytes: Bytes::new(),
},
w,
);
}
pub(crate) fn encode_updates_in_range<W: std::io::Write>(
oplog: &OpLog,
spans: &[IdSpan],
w: &mut W,
) {
let bytes = oplog.export_from_fast_in_range(spans);
_encode_snapshot(
Snapshot {
oplog_bytes: bytes,
state_bytes: None,
gc_bytes: Bytes::new(),
},
w,
);
}
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> {
let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
let oplog_bytes = &bytes[4..4 + oplog_len as usize];
@ -233,3 +205,46 @@ pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroEr
}
Ok(())
}
pub(crate) fn encode_updates<W: std::io::Write>(doc: &LoroDoc, vv: &VersionVector, w: &mut W) {
let oplog = doc.oplog().try_lock().unwrap();
oplog.export_blocks_from(vv, w);
}
pub(crate) fn encode_updates_in_range<W: std::io::Write>(
oplog: &OpLog,
spans: &[IdSpan],
w: &mut W,
) {
oplog.export_blocks_in_range(spans, w);
}
pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<(), LoroError> {
let mut reader: &[u8] = body.as_ref();
let mut index = 0;
let self_vv = oplog.vv();
let mut changes = Vec::new();
while !reader.is_empty() {
let old_reader_len = reader.len();
let len = leb128::read::unsigned(&mut reader).unwrap() as usize;
index += old_reader_len - reader.len();
trace!("index={}", index);
let block_bytes = body.slice(index..index + len);
trace!("decoded block_bytes = {:?}", &block_bytes);
let new_changes = ChangeStore::decode_block_bytes(block_bytes, &oplog.arena, self_vv)?;
changes.extend(new_changes);
index += len;
reader = &reader[len..];
}
changes.sort_unstable_by_key(|x| x.lamport);
let ImportChangesResult {
latest_ids,
pending_changes,
changes_that_deps_on_trimmed_history: _,
} = import_changes_to_oplog(changes, oplog);
// TODO: PERF: should we use hashmap to filter latest_ids with the same peer first?
oplog.try_apply_pending(latest_ids);
oplog.import_unknown_lamport_pending_changes(pending_changes)?;
Ok(())
}

View file

@ -37,7 +37,7 @@ pub(crate) fn export_gc_snapshot<W: std::io::Write>(
&start_vv, &start_from,
);
let oplog_bytes = oplog.export_from_fast(&start_vv);
let oplog_bytes = oplog.export_oplog_from(&start_vv);
let latest_vv = oplog.vv();
let ops_num: usize = latest_vv.sub_iter(&start_vv).map(|x| x.atom_len()).sum();
drop(oplog);

View file

@ -12,7 +12,6 @@ use enum_as_inner::EnumAsInner;
use loro_common::{CompactIdLp, ContainerType, CounterSpan, IdFull, IdLp, IdSpan};
use rle::{HasIndex, HasLength, Mergable, Sliceable};
use serde::{ser::SerializeSeq, Deserialize, Serialize};
use smallvec::SmallVec;
use std::{borrow::Cow, ops::Range};
mod content;

View file

@ -8,7 +8,7 @@ use std::cell::RefCell;
use std::cmp::Ordering;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tracing::{debug, instrument, trace, trace_span};
use tracing::{debug, trace, trace_span};
use self::change_store::iter::MergedChangeIter;
use self::pending_changes::PendingChanges;
@ -364,14 +364,19 @@ impl OpLog {
}
#[inline(always)]
pub(crate) fn export_from_fast(&self, vv: &VersionVector) -> Bytes {
pub(crate) fn export_oplog_from(&self, vv: &VersionVector) -> Bytes {
self.change_store
.export_from(vv, self.vv(), self.frontiers())
}
#[inline(always)]
pub(crate) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes {
self.change_store.export_from_fast_in_range(spans)
pub(crate) fn export_blocks_from<W: std::io::Write>(&self, vv: &VersionVector, w: &mut W) {
self.change_store.export_blocks_from(vv, self.vv(), w)
}
#[inline(always)]
pub(crate) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
self.change_store.export_blocks_in_range(spans, w)
}
#[inline(always)]
@ -552,6 +557,7 @@ impl OpLog {
#[inline(never)]
pub(crate) fn idlp_to_id(&self, id: loro_common::IdLp) -> Option<ID> {
let change = self.change_store.get_change_by_lamport_lte(id)?;
if change.lamport > id.lamport || change.lamport_end() <= id.lamport {
return None;
}

View file

@ -1,5 +1,5 @@
use self::block_encode::{decode_block, decode_header, encode_block, ChangesBlockHeader};
use super::{loro_dag::AppDagNodeInner, AppDag, AppDagNode};
use super::{loro_dag::AppDagNodeInner, AppDagNode};
use crate::{
arena::SharedArena,
change::{Change, Timestamp},
@ -7,12 +7,12 @@ use crate::{
kv_store::KvStore,
op::Op,
parent::register_container_and_parent_link,
version::{shrink_frontiers, Frontiers, ImVersionVector},
version::{Frontiers, ImVersionVector},
VersionVector,
};
use block_encode::decode_block_range;
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use fxhash::FxHashMap;
use itertools::Itertools;
use loro_common::{
Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
@ -22,7 +22,6 @@ use loro_kv_store::{mem_store::MemKvConfig, MemKvStore};
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::{
borrow::BorrowMut,
cmp::Ordering,
collections::{BTreeMap, VecDeque},
ops::{Bound, Deref},
@ -31,6 +30,7 @@ use std::{
use tracing::{debug, info_span, trace, warn};
mod block_encode;
mod block_meta_encode;
mod delta_rle_encode;
pub(super) mod iter;
@ -44,6 +44,20 @@ const MAX_BLOCK_SIZE: usize = 128;
/// - We don't allow holes in a block or between two blocks with the same peer id.
/// The [Change] should be continuous for each peer.
/// - However, the first block of a peer can have counter > 0 so that we can trim the history.
///
/// # Encoding Schema
///
/// It's based on the underlying KV store.
///
/// The entries of the KV store is made up of the following fields
///
/// |Key |Value |
/// |:-- |:---- |
/// |b"vv" |VersionVector |
/// |b"fr" |Frontiers |
/// |b"sv" |Trimmed VV |
/// |b"sf" |Trimmed Frontiers |
/// |12 bytes PeerID + Counter |Encoded Block |
#[derive(Debug, Clone)]
pub struct ChangeStore {
inner: Arc<Mutex<ChangeStoreInner>>,
@ -183,10 +197,8 @@ impl ChangeStore {
new_store.encode_from(start_vv, &start_frontiers, latest_vv, latest_frontiers)
}
pub(super) fn export_from_fast_in_range(&self, spans: &[IdSpan]) -> Bytes {
pub(super) fn export_blocks_in_range<W: std::io::Write>(&self, spans: &[IdSpan], w: &mut W) {
let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
let latest_vv: VersionVector = spans.iter().map(|x| x.id_last()).collect();
let start_vv: VersionVector = spans.iter().map(|x| x.id_start()).collect();
for span in spans {
let mut span = *span;
span.normalize_();
@ -202,12 +214,7 @@ impl ChangeStore {
}
}
new_store.encode_from(
&start_vv,
&Default::default(),
&latest_vv,
&Default::default(),
)
encode_blocks_in_store(new_store, &self.arena, w);
}
fn encode_from(
@ -252,6 +259,32 @@ impl ChangeStore {
Ok(changes)
}
pub(crate) fn decode_block_bytes(
bytes: Bytes,
arena: &SharedArena,
self_vv: &VersionVector,
) -> LoroResult<Vec<Change>> {
let mut ans = ChangesBlockBytes::new(bytes).parse(arena)?;
if ans.is_empty() {
return Ok(ans);
}
let start = self_vv.get(&ans[0].peer()).copied().unwrap_or(0);
ans.retain_mut(|c| {
if c.id.counter >= start {
true
} else if c.ctr_end() > start {
*c = c.slice((start - c.id.counter) as usize, c.atom_len());
true
} else {
false
}
});
Ok(ans)
}
pub fn get_dag_nodes_that_contains(&self, id: ID) -> Option<Vec<AppDagNode>> {
let block = self.get_block_that_contains(id)?;
Some(block.content.iter_dag_nodes())
@ -452,6 +485,48 @@ impl ChangeStore {
.map(|(k, v)| k.len() + v.len())
.sum()
}
pub(crate) fn export_blocks_from<W: std::io::Write>(
&self,
start_vv: &VersionVector,
latest_vv: &VersionVector,
w: &mut W,
) {
let new_store = ChangeStore::new_mem(&self.arena, self.merge_interval.clone());
for span in latest_vv.sub_iter(start_vv) {
// PERF: this can be optimized by reusing the current encoded blocks
// In the current method, it needs to parse and re-encode the blocks
for c in self.iter_changes(span) {
let start = ((start_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
as usize)
.min(c.atom_len());
let end = ((latest_vv.get(&c.id.peer).copied().unwrap_or(0) - c.id.counter).max(0)
as usize)
.min(c.atom_len());
assert_ne!(start, end);
let ch = c.slice(start, end);
new_store.insert_change(ch, false);
}
}
let arena = &self.arena;
encode_blocks_in_store(new_store, arena, w);
}
}
fn encode_blocks_in_store<W: std::io::Write>(
new_store: ChangeStore,
arena: &SharedArena,
w: &mut W,
) {
let mut inner = new_store.inner.lock().unwrap();
for (_id, block) in inner.mem_parsed_kv.iter_mut() {
let bytes = block.to_bytes(&arena);
leb128::write::unsigned(w, bytes.bytes.len() as u64).unwrap();
trace!("encoded block_bytes = {:?}", &bytes.bytes);
w.write_all(&bytes.bytes).unwrap();
}
}
mod mut_external_kv {

View file

@ -8,9 +8,9 @@
//! Peer_1 = This Peer
//!
//!
//! ┌──────────┬─────────────────────────┬─────────────────────────┐
//! │2B Version│ LEB Counter Start&Len │ LEB Lamport Start&Len │◁───┐
//! └──────────┴─────────────────────────┴─────────────────────────┘ │
//! ┌────────────────────────┬────────────────────────────────────┐
//! │ LEB Counter Start&Len │ LEB Lamport Start&Len │◁───┐
//! └────────────────────────┴─────────────────────────────────────┘ │
//! ┌──────────────┬──────────────┬────────────────────────────────┐ │
//! │ LEB N │ LEB Peer Num │ 8B Peer_1,...,Peer_x │◁───┤
//! └──────────────┴──────────────┴────────────────────────────────┘ │
@ -24,7 +24,7 @@
//! │ N LEB128 Delta Lamports │◁───┘
//! └──────────────────────────────────────────────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ N LEB128 Delta Rle Timestamps
//! │ N LEB128 DeltaOfDelta Timestamps
//! └──────────────────────────────────────────────────────────────┘
//! ┌────────────────────────────────┬─────────────────────────────┐
//! │ N Rle Commit Msg Lengths │ Commit Messages │
@ -52,19 +52,17 @@
//! │ │ │ │ │ │
//! │ │ │ │ │ │
//! └────────┴──────────┴──────────┴───────┴───────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ (Encoded with Ops By serde_columnar) │
//! │ Delete Start IDs │
//! │ │
//! └──────────────────────────────────────────────────────────────┘
//! ┌────────────────┬─────────────────────────────────────────────┐
//! │ │ │
//! │Value Bytes Size│ Value Bytes │
//! │ │ │
//! └────────────────┴─────────────────────────────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ │
//! │ Delete Start IDs │
//! │ │
//! └──────────────────────────────────────────────────────────────┘
//! ```
//!
//!
use std::borrow::Cow;
use std::collections::BTreeSet;
@ -79,9 +77,10 @@ use loro_common::{
use once_cell::sync::OnceCell;
use rle::HasLength;
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, DeltaRleDecoder, Itertools};
use serde_columnar::{columnar, AnyRleDecoder, DeltaOfDeltaDecoder, Itertools};
use tracing::info;
use super::delta_rle_encode::{UnsignedDeltaDecoder, UnsignedDeltaEncoder};
use super::block_meta_encode::decode_changes_header;
use crate::arena::SharedArena;
use crate::change::{Change, Timestamp};
use crate::container::tree::tree_op;
@ -91,39 +90,19 @@ use crate::encoding::{
self, decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
};
use crate::op::Op;
use serde_columnar::{
AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleEncoder,
};
#[derive(Debug, Serialize, Deserialize)]
struct EncodedBlock<'a> {
version: u16,
counter_start: u32,
counter_len: u32,
lamport_start: u32,
lamport_len: u32,
n_changes: u32,
first_counter: u32,
#[serde(borrow)]
peers: Cow<'a, [u8]>,
header: Cow<'a, [u8]>,
// timestamp and commit messages
#[serde(borrow)]
lengths: Cow<'a, [u8]>,
#[serde(borrow)]
dep_on_self: Cow<'a, [u8]>,
#[serde(borrow)]
dep_len: Cow<'a, [u8]>,
#[serde(borrow)]
dep_peer_idxs: Cow<'a, [u8]>,
#[serde(borrow)]
dep_counters: Cow<'a, [u8]>,
#[serde(borrow)]
lamports: Cow<'a, [u8]>,
#[serde(borrow)]
timestamps: Cow<'a, [u8]>,
#[serde(borrow)]
commit_msg_lengths: Cow<'a, [u8]>,
#[serde(borrow)]
commit_msgs: Cow<'a, [u8]>,
change_meta: Cow<'a, [u8]>,
// ---------------------- Ops ----------------------
#[serde(borrow)]
cids: Cow<'a, [u8]>,
@ -134,9 +113,23 @@ struct EncodedBlock<'a> {
#[serde(borrow)]
ops: Cow<'a, [u8]>,
#[serde(borrow)]
delete_start_ids: Cow<'a, [u8]>,
#[serde(borrow)]
values: Cow<'a, [u8]>,
}
fn diagnose_block(block: &EncodedBlock) {
info!("Diagnosing EncodedBlock:");
info!(" header {} bytes", block.header.len());
info!(" change_meta {} bytes", block.change_meta.len());
info!(" cids: {} bytes", block.cids.len());
info!(" keys: {} bytes", block.keys.len());
info!(" positions: {} bytes", block.positions.len());
info!(" ops: {} bytes", block.ops.len());
info!(" delete_id_starts: {} bytes", block.delete_start_ids.len());
info!(" values: {} bytes", block.values.len());
}
const VERSION: u16 = 0;
// MARK: encode_block
@ -151,46 +144,6 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
peer_register.register(&peer);
let cid_register: ValueRegister<ContainerID> = ValueRegister::new();
let mut timestamp_encoder = DeltaRleEncoder::new();
let mut lamport_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4);
let mut commit_msg_len_encoder = AnyRleEncoder::<u32>::new();
let mut commit_msgs = String::new();
let mut dep_self_encoder = BoolRleEncoder::new();
let mut dep_len_encoder = AnyRleEncoder::<u64>::new();
let mut encoded_deps = EncodedDeps {
peer_idx: AnyRleEncoder::new(),
counter: AnyRleEncoder::new(),
};
for c in block {
timestamp_encoder.append(c.timestamp()).unwrap();
lamport_encoder.push(c.lamport() as u64);
if let Some(msg) = c.commit_msg.as_ref() {
commit_msg_len_encoder.append(msg.len() as u32).unwrap();
commit_msgs.push_str(msg);
} else {
commit_msg_len_encoder.append(0).unwrap();
}
let mut dep_on_self = false;
for dep in c.deps().iter() {
if dep.peer == peer {
dep_on_self = true;
} else {
let peer_idx = peer_register.register(&dep.peer);
encoded_deps.peer_idx.append(peer_idx as u32).unwrap();
encoded_deps.counter.append(dep.counter as u32).unwrap();
}
}
dep_self_encoder.append(dep_on_self).unwrap();
dep_len_encoder
.append(if dep_on_self {
c.deps().len() as u64 - 1
} else {
c.deps().len() as u64
})
.unwrap();
}
let mut encoded_ops = Vec::new();
let mut registers = Registers {
@ -256,16 +209,6 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// Write to output
// PeerIDs
let peers = registers.peer_register.unwrap_vec();
let peer_bytes: Vec<u8> = peers.iter().flat_map(|p| p.to_le_bytes()).collect();
// First Counter + Change Len
let mut lengths_bytes = Vec::new();
for c in block {
leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap();
}
// ┌────────────────────┬─────────────────────────────────────────┐
// │ Key Strings Size │ Key Strings │
// └────────────────────┴─────────────────────────────────────────┘
@ -277,7 +220,7 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// └────────────────────┴─────────────────────────────────────────┘
let position_vec = registers.position_register.unwrap_vec();
let positions = PositionArena::from_positions(position_vec.iter().map(|p| p.as_bytes()));
let position_bytes = positions.encode();
let position_bytes = positions.encode_v2();
// ┌──────────┬──────────┬───────┬────────────────────────────────┐
// │ │ │ │ │
@ -291,41 +234,49 @@ pub fn encode_block(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// │ │ │ │ │
// └──────────┴──────────┴───────┴────────────────────────────────┘
let ops_bytes = serde_columnar::to_vec(&EncodedOpsAndDeleteStarts {
ops: encoded_ops,
delete_start_ids: del_starts,
})
.unwrap();
let ops_bytes = serde_columnar::to_vec(&EncodedOps { ops: encoded_ops }).unwrap();
let delete_id_starts_bytes = if del_starts.is_empty() {
Vec::new()
} else {
serde_columnar::to_vec(&EncodedDeleteStartIds {
delete_start_ids: del_starts,
})
.unwrap()
};
// ┌────────────────┬─────────────────────────────────────────────┐
// │Value Bytes Size│ Value Bytes │
// └────────────────┴─────────────────────────────────────────────┘
// PeerIDs
let mut peer_register = registers.peer_register;
// .unwrap_vec();
// let peer_bytes: Vec<u8> = peers.iter().flat_map(|p| p.to_le_bytes()).collect();
// Change meta
let (header, change_meta) = encode_changes(block, &mut peer_register);
let value_bytes = value_writer.finish();
let out = EncodedBlock {
version: VERSION,
counter_start: block[0].id.counter as u32,
counter_len: (block.last().unwrap().ctr_end() - block[0].id.counter) as u32,
lamport_start: block[0].lamport(),
lamport_len: block.last().unwrap().lamport_end() - block[0].lamport(),
n_changes: block.len() as u32,
first_counter: block[0].id.counter as u32,
peers: Cow::Owned(peer_bytes),
lengths: Cow::Owned(lengths_bytes),
dep_on_self: dep_self_encoder.finish().unwrap().into(),
dep_len: dep_len_encoder.finish().unwrap().into(),
dep_peer_idxs: encoded_deps.peer_idx.finish().unwrap().into(),
dep_counters: encoded_deps.counter.finish().unwrap().into(),
lamports: lamport_encoder.finish().0.into(),
timestamps: timestamp_encoder.finish().unwrap().into(),
commit_msg_lengths: commit_msg_len_encoder.finish().unwrap().into(),
commit_msgs: Cow::Owned(commit_msgs.into_bytes()),
header: header.into(),
change_meta: change_meta.into(),
cids: container_arena.encode().into(),
keys: keys_bytes.into(),
positions: position_bytes.into(),
ops: ops_bytes.into(),
delete_start_ids: delete_id_starts_bytes.into(),
values: value_bytes.into(),
};
postcard::to_allocvec(&out).unwrap()
diagnose_block(&out);
let ans = postcard::to_allocvec(&out).unwrap();
info!("block size = {}", ans.len());
ans
}
fn encode_keys(keys: Vec<loro_common::InternalString>) -> Vec<u8> {
@ -361,6 +312,7 @@ use crate::encoding::value::{
RawTreeMove, Value, ValueDecodedArenasTrait, ValueEncodeRegister, ValueKind, ValueReader,
ValueWriter,
};
use crate::oplog::change_store::block_meta_encode::encode_changes;
use crate::version::Frontiers;
impl ValueEncodeRegister for Registers {
fn key_mut(&mut self) -> &mut ValueRegister<loro_common::InternalString> {
@ -442,115 +394,22 @@ pub fn decode_header(m_bytes: &[u8]) -> LoroResult<ChangesBlockHeader> {
fn decode_header_from_doc(doc: &EncodedBlock) -> Result<ChangesBlockHeader, LoroError> {
let EncodedBlock {
n_changes,
first_counter,
peers: peers_bytes,
lengths: lengths_bytes,
dep_on_self,
dep_len,
dep_peer_idxs,
dep_counters,
lamports,
version,
header,
counter_len,
counter_start,
lamport_len,
lamport_start,
..
} = doc;
if *version != VERSION {
return Err(LoroError::IncompatibleFutureEncodingError(
*version as usize,
));
}
let first_counter = *first_counter as Counter;
let n_changes = *n_changes as usize;
let peer_num = peers_bytes.len() / 8;
let mut peers = Vec::with_capacity(peer_num);
for i in 0..peer_num {
let peer_id =
PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap());
peers.push(peer_id);
}
// ┌───────────────────┬──────────────────────────────────────────┐ │
// │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata
// └───────────────────┴──────────────────────────────────────────┘ │
let mut lengths = Vec::with_capacity(n_changes);
let mut lengths_bytes: &[u8] = lengths_bytes;
for _ in 0..n_changes {
lengths.push(leb128::read::unsigned(&mut lengths_bytes).unwrap() as Counter);
}
// ┌───────────────────┬────────────────────────┬─────────────────┐ │
// │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘
// └───────────────────┴────────────────────────┴─────────────────┘
let mut dep_self_decoder = BoolRleDecoder::new(dep_on_self);
let mut this_counter = first_counter;
let mut deps: Vec<Frontiers> = Vec::with_capacity(n_changes);
let n = n_changes;
let mut deps_len = AnyRleDecoder::<u64>::new(dep_len);
let deps_peers_decoder = AnyRleDecoder::<u32>::new(dep_peer_idxs);
let deps_counters_decoder = AnyRleDecoder::<u32>::new(dep_counters);
let mut deps_peers_iter = deps_peers_decoder;
let mut deps_counters_iter = deps_counters_decoder;
for i in 0..n {
let mut f = Frontiers::default();
if dep_self_decoder.next().unwrap().unwrap() {
f.push(ID::new(peers[0], this_counter - 1))
}
let len = deps_len.next().unwrap().unwrap() as usize;
for _ in 0..len {
let peer_idx = deps_peers_iter.next().unwrap().unwrap() as usize;
let peer = peers[peer_idx];
let counter = deps_counters_iter.next().unwrap().unwrap() as Counter;
f.push(ID::new(peer, counter));
}
deps.push(f);
this_counter += lengths[i];
}
let mut counters = Vec::with_capacity(n + 1);
let mut last = first_counter;
for i in 0..n {
counters.push(last);
last += lengths[i];
}
counters.push(last);
assert_eq!(last, (counter_start + counter_len) as Counter);
let mut lamport_decoder = UnsignedDeltaDecoder::new(lamports, n_changes);
let mut lamports = Vec::with_capacity(n + 1);
for _ in 0..n {
lamports.push(lamport_decoder.next().unwrap() as Lamport);
}
let last_lamport = *lamports.last().unwrap();
lamports.push(last_lamport + lengths.last().copied().unwrap() as Lamport);
assert_eq!(
*lamports.last().unwrap(),
(lamport_start + lamport_len) as Lamport
let ans: ChangesBlockHeader = decode_changes_header(
&header,
*n_changes as usize,
*counter_start as Counter,
*counter_len as Counter,
*lamport_start,
*lamport_len,
);
Ok(ChangesBlockHeader {
peer: peers[0],
counter: first_counter,
n_changes,
peers,
counters,
deps_groups: deps,
lamports,
keys: OnceCell::new(),
cids: OnceCell::new(),
})
}
struct EncodedDeps {
peer_idx: AnyRleEncoder<u32>,
counter: AnyRleEncoder<u32>,
Ok(ans)
}
#[columnar(vec, ser, de, iterable)]
@ -560,16 +419,20 @@ struct EncodedOp {
container_index: u32,
#[columnar(strategy = "DeltaRle")]
prop: i32,
#[columnar(strategy = "DeltaRle")]
#[columnar(strategy = "Rle")]
value_type: u8,
#[columnar(strategy = "Rle")]
len: u32,
}
#[columnar(ser, de)]
struct EncodedOpsAndDeleteStarts {
struct EncodedOps {
#[columnar(class = "vec", iter = "EncodedOp")]
ops: Vec<EncodedOp>,
}
/// Columnar container for the delete-op start IDs of a block.
///
/// Serialized separately from `EncodedOps`; the decoder treats an empty
/// byte slice as "no delete ops" and skips deserialization entirely.
#[columnar(ser, de)]
struct EncodedDeleteStartIds {
#[columnar(class = "vec", iter = "EncodedDeleteStartId")]
delete_start_ids: Vec<EncodedDeleteStartId>,
}
@ -679,32 +542,30 @@ pub fn decode_block(
header_on_stack.as_ref().unwrap()
});
let EncodedBlock {
version,
n_changes,
first_counter,
timestamps,
commit_msg_lengths,
commit_msgs,
counter_start: first_counter,
change_meta,
cids,
keys,
ops,
delete_start_ids,
values,
positions,
..
} = doc;
let mut changes = Vec::with_capacity(n_changes as usize);
if version != VERSION {
return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
}
let mut timestamp_decoder: DeltaRleDecoder<i64> = DeltaRleDecoder::new(&timestamps);
let mut commit_msg_len_decoder = AnyRleDecoder::<u32>::new(&commit_msg_lengths);
let n_changes = n_changes as usize;
let mut changes = Vec::with_capacity(n_changes);
let timestamp_decoder = DeltaOfDeltaDecoder::<i64>::new(&change_meta).unwrap();
let (timestamps, bytes) = timestamp_decoder.take_n_finalize(n_changes).unwrap();
let commit_msg_len_decoder = AnyRleDecoder::<u32>::new(bytes);
let (commit_msg_lens, commit_msgs) = commit_msg_len_decoder.take_n_finalize(n_changes).unwrap();
let mut commit_msg_index = 0;
let keys = header.keys.get_or_init(|| decode_keys(&keys));
let decode_arena = ValueDecodeArena {
peers: &header.peers,
keys,
};
let positions = PositionArena::decode(&positions)?;
let positions = PositionArena::decode_v2(&positions)?;
let positions = positions.parse_to_positions();
let cids: &Vec<ContainerID> = header.cids.get_or_init(|| {
ContainerArena::decode(&cids)
@ -715,13 +576,22 @@ pub fn decode_block(
.unwrap()
});
let mut value_reader = ValueReader::new(&values);
let encoded_ops_iters =
serde_columnar::iter_from_bytes::<EncodedOpsAndDeleteStarts>(&ops).unwrap();
let encoded_ops_iters = serde_columnar::iter_from_bytes::<EncodedOps>(&ops).unwrap();
let op_iter = encoded_ops_iters.ops;
let mut del_iter = encoded_ops_iters.delete_start_ids;
for i in 0..(n_changes as usize) {
let encoded_delete_id_starts: EncodedDeleteStartIds = if delete_start_ids.is_empty() {
EncodedDeleteStartIds {
delete_start_ids: Vec::new(),
}
} else {
serde_columnar::from_bytes(&delete_start_ids).unwrap()
};
let mut del_iter = encoded_delete_id_starts
.delete_start_ids
.into_iter()
.map(Ok);
for i in 0..n_changes {
let commit_msg: Option<Arc<str>> = {
let len = commit_msg_len_decoder.next().unwrap().unwrap();
let len = commit_msg_lens[i];
if len == 0 {
None
} else {
@ -743,7 +613,7 @@ pub fn decode_block(
deps: header.deps_groups[i].clone(),
id: ID::new(header.peer, header.counters[i]),
lamport: header.lamports[i],
timestamp: timestamp_decoder.next().unwrap().unwrap() as Timestamp,
timestamp: timestamps[i] as Timestamp,
commit_msg,
})
}
@ -790,6 +660,58 @@ pub fn decode_block(
change_index += 1;
}
}
Ok(changes)
}
#[cfg(test)]
mod test {
use crate::{delta::DeltaValue, LoroDoc};
/// Manual size-inspection test: applies a sequence of map inserts and
/// text edits, calling `diagnose` after each step to print the encoded
/// sizes. It makes no assertions; it exists to eyeball encoding growth.
#[test]
pub fn encode_single_text_edit() {
let doc = LoroDoc::new();
doc.start_auto_commit();
doc.get_map("map").insert("x", 100).unwrap();
// doc.get_text("text").insert(0, "Hi").unwrap();
// let node = doc.get_tree("tree").create(None).unwrap();
// doc.get_tree("tree").create(node).unwrap();
diagnose(&doc);
doc.get_map("map").insert("y", 20).unwrap();
diagnose(&doc);
doc.get_map("map").insert("z", 1000).unwrap();
diagnose(&doc);
doc.get_text("text").insert(0, "Hello").unwrap();
diagnose(&doc);
doc.get_text("text").insert(2, "He").unwrap();
diagnose(&doc);
doc.get_text("text").delete(1, 4).unwrap();
diagnose(&doc);
doc.get_text("text").delete(0, 2).unwrap();
diagnose(&doc);
}
/// Prints the byte sizes of the same document exported through every
/// available encoding: the legacy update format (`export_from`), the new
/// `ExportMode::Updates` format, a full snapshot, and the JSON update
/// representation (both as a JSON string and postcard-serialized).
///
/// The size assertions are intentionally commented out; this is a
/// diagnostic printer, not a regression check.
fn diagnose(doc: &LoroDoc) {
let bytes = doc.export_from(&Default::default());
println!("Old Update bytes {:?}", dev_utils::ByteSize(bytes.length()));
let bytes = doc.export(crate::loro::ExportMode::Updates {
from: &Default::default(),
});
println!("Update bytes {:?}", dev_utils::ByteSize(bytes.length()));
// assert!(bytes.len() < 30);
let bytes = doc.export(crate::loro::ExportMode::Snapshot);
println!("Snapshot bytes {:?}", dev_utils::ByteSize(bytes.length()));
// assert!(bytes.len() < 30);
let json = doc.export_json_updates(&Default::default(), &doc.oplog_vv());
let json_string = serde_json::to_string(&json.changes).unwrap();
println!(
"JSON string bytes {:?}",
dev_utils::ByteSize(json_string.len())
);
let bytes = postcard::to_allocvec(&json.changes).unwrap();
println!("JSON update bytes {:?}", dev_utils::ByteSize(bytes.len()));
println!("\n\n")
}
}

View file

@ -0,0 +1,186 @@
use loro_common::{Counter, Lamport, PeerID, ID};
use once_cell::sync::OnceCell;
use rle::HasLength;
use serde_columnar::{
AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaOfDeltaDecoder,
DeltaOfDeltaEncoder,
};
use crate::{change::Change, encoding::value_register::ValueRegister, version::Frontiers};
use super::block_encode::ChangesBlockHeader;
/// Encodes a block of changes (all from the same peer) into two byte buffers.
///
/// Returns `(header_bytes, meta_bytes)`:
/// - `header_bytes`: LEB128 peer count, little-endian peer-id table,
///   per-change atom lengths (LEB128, last change omitted — it is derivable
///   from the block's total counter span), the dep-on-self BoolRle column,
///   the dep-length Rle column, the non-self dep (peer_idx, counter)
///   columns, and the delta-of-delta lamport column (last change omitted).
/// - `meta_bytes`: delta-of-delta timestamps, Rle commit-message lengths
///   (0 means "no message"), and the concatenated commit-message bytes.
///
/// `peer_register` is mutated: every dep peer that differs from the block's
/// own peer is registered and referenced by its index in the peer table.
///
/// Decoded by `decode_changes_header` — the append order here is the wire
/// format, so the two must stay in lockstep.
pub(crate) fn encode_changes(
block: &[Change],
peer_register: &mut ValueRegister<PeerID>,
) -> (Vec<u8>, Vec<u8>) {
let peer = block[0].peer();
let mut timestamp_encoder = DeltaOfDeltaEncoder::new();
let mut lamport_encoder = DeltaOfDeltaEncoder::new();
let mut commit_msg_len_encoder = AnyRleEncoder::<u32>::new();
let mut commit_msgs = String::new();
let mut dep_self_encoder = BoolRleEncoder::new();
let mut dep_len_encoder = AnyRleEncoder::<usize>::new();
let mut encoded_deps = EncodedDeps {
peer_idx: AnyRleEncoder::new(),
counter: DeltaOfDeltaEncoder::new(),
};
// First Counter + Change Len
let mut lengths_bytes = Vec::new();
// NOTE(review): `counter` and `n` are accumulated but never read in this
// function — presumably leftovers; confirm before removing.
let mut counter = vec![];
let mut n = 0;
for (i, c) in block.iter().enumerate() {
counter.push(c.id.counter);
let is_last = i == block.len() - 1;
// The last change's length and lamport are not stored: the decoder
// reconstructs them from the block's counter/lamport totals.
if !is_last {
leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap();
lamport_encoder.append(c.lamport() as i64).unwrap();
}
timestamp_encoder.append(c.timestamp()).unwrap();
if let Some(msg) = c.commit_msg.as_ref() {
commit_msg_len_encoder.append(msg.len() as u32).unwrap();
commit_msgs.push_str(msg);
} else {
// Length 0 encodes "no commit message".
commit_msg_len_encoder.append(0).unwrap();
}
// A dep on our own peer is stored as a single bool; only deps on other
// peers get explicit (peer_idx, counter) entries.
let mut dep_on_self = false;
for dep in c.deps().iter() {
if dep.peer == peer {
dep_on_self = true;
} else {
let peer_idx = peer_register.register(&dep.peer);
encoded_deps.peer_idx.append(peer_idx as u32).unwrap();
encoded_deps.counter.append(dep.counter as i64).unwrap();
n += 1;
}
}
dep_self_encoder.append(dep_on_self).unwrap();
// The stored dep length excludes the self dep, matching the count of
// (peer_idx, counter) pairs actually emitted above.
dep_len_encoder
.append(if dep_on_self {
c.deps().len() - 1
} else {
c.deps().len()
})
.unwrap();
}
// TODO: capacity
let mut ans = Vec::with_capacity(block.len() * 15);
// Peer table: LEB128 count followed by each PeerID in little-endian bytes.
let _ = leb128::write::unsigned(&mut ans, peer_register.vec().len() as u64);
ans.extend(peer_register.vec().iter().flat_map(|p| p.to_le_bytes()));
// Column order below is the wire format read back by decode_changes_header.
ans.append(&mut lengths_bytes);
ans.append(&mut dep_self_encoder.finish().unwrap());
ans.append(&mut dep_len_encoder.finish().unwrap());
ans.append(&mut encoded_deps.peer_idx.finish().unwrap());
ans.append(&mut encoded_deps.counter.finish().unwrap());
ans.append(&mut lamport_encoder.finish().unwrap());
let mut t = timestamp_encoder.finish().unwrap();
let mut cml = commit_msg_len_encoder.finish().unwrap();
let mut cms = commit_msgs.into_bytes();
// Meta buffer: timestamps, then commit-msg lengths, then raw msg bytes.
let mut meta = Vec::with_capacity(t.len() + cml.len() + cms.len());
meta.append(&mut t);
meta.append(&mut cml);
meta.append(&mut cms);
(ans, meta)
}
/// Decodes the header bytes produced by `encode_changes` into a
/// `ChangesBlockHeader`.
///
/// `bytes` must contain, in order: LEB128 peer count, little-endian peer
/// table, `n_changes - 1` LEB128 change lengths, the dep-on-self BoolRle
/// column, the dep-length Rle column, the non-self dep (peer_idx, counter)
/// columns, and `n_changes - 1` delta-of-delta lamports. The omitted last
/// length and last lamport are reconstructed from `counter_len` /
/// `lamport_len`.
///
/// The returned header's `counters` has `n_changes + 1` entries (each
/// change's start counter plus the block-end counter), while `lamports`
/// has `n_changes` entries (each change's start lamport).
pub(crate) fn decode_changes_header(
mut bytes: &[u8],
n_changes: usize,
first_counter: Counter,
counter_len: Counter,
lamport_start: Lamport,
lamport_len: Lamport,
) -> ChangesBlockHeader {
let mut this_counter = first_counter;
// Peer table: LEB128 count, then one little-endian u64 PeerID each.
// peers[0] is the block's own peer.
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
let mut peers = Vec::with_capacity(peer_num);
for i in 0..peer_num {
let peer_id = PeerID::from_le_bytes((&bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap());
peers.push(peer_id);
}
let mut bytes = &bytes[8 * peer_num..];
// ┌───────────────────┬──────────────────────────────────────────┐ │
// │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata
// └───────────────────┴──────────────────────────────────────────┘ │
// Only n_changes - 1 lengths are stored; the last one is whatever remains
// of the block's total counter span.
let mut lengths = Vec::with_capacity(n_changes);
for _ in 0..n_changes - 1 {
lengths.push(leb128::read::unsigned(&mut bytes).unwrap() as Counter);
}
lengths.push(counter_len - lengths.iter().sum::<i32>());
// ┌───────────────────┬────────────────────────┬─────────────────┐ │
// │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘
// └───────────────────┴────────────────────────┴─────────────────┘
let dep_self_decoder = BoolRleDecoder::new(bytes);
let (dep_self, bytes) = dep_self_decoder.take_n_finalize(n_changes).unwrap();
let dep_len_decoder = AnyRleDecoder::<usize>::new(bytes);
let (deps_len, bytes) = dep_len_decoder.take_n_finalize(n_changes).unwrap();
// Total number of explicit (non-self) dep IDs across all changes.
let other_dep_num = deps_len.iter().sum::<usize>();
// NOTE(review): the encoder appends peer idx as u32 and dep counter as
// i64, while this side decodes usize / u32 — presumably the varint wire
// format is width-agnostic, but confirm against serde_columnar.
let dep_peer_decoder = AnyRleDecoder::<usize>::new(bytes);
let (dep_peers, bytes) = dep_peer_decoder.take_n_finalize(other_dep_num).unwrap();
let mut deps_peers_iter = dep_peers.into_iter();
let dep_counter_decoder = DeltaOfDeltaDecoder::<u32>::new(bytes).unwrap();
let (dep_counters, bytes) = dep_counter_decoder.take_n_finalize(other_dep_num).unwrap();
let mut deps_counters_iter = dep_counters.into_iter();
// Rebuild each change's Frontiers: the implicit self-dep (previous change
// of this peer) plus the explicit deps on other peers.
let mut deps = Vec::with_capacity(n_changes);
for i in 0..n_changes {
let mut f = Frontiers::default();
if dep_self[i] {
// Self dep points at the last counter of the preceding change.
f.push(ID::new(peers[0], this_counter - 1))
}
let len = deps_len[i];
for _ in 0..len {
let peer_idx = deps_peers_iter.next().unwrap();
let peer = peers[peer_idx];
let counter = deps_counters_iter.next().unwrap() as Counter;
f.push(ID::new(peer, counter));
}
deps.push(f);
this_counter += lengths[i];
}
// Prefix-sum the lengths into each change's start counter.
let mut counters = Vec::with_capacity(n_changes);
let mut last = first_counter;
for i in 0..n_changes {
counters.push(last);
last += lengths[i];
}
// Only n_changes - 1 lamports are stored (see encode_changes).
let lamport_decoder = DeltaOfDeltaDecoder::new(bytes).unwrap();
let (mut lamports, rest) = lamport_decoder
.take_n_finalize(n_changes.saturating_sub(1))
.unwrap();
// the last lamport
lamports.push((lamport_start + lamport_len - *lengths.last().unwrap_or(&0) as u32) as Lamport);
// we need counter range, so encode
counters.push(first_counter + counter_len);
debug_assert!(rest.is_empty());
ChangesBlockHeader {
peer: peers[0],
counter: first_counter,
n_changes,
peers,
counters,
deps_groups: deps,
lamports,
keys: OnceCell::new(),
cids: OnceCell::new(),
}
}
/// Encoder pair for the non-self dependency IDs of a change block:
/// `peer_idx` holds each dep's index into the block's peer table,
/// `counter` holds the dep's counter (delta-of-delta encoded).
struct EncodedDeps {
peer_idx: AnyRleEncoder<u32>,
counter: DeltaOfDeltaEncoder,
}

View file

@ -3,15 +3,12 @@ use crate::{
arena::SharedArena,
configure::Configure,
container::idx::ContainerIdx,
state::{FastStateSnapshot, RichtextState},
utils::kv_wrapper::KvWrapper,
version::Frontiers,
VersionVector,
};
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use inner_store::InnerStore;
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
use loro_common::{LoroResult, LoroValue};
use std::sync::{atomic::AtomicU64, Arc, Mutex};
pub(crate) use container_wrapper::ContainerWrapper;

View file

@ -1,14 +1,13 @@
use std::ops::Bound;
use bytes::Bytes;
use fxhash::{FxHashMap, FxHashSet};
use fxhash::FxHashMap;
use loro_common::ContainerID;
use tracing::trace;
use crate::{
arena::SharedArena,
container::idx::ContainerIdx,
state::{container_store::FRONTIERS_KEY, ContainerCreationContext},
state::container_store::FRONTIERS_KEY,
utils::kv_wrapper::KvWrapper,
version::Frontiers,
};

View file

@ -1,4 +1,4 @@
use std::{cmp::Ordering, sync::Arc, time::Instant};
use std::{cmp::Ordering, sync::Arc};
use loro_internal::{
change::{Change, Lamport, Timestamp},

View file

@ -1,4 +1,3 @@
use dev_utils::setup_test_log;
use loro::LoroDoc;
mod gc_test;
@ -6,7 +5,7 @@ mod undo_test;
fn gen_action(doc: &LoroDoc, seed: u64, mut ops_len: usize) {
let mut rng = StdRng::seed_from_u64(seed);
use loro::{ContainerType, LoroValue};
use loro::LoroValue;
use rand::prelude::*;
let root_map = doc.get_map("root");

View file

@ -1572,3 +1572,15 @@ fn test_gc_can_remove_unreachable_states() -> LoroResult<()> {
Ok(())
}
#[test]
fn small_update_size() {
    // Regression guard: exporting a single one-character text insertion
    // through the new Updates encoding must stay compact (< 90 bytes).
    let doc = LoroDoc::new();
    doc.get_text("text").insert(0, "h").unwrap();
    let from = Default::default();
    let bytes = doc.export(loro::ExportMode::Updates { from: &from });
    println!("Update bytes {:?}", dev_utils::ByteSize(bytes.len()));
    assert!(bytes.len() < 90, "Large update size {}", bytes.len());
}