From a276010128f1e982f941bdf3df2e63ab4fa5444c Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 20 Jun 2024 13:17:32 +0800 Subject: [PATCH] feat: replace states with container store --- .vscode/settings.json | 2 +- crates/loro-internal/src/encoding.rs | 2 +- .../src/encoding/encode_reordered.rs | 40 +- crates/loro-internal/src/loro.rs | 18 +- crates/loro-internal/src/state.rs | 345 ++++++++++++------ .../src/state/container_store.rs | 334 +++++++++++++++-- .../loro-internal/src/state/unknown_state.rs | 29 ++ crates/loro-internal/src/txn.rs | 36 +- crates/loro-internal/tests/autocommit.rs | 5 + 9 files changed, 611 insertions(+), 200 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 54f73501..dd3f21c4 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -45,7 +45,7 @@ }, "rust-analyzer.cargo.features": [ "test_utils", - "counter" + // "counter" ], "editor.defaultFormatter": "rust-lang.rust-analyzer", "rust-analyzer.server.extraEnv": { diff --git a/crates/loro-internal/src/encoding.rs b/crates/loro-internal/src/encoding.rs index f5eeca96..65a69889 100644 --- a/crates/loro-internal/src/encoding.rs +++ b/crates/loro-internal/src/encoding.rs @@ -230,7 +230,7 @@ fn encode_header_and_body(mode: EncodeMode, body: Vec) -> Vec { pub(crate) fn export_snapshot(doc: &LoroDoc) -> Vec { let body = encode_reordered::encode_snapshot( &doc.oplog().try_lock().unwrap(), - &doc.app_state().try_lock().unwrap(), + &mut doc.app_state().try_lock().unwrap(), &Default::default(), ); diff --git a/crates/loro-internal/src/encoding/encode_reordered.rs b/crates/loro-internal/src/encoding/encode_reordered.rs index 34d3d16d..c80224ee 100644 --- a/crates/loro-internal/src/encoding/encode_reordered.rs +++ b/crates/loro-internal/src/encoding/encode_reordered.rs @@ -408,7 +408,7 @@ fn extract_ops( }) } -pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVector) -> Vec { +pub(crate) fn encode_snapshot(oplog: &OpLog, state: &mut DocState, vv: &VersionVector) -> Vec { assert!(!state.is_in_txn()); assert_eq!(oplog.frontiers(), &state.frontiers); @@ -419,18 +419,21 @@ pub(crate) fn encode_snapshot(oplog: &OpLog, state: &DocState, vv: &VersionVecto cid_idx_pairs: c_pairs, container_to_index: container_idx2index, } = extract_containers_in_order( - &mut state.iter().map(|x| x.container_idx()).chain( - diff_changes - .iter() - .flat_map(|x| { - let c = match x { - Either::Left(c) => c, - Either::Right(c) => c, - }; - c.ops.iter() - }) - .map(|x| x.container), - ), + &mut state + .iter_and_decode_all() + .map(|x| x.container_idx()) + .chain( + diff_changes + .iter() + .flat_map(|x| { + let c = match x { + Either::Left(c) => c, + Either::Right(c) => c, + }; + c.ops.iter() + }) + .map(|x| x.container), + ), &oplog.arena, ); let mut dep_arena = DepsArena::default(); @@ -703,7 +706,7 @@ pub(crate) fn decode_snapshot(doc: &LoroDoc, bytes: &[u8]) -> LoroResult<()> { fn encode_snapshot_states( container_idxs: impl Iterator, - state: &DocState, + state: &mut DocState, oplog: &OpLog, container_idx2index: &FxHashMap, registers: Rc>, @@ -731,7 +734,7 @@ fn encode_snapshot_states( continue; } - let state = match state.get_state(container) { + let state = match state.get_container_mut(container) { Some(state) if !state.is_state_empty() => state, _ => { states.push(EncodedStateInfo { @@ -879,8 +882,7 @@ fn decode_snapshot_states( )?; } - let s = take(&mut state.states); - state.init_with_states_and_version(s, frontiers, oplog, unknown_containers); + state.init_with_states_and_version(frontiers, oplog, unknown_containers); Ok(()) } @@ -898,9 +900,7 @@ mod encode { change::{Change, Lamport}, container::{idx::ContainerIdx, tree::tree_op::TreeOp}, encoding::{ - value::{ - MarkStart, Value, ValueEncodeRegister, ValueKind, ValueWriter, - }, + value::{MarkStart, Value, ValueEncodeRegister, ValueKind, ValueWriter}, value_register::ValueRegister, }, op::{FutureInnerContent, Op}, diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index c503fcf6..173991ba 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -208,14 +208,20 @@ impl LoroDoc { #[inline(always)] pub fn peer_id(&self) -> PeerID { - self.state.lock().unwrap().peer + self.state + .lock() + .unwrap() + .peer + .load(std::sync::atomic::Ordering::Relaxed) } #[inline(always)] pub fn set_peer_id(&self, peer: PeerID) -> LoroResult<()> { if self.auto_commit.load(Acquire) { - let mut doc_state = self.state.lock().unwrap(); - doc_state.peer = peer; + let doc_state = self.state.lock().unwrap(); + doc_state + .peer + .store(peer, std::sync::atomic::Ordering::Relaxed); drop(doc_state); let txn = self.txn.lock().unwrap().take(); @@ -228,7 +234,7 @@ impl LoroDoc { return Ok(()); } - let mut doc_state = self.state.lock().unwrap(); + let doc_state = self.state.lock().unwrap(); if doc_state.is_in_txn() { return Err(LoroError::TransactionError( "Cannot change peer id during transaction" @@ -237,7 +243,9 @@ impl LoroDoc { )); } - doc_state.peer = peer; + doc_state + .peer + .store(peer, std::sync::atomic::Ordering::Relaxed); Ok(()) } diff --git a/crates/loro-internal/src/state.rs b/crates/loro-internal/src/state.rs index 1c272b50..d1c9fab1 100644 --- a/crates/loro-internal/src/state.rs +++ b/crates/loro-internal/src/state.rs @@ -1,9 +1,13 @@ use std::{ borrow::Cow, io::Write, - sync::{atomic::AtomicU8, Arc, Mutex, RwLock, Weak}, + sync::{ + atomic::{AtomicU64, AtomicU8, Ordering}, + Arc, Mutex, RwLock, Weak, + }, }; +use container_store::ContainerStore; use enum_as_inner::EnumAsInner; use enum_dispatch::enum_dispatch; use fxhash::{FxHashMap, FxHashSet}; @@ -43,27 +47,33 @@ pub(crate) use map_state::MapState; pub(crate) use richtext_state::RichtextState; pub(crate) use tree_state::{get_meta_value, FractionalIndexGenResult, TreeParentId, TreeState}; -use self::unknown_state::UnknownState; +use self::{container_store::ContainerWrapper, unknown_state::UnknownState}; + +#[cfg(feature = "counter")] +use self::counter_state::CounterState; use super::{arena::SharedArena, event::InternalDocDiff}; macro_rules! get_or_create { ($doc_state: ident, $idx: expr) => {{ - if !$doc_state.states.contains_key(&$idx) { + if !$doc_state.store.contains($idx) { let state = $doc_state.create_state($idx); - $doc_state.states.insert($idx, state); + $doc_state + .store + .insert($idx, ContainerWrapper::new(state, &$doc_state.arena)); } - $doc_state.states.get_mut(&$idx).unwrap() + $doc_state.store.get_container_mut($idx).unwrap() }}; } #[derive(Clone)] pub struct DocState { - pub(super) peer: PeerID, + pub(super) peer: Arc, pub(super) frontiers: Frontiers, - pub(super) states: FxHashMap, + // pub(super) states: FxHashMap, + pub(super) store: ContainerStore, pub(super) arena: SharedArena, pub(crate) config: Configure, // resolve event stuff @@ -158,6 +168,27 @@ pub(crate) trait ContainerState: Clone { fn import_from_snapshot_ops(&mut self, ctx: StateSnapshotDecodeContext) -> LoroResult<()>; } +impl FastStateSnapshot for Box { + fn encode_snapshot_fast(&mut self, w: W) { + self.as_mut().encode_snapshot_fast(w) + } + + fn decode_value(bytes: &[u8]) -> LoroResult<(LoroValue, &[u8])> { + T::decode_value(bytes) + } + + fn decode_snapshot_fast( + idx: ContainerIdx, + v: (LoroValue, &[u8]), + ctx: ContainerCreationContext, + ) -> LoroResult + where + Self: Sized, + { + T::decode_snapshot_fast(idx, v, ctx).map(|x| Box::new(x)) + } +} + impl ContainerState for Box { fn container_idx(&self) -> ContainerIdx { self.as_ref().container_idx() @@ -251,7 +282,44 @@ pub enum State { TreeState(Box), #[cfg(feature = "counter")] CounterState(Box), - UnknownState(Box), + UnknownState(UnknownState), +} + +impl From for State { + fn from(s: ListState) -> Self { + Self::ListState(Box::new(s)) + } +} + +impl From for State { + fn from(s: RichtextState) -> Self { + Self::RichtextState(Box::new(s)) + } +} + +impl From for State { + fn from(s: MovableListState) -> Self { + Self::MovableListState(Box::new(s)) + } +} + +impl From for State { + fn from(s: MapState) -> Self { + Self::MapState(Box::new(s)) + } +} + +impl From for State { + fn from(s: TreeState) -> Self { + Self::TreeState(Box::new(s)) + } +} + +#[cfg(feature = "counter")] +impl From for State { + fn from(s: CounterState) -> Self { + Self::CounterState(Box::new(s)) + } } impl State { @@ -272,7 +340,20 @@ impl State { } pub fn new_unknown(idx: ContainerIdx) -> Self { - Self::UnknownState(Box::new(UnknownState::new(idx))) + Self::UnknownState(UnknownState::new(idx)) + } + + pub fn encode_snapshot_fast(&mut self, mut w: W) { + match self { + State::ListState(s) => s.encode_snapshot_fast(&mut w), + State::MovableListState(s) => s.encode_snapshot_fast(&mut w), + State::MapState(s) => s.encode_snapshot_fast(&mut w), + State::RichtextState(s) => s.encode_snapshot_fast(&mut w), + State::TreeState(s) => s.encode_snapshot_fast(&mut w), + #[cfg(feature = "counter")] + State::CounterState(s) => s.encode_snapshot_fast(&mut w), + State::UnknownState(s) => s.encode_snapshot_fast(&mut w), + } } } @@ -286,11 +367,12 @@ impl DocState { let peer = DefaultRandom.next_u64(); // TODO: maybe we should switch to certain version in oplog? Arc::new_cyclic(|weak| { + let peer = Arc::new(AtomicU64::new(peer)); Mutex::new(Self { + store: ContainerStore::new(arena.clone(), config.clone(), peer.clone()), peer, arena, frontiers: Frontiers::default(), - states: FxHashMap::default(), weak_state: weak.clone(), config, global_txn, @@ -321,7 +403,10 @@ impl DocState { } pub fn refresh_peer_id(&mut self) { - self.peer = DefaultRandom.next_u64(); + self.peer.store( + DefaultRandom.next_u64(), + std::sync::atomic::Ordering::Relaxed, + ); } /// Take all the diffs that are recorded and convert them to events. @@ -395,11 +480,11 @@ impl DocState { /// It changes the peer id for the future txn on this AppState #[inline] pub fn set_peer_id(&mut self, peer: PeerID) { - self.peer = peer; + self.peer.store(peer, std::sync::atomic::Ordering::Relaxed); } pub fn peer_id(&self) -> PeerID { - self.peer + self.peer.load(std::sync::atomic::Ordering::Relaxed) } /// It's expected that diff only contains [`InternalDiff`] @@ -455,13 +540,7 @@ impl DocState { let to_create = std::mem::take(&mut to_revive_in_this_layer); to_revive_in_this_layer = std::mem::take(&mut to_revive_in_next_layer); for new in to_create { - let state = { - if !self.states.contains_key(&new) { - continue; - } - self.states.get_mut(&new).unwrap() - }; - + let state = get_or_create!(self, new); if state.is_state_empty() { continue; } @@ -545,13 +624,7 @@ impl DocState { while !to_revive_in_this_layer.is_empty() || !to_revive_in_next_layer.is_empty() { let to_create = std::mem::take(&mut to_revive_in_this_layer); for new in to_create { - let state = { - if !self.states.contains_key(&new) { - continue; - } - self.states.get_mut(&new).unwrap() - }; - + let state = get_or_create!(self, new); if state.is_state_empty() { continue; } @@ -598,12 +671,8 @@ impl DocState { self.in_txn = false; } - pub fn iter(&self) -> impl Iterator { - self.states.values() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.states.values_mut() + pub fn iter_and_decode_all(&mut self) -> impl Iterator { + self.store.iter_and_decode_all() } pub(crate) fn init_container( @@ -630,22 +699,12 @@ impl DocState { } #[inline] - #[allow(unused)] - pub(super) fn get_state_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> { - self.states.get_mut(&idx) - } - - #[inline] - #[allow(unused)] - pub(super) fn get_state(&self, idx: ContainerIdx) -> Option<&State> { - self.states.get(&idx) + pub(super) fn get_container_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> { + self.store.get_container_mut(idx) } pub(crate) fn get_value_by_idx(&mut self, container_idx: ContainerIdx) -> LoroValue { - self.states - .get_mut(&container_idx) - .map(|x| x.get_value()) - .unwrap_or_else(|| container_idx.get_type().default_value()) + self.store.get_value(container_idx).unwrap() } /// Set the state of the container with the given container idx. @@ -656,18 +715,17 @@ impl DocState { /// If the state is not empty. pub(super) fn init_with_states_and_version( &mut self, - states: FxHashMap, frontiers: Frontiers, oplog: &OpLog, unknown_containers: Vec, ) { - assert!(self.states.is_empty(), "overriding states"); self.pre_txn(Default::default(), EventTriggerKind::Import); - self.states = states; - for (idx, state) in self.states.iter() { - for child_id in state.get_child_containers() { + for state in self.store.iter_and_decode_all() { + let idx = state.container_idx(); + let s = state; + for child_id in s.get_child_containers() { let child_idx = self.arena.register_container(&child_id); - self.arena.set_parent(child_idx, Some(*idx)); + self.arena.set_parent(child_idx, Some(idx)); } } @@ -691,13 +749,20 @@ impl DocState { if self.is_recording() { let diff: Vec<_> = self - .states + .store .iter_mut() .map(|(&idx, state)| InternalContainerDiff { idx, bring_back: false, is_container_deleted: false, diff: state + .get_state_mut( + idx, + ContainerCreationContext { + configure: &self.config, + peer: self.peer.load(Ordering::Relaxed), + }, + ) .to_diff(&self.arena, &self.global_txn, &self.weak_state) .into(), }) @@ -721,9 +786,18 @@ impl DocState { id: I, ) -> Option<&mut richtext_state::RichtextState> { let idx = self.id_to_idx(id, ContainerType::Text); - self.states - .entry(idx) - .or_insert_with(|| State::new_richtext(idx, self.config.text_style_config.clone())) + self.store + .get_or_create(idx, || { + let state = State::new_richtext(idx, self.config.text_style_config.clone()); + ContainerWrapper::new(state, &self.arena) + }) + .get_state_mut( + idx, + ContainerCreationContext { + configure: &self.config, + peer: self.peer.load(Ordering::Relaxed), + }, + ) .as_richtext_state_mut() .map(|x| &mut **x) } @@ -732,9 +806,22 @@ impl DocState { /// if it's str it will use Root container, which will not be None pub(crate) fn get_tree>(&mut self, id: I) -> Option<&mut TreeState> { let idx = self.id_to_idx(id, ContainerType::Tree); - self.states - .entry(idx) - .or_insert_with(|| State::new_richtext(idx, self.config.text_style_config.clone())) + self.store + .get_or_create(idx, || { + let state = State::new_tree( + idx, + self.peer.load(std::sync::atomic::Ordering::Relaxed), + self.config.tree_position_jitter.clone(), + ); + ContainerWrapper::new(state, &self.arena) + }) + .get_state_mut( + idx, + ContainerCreationContext { + configure: &self.config, + peer: self.peer.load(Ordering::Relaxed), + }, + ) .as_tree_state_mut() .map(|x| &mut **x) } @@ -756,7 +843,6 @@ impl DocState { } }; - idx.unwrap() } @@ -766,15 +852,27 @@ impl DocState { where F: FnOnce(&State) -> R, { - let state = self.states.get(&idx); - if let Some(state) = state { - f(state) - } else { - let state = self.create_state(idx); - let ans = f(&state); - self.states.insert(idx, state); - ans - } + let depth = self.arena.get_depth(idx).unwrap().get() as usize; + let parent = self + .arena + .get_parent(idx) + .and_then(|a| self.arena.get_container_id(a)); + let state = self + .store + .get_or_create(idx, || { + ContainerWrapper::new( + create_state_(idx, &self.config, self.peer.load(Ordering::Relaxed)), + &self.arena, + ) + }) + .get_state_mut( + idx, + ContainerCreationContext { + configure: &self.config, + peer: self.peer.load(Ordering::Relaxed), + }, + ); + f(state) } #[inline(always)] @@ -782,15 +880,20 @@ impl DocState { where F: FnOnce(&mut State) -> R, { - let state = self.states.get_mut(&idx); - if let Some(state) = state { - f(state) - } else { - let mut state = self.create_state(idx); - let ans = f(&mut state); - self.states.insert(idx, state); - ans - } + let state = self + .store + .get_or_create(idx, || { + let state = create_state_(idx, &self.config, self.peer.load(Ordering::Relaxed)); + ContainerWrapper::new(state, &self.arena) + }) + .get_state_mut( + idx, + ContainerCreationContext { + configure: &self.config, + peer: self.peer.load(Ordering::Relaxed), + }, + ); + f(state) } pub(super) fn is_in_txn(&self) -> bool { @@ -798,7 +901,7 @@ impl DocState { } pub fn is_empty(&self) -> bool { - !self.in_txn && self.states.is_empty() && self.arena.can_import_snapshot() + !self.in_txn && self.store.is_empty() && self.arena.can_import_snapshot() } pub fn get_deep_value(&mut self) -> LoroValue { @@ -846,7 +949,7 @@ impl DocState { id: Option, ) -> LoroValue { let id = id.unwrap_or_else(|| self.arena.idx_to_id(container).unwrap()); - let Some(state) = self.states.get_mut(&container) else { + let Some(state) = self.store.get_container_mut(container) else { return container.get_type().default_value(); }; let value = state.get_value(); @@ -907,7 +1010,7 @@ impl DocState { } pub fn get_container_deep_value(&mut self, container: ContainerIdx) -> LoroValue { - let Some(state) = self.states.get_mut(&container) else { + let Some(state) = self.store.get_container_mut(container) else { return container.get_type().default_value(); }; let value = state.get_value(); @@ -1026,14 +1129,14 @@ impl DocState { } } - pub(crate) fn get_reachable(&self, id: &ContainerID) -> bool { + pub(crate) fn get_reachable(&mut self, id: &ContainerID) -> bool { let Some(mut idx) = self.arena.id_to_idx(id) else { return false; }; loop { let id = self.arena.idx_to_id(idx).unwrap(); if let Some(parent_idx) = self.arena.get_parent(idx) { - let Some(parent_state) = self.states.get(&parent_idx) else { + let Some(parent_state) = self.store.get_container_mut(parent_idx) else { return false; }; if !parent_state.contains_child(&id) { @@ -1051,13 +1154,13 @@ impl DocState { } // the container may be override, so it may return None - fn get_path(&self, idx: ContainerIdx) -> Option> { + fn get_path(&mut self, idx: ContainerIdx) -> Option> { let mut ans = Vec::new(); let mut idx = idx; loop { let id = self.arena.idx_to_id(idx).unwrap(); if let Some(parent_idx) = self.arena.get_parent(idx) { - let parent_state = self.states.get(&parent_idx)?; + let parent_state = self.store.get_container_mut(parent_idx)?; let Some(prop) = parent_state.get_child_index(&id) else { tracing::info!("Missing in parent children"); return None; @@ -1135,16 +1238,16 @@ impl DocState { } let self_id_to_states: FxHashMap = self - .states - .values_mut() + .store + .iter_and_decode_all() .filter_map(|state: &mut State| { let arena = &self.arena; get_entries_for_state(arena, state) }) .collect(); let mut other_id_to_states: FxHashMap = other - .states - .values_mut() + .store + .iter_and_decode_all() .filter_map(|state: &mut State| { let arena = &other.arena; get_entries_for_state(arena, state) @@ -1173,52 +1276,32 @@ impl DocState { } pub fn log_estimated_size(&self) { - let state_entries_size = self.states.len() - * (std::mem::size_of::() + std::mem::size_of::()); + let state_entries_size = + self.store.len() * (std::mem::size_of::() + std::mem::size_of::()); let mut state_size_sum = 0; - for state in self.states.values() { - state_size_sum += state.estimate_size(); - } + state_size_sum += self.store.estimate_size(); eprintln!( "ContainerNum: {}\nEstimated state size: \nEntries: {} \nSum: {}", - self.states.len(), + self.store.len(), state_entries_size, state_size_sum ); } pub fn create_state(&self, idx: ContainerIdx) -> State { - match idx.get_type() { - ContainerType::Map => State::MapState(Box::new(MapState::new(idx))), - ContainerType::List => State::ListState(Box::new(ListState::new(idx))), - ContainerType::Text => State::RichtextState(Box::new(RichtextState::new( - idx, - self.config.text_style_config.clone(), - ))), - ContainerType::Tree => State::TreeState(Box::new(TreeState::new( - idx, - self.peer, - self.config.tree_position_jitter.clone(), - ))), - ContainerType::MovableList => { - State::MovableListState(Box::new(MovableListState::new(idx))) - } - #[cfg(feature = "counter")] - ContainerType::Counter => { - State::CounterState(Box::new(counter_state::CounterState::new(idx))) - } - ContainerType::Unknown(_) => State::UnknownState(Box::new(UnknownState::new(idx))), - } + let config = &self.config; + let peer = self.peer.load(std::sync::atomic::Ordering::Relaxed); + create_state_(idx, config, peer) } pub fn create_unknown_state(&self, idx: ContainerIdx) -> State { - State::UnknownState(Box::new(UnknownState::new(idx))) + State::UnknownState(UnknownState::new(idx)) } pub fn get_relative_position(&mut self, pos: &Cursor, use_event_index: bool) -> Option { let idx = self.arena.register_container(&pos.container); - let state = self.states.get_mut(&idx)?; + let state = self.store.get_container_mut(idx)?; if let Some(id) = pos.id { match state { State::ListState(s) => s.get_index_of_id(id), @@ -1264,7 +1347,7 @@ impl DocState { } for index in path[..path.len() - 1].iter().skip(1) { - let parent_state = self.states.get(&state_idx)?; + let parent_state = self.store.get_container_mut(state_idx)?; match parent_state { State::ListState(l) => { let Some(LoroValue::Container(c)) = l.get(*index.as_seq()?) else { @@ -1297,7 +1380,7 @@ impl DocState { } } - let parent_state = self.states.get_mut(&state_idx)?; + let parent_state = self.store.get_container_mut(state_idx)?; let index = path.last().unwrap(); let value: LoroValue = match parent_state { State::ListState(l) => l.get(*index.as_seq()?).cloned()?, @@ -1323,6 +1406,28 @@ impl DocState { } } +fn create_state_(idx: ContainerIdx, config: &Configure, peer: u64) -> State { + match idx.get_type() { + ContainerType::Map => State::MapState(Box::new(MapState::new(idx))), + ContainerType::List => State::ListState(Box::new(ListState::new(idx))), + ContainerType::Text => State::RichtextState(Box::new(RichtextState::new( + idx, + config.text_style_config.clone(), + ))), + ContainerType::Tree => State::TreeState(Box::new(TreeState::new( + idx, + peer, + config.tree_position_jitter.clone(), + ))), + ContainerType::MovableList => State::MovableListState(Box::new(MovableListState::new(idx))), + #[cfg(feature = "counter")] + ContainerType::Counter => { + State::CounterState(Box::new(counter_state::CounterState::new(idx))) + } + ContainerType::Unknown(_) => State::UnknownState(UnknownState::new(idx)), + } +} + fn trigger_on_new_container(state_diff: &Diff, mut listener: impl FnMut(ContainerIdx)) { match state_diff { Diff::List(list) => { diff --git a/crates/loro-internal/src/state/container_store.rs b/crates/loro-internal/src/state/container_store.rs index a414c45b..99006514 100644 --- a/crates/loro-internal/src/state/container_store.rs +++ b/crates/loro-internal/src/state/container_store.rs @@ -1,10 +1,28 @@ -use crate::{arena::SharedArena, container::idx::ContainerIdx}; +use std::{ + io::Write, + mem, + sync::{atomic::AtomicU64, Arc}, +}; + +use crate::{ + arena::SharedArena, + configure::Configure, + container::idx::ContainerIdx, + state::{FastStateSnapshot, RichtextState}, +}; use bytes::Bytes; use encode::{decode_cids, CidOffsetEncoder}; use fxhash::FxHashMap; -use loro_common::{LoroResult, LoroValue}; +use loro_common::{ContainerID, ContainerType, LoroResult, LoroValue}; +use tracing::field::Visit; -use super::{ContainerState, State}; +use super::{ + unknown_state::UnknownState, ContainerCreationContext, ContainerState, ListState, MapState, + MovableListState, State, TreeState, +}; + +#[cfg(feature = "counter")] +use super::counter_state::CounterState; /// Encoding Schema for Container Store /// @@ -39,23 +57,63 @@ use super::{ContainerState, State}; /// │ │ /// │ │ /// └───────────────────────────────────────────────────┘ +#[derive(Clone)] pub(crate) struct ContainerStore { arena: SharedArena, store: FxHashMap, + conf: Configure, + peer: Arc, +} + +impl std::fmt::Debug for ContainerStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ContainerStore") + .field("store", &self.store) + .finish() + } } impl ContainerStore { - pub fn get_container(&mut self, idx: ContainerIdx) -> Option<&mut ContainerWrapper> { - self.store.get_mut(&idx) + pub fn new(arena: SharedArena, conf: Configure, peer: Arc) -> Self { + ContainerStore { + arena, + store: Default::default(), + conf, + peer, + } + } + + pub fn get_container_mut(&mut self, idx: ContainerIdx) -> Option<&mut State> { + self.store.get_mut(&idx).map(|x| { + x.get_state_mut( + idx, + ContainerCreationContext { + configure: &self.conf, + peer: self.peer.load(std::sync::atomic::Ordering::Relaxed), + }, + ) + }) + } + + pub fn get_container(&mut self, idx: ContainerIdx) -> Option<&State> { + self.store.get_mut(&idx).map(|x| { + x.get_state( + idx, + ContainerCreationContext { + configure: &self.conf, + peer: self.peer.load(std::sync::atomic::Ordering::Relaxed), + }, + ) + }) } pub fn get_value(&mut self, idx: ContainerIdx) -> Option { - self.store.get_mut(&idx).and_then(|c| c.get_value()) + self.store.get_mut(&idx).map(|c| c.get_value()) } - pub fn encode(&self) -> Bytes { + pub fn encode(&mut self) -> Bytes { let mut id_bytes_pairs = Vec::with_capacity(self.store.len()); - for (idx, container) in self.store.iter() { + for (idx, container) in self.store.iter_mut() { let id = self.arena.get_container_id(*idx).unwrap(); id_bytes_pairs.push((id, container.encode())) } @@ -98,47 +156,259 @@ impl ContainerStore { Ok(()) } + + pub fn iter_and_decode_all(&mut self) -> impl Iterator { + self.store.iter_mut().map(|(idx, v)| { + v.get_state_mut( + *idx, + ContainerCreationContext { + configure: &self.conf, + peer: self.peer.load(std::sync::atomic::Ordering::Relaxed), + }, + ) + }) + } + + pub fn is_empty(&self) -> bool { + self.store.is_empty() + } + + pub fn len(&self) -> usize { + self.store.len() + } + + pub fn iter(&self) -> impl Iterator { + self.store.iter() + } + + pub fn iter_mut(&mut self) -> impl Iterator { + self.store.iter_mut() + } + + pub(super) fn get_or_create( + &mut self, + idx: ContainerIdx, + f: impl FnOnce() -> ContainerWrapper, + ) -> &mut ContainerWrapper { + let s = self.store.entry(idx).or_insert_with(f); + s + } + + pub(crate) fn estimate_size(&self) -> usize { + self.store.len() * 4 + + self + .store + .values() + .map(|v| v.estimate_size()) + .sum::() + } + + pub(super) fn contains(&self, idx: ContainerIdx) -> bool { + self.store.contains_key(&idx) + } + + pub(super) fn insert(&mut self, idx: ContainerIdx, state: ContainerWrapper) { + self.store.insert(idx, state); + } } -pub(crate) enum ContainerWrapper { - Bytes(Bytes), - PartialParsed { bytes: Bytes, value: LoroValue }, - Parsed { bytes: Bytes, state: State }, - State(State), +#[derive(Clone, Debug)] +pub(crate) struct ContainerWrapper { + depth: usize, + kind: ContainerType, + parent: Option, + /// The possible combinations of is_some() are: + /// + /// 1. bytes: new container decoded from bytes + /// 2. bytes + value: new container decoded from bytes, with value decoded + /// 3. state + bytes + value: new container decoded from bytes, with value and state decoded + /// 4. state + bytes: Option, + value: Option, + bytes_offset_for_state: Option, + state: Option, } impl ContainerWrapper { - pub fn get_state(&mut self) -> Option<&State> { - match self { - ContainerWrapper::Bytes(_) => todo!(), - ContainerWrapper::PartialParsed { bytes, value } => todo!(), - ContainerWrapper::Parsed { bytes, state } => todo!(), - ContainerWrapper::State(_) => todo!(), + pub fn new(state: State, arena: &SharedArena) -> Self { + let idx = state.container_idx(); + let parent = arena + .get_parent(idx) + .and_then(|p| arena.get_container_id(p)); + let depth = arena.get_depth(idx).unwrap().get() as usize; + Self { + depth, + parent, + kind: idx.get_type(), + state: Some(state), + bytes: None, + value: None, + bytes_offset_for_state: None, } } - pub fn get_value(&mut self) -> Option { - match self { - ContainerWrapper::Bytes(bytes) => todo!("partial parse"), - ContainerWrapper::PartialParsed { bytes, value } => Some(value.clone()), - ContainerWrapper::Parsed { bytes, state } => Some(state.get_value()), - ContainerWrapper::State(s) => Some(s.get_value()), + /// It will not decode the state if it is not decoded + pub fn try_get_state(&self) -> Option<&State> { + self.state.as_ref() + } + + /// It will decode the state if it is not decoded + pub fn get_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> &State { + self.decode_state(idx, ctx).unwrap(); + self.state.as_ref().expect("ContainerWrapper is empty") + } + + /// It will decode the state if it is not decoded + pub fn get_state_mut( + &mut self, + idx: ContainerIdx, + ctx: ContainerCreationContext, + ) -> &mut State { + self.decode_state(idx, ctx).unwrap(); + self.bytes = None; + self.value = None; + self.state.as_mut().unwrap() + } + + pub fn get_value(&mut self) -> LoroValue { + if let Some(v) = self.value.as_ref() { + return v.clone(); + } + + self.decode_value().unwrap(); + if self.value.is_none() { + return self.state.as_mut().unwrap().get_value(); + } + + self.value.as_ref().unwrap().clone() + } + + pub fn encode(&mut self) -> Bytes { + if let Some(bytes) = self.bytes.as_ref() { + return bytes.clone(); + } + + // ContainerType + // Depth + // ParentID + // StateSnapshot + let mut output = Vec::new(); + output.push(self.kind.to_u8()); + leb128::write::unsigned(&mut output, self.depth as u64).unwrap(); + postcard::to_io(&self.parent, &mut output).unwrap(); + self.state + .as_mut() + .unwrap() + .encode_snapshot_fast(&mut output); + output.into() + } + + pub fn new_from_bytes(b: Bytes) -> Self { + let src: &[u8] = &b; + let bytes: &[u8] = &b; + let kind = ContainerType::try_from_u8(bytes[0]).unwrap(); + let mut bytes = &bytes[1..]; + let depth = leb128::read::unsigned(&mut bytes).unwrap(); + let (parent, bytes) = postcard::take_from_bytes(bytes).unwrap(); + // SAFETY: bytes is a slice of b + let size = unsafe { bytes.as_ptr().offset_from(src.as_ptr()) }; + Self { + depth: depth as usize, + kind, + parent, + state: None, + value: None, + bytes: Some(b.slice(size as usize..)), + bytes_offset_for_state: None, } } - pub fn encode(&self) -> Bytes { - todo!("encode container") + pub fn ensure_value(&mut self) -> &LoroValue { + if self.value.is_some() { + } else if self.state.is_some() { + let value = self.state.as_mut().unwrap().get_value(); + self.value = Some(value); + } else { + self.decode_value().unwrap(); + } + + self.value.as_ref().unwrap() } - pub fn new_from_bytes(bytes: Bytes) -> Self { - ContainerWrapper::Bytes(bytes) + fn decode_value(&mut self) -> LoroResult<()> { + let Some(b) = self.bytes.as_ref() else { + return Ok(()); + }; + + let (v, rest) = match self.kind { + ContainerType::Text => RichtextState::decode_value(b)?, + ContainerType::Map => MapState::decode_value(b)?, + ContainerType::List => ListState::decode_value(b)?, + ContainerType::MovableList => MovableListState::decode_value(b)?, + ContainerType::Tree => TreeState::decode_value(b)?, + #[cfg(feature = "counter")] + ContainerType::Counter => CounterState::decode_value(b)?, + ContainerType::Unknown(_) => UnknownState::decode_value(b)?, + }; + + self.value = Some(v); + // SAFETY: rest is a slice of b + let offset = unsafe { rest.as_ptr().offset_from(b.as_ptr()) }; + self.bytes_offset_for_state = Some(offset as usize); + Ok(()) + } + + fn decode_state(&mut self, idx: ContainerIdx, ctx: ContainerCreationContext) -> LoroResult<()> { + if self.state.is_some() { + return Ok(()); + } + + if self.value.is_none() { + self.decode_value()?; + } + + let b = self.bytes.as_ref().unwrap(); + let offset = self.bytes_offset_for_state.unwrap(); + let b = &b[offset..]; + let v = self.value.as_ref().unwrap().clone(); + let state: State = match self.kind { + ContainerType::Text => RichtextState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::Map => MapState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::List => ListState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::MovableList => { + MovableListState::decode_snapshot_fast(idx, (v, b), ctx)?.into() + } + ContainerType::Tree => TreeState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + #[cfg(feature = "counter")] + ContainerType::Counter => CounterState::decode_snapshot_fast(idx, (v, b), ctx)?.into(), + ContainerType::Unknown(_) => { + UnknownState::decode_snapshot_fast(idx, (v, b), ctx)?.into() + } + }; + self.state = Some(state); + Ok(()) + } + + pub fn estimate_size(&self) -> usize { + if let Some(bytes) = self.bytes.as_ref() { + return bytes.len(); + } + + self.state.as_ref().unwrap().estimate_size() + } + + pub(crate) fn is_state_empty(&self) -> bool { + if let Some(state) = self.state.as_ref() { + return state.is_state_empty(); + } + + // FIXME: it's not very accurate... + self.bytes.as_ref().unwrap().len() > 10 } } mod encode { - use loro_common::{ - ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult, - }; + use loro_common::{ContainerID, ContainerType, Counter, InternalString, LoroError, LoroResult}; use serde::{Deserialize, Serialize}; use serde_columnar::{ AnyRleDecoder, AnyRleEncoder, BoolRleDecoder, BoolRleEncoder, DeltaRleDecoder, diff --git a/crates/loro-internal/src/state/unknown_state.rs b/crates/loro-internal/src/state/unknown_state.rs index 8d5a5849..8724d1f7 100644 --- a/crates/loro-internal/src/state/unknown_state.rs +++ b/crates/loro-internal/src/state/unknown_state.rs @@ -105,3 +105,32 @@ impl ContainerState for UnknownState { Ok(()) } } + +mod snapshot { + use loro_common::LoroValue; + + use crate::state::FastStateSnapshot; + + use super::UnknownState; + + impl FastStateSnapshot for UnknownState { + fn encode_snapshot_fast(&mut self, mut w: W) { + w.write_all(&[]); + } + + fn decode_value(bytes: &[u8]) -> loro_common::LoroResult<(loro_common::LoroValue, &[u8])> { + Ok((LoroValue::Null, bytes)) + } + + fn decode_snapshot_fast( + idx: crate::container::idx::ContainerIdx, + v: (loro_common::LoroValue, &[u8]), + ctx: crate::state::ContainerCreationContext, + ) -> loro_common::LoroResult + where + Self: Sized, + { + Ok(UnknownState::new(idx)) + } + } +} diff --git a/crates/loro-internal/src/txn.rs b/crates/loro-internal/src/txn.rs index 3e57a2d7..8f8e4c4e 100644 --- a/crates/loro-internal/src/txn.rs +++ b/crates/loro-internal/src/txn.rs @@ -35,7 +35,7 @@ use super::{ event::{InternalContainerDiff, InternalDocDiff}, handler::{ListHandler, MapHandler, TextHandler, TreeHandler}, oplog::OpLog, - state::{DocState, State}, + state::DocState, }; pub type OnCommitFn = Box>) + Sync + Send>; @@ -246,7 +246,7 @@ impl Transaction { state_lock.start_txn(origin, crate::event::EventTriggerKind::Local); let arena = state_lock.arena.clone(); let frontiers = state_lock.frontiers.clone(); - let peer = state_lock.peer; + let peer = state_lock.peer.load(std::sync::atomic::Ordering::Relaxed); let next_counter = oplog_lock.next_id(peer).counter; let next_lamport = oplog_lock.dag.frontiers_to_next_lamport(&frontiers); drop(state_lock); @@ -372,8 +372,19 @@ impl Transaction { ) -> LoroResult<()> { if Arc::as_ptr(&self.state) != Weak::as_ptr(state_ref) { return Err(LoroError::UnmatchedContext { - expected: self.state.lock().unwrap().peer, - found: state_ref.upgrade().unwrap().lock().unwrap().peer, + expected: self + .state + .lock() + .unwrap() + .peer + .load(std::sync::atomic::Ordering::Relaxed), + found: state_ref + .upgrade() + .unwrap() + .lock() + .unwrap() + .peer + .load(std::sync::atomic::Ordering::Relaxed), }); } @@ -469,29 +480,12 @@ impl Transaction { .unwrap() } - pub fn get_value_by_idx(&self, idx: ContainerIdx) -> LoroValue { - self.state.lock().unwrap().get_value_by_idx(idx) - } - - #[allow(unused)] - pub(crate) fn with_state(&self, idx: ContainerIdx, f: F) -> R - where - F: FnOnce(&State) -> R, - { - let state = self.state.lock().unwrap(); - f(state.get_state(idx).unwrap()) - } - pub fn next_id(&self) -> ID { ID { peer: self.peer, counter: self.next_counter, } } - - pub fn is_empty(&self) -> bool { - self.local_ops.is_empty() - } } impl Drop for Transaction { diff --git a/crates/loro-internal/tests/autocommit.rs b/crates/loro-internal/tests/autocommit.rs index cd6f4ded..0fc909dd 100644 --- a/crates/loro-internal/tests/autocommit.rs +++ b/crates/loro-internal/tests/autocommit.rs @@ -40,6 +40,11 @@ fn auto_commit_list() { assert_eq!(value.to_json_value(), json!({"list": ["world", "hello"]})) } +#[ctor::ctor] +fn init() { + dev_utils::setup_test_log(); +} + #[test] fn auto_commit_with_checkout() { let doc = LoroDoc::default();