2021-03-10 23:39:16 +00:00
|
|
|
// Copyright 2021 Google LLC
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// https://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2021-03-11 05:36:08 +00:00
|
|
|
use crate::dag_walk;
|
|
|
|
use crate::index::MutableIndex;
|
2021-03-12 06:12:49 +00:00
|
|
|
|
2021-03-10 23:39:16 +00:00
|
|
|
use crate::lock::FileLock;
|
2021-03-11 05:36:08 +00:00
|
|
|
use crate::op_store;
|
|
|
|
use crate::op_store::{OpStore, OperationId, OperationMetadata};
|
2021-03-10 23:39:16 +00:00
|
|
|
use crate::operation::Operation;
|
2021-03-12 06:12:49 +00:00
|
|
|
|
2021-03-11 05:36:08 +00:00
|
|
|
use crate::view;
|
2021-03-10 23:39:16 +00:00
|
|
|
use std::path::PathBuf;
|
2021-03-11 05:36:08 +00:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2021-03-12 06:12:49 +00:00
|
|
|
use crate::repo::RepoLoader;
|
2021-03-11 06:45:56 +00:00
|
|
|
use crate::store::CommitId;
|
2021-03-12 07:30:06 +00:00
|
|
|
use std::collections::HashSet;
|
2021-03-11 05:36:08 +00:00
|
|
|
use thiserror::Error;
|
2021-03-10 23:39:16 +00:00
|
|
|
|
|
|
|
/// Manages the set of current heads of the operation log. The store is simply
/// a directory where each operation id is a file with that name (and no
/// content).
#[derive(Debug)]
pub struct OpHeadsStore {
    /// Directory holding one empty file per op head, named by the hex-encoded
    /// operation id. The `lock` file used by `OpHeadsStore::lock` also lives
    /// here.
    dir: PathBuf,
}
|
|
|
|
|
2021-03-11 05:36:08 +00:00
|
|
|
#[derive(Debug, Error, PartialEq, Eq)]
|
|
|
|
pub enum OpHeadResolutionError {
|
|
|
|
#[error("Operation log has no heads")]
|
|
|
|
NoHeads,
|
|
|
|
}
|
|
|
|
|
2021-03-10 23:39:16 +00:00
|
|
|
impl OpHeadsStore {
|
2021-03-11 06:45:56 +00:00
|
|
|
pub fn init(
|
|
|
|
dir: PathBuf,
|
|
|
|
op_store: &Arc<dyn OpStore>,
|
|
|
|
checkout: CommitId,
|
|
|
|
) -> (Self, OperationId, op_store::View) {
|
|
|
|
let mut root_view = op_store::View::new(checkout.clone());
|
|
|
|
root_view.head_ids.insert(checkout);
|
|
|
|
let root_view_id = op_store.write_view(&root_view).unwrap();
|
|
|
|
let operation_metadata = OperationMetadata::new("initialize repo".to_string());
|
|
|
|
let init_operation = op_store::Operation {
|
|
|
|
view_id: root_view_id,
|
|
|
|
parents: vec![],
|
|
|
|
metadata: operation_metadata,
|
|
|
|
};
|
|
|
|
let init_operation_id = op_store.write_operation(&init_operation).unwrap();
|
|
|
|
|
|
|
|
let op_heads_store = OpHeadsStore { dir };
|
|
|
|
op_heads_store.add_op_head(&init_operation_id);
|
|
|
|
(op_heads_store, init_operation_id, root_view)
|
2021-03-10 23:39:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn load(dir: PathBuf) -> OpHeadsStore {
|
|
|
|
OpHeadsStore { dir }
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn add_op_head(&self, id: &OperationId) {
|
|
|
|
std::fs::write(self.dir.join(id.hex()), "").unwrap();
|
|
|
|
}
|
|
|
|
|
2021-03-11 05:36:08 +00:00
|
|
|
fn remove_op_head(&self, id: &OperationId) {
|
2021-03-10 23:39:16 +00:00
|
|
|
// It's fine if the old head was not found. It probably means
|
|
|
|
// that we're on a distributed file system where the locking
|
|
|
|
// doesn't work. We'll probably end up with two current
|
|
|
|
// heads. We'll detect that next time we load the view.
|
|
|
|
std::fs::remove_file(self.dir.join(id.hex())).ok();
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get_op_heads(&self) -> Vec<OperationId> {
|
|
|
|
let mut op_heads = vec![];
|
|
|
|
for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
|
|
|
|
let op_head_file_name = op_head_entry.unwrap().file_name();
|
|
|
|
let op_head_file_name = op_head_file_name.to_str().unwrap();
|
|
|
|
if let Ok(op_head) = hex::decode(op_head_file_name) {
|
|
|
|
op_heads.push(OperationId(op_head));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
op_heads
|
|
|
|
}
|
|
|
|
|
2021-03-11 05:36:08 +00:00
|
|
|
fn lock(&self) -> FileLock {
|
2021-03-11 05:24:54 +00:00
|
|
|
FileLock::lock(self.dir.join("lock"))
|
|
|
|
}
|
|
|
|
|
2021-03-10 23:39:16 +00:00
|
|
|
pub fn update_op_heads(&self, op: &Operation) {
|
2021-03-11 05:24:54 +00:00
|
|
|
let _op_heads_lock = self.lock();
|
2021-03-10 23:39:16 +00:00
|
|
|
self.add_op_head(op.id());
|
|
|
|
for old_parent_id in op.parent_ids() {
|
|
|
|
self.remove_op_head(old_parent_id);
|
|
|
|
}
|
|
|
|
}
|
2021-03-11 05:36:08 +00:00
|
|
|
|
|
|
|
// TODO: Introduce context objects (like commit::Commit) so we won't have to
|
|
|
|
// pass around OperationId and Operation separately like we do here.
|
|
|
|
pub fn get_single_op_head(
|
|
|
|
&self,
|
2021-03-12 06:12:49 +00:00
|
|
|
repo_loader: &RepoLoader,
|
2021-03-11 05:36:08 +00:00
|
|
|
) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> {
|
|
|
|
let mut op_heads = self.get_op_heads();
|
|
|
|
|
|
|
|
if op_heads.is_empty() {
|
|
|
|
return Err(OpHeadResolutionError::NoHeads);
|
|
|
|
}
|
|
|
|
|
2021-03-12 06:12:49 +00:00
|
|
|
let op_store = repo_loader.op_store();
|
2021-03-11 05:36:08 +00:00
|
|
|
if op_heads.len() == 1 {
|
|
|
|
let operation_id = op_heads.pop().unwrap();
|
|
|
|
let operation = op_store.read_operation(&operation_id).unwrap();
|
|
|
|
let view = op_store.read_view(&operation.view_id).unwrap();
|
|
|
|
return Ok((operation_id, operation, view));
|
|
|
|
}
|
|
|
|
|
|
|
|
// There are multiple heads. We take a lock, then check if there are still
|
|
|
|
// multiple heads (it's likely that another process was in the process of
|
|
|
|
// deleting on of them). If there are still multiple heads, we attempt to
|
|
|
|
// merge all the views into one. We then write that view and a corresponding
|
|
|
|
// operation to the op-store.
|
|
|
|
// Note that the locking isn't necessary for correctness; we take the lock
|
|
|
|
// only to avoid other concurrent processes from doing the same work (and
|
|
|
|
// producing another set of divergent heads).
|
|
|
|
let _lock = self.lock();
|
2021-03-12 07:07:47 +00:00
|
|
|
let op_head_ids = self.get_op_heads();
|
2021-03-11 05:36:08 +00:00
|
|
|
|
2021-03-12 07:07:47 +00:00
|
|
|
if op_head_ids.is_empty() {
|
2021-03-11 05:36:08 +00:00
|
|
|
return Err(OpHeadResolutionError::NoHeads);
|
|
|
|
}
|
|
|
|
|
2021-03-12 07:07:47 +00:00
|
|
|
if op_head_ids.len() == 1 {
|
|
|
|
let op_head_id = op_head_ids[0].clone();
|
2021-03-11 05:36:08 +00:00
|
|
|
let op_head = op_store.read_operation(&op_head_id).unwrap();
|
|
|
|
// Return early so we don't write a merge operation with a single parent
|
|
|
|
let view = op_store.read_view(&op_head.view_id).unwrap();
|
|
|
|
return Ok((op_head_id, op_head, view));
|
|
|
|
}
|
|
|
|
|
2021-03-12 07:07:47 +00:00
|
|
|
let op_heads: Vec<_> = op_head_ids
|
|
|
|
.iter()
|
|
|
|
.map(|op_id: &OperationId| {
|
|
|
|
let data = op_store.read_operation(op_id).unwrap();
|
|
|
|
Operation::new(op_store.clone(), op_id.clone(), data)
|
|
|
|
})
|
|
|
|
.collect();
|
2021-03-12 07:30:06 +00:00
|
|
|
let op_heads = self.handle_ancestor_ops(op_heads);
|
2021-03-12 07:07:47 +00:00
|
|
|
|
2021-03-14 00:33:31 +00:00
|
|
|
// Return without creating a merge operation
|
|
|
|
if op_heads.len() == 1 {
|
|
|
|
return Ok((
|
|
|
|
op_heads[0].id().clone(),
|
|
|
|
op_heads[0].store_operation().clone(),
|
|
|
|
op_heads[0].view().take_store_view(),
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
2021-03-11 05:36:08 +00:00
|
|
|
let (merge_operation_id, merge_operation, merged_view) =
|
2021-03-12 06:12:49 +00:00
|
|
|
merge_op_heads(repo_loader, op_heads)?;
|
2021-03-11 05:36:08 +00:00
|
|
|
self.add_op_head(&merge_operation_id);
|
2021-03-12 07:30:06 +00:00
|
|
|
for old_op_head_id in &merge_operation.parents {
|
|
|
|
self.remove_op_head(old_op_head_id);
|
2021-03-11 05:36:08 +00:00
|
|
|
}
|
|
|
|
Ok((merge_operation_id, merge_operation, merged_view))
|
|
|
|
}
|
2021-03-12 07:30:06 +00:00
|
|
|
|
|
|
|
/// Removes operations in the input that are ancestors of other operations
|
|
|
|
/// in the input. The ancestors are removed both from the list and from
|
|
|
|
/// disk.
|
|
|
|
fn handle_ancestor_ops(&self, op_heads: Vec<Operation>) -> Vec<Operation> {
|
|
|
|
let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
|
|
|
|
let neighbors_fn = |op: &Operation| op.parents();
|
|
|
|
// Remove ancestors so we don't create merge operation with an operation and its
|
|
|
|
// ancestor
|
|
|
|
let op_heads =
|
|
|
|
dag_walk::unreachable(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
|
|
|
|
let op_head_ids_after: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
|
|
|
|
for removed_op_head in op_head_ids_before.difference(&op_head_ids_after) {
|
|
|
|
self.remove_op_head(&removed_op_head);
|
|
|
|
}
|
|
|
|
op_heads.into_iter().collect()
|
|
|
|
}
|
2021-03-11 05:36:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn merge_op_heads(
|
2021-03-12 06:12:49 +00:00
|
|
|
repo_loader: &RepoLoader,
|
2021-03-12 07:07:47 +00:00
|
|
|
mut op_heads: Vec<Operation>,
|
2021-03-11 05:36:08 +00:00
|
|
|
) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> {
|
|
|
|
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
|
|
|
|
let first_op_head = op_heads[0].clone();
|
2021-03-12 06:12:49 +00:00
|
|
|
let op_store = repo_loader.op_store();
|
2021-03-11 05:36:08 +00:00
|
|
|
let mut merged_view = op_store.read_view(first_op_head.view().id()).unwrap();
|
|
|
|
|
2021-03-12 07:07:47 +00:00
|
|
|
let neighbors_fn = |op: &Operation| op.parents();
|
2021-03-12 06:12:49 +00:00
|
|
|
let store = repo_loader.store();
|
|
|
|
let index_store = repo_loader.index_store();
|
2021-03-11 05:36:08 +00:00
|
|
|
let base_index = index_store.get_index_at_op(&first_op_head, store);
|
|
|
|
let mut index = MutableIndex::incremental(base_index);
|
|
|
|
for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {
|
|
|
|
let other_index = index_store.get_index_at_op(other_op_head, store);
|
|
|
|
index.merge_in(&other_index);
|
|
|
|
let ancestor_op = dag_walk::closest_common_node(
|
|
|
|
op_heads[0..i].to_vec(),
|
|
|
|
vec![other_op_head.clone()],
|
|
|
|
&neighbors_fn,
|
|
|
|
&|op: &Operation| op.id().clone(),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
merged_view = view::merge_views(
|
|
|
|
store,
|
|
|
|
&merged_view,
|
|
|
|
ancestor_op.view().store_view(),
|
|
|
|
other_op_head.view().store_view(),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
let merged_index = index_store.write_index(index).unwrap();
|
|
|
|
let merged_view_id = op_store.write_view(&merged_view).unwrap();
|
|
|
|
let operation_metadata = OperationMetadata::new("resolve concurrent operations".to_string());
|
|
|
|
let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect();
|
|
|
|
let merge_operation = op_store::Operation {
|
|
|
|
view_id: merged_view_id,
|
|
|
|
parents: op_parent_ids,
|
|
|
|
metadata: operation_metadata,
|
|
|
|
};
|
|
|
|
let merge_operation_id = op_store.write_operation(&merge_operation).unwrap();
|
|
|
|
index_store
|
|
|
|
.associate_file_with_operation(merged_index.as_ref(), &merge_operation_id)
|
|
|
|
.unwrap();
|
|
|
|
Ok((merge_operation_id, merge_operation, merged_view))
|
2021-03-10 23:39:16 +00:00
|
|
|
}
|