bk: tree impl (untested)

This commit is contained in:
Zixuan Chen 2024-06-14 18:37:51 +08:00
parent 3729deb12d
commit a290f29bb5
No known key found for this signature in database

View file

@ -663,8 +663,16 @@ impl TreeState {
}
pub(crate) fn tree_nodes(&self) -> Vec<TreeNode> {
self.get_all_tree_nodes_under(TreeParentId::Root)
}
pub(crate) fn deleted_tree_nodes(&self) -> Vec<TreeNode> {
self.get_all_tree_nodes_under(TreeParentId::Deleted)
}
pub(crate) fn get_all_tree_nodes_under(&self, root: TreeParentId) -> Vec<TreeNode> {
let mut ans = vec![];
let children = self.children.get(&TreeParentId::Root);
let children = self.children.get(&root);
let mut q = children
.map(|x| {
VecDeque::from_iter(x.iter().enumerate().zip(std::iter::repeat(None::<TreeID>)))
@ -1177,3 +1185,174 @@ mod jitter {
}
}
}
mod snapshot {
use std::io::Read;
use fractional_index::FractionalIndex;
use loro_common::{IdFull, PeerID, TreeID};
use serde_columnar::columnar;
use crate::{encoding::value_register::ValueRegister, state::FastStateSnashot};
use super::{TreeNode, TreeParentId, TreeState};
#[columnar(vec, ser, de, iterable)]
#[derive(Debug, Clone)]
struct EncodedTreeNode {
#[columnar(strategy = "DeltaRle")]
peer_idx: usize,
#[columnar(strategy = "DeltaRle")]
counter: i32,
#[columnar(strategy = "DeltaRle")]
parent_peer_idx: usize,
#[columnar(strategy = "DeltaRle")]
parent_counter: i32,
#[columnar(strategy = "DeltaRle")]
last_set_peer_idx: usize,
#[columnar(strategy = "DeltaRle")]
last_set_counter: i32,
#[columnar(strategy = "DeltaRle")]
last_set_lamport: u32,
#[columnar(strategy = "DeltaRle")]
index: u32,
position: Vec<u8>,
}
#[columnar(ser, de)]
struct EncodedTree {
#[columnar(class = "vec", iter = "EncodedTreeNode")]
nodes: Vec<EncodedTreeNode>,
alive_len: u32,
}
fn encode(
state: &TreeState,
input: Vec<TreeNode>,
alive_len: usize,
) -> (ValueRegister<PeerID>, EncodedTree) {
let mut peers: ValueRegister<PeerID> = ValueRegister::new();
let mut nodes = Vec::with_capacity(input.len());
for node in input {
let n = state.trees.get(&node.id).unwrap();
let last_set_id = n.last_move_op;
nodes.push(EncodedTreeNode {
peer_idx: peers.register(&node.id.peer),
counter: node.id.counter,
// FIXME: this unwrap may panic
parent_peer_idx: peers.register(&node.parent.unwrap().peer),
// FIXME: this unwrap may panic
parent_counter: node.parent.unwrap().counter,
last_set_peer_idx: peers.register(&last_set_id.peer),
last_set_counter: last_set_id.counter,
last_set_lamport: last_set_id.lamport,
index: node.index as u32,
position: node.position.as_bytes().to_vec(),
})
}
(
peers,
EncodedTree {
nodes,
alive_len: alive_len as u32,
},
)
}
impl FastStateSnashot for TreeState {
fn encode_snapshot_fast<W: std::io::prelude::Write>(&mut self, mut w: W) {
let alive_tree_nodes = self.tree_nodes();
let deleted_tree_nodes = self.deleted_tree_nodes();
let alive_len = alive_tree_nodes.len();
let mut all_nodes = alive_tree_nodes;
all_nodes.extend(deleted_tree_nodes);
let (peers, encoded) = encode(self, all_nodes, alive_len);
let peers = peers.unwrap_vec();
leb128::write::unsigned(&mut w, peers.len() as u64).unwrap();
for peer in peers {
w.write_all(&peer.to_le_bytes()).unwrap();
}
let vec = serde_columnar::to_vec(&encoded).unwrap();
w.write_all(&vec).unwrap();
}
fn decode_value(
mut bytes: &[u8],
) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> {
// TODO: PREF: FIXME: The performance for decoding the whole tree
// can be improved a lot
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
let mut peers = Vec::with_capacity(peer_num);
for _ in 0..peer_num {
let mut buf = [0u8; 8];
bytes.read_exact(&mut buf).unwrap();
peers.push(PeerID::from_le_bytes(buf));
}
let encoded: EncodedTree = serde_columnar::from_bytes(bytes)?;
let mut nodes = Vec::new();
for node in encoded.nodes.into_iter().take(encoded.alive_len as usize) {
let node = TreeNode {
id: TreeID::new(peers[node.peer_idx], node.counter),
parent: Some(TreeID::new(
peers[node.parent_peer_idx],
node.parent_counter,
)),
position: FractionalIndex::from_bytes(node.position),
index: node.index as usize,
};
nodes.push(node.into_value());
}
Ok((nodes.into(), bytes))
}
fn decode_snapshot_fast(
idx: crate::container::idx::ContainerIdx,
(_, mut bytes): (loro_common::LoroValue, &[u8]),
ctx: crate::state::ContainerCreationContext,
) -> loro_common::LoroResult<Self>
where
Self: Sized,
{
let peer_num = leb128::read::unsigned(&mut bytes).unwrap() as usize;
let mut peers = Vec::with_capacity(peer_num);
for _ in 0..peer_num {
let mut buf = [0u8; 8];
bytes.read_exact(&mut buf).unwrap();
peers.push(PeerID::from_le_bytes(buf));
}
let mut tree =
TreeState::new(idx, ctx.peer, ctx.configure.tree_position_jitter.clone());
let encoded: EncodedTree = serde_columnar::from_bytes(bytes)?;
for (i, node) in encoded.nodes.into_iter().enumerate() {
let is_dead = i >= encoded.alive_len as usize;
tree.mov(
TreeID::new(peers[node.peer_idx], node.counter),
if is_dead {
TreeParentId::Deleted
} else {
TreeParentId::Node(TreeID::new(
peers[node.parent_peer_idx],
node.parent_counter,
))
},
IdFull::new(
peers[node.last_set_peer_idx],
node.last_set_counter,
node.last_set_lamport,
),
Some(FractionalIndex::from_bytes(node.position)),
false,
)
.unwrap();
}
Ok(tree)
}
}
}