From b13b4bcf94eddb0515c28b6cd000b068a0fea64d Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Mon, 27 May 2024 19:02:01 +0800 Subject: [PATCH] refactor: reuse more serde_columnar code --- Cargo.lock | 4 +- Cargo.toml | 2 +- crates/loro-internal/src/oplog.rs | 4 + .../loro-internal/src/oplog/change_store.rs | 13 +- .../src/oplog/change_store/block_encode.rs | 364 +++++++---- .../oplog/change_store/delta_rle_encode.rs | 592 ------------------ 6 files changed, 263 insertions(+), 716 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62b2bb77..e971ed0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1853,9 +1853,9 @@ dependencies = [ [[package]] name = "serde_columnar" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d54dd7e7a1ec134c842f8a3bdb5a1fc662d002682e0457f976f3046cf9ccf8" +checksum = "06a86f5f6dc16d8308c37e145dd4c7e60fba1486d84982519388d31ea0ad6703" dependencies = [ "itertools 0.11.0", "postcard", diff --git a/Cargo.toml b/Cargo.toml index 6c76cc0a..92bb4dc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,5 +23,5 @@ tracing = { version = "0.1", features = [ "max_level_debug", "release_max_level_warn", ] } -serde_columnar = { version = "0.3.4" } +serde_columnar = { version = "0.3.5" } itertools = "0.12.1" diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index 173004db..bac3e4e7 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -21,6 +21,7 @@ use crate::op::{FutureInnerContent, ListSlice, Op, RawOpContent, RemoteOp, RichO use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan}; use crate::version::{Frontiers, ImVersionVector, VersionVector}; use crate::LoroError; +use change_store::ChangeStore; use fxhash::FxHashMap; use loro_common::{HasCounter, HasId, IdLp, IdSpan}; use rle::{HasLength, RleCollection, RlePush, RleVec, Sliceable}; @@ -44,6 +45,7 @@ pub struct OpLog { pub(crate) dag: AppDag, pub(crate) arena: 
SharedArena, changes: ClientChanges, + change_store: ChangeStore, pub(crate) op_groups: OpGroups, /// **lamport starts from 0** pub(crate) next_lamport: Lamport, @@ -87,6 +89,7 @@ impl Clone for OpLog { arena: self.arena.clone(), changes: self.changes.clone(), op_groups: self.op_groups.clone(), + change_store: self.change_store.clone(), next_lamport: self.next_lamport, latest_timestamp: self.latest_timestamp, pending_changes: Default::default(), @@ -177,6 +180,7 @@ impl OpLog { dag: AppDag::default(), op_groups: OpGroups::new(arena.clone()), changes: ClientChanges::default(), + change_store: ChangeStore::new(), arena, next_lamport: 0, latest_timestamp: Timestamp::default(), diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 05e823cc..32e36d0f 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -7,12 +7,20 @@ mod delta_rle_encode; mod ops_encode; use crate::{arena::SharedArena, change::Change, version::Frontiers}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ChangeStore { kv: BTreeMap, } -#[derive(Debug)] +impl ChangeStore { + pub fn new() -> Self { + Self { + kv: BTreeMap::new(), + } + } +} + +#[derive(Debug, Clone)] pub struct ChangesBlock { arena: SharedArena, peer: PeerID, @@ -68,6 +76,7 @@ impl ChangesBlock { } } +#[derive(Clone)] enum ChangesBlockContent { Changes(Vec), Bytes(ChangesBlockBytes), diff --git a/crates/loro-internal/src/oplog/change_store/block_encode.rs b/crates/loro-internal/src/oplog/change_store/block_encode.rs index 394690ed..a8037d88 100644 --- a/crates/loro-internal/src/oplog/change_store/block_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/block_encode.rs @@ -1,67 +1,80 @@ //! # Encode //! -//! ≈4KB after compression //! -//! N = Number of Changes +//! ≈4KB after compression //! -//! Peer_1 = This Peer +//! N = Number of Changes +//! +//! Peer_1 = This Peer //! //! -//! 
┌──────────┬─────┬────────────┬───────────────────────────────┐ -//! │2B Version│LEB N│LEB Peer Num│ 8B Peer_1,...,Peer_x │◁────┐ -//! └──────────┴─────┴────────────┴───────────────────────────────┘ │ -//! ┌──────────────────────────────────────────────────────────────┐ │ -//! │ N LEB128 Delta Counters │◁───┼───── Important metadata -//! └──────────────────────────────────────────────────────────────┘ │ -//! ┌───────────────────┬────────────────────────┬─────────────────┐ │ -//! │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘ -//! └───────────────────┴────────────────────────┴─────────────────┘ -//! ┌──────────────────────────────────────────────────────────────┐ -//! │ N LEB128 Delta Lamports │ -//! └──────────────────────────────────────────────────────────────┘ -//! ┌──────────────────────────────────────────────────────────────┐ -//! │ N LEB128 Delta Timestamps │ -//! └──────────────────────────────────────────────────────────────┘ -//! ┌────────────────────────────────┬─────────────────────────────┐ -//! │ N Rle Commit Msg Lengths │ Commit Messages │ -//! └────────────────────────────────┴─────────────────────────────┘ +//! ┌──────────┬─────┬────────────┬───────────────────────────────┐ +//! │2B Version│LEB N│LEB Peer Num│ 8B Peer_1,...,Peer_x │◁────┐ +//! └──────────┴─────┴────────────┴───────────────────────────────┘ │ +//! ┌───────────────────┬──────────────────────────────────────────┐ │ +//! │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata +//! └───────────────────┴──────────────────────────────────────────┘ │ +//! ┌───────────────────┬────────────────────────┬─────────────────┐ │ +//! │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ Dep IDs │◁───┤ +//! └───────────────────┴────────────────────────┴─────────────────┘ │ +//! ┌──────────────────────────────────────────────────────────────┐ │ +//! │ N LEB128 Delta Lamports │◁───┘ +//! └──────────────────────────────────────────────────────────────┘ +//! 
┌──────────────────────────────────────────────────────────────┐ +//! │ N LEB128 Delta Timestamps │ +//! └──────────────────────────────────────────────────────────────┘ +//! ┌────────────────────────────────┬─────────────────────────────┐ +//! │ N Rle Commit Msg Lengths │ Commit Messages │ +//! └────────────────────────────────┴─────────────────────────────┘ //! -//! ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Encoded Operations ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ +//! ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Encoded Operations ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ //! -//! ┌────────────────────┬─────────────────────────────────────────┐ -//! │ ContainerIDs Size │ ContainerIDs │ -//! └────────────────────┴─────────────────────────────────────────┘ -//! ┌────────────────────┬─────────────────────────────────────────┐ -//! │ Key Strings Size │ Key Strings │ -//! └────────────────────┴─────────────────────────────────────────┘ -//! ┌──────────┬──────────┬───────┬────────────────────────────────┐ -//! │ │ │ │ │ -//! │ │ │ │ │ -//! │ │ │ │ │ -//! │ LEB128 │ RLE │ Delta │ │ -//! │ Lengths │Containers│ RLE │ ValueType │ -//! │ │ │ Props │ │ -//! │ │ │ │ │ -//! │ │ │ │ │ -//! │ │ │ │ │ -//! └──────────┴──────────┴───────┴────────────────────────────────┘ -//! ┌────────────────┬─────────────────────────────────────────────┐ -//! │Value Bytes Size│ Value Bytes │ -//! └────────────────┴─────────────────────────────────────────────┘ +//! ┌────────────────────┬─────────────────────────────────────────┐ +//! │ ContainerIDs Size │ ContainerIDs │ +//! └────────────────────┴─────────────────────────────────────────┘ +//! ┌────────────────────┬─────────────────────────────────────────┐ +//! │ Key Strings Size │ Key Strings │ +//! └────────────────────┴─────────────────────────────────────────┘ +//! ┌────────┬──────────┬──────────┬───────┬───────────────────────┐ +//! │ │ │ │ │ │ +//! │ │ │ │ │ │ +//! │ │ │ │ │ │ +//! │ Ops │ LEB128 │ RLE │ Delta │ │ +//! │ Size │ Lengths │Containers│ RLE │ ValueType │ +//! │ │ │ │ Props │ │ +//! │ │ │ │ │ │ +//! 
│ │ │ │ │ │ +//! │ │ │ │ │ │ +//! └────────┴──────────┴──────────┴───────┴───────────────────────┘ +//! ┌────────────────┬─────────────────────────────────────────────┐ +//! │ │ │ +//! │Value Bytes Size│ Value Bytes │ +//! │ │ │ +//! └────────────────┴─────────────────────────────────────────────┘ +//! ┌──────────────────────────────────────────────────────────────┐ +//! │ │ +//! │ Delete Start IDs │ +//! │ │ +//! └──────────────────────────────────────────────────────────────┘ +use std::borrow::Cow; use std::io::Write; -use loro_common::{ContainerID, PeerID}; +use loro_common::{ContainerID, Counter, LoroError, LoroResult, PeerID, ID}; +use num::complex::ParseComplexError; use rle::HasLength; -use serde_columnar::columnar; +use serde::{Deserialize, Serialize}; +use serde_columnar::{columnar, Itertools}; -use super::delta_rle_encode::{BoolRleEncoder, UnsignedDeltaEncoder, UnsignedRleEncoder}; -use super::ChangesBlock; +use super::delta_rle_encode::UnsignedDeltaEncoder; use crate::arena::SharedArena; use crate::change::Change; use crate::encoding::arena::ContainerArena; use crate::encoding::value_register::ValueRegister; use crate::encoding::{encode_op, get_op_prop}; +use serde_columnar::{ + AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleEncoder, +}; const VERSION: u16 = 0; @@ -71,29 +84,24 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec { panic!("Empty block") } - let mut output = Vec::with_capacity(4096); - output.write_all(&VERSION.to_le_bytes()).unwrap(); - leb128::write::unsigned(&mut output, block.len() as u64).unwrap(); let mut peer_register: ValueRegister = ValueRegister::new(); let peer = block[0].peer(); peer_register.register(&peer); let cid_register: ValueRegister = ValueRegister::new(); - let mut counter_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4); let mut timestamp_encoder = UnsignedDeltaEncoder::new(block.len() * 3 + 8); let mut lamport_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4); - let 
mut commit_msg_len_encoder = UnsignedRleEncoder::new(0); + let mut commit_msg_len_encoder = AnyRleEncoder::::new(); let mut dep_self_encoder = BoolRleEncoder::new(); - let mut dep_len_encoder = UnsignedRleEncoder::new(0); + let mut dep_len_encoder = AnyRleEncoder::::new(); let mut encoded_deps = EncodedDeps { - peer_idx: UnsignedRleEncoder::new(0), - counter: UnsignedRleEncoder::new(0), + peer_idx: AnyRleEncoder::new(), + counter: AnyRleEncoder::new(), }; for c in block { - counter_encoder.push(c.id.counter as u64); timestamp_encoder.push(c.timestamp() as u64); lamport_encoder.push(c.lamport() as u64); - commit_msg_len_encoder.push(0); + commit_msg_len_encoder.append(0); let mut dep_on_self = false; for dep in c.deps().iter() { @@ -101,13 +109,13 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec { dep_on_self = true; } else { let peer_idx = peer_register.register(&dep.peer); - encoded_deps.peer_idx.push(peer_idx as u64); - encoded_deps.counter.push(dep.counter as u64); + encoded_deps.peer_idx.append(peer_idx as u32); + encoded_deps.counter.append(dep.counter as u32); } } - dep_self_encoder.push(dep_on_self); - dep_len_encoder.push(if dep_on_self { + dep_self_encoder.append(dep_on_self); + dep_len_encoder.append(if dep_on_self { c.deps().len() as u64 - 1 } else { c.deps().len() as u64 @@ -121,7 +129,7 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec { cid_register, }; - let mut del_starts = Vec::new(); + let mut del_starts: Vec<_> = Vec::new(); let mut value_writer = ValueWriter::new(); for c in block { for op in c.ops().iter() { @@ -155,65 +163,23 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec { // PeerIDs let peers = registers.peer_register.unwrap_vec(); - leb128::write::unsigned(&mut output, peers.len() as u64).unwrap(); - for peer in peers { - output.write_all(&peer.to_le_bytes()).unwrap(); + let peer_bytes: Vec = peers.iter().map(|p| p.to_le_bytes()).flatten().collect(); + + // First Counter + Change Len + let mut 
lengths_bytes = Vec::new(); + for c in block { + leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap(); } - // Counters - let (bytes, _n) = counter_encoder.finish(); - output.write_all(&bytes).unwrap(); - - // ┌───────────────────┬────────────────────────┬─────────────────┐ - // │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │ - // └───────────────────┴────────────────────────┴─────────────────┘ - let (buf, _n) = dep_self_encoder.finish(); - output.write_all(&buf).unwrap(); - let (buf, _n) = dep_len_encoder.finish(); - output.write_all(&buf).unwrap(); - let (buf, _n) = encoded_deps.peer_idx.finish(); - output.write_all(&buf).unwrap(); - let (buf, _n) = encoded_deps.counter.finish(); - output.write_all(&buf).unwrap(); - - // ┌──────────────────────────────────────────────────────────────┐ - // │ N LEB128 Delta Lamports │ - // └──────────────────────────────────────────────────────────────┘ - let (buf, _n) = lamport_encoder.finish(); - output.write_all(&buf).unwrap(); - - // ┌──────────────────────────────────────────────────────────────┐ - // │ N LEB128 Delta Timestamps │ - // └──────────────────────────────────────────────────────────────┘ - let (buf, _n) = timestamp_encoder.finish(); - output.write_all(&buf).unwrap(); - - // ┌────────────────────────────────┬─────────────────────────────┐ - // │ N Rle Commit Msg Lengths │ Commit Messages │ - // └────────────────────────────────┴─────────────────────────────┘ - let (buf, _n) = commit_msg_len_encoder.finish(); - output.write_all(&buf).unwrap(); - // TODO: Commit messages - - // ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Encoded Operations ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ - // - // ┌────────────────────┬─────────────────────────────────────────┐ - // │ ContainerIDs Size │ ContainerIDs │ - // └────────────────────┴─────────────────────────────────────────┘ - - let bytes = container_arena.encode(); - leb128::write::unsigned(&mut output, bytes.len() as u64).unwrap(); - output.write_all(&bytes).unwrap(); - // 
┌────────────────────┬─────────────────────────────────────────┐ // │ Key Strings Size │ Key Strings │ // └────────────────────┴─────────────────────────────────────────┘ let keys = registers.key_register.unwrap_vec(); - leb128::write::unsigned(&mut output, keys.len() as u64).unwrap(); + let mut keys_bytes = Vec::new(); for key in keys { let bytes = key.as_bytes(); - leb128::write::unsigned(&mut output, bytes.len() as u64).unwrap(); - output.write_all(bytes).unwrap(); + leb128::write::unsigned(&mut keys_bytes, bytes.len() as u64).unwrap(); + keys_bytes.write_all(bytes).unwrap(); } // ┌──────────┬──────────┬───────┬────────────────────────────────┐ @@ -229,16 +195,32 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec { // └──────────┴──────────┴───────┴────────────────────────────────┘ let ops_bytes = serde_columnar::to_vec(&encoded_ops).unwrap(); - leb128::write::unsigned(&mut output, ops_bytes.len() as u64).unwrap(); - output.write_all(&ops_bytes).unwrap(); // ┌────────────────┬─────────────────────────────────────────────┐ // │Value Bytes Size│ Value Bytes │ // └────────────────┴─────────────────────────────────────────────┘ let value_bytes = value_writer.finish(); - leb128::write::unsigned(&mut output, value_bytes.len() as u64).unwrap(); - output.write_all(&value_bytes).unwrap(); - output + let out = EncodedDoc { + version: VERSION, + n_changes: block.len() as u32, + first_counter: block[0].id.counter as u32, + peers: Cow::Owned(peer_bytes), + lengths: Cow::Owned(lengths_bytes), + dep_on_self: dep_self_encoder.finish().unwrap().into(), + dep_len: dep_len_encoder.finish().unwrap().into(), + dep_peer_idxs: encoded_deps.peer_idx.finish().unwrap().into(), + dep_counters: encoded_deps.counter.finish().unwrap().into(), + lamports: lamport_encoder.finish().0.into(), + timestamps: timestamp_encoder.finish().0.into(), + commit_msg_lengths: commit_msg_len_encoder.finish().unwrap().into(), + commit_msgs: Cow::Owned(vec![]), + cids: 
container_arena.encode().into(), + keys: keys_bytes.into(), + ops: ops_bytes.into(), + values: value_bytes.into(), + delete_start_ids: serde_columnar::to_vec(&del_starts).unwrap().into(), + }; + postcard::to_allocvec(&out).unwrap() } struct Registers { @@ -248,6 +230,7 @@ struct Registers { } use crate::encoding::value::{ValueEncodeRegister, ValueWriter}; +use crate::version::Frontiers; impl ValueEncodeRegister for Registers { fn key_mut(&mut self) -> &mut ValueRegister { &mut self.key_register @@ -265,9 +248,114 @@ impl ValueEncodeRegister for Registers { } } +pub(crate) struct BlockHeader { + peer: PeerID, + counter: Counter, + n_changes: usize, + peers: Vec, + /// This has n + 1 elements, where counters[n] is the end counter of the + /// last change in the block. + counters: Vec, + deps: Vec, +} + +pub fn decode_header(m_bytes: &[u8]) -> LoroResult { + let EncodedDoc { + version, + n_changes, + first_counter, + peers: peers_bytes, + lengths: lengths_bytes, + dep_on_self, + dep_len, + dep_peer_idxs, + dep_counters, + lamports, + timestamps, + commit_msg_lengths, + commit_msgs, + cids, + keys, + ops, + values, + delete_start_ids, + } = postcard::from_bytes(m_bytes).map_err(|e| { + LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str()) + })?; + if version != VERSION { + return Err(LoroError::IncompatibleFutureEncodingError(version as usize)); + } + + let first_counter = first_counter as Counter; + let n_changes = n_changes as usize; + let peer_num = peers_bytes.len() / 8; + let mut peers = Vec::with_capacity(peer_num as usize); + for i in 0..(n_changes as usize) { + let peer_id = + PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap()); + peers.push(peer_id); + } + + // ┌───────────────────┬──────────────────────────────────────────┐ │ + // │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata + // └───────────────────┴──────────────────────────────────────────┘ │ + let mut lengths = 
Vec::with_capacity(n_changes as usize); + let mut lengths_bytes: &[u8] = &*lengths_bytes; + for _ in 0..n_changes { + lengths.push(leb128::read::unsigned(&mut lengths_bytes).unwrap() as Counter); + } + + // ┌───────────────────┬────────────────────────┬─────────────────┐ │ + // │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘ + // └───────────────────┴────────────────────────┴─────────────────┘ + + let mut dep_self_decoder = BoolRleDecoder::new(&dep_on_self); + let mut this_counter = first_counter; + let deps: Vec = Vec::with_capacity(n_changes); + let n = n_changes; + let mut deps_len = AnyRleDecoder::::new(&dep_len); + let deps_peers_decoder = AnyRleDecoder::::new(&dep_peer_idxs); + let deps_counters_decoder = AnyRleDecoder::::new(&dep_counters); + let mut deps_peers_iter = deps_peers_decoder; + let mut deps_counters_iter = deps_counters_decoder; + for i in 0..n { + let mut f = Frontiers::default(); + + if dep_self_decoder.next().unwrap().unwrap() { + f.push(ID::new(peers[0], this_counter - 1)) + } + + let len = deps_len.next().unwrap().unwrap() as usize; + for _ in 0..len { + let peer_idx = deps_peers_iter.next().unwrap().unwrap() as usize; + let peer = peers[peer_idx]; + let counter = deps_counters_iter.next().unwrap().unwrap() as Counter; + f.push(ID::new(peer, counter)); + } + + this_counter += lengths[i]; + } + + let mut counters = Vec::with_capacity(n + 1); + let mut last = first_counter; + for i in 0..n { + counters.push(last); + last += lengths[i]; + } + counters.push(last); + Ok(BlockHeader { + peer: peers[0], + counter: first_counter, + n_changes, + peers, + counters, + deps, + }) +} + struct EncodedDeps { - peer_idx: UnsignedRleEncoder, - counter: UnsignedRleEncoder, + peer_idx: AnyRleEncoder, + counter: AnyRleEncoder, } #[columnar(vec, ser, de, iterable)] @@ -282,3 +370,41 @@ struct EncodedOp { #[columnar(strategy = "Rle")] len: u32, } + +#[derive(Serialize, Deserialize)] +struct EncodedDoc<'a> { + version: u16, + n_changes: u32, + 
first_counter: u32, + #[serde(borrow)] + peers: Cow<'a, [u8]>, + #[serde(borrow)] + lengths: Cow<'a, [u8]>, + #[serde(borrow)] + dep_on_self: Cow<'a, [u8]>, + #[serde(borrow)] + dep_len: Cow<'a, [u8]>, + #[serde(borrow)] + dep_peer_idxs: Cow<'a, [u8]>, + #[serde(borrow)] + dep_counters: Cow<'a, [u8]>, + #[serde(borrow)] + lamports: Cow<'a, [u8]>, + #[serde(borrow)] + timestamps: Cow<'a, [u8]>, + #[serde(borrow)] + commit_msg_lengths: Cow<'a, [u8]>, + #[serde(borrow)] + commit_msgs: Cow<'a, [u8]>, + // ---------------------- Ops ---------------------- + #[serde(borrow)] + cids: Cow<'a, [u8]>, + #[serde(borrow)] + keys: Cow<'a, [u8]>, + #[serde(borrow)] + ops: Cow<'a, [u8]>, + #[serde(borrow)] + values: Cow<'a, [u8]>, + #[serde(borrow)] + delete_start_ids: Cow<'a, [u8]>, +} diff --git a/crates/loro-internal/src/oplog/change_store/delta_rle_encode.rs b/crates/loro-internal/src/oplog/change_store/delta_rle_encode.rs index 46e5c5e0..81f685e0 100644 --- a/crates/loro-internal/src/oplog/change_store/delta_rle_encode.rs +++ b/crates/loro-internal/src/oplog/change_store/delta_rle_encode.rs @@ -1,309 +1,3 @@ -use std::io::Write; - -use either::Either; -#[derive(Clone, Debug)] -enum RleState { - NonRle { vec: Vec, same_suffix_len: usize }, - Rle { value: T, count: usize }, -} - -impl RleState { - pub fn new() -> Self { - Self::NonRle { - vec: Vec::new(), - same_suffix_len: 1, - } - } - - #[must_use] - pub fn push(&mut self, value: T, min_size_to_use_rle: usize) -> Option> { - match self { - RleState::NonRle { - vec, - same_suffix_len, - } => { - if let Some(last) = vec.last() { - if *last == value { - *same_suffix_len += 1; - if *same_suffix_len >= min_size_to_use_rle { - let last = vec.pop().unwrap(); - let vec = std::mem::take(vec); - *self = RleState::Rle { - value: last, - count: *same_suffix_len, - }; - Some(RleState::NonRle { - vec, - same_suffix_len: 1, - }) - } else { - None - } - } else { - vec.push(value); - *same_suffix_len = 1; - None - } - } else { - 
*same_suffix_len = 1; - vec.push(value); - None - } - } - RleState::Rle { value: last, count } => { - if *last == value { - *count += 1; - None - } else { - Some(std::mem::replace( - self, - RleState::NonRle { - vec: vec![value], - same_suffix_len: 1, - }, - )) - } - } - } - } - - pub fn flush( - &self, - w: &mut W, - mut write_t: impl FnMut(&mut W, T) -> std::io::Result<()>, - ) -> std::io::Result<()> { - match self { - RleState::NonRle { - vec, - same_suffix_len, - } => { - leb128::write::signed(w, (vec.len() + same_suffix_len - 1) as i64).unwrap(); - for v in vec { - write_t(w, *v)?; - } - for _ in 1..*same_suffix_len { - write_t(w, *vec.last().unwrap())?; - } - } - RleState::Rle { value, count } => { - leb128::write::signed(w, -(*count as i64))?; - write_t(w, *value)?; - } - } - - Ok(()) - } - - pub fn from_reader( - r: &mut R, - mut read_t: impl FnMut(&mut R) -> std::io::Result, - ) -> std::io::Result { - let count = leb128::read::signed(r).unwrap(); - if count >= 0 { - let mut vec = Vec::new(); - for _ in 0..count { - vec.push(read_t(r)?); - } - Ok(RleState::NonRle { - vec, - same_suffix_len: 1, - }) - } else { - let count = -count; - let value = read_t(r)?; - Ok(RleState::Rle { - value, - count: count as usize, - }) - } - } -} - -impl Default for RleState { - fn default() -> Self { - Self::new() - } -} - -struct RleEncoderInner { - state: RleState, - min_size_to_use_rle: usize, -} - -impl RleEncoderInner { - pub fn new() -> Self { - Self { - state: RleState::new(), - min_size_to_use_rle: 2, - } - } - - pub fn new_with_min_size_to_use_rle(min_size_to_use_rle: usize) -> Self { - Self { - state: RleState::new(), - min_size_to_use_rle, - } - } - - pub fn push(&mut self, value: T) -> Option> { - self.state.push(value, self.min_size_to_use_rle) - } - - pub fn take(&mut self) -> RleState { - let ans = std::mem::take(&mut self.state); - ans - } -} - -pub struct UnsignedRleEncoder { - v: Vec, - last: u64, - rle: RleEncoderInner, - rounds: usize, -} - -impl 
UnsignedRleEncoder { - pub fn new(estimate_bytes: usize) -> Self { - Self { - v: Vec::new(), - last: 0, - rle: RleEncoderInner::new(), - rounds: 0, - } - } - - pub fn push(&mut self, value: u64) { - match self.rle.push(value) { - None => {} - Some(to_flush) => { - self.flush(to_flush); - } - } - } - - fn flush(&mut self, to_flush: RleState) { - to_flush - .flush(&mut self.v, |w, v| { - leb128::write::unsigned(w, v).map(|_| ()) - }) - .unwrap(); - self.rounds += 1; - } - - pub fn finish(mut self) -> (Vec, usize) { - let to_flush = self.rle.take(); - self.flush(to_flush); - let v = self.v; - (v, self.rounds) - } -} - -pub struct UnsignedRleDecoder<'a> { - v: &'a [u8], - nth_round: usize, - current_rle: Either, // (value, remaining count) -} - -impl<'a> UnsignedRleDecoder<'a> { - pub fn new(v: &'a [u8], round: usize) -> Self { - Self { - v, - nth_round: round, - current_rle: Either::Left(0), - } - } - - pub fn next(&mut self) -> Option { - match &mut self.current_rle { - Either::Left(count) => { - if *count > 0 { - *count -= 1; - let value = leb128::read::unsigned(&mut self.v).unwrap(); - return Some(value); - } - } - Either::Right((value, count)) => { - if *count > 0 { - *count -= 1; - return Some(*value); - } - } - } - - if self.nth_round == 0 { - return None; - } - - self.nth_round -= 1; - let len = leb128::read::signed(&mut self.v).unwrap(); - if len < 0 { - // Read the RLE value and count - let value = leb128::read::unsigned(&mut self.v).unwrap(); - self.current_rle = Either::Right((value, -len)); - self.next() - } else { - // Read the non-RLE value - self.current_rle = Either::Left(len); - self.next() - } - } -} - -pub struct SignedDeltaEncoder { - v: Vec, - last: i64, - round: usize, -} - -impl SignedDeltaEncoder { - pub fn new(estimate_bytes: usize) -> Self { - Self { - v: Vec::new(), - last: 0, - round: 0, - } - } - - pub fn push(&mut self, value: i64) { - let delta = value - self.last; - self.last = value; - leb128::write::signed(&mut self.v, 
delta).unwrap(); - self.round += 1; - } - - pub fn finish(self) -> (Vec, usize) { - let v = self.v; - (v, self.round) - } -} - -pub struct SignedDeltaDecoder<'a> { - v: &'a [u8], - count: usize, - last: i64, -} - -impl<'a> SignedDeltaDecoder<'a> { - pub fn new(v: &'a [u8], count: usize) -> Self { - Self { v, count, last: 0 } - } - - pub fn next(&mut self) -> Option { - if self.count == 0 { - return None; - } - - match leb128::read::signed(&mut self.v) { - Ok(delta) => { - self.last += delta; - self.count -= 1; - Some(self.last) - } - Err(_) => None, - } - } -} - pub struct UnsignedDeltaEncoder { v: Vec, last: u64, @@ -362,289 +56,3 @@ impl<'a> Iterator for UnsignedDeltaDecoder<'a> { Some(self.last) } } - -pub struct SignedDeltaRleEncoder { - v: Vec, - last: i64, - count: usize, - rle: RleEncoderInner, -} - -impl SignedDeltaRleEncoder { - pub fn new(estimate_bytes: usize) -> Self { - let v = Vec::with_capacity(estimate_bytes); - Self { - v, - last: 0, - count: 0, - rle: RleEncoderInner::new(), - } - } - - pub fn push(&mut self, value: i64) { - let delta = value - self.last; - match self.rle.push(delta) { - None => {} - Some(to_flush) => { - self.flush(to_flush); - } - } - } - - fn flush(&mut self, to_flush: RleState) { - to_flush - .flush(&mut self.v, |w, v| { - leb128::write::signed(w, v)?; - Ok(()) - }) - .unwrap(); - self.count += 1; - } - - pub fn finish(mut self) -> (Vec, usize) { - let to_flush = self.rle.take(); - self.flush(to_flush); - (self.v, self.count) - } -} - -pub struct SignedDeltaRleDecoder<'a> { - v: &'a [u8], - count: usize, - state: Either, -} - -impl<'a> SignedDeltaRleDecoder<'a> { - pub fn new(v: &'a [u8], count: usize) -> Self { - Self { - v, - count, - state: Either::Left(0), - } - } - - pub fn rest(mut self) -> &'a [u8] { - while self.next().is_some() {} - - self.v - } -} - -impl<'a> Iterator for SignedDeltaRleDecoder<'a> { - type Item = i64; - - fn next(&mut self) -> Option { - match &mut self.state { - Either::Left(len) => { - if *len > 0 
{ - *len -= 1; - let next = leb128::read::signed(&mut self.v).unwrap(); - return Some(next); - } - } - Either::Right((next, count)) => { - if *count > 0 { - *count -= 1; - return Some(*next); - } - } - } - - if self.count == 0 { - return None; - } - - self.count -= 1; - let len = leb128::read::signed(&mut self.v).unwrap(); - if len < 0 { - // RLE - let last = leb128::read::signed(&mut self.v).unwrap(); - self.state = Either::Right((last, (-len) as usize)); - self.next() - } else { - // non-RLE - self.state = Either::Left(len as usize); - self.next() - } - } -} - -pub struct UnsignedDeltaRleEncoder { - v: Vec, - last: u64, - count: usize, - rle: RleEncoderInner, -} - -impl UnsignedDeltaRleEncoder { - pub fn new(estimate_bytes: usize) -> Self { - let v = Vec::with_capacity(estimate_bytes); - Self { - v, - last: 0, - count: 0, - rle: RleEncoderInner::new(), - } - } - - pub fn push(&mut self, value: u64) { - let delta = value - self.last; - match self.rle.push(delta) { - None => {} - Some(to_flush) => { - self.flush(to_flush); - } - } - } - - fn flush(&mut self, to_flush: RleState) { - to_flush - .flush(&mut self.v, |w, v| { - leb128::write::unsigned(w, v)?; - Ok(()) - }) - .unwrap(); - self.count += 1; - } - - pub fn finish(mut self) -> (Vec, usize) { - let to_flush = self.rle.take(); - self.flush(to_flush); - (self.v, self.count) - } -} - -pub struct UnsignedDeltaRleDecoder<'a> { - v: &'a [u8], - count: usize, - state: Either, -} - -impl<'a> UnsignedDeltaRleDecoder<'a> { - pub fn new(v: &'a [u8], count: usize) -> Self { - Self { - v, - count, - state: Either::Left(0), - } - } - - pub fn rest(mut self) -> &'a [u8] { - while self.next().is_some() {} - self.v - } -} - -impl<'a> Iterator for UnsignedDeltaRleDecoder<'a> { - type Item = u64; - - fn next(&mut self) -> Option { - match &mut self.state { - Either::Left(len) => { - if *len > 0 { - *len -= 1; - let next = leb128::read::unsigned(&mut self.v).unwrap(); - return Some(next); - } - } - Either::Right((next, count)) 
=> { - if *count > 0 { - *count -= 1; - return Some(*next); - } - } - } - - if self.count == 0 { - return None; - } - - self.count -= 1; - let len = leb128::read::signed(&mut self.v).unwrap(); - if len < 0 { - // RLE - let last = leb128::read::unsigned(&mut self.v).unwrap(); - self.state = Either::Right((last, (-len) as usize)); - self.next() - } else { - // non-RLE - self.state = Either::Left(len as usize); - self.next() - } - } -} - -pub struct BoolRleEncoder { - v: Vec, - count: usize, - rle: RleEncoderInner, -} - -impl BoolRleEncoder { - pub fn new() -> Self { - BoolRleEncoder { - v: Vec::new(), - count: 0, - rle: RleEncoderInner::new_with_min_size_to_use_rle(8), - } - } - - pub fn push(&mut self, value: bool) { - if let Some(to_flush) = self.rle.push(value) { - self.flush(to_flush); - } - } - - pub fn finish(self) -> (Vec, usize) { - (self.v, self.count) - } - - fn flush(&mut self, to_flush: RleState) { - to_flush - .flush(&mut self.v, |w, v| { - w.write_all(&[if v { 1 } else { 0 }])?; - Ok(()) - }) - .unwrap(); - self.count += 1; - } -} - -pub struct BoolRleDecoder<'a> { - v: &'a [u8], - pos: usize, - last: bool, - count: u16, -} - -impl<'a> BoolRleDecoder<'a> { - pub fn new(v: &'a [u8]) -> Self { - BoolRleDecoder { - v, - pos: 0, - last: false, - count: 0, - } - } -} - -impl<'a> Iterator for BoolRleDecoder<'a> { - type Item = bool; - - fn next(&mut self) -> Option { - if self.count > 0 { - self.count -= 1; - return Some(self.last); - } - - if self.pos >= self.v.len() { - return None; - } - - self.last = self.v[self.pos] != 0; - self.count = u16::from_le_bytes([self.v[self.pos + 1], self.v[self.pos + 2]]); - self.pos += 3; - - self.next() - } -}