Merge branch 'dev' into feat-gc

This commit is contained in:
Zixuan Chen 2024-09-03 21:21:14 +08:00
commit 4c325ab87c
No known key found for this signature in database
50 changed files with 3919 additions and 246 deletions

View file

@ -6,6 +6,7 @@
"cids",
"clippy",
"collab",
"curr",
"dhat",
"flate",
"fuzzer",
@ -34,6 +35,7 @@
"reparent",
"RUSTFLAGS",
"smstring",
"sstable",
"Stewen",
"thiserror",
"tinyvec",

101
Cargo.lock generated
View file

@ -17,6 +17,19 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
@ -599,6 +612,12 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "examples"
version = "0.1.0"
@ -658,6 +677,7 @@ version = "0.1.0"
dependencies = [
"arbitrary",
"arbtest",
"bytes",
"color-backtrace",
"ctor 0.2.6",
"dev-utils",
@ -703,9 +723,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.10"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"js-sys",
@ -750,6 +770,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "heapless"
version = "0.7.16"
@ -868,7 +894,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [
"autocfg",
"hashbrown",
"hashbrown 0.12.3",
]
[[package]]
@ -956,9 +982,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "libc"
version = "0.2.147"
version = "0.2.158"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
[[package]]
name = "lock_api"
@ -1139,6 +1165,7 @@ dependencies = [
"leb128",
"loro-common 0.16.2",
"loro-delta 0.16.2",
"loro-kv-store",
"loro-rle 0.16.2",
"loro_fractional_index 0.16.2",
"md5",
@ -1236,6 +1263,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "loro-kv-store"
version = "0.16.2"
dependencies = [
"bytes",
"fxhash",
"loro-common 0.16.2",
"lz4_flex",
"once_cell",
"quick_cache",
"xxhash-rust",
]
[[package]]
name = "loro-rle"
version = "0.16.2"
@ -1337,6 +1377,15 @@ dependencies = [
"smallvec",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "md5"
version = "0.7.0"
@ -1750,6 +1799,18 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
[[package]]
name = "quick_cache"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27a893a83255c587d31137bc7e350387b49267b0deac44120fd8fa8bd0d61645"
dependencies = [
"ahash",
"equivalent",
"hashbrown 0.14.5",
"parking_lot",
]
[[package]]
name = "quote"
version = "0.6.13"
@ -2350,6 +2411,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"static_assertions",
]
[[package]]
name = "typenum"
version = "1.16.0"
@ -2733,6 +2804,26 @@ version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2 1.0.75",
"quote 1.0.35",
"syn 2.0.48",
]
[[package]]
name = "zstd"
version = "0.13.2"

View file

@ -11,6 +11,7 @@ members = [
"crates/fractional_index",
"crates/dev-utils",
"crates/delta",
"crates/kv-store",
]
resolver = "2"
@ -18,9 +19,13 @@ resolver = "2"
enum_dispatch = "0.3.11"
enum-as-inner = "0.5.1"
fxhash = "0.2.1"
tracing = { version = "0.1", features = ["release_max_level_warn"] }
tracing = { version = "0.1" }
serde_columnar = { version = "0.3.7" }
serde_json = "1.0"
thiserror = "1"
smallvec = { version = "1.8.0", features = ["serde"] }
itertools = "0.12.1"
serde = "1"
bytes = "1"
once_cell = "1.18.0"
xxhash-rust = { version = "0.8.12", features = ["xxh32"] }

View file

@ -21,6 +21,7 @@ rand = "0.8.5"
serde_json = "1"
num_cpus = "1.16.0"
rayon = "1.10.0"
bytes = "1"
[dev-dependencies]
ctor = "0.2"

View file

@ -2,6 +2,19 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
"zerocopy",
]
[[package]]
name = "append-only-bytes"
version = "0.1.12"
@ -231,6 +244,12 @@ dependencies = [
"syn 2.0.50",
]
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "fnv"
version = "1.0.7"
@ -242,12 +261,15 @@ name = "fuzz"
version = "0.1.0"
dependencies = [
"arbitrary",
"bytes",
"enum-as-inner 0.5.1",
"enum_dispatch",
"fxhash",
"itertools 0.12.1",
"loro 0.16.2",
"loro 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro 0.16.2 (git+https://github.com/loro-dev/loro.git?rev=90470658435ec4c62b5af59ebb82fe9e1f5aa761)",
"num_cpus",
"rand",
"rayon",
"serde_json",
@ -289,9 +311,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.12"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [
"cfg-if",
"libc",
@ -316,6 +338,12 @@ dependencies = [
"byteorder",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "heapless"
version = "0.7.17"
@ -436,9 +464,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67"
[[package]]
name = "libc"
version = "0.2.153"
version = "0.2.158"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
[[package]]
name = "libfuzzer-sys"
@ -473,6 +501,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "loro"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"either",
"enum-as-inner 0.6.0",
"generic-btree",
"loro-delta 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro-internal 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"tracing",
]
[[package]]
name = "loro"
version = "0.16.2"
@ -502,6 +543,22 @@ dependencies = [
"thiserror",
]
[[package]]
name = "loro-common"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"arbitrary",
"enum-as-inner 0.6.0",
"fxhash",
"loro-rle 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"nonmax",
"serde",
"serde_columnar",
"string_cache",
"thiserror",
]
[[package]]
name = "loro-common"
version = "0.16.2"
@ -529,6 +586,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "loro-delta"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"arrayvec",
"enum-as-inner 0.5.1",
"generic-btree",
"heapless 0.8.0",
"tracing",
]
[[package]]
name = "loro-delta"
version = "0.16.2"
@ -559,6 +628,7 @@ dependencies = [
"leb128",
"loro-common 0.16.2",
"loro-delta 0.16.2",
"loro-kv-store",
"loro-rle 0.16.2",
"loro_fractional_index 0.16.2",
"md5",
@ -578,6 +648,41 @@ dependencies = [
"xxhash-rust",
]
[[package]]
name = "loro-internal"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"append-only-bytes",
"arref",
"either",
"enum-as-inner 0.5.1",
"enum_dispatch",
"fxhash",
"generic-btree",
"getrandom",
"im",
"itertools 0.12.1",
"leb128",
"loro-common 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro-delta 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro-rle 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"loro_fractional_index 0.16.2 (git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7)",
"md5",
"num",
"num-derive",
"num-traits",
"once_cell",
"postcard",
"rand",
"serde",
"serde_columnar",
"serde_json",
"smallvec",
"thiserror",
"tracing",
]
[[package]]
name = "loro-internal"
version = "0.16.2"
@ -613,6 +718,19 @@ dependencies = [
"tracing",
]
[[package]]
name = "loro-kv-store"
version = "0.16.2"
dependencies = [
"bytes",
"fxhash",
"loro-common 0.16.2",
"lz4_flex",
"once_cell",
"quick_cache",
"xxhash-rust",
]
[[package]]
name = "loro-rle"
version = "0.16.2"
@ -625,6 +743,19 @@ dependencies = [
"smallvec",
]
[[package]]
name = "loro-rle"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"append-only-bytes",
"arref",
"enum-as-inner 0.6.0",
"fxhash",
"num",
"smallvec",
]
[[package]]
name = "loro-rle"
version = "0.16.2"
@ -654,6 +785,17 @@ dependencies = [
"smallvec",
]
[[package]]
name = "loro_fractional_index"
version = "0.16.2"
source = "git+https://github.com/loro-dev/loro.git?tag=loro-crdt@0.16.7#d2b0520f8633f96146a49ec205bd5e7056880f1a"
dependencies = [
"imbl",
"rand",
"serde",
"smallvec",
]
[[package]]
name = "loro_fractional_index"
version = "0.16.2"
@ -665,6 +807,15 @@ dependencies = [
"smallvec",
]
[[package]]
name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "md5"
version = "0.7.0"
@ -891,6 +1042,18 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "quick_cache"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec932c60e6faf77dc6601ea149a23d821598b019b450bb1d98fe89c0301c0b61"
dependencies = [
"ahash",
"equivalent",
"hashbrown",
"parking_lot",
]
[[package]]
name = "quote"
version = "1.0.35"
@ -1091,6 +1254,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "string_cache"
version = "0.8.7"
@ -1208,6 +1377,16 @@ dependencies = [
"once_cell",
]
[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"static_assertions",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -1300,3 +1479,23 @@ name = "xxhash-rust"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984"
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.50",
]

View file

@ -53,3 +53,15 @@ name = "text-update"
path = "fuzz_targets/text-update.rs"
test = false
doc = false
[[bin]]
name = "kv_store"
path = "fuzz_targets/kv_store.rs"
test = false
doc = false
[[bin]]
name = "random_import"
path = "fuzz_targets/random_import.rs"
test = false
doc = false

View file

@ -0,0 +1,8 @@
#![no_main]
use fuzz::{test_mem_kv_fuzzer, KVAction};
use libfuzzer_sys::fuzz_target;
fuzz_target!(|actions: Vec<KVAction>| {
test_mem_kv_fuzzer(&mut actions.clone());
});

View file

@ -0,0 +1,8 @@
#![no_main]
use fuzz::test_random_bytes_import;
use libfuzzer_sys::fuzz_target;
fuzz_target!(|data: &[u8]| {
test_random_bytes_import(data);
});

View file

