forked from mirrors/jj
stacked_table: add a generic store based on the stacked-table format
The new store works the same way as the `OpHeadsStore`. It keeps track of the current head file(s) by recording their names in a directory. When a write happens, it adds the new head and then removes the old head. There will generally be a single head at a time. The only exception is when there have been concurrent operations (locally, or remotely, in the case of a distributed file system). When there are multiple head files, they are automatically merged. No guarantee is given about which value wins if the key exists in several heads; the store is meant to be used for data that's immutable once written. As long as different keys are written, this is a CRDT. That makes it fit for solving both #3 and #7.
This commit is contained in:
parent
e86d266e6b
commit
85773cf81f
1 changed files with 178 additions and 27 deletions
|
@ -33,8 +33,9 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
|||
use tempfile::NamedTempFile;
|
||||
|
||||
use crate::file_util::persist_content_addressed_temp_file;
|
||||
use crate::lock::FileLock;
|
||||
|
||||
trait TableSegment {
|
||||
pub trait TableSegment {
|
||||
fn segment_num_entries(&self) -> usize;
|
||||
fn segment_parent_file(&self) -> &Option<Arc<ReadonlyTable>>;
|
||||
fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>;
|
||||
|
@ -74,9 +75,9 @@ pub struct ReadonlyTable {
|
|||
}
|
||||
|
||||
impl ReadonlyTable {
|
||||
pub fn load_from(
|
||||
fn load_from(
|
||||
file: &mut dyn Read,
|
||||
dir: PathBuf,
|
||||
store: &TableStore,
|
||||
name: String,
|
||||
key_size: usize,
|
||||
) -> io::Result<Arc<ReadonlyTable>> {
|
||||
|
@ -86,10 +87,7 @@ impl ReadonlyTable {
|
|||
let mut parent_filename_bytes = vec![0; parent_filename_len as usize];
|
||||
file.read_exact(&mut parent_filename_bytes)?;
|
||||
let parent_filename = String::from_utf8(parent_filename_bytes).unwrap();
|
||||
let parent_file_path = dir.join(&parent_filename);
|
||||
let mut parent_file = File::open(&parent_file_path).unwrap();
|
||||
let parent_file =
|
||||
ReadonlyTable::load_from(&mut parent_file, dir, parent_filename, key_size)?;
|
||||
let parent_file = store.load_table(parent_filename)?;
|
||||
maybe_parent_file = Some(parent_file);
|
||||
} else {
|
||||
maybe_parent_file = None;
|
||||
|
@ -110,6 +108,10 @@ impl ReadonlyTable {
|
|||
}))
|
||||
}
|
||||
|
||||
pub fn start_modification(self: &Arc<Self>) -> MutableTable {
|
||||
MutableTable::incremental(self.clone())
|
||||
}
|
||||
|
||||
fn segment_value_offset_by_pos(&self, pos: usize) -> usize {
|
||||
if pos == self.num_local_entries {
|
||||
self.values.len()
|
||||
|
@ -201,7 +203,7 @@ pub struct MutableTable {
|
|||
}
|
||||
|
||||
impl MutableTable {
|
||||
pub fn full(key_size: usize) -> Self {
|
||||
fn full(key_size: usize) -> Self {
|
||||
Self {
|
||||
key_size,
|
||||
parent_file: None,
|
||||
|
@ -209,7 +211,7 @@ impl MutableTable {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
|
||||
fn incremental(parent_file: Arc<ReadonlyTable>) -> Self {
|
||||
let key_size = parent_file.key_size;
|
||||
Self {
|
||||
key_size,
|
||||
|
@ -227,7 +229,7 @@ impl MutableTable {
|
|||
other.segment_add_entries_to(self);
|
||||
}
|
||||
|
||||
pub fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
|
||||
fn merge_in(&mut self, other: &Arc<ReadonlyTable>) {
|
||||
let mut maybe_own_ancestor = self.parent_file.clone();
|
||||
let mut maybe_other_ancestor = Some(other.clone());
|
||||
let mut files_to_add = vec![];
|
||||
|
@ -323,26 +325,24 @@ impl MutableTable {
|
|||
squashed
|
||||
}
|
||||
|
||||
pub fn save_in(self, dir: PathBuf) -> io::Result<Arc<ReadonlyTable>> {
|
||||
fn save_in(self, store: &TableStore) -> io::Result<Arc<ReadonlyTable>> {
|
||||
if self.entries.is_empty() && self.parent_file.is_some() {
|
||||
return Ok(self.parent_file.unwrap());
|
||||
}
|
||||
|
||||
let key_size = self.key_size;
|
||||
|
||||
let buf = self.maybe_squash_with_ancestors().serialize();
|
||||
let mut hasher = Blake2b::new();
|
||||
hasher.update(&buf);
|
||||
let file_id_hex = hex::encode(&hasher.finalize());
|
||||
let file_path = dir.join(&file_id_hex);
|
||||
let file_path = store.dir.join(&file_id_hex);
|
||||
|
||||
let mut temp_file = NamedTempFile::new_in(&dir)?;
|
||||
let mut temp_file = NamedTempFile::new_in(&store.dir)?;
|
||||
let file = temp_file.as_file_mut();
|
||||
file.write_all(&buf)?;
|
||||
persist_content_addressed_temp_file(temp_file, &file_path)?;
|
||||
|
||||
let mut cursor = Cursor::new(&buf);
|
||||
ReadonlyTable::load_from(&mut cursor, dir, file_id_hex, key_size)
|
||||
ReadonlyTable::load_from(&mut cursor, store, file_id_hex, store.key_size)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -366,6 +366,115 @@ impl TableSegment for MutableTable {
|
|||
}
|
||||
}
|
||||
|
||||
/// A store of stacked tables kept in a single directory.
///
/// Table files live directly in `dir`; the names of the current head
/// table(s) are recorded as files in the `heads` subdirectory of `dir`.
pub struct TableStore {
    /// Root directory of the store.
    dir: PathBuf,
    /// Key size handed to every table that is created or loaded through
    /// this store.
    key_size: usize,
}
|
||||
|
||||
impl TableStore {
|
||||
pub fn init(dir: PathBuf, key_size: usize) -> Self {
|
||||
std::fs::create_dir(dir.join("heads")).unwrap();
|
||||
TableStore { dir, key_size }
|
||||
}
|
||||
|
||||
pub fn reinit(&self) {
|
||||
std::fs::remove_dir_all(self.dir.join("heads")).unwrap();
|
||||
TableStore::init(self.dir.clone(), self.key_size);
|
||||
}
|
||||
|
||||
pub fn key_size(&self) -> usize {
|
||||
self.key_size
|
||||
}
|
||||
|
||||
pub fn load(dir: PathBuf, key_size: usize) -> Self {
|
||||
TableStore { dir, key_size }
|
||||
}
|
||||
|
||||
pub fn save_table(&self, mut_table: MutableTable) -> io::Result<Arc<ReadonlyTable>> {
|
||||
let maybe_parent_table = mut_table.parent_file.clone();
|
||||
let table = mut_table.save_in(self)?;
|
||||
self.add_head(&table)?;
|
||||
if let Some(parent_table) = maybe_parent_table {
|
||||
self.remove_head(&parent_table);
|
||||
}
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
fn add_head(&self, table: &Arc<ReadonlyTable>) -> std::io::Result<()> {
|
||||
std::fs::write(self.dir.join("heads").join(&table.name), "")
|
||||
}
|
||||
|
||||
fn remove_head(&self, table: &Arc<ReadonlyTable>) {
|
||||
// It's fine if the old head was not found. It probably means
|
||||
// that we're on a distributed file system where the locking
|
||||
// doesn't work. We'll probably end up with two current
|
||||
// heads. We'll detect that next time we load the table.
|
||||
std::fs::remove_file(self.dir.join("heads").join(&table.name)).ok();
|
||||
}
|
||||
|
||||
fn lock(&self) -> FileLock {
|
||||
FileLock::lock(self.dir.join("lock"))
|
||||
}
|
||||
|
||||
fn load_table(&self, name: String) -> std::io::Result<Arc<ReadonlyTable>> {
|
||||
// TODO: Add caching
|
||||
let table_file_path = self.dir.join(&name);
|
||||
let mut table_file = File::open(&table_file_path)?;
|
||||
ReadonlyTable::load_from(&mut table_file, self, name, self.key_size)
|
||||
}
|
||||
|
||||
fn get_head_tables(&self) -> io::Result<Vec<Arc<ReadonlyTable>>> {
|
||||
let mut tables = vec![];
|
||||
for head_entry in std::fs::read_dir(self.dir.join("heads"))? {
|
||||
let head_file_name = head_entry?.file_name();
|
||||
let table = self.load_table(head_file_name.to_str().unwrap().to_string())?;
|
||||
tables.push(table);
|
||||
}
|
||||
Ok(tables)
|
||||
}
|
||||
|
||||
pub fn get_head(&self) -> io::Result<Arc<ReadonlyTable>> {
|
||||
let mut tables = self.get_head_tables()?;
|
||||
|
||||
if tables.is_empty() {
|
||||
let empty_table = MutableTable::full(self.key_size);
|
||||
self.save_table(empty_table)
|
||||
} else if tables.len() == 1 {
|
||||
Ok(tables.pop().unwrap())
|
||||
} else {
|
||||
// There are multiple heads. We take a lock, then check if there are still
|
||||
// multiple heads (it's likely that another process was in the process of
|
||||
// deleting on of them). If there are still multiple heads, we attempt to
|
||||
// merge all the tables into one. We then save that table and record the new
|
||||
// head. Note that the locking isn't necessary for correctness; we
|
||||
// take the lock only to avoid other concurrent processes from doing
|
||||
// the same work (and producing another set of divergent heads).
|
||||
let _lock = self.lock();
|
||||
let mut tables = self.get_head_tables()?;
|
||||
|
||||
if tables.is_empty() {
|
||||
let empty_table = MutableTable::full(self.key_size);
|
||||
return self.save_table(empty_table);
|
||||
}
|
||||
|
||||
if tables.len() == 1 {
|
||||
// Return early so we don't write a table with no changes compared to its parent
|
||||
return Ok(tables.pop().unwrap());
|
||||
}
|
||||
|
||||
let mut merged_table = MutableTable::incremental(tables[0].clone());
|
||||
for other in &tables[1..] {
|
||||
merged_table.merge_in(other);
|
||||
}
|
||||
let merged_table = self.save_table(merged_table)?;
|
||||
for table in &tables[1..] {
|
||||
self.remove_head(table);
|
||||
}
|
||||
Ok(merged_table)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use test_case::test_case;
|
||||
|
@ -376,10 +485,11 @@ mod tests {
|
|||
#[test_case(true; "file")]
|
||||
fn stacked_table_empty(on_disk: bool) {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let mut_table = MutableTable::full(3);
|
||||
let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
|
||||
let mut_table = store.get_head().unwrap().start_modification();
|
||||
let mut _saved_table = None;
|
||||
let table: &dyn TableSegment = if on_disk {
|
||||
_saved_table = Some(mut_table.save_in(temp_dir.path().to_owned()).unwrap());
|
||||
_saved_table = Some(store.save_table(mut_table).unwrap());
|
||||
_saved_table.as_ref().unwrap().as_ref()
|
||||
} else {
|
||||
&mut_table
|
||||
|
@ -395,11 +505,12 @@ mod tests {
|
|||
#[test_case(true; "file")]
|
||||
fn stacked_table_single_key(on_disk: bool) {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let mut mut_table = MutableTable::full(3);
|
||||
let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
|
||||
let mut mut_table = store.get_head().unwrap().start_modification();
|
||||
mut_table.add_entry(b"abc".to_vec(), b"value".to_vec());
|
||||
let mut _saved_table = None;
|
||||
let table: &dyn TableSegment = if on_disk {
|
||||
_saved_table = Some(mut_table.save_in(temp_dir.path().to_owned()).unwrap());
|
||||
_saved_table = Some(store.save_table(mut_table).unwrap());
|
||||
_saved_table.as_ref().unwrap().as_ref()
|
||||
} else {
|
||||
&mut_table
|
||||
|
@ -415,13 +526,14 @@ mod tests {
|
|||
#[test_case(true; "file")]
|
||||
fn stacked_table_multiple_keys(on_disk: bool) {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let mut mut_table = MutableTable::full(3);
|
||||
let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
|
||||
let mut mut_table = store.get_head().unwrap().start_modification();
|
||||
mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
|
||||
mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
|
||||
mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
|
||||
let mut _saved_table = None;
|
||||
let table: &dyn TableSegment = if on_disk {
|
||||
_saved_table = Some(mut_table.save_in(temp_dir.path().to_owned()).unwrap());
|
||||
_saved_table = Some(store.save_table(mut_table).unwrap());
|
||||
_saved_table.as_ref().unwrap().as_ref()
|
||||
} else {
|
||||
&mut_table
|
||||
|
@ -440,7 +552,8 @@ mod tests {
|
|||
#[test]
|
||||
fn stacked_table_multiple_keys_with_parent_file() {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let mut mut_table = MutableTable::full(3);
|
||||
let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
|
||||
let mut mut_table = store.get_head().unwrap().start_modification();
|
||||
mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec());
|
||||
mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
|
||||
mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec());
|
||||
|
@ -451,7 +564,7 @@ mod tests {
|
|||
format!("value {}{}", i, round).into_bytes(),
|
||||
);
|
||||
}
|
||||
let saved_table = mut_table.save_in(temp_dir.path().to_owned()).unwrap();
|
||||
let saved_table = store.save_table(mut_table).unwrap();
|
||||
mut_table = MutableTable::incremental(saved_table);
|
||||
}
|
||||
|
||||
|
@ -469,15 +582,16 @@ mod tests {
|
|||
#[test]
|
||||
fn stacked_table_merge() {
|
||||
let temp_dir = tempfile::tempdir().unwrap();
|
||||
let mut mut_base_table = MutableTable::full(3);
|
||||
let store = TableStore::init(temp_dir.path().to_path_buf(), 3);
|
||||
let mut mut_base_table = store.get_head().unwrap().start_modification();
|
||||
mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec());
|
||||
let base_table = mut_base_table.save_in(temp_dir.path().to_owned()).unwrap();
|
||||
let base_table = store.save_table(mut_base_table).unwrap();
|
||||
|
||||
let mut mut_table1 = MutableTable::incremental(base_table.clone());
|
||||
mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec());
|
||||
mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec());
|
||||
mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec());
|
||||
let table1 = mut_table1.save_in(temp_dir.path().to_owned()).unwrap();
|
||||
let table1 = store.save_table(mut_table1).unwrap();
|
||||
let mut mut_table2 = MutableTable::incremental(base_table);
|
||||
mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec());
|
||||
mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec());
|
||||
|
@ -496,4 +610,41 @@ mod tests {
|
|||
assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice()));
|
||||
assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None);
|
||||
}
|
||||
|
||||
#[test]
fn stacked_table_automatic_merge() {
    // Same scenario as the manual merge test, except that the two divergent
    // heads are merged by the store when the head is loaded.
    let temp_dir = tempfile::tempdir().unwrap();
    let store = TableStore::init(temp_dir.path().to_path_buf(), 3);

    let base_table = {
        let mut base = store.get_head().unwrap().start_modification();
        base.add_entry(b"abc".to_vec(), b"value1".to_vec());
        store.save_table(base).unwrap()
    };

    // First side, saved on top of the base table.
    let side1_entries: [(&[u8], &[u8]); 3] = [
        (&b"abd"[..], &b"value 2"[..]),
        (&b"zzz"[..], &b"val3"[..]),
        (&b"mmm"[..], &b"side 1"[..]),
    ];
    let mut side1 = MutableTable::incremental(base_table.clone());
    for (key, value) in side1_entries.iter() {
        side1.add_entry(key.to_vec(), value.to_vec());
    }
    store.save_table(side1).unwrap();

    // Second side, also saved on top of the base table.
    let side2_entries: [(&[u8], &[u8]); 3] = [
        (&b"yyy"[..], &b"val5"[..]),
        (&b"mmm"[..], &b"side 2"[..]),
        (&b"abe"[..], &b"value 4"[..]),
    ];
    let mut side2 = MutableTable::incremental(base_table);
    for (key, value) in side2_entries.iter() {
        side2.add_entry(key.to_vec(), value.to_vec());
    }
    let table2 = store.save_table(side2).unwrap();

    // The saved table does not have the keys from the first side
    assert_eq!(table2.get_value(b"abd"), None);

    // The table returned by get_head() contains the keys from both sides.
    // The caller shouldn't write two values for the same key, so it's
    // undefined which one wins for "mmm"; the expectation below only pins
    // down the current behavior.
    let merged_table = store.get_head().unwrap();
    let expectations: [(&[u8], Option<&[u8]>); 8] = [
        (&b"\0\0\0"[..], None),
        (&b"abc"[..], Some(&b"value1"[..])),
        (&b"abd"[..], Some(&b"value 2"[..])),
        (&b"abe"[..], Some(&b"value 4"[..])),
        (&b"mmm"[..], Some(&b"side 1"[..])),
        (&b"yyy"[..], Some(&b"val5"[..])),
        (&b"zzz"[..], Some(&b"val3"[..])),
        (&b"\xff\xff\xff"[..], None),
    ];
    for (key, want) in expectations.iter() {
        assert_eq!(merged_table.get_value(key), *want);
    }
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue