view: split out separate type for keeping track of op heads

This commit is contained in:
Martin von Zweigbergk 2021-03-10 15:39:16 -08:00
parent 2955bc4a29
commit 4bd121dab5
3 changed files with 96 additions and 39 deletions

View file

@ -32,6 +32,7 @@ pub mod index_store;
pub mod local_store; pub mod local_store;
pub mod lock; pub mod lock;
pub mod matchers; pub mod matchers;
pub mod op_heads_store;
pub mod op_store; pub mod op_store;
pub mod operation; pub mod operation;
pub mod protos; pub mod protos;

67
lib/src/op_heads_store.rs Normal file
View file

@ -0,0 +1,67 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::lock::FileLock;
use crate::op_store::OperationId;
use crate::operation::Operation;
use std::path::PathBuf;
/// Manages the very set of current heads of the operation log. The store is
/// simply a directory where each operation id is a file with that name (and no
/// content).
pub struct OpHeadsStore {
dir: PathBuf,
}
impl OpHeadsStore {
pub fn init(dir: PathBuf) -> Self {
OpHeadsStore { dir }
}
pub fn load(dir: PathBuf) -> OpHeadsStore {
OpHeadsStore { dir }
}
pub fn add_op_head(&self, id: &OperationId) {
std::fs::write(self.dir.join(id.hex()), "").unwrap();
}
pub fn remove_op_head(&self, id: &OperationId) {
// It's fine if the old head was not found. It probably means
// that we're on a distributed file system where the locking
// doesn't work. We'll probably end up with two current
// heads. We'll detect that next time we load the view.
std::fs::remove_file(self.dir.join(id.hex())).ok();
}
pub fn get_op_heads(&self) -> Vec<OperationId> {
let mut op_heads = vec![];
for op_head_entry in std::fs::read_dir(&self.dir).unwrap() {
let op_head_file_name = op_head_entry.unwrap().file_name();
let op_head_file_name = op_head_file_name.to_str().unwrap();
if let Ok(op_head) = hex::decode(op_head_file_name) {
op_heads.push(OperationId(op_head));
}
}
op_heads
}
pub fn update_op_heads(&self, op: &Operation) {
let _op_heads_lock = FileLock::lock(self.dir.join("lock"));
self.add_op_head(op.id());
for old_parent_id in op.parent_ids() {
self.remove_op_head(old_parent_id);
}
}
}

View file

