From bd4358043768314802463b31a0bb4d27bb9ba674 Mon Sep 17 00:00:00 2001
From: Daniel Ploch
Date: Wed, 11 Jan 2023 16:12:17 -0500
Subject: [PATCH] op_heads_store: remove LockedOpHeads

Make op resolution a closed operation, powered by a callback provided by
the caller which runs under an internal lock scope. This allows for
greatly simplifying the internal lifetime structuring.
---
 lib/src/op_heads_store.rs        | 119 +++++++++++++------
 lib/src/repo.rs                  |  88 +++++---------
 lib/src/simple_op_heads_store.rs | 195 +++++++------------------------
 lib/src/transaction.rs           |   2 +-
 lib/tests/test_bad_locking.rs    |   9 +-
 lib/tests/test_load_repo.rs      |   2 +-
 lib/tests/test_view.rs           |   2 +-
 src/cli_util.rs                  | 143 +++++++++++++----------
 src/commands/mod.rs              |   3 +-
 9 files changed, 241 insertions(+), 322 deletions(-)

diff --git a/lib/src/op_heads_store.rs b/lib/src/op_heads_store.rs
index 37e240a58..e6d02d750 100644
--- a/lib/src/op_heads_store.rs
+++ b/lib/src/op_heads_store.rs
@@ -16,54 +16,32 @@
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use itertools::Itertools;
 use thiserror::Error;
 
 use crate::dag_walk;
 use crate::op_store::{OpStore, OperationId};
 use crate::operation::Operation;
 
-pub enum OpHeads {
-    /// There's a single latest operation. This is the normal case.
-    Single(Operation),
-    /// There are multiple latest operations, which means there has been
-    /// concurrent operations. These need to be resolved.
-    Unresolved {
-        locked_op_heads: LockedOpHeads,
-        op_heads: Vec<Operation>,
-    },
-}
-
 #[derive(Debug, Error, PartialEq, Eq)]
-pub enum OpHeadResolutionError {
+pub enum OpHeadResolutionError<E> {
     #[error("Operation log has no heads")]
     NoHeads,
+    #[error("Op resolution error: {0}")]
+    Err(E),
 }
 
-pub trait LockedOpHeadsResolver {
-    fn finish(&self, new_op: &Operation);
-}
-
-// Represents a mutually exclusive lock on the OpHeadsStore in local systems.
-pub struct LockedOpHeads {
-    resolver: Box<dyn LockedOpHeadsResolver>,
-}
-
-impl LockedOpHeads {
-    pub fn new(resolver: Box<dyn LockedOpHeadsResolver>) -> Self {
-        LockedOpHeads { resolver }
-    }
-
-    pub fn finish(self, new_op: &Operation) {
-        self.resolver.finish(new_op);
+impl<E> From<E> for OpHeadResolutionError<E> {
+    fn from(e: E) -> Self {
+        OpHeadResolutionError::Err(e)
     }
 }
 
-/// Manages the very set of current heads of the operation log.
-///
-/// Implementations should use Arc<> internally, as the lock() and
-/// get_heads() return values which might outlive the original object. When Rust
-/// makes it possible for a Trait method to reference &Arc<Self>, this can be
-/// simplified.
+pub trait OpHeadsStoreLock<'a> {
+    fn promote_new_op(&self, new_op: &Operation);
+}
+
+/// Manages the set of current heads of the operation log.
 pub trait OpHeadsStore: Send + Sync + Debug {
     fn name(&self) -> &str;
 
@@ -73,13 +51,11 @@ pub trait OpHeadsStore: Send + Sync + Debug {
 
     fn get_op_heads(&self) -> Vec<OperationId>;
 
-    fn lock(&self) -> LockedOpHeads;
-
-    fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError>;
+    fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a>;
 
     /// Removes operations in the input that are ancestors of other operations
     /// in the input. The ancestors are removed both from the list and from
-    /// disk.
+    /// storage.
     fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
         let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
         let neighbors_fn = |op: &Operation| op.parents();
@@ -93,3 +69,70 @@ pub trait OpHeadsStore: Send + Sync + Debug {
         op_heads.into_iter().collect()
     }
 }
+
+// Given an OpHeadsStore, fetch and resolve its op heads down to one under a
+// lock.
+//
+// This routine is defined outside the trait because it must support generics.
+pub fn resolve_op_heads<E>(
+    op_heads_store: &dyn OpHeadsStore,
+    op_store: &Arc<dyn OpStore>,
+    resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
+) -> Result<Operation, OpHeadResolutionError<E>> {
+    let mut op_heads = op_heads_store.get_op_heads();
+
+    // TODO: De-duplicate this 'simple-resolution' code.
+    if op_heads.is_empty() {
+        return Err(OpHeadResolutionError::NoHeads);
+    }
+
+    if op_heads.len() == 1 {
+        let operation_id = op_heads.pop().unwrap();
+        let operation = op_store.read_operation(&operation_id).unwrap();
+        return Ok(Operation::new(op_store.clone(), operation_id, operation));
+    }
+
+    // There are multiple heads. We take a lock, then check if there are still
+    // multiple heads (it's likely that another process was in the process of
+    // deleting one of them). If there are still multiple heads, we attempt to
+    // merge all the views into one. We then write that view and a corresponding
+    // operation to the op-store.
+    // Note that the locking isn't necessary for correctness; we take the lock
+    // only to prevent other concurrent processes from doing the same work (and
+    // producing another set of divergent heads).
+    let lock = op_heads_store.lock();
+    let op_head_ids = op_heads_store.get_op_heads();
+
+    if op_head_ids.is_empty() {
+        return Err(OpHeadResolutionError::NoHeads);
+    }
+
+    if op_head_ids.len() == 1 {
+        let op_head_id = op_head_ids[0].clone();
+        let op_head = op_store.read_operation(&op_head_id).unwrap();
+        return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
+    }
+
+    let op_heads = op_head_ids
+        .iter()
+        .map(|op_id: &OperationId| {
+            let data = op_store.read_operation(op_id).unwrap();
+            Operation::new(op_store.clone(), op_id.clone(), data)
+        })
+        .collect_vec();
+    let mut op_heads = op_heads_store.handle_ancestor_ops(op_heads);
+
+    // Return without creating a merge operation
+    if op_heads.len() == 1 {
+        return Ok(op_heads.pop().unwrap());
+    }
+
+    op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
+    match resolver(op_heads) {
+        Ok(new_op) => {
+            lock.promote_new_op(&new_op);
+            Ok(new_op)
+        }
+        Err(e) => Err(OpHeadResolutionError::Err(e)),
+    }
+}
diff --git a/lib/src/repo.rs b/lib/src/repo.rs
index 19139f47b..efccba575 100644
--- a/lib/src/repo.rs
+++ b/lib/src/repo.rs
@@ -33,7 +33,7 @@ use crate::git_backend::GitBackend;
 use crate::index::{IndexRef, MutableIndex, ReadonlyIndex};
 use crate::index_store::IndexStore;
 use crate::local_backend::LocalBackend;
-use crate::op_heads_store::{LockedOpHeads, OpHeads, OpHeadsStore};
+use crate::op_heads_store::{self, OpHeadResolutionError, OpHeadsStore};
 use crate::op_store::{
     BranchTarget, OpStore, OperationId, OperationMetadata, RefTarget, WorkspaceId,
 };
@@ -201,10 +201,8 @@ impl ReadonlyRepo {
         user_settings: &UserSettings,
         repo_path: &Path,
         store_factories: &StoreFactories,
-    ) -> Result<Arc<ReadonlyRepo>, BackendError> {
-        RepoLoader::init(user_settings, repo_path, store_factories)
-            .load_at_head()
-            .resolve(user_settings)
+    ) -> Result<Arc<ReadonlyRepo>, OpHeadResolutionError<BackendError>> {
+        RepoLoader::init(user_settings, repo_path, store_factories).load_at_head(user_settings)
     }
 
     pub fn loader(&self) -> RepoLoader {
@@ -303,8 +301,8 @@ impl ReadonlyRepo {
     pub fn reload_at_head(
         &self,
         user_settings: &UserSettings,
-    ) -> Result<Arc<ReadonlyRepo>, BackendError> {
-        self.loader().load_at_head().resolve(user_settings)
+    ) -> Result<Arc<ReadonlyRepo>, OpHeadResolutionError<BackendError>> {
+        self.loader().load_at_head(user_settings)
     }
 
     pub fn reload_at(&self, operation: &Operation) -> Arc<ReadonlyRepo> {
@@ -312,40 +310,6 @@ impl ReadonlyRepo {
     }
 }
 
-pub enum RepoAtHead {
-    Single(Arc<ReadonlyRepo>),
-    Unresolved(Box<UnresolvedHeadRepo>),
-}
-
-impl RepoAtHead {
-    pub fn resolve(self, user_settings: &UserSettings) -> Result<Arc<ReadonlyRepo>, BackendError> {
-        match self {
-            RepoAtHead::Single(repo) => Ok(repo),
-            RepoAtHead::Unresolved(unresolved) => unresolved.resolve(user_settings),
-        }
-    }
-}
-
-pub struct UnresolvedHeadRepo {
-    pub repo_loader: RepoLoader,
-    pub locked_op_heads: LockedOpHeads,
-    pub op_heads: Vec<Operation>,
-}
-
-impl UnresolvedHeadRepo {
-    pub fn resolve(self, user_settings: &UserSettings) -> Result<Arc<ReadonlyRepo>, BackendError> {
-        let base_repo = self.repo_loader.load_at(&self.op_heads[0]);
-        let mut tx = base_repo.start_transaction(user_settings, "resolve concurrent operations");
-        for other_op_head in self.op_heads.into_iter().skip(1) {
-            tx.merge_operation(other_op_head);
-            tx.mut_repo().rebase_descendants(user_settings)?;
-        }
-        let merged_repo = tx.write().leave_unpublished();
-        self.locked_op_heads.finish(merged_repo.operation());
-        Ok(merged_repo)
-    }
-}
-
 type BackendFactory = Box<dyn Fn(&Path) -> Box<dyn Backend>>;
 type OpStoreFactory = Box<dyn Fn(&Path) -> Box<dyn OpStore>>;
 type OpHeadsStoreFactory = Box<dyn Fn(&Path) -> Box<dyn OpHeadsStore>>;
@@ -526,22 +490,17 @@ impl RepoLoader {
         &self.op_heads_store
     }
 
-    pub fn load_at_head(&self) -> RepoAtHead {
-        let op_heads = self.op_heads_store.get_heads(&self.op_store).unwrap();
-        match op_heads {
-            OpHeads::Single(op) => {
-                let view = View::new(op.view().take_store_view());
-                RepoAtHead::Single(self._finish_load(op, view))
-            }
-            OpHeads::Unresolved {
-                locked_op_heads,
-                op_heads,
-            } => RepoAtHead::Unresolved(Box::new(UnresolvedHeadRepo {
-                repo_loader: self.clone(),
-                locked_op_heads,
-                op_heads,
-            })),
-        }
+    pub fn load_at_head(
+        &self,
+        user_settings: &UserSettings,
+    ) -> Result<Arc<ReadonlyRepo>, OpHeadResolutionError<BackendError>> {
+        let op = op_heads_store::resolve_op_heads(
+            self.op_heads_store.as_ref(),
+            &self.op_store,
+            |op_heads| self._resolve_op_heads(op_heads, user_settings),
+        )?;
+        let view = View::new(op.view().take_store_view());
+        Ok(self._finish_load(op, view))
     }
 
     pub fn load_at(&self, op: &Operation) -> Arc<ReadonlyRepo> {
@@ -570,6 +529,21 @@ impl RepoLoader {
         Arc::new(repo)
     }
 
+    fn _resolve_op_heads(
+        &self,
+        op_heads: Vec<Operation>,
+        user_settings: &UserSettings,
+    ) -> Result<Operation, BackendError> {
+        let base_repo = self.load_at(&op_heads[0]);
+        let mut tx = base_repo.start_transaction(user_settings, "resolve concurrent operations");
+        for other_op_head in op_heads.into_iter().skip(1) {
+            tx.merge_operation(other_op_head);
+            tx.mut_repo().rebase_descendants(user_settings)?;
+        }
+        let merged_repo = tx.write().leave_unpublished();
+        Ok(merged_repo.operation().clone())
+    }
+
     fn _finish_load(&self, operation: Operation, view: View) -> Arc<ReadonlyRepo> {
         let repo = ReadonlyRepo {
             repo_path: self.repo_path.clone(),
diff --git a/lib/src/simple_op_heads_store.rs b/lib/src/simple_op_heads_store.rs
index 6c5e87a0d..614146d2d 100644
--- a/lib/src/simple_op_heads_store.rs
+++ b/lib/src/simple_op_heads_store.rs
@@ -17,50 +17,25 @@ use std::fs;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use itertools::Itertools;
-
 use crate::lock::FileLock;
-use crate::op_heads_store::{
-    LockedOpHeads, LockedOpHeadsResolver, OpHeadResolutionError, OpHeads, OpHeadsStore,
-};
+use crate::op_heads_store::{OpHeadsStore, OpHeadsStoreLock};
 use crate::op_store;
 use crate::op_store::{OpStore, OperationId, OperationMetadata};
 use crate::operation::Operation;
 
 pub struct SimpleOpHeadsStore {
-    store: Arc<InnerSimpleOpHeadsStore>,
+    dir: PathBuf,
 }
 
 impl Debug for SimpleOpHeadsStore {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("SimpleOpHeadsStore")
-            .field("dir", &self.store.dir)
+            .field("dir", &self.dir)
             .finish()
     }
 }
 
-/// Manages the very set of current heads of the operation log. This store is
-/// simply a directory where each operation id is a file with that name (and no
-/// content).
-struct InnerSimpleOpHeadsStore {
-    dir: PathBuf,
-}
-
-struct SimpleOpHeadsStoreLockResolver {
-    store: Arc<InnerSimpleOpHeadsStore>,
-    _lock: FileLock,
-}
-
-impl LockedOpHeadsResolver for SimpleOpHeadsStoreLockResolver {
-    fn finish(&self, new_op: &Operation) {
-        self.store.add_op_head(new_op.id());
-        for old_id in new_op.parent_ids() {
-            self.store.remove_op_head(old_id);
-        }
-    }
-}
-
-impl InnerSimpleOpHeadsStore {
+impl SimpleOpHeadsStore {
     pub fn init(
         dir: &Path,
         op_store: &Arc<dyn OpStore>,
@@ -78,75 +53,43 @@
         let op_heads_dir = dir.join("simple_op_heads");
         fs::create_dir(&op_heads_dir).unwrap();
-        let op_heads_store = InnerSimpleOpHeadsStore { dir: op_heads_dir };
+        let op_heads_store = Self { dir: op_heads_dir };
         op_heads_store.add_op_head(init_operation.id());
         (op_heads_store, init_operation)
     }
 
-    pub fn add_op_head(&self, id: &OperationId) {
-        std::fs::write(self.dir.join(id.hex()), "").unwrap();
-    }
-
-    pub fn remove_op_head(&self, id: &OperationId) {
-        // It's fine if the old head was not found. It probably means
-        // that we're on a distributed file system where the locking
-        // doesn't work. We'll probably end up with two current
-        // heads. We'll detect that next time we load the view.
-        std::fs::remove_file(self.dir.join(id.hex())).ok();
-    }
-
-    pub fn get_op_heads(&self) -> Vec<OperationId> {
-        let mut op_heads = vec![];
-        for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
-            let op_head_file_name = op_head_entry.unwrap().file_name();
-            let op_head_file_name = op_head_file_name.to_str().unwrap();
-            if let Ok(op_head) = hex::decode(op_head_file_name) {
-                op_heads.push(OperationId::new(op_head));
-            }
-        }
-        op_heads
-    }
-}
-
-impl SimpleOpHeadsStore {
-    pub fn init(
-        dir: &Path,
-        op_store: &Arc<dyn OpStore>,
-        root_view: &op_store::View,
-        operation_metadata: OperationMetadata,
-    ) -> (Self, Operation) {
-        let (inner, init_op) =
-            InnerSimpleOpHeadsStore::init(dir, op_store, root_view, operation_metadata);
-        (
-            SimpleOpHeadsStore {
-                store: Arc::new(inner),
-            },
-            init_op,
-        )
-    }
-
     pub fn load(dir: &Path) -> Self {
         let op_heads_dir = dir.join("simple_op_heads");
         // TODO: Delete this migration code at 0.8+ or so
         if !op_heads_dir.exists() {
-            let old_store = InnerSimpleOpHeadsStore {
+            let old_store = Self {
                 dir: dir.to_path_buf(),
             };
             fs::create_dir(&op_heads_dir).unwrap();
-            let new_store = InnerSimpleOpHeadsStore { dir: op_heads_dir };
+            let new_store = Self { dir: op_heads_dir };
             for id in old_store.get_op_heads() {
                 old_store.remove_op_head(&id);
                 new_store.add_op_head(&id);
             }
-            return SimpleOpHeadsStore {
-                store: Arc::new(new_store),
-            };
+            return new_store;
         }
 
-        SimpleOpHeadsStore {
-            store: Arc::new(InnerSimpleOpHeadsStore { dir: op_heads_dir }),
+        Self { dir: op_heads_dir }
+    }
+}
+
+struct SimpleOpHeadsStoreLock<'a> {
+    store: &'a dyn OpHeadsStore,
+    _lock: FileLock,
+}
+
+impl OpHeadsStoreLock<'_> for SimpleOpHeadsStoreLock<'_> {
+    fn promote_new_op(&self, new_op: &Operation) {
+        self.store.add_op_head(new_op.id());
+        for old_id in new_op.parent_ids() {
+            self.store.remove_op_head(old_id);
         }
     }
 }
@@ -157,86 +100,33 @@ impl OpHeadsStore for SimpleOpHeadsStore {
     }
 
     fn add_op_head(&self, id: &OperationId) {
-        self.store.add_op_head(id);
+        std::fs::write(self.dir.join(id.hex()), "").unwrap();
     }
 
     fn remove_op_head(&self, id: &OperationId) {
-        self.store.remove_op_head(id);
+        // It's fine if the old head was not found. It probably means
+        // that we're on a distributed file system where the locking
+        // doesn't work. We'll probably end up with two current
+        // heads. We'll detect that next time we load the view.
+        std::fs::remove_file(self.dir.join(id.hex())).ok();
     }
 
     fn get_op_heads(&self) -> Vec<OperationId> {
-        self.store.get_op_heads()
+        let mut op_heads = vec![];
+        for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
+            let op_head_file_name = op_head_entry.unwrap().file_name();
+            let op_head_file_name = op_head_file_name.to_str().unwrap();
+            if let Ok(op_head) = hex::decode(op_head_file_name) {
+                op_heads.push(OperationId::new(op_head));
+            }
+        }
+        op_heads
     }
 
-    fn lock(&self) -> LockedOpHeads {
-        let lock = FileLock::lock(self.store.dir.join("lock"));
-        LockedOpHeads::new(Box::new(SimpleOpHeadsStoreLockResolver {
-            store: self.store.clone(),
-            _lock: lock,
-        }))
-    }
-
-    fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError> {
-        let mut op_heads = self.get_op_heads();
-
-        if op_heads.is_empty() {
-            return Err(OpHeadResolutionError::NoHeads);
-        }
-
-        if op_heads.len() == 1 {
-            let operation_id = op_heads.pop().unwrap();
-            let operation = op_store.read_operation(&operation_id).unwrap();
-            return Ok(OpHeads::Single(Operation::new(
-                op_store.clone(),
-                operation_id,
-                operation,
-            )));
-        }
-
-        // There are multiple heads. We take a lock, then check if there are still
-        // multiple heads (it's likely that another process was in the process of
-        // deleting one of them). If there are still multiple heads, we attempt to
-        // merge all the views into one. We then write that view and a corresponding
-        // operation to the op-store.
-        // Note that the locking isn't necessary for correctness; we take the lock
-        // only to prevent other concurrent processes from doing the same work (and
-        // producing another set of divergent heads).
-        let locked_op_heads = self.lock();
-        let op_head_ids = self.get_op_heads();
-
-        if op_head_ids.is_empty() {
-            return Err(OpHeadResolutionError::NoHeads);
-        }
-
-        if op_head_ids.len() == 1 {
-            let op_head_id = op_head_ids[0].clone();
-            let op_head = op_store.read_operation(&op_head_id).unwrap();
-            // Return early so we don't write a merge operation with a single parent
-            return Ok(OpHeads::Single(Operation::new(
-                op_store.clone(),
-                op_head_id,
-                op_head,
-            )));
-        }
-
-        let op_heads = op_head_ids
-            .iter()
-            .map(|op_id: &OperationId| {
-                let data = op_store.read_operation(op_id).unwrap();
-                Operation::new(op_store.clone(), op_id.clone(), data)
-            })
-            .collect_vec();
-        let mut op_heads = self.handle_ancestor_ops(op_heads);
-
-        // Return without creating a merge operation
-        if op_heads.len() == 1 {
-            return Ok(OpHeads::Single(op_heads.pop().unwrap()));
-        }
-
-        op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
-        Ok(OpHeads::Unresolved {
-            locked_op_heads,
-            op_heads,
+    fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a> {
+        Box::new(SimpleOpHeadsStoreLock {
+            store: self,
+            _lock: FileLock::lock(self.dir.join("lock")),
         })
     }
 }
@@ -249,7 +139,6 @@ mod tests {
 
     use itertools::Itertools;
 
-    use super::InnerSimpleOpHeadsStore;
     use crate::op_heads_store::OpHeadsStore;
     use crate::op_store::OperationId;
     use crate::simple_op_heads_store::SimpleOpHeadsStore;
@@ -274,7 +163,7 @@ mod tests {
         ops.insert(op1.clone());
         ops.insert(op2.clone());
 
-        let old_store = InnerSimpleOpHeadsStore {
+        let old_store = SimpleOpHeadsStore {
             dir: store_path.clone(),
         };
         old_store.add_op_head(&op1);
diff --git a/lib/src/transaction.rs b/lib/src/transaction.rs
index f6878a4cc..16db09a60 100644
--- a/lib/src/transaction.rs
+++ b/lib/src/transaction.rs
@@ -178,7 +178,7 @@ impl UnpublishedOperation {
         self.repo_loader
             .op_heads_store()
             .lock()
-            .finish(&data.operation);
+            .promote_new_op(&data.operation);
         let repo = self
             .repo_loader
             .create_from(data.operation, data.view, data.index);
diff --git a/lib/tests/test_bad_locking.rs b/lib/tests/test_bad_locking.rs
index 2a233c01d..b70c89c38 100644
--- a/lib/tests/test_bad_locking.rs
+++ b/lib/tests/test_bad_locking.rs
@@ -112,8 +112,7 @@ fn test_bad_locking_children(use_git: bool) {
         Workspace::load(&settings, machine1_root.path(), &StoreFactories::default()).unwrap();
     let machine1_repo = machine1_workspace
         .repo_loader()
-        .load_at_head()
-        .resolve(&settings)
+        .load_at_head(&settings)
         .unwrap();
     let mut machine1_tx = machine1_repo.start_transaction(&settings, "test");
     let child1 = create_random_commit(machine1_tx.mut_repo(), &settings)
@@ -129,8 +128,7 @@ fn test_bad_locking_children(use_git: bool) {
         Workspace::load(&settings, machine2_root.path(), &StoreFactories::default()).unwrap();
     let machine2_repo = machine2_workspace
         .repo_loader()
-        .load_at_head()
-        .resolve(&settings)
+        .load_at_head(&settings)
         .unwrap();
     let mut machine2_tx = machine2_repo.start_transaction(&settings, "test");
     let child2 = create_random_commit(machine2_tx.mut_repo(), &settings)
@@ -152,8 +150,7 @@ fn test_bad_locking_children(use_git: bool) {
         Workspace::load(&settings, merged_path.path(), &StoreFactories::default()).unwrap();
     let merged_repo = merged_workspace
         .repo_loader()
-        .load_at_head()
-        .resolve(&settings)
+        .load_at_head(&settings)
         .unwrap();
     assert!(merged_repo.view().heads().contains(child1.id()));
     assert!(merged_repo.view().heads().contains(child2.id()));
diff --git a/lib/tests/test_load_repo.rs b/lib/tests/test_load_repo.rs
index 95b98cb83..c94c9fd21 100644
--- a/lib/tests/test_load_repo.rs
+++ b/lib/tests/test_load_repo.rs
@@ -34,7 +34,7 @@ fn test_load_at_operation(use_git: bool) {
     // If we load the repo at head, we should not see the commit since it was
     // removed
     let loader = RepoLoader::init(&settings, repo.repo_path(), &StoreFactories::default());
-    let head_repo = loader.load_at_head().resolve(&settings).unwrap();
+    let head_repo = loader.load_at_head(&settings).unwrap();
     assert!(!head_repo.view().heads().contains(commit.id()));
 
     // If we load the repo at the previous operation, we should see the commit since
diff --git a/lib/tests/test_view.rs b/lib/tests/test_view.rs
index 5468f8c97..9ac9234b9 100644
--- a/lib/tests/test_view.rs
+++ b/lib/tests/test_view.rs
@@ -438,7 +438,7 @@ fn commit_transactions(settings: &UserSettings, txs: Vec<Transaction>) -> Arc<ReadonlyRepo> {
-    repo_loader.load_at_head().resolve(settings).unwrap()
+    repo_loader.load_at_head(settings).unwrap()
 }
diff --git a/src/cli_util.rs b/src/cli_util.rs
--- a/src/cli_util.rs
+++ b/src/cli_util.rs
-impl From<OpHeadResolutionError> for CommandError {
-    fn from(err: OpHeadResolutionError) -> Self {
+impl From<OpHeadResolutionError<CommandError>> for CommandError {
+    fn from(err: OpHeadResolutionError<CommandError>) -> Self {
         match err {
             OpHeadResolutionError::NoHeads => CommandError::InternalError(
                 "Corrupt repository: there are no operations".to_string(),
             ),
+            OpHeadResolutionError::Err(e) => e,
         }
     }
 }
@@ -311,13 +314,32 @@ impl CommandHelper {
         &self.settings
     }
 
-    pub fn workspace_helper(&self, ui: &mut Ui) -> Result<WorkspaceCommandHelper, CommandError> {
+    fn workspace_helper_internal(
+        &self,
+        ui: &mut Ui,
+        snapshot: bool,
+    ) -> Result<WorkspaceCommandHelper, CommandError> {
         let workspace = self.load_workspace()?;
-        let mut workspace_command = self.resolve_operation(ui, workspace)?;
-        workspace_command.snapshot(ui)?;
+        let op_head = self.resolve_operation(ui, workspace.repo_loader())?;
+        let repo = workspace.repo_loader().load_at(&op_head);
+        let mut workspace_command = self.for_loaded_repo(ui, workspace, repo)?;
+        if snapshot {
+            workspace_command.snapshot(ui)?;
+        }
         Ok(workspace_command)
     }
 
+    pub fn workspace_helper(&self, ui: &mut Ui) -> Result<WorkspaceCommandHelper, CommandError> {
+        self.workspace_helper_internal(ui, true)
+    }
+
+    pub fn workspace_helper_no_snapshot(
+        &self,
+        ui: &mut Ui,
+    ) -> Result<WorkspaceCommandHelper, CommandError> {
+        self.workspace_helper_internal(ui, false)
+    }
+
     pub fn load_workspace(&self) -> Result<Workspace, CommandError> {
         let loader = self.maybe_workspace_loader.as_ref().map_err(Clone::clone)?;
         loader
@@ -328,49 +350,46 @@ impl CommandHelper {
     pub fn resolve_operation(
         &self,
         ui: &mut Ui,
-        workspace: Workspace,
-    ) -> Result<WorkspaceCommandHelper, CommandError> {
-        let repo_loader = workspace.repo_loader();
-        let op_heads = resolve_op_for_load(
-            repo_loader.op_store(),
-            repo_loader.op_heads_store(),
-            &self.global_args.at_operation,
-        )?;
-        let workspace_command = match op_heads {
-            OpHeads::Single(op) => {
-                let repo = repo_loader.load_at(&op);
-                self.for_loaded_repo(ui, workspace, repo)?
-            }
-            OpHeads::Unresolved {
-                locked_op_heads,
-                op_heads,
-            } => {
-                writeln!(
-                    ui,
-                    "Concurrent modification detected, resolving automatically.",
-                )?;
-                let base_repo = repo_loader.load_at(&op_heads[0]);
-                // TODO: It may be helpful to print each operation we're merging here
-                let mut workspace_command = self.for_loaded_repo(ui, workspace, base_repo)?;
-                let mut tx = workspace_command.start_transaction("resolve concurrent operations");
-                for other_op_head in op_heads.into_iter().skip(1) {
-                    tx.merge_operation(other_op_head);
-                    let num_rebased = tx.mut_repo().rebase_descendants(&self.settings)?;
-                    if num_rebased > 0 {
-                        writeln!(
-                            ui,
-                            "Rebased {num_rebased} descendant commits onto commits rewritten by \
-                             other operation"
-                        )?;
-                    }
-                }
-                let merged_repo = tx.write().leave_unpublished();
-                locked_op_heads.finish(merged_repo.operation());
-                workspace_command.repo = merged_repo;
-                workspace_command
-            }
-        };
-        Ok(workspace_command)
+        repo_loader: &RepoLoader,
+    ) -> Result<Operation, OpHeadResolutionError<CommandError>> {
+        if self.global_args.at_operation == "@" {
+            op_heads_store::resolve_op_heads(
+                repo_loader.op_heads_store().as_ref(),
+                repo_loader.op_store(),
+                |op_heads| {
+                    writeln!(
+                        ui,
+                        "Concurrent modification detected, resolving automatically.",
+                    )?;
+                    let base_repo = repo_loader.load_at(&op_heads[0]);
+                    // TODO: It may be helpful to print each operation we're merging here
+                    let mut tx = start_repo_transaction(
+                        &base_repo,
+                        &self.settings,
+                        &self.string_args,
+                        "resolve concurrent operations",
+                    );
+                    for other_op_head in op_heads.into_iter().skip(1) {
+                        tx.merge_operation(other_op_head);
+                        let num_rebased = tx.mut_repo().rebase_descendants(&self.settings)?;
+                        if num_rebased > 0 {
+                            writeln!(
+                                ui,
+                                "Rebased {num_rebased} descendant commits onto commits rewritten \
+                                 by other operation"
+                            )?;
+                        }
+                    }
+                    Ok(tx.write().leave_unpublished().operation().clone())
+                },
+            )
+        } else {
+            resolve_op_for_load(
+                repo_loader.op_store(),
+                repo_loader.op_heads_store(),
+                &self.global_args.at_operation,
+            )
+        }
     }
 
     pub fn for_loaded_repo(
@@ -1157,32 +1176,30 @@ fn resolve_op_for_load(
     op_store: &Arc<dyn OpStore>,
     op_heads_store: &Arc<dyn OpHeadsStore>,
     op_str: &str,
-) -> Result<OpHeads, CommandError> {
-    if op_str == "@" {
-        Ok(op_heads_store.get_heads(op_store)?)
-    } else {
-        let get_current_op = || match op_heads_store.get_heads(op_store)? {
-            OpHeads::Single(current_op) => Ok(current_op),
-            OpHeads::Unresolved { .. } => Err(user_error(format!(
+) -> Result<Operation, OpHeadResolutionError<CommandError>> {
+    let get_current_op = || {
+        op_heads_store::resolve_op_heads(op_heads_store.as_ref(), op_store, |_| {
+            Err(user_error(format!(
                 r#"The "{op_str}" expression resolved to more than one operation"#
-            ))),
-        };
-        let operation = resolve_single_op(op_store, op_heads_store, get_current_op, op_str)?;
-        Ok(OpHeads::Single(operation))
-    }
+            )))
+        })
+    };
+    let operation = resolve_single_op(op_store, op_heads_store, get_current_op, op_str)?;
+    Ok(operation)
 }
 
 fn resolve_single_op(
     op_store: &Arc<dyn OpStore>,
     op_heads_store: &Arc<dyn OpHeadsStore>,
-    get_current_op: impl FnOnce() -> Result<Operation, CommandError>,
+    get_current_op: impl FnOnce() -> Result<Operation, OpHeadResolutionError<CommandError>>,
     op_str: &str,
 ) -> Result<Operation, CommandError> {
     let op_symbol = op_str.trim_end_matches('-');
     let op_postfix = &op_str[op_symbol.len()..];
     let mut operation = match op_symbol {
         "@" => get_current_op(),
-        s => resolve_single_op_from_store(op_store, op_heads_store, s),
+        s => resolve_single_op_from_store(op_store, op_heads_store, s)
+            .map_err(OpHeadResolutionError::Err),
     }?;
     for _ in op_postfix.chars() {
         operation = match operation.parents().as_slice() {
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index c468a35fb..76715f6d5 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -3148,8 +3148,7 @@ fn cmd_workspace_update_stale(
     command: &CommandHelper,
    _args: &WorkspaceUpdateStaleArgs,
 ) -> Result<(), CommandError> {
-    let workspace = command.load_workspace()?;
-    let mut workspace_command = command.resolve_operation(ui, workspace)?;
+    let mut workspace_command = command.workspace_helper_no_snapshot(ui)?;
     let repo = workspace_command.repo().clone();
     let (mut locked_wc, desired_wc_commit) =
         workspace_command.unsafe_start_working_copy_mutation()?;
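
For readers unfamiliar with the new API, the callback-under-lock shape that resolve_op_heads implements can be illustrated with a minimal, self-contained sketch. Op, ToyStore, and ResolutionError below are hypothetical stand-ins for jj-lib's Operation, OpHeadsStore, and OpHeadResolutionError, and unlike the real function the sketch takes its lock up front instead of handling the zero- and one-head fast paths before locking, as the patch does.

// Hypothetical stand-ins; this is NOT jj-lib's actual API.
use std::sync::{Mutex, MutexGuard};

#[derive(Clone, Debug, PartialEq)]
struct Op(String);

#[derive(Debug)]
enum ResolutionError<E> {
    NoHeads,
    Err(E),
}

struct ToyStore {
    heads: Mutex<Vec<Op>>,
}

impl ToyStore {
    // Plays the role of OpHeadsStore::lock(): while the guard lives,
    // no other caller can promote a new head.
    fn lock(&self) -> MutexGuard<'_, Vec<Op>> {
        self.heads.lock().unwrap()
    }

    // Mirrors resolve_op_heads(): trivial cases resolve directly; multiple
    // heads are handed to the caller-supplied resolver under the lock, and
    // the winning op is promoted before the lock is released.
    fn resolve_heads<E>(
        &self,
        resolver: impl FnOnce(Vec<Op>) -> Result<Op, E>,
    ) -> Result<Op, ResolutionError<E>> {
        let mut heads = self.lock();
        match heads.len() {
            0 => Err(ResolutionError::NoHeads),
            1 => Ok(heads[0].clone()),
            _ => {
                let new_op = resolver(heads.clone()).map_err(ResolutionError::Err)?;
                // Promote the merged op while still holding the lock.
                *heads = vec![new_op.clone()];
                Ok(new_op)
            }
        }
    }
}

fn main() {
    let store = ToyStore {
        heads: Mutex::new(vec![Op("a".into()), Op("b".into())]),
    };
    // A real resolver would merge views and write a merge operation;
    // here we just concatenate the head names.
    let merged = store
        .resolve_heads::<String>(|ops| {
            Ok(Op(ops.iter().map(|o| o.0.as_str()).collect::<Vec<_>>().join("+")))
        })
        .unwrap();
    assert_eq!(merged, Op("a+b".into()));
    println!("resolved to {merged:?}");
}

Because the resolver closure runs entirely inside resolve_heads, the lock guard's lifetime never escapes the resolving function. That is the property which lets the patch delete LockedOpHeads and its Arc-backed resolver plumbing in favor of a short-lived OpHeadsStoreLock borrow.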