refactor: reuse more serde_columnar code

This commit is contained in:
Zixuan Chen 2024-05-27 19:02:01 +08:00
parent 0c81543b6e
commit b13b4bcf94
No known key found for this signature in database
6 changed files with 263 additions and 716 deletions

4
Cargo.lock generated
View file

@ -1853,9 +1853,9 @@ dependencies = [
[[package]]
name = "serde_columnar"
version = "0.3.4"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5d54dd7e7a1ec134c842f8a3bdb5a1fc662d002682e0457f976f3046cf9ccf8"
checksum = "06a86f5f6dc16d8308c37e145dd4c7e60fba1486d84982519388d31ea0ad6703"
dependencies = [
"itertools 0.11.0",
"postcard",

View file

@ -23,5 +23,5 @@ tracing = { version = "0.1", features = [
"max_level_debug",
"release_max_level_warn",
] }
serde_columnar = { version = "0.3.4" }
serde_columnar = { version = "0.3.5" }
itertools = "0.12.1"

View file

@ -21,6 +21,7 @@ use crate::op::{FutureInnerContent, ListSlice, Op, RawOpContent, RemoteOp, RichO
use crate::span::{HasCounterSpan, HasIdSpan, HasLamportSpan};
use crate::version::{Frontiers, ImVersionVector, VersionVector};
use crate::LoroError;
use change_store::ChangeStore;
use fxhash::FxHashMap;
use loro_common::{HasCounter, HasId, IdLp, IdSpan};
use rle::{HasLength, RleCollection, RlePush, RleVec, Sliceable};
@ -44,6 +45,7 @@ pub struct OpLog {
pub(crate) dag: AppDag,
pub(crate) arena: SharedArena,
changes: ClientChanges,
change_store: ChangeStore,
pub(crate) op_groups: OpGroups,
/// **lamport starts from 0**
pub(crate) next_lamport: Lamport,
@ -87,6 +89,7 @@ impl Clone for OpLog {
arena: self.arena.clone(),
changes: self.changes.clone(),
op_groups: self.op_groups.clone(),
change_store: self.change_store.clone(),
next_lamport: self.next_lamport,
latest_timestamp: self.latest_timestamp,
pending_changes: Default::default(),
@ -177,6 +180,7 @@ impl OpLog {
dag: AppDag::default(),
op_groups: OpGroups::new(arena.clone()),
changes: ClientChanges::default(),
change_store: ChangeStore::new(),
arena,
next_lamport: 0,
latest_timestamp: Timestamp::default(),

View file

@ -7,12 +7,20 @@ mod delta_rle_encode;
mod ops_encode;
use crate::{arena::SharedArena, change::Change, version::Frontiers};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ChangeStore {
kv: BTreeMap<Bytes, ChangesBlock>,
}
#[derive(Debug)]
impl ChangeStore {
pub fn new() -> Self {
Self {
kv: BTreeMap::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct ChangesBlock {
arena: SharedArena,
peer: PeerID,
@ -68,6 +76,7 @@ impl ChangesBlock {
}
}
#[derive(Clone)]
enum ChangesBlockContent {
Changes(Vec<Change>),
Bytes(ChangesBlockBytes),

View file

@ -1,67 +1,80 @@
//! # Encode
//!
//! ≈4KB after compression
//!
//! N = Number of Changes
//! ≈4KB after compression
//!
//! Peer_1 = This Peer
//! N = Number of Changes
//!
//! Peer_1 = This Peer
//!
//!
//! ┌──────────┬─────┬────────────┬───────────────────────────────┐
//! │2B Version│LEB N│LEB Peer Num│ 8B Peer_1,...,Peer_x │◁────┐
//! └──────────┴─────┴────────────┴───────────────────────────────┘ │
//! ┌─────────────────────────────────────────────────────────────┐ │
//! │ N LEB128 Delta Counters │◁───┼───── Important metadata
//! └─────────────────────────────────────────────────────────────┘ │
//! ┌───────────────────┬────────────────────────┬─────────────────┐ │
//! │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘
//! └───────────────────┴────────────────────────┴─────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ N LEB128 Delta Lamports │
//! └──────────────────────────────────────────────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ N LEB128 Delta Timestamps │
//! └──────────────────────────────────────────────────────────────┘
//! ┌────────────────────────────────┬─────────────────────────────┐
//! │ N Rle Commit Msg Lengths │ Commit Messages │
//! └────────────────────────────────┴─────────────────────────────┘
//! ┌──────────┬─────┬────────────┬───────────────────────────────┐
//! │2B Version│LEB N│LEB Peer Num│ 8B Peer_1,...,Peer_x │◁────┐
//! └──────────┴─────┴────────────┴───────────────────────────────┘ │
//! ┌─────────────────────────────────────────────────────────────┐ │
//! │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata
//! └─────────────────────────────────────────────────────────────┘ │
//! ┌───────────────────┬────────────────────────┬─────────────────┐ │
//! │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ Dep IDs │◁───┤
//! └───────────────────┴────────────────────────┴─────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ N LEB128 Delta Lamports │◁───┘
//! └──────────────────────────────────────────────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ N LEB128 Delta Timestamps │
//! └──────────────────────────────────────────────────────────────┘
//! ┌────────────────────────────────┬─────────────────────────────┐
//! │ N Rle Commit Msg Lengths │ Commit Messages │
//! └────────────────────────────────┴─────────────────────────────┘
//!
//! ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Encoded Operations ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//! ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Encoded Operations ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//!
//! ┌────────────────────┬─────────────────────────────────────────┐
//! │ ContainerIDs Size │ ContainerIDs │
//! └────────────────────┴─────────────────────────────────────────┘
//! ┌────────────────────┬─────────────────────────────────────────┐
//! │ Key Strings Size │ Key Strings │
//! └────────────────────┴─────────────────────────────────────────┘
//! ┌──────────┬──────────┬───────┬────────────────────────────────┐
//! │ │ │ │ │
//! │ │ │ │ │
//! │ │ │ │ │
//! │ LEB128 │ RLE │ Delta │ │
//! │ Lengths │Containers│ RLE │ ValueType │
//! │ │ │ Props │ │
//! │ │ │ │ │
//! │ │ │ │ │
//! │ │ │ │ │
//! └──────────┴──────────┴───────┴────────────────────────────────┘
//! ┌────────────────┬─────────────────────────────────────────────┐
//! │Value Bytes Size│ Value Bytes │
//! └────────────────┴─────────────────────────────────────────────┘
//! ┌────────────────────┬─────────────────────────────────────────┐
//! │ ContainerIDs Size │ ContainerIDs │
//! └────────────────────┴─────────────────────────────────────────┘
//! ┌────────────────────┬─────────────────────────────────────────┐
//! │ Key Strings Size │ Key Strings │
//! └────────────────────┴─────────────────────────────────────────┘
//! ┌────────┬──────────┬──────────┬───────┬───────────────────────┐
//! │ │ │ │ │ │
//! │ │ │ │ │ │
//! │ │ │ │ │ │
//! │ Ops │ LEB128 │ RLE │ Delta │ │
//! │ Size │ Lengths │Containers│ RLE │ ValueType │
//! │ │ │ │ Props │ │
//! │ │ │ │ │ │
//! │ │ │ │ │ │
//! │ │ │ │ │ │
//! └────────┴──────────┴──────────┴───────┴───────────────────────┘
//! ┌────────────────┬─────────────────────────────────────────────┐
//! │ │ │
//! │Value Bytes Size│ Value Bytes │
//! │ │ │
//! └────────────────┴─────────────────────────────────────────────┘
//! ┌──────────────────────────────────────────────────────────────┐
//! │ │
//! │ Delete Start IDs │
//! │ │
//! └──────────────────────────────────────────────────────────────┘
use std::borrow::Cow;
use std::io::Write;
use loro_common::{ContainerID, PeerID};
use loro_common::{ContainerID, Counter, LoroError, LoroResult, PeerID, ID};
use num::complex::ParseComplexError;
use rle::HasLength;
use serde_columnar::columnar;
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, Itertools};
use super::delta_rle_encode::{BoolRleEncoder, UnsignedDeltaEncoder, UnsignedRleEncoder};
use super::ChangesBlock;
use super::delta_rle_encode::UnsignedDeltaEncoder;
use crate::arena::SharedArena;
use crate::change::Change;
use crate::encoding::arena::ContainerArena;
use crate::encoding::value_register::ValueRegister;
use crate::encoding::{encode_op, get_op_prop};
use serde_columnar::{
AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleEncoder,
};
const VERSION: u16 = 0;
@ -71,29 +84,24 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec<u8> {
panic!("Empty block")
}
let mut output = Vec::with_capacity(4096);
output.write_all(&VERSION.to_le_bytes()).unwrap();
leb128::write::unsigned(&mut output, block.len() as u64).unwrap();
let mut peer_register: ValueRegister<PeerID> = ValueRegister::new();
let peer = block[0].peer();
peer_register.register(&peer);
let cid_register: ValueRegister<ContainerID> = ValueRegister::new();
let mut counter_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4);
let mut timestamp_encoder = UnsignedDeltaEncoder::new(block.len() * 3 + 8);
let mut lamport_encoder = UnsignedDeltaEncoder::new(block.len() * 2 + 4);
let mut commit_msg_len_encoder = UnsignedRleEncoder::new(0);
let mut commit_msg_len_encoder = AnyRleEncoder::<u32>::new();
let mut dep_self_encoder = BoolRleEncoder::new();
let mut dep_len_encoder = UnsignedRleEncoder::new(0);
let mut dep_len_encoder = AnyRleEncoder::<u64>::new();
let mut encoded_deps = EncodedDeps {
peer_idx: UnsignedRleEncoder::new(0),
counter: UnsignedRleEncoder::new(0),
peer_idx: AnyRleEncoder::new(),
counter: AnyRleEncoder::new(),
};
for c in block {
counter_encoder.push(c.id.counter as u64);
timestamp_encoder.push(c.timestamp() as u64);
lamport_encoder.push(c.lamport() as u64);
commit_msg_len_encoder.push(0);
commit_msg_len_encoder.append(0);
let mut dep_on_self = false;
for dep in c.deps().iter() {
@ -101,13 +109,13 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec<u8> {
dep_on_self = true;
} else {
let peer_idx = peer_register.register(&dep.peer);
encoded_deps.peer_idx.push(peer_idx as u64);
encoded_deps.counter.push(dep.counter as u64);
encoded_deps.peer_idx.append(peer_idx as u32);
encoded_deps.counter.append(dep.counter as u32);
}
}
dep_self_encoder.push(dep_on_self);
dep_len_encoder.push(if dep_on_self {
dep_self_encoder.append(dep_on_self);
dep_len_encoder.append(if dep_on_self {
c.deps().len() as u64 - 1
} else {
c.deps().len() as u64
@ -121,7 +129,7 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec<u8> {
cid_register,
};
let mut del_starts = Vec::new();
let mut del_starts: Vec<_> = Vec::new();
let mut value_writer = ValueWriter::new();
for c in block {
for op in c.ops().iter() {
@ -155,65 +163,23 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// PeerIDs
let peers = registers.peer_register.unwrap_vec();
leb128::write::unsigned(&mut output, peers.len() as u64).unwrap();
for peer in peers {
output.write_all(&peer.to_le_bytes()).unwrap();
let peer_bytes: Vec<u8> = peers.iter().map(|p| p.to_le_bytes()).flatten().collect();
// Frist Counter + Change Len
let mut lengths_bytes = Vec::new();
for c in block {
leb128::write::unsigned(&mut lengths_bytes, c.atom_len() as u64).unwrap();
}
// Counters
let (bytes, _n) = counter_encoder.finish();
output.write_all(&bytes).unwrap();
// ┌───────────────────┬────────────────────────┬─────────────────┐
// │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │
// └───────────────────┴────────────────────────┴─────────────────┘
let (buf, _n) = dep_self_encoder.finish();
output.write_all(&buf).unwrap();
let (buf, _n) = dep_len_encoder.finish();
output.write_all(&buf).unwrap();
let (buf, _n) = encoded_deps.peer_idx.finish();
output.write_all(&buf).unwrap();
let (buf, _n) = encoded_deps.counter.finish();
output.write_all(&buf).unwrap();
// ┌──────────────────────────────────────────────────────────────┐
// │ N LEB128 Delta Lamports │
// └──────────────────────────────────────────────────────────────┘
let (buf, _n) = lamport_encoder.finish();
output.write_all(&buf).unwrap();
// ┌──────────────────────────────────────────────────────────────┐
// │ N LEB128 Delta Timestamps │
// └──────────────────────────────────────────────────────────────┘
let (buf, _n) = timestamp_encoder.finish();
output.write_all(&buf).unwrap();
// ┌────────────────────────────────┬─────────────────────────────┐
// │ N Rle Commit Msg Lengths │ Commit Messages │
// └────────────────────────────────┴─────────────────────────────┘
let (buf, _n) = commit_msg_len_encoder.finish();
output.write_all(&buf).unwrap();
// TODO: Commit messages
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ Encoded Operations ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
//
// ┌────────────────────┬─────────────────────────────────────────┐
// │ ContainerIDs Size │ ContainerIDs │
// └────────────────────┴─────────────────────────────────────────┘
let bytes = container_arena.encode();
leb128::write::unsigned(&mut output, bytes.len() as u64).unwrap();
output.write_all(&bytes).unwrap();
// ┌────────────────────┬─────────────────────────────────────────┐
// │ Key Strings Size │ Key Strings │
// └────────────────────┴─────────────────────────────────────────┘
let keys = registers.key_register.unwrap_vec();
leb128::write::unsigned(&mut output, keys.len() as u64).unwrap();
let mut keys_bytes = Vec::new();
for key in keys {
let bytes = key.as_bytes();
leb128::write::unsigned(&mut output, bytes.len() as u64).unwrap();
output.write_all(bytes).unwrap();
leb128::write::unsigned(&mut keys_bytes, bytes.len() as u64).unwrap();
keys_bytes.write_all(bytes).unwrap();
}
// ┌──────────┬──────────┬───────┬────────────────────────────────┐
@ -229,16 +195,32 @@ pub fn encode(block: &[Change], arena: &SharedArena) -> Vec<u8> {
// └──────────┴──────────┴───────┴────────────────────────────────┘
let ops_bytes = serde_columnar::to_vec(&encoded_ops).unwrap();
leb128::write::unsigned(&mut output, ops_bytes.len() as u64).unwrap();
output.write_all(&ops_bytes).unwrap();
// ┌────────────────┬─────────────────────────────────────────────┐
// │Value Bytes Size│ Value Bytes │
// └────────────────┴─────────────────────────────────────────────┘
let value_bytes = value_writer.finish();
leb128::write::unsigned(&mut output, value_bytes.len() as u64).unwrap();
output.write_all(&value_bytes).unwrap();
output
let out = EncodedDoc {
version: VERSION,
n_changes: block.len() as u32,
first_counter: block[0].id.counter as u32,
peers: Cow::Owned(peer_bytes),
lengths: Cow::Owned(lengths_bytes),
dep_on_self: dep_self_encoder.finish().unwrap().into(),
dep_len: dep_len_encoder.finish().unwrap().into(),
dep_peer_idxs: encoded_deps.peer_idx.finish().unwrap().into(),
dep_counters: encoded_deps.counter.finish().unwrap().into(),
lamports: lamport_encoder.finish().0.into(),
timestamps: timestamp_encoder.finish().0.into(),
commit_msg_lengths: commit_msg_len_encoder.finish().unwrap().into(),
commit_msgs: Cow::Owned(vec![]),
cids: container_arena.encode().into(),
keys: keys_bytes.into(),
ops: ops_bytes.into(),
values: value_bytes.into(),
delete_start_ids: serde_columnar::to_vec(&del_starts).unwrap().into(),
};
postcard::to_allocvec(&out).unwrap()
}
struct Registers {
@ -248,6 +230,7 @@ struct Registers {
}
use crate::encoding::value::{ValueEncodeRegister, ValueWriter};
use crate::version::Frontiers;
impl ValueEncodeRegister for Registers {
fn key_mut(&mut self) -> &mut ValueRegister<loro_common::InternalString> {
&mut self.key_register
@ -265,9 +248,114 @@ impl ValueEncodeRegister for Registers {
}
}
pub(crate) struct BlockHeader {
peer: PeerID,
counter: Counter,
n_changes: usize,
peers: Vec<PeerID>,
/// This has n + 1 elements, where counters[n] is the end counter of the
/// last change in the block.
counters: Vec<Counter>,
deps: Vec<Frontiers>,
}
pub fn decode_header(m_bytes: &[u8]) -> LoroResult<BlockHeader> {
let EncodedDoc {
version,
n_changes,
first_counter,
peers: peers_bytes,
lengths: lengths_bytes,
dep_on_self,
dep_len,
dep_peer_idxs,
dep_counters,
lamports,
timestamps,
commit_msg_lengths,
commit_msgs,
cids,
keys,
ops,
values,
delete_start_ids,
} = postcard::from_bytes(m_bytes).map_err(|e| {
LoroError::DecodeError(format!("Decode block error {}", e).into_boxed_str())
})?;
if version != VERSION {
return Err(LoroError::IncompatibleFutureEncodingError(version as usize));
}
let first_counter = first_counter as Counter;
let n_changes = n_changes as usize;
let peer_num = peers_bytes.len() / 8;
let mut peers = Vec::with_capacity(peer_num as usize);
for i in 0..(n_changes as usize) {
let peer_id =
PeerID::from_le_bytes((&peers_bytes[(8 * i)..(8 * (i + 1))]).try_into().unwrap());
peers.push(peer_id);
}
// ┌───────────────────┬──────────────────────────────────────────┐ │
// │ LEB First Counter │ N LEB128 Change AtomLen │◁───┼───── Important metadata
// └───────────────────┴──────────────────────────────────────────┘ │
let mut lengths = Vec::with_capacity(n_changes as usize);
let mut lengths_bytes: &[u8] = &*lengths_bytes;
for _ in 0..n_changes {
lengths.push(leb128::read::unsigned(&mut lengths_bytes).unwrap() as Counter);
}
// ┌───────────────────┬────────────────────────┬─────────────────┐ │
// │N DepOnSelf BoolRle│ N Delta Rle Deps Lens │ N Dep IDs │◁───┘
// └───────────────────┴────────────────────────┴─────────────────┘
let mut dep_self_decoder = BoolRleDecoder::new(&dep_on_self);
let mut this_counter = first_counter;
let deps: Vec<Frontiers> = Vec::with_capacity(n_changes);
let n = n_changes;
let mut deps_len = AnyRleDecoder::<u64>::new(&dep_len);
let deps_peers_decoder = AnyRleDecoder::<u32>::new(&dep_peer_idxs);
let deps_counters_decoder = AnyRleDecoder::<u32>::new(&dep_counters);
let mut deps_peers_iter = deps_peers_decoder;
let mut deps_counters_iter = deps_counters_decoder;
for i in 0..n {
let mut f = Frontiers::default();
if dep_self_decoder.next().unwrap().unwrap() {
f.push(ID::new(peers[0], this_counter - 1))
}
let len = deps_len.next().unwrap().unwrap() as usize;
for _ in 0..len {
let peer_idx = deps_peers_iter.next().unwrap().unwrap() as usize;
let peer = peers[peer_idx];
let counter = deps_counters_iter.next().unwrap().unwrap() as Counter;
f.push(ID::new(peer, counter));
}
this_counter += lengths[i];
}
let mut counters = Vec::with_capacity(n + 1);
let mut last = first_counter;
for i in 0..n {
counters.push(last);
last += lengths[i];
}
counters.push(last);
Ok(BlockHeader {
peer: peers[0],
counter: first_counter,
n_changes,
peers,
counters,
deps,
})
}
struct EncodedDeps {
peer_idx: UnsignedRleEncoder,
counter: UnsignedRleEncoder,
peer_idx: AnyRleEncoder<u32>,
counter: AnyRleEncoder<u32>,
}
#[columnar(vec, ser, de, iterable)]
@ -282,3 +370,41 @@ struct EncodedOp {
#[columnar(strategy = "Rle")]
len: u32,
}
#[derive(Serialize, Deserialize)]
struct EncodedDoc<'a> {
version: u16,
n_changes: u32,
first_counter: u32,
#[serde(borrow)]
peers: Cow<'a, [u8]>,
#[serde(borrow)]
lengths: Cow<'a, [u8]>,
#[serde(borrow)]
dep_on_self: Cow<'a, [u8]>,
#[serde(borrow)]
dep_len: Cow<'a, [u8]>,
#[serde(borrow)]
dep_peer_idxs: Cow<'a, [u8]>,
#[serde(borrow)]
dep_counters: Cow<'a, [u8]>,
#[serde(borrow)]
lamports: Cow<'a, [u8]>,
#[serde(borrow)]
timestamps: Cow<'a, [u8]>,
#[serde(borrow)]
commit_msg_lengths: Cow<'a, [u8]>,
#[serde(borrow)]
commit_msgs: Cow<'a, [u8]>,
// ---------------------- Ops ----------------------
#[serde(borrow)]
cids: Cow<'a, [u8]>,
#[serde(borrow)]
keys: Cow<'a, [u8]>,
#[serde(borrow)]
ops: Cow<'a, [u8]>,
#[serde(borrow)]
values: Cow<'a, [u8]>,
#[serde(borrow)]
delete_start_ids: Cow<'a, [u8]>,
}

View file

@ -1,309 +1,3 @@
use std::io::Write;
use either::Either;
#[derive(Clone, Debug)]
enum RleState<T> {
NonRle { vec: Vec<T>, same_suffix_len: usize },
Rle { value: T, count: usize },
}
impl<T: Copy + Eq> RleState<T> {
pub fn new() -> Self {
Self::NonRle {
vec: Vec::new(),
same_suffix_len: 1,
}
}
#[must_use]
pub fn push(&mut self, value: T, min_size_to_use_rle: usize) -> Option<RleState<T>> {
match self {
RleState::NonRle {
vec,
same_suffix_len,
} => {
if let Some(last) = vec.last() {
if *last == value {
*same_suffix_len += 1;
if *same_suffix_len >= min_size_to_use_rle {
let last = vec.pop().unwrap();
let vec = std::mem::take(vec);
*self = RleState::Rle {
value: last,
count: *same_suffix_len,
};
Some(RleState::NonRle {
vec,
same_suffix_len: 1,
})
} else {
None
}
} else {
vec.push(value);
*same_suffix_len = 1;
None
}
} else {
*same_suffix_len = 1;
vec.push(value);
None
}
}
RleState::Rle { value: last, count } => {
if *last == value {
*count += 1;
None
} else {
Some(std::mem::replace(
self,
RleState::NonRle {
vec: vec![value],
same_suffix_len: 1,
},
))
}
}
}
}
pub fn flush<W: ?Sized + Write>(
&self,
w: &mut W,
mut write_t: impl FnMut(&mut W, T) -> std::io::Result<()>,
) -> std::io::Result<()> {
match self {
RleState::NonRle {
vec,
same_suffix_len,
} => {
leb128::write::signed(w, (vec.len() + same_suffix_len - 1) as i64).unwrap();
for v in vec {
write_t(w, *v)?;
}
for _ in 1..*same_suffix_len {
write_t(w, *vec.last().unwrap())?;
}
}
RleState::Rle { value, count } => {
leb128::write::signed(w, -(*count as i64))?;
write_t(w, *value)?;
}
}
Ok(())
}
pub fn from_reader<R: ?Sized + std::io::Read>(
r: &mut R,
mut read_t: impl FnMut(&mut R) -> std::io::Result<T>,
) -> std::io::Result<Self> {
let count = leb128::read::signed(r).unwrap();
if count >= 0 {
let mut vec = Vec::new();
for _ in 0..count {
vec.push(read_t(r)?);
}
Ok(RleState::NonRle {
vec,
same_suffix_len: 1,
})
} else {
let count = -count;
let value = read_t(r)?;
Ok(RleState::Rle {
value,
count: count as usize,
})
}
}
}
impl<T: Copy + Eq> Default for RleState<T> {
fn default() -> Self {
Self::new()
}
}
struct RleEncoderInner<T> {
state: RleState<T>,
min_size_to_use_rle: usize,
}
impl<T: Copy + Eq> RleEncoderInner<T> {
pub fn new() -> Self {
Self {
state: RleState::new(),
min_size_to_use_rle: 2,
}
}
pub fn new_with_min_size_to_use_rle(min_size_to_use_rle: usize) -> Self {
Self {
state: RleState::new(),
min_size_to_use_rle,
}
}
pub fn push(&mut self, value: T) -> Option<RleState<T>> {
self.state.push(value, self.min_size_to_use_rle)
}
pub fn take(&mut self) -> RleState<T> {
let ans = std::mem::take(&mut self.state);
ans
}
}
pub struct UnsignedRleEncoder {
v: Vec<u8>,
last: u64,
rle: RleEncoderInner<u64>,
rounds: usize,
}
impl UnsignedRleEncoder {
pub fn new(estimate_bytes: usize) -> Self {
Self {
v: Vec::new(),
last: 0,
rle: RleEncoderInner::new(),
rounds: 0,
}
}
pub fn push(&mut self, value: u64) {
match self.rle.push(value) {
None => {}
Some(to_flush) => {
self.flush(to_flush);
}
}
}
fn flush(&mut self, to_flush: RleState<u64>) {
to_flush
.flush(&mut self.v, |w, v| {
leb128::write::unsigned(w, v).map(|_| ())
})
.unwrap();
self.rounds += 1;
}
pub fn finish(mut self) -> (Vec<u8>, usize) {
let to_flush = self.rle.take();
self.flush(to_flush);
let v = self.v;
(v, self.rounds)
}
}
pub struct UnsignedRleDecoder<'a> {
v: &'a [u8],
nth_round: usize,
current_rle: Either<i64, (u64, i64)>, // (value, remaining count)
}
impl<'a> UnsignedRleDecoder<'a> {
pub fn new(v: &'a [u8], round: usize) -> Self {
Self {
v,
nth_round: round,
current_rle: Either::Left(0),
}
}
pub fn next(&mut self) -> Option<u64> {
match &mut self.current_rle {
Either::Left(count) => {
if *count > 0 {
*count -= 1;
let value = leb128::read::unsigned(&mut self.v).unwrap();
return Some(value);
}
}
Either::Right((value, count)) => {
if *count > 0 {
*count -= 1;
return Some(*value);
}
}
}
if self.nth_round == 0 {
return None;
}
self.nth_round -= 1;
let len = leb128::read::signed(&mut self.v).unwrap();
if len < 0 {
// Read the RLE value and count
let value = leb128::read::unsigned(&mut self.v).unwrap();
self.current_rle = Either::Right((value, -len));
self.next()
} else {
// Read the non-RLE value
self.current_rle = Either::Left(len);
self.next()
}
}
}
pub struct SignedDeltaEncoder {
v: Vec<u8>,
last: i64,
round: usize,
}
impl SignedDeltaEncoder {
pub fn new(estimate_bytes: usize) -> Self {
Self {
v: Vec::new(),
last: 0,
round: 0,
}
}
pub fn push(&mut self, value: i64) {
let delta = value - self.last;
self.last = value;
leb128::write::signed(&mut self.v, delta).unwrap();
self.round += 1;
}
pub fn finish(self) -> (Vec<u8>, usize) {
let v = self.v;
(v, self.round)
}
}
pub struct SignedDeltaDecoder<'a> {
v: &'a [u8],
count: usize,
last: i64,
}
impl<'a> SignedDeltaDecoder<'a> {
pub fn new(v: &'a [u8], count: usize) -> Self {
Self { v, count, last: 0 }
}
pub fn next(&mut self) -> Option<i64> {
if self.count == 0 {
return None;
}
match leb128::read::signed(&mut self.v) {
Ok(delta) => {
self.last += delta;
self.count -= 1;
Some(self.last)
}
Err(_) => None,
}
}
}
pub struct UnsignedDeltaEncoder {
v: Vec<u8>,
last: u64,
@ -362,289 +56,3 @@ impl<'a> Iterator for UnsignedDeltaDecoder<'a> {
Some(self.last)
}
}
pub struct SignedDeltaRleEncoder {
v: Vec<u8>,
last: i64,
count: usize,
rle: RleEncoderInner<i64>,
}
impl SignedDeltaRleEncoder {
pub fn new(estimate_bytes: usize) -> Self {
let v = Vec::with_capacity(estimate_bytes);
Self {
v,
last: 0,
count: 0,
rle: RleEncoderInner::new(),
}
}
pub fn push(&mut self, value: i64) {
let delta = value - self.last;
match self.rle.push(delta) {
None => {}
Some(to_flush) => {
self.flush(to_flush);
}
}
}
fn flush(&mut self, to_flush: RleState<i64>) {
to_flush
.flush(&mut self.v, |w, v| {
leb128::write::signed(w, v)?;
Ok(())
})
.unwrap();
self.count += 1;
}
pub fn finish(mut self) -> (Vec<u8>, usize) {
let to_flush = self.rle.take();
self.flush(to_flush);
(self.v, self.count)
}
}
pub struct SignedDeltaRleDecoder<'a> {
v: &'a [u8],
count: usize,
state: Either<usize, (i64, usize)>,
}
impl<'a> SignedDeltaRleDecoder<'a> {
pub fn new(v: &'a [u8], count: usize) -> Self {
Self {
v,
count,
state: Either::Left(0),
}
}
pub fn rest(mut self) -> &'a [u8] {
while self.next().is_some() {}
self.v
}
}
impl<'a> Iterator for SignedDeltaRleDecoder<'a> {
type Item = i64;
fn next(&mut self) -> Option<Self::Item> {
match &mut self.state {
Either::Left(len) => {
if *len > 0 {
*len -= 1;
let next = leb128::read::signed(&mut self.v).unwrap();
return Some(next);
}
}
Either::Right((next, count)) => {
if *count > 0 {
*count -= 1;
return Some(*next);
}
}
}
if self.count == 0 {
return None;
}
self.count -= 1;
let len = leb128::read::signed(&mut self.v).unwrap();
if len < 0 {
// RLE
let last = leb128::read::signed(&mut self.v).unwrap();
self.state = Either::Right((last, (-len) as usize));
self.next()
} else {
// non-RLE
self.state = Either::Left(len as usize);
self.next()
}
}
}
pub struct UnsignedDeltaRleEncoder {
v: Vec<u8>,
last: u64,
count: usize,
rle: RleEncoderInner<u64>,
}
impl UnsignedDeltaRleEncoder {
pub fn new(estimate_bytes: usize) -> Self {
let v = Vec::with_capacity(estimate_bytes);
Self {
v,
last: 0,
count: 0,
rle: RleEncoderInner::new(),
}
}
pub fn push(&mut self, value: u64) {
let delta = value - self.last;
match self.rle.push(delta) {
None => {}
Some(to_flush) => {
self.flush(to_flush);
}
}
}
fn flush(&mut self, to_flush: RleState<u64>) {
to_flush
.flush(&mut self.v, |w, v| {
leb128::write::unsigned(w, v)?;
Ok(())
})
.unwrap();
self.count += 1;
}
pub fn finish(mut self) -> (Vec<u8>, usize) {
let to_flush = self.rle.take();
self.flush(to_flush);
(self.v, self.count)
}
}
pub struct UnsignedDeltaRleDecoder<'a> {
v: &'a [u8],
count: usize,
state: Either<usize, (u64, usize)>,
}
impl<'a> UnsignedDeltaRleDecoder<'a> {
pub fn new(v: &'a [u8], count: usize) -> Self {
Self {
v,
count,
state: Either::Left(0),
}
}
pub fn rest(mut self) -> &'a [u8] {
while self.next().is_some() {}
self.v
}
}
impl<'a> Iterator for UnsignedDeltaRleDecoder<'a> {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
match &mut self.state {
Either::Left(len) => {
if *len > 0 {
*len -= 1;
let next = leb128::read::unsigned(&mut self.v).unwrap();
return Some(next);
}
}
Either::Right((next, count)) => {
if *count > 0 {
*count -= 1;
return Some(*next);
}
}
}
if self.count == 0 {
return None;
}
self.count -= 1;
let len = leb128::read::signed(&mut self.v).unwrap();
if len < 0 {
// RLE
let last = leb128::read::unsigned(&mut self.v).unwrap();
self.state = Either::Right((last, (-len) as usize));
self.next()
} else {
// non-RLE
self.state = Either::Left(len as usize);
self.next()
}
}
}
pub struct BoolRleEncoder {
v: Vec<u8>,
count: usize,
rle: RleEncoderInner<bool>,
}
impl BoolRleEncoder {
pub fn new() -> Self {
BoolRleEncoder {
v: Vec::new(),
count: 0,
rle: RleEncoderInner::new_with_min_size_to_use_rle(8),
}
}
pub fn push(&mut self, value: bool) {
if let Some(to_flush) = self.rle.push(value) {
self.flush(to_flush);
}
}
pub fn finish(self) -> (Vec<u8>, usize) {
(self.v, self.count)
}
fn flush(&mut self, to_flush: RleState<bool>) {
to_flush
.flush(&mut self.v, |w, v| {
w.write_all(&[if v { 1 } else { 0 }])?;
Ok(())
})
.unwrap();
self.count += 1;
}
}
pub struct BoolRleDecoder<'a> {
v: &'a [u8],
pos: usize,
last: bool,
count: u16,
}
impl<'a> BoolRleDecoder<'a> {
pub fn new(v: &'a [u8]) -> Self {
BoolRleDecoder {
v,
pos: 0,
last: false,
count: 0,
}
}
}
impl<'a> Iterator for BoolRleDecoder<'a> {
type Item = bool;
fn next(&mut self) -> Option<Self::Item> {
if self.count > 0 {
self.count -= 1;
return Some(self.last);
}
if self.pos >= self.v.len() {
return None;
}
self.last = self.v[self.pos] != 0;
self.count = u16::from_le_bytes([self.v[self.pos + 1], self.v[self.pos + 2]]);
self.pos += 3;
self.next()
}
}