mirror of
https://github.com/loro-dev/loro.git
synced 2024-11-24 12:20:06 +00:00
fix: fork at should restore detached state (#523)
This commit is contained in:
parent
77024c378f
commit
484d6db7a1
8 changed files with 53 additions and 24 deletions
|
@ -3,7 +3,7 @@ use rle::HasLength;
|
|||
use std::collections::BTreeSet;
|
||||
|
||||
use loro_common::{ContainerID, ContainerType, LoroEncodeError, ID};
|
||||
use tracing::{debug, trace};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{
|
||||
container::list::list_op::InnerListOp,
|
||||
|
@ -95,7 +95,6 @@ pub(crate) fn export_shallow_snapshot_inner(
|
|||
if let ContainerID::Normal { peer, counter, .. } = cid {
|
||||
let temp_id = ID::new(peer, counter);
|
||||
if !start_from.contains(&temp_id) {
|
||||
trace!("Retain Container {:?}", temp_id);
|
||||
alive_c_bytes.insert(cid.to_bytes());
|
||||
}
|
||||
} else {
|
||||
|
@ -253,9 +252,10 @@ pub(crate) fn encode_snapshot_at<W: std::io::Write>(
|
|||
frontiers: &Frontiers,
|
||||
w: &mut W,
|
||||
) -> Result<(), LoroEncodeError> {
|
||||
let was_detached = doc.is_detached();
|
||||
let version_before_start = doc.oplog_frontiers();
|
||||
doc.checkout_without_emitting(frontiers, true).unwrap();
|
||||
{
|
||||
let result = 'block: {
|
||||
let mut state = doc.app_state().try_lock().unwrap();
|
||||
let oplog = doc.oplog().try_lock().unwrap();
|
||||
let is_shallow = state.store.shallow_root_store().is_some();
|
||||
|
@ -265,7 +265,7 @@ pub(crate) fn encode_snapshot_at<W: std::io::Write>(
|
|||
|
||||
assert!(!state.is_in_txn());
|
||||
let Some(oplog_bytes) = oplog.fork_changes_up_to(frontiers) else {
|
||||
return Err(LoroEncodeError::FrontiersNotFound(format!(
|
||||
break 'block Err(LoroEncodeError::FrontiersNotFound(format!(
|
||||
"frontiers: {:?} when export in SnapshotAt mode",
|
||||
frontiers
|
||||
)));
|
||||
|
@ -280,7 +280,7 @@ pub(crate) fn encode_snapshot_at<W: std::io::Write>(
|
|||
|
||||
let alive_containers = state.ensure_all_alive_containers();
|
||||
if has_unknown_container(alive_containers.iter()) {
|
||||
return Err(LoroEncodeError::UnknownContainer);
|
||||
break 'block Err(LoroEncodeError::UnknownContainer);
|
||||
}
|
||||
|
||||
let alive_c_bytes = cids_to_bytes(alive_containers);
|
||||
|
@ -296,9 +296,14 @@ pub(crate) fn encode_snapshot_at<W: std::io::Write>(
|
|||
},
|
||||
w,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
doc.checkout_without_emitting(&version_before_start, false)
|
||||
.unwrap();
|
||||
if !was_detached {
|
||||
doc.set_detached(false);
|
||||
}
|
||||
doc.drop_pending_events();
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ use std::{
|
|||
Arc, Mutex, Weak,
|
||||
},
|
||||
};
|
||||
use tracing::{debug_span, info, info_span, instrument, trace, warn};
|
||||
use tracing::{debug_span, info, info_span, instrument, warn};
|
||||
|
||||
use crate::{
|
||||
arena::SharedArena,
|
||||
|
@ -1059,7 +1059,10 @@ impl LoroDoc {
|
|||
frontiers: &Frontiers,
|
||||
to_shrink_frontiers: bool,
|
||||
) -> Result<(), LoroError> {
|
||||
self.commit_then_stop();
|
||||
let had_txn = self.txn.try_lock().unwrap().is_some();
|
||||
if had_txn {
|
||||
self.commit_then_stop();
|
||||
}
|
||||
let from_frontiers = self.state_frontiers();
|
||||
info!(
|
||||
"checkout from={:?} to={:?} cur_vv={:?}",
|
||||
|
@ -1069,14 +1072,18 @@ impl LoroDoc {
|
|||
);
|
||||
|
||||
if &from_frontiers == frontiers {
|
||||
self.renew_txn_if_auto_commit();
|
||||
if had_txn {
|
||||
self.renew_txn_if_auto_commit();
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let oplog = self.oplog.try_lock().unwrap();
|
||||
if oplog.dag.is_before_shallow_root(frontiers) {
|
||||
drop(oplog);
|
||||
self.renew_txn_if_auto_commit();
|
||||
if had_txn {
|
||||
self.renew_txn_if_auto_commit();
|
||||
}
|
||||
return Err(LoroError::SwitchToVersionBeforeShallowRoot);
|
||||
}
|
||||
|
||||
|
@ -1088,7 +1095,9 @@ impl LoroDoc {
|
|||
};
|
||||
if from_frontiers == frontiers {
|
||||
drop(oplog);
|
||||
self.renew_txn_if_auto_commit();
|
||||
if had_txn {
|
||||
self.renew_txn_if_auto_commit();
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
@ -1098,17 +1107,20 @@ impl LoroDoc {
|
|||
if !oplog.dag.contains(i) {
|
||||
drop(oplog);
|
||||
drop(state);
|
||||
self.renew_txn_if_auto_commit();
|
||||
if had_txn {
|
||||
self.renew_txn_if_auto_commit();
|
||||
}
|
||||
return Err(LoroError::FrontiersNotFound(i));
|
||||
}
|
||||
}
|
||||
|
||||
trace!("state.frontiers={:?}", &state.frontiers);
|
||||
let before = &oplog.dag.frontiers_to_vv(&state.frontiers).unwrap();
|
||||
let Some(after) = &oplog.dag.frontiers_to_vv(&frontiers) else {
|
||||
drop(oplog);
|
||||
drop(state);
|
||||
self.renew_txn_if_auto_commit();
|
||||
if had_txn {
|
||||
self.renew_txn_if_auto_commit();
|
||||
}
|
||||
return Err(LoroError::NotFoundError(
|
||||
format!("Cannot find the specified version {:?}", frontiers).into_boxed_str(),
|
||||
));
|
||||
|
|
|
@ -15,7 +15,7 @@ use fxhash::{FxHashMap, FxHashSet};
|
|||
use itertools::Itertools;
|
||||
use loro_common::{ContainerID, LoroError, LoroResult};
|
||||
use loro_delta::DeltaItem;
|
||||
use tracing::{info_span, instrument, trace, warn};
|
||||
use tracing::{info_span, instrument, warn};
|
||||
|
||||
use crate::{
|
||||
configure::{Configure, DefaultRandom, SecureRandomGenerator},
|
||||
|
@ -972,10 +972,6 @@ impl DocState {
|
|||
}
|
||||
|
||||
pub fn can_import_snapshot(&self) -> bool {
|
||||
trace!("in_txn: {:?}", self.in_txn);
|
||||
trace!("store: {:?}", self.store.is_empty());
|
||||
trace!("arena: {:?}", self.arena.can_import_snapshot());
|
||||
|
||||
!self.in_txn && self.arena.can_import_snapshot() && self.store.can_import_snapshot()
|
||||
}
|
||||
|
||||
|
|
|
@ -168,6 +168,7 @@ impl ContainerStore {
|
|||
self.store.get_kv()
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.store.is_empty()
|
||||
}
|
||||
|
|
|
@ -254,6 +254,7 @@ impl InnerStore {
|
|||
self.len
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn is_empty(&self) -> bool {
|
||||
self.len == 0
|
||||
}
|
||||
|
|
|
@ -440,6 +440,7 @@ impl Transaction {
|
|||
|
||||
let last_id = change.id_last();
|
||||
if let Err(err) = oplog.import_local_change(change) {
|
||||
state.abort_txn();
|
||||
drop(state);
|
||||
drop(oplog);
|
||||
return Err(err);
|
||||
|
|
|
@ -1959,3 +1959,15 @@ fn test_loro_doc() {
|
|||
doc.get_text("text").insert(0, "Hello").unwrap();
|
||||
doc.state_vv();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_fork_at_should_restore_attached_state() {
|
||||
let doc = LoroDoc::new();
|
||||
doc.set_peer_id(0).unwrap();
|
||||
doc.get_text("text").insert(0, "Hello").unwrap();
|
||||
doc.fork_at(&[ID::new(0, 0)].into());
|
||||
assert!(!doc.is_detached());
|
||||
doc.detach();
|
||||
doc.fork_at(&[ID::new(0, 0)].into());
|
||||
assert!(doc.is_detached());
|
||||
}
|
||||
|
|
|
@ -294,8 +294,7 @@ it("fork at", () => {
|
|||
const doc = new LoroDoc();
|
||||
doc.setPeerId("0");
|
||||
doc.getText("text").insert(0, "Hello, world!");
|
||||
console.log("0");
|
||||
const newDoc = doc.forkAt([{ peer: "0", counter: 6 }]);
|
||||
const newDoc = doc.forkAt([{ peer: "0", counter: 5 }]);
|
||||
newDoc.setPeerId("1");
|
||||
newDoc.getText("text").insert(6, " Alice!");
|
||||
// ┌───────────────┐ ┌───────────────┐
|
||||
|
@ -305,8 +304,10 @@ it("fork at", () => {
|
|||
// │ ┌───────────────┐
|
||||
// └──│ Alice! │
|
||||
// └───────────────┘
|
||||
doc.import(newDoc.export({ mode: "update" }));
|
||||
console.log(doc.getText("text").toString()); // "Hello, world! Alice!"
|
||||
const updates = newDoc.export({ mode: "update" });
|
||||
doc.checkoutToLatest();
|
||||
doc.import(updates);
|
||||
expect(doc.getText("text").toString()).toEqual("Hello, world! Alice!");
|
||||
});
|
||||
|
||||
function one_ms(): Promise<void> {
|
||||
|
|
Loading…
Reference in a new issue