@ -9,7 +9,6 @@ use std::{
use arbitrary::Arbitrary;
use fxhash::FxHashSet;
use loro::{ContainerType, Frontiers};
use rayon::iter::ParallelExtend;
use tabled::TableIteratorExt;
use tracing::{info, info_span};

View file

@ -5,3 +5,8 @@ pub mod crdt_fuzzer;
mod macros;
mod value;
pub use crdt_fuzzer::{test_multi_sites, Action, FuzzTarget};
mod mem_kv_fuzzer;
pub use mem_kv_fuzzer::{
minify_simple as kv_minify_simple, test_mem_kv_fuzzer, test_random_bytes_import,
Action as KVAction,
};

View file

@ -0,0 +1,332 @@
use arbitrary::Arbitrary;
use bytes::Bytes;
use loro::{KvStore, MemKvStore};
use std::collections::{BTreeMap, BTreeSet};
use std::ops::Bound;
#[derive(Clone, Arbitrary)]
pub enum Action {
Add {
key: Vec<u8>,
value: Vec<u8>,
},
Get(usize),
Remove(usize),
Scan {
start: usize,
end: usize,
start_include: bool,
end_include: bool,
},
ExportAndImport,
Flush,
}
impl std::fmt::Debug for Action {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Action::Add { key, value } => {
write!(
f,
"Add{{\n\tkey: vec!{:?}, \n\tvalue: vec!{:?}\n}}",
key, value
)
}
Action::Get(index) => write!(f, "Get({})", index),
Action::Remove(index) => write!(f, "Remove({})", index),
Action::Scan {
start,
end,
start_include,
end_include,
} => write!(
f,
"Scan{{\n\tstart: {:?}, \n\tend: {:?}, \n\tstart_include: {:?}, \n\tend_include: {:?}\n}}",
start, end, start_include, end_include
),
Action::ExportAndImport => write!(f, "ExportAndImport"),
Action::Flush => write!(f, "Flush"),
}
}
}
#[derive(Default)]
pub struct MemKvFuzzer {
kv: MemKvStore,
btree: BTreeMap<Bytes, Bytes>,
all_keys: BTreeSet<Bytes>,
merged_kv: MemKvStore,
merged_btree: BTreeMap<Bytes, Bytes>,
}
impl MemKvFuzzer {
fn prepare(&self, action: &mut Action) {
match action {
Action::Add { key, value } => {
if key.is_empty() {
*key = vec![0];
}
if value.is_empty() {
*value = vec![0];
}
}
Action::Get(index) | Action::Remove(index) => {
if self.all_keys.is_empty() {
*action = Action::Add {
key: vec![0],
value: vec![0],
};
} else {
*index %= self.all_keys.len();
}
}
Action::Scan {
start,
end,
start_include,
end_include,
} => {
if self.all_keys.is_empty() {
*action = Action::Add {
key: vec![0],
value: vec![0],
};
} else {
*start %= self.all_keys.len();
*end %= self.all_keys.len();
if *start > *end {
std::mem::swap(start, end);
} else if *start == *end && !*start_include && !*end_include {
*end_include = true;
}
}
}
Action::ExportAndImport | Action::Flush => {}
}
}
fn apply(&mut self, action: &Action) {
match action {
Action::Add { key, value } => {
let key_bytes = Bytes::from(key.clone());
let value_bytes = Bytes::from(value.clone());
self.kv.set(&key_bytes, value_bytes.clone());
self.btree.insert(key_bytes.clone(), value_bytes);
self.all_keys.insert(key_bytes);
}
Action::Get(index) => {
if let Some(key) = self.all_keys.iter().nth(*index) {
let kv_result = self.kv.get(key);
let btree_result = self.btree.get(key).cloned();
assert_eq!(kv_result, btree_result, "get failed");
}
}
Action::Remove(index) => {
let key = self.all_keys.iter().nth(*index).unwrap();
self.kv.remove(key);
self.btree.insert(key.clone(), Bytes::new());
self.all_keys.remove(&key.clone());
}
Action::Scan {
start,
end,
start_include,
end_include,
} => {
let keys: Vec<_> = self.all_keys.iter().collect();
let start_bound = if *start_include {
Bound::Included(&keys[*start][..])
} else {
Bound::Excluded(&keys[*start][..])
};
let end_bound = if *end_include {
Bound::Included(&keys[*end][..])
} else {
Bound::Excluded(&keys[*end][..])
};
let kv_scan: Vec<_> = self.kv.scan(start_bound, end_bound).collect();
let btree_scan: Vec<_> = self
.btree
.scan(start_bound, end_bound)
.filter(|(_, v)| !v.is_empty())
.collect();
assert_eq!(kv_scan, btree_scan);
let kv_scan: Vec<_> = self.kv.scan(start_bound, end_bound).rev().collect();
let btree_scan: Vec<_> = self
.btree
.scan(start_bound, end_bound)
.filter(|(_, v)| !v.is_empty())
.rev()
.collect();
assert_eq!(kv_scan, btree_scan);
}
Action::ExportAndImport => {
let exported = self.kv.export_all();
self.kv = MemKvStore::default();
self.merged_kv.import_all(exported).expect("import failed");
self.merged_btree.extend(std::mem::take(&mut self.btree));
for (key, value) in self.merged_btree.iter().filter(|(_, v)| !v.is_empty()) {
assert_eq!(
self.merged_kv.get(key),
Some(value.clone()),
"export and import failed key: {:?}",
key
);
}
self.all_keys.clear();
}
Action::Flush => {
self.kv.export_all();
}
}
}
fn equal(&self) {
let kv_scan: Vec<_> = self.kv.scan(Bound::Unbounded, Bound::Unbounded).collect();
let btree_scan: Vec<_> = self
.btree
.scan(Bound::Unbounded, Bound::Unbounded)
.filter(|(_, v)| !v.is_empty())
.collect();
assert_eq!(kv_scan, btree_scan);
let kv_scan: Vec<_> = self
.kv
.scan(Bound::Unbounded, Bound::Unbounded)
.rev()
.collect();
let btree_scan: Vec<_> = self
.btree
.scan(Bound::Unbounded, Bound::Unbounded)
.filter(|(_, v)| !v.is_empty())
.rev()
.collect();
assert_eq!(kv_scan, btree_scan);
let merge_scan: Vec<_> = self
.merged_kv
.scan(Bound::Unbounded, Bound::Unbounded)
.collect();
let btree_scan: Vec<_> = self
.merged_btree
.scan(Bound::Unbounded, Bound::Unbounded)
.filter(|(_, v)| !v.is_empty())
.collect();
assert_eq!(merge_scan, btree_scan);
let merge_scan: Vec<_> = self
.merged_kv
.scan(Bound::Unbounded, Bound::Unbounded)
.rev()
.collect();
let btree_scan: Vec<_> = self
.merged_btree
.scan(Bound::Unbounded, Bound::Unbounded)
.filter(|(_, v)| !v.is_empty())
.rev()
.collect();
assert_eq!(merge_scan, btree_scan);
}
}
pub fn test_mem_kv_fuzzer(actions: &mut [Action]) {
let mut fuzzer = MemKvFuzzer::default();
let mut applied = Vec::new();
for action in actions {
fuzzer.prepare(action);
applied.push(action.clone());
tracing::info!("\n{:#?}", applied);
fuzzer.apply(action);
}
tracing::info!("\n{:#?}", applied);
fuzzer.equal();
}
pub fn test_random_bytes_import(bytes: &[u8]) {
let mut kv = MemKvStore::default();
match kv.import_all(Bytes::from(bytes.to_vec())) {
Ok(_) => {
// do nothing
}
Err(_) => {
// do nothing
}
}
}
pub fn minify_simple<T, F>(f: F, actions: Vec<T>)
where
F: Fn(&mut [T]),
T: Clone + std::fmt::Debug,
{
std::panic::set_hook(Box::new(|_info| {
// ignore panic output
// println!("{:?}", _info);
}));
let f_ref: *const _ = &f;
let f_ref: usize = f_ref as usize;
#[allow(clippy::redundant_clone)]
let mut actions_clone = actions.clone();
let action_ref: usize = (&mut actions_clone) as *mut _ as usize;
#[allow(clippy::blocks_in_conditions)]
if std::panic::catch_unwind(|| {
// SAFETY: test
let f = unsafe { &*(f_ref as *const F) };
// SAFETY: test
let actions_ref = unsafe { &mut *(action_ref as *mut Vec<T>) };
f(actions_ref);
})
.is_ok()
{
println!("No Error Found");
return;
}
let mut minified = actions.clone();
let mut current_index = minified.len() as i64 - 1;
while current_index > 0 {
let a = minified.remove(current_index as usize);
let f_ref: *const _ = &f;
let f_ref: usize = f_ref as usize;
let mut actions_clone = minified.clone();
let action_ref: usize = (&mut actions_clone) as *mut _ as usize;
let mut re = false;
#[allow(clippy::blocks_in_conditions)]
if std::panic::catch_unwind(|| {
// SAFETY: test
let f = unsafe { &*(f_ref as *const F) };
// SAFETY: test
let actions_ref = unsafe { &mut *(action_ref as *mut Vec<T>) };
f(actions_ref);
})
.is_err()
{
re = true;
} else {
minified.insert(current_index as usize, a);
}
println!(
"{}/{} {}",
actions.len() as i64 - current_index,
actions.len(),
re
);
current_index -= 1;
}
println!("{:?}", &minified);
println!(
"Old Length {}, New Length {}",
actions.len(),
minified.len()
);
if actions.len() > minified.len() {
minify_simple(f, minified);
}
}

168
crates/fuzz/tests/kv.rs Normal file
View file

@ -0,0 +1,168 @@
use fuzz::{kv_minify_simple, test_mem_kv_fuzzer, KVAction::*};
#[ctor::ctor]
fn init() {
dev_utils::setup_test_log();
}
#[test]
fn add_same_key_twice() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![],
value: vec![254],
},
Flush,
Add {
key: vec![],
value: vec![],
},
])
}
#[test]
fn add_and_remove() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![],
value: vec![238],
},
Remove(0),
])
}
#[test]
fn add_flush_remove() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![],
value: vec![],
},
Flush,
Remove(3791655167),
])
}
#[test]
fn export_and_import() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![],
value: vec![],
},
ExportAndImport,
])
}
#[test]
fn add_flush_add_scan() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![],
value: vec![],
},
Flush,
Add {
key: vec![128],
value: vec![252, 169],
},
Scan {
start: 12249507989402000797,
end: 18231419743747221929,
start_include: true,
end_include: true,
},
])
}
#[test]
fn add_some() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![255, 255, 255, 255, 63],
value: vec![],
},
Add {
key: vec![255, 3],
value: vec![255],
},
Add {
key: vec![255],
value: vec![],
},
Add {
key: vec![],
value: vec![],
},
Flush,
Scan {
start: 18446744073709551615,
end: 18446744073709551615,
start_include: true,
end_include: true,
},
])
}
#[test]
fn merge_import() {
test_mem_kv_fuzzer(&mut [
Add {
key: vec![205, 197, 255, 12],
value: vec![0, 0, 9],
},
Add {
key: vec![],
value: vec![],
},
Flush,
Add {
key: vec![57],
value: vec![209, 3, 255, 174, 0, 255],
},
Add {
key: vec![41],
value: vec![209, 0, 41, 63, 205],
},
Add {
key: vec![0, 0],
value: vec![1],
},
ExportAndImport,
Flush,
ExportAndImport,
Remove(14829789716734785489),
Remove(13191005920967349589),
ExportAndImport,
ExportAndImport,
Get(13238251090391746632),
Add {
key: vec![],
value: vec![],
},
])
}
#[test]
fn scan_empty() {
test_mem_kv_fuzzer(&mut [
Add{
key: vec![0, 255],
value: vec![]
},
Add{
key: vec![],
value: vec![]
},
Scan{
start: 129,
end: 0,
start_include: false,
end_include: false
},
])
}
#[test]
fn minify() {
kv_minify_simple(test_mem_kv_fuzzer, vec![])
}

View file

@ -0,0 +1,13 @@
[package]
name = "loro-kv-store"
version = "0.16.2"
edition = "2021"
[dependencies]
loro-common = { path = "../loro-common", version = "0.16.2" }
bytes = { workspace = true }
fxhash = { workspace = true }
once_cell = { workspace = true }
lz4_flex = { version = "0.11" }
quick_cache = "0.6.2"
xxhash-rust = { workspace = true }

View file

