devices: SnapshotReader/Writer abstraction + new snapshot format

Instead of building up one big JSON value, we pass around a "writer"
object that supports writing out snapshot fragments under nested
namespaces. Fragments can use different serialization formats if
desired, e.g. the RAM snapshot uses a raw binary file.
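
Roughly, a caller drives the new API like this (an illustrative sketch,
not code from this change; `snapshot_path`, `serial_state`, and
`guest_memory` are stand-ins for values from the surrounding code):

    let writer = vm_control::SnapshotWriter::new(snapshot_path)?;
    // Namespaces map to directories in the current implementation.
    let bus0 = writer.add_namespace("bus0")?;
    // A serde fragment, written as JSON: becomes the file bus0/serial-0.
    bus0.write_fragment("serial-0", &serial_state)?;
    // A raw fragment: guest RAM writes binary data straight to the file.
    let metadata = guest_memory.snapshot(&mut writer.raw_fragment("mem")?)?;
    writer.write_fragment("mem_metadata", &metadata)?;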

Snapshot directory contents:

     213 .../bus0/ACPIPMResource-0
     299 .../bus0/cmos-0
       2 .../bus0/i8042-0
     542 .../bus0/pci-config-io-port-0
     234 .../bus0/serial-0
     234 .../bus0/serial-1
     234 .../bus0/serial-2
     235 .../bus0/serial-3
     536 .../bus1/pci-config-mmio-0
    3.5K .../bus1/pcivirtio-balloon-0
    6.1K .../bus1/pcivirtio-block-0
    1.7K .../bus1/pcivirtio-rng-0
     536 .../bus1/pci-virtual-config-mmio-0
     569 .../bus1/PvPanic-0
    5.3K .../irqchip
    8.0G .../mem
      52 .../mem_metadata
    6.5K .../vcpu/vcpu0
    6.6K .../vcpu/vcpu1
    6.5K .../vcpu/vcpu2
    6.5K .../vcpu/vcpu3

For now, the new abstractions are not used inside `BusDevice` or any
layer beyond it, so each device is still one monolithic JSON file.

`PciConfigIo::debug_label` was modified so that it can't change at
runtime, since the snapshot keys derived from it must stay stable.

Based on some light testing, this seems to reduce the snapshot time for
a particular Ubuntu VM config from 14s to 13s.

BUG=b:268093674

Change-Id: Ic16980629ff9321bee19f56f6e8e50f214492a7d
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4739910
Commit-Queue: Frederick Mayle <fmayle@google.com>
Reviewed-by: Elie Kheirallah <khei@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Reviewed-by: Noah Gold <nkgold@google.com>
Frederick Mayle 2023-07-31 18:30:18 -07:00 committed by crosvm LUCI
parent aeac7cfa54
commit b488863647
8 changed files with 303 additions and 209 deletions


@@ -8,10 +8,8 @@ use std::cmp::Ord;
use std::cmp::Ordering;
use std::cmp::PartialEq;
use std::cmp::PartialOrd;
use std::collections::btree_map::BTreeMap;
use std::collections::hash_map::HashMap;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::fmt;
use std::result;
use std::sync::Arc;
@@ -423,6 +421,34 @@ impl Bus {
.collect()
}
/// Same as `unique_devices`, but also calculates the "snapshot key" for each device.
///
/// The keys are used to associate a particular device with data in a serialized snapshot. The
/// keys need to be stable across multiple runs of the same crosvm binary.
///
/// It is most convenient to calculate all the snapshot keys at once, because the keys are
/// dependent on the order of devices on the bus.
fn unique_devices_with_snapshot_key(&self) -> Vec<(String, BusDeviceEntry)> {
let mut next_ids = BTreeMap::<String, usize>::new();
let mut choose_key = |debug_label: String| -> String {
let label = debug_label.replace(char::is_whitespace, "-");
let id = next_ids.entry(label.clone()).or_default();
let key = format!("{}-{}", label, id);
*id += 1;
key
};
let mut result = Vec::new();
for device_entry in self.unique_devices() {
let key = match &device_entry {
BusDeviceEntry::OuterSync(d) => choose_key(d.lock().debug_label()),
BusDeviceEntry::InnerSync(d) => choose_key(d.debug_label()),
};
result.push((key, device_entry));
}
result
}
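// Illustrative example: four devices whose debug_label() is "serial" get the
// snapshot keys "serial-0" through "serial-3", and "pci config io-port"
// becomes "pci-config-io-port-0", matching the fragment names in the
// directory listing in the commit message.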
pub fn sleep_devices(&self) -> anyhow::Result<()> {
for device_entry in self.unique_devices() {
match device_entry {
@@ -463,26 +489,27 @@ impl Bus {
pub fn snapshot_devices(
&self,
mut add_snapshot: impl FnMut(u32, serde_json::Value),
snapshot_writer: &vm_control::SnapshotWriter,
) -> anyhow::Result<()> {
for device_entry in self.unique_devices() {
for (snapshot_key, device_entry) in self.unique_devices_with_snapshot_key() {
match device_entry {
BusDeviceEntry::OuterSync(dev) => {
let mut dev = dev.lock();
debug!("Snapshot on device: {}", dev.debug_label());
add_snapshot(
u32::from(dev.device_id()),
dev.snapshot()
snapshot_writer.write_fragment(
&snapshot_key,
&(*dev)
.snapshot()
.with_context(|| format!("failed to snapshot {}", dev.debug_label()))?,
)
)?;
}
BusDeviceEntry::InnerSync(dev) => {
debug!("Snapshot on device: {}", dev.debug_label());
add_snapshot(
u32::from(dev.device_id()),
dev.snapshot_sync()
snapshot_writer.write_fragment(
&snapshot_key,
&dev.snapshot_sync()
.with_context(|| format!("failed to snapshot {}", dev.debug_label()))?,
)
)?;
}
}
}
@@ -491,36 +518,38 @@ impl Bus {
pub fn restore_devices(
&self,
devices_map: &mut HashMap<u32, VecDeque<serde_json::Value>>,
snapshot_reader: &vm_control::SnapshotReader,
) -> anyhow::Result<()> {
let mut pop_snapshot = |device_id| {
devices_map
.get_mut(&u32::from(device_id))
.and_then(|dq| dq.pop_front())
};
for device_entry in self.unique_devices() {
let mut unused_keys: BTreeSet<String> =
snapshot_reader.list_fragments()?.into_iter().collect();
for (snapshot_key, device_entry) in self.unique_devices_with_snapshot_key() {
unused_keys.remove(&snapshot_key);
match device_entry {
BusDeviceEntry::OuterSync(dev) => {
let mut dev = dev.lock();
debug!("Restore on device: {}", dev.debug_label());
let snapshot = pop_snapshot(dev.device_id()).ok_or_else(|| {
anyhow!("missing snapshot for device {}", dev.debug_label())
})?;
dev.restore(snapshot).with_context(|| {
format!("restore failed for device {}", dev.debug_label())
})?;
dev.restore(snapshot_reader.read_fragment(&snapshot_key)?)
.with_context(|| {
format!("restore failed for device {}", dev.debug_label())
})?;
}
BusDeviceEntry::InnerSync(dev) => {
debug!("Restore on device: {}", dev.debug_label());
let snapshot = pop_snapshot(dev.device_id()).ok_or_else(|| {
anyhow!("missing snapshot for device {}", dev.debug_label())
})?;
dev.restore_sync(snapshot).with_context(|| {
format!("restore failed for device {}", dev.debug_label())
})?;
dev.restore_sync(snapshot_reader.read_fragment(&snapshot_key)?)
.with_context(|| {
format!("restore failed for device {}", dev.debug_label())
})?;
}
}
}
if !unused_keys.is_empty() {
error!(
"unused restore data in bus, devices might be missing: {:?}",
unused_keys
);
}
Ok(())
}


@@ -41,9 +41,6 @@ cfg_if::cfg_if! {
}
}
use std::collections::HashMap;
use std::collections::VecDeque;
use std::fs::File;
use std::sync::Arc;
use anyhow::anyhow;
@@ -255,130 +252,44 @@ fn wake_buses(buses: &[&Bus]) {
}
}
fn snapshot_devices(
bus: &Bus,
add_snapshot: impl FnMut(u32, serde_json::Value),
) -> anyhow::Result<()> {
match bus.snapshot_devices(add_snapshot) {
Ok(_) => {
debug!(
"Devices snapshot successfully for {:?} Bus",
bus.get_bus_type()
);
Ok(())
}
Err(e) => {
// If snapshot fails, wake devices and return error
error!(
"failed to snapshot devices: {:#} on {:?} Bus",
e,
bus.get_bus_type()
);
Err(e)
}
}
}
fn restore_devices(
bus: &Bus,
devices_map: &mut HashMap<u32, VecDeque<serde_json::Value>>,
) -> anyhow::Result<()> {
match bus.restore_devices(devices_map) {
Ok(_) => {
debug!(
"Devices restore successfully for {:?} Bus",
bus.get_bus_type()
);
Ok(())
}
Err(e) => {
// If restore fails, wake devices and return error
error!(
"failed to restore devices: {:#} on {:?} Bus",
e,
bus.get_bus_type()
);
Err(e)
}
}
}
#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotRoot {
guest_memory_metadata: serde_json::Value,
devices: Vec<HashMap<u32, serde_json::Value>>,
}
async fn snapshot_handler(
path: &std::path::Path,
snapshot_writer: vm_control::SnapshotWriter,
guest_memory: &GuestMemory,
buses: &[&Bus],
) -> anyhow::Result<()> {
let mut snapshot_root = SnapshotRoot {
guest_memory_metadata: serde_json::Value::Null,
devices: Vec::new(),
};
// TODO(b/268093674): Better output file format.
// TODO(b/268094487): If the snapshot fails, this leaves an incomplete memory snapshot at the
// requested path.
let mut json_file =
File::create(path).with_context(|| format!("failed to open {}", path.display()))?;
let mem_path = path.with_extension("mem");
let mut mem_file = File::create(&mem_path)
.with_context(|| format!("failed to open {}", mem_path.display()))?;
snapshot_root.guest_memory_metadata = guest_memory
.snapshot(&mut mem_file)
let guest_memory_metadata = guest_memory
.snapshot(&mut snapshot_writer.raw_fragment("mem")?)
.context("failed to snapshot memory")?;
for bus in buses {
snapshot_devices(bus, |id, snapshot| {
snapshot_root.devices.push([(id, snapshot)].into())
})
.context("failed to snapshot devices")?;
snapshot_writer.write_fragment("mem_metadata", &guest_memory_metadata)?;
for (i, bus) in buses.iter().enumerate() {
bus.snapshot_devices(&snapshot_writer.add_namespace(&format!("bus{i}"))?)
.context("failed to snapshot bus devices")?;
debug!(
"Devices snapshot successfully for {:?} Bus",
bus.get_bus_type()
);
}
serde_json::to_writer(&mut json_file, &snapshot_root)?;
Ok(())
}
async fn restore_handler(
path: &std::path::Path,
snapshot_reader: vm_control::SnapshotReader,
guest_memory: &GuestMemory,
buses: &[&Bus],
) -> anyhow::Result<()> {
let file = File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
let mem_path = path.with_extension("mem");
let mut mem_file =
File::open(&mem_path).with_context(|| format!("failed to open {}", mem_path.display()))?;
let snapshot_root: SnapshotRoot = serde_json::from_reader(file)?;
let mut devices_map: HashMap<u32, VecDeque<serde_json::Value>> = HashMap::new();
for (id, device) in snapshot_root.devices.into_iter().flatten() {
devices_map.entry(id).or_default().push_back(device)
}
{
guest_memory.restore(snapshot_root.guest_memory_metadata, &mut mem_file)?;
for bus in buses {
restore_devices(bus, &mut devices_map)?;
}
}
for (key, _) in devices_map.iter().filter(|(_, v)| !v.is_empty()) {
info!(
"Unused restore data for device_id {}, device might be missing.",
key
let guest_memory_metadata = snapshot_reader.read_fragment("mem_metadata")?;
guest_memory.restore(
guest_memory_metadata,
&mut snapshot_reader.raw_fragment("mem")?,
)?;
for (i, bus) in buses.iter().enumerate() {
bus.restore_devices(&snapshot_reader.namespace(&format!("bus{i}"))?)
.context("failed to restore bus devices")?;
debug!(
"Devices restore successfully for {:?} Bus",
bus.get_bus_type()
);
}
Ok(())
}
@@ -435,14 +346,13 @@ async fn handle_command_tube(
.await
.context("failed to reply to wake devices request")?;
}
DeviceControlCommand::SnapshotDevices {
snapshot_path: path,
} => {
DeviceControlCommand::SnapshotDevices { snapshot_writer } => {
assert!(
matches!(devices_state, DevicesState::Sleep),
"devices must be sleeping to snapshot"
);
if let Err(e) = snapshot_handler(path.as_path(), &guest_memory, buses).await
if let Err(e) =
snapshot_handler(snapshot_writer, &guest_memory, buses).await
{
error!("failed to snapshot: {:#}", e);
command_tube
@@ -456,13 +366,13 @@ async fn handle_command_tube(
.await
.context("Failed to send response")?;
}
DeviceControlCommand::RestoreDevices { restore_path: path } => {
DeviceControlCommand::RestoreDevices { snapshot_reader } => {
assert!(
matches!(devices_state, DevicesState::Sleep),
"devices must be sleeping to restore"
);
if let Err(e) =
restore_handler(path.as_path(), &guest_memory, &[&*io_bus, &*mmio_bus])
restore_handler(snapshot_reader, &guest_memory, &[&*io_bus, &*mmio_bus])
.await
{
error!("failed to restore: {:#}", e);


@@ -678,7 +678,7 @@ const PCI_RESET_CPU_BIT: u8 = 1 << 2;
impl BusDevice for PciConfigIo {
fn debug_label(&self) -> String {
format!("pci config io-port 0x{:03x}", self.config_address)
"pci config io-port".to_string()
}
fn device_id(&self) -> DeviceId {


@@ -23,6 +23,25 @@ use tempfile::NamedTempFile;
// System-wide suspend/resume, snapshot/restore.
// Tests below check for snapshot/restore functionality, and suspend/resume.
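// Compares the two snapshot directories recursively via `diff -qr`, returning
// whether they matched and diff's stdout for use in assertion messages
// (`diff` exits with status 0 only when no differences are found).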
fn compare_snapshots(a: &Path, b: &Path) -> (bool, String) {
let result = std::process::Command::new("diff")
.arg("-qr")
// vcpu and irqchip have timestamps that differ even if a freshly restored VM is
// snapshotted before it starts running again.
.arg("--exclude")
.arg("vcpu*")
.arg("--exclude")
.arg("irqchip")
.arg(a)
.arg(b)
.output()
.unwrap();
(
result.status.success(),
String::from_utf8(result.stdout).unwrap(),
)
}
#[test]
fn suspend_snapshot_restore_resume() -> anyhow::Result<()> {
suspend_resume_system(false)
@@ -118,11 +137,15 @@ fn suspend_resume_system(disabled_sandbox: bool) -> anyhow::Result<()> {
vm.exec_in_guest("cat /tmp/foo").unwrap().stdout.trim()
);
let snap1 = std::fs::read_to_string(&snap1_path).unwrap();
let snap2 = std::fs::read_to_string(&snap2_path).unwrap();
let snap3 = std::fs::read_to_string(&snap3_path).unwrap();
assert_ne!(snap1, snap2);
assert_eq!(snap1, snap3);
let (equal, output) = compare_snapshots(&snap1_path, &snap2_path);
assert!(
!equal,
"1st and 2nd snapshot are unexpectedly equal:\n{output}"
);
let (equal, output) = compare_snapshots(&snap1_path, &snap3_path);
assert!(equal, "1st and 3rd snapshot are not equal:\n{output}");
Ok(())
}
@@ -175,10 +198,6 @@ fn snapshot_vhost_user() {
// suspend VM
vm.suspend_full().unwrap();
vm.snapshot(&snap_path).unwrap();
let snapshot_json = std::fs::read_to_string(&snap_path).unwrap();
assert!(snapshot_json.contains("\"device_name\":\"virtio-block\""));
}
let (_block_vu_device, _net_vu_device, block_socket, net_socket) = spin_up_vhost_user_devices();


@@ -314,21 +314,29 @@ where
error!("Failed to send GetState: {}", e);
};
}
VcpuControl::Snapshot(response_chan) => {
VcpuControl::Snapshot(snapshot_writer, response_chan) => {
let resp = vcpu
.snapshot()
.and_then(|s| {
snapshot_writer
.write_fragment(&format!("vcpu{}", vcpu.id()), &s)
})
.with_context(|| format!("Failed to snapshot Vcpu #{}", vcpu.id()));
if let Err(e) = response_chan.send(resp) {
error!("Failed to send snapshot response: {}", e);
}
}
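// Note: each vCPU writes its own fragment (e.g. "vcpu0") into the "vcpu"
// namespace created by `do_snapshot`, which matches the vcpu/vcpuN entries
// in the snapshot directory listing.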
VcpuControl::Restore(req) => {
let resp = vcpu
.restore(
&req.snapshot,
#[cfg(target_arch = "x86_64")]
req.host_tsc_reference_moment,
)
let resp = req
.snapshot_reader
.read_fragment(&format!("vcpu{}", vcpu.id()))
.and_then(|s| {
vcpu.restore(
&s,
#[cfg(target_arch = "x86_64")]
req.host_tsc_reference_moment,
)
})
.with_context(|| format!("Failed to restore Vcpu #{}", vcpu.id()));
if let Err(e) = req.result_sender.send(resp) {
error!("Failed to send restore response: {}", e);


@@ -977,17 +977,20 @@ fn process_vcpu_control_messages<V>(
error!("Failed to send GetState: {}", e);
};
}
VcpuControl::Snapshot(response_chan) => {
VcpuControl::Snapshot(snapshot_writer, response_chan) => {
let resp = vcpu
.snapshot()
.and_then(|s| snapshot_writer.write_fragment(&format!("vcpu{}", vcpu.id()), &s))
.with_context(|| format!("Failed to snapshot Vcpu #{}", vcpu.id()));
if let Err(e) = response_chan.send(resp) {
error!("Failed to send snapshot response: {}", e);
}
}
VcpuControl::Restore(req) => {
let resp = vcpu
.restore(&req.snapshot, req.host_tsc_reference_moment)
let resp = req
.snapshot_reader
.read_fragment(&format!("vcpu{}", vcpu.id()))
.and_then(|s| vcpu.restore(&s, req.host_tsc_reference_moment))
.with_context(|| format!("Failed to restore Vcpu #{}", vcpu.id()));
if let Err(e) = req.result_sender.send(resp) {
error!("Failed to send restore response: {}", e);


@@ -26,6 +26,7 @@ use hypervisor::MemRegion;
#[cfg(feature = "balloon")]
mod balloon_tube;
pub mod client;
mod snapshot_format;
pub mod sys;
#[cfg(target_arch = "x86_64")]
@@ -68,7 +69,6 @@ use hypervisor::IoEventAddress;
use hypervisor::IrqRoute;
use hypervisor::IrqSource;
pub use hypervisor::MemSlot;
use hypervisor::VcpuSnapshot;
use hypervisor::Vm;
use libc::EINVAL;
use libc::EIO;
@@ -89,6 +89,7 @@ use rutabaga_gfx::RutabagaMappedRegion;
use rutabaga_gfx::VulkanInfo;
use serde::Deserialize;
use serde::Serialize;
pub use snapshot_format::*;
use swap::SwapStatus;
use sync::Mutex;
#[cfg(any(target_os = "android", target_os = "linux"))]
@@ -125,7 +126,9 @@ pub enum VcpuControl {
MakeRT,
// Request the current state of the vCPU. The result is sent back over the included channel.
GetStates(mpsc::Sender<VmRunMode>),
Snapshot(mpsc::Sender<anyhow::Result<VcpuSnapshot>>),
// Request the vcpu write a snapshot of itself to the writer, then send a `Result` back over
// the channel after completion/failure.
Snapshot(SnapshotWriter, mpsc::Sender<anyhow::Result<()>>),
Restore(VcpuRestoreRequest),
}
@@ -134,7 +137,7 @@ pub enum VcpuControl {
#[derive(Clone, Debug)]
pub struct VcpuRestoreRequest {
pub result_sender: mpsc::Sender<anyhow::Result<()>>,
pub snapshot: Box<VcpuSnapshot>,
pub snapshot_reader: SnapshotReader,
#[cfg(target_arch = "x86_64")]
pub host_tsc_reference_moment: u64,
}
@@ -323,8 +326,8 @@ pub enum RestoreCommand {
pub enum DeviceControlCommand {
SleepDevices,
WakeDevices,
SnapshotDevices { snapshot_path: PathBuf },
RestoreDevices { restore_path: PathBuf },
SnapshotDevices { snapshot_writer: SnapshotWriter },
RestoreDevices { snapshot_reader: SnapshotReader },
GetDevicesState,
Exit,
}
@@ -2024,37 +2027,31 @@ fn do_snapshot(
}
info!("flushed IRQs in {} iterations", flush_attempts);
let snapshot_writer = SnapshotWriter::new(snapshot_path)?;
// Snapshot Vcpus
let vcpu_path = snapshot_path.with_extension("vcpu");
let cpu_file = File::create(&vcpu_path)
.with_context(|| format!("failed to open path {}", vcpu_path.display()))?;
let (send_chan, recv_chan) = mpsc::channel();
kick_vcpus(VcpuControl::Snapshot(send_chan));
kick_vcpus(VcpuControl::Snapshot(
snapshot_writer.add_namespace("vcpu")?,
send_chan,
));
// Validate all Vcpus snapshot successfully
let mut cpu_vec = Vec::with_capacity(vcpu_size);
for _ in 0..vcpu_size {
match recv_chan
recv_chan
.recv()
.context("Failed to snapshot Vcpu, aborting snapshot")?
{
Ok(snap) => {
cpu_vec.push(snap);
}
Err(e) => bail!("Failed to snapshot Vcpu, aborting snapshot: {}", e),
}
.context("Failed to recv Vcpu snapshot response")?
.context("Failed to snapshot Vcpu")?;
}
serde_json::to_writer(cpu_file, &cpu_vec).expect("Failed to write Vcpu state");
// Snapshot irqchip
let irqchip_path = snapshot_path.with_extension("irqchip");
let irqchip_file = File::create(&irqchip_path)
.with_context(|| format!("failed to open path {}", irqchip_path.display()))?;
let irqchip_snap = snapshot_irqchip()?;
serde_json::to_writer(irqchip_file, &irqchip_snap).expect("Failed to write irqchip state");
snapshot_writer
.write_fragment("irqchip", &irqchip_snap)
.context("Failed to write irqchip state")?;
// Snapshot devices
device_control_tube
.send(&DeviceControlCommand::SnapshotDevices { snapshot_path })
.send(&DeviceControlCommand::SnapshotDevices { snapshot_writer })
.context("send command to devices control socket")?;
let resp: VmResponse = device_control_tube
.recv()
@@ -2081,38 +2078,33 @@ pub fn do_restore(
let _guard = VcpuSuspendGuard::new(&kick_vcpus, vcpu_size);
let _devices_guard = DeviceSleepGuard::new(device_control_tube)?;
let snapshot_reader = SnapshotReader::new(restore_path)?;
// Restore IrqChip
let irq_path = restore_path.with_extension("irqchip");
let irq_file = File::open(&irq_path)
.with_context(|| format!("failed to open path {}", irq_path.display()))?;
let irq_snapshot: serde_json::Value = serde_json::from_reader(irq_file)?;
let irq_snapshot: serde_json::Value = snapshot_reader.read_fragment("irqchip")?;
restore_irqchip(irq_snapshot)?;
// Restore Vcpu(s)
let vcpu_path = restore_path.with_extension("vcpu");
let cpu_file = File::open(&vcpu_path)
.with_context(|| format!("failed to open path {}", vcpu_path.display()))?;
let vcpu_snapshots: Vec<VcpuSnapshot> = serde_json::from_reader(cpu_file)?;
if vcpu_snapshots.len() != vcpu_size {
let vcpu_snapshot_reader = snapshot_reader.namespace("vcpu")?;
let vcpu_snapshot_count = vcpu_snapshot_reader.list_fragments()?.len();
if vcpu_snapshot_count != vcpu_size {
bail!(
"bad cpu count in snapshot: expected={} got={}",
vcpu_size,
vcpu_snapshots.len()
vcpu_snapshot_count,
);
}
#[cfg(target_arch = "x86_64")]
let host_tsc_reference_moment = {
// SAFETY: rdtsc takes no arguments.
unsafe { _rdtsc() }
};
let (send_chan, recv_chan) = mpsc::channel();
for vcpu_snap in vcpu_snapshots {
let vcpu_id = vcpu_snap.vcpu_id;
for vcpu_id in 0..vcpu_size {
kick_vcpu(
VcpuControl::Restore(VcpuRestoreRequest {
result_sender: send_chan.clone(),
snapshot: Box::new(vcpu_snap),
snapshot_reader: vcpu_snapshot_reader.clone(),
#[cfg(target_arch = "x86_64")]
host_tsc_reference_moment,
}),
@@ -2128,7 +2120,7 @@ pub fn do_restore(
// Restore devices
device_control_tube
.send(&DeviceControlCommand::RestoreDevices { restore_path })
.send(&DeviceControlCommand::RestoreDevices { snapshot_reader })
.context("send command to devices control socket")?;
let resp: VmResponse = device_control_tube
.recv()


@@ -0,0 +1,133 @@
// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::fs::File;
use std::path::PathBuf;
use anyhow::Context;
use anyhow::Result;
/// Writer of serialized VM snapshots.
///
/// Each fragment is an opaque byte blob. Namespaces can be used to avoid fragment naming
/// collisions between devices.
///
/// In the current implementation, fragments are files and namespaces are directories, but the API
/// is kept abstract so that we can potentially support something like a single file archive
/// output.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SnapshotWriter {
dir: PathBuf,
}
impl SnapshotWriter {
/// Creates a new `SnapshotWriter` that will write its data to a dir at `root`. The path must
/// not exist yet.
// TODO(b/268094487): If the snapshot fails, we leave incomplete snapshot files at the
// requested path. Consider building up the snapshot dir somewhere else and moving it into
// place at the end.
pub fn new(root: PathBuf) -> Result<Self> {
std::fs::create_dir(&root)
.with_context(|| format!("failed to create snapshot root dir: {}", root.display()))?;
Ok(Self { dir: root })
}
/// Creates a snapshot fragment and returns the `File` representing it.
pub fn raw_fragment(&self, name: &str) -> Result<File> {
let path = self.dir.join(name);
let file = File::options()
.write(true)
.create_new(true)
.open(&path)
.with_context(|| {
format!(
"failed to create snapshot fragment {name:?} at {}",
path.display()
)
})?;
Ok(file)
}
/// Creates a snapshot fragment from a serialized representation of `v`.
pub fn write_fragment<T: serde::Serialize>(&self, name: &str, v: &T) -> Result<()> {
Ok(serde_json::to_writer(self.raw_fragment(name)?, v)?)
}
/// Creates a new namespace and returns a `SnapshotWriter` that writes to it. Namespaces can be
/// nested.
pub fn add_namespace(&self, name: &str) -> Result<Self> {
let dir = self.dir.join(name);
std::fs::create_dir(&dir).with_context(|| {
format!(
"failed to create nested snapshot writer {name:?} at {}",
dir.display()
)
})?;
Ok(Self { dir })
}
}
/// Reads snapshots created by `SnapshotWriter`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SnapshotReader {
dir: PathBuf,
}
impl SnapshotReader {
/// Reads a snapshot at `root`.
pub fn new(root: PathBuf) -> Result<Self> {
Ok(Self { dir: root })
}
/// Gets access to the `File` representing a fragment.
pub fn raw_fragment(&self, name: &str) -> Result<File> {
let path = self.dir.join(name);
let file = File::open(&path).with_context(|| {
format!(
"failed to open snapshot fragment {name:?} at {}",
path.display()
)
})?;
Ok(file)
}
/// Reads a fragment.
pub fn read_fragment<T: serde::de::DeserializeOwned>(&self, name: &str) -> Result<T> {
Ok(serde_json::from_reader(self.raw_fragment(name)?)?)
}
/// Reads the names of all fragments in this namespace.
pub fn list_fragments(&self) -> Result<Vec<String>> {
let mut result = Vec::new();
for entry in std::fs::read_dir(&self.dir)? {
let entry = entry?;
if entry.path().is_file() {
if let Some(file_name) = entry.path().file_name() {
result.push(file_name.to_string_lossy().into_owned());
}
}
}
Ok(result)
}
/// Opens a namespace.
pub fn namespace(&self, name: &str) -> Result<Self> {
let dir = self.dir.join(name);
Ok(Self { dir })
}
/// Reads the names of all child namespaces.
pub fn list_namespaces(&self) -> Result<Vec<String>> {
let mut result = Vec::new();
for entry in std::fs::read_dir(&self.dir)? {
let entry = entry?;
if entry.path().is_dir() {
if let Some(file_name) = entry.path().file_name() {
result.push(file_name.to_string_lossy().into_owned());
}
}
}
Ok(result)
}
}
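
For reference, a minimal write/read round trip through the new API might
look like this (illustrative only; `DeviceState` stands in for any type
implementing `Serialize` and `DeserializeOwned`):

    // Write side: one namespace per bus, one JSON fragment per device.
    let writer = SnapshotWriter::new(PathBuf::from("/tmp/snap"))?;
    writer.add_namespace("bus0")?.write_fragment("serial-0", &state)?;

    // Read side: discover and load the fragments written above.
    let reader = SnapshotReader::new(PathBuf::from("/tmp/snap"))?;
    for ns in reader.list_namespaces()? {
        let bus_reader = reader.namespace(&ns)?;
        for name in bus_reader.list_fragments()? {
            let state: DeviceState = bus_reader.read_fragment(&name)?;
            // ...route `state` to the device with the matching snapshot key...
        }
    }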