fix: return err if snapshot container has unknown container (#488)

This commit is contained in:
Zixuan Chen 2024-10-02 14:24:57 +08:00 committed by GitHub
parent 09a004e365
commit de93d34a9c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 138 additions and 106 deletions

View file

@ -110,12 +110,15 @@ pub enum LoroTreeError {
TreeNodeDeletedOrNotExist(TreeID),
}
#[non_exhaustive]
#[derive(Error, Debug, PartialEq)]
pub enum LoroEncodeError {
#[error("The frontiers are not found in this doc: {0}")]
FrontiersNotFound(String),
#[error("Trimmed snapshot incompatible with old snapshot format. Use new snapshot format or avoid trimmed snapshots for storage.")]
TrimmedSnapshotIncompatibleWithOldFormat,
#[error("Cannot export trimmed snapshot with unknown container type. Please upgrade the Loro version.")]
UnknownContainer,
}
#[cfg(feature = "wasm")]

View file

@ -1,15 +1,14 @@
pub(crate) mod arena;
mod encode_reordered;
mod fast_snapshot;
pub(crate) mod json_schema;
mod outdated_encode_reordered;
mod trimmed_snapshot;
pub(crate) mod value;
pub(crate) mod value_register;
use std::borrow::Cow;
pub(crate) use encode_reordered::{
pub(crate) use outdated_encode_reordered::{
decode_op, encode_op, get_op_prop, EncodedDeleteStartId, IterableEncodedDeleteStartId,
};
pub(crate) mod json_schema;
mod outdated_fast_snapshot;
mod trimmed_snapshot;
pub(crate) use value::OwnedValue;
use crate::op::OpWithId;
use crate::version::Frontiers;
@ -19,7 +18,7 @@ use loro_common::{IdLpSpan, IdSpan, LoroEncodeError, LoroResult, PeerID, ID};
use num_traits::{FromPrimitive, ToPrimitive};
use rle::{HasLength, Sliceable};
use serde::{Deserialize, Serialize};
pub(crate) use value::OwnedValue;
use std::borrow::Cow;
#[non_exhaustive]
#[derive(Debug, Clone)]
@ -234,7 +233,7 @@ pub(crate) fn encode_oplog(oplog: &OpLog, vv: &VersionVector, mode: EncodeMode)
};
let body = match &mode {
EncodeMode::OutdatedRle => encode_reordered::encode_updates(oplog, vv),
EncodeMode::OutdatedRle => outdated_encode_reordered::encode_updates(oplog, vv),
_ => unreachable!(),
};
@ -248,12 +247,10 @@ pub(crate) fn decode_oplog(
let ParsedHeaderAndBody { mode, body, .. } = parsed;
match mode {
EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
encode_reordered::decode_updates(oplog, body)
}
EncodeMode::FastSnapshot => outdated_fast_snapshot::decode_oplog(oplog, body),
EncodeMode::FastUpdates => {
outdated_fast_snapshot::decode_updates(oplog, body.to_vec().into())
outdated_encode_reordered::decode_updates(oplog, body)
}
EncodeMode::FastSnapshot => fast_snapshot::decode_oplog(oplog, body),
EncodeMode::FastUpdates => fast_snapshot::decode_updates(oplog, body.to_vec().into()),
EncodeMode::Auto => unreachable!(),
}
}
@ -331,7 +328,7 @@ fn encode_header_and_body(mode: EncodeMode, body: Vec<u8>) -> Vec<u8> {
}
pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
let body = encode_reordered::encode_snapshot(
let body = outdated_encode_reordered::encode_snapshot(
&doc.oplog().try_lock().unwrap(),
&mut doc.app_state().try_lock().unwrap(),
&Default::default(),
@ -342,30 +339,36 @@ pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
pub(crate) fn export_fast_snapshot(doc: &LoroDoc) -> Vec<u8> {
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
outdated_fast_snapshot::encode_snapshot(doc, ans);
fast_snapshot::encode_snapshot(doc, ans);
Ok(())
})
.unwrap()
}
pub(crate) fn export_fast_snapshot_at(
pub(crate) fn export_snapshot_at(
doc: &LoroDoc,
frontiers: &Frontiers,
) -> Result<Vec<u8>, LoroEncodeError> {
check_target_version_reachable(doc, frontiers)?;
Ok(encode_with(EncodeMode::FastSnapshot, &mut |ans| {
outdated_fast_snapshot::encode_snapshot_at(doc, frontiers, ans).unwrap();
}))
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
trimmed_snapshot::encode_snapshot_at(doc, frontiers, ans)
})
}
pub(crate) fn export_fast_updates(doc: &LoroDoc, vv: &VersionVector) -> Vec<u8> {
encode_with(EncodeMode::FastUpdates, &mut |ans| {
outdated_fast_snapshot::encode_updates(doc, vv, ans);
fast_snapshot::encode_updates(doc, vv, ans);
Ok(())
})
.unwrap()
}
pub(crate) fn export_fast_updates_in_range(oplog: &OpLog, spans: &[IdSpan]) -> Vec<u8> {
encode_with(EncodeMode::FastUpdates, &mut |ans| {
outdated_fast_snapshot::encode_updates_in_range(oplog, spans, ans);
fast_snapshot::encode_updates_in_range(oplog, spans, ans);
Ok(())
})
.unwrap()
}
pub(crate) fn export_trimmed_snapshot(
@ -373,9 +376,10 @@ pub(crate) fn export_trimmed_snapshot(
f: &Frontiers,
) -> Result<Vec<u8>, LoroEncodeError> {
check_target_version_reachable(doc, f)?;
Ok(encode_with(EncodeMode::FastSnapshot, &mut |ans| {
trimmed_snapshot::export_trimmed_snapshot(doc, f, ans).unwrap();
}))
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
trimmed_snapshot::export_trimmed_snapshot(doc, f, ans)?;
Ok(())
})
}
fn check_target_version_reachable(doc: &LoroDoc, f: &Frontiers) -> Result<(), LoroEncodeError> {
@ -392,12 +396,16 @@ pub(crate) fn export_state_only_snapshot(
f: &Frontiers,
) -> Result<Vec<u8>, LoroEncodeError> {
check_target_version_reachable(doc, f)?;
Ok(encode_with(EncodeMode::FastSnapshot, &mut |ans| {
trimmed_snapshot::export_state_only_snapshot(doc, f, ans).unwrap();
}))
encode_with(EncodeMode::FastSnapshot, &mut |ans| {
trimmed_snapshot::export_state_only_snapshot(doc, f, ans)?;
Ok(())
})
}
fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec<u8>)) -> Vec<u8> {
fn encode_with(
mode: EncodeMode,
f: &mut dyn FnMut(&mut Vec<u8>) -> Result<(), LoroEncodeError>,
) -> Result<Vec<u8>, LoroEncodeError> {
// HEADER
let mut ans = Vec::with_capacity(MIN_HEADER_SIZE);
ans.extend(MAGIC_BYTES);
@ -406,13 +414,13 @@ fn encode_with(mode: EncodeMode, f: &mut dyn FnMut(&mut Vec<u8>)) -> Vec<u8> {
ans.extend(mode.to_bytes());
// BODY
f(&mut ans);
f(&mut ans)?;
// CHECKSUM in HEADER
let checksum_body = &ans[20..];
let checksum = xxhash_rust::xxh32::xxh32(checksum_body, XXH_SEED);
ans[16..20].copy_from_slice(&checksum.to_le_bytes());
ans
Ok(ans)
}
pub(crate) fn decode_snapshot(
@ -421,10 +429,8 @@ pub(crate) fn decode_snapshot(
body: &[u8],
) -> Result<(), LoroError> {
match mode {
EncodeMode::OutdatedSnapshot => encode_reordered::decode_snapshot(doc, body),
EncodeMode::FastSnapshot => {
outdated_fast_snapshot::decode_snapshot(doc, body.to_vec().into())
}
EncodeMode::OutdatedSnapshot => outdated_encode_reordered::decode_snapshot(doc, body),
EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot(doc, body.to_vec().into()),
_ => unreachable!(),
}
}
@ -453,7 +459,7 @@ pub struct ImportBlobMetadata {
impl LoroDoc {
/// Decodes the metadata for an imported blob from the provided bytes.
pub fn decode_import_blob_meta(blob: &[u8]) -> LoroResult<ImportBlobMetadata> {
encode_reordered::decode_import_blob_meta(blob)
outdated_encode_reordered::decode_import_blob_meta(blob)
}
}

View file

@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, ColumnarError};
use super::{
encode_reordered::{PeerIdx, MAX_DECODED_SIZE},
outdated_encode_reordered::{PeerIdx, MAX_DECODED_SIZE},
value::{Value, ValueDecodedArenasTrait, ValueEncodeRegister},
value_register::ValueRegister,
};

View file

@ -15,15 +15,12 @@
//!
use std::io::{Read, Write};
use crate::{
encoding::trimmed_snapshot, oplog::ChangeStore, version::Frontiers, LoroDoc, OpLog,
VersionVector,
};
use crate::{encoding::trimmed_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector};
use bytes::{Buf, Bytes};
use loro_common::{IdSpan, LoroEncodeError, LoroError, LoroResult};
use loro_common::{IdSpan, LoroError, LoroResult};
use tracing::trace;
use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult};
use super::outdated_encode_reordered::{import_changes_to_oplog, ImportChangesResult};
pub(crate) const EMPTY_MARK: &[u8] = b"E";
pub(super) struct Snapshot {
@ -211,53 +208,6 @@ pub(crate) fn encode_snapshot<W: std::io::Write>(doc: &LoroDoc, w: &mut W) {
}
}
pub(crate) fn encode_snapshot_at<W: std::io::Write>(
doc: &LoroDoc,
frontiers: &Frontiers,
w: &mut W,
) -> Result<(), LoroEncodeError> {
let version_before_start = doc.oplog_frontiers();
doc.checkout_without_emitting(frontiers).unwrap();
{
let mut state = doc.app_state().try_lock().unwrap();
let oplog = doc.oplog().try_lock().unwrap();
let is_gc = state.store.trimmed_store().is_some();
if is_gc {
unimplemented!()
}
assert!(!state.is_in_txn());
let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else {
return Err(LoroEncodeError::FrontiersNotFound(format!(
"frontiers: {:?} when export in SnapshotAt mode",
frontiers
)));
};
if oplog.is_trimmed() {
assert_eq!(
oplog.trimmed_frontiers(),
state.store.trimmed_frontiers().unwrap()
);
}
state.ensure_all_alive_containers();
let state_bytes = state.store.encode();
_encode_snapshot(
Snapshot {
oplog_bytes,
state_bytes: Some(state_bytes),
trimmed_bytes: Bytes::new(),
},
w,
);
}
doc.checkout_without_emitting(&version_before_start)
.unwrap();
doc.drop_pending_events();
Ok(())
}
pub(crate) fn decode_oplog(oplog: &mut OpLog, bytes: &[u8]) -> Result<(), LoroError> {
let oplog_len = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
let oplog_bytes = &bytes[4..4 + oplog_len as usize];

View file

@ -22,7 +22,9 @@ use crate::{
OpLog, VersionVector,
};
use super::encode_reordered::{import_changes_to_oplog, ImportChangesResult, ValueRegister};
use super::outdated_encode_reordered::{
import_changes_to_oplog, ImportChangesResult, ValueRegister,
};
use json::{JsonOpContent, JsonSchema};
const SCHEMA_VERSION: u8 = 1;

View file

@ -1,13 +1,14 @@
use bytes::Bytes;
use rle::HasLength;
use std::collections::BTreeSet;
use loro_common::{ContainerID, LoroResult, ID};
use loro_common::{ContainerID, ContainerType, LoroEncodeError, ID};
use tracing::{debug, trace};
use crate::{
container::list::list_op::InnerListOp,
dag::{Dag, DagUtils},
encoding::outdated_fast_snapshot::{Snapshot, _encode_snapshot},
encoding::fast_snapshot::{Snapshot, _encode_snapshot},
state::container_store::FRONTIERS_KEY,
version::Frontiers,
LoroDoc,
@ -23,7 +24,7 @@ pub(crate) fn export_trimmed_snapshot<W: std::io::Write>(
doc: &LoroDoc,
start_from: &Frontiers,
w: &mut W,
) -> LoroResult<Frontiers> {
) -> Result<Frontiers, LoroEncodeError> {
let oplog = doc.oplog().try_lock().unwrap();
let start_from = calc_trimmed_doc_start(&oplog, start_from);
let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap();
@ -62,9 +63,12 @@ pub(crate) fn export_trimmed_snapshot<W: std::io::Write>(
let latest_vv = oplog.vv();
let ops_num: usize = latest_vv.sub_iter(&start_vv).map(|x| x.atom_len()).sum();
drop(oplog);
doc.checkout_without_emitting(&start_from)?;
doc.checkout_without_emitting(&start_from).unwrap();
let mut state = doc.app_state().try_lock().unwrap();
let alive_containers = state.ensure_all_alive_containers();
if has_unknown_container(alive_containers.iter()) {
return Err(LoroEncodeError::UnknownContainer);
}
let mut alive_c_bytes: BTreeSet<Vec<u8>> =
alive_containers.iter().map(|x| x.to_bytes()).collect();
state.store.flush();
@ -119,11 +123,15 @@ pub(crate) fn export_trimmed_snapshot<W: std::io::Write>(
Ok(start_from)
}
fn has_unknown_container<'a>(mut cids: impl Iterator<Item = &'a ContainerID>) -> bool {
cids.any(|cid| matches!(cid.container_type(), ContainerType::Unknown(_)))
}
pub(crate) fn export_state_only_snapshot<W: std::io::Write>(
doc: &LoroDoc,
start_from: &Frontiers,
w: &mut W,
) -> LoroResult<Frontiers> {
) -> Result<Frontiers, LoroEncodeError> {
let oplog = doc.oplog().try_lock().unwrap();
let start_from = calc_trimmed_doc_start(&oplog, start_from);
let mut start_vv = oplog.dag().frontiers_to_vv(&start_from).unwrap();
@ -147,14 +155,13 @@ pub(crate) fn export_state_only_snapshot<W: std::io::Write>(
let state_frontiers = doc.state_frontiers();
let is_attached = !doc.is_detached();
drop(oplog);
doc.checkout_without_emitting(&start_from)?;
doc.checkout_without_emitting(&start_from).unwrap();
let mut state = doc.app_state().try_lock().unwrap();
let alive_containers = state.ensure_all_alive_containers();
let alive_c_bytes: BTreeSet<Vec<u8>> = alive_containers.iter().map(|x| x.to_bytes()).collect();
let alive_c_bytes = cids_to_bytes(alive_containers);
state.store.flush();
let trimmed_state_kv = state.store.get_kv().clone();
drop(state);
let state_bytes = None;
trimmed_state_kv.retain_keys(&alive_c_bytes);
trimmed_state_kv.insert(FRONTIERS_KEY, start_from.encode().into());
let trimmed_state_bytes = trimmed_state_kv.export();
@ -162,7 +169,7 @@ pub(crate) fn export_state_only_snapshot<W: std::io::Write>(
// println!("oplog_bytes.len = {:?}", oplog_bytes.len());
let snapshot = Snapshot {
oplog_bytes,
state_bytes,
state_bytes: None,
trimmed_bytes: trimmed_state_bytes,
};
_encode_snapshot(snapshot, w);
@ -179,6 +186,16 @@ pub(crate) fn export_state_only_snapshot<W: std::io::Write>(
Ok(start_from)
}
fn cids_to_bytes(
alive_containers: std::collections::HashSet<
ContainerID,
std::hash::BuildHasherDefault<fxhash::FxHasher>,
>,
) -> BTreeSet<Vec<u8>> {
let alive_c_bytes: BTreeSet<Vec<u8>> = alive_containers.iter().map(|x| x.to_bytes()).collect();
alive_c_bytes
}
/// Calculates optimal starting version for the trimmed doc
///
/// It should be the LCA of the user given version and the latest version.
@ -210,3 +227,58 @@ fn calc_trimmed_doc_start(oplog: &crate::OpLog, frontiers: &Frontiers) -> Fronti
start
}
pub(crate) fn encode_snapshot_at<W: std::io::Write>(
doc: &LoroDoc,
frontiers: &Frontiers,
w: &mut W,
) -> Result<(), LoroEncodeError> {
let version_before_start = doc.oplog_frontiers();
doc.checkout_without_emitting(frontiers).unwrap();
{
let mut state = doc.app_state().try_lock().unwrap();
let oplog = doc.oplog().try_lock().unwrap();
let is_trimmed = state.store.trimmed_store().is_some();
if is_trimmed {
unimplemented!()
}
assert!(!state.is_in_txn());
let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else {
return Err(LoroEncodeError::FrontiersNotFound(format!(
"frontiers: {:?} when export in SnapshotAt mode",
frontiers
)));
};
if oplog.is_trimmed() {
assert_eq!(
oplog.trimmed_frontiers(),
state.store.trimmed_frontiers().unwrap()
);
}
let alive_containers = state.ensure_all_alive_containers();
if has_unknown_container(alive_containers.iter()) {
return Err(LoroEncodeError::UnknownContainer);
}
let alive_c_bytes = cids_to_bytes(alive_containers);
state.store.flush();
let state_kv = state.store.get_kv().clone();
state_kv.retain_keys(&alive_c_bytes);
let bytes = state_kv.export();
_encode_snapshot(
Snapshot {
oplog_bytes,
state_bytes: Some(bytes),
trimmed_bytes: Bytes::new(),
},
w,
);
}
doc.checkout_without_emitting(&version_before_start)
.unwrap();
doc.drop_pending_events();
Ok(())
}

View file

@ -10,7 +10,7 @@ use std::sync::Arc;
use crate::{
change::Lamport, container::tree::tree_op::TreeOp,
encoding::encode_reordered::MAX_COLLECTION_SIZE,
encoding::outdated_encode_reordered::MAX_COLLECTION_SIZE,
};
use super::{

View file

@ -33,10 +33,9 @@ use crate::{
dag::DagUtils,
diff_calc::DiffCalculator,
encoding::{
decode_snapshot, export_fast_snapshot, export_fast_snapshot_at, export_fast_updates,
export_fast_updates_in_range, export_snapshot, export_state_only_snapshot,
export_trimmed_snapshot, json_schema::json::JsonSchema, parse_header_and_body, EncodeMode,
ParsedHeaderAndBody,
decode_snapshot, export_fast_snapshot, export_fast_updates, export_fast_updates_in_range,
export_snapshot, export_snapshot_at, export_state_only_snapshot, export_trimmed_snapshot,
json_schema::json::JsonSchema, parse_header_and_body, EncodeMode, ParsedHeaderAndBody,
},
event::{str_to_path, EventTriggerKind, Index, InternalDocDiff},
handler::{Handler, MovableListHandler, TextHandler, TreeHandler, ValueOrHandler},
@ -1484,7 +1483,7 @@ impl LoroDoc {
Some(f) => export_state_only_snapshot(self, &f)?,
None => export_state_only_snapshot(self, &self.oplog_frontiers())?,
},
ExportMode::SnapshotAt { version } => export_fast_snapshot_at(self, &version)?,
ExportMode::SnapshotAt { version } => export_snapshot_at(self, &version)?,
};
self.renew_txn_if_auto_commit();