@ -0,0 +1,651 @@
use std::{
borrow::Cow, fmt::Debug, io::Write, ops::{Bound, Range}, sync::Arc
};
use bytes::{Buf, Bytes};
use loro_common::LoroResult;
use crate::{compress::{compress, decompress, CompressionType}, iter::KvIterator, sstable::{get_common_prefix_len_and_strip, SIZE_OF_U32, XXH_SEED}};
use super::sstable::{ SIZE_OF_U16, SIZE_OF_U8};
#[derive(Debug)]
pub struct LargeValueBlock{
// without checksum
pub value_bytes: Bytes,
pub key: Bytes,
}
impl LargeValueBlock{
/// ┌──────────────────────────┐
/// │Large Block │
/// │┌ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ │
/// │ value Block Checksum ││
/// ││ bytes │ u32 │
/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘│
/// └──────────────────────────┘
fn encode(&self, w: &mut Vec<u8>, compression_type: CompressionType){
let origin_len = w.len();
compress(w, Cow::Borrowed(&self.value_bytes), compression_type);
let checksum = xxhash_rust::xxh32::xxh32(&w[origin_len..], XXH_SEED);
w.write_all(&checksum.to_le_bytes()).unwrap();
}
fn decode(bytes:Bytes, key: Bytes, compression_type: CompressionType)->LoroResult<Self>{
let mut value_bytes = vec![];
decompress(&mut value_bytes, bytes.slice(..bytes.len() - SIZE_OF_U32), compression_type)?;
Ok(LargeValueBlock{
value_bytes: Bytes::from(value_bytes),
key,
})
}
}
#[derive(Debug)]
pub struct NormalBlock {
pub data: Bytes,
pub first_key: Bytes,
pub offsets: Vec<u16>,
}
impl NormalBlock {
/// ┌────────────────────────────────────────────────────────────────────────────────────────┐
/// │Block │
/// │┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─┌ ─ ─ ─┌ ─ ─ ─ ┬ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ │
/// │ Key Value Chunk ... │Key Value Chunk offset │ ... │ offset kv len │Block Checksum││
/// ││ bytes │ │ bytes │ u16 │ │ u16 │ u16 │ u32 │
/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ┘│
/// └────────────────────────────────────────────────────────────────────────────────────────┘
///
/// The block body may be compressed then we calculate its checksum (the checksum is not compressed).
fn encode(&self, w: &mut Vec<u8>, compression_type: CompressionType) {
let origin_len = w.len();
let mut buf = self.data.to_vec();
for offset in &self.offsets {
buf.extend_from_slice(&offset.to_le_bytes());
}
buf.extend_from_slice(&(self.offsets.len() as u16).to_le_bytes());
compress(w,Cow::Owned(buf), compression_type);
let checksum = xxhash_rust::xxh32::xxh32(&w[origin_len..], XXH_SEED);
w.extend_from_slice(&checksum.to_le_bytes());
}
fn decode(raw_block_and_check: Bytes, first_key: Bytes, compression_type: CompressionType)-> LoroResult<NormalBlock>{
let buf = raw_block_and_check.slice(..raw_block_and_check.len() - SIZE_OF_U32);
let mut data = vec![];
decompress(&mut data, buf,compression_type)?;
let offsets_len = (&data[data.len()-SIZE_OF_U16..]).get_u16_le() as usize;
let data_end = data.len() - SIZE_OF_U16 * (offsets_len + 1);
let offsets = &data[data_end..data.len()-SIZE_OF_U16];
let offsets = offsets.chunks(SIZE_OF_U16).map(|mut chunk| chunk.get_u16_le()).collect();
Ok(NormalBlock{
data: Bytes::copy_from_slice(&data[..data_end]),
offsets,
first_key,
})
}
}
#[derive(Debug)]
pub enum Block{
Normal(NormalBlock),
Large(LargeValueBlock),
}
impl Block{
pub fn is_large(&self)->bool{
matches!(self, Block::Large(_))
}
pub fn data(&self)->Bytes{
match self{
Block::Normal(block)=>block.data.clone(),
Block::Large(block)=>block.value_bytes.clone(),
}
}
pub fn first_key(&self)->Bytes{
match self{
Block::Normal(block)=>block.first_key.clone(),
Block::Large(block)=>block.key.clone(),
}
}
pub fn encode(&self, w: &mut Vec<u8>, compression_type: CompressionType){
match self{
Block::Normal(block)=>block.encode(w,compression_type),
Block::Large(block)=>block.encode(w,compression_type),
}
}
pub fn decode(raw_block_and_check: Bytes, is_large: bool, key: Bytes, compression_type: CompressionType)->Self{
// we have checked the checksum, so the block should be valid when decompressing
if is_large{
return LargeValueBlock::decode(raw_block_and_check, key, compression_type).map(Block::Large).unwrap()
}
NormalBlock::decode(raw_block_and_check, key, compression_type).map(Block::Normal).unwrap()
}
pub fn len(&self)->usize{
match self{
Block::Normal(block)=>block.offsets.len(),
Block::Large(_)=>1,
}
}
pub fn is_empty(&self)->bool{
match self{
Block::Normal(block)=>block.offsets.is_empty(),
Block::Large(_)=>false,
}
}
}
#[derive(Debug)]
pub struct BlockBuilder {
data: Vec<u8>,
offsets: Vec<u16>,
block_size: usize,
// for key compression
first_key: Bytes,
is_large: bool,
}
impl BlockBuilder {
pub fn new(block_size: usize) -> Self {
Self {
data: Vec::new(),
offsets: Vec::new(),
block_size,
first_key: Bytes::new(),
is_large:false
}
}
fn estimated_size(&self) -> usize {
if self.is_large{
self.data.len()
}else{
// key-value pairs number
SIZE_OF_U16 +
// offsets
self.offsets.len() * SIZE_OF_U16 +
// key-value pairs data
self.data.len() +
// checksum
SIZE_OF_U32
}
}
pub fn is_empty(&self)->bool{
!self.is_large && self.offsets.is_empty()
}
/// Add a key-value pair to the block.
/// Returns true if the key-value pair is added successfully, false the block is full.
///
/// ┌─────────────────────────────────────────────────────┐
/// │ Key Value Chunk │
/// │┌ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─┬ ─ ─ ─ ┐│
/// │ common prefix len key suffix len│key suffix│ value ││
/// ││ u8 │ u16 │ bytes │ bytes ││
/// │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ┘ ─ ─ ─ ┘│
/// └─────────────────────────────────────────────────────┘
///
pub fn add(&mut self, key: &[u8], value: &[u8]) -> bool {
debug_assert!(!key.is_empty(), "key cannot be empty");
if self.first_key.is_empty() {
if value.len() > self.block_size {
self.data.extend_from_slice(value);
self.is_large = true;
self.first_key = Bytes::copy_from_slice(key);
return true;
}
self.first_key = Bytes::copy_from_slice(key);
self.offsets.push(self.data.len() as u16);
self.data.extend_from_slice(value);
return true;
}
// whether the block is full
if self.estimated_size() + key.len() + value.len() + SIZE_OF_U8 + SIZE_OF_U16 > self.block_size {
return false;
}
self.offsets.push(self.data.len() as u16);
let (common, suffix) = get_common_prefix_len_and_strip(key, &self.first_key);
let key_len = suffix.len() ;
self.data.push(common);
self.data.extend_from_slice(&(key_len as u16).to_le_bytes());
self.data.extend_from_slice(suffix);
self.data.extend_from_slice(value);
true
}
pub fn build(self)->Block{
if self.is_large{
return Block::Large(LargeValueBlock{
value_bytes: Bytes::from(self.data),
key: self.first_key,
});
}
debug_assert!(!self.offsets.is_empty(), "block is empty");
Block::Normal(NormalBlock{
data: Bytes::from(self.data),
offsets: self.offsets,
first_key: self.first_key,
})
}
}
/// Block iterator
///
/// If the key is empty, it means the iterator is invalid.
#[derive(Clone)]
pub struct BlockIter {
block: Arc<Block>,
next_key: Bytes,
next_value_range: Range<usize>,
prev_key: Bytes,
prev_value_range: Range<usize>,
next_idx: usize,
prev_idx: isize,
first_key: Bytes,
}
impl Debug for BlockIter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockIter")
.field("is_large", &self.block.is_large())
.field("next_key", &self.next_key)
.field("next_value_range", &self.next_value_range)
.field("prev_key", &self.prev_key)
.field("prev_value_range", &self.prev_value_range)
.field("next_idx", &self.next_idx)
.field("prev_idx", &self.prev_idx)
.field("first_key", &Bytes::copy_from_slice(&self.first_key))
.finish()
}
}
impl BlockIter {
pub fn new(block: Arc<Block>) -> Self {
let prev_idx = block.len() as isize - 1;
let mut iter = Self {
first_key: block.first_key(),
block,
next_key: Bytes::new(),
next_value_range: 0..0,
prev_key: Bytes::new(),
prev_value_range: 0..0,
next_idx: 0,
prev_idx,
};
iter.seek_to_idx(0);
iter.back_to_idx(prev_idx);
iter
}
pub fn new_seek_to_key(block: Arc<Block>, key: &[u8]) -> Self {
let prev_idx = block.len() as isize - 1;
let mut iter = Self {
first_key: block.first_key(),
block,
next_key: Bytes::new(),
next_value_range: 0..0,
prev_key: Bytes::new(),
prev_value_range: 0..0,
next_idx: 0,
prev_idx,
};
iter.seek_to_key(key);
iter.back_to_idx(prev_idx);
iter
}
pub fn new_back_to_key(block: Arc<Block>, key: &[u8]) -> Self {
let prev_idx = block.len() as isize - 1;
let mut iter = Self {
first_key: block.first_key(),
block,
next_key: Bytes::new(),
next_value_range: 0..0,
prev_key: Bytes::new(),
prev_value_range: 0..0,
next_idx: 0,
prev_idx,
};
iter.seek_to_idx(0);
iter.back_to_key(key);
iter
}
pub fn new_scan(block: Arc<Block>, start: Bound<&[u8]>, end: Bound<&[u8]>) -> Self {
let mut iter = match start {
Bound::Included(key) => Self::new_seek_to_key(block, key),
Bound::Excluded(key) => {
let mut iter = Self::new_seek_to_key(block, key);
while iter.has_next() && iter.peek_next_curr_key().unwrap() == key {
iter.next();
}
iter
}
Bound::Unbounded => Self::new(block),
};
match end {
Bound::Included(key) => {
iter.back_to_key(key);
}
Bound::Excluded(key) => {
iter.back_to_key(key);
while iter.has_next_back() && iter.peek_back_curr_key().unwrap() == key {
iter.next_back();
}
}
Bound::Unbounded => {}
}
iter
}
pub fn peek_next_curr_key(&self) -> Option<Bytes> {
if self.has_next(){
Some(Bytes::copy_from_slice(&self.next_key))
}else{
None
}
}
pub fn peek_next_curr_value(&self) -> Option<Bytes> {
if self.has_next(){
Some(self.block.data().slice(self.next_value_range.clone()))
}else{
None
}
}
pub fn has_next(&self) -> bool {
!self.next_key.is_empty() && self.next_idx as isize <= self.prev_idx
}
pub fn peek_back_curr_key(&self) -> Option<Bytes> {
if self.has_next_back(){
Some(Bytes::copy_from_slice(&self.prev_key))
}else{
None
}
}
pub fn peek_back_curr_value(&self) -> Option<Bytes> {
if self.has_next_back(){
Some(self.block.data().slice(self.prev_value_range.clone()))
}else{
None
}
}
pub fn has_next_back(&self) -> bool {
!self.prev_key.is_empty() && self.next_idx as isize <= self.prev_idx
}
pub fn next(&mut self) {
self.next_idx += 1;
if self.next_idx as isize > self.prev_idx {
self.next_key.clear();
self.next_value_range = 0..0;
return;
}
self.seek_to_idx(self.next_idx);
}
pub fn next_back(&mut self) {
self.prev_idx -= 1;
if self.prev_idx < 0 || self.prev_idx < (self.next_idx as isize) {
self.prev_key.clear();
self.prev_value_range = 0..0;
return;
}
self.back_to_idx(self.prev_idx);
}
pub fn seek_to_key(&mut self, key: &[u8]) {
match self.block.as_ref() {
Block::Normal(block) => {
let mut left = 0;
let mut right = block.offsets.len();
while left < right {
let mid = left + (right - left) / 2;
self.seek_to_idx(mid);
debug_assert!(self.has_next());
if self.next_key == key {
return;
}
if self.next_key < key {
left = mid + 1;
} else {
right = mid;
}
}
self.seek_to_idx(left);
}
Block::Large(block) => {
if key > block.key {
self.seek_to_idx(1);
} else {
self.seek_to_idx(0);
}
}
}
}
/// MUST be called after seek_to_key()
pub fn back_to_key(&mut self, key: &[u8]) {
match self.block.as_ref() {
Block::Normal(block) => {
let mut left = self.next_idx;
let mut right = block.offsets.len();
while left < right {
let mid = left + (right - left) / 2;
self.back_to_idx(mid as isize);
// prev idx <= next idx
if !self.has_next_back() {
return;
}
debug_assert!(self.has_next_back());
if self.prev_key > key {
right = mid;
} else {
left = mid + 1;
}
}
self.back_to_idx(left as isize - 1);
}
Block::Large(block) => {
if key < block.key {
self.back_to_idx(-1);
} else {
self.back_to_idx(0);
}
}
}
}
fn seek_to_idx(&mut self, idx: usize) {
match self.block.as_ref() {
Block::Normal(block) => {
if idx >= block.offsets.len() {
self.next_key.clear();
self.next_value_range = 0..0;
self.next_idx = idx;
return;
}
let offset = block.offsets[idx] as usize;
self.seek_to_offset(
offset,
*block
.offsets
.get(idx + 1)
.unwrap_or(&(block.data.len() as u16)) as usize,
idx == 0,
);
self.next_idx = idx;
}
Block::Large(block) => {
if idx > 0 {
self.next_key.clear();
self.next_value_range = 0..0;
self.next_idx = idx;
return;
}
self.next_key = block.key.clone();
self.next_value_range = 0..block.value_bytes.len();
self.next_idx = idx;
}
}
}
fn back_to_idx(&mut self, idx: isize) {
match self.block.as_ref() {
Block::Normal(block) => {
if idx < 0 {
self.prev_key.clear();
self.prev_value_range = 0..0;
self.prev_idx = idx;
return;
}
let offset = block.offsets[idx as usize] as usize;
self.back_to_offset(
offset,
*block
.offsets
.get(idx as usize + 1)
.unwrap_or(&(block.data.len() as u16)) as usize,
idx == 0,
);
self.prev_idx = idx;
}
Block::Large(block) => {
if idx < 0 {
self.prev_key.clear();
self.prev_value_range = 0..0;
self.prev_idx = idx;
return;
}
self.prev_key = block.key.clone();
self.prev_value_range = 0..block.value_bytes.len();
self.prev_idx = idx;
}
}
}
fn seek_to_offset(&mut self, offset: usize, offset_end: usize, is_first: bool) {
match self.block.as_ref() {
Block::Normal(block) => {
if is_first{
self.next_key = self.first_key.clone();
self.next_value_range = offset..offset_end;
return;
}
let mut rest = &block.data[offset..];
let common_prefix_len = rest.get_u8() as usize;
let key_suffix_len = rest.get_u16_le() as usize;
let mut next_key = Vec::with_capacity(common_prefix_len + key_suffix_len);
next_key.extend_from_slice(&self.first_key[..common_prefix_len]);
next_key.extend_from_slice(&rest[..key_suffix_len]);
self.next_key = next_key.into();
let value_start = offset + SIZE_OF_U8 + SIZE_OF_U16 + key_suffix_len;
self.next_value_range = value_start..offset_end;
}
Block::Large(_) => {
unreachable!()
}
}
}
fn back_to_offset(&mut self, offset: usize, offset_end: usize, is_first: bool) {
match self.block.as_ref() {
Block::Normal(block) => {
if is_first{
self.prev_key = self.first_key.clone();
self.prev_value_range = offset..offset_end;
return;
}
let mut rest = &block.data[offset..];
let common_prefix_len = rest.get_u8() as usize;
let key_suffix_len = rest.get_u16_le() as usize;
let mut prev_key = Vec::with_capacity(common_prefix_len + key_suffix_len);
prev_key.extend_from_slice(&self.first_key[..common_prefix_len]);
prev_key.extend_from_slice(&rest[..key_suffix_len]);
self.prev_key = prev_key.into();
let value_start = offset + SIZE_OF_U8 + SIZE_OF_U16 + key_suffix_len;
self.prev_value_range = value_start..offset_end;
}
Block::Large(_) => {
unreachable!()
}
}
}
}
impl KvIterator for BlockIter{
fn peek_next_key(&self) -> Option<Bytes> {
self.peek_next_curr_key()
}
fn peek_next_value(&self) -> Option<Bytes> {
self.peek_next_curr_value()
}
fn next_(&mut self) {
self.next();
}
fn has_next(&self) -> bool {
self.has_next()
}
fn peek_next_back_key(&self) -> Option<Bytes> {
self.peek_back_curr_key()
}
fn peek_next_back_value(&self) -> Option<Bytes> {
self.peek_back_curr_value()
}
fn next_back_(&mut self) {
self.next_back();
}
fn has_next_back(&self) -> bool {
self.has_next_back()
}
}
impl Iterator for BlockIter {
type Item = (Bytes, Bytes);
fn next(&mut self) -> Option<Self::Item> {
if !self.has_next() {
return None;
}
let key = self.peek_next_curr_key().unwrap();
let value = self.peek_next_curr_value().unwrap();
self.next();
Some((key, value))
}
}
impl DoubleEndedIterator for BlockIter {
fn next_back(&mut self) -> Option<Self::Item> {
if !self.has_next_back() {
return None;
}
let key = self.peek_back_curr_key().unwrap();
let value = self.peek_back_curr_value().unwrap();
self.next_back();
Some((key, value))
}
}

