op_store: inline ProtoOpStore into SimpleOpStore

The `ProtoOpStore` was separated out to simplify the migration from
Thrift. Now that the `ThriftOpStore` is gone, we can inline
`ProtoOpStore` as the TODO says.
This commit is contained in:
Martin von Zweigbergk 2023-03-30 13:11:18 -07:00 committed by Martin von Zweigbergk
parent 68fb46b2af
commit 6643fb2bff
3 changed files with 305 additions and 357 deletions

View file

@ -40,7 +40,6 @@ pub mod nightly_shims;
pub mod op_heads_store;
pub mod op_store;
pub mod operation;
mod proto_op_store;
pub mod protos;
pub mod refs;
pub mod repo;

View file

@ -1,342 +0,0 @@
// Copyright 2020 The Jujutsu Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fs;
use std::io::{ErrorKind, Write};
use std::path::PathBuf;
use itertools::Itertools;
use prost::Message;
use tempfile::NamedTempFile;
use crate::backend::{CommitId, MillisSinceEpoch, ObjectId, Timestamp};
use crate::content_hash::blake2b_hash;
use crate::file_util::persist_content_addressed_temp_file;
use crate::op_store::{
BranchTarget, OpStoreError, OpStoreResult, Operation, OperationId, OperationMetadata,
RefTarget, View, ViewId, WorkspaceId,
};
impl From<prost::DecodeError> for OpStoreError {
fn from(err: prost::DecodeError) -> Self {
OpStoreError::Other(err.to_string())
}
}
#[derive(Debug)]
pub struct ProtoOpStore {
path: PathBuf,
}
impl ProtoOpStore {
pub fn init(store_path: PathBuf) -> Self {
fs::create_dir(store_path.join("views")).unwrap();
fs::create_dir(store_path.join("operations")).unwrap();
ProtoOpStore { path: store_path }
}
pub fn load(store_path: PathBuf) -> Self {
ProtoOpStore { path: store_path }
}
fn view_path(&self, id: &ViewId) -> PathBuf {
self.path.join("views").join(id.hex())
}
fn operation_path(&self, id: &OperationId) -> PathBuf {
self.path.join("operations").join(id.hex())
}
pub fn read_view(&self, id: &ViewId) -> OpStoreResult<View> {
let path = self.view_path(id);
let buf = fs::read(path)?;
let proto = crate::protos::op_store::View::decode(&*buf)?;
Ok(view_from_proto(proto))
}
pub fn write_view(&self, view: &View) -> OpStoreResult<ViewId> {
let temp_file = NamedTempFile::new_in(&self.path)?;
let proto = view_to_proto(view);
temp_file.as_file().write_all(&proto.encode_to_vec())?;
let id = ViewId::new(blake2b_hash(view).to_vec());
persist_content_addressed_temp_file(temp_file, self.view_path(&id))?;
Ok(id)
}
pub fn read_operation(&self, id: &OperationId) -> OpStoreResult<Operation> {
let path = self.operation_path(id);
let buf = fs::read(path).map_err(not_found_to_store_error)?;
let proto = crate::protos::op_store::Operation::decode(&*buf)?;
Ok(operation_from_proto(proto))
}
pub fn write_operation(&self, operation: &Operation) -> OpStoreResult<OperationId> {
let temp_file = NamedTempFile::new_in(&self.path)?;
let proto = operation_to_proto(operation);
temp_file.as_file().write_all(&proto.encode_to_vec())?;
let id = OperationId::new(blake2b_hash(operation).to_vec());
persist_content_addressed_temp_file(temp_file, self.operation_path(&id))?;
Ok(id)
}
}
fn not_found_to_store_error(err: std::io::Error) -> OpStoreError {
if err.kind() == ErrorKind::NotFound {
OpStoreError::NotFound
} else {
OpStoreError::from(err)
}
}
fn timestamp_to_proto(timestamp: &Timestamp) -> crate::protos::op_store::Timestamp {
crate::protos::op_store::Timestamp {
millis_since_epoch: timestamp.timestamp.0,
tz_offset: timestamp.tz_offset,
}
}
fn timestamp_from_proto(proto: crate::protos::op_store::Timestamp) -> Timestamp {
Timestamp {
timestamp: MillisSinceEpoch(proto.millis_since_epoch),
tz_offset: proto.tz_offset,
}
}
fn operation_metadata_to_proto(
metadata: &OperationMetadata,
) -> crate::protos::op_store::OperationMetadata {
crate::protos::op_store::OperationMetadata {
start_time: Some(timestamp_to_proto(&metadata.start_time)),
end_time: Some(timestamp_to_proto(&metadata.end_time)),
description: metadata.description.clone(),
hostname: metadata.hostname.clone(),
username: metadata.username.clone(),
tags: metadata.tags.clone(),
}
}
fn operation_metadata_from_proto(
proto: crate::protos::op_store::OperationMetadata,
) -> OperationMetadata {
let start_time = timestamp_from_proto(proto.start_time.unwrap_or_default());
let end_time = timestamp_from_proto(proto.end_time.unwrap_or_default());
OperationMetadata {
start_time,
end_time,
description: proto.description,
hostname: proto.hostname,
username: proto.username,
tags: proto.tags,
}
}
fn operation_to_proto(operation: &Operation) -> crate::protos::op_store::Operation {
let mut proto = crate::protos::op_store::Operation {
view_id: operation.view_id.as_bytes().to_vec(),
metadata: Some(operation_metadata_to_proto(&operation.metadata)),
..Default::default()
};
for parent in &operation.parents {
proto.parents.push(parent.to_bytes());
}
proto
}
fn operation_from_proto(proto: crate::protos::op_store::Operation) -> Operation {
let parents = proto.parents.into_iter().map(OperationId::new).collect();
let view_id = ViewId::new(proto.view_id);
let metadata = operation_metadata_from_proto(proto.metadata.unwrap_or_default());
Operation {
view_id,
parents,
metadata,
}
}
fn view_to_proto(view: &View) -> crate::protos::op_store::View {
let mut proto = crate::protos::op_store::View::default();
for (workspace_id, commit_id) in &view.wc_commit_ids {
proto
.wc_commit_ids
.insert(workspace_id.as_str().to_string(), commit_id.to_bytes());
}
for head_id in &view.head_ids {
proto.head_ids.push(head_id.to_bytes());
}
for head_id in &view.public_head_ids {
proto.public_head_ids.push(head_id.to_bytes());
}
for (name, target) in &view.branches {
let mut branch_proto = crate::protos::op_store::Branch {
name: name.clone(),
..Default::default()
};
branch_proto.name = name.clone();
if let Some(local_target) = &target.local_target {
branch_proto.local_target = Some(ref_target_to_proto(local_target));
}
for (remote_name, target) in &target.remote_targets {
branch_proto
.remote_branches
.push(crate::protos::op_store::RemoteBranch {
remote_name: remote_name.clone(),
target: Some(ref_target_to_proto(target)),
});
}
proto.branches.push(branch_proto);
}
for (name, target) in &view.tags {
proto.tags.push(crate::protos::op_store::Tag {
name: name.clone(),
target: Some(ref_target_to_proto(target)),
});
}
for (git_ref_name, target) in &view.git_refs {
proto.git_refs.push(crate::protos::op_store::GitRef {
name: git_ref_name.clone(),
target: Some(ref_target_to_proto(target)),
..Default::default()
});
}
if let Some(git_head) = &view.git_head {
proto.git_head = Some(ref_target_to_proto(git_head));
}
proto
}
fn view_from_proto(proto: crate::protos::op_store::View) -> View {
let mut view = View::default();
// For compatibility with old repos before we had support for multiple working
// copies
#[allow(deprecated)]
if !proto.wc_commit_id.is_empty() {
view.wc_commit_ids
.insert(WorkspaceId::default(), CommitId::new(proto.wc_commit_id));
}
for (workspace_id, commit_id) in proto.wc_commit_ids {
view.wc_commit_ids
.insert(WorkspaceId::new(workspace_id), CommitId::new(commit_id));
}
for head_id_bytes in proto.head_ids {
view.head_ids.insert(CommitId::new(head_id_bytes));
}
for head_id_bytes in proto.public_head_ids {
view.public_head_ids.insert(CommitId::new(head_id_bytes));
}
for branch_proto in proto.branches {
let local_target = branch_proto.local_target.map(ref_target_from_proto);
let mut remote_targets = BTreeMap::new();
for remote_branch in branch_proto.remote_branches {
remote_targets.insert(
remote_branch.remote_name,
ref_target_from_proto(remote_branch.target.unwrap_or_default()),
);
}
view.branches.insert(
branch_proto.name.clone(),
BranchTarget {
local_target,
remote_targets,
},
);
}
for tag_proto in proto.tags {
view.tags.insert(
tag_proto.name,
ref_target_from_proto(tag_proto.target.unwrap_or_default()),
);
}
for git_ref in proto.git_refs {
if let Some(target) = git_ref.target {
view.git_refs
.insert(git_ref.name, ref_target_from_proto(target));
} else {
// Legacy format
view.git_refs.insert(
git_ref.name,
RefTarget::Normal(CommitId::new(git_ref.commit_id)),
);
}
}
#[allow(deprecated)]
if let Some(git_head) = proto.git_head.as_ref() {
view.git_head = Some(ref_target_from_proto(git_head.clone()));
} else if !proto.git_head_legacy.is_empty() {
view.git_head = Some(RefTarget::Normal(CommitId::new(proto.git_head_legacy)));
}
view
}
fn ref_target_to_proto(value: &RefTarget) -> crate::protos::op_store::RefTarget {
let mut proto = crate::protos::op_store::RefTarget::default();
match value {
RefTarget::Normal(id) => {
proto.value = Some(crate::protos::op_store::ref_target::Value::CommitId(
id.to_bytes(),
));
}
RefTarget::Conflict { removes, adds } => {
let mut ref_conflict_proto = crate::protos::op_store::RefConflict::default();
for id in removes {
ref_conflict_proto.removes.push(id.to_bytes());
}
for id in adds {
ref_conflict_proto.adds.push(id.to_bytes());
}
proto.value = Some(crate::protos::op_store::ref_target::Value::Conflict(
ref_conflict_proto,
));
}
}
proto
}
fn ref_target_from_proto(proto: crate::protos::op_store::RefTarget) -> RefTarget {
match proto.value.unwrap() {
crate::protos::op_store::ref_target::Value::CommitId(id) => {
RefTarget::Normal(CommitId::new(id))
}
crate::protos::op_store::ref_target::Value::Conflict(conflict) => {
let removes = conflict
.removes
.into_iter()
.map(CommitId::new)
.collect_vec();
let adds = conflict.adds.into_iter().map(CommitId::new).collect_vec();
RefTarget::Conflict { removes, adds }
}
}
}

