diff --git a/devices/src/bus.rs b/devices/src/bus.rs index 9b71bff4e2..2fc44cc6f4 100644 --- a/devices/src/bus.rs +++ b/devices/src/bus.rs @@ -8,10 +8,8 @@ use std::cmp::Ord; use std::cmp::Ordering; use std::cmp::PartialEq; use std::cmp::PartialOrd; -use std::collections::btree_map::BTreeMap; -use std::collections::hash_map::HashMap; +use std::collections::BTreeMap; use std::collections::BTreeSet; -use std::collections::VecDeque; use std::fmt; use std::result; use std::sync::Arc; @@ -423,6 +421,34 @@ impl Bus { .collect() } + /// Same as `unique_devices`, but also calculates the "snapshot key" for each device. + /// + /// The keys are used to associate a particular device with data in a serialized snapshot. The + /// keys need to be stable across multiple runs of the same crosvm binary. + /// + /// It is most convienent to calculate all the snapshot keys at once, because the keys are + /// dependant on the order of devices on the bus. + fn unique_devices_with_snapshot_key(&self) -> Vec<(String, BusDeviceEntry)> { + let mut next_ids = BTreeMap::::new(); + let mut choose_key = |debug_label: String| -> String { + let label = debug_label.replace(char::is_whitespace, "-"); + let id = next_ids.entry(label.clone()).or_default(); + let key = format!("{}-{}", label, id); + *id += 1; + key + }; + + let mut result = Vec::new(); + for device_entry in self.unique_devices() { + let key = match &device_entry { + BusDeviceEntry::OuterSync(d) => choose_key(d.lock().debug_label()), + BusDeviceEntry::InnerSync(d) => choose_key(d.debug_label()), + }; + result.push((key, device_entry)); + } + result + } + pub fn sleep_devices(&self) -> anyhow::Result<()> { for device_entry in self.unique_devices() { match device_entry { @@ -463,26 +489,27 @@ impl Bus { pub fn snapshot_devices( &self, - mut add_snapshot: impl FnMut(u32, serde_json::Value), + snapshot_writer: &vm_control::SnapshotWriter, ) -> anyhow::Result<()> { - for device_entry in self.unique_devices() { + for (snapshot_key, device_entry) in self.unique_devices_with_snapshot_key() { match device_entry { BusDeviceEntry::OuterSync(dev) => { let mut dev = dev.lock(); debug!("Snapshot on device: {}", dev.debug_label()); - add_snapshot( - u32::from(dev.device_id()), - dev.snapshot() + snapshot_writer.write_fragment( + &snapshot_key, + &(*dev) + .snapshot() .with_context(|| format!("failed to snapshot {}", dev.debug_label()))?, - ) + )?; } BusDeviceEntry::InnerSync(dev) => { debug!("Snapshot on device: {}", dev.debug_label()); - add_snapshot( - u32::from(dev.device_id()), - dev.snapshot_sync() + snapshot_writer.write_fragment( + &snapshot_key, + &dev.snapshot_sync() .with_context(|| format!("failed to snapshot {}", dev.debug_label()))?, - ) + )?; } } } @@ -491,36 +518,38 @@ impl Bus { pub fn restore_devices( &self, - devices_map: &mut HashMap>, + snapshot_reader: &vm_control::SnapshotReader, ) -> anyhow::Result<()> { - let mut pop_snapshot = |device_id| { - devices_map - .get_mut(&u32::from(device_id)) - .and_then(|dq| dq.pop_front()) - }; - for device_entry in self.unique_devices() { + let mut unused_keys: BTreeSet = + snapshot_reader.list_fragments()?.into_iter().collect(); + for (snapshot_key, device_entry) in self.unique_devices_with_snapshot_key() { + unused_keys.remove(&snapshot_key); match device_entry { BusDeviceEntry::OuterSync(dev) => { let mut dev = dev.lock(); debug!("Restore on device: {}", dev.debug_label()); - let snapshot = pop_snapshot(dev.device_id()).ok_or_else(|| { - anyhow!("missing snapshot for device {}", dev.debug_label()) - })?; - dev.restore(snapshot).with_context(|| { - format!("restore failed for device {}", dev.debug_label()) - })?; + dev.restore(snapshot_reader.read_fragment(&snapshot_key)?) + .with_context(|| { + format!("restore failed for device {}", dev.debug_label()) + })?; } BusDeviceEntry::InnerSync(dev) => { debug!("Restore on device: {}", dev.debug_label()); - let snapshot = pop_snapshot(dev.device_id()).ok_or_else(|| { - anyhow!("missing snapshot for device {}", dev.debug_label()) - })?; - dev.restore_sync(snapshot).with_context(|| { - format!("restore failed for device {}", dev.debug_label()) - })?; + dev.restore_sync(snapshot_reader.read_fragment(&snapshot_key)?) + .with_context(|| { + format!("restore failed for device {}", dev.debug_label()) + })?; } } } + + if !unused_keys.is_empty() { + error!( + "unused restore data in bus, devices might be missing: {:?}", + unused_keys + ); + } + Ok(()) } diff --git a/devices/src/lib.rs b/devices/src/lib.rs index 1cc3689df3..1823814062 100644 --- a/devices/src/lib.rs +++ b/devices/src/lib.rs @@ -41,9 +41,6 @@ cfg_if::cfg_if! { } } -use std::collections::HashMap; -use std::collections::VecDeque; -use std::fs::File; use std::sync::Arc; use anyhow::anyhow; @@ -255,130 +252,44 @@ fn wake_buses(buses: &[&Bus]) { } } -fn snapshot_devices( - bus: &Bus, - add_snapshot: impl FnMut(u32, serde_json::Value), -) -> anyhow::Result<()> { - match bus.snapshot_devices(add_snapshot) { - Ok(_) => { - debug!( - "Devices snapshot successfully for {:?} Bus", - bus.get_bus_type() - ); - Ok(()) - } - Err(e) => { - // If snapshot fails, wake devices and return error - error!( - "failed to snapshot devices: {:#} on {:?} Bus", - e, - bus.get_bus_type() - ); - Err(e) - } - } -} - -fn restore_devices( - bus: &Bus, - devices_map: &mut HashMap>, -) -> anyhow::Result<()> { - match bus.restore_devices(devices_map) { - Ok(_) => { - debug!( - "Devices restore successfully for {:?} Bus", - bus.get_bus_type() - ); - Ok(()) - } - Err(e) => { - // If restore fails, wake devices and return error - error!( - "failed to restore devices: {:#} on {:?} Bus", - e, - bus.get_bus_type() - ); - Err(e) - } - } -} - -#[derive(serde::Serialize, serde::Deserialize)] -struct SnapshotRoot { - guest_memory_metadata: serde_json::Value, - devices: Vec>, -} - async fn snapshot_handler( - path: &std::path::Path, + snapshot_writer: vm_control::SnapshotWriter, guest_memory: &GuestMemory, buses: &[&Bus], ) -> anyhow::Result<()> { - let mut snapshot_root = SnapshotRoot { - guest_memory_metadata: serde_json::Value::Null, - devices: Vec::new(), - }; - - // TODO(b/268093674): Better output file format. - // TODO(b/268094487): If the snapshot fail, this leaves an incomplete memory snapshot at the - // requested path. - - let mut json_file = - File::create(path).with_context(|| format!("failed to open {}", path.display()))?; - - let mem_path = path.with_extension("mem"); - let mut mem_file = File::create(&mem_path) - .with_context(|| format!("failed to open {}", mem_path.display()))?; - - snapshot_root.guest_memory_metadata = guest_memory - .snapshot(&mut mem_file) + let guest_memory_metadata = guest_memory + .snapshot(&mut snapshot_writer.raw_fragment("mem")?) .context("failed to snapshot memory")?; - - for bus in buses { - snapshot_devices(bus, |id, snapshot| { - snapshot_root.devices.push([(id, snapshot)].into()) - }) - .context("failed to snapshot devices")?; + snapshot_writer.write_fragment("mem_metadata", &guest_memory_metadata)?; + for (i, bus) in buses.iter().enumerate() { + bus.snapshot_devices(&snapshot_writer.add_namespace(&format!("bus{i}"))?) + .context("failed to snapshot bus devices")?; + debug!( + "Devices snapshot successfully for {:?} Bus", + bus.get_bus_type() + ); } - - serde_json::to_writer(&mut json_file, &snapshot_root)?; - Ok(()) } async fn restore_handler( - path: &std::path::Path, + snapshot_reader: vm_control::SnapshotReader, guest_memory: &GuestMemory, buses: &[&Bus], ) -> anyhow::Result<()> { - let file = File::open(path).with_context(|| format!("failed to open {}", path.display()))?; - - let mem_path = path.with_extension("mem"); - let mut mem_file = - File::open(&mem_path).with_context(|| format!("failed to open {}", mem_path.display()))?; - - let snapshot_root: SnapshotRoot = serde_json::from_reader(file)?; - - let mut devices_map: HashMap> = HashMap::new(); - for (id, device) in snapshot_root.devices.into_iter().flatten() { - devices_map.entry(id).or_default().push_back(device) - } - - { - guest_memory.restore(snapshot_root.guest_memory_metadata, &mut mem_file)?; - - for bus in buses { - restore_devices(bus, &mut devices_map)?; - } - } - - for (key, _) in devices_map.iter().filter(|(_, v)| !v.is_empty()) { - info!( - "Unused restore data for device_id {}, device might be missing.", - key + let guest_memory_metadata = snapshot_reader.read_fragment("mem_metadata")?; + guest_memory.restore( + guest_memory_metadata, + &mut snapshot_reader.raw_fragment("mem")?, + )?; + for (i, bus) in buses.iter().enumerate() { + bus.restore_devices(&snapshot_reader.namespace(&format!("bus{i}"))?) + .context("failed to restore bus devices")?; + debug!( + "Devices restore successfully for {:?} Bus", + bus.get_bus_type() ); } - Ok(()) } @@ -435,14 +346,13 @@ async fn handle_command_tube( .await .context("failed to reply to wake devices request")?; } - DeviceControlCommand::SnapshotDevices { - snapshot_path: path, - } => { + DeviceControlCommand::SnapshotDevices { snapshot_writer } => { assert!( matches!(devices_state, DevicesState::Sleep), "devices must be sleeping to snapshot" ); - if let Err(e) = snapshot_handler(path.as_path(), &guest_memory, buses).await + if let Err(e) = + snapshot_handler(snapshot_writer, &guest_memory, buses).await { error!("failed to snapshot: {:#}", e); command_tube @@ -456,13 +366,13 @@ async fn handle_command_tube( .await .context("Failed to send response")?; } - DeviceControlCommand::RestoreDevices { restore_path: path } => { + DeviceControlCommand::RestoreDevices { snapshot_reader } => { assert!( matches!(devices_state, DevicesState::Sleep), "devices must be sleeping to restore" ); if let Err(e) = - restore_handler(path.as_path(), &guest_memory, &[&*io_bus, &*mmio_bus]) + restore_handler(snapshot_reader, &guest_memory, &[&*io_bus, &*mmio_bus]) .await { error!("failed to restore: {:#}", e); diff --git a/devices/src/pci/pci_root.rs b/devices/src/pci/pci_root.rs index 03e22554c7..b870e714d7 100644 --- a/devices/src/pci/pci_root.rs +++ b/devices/src/pci/pci_root.rs @@ -678,7 +678,7 @@ const PCI_RESET_CPU_BIT: u8 = 1 << 2; impl BusDevice for PciConfigIo { fn debug_label(&self) -> String { - format!("pci config io-port 0x{:03x}", self.config_address) + "pci config io-port".to_string() } fn device_id(&self) -> DeviceId { diff --git a/e2e_tests/tests/suspend_resume.rs b/e2e_tests/tests/suspend_resume.rs index 48fbe7fdbc..727ee21fc1 100644 --- a/e2e_tests/tests/suspend_resume.rs +++ b/e2e_tests/tests/suspend_resume.rs @@ -23,6 +23,25 @@ use tempfile::NamedTempFile; // System-wide suspend/resume, snapshot/restore. // Tests below check for snapshot/restore functionality, and suspend/resume. +fn compare_snapshots(a: &Path, b: &Path) -> (bool, String) { + let result = std::process::Command::new("diff") + .arg("-qr") + // vcpu and irqchip have timestamps that differ even if a freshly restored VM is + // snapshotted before it starts running again. + .arg("--exclude") + .arg("vcpu*") + .arg("--exclude") + .arg("irqchip") + .arg(a) + .arg(b) + .output() + .unwrap(); + ( + result.status.success(), + String::from_utf8(result.stdout).unwrap(), + ) +} + #[test] fn suspend_snapshot_restore_resume() -> anyhow::Result<()> { suspend_resume_system(false) @@ -118,11 +137,15 @@ fn suspend_resume_system(disabled_sandbox: bool) -> anyhow::Result<()> { vm.exec_in_guest("cat /tmp/foo").unwrap().stdout.trim() ); - let snap1 = std::fs::read_to_string(&snap1_path).unwrap(); - let snap2 = std::fs::read_to_string(&snap2_path).unwrap(); - let snap3 = std::fs::read_to_string(&snap3_path).unwrap(); - assert_ne!(snap1, snap2); - assert_eq!(snap1, snap3); + let (equal, output) = compare_snapshots(&snap1_path, &snap2_path); + assert!( + !equal, + "1st and 2nd snapshot are unexpectedly equal:\n{output}" + ); + + let (equal, output) = compare_snapshots(&snap1_path, &snap3_path); + assert!(equal, "1st and 3rd snapshot are not equal:\n{output}"); + Ok(()) } @@ -175,10 +198,6 @@ fn snapshot_vhost_user() { // suspend VM vm.suspend_full().unwrap(); vm.snapshot(&snap_path).unwrap(); - - let snapshot_json = std::fs::read_to_string(&snap_path).unwrap(); - - assert!(snapshot_json.contains("\"device_name\":\"virtio-block\"")); } let (_block_vu_device, _net_vu_device, block_socket, net_socket) = spin_up_vhost_user_devices(); diff --git a/src/crosvm/sys/linux/vcpu.rs b/src/crosvm/sys/linux/vcpu.rs index cafe4736d4..f5f25b7dd9 100644 --- a/src/crosvm/sys/linux/vcpu.rs +++ b/src/crosvm/sys/linux/vcpu.rs @@ -314,21 +314,29 @@ where error!("Failed to send GetState: {}", e); }; } - VcpuControl::Snapshot(response_chan) => { + VcpuControl::Snapshot(snapshot_writer, response_chan) => { let resp = vcpu .snapshot() + .and_then(|s| { + snapshot_writer + .write_fragment(&format!("vcpu{}", vcpu.id()), &s) + }) .with_context(|| format!("Failed to snapshot Vcpu #{}", vcpu.id())); if let Err(e) = response_chan.send(resp) { error!("Failed to send snapshot response: {}", e); } } VcpuControl::Restore(req) => { - let resp = vcpu - .restore( - &req.snapshot, - #[cfg(target_arch = "x86_64")] - req.host_tsc_reference_moment, - ) + let resp = req + .snapshot_reader + .read_fragment(&format!("vcpu{}", vcpu.id())) + .and_then(|s| { + vcpu.restore( + &s, + #[cfg(target_arch = "x86_64")] + req.host_tsc_reference_moment, + ) + }) .with_context(|| format!("Failed to restore Vcpu #{}", vcpu.id())); if let Err(e) = req.result_sender.send(resp) { error!("Failed to send restore response: {}", e); diff --git a/src/sys/windows/run_vcpu.rs b/src/sys/windows/run_vcpu.rs index 7a35133b32..4fffd25800 100644 --- a/src/sys/windows/run_vcpu.rs +++ b/src/sys/windows/run_vcpu.rs @@ -977,17 +977,20 @@ fn process_vcpu_control_messages( error!("Failed to send GetState: {}", e); }; } - VcpuControl::Snapshot(response_chan) => { + VcpuControl::Snapshot(snapshot_writer, response_chan) => { let resp = vcpu .snapshot() + .and_then(|s| snapshot_writer.write_fragment(&format!("vcpu{}", vcpu.id()), &s)) .with_context(|| format!("Failed to snapshot Vcpu #{}", vcpu.id())); if let Err(e) = response_chan.send(resp) { error!("Failed to send snapshot response: {}", e); } } VcpuControl::Restore(req) => { - let resp = vcpu - .restore(&req.snapshot, req.host_tsc_reference_moment) + let resp = req + .snapshot_reader + .read_fragment(&format!("vcpu{}", vcpu.id())) + .and_then(|s| vcpu.restore(&s, req.host_tsc_reference_moment)) .with_context(|| format!("Failed to restore Vcpu #{}", vcpu.id())); if let Err(e) = req.result_sender.send(resp) { error!("Failed to send restore response: {}", e); diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs index fca6657348..bcf19a28cc 100644 --- a/vm_control/src/lib.rs +++ b/vm_control/src/lib.rs @@ -26,6 +26,7 @@ use hypervisor::MemRegion; #[cfg(feature = "balloon")] mod balloon_tube; pub mod client; +mod snapshot_format; pub mod sys; #[cfg(target_arch = "x86_64")] @@ -68,7 +69,6 @@ use hypervisor::IoEventAddress; use hypervisor::IrqRoute; use hypervisor::IrqSource; pub use hypervisor::MemSlot; -use hypervisor::VcpuSnapshot; use hypervisor::Vm; use libc::EINVAL; use libc::EIO; @@ -89,6 +89,7 @@ use rutabaga_gfx::RutabagaMappedRegion; use rutabaga_gfx::VulkanInfo; use serde::Deserialize; use serde::Serialize; +pub use snapshot_format::*; use swap::SwapStatus; use sync::Mutex; #[cfg(any(target_os = "android", target_os = "linux"))] @@ -125,7 +126,9 @@ pub enum VcpuControl { MakeRT, // Request the current state of the vCPU. The result is sent back over the included channel. GetStates(mpsc::Sender), - Snapshot(mpsc::Sender>), + // Request the vcpu write a snapshot of itself to the writer, then send a `Result` back over + // the channel after completion/failure. + Snapshot(SnapshotWriter, mpsc::Sender>), Restore(VcpuRestoreRequest), } @@ -134,7 +137,7 @@ pub enum VcpuControl { #[derive(Clone, Debug)] pub struct VcpuRestoreRequest { pub result_sender: mpsc::Sender>, - pub snapshot: Box, + pub snapshot_reader: SnapshotReader, #[cfg(target_arch = "x86_64")] pub host_tsc_reference_moment: u64, } @@ -323,8 +326,8 @@ pub enum RestoreCommand { pub enum DeviceControlCommand { SleepDevices, WakeDevices, - SnapshotDevices { snapshot_path: PathBuf }, - RestoreDevices { restore_path: PathBuf }, + SnapshotDevices { snapshot_writer: SnapshotWriter }, + RestoreDevices { snapshot_reader: SnapshotReader }, GetDevicesState, Exit, } @@ -2024,37 +2027,31 @@ fn do_snapshot( } info!("flushed IRQs in {} iterations", flush_attempts); + let snapshot_writer = SnapshotWriter::new(snapshot_path)?; + // Snapshot Vcpus - let vcpu_path = snapshot_path.with_extension("vcpu"); - let cpu_file = File::create(&vcpu_path) - .with_context(|| format!("failed to open path {}", vcpu_path.display()))?; let (send_chan, recv_chan) = mpsc::channel(); - kick_vcpus(VcpuControl::Snapshot(send_chan)); + kick_vcpus(VcpuControl::Snapshot( + snapshot_writer.add_namespace("vcpu")?, + send_chan, + )); // Validate all Vcpus snapshot successfully - let mut cpu_vec = Vec::with_capacity(vcpu_size); for _ in 0..vcpu_size { - match recv_chan + recv_chan .recv() - .context("Failed to snapshot Vcpu, aborting snapshot")? - { - Ok(snap) => { - cpu_vec.push(snap); - } - Err(e) => bail!("Failed to snapshot Vcpu, aborting snapshot: {}", e), - } + .context("Failed to recv Vcpu snapshot response")? + .context("Failed to snapshot Vcpu")?; } - serde_json::to_writer(cpu_file, &cpu_vec).expect("Failed to write Vcpu state"); // Snapshot irqchip - let irqchip_path = snapshot_path.with_extension("irqchip"); - let irqchip_file = File::create(&irqchip_path) - .with_context(|| format!("failed to open path {}", irqchip_path.display()))?; let irqchip_snap = snapshot_irqchip()?; - serde_json::to_writer(irqchip_file, &irqchip_snap).expect("Failed to write irqchip state"); + snapshot_writer + .write_fragment("irqchip", &irqchip_snap) + .context("Failed to write irqchip state")?; // Snapshot devices device_control_tube - .send(&DeviceControlCommand::SnapshotDevices { snapshot_path }) + .send(&DeviceControlCommand::SnapshotDevices { snapshot_writer }) .context("send command to devices control socket")?; let resp: VmResponse = device_control_tube .recv() @@ -2081,38 +2078,33 @@ pub fn do_restore( let _guard = VcpuSuspendGuard::new(&kick_vcpus, vcpu_size); let _devices_guard = DeviceSleepGuard::new(device_control_tube)?; + let snapshot_reader = SnapshotReader::new(restore_path)?; + // Restore IrqChip - let irq_path = restore_path.with_extension("irqchip"); - let irq_file = File::open(&irq_path) - .with_context(|| format!("failed to open path {}", irq_path.display()))?; - let irq_snapshot: serde_json::Value = serde_json::from_reader(irq_file)?; + let irq_snapshot: serde_json::Value = snapshot_reader.read_fragment("irqchip")?; restore_irqchip(irq_snapshot)?; // Restore Vcpu(s) - let vcpu_path = restore_path.with_extension("vcpu"); - let cpu_file = File::open(&vcpu_path) - .with_context(|| format!("failed to open path {}", vcpu_path.display()))?; - let vcpu_snapshots: Vec = serde_json::from_reader(cpu_file)?; - if vcpu_snapshots.len() != vcpu_size { + let vcpu_snapshot_reader = snapshot_reader.namespace("vcpu")?; + let vcpu_snapshot_count = vcpu_snapshot_reader.list_fragments()?.len(); + if vcpu_snapshot_count != vcpu_size { bail!( "bad cpu count in snapshot: expected={} got={}", vcpu_size, - vcpu_snapshots.len() + vcpu_snapshot_count, ); } - #[cfg(target_arch = "x86_64")] let host_tsc_reference_moment = { // SAFETY: rdtsc takes no arguments. unsafe { _rdtsc() } }; let (send_chan, recv_chan) = mpsc::channel(); - for vcpu_snap in vcpu_snapshots { - let vcpu_id = vcpu_snap.vcpu_id; + for vcpu_id in 0..vcpu_size { kick_vcpu( VcpuControl::Restore(VcpuRestoreRequest { result_sender: send_chan.clone(), - snapshot: Box::new(vcpu_snap), + snapshot_reader: vcpu_snapshot_reader.clone(), #[cfg(target_arch = "x86_64")] host_tsc_reference_moment, }), @@ -2128,7 +2120,7 @@ pub fn do_restore( // Restore devices device_control_tube - .send(&DeviceControlCommand::RestoreDevices { restore_path }) + .send(&DeviceControlCommand::RestoreDevices { snapshot_reader }) .context("send command to devices control socket")?; let resp: VmResponse = device_control_tube .recv() diff --git a/vm_control/src/snapshot_format.rs b/vm_control/src/snapshot_format.rs new file mode 100644 index 0000000000..621473b7f0 --- /dev/null +++ b/vm_control/src/snapshot_format.rs @@ -0,0 +1,133 @@ +// Copyright 2024 The ChromiumOS Authors +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::fs::File; +use std::path::PathBuf; + +use anyhow::Context; +use anyhow::Result; + +/// Writer of serialized VM snapshots. +/// +/// Each fragment is an opaque byte blob. Namespaces can be used to avoid fragment naming +/// collisions between devices. +/// +/// In the current implementation, fragments are files and namespaces are directories, but the API +/// is kept abstract so that we can potentially support something like a single file archive +/// output. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct SnapshotWriter { + dir: PathBuf, +} + +impl SnapshotWriter { + /// Creates a new `SnapshotWriter` that will writes its data to a dir at `root`. The path must + /// not exist yet. + // TODO(b/268094487): If the snapshot fails, we leave incomplete snapshot files at the + // requested path. Consider building up the snapshot dir somewhere else and moving it into + // place at the end. + pub fn new(root: PathBuf) -> Result { + std::fs::create_dir(&root) + .with_context(|| format!("failed to create snapshot root dir: {}", root.display()))?; + Ok(Self { dir: root }) + } + + /// Creates a snapshot fragment and get access to the `File` representing it. + pub fn raw_fragment(&self, name: &str) -> Result { + let path = self.dir.join(name); + let file = File::options() + .write(true) + .create_new(true) + .open(&path) + .with_context(|| { + format!( + "failed to create snapshot fragment {name:?} at {}", + path.display() + ) + })?; + Ok(file) + } + + /// Creates a snapshot fragment from a serialized representation of `v`. + pub fn write_fragment(&self, name: &str, v: &T) -> Result<()> { + Ok(serde_json::to_writer(self.raw_fragment(name)?, v)?) + } + + /// Creates new namespace and returns a `SnapshotWriter` that writes to it. Namespaces can be + /// nested. + pub fn add_namespace(&self, name: &str) -> Result { + let dir = self.dir.join(name); + std::fs::create_dir(&dir).with_context(|| { + format!( + "failed to create nested snapshot writer {name:?} at {}", + dir.display() + ) + })?; + Ok(Self { dir }) + } +} + +/// Reads snapshots created by `SnapshotWriter`. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct SnapshotReader { + dir: PathBuf, +} + +impl SnapshotReader { + /// Reads a snapshot at `root`. + pub fn new(root: PathBuf) -> Result { + Ok(Self { dir: root }) + } + + /// Gets access to the `File` representing a fragment. + pub fn raw_fragment(&self, name: &str) -> Result { + let path = self.dir.join(name); + let file = File::open(&path).with_context(|| { + format!( + "failed to open snapshot fragment {name:?} at {}", + path.display() + ) + })?; + Ok(file) + } + + /// Reads a fragment. + pub fn read_fragment(&self, name: &str) -> Result { + Ok(serde_json::from_reader(self.raw_fragment(name)?)?) + } + + /// Reads the names of all fragments in this namespace. + pub fn list_fragments(&self) -> Result> { + let mut result = Vec::new(); + for entry in std::fs::read_dir(&self.dir)? { + let entry = entry?; + if entry.path().is_file() { + if let Some(file_name) = entry.path().file_name() { + result.push(file_name.to_string_lossy().into_owned()); + } + } + } + Ok(result) + } + + /// Open a namespace. + pub fn namespace(&self, name: &str) -> Result { + let dir = self.dir.join(name); + Ok(Self { dir }) + } + + /// Reads the names of all child namespaces + pub fn list_namespaces(&self) -> Result> { + let mut result = Vec::new(); + for entry in std::fs::read_dir(&self.dir)? { + let entry = entry?; + if entry.path().is_dir() { + if let Some(file_name) = entry.path().file_name() { + result.push(file_name.to_string_lossy().into_owned()); + } + } + } + Ok(result) + } +}