View file

@ -0,0 +1,74 @@
use std::{
borrow::Cow,
io::{self, Write},
};
use bytes::Bytes;
use loro_common::LoroError;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
None,
LZ4,
}
impl CompressionType {
pub fn is_none(&self) -> bool {
matches!(self, CompressionType::None)
}
}
impl TryFrom<u8> for CompressionType {
type Error = LoroError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(CompressionType::None),
1 => Ok(CompressionType::LZ4),
_ => Err(LoroError::DecodeError(
format!("Invalid compression type: {}", value).into(),
)),
}
}
}
impl From<CompressionType> for u8 {
fn from(value: CompressionType) -> Self {
match value {
CompressionType::None => 0,
CompressionType::LZ4 => 1,
}
}
}
pub fn compress(w: &mut Vec<u8>, data: Cow<[u8]>, compression_type: CompressionType) {
match compression_type {
CompressionType::None => {
w.write_all(&data).unwrap();
}
CompressionType::LZ4 => {
let mut encoder = lz4_flex::frame::FrameEncoder::new(w);
encoder.write_all(&data).unwrap();
let _w = encoder.finish().unwrap();
}
}
}
pub fn decompress(
out: &mut Vec<u8>,
data: Bytes,
compression_type: CompressionType,
) -> Result<(), LoroError> {
match compression_type {
CompressionType::None => {
out.write_all(&data).unwrap();
Ok(())
}
CompressionType::LZ4 => {
let mut decoder = lz4_flex::frame::FrameDecoder::new(data.as_ref());
io::copy(&mut decoder, out)
.map_err(|e| LoroError::DecodeError(e.to_string().into()))?;
Ok(())
}
}
}

191
crates/kv-store/src/iter.rs Normal file
View file

@ -0,0 +1,191 @@
use std::fmt::Debug;
use bytes::Bytes;
/// When you need peek next key and value and next back key and value, use this trait.
pub trait KvIterator: Debug + DoubleEndedIterator<Item = (Bytes, Bytes)> {
fn peek_next_key(&self) -> Option<Bytes>;
fn peek_next_value(&self) -> Option<Bytes>;
fn next_(&mut self);
fn has_next(&self) -> bool;
fn peek_next_back_key(&self) -> Option<Bytes>;
fn peek_next_back_value(&self) -> Option<Bytes>;
fn next_back_(&mut self);
fn has_next_back(&self) -> bool;
}
/// Merge multiple iterators into one.
///
/// The iterators are merged in the order they are provided.
/// If two iterators have the same key, the value from the iterator with the smallest index is used.
///
/// Note: This implementation is not optimized for lots of iterators.
/// You should only use this when you have a small number of iterators.
#[derive(Debug)]
pub struct MergeIterator<T: KvIterator> {
iters: Vec<T>,
}
impl<T: KvIterator> MergeIterator<T> {
pub fn new(iters: Vec<T>) -> Self {
Self { iters }
}
}
impl<T: KvIterator> Iterator for MergeIterator<T> {
type Item = (Bytes, Bytes);
fn next(&mut self) -> Option<Self::Item> {
let mut min_key = None;
let mut min_index = None;
let mut has_to_remove = false;
for (i, iter) in self.iters.iter_mut().enumerate() {
if let Some(key) = iter.peek_next_key() {
if let Some(this_min_key) = &min_key {
match key.cmp(this_min_key) {
std::cmp::Ordering::Less => {
min_key = Some(key);
min_index = Some(i);
}
std::cmp::Ordering::Equal => {
// the same key, skip it
iter.next();
}
std::cmp::Ordering::Greater => {}
}
} else {
min_key = Some(key);
min_index = Some(i);
}
} else {
has_to_remove = true;
}
}
let ans = if let Some(idx) = min_index {
self.iters[idx].next()
} else {
None
};
if has_to_remove {
self.iters.retain(|x| x.has_next());
}
ans
}
}
impl<T: KvIterator> DoubleEndedIterator for MergeIterator<T> {
fn next_back(&mut self) -> Option<Self::Item> {
let mut max_key = None;
let mut max_index = None;
let mut has_to_remove = false;
for (i, iter) in self.iters.iter_mut().enumerate() {
if let Some(key) = iter.peek_next_back_key() {
if let Some(this_max_key) = &max_key {
match key.cmp(this_max_key) {
std::cmp::Ordering::Less => {}
std::cmp::Ordering::Equal => {
// the same key, skip it
iter.next_back();
}
std::cmp::Ordering::Greater => {
max_key = Some(key);
max_index = Some(i);
}
}
} else {
max_key = Some(key);
max_index = Some(i);
}
} else {
has_to_remove = true;
}
}
let ans = if let Some(idx) = max_index {
self.iters[idx].next_back()
} else {
None
};
if has_to_remove {
self.iters.retain(|x| x.has_next_back());
}
ans
}
}
#[cfg(test)]
mod tests {
use std::ops::Bound;
use super::*;
use crate::{compress::CompressionType, sstable};
use bytes::Bytes;
#[test]
fn test_merge_iterator() {
let a = Bytes::from("a");
let b = Bytes::from("b");
let c = Bytes::from("c");
let d = Bytes::from("d");
let mut sstable1 = sstable::SsTableBuilder::new(10, CompressionType::LZ4);
sstable1.add(a.clone(), a.clone());
sstable1.add(c.clone(), c.clone());
let sstable1 = sstable1.build();
let iter1 = sstable::SsTableIter::new_scan(&sstable1, Bound::Unbounded, Bound::Unbounded);
let mut sstable2 = sstable::SsTableBuilder::new(10, CompressionType::LZ4);
sstable2.add(b.clone(), b.clone());
sstable2.add(d.clone(), d.clone());
let sstable2 = sstable2.build();
let iter2 = sstable::SsTableIter::new_scan(&sstable2, Bound::Unbounded, Bound::Unbounded);
let merged_iter = MergeIterator::new(vec![iter1.clone(), iter2.clone()]);
let ans = merged_iter.collect::<Vec<_>>();
assert_eq!(
ans,
vec![
(a.clone(), a.clone()),
(b.clone(), b.clone()),
(c.clone(), c.clone()),
(d.clone(), d.clone())
]
);
let merged_iter = MergeIterator::new(vec![iter1.clone(), iter2.clone()]);
let ans2 = merged_iter.rev().collect::<Vec<_>>();
assert_eq!(ans2, ans.iter().rev().cloned().collect::<Vec<_>>());
}
#[test]
fn same_key() {
let a = Bytes::from("a");
let a2 = Bytes::from("a2");
let c = Bytes::from("c");
let d = Bytes::from("d");
let mut sstable1 = sstable::SsTableBuilder::new(10, CompressionType::LZ4);
sstable1.add(a.clone(), a.clone());
sstable1.add(c.clone(), c.clone());
let sstable1 = sstable1.build();
let iter1 = sstable::SsTableIter::new_scan(&sstable1, Bound::Unbounded, Bound::Unbounded);
let mut sstable2 = sstable::SsTableBuilder::new(10, CompressionType::LZ4);
sstable2.add(a.clone(), a2.clone());
sstable2.add(d.clone(), d.clone());
let sstable2 = sstable2.build();
let iter2 = sstable::SsTableIter::new_scan(&sstable2, Bound::Unbounded, Bound::Unbounded);
let merged_iter = MergeIterator::new(vec![iter1.clone(), iter2.clone()]);
let ans = merged_iter.collect::<Vec<_>>();
assert_eq!(
ans,
vec![
(a.clone(), a.clone()),
(c.clone(), c.clone()),
(d.clone(), d.clone())
]
);
}
}

142
crates/kv-store/src/lib.rs Normal file
View file

