fix: decode iter return result by updating columnar to 0.3.4 (#309)

This commit is contained in:
Leon Zhao 2024-04-01 17:29:07 +08:00 committed by GitHub
parent 3da28459b0
commit 51890ff8d8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 461 additions and 407 deletions

785
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -16,3 +16,4 @@ enum_dispatch = "0.3.11"
debug-log = { version = "0.3.1", features = [] }
enum-as-inner = "0.5.1"
fxhash = "0.2.1"
serde_columnar = { version = "0.3.4" }

View file

@ -22,7 +22,7 @@ enum-as-inner = "0.6.0"
string_cache = "0.8"
arbitrary = { version = "1.3.0", features = ["derive"] }
js-sys = { version = "0.3.60", optional = true }
serde_columnar = "0.3.3"
serde_columnar = { workspace = true }
nonmax = "0.5.5"
[features]

View file

@ -31,7 +31,7 @@ serde-wasm-bindgen = { version = "0.5.0", optional = true }
js-sys = { version = "0.3.60", optional = true }
serde_json = { version = "1" }
arref = "0.1.0"
serde_columnar = { version = "0.3.3" }
serde_columnar = { workspace = true }
append-only-bytes = { version = "0.1.12", features = ["u32_range"] }
itertools = "0.11.0"
enum_dispatch = { workspace = true }

View file

@ -9,7 +9,7 @@ use loro_common::{
};
use num_traits::FromPrimitive;
use rle::HasLength;
use serde_columnar::columnar;
use serde_columnar::{columnar, ColumnarError};
use crate::{
arena::SharedArena,
@ -270,7 +270,7 @@ fn decode_changes<'a>(
encoded_changes: IterableEncodedChange<'_>,
mut counters: Vec<i32>,
peer_ids: arena::PeerIdArena,
mut deps: impl Iterator<Item = arena::EncodedDep> + 'a,
mut deps: impl Iterator<Item = Result<arena::EncodedDep, ColumnarError>> + 'a,
mut ops_map: std::collections::HashMap<
u64,
Vec<Op>,
@ -278,15 +278,15 @@ fn decode_changes<'a>(
>,
) -> LoroResult<Vec<Change>> {
let mut changes = Vec::with_capacity(encoded_changes.size_hint().0);
for EncodedChange {
peer_idx,
mut len,
timestamp,
deps_len,
dep_on_self,
msg_len: _,
} in encoded_changes
{
for encoded_change in encoded_changes {
let EncodedChange {
peer_idx,
mut len,
timestamp,
deps_len,
dep_on_self,
msg_len: _,
} = encoded_change?;
if peer_ids.peer_ids.len() <= peer_idx || counters.len() <= peer_idx {
return Err(LoroError::DecodeDataCorruptionError);
}
@ -312,7 +312,7 @@ fn decode_changes<'a>(
}
for _ in 0..deps_len {
let dep = deps.next().ok_or(LoroError::DecodeDataCorruptionError)?;
let dep = deps.next().ok_or(LoroError::DecodeDataCorruptionError)??;
change
.deps
.push(ID::new(peer_ids.peer_ids[dep.peer_idx], dep.counter));
@ -342,8 +342,8 @@ struct ExtractedOps {
#[allow(clippy::too_many_arguments)]
fn extract_ops(
raw_values: &[u8],
iter: impl Iterator<Item = EncodedOp>,
mut del_iter: impl Iterator<Item = EncodedDeleteStartId>,
iter: impl Iterator<Item = Result<EncodedOp, ColumnarError>>,
mut del_iter: impl Iterator<Item = Result<EncodedDeleteStartId, ColumnarError>>,
arena: &SharedArena,
containers: &ContainerArena,
keys: &arena::KeyArena,
@ -358,14 +358,14 @@ fn extract_ops(
.map(|x| x.as_container_id(&keys.keys, &peer_ids.peer_ids))
.try_collect()?;
let mut ops = Vec::new();
for EncodedOp {
container_index,
prop,
peer_idx,
value_type,
counter,
} in iter
{
for op in iter {
let EncodedOp {
container_index,
prop,
peer_idx,
value_type,
counter,
} = op?;
if containers.len() <= container_index as usize
|| peer_ids.peer_ids.len() <= peer_idx as usize
{
@ -779,12 +779,12 @@ fn decode_snapshot_states(
) -> LoroResult<()> {
let mut state_blob_index: usize = 0;
let mut ops_index: usize = 0;
for EncodedStateInfo {
container_index,
mut op_len,
state_bytes_len,
} in encoded_state_iter
{
for encoded_state in encoded_state_iter {
let EncodedStateInfo {
container_index,
mut op_len,
state_bytes_len,
} = encoded_state?;
if op_len == 0 && state_bytes_len == 0 {
continue;
}
@ -1217,7 +1217,7 @@ mod encode {
fn decode_op(
cid: &ContainerID,
kind: ValueKind,
del_iter: &mut impl Iterator<Item = EncodedDeleteStartId>,
del_iter: &mut impl Iterator<Item = Result<EncodedDeleteStartId, ColumnarError>>,
value_reader: &mut ValueReader<'_>,
arena: &crate::arena::SharedArena,
prop: i32,
@ -1240,7 +1240,7 @@ fn decode_op(
)
}
ValueKind::DeleteSeq => {
let del_start = del_iter.next().unwrap();
let del_start = del_iter.next().unwrap()?;
let peer_idx = del_start.peer_idx;
let cnt = del_start.counter;
let len = del_start.len;
@ -1314,7 +1314,7 @@ fn decode_op(
)
}
ValueKind::DeleteSeq => {
let del_start = del_iter.next().unwrap();
let del_start = del_iter.next().unwrap()?;
let peer_idx = del_start.peer_idx;
let cnt = del_start.counter;
let len = del_start.len;
@ -2285,7 +2285,7 @@ mod arena {
use crate::InternalString;
use loro_common::{ContainerID, ContainerType, LoroError, LoroResult, PeerID};
use serde::{Deserialize, Serialize};
use serde_columnar::columnar;
use serde_columnar::{columnar, ColumnarError};
use super::{encode::ValueRegister, PeerIdx, MAX_DECODED_SIZE};
@ -2316,7 +2316,7 @@ mod arena {
pub(super) peer_ids: PeerIdArena,
pub(super) containers: ContainerArena,
pub(super) keys: KeyArena,
pub deps: Box<dyn Iterator<Item = EncodedDep> + 'a>,
pub deps: Box<dyn Iterator<Item = Result<EncodedDep, ColumnarError>> + 'a>,
pub state_blob_arena: &'a [u8],
}
@ -2539,7 +2539,9 @@ mod arena {
serde_columnar::to_vec(&self).unwrap()
}
pub fn decode_iter(bytes: &[u8]) -> LoroResult<impl Iterator<Item = EncodedDep> + '_> {
pub fn decode_iter(
bytes: &[u8],
) -> LoroResult<impl Iterator<Item = Result<EncodedDep, ColumnarError>> + '_> {
let iter = serde_columnar::iter_from_bytes::<DepsArena>(bytes)?;
Ok(iter.deps)
}

View file

@ -14,6 +14,6 @@ keywords = ["crdt", "local-first"]
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_columnar = { version = "0.3.2" }
serde_columnar = { workspace = true }
loro-common = { path = "../loro-common", version = "0.2.0" }
bytes = "1.4.0"

View file

@ -1,6 +1,6 @@
use bytes::{BufMut, BytesMut};
use loro_common::{ContainerID, InternalString, LoroError, LoroResult, LoroValue, ID};
use serde_columnar::{columnar, to_vec};
use serde_columnar::{columnar, to_vec, ColumnarError};
use std::borrow::Cow;
use serde::{Deserialize, Serialize};
@ -184,7 +184,9 @@ pub struct TextRanges {
impl TextRanges {
#[inline]
pub fn decode_iter(bytes: &[u8]) -> LoroResult<impl Iterator<Item = TextRange> + '_> {
pub fn decode_iter(
bytes: &[u8],
) -> LoroResult<impl Iterator<Item = Result<TextRange, ColumnarError>> + '_> {
let iter = serde_columnar::iter_from_bytes::<TextRanges>(bytes)?;
Ok(iter.ranges)
}