op_heads_store: remove LockedOpHeads

Make op resolution a closed operation, powered by a callback provided by the
caller which runs under an internal lock scope. This allows for greatly
simplifying the internal lifetime structuring.
This commit is contained in:
Daniel Ploch 2023-01-11 16:12:17 -05:00 committed by Daniel Ploch
parent bec1051967
commit bd43580437
9 changed files with 241 additions and 322 deletions

View file

@ -16,54 +16,32 @@ use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use itertools::Itertools;
use thiserror::Error; use thiserror::Error;
use crate::dag_walk; use crate::dag_walk;
use crate::op_store::{OpStore, OperationId}; use crate::op_store::{OpStore, OperationId};
use crate::operation::Operation; use crate::operation::Operation;
pub enum OpHeads {
/// There's a single latest operation. This is the normal case.
Single(Operation),
/// There are multiple latest operations, which means there has been
/// concurrent operations. These need to be resolved.
Unresolved {
locked_op_heads: LockedOpHeads,
op_heads: Vec<Operation>,
},
}
#[derive(Debug, Error, PartialEq, Eq)] #[derive(Debug, Error, PartialEq, Eq)]
pub enum OpHeadResolutionError { pub enum OpHeadResolutionError<E> {
#[error("Operation log has no heads")] #[error("Operation log has no heads")]
NoHeads, NoHeads,
#[error("Op resolution error: {0}")]
Err(E),
} }
pub trait LockedOpHeadsResolver { impl<E> From<E> for OpHeadResolutionError<E> {
fn finish(&self, new_op: &Operation); fn from(e: E) -> Self {
} OpHeadResolutionError::Err(e)
// Represents a mutually exclusive lock on the OpHeadsStore in local systems.
pub struct LockedOpHeads {
resolver: Box<dyn LockedOpHeadsResolver>,
}
impl LockedOpHeads {
pub fn new(resolver: Box<dyn LockedOpHeadsResolver>) -> Self {
LockedOpHeads { resolver }
}
pub fn finish(self, new_op: &Operation) {
self.resolver.finish(new_op);
} }
} }
/// Manages the very set of current heads of the operation log. pub trait OpHeadsStoreLock<'a> {
/// fn promote_new_op(&self, new_op: &Operation);
/// Implementations should use Arc<> internally, as the lock() and }
/// get_heads() return values which might outlive the original object. When Rust
/// makes it possible for a Trait method to reference &Arc<Self>, this can be /// Manages the set of current heads of the operation log.
/// simplified.
pub trait OpHeadsStore: Send + Sync + Debug { pub trait OpHeadsStore: Send + Sync + Debug {
fn name(&self) -> &str; fn name(&self) -> &str;
@ -73,13 +51,11 @@ pub trait OpHeadsStore: Send + Sync + Debug {
fn get_op_heads(&self) -> Vec<OperationId>; fn get_op_heads(&self) -> Vec<OperationId>;
fn lock(&self) -> LockedOpHeads; fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a>;
fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError>;
/// Removes operations in the input that are ancestors of other operations /// Removes operations in the input that are ancestors of other operations
/// in the input. The ancestors are removed both from the list and from /// in the input. The ancestors are removed both from the list and from
/// disk. /// storage.
fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> { fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect(); let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
let neighbors_fn = |op: &Operation| op.parents(); let neighbors_fn = |op: &Operation| op.parents();
@ -93,3 +69,70 @@ pub trait OpHeadsStore: Send + Sync + Debug {
op_heads.into_iter().collect() op_heads.into_iter().collect()
} }
} }
// Given an OpHeadsStore, fetch and resolve its op heads down to one under a
// lock.
//
// This routine is defined outside the trait because it must support generics.
pub fn resolve_op_heads<E>(
op_heads_store: &dyn OpHeadsStore,
op_store: &Arc<dyn OpStore>,
resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
) -> Result<Operation, OpHeadResolutionError<E>> {
let mut op_heads = op_heads_store.get_op_heads();
// TODO: De-duplicate this 'simple-resolution' code.
if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_heads.len() == 1 {
let operation_id = op_heads.pop().unwrap();
let operation = op_store.read_operation(&operation_id).unwrap();
return Ok(Operation::new(op_store.clone(), operation_id, operation));
}
// There are multiple heads. We take a lock, then check if there are still
// multiple heads (it's likely that another process was in the process of
// deleting on of them). If there are still multiple heads, we attempt to
// merge all the views into one. We then write that view and a corresponding
// operation to the op-store.
// Note that the locking isn't necessary for correctness; we take the lock
// only to prevent other concurrent processes from doing the same work (and
// producing another set of divergent heads).
let lock = op_heads_store.lock();
let op_head_ids = op_heads_store.get_op_heads();
if op_head_ids.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_head_ids.len() == 1 {
let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap();
return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
}
let op_heads = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect_vec();
let mut op_heads = op_heads_store.handle_ancestor_ops(op_heads);
// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok(op_heads.pop().unwrap());
}
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
match resolver(op_heads) {
Ok(new_op) => {
lock.promote_new_op(&new_op);
Ok(new_op)
}
Err(e) => Err(OpHeadResolutionError::Err(e)),
}
}