@ -0,0 +1,142 @@
//! # MemKvStore Documentation
//!
//! MemKvStore use SSTable as backend. The SSTable (Sorted String Table) is a persistent data structure
//! used for storing key-value pairs in a sorted manner. This document describes the binary format of
//! the SSTable.
//!
//! ## Overall Structure
//!
//! The SSTable consists of the following sections:
//!
//! ┌─────────────────────────────────────────────────────────────────────────────────────────────────┐
//! │ MemKVStore │
//! │┌ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ┐│
//! │ Magic Number │ Schema Version │ Block Chunk ... │ Block Chunk Block Meta │ Meta Offset │
//! ││ u32 │ u8 │ bytes │ │ bytes │ bytes │ u32 ││
//! │ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ │
//! └─────────────────────────────────────────────────────────────────────────────────────────────────┘
//!
//! 1. Magic Number (4 bytes): A fixed value "LORO" to identify the file format.
//! 2. Schema Version (1 byte): The version of the MemKVStore schema.
//! 3. Block Chunks: A series of data blocks containing key-value pairs.
//! 4. Block Meta: Metadata for all blocks, including block offset, the first key of the block, `is_large` flag, and last key
//! if not large.
//! 5. Meta Offset (4 bytes): The offset of the Block Meta section from the beginning of the file.
//!
//! ## Block Types
//!
//! There are two types of blocks: Normal Blocks and Large Value Blocks.
//!
//! ### Normal Block
//!
//! Normal blocks store multiple key-value pairs with compressed keys.
//!
//! ┌────────────────────────────────────────────────────────────────────────────────────────────┐
//! │Block │
//! │┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─┌ ─ ─ ─┌ ─ ─ ─ ┬ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ │
//! │ Key Value Chunk ... │Key Value Chunk offset │ ... │ offset kv len │Block Checksum│ │
//! ││ bytes │ │ bytes │ u16 │ │ u16 │ u16 │ u32 │
//! │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ┘ │
//! └────────────────────────────────────────────────────────────────────────────────────────────┘
//!
//! Each Key Value Chunk is encoded as follows:
//!
//! ┌─────────────────────────────────────────────────────┐
//! │ Key Value Chunk │
//! │┌ ─ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─┬ ─ ─ ─ ┐│
//! │ common prefix len key suffix len│key suffix│ value ││
//! ││ u8 │ u16 │ bytes │ bytes ││
//! │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ┘─ ─ ─ ─┘│
//! └─────────────────────────────────────────────────────┘
//!
//! Because the key of the first chunk is equal to the first key of the block,
//! the first chunk can be simplified as:
//! ┌────────────────────┐
//! │ Key Value Chunk │
//! │┌ ─ ─ ─ ─ ─┬ ─ ─ ─ ┐│
//! ││key suffix│ value ││
//! ││ bytes │ bytes ││
//! │ ─ ─ ─ ─ ─ ┘─ ─ ─ ─┘│
//! └────────────────────┘
//!
//! Encoding:
//! 1. Compress key-value pairs data as Key Value Chunk.
//! 2. Write offsets for each key-value pair.
//! 3. Write the number of key-value pairs.
//! 4. By default, **Compress** the entire block using LZ4. If you set `compression_type` to `None`, it will not compress the block.
//! - For now, there are two compression type: `None` and `LZ4`.
//! 5. Calculate and append xxhash_32 checksum.
//!
//! Decoding:
//! 1. Verify the xxhash_32 checksum.
//! 2. By default, **Decompress** the block using LZ4. If you set `compression_type` to `None`, it will not decompress the block.
//! 3. Read the number of key-value pairs.
//! 4. Read offsets for each key-value pair.
//! 5. Parse individual key-value chunks.
//!
//! ### Large Value Block
//!
//! Large Value Blocks store a single key-value pair with a large value.
//!
//! ┌──────────────────────────┐
//! │Large Block │
//! │┌ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─ ─ │
//! │ value Block Checksum ││
//! ││ bytes │ u32 │
//! │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘│
//! └──────────────────────────┘
//!
//! Encoding:
//! 1. Write the value bytes.
//! 2. Calculate and append xxhash_32 checksum.
//!
//! Decoding:
//! 1. Verify the xxhash_32 checksum.
//! 2. Read the value bytes.
//!
//! We need not encode the length of value, because we can get the whole Block by offset in meta.
//!
//! ## Block Meta
//!
//! The Block Meta section contains metadata for all blocks in the SSTable.
//!
//! ┌────────────────────────────────────────────────────────────┐
//! │ All Block Meta │
//! │┌ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─┌ ─ ─ ─┌ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ │
//! │ block number │ Block Meta │ ... │ Block Meta │ checksum ││
//! ││ u32 │ bytes │ │ bytes │ u32 │
//! │ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ┘─ ─ ─ ┘─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ┘│
//! └────────────────────────────────────────────────────────────┘
//!
//! Each Block Meta entry is encoded as follows:
//!
//! ┌──────────────────────────────────────────────────────────────────────────────────────────┐
//! │ Block Meta │
//! │┌ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ─┌ ─ ─ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ─ ┐ │
//! │ block offset │ first key len first key block type │ last key len last key │
//! ││ u32 │ u16 │ bytes │ u8 │ u16(option) │bytes(option)│ │
//! │ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │
//! └──────────────────────────────────────────────────────────────────────────────────────────┘
//!
//! Encoding:
//! 1. Write the number of blocks.
//! 2. For each block, write its metadata (offset, first key, block type, and last key if not large).
//! - block type: the first bit for is_large, the next 7 bits for compression_type.
//! 3. Calculate and append xxhash_32 checksum.
//!
//! Decoding:
//! 1. Read the number of blocks.
//! 2. For each block, read its metadata.
//! 3. Verify the xxhash_32 checksum.
//!
//!
//! Note: In this crate, the empty value is regarded as deleted. **only** [MemStoreIterator] will filter empty value.
//! Other iterators will still return empty value.
pub mod block;
pub mod compress;
pub mod iter;
pub mod mem_store;
pub mod sstable;
mod utils;
pub use iter::{KvIterator, MergeIterator};
pub use mem_store::{MemKvStore, MemStoreIterator};

View file

@ -0,0 +1,500 @@
use crate::block::BlockIter;
use crate::compress::CompressionType;
use crate::sstable::{SsTable, SsTableBuilder, SsTableIter};
use crate::MergeIterator;
use bytes::Bytes;
use std::ops::Bound;
use std::{cmp::Ordering, collections::BTreeMap};
const DEFAULT_BLOCK_SIZE: usize = 4 * 1024;
#[derive(Debug, Clone)]
pub struct MemKvStore {
mem_table: BTreeMap<Bytes, Bytes>,
// From the oldest to the newest
ss_table: Vec<SsTable>,
block_size: usize,
compression_type: CompressionType,
}
impl Default for MemKvStore {
fn default() -> Self {
Self::new(DEFAULT_BLOCK_SIZE, CompressionType::LZ4)
}
}
impl MemKvStore {
pub fn new(block_size: usize, compression_type: CompressionType) -> Self {
Self {
mem_table: BTreeMap::new(),
ss_table: Vec::new(),
block_size,
compression_type,
}
}
pub fn get(&self, key: &[u8]) -> Option<Bytes> {
if let Some(v) = self.mem_table.get(key) {
if v.is_empty() {
return None;
}
return Some(v.clone());
}
for table in self.ss_table.iter().rev() {
if table.first_key > key || table.last_key < key {
continue;
}
// table.
let idx = table.find_block_idx(key);
let block = table.read_block_cached(idx);
let block_iter = BlockIter::new_seek_to_key(block, key);
if let Some(k) = block_iter.peek_next_curr_key() {
let v = block_iter.peek_next_curr_value().unwrap();
if k == key {
return if v.is_empty() { None } else { Some(v) };
}
}
}
None
}
pub fn set(&mut self, key: &[u8], value: Bytes) {
self.mem_table.insert(Bytes::copy_from_slice(key), value);
}
pub fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
match self.get(key) {
Some(v) => {
if old == Some(v) {
self.set(key, new);
true
} else {
false
}
}
None => {
if old.is_none() {
self.set(key, new);
true
} else {
false
}
}
}
}
pub fn remove(&mut self, key: &[u8]) {
self.set(key, Bytes::new());
}
/// Check if the key exists in the mem table or the sstable
///
/// If the value is empty, it means the key is deleted
pub fn contains_key(&self, key: &[u8]) -> bool {
if self.mem_table.contains_key(key) {
return !self.mem_table.get(key).unwrap().is_empty();
}
for table in self.ss_table.iter().rev() {
if table.contains_key(key) {
if let Some(v) = table.get(key) {
return !v.is_empty();
}
}
}
false
}
pub fn scan(
&self,
start: std::ops::Bound<&[u8]>,
end: std::ops::Bound<&[u8]>,
) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
if self.ss_table.is_empty() {
return Box::new(
self.mem_table
.range::<[u8], _>((start, end))
.filter(|(_, v)| !v.is_empty())
.map(|(k, v)| (k.clone(), v.clone())),
);
}
Box::new(MemStoreIterator::new(
self.mem_table
.range::<[u8], _>((start, end))
.map(|(k, v)| (k.clone(), v.clone())),
MergeIterator::new(
self.ss_table
.iter()
.rev()
.map(|table| SsTableIter::new_scan(table, start, end))
.collect(),
),
true,
))
}
/// The number of valid keys in the mem table and sstable, it's expensive to call
pub fn len(&self) -> usize {
// TODO: PERF
self.scan(Bound::Unbounded, Bound::Unbounded).count()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn size(&self) -> usize {
self.mem_table
.iter()
.fold(0, |acc, (k, v)| acc + k.len() + v.len())
+ self
.ss_table
.iter()
.map(|table| table.data_size())
.sum::<usize>()
}
pub fn export_all(&mut self) -> Bytes {
if self.mem_table.is_empty() && self.ss_table.len() == 1 {
return self.ss_table[0].export_all();
}
let mut builder = SsTableBuilder::new(self.block_size, self.compression_type);
// we could use scan() here, we should keep the empty value
for (k, v) in MemStoreIterator::new(
self.mem_table
.range::<[u8], _>((Bound::Unbounded, Bound::Unbounded))
.map(|(k, v)| (k.clone(), v.clone())),
MergeIterator::new(
self.ss_table
.iter()
.rev()
.map(|table| SsTableIter::new_scan(table, Bound::Unbounded, Bound::Unbounded))
.collect(),
),
false,
) {
builder.add(k, v);
}
builder.finish_block();
if builder.is_empty() {
return Bytes::new();
}
self.mem_table.clear();
let ss = builder.build();
let ans = ss.export_all();
let _ = std::mem::replace(&mut self.ss_table, vec![ss]);
ans
}
/// We can import several times, the latter will override the former.
pub fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
if bytes.is_empty() {
return Ok(());
}
let ss_table = SsTable::import_all(bytes).map_err(|e| e.to_string())?;
self.ss_table.push(ss_table);
Ok(())
}
}
#[derive(Debug)]
pub struct MemStoreIterator<T, S> {
mem: T,
sst: S,
current_mem: Option<(Bytes, Bytes)>,
current_sstable: Option<(Bytes, Bytes)>,
back_mem: Option<(Bytes, Bytes)>,
back_sstable: Option<(Bytes, Bytes)>,
filter_empty: bool,
}
impl<T, S> MemStoreIterator<T, S>
where
T: DoubleEndedIterator<Item = (Bytes, Bytes)>,
S: DoubleEndedIterator<Item = (Bytes, Bytes)>,
{
fn new(mut mem: T, sst: S, filter_empty: bool) -> Self {
let current_mem = mem.next();
let back_mem = mem.next_back();
Self {
mem,
sst,
current_mem,
back_mem,
current_sstable: None,
back_sstable: None,
filter_empty,
}
}
}
impl<T, S> Iterator for MemStoreIterator<T, S>
where
T: DoubleEndedIterator<Item = (Bytes, Bytes)>,
S: DoubleEndedIterator<Item = (Bytes, Bytes)>,
{
type Item = (Bytes, Bytes);
fn next(&mut self) -> Option<Self::Item> {
if self.current_sstable.is_none() {
if let Some((k, v)) = self.sst.next() {
self.current_sstable = Some((k, v));
}
}
if self.current_mem.is_none() && self.back_mem.is_some() {
std::mem::swap(&mut self.back_mem, &mut self.current_mem);
}
let ans = match (&self.current_mem, &self.current_sstable) {
(Some((mem_key, _)), Some((iter_key, _))) => match mem_key.cmp(iter_key) {
Ordering::Less => self.current_mem.take().map(|kv| {
self.current_mem = self.mem.next();
kv
}),
Ordering::Equal => {
self.current_sstable.take();
self.current_mem.take().map(|kv| {
self.current_mem = self.mem.next();
kv
})
}
Ordering::Greater => self.current_sstable.take(),
},
(Some(_), None) => self.current_mem.take().map(|kv| {
self.current_mem = self.mem.next();
kv
}),
(None, Some(_)) => self.current_sstable.take(),
(None, None) => None,
};
if self.filter_empty {
if let Some((_k, v)) = &ans {
if v.is_empty() {
return self.next();
}
}
}
ans
}
}
impl<T, S> DoubleEndedIterator for MemStoreIterator<T, S>
where
T: DoubleEndedIterator<Item = (Bytes, Bytes)>,
S: DoubleEndedIterator<Item = (Bytes, Bytes)>,
{
fn next_back(&mut self) -> Option<Self::Item> {
if self.back_sstable.is_none() {
if let Some((k, v)) = self.sst.next_back() {
self.back_sstable = Some((k, v));
}
}
if self.back_mem.is_none() && self.current_mem.is_some() {
std::mem::swap(&mut self.back_mem, &mut self.current_mem);
}
let ans = match (&self.back_mem, &self.back_sstable) {
(Some((mem_key, _)), Some((iter_key, _))) => match mem_key.cmp(iter_key) {
Ordering::Greater => self.back_mem.take().map(|kv| {
self.back_mem = self.mem.next_back();
kv
}),
Ordering::Equal => {
self.back_sstable.take();
self.back_mem.take().map(|kv| {
self.back_mem = self.mem.next_back();
kv
})
}
Ordering::Less => self.back_sstable.take(),
},
(Some(_), None) => self.back_mem.take().map(|kv| {
self.back_mem = self.mem.next_back();
kv
}),
(None, Some(_)) => self.back_sstable.take(),
(None, None) => None,
};
if self.filter_empty {
if let Some((_k, v)) = &ans {
if v.is_empty() {
return self.next_back();
}
}
}
ans
}
}
#[cfg(test)]
mod tests {
use std::vec;
use crate::MemKvStore;
use bytes::Bytes;
#[test]
fn test_mem_kv_store() {
let key = &[0];
let value = Bytes::from_static(&[0]);
let key2 = &[0, 1];
let value2 = Bytes::from_static(&[0, 1]);
let mut store = MemKvStore::default();
store.set(key, value.clone());
assert_eq!(store.get(key), Some(value));
store.remove(key);
assert!(store.is_empty());
assert_eq!(store.get(key), None);
store.compare_and_swap(key, None, value2.clone());
assert_eq!(store.get(key), Some(value2.clone()));
assert!(store.contains_key(key));
assert!(!store.contains_key(key2));
store.set(key2, value2.clone());
assert_eq!(store.get(key2), Some(value2.clone()));
assert_eq!(store.len(), 2);
assert_eq!(store.size(), 7);
let bytes = store.export_all();
let mut new_store = MemKvStore::default();
assert_eq!(new_store.len(), 0);
assert_eq!(new_store.size(), 0);
new_store.import_all(bytes).unwrap();
let iter1 = store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.collect::<Vec<_>>();
let iter2 = new_store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.collect::<Vec<_>>();
assert_eq!(iter1, iter2);
let iter1 = store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.rev()
.collect::<Vec<_>>();
let iter2 = new_store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.rev()
.collect::<Vec<_>>();
assert_eq!(iter1, iter2);
}
#[test]
fn test_large_block() {
let mut store = MemKvStore::default();
let key = &[0];
let value = Bytes::from_static(&[0]);
let key2 = &[0, 1];
let key3 = &[0, 1, 2];
let large_value = Bytes::from_iter([0; 1024 * 8]);
let large_value2 = Bytes::from_iter([0; 1024 * 8]);
store.set(key, value.clone());
store.set(key2, large_value.clone());
let v2 = store.get(&[]);
assert_eq!(v2, None);
assert_eq!(store.get(key), Some(value.clone()));
assert_eq!(store.get(key2), Some(large_value.clone()));
store.export_all();
store.set(key3, large_value2.clone());
assert_eq!(store.get(key3), Some(large_value2.clone()));
assert_eq!(store.len(), 3);
let iter = store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.collect::<Vec<_>>();
assert_eq!(
iter,
vec![
(Bytes::from_static(key), value.clone()),
(Bytes::from_static(key2), large_value.clone()),
(Bytes::from_static(key3), large_value2.clone())
]
);
let iter2 = store
.scan(
std::ops::Bound::Included(key),
std::ops::Bound::Included(key3),
)
.collect::<Vec<_>>();
assert_eq!(iter, iter2);
let iter3 = store
.scan(
std::ops::Bound::Excluded(key),
std::ops::Bound::Excluded(key3),
)
.collect::<Vec<_>>();
assert_eq!(iter3.len(), 1);
assert_eq!(iter3[0], (Bytes::from_static(key2), large_value.clone()));
let v = store.get(key2).unwrap();
assert_eq!(v, large_value);
let v2 = store.get(&[]);
assert_eq!(v2, None);
store.compare_and_swap(key, Some(value.clone()), large_value.clone());
assert!(store.contains_key(key));
}
#[test]
fn same_key() {
let mut store = MemKvStore::default();
let key = &[0];
let value = Bytes::from_static(&[0]);
store.set(key, value.clone());
store.export_all();
store.set(key, Bytes::new());
assert_eq!(store.get(key), None);
let iter = store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.collect::<Vec<_>>();
assert_eq!(iter.len(), 0);
store.set(key, value.clone());
assert_eq!(store.get(key), Some(value));
}
#[test]
fn import_several_times() {
let a = Bytes::from_static(b"a");
let b = Bytes::from_static(b"b");
let c = Bytes::from_static(b"c");
let d = Bytes::from_static(b"d");
let e = Bytes::from_static(b"e");
let mut store = MemKvStore::default();
store.set(&a, a.clone());
store.export_all();
store.set(&c, c.clone());
let encode1 = store.export_all();
let mut store2 = MemKvStore::default();
store2.set(&b, b.clone());
store2.export_all();
store2.set(&c, Bytes::new());
let encode2 = store2.export_all();
let mut store3 = MemKvStore::default();
store3.set(&d, d.clone());
store3.set(&a, Bytes::new());
store3.export_all();
store3.set(&e, e.clone());
store3.set(&c, c.clone());
let encode3 = store3.export_all();
let mut store = MemKvStore::default();
store.import_all(encode1).unwrap();
store.import_all(encode2).unwrap();
store.import_all(encode3).unwrap();
assert_eq!(store.get(&a), None);
assert_eq!(store.get(&b), Some(b.clone()));
assert_eq!(store.get(&c), Some(c.clone()));
assert_eq!(store.get(&d), Some(d.clone()));
assert_eq!(store.get(&e), Some(e.clone()));
}
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,24 @@
use loro_common::{LoroError, LoroResult};
pub(crate) fn get_u32_le(bytes: &[u8]) -> LoroResult<(u32, &[u8])> {
if bytes.len() < 4 {
return Err(LoroError::DecodeError("Invalid bytes".into()));
}
let ans = u32::from_le_bytes(bytes[..4].try_into().unwrap());
Ok((ans, &bytes[4..]))
}
pub(crate) fn get_u8_le(bytes: &[u8]) -> LoroResult<(u8, &[u8])> {
if bytes.is_empty() {
return Err(LoroError::DecodeError("Invalid bytes".into()));
}
Ok((bytes[0], &bytes[1..]))
}
pub(crate) fn get_u16_le(bytes: &[u8]) -> LoroResult<(u16, &[u8])> {
if bytes.len() < 2 {
return Err(LoroError::DecodeError("Invalid bytes".into()));
}
let ans = u16::from_le_bytes(bytes[..2].try_into().unwrap());
Ok((ans, &bytes[2..]))
}

View file

@ -0,0 +1,59 @@
use bytes::Bytes;
use loro_kv_store::MemKvStore;
#[test]
fn add_and_remove() {
let key = &[0];
let value = Bytes::from_static(&[0]);
let mut store = MemKvStore::default();
store.set(key, value.clone());
assert_eq!(store.get(key), Some(value));
store.remove(key);
assert_eq!(store.get(key), None);
}
#[test]
fn add_flush_remove() {
let key = &[0];
let value = Bytes::from_static(&[0]);
let mut store = MemKvStore::default();
store.set(key, value.clone());
store.export_all();
store.remove(key);
assert_eq!(store.get(key), None);
}
#[test]
fn add_flush_add_scan() {
let key1 = &[0];
let value1 = Bytes::from_static(&[0]);
let key2 = &[128];
let value2 = Bytes::from_static(&[252, 169]);
let mut store = MemKvStore::default();
store.set(key1, value1.clone());
store.export_all();
store.set(key2, value2.clone());
let mut iter = store.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);
assert_eq!(
iter.next(),
Some((Bytes::from_static(key1), value1.clone()))
);
assert_eq!(
iter.next(),
Some((Bytes::from_static(key2), value2.clone()))
);
assert_eq!(iter.next(), None);
let mut iter = store
.scan(std::ops::Bound::Unbounded, std::ops::Bound::Unbounded)
.rev();
assert_eq!(
iter.next(),
Some((Bytes::from_static(key2), value2.clone()))
);
assert_eq!(
iter.next(),
Some((Bytes::from_static(key1), value1.clone()))
);
assert_eq!(iter.next(), None);
}

View file

@ -269,7 +269,8 @@ impl ContainerID {
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::new();
// normal need 13 bytes
let mut bytes = Vec::with_capacity(13);
self.encode(&mut bytes).unwrap();
bytes
}

View file

@ -14,6 +14,7 @@ keywords = ["crdt", "local-first"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
generic-btree = { version = "^0.10.5" }
smallvec = { workspace = true }
loro-delta = { path = "../delta", version = "0.16.2", package = "loro-delta" }
rle = { path = "../rle", version = "0.16.2", package = "loro-rle" }
@ -21,37 +22,38 @@ loro-common = { path = "../loro-common", version = "0.16.2" }
fractional_index = { path = "../fractional_index", features = [
"serde",
], version = "0.16.2", package = "loro_fractional_index" }
postcard = { version = "1.0.8", features = ["use-std"] }
loro-kv-store = { path = "../kv-store", version = "0.16.2" }
fxhash = { workspace = true }
serde = { workspace = true }
thiserror = "1"
thiserror = { workspace = true }
enum-as-inner = { workspace = true }
num = "0.4.0"
rand = { version = "0.8.5" }
serde_json = { workspace = true }
bytes = { workspace = true }
xxhash-rust = { workspace = true }
serde_columnar = { workspace = true }
itertools = { workspace = true }
enum_dispatch = { workspace = true }
once_cell = { workspace = true }
arbitrary = { version = "1", optional = true }
postcard = { version = "1.0.8", features = ["use-std"] }
append-only-bytes = { version = "0.1.12", features = ["u32_range"] }
im = { version = "15.1.0", features = ["serde"] }
tabled = { version = "0.10.0", optional = true }
wasm-bindgen = { version = "=0.2.92", optional = true }
serde-wasm-bindgen = { version = "0.5.0", optional = true }
js-sys = { version = "0.3.60", optional = true }
serde_json = { workspace = true }
arref = "0.1.0"
serde_columnar = { workspace = true }
append-only-bytes = { version = "0.1.12", features = ["u32_range"] }
itertools = { workspace = true }
enum_dispatch = { workspace = true }
im = { version = "15.1.0", features = ["serde"] }
generic-btree = { version = "^0.10.5" }
getrandom = "0.2.10"
once_cell = "1.18.0"
num = "0.4.0"
rand = { version = "0.8.5" }
getrandom = "0.2.15"
leb128 = "0.2.5"
num-traits = "0.2"
num-derive = "0.3"
either = "1"
md5 = "0.7.0"
tracing = { version = "0.1", features = ["release_max_level_warn"] }
bytes = "1.6.0"
arref = "0.1.0"
tracing = { version = "0.1" }
nonmax = "0.5.5"
xxhash-rust = { version = "0.8.12", features = ["xxh32"] }
[dev-dependencies]
miniz_oxide = "0.7.1"

View file

@ -19,48 +19,8 @@ features = ["test_utils"]
[workspace]
members = ["."]
[[bin]]
name = "yata"
path = "fuzz_targets/yata.rs"
test = false
doc = false
# [profile.dev]
# lto = true
# opt-level = 3
[[bin]]
name = "text_refactored"
path = "fuzz_targets/text_refactored.rs"
test = false
doc = false
[[bin]]
name = "recursive_refactored"
path = "fuzz_targets/recursive_refactored.rs"
test = false
doc = false
[[bin]]
name = "import"
path = "fuzz_targets/import.rs"
test = false
doc = false
[[bin]]
name = "tree"
path = "fuzz_targets/tree.rs"
test = false
doc = false
[[bin]]
name = "richtext"
path = "fuzz_targets/richtext.rs"
test = false
doc = false
[[bin]]
name = "map"
path = "fuzz_targets/map.rs"
test = false
doc = false

View file

@ -1,8 +0,0 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
use loro_internal::fuzz::crdt_fuzzer::{test_multi_sites, Action, FuzzTarget};
fuzz_target!(|actions: Vec<Action>| {
test_multi_sites(5, vec![FuzzTarget::Map], &mut actions.clone())
});

View file

@ -1,5 +0,0 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
use loro_internal::fuzz::recursive_refactored::{test_multi_sites, Action};
fuzz_target!(|actions: Vec<Action>| { test_multi_sites(5, &mut actions.clone()) });

View file

@ -1,5 +0,0 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
use loro_internal::fuzz::richtext::{test_multi_sites, Action};
fuzz_target!(|actions: Vec<Action>| { test_multi_sites(5, &mut actions.clone()) });

View file

@ -1,5 +0,0 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
use loro_internal::fuzz::{test_multi_sites, Action};
fuzz_target!(|actions: Vec<Action>| { test_multi_sites(5, &mut actions.clone()) });

View file

@ -1,5 +0,0 @@
#![no_main]
use libfuzzer_sys::fuzz_target;
use loro_internal::fuzz::tree::{test_multi_sites, Action};
fuzz_target!(|actions: Vec<Action>| { test_multi_sites(5, &mut actions.clone()) });

View file

@ -633,7 +633,10 @@ impl Sliceable for InnerListOp {
InnerListOp::StyleStart { .. }
| InnerListOp::StyleEnd { .. }
| InnerListOp::Move { .. }
| InnerListOp::Set { .. } => self.clone(),
| InnerListOp::Set { .. } => {
assert!(from == 0 && to == 1);
self.clone()
}
}
}
}

