// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! A persistent table of fixed-size keys to variable-size values. The keys are //! stored in sorted order, with each key followed by an integer offset into the //! list of values. The values are concatenated after the keys. A file may have //! a parent file, and the parent may have its own parent, and so on. The child //! file then represents the union of the entries. extern crate byteorder; use std::cmp::Ordering; use std::collections::BTreeMap; use std::fs::File; use std::io; use std::io::{Cursor, Read, Write}; use std::path::PathBuf; use std::sync::Arc; use blake2::{Blake2b, Digest}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use tempfile::NamedTempFile; use crate::file_util::persist_content_addressed_temp_file; trait TableSegment { fn segment_num_entries(&self) -> usize; fn segment_parent_file(&self) -> &Option>; fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]>; fn segment_add_entries_to(&self, mut_table: &mut MutableTable); fn num_entries(&self) -> usize { if let Some(parent_file) = self.segment_parent_file() { parent_file.num_entries() + self.segment_num_entries() } else { self.segment_num_entries() } } fn get_value<'a>(&'a self, key: &[u8]) -> Option<&'a [u8]> { if let Some(value) = self.segment_get_value(key) { Some(value) } else if let Some(parent_file) = self.segment_parent_file() { let parent_file: &ReadonlyTable = parent_file.as_ref(); // The parent ReadonlyIndex outlives the child let parent_file: &'a ReadonlyTable = unsafe { std::mem::transmute(parent_file) }; parent_file.get_value(key) } else { None } } } pub struct ReadonlyTable { key_size: usize, parent_file: Option>, name: String, // Number of entries not counting the parent file num_local_entries: usize, // The file's entries in the raw format they're stored in on disk. index: Vec, values: Vec, } impl ReadonlyTable { pub fn load_from( file: &mut dyn Read, dir: PathBuf, name: String, key_size: usize, ) -> io::Result> { let parent_filename_len = file.read_u32::()?; let maybe_parent_file; if parent_filename_len > 0 { let mut parent_filename_bytes = vec![0; parent_filename_len as usize]; file.read_exact(&mut parent_filename_bytes)?; let parent_filename = String::from_utf8(parent_filename_bytes).unwrap(); let parent_file_path = dir.join(&parent_filename); let mut parent_file = File::open(&parent_file_path).unwrap(); let parent_file = ReadonlyTable::load_from(&mut parent_file, dir, parent_filename, key_size)?; maybe_parent_file = Some(parent_file); } else { maybe_parent_file = None; }; let num_local_entries = file.read_u32::()? as usize; let index_size = num_local_entries * ReadonlyTableIndexEntry::size(key_size); let mut data = vec![]; file.read_to_end(&mut data)?; let values = data.split_off(index_size); let index = data; Ok(Arc::new(ReadonlyTable { key_size, parent_file: maybe_parent_file, name, num_local_entries, index, values, })) } fn segment_value_offset_by_pos(&self, pos: usize) -> usize { if pos == self.num_local_entries { self.values.len() } else { ReadonlyTableIndexEntry::new(self, pos).value_offset() } } fn segment_value_by_pos(&self, pos: usize) -> &[u8] { &self.values [self.segment_value_offset_by_pos(pos)..self.segment_value_offset_by_pos(pos + 1)] } } impl TableSegment for ReadonlyTable { fn segment_num_entries(&self) -> usize { self.num_local_entries } fn segment_parent_file(&self) -> &Option> { &self.parent_file } fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> { let mut low_pos = 0; let mut high_pos = self.num_local_entries; loop { if high_pos == low_pos { return None; } let mid_pos = (low_pos + high_pos) / 2; let mid_entry = ReadonlyTableIndexEntry::new(self, mid_pos); match key.cmp(mid_entry.key()) { Ordering::Less => { high_pos = mid_pos; } Ordering::Equal => { return Some(self.segment_value_by_pos(mid_pos)); } Ordering::Greater => { low_pos = mid_pos + 1; } } } } fn segment_add_entries_to(&self, mut_table: &mut MutableTable) { for pos in 0..self.num_local_entries { let entry = ReadonlyTableIndexEntry::new(self, pos); mut_table.add_entry( entry.key().to_vec(), self.segment_value_by_pos(pos).to_vec(), ); } } } struct ReadonlyTableIndexEntry<'table> { data: &'table [u8], } impl<'table> ReadonlyTableIndexEntry<'table> { fn new(table: &'table ReadonlyTable, pos: usize) -> Self { let entry_size = ReadonlyTableIndexEntry::size(table.key_size); let offset = entry_size * pos; let data = &table.index.as_slice()[offset..offset + entry_size]; ReadonlyTableIndexEntry { data } } fn size(key_size: usize) -> usize { key_size + 4 } fn key(&self) -> &'table [u8] { &self.data[0..self.data.len() - 4] } fn value_offset(&self) -> usize { (&self.data[self.data.len() - 4..self.data.len()]) .read_u32::() .unwrap() as usize } } pub struct MutableTable { key_size: usize, parent_file: Option>, entries: BTreeMap, Vec>, } impl MutableTable { pub fn full(key_size: usize) -> Self { Self { key_size, parent_file: None, entries: BTreeMap::new(), } } pub fn incremental(parent_file: Arc) -> Self { let key_size = parent_file.key_size; Self { key_size, parent_file: Some(parent_file), entries: BTreeMap::new(), } } pub fn add_entry(&mut self, key: Vec, value: Vec) { assert_eq!(key.len(), self.key_size); self.entries.insert(key, value); } fn add_entries_from(&mut self, other: &dyn TableSegment) { other.segment_add_entries_to(self); } pub fn merge_in(&mut self, other: &Arc) { let mut maybe_own_ancestor = self.parent_file.clone(); let mut maybe_other_ancestor = Some(other.clone()); let mut files_to_add = vec![]; loop { if maybe_other_ancestor.is_none() { break; } let other_ancestor = maybe_other_ancestor.as_ref().unwrap(); if maybe_own_ancestor.is_none() { files_to_add.push(other_ancestor.clone()); maybe_other_ancestor = other_ancestor.parent_file.clone(); continue; } let own_ancestor = maybe_own_ancestor.as_ref().unwrap(); if own_ancestor.name == other_ancestor.name { break; } if own_ancestor.num_entries() < other_ancestor.num_entries() { files_to_add.push(other_ancestor.clone()); maybe_other_ancestor = other_ancestor.parent_file.clone(); } else { maybe_own_ancestor = own_ancestor.parent_file.clone(); } } for file in files_to_add.iter().rev() { self.add_entries_from(file.as_ref()); } } fn serialize(self) -> Vec { let mut buf = vec![]; if let Some(parent_file) = &self.parent_file { buf.write_u32::(parent_file.name.len() as u32) .unwrap(); buf.write_all(parent_file.name.as_bytes()).unwrap(); } else { buf.write_u32::(0).unwrap(); } buf.write_u32::(self.entries.len() as u32) .unwrap(); let mut value_offset = 0; for (key, value) in &self.entries { buf.write_all(key).unwrap(); buf.write_u32::(value_offset).unwrap(); value_offset += value.len() as u32; } for value in self.entries.values() { buf.write_all(value).unwrap(); } buf } /// If the MutableTable has more than half the entries of its parent /// ReadonlyTable, return MutableTable with the commits from both. This /// is done recursively, so the stack of index files has O(log n) files. fn maybe_squash_with_ancestors(self) -> MutableTable { let mut num_new_entries = self.entries.len(); let mut files_to_squash = vec![]; let mut maybe_parent_file = self.parent_file.clone(); let mut squashed; loop { match maybe_parent_file { Some(parent_file) => { // TODO: We should probably also squash if the parent file has less than N // commits, regardless of how many (few) are in `self`. if 2 * num_new_entries < parent_file.num_local_entries { squashed = MutableTable::incremental(parent_file); break; } num_new_entries += parent_file.num_local_entries; files_to_squash.push(parent_file.clone()); maybe_parent_file = parent_file.parent_file.clone(); } None => { squashed = MutableTable::full(self.key_size); break; } } } if files_to_squash.is_empty() { return self; } for parent_file in files_to_squash.iter().rev() { squashed.add_entries_from(parent_file.as_ref()); } squashed.add_entries_from(&self); squashed } pub fn save_in(self, dir: PathBuf) -> io::Result> { if self.entries.is_empty() && self.parent_file.is_some() { return Ok(self.parent_file.unwrap()); } let key_size = self.key_size; let buf = self.maybe_squash_with_ancestors().serialize(); let mut hasher = Blake2b::new(); hasher.update(&buf); let file_id_hex = hex::encode(&hasher.finalize()); let file_path = dir.join(&file_id_hex); let mut temp_file = NamedTempFile::new_in(&dir)?; let file = temp_file.as_file_mut(); file.write_all(&buf)?; persist_content_addressed_temp_file(temp_file, &file_path)?; let mut cursor = Cursor::new(&buf); ReadonlyTable::load_from(&mut cursor, dir, file_id_hex, key_size) } } impl TableSegment for MutableTable { fn segment_num_entries(&self) -> usize { self.entries.len() } fn segment_parent_file(&self) -> &Option> { &self.parent_file } fn segment_get_value(&self, key: &[u8]) -> Option<&[u8]> { self.entries.get(key).map(Vec::as_slice) } fn segment_add_entries_to(&self, mut_table: &mut MutableTable) { for (key, value) in &self.entries { mut_table.add_entry(key.clone(), value.clone()); } } } #[cfg(test)] mod tests { use test_case::test_case; use super::*; #[test_case(false; "memory")] #[test_case(true; "file")] fn stacked_table_empty(on_disk: bool) { let temp_dir = tempfile::tempdir().unwrap(); let mut_table = MutableTable::full(3); let mut _saved_table = None; let table: &dyn TableSegment = if on_disk { _saved_table = Some(mut_table.save_in(temp_dir.path().to_owned()).unwrap()); _saved_table.as_ref().unwrap().as_ref() } else { &mut_table }; // Cannot find any keys assert_eq!(table.get_value(b"\0\0\0"), None); assert_eq!(table.get_value(b"aaa"), None); assert_eq!(table.get_value(b"\xff\xff\xff"), None); } #[test_case(false; "memory")] #[test_case(true; "file")] fn stacked_table_single_key(on_disk: bool) { let temp_dir = tempfile::tempdir().unwrap(); let mut mut_table = MutableTable::full(3); mut_table.add_entry(b"abc".to_vec(), b"value".to_vec()); let mut _saved_table = None; let table: &dyn TableSegment = if on_disk { _saved_table = Some(mut_table.save_in(temp_dir.path().to_owned()).unwrap()); _saved_table.as_ref().unwrap().as_ref() } else { &mut_table }; // Can find expected keys assert_eq!(table.get_value(b"\0\0\0"), None); assert_eq!(table.get_value(b"abc"), Some(b"value".as_slice())); assert_eq!(table.get_value(b"\xff\xff\xff"), None); } #[test_case(false; "memory")] #[test_case(true; "file")] fn stacked_table_multiple_keys(on_disk: bool) { let temp_dir = tempfile::tempdir().unwrap(); let mut mut_table = MutableTable::full(3); mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec()); mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec()); mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec()); let mut _saved_table = None; let table: &dyn TableSegment = if on_disk { _saved_table = Some(mut_table.save_in(temp_dir.path().to_owned()).unwrap()); _saved_table.as_ref().unwrap().as_ref() } else { &mut_table }; // Can find expected keys assert_eq!(table.get_value(b"\0\0\0"), None); assert_eq!(table.get_value(b"abb"), None); assert_eq!(table.get_value(b"abc"), Some(b"value1".as_slice())); assert_eq!(table.get_value(b"abd"), Some(b"value 2".as_slice())); assert_eq!(table.get_value(b"abe"), None); assert_eq!(table.get_value(b"zzz"), Some(b"val3".as_slice())); assert_eq!(table.get_value(b"\xff\xff\xff"), None); } #[test] fn stacked_table_multiple_keys_with_parent_file() { let temp_dir = tempfile::tempdir().unwrap(); let mut mut_table = MutableTable::full(3); mut_table.add_entry(b"abd".to_vec(), b"value 2".to_vec()); mut_table.add_entry(b"abc".to_vec(), b"value1".to_vec()); mut_table.add_entry(b"zzz".to_vec(), b"val3".to_vec()); for round in 0..10 { for i in 0..10 { mut_table.add_entry( format!("x{}{}", i, round).into_bytes(), format!("value {}{}", i, round).into_bytes(), ); } let saved_table = mut_table.save_in(temp_dir.path().to_owned()).unwrap(); mut_table = MutableTable::incremental(saved_table); } // Can find expected keys assert_eq!(mut_table.get_value(b"\0\0\0"), None); assert_eq!(mut_table.get_value(b"x.."), None); assert_eq!(mut_table.get_value(b"x14"), Some(b"value 14".as_slice())); assert_eq!(mut_table.get_value(b"x41"), Some(b"value 41".as_slice())); assert_eq!(mut_table.get_value(b"x49"), Some(b"value 49".as_slice())); assert_eq!(mut_table.get_value(b"x94"), Some(b"value 94".as_slice())); assert_eq!(mut_table.get_value(b"xAA"), None); assert_eq!(mut_table.get_value(b"\xff\xff\xff"), None); } #[test] fn stacked_table_merge() { let temp_dir = tempfile::tempdir().unwrap(); let mut mut_base_table = MutableTable::full(3); mut_base_table.add_entry(b"abc".to_vec(), b"value1".to_vec()); let base_table = mut_base_table.save_in(temp_dir.path().to_owned()).unwrap(); let mut mut_table1 = MutableTable::incremental(base_table.clone()); mut_table1.add_entry(b"abd".to_vec(), b"value 2".to_vec()); mut_table1.add_entry(b"zzz".to_vec(), b"val3".to_vec()); mut_table1.add_entry(b"mmm".to_vec(), b"side 1".to_vec()); let table1 = mut_table1.save_in(temp_dir.path().to_owned()).unwrap(); let mut mut_table2 = MutableTable::incremental(base_table); mut_table2.add_entry(b"yyy".to_vec(), b"val5".to_vec()); mut_table2.add_entry(b"mmm".to_vec(), b"side 2".to_vec()); mut_table2.add_entry(b"abe".to_vec(), b"value 4".to_vec()); mut_table2.merge_in(&table1); // Can find expected keys assert_eq!(mut_table2.get_value(b"\0\0\0"), None); assert_eq!(mut_table2.get_value(b"abc"), Some(b"value1".as_slice())); assert_eq!(mut_table2.get_value(b"abd"), Some(b"value 2".as_slice())); assert_eq!(mut_table2.get_value(b"abe"), Some(b"value 4".as_slice())); // The caller shouldn't write two values for the same key, so it's undefined // which wins, but let's test how it currently behaves. assert_eq!(mut_table2.get_value(b"mmm"), Some(b"side 1".as_slice())); assert_eq!(mut_table2.get_value(b"yyy"), Some(b"val5".as_slice())); assert_eq!(mut_table2.get_value(b"zzz"), Some(b"val3".as_slice())); assert_eq!(mut_table2.get_value(b"\xff\xff\xff"), None); } }