mirror of
https://github.com/loro-dev/loro.git
synced 2025-01-22 12:57:20 +00:00
perf: better tree fast snapshot
This commit is contained in:
parent
a8c1180810
commit
e19e1af543
4 changed files with 134 additions and 88 deletions
|
@ -97,6 +97,7 @@ impl std::fmt::Debug for DocState {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub(crate) struct ContainerCreationContext<'a> {
|
||||
configure: &'a Configure,
|
||||
peer: PeerID,
|
||||
|
|
|
@ -103,7 +103,15 @@ impl ContainerStore {
|
|||
}
|
||||
|
||||
pub fn get_value(&mut self, idx: ContainerIdx) -> Option<LoroValue> {
|
||||
self.store.get_mut(&idx).map(|c| c.get_value())
|
||||
self.store.get_mut(&idx).map(|c| {
|
||||
c.get_value(
|
||||
idx,
|
||||
ContainerCreationContext {
|
||||
configure: &self.conf,
|
||||
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn encode(&mut self) -> Bytes {
|
||||
|
@ -284,12 +292,12 @@ impl ContainerWrapper {
|
|||
self.state.as_mut().unwrap()
|
||||
}
|
||||
|
||||
pub fn get_value(&mut self) -> LoroValue {
|
||||
pub fn get_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroValue {
|
||||
if let Some(v) = self.value.as_ref() {
|
||||
return v.clone();
|
||||
}
|
||||
|
||||
self.decode_value().unwrap();
|
||||
self.decode_value(idx, ctx).unwrap();
|
||||
if self.value.is_none() {
|
||||
return self.state.as_mut().unwrap().get_value();
|
||||
}
|
||||
|
@ -337,19 +345,19 @@ impl ContainerWrapper {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn ensure_value(&mut self) -> &LoroValue {
|
||||
pub fn ensure_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &LoroValue {
|
||||
if self.value.is_some() {
|
||||
} else if self.state.is_some() {
|
||||
let value = self.state.as_mut().unwrap().get_value();
|
||||
self.value = Some(value);
|
||||
} else {
|
||||
self.decode_value().unwrap();
|
||||
self.decode_value(idx, ctx).unwrap();
|
||||
}
|
||||
|
||||
self.value.as_ref().unwrap()
|
||||
}
|
||||
|
||||
fn decode_value(&mut self) -> LoroResult<()> {
|
||||
fn decode_value(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> {
|
||||
let Some(b) = self.bytes.as_ref() else {
|
||||
return Ok(());
|
||||
};
|
||||
|
@ -359,7 +367,13 @@ impl ContainerWrapper {
|
|||
ContainerType::Map => MapState::decode_value(b)?,
|
||||
ContainerType::List => ListState::decode_value(b)?,
|
||||
ContainerType::MovableList => MovableListState::decode_value(b)?,
|
||||
ContainerType::Tree => TreeState::decode_value(b)?,
|
||||
ContainerType::Tree => {
|
||||
assert!(self.state.is_none());
|
||||
let mut state = TreeState::decode_snapshot_fast(idx, (LoroValue::Null, b), ctx)?;
|
||||
self.value = Some(state.get_value());
|
||||
self.state = Some(State::TreeState(Box::new(state)));
|
||||
return Ok(());
|
||||
}
|
||||
#[cfg(feature = "counter")]
|
||||
ContainerType::Counter => CounterState::decode_value(b)?,
|
||||
ContainerType::Unknown(_) => UnknownState::decode_value(b)?,
|
||||
|
@ -378,7 +392,7 @@ impl ContainerWrapper {
|
|||
}
|
||||
|
||||
if self.value.is_none() {
|
||||
self.decode_value()?;
|
||||
self.decode_value(idx, ctx)?;
|
||||
}
|
||||
|
||||
let b = self.bytes.as_ref().unwrap();
|
||||
|
|
|
@ -5,6 +5,7 @@ use fxhash::FxHashMap;
|
|||
use itertools::Itertools;
|
||||
use loro_common::{
|
||||
ContainerID, IdFull, IdLp, LoroError, LoroResult, LoroTreeError, LoroValue, PeerID, TreeID,
|
||||
DELETED_TREE_ROOT,
|
||||
};
|
||||
use rand::SeedableRng;
|
||||
use rle::HasLength;
|
||||
|
@ -58,6 +59,17 @@ impl From<Option<TreeID>> for TreeParentId {
|
|||
}
|
||||
}
|
||||
|
||||
impl TreeParentId {
|
||||
/// Converts this parent marker into the `Option<TreeID>` form.
///
/// - `Node(id)` yields `Some(id)`.
/// - `Root` yields `None`.
/// - `Deleted` yields `Some(DELETED_TREE_ROOT)`, i.e. the sentinel id rather
///   than the id of a real node.
///
/// # Panics
///
/// Panics (through `unreachable!()`) when called on `Unexist`.
fn id(&self) -> Option<TreeID> {
|
||||
match self {
|
||||
TreeParentId::Node(id) => Some(*id),
|
||||
TreeParentId::Root => None,
|
||||
// Deleted nodes report the sentinel id as their parent.
TreeParentId::Deleted => Some(DELETED_TREE_ROOT),
|
||||
// Callers are expected never to ask for the id of a non-existent parent.
TreeParentId::Unexist => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum NodeChildren {
|
||||
Vec(Vec<(NodePosition, TreeID)>),
|
||||
|
@ -729,6 +741,31 @@ impl TreeState {
|
|||
ans
|
||||
}
|
||||
|
||||
/// Collects every node reachable from the root and from the deleted root
/// into a single `Vec<TreeNode>`.
///
/// Nodes under `TreeParentId::Root` come first, followed by nodes under
/// `TreeParentId::Deleted`. Within each part the order is the one produced by
/// `_bfs_all_nodes`.
///
/// NOTE(review): direct children of the deleted root are emitted with
/// `parent == Some(DELETED_TREE_ROOT)` (see `TreeParentId::id`). This
/// traversal only pushes ids found in `self.children` lists, so that sentinel
/// is presumably not among the returned node ids. Consumers that resolve
/// `parent` by looking it up among the returned ids should confirm they
/// handle this case.
pub fn bfs_all_nodes(&self) -> Vec<TreeNode> {
|
||||
let mut ans = vec![];
|
||||
// Alive part of the tree first ...
self._bfs_all_nodes(TreeParentId::Root, &mut ans);
|
||||
// ... then everything that hangs below the deleted root.
self._bfs_all_nodes(TreeParentId::Deleted, &mut ans);
|
||||
ans
|
||||
}
|
||||
|
||||
/// Appends the nodes below `root` to `ans`.
///
/// All direct children of `root` are pushed first, in the iteration order of
/// `self.children[root]`, and each child's own subtree is then appended by a
/// recursive call. A node therefore always appears after its parent, but the
/// overall output is not strictly level-by-level despite the `bfs` name.
///
/// Each pushed `TreeNode` records `root.id()` as its parent and the child's
/// position in the iteration as `index`. Nothing is pushed when `root` has no
/// entry in `self.children`.
///
/// The function recurses once per tree level, so the call depth grows with
/// the depth of the tree below `root`.
///
/// # Panics
///
/// `root.id()` panics for `TreeParentId::Unexist`; this is only reached if
/// such a key has at least one child in `self.children`.
fn _bfs_all_nodes(&self, root: TreeParentId, ans: &mut Vec<TreeNode>) {
|
||||
let children = self.children.get(&root);
|
||||
if let Some(children) = children {
|
||||
// First pass: emit every direct child of `root`.
for (index, (position, target)) in children.iter().enumerate() {
|
||||
ans.push(TreeNode {
|
||||
id: *target,
|
||||
parent: root.id(),
|
||||
position: position.position.clone(),
|
||||
index,
|
||||
});
|
||||
}
|
||||

||||
// Second pass: descend into each child, treating it as the new parent.
for (_, id) in children.iter() {
|
||||
self._bfs_all_nodes(TreeParentId::Node(*id), ans);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn nodes(&self) -> Vec<TreeID> {
|
||||
self.trees
|
||||
.keys()
|
||||
|
@ -1297,27 +1334,36 @@ mod jitter {
|
|||
}
|
||||
|
||||
mod snapshot {
|
||||
use std::io::Read;
|
||||
use std::{borrow::Cow, collections::BTreeSet, io::Read};
|
||||
|
||||
use fractional_index::FractionalIndex;
|
||||
use loro_common::{IdFull, PeerID, TreeID};
|
||||
use fxhash::FxHashMap;
|
||||
use itertools::Itertools;
|
||||
use loro_common::{IdFull, PeerID, TreeID, ID};
|
||||
use serde::{Deserialize as _, Serialize as _};
|
||||
use serde_columnar::columnar;
|
||||
|
||||
use crate::{encoding::value_register::ValueRegister, state::FastStateSnapshot};
|
||||
use crate::{
|
||||
encoding::{arena::PositionArena, value_register::ValueRegister},
|
||||
state::FastStateSnapshot,
|
||||
};
|
||||
|
||||
use super::{TreeNode, TreeParentId, TreeState};
|
||||
|
||||
#[columnar(vec, ser, de, iterable)]
|
||||
#[derive(Debug, Clone)]
|
||||
struct EncodedTreeNode {
|
||||
struct EncodedTreeNodeId {
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
peer_idx: usize,
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
counter: i32,
|
||||
}
|
||||
|
||||
#[columnar(vec, ser, de, iterable)]
|
||||
#[derive(Debug, Clone)]
|
||||
struct EncodedTreeNode {
|
||||
/// If this field is 0, it means none
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
parent_peer_idx_plus_one: usize,
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
parent_counter: i32,
|
||||
parent_idx_plus_one: usize,
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
last_set_peer_idx: usize,
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
|
@ -1326,61 +1372,75 @@ mod snapshot {
|
|||
last_set_lamport: u32,
|
||||
#[columnar(strategy = "DeltaRle")]
|
||||
index: u32,
|
||||
position: Vec<u8>,
|
||||
fractional_index_idx: usize,
|
||||
}
|
||||
|
||||
#[columnar(ser, de)]
|
||||
struct EncodedTree {
|
||||
struct EncodedTree<'a> {
|
||||
#[columnar(class = "vec", iter = "EncodedTreeNodeId")]
|
||||
node_ids: Vec<EncodedTreeNodeId>,
|
||||
#[columnar(class = "vec", iter = "EncodedTreeNode")]
|
||||
nodes: Vec<EncodedTreeNode>,
|
||||
alive_len: u32,
|
||||
reserved_has_effect_bool_rle: Vec<u8>,
|
||||
#[columnar(borrow)]
|
||||
fractional_indexes: Cow<'a, [u8]>,
|
||||
#[columnar(borrow)]
|
||||
reserved_has_effect_bool_rle: Cow<'a, [u8]>,
|
||||
}
|
||||
|
||||
fn encode(
|
||||
state: &TreeState,
|
||||
input: Vec<TreeNode>,
|
||||
alive_len: usize,
|
||||
) -> (ValueRegister<PeerID>, EncodedTree) {
|
||||
fn encode(state: &TreeState, input: Vec<TreeNode>) -> (ValueRegister<PeerID>, EncodedTree) {
|
||||
let mut peers: ValueRegister<PeerID> = ValueRegister::new();
|
||||
let mut position_set = BTreeSet::default();
|
||||
let mut nodes = Vec::with_capacity(input.len());
|
||||
let mut node_ids = Vec::with_capacity(input.len());
|
||||
let mut position_register = ValueRegister::new();
|
||||
let mut id_to_idx = FxHashMap::default();
|
||||
for node in input.iter() {
|
||||
position_set.insert(node.position.clone());
|
||||
let idx = node_ids.len();
|
||||
node_ids.push(EncodedTreeNodeId {
|
||||
peer_idx: peers.register(&node.id.peer),
|
||||
counter: node.id.counter,
|
||||
});
|
||||
id_to_idx.insert(node.id, idx);
|
||||
}
|
||||
|
||||
for p in position_set {
|
||||
position_register.register(&p);
|
||||
}
|
||||
|
||||
for node in input {
|
||||
let n = state.trees.get(&node.id).unwrap();
|
||||
let last_set_id = n.last_move_op;
|
||||
nodes.push(EncodedTreeNode {
|
||||
peer_idx: peers.register(&node.id.peer),
|
||||
counter: node.id.counter,
|
||||
parent_peer_idx_plus_one: node
|
||||
parent_idx_plus_one: node
|
||||
.parent
|
||||
.map(|p| peers.register(&p.peer) + 1)
|
||||
.map(|p| id_to_idx.get(&p).unwrap() + 1)
|
||||
.unwrap_or(0),
|
||||
parent_counter: node.parent.map(|p| p.counter).unwrap_or(0),
|
||||
last_set_peer_idx: peers.register(&last_set_id.peer),
|
||||
last_set_counter: last_set_id.counter,
|
||||
last_set_lamport: last_set_id.lamport,
|
||||
index: node.index as u32,
|
||||
position: node.position.as_bytes().to_vec(),
|
||||
fractional_index_idx: position_register.register(&node.position),
|
||||
})
|
||||
}
|
||||
|
||||
let position_vec = position_register.unwrap_vec();
|
||||
let positions = PositionArena::from_positions(position_vec.iter().map(|p| p.as_bytes()));
|
||||
(
|
||||
peers,
|
||||
EncodedTree {
|
||||
node_ids,
|
||||
nodes,
|
||||
alive_len: alive_len as u32,
|
||||
reserved_has_effect_bool_rle: vec![],
|
||||
fractional_indexes: positions.encode().into(),
|
||||
reserved_has_effect_bool_rle: vec![].into(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
impl FastStateSnapshot for TreeState {
|
||||
fn encode_snapshot_fast<W: std::io::prelude::Write>(&mut self, mut w: W) {
|
||||
let alive_tree_nodes = self.tree_nodes();
|
||||
let deleted_tree_nodes = self.deleted_tree_nodes();
|
||||
let alive_len = alive_tree_nodes.len();
|
||||
let mut all_nodes = alive_tree_nodes;
|
||||
all_nodes.extend(deleted_tree_nodes);
|
||||
let (peers, encoded) = encode(self, all_nodes, alive_len);
|
||||
let all_nodes = self.bfs_all_nodes();
|
||||
let (peers, encoded) = encode(self, all_nodes);
|
||||
let peers = peers.unwrap_vec();
|
||||
leb128::write::unsigned(&mut w, peers.len() as u64).unwrap();
|
||||
for peer in peers {
|
||||
|
@ -1391,41 +1451,8 @@ mod snapshot {
|
|||
w.write_all(&vec).unwrap();
|
||||
}
|
||||
|
||||
fn decode_value(
|
||||
mut bytes: &[u8],
|
||||
) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> {
|
||||
// TODO: PREF: FIXME: The performance for decoding the whole tree
|
||||
// can be improved a lot
|
||||
let original_bytes = bytes;
|
||||
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
|
||||
let mut peers = Vec::with_capacity(peer_num);
|
||||
for _ in 0..peer_num {
|
||||
let mut buf = [0u8; 8];
|
||||
bytes.read_exact(&mut buf).unwrap();
|
||||
peers.push(PeerID::from_le_bytes(buf));
|
||||
}
|
||||
|
||||
let encoded: EncodedTree = serde_columnar::from_bytes(bytes)?;
|
||||
let mut nodes = Vec::new();
|
||||
for node in encoded.nodes.into_iter().take(encoded.alive_len as usize) {
|
||||
let node = TreeNode {
|
||||
id: TreeID::new(peers[node.peer_idx], node.counter),
|
||||
parent: if node.parent_peer_idx_plus_one == 0 {
|
||||
None
|
||||
} else {
|
||||
Some(TreeID::new(
|
||||
peers[node.parent_peer_idx_plus_one - 1],
|
||||
node.parent_counter,
|
||||
))
|
||||
},
|
||||
position: FractionalIndex::from_bytes(node.position),
|
||||
index: node.index as usize,
|
||||
};
|
||||
|
||||
nodes.push(node.into_value());
|
||||
}
|
||||
|
||||
Ok((nodes.into(), original_bytes))
|
||||
fn decode_value(_: &[u8]) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
fn decode_snapshot_fast(
|
||||
|
@ -1447,26 +1474,30 @@ mod snapshot {
|
|||
let mut tree =
|
||||
TreeState::new(idx, ctx.peer, ctx.configure.tree_position_jitter.clone());
|
||||
let encoded: EncodedTree = serde_columnar::from_bytes(bytes)?;
|
||||
for (i, node) in encoded.nodes.into_iter().enumerate() {
|
||||
let is_dead = i >= encoded.alive_len as usize;
|
||||
let fractional_indexes = PositionArena::decode(&encoded.fractional_indexes).unwrap();
|
||||
let fractional_indexes = fractional_indexes.parse_to_positions();
|
||||
let node_ids = encoded
|
||||
.node_ids
|
||||
.iter()
|
||||
.map(|x| TreeID::new(peers[x.peer_idx], x.counter))
|
||||
.collect_vec();
|
||||
for (node_id, node) in node_ids.iter().zip(encoded.nodes.into_iter()) {
|
||||
tree.mov(
|
||||
TreeID::new(peers[node.peer_idx], node.counter),
|
||||
if is_dead {
|
||||
TreeParentId::Deleted
|
||||
} else if node.parent_peer_idx_plus_one == 0 {
|
||||
*node_id,
|
||||
if node.parent_idx_plus_one == 0 {
|
||||
TreeParentId::Root
|
||||
} else {
|
||||
TreeParentId::Node(TreeID::new(
|
||||
peers[node.parent_peer_idx_plus_one - 1],
|
||||
node.parent_counter,
|
||||
))
|
||||
let id = node_ids[node.parent_idx_plus_one - 1];
|
||||
TreeParentId::from(Some(id))
|
||||
},
|
||||
IdFull::new(
|
||||
peers[node.last_set_peer_idx],
|
||||
node.last_set_counter,
|
||||
node.last_set_lamport,
|
||||
),
|
||||
Some(FractionalIndex::from_bytes(node.position)),
|
||||
Some(FractionalIndex::from_bytes(
|
||||
fractional_indexes[node.fractional_index_idx].clone(),
|
||||
)),
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
|
@ -1478,6 +1509,8 @@ mod snapshot {
|
|||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use loro_common::LoroValue;
|
||||
|
||||
use crate::{
|
||||
container::idx::ContainerIdx,
|
||||
state::{ContainerCreationContext, ContainerState},
|
||||
|
@ -1504,11 +1537,9 @@ mod snapshot {
|
|||
tree_state.encode_snapshot_fast(&mut bytes);
|
||||
(bytes, value)
|
||||
};
|
||||
let (decoded_value, bytes) = TreeState::decode_value(&bytes).unwrap();
|
||||
assert_eq!(decoded_value, value);
|
||||
let mut new_tree_state = TreeState::decode_snapshot_fast(
|
||||
ContainerIdx::from_index_and_type(0, loro_common::ContainerType::Tree),
|
||||
(decoded_value, bytes),
|
||||
(LoroValue::Null, &bytes),
|
||||
ContainerCreationContext {
|
||||
configure: &Default::default(),
|
||||
peer: 0,
|
||||
|
|
Loading…
Reference in a new issue