From 4bd121dab5f605a57e1891e258bf2cda837acd1b Mon Sep 17 00:00:00 2001 From: Martin von Zweigbergk Date: Wed, 10 Mar 2021 15:39:16 -0800 Subject: [PATCH] view: split out separate type for keeping track of op heads --- lib/src/lib.rs | 1 + lib/src/op_heads_store.rs | 67 +++++++++++++++++++++++++++++++++++++++ lib/src/view.rs | 67 ++++++++++++++++----------------------- 3 files changed, 96 insertions(+), 39 deletions(-) create mode 100644 lib/src/op_heads_store.rs diff --git a/lib/src/lib.rs b/lib/src/lib.rs index db2d4b7a0..b739e0d42 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -32,6 +32,7 @@ pub mod index_store; pub mod local_store; pub mod lock; pub mod matchers; +pub mod op_heads_store; pub mod op_store; pub mod operation; pub mod protos; diff --git a/lib/src/op_heads_store.rs b/lib/src/op_heads_store.rs new file mode 100644 index 000000000..4b9281b21 --- /dev/null +++ b/lib/src/op_heads_store.rs @@ -0,0 +1,67 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::lock::FileLock; +use crate::op_store::OperationId; +use crate::operation::Operation; +use std::path::PathBuf; + +/// Manages the set of current heads of the operation log. The store is +/// simply a directory where each operation id is a file with that name (and no +/// content). 
+pub struct OpHeadsStore { + dir: PathBuf, +} + +impl OpHeadsStore { + pub fn init(dir: PathBuf) -> Self { + OpHeadsStore { dir } + } + + pub fn load(dir: PathBuf) -> OpHeadsStore { + OpHeadsStore { dir } + } + + pub fn add_op_head(&self, id: &OperationId) { + std::fs::write(self.dir.join(id.hex()), "").unwrap(); + } + + pub fn remove_op_head(&self, id: &OperationId) { + // It's fine if the old head was not found. It probably means + // that we're on a distributed file system where the locking + // doesn't work. We'll probably end up with two current + // heads. We'll detect that next time we load the view. + std::fs::remove_file(self.dir.join(id.hex())).ok(); + } + + pub fn get_op_heads(&self) -> Vec { + let mut op_heads = vec![]; + for op_head_entry in std::fs::read_dir(&self.dir).unwrap() { + let op_head_file_name = op_head_entry.unwrap().file_name(); + let op_head_file_name = op_head_file_name.to_str().unwrap(); + if let Ok(op_head) = hex::decode(op_head_file_name) { + op_heads.push(OperationId(op_head)); + } + } + op_heads + } + + pub fn update_op_heads(&self, op: &Operation) { + let _op_heads_lock = FileLock::lock(self.dir.join("lock")); + self.add_op_head(op.id()); + for old_parent_id in op.parent_ids() { + self.remove_op_head(old_parent_id); + } + } +} diff --git a/lib/src/view.rs b/lib/src/view.rs index 5880fe208..6deb25229 100644 --- a/lib/src/view.rs +++ b/lib/src/view.rs @@ -25,6 +25,7 @@ use crate::dag_walk; use crate::index::MutableIndex; use crate::index_store::IndexStore; use crate::lock::FileLock; +use crate::op_heads_store::OpHeadsStore; use crate::op_store; use crate::op_store::{OpStore, OpStoreResult, OperationId, OperationMetadata}; use crate::operation::Operation; @@ -93,6 +94,7 @@ pub struct ReadonlyView { store: Arc, path: PathBuf, op_store: Arc, + op_heads_store: Arc, op_id: OperationId, index_store: Arc, data: op_store::View, @@ -100,8 +102,8 @@ pub struct ReadonlyView { pub struct MutableView { store: Arc, - path: PathBuf, op_store: Arc, 
+ op_heads_store: Arc, base_op_id: OperationId, data: op_store::View, } @@ -156,30 +158,6 @@ pub enum OpHeadResolutionError { NoHeads, } -fn add_op_head(op_heads_dir: &Path, id: &OperationId) { - std::fs::write(op_heads_dir.join(id.hex()), "").unwrap(); -} - -fn remove_op_head(op_heads_dir: &Path, id: &OperationId) { - // It's fine if the old head was not found. It probably means - // that we're on a distributed file system where the locking - // doesn't work. We'll probably end up with two current - // heads. We'll detect that next time we load the view. - std::fs::remove_file(op_heads_dir.join(id.hex())).ok(); -} - -fn get_op_heads(op_heads_dir: &Path) -> Vec { - let mut op_heads = vec![]; - for op_head_entry in std::fs::read_dir(op_heads_dir).unwrap() { - let op_head_file_name = op_head_entry.unwrap().file_name(); - let op_head_file_name = op_head_file_name.to_str().unwrap(); - if let Ok(op_head) = hex::decode(op_head_file_name) { - op_heads.push(OperationId(op_head)); - } - } - op_heads -} - pub fn merge_views( store: &StoreWrapper, left: &op_store::View, @@ -252,9 +230,10 @@ fn get_single_op_head( store: &StoreWrapper, op_store: &Arc, index_store: &Arc, + op_head_store: &Arc, op_heads_dir: &Path, ) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> { - let mut op_heads = get_op_heads(&op_heads_dir); + let mut op_heads = op_head_store.get_op_heads(); if op_heads.is_empty() { return Err(OpHeadResolutionError::NoHeads); @@ -275,8 +254,9 @@ fn get_single_op_head( // Note that the locking isn't necessary for correctness; we take the lock // only to avoid other concurrent processes from doing the same work (and // producing another set of divergent heads). + // TODO: Add a function to OpHeadsStore for taking the lock? 
let _lock = FileLock::lock(op_heads_dir.join("lock")); - let op_heads = get_op_heads(&op_heads_dir); + let op_heads = op_head_store.get_op_heads(); if op_heads.is_empty() { return Err(OpHeadResolutionError::NoHeads); @@ -292,12 +272,12 @@ fn get_single_op_head( let (merge_operation_id, merge_operation, merged_view) = merge_op_heads(store, op_store, index_store, &op_heads)?; - add_op_head(&op_heads_dir, &merge_operation_id); + op_head_store.add_op_head(&merge_operation_id); for old_op_head_id in op_heads { // The merged one will be in the input to the merge if it's a "fast-forward" // merge. if old_op_head_id != merge_operation_id { - remove_op_head(&op_heads_dir, &old_op_head_id); + op_head_store.remove_op_head(&old_op_head_id); } } Ok((merge_operation_id, merge_operation, merged_view)) @@ -391,12 +371,14 @@ impl ReadonlyView { let op_heads_dir = path.join("op_heads"); std::fs::create_dir(&op_heads_dir).unwrap(); - add_op_head(&op_heads_dir, &init_operation_id); + let op_heads_store = Arc::new(OpHeadsStore::init(op_heads_dir)); + op_heads_store.add_op_head(&init_operation_id); ReadonlyView { store, path, op_store, + op_heads_store, op_id: init_operation_id, index_store, data: root_view, @@ -410,12 +392,20 @@ impl ReadonlyView { path: PathBuf, ) -> Self { let op_heads_dir = path.join("op_heads"); - let (op_id, _operation, view) = - get_single_op_head(&store, &op_store, &index_store, &op_heads_dir).unwrap(); + let op_heads_store = Arc::new(OpHeadsStore::load(op_heads_dir.clone())); + let (op_id, _operation, view) = get_single_op_head( + &store, + &op_store, + &index_store, + &op_heads_store, + &op_heads_dir, + ) + .unwrap(); ReadonlyView { store, path, op_store, + op_heads_store, op_id, index_store, data: view, @@ -429,10 +419,13 @@ impl ReadonlyView { path: PathBuf, operation: &Operation, ) -> Self { + let op_heads_dir = path.join("op_heads"); + let op_heads_store = Arc::new(OpHeadsStore::load(op_heads_dir)); ReadonlyView { store, path, op_store, + op_heads_store, 
op_id: operation.id().clone(), index_store, data: operation.view().take_store_view(), @@ -445,6 +438,7 @@ impl ReadonlyView { &self.store, &self.op_store, &self.index_store, + &self.op_heads_store, &op_heads_dir, ) .unwrap(); @@ -462,8 +456,8 @@ impl ReadonlyView { // TODO: Avoid the cloning of the sets here. MutableView { store: self.store.clone(), - path: self.path.clone(), op_store: self.op_store.clone(), + op_heads_store: self.op_heads_store.clone(), base_op_id: self.op_id.clone(), data: self.data.clone(), } @@ -572,11 +566,6 @@ impl MutableView { } pub fn update_op_heads(&self, op: &Operation) { - let op_heads_dir = self.path.join("op_heads"); - let _op_heads_lock = FileLock::lock(op_heads_dir.join("lock")); - add_op_head(&op_heads_dir, op.id()); - for old_parent_id in op.parent_ids() { - remove_op_head(&op_heads_dir, old_parent_id); - } + self.op_heads_store.update_op_heads(op) } }