View file

@ -1,5 +1,7 @@
use bytes::Bytes;
pub use loro_kv_store::MemKvStore;
use std::{
collections::BTreeMap,
ops::Bound,
sync::{Arc, Mutex},
};
@ -16,12 +18,83 @@ pub trait KvStore: std::fmt::Debug + Send + Sync {
end: Bound<&[u8]>,
) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_>;
fn len(&self) -> usize;
fn is_empty(&self) -> bool;
fn size(&self) -> usize;
fn export_all(&self) -> Bytes;
fn export_all(&mut self) -> Bytes;
fn import_all(&mut self, bytes: Bytes) -> Result<(), String>;
fn clone_store(&self) -> Arc<Mutex<dyn KvStore>>;
}
fn get_common_prefix_len_and_strip<'a, T: AsRef<[u8]> + ?Sized>(
this: &'a T,
last: &T,
) -> (u8, &'a [u8]) {
let mut common_prefix_len = 0;
for (i, (a, b)) in this.as_ref().iter().zip(last.as_ref().iter()).enumerate() {
if a != b || i == 255 {
common_prefix_len = i;
break;
}
}
let suffix = &this.as_ref()[common_prefix_len..];
(common_prefix_len as u8, suffix)
}
impl KvStore for MemKvStore {
fn get(&self, key: &[u8]) -> Option<Bytes> {
self.get(key)
}
fn set(&mut self, key: &[u8], value: Bytes) {
self.set(key, value)
}
fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
self.compare_and_swap(key, old, new)
}
fn remove(&mut self, key: &[u8]) {
self.remove(key)
}
fn contains_key(&self, key: &[u8]) -> bool {
self.contains_key(key)
}
fn scan(
&self,
start: Bound<&[u8]>,
end: Bound<&[u8]>,
) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
self.scan(start, end)
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.is_empty()
}
fn size(&self) -> usize {
self.size()
}
fn export_all(&mut self) -> Bytes {
self.export_all()
}
fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
self.import_all(bytes)
}
fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
Arc::new(Mutex::new(self.clone()))
}
}
mod default_binary_format {
//! Default binary format for the key-value store.
//!
@ -29,6 +102,8 @@ mod default_binary_format {
use bytes::Bytes;
use super::get_common_prefix_len_and_strip;
pub fn export_by_scan(store: &impl super::KvStore) -> bytes::Bytes {
let mut buf = Vec::new();
let mut last_key: Option<Bytes> = None;
@ -59,19 +134,6 @@ mod default_binary_format {
buf.into()
}
fn get_common_prefix_len_and_strip<'a>(this: &'a Bytes, last: &Bytes) -> (u8, &'a [u8]) {
let mut common_prefix_len = 0;
for (i, (a, b)) in this.iter().zip(last.iter()).enumerate() {
if a != b || i == 255 {
common_prefix_len = i;
break;
}
}
let suffix = &this[common_prefix_len..];
(common_prefix_len as u8, suffix)
}
pub fn import(store: &mut impl super::KvStore, bytes: bytes::Bytes) -> Result<(), String> {
let mut bytes: &[u8] = &bytes;
let mut last_key = Vec::new();
@ -110,110 +172,77 @@ mod default_binary_format {
}
}
mod mem {
use super::*;
use std::{collections::BTreeMap, sync::Arc};
pub type MemKvStore = BTreeMap<Bytes, Bytes>;
impl KvStore for BTreeMap<Bytes, Bytes> {
fn get(&self, key: &[u8]) -> Option<Bytes> {
self.get(key).cloned()
}
impl KvStore for MemKvStore {
fn get(&self, key: &[u8]) -> Option<Bytes> {
self.get(key).cloned()
}
fn set(&mut self, key: &[u8], value: Bytes) {
self.insert(Bytes::copy_from_slice(key), value);
}
fn set(&mut self, key: &[u8], value: Bytes) {
self.insert(Bytes::copy_from_slice(key), value);
}
fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
let key = Bytes::copy_from_slice(key);
match self.get_mut(&key) {
Some(v) => {
if old.as_ref() == Some(v) {
self.insert(key, new);
true
} else {
false
}
fn compare_and_swap(&mut self, key: &[u8], old: Option<Bytes>, new: Bytes) -> bool {
let key = Bytes::copy_from_slice(key);
match self.get_mut(&key) {
Some(v) => {
if old.as_ref() == Some(v) {
self.insert(key, new);
true
} else {
false
}
None => {
if old.is_none() {
self.insert(key, new);
true
} else {
false
}
}
None => {
if old.is_none() {
self.insert(key, new);
true
} else {
false
}
}
}
fn remove(&mut self, key: &[u8]) {
self.remove(key);
}
fn contains_key(&self, key: &[u8]) -> bool {
self.contains_key(key)
}
fn scan(
&self,
start: Bound<&[u8]>,
end: Bound<&[u8]>,
) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
Box::new(
self.range::<[u8], _>((start, end))
.map(|(k, v)| (k.clone(), v.clone())),
)
}
fn len(&self) -> usize {
self.len()
}
fn size(&self) -> usize {
self.iter().fold(0, |acc, (k, v)| acc + k.len() + v.len())
}
fn export_all(&self) -> Bytes {
default_binary_format::export_by_scan(self)
}
fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
default_binary_format::import(self, bytes)
}
fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
Arc::new(Mutex::new(self.clone()))
}
}
#[cfg(test)]
mod test {
use super::*;
fn remove(&mut self, key: &[u8]) {
self.remove(key);
}
#[test]
fn test_export_and_import_all() {
let mut store1 = MemKvStore::default();
store1.insert(Bytes::from("key1"), Bytes::from("value1"));
store1.insert(Bytes::from("key2"), Bytes::from("value2"));
fn contains_key(&self, key: &[u8]) -> bool {
self.contains_key(key)
}
let exported = store1.export_all();
assert!(!exported.is_empty());
fn scan(
&self,
start: Bound<&[u8]>,
end: Bound<&[u8]>,
) -> Box<dyn DoubleEndedIterator<Item = (Bytes, Bytes)> + '_> {
Box::new(
self.range::<[u8], _>((start, end))
.map(|(k, v)| (k.clone(), v.clone())),
)
}
let mut store2 = MemKvStore::default();
let result = store2.import_all(exported);
fn len(&self) -> usize {
self.len()
}
assert!(result.is_ok());
assert_eq!(
store2.get(&Bytes::from("key1")),
Some(&Bytes::from("value1"))
);
assert_eq!(
store2.get(&Bytes::from("key2")),
Some(&Bytes::from("value2"))
);
assert_eq!(store1.len(), store2.len());
assert_eq!(store1.size(), store2.size());
assert_eq!(store1, store2);
}
fn is_empty(&self) -> bool {
self.is_empty()
}
fn size(&self) -> usize {
self.iter().fold(0, |acc, (k, v)| acc + k.len() + v.len())
}
fn export_all(&mut self) -> Bytes {
default_binary_format::export_by_scan(self)
}
fn import_all(&mut self, bytes: Bytes) -> Result<(), String> {
default_binary_format::import(self, bytes)
}
fn clone_store(&self) -> Arc<Mutex<dyn KvStore>> {
Arc::new(Mutex::new(self.clone()))
}
}

View file

@ -25,7 +25,7 @@ pub use undo::UndoManager;
pub use utils::subscription::Subscription;
pub mod awareness;
pub mod cursor;
mod kv_store;
pub mod kv_store;
pub mod loro;
pub mod obs;
pub mod oplog;

View file

@ -217,7 +217,12 @@ impl LoroDoc {
/// Is the document empty? (no ops)
#[inline(always)]
pub fn can_reset_with_snapshot(&self) -> bool {
self.oplog.lock().unwrap().is_empty() && self.state.lock().unwrap().is_empty()
let oplog = self.oplog.lock().unwrap();
if oplog.batch_importing {
return false;
}
oplog.is_empty() && self.state.lock().unwrap().is_empty()
}
/// Whether [OpLog] and [DocState] are detached.
@ -1301,7 +1306,10 @@ impl LoroDoc {
.id_to_idx(&pos.container)
.ok_or(CannotFindRelativePosition::ContainerDeleted)?;
// We know where the target id is when we trace back to the delete_op_id.
let delete_op_id = find_last_delete_op(&oplog, id, idx).unwrap();
let Some(delete_op_id) = find_last_delete_op(&oplog, id, idx) else {
tracing::error!("Cannot find id {}", id);
return Err(CannotFindRelativePosition::IdNotFound);
};
// Should use persist mode so that it will force all the diff calculators to use the `checkout` mode
let mut diff_calc = DiffCalculator::new(true);
let before_frontiers: Frontiers = oplog.dag.find_deps_of_id(delete_op_id);
@ -1491,7 +1499,7 @@ impl LoroDoc {
}
fn find_last_delete_op(oplog: &OpLog, id: ID, idx: ContainerIdx) -> Option<ID> {
let start_vv = oplog.dag.frontiers_to_vv(&id.into()).unwrap();
let start_vv = oplog.dag.frontiers_to_vv(&id.into())?;
for change in oplog.iter_changes_causally_rev(&start_vv, oplog.vv()) {
for op in change.ops.iter().rev() {
if op.container != idx {

View file

@ -130,12 +130,14 @@ impl Op {
InnerContent::List(l) => match l {
crate::container::list::list_op::InnerListOp::Insert { .. } => {
if matches!(self.container.get_type(), ContainerType::Text) {
Some(size)
Some(size.min(self.atom_len()))
} else {
Some(size / 4)
Some((size / 4).min(self.atom_len()))
}
}
crate::container::list::list_op::InnerListOp::InsertText { .. } => Some(size),
crate::container::list::list_op::InnerListOp::InsertText { .. } => {
Some(size.min(self.atom_len()))
}
_ => unreachable!(),
},
_ => unreachable!(),
@ -175,6 +177,7 @@ impl HasLength for Op {
impl Sliceable for Op {
fn slice(&self, from: usize, to: usize) -> Self {
assert!(to > from, "{to} should be greater than {from}");
assert!(to <= self.atom_len());
let content: InnerContent = self.content.slice(from, to);
Op {
counter: (self.counter + from as Counter),

View file

@ -229,10 +229,19 @@ impl HasLength for InnerContent {
impl Sliceable for InnerContent {
fn slice(&self, from: usize, to: usize) -> Self {
match self {
a @ InnerContent::Map(_) => a.clone(),
a @ InnerContent::Tree(_) => a.clone(),
a @ InnerContent::Map(_) => {
assert!(from == 0 && to == 1);
a.clone()
}
a @ InnerContent::Tree(_) => {
assert!(from == 0 && to == 1);
a.clone()
}
InnerContent::List(x) => InnerContent::List(x.slice(from, to)),
InnerContent::Future(f) => InnerContent::Future(f.clone()),
InnerContent::Future(f) => {
assert!(from == 0 && to == 1);
InnerContent::Future(f.clone())
}
}
}
}

View file

@ -18,6 +18,7 @@ use loro_common::{
Counter, HasCounterSpan, HasId, HasIdSpan, HasLamportSpan, IdLp, IdSpan, Lamport, LoroError,
LoroResult, PeerID, ID,
};
use loro_kv_store::MemKvStore;
use once_cell::sync::OnceCell;
use rle::{HasLength, Mergable, RlePush, RleVec, Sliceable};
use std::{
@ -108,7 +109,8 @@ impl ChangeStore {
})),
arena: a.clone(),
external_vv: Arc::new(Mutex::new(VersionVector::new())),
external_kv: Arc::new(Mutex::new(BTreeMap::new())),
external_kv: Arc::new(Mutex::new(MemKvStore::default())),
// external_kv: Arc::new(Mutex::new(BTreeMap::default())),
merge_interval,
}
}
@ -120,7 +122,8 @@ impl ChangeStore {
pub(super) fn encode_all(&self, vv: &VersionVector, frontiers: &Frontiers) -> Bytes {
self.flush_and_compact(vv, frontiers);
self.external_kv.lock().unwrap().export_all()
let mut kv = self.external_kv.lock().unwrap();
kv.export_all()
}
#[tracing::instrument(skip(self), level = "debug")]
@ -873,7 +876,7 @@ mod mut_inner_kv {
self.insert_change(new_change, false);
}
debug_assert_eq!(total_len, original_len);
assert_eq!(total_len, original_len);
}
fn _insert_splitted_change(
@ -919,6 +922,17 @@ mod mut_inner_kv {
let mut iter = store
.scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
.filter(|(id, _)| id.len() == 12);
// println!(
// "\nkeys {:?}",
// store
// .scan(Bound::Unbounded, Bound::Included(&id.to_bytes()))
// .filter(|(id, _)| id.len() == 12)
// .map(|(k, _v)| ID::from_bytes(&k))
// .count()
// );
// println!("id {:?}", id);
let (b_id, b_bytes) = iter.next_back()?;
let block_id: ID = ID::from_bytes(&b_id[..]);
let block = ChangesBlock::from_bytes(b_bytes).unwrap();

View file

@ -41,15 +41,16 @@ impl InnerStore {
if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(idx) {
let id = self.arena.get_container_id(idx).unwrap();
let key = id.to_bytes();
if !self.all_loaded && self.kv.contains_key(&key) {
let c = ContainerWrapper::new_from_bytes(self.kv.get(&key).unwrap());
e.insert(c);
return self.store.get_mut(&idx).unwrap();
} else {
let c = f();
e.insert(c);
self.len += 1;
if !self.all_loaded {
if let Some(v) = self.kv.get(&key) {
let c = ContainerWrapper::new_from_bytes(v);
e.insert(c);
return self.store.get_mut(&idx).unwrap();
}
}
let c = f();
e.insert(c);
self.len += 1;
}
self.store.get_mut(&idx).unwrap()
@ -59,9 +60,11 @@ impl InnerStore {
if let std::collections::hash_map::Entry::Vacant(e) = self.store.entry(idx) {
let id = self.arena.get_container_id(idx).unwrap();
let key = id.to_bytes();
if !self.all_loaded && self.kv.contains_key(&key) {
let c = ContainerWrapper::new_from_bytes(self.kv.get(&key).unwrap());
e.insert(c);
if !self.all_loaded {
if let Some(v) = self.kv.get(&key) {
let c = ContainerWrapper::new_from_bytes(v);
e.insert(c);
}
}
}

View file

@ -7,6 +7,7 @@ use std::{
use bytes::Bytes;
use fxhash::FxHashMap;
use loro_common::ContainerID;
use loro_kv_store::MemKvStore;
use crate::kv_store::KvStore;
@ -33,7 +34,8 @@ impl Clone for KvWrapper {
impl KvWrapper {
pub fn new_mem() -> Self {
Self {
kv: Arc::new(Mutex::new(BTreeMap::new())),
// kv: Arc::new(Mutex::new(BTreeMap::new())),
kv: Arc::new(Mutex::new(MemKvStore::default())),
}
}
@ -43,7 +45,7 @@ impl KvWrapper {
}
pub fn export(&self) -> Bytes {
let kv = self.kv.lock().unwrap();
let mut kv = self.kv.lock().unwrap();
kv.export_all()
}

View file

@ -5,3 +5,4 @@ opt-level = "s"
# But it's valuable for debugging.
debug = true
codegen-units = 1
strip = true

View file

@ -1,5 +1,45 @@
# Changelog
## 0.16.10
### Patch Changes
- 7cf54e8: Fix batch importing with snapshot
## 0.16.9
### Patch Changes
- a761430: Fix build script
## 0.16.8
### Patch Changes
- 38b4bcf: Add text update API
- Remove the patch for crypto
- Add text update API (#404)
- Check invalid root container name (#411)
### 🐛 Bug Fixes
- Workaround lldb bug make loro crate debuggable (#414)
- Delete the **bring back** tree node from the undo container remap (#423)
### 📚 Documentation
- Fix typo
- Refine docs about event (#417)
### 🎨 Styling
- Use clippy to perf code (#407)
### ⚙️ Miscellaneous Tasks
- Add test tools (#410)
## 0.16.7
### Patch Changes

View file

@ -15,11 +15,11 @@ wasm-bindgen = "=0.2.92"
serde-wasm-bindgen = { version = "^0.6.5" }
wasm-bindgen-derive = "0.2.1"
console_error_panic_hook = { version = "0.1.6", optional = true }
getrandom = { version = "0.2.10", features = ["js"] }
getrandom = { version = "0.2.15", features = ["js"] }
serde = { workspace = true }
rle = { path = "../rle", package = "loro-rle" }
tracing-wasm = "0.2.1"
tracing = { version = "0.1" }
tracing = { version = "0.1", features = ["release_max_level_warn"] }
serde_json = "1"
[features]

View file

@ -1,6 +1,6 @@
{
"name": "loro-wasm",
"version": "0.16.7",
"version": "0.16.10",
"description": "Loro CRDTs is a high-performance CRDT framework that makes your app state synchronized, collaborative and maintainable effortlessly.",
"keywords": [
"crdt",

View file

@ -1,5 +1,11 @@
const { webcrypto } = require("crypto");
Object.defineProperty(globalThis, 'crypto', {
value: webcrypto,
writable: true
});
// Don't patch this if it already exists (for example in Deno)
if (!globalThis.crypto) {
// We need this patch because we use `getrandom` crate in Rust, which relies on this patch
// for nodejs
// https://docs.rs/getrandom/latest/getrandom/#nodejs-es-module-support
const { webcrypto } = require("crypto");
Object.defineProperty(globalThis, 'crypto', {
value: webcrypto,
writable: true
});
}

View file

@ -1580,7 +1580,7 @@ impl LoroText {
/// text.insert(0, "Hello");
/// text.update("Hello World");
/// ```
pub fn update(&self, text: &str) -> () {
pub fn update(&self, text: &str) {
self.handler.update(text);
}

View file

@ -1,8 +1,8 @@
use std::sync::Arc;
use std::{cmp::Ordering, sync::Arc};
use loro_internal::{
change::{Change, Lamport, Timestamp},
id::ID,
id::{Counter, ID},
version::Frontiers,
};
@ -19,14 +19,36 @@ use loro_internal::{
/// The length of the `Change` is how many operations it contains
#[derive(Debug, Clone)]
pub struct ChangeMeta {
pub id: ID,
pub lamport: Lamport,
pub id: ID,
pub timestamp: Timestamp,
pub message: Option<Arc<str>>,
pub deps: Frontiers,
pub len: usize,
}
impl PartialEq for ChangeMeta {
fn eq(&self, other: &Self) -> bool {
self.lamport == other.lamport && self.id == other.id
}
}
impl Eq for ChangeMeta {}
impl PartialOrd for ChangeMeta {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ChangeMeta {
fn cmp(&self, other: &Self) -> Ordering {
(self.lamport + self.len as Lamport)
.cmp(&(other.lamport + other.len as Lamport))
.then(self.id.peer.cmp(&other.id.peer))
}
}
impl ChangeMeta {
pub(super) fn from_change(c: &Change) -> Self {
Self {

View file

@ -1,7 +1,7 @@
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
use change_meta::ChangeMeta;
pub use change_meta::ChangeMeta;
use either::Either;
use event::{DiffEvent, Subscriber};
use loro_internal::container::IntoContainerId;
@ -45,6 +45,7 @@ pub use loro_internal::handler::TextDelta;
pub use loro_internal::id::{PeerID, TreeID, ID};
pub use loro_internal::json;
pub use loro_internal::json::JsonSchema;
pub use loro_internal::kv_store::{KvStore, MemKvStore};
pub use loro_internal::loro::CommitOptions;
pub use loro_internal::loro::DocAnalysis;
pub use loro_internal::obs::SubID;
@ -251,7 +252,7 @@ impl LoroDoc {
///
/// The data can be in arbitrary order. The import result will be the same.
#[inline]
pub fn import_batch(&mut self, bytes: &[Vec<u8>]) -> LoroResult<()> {
pub fn import_batch(&self, bytes: &[Vec<u8>]) -> LoroResult<()> {
self.doc.import_batch(bytes)
}

View file

@ -0,0 +1,14 @@
use loro::LoroDoc;
#[ctor::ctor]
fn init() {
dev_utils::setup_test_log();
}
#[test]
fn issue_0() {
let bytes = include_bytes!("./issue_0.bin");
let doc = LoroDoc::new();
doc.import_batch(&[bytes.into()]).unwrap();
doc.export_snapshot();
}

Binary file not shown.

View file

@ -1,5 +1,52 @@
# Changelog
## 0.16.10
### Patch Changes
- 7cf54e8: Fix batch importing with snapshot
- Updated dependencies [7cf54e8]
- loro-wasm@0.16.10
## 0.16.9
### Patch Changes
- a761430: Fix build script
- Updated dependencies [a761430]
- loro-wasm@0.16.9
## 0.16.8
### Patch Changes
- 38b4bcf: Add text update API
- Remove the patch for crypto
- Add text update API (#404)
- Check invalid root container name (#411)
### 🐛 Bug Fixes
- Workaround lldb bug make loro crate debuggable (#414)
- Delete the **bring back** tree node from the undo container remap (#423)
### 📚 Documentation
- Fix typo
- Refine docs about event (#417)
### 🎨 Styling
- Use clippy to perf code (#407)
### ⚙️ Miscellaneous Tasks
- Add test tools (#410)
- Updated dependencies [38b4bcf]
- loro-wasm@0.16.8
## 0.16.7
### Patch Changes

View file

@ -1,6 +1,6 @@
{
"name": "loro-crdt",
"version": "0.16.7",
"version": "0.16.10",
"description": "Loro CRDTs is a high-performance CRDT framework that makes your app state synchronized, collaborative and maintainable effortlessly.",
"keywords": [
"crdt",