View file

@ -12,13 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::path::Path;
use std::fs;
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use tempfile::PersistError;
use itertools::Itertools;
use prost::Message;
use tempfile::{NamedTempFile, PersistError};
use crate::op_store::{OpStore, OpStoreError, OpStoreResult, Operation, OperationId, View, ViewId};
use crate::proto_op_store::ProtoOpStore;
use crate::backend::{CommitId, MillisSinceEpoch, ObjectId, Timestamp};
use crate::content_hash::blake2b_hash;
use crate::file_util::persist_content_addressed_temp_file;
use crate::op_store::{
BranchTarget, OpStore, OpStoreError, OpStoreResult, Operation, OperationId, OperationMetadata,
RefTarget, View, ViewId, WorkspaceId,
};
impl From<std::io::Error> for OpStoreError {
fn from(err: std::io::Error) -> Self {
@ -32,21 +42,38 @@ impl From<PersistError> for OpStoreError {
}
}
// TODO: In version 0.7.0 or so, inline ProtoOpStore into this type
impl From<prost::DecodeError> for OpStoreError {
fn from(err: prost::DecodeError) -> Self {
OpStoreError::Other(err.to_string())
}
}
#[derive(Debug)]
pub struct SimpleOpStore {
delegate: ProtoOpStore,
path: PathBuf,
}
impl SimpleOpStore {
pub fn init(store_path: &Path) -> Self {
let delegate = ProtoOpStore::init(store_path.to_path_buf());
SimpleOpStore { delegate }
fs::create_dir(store_path.join("views")).unwrap();
fs::create_dir(store_path.join("operations")).unwrap();
SimpleOpStore {
path: store_path.to_owned(),
}
}
pub fn load(store_path: &Path) -> Self {
let delegate = ProtoOpStore::load(store_path.to_path_buf());
SimpleOpStore { delegate }
SimpleOpStore {
path: store_path.to_path_buf(),
}
}
fn view_path(&self, id: &ViewId) -> PathBuf {
self.path.join("views").join(id.hex())
}
fn operation_path(&self, id: &OperationId) -> PathBuf {
self.path.join("operations").join(id.hex())
}
}
@ -56,19 +83,283 @@ impl OpStore for SimpleOpStore {
}
fn read_view(&self, id: &ViewId) -> OpStoreResult<View> {
self.delegate.read_view(id)
let path = self.view_path(id);
let buf = fs::read(path)?;
let proto = crate::protos::op_store::View::decode(&*buf)?;
Ok(view_from_proto(proto))
}
fn write_view(&self, view: &View) -> OpStoreResult<ViewId> {
self.delegate.write_view(view)
let temp_file = NamedTempFile::new_in(&self.path)?;
let proto = view_to_proto(view);
temp_file.as_file().write_all(&proto.encode_to_vec())?;
let id = ViewId::new(blake2b_hash(view).to_vec());
persist_content_addressed_temp_file(temp_file, self.view_path(&id))?;
Ok(id)
}
fn read_operation(&self, id: &OperationId) -> OpStoreResult<Operation> {
self.delegate.read_operation(id)
let path = self.operation_path(id);
let buf = fs::read(path).map_err(not_found_to_store_error)?;
let proto = crate::protos::op_store::Operation::decode(&*buf)?;
Ok(operation_from_proto(proto))
}
fn write_operation(&self, operation: &Operation) -> OpStoreResult<OperationId> {
self.delegate.write_operation(operation)
let temp_file = NamedTempFile::new_in(&self.path)?;
let proto = operation_to_proto(operation);
temp_file.as_file().write_all(&proto.encode_to_vec())?;
let id = OperationId::new(blake2b_hash(operation).to_vec());
persist_content_addressed_temp_file(temp_file, self.operation_path(&id))?;
Ok(id)
}
}
fn not_found_to_store_error(err: std::io::Error) -> OpStoreError {
if err.kind() == ErrorKind::NotFound {
OpStoreError::NotFound
} else {
OpStoreError::from(err)
}
}
fn timestamp_to_proto(timestamp: &Timestamp) -> crate::protos::op_store::Timestamp {
crate::protos::op_store::Timestamp {
millis_since_epoch: timestamp.timestamp.0,
tz_offset: timestamp.tz_offset,
}
}
fn timestamp_from_proto(proto: crate::protos::op_store::Timestamp) -> Timestamp {
Timestamp {
timestamp: MillisSinceEpoch(proto.millis_since_epoch),
tz_offset: proto.tz_offset,
}
}
fn operation_metadata_to_proto(
metadata: &OperationMetadata,
) -> crate::protos::op_store::OperationMetadata {
crate::protos::op_store::OperationMetadata {
start_time: Some(timestamp_to_proto(&metadata.start_time)),
end_time: Some(timestamp_to_proto(&metadata.end_time)),
description: metadata.description.clone(),
hostname: metadata.hostname.clone(),
username: metadata.username.clone(),
tags: metadata.tags.clone(),
}
}
fn operation_metadata_from_proto(
proto: crate::protos::op_store::OperationMetadata,
) -> OperationMetadata {
let start_time = timestamp_from_proto(proto.start_time.unwrap_or_default());
let end_time = timestamp_from_proto(proto.end_time.unwrap_or_default());
OperationMetadata {
start_time,
end_time,
description: proto.description,
hostname: proto.hostname,
username: proto.username,
tags: proto.tags,
}
}
fn operation_to_proto(operation: &Operation) -> crate::protos::op_store::Operation {
let mut proto = crate::protos::op_store::Operation {
view_id: operation.view_id.as_bytes().to_vec(),
metadata: Some(operation_metadata_to_proto(&operation.metadata)),
..Default::default()
};
for parent in &operation.parents {
proto.parents.push(parent.to_bytes());
}
proto
}
fn operation_from_proto(proto: crate::protos::op_store::Operation) -> Operation {
let parents = proto.parents.into_iter().map(OperationId::new).collect();
let view_id = ViewId::new(proto.view_id);
let metadata = operation_metadata_from_proto(proto.metadata.unwrap_or_default());
Operation {
view_id,
parents,
metadata,
}
}
fn view_to_proto(view: &View) -> crate::protos::op_store::View {
let mut proto = crate::protos::op_store::View::default();
for (workspace_id, commit_id) in &view.wc_commit_ids {
proto
.wc_commit_ids
.insert(workspace_id.as_str().to_string(), commit_id.to_bytes());
}
for head_id in &view.head_ids {
proto.head_ids.push(head_id.to_bytes());
}
for head_id in &view.public_head_ids {
proto.public_head_ids.push(head_id.to_bytes());
}
for (name, target) in &view.branches {
let mut branch_proto = crate::protos::op_store::Branch {
name: name.clone(),
..Default::default()
};
branch_proto.name = name.clone();
if let Some(local_target) = &target.local_target {
branch_proto.local_target = Some(ref_target_to_proto(local_target));
}
for (remote_name, target) in &target.remote_targets {
branch_proto
.remote_branches
.push(crate::protos::op_store::RemoteBranch {
remote_name: remote_name.clone(),
target: Some(ref_target_to_proto(target)),
});
}
proto.branches.push(branch_proto);
}
for (name, target) in &view.tags {
proto.tags.push(crate::protos::op_store::Tag {
name: name.clone(),
target: Some(ref_target_to_proto(target)),
});
}
for (git_ref_name, target) in &view.git_refs {
proto.git_refs.push(crate::protos::op_store::GitRef {
name: git_ref_name.clone(),
target: Some(ref_target_to_proto(target)),
..Default::default()
});
}
if let Some(git_head) = &view.git_head {
proto.git_head = Some(ref_target_to_proto(git_head));
}
proto
}
fn view_from_proto(proto: crate::protos::op_store::View) -> View {
let mut view = View::default();
// For compatibility with old repos before we had support for multiple working
// copies
#[allow(deprecated)]
if !proto.wc_commit_id.is_empty() {
view.wc_commit_ids
.insert(WorkspaceId::default(), CommitId::new(proto.wc_commit_id));
}
for (workspace_id, commit_id) in proto.wc_commit_ids {
view.wc_commit_ids
.insert(WorkspaceId::new(workspace_id), CommitId::new(commit_id));
}
for head_id_bytes in proto.head_ids {
view.head_ids.insert(CommitId::new(head_id_bytes));
}
for head_id_bytes in proto.public_head_ids {
view.public_head_ids.insert(CommitId::new(head_id_bytes));
}
for branch_proto in proto.branches {
let local_target = branch_proto.local_target.map(ref_target_from_proto);
let mut remote_targets = BTreeMap::new();
for remote_branch in branch_proto.remote_branches {
remote_targets.insert(
remote_branch.remote_name,
ref_target_from_proto(remote_branch.target.unwrap_or_default()),
);
}
view.branches.insert(
branch_proto.name.clone(),
BranchTarget {
local_target,
remote_targets,
},
);
}
for tag_proto in proto.tags {
view.tags.insert(
tag_proto.name,
ref_target_from_proto(tag_proto.target.unwrap_or_default()),
);
}
for git_ref in proto.git_refs {
if let Some(target) = git_ref.target {
view.git_refs
.insert(git_ref.name, ref_target_from_proto(target));
} else {
// Legacy format
view.git_refs.insert(
git_ref.name,
RefTarget::Normal(CommitId::new(git_ref.commit_id)),
);
}
}
#[allow(deprecated)]
if let Some(git_head) = proto.git_head.as_ref() {
view.git_head = Some(ref_target_from_proto(git_head.clone()));
} else if !proto.git_head_legacy.is_empty() {
view.git_head = Some(RefTarget::Normal(CommitId::new(proto.git_head_legacy)));
}
view
}
fn ref_target_to_proto(value: &RefTarget) -> crate::protos::op_store::RefTarget {
let mut proto = crate::protos::op_store::RefTarget::default();
match value {
RefTarget::Normal(id) => {
proto.value = Some(crate::protos::op_store::ref_target::Value::CommitId(
id.to_bytes(),
));
}
RefTarget::Conflict { removes, adds } => {
let mut ref_conflict_proto = crate::protos::op_store::RefConflict::default();
for id in removes {
ref_conflict_proto.removes.push(id.to_bytes());
}
for id in adds {
ref_conflict_proto.adds.push(id.to_bytes());
}
proto.value = Some(crate::protos::op_store::ref_target::Value::Conflict(
ref_conflict_proto,
));
}
}
proto
}
fn ref_target_from_proto(proto: crate::protos::op_store::RefTarget) -> RefTarget {
match proto.value.unwrap() {
crate::protos::op_store::ref_target::Value::CommitId(id) => {
RefTarget::Normal(CommitId::new(id))
}
crate::protos::op_store::ref_target::Value::Conflict(conflict) => {
let removes = conflict
.removes
.into_iter()
.map(CommitId::new)
.collect_vec();
let adds = conflict.adds.into_iter().map(CommitId::new).collect_vec();
RefTarget::Conflict { removes, adds }
}
}
}