refactor: use a new version of txn

This commit is contained in:
Zixuan Chen 2023-07-10 12:06:11 +08:00
parent bdb0e26b93
commit 508ca4b5c6
9 changed files with 72 additions and 24 deletions

View file

@ -23,7 +23,9 @@
"RUST_BACKTRACE": "full",
"DEBUG": "*"
},
"rust-analyzer.cargo.features": ["test_utils"],
"rust-analyzer.cargo.features": [
// "test_utils"
],
"editor.defaultFormatter": "rust-lang.rust-analyzer",
"rust-analyzer.server.extraEnv": { "RUSTUP_TOOLCHAIN": "stable" },
"editor.formatOnSave": true,
@ -41,5 +43,6 @@
"*.rs": "${capture}.excalidraw"
},
"excalidraw.theme": "dark",
"deno.enable": true
"deno.enable": true,
"cortex-debug.variableUseNaturalFormat": true
}

View file

@ -41,9 +41,27 @@ pub mod yata_impl;
/// - [YSpan] never gets removed in both [ContentMap] and [CursorMap]
/// - The deleted contents are marked with deleted, but still lives on the [ContentMap] with length of 0
///
#[cfg(not(feature = "test_utils"))]
#[derive(Default, Debug)]
pub struct Tracker {
/// from start_vv to latest vv are applied
start_vv: VersionVector,
/// latest applied ops version vector
all_vv: VersionVector,
/// current content version vector
current_vv: VersionVector,
/// The pretend current content version vector.
///
/// Because sometimes we don't actually need to checkout to the version.
/// So we may cache the changes then applying them when we really need to.
content: ContentMap,
id_to_cursor: CursorMap,
}
#[cfg(feature = "test_utils")]
#[derive(Default, Debug)]
pub struct Tracker {
#[cfg(feature = "test_utils")]
client_id: PeerID,
/// from start_vv to latest vv are applied
start_vv: VersionVector,
@ -143,7 +161,7 @@ impl Tracker {
}
/// for_diff = true should be called after the tracker checkout to A version with for_diff = false.
/// Then we can calculate the diff between A and vv.
/// Then we can calculate the diff between A and vv.
fn _checkout(&mut self, vv: &VersionVector, for_diff: bool) {
// clear after_status as it may be outdated
if for_diff {

View file

@ -5,7 +5,7 @@ use smallvec::SmallVec;
use crate::{
container::ContainerID,
delta::{Delta, DeltaType, MapDelta, MapDiff, Meta},
text::text_content::{SliceRanges},
text::text_content::SliceRanges,
transaction::Origin,
version::Frontiers,
InternalString, LoroValue,

View file

@ -11,7 +11,6 @@ use fxhash::FxHashMap;
use rle::HasLength;
use rle::RleVec;
use crate::change::Change;
pub struct ClientOpIter<'a> {

View file

@ -0,0 +1,11 @@
use crate::container::registry::ContainerIdx;
use super::txn::Transaction;
pub struct Text {
container_idx: ContainerIdx,
}
impl Text {
pub fn insert(&self, txn: &Transaction, pos: usize, s: &str) {}
}

View file

@ -1,6 +1,9 @@
#![allow(dead_code)]
pub(super) mod arena;
mod container;
pub(super) mod diff_calc;
mod handler;
pub mod oplog;
mod state;
mod txn;

View file

@ -1,9 +1,9 @@
mod dag;
pub(crate) mod dag;
use fxhash::FxHashMap;
use rle::RleVec;
use smallvec::SmallVec;
use tabled::measurment::Percent;
// use tabled::measurment::Percent;
use crate::change::{Change, Lamport, Timestamp};
use crate::container::list::list_op::{InnerListOp, ListOp};
@ -100,9 +100,8 @@ impl OpLog {
/// # Err
///
/// Return Err(LoroError::UsedOpID) when the change's id is occupied
pub fn import_change(&mut self, change: Change<RemoteOp>) -> Result<(), LoroError> {
pub fn import_change(&mut self, change: Change) -> Result<(), LoroError> {
self.check_id_valid(change.id)?;
let change = self.convert_change(change);
if let Err(id) = self.check_deps(&change.deps) {
self.pending_changes.entry(id).or_default().push(change);
return Ok(());

View file

@ -26,7 +26,7 @@ pub struct AppState {
pub(super) frontiers: Frontiers,
state: FxHashMap<ContainerIdx, State>,
arena: SharedArena,
pub(super) arena: SharedArena,
in_txn: bool,
changed_in_txn: FxHashSet<ContainerIdx>,

View file

@ -1,52 +1,67 @@
use std::sync::{Arc, Mutex};
use rle::RleVec;
use crate::{change::Change, op::RemoteOp, LoroError};
use crate::{change::Change, op::Op, LoroError};
use super::{oplog::OpLog, state::AppState};
use super::{arena::SharedArena, oplog::OpLog, state::AppState};
pub struct Transaction<'a> {
pub struct Transaction {
finished: bool,
state: &'a mut AppState,
ops: RleVec<[RemoteOp<'a>; 1]>,
state: Arc<Mutex<AppState>>,
ops: RleVec<[Op; 1]>,
oplog: Arc<Mutex<OpLog>>,
arena: SharedArena,
}
impl<'a> Transaction<'a> {
pub fn new(state: &'a mut AppState) -> Self {
state.start_txn();
impl Transaction {
pub fn new(state: Arc<Mutex<AppState>>, oplog: Arc<Mutex<OpLog>>) -> Self {
let mut state_lock = state.lock().unwrap();
state_lock.start_txn();
let arena = state_lock.arena.clone();
drop(state_lock);
Self {
state,
arena,
oplog,
finished: false,
ops: RleVec::new(),
}
}
pub fn abort(&mut self) {
self.state.abort_txn();
self.state.lock().unwrap().abort_txn();
self.finished = true;
}
pub fn commit(&mut self, oplog: &mut OpLog) -> Result<(), LoroError> {
let mut state = self.state.lock().unwrap();
let ops = std::mem::take(&mut self.ops);
let change = Change {
ops,
deps: self.state.frontiers.clone(),
id: oplog.next_id(self.state.peer),
deps: state.frontiers.clone(),
id: oplog.next_id(state.peer),
lamport: oplog.next_lamport(),
timestamp: oplog.get_timestamp(),
};
if let Err(err) = oplog.import_change(change) {
drop(state);
self.abort();
return Err(err);
}
self.state.commit_txn();
state.commit_txn();
self.finished = true;
Ok(())
}
pub fn decode(&mut self, updates: &[u8]) -> Result<(), LoroError> {
unimplemented!()
}
}
impl<'a> Drop for Transaction<'a> {
impl Drop for Transaction {
fn drop(&mut self) {
if !self.finished {
self.abort();