@ -25,6 +25,7 @@ use crate::dag_walk;
use crate::index::MutableIndex; use crate::index::MutableIndex;
use crate::index_store::IndexStore; use crate::index_store::IndexStore;
use crate::lock::FileLock; use crate::lock::FileLock;
use crate::op_heads_store::OpHeadsStore;
use crate::op_store; use crate::op_store;
use crate::op_store::{OpStore, OpStoreResult, OperationId, OperationMetadata}; use crate::op_store::{OpStore, OpStoreResult, OperationId, OperationMetadata};
use crate::operation::Operation; use crate::operation::Operation;
@ -93,6 +94,7 @@ pub struct ReadonlyView {
store: Arc<StoreWrapper>, store: Arc<StoreWrapper>,
path: PathBuf, path: PathBuf,
op_store: Arc<dyn OpStore>, op_store: Arc<dyn OpStore>,
op_heads_store: Arc<OpHeadsStore>,
op_id: OperationId, op_id: OperationId,
index_store: Arc<IndexStore>, index_store: Arc<IndexStore>,
data: op_store::View, data: op_store::View,
@ -100,8 +102,8 @@ pub struct ReadonlyView {
pub struct MutableView { pub struct MutableView {
store: Arc<StoreWrapper>, store: Arc<StoreWrapper>,
path: PathBuf,
op_store: Arc<dyn OpStore>, op_store: Arc<dyn OpStore>,
op_heads_store: Arc<OpHeadsStore>,
base_op_id: OperationId, base_op_id: OperationId,
data: op_store::View, data: op_store::View,
} }
@ -156,30 +158,6 @@ pub enum OpHeadResolutionError {
NoHeads, NoHeads,
} }
fn add_op_head(op_heads_dir: &Path, id: &OperationId) {
std::fs::write(op_heads_dir.join(id.hex()), "").unwrap();
}
fn remove_op_head(op_heads_dir: &Path, id: &OperationId) {
// It's fine if the old head was not found. It probably means
// that we're on a distributed file system where the locking
// doesn't work. We'll probably end up with two current
// heads. We'll detect that next time we load the view.
std::fs::remove_file(op_heads_dir.join(id.hex())).ok();
}
fn get_op_heads(op_heads_dir: &Path) -> Vec<OperationId> {
let mut op_heads = vec![];
for op_head_entry in std::fs::read_dir(op_heads_dir).unwrap() {
let op_head_file_name = op_head_entry.unwrap().file_name();
let op_head_file_name = op_head_file_name.to_str().unwrap();
if let Ok(op_head) = hex::decode(op_head_file_name) {
op_heads.push(OperationId(op_head));
}
}
op_heads
}
pub fn merge_views( pub fn merge_views(
store: &StoreWrapper, store: &StoreWrapper,
left: &op_store::View, left: &op_store::View,
@ -252,9 +230,10 @@ fn get_single_op_head(
store: &StoreWrapper, store: &StoreWrapper,
op_store: &Arc<dyn OpStore>, op_store: &Arc<dyn OpStore>,
index_store: &Arc<IndexStore>, index_store: &Arc<IndexStore>,
op_head_store: &Arc<OpHeadsStore>,
op_heads_dir: &Path, op_heads_dir: &Path,
) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> { ) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> {
let mut op_heads = get_op_heads(&op_heads_dir); let mut op_heads = op_head_store.get_op_heads();
if op_heads.is_empty() { if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads); return Err(OpHeadResolutionError::NoHeads);
@ -275,8 +254,9 @@ fn get_single_op_head(
// Note that the locking isn't necessary for correctness; we take the lock // Note that the locking isn't necessary for correctness; we take the lock
// only to avoid other concurrent processes from doing the same work (and // only to avoid other concurrent processes from doing the same work (and
// producing another set of divergent heads). // producing another set of divergent heads).
// TODO: Add a function to OpHeadsStore for taking the lock?
let _lock = FileLock::lock(op_heads_dir.join("lock")); let _lock = FileLock::lock(op_heads_dir.join("lock"));
let op_heads = get_op_heads(&op_heads_dir); let op_heads = op_head_store.get_op_heads();
if op_heads.is_empty() { if op_heads.is_empty() {
return Err(OpHeadResolutionError::NoHeads); return Err(OpHeadResolutionError::NoHeads);
@ -292,12 +272,12 @@ fn get_single_op_head(
let (merge_operation_id, merge_operation, merged_view) = let (merge_operation_id, merge_operation, merged_view) =
merge_op_heads(store, op_store, index_store, &op_heads)?; merge_op_heads(store, op_store, index_store, &op_heads)?;
add_op_head(&op_heads_dir, &merge_operation_id); op_head_store.add_op_head(&merge_operation_id);
for old_op_head_id in op_heads { for old_op_head_id in op_heads {
// The merged one will be in the input to the merge if it's a "fast-forward" // The merged one will be in the input to the merge if it's a "fast-forward"
// merge. // merge.
if old_op_head_id != merge_operation_id { if old_op_head_id != merge_operation_id {
remove_op_head(&op_heads_dir, &old_op_head_id); op_head_store.remove_op_head(&old_op_head_id);
} }
} }
Ok((merge_operation_id, merge_operation, merged_view)) Ok((merge_operation_id, merge_operation, merged_view))
@ -391,12 +371,14 @@ impl ReadonlyView {
let op_heads_dir = path.join("op_heads"); let op_heads_dir = path.join("op_heads");
std::fs::create_dir(&op_heads_dir).unwrap(); std::fs::create_dir(&op_heads_dir).unwrap();
add_op_head(&op_heads_dir, &init_operation_id); let op_heads_store = Arc::new(OpHeadsStore::init(op_heads_dir));
op_heads_store.add_op_head(&init_operation_id);
ReadonlyView { ReadonlyView {
store, store,
path, path,
op_store, op_store,
op_heads_store,
op_id: init_operation_id, op_id: init_operation_id,
index_store, index_store,
data: root_view, data: root_view,
@ -410,12 +392,20 @@ impl ReadonlyView {
path: PathBuf, path: PathBuf,
) -> Self { ) -> Self {
let op_heads_dir = path.join("op_heads"); let op_heads_dir = path.join("op_heads");
let (op_id, _operation, view) = let op_heads_store = Arc::new(OpHeadsStore::load(op_heads_dir.clone()));
get_single_op_head(&store, &op_store, &index_store, &op_heads_dir).unwrap(); let (op_id, _operation, view) = get_single_op_head(
&store,
&op_store,
&index_store,
&op_heads_store,
&op_heads_dir,
)
.unwrap();
ReadonlyView { ReadonlyView {
store, store,
path, path,
op_store, op_store,
op_heads_store,
op_id, op_id,
index_store, index_store,
data: view, data: view,
@ -429,10 +419,13 @@ impl ReadonlyView {
path: PathBuf, path: PathBuf,
operation: &Operation, operation: &Operation,
) -> Self { ) -> Self {
let op_heads_dir = path.join("op_heads");
let op_heads_store = Arc::new(OpHeadsStore::load(op_heads_dir));
ReadonlyView { ReadonlyView {
store, store,
path, path,
op_store, op_store,
op_heads_store,
op_id: operation.id().clone(), op_id: operation.id().clone(),
index_store, index_store,
data: operation.view().take_store_view(), data: operation.view().take_store_view(),
@ -445,6 +438,7 @@ impl ReadonlyView {
&self.store, &self.store,
&self.op_store, &self.op_store,
&self.index_store, &self.index_store,
&self.op_heads_store,
&op_heads_dir, &op_heads_dir,
) )
.unwrap(); .unwrap();
@ -462,8 +456,8 @@ impl ReadonlyView {
// TODO: Avoid the cloning of the sets here. // TODO: Avoid the cloning of the sets here.
MutableView { MutableView {
store: self.store.clone(), store: self.store.clone(),
path: self.path.clone(),
op_store: self.op_store.clone(), op_store: self.op_store.clone(),
op_heads_store: self.op_heads_store.clone(),
base_op_id: self.op_id.clone(), base_op_id: self.op_id.clone(),
data: self.data.clone(), data: self.data.clone(),
} }
@ -572,11 +566,6 @@ impl MutableView {
} }
pub fn update_op_heads(&self, op: &Operation) { pub fn update_op_heads(&self, op: &Operation) {
let op_heads_dir = self.path.join("op_heads"); self.op_heads_store.update_op_heads(op)
let _op_heads_lock = FileLock::lock(op_heads_dir.join("lock"));
add_op_head(&op_heads_dir, op.id());
for old_parent_id in op.parent_ids() {
remove_op_head(&op_heads_dir, old_parent_id);
}
} }
} }