mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-23 13:39:12 +00:00
fix: dep counter
This commit is contained in:
parent
697f9708b9
commit
b8e27dc011
3 changed files with 125 additions and 57 deletions
|
@ -641,12 +641,14 @@ fn check_synced(sites: &mut [Actor]) {
|
||||||
let (a, b) = array_mut_ref!(sites, [i, j]);
|
let (a, b) = array_mut_ref!(sites, [i, j]);
|
||||||
let a_doc = &mut a.loro;
|
let a_doc = &mut a.loro;
|
||||||
let b_doc = &mut b.loro;
|
let b_doc = &mut b.loro;
|
||||||
a_doc
|
// a_doc
|
||||||
.decode(&b_doc.encode_with_cfg(EncodeConfig::rle_update(a_doc.vv_cloned())))
|
// .decode(&b_doc.encode_with_cfg(EncodeConfig::rle_update(a_doc.vv_cloned())))
|
||||||
.unwrap();
|
// .unwrap();
|
||||||
b_doc
|
// b_doc
|
||||||
.decode(&a_doc.encode_with_cfg(EncodeConfig::rle_update(b_doc.vv_cloned())))
|
// .decode(&a_doc.encode_with_cfg(EncodeConfig::rle_update(b_doc.vv_cloned())))
|
||||||
.unwrap();
|
// .unwrap();
|
||||||
|
a_doc.decode(&b_doc.encode_all()).unwrap();
|
||||||
|
b_doc.decode(&a_doc.encode_all()).unwrap();
|
||||||
check_eq(a, b);
|
check_eq(a, b);
|
||||||
debug_log::group_end!();
|
debug_log::group_end!();
|
||||||
}
|
}
|
||||||
|
@ -681,7 +683,6 @@ pub fn normalize(site_num: u8, actions: &mut [Action]) -> Vec<Action> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn test_multi_sites(site_num: u8, actions: &mut [Action]) {
|
pub fn test_multi_sites(site_num: u8, actions: &mut [Action]) {
|
||||||
// println!("{:?}", actions);
|
|
||||||
let mut sites = Vec::new();
|
let mut sites = Vec::new();
|
||||||
for i in 0..site_num {
|
for i in 0..site_num {
|
||||||
sites.push(Actor::new(i as u64));
|
sites.push(Actor::new(i as u64));
|
||||||
|
@ -695,7 +696,6 @@ pub fn test_multi_sites(site_num: u8, actions: &mut [Action]) {
|
||||||
sites.apply_action(action);
|
sites.apply_action(action);
|
||||||
}
|
}
|
||||||
|
|
||||||
// println!("{}", actions.table());
|
|
||||||
debug_log::group!("check synced");
|
debug_log::group!("check synced");
|
||||||
check_synced(&mut sites);
|
check_synced(&mut sites);
|
||||||
debug_log::group_end!();
|
debug_log::group_end!();
|
||||||
|
@ -984,7 +984,7 @@ mod failed_tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn list_unknown() {
|
fn list_unknown() {
|
||||||
test_multi_sites(
|
test_multi_sites(
|
||||||
3,
|
5,
|
||||||
&mut [
|
&mut [
|
||||||
List {
|
List {
|
||||||
site: 139,
|
site: 139,
|
||||||
|
@ -1018,7 +1018,7 @@ mod failed_tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn path_issue() {
|
fn path_issue() {
|
||||||
test_multi_sites(
|
test_multi_sites(
|
||||||
2,
|
5,
|
||||||
&mut [
|
&mut [
|
||||||
List {
|
List {
|
||||||
site: 1,
|
site: 1,
|
||||||
|
|
|
@ -239,6 +239,12 @@ impl LogStore {
|
||||||
client_id: self.this_client_id,
|
client_id: self.this_client_id,
|
||||||
counter: self.get_next_counter(self.this_client_id),
|
counter: self.get_next_counter(self.this_client_id),
|
||||||
};
|
};
|
||||||
|
if id.counter > 0 {
|
||||||
|
let self_dep = id.inc(-1);
|
||||||
|
if !self.frontiers.contains(&self_dep) {
|
||||||
|
self.frontiers.push(self_dep);
|
||||||
|
}
|
||||||
|
}
|
||||||
let last = ops.last().unwrap();
|
let last = ops.last().unwrap();
|
||||||
let last_ctr = last.ctr_last();
|
let last_ctr = last.ctr_last();
|
||||||
let last_id = ID::new(self.this_client_id, last_ctr);
|
let last_id = ID::new(self.this_client_id, last_ctr);
|
||||||
|
|
|
@ -18,7 +18,7 @@ use crate::{
|
||||||
registry::ContainerIdx,
|
registry::ContainerIdx,
|
||||||
Container, ContainerID,
|
Container, ContainerID,
|
||||||
},
|
},
|
||||||
dag::remove_included_frontiers,
|
dag::{remove_included_frontiers, Dag},
|
||||||
event::RawEvent,
|
event::RawEvent,
|
||||||
hierarchy::Hierarchy,
|
hierarchy::Hierarchy,
|
||||||
id::{ClientID, Counter, ID},
|
id::{ClientID, Counter, ID},
|
||||||
|
@ -123,8 +123,8 @@ impl EncodedStateContent {
|
||||||
#[columnar(vec, ser, de)]
|
#[columnar(vec, ser, de)]
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
struct SnapshotOpEncoding {
|
struct SnapshotOpEncoding {
|
||||||
#[columnar(strategy = "Rle", original_type = "u32")]
|
#[columnar(strategy = "Rle", original_type = "usize")]
|
||||||
container: u32,
|
container: usize,
|
||||||
/// key index or insert/delete pos
|
/// key index or insert/delete pos
|
||||||
#[columnar(strategy = "DeltaRle")]
|
#[columnar(strategy = "DeltaRle")]
|
||||||
prop: usize,
|
prop: usize,
|
||||||
|
@ -196,6 +196,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
|
||||||
debug_log::debug_dbg!(&store.vv);
|
debug_log::debug_dbg!(&store.vv);
|
||||||
debug_log::debug_dbg!(&store.changes);
|
debug_log::debug_dbg!(&store.changes);
|
||||||
let mut client_id_to_idx: FxHashMap<ClientID, ClientIdx> = FxHashMap::default();
|
let mut client_id_to_idx: FxHashMap<ClientID, ClientIdx> = FxHashMap::default();
|
||||||
|
let mut container_id_to_idx: FxHashMap<ContainerID, usize> = FxHashMap::default();
|
||||||
let mut clients = Vec::with_capacity(store.changes.len());
|
let mut clients = Vec::with_capacity(store.changes.len());
|
||||||
let mut change_num = 0;
|
let mut change_num = 0;
|
||||||
for (key, changes) in store.changes.iter() {
|
for (key, changes) in store.changes.iter() {
|
||||||
|
@ -205,7 +206,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
|
||||||
}
|
}
|
||||||
|
|
||||||
let (_, containers) = store.reg.export();
|
let (_, containers) = store.reg.export();
|
||||||
for container_id in containers.iter() {
|
for (idx, container_id) in containers.iter().enumerate() {
|
||||||
let container = store.reg.get(container_id).unwrap();
|
let container = store.reg.get(container_id).unwrap();
|
||||||
container
|
container
|
||||||
.upgrade()
|
.upgrade()
|
||||||
|
@ -213,6 +214,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
|
||||||
.try_lock()
|
.try_lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.initialize_pool_mapping();
|
.initialize_pool_mapping();
|
||||||
|
container_id_to_idx.insert(container_id.clone(), idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut changes = Vec::with_capacity(change_num);
|
let mut changes = Vec::with_capacity(change_num);
|
||||||
|
@ -239,6 +241,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
|
||||||
for op in change.ops.iter() {
|
for op in change.ops.iter() {
|
||||||
let container_idx = op.container;
|
let container_idx = op.container;
|
||||||
let container_id = store.reg.get_id(container_idx).unwrap();
|
let container_id = store.reg.get_id(container_idx).unwrap();
|
||||||
|
let container_idx = container_id_to_idx.get(container_id).unwrap();
|
||||||
let container = store.reg.get(container_id).unwrap();
|
let container = store.reg.get(container_id).unwrap();
|
||||||
let new_ops = container
|
let new_ops = container
|
||||||
.upgrade()
|
.upgrade()
|
||||||
|
@ -251,7 +254,7 @@ pub(super) fn encode_snapshot(store: &LogStore, gc: bool) -> Result<Vec<u8>, Lor
|
||||||
let (prop, value, value2) =
|
let (prop, value, value2) =
|
||||||
convert_inner_content(&op_content, &mut key_to_idx, &mut keys);
|
convert_inner_content(&op_content, &mut key_to_idx, &mut keys);
|
||||||
ops.push(SnapshotOpEncoding {
|
ops.push(SnapshotOpEncoding {
|
||||||
container: container_idx.to_u32(),
|
container: *container_idx,
|
||||||
prop,
|
prop,
|
||||||
value,
|
value,
|
||||||
value2,
|
value2,
|
||||||
|
@ -319,15 +322,44 @@ pub(super) fn decode_snapshot(
|
||||||
}
|
}
|
||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
}
|
}
|
||||||
|
let mut idx_to_container_type = FxHashMap::default();
|
||||||
|
for (idx, container_id) in containers.iter().enumerate() {
|
||||||
|
idx_to_container_type.insert(idx, container_id.container_type());
|
||||||
|
}
|
||||||
|
|
||||||
|
// calc vv
|
||||||
|
let vv = calc_vv(&change_encodings, &ops, &clients, &idx_to_container_type);
|
||||||
|
let can_load = match vv.partial_cmp(&store.vv) {
|
||||||
|
Some(ord) => match ord {
|
||||||
|
std::cmp::Ordering::Less => {
|
||||||
|
// TODO warning
|
||||||
|
println!("[Warning] the vv of encoded snapshot is smaller than self, no change is applied");
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
std::cmp::Ordering::Equal => return Ok(vec![]),
|
||||||
|
std::cmp::Ordering::Greater => store.vv.is_empty(),
|
||||||
|
},
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
|
||||||
let mut op_iter = ops.into_iter();
|
let mut op_iter = ops.into_iter();
|
||||||
let mut changes_dq = FxHashMap::default();
|
let mut changes_dq = FxHashMap::default();
|
||||||
let mut deps_iter = deps.into_iter();
|
let mut deps_iter = deps.into_iter();
|
||||||
let mut container_idx2type = FxHashMap::default();
|
let mut idx_to_container_idx = FxHashMap::default();
|
||||||
|
|
||||||
for container_id in containers.iter() {
|
// the container_idx need to be calculated first
|
||||||
let container_idx = store.reg.get_or_create_container_idx(container_id);
|
// because the op needs the corresponding container (in new or old store)
|
||||||
container_idx2type.insert(container_idx, container_id.container_type());
|
let new_loro = LoroCore::default();
|
||||||
|
let mut new_store = new_loro.log_store.try_write().unwrap();
|
||||||
|
let mut new_hierarchy = new_loro.hierarchy.try_lock().unwrap();
|
||||||
|
|
||||||
|
for (idx, container_id) in containers.iter().enumerate() {
|
||||||
|
let container_idx = if can_load {
|
||||||
|
store.reg.get_or_create_container_idx(container_id)
|
||||||
|
} else {
|
||||||
|
new_store.reg.get_or_create_container_idx(container_id)
|
||||||
|
};
|
||||||
|
idx_to_container_idx.insert(idx, container_idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (_, this_changes_encoding) in &change_encodings.into_iter().group_by(|c| c.client_idx) {
|
for (_, this_changes_encoding) in &change_encodings.into_iter().group_by(|c| c.client_idx) {
|
||||||
|
@ -359,8 +391,7 @@ pub(super) fn decode_snapshot(
|
||||||
value2,
|
value2,
|
||||||
} = op;
|
} = op;
|
||||||
|
|
||||||
let container_idx = ContainerIdx::from_u32(container_idx);
|
let container_type = idx_to_container_type[&container_idx];
|
||||||
let container_type = container_idx2type[&container_idx];
|
|
||||||
let content = match container_type {
|
let content = match container_type {
|
||||||
ContainerType::Map => {
|
ContainerType::Map => {
|
||||||
let key = keys[prop].clone();
|
let key = keys[prop].clone();
|
||||||
|
@ -395,7 +426,7 @@ pub(super) fn decode_snapshot(
|
||||||
};
|
};
|
||||||
let op = Op {
|
let op = Op {
|
||||||
counter: counter + delta,
|
counter: counter + delta,
|
||||||
container: container_idx,
|
container: idx_to_container_idx[&container_idx],
|
||||||
content,
|
content,
|
||||||
};
|
};
|
||||||
delta += op.content_len() as i32;
|
delta += op.content_len() as i32;
|
||||||
|
@ -411,7 +442,7 @@ pub(super) fn decode_snapshot(
|
||||||
.next()
|
.next()
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
} else {
|
} else {
|
||||||
deps.push(ID::new(client_id, next_change_deps_counter));
|
deps.push(ID::new(client_id, next_change_deps_counter - 1));
|
||||||
}
|
}
|
||||||
next_change_deps_counter += delta;
|
next_change_deps_counter += delta;
|
||||||
let change = Change {
|
let change = Change {
|
||||||
|
@ -466,22 +497,6 @@ pub(super) fn decode_snapshot(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let vv: VersionVector = changes
|
|
||||||
.values()
|
|
||||||
.map(|changes| changes.last().unwrap().id_last())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
debug_log::debug_dbg!(&vv, &changes_dq);
|
|
||||||
|
|
||||||
let can_load = match vv.partial_cmp(&store.vv) {
|
|
||||||
Some(ord) => match ord {
|
|
||||||
std::cmp::Ordering::Less => false,
|
|
||||||
std::cmp::Ordering::Equal => return Ok(vec![]),
|
|
||||||
std::cmp::Ordering::Greater => true,
|
|
||||||
},
|
|
||||||
None => false,
|
|
||||||
};
|
|
||||||
|
|
||||||
if can_load {
|
if can_load {
|
||||||
let mut import_context = load_snapshot(
|
let mut import_context = load_snapshot(
|
||||||
store,
|
store,
|
||||||
|
@ -490,15 +505,11 @@ pub(super) fn decode_snapshot(
|
||||||
changes,
|
changes,
|
||||||
containers,
|
containers,
|
||||||
container_states,
|
container_states,
|
||||||
container_idx2type,
|
|
||||||
&keys,
|
&keys,
|
||||||
&clients,
|
&clients,
|
||||||
);
|
);
|
||||||
Ok(store.get_events(hierarchy, &mut import_context))
|
Ok(store.get_events(hierarchy, &mut import_context))
|
||||||
} else {
|
} else {
|
||||||
let new_loro = LoroCore::default();
|
|
||||||
let mut new_store = new_loro.log_store.try_write().unwrap();
|
|
||||||
let mut new_hierarchy = new_loro.hierarchy.try_lock().unwrap();
|
|
||||||
load_snapshot(
|
load_snapshot(
|
||||||
&mut new_store,
|
&mut new_store,
|
||||||
&mut new_hierarchy,
|
&mut new_hierarchy,
|
||||||
|
@ -506,7 +517,6 @@ pub(super) fn decode_snapshot(
|
||||||
changes,
|
changes,
|
||||||
containers,
|
containers,
|
||||||
container_states,
|
container_states,
|
||||||
container_idx2type,
|
|
||||||
&keys,
|
&keys,
|
||||||
&clients,
|
&clients,
|
||||||
);
|
);
|
||||||
|
@ -523,7 +533,6 @@ fn load_snapshot(
|
||||||
changes: FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>,
|
changes: FxHashMap<ClientID, RleVecWithIndex<Change, ChangeMergeCfg>>,
|
||||||
containers: Vec<ContainerID>,
|
containers: Vec<ContainerID>,
|
||||||
container_states: Vec<EncodedStateContent>,
|
container_states: Vec<EncodedStateContent>,
|
||||||
mut container_idx2type: FxHashMap<ContainerIdx, ContainerType>,
|
|
||||||
keys: &[InternalString],
|
keys: &[InternalString],
|
||||||
clients: &[u64],
|
clients: &[u64],
|
||||||
) -> ImportContext {
|
) -> ImportContext {
|
||||||
|
@ -538,17 +547,18 @@ fn load_snapshot(
|
||||||
let mut import_context = ImportContext {
|
let mut import_context = ImportContext {
|
||||||
old_frontiers: smallvec![],
|
old_frontiers: smallvec![],
|
||||||
new_frontiers: frontiers.get_frontiers(),
|
new_frontiers: frontiers.get_frontiers(),
|
||||||
old_vv: VersionVector::new(),
|
old_vv: new_store.vv(),
|
||||||
spans: vv.diff(&new_store.vv).left,
|
spans: vv.diff(&new_store.vv).left,
|
||||||
new_vv: vv.clone(),
|
new_vv: vv.clone(),
|
||||||
diff: Default::default(),
|
diff: Default::default(),
|
||||||
patched_old_vv: None,
|
patched_old_vv: None,
|
||||||
};
|
};
|
||||||
for (container_id, pool_mapping) in containers.into_iter().zip(container_states.into_iter()) {
|
for (container_id, pool_mapping) in containers.into_iter().zip(container_states.into_iter()) {
|
||||||
let container_idx = new_store.reg.get_or_create_container_idx(&container_id);
|
|
||||||
container_idx2type.insert(container_idx, container_id.container_type());
|
|
||||||
let state = pool_mapping.into_state(keys, clients);
|
let state = pool_mapping.into_state(keys, clients);
|
||||||
let container = new_store.reg.get_by_idx(container_idx).unwrap();
|
let container = new_store
|
||||||
|
.get_or_create_container(&container_id)
|
||||||
|
.upgrade()
|
||||||
|
.unwrap();
|
||||||
let mut container = container.try_lock().unwrap();
|
let mut container = container.try_lock().unwrap();
|
||||||
container.to_import_snapshot(state, new_hierarchy, &mut import_context);
|
container.to_import_snapshot(state, new_hierarchy, &mut import_context);
|
||||||
}
|
}
|
||||||
|
@ -571,21 +581,73 @@ fn load_snapshot(
|
||||||
import_context
|
import_context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn calc_vv(
|
||||||
|
changes_encoding: &[ChangeEncoding],
|
||||||
|
ops_encoding: &[SnapshotOpEncoding],
|
||||||
|
clients: &[ClientID],
|
||||||
|
idx_to_container_type: &FxHashMap<usize, ContainerType>,
|
||||||
|
) -> VersionVector {
|
||||||
|
let mut vv = FxHashMap::default();
|
||||||
|
let mut op_iter = ops_encoding.iter();
|
||||||
|
for (client_idx, this_changes_encoding) in &changes_encoding.iter().group_by(|c| c.client_idx) {
|
||||||
|
let client_id = clients[client_idx as usize];
|
||||||
|
let mut counter = 0;
|
||||||
|
for change_encoding in this_changes_encoding {
|
||||||
|
let op_len = change_encoding.op_len;
|
||||||
|
let mut delta = 0;
|
||||||
|
for op in op_iter.by_ref().take(op_len as usize) {
|
||||||
|
let SnapshotOpEncoding {
|
||||||
|
container: container_idx,
|
||||||
|
prop: _,
|
||||||
|
value,
|
||||||
|
value2,
|
||||||
|
} = *op;
|
||||||
|
let container_type = idx_to_container_type[&container_idx];
|
||||||
|
let op_content_len = match container_type {
|
||||||
|
ContainerType::Map => 1,
|
||||||
|
_ => {
|
||||||
|
let is_del = value2 == ENCODED_DELETED_CONTENT;
|
||||||
|
if is_del {
|
||||||
|
value
|
||||||
|
} else {
|
||||||
|
let is_unknown = value2 == ENCODED_UNKNOWN_SLICE;
|
||||||
|
if is_unknown {
|
||||||
|
value
|
||||||
|
} else {
|
||||||
|
value2 as u64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
delta += op_content_len as i32;
|
||||||
|
}
|
||||||
|
counter += delta;
|
||||||
|
}
|
||||||
|
vv.insert(client_id, counter);
|
||||||
|
}
|
||||||
|
vv.into()
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use crate::{log_store::EncodeConfig, LoroCore};
|
use crate::{ContainerType, LoroCore};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn cannot_load() {
|
fn cannot_load() {
|
||||||
let mut loro = LoroCore::new(Default::default(), Some(1));
|
let mut loro = LoroCore::new(Default::default(), Some(1));
|
||||||
let mut text = loro.get_text("text");
|
let mut list = loro.get_list("list");
|
||||||
text.insert(&loro, 0, "abc").unwrap();
|
list.insert(&loro, 0, ContainerType::Text).unwrap();
|
||||||
let snapshot = loro.encode_all();
|
|
||||||
|
|
||||||
let mut loro2 = LoroCore::new(Default::default(), Some(2));
|
let mut loro2 = LoroCore::new(Default::default(), Some(2));
|
||||||
let mut text2 = loro2.get_text("text");
|
loro2.import(loro.export(loro2.vv_cloned()));
|
||||||
text2.insert(&loro2, 0, "efg").unwrap();
|
|
||||||
loro2.decode(&snapshot).unwrap();
|
let mut list2 = loro2.get_list("list");
|
||||||
assert_eq!(text2.get_value().to_json_pretty(), "\"abcefg\"");
|
list2.delete(&loro2, 0, 1).unwrap();
|
||||||
|
loro.import(loro2.export(loro.vv_cloned()));
|
||||||
|
|
||||||
|
let mut list = loro.get_list("list");
|
||||||
|
list.insert(&loro, 0, "abc").unwrap();
|
||||||
|
loro2.decode(&loro.encode_all()).unwrap();
|
||||||
|
loro.decode(&loro2.encode_all()).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue