feat: replace states with container store

This commit is contained in:
Zixuan Chen 2024-06-20 13:17:32 +08:00
parent fcdbaed172
commit a276010128
No known key found for this signature in database
9 changed files with 611 additions and 200 deletions

View file

@ -45,7 +45,7 @@
},
"rust-analyzer.cargo.features": [
"test_utils",
"counter"
// "counter"
],
"editor.defaultFormatter": "rust-lang.rust-analyzer",
"rust-analyzer.server.extraEnv": {

View file

@ -230,7 +230,7 @@ fn encode_header_and_body(mode: EncodeMode, body: Vec<u8>) -> Vec<u8> {
pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec<u8> {
let body = encode_reordered::encode_snapshot(
&doc.oplog().try_lock().unwrap(),
&doc.app_state().try_lock().unwrap(),
&mut doc.app_state().try_lock().unwrap(),
&Default::default(),
);

View file

@ -408,7 +408,7 @@ fn extract_ops(
})
}
pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVector) -> Vec<u8> {
pub(crate) fn encode_snapshot(oplog: &OpLog, state: &mut DocState, vv: &VersionVector) -> Vec<u8> {
assert!(!state.is_in_txn());
assert_eq!(oplog.frontiers(), &state.frontiers);
@ -419,18 +419,21 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto
cid_idx_pairs: c_pairs,
container_to_index: container_idx2index,
} = extract_containers_in_order(
&mut state.iter().map(|x| x.container_idx()).chain(
diff_changes
.iter()
.flat_map(|x| {
let c = match x {
Either::Left(c) => c,
Either::Right(c) => c,
};
c.ops.iter()
})
.map(|x| x.container),
),
&mut state
.iter_and_decode_all()
.map(|x| x.container_idx())
.chain(
diff_changes
.iter()
.flat_map(|x| {
let c = match x {
Either::Left(c) => c,
Either::Right(c) => c,
};
c.ops.iter()
})
.map(|x| x.container),
),
&oplog.arena,
);
let mut dep_arena = DepsArena::default();
@ -703,7 +706,7 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> {
fn encode_snapshot_states(
container_idxs: impl Iterator<Item = ContainerIdx>,
state: &DocState,
state: &mut DocState,
oplog: &OpLog,
container_idx2index: &FxHashMap<ContainerIdx, usize>,
registers: Rc<RefCell<EncodedRegisters>>,
@ -731,7 +734,7 @@ fn encode_snapshot_states(
continue;
}
let state = match state.get_state(container) {
let state = match state.get_container_mut(container) {
Some(state) if !state.is_state_empty() => state,
_ => {
states.push(EncodedStateInfo {
@ -879,8 +882,7 @@ fn decode_snapshot_states(
)?;
}
let s = take(&mut state.states);
state.init_with_states_and_version(s, frontiers, oplog, unknown_containers);
state.init_with_states_and_version(frontiers, oplog, unknown_containers);
Ok(())
}
@ -898,9 +900,7 @@ mod encode {
change::{Change, Lamport},
container::{idx::ContainerIdx, tree::tree_op::TreeOp},
encoding::{
value::{
MarkStart, Value, ValueEncodeRegister, ValueKind, ValueWriter,
},
value::{MarkStart, Value, ValueEncodeRegister, ValueKind, ValueWriter},
value_register::ValueRegister,
},
op::{FutureInnerContent, Op},

View file

@ -208,14 +208,20 @@ impl LoroDoc {
#[inline(always)]
pub fn peer_id(&self) -> PeerID {
self.state.lock().unwrap().peer
self.state
.lock()
.unwrap()
.peer
.load(std::sync::atomic::Ordering::Relaxed)
}
#[inline(always)]
pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> {
if self.auto_commit.load(Acquire) {
let mut doc_state = self.state.lock().unwrap();
doc_state.peer = peer;
let doc_state = self.state.lock().unwrap();
doc_state
.peer
.store(peer, std::sync::atomic::Ordering::Relaxed);
drop(doc_state);
let txn = self.txn.lock().unwrap().take();
@ -228,7 +234,7 @@ impl LoroDoc {
return Ok(());
}
let mut doc_state = self.state.lock().unwrap();
let doc_state = self.state.lock().unwrap();
if doc_state.is_in_txn() {
return Err(LoroError::TransactionError(
"Cannot change peer id during transaction"
@ -237,7 +243,9 @@ impl LoroDoc {
));
}
doc_state.peer = peer;
doc_state
.peer
.store(peer, std::sync::atomic::Ordering::Relaxed);
Ok(())
}

View file

@ -1,9 +1,13 @@
use std::{
borrow::Cow,
io::Write,
sync::{atomic::AtomicU8, Arc, Mutex, RwLock, Weak},
sync::{
atomic::{AtomicU64, AtomicU8, Ordering},
Arc, Mutex, RwLock, Weak,
},
};
use container_store::ContainerStore;
use enum_as_inner::EnumAsInner;
use enum_dispatch::enum_dispatch;
use fxhash::{FxHashMap, FxHashSet};
@ -43,27 +47,33 @@ pub(crate) use map_state::MapState;
pub(crate) use richtext_state::RichtextState;
pub(crate) use tree_state::{get_meta_value, FractionalIndexGenResult, TreeParentId, TreeState};
use self::unknown_state::UnknownState;
use self::{container_store::ContainerWrapper, unknown_state::UnknownState};
#[cfg(feature = "counter")]
use self::counter_state::CounterState;
use super::{arena::SharedArena, event::InternalDocDiff};
macro_rules! get_or_create {
($doc_state: ident, $idx: expr) => {{
if !$doc_state.states.contains_key(&$idx) {
if !$doc_state.store.contains($idx) {
let state = $doc_state.create_state($idx);
$doc_state.states.insert($idx, state);
$doc_state
.store
.insert($idx, ContainerWrapper::new(state, &$doc_state.arena));
}
$doc_state.states.get_mut(&$idx).unwrap()
$doc_state.store.get_container_mut($idx).unwrap()
}};
}
#[derive(Clone)]
pub struct DocState {
pub(super) peer: PeerID,
pub(super) peer: Arc<AtomicU64>,
pub(super) frontiers: Frontiers,
pub(super) states: FxHashMap<ContainerIdx, State>,
// pub(super) states: FxHashMap<ContainerIdx, State>,
pub(super) store: ContainerStore,
pub(super) arena: SharedArena,
pub(crate) config: Configure,
// resolve event stuff
@ -158,6 +168,27 @@ pub(crate) trait ContainerState: Clone {
fn import_from_snapshot_ops(&mut self, ctx: StateSnapshotDecodeContext) -> LoroResult<()>;
}
impl<T: FastStateSnapshot> FastStateSnapshot for Box<T> {
fn encode_snapshot_fast<W: Write>(&mut self, w: W) {
self.as_mut().encode_snapshot_fast(w)
}
fn decode_value(bytes: &[u8]) -> LoroResult<(LoroValue, &[u8])> {
T::decode_value(bytes)
}
fn decode_snapshot_fast(
idx: ContainerIdx,
v: (LoroValue, &[u8]),
ctx: ContainerCreationContext,
) -> LoroResult<Self>
where
Self: Sized,
{
T::decode_snapshot_fast(idx, v, ctx).map(|x| Box::new(x))
}
}
impl<T: ContainerState> ContainerState for Box<T> {
fn container_idx(&self) -> ContainerIdx {
self.as_ref().container_idx()
@ -251,7 +282,44 @@ pub enum State {
TreeState(Box<TreeState>),
#[cfg(feature = "counter")]
CounterState(Box<counter_state::CounterState>),
UnknownState(Box<UnknownState>),
UnknownState(UnknownState),
}
impl From<ListState> for State {
fn from(s: ListState) -> Self {
Self::ListState(Box::new(s))
}
}
impl From<RichtextState> for State {
fn from(s: RichtextState) -> Self {
Self::RichtextState(Box::new(s))
}
}
impl From<MovableListState> for State {
fn from(s: MovableListState) -> Self {
Self::MovableListState(Box::new(s))
}
}
impl From<MapState> for State {
fn from(s: MapState) -> Self {
Self::MapState(Box::new(s))
}
}
impl From<TreeState> for State {
fn from(s: TreeState) -> Self {
Self::TreeState(Box::new(s))
}
}
#[cfg(feature = "counter")]
impl From<CounterState> for State {
fn from(s: CounterState) -> Self {
Self::CounterState(Box::new(s))
}
}
impl State {
@ -272,7 +340,20 @@ impl State {
}
pub fn new_unknown(idx: ContainerIdx) -> Self {
Self::UnknownState(Box::new(UnknownState::new(idx)))
Self::UnknownState(UnknownState::new(idx))
}
pub fn encode_snapshot_fast<W: Write>(&mut self, mut w: W) {
match self {
State::ListState(s) => s.encode_snapshot_fast(&mut w),
State::MovableListState(s) => s.encode_snapshot_fast(&mut w),
State::MapState(s) => s.encode_snapshot_fast(&mut w),
State::RichtextState(s) => s.encode_snapshot_fast(&mut w),
State::TreeState(s) => s.encode_snapshot_fast(&mut w),
#[cfg(feature = "counter")]
State::CounterState(s) => s.encode_snapshot_fast(&mut w),
State::UnknownState(s) => s.encode_snapshot_fast(&mut w),
}
}
}
@ -286,11 +367,12 @@ impl DocState {
let peer = DefaultRandom.next_u64();
// TODO: maybe we should switch to certain version in oplog?
Arc::new_cyclic(|weak| {
let peer = Arc::new(AtomicU64::new(peer));
Mutex::new(Self {
store: ContainerStore::new(arena.clone(), config.clone(), peer.clone()),
peer,
arena,
frontiers: Frontiers::default(),
states: FxHashMap::default(),
weak_state: weak.clone(),
config,
global_txn,
@ -321,7 +403,10 @@ impl DocState {
}
pub fn refresh_peer_id(&mut self) {
self.peer = DefaultRandom.next_u64();
self.peer.store(
DefaultRandom.next_u64(),
std::sync::atomic::Ordering::Relaxed,
);
}
/// Take all the diffs that are recorded and convert them to events.
@ -395,11 +480,11 @@ impl DocState {
/// It changes the peer id for the future txn on this AppState
#[inline]
pub fn set_peer_id(&mut self, peer: PeerID) {
self.peer = peer;
self.peer.store(peer, std::sync::atomic::Ordering::Relaxed);
}
pub fn peer_id(&self) -> PeerID {
self.peer
self.peer.load(std::sync::atomic::Ordering::Relaxed)
}
/// It's expected that diff only contains [`InternalDiff`]
@ -455,13 +540,7 @@ impl DocState {
let to_create = std::mem::take(&mut to_revive_in_this_layer);
to_revive_in_this_layer = std::mem::take(&mut to_revive_in_next_layer);
for new in to_create {
let state = {
if !self.states.contains_key(&new) {
continue;
}
self.states.get_mut(&new).unwrap()
};
let state = get_or_create!(self, new);
if state.is_state_empty() {
continue;
}
@ -545,13 +624,7 @@ impl DocState {
while !to_revive_in_this_layer.is_empty() || !to_revive_in_next_layer.is_empty() {
let to_create = std::mem::take(&mut to_revive_in_this_layer);
for new in to_create {
let state = {
if !self.states.contains_key(&new) {
continue;
}
self.states.get_mut(&new).unwrap()
};
let state = get_or_create!(self, new);
if state.is_state_empty() {
continue;
}
@ -598,12 +671,8 @@ impl DocState {
self.in_txn = false;
}
pub fn iter(&self) -> impl Iterator<Item = &State> {
self.states.values()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut State> {
self.states.values_mut()
pub fn iter_and_decode_all(&mut self) -> impl Iterator<Item = &mut State> {
self.store.iter_and_decode_all()
}
pub(crate) fn init_container(
@ -630,22 +699,12 @@ impl DocState {
}
#[inline]
#[allow(unused)]
pub(super) fn get_state_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> {
self.states.get_mut(&idx)
}
#[inline]
#[allow(unused)]
pub(super) fn get_state(&self, idx: ContainerIdx) -> Option<&State> {
self.states.get(&idx)
pub(super) fn get_container_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> {
self.store.get_container_mut(idx)
}
pub(crate) fn get_value_by_idx(&mut self, container_idx: ContainerIdx) -> LoroValue {
self.states
.get_mut(&container_idx)
.map(|x| x.get_value())
.unwrap_or_else(|| container_idx.get_type().default_value())
self.store.get_value(container_idx).unwrap()
}
/// Set the state of the container with the given container idx.
@ -656,18 +715,17 @@ impl DocState {
/// If the state is not empty.
pub(super) fn init_with_states_and_version(
&mut self,
states: FxHashMap<ContainerIdx, State>,
frontiers: Frontiers,
oplog: &OpLog,
unknown_containers: Vec<ContainerIdx>,
) {
assert!(self.states.is_empty(), "overriding states");
self.pre_txn(Default::default(), EventTriggerKind::Import);
self.states = states;
for (idx, state) in self.states.iter() {
for child_id in state.get_child_containers() {
for state in self.store.iter_and_decode_all() {
let idx = state.container_idx();
let s = state;
for child_id in s.get_child_containers() {
let child_idx = self.arena.register_container(&child_id);
self.arena.set_parent(child_idx, Some(*idx));
self.arena.set_parent(child_idx, Some(idx));
}
}
@ -691,13 +749,20 @@ impl DocState {
if self.is_recording() {
let diff: Vec<_> = self
.states
.store
.iter_mut()
.map(|(&idx, state)| InternalContainerDiff {
idx,
bring_back: false,
is_container_deleted: false,
diff: state
.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.config,
peer: self.peer.load(Ordering::Relaxed),
},
)
.to_diff(&self.arena, &self.global_txn, &self.weak_state)
.into(),
})
@ -721,9 +786,18 @@ impl DocState {
id: I,
) -> Option<&mut richtext_state::RichtextState> {
let idx = self.id_to_idx(id, ContainerType::Text);
self.states
.entry(idx)
.or_insert_with(|| State::new_richtext(idx, self.config.text_style_config.clone()))
self.store
.get_or_create(idx, || {
let state = State::new_richtext(idx, self.config.text_style_config.clone());
ContainerWrapper::new(state, &self.arena)
})
.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.config,
peer: self.peer.load(Ordering::Relaxed),
},
)
.as_richtext_state_mut()
.map(|x| &mut **x)
}
@ -732,9 +806,22 @@ impl DocState {
/// if it's str it will use Root container, which will not be None
pub(crate) fn get_tree<I: Into<ContainerIdRaw>>(&mut self, id: I) -> Option<&mut TreeState> {
let idx = self.id_to_idx(id, ContainerType::Tree);
self.states
.entry(idx)
.or_insert_with(|| State::new_richtext(idx, self.config.text_style_config.clone()))
self.store
.get_or_create(idx, || {
let state = State::new_tree(
idx,
self.peer.load(std::sync::atomic::Ordering::Relaxed),
self.config.tree_position_jitter.clone(),
);
ContainerWrapper::new(state, &self.arena)
})
.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.config,
peer: self.peer.load(Ordering::Relaxed),
},
)
.as_tree_state_mut()
.map(|x| &mut **x)
}
@ -756,7 +843,6 @@ impl DocState {
}
};
idx.unwrap()
}
@ -766,15 +852,27 @@ impl DocState {
where
F: FnOnce(&State) -> R,
{
let state = self.states.get(&idx);
if let Some(state) = state {
f(state)
} else {
let state = self.create_state(idx);
let ans = f(&state);
self.states.insert(idx, state);
ans
}
let depth = self.arena.get_depth(idx).unwrap().get() as usize;
let parent = self
.arena
.get_parent(idx)
.and_then(|a| self.arena.get_container_id(a));
let state = self
.store
.get_or_create(idx, || {
ContainerWrapper::new(
create_state_(idx, &self.config, self.peer.load(Ordering::Relaxed)),
&self.arena,
)
})
.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.config,
peer: self.peer.load(Ordering::Relaxed),
},
);
f(state)
}
#[inline(always)]
@ -782,15 +880,20 @@ impl DocState {
where
F: FnOnce(&mut State) -> R,
{
let state = self.states.get_mut(&idx);
if let Some(state) = state {
f(state)
} else {
let mut state = self.create_state(idx);
let ans = f(&mut state);
self.states.insert(idx, state);
ans
}
let state = self
.store
.get_or_create(idx, || {
let state = create_state_(idx, &self.config, self.peer.load(Ordering::Relaxed));
ContainerWrapper::new(state, &self.arena)
})
.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.config,
peer: self.peer.load(Ordering::Relaxed),
},
);
f(state)
}
pub(super) fn is_in_txn(&self) -> bool {
@ -798,7 +901,7 @@ impl DocState {
}
pub fn is_empty(&self) -> bool {
!self.in_txn && self.states.is_empty() && self.arena.can_import_snapshot()
!self.in_txn && self.store.is_empty() && self.arena.can_import_snapshot()
}
pub fn get_deep_value(&mut self) -> LoroValue {
@ -846,7 +949,7 @@ impl DocState {
id: Option<ContainerID>,
) -> LoroValue {
let id = id.unwrap_or_else(|| self.arena.idx_to_id(container).unwrap());
let Some(state) = self.states.get_mut(&container) else {
let Some(state) = self.store.get_container_mut(container) else {
return container.get_type().default_value();
};
let value = state.get_value();
@ -907,7 +1010,7 @@ impl DocState {
}
pub fn get_container_deep_value(&mut self, container: ContainerIdx) -> LoroValue {
let Some(state) = self.states.get_mut(&container) else {
let Some(state) = self.store.get_container_mut(container) else {
return container.get_type().default_value();
};
let value = state.get_value();
@ -1026,14 +1129,14 @@ impl DocState {
}
}
pub(crate) fn get_reachable(&self, id: &ContainerID) -> bool {
pub(crate) fn get_reachable(&mut self, id: &ContainerID) -> bool {
let Some(mut idx) = self.arena.id_to_idx(id) else {
return false;
};
loop {
let id = self.arena.idx_to_id(idx).unwrap();
if let Some(parent_idx) = self.arena.get_parent(idx) {
let Some(parent_state) = self.states.get(&parent_idx) else {
let Some(parent_state) = self.store.get_container_mut(parent_idx) else {
return false;
};
if !parent_state.contains_child(&id) {
@ -1051,13 +1154,13 @@ impl DocState {
}
// the container may be override, so it may return None
fn get_path(&self, idx: ContainerIdx) -> Option<Vec<(ContainerID, Index)>> {
fn get_path(&mut self, idx: ContainerIdx) -> Option<Vec<(ContainerID, Index)>> {
let mut ans = Vec::new();
let mut idx = idx;
loop {
let id = self.arena.idx_to_id(idx).unwrap();
if let Some(parent_idx) = self.arena.get_parent(idx) {
let parent_state = self.states.get(&parent_idx)?;
let parent_state = self.store.get_container_mut(parent_idx)?;
let Some(prop) = parent_state.get_child_index(&id) else {
tracing::info!("Missing in parent children");
return None;
@ -1135,16 +1238,16 @@ impl DocState {
}
let self_id_to_states: FxHashMap<ContainerID, (ContainerIdx, LoroValue)> = self
.states
.values_mut()
.store
.iter_and_decode_all()
.filter_map(|state: &mut State| {
let arena = &self.arena;
get_entries_for_state(arena, state)
})
.collect();
let mut other_id_to_states: FxHashMap<ContainerID, (ContainerIdx, LoroValue)> = other
.states
.values_mut()
.store
.iter_and_decode_all()
.filter_map(|state: &mut State| {
let arena = &other.arena;
get_entries_for_state(arena, state)
@ -1173,52 +1276,32 @@ impl DocState {
}
pub fn log_estimated_size(&self) {
let state_entries_size = self.states.len()
* (std::mem::size_of::<State>() + std::mem::size_of::<ContainerIdx>());
let state_entries_size =
self.store.len() * (std::mem::size_of::<State>() + std::mem::size_of::<ContainerIdx>());
let mut state_size_sum = 0;
for state in self.states.values() {
state_size_sum += state.estimate_size();
}
state_size_sum += self.store.estimate_size();
eprintln!(
"ContainerNum: {}\nEstimated state size: \nEntries: {} \nSum: {}",
self.states.len(),
self.store.len(),
state_entries_size,
state_size_sum
);
}
pub fn create_state(&self, idx: ContainerIdx) -> State {
match idx.get_type() {
ContainerType::Map => State::MapState(Box::new(MapState::new(idx))),
ContainerType::List => State::ListState(Box::new(ListState::new(idx))),
ContainerType::Text => State::RichtextState(Box::new(RichtextState::new(
idx,
self.config.text_style_config.clone(),
))),
ContainerType::Tree => State::TreeState(Box::new(TreeState::new(
idx,
self.peer,
self.config.tree_position_jitter.clone(),
))),
ContainerType::MovableList => {
State::MovableListState(Box::new(MovableListState::new(idx)))
}
#[cfg(feature = "counter")]
ContainerType::Counter => {
State::CounterState(Box::new(counter_state::CounterState::new(idx)))
}
ContainerType::Unknown(_) => State::UnknownState(Box::new(UnknownState::new(idx))),
}
let config = &self.config;
let peer = self.peer.load(std::sync::atomic::Ordering::Relaxed);
create_state_(idx, config, peer)
}
pub fn create_unknown_state(&self, idx: ContainerIdx) -> State {
State::UnknownState(Box::new(UnknownState::new(idx)))
State::UnknownState(UnknownState::new(idx))
}
pub fn get_relative_position(&mut self, pos: &Cursor, use_event_index: bool) -> Option<usize> {
let idx = self.arena.register_container(&pos.container);
let state = self.states.get_mut(&idx)?;
let state = self.store.get_container_mut(idx)?;
if let Some(id) = pos.id {
match state {
State::ListState(s) => s.get_index_of_id(id),
@ -1264,7 +1347,7 @@ impl DocState {
}
for index in path[..path.len() - 1].iter().skip(1) {
let parent_state = self.states.get(&state_idx)?;
let parent_state = self.store.get_container_mut(state_idx)?;
match parent_state {
State::ListState(l) => {
let Some(LoroValue::Container(c)) = l.get(*index.as_seq()?) else {
@ -1297,7 +1380,7 @@ impl DocState {
}
}
let parent_state = self.states.get_mut(&state_idx)?;
let parent_state = self.store.get_container_mut(state_idx)?;
let index = path.last().unwrap();
let value: LoroValue = match parent_state {
State::ListState(l) => l.get(*index.as_seq()?).cloned()?,
@ -1323,6 +1406,28 @@ impl DocState {
}
}
fn create_state_(idx: ContainerIdx, config: &Configure, peer: u64) -> State {
match idx.get_type() {
ContainerType::Map => State::MapState(Box::new(MapState::new(idx))),
ContainerType::List => State::ListState(Box::new(ListState::new(idx))),
ContainerType::Text => State::RichtextState(Box::new(RichtextState::new(
idx,
config.text_style_config.clone(),
))),
ContainerType::Tree => State::TreeState(Box::new(TreeState::new(
idx,
peer,
config.tree_position_jitter.clone(),
))),
ContainerType::MovableList => State::MovableListState(Box::new(MovableListState::new(idx))),
#[cfg(feature = "counter")]
ContainerType::Counter => {
State::CounterState(Box::new(counter_state::CounterState::new(idx)))
}
ContainerType::Unknown(_) => State::UnknownState(UnknownState::new(idx)),
}
}
fn trigger_on_new_container(state_diff: &Diff, mut listener: impl FnMut(ContainerIdx)) {
match state_diff {
Diff::List(list) => {

View file

@ -1,10 +1,28 @@
use crate::{arena::SharedArena, container::idx::ContainerIdx};
use std::{
io::Write,
mem,
sync::{atomic::AtomicU64, Arc},
};
use crate::{
arena::SharedArena,
configure::Configure,
container::idx::ContainerIdx,
state::{FastStateSnapshot, RichtextState},
};
use bytes::Bytes;
use encode::{decode_cids, CidOffsetEncoder};
use fxhash::FxHashMap;
use loro_common::{LoroResult, LoroValue};
use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue};
use tracing::field::Visit;
use super::{ContainerState, State};
use super::{
unknown_state::UnknownState, ContainerCreationContext, ContainerState, ListState, MapState,
MovableListState, State, TreeState,
};
#[cfg(feature = "counter")]
use super::counter_state::CounterState;
/// Encoding Schema for Container Store
///
@ -39,23 +57,63 @@ use super::{ContainerState, State};
/// │ │
/// │ │
/// └───────────────────────────────────────────────────┘
#[derive(Clone)]
pub(crate) struct ContainerStore {
arena: SharedArena,
store: FxHashMap<ContainerIdx, ContainerWrapper>,
conf: Configure,
peer: Arc<AtomicU64>,
}
impl std::fmt::Debug for ContainerStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ContainerStore")
.field("store", &self.store)
.finish()
}
}
impl ContainerStore {
pub fn get_container(&mut self, idx: ContainerIdx) -> Option<&mut ContainerWrapper> {
self.store.get_mut(&idx)
pub fn new(arena: SharedArena, conf: Configure, peer: Arc<AtomicU64>) -> Self {
ContainerStore {
arena,
store: Default::default(),
conf,
peer,
}
}
pub fn get_container_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> {
self.store.get_mut(&idx).map(|x| {
x.get_state_mut(
idx,
ContainerCreationContext {
configure: &self.conf,
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
},
)
})
}
pub fn get_container(&mut self, idx: ContainerIdx) -> Option<&State> {
self.store.get_mut(&idx).map(|x| {
x.get_state(
idx,
ContainerCreationContext {
configure: &self.conf,
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
},
)
})
}
pub fn get_value(&mut self, idx: ContainerIdx) -> Option<LoroValue> {
self.store.get_mut(&idx).and_then(|c| c.get_value())
self.store.get_mut(&idx).map(|c| c.get_value())
}
pub fn encode(&self) -> Bytes {
pub fn encode(&mut self) -> Bytes {
let mut id_bytes_pairs = Vec::with_capacity(self.store.len());
for (idx, container) in self.store.iter() {
for (idx, container) in self.store.iter_mut() {
let id = self.arena.get_container_id(*idx).unwrap();
id_bytes_pairs.push((id, container.encode()))
}
@ -98,47 +156,259 @@ impl ContainerStore {
Ok(())
}
pub fn iter_and_decode_all(&mut self) -> impl Iterator<Item = &mut State> {
self.store.iter_mut().map(|(idx, v)| {
v.get_state_mut(
*idx,
ContainerCreationContext {
configure: &self.conf,
peer: self.peer.load(std::sync::atomic::Ordering::Relaxed),
},
)
})
}
pub fn is_empty(&self) -> bool {
self.store.is_empty()
}
pub fn len(&self) -> usize {
self.store.len()
}
pub fn iter(&self) -> impl Iterator<Item = (&ContainerIdx, &ContainerWrapper)> {
self.store.iter()
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&ContainerIdx, &mut ContainerWrapper)> {
self.store.iter_mut()
}
pub(super) fn get_or_create(
&mut self,
idx: ContainerIdx,
f: impl FnOnce() -> ContainerWrapper,
) -> &mut ContainerWrapper {
let s = self.store.entry(idx).or_insert_with(f);
s
}
pub(crate) fn estimate_size(&self) -> usize {
self.store.len() * 4
+ self
.store
.values()
.map(|v| v.estimate_size())
.sum::<usize>()
}
pub(super) fn contains(&self, idx: ContainerIdx) -> bool {
self.store.contains_key(&idx)
}
pub(super) fn insert(&mut self, idx: ContainerIdx, state: ContainerWrapper) {
self.store.insert(idx, state);
}
}
pub(crate) enum ContainerWrapper {
Bytes(Bytes),
PartialParsed { bytes: Bytes, value: LoroValue },
Parsed { bytes: Bytes, state: State },
State(State),
#[derive(Clone, Debug)]
pub(crate) struct ContainerWrapper {
depth: usize,
kind: ContainerType,
parent: Option<ContainerID>,
/// The possible combinations of is_some() are:
///
/// 1. bytes: new container decoded from bytes
/// 2. bytes + value: new container decoded from bytes, with value decoded
/// 3. state + bytes + value: new container decoded from bytes, with value and state decoded
/// 4. state
bytes: Option<Bytes>,
value: Option<LoroValue>,
bytes_offset_for_state: Option<usize>,
state: Option<State>,
}
impl ContainerWrapper {
pub fn get_state(&mut self) -> Option<&State> {
match self {
ContainerWrapper::Bytes(_) => todo!(),
ContainerWrapper::PartialParsed { bytes, value } => todo!(),
ContainerWrapper::Parsed { bytes, state } => todo!(),
ContainerWrapper::State(_) => todo!(),
pub fn new(state: State, arena: &SharedArena) -> Self {
let idx = state.container_idx();
let parent = arena
.get_parent(idx)
.and_then(|p| arena.get_container_id(p));
let depth = arena.get_depth(idx).unwrap().get() as usize;
Self {
depth,
parent,
kind: idx.get_type(),
state: Some(state),
bytes: None,
value: None,
bytes_offset_for_state: None,
}
}
pub fn get_value(&mut self) -> Option<LoroValue> {
match self {
ContainerWrapper::Bytes(bytes) => todo!("partial parse"),
ContainerWrapper::PartialParsed { bytes, value } => Some(value.clone()),
ContainerWrapper::Parsed { bytes, state } => Some(state.get_value()),
ContainerWrapper::State(s) => Some(s.get_value()),
/// It will not decode the state if it is not decoded
pub fn try_get_state(&self) -> Option<&State> {
self.state.as_ref()
}
/// It will decode the state if it is not decoded
pub fn get_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &State {
self.decode_state(idx, ctx).unwrap();
self.state.as_ref().expect("ContainerWrapper is empty")
}
/// It will decode the state if it is not decoded
pub fn get_state_mut(
&mut self,
idx: ContainerIdx,
ctx: ContainerCreationContext,
) -> &mut State {
self.decode_state(idx, ctx).unwrap();
self.bytes = None;
self.value = None;
self.state.as_mut().unwrap()
}
pub fn get_value(&mut self) -> LoroValue {
if let Some(v) = self.value.as_ref() {
return v.clone();
}
self.decode_value().unwrap();
if self.value.is_none() {
return self.state.as_mut().unwrap().get_value();
}
self.value.as_ref().unwrap().clone()
}
pub fn encode(&mut self) -> Bytes {
if let Some(bytes) = self.bytes.as_ref() {
return bytes.clone();
}
// ContainerType
// Depth
// ParentID
// StateSnapshot
let mut output = Vec::new();
output.push(self.kind.to_u8());
leb128::write::unsigned(&mut output, self.depth as u64).unwrap();
postcard::to_io(&self.parent, &mut output).unwrap();
self.state
.as_mut()
.unwrap()
.encode_snapshot_fast(&mut output);
output.into()
}
pub fn new_from_bytes(b: Bytes) -> Self {
let src: &[u8] = &b;
let bytes: &[u8] = &b;
let kind = ContainerType::try_from_u8(bytes[0]).unwrap();
let mut bytes = &bytes[1..];
let depth = leb128::read::unsigned(&mut bytes).unwrap();
let (parent, bytes) = postcard::take_from_bytes(bytes).unwrap();
// SAFETY: bytes is a slice of b
let size = unsafe { bytes.as_ptr().offset_from(src.as_ptr()) };
Self {
depth: depth as usize,
kind,
parent,
state: None,
value: None,
bytes: Some(b.slice(size as usize..)),
bytes_offset_for_state: None,
}
}
pub fn encode(&self) -> Bytes {
todo!("encode container")
pub fn ensure_value(&mut self) -> &LoroValue {
if self.value.is_some() {
} else if self.state.is_some() {
let value = self.state.as_mut().unwrap().get_value();
self.value = Some(value);
} else {
self.decode_value().unwrap();
}
self.value.as_ref().unwrap()
}
pub fn new_from_bytes(bytes: Bytes) -> Self {
ContainerWrapper::Bytes(bytes)
fn decode_value(&mut self) -> LoroResult<()> {
let Some(b) = self.bytes.as_ref() else {
return Ok(());
};
let (v, rest) = match self.kind {
ContainerType::Text => RichtextState::decode_value(b)?,
ContainerType::Map => MapState::decode_value(b)?,
ContainerType::List => ListState::decode_value(b)?,
ContainerType::MovableList => MovableListState::decode_value(b)?,
ContainerType::Tree => TreeState::decode_value(b)?,
#[cfg(feature = "counter")]
ContainerType::Counter => CounterState::decode_value(b)?,
ContainerType::Unknown(_) => UnknownState::decode_value(b)?,
};
self.value = Some(v);
// SAFETY: rest is a slice of b
let offset = unsafe { rest.as_ptr().offset_from(b.as_ptr()) };
self.bytes_offset_for_state = Some(offset as usize);
Ok(())
}
fn decode_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> {
if self.state.is_some() {
return Ok(());
}
if self.value.is_none() {
self.decode_value()?;
}
let b = self.bytes.as_ref().unwrap();
let offset = self.bytes_offset_for_state.unwrap();
let b = &b[offset..];
let v = self.value.as_ref().unwrap().clone();
let state: State = match self.kind {
ContainerType::Text => RichtextState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::Map => MapState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::List => ListState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::MovableList => {
MovableListState::decode_snapshot_fast(idx, (v, b), ctx)?.into()
}
ContainerType::Tree => TreeState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
#[cfg(feature = "counter")]
ContainerType::Counter => CounterState::decode_snapshot_fast(idx, (v, b), ctx)?.into(),
ContainerType::Unknown(_) => {
UnknownState::decode_snapshot_fast(idx, (v, b), ctx)?.into()
}
};
self.state = Some(state);
Ok(())
}
pub fn estimate_size(&self) -> usize {
if let Some(bytes) = self.bytes.as_ref() {
return bytes.len();
}
self.state.as_ref().unwrap().estimate_size()
}
pub(crate) fn is_state_empty(&self) -> bool {
if let Some(state) = self.state.as_ref() {
return state.is_state_empty();
}
// FIXME: it's not very accurate...
self.bytes.as_ref().unwrap().len() > 10
}
}
mod encode {
use loro_common::{
ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult,
};
use loro_common::{ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult};
use serde::{Deserialize, Serialize};
use serde_columnar::{
AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleDecoder,

View file

@ -105,3 +105,32 @@ impl ContainerState for UnknownState {
Ok(())
}
}
mod snapshot {
use loro_common::LoroValue;
use crate::state::FastStateSnapshot;
use super::UnknownState;
impl FastStateSnapshot for UnknownState {
fn encode_snapshot_fast<W: std::io::prelude::Write>(&mut self, mut w: W) {
w.write_all(&[]);
}
fn decode_value(bytes: &[u8]) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> {
Ok((LoroValue::Null, bytes))
}
fn decode_snapshot_fast(
idx: crate::container::idx::ContainerIdx,
v: (loro_common::LoroValue, &[u8]),
ctx: crate::state::ContainerCreationContext,
) -> loro_common::LoroResult<Self>
where
Self: Sized,
{
Ok(UnknownState::new(idx))
}
}
}

View file

@ -35,7 +35,7 @@ use super::{
event::{InternalContainerDiff, InternalDocDiff},
handler::{ListHandler, MapHandler, TextHandler, TreeHandler},
oplog::OpLog,
state::{DocState, State},
state::DocState,
};
pub type OnCommitFn = Box<dyn FnOnce(&Arc<Mutex<DocState>>) + Sync + Send>;
@ -246,7 +246,7 @@ impl Transaction {
state_lock.start_txn(origin, crate::event::EventTriggerKind::Local);
let arena = state_lock.arena.clone();
let frontiers = state_lock.frontiers.clone();
let peer = state_lock.peer;
let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed);
let next_counter = oplog_lock.next_id(peer).counter;
let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers);
drop(state_lock);
@ -372,8 +372,19 @@ impl Transaction {
) -> LoroResult<()> {
if Arc::as_ptr(&self.state) != Weak::as_ptr(state_ref) {
return Err(LoroError::UnmatchedContext {
expected: self.state.lock().unwrap().peer,
found: state_ref.upgrade().unwrap().lock().unwrap().peer,
expected: self
.state
.lock()
.unwrap()
.peer
.load(std::sync::atomic::Ordering::Relaxed),
found: state_ref
.upgrade()
.unwrap()
.lock()
.unwrap()
.peer
.load(std::sync::atomic::Ordering::Relaxed),
});
}
@ -469,29 +480,12 @@ impl Transaction {
.unwrap()
}
pub fn get_value_by_idx(&self, idx: ContainerIdx) -> LoroValue {
self.state.lock().unwrap().get_value_by_idx(idx)
}
#[allow(unused)]
pub(crate) fn with_state<F, R>(&self, idx: ContainerIdx, f: F) -> R
where
F: FnOnce(&State) -> R,
{
let state = self.state.lock().unwrap();
f(state.get_state(idx).unwrap())
}
pub fn next_id(&self) -> ID {
ID {
peer: self.peer,
counter: self.next_counter,
}
}
pub fn is_empty(&self) -> bool {
self.local_ops.is_empty()
}
}
impl Drop for Transaction {

View file

@ -40,6 +40,11 @@ fn auto_commit_list() {
assert_eq!(value.to_json_value(), json!({"list": ["world", "hello"]}))
}
#[ctor::ctor]
fn init() {
dev_utils::setup_test_log();
}
#[test]
fn auto_commit_with_checkout() {
let doc = LoroDoc::default();