View file

@ -33,7 +33,7 @@ use crate::git_backend::GitBackend;
use crate::index::{IndexRef, MutableIndex, ReadonlyIndex}; use crate::index::{IndexRef, MutableIndex, ReadonlyIndex};
use crate::index_store::IndexStore; use crate::index_store::IndexStore;
use crate::local_backend::LocalBackend; use crate::local_backend::LocalBackend;
use crate::op_heads_store::{LockedOpHeads, OpHeads, OpHeadsStore}; use crate::op_heads_store::{self, OpHeadResolutionError, OpHeadsStore};
use crate::op_store::{ use crate::op_store::{
BranchTarget, OpStore, OperationId, OperationMetadata, RefTarget, WorkspaceId, BranchTarget, OpStore, OperationId, OperationMetadata, RefTarget, WorkspaceId,
}; };
@ -201,10 +201,8 @@ impl ReadonlyRepo {
user_settings: &UserSettings, user_settings: &UserSettings,
repo_path: &Path, repo_path: &Path,
store_factories: &StoreFactories, store_factories: &StoreFactories,
) -> Result<Arc<ReadonlyRepo>, BackendError> { ) -> Result<Arc<ReadonlyRepo>, OpHeadResolutionError<BackendError>> {
RepoLoader::init(user_settings, repo_path, store_factories) RepoLoader::init(user_settings, repo_path, store_factories).load_at_head(user_settings)
.load_at_head()
.resolve(user_settings)
} }
pub fn loader(&self) -> RepoLoader { pub fn loader(&self) -> RepoLoader {
@ -303,8 +301,8 @@ impl ReadonlyRepo {
pub fn reload_at_head( pub fn reload_at_head(
&self, &self,
user_settings: &UserSettings, user_settings: &UserSettings,
) -> Result<Arc<ReadonlyRepo>, BackendError> { ) -> Result<Arc<ReadonlyRepo>, OpHeadResolutionError<BackendError>> {
self.loader().load_at_head().resolve(user_settings) self.loader().load_at_head(user_settings)
} }
pub fn reload_at(&self, operation: &Operation) -> Arc<ReadonlyRepo> { pub fn reload_at(&self, operation: &Operation) -> Arc<ReadonlyRepo> {
@ -312,40 +310,6 @@ impl ReadonlyRepo {
} }
} }
pub enum RepoAtHead {
Single(Arc<ReadonlyRepo>),
Unresolved(Box<UnresolvedHeadRepo>),
}
impl RepoAtHead {
pub fn resolve(self, user_settings: &UserSettings) -> Result<Arc<ReadonlyRepo>, BackendError> {
match self {
RepoAtHead::Single(repo) => Ok(repo),
RepoAtHead::Unresolved(unresolved) => unresolved.resolve(user_settings),
}
}
}
pub struct UnresolvedHeadRepo {
pub repo_loader: RepoLoader,
pub locked_op_heads: LockedOpHeads,
pub op_heads: Vec<Operation>,
}
impl UnresolvedHeadRepo {
pub fn resolve(self, user_settings: &UserSettings) -> Result<Arc<ReadonlyRepo>, BackendError> {
let base_repo = self.repo_loader.load_at(&self.op_heads[0]);
let mut tx = base_repo.start_transaction(user_settings, "resolve concurrent operations");
for other_op_head in self.op_heads.into_iter().skip(1) {
tx.merge_operation(other_op_head);
tx.mut_repo().rebase_descendants(user_settings)?;
}
let merged_repo = tx.write().leave_unpublished();
self.locked_op_heads.finish(merged_repo.operation());
Ok(merged_repo)
}
}
type BackendFactory = Box<dyn Fn(&Path) -> Box<dyn Backend>>; type BackendFactory = Box<dyn Fn(&Path) -> Box<dyn Backend>>;
type OpStoreFactory = Box<dyn Fn(&Path) -> Box<dyn OpStore>>; type OpStoreFactory = Box<dyn Fn(&Path) -> Box<dyn OpStore>>;
type OpHeadsStoreFactory = Box<dyn Fn(&Path) -> Box<dyn OpHeadsStore>>; type OpHeadsStoreFactory = Box<dyn Fn(&Path) -> Box<dyn OpHeadsStore>>;
@ -526,22 +490,17 @@ impl RepoLoader {
&self.op_heads_store &self.op_heads_store
} }
pub fn load_at_head(&self) -> RepoAtHead { pub fn load_at_head(
let op_heads = self.op_heads_store.get_heads(&self.op_store).unwrap(); &self,
match op_heads { user_settings: &UserSettings,
OpHeads::Single(op) => { ) -> Result<Arc<ReadonlyRepo>, OpHeadResolutionError<BackendError>> {
let view = View::new(op.view().take_store_view()); let op = op_heads_store::resolve_op_heads(
RepoAtHead::Single(self._finish_load(op, view)) self.op_heads_store.as_ref(),
} &self.op_store,
OpHeads::Unresolved { |op_heads| self._resolve_op_heads(op_heads, user_settings),
locked_op_heads, )?;
op_heads, let view = View::new(op.view().take_store_view());
} => RepoAtHead::Unresolved(Box::new(UnresolvedHeadRepo { Ok(self._finish_load(op, view))
repo_loader: self.clone(),
locked_op_heads,
op_heads,
})),
}
} }
pub fn load_at(&self, op: &Operation) -> Arc<ReadonlyRepo> { pub fn load_at(&self, op: &Operation) -> Arc<ReadonlyRepo> {
@ -570,6 +529,21 @@ impl RepoLoader {
Arc::new(repo) Arc::new(repo)
} }
fn _resolve_op_heads(
&self,
op_heads: Vec<Operation>,
user_settings: &UserSettings,
) -> Result<Operation, BackendError> {
let base_repo = self.load_at(&op_heads[0]);
let mut tx = base_repo.start_transaction(user_settings, "resolve concurrent operations");
for other_op_head in op_heads.into_iter().skip(1) {
tx.merge_operation(other_op_head);
tx.mut_repo().rebase_descendants(user_settings)?;
}
let merged_repo = tx.write().leave_unpublished();
Ok(merged_repo.operation().clone())
}
fn _finish_load(&self, operation: Operation, view: View) -> Arc<ReadonlyRepo> { fn _finish_load(&self, operation: Operation, view: View) -> Arc<ReadonlyRepo> {
let repo = ReadonlyRepo { let repo = ReadonlyRepo {
repo_path: self.repo_path.clone(), repo_path: self.repo_path.clone(),

View file

@ -17,50 +17,25 @@ use std::fs;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use itertools::Itertools;
use crate::lock::FileLock; use crate::lock::FileLock;
use crate::op_heads_store::{ use crate::op_heads_store::{OpHeadsStore, OpHeadsStoreLock};
LockedOpHeads, LockedOpHeadsResolver, OpHeadResolutionError, OpHeads, OpHeadsStore,
};
use crate::op_store; use crate::op_store;
use crate::op_store::{OpStore, OperationId, OperationMetadata}; use crate::op_store::{OpStore, OperationId, OperationMetadata};
use crate::operation::Operation; use crate::operation::Operation;
pub struct SimpleOpHeadsStore { pub struct SimpleOpHeadsStore {
store: Arc<InnerSimpleOpHeadsStore>, dir: PathBuf,
} }
impl Debug for SimpleOpHeadsStore { impl Debug for SimpleOpHeadsStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SimpleOpHeadsStore") f.debug_struct("SimpleOpHeadsStore")
.field("dir", &self.store.dir) .field("dir", &self.dir)
.finish() .finish()
} }
} }
/// Manages the very set of current heads of the operation log. This store is impl SimpleOpHeadsStore {
/// simply a directory where each operation id is a file with that name (and no
/// content).
struct InnerSimpleOpHeadsStore {
dir: PathBuf,
}
struct SimpleOpHeadsStoreLockResolver {
store: Arc<InnerSimpleOpHeadsStore>,
_lock: FileLock,
}
impl LockedOpHeadsResolver for SimpleOpHeadsStoreLockResolver {
fn finish(&self, new_op: &Operation) {
self.store.add_op_head(new_op.id());
for old_id in new_op.parent_ids() {
self.store.remove_op_head(old_id);
}
}
}
impl InnerSimpleOpHeadsStore {
pub fn init( pub fn init(
dir: &Path, dir: &Path,
op_store: &Arc<dyn OpStore>, op_store: &Arc<dyn OpStore>,
@ -78,75 +53,43 @@ impl InnerSimpleOpHeadsStore {
let op_heads_dir = dir.join("simple_op_heads"); let op_heads_dir = dir.join("simple_op_heads");
fs::create_dir(&op_heads_dir).unwrap(); fs::create_dir(&op_heads_dir).unwrap();
let op_heads_store = InnerSimpleOpHeadsStore { dir: op_heads_dir }; let op_heads_store = Self { dir: op_heads_dir };
op_heads_store.add_op_head(init_operation.id()); op_heads_store.add_op_head(init_operation.id());
(op_heads_store, init_operation) (op_heads_store, init_operation)
} }
pub fn add_op_head(&self, id: &OperationId) {
std::fs::write(self.dir.join(id.hex()), "").unwrap();
}
pub fn remove_op_head(&self, id: &OperationId) {
// It's fine if the old head was not found. It probably means
// that we're on a distributed file system where the locking
// doesn't work. We'll probably end up with two current
// heads. We'll detect that next time we load the view.
std::fs::remove_file(self.dir.join(id.hex())).ok();
}
pub fn get_op_heads(&self) -> Vec<OperationId> {
let mut op_heads = vec![];
for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
let op_head_file_name = op_head_entry.unwrap().file_name();
let op_head_file_name = op_head_file_name.to_str().unwrap();
if let Ok(op_head) = hex::decode(op_head_file_name) {
op_heads.push(OperationId::new(op_head));
}
}
op_heads
}
}
impl SimpleOpHeadsStore {
pub fn init(
dir: &Path,
op_store: &Arc<dyn OpStore>,
root_view: &op_store::View,
operation_metadata: OperationMetadata,
) -> (Self, Operation) {
let (inner, init_op) =
InnerSimpleOpHeadsStore::init(dir, op_store, root_view, operation_metadata);
(
SimpleOpHeadsStore {
store: Arc::new(inner),
},
init_op,
)
}
pub fn load(dir: &Path) -> Self { pub fn load(dir: &Path) -> Self {
let op_heads_dir = dir.join("simple_op_heads"); let op_heads_dir = dir.join("simple_op_heads");
// TODO: Delete this migration code at 0.8+ or so // TODO: Delete this migration code at 0.8+ or so
if !op_heads_dir.exists() { if !op_heads_dir.exists() {
let old_store = InnerSimpleOpHeadsStore { let old_store = Self {
dir: dir.to_path_buf(), dir: dir.to_path_buf(),
}; };
fs::create_dir(&op_heads_dir).unwrap(); fs::create_dir(&op_heads_dir).unwrap();
let new_store = InnerSimpleOpHeadsStore { dir: op_heads_dir }; let new_store = Self { dir: op_heads_dir };
for id in old_store.get_op_heads() { for id in old_store.get_op_heads() {
old_store.remove_op_head(&id); old_store.remove_op_head(&id);
new_store.add_op_head(&id); new_store.add_op_head(&id);
} }
return SimpleOpHeadsStore { return new_store;
store: Arc::new(new_store),
};
} }
SimpleOpHeadsStore { Self { dir: op_heads_dir }
store: Arc::new(InnerSimpleOpHeadsStore { dir: op_heads_dir }), }
}
struct SimpleOpHeadsStoreLock<'a> {
store: &'a dyn OpHeadsStore,
_lock: FileLock,
}
impl OpHeadsStoreLock<'_> for SimpleOpHeadsStoreLock<'_> {
fn promote_new_op(&self, new_op: &Operation) {
self.store.add_op_head(new_op.id());
for old_id in new_op.parent_ids() {
self.store.remove_op_head(old_id);
} }
} }
} }
@ -157,86 +100,33 @@ impl OpHeadsStore for SimpleOpHeadsStore {
} }
fn add_op_head(&self, id: &OperationId) { fn add_op_head(&self, id: &OperationId) {
self.store.add_op_head(id); std::fs::write(self.dir.join(id.hex()), "").unwrap();
} }
fn remove_op_head(&self, id: &OperationId) { fn remove_op_head(&self, id: &OperationId) {
self.store.remove_op_head(id); // It's fine if the old head was not found. It probably means
// that we're on a distributed file system where the locking
// doesn't work. We'll probably end up with two current
// heads. We'll detect that next time we load the view.
std::fs::remove_file(self.dir.join(id.hex())).ok();
} }
fn get_op_heads(&self) -> Vec<OperationId> { fn get_op_heads(&self) -> Vec<OperationId> {
self.store.get_op_heads() let mut op_heads = vec![];
for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
let op_head_file_name = op_head_entry.unwrap().file_name();
let op_head_file_name = op_head_file_name.to_str().unwrap();
if let Ok(op_head) = hex::decode(op_head_file_name) {
op_heads.push(OperationId::new(op_head));
}
}
op_heads
} }
fn lock(&self) -> LockedOpHeads { fn lock<'a>(&'a self) -> Box<dyn OpHeadsStoreLock<'a> + 'a> {
let lock = FileLock::lock(self.store.dir.join("lock")); Box::new(SimpleOpHeadsStoreLock {
LockedOpHeads::new(Box::new(SimpleOpHeadsStoreLockResolver { store: self,
store: self.store.clone(), _lock: FileLock::lock(self.dir.join("lock")),
_lock: lock,
}))
}
fn get_heads(&self, op_store: &Arc<dyn OpStore>) -> Result<OpHeads, OpHeadResolutionError> {
let mut op_heads = self.get_op_heads();
if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_heads.len() == 1 {
let operation_id = op_heads.pop().unwrap();
let operation = op_store.read_operation(&operation_id).unwrap();
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
operation_id,
operation,
)));
}
// There are multiple heads. We take a lock, then check if there are still
// multiple heads (it's likely that another process was in the process of
// deleting on of them). If there are still multiple heads, we attempt to
// merge all the views into one. We then write that view and a corresponding
// operation to the op-store.
// Note that the locking isn't necessary for correctness; we take the lock
// only to prevent other concurrent processes from doing the same work (and
// producing another set of divergent heads).
let locked_op_heads = self.lock();
let op_head_ids = self.get_op_heads();
if op_head_ids.is_empty() {
return Err(OpHeadResolutionError::NoHeads);
}
if op_head_ids.len() == 1 {
let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap();
// Return early so we don't write a merge operation with a single parent
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
op_head_id,
op_head,
)));
}
let op_heads = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect_vec();
let mut op_heads = self.handle_ancestor_ops(op_heads);
// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok(OpHeads::Single(op_heads.pop().unwrap()));
}
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
Ok(OpHeads::Unresolved {
locked_op_heads,
op_heads,
}) })
} }
} }
@ -249,7 +139,6 @@ mod tests {
use itertools::Itertools; use itertools::Itertools;
use super::InnerSimpleOpHeadsStore;
use crate::op_heads_store::OpHeadsStore; use crate::op_heads_store::OpHeadsStore;
use crate::op_store::OperationId; use crate::op_store::OperationId;
use crate::simple_op_heads_store::SimpleOpHeadsStore; use crate::simple_op_heads_store::SimpleOpHeadsStore;
@ -274,7 +163,7 @@ mod tests {
ops.insert(op1.clone()); ops.insert(op1.clone());
ops.insert(op2.clone()); ops.insert(op2.clone());
let old_store = InnerSimpleOpHeadsStore { let old_store = SimpleOpHeadsStore {
dir: store_path.clone(), dir: store_path.clone(),
}; };
old_store.add_op_head(&op1); old_store.add_op_head(&op1);

View file

@ -178,7 +178,7 @@ impl UnpublishedOperation {
self.repo_loader self.repo_loader
.op_heads_store() .op_heads_store()
.lock() .lock()
.finish(&data.operation); .promote_new_op(&data.operation);
let repo = self let repo = self
.repo_loader .repo_loader
.create_from(data.operation, data.view, data.index); .create_from(data.operation, data.view, data.index);

View file

@ -112,8 +112,7 @@ fn test_bad_locking_children(use_git: bool) {
Workspace::load(&settings, machine1_root.path(), &StoreFactories::default()).unwrap(); Workspace::load(&settings, machine1_root.path(), &StoreFactories::default()).unwrap();
let machine1_repo = machine1_workspace let machine1_repo = machine1_workspace
.repo_loader() .repo_loader()
.load_at_head() .load_at_head(&settings)
.resolve(&settings)
.unwrap(); .unwrap();
let mut machine1_tx = machine1_repo.start_transaction(&settings, "test"); let mut machine1_tx = machine1_repo.start_transaction(&settings, "test");
let child1 = create_random_commit(machine1_tx.mut_repo(), &settings) let child1 = create_random_commit(machine1_tx.mut_repo(), &settings)
@ -129,8 +128,7 @@ fn test_bad_locking_children(use_git: bool) {
Workspace::load(&settings, machine2_root.path(), &StoreFactories::default()).unwrap(); Workspace::load(&settings, machine2_root.path(), &StoreFactories::default()).unwrap();
let machine2_repo = machine2_workspace let machine2_repo = machine2_workspace
.repo_loader() .repo_loader()
.load_at_head() .load_at_head(&settings)
.resolve(&settings)
.unwrap(); .unwrap();
let mut machine2_tx = machine2_repo.start_transaction(&settings, "test"); let mut machine2_tx = machine2_repo.start_transaction(&settings, "test");
let child2 = create_random_commit(machine2_tx.mut_repo(), &settings) let child2 = create_random_commit(machine2_tx.mut_repo(), &settings)
@ -152,8 +150,7 @@ fn test_bad_locking_children(use_git: bool) {
Workspace::load(&settings, merged_path.path(), &StoreFactories::default()).unwrap(); Workspace::load(&settings, merged_path.path(), &StoreFactories::default()).unwrap();
let merged_repo = merged_workspace let merged_repo = merged_workspace
.repo_loader() .repo_loader()
.load_at_head() .load_at_head(&settings)
.resolve(&settings)
.unwrap(); .unwrap();
assert!(merged_repo.view().heads().contains(child1.id())); assert!(merged_repo.view().heads().contains(child1.id()));
assert!(merged_repo.view().heads().contains(child2.id())); assert!(merged_repo.view().heads().contains(child2.id()));

View file

@ -34,7 +34,7 @@ fn test_load_at_operation(use_git: bool) {
// If we load the repo at head, we should not see the commit since it was // If we load the repo at head, we should not see the commit since it was
// removed // removed
let loader = RepoLoader::init(&settings, repo.repo_path(), &StoreFactories::default()); let loader = RepoLoader::init(&settings, repo.repo_path(), &StoreFactories::default());
let head_repo = loader.load_at_head().resolve(&settings).unwrap(); let head_repo = loader.load_at_head(&settings).unwrap();
assert!(!head_repo.view().heads().contains(commit.id())); assert!(!head_repo.view().heads().contains(commit.id()));
// If we load the repo at the previous operation, we should see the commit since // If we load the repo at the previous operation, we should see the commit since

View file

@ -438,7 +438,7 @@ fn commit_transactions(settings: &UserSettings, txs: Vec<Transaction>) -> Arc<Re
op_ids.push(tx.commit().op_id().clone()); op_ids.push(tx.commit().op_id().clone());
std::thread::sleep(std::time::Duration::from_millis(1)); std::thread::sleep(std::time::Duration::from_millis(1));
} }
let repo = repo_loader.load_at_head().resolve(settings).unwrap(); let repo = repo_loader.load_at_head(settings).unwrap();
// Test the setup. The assumption here is that the parent order matches the // Test the setup. The assumption here is that the parent order matches the
// order in which they were merged (which currently matches the transaction // order in which they were merged (which currently matches the transaction
// commit order), so we want to know make sure they appear in a certain // commit order), so we want to know make sure they appear in a certain

View file

@ -33,10 +33,12 @@ use jujutsu_lib::commit::Commit;
use jujutsu_lib::git::{GitExportError, GitImportError}; use jujutsu_lib::git::{GitExportError, GitImportError};
use jujutsu_lib::gitignore::GitIgnoreFile; use jujutsu_lib::gitignore::GitIgnoreFile;
use jujutsu_lib::matchers::{EverythingMatcher, Matcher, PrefixMatcher, Visit}; use jujutsu_lib::matchers::{EverythingMatcher, Matcher, PrefixMatcher, Visit};
use jujutsu_lib::op_heads_store::{OpHeadResolutionError, OpHeads, OpHeadsStore}; use jujutsu_lib::op_heads_store::{self, OpHeadResolutionError, OpHeadsStore};
use jujutsu_lib::op_store::{OpStore, OpStoreError, OperationId, WorkspaceId}; use jujutsu_lib::op_store::{OpStore, OpStoreError, OperationId, WorkspaceId};
use jujutsu_lib::operation::Operation; use jujutsu_lib::operation::Operation;
use jujutsu_lib::repo::{MutableRepo, ReadonlyRepo, RepoRef, RewriteRootCommit, StoreFactories}; use jujutsu_lib::repo::{
MutableRepo, ReadonlyRepo, RepoLoader, RepoRef, RewriteRootCommit, StoreFactories,
};
use jujutsu_lib::repo_path::{FsPathParseError, RepoPath}; use jujutsu_lib::repo_path::{FsPathParseError, RepoPath};
use jujutsu_lib::revset::{ use jujutsu_lib::revset::{
Revset, RevsetAliasesMap, RevsetError, RevsetExpression, RevsetParseError, Revset, RevsetAliasesMap, RevsetError, RevsetExpression, RevsetParseError,
@ -128,12 +130,13 @@ impl From<WorkspaceInitError> for CommandError {
} }
} }
impl From<OpHeadResolutionError> for CommandError { impl From<OpHeadResolutionError<CommandError>> for CommandError {
fn from(err: OpHeadResolutionError) -> Self { fn from(err: OpHeadResolutionError<CommandError>) -> Self {
match err { match err {
OpHeadResolutionError::NoHeads => CommandError::InternalError( OpHeadResolutionError::NoHeads => CommandError::InternalError(
"Corrupt repository: there are no operations".to_string(), "Corrupt repository: there are no operations".to_string(),
), ),
OpHeadResolutionError::Err(e) => e,
} }
} }
} }
@ -311,13 +314,32 @@ impl CommandHelper {
&self.settings &self.settings
} }
pub fn workspace_helper(&self, ui: &mut Ui) -> Result<WorkspaceCommandHelper, CommandError> { fn workspace_helper_internal(
&self,
ui: &mut Ui,
snapshot: bool,
) -> Result<WorkspaceCommandHelper, CommandError> {
let workspace = self.load_workspace()?; let workspace = self.load_workspace()?;
let mut workspace_command = self.resolve_operation(ui, workspace)?; let op_head = self.resolve_operation(ui, workspace.repo_loader())?;
workspace_command.snapshot(ui)?; let repo = workspace.repo_loader().load_at(&op_head);
let mut workspace_command = self.for_loaded_repo(ui, workspace, repo)?;
if snapshot {
workspace_command.snapshot(ui)?;
}
Ok(workspace_command) Ok(workspace_command)
} }
pub fn workspace_helper(&self, ui: &mut Ui) -> Result<WorkspaceCommandHelper, CommandError> {
self.workspace_helper_internal(ui, true)
}
pub fn workspace_helper_no_snapshot(
&self,
ui: &mut Ui,
) -> Result<WorkspaceCommandHelper, CommandError> {
self.workspace_helper_internal(ui, false)
}
pub fn load_workspace(&self) -> Result<Workspace, CommandError> { pub fn load_workspace(&self) -> Result<Workspace, CommandError> {
let loader = self.maybe_workspace_loader.as_ref().map_err(Clone::clone)?; let loader = self.maybe_workspace_loader.as_ref().map_err(Clone::clone)?;
loader loader
@ -328,49 +350,46 @@ impl CommandHelper {
pub fn resolve_operation( pub fn resolve_operation(
&self, &self,
ui: &mut Ui, ui: &mut Ui,
workspace: Workspace, repo_loader: &RepoLoader,
) -> Result<WorkspaceCommandHelper, CommandError> { ) -> Result<Operation, OpHeadResolutionError<CommandError>> {
let repo_loader = workspace.repo_loader(); if self.global_args.at_operation == "@" {
let op_heads = resolve_op_for_load( op_heads_store::resolve_op_heads(
repo_loader.op_store(), repo_loader.op_heads_store().as_ref(),
repo_loader.op_heads_store(), repo_loader.op_store(),
&self.global_args.at_operation, |op_heads| {
)?; writeln!(
let workspace_command = match op_heads { ui,
OpHeads::Single(op) => { "Concurrent modification detected, resolving automatically.",
let repo = repo_loader.load_at(&op); )?;
self.for_loaded_repo(ui, workspace, repo)? let base_repo = repo_loader.load_at(&op_heads[0]);
} // TODO: It may be helpful to print each operation we're merging here
OpHeads::Unresolved { let mut tx = start_repo_transaction(
locked_op_heads, &base_repo,
op_heads, &self.settings,
} => { &self.string_args,
writeln!( "resolve concurrent operations",
ui, );
"Concurrent modification detected, resolving automatically.", for other_op_head in op_heads.into_iter().skip(1) {
)?; tx.merge_operation(other_op_head);
let base_repo = repo_loader.load_at(&op_heads[0]); let num_rebased = tx.mut_repo().rebase_descendants(&self.settings)?;
// TODO: It may be helpful to print each operation we're merging here if num_rebased > 0 {
let mut workspace_command = self.for_loaded_repo(ui, workspace, base_repo)?; writeln!(
let mut tx = workspace_command.start_transaction("resolve concurrent operations"); ui,
for other_op_head in op_heads.into_iter().skip(1) { "Rebased {num_rebased} descendant commits onto commits rewritten \
tx.merge_operation(other_op_head); by other operation"
let num_rebased = tx.mut_repo().rebase_descendants(&self.settings)?; )?;
if num_rebased > 0 { }
writeln!(
ui,
"Rebased {num_rebased} descendant commits onto commits rewritten by \
other operation"
)?;
} }
} Ok(tx.write().leave_unpublished().operation().clone())
let merged_repo = tx.write().leave_unpublished(); },
locked_op_heads.finish(merged_repo.operation()); )
workspace_command.repo = merged_repo; } else {
workspace_command resolve_op_for_load(
} repo_loader.op_store(),
}; repo_loader.op_heads_store(),
Ok(workspace_command) &self.global_args.at_operation,
)
}
} }
pub fn for_loaded_repo( pub fn for_loaded_repo(
@ -1157,32 +1176,30 @@ fn resolve_op_for_load(
op_store: &Arc<dyn OpStore>, op_store: &Arc<dyn OpStore>,
op_heads_store: &Arc<dyn OpHeadsStore>, op_heads_store: &Arc<dyn OpHeadsStore>,
op_str: &str, op_str: &str,
) -> Result<OpHeads, CommandError> { ) -> Result<Operation, OpHeadResolutionError<CommandError>> {
if op_str == "@" { let get_current_op = || {
Ok(op_heads_store.get_heads(op_store)?) op_heads_store::resolve_op_heads(op_heads_store.as_ref(), op_store, |_| {
} else { Err(user_error(format!(
let get_current_op = || match op_heads_store.get_heads(op_store)? {
OpHeads::Single(current_op) => Ok(current_op),
OpHeads::Unresolved { .. } => Err(user_error(format!(
r#"The "{op_str}" expression resolved to more than one operation"# r#"The "{op_str}" expression resolved to more than one operation"#
))), )))
}; })
let operation = resolve_single_op(op_store, op_heads_store, get_current_op, op_str)?; };
Ok(OpHeads::Single(operation)) let operation = resolve_single_op(op_store, op_heads_store, get_current_op, op_str)?;
} Ok(operation)
} }
fn resolve_single_op( fn resolve_single_op(
op_store: &Arc<dyn OpStore>, op_store: &Arc<dyn OpStore>,
op_heads_store: &Arc<dyn OpHeadsStore>, op_heads_store: &Arc<dyn OpHeadsStore>,
get_current_op: impl FnOnce() -> Result<Operation, CommandError>, get_current_op: impl FnOnce() -> Result<Operation, OpHeadResolutionError<CommandError>>,
op_str: &str, op_str: &str,
) -> Result<Operation, CommandError> { ) -> Result<Operation, CommandError> {
let op_symbol = op_str.trim_end_matches('-'); let op_symbol = op_str.trim_end_matches('-');
let op_postfix = &op_str[op_symbol.len()..]; let op_postfix = &op_str[op_symbol.len()..];
let mut operation = match op_symbol { let mut operation = match op_symbol {
"@" => get_current_op(), "@" => get_current_op(),
s => resolve_single_op_from_store(op_store, op_heads_store, s), s => resolve_single_op_from_store(op_store, op_heads_store, s)
.map_err(OpHeadResolutionError::Err),
}?; }?;
for _ in op_postfix.chars() { for _ in op_postfix.chars() {
operation = match operation.parents().as_slice() { operation = match operation.parents().as_slice() {

View file

@ -3148,8 +3148,7 @@ fn cmd_workspace_update_stale(
command: &CommandHelper, command: &CommandHelper,
_args: &WorkspaceUpdateStaleArgs, _args: &WorkspaceUpdateStaleArgs,
) -> Result<(), CommandError> { ) -> Result<(), CommandError> {
let workspace = command.load_workspace()?; let mut workspace_command = command.workspace_helper_no_snapshot(ui)?;
let mut workspace_command = command.resolve_operation(ui, workspace)?;
let repo = workspace_command.repo().clone(); let repo = workspace_command.repo().clone();
let (mut locked_wc, desired_wc_commit) = let (mut locked_wc, desired_wc_commit) =
workspace_command.unsafe_start_working_copy_mutation()?; workspace_command.unsafe_start_working_copy_mutation()?;