devices: virtio-console: refactor and consolidate

This replaces the console backend for the virtio and vhost-user console
devices with a shared implementation.

There is one main worker thread per console device, which handles all
virtio queues for that device, and one thread per port to read from the
input source in a blocking fashion.

BUG=b:354677018
BUG=b:298289666

Change-Id: I376180e26497ed6391a9f49b7d9a5c4d5aafa64f
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5739275
Reviewed-by: Noah Gold <nkgold@google.com>
Commit-Queue: Daniel Verkamp <dverkamp@chromium.org>
Reviewed-by: Frederick Mayle <fmayle@google.com>
This commit is contained in:
Daniel Verkamp 2024-07-22 15:37:50 -07:00 committed by crosvm LUCI
parent 6dc36202de
commit 2e277ed5bb
19 changed files with 1333 additions and 1585 deletions

View file

@ -105,9 +105,9 @@ impl Display for SerialType {
#[serde(rename_all = "kebab-case")]
pub enum SerialHardware {
Serial, // Standard PC-style (8250/16550 compatible) UART
VirtioConsole, // virtio-console device (AsyncConsole)
VirtioConsole, // virtio-console device
Debugcon, // Bochs style debug port
LegacyVirtioConsole, // legacy virtio-console device (Console)
LegacyVirtioConsole, // legacy virtio-console device (alias for VirtioConsole)
}
impl Default for SerialHardware {

View file

@ -1,117 +0,0 @@
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Provides helpers that make it easier to process virtio queues on an async executor.
#![cfg_attr(windows, allow(dead_code))]
use anyhow::bail;
use anyhow::Context;
use base::warn;
use cros_async::AsyncResult;
use cros_async::Executor;
use cros_async::TaskHandle;
use futures::future::AbortHandle;
use futures::future::Abortable;
use futures::future::Pending;
use futures::Future;
/// A queue for which processing can be started on an async executor.
///
/// `T` is the resource type of the queue, i.e. the device-specific data it needs in order to run.
/// For instance, a block device will likely need a file to provide its data.
pub enum AsyncQueueState<T: 'static> {
/// Queue is currently stopped.
Stopped(T),
/// Queue is being processed as a `Task` on an `Executor`, and can be stopped by aborting the
/// `AbortHandle`.
Running((TaskHandle<T>, Executor, AbortHandle)),
/// Something terrible happened and this queue is in a non-recoverable state.
Broken,
}
impl<T: 'static> AsyncQueueState<T> {
/// Start processing of the queue on `ex`, or stop and restart it with the new parameters if
/// it was already running.
///
/// `fut_provider` is a closure that is passed the resource of the queue, as well as a
/// `Abortable` future. It must return a `Future` that takes ownership of the device's resource
/// and processes the queue for as long as possible, but immediately quits and returns the
/// device resource when the `Abortable` is signaled.
///
/// If `fut_provider` or the `Future` it returns end with an error, the queue is considered
/// broken and cannot be used anymore.
///
/// The task is only scheduled and no processing actually starts in this method. The task is
/// scheduled locally, which implies that `ex` must be run on the current thread.
pub fn start<
U: Future<Output = T> + 'static,
F: FnOnce(T, Abortable<Pending<()>>) -> anyhow::Result<U>,
>(
&mut self,
ex: &Executor,
fut_provider: F,
) -> anyhow::Result<()> {
if matches!(self, AsyncQueueState::Running(_)) {
warn!("queue is already running, stopping it first");
self.stop().context("while trying to restart queue")?;
}
let resource = match std::mem::replace(self, AsyncQueueState::Broken) {
AsyncQueueState::Stopped(resource) => resource,
_ => bail!("queue is in a bad state and cannot be started"),
};
let (wait_fut, abort_handle) = futures::future::abortable(futures::future::pending::<()>());
let queue_future = fut_provider(resource, wait_fut)?;
let task = ex.spawn_local(queue_future);
*self = AsyncQueueState::Running((task, ex.clone(), abort_handle));
Ok(())
}
/// Stops a previously started queue.
///
/// The executor on which the task has been started will be run if needed in order to retrieve
/// the queue's resource.
///
/// Returns `true` if the queue was running, `false` if it wasn't.
pub fn stop(&mut self) -> AsyncResult<bool> {
// TODO: schuffelen - All callers should use stop_async instead.
match std::mem::replace(self, AsyncQueueState::Broken) {
AsyncQueueState::Running((task, ex, handle)) => {
// Abort the task and run it to completion to retrieve the queue's resource.
handle.abort();
let resource = ex.run_until(task)?;
*self = AsyncQueueState::Stopped(resource);
Ok(true)
}
state => {
*self = state;
Ok(false)
}
}
}
/// Stops a previously started queue.
///
/// The executor on which the task has been started will be run if needed in order to retrieve
/// the queue's resource.
///
/// Returns `true` if the queue was running, `false` if it wasn't.
pub async fn stop_async(&mut self) -> AsyncResult<bool> {
match std::mem::replace(self, AsyncQueueState::Broken) {
AsyncQueueState::Running((task, _, handle)) => {
// Abort the task and run it to completion to retrieve the queue's resource.
handle.abort();
let resource = task.await;
*self = AsyncQueueState::Stopped(resource);
Ok(true)
}
state => {
*self = state;
Ok(false)
}
}
}
}

View file

@ -2,303 +2,58 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Legacy console device that uses a polling thread. This is kept because it is still used by
//! Windows ; outside of this use-case, please use [[asynchronous::AsyncConsole]] instead.
//! Virtio console device.
pub mod control;
pub mod device;
pub mod input;
pub mod output;
pub mod port;
pub mod worker;
pub mod asynchronous;
mod multiport;
mod sys;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::io;
use std::io::Read;
use std::io::Write;
use std::ops::DerefMut;
use std::result;
use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
use base::error;
use base::AsRawDescriptor;
use base::Descriptor;
use base::Event;
use base::EventToken;
use base::RawDescriptor;
#[cfg(windows)]
use base::ReadNotifier;
use base::WaitContext;
use base::WorkerThread;
use data_model::Le16;
use data_model::Le32;
use hypervisor::ProtectionType;
use remain::sorted;
use serde::Deserialize;
use serde::Serialize;
use sync::Mutex;
use thiserror::Error as ThisError;
use vm_memory::GuestMemory;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
use zerocopy::FromZeroes;
use crate::serial::sys::InStreamType;
use crate::virtio::base_features;
use crate::virtio::copy_config;
use crate::virtio::console::device::ConsoleDevice;
use crate::virtio::console::device::ConsoleSnapshot;
use crate::virtio::console::port::ConsolePort;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
use crate::virtio::VirtioDevice;
use crate::PciAddress;
pub(crate) const QUEUE_SIZE: u16 = 256;
// For now, just implement port 0 (receiveq and transmitq).
// If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
#[sorted]
#[derive(ThisError, Debug)]
pub enum ConsoleError {
/// There are no more available descriptors to receive into
#[error("no rx descriptors available")]
RxDescriptorsExhausted,
}
#[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
#[repr(C)]
pub struct virtio_console_config {
pub cols: Le16,
pub rows: Le16,
pub max_nr_ports: Le32,
pub emerg_wr: Le32,
}
/// Checks for input from `buffer` and transfers it to the receive queue, if any.
///
/// # Arguments
///
/// * `interrupt` - Interrupt used to signal that the queue has been used
/// * `buffer` - Ring buffer providing data to put into the guest
/// * `receive_queue` - The receive virtio Queue
fn handle_input(
interrupt: &Interrupt,
buffer: &mut VecDeque<u8>,
receive_queue: &Arc<Mutex<Queue>>,
) -> result::Result<(), ConsoleError> {
let mut receive_queue = receive_queue
.try_lock()
.expect("Lock should not be unavailable");
loop {
let mut desc = receive_queue
.peek()
.ok_or(ConsoleError::RxDescriptorsExhausted)?;
let writer = &mut desc.writer;
while writer.available_bytes() > 0 && !buffer.is_empty() {
let (buffer_front, buffer_back) = buffer.as_slices();
let buffer_chunk = if !buffer_front.is_empty() {
buffer_front
} else {
buffer_back
};
let written = writer.write(buffer_chunk).unwrap();
drop(buffer.drain(..written));
}
let bytes_written = writer.bytes_written() as u32;
if bytes_written > 0 {
let desc = desc.pop();
receive_queue.add_used(desc, bytes_written);
receive_queue.trigger_interrupt(interrupt);
}
if bytes_written == 0 {
return Ok(());
}
}
}
/// Writes the available data from the reader into the given output queue.
///
/// # Arguments
///
/// * `reader` - The Reader with the data we want to write.
/// * `output` - The output sink we are going to write the data to.
fn process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()> {
let len = reader.available_bytes();
let mut data = vec![0u8; len];
reader.read_exact(&mut data)?;
output.write_all(&data)?;
output.flush()?;
Ok(())
}
/// Processes the data taken from the given transmit queue into the output sink.
///
/// # Arguments
///
/// * `interrupt` - Interrupt used to signal (if required) that the queue has been used
/// * `transmit_queue` - The transmit virtio Queue
/// * `output` - The output sink we are going to write the data into
fn process_transmit_queue(
interrupt: &Interrupt,
transmit_queue: &Arc<Mutex<Queue>>,
output: &mut dyn io::Write,
) {
let mut needs_interrupt = false;
let mut transmit_queue = transmit_queue
.try_lock()
.expect("Lock should not be unavailable");
while let Some(mut avail_desc) = transmit_queue.pop() {
process_transmit_request(&mut avail_desc.reader, output)
.unwrap_or_else(|e| error!("console: process_transmit_request failed: {}", e));
transmit_queue.add_used(avail_desc, 0);
needs_interrupt = true;
}
if needs_interrupt {
transmit_queue.trigger_interrupt(interrupt);
}
}
struct Worker {
interrupt: Interrupt,
input: Option<Arc<Mutex<VecDeque<u8>>>>,
output: Box<dyn io::Write + Send>,
kill_evt: Event,
in_avail_evt: Event,
receive_queue: Arc<Mutex<Queue>>,
transmit_queue: Arc<Mutex<Queue>>,
}
impl Worker {
fn run(&mut self) -> anyhow::Result<()> {
#[derive(EventToken)]
enum Token {
ReceiveQueueAvailable,
TransmitQueueAvailable,
InputAvailable,
InterruptResample,
Kill,
}
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(
self.transmit_queue.lock().event(),
Token::TransmitQueueAvailable,
),
(
self.receive_queue.lock().event(),
Token::ReceiveQueueAvailable,
),
(&self.in_avail_evt, Token::InputAvailable),
(&self.kill_evt, Token::Kill),
])?;
if let Some(resample_evt) = self.interrupt.get_resample_evt() {
wait_ctx.add(resample_evt, Token::InterruptResample)?;
}
let mut running = true;
while running {
let events = wait_ctx.wait()?;
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::TransmitQueueAvailable => {
self.transmit_queue
.lock()
.event()
.wait()
.context("failed reading transmit queue Event")?;
process_transmit_queue(
&self.interrupt,
&self.transmit_queue,
&mut self.output,
);
}
Token::ReceiveQueueAvailable => {
self.receive_queue
.lock()
.event()
.wait()
.context("failed reading receive queue Event")?;
if let Some(in_buf_ref) = self.input.as_ref() {
let _ = handle_input(
&self.interrupt,
in_buf_ref.lock().deref_mut(),
&self.receive_queue,
);
}
}
Token::InputAvailable => {
self.in_avail_evt
.wait()
.context("failed reading in_avail_evt")?;
if let Some(in_buf_ref) = self.input.as_ref() {
let _ = handle_input(
&self.interrupt,
in_buf_ref.lock().deref_mut(),
&self.receive_queue,
);
}
}
Token::InterruptResample => {
self.interrupt.interrupt_resample();
}
Token::Kill => running = false,
}
}
}
Ok(())
}
}
const QUEUE_SIZE: u16 = 256;
/// Virtio console device.
pub struct Console {
base_features: u64,
in_avail_evt: Event,
worker_thread: Option<WorkerThread<Worker>>,
input: Option<InStreamType>,
output: Option<Box<dyn io::Write + Send>>,
keep_descriptors: Vec<Descriptor>,
input_thread: Option<WorkerThread<InStreamType>>,
// input_buffer is not continuously updated. It holds the state of the buffer when a snapshot
// happens, or when a restore is performed. On a fresh startup, it will be empty. On a restore,
// it will contain whatever data was remaining in the buffer in the snapshot.
input_buffer: VecDeque<u8>,
console: ConsoleDevice,
queue_sizes: Vec<u16>,
pci_address: Option<PciAddress>,
}
#[derive(Serialize, Deserialize)]
struct ConsoleSnapshot {
base_features: u64,
input_buffer: VecDeque<u8>,
}
impl Console {
fn new(
protection_type: ProtectionType,
input: Option<InStreamType>,
output: Option<Box<dyn io::Write + Send>>,
mut keep_rds: Vec<RawDescriptor>,
output: Option<Box<dyn std::io::Write + Send>>,
keep_rds: Vec<RawDescriptor>,
pci_address: Option<PciAddress>,
) -> Console {
let in_avail_evt = Event::new().expect("failed creating Event");
keep_rds.push(in_avail_evt.as_raw_descriptor());
let port = ConsolePort::new(input, output, None, keep_rds);
let console = ConsoleDevice::new_single_port(protection_type, port);
let queue_sizes = vec![QUEUE_SIZE; console.max_queues()];
Console {
base_features: base_features(protection_type),
in_avail_evt,
worker_thread: None,
input,
output,
keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
input_thread: None,
input_buffer: VecDeque::new(),
console,
queue_sizes,
pci_address,
}
}
@ -306,15 +61,11 @@ impl Console {
impl VirtioDevice for Console {
fn keep_rds(&self) -> Vec<RawDescriptor> {
// return the raw descriptors as opposed to descriptor.
self.keep_descriptors
.iter()
.map(|descr| descr.as_raw_descriptor())
.collect()
self.console.keep_rds()
}
fn features(&self) -> u64 {
self.base_features
self.console.features()
}
fn device_type(&self) -> DeviceType {
@ -322,71 +73,26 @@ impl VirtioDevice for Console {
}
fn queue_max_sizes(&self) -> &[u16] {
QUEUE_SIZES
&self.queue_sizes
}
fn read_config(&self, offset: u64, data: &mut [u8]) {
let config = virtio_console_config {
max_nr_ports: 1.into(),
..Default::default()
};
copy_config(data, 0, config.as_bytes(), offset);
self.console.read_config(offset, data);
}
fn on_device_sandboxed(&mut self) {
self.console.start_input_threads();
}
fn activate(
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, Queue>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() < 2 {
return Err(anyhow!("expected 2 queues, got {}", queues.len()));
for (idx, queue) in queues.into_iter() {
self.console.start_queue(idx, queue, interrupt.clone())?
}
let receive_queue = queues.remove(&0).unwrap();
let transmit_queue = queues.remove(&1).unwrap();
let in_avail_evt = self
.in_avail_evt
.try_clone()
.context("failed creating input available Event pair")?;
// Spawn a separate thread to poll self.input.
// A thread is used because io::Read only provides a blocking interface, and there is no
// generic way to add an io::Read instance to a poll context (it may not be backed by a file
// descriptor). Moving the blocking read call to a separate thread and sending data back to
// the main worker thread with an event for notification bridges this gap.
let input = match self.input.take() {
Some(read) => {
let (buffer, thread) = sys::spawn_input_thread(
read,
&self.in_avail_evt,
std::mem::take(&mut self.input_buffer),
);
self.input_thread = Some(thread);
Some(buffer)
}
None => None,
};
let output = self.output.take().unwrap_or_else(|| Box::new(io::sink()));
self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
let mut worker = Worker {
interrupt,
input,
output,
in_avail_evt,
kill_evt,
// Device -> driver
receive_queue: Arc::new(Mutex::new(receive_queue)),
// Driver -> device
transmit_queue: Arc::new(Mutex::new(transmit_queue)),
};
if let Err(e) = worker.run() {
error!("console run failure: {:?}", e);
};
worker
}));
Ok(())
}
@ -395,100 +101,50 @@ impl VirtioDevice for Console {
}
fn reset(&mut self) -> anyhow::Result<()> {
if let Some(input_thread) = self.input_thread.take() {
self.input = Some(input_thread.stop());
}
if let Some(worker_thread) = self.worker_thread.take() {
let worker = worker_thread.stop();
// NOTE: Even though we are reseting the device, it still makes sense to preserve the
// pending input bytes that the host sent but the guest hasn't accepted yet.
self.input_buffer = worker
.input
.map_or(VecDeque::new(), |arc_mutex| arc_mutex.lock().clone());
self.output = Some(worker.output);
}
Ok(())
self.console.reset()
}
fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
if let Some(input_thread) = self.input_thread.take() {
self.input = Some(input_thread.stop());
// Stop and collect all the queues.
let mut queues = BTreeMap::new();
for idx in 0..self.console.max_queues() {
if let Some(queue) = self
.console
.stop_queue(idx)
.with_context(|| format!("failed to stop queue {idx}"))?
{
queues.insert(idx, queue);
}
}
if let Some(worker_thread) = self.worker_thread.take() {
let worker = worker_thread.stop();
self.input_buffer = worker
.input
.map_or(VecDeque::new(), |arc_mutex| arc_mutex.lock().clone());
self.output = Some(worker.output);
let receive_queue = match Arc::try_unwrap(worker.receive_queue) {
Ok(mutex) => mutex.into_inner(),
Err(_) => return Err(anyhow!("failed to retrieve receive queue to sleep device.")),
};
let transmit_queue = match Arc::try_unwrap(worker.transmit_queue) {
Ok(mutex) => mutex.into_inner(),
Err(_) => {
return Err(anyhow!(
"failed to retrieve transmit queue to sleep device."
))
}
};
return Ok(Some(BTreeMap::from([
(0, receive_queue),
(1, transmit_queue),
])));
if !queues.is_empty() {
Ok(Some(queues))
} else {
Ok(None)
}
Ok(None)
}
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
match queues_state {
None => Ok(()),
Some((mem, interrupt, queues)) => {
// TODO(khei): activate is just what we want at the moment, but we should probably
// move it into a "start workers" function to make it obvious that
// it isn't strictly used for activate events.
self.activate(mem, interrupt, queues)?;
Ok(())
if let Some((_mem, interrupt, queues)) = queues_state {
for (idx, queue) in queues.into_iter() {
self.console.start_queue(idx, queue, interrupt.clone())?;
}
}
Ok(())
}
fn virtio_snapshot(&mut self) -> anyhow::Result<serde_json::Value> {
if let Some(read) = self.input.as_mut() {
// If the device was not activated yet, we still read the input.
// It's fine to do so since the the data is not lost. It will get queued in the
// input_buffer and restored. When the device activates, the data will still be
// available, and if there's any new data, that new data will get appended.
let input_buffer = Arc::new(Mutex::new(std::mem::take(&mut self.input_buffer)));
let kill_evt = Event::new().unwrap();
let _ = kill_evt.signal();
sys::read_input(read, &self.in_avail_evt, input_buffer.clone(), kill_evt);
self.input_buffer = std::mem::take(&mut input_buffer.lock());
};
serde_json::to_value(ConsoleSnapshot {
// Snapshot base_features as a safeguard when restoring the console device. Saving this
// info allows us to validate that the proper config was used for the console.
base_features: self.base_features,
input_buffer: self.input_buffer.clone(),
})
.context("failed to snapshot virtio console")
let snap = self.console.snapshot()?;
serde_json::to_value(snap).context("failed to snapshot virtio console")
}
fn virtio_restore(&mut self, data: serde_json::Value) -> anyhow::Result<()> {
let deser: ConsoleSnapshot =
let snap: ConsoleSnapshot =
serde_json::from_value(data).context("failed to deserialize virtio console")?;
anyhow::ensure!(
self.base_features == deser.base_features,
"Virtio console incorrect base features for restore:\n Expected: {}, Actual: {}",
self.base_features,
deser.base_features,
);
self.input_buffer = deser.input_buffer;
Ok(())
self.console.restore(&snap)
}
}
@ -504,51 +160,95 @@ mod tests {
struct ConsoleContext {
#[cfg(windows)]
input_peer: named_pipes::PipeConnection,
input_pipe_client: named_pipes::PipeConnection,
}
fn modify_device(_context: &mut ConsoleContext, b: &mut Console) {
b.input_buffer.push_back(0);
let input_buffer = b.console.ports[0].clone_input_buffer();
input_buffer.lock().push_back(0);
}
#[cfg(any(target_os = "android", target_os = "linux"))]
fn create_device() -> (ConsoleContext, Console) {
#[cfg(any(target_os = "android", target_os = "linux"))]
let (input, context) = (Box::new(tempfile().unwrap()), ConsoleContext {});
#[cfg(windows)]
let (input, context) = {
let (x, y) = named_pipes::pair(
&named_pipes::FramingMode::Byte,
&named_pipes::BlockingMode::NoWait,
0,
)
.unwrap();
(Box::new(x), ConsoleContext { input_peer: y })
};
let input = Box::new(tempfile().unwrap());
let output = Box::new(tempfile().unwrap());
(
context,
Console::new(
hypervisor::ProtectionType::Unprotected,
Some(input),
Some(output),
Vec::new(),
None,
),
let console = Console::new(
hypervisor::ProtectionType::Unprotected,
Some(input),
Some(output),
Vec::new(),
None,
);
let context = ConsoleContext {};
(context, console)
}
#[cfg(windows)]
fn create_device() -> (ConsoleContext, Console) {
let (input_pipe_server, input_pipe_client) = named_pipes::pair(
&named_pipes::FramingMode::Byte,
&named_pipes::BlockingMode::NoWait,
0,
)
.unwrap();
let input = Box::new(input_pipe_server);
let output = Box::new(tempfile().unwrap());
let console = Console::new(
hypervisor::ProtectionType::Unprotected,
Some(input),
Some(output),
Vec::new(),
None,
);
let context = ConsoleContext { input_pipe_client };
(context, console)
}
suspendable_virtio_tests!(console, create_device, 2, modify_device);
#[test]
fn test_inactive_sleep_resume() {
let (_ctx, device) = &mut create_device();
let (_ctx, mut device) = create_device();
let input_buffer = device.console.ports[0].clone_input_buffer();
// Initialize the device, starting the input thread, but don't activate any queues.
device.on_device_sandboxed();
// No queues were started, so `virtio_sleep()` should return `None`.
let sleep_result = device.virtio_sleep().expect("failed to sleep");
assert!(sleep_result.is_none());
device.virtio_snapshot().expect("failed to snapshot");
// Inject some input data.
input_buffer.lock().extend(b"Hello".iter());
// Ensure snapshot does not fail and contains the buffered input data.
let snapshot = device.virtio_snapshot().expect("failed to snapshot");
let snapshot_input_buffer = snapshot
.get("ports")
.unwrap()
.get(0)
.unwrap()
.get("input_buffer")
.unwrap()
.as_array()
.unwrap();
assert_eq!(snapshot_input_buffer.len(), b"Hello".len());
assert_eq!(snapshot_input_buffer[0].as_i64(), Some(b'H' as i64));
assert_eq!(snapshot_input_buffer[1].as_i64(), Some(b'e' as i64));
assert_eq!(snapshot_input_buffer[2].as_i64(), Some(b'l' as i64));
assert_eq!(snapshot_input_buffer[3].as_i64(), Some(b'l' as i64));
assert_eq!(snapshot_input_buffer[4].as_i64(), Some(b'o' as i64));
// Wake up the device, which should start the input thread again.
device.virtio_wake(None).expect("failed to wake");
// Make sure the input and output haven't been dropped.
assert!(device.input.is_some());
assert!(device.output.is_some());
}
}

View file

@ -1,565 +0,0 @@
// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Asynchronous console device which implementation can be shared by VMM and vhost-user.
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::io;
use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
use base::error;
#[cfg(windows)]
use base::named_pipes;
use base::AsRawDescriptor;
use base::Descriptor;
use base::Event;
use base::FileSync;
use base::RawDescriptor;
use base::WorkerThread;
use cros_async::select2;
use cros_async::AsyncResult;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::IntoAsync;
use cros_async::IoSource;
use futures::FutureExt;
use hypervisor::ProtectionType;
use sync::Mutex;
use vm_memory::GuestMemory;
use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
use zerocopy::AsBytes;
use super::handle_input;
use super::process_transmit_queue;
use super::QUEUE_SIZES;
use crate::serial_device::SerialInput;
use crate::serial_device::SerialOptions;
use crate::virtio;
use crate::virtio::async_device::AsyncQueueState;
use crate::virtio::async_utils;
use crate::virtio::base_features;
use crate::virtio::console::multiport::ConsolePortInfo;
use crate::virtio::console::multiport::ControlPort;
use crate::virtio::console::virtio_console_config;
use crate::virtio::console::ConsoleError;
use crate::virtio::copy_config;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_F_MULTIPORT;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::VirtioDevice;
use crate::PciAddress;
use crate::SerialDevice;
/// Wrapper that makes any `SerialInput` usable as an async source by providing an implementation of
/// `IntoAsync`.
struct AsyncSerialInput(Box<dyn SerialInput>);
impl AsRawDescriptor for AsyncSerialInput {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.0.get_read_notifier().as_raw_descriptor()
}
}
impl IntoAsync for AsyncSerialInput {}
async fn run_tx_queue(
queue: &Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
output: &mut Box<dyn io::Write + Send>,
) {
loop {
if let Err(e) = kick_evt.next_val().await {
error!("Failed to read kick event for tx queue: {}", e);
break;
}
process_transmit_queue(&doorbell, queue, output.as_mut());
}
}
async fn run_rx_queue(
queue: &Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
input: &IoSource<AsyncSerialInput>,
) {
// Staging buffer, required because of `handle_input`'s API. We can probably remove this once
// the regular virtio device is switched to async.
let mut in_buffer = VecDeque::<u8>::new();
let mut rx_buf = vec![0u8; 4096];
loop {
match input.read_to_vec(None, rx_buf).await {
// Input source has closed.
Ok((0, _)) => break,
Ok((size, v)) => {
in_buffer.extend(&v[0..size]);
rx_buf = v;
}
Err(e) => {
error!("Failed to read console input: {}", e);
return;
}
}
// Submit all the data obtained during this read.
while !in_buffer.is_empty() {
match handle_input(&doorbell, &mut in_buffer, queue) {
Ok(()) => {}
Err(ConsoleError::RxDescriptorsExhausted) => {
// Wait until a descriptor becomes available and try again.
if let Err(e) = kick_evt.next_val().await {
error!("Failed to read kick event for rx queue: {}", e);
return;
}
}
}
}
}
}
pub struct ConsolePort {
input: Option<AsyncQueueState<AsyncSerialInput>>,
output: AsyncQueueState<Box<dyn io::Write + Send>>,
info: ConsolePortInfo,
}
impl SerialDevice for ConsolePort {
fn new(
_protection_type: ProtectionType,
_evt: Event,
input: Option<Box<dyn SerialInput>>,
output: Option<Box<dyn io::Write + Send>>,
_sync: Option<Box<dyn FileSync + Send>>,
options: SerialOptions,
_keep_rds: Vec<RawDescriptor>,
) -> ConsolePort {
let input = input.map(AsyncSerialInput).map(AsyncQueueState::Stopped);
let output = AsyncQueueState::Stopped(output.unwrap_or_else(|| Box::new(io::sink())));
let info = ConsolePortInfo {
console: options.console,
name: options.name.unwrap_or_default(),
};
ConsolePort {
input,
output,
info,
}
}
#[cfg(windows)]
fn new_with_pipe(
_protection_type: ProtectionType,
_interrupt_evt: Event,
_pipe_in: named_pipes::PipeConnection,
_pipe_out: named_pipes::PipeConnection,
_options: SerialOptions,
_keep_rds: Vec<RawDescriptor>,
) -> ConsolePort {
unimplemented!("new_with_pipe unimplemented for ConsolePort");
}
}
impl ConsolePort {
pub fn start_receive_queue(
&mut self,
ex: &Executor,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
) -> anyhow::Result<()> {
let input_queue = match self.input.as_mut() {
Some(input_queue) => input_queue,
None => return Ok(()),
};
let kick_evt = queue
.lock()
.event()
.try_clone()
.context("Failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
let closure_ex = ex.clone();
let rx_future = move |input, abort| {
let async_input = closure_ex
.async_from(input)
.context("failed to create async input")?;
Ok(async move {
select2(
run_rx_queue(&queue, doorbell, kick_evt, &async_input).boxed_local(),
abort,
)
.await;
async_input.into_source()
})
};
input_queue.start(ex, rx_future)
}
pub fn stop_receive_queue(&mut self) -> AsyncResult<bool> {
if let Some(queue) = self.input.as_mut() {
queue.stop()
} else {
Ok(false)
}
}
pub fn start_transmit_queue(
&mut self,
ex: &Executor,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
) -> anyhow::Result<()> {
let kick_evt = queue
.lock()
.event()
.try_clone()
.context("Failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
let tx_future = |mut output, abort| {
Ok(async move {
select2(
run_tx_queue(&queue, doorbell, kick_evt, &mut output).boxed_local(),
abort,
)
.await;
output
})
};
self.output.start(ex, tx_future)
}
pub fn stop_transmit_queue(&mut self) -> AsyncResult<bool> {
self.output.stop()
}
}
/// Console device with an optional control port to support for multiport
pub struct ConsoleDevice {
avail_features: u64,
// Port 0 always exists.
port0: ConsolePort,
// Control port, if multiport is in use.
control_port: Option<ControlPort>,
// Port 1..n, if they exist.
extra_ports: Vec<ConsolePort>,
}
impl ConsoleDevice {
/// Create a console device with the multiport feature enabled
/// The multiport feature is referred to virtio spec.
pub fn new_multi_port(
protection_type: ProtectionType,
port0: ConsolePort,
extra_ports: Vec<ConsolePort>,
) -> ConsoleDevice {
let avail_features =
virtio::base_features(protection_type) | (1 << VIRTIO_CONSOLE_F_MULTIPORT);
let info = std::iter::once(&port0)
.chain(extra_ports.iter())
.map(|port| port.info.clone())
.collect::<Vec<_>>();
ConsoleDevice {
avail_features,
port0,
control_port: Some(ControlPort::new(info)),
extra_ports,
}
}
/// Return available features
pub fn avail_features(&self) -> u64 {
self.avail_features
}
/// Return whether current console device supports multiport feature
pub fn is_multi_port(&self) -> bool {
self.avail_features & (1 << VIRTIO_CONSOLE_F_MULTIPORT) != 0
}
/// Return the number of the port initiated by the console device
pub fn max_ports(&self) -> usize {
1 + self.extra_ports.len()
}
/// Returns the maximum number of queues supported by this device.
pub fn max_queues(&self) -> usize {
// The port 0 receive and transmit queues always exist;
// other queues only exist if VIRTIO_CONSOLE_F_MULTIPORT is set.
if self.is_multi_port() {
let port_num = self.max_ports();
// Extra 1 is for control port; each port has two queues (tx & rx)
(port_num + 1) * 2
} else {
2
}
}
/// Return the reference of the console port by port_id
fn get_console_port(&mut self, port_id: usize) -> anyhow::Result<&mut ConsolePort> {
match port_id {
0 => Ok(&mut self.port0),
port_id => self
.extra_ports
.get_mut(port_id - 1)
.with_context(|| format!("failed to get console port {}", port_id)),
}
}
/// Start the queue with the index `idx`
pub fn start_queue(
&mut self,
ex: &Executor,
idx: usize,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
) -> anyhow::Result<()> {
match idx {
// rxq (port0)
0 => self.port0.start_receive_queue(ex, queue, doorbell),
// txq (port0)
1 => self.port0.start_transmit_queue(ex, queue, doorbell),
// control port rxq
2 => self
.control_port
.as_mut()
.unwrap()
.start_receive_queue(ex, queue, doorbell),
// control port txq
3 => self
.control_port
.as_mut()
.unwrap()
.start_transmit_queue(ex, queue, doorbell),
// {4, 5} -> port1 {rxq, txq} if exist
// {6, 7} -> port2 {rxq, txq} if exist
// ...
_ => {
let port_id = idx / 2 - 1;
let port = self.get_console_port(port_id)?;
match idx % 2 {
0 => port.start_receive_queue(ex, queue, doorbell),
1 => port.start_transmit_queue(ex, queue, doorbell),
_ => unreachable!(),
}
}
}
}
/// Stop the queue with the index `idx`
pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<bool> {
match idx {
0 => self
.port0
.stop_receive_queue()
.context("failed to stop rx queue"),
1 => self
.port0
.stop_transmit_queue()
.context("failed to stop tx queue"),
2 => self.control_port.as_mut().unwrap().stop_receive_queue(),
3 => self.control_port.as_mut().unwrap().stop_transmit_queue(),
_ => {
let port_id = idx / 2 - 1;
let port = self.get_console_port(port_id)?;
match idx % 2 {
0 => port.stop_receive_queue().context("failed to stop rx queue"),
1 => port
.stop_transmit_queue()
.context("failed to stop tx queue"),
_ => unreachable!(),
}
}
}
}
}
impl SerialDevice for ConsoleDevice {
/// Create a default console device, without multiport support
fn new(
protection_type: ProtectionType,
evt: Event,
input: Option<Box<dyn SerialInput>>,
output: Option<Box<dyn io::Write + Send>>,
sync: Option<Box<dyn FileSync + Send>>,
options: SerialOptions,
keep_rds: Vec<RawDescriptor>,
) -> ConsoleDevice {
let avail_features =
virtio::base_features(protection_type) | 1 << VHOST_USER_F_PROTOCOL_FEATURES;
let port0 = ConsolePort::new(protection_type, evt, input, output, sync, options, keep_rds);
ConsoleDevice {
avail_features,
port0,
control_port: None,
extra_ports: vec![],
}
}
#[cfg(windows)]
fn new_with_pipe(
_protection_type: ProtectionType,
_interrupt_evt: Event,
_pipe_in: named_pipes::PipeConnection,
_pipe_out: named_pipes::PipeConnection,
_options: SerialOptions,
_keep_rds: Vec<RawDescriptor>,
) -> ConsoleDevice {
unimplemented!("new_with_pipe unimplemented for ConsoleDevice");
}
}
/// Virtio console device.
pub struct AsyncConsole {
console_device: Option<ConsoleDevice>,
worker_thread: Option<WorkerThread<anyhow::Result<ConsoleDevice>>>,
base_features: u64,
keep_descriptors: Vec<Descriptor>,
pci_address: Option<PciAddress>,
}
impl SerialDevice for AsyncConsole {
fn new(
protection_type: ProtectionType,
evt: Event,
input: Option<Box<dyn SerialInput>>,
output: Option<Box<dyn io::Write + Send>>,
sync: Option<Box<dyn FileSync + Send>>,
options: SerialOptions,
keep_rds: Vec<RawDescriptor>,
) -> AsyncConsole {
let pci_address = options.pci_address;
AsyncConsole {
console_device: Some(ConsoleDevice::new(
protection_type,
evt,
input,
output,
sync,
options,
Default::default(),
)),
worker_thread: None,
base_features: base_features(protection_type),
keep_descriptors: keep_rds.iter().copied().map(Descriptor).collect(),
pci_address,
}
}
#[cfg(windows)]
fn new_with_pipe(
_protection_type: ProtectionType,
_interrupt_evt: Event,
_pipe_in: named_pipes::PipeConnection,
_pipe_out: named_pipes::PipeConnection,
_options: SerialOptions,
_keep_rds: Vec<RawDescriptor>,
) -> AsyncConsole {
unimplemented!("new_with_pipe unimplemented for AsyncConsole");
}
}
impl VirtioDevice for AsyncConsole {
fn keep_rds(&self) -> Vec<RawDescriptor> {
self.keep_descriptors
.iter()
.map(Descriptor::as_raw_descriptor)
.collect()
}
fn features(&self) -> u64 {
self.base_features
}
fn device_type(&self) -> DeviceType {
DeviceType::Console
}
fn queue_max_sizes(&self) -> &[u16] {
QUEUE_SIZES
}
fn read_config(&self, offset: u64, data: &mut [u8]) {
let config = virtio_console_config {
max_nr_ports: 1.into(),
..Default::default()
};
copy_config(data, 0, config.as_bytes(), offset);
}
fn activate(
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() < 2 {
return Err(anyhow!("expected 2 queues, got {}", queues.len()));
}
let console = self.console_device.take().context("no console_device")?;
let ex = Executor::new().expect("failed to create an executor");
let receive_queue = queues.remove(&0).unwrap();
let transmit_queue = queues.remove(&1).unwrap();
self.worker_thread = Some(WorkerThread::start("v_console", move |kill_evt| {
let mut console = console;
let receive_queue = Arc::new(Mutex::new(receive_queue));
let transmit_queue = Arc::new(Mutex::new(transmit_queue));
// Start transmit queue of port 0
console.start_queue(&ex, 0, receive_queue, interrupt.clone())?;
// Start receive queue of port 0
console.start_queue(&ex, 1, transmit_queue, interrupt.clone())?;
// Run until the kill event is signaled and cancel all tasks.
ex.run_until(async {
async_utils::await_and_exit(&ex, kill_evt).await?;
let port = &mut console.port0;
if let Some(input) = port.input.as_mut() {
input
.stop_async()
.await
.context("failed to stop rx queue")?;
}
port.output
.stop_async()
.await
.context("failed to stop tx queue")?;
Ok(console)
})?
}));
Ok(())
}
fn pci_address(&self) -> Option<PciAddress> {
self.pci_address
}
fn reset(&mut self) -> anyhow::Result<()> {
if let Some(worker_thread) = self.worker_thread.take() {
let console = worker_thread.stop()?;
self.console_device = Some(console);
}
Ok(())
}
}

View file

@ -0,0 +1,180 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Virtio console device control queue handling.
use std::collections::VecDeque;
use std::io::Write;
use anyhow::anyhow;
use anyhow::Context;
use base::debug;
use base::error;
use zerocopy::AsBytes;
use crate::virtio::console::worker::WorkerPort;
use crate::virtio::device_constants::console::virtio_console_control;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_CONSOLE_PORT;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_DEVICE_ADD;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_DEVICE_READY;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_PORT_NAME;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_PORT_OPEN;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_PORT_READY;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
pub type ControlMsgBytes = Box<[u8]>;
fn control_msg(id: u32, event: u16, value: u16, extra_bytes: &[u8]) -> ControlMsgBytes {
virtio_console_control {
id: id.into(),
event: event.into(),
value: value.into(),
}
.as_bytes()
.iter()
.chain(extra_bytes.iter())
.copied()
.collect()
}
fn process_control_msg(
reader: &mut Reader,
ports: &[WorkerPort],
pending_receive_control_msgs: &mut VecDeque<ControlMsgBytes>,
) -> anyhow::Result<()> {
let ctrl_msg: virtio_console_control =
reader.read_obj().context("failed to read from reader")?;
let id = ctrl_msg.id.to_native();
let event = ctrl_msg.event.to_native();
let value = ctrl_msg.value.to_native();
match event {
VIRTIO_CONSOLE_DEVICE_READY => {
// value of 1 indicates success, and 0 indicates failure
if value != 1 {
return Err(anyhow!("console device ready failure ({value})"));
}
for (index, port) in ports.iter().enumerate() {
let port_id = index as u32;
// TODO(dverkamp): cap the size of `pending_receive_control_msgs` somehow
pending_receive_control_msgs.push_back(control_msg(
port_id,
VIRTIO_CONSOLE_DEVICE_ADD,
0,
&[],
));
if let Some(name) = port.name() {
pending_receive_control_msgs.push_back(control_msg(
port_id,
VIRTIO_CONSOLE_PORT_NAME,
0,
name.as_bytes(),
));
}
}
Ok(())
}
VIRTIO_CONSOLE_PORT_READY => {
// value of 1 indicates success, and 0 indicates failure
if value != 1 {
return Err(anyhow!("console port{id} ready failure ({value})"));
}
let port = ports
.get(id as usize)
.with_context(|| format!("invalid port id {id}"))?;
pending_receive_control_msgs.push_back(control_msg(
id,
VIRTIO_CONSOLE_PORT_OPEN,
1,
&[],
));
if port.is_console() {
pending_receive_control_msgs.push_back(control_msg(
id,
VIRTIO_CONSOLE_CONSOLE_PORT,
1,
&[],
));
}
Ok(())
}
VIRTIO_CONSOLE_PORT_OPEN => {
match value {
// Currently, port state change is not supported, default is open.
// And only print debug info here.
0 => debug!("console port{id} close"),
1 => debug!("console port{id} open"),
_ => error!("console port{id} unknown value {value}"),
}
Ok(())
}
_ => Err(anyhow!("unexpected control event {}", event)),
}
}
pub fn process_control_transmit_queue(
queue: &mut Queue,
interrupt: &Interrupt,
ports: &[WorkerPort],
pending_receive_control_msgs: &mut VecDeque<ControlMsgBytes>,
) {
let mut needs_interrupt = false;
while let Some(mut avail_desc) = queue.pop() {
if let Err(e) =
process_control_msg(&mut avail_desc.reader, ports, pending_receive_control_msgs)
{
error!("failed to handle control msg: {:#}", e);
}
queue.add_used(avail_desc, 0);
needs_interrupt = true;
}
if needs_interrupt {
queue.trigger_interrupt(interrupt);
}
}
pub fn process_control_receive_queue(
queue: &mut Queue,
interrupt: &Interrupt,
pending_receive_control_msgs: &mut VecDeque<ControlMsgBytes>,
) {
let mut needs_interrupt = false;
while !pending_receive_control_msgs.is_empty() {
let Some(mut avail_desc) = queue.pop() else {
break;
};
// Get a reply to copy into `avail_desc`. This should never fail since we check that
// `pending_receive_control_msgs` is not empty in the loop condition.
let reply = pending_receive_control_msgs
.pop_front()
.expect("missing reply");
let len = match avail_desc.writer.write_all(&reply) {
Ok(()) => avail_desc.writer.bytes_written() as u32,
Err(e) => {
error!("failed to write control receiveq reply: {}", e);
0
}
};
queue.add_used(avail_desc, len);
needs_interrupt = true;
}
if needs_interrupt {
queue.trigger_interrupt(interrupt);
}
}

View file

@ -0,0 +1,172 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! virtio-console and vhost-user-console device shared backend implementation
use base::RawDescriptor;
use data_model::Le32;
use hypervisor::ProtectionType;
use serde::Deserialize;
use serde::Serialize;
use zerocopy::AsBytes;
use crate::virtio::base_features;
use crate::virtio::console::port::ConsolePort;
use crate::virtio::console::port::ConsolePortSnapshot;
use crate::virtio::console::worker::WorkerHandle;
use crate::virtio::console::worker::WorkerPort;
use crate::virtio::copy_config;
use crate::virtio::device_constants::console::virtio_console_config;
use crate::virtio::device_constants::console::VIRTIO_CONSOLE_F_MULTIPORT;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
pub struct ConsoleDevice {
avail_features: u64,
pub(crate) ports: Vec<ConsolePort>,
worker: Option<WorkerHandle>,
}
#[derive(Serialize, Deserialize)]
pub struct ConsoleSnapshot {
avail_features: u64,
ports: Vec<ConsolePortSnapshot>,
}
impl ConsoleDevice {
/// Create a console device that does not support the multiport feature.
pub fn new_single_port(protection_type: ProtectionType, port: ConsolePort) -> ConsoleDevice {
ConsoleDevice {
avail_features: base_features(protection_type),
ports: vec![port],
worker: None,
}
}
/// Create a console device with the multiport feature enabled.
pub fn new_multi_port(
protection_type: ProtectionType,
ports: Vec<ConsolePort>,
) -> ConsoleDevice {
// Port 0 must always exist.
assert!(!ports.is_empty());
let avail_features = base_features(protection_type) | (1 << VIRTIO_CONSOLE_F_MULTIPORT);
ConsoleDevice {
avail_features,
ports,
worker: None,
}
}
pub fn features(&self) -> u64 {
self.avail_features
}
pub fn max_ports(&self) -> usize {
self.ports.len()
}
/// Returns the maximum number of queues supported by this device.
pub fn max_queues(&self) -> usize {
// The port 0 receive and transmit queues always exist;
// other queues only exist if VIRTIO_CONSOLE_F_MULTIPORT is set.
let num_queues = self.ports.len().max(1);
if self.avail_features & (1 << VIRTIO_CONSOLE_F_MULTIPORT) != 0 {
// Each port has two queues (tx & rx), plus 2 for control receiveq and transmitq.
num_queues * 2 + 2
} else {
// port0 receiveq + transmitq
2
}
}
pub fn read_config(&self, offset: u64, data: &mut [u8]) {
let max_nr_ports = self.max_ports();
let config = virtio_console_config {
max_nr_ports: Le32::from(max_nr_ports as u32),
..Default::default()
};
copy_config(data, 0, config.as_bytes(), offset);
}
pub fn keep_rds(&self) -> Vec<RawDescriptor> {
self.ports.iter().flat_map(ConsolePort::keep_rds).collect()
}
fn ensure_worker_started(&mut self, interrupt: Interrupt) -> &mut WorkerHandle {
self.worker.get_or_insert_with(|| {
let ports = self
.ports
.iter_mut()
.map(WorkerPort::from_console_port)
.collect();
WorkerHandle::new(interrupt, ports).expect("failed to create console worker")
})
}
pub fn start_queue(
&mut self,
idx: usize,
queue: Queue,
interrupt: Interrupt,
) -> anyhow::Result<()> {
let worker = self.ensure_worker_started(interrupt);
worker.start_queue(idx, queue)
}
pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>> {
match self.worker.as_mut() {
Some(worker) => worker.stop_queue(idx),
None => Ok(None),
}
}
pub fn reset(&mut self) -> anyhow::Result<()> {
for idx in 0..self.max_queues() {
let _ = self.stop_queue(idx);
}
Ok(())
}
pub fn start_input_threads(&mut self) {
for port in self.ports.iter_mut() {
port.start_input_thread();
}
}
pub fn stop_input_threads(&mut self) {
for port in self.ports.iter_mut() {
port.stop_input_thread();
}
}
pub fn snapshot(&mut self) -> anyhow::Result<ConsoleSnapshot> {
let mut ports = Vec::new();
for port in &mut self.ports {
ports.push(port.snapshot());
}
Ok(ConsoleSnapshot {
avail_features: self.avail_features,
ports,
})
}
pub fn restore(&mut self, snap: &ConsoleSnapshot) -> anyhow::Result<()> {
anyhow::ensure!(
self.avail_features == snap.avail_features,
"Virtio console incorrect features for restore: Expected: {}, Actual: {}",
self.avail_features,
snap.avail_features,
);
for (port, port_snap) in self.ports.iter_mut().zip(snap.ports.iter()) {
port.restore(port_snap);
}
Ok(())
}
}

View file

@ -0,0 +1,50 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Virtio console device input handling.
use std::collections::VecDeque;
use std::io::Write;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
/// Checks for input from `buffer` and transfers it to the receive queue, if any.
///
/// # Arguments
///
/// * `interrupt` - Interrupt used to signal that the queue has been used
/// * `buffer` - Ring buffer providing data to put into the guest
/// * `receive_queue` - The receive virtio Queue
pub fn process_receive_queue(
interrupt: &Interrupt,
buffer: &mut VecDeque<u8>,
receive_queue: &mut Queue,
) {
while let Some(mut desc) = receive_queue.peek() {
if buffer.is_empty() {
break;
}
let writer = &mut desc.writer;
while writer.available_bytes() > 0 && !buffer.is_empty() {
let (buffer_front, buffer_back) = buffer.as_slices();
let buffer_chunk = if !buffer_front.is_empty() {
buffer_front
} else {
buffer_back
};
let written = writer.write(buffer_chunk).unwrap();
drop(buffer.drain(..written));
}
let bytes_written = writer.bytes_written() as u32;
if bytes_written > 0 {
let desc = desc.pop();
receive_queue.add_used(desc, bytes_written);
receive_queue.trigger_interrupt(interrupt);
}
}
}

View file

@ -1,324 +0,0 @@
// Copyright 2023 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Implementation of control port used for multi-port enabled virtio-console
use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use base::debug;
use base::error;
use cros_async::select2;
use cros_async::EventAsync;
use cros_async::Executor;
use data_model::Le16;
use data_model::Le32;
use futures::channel::mpsc;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use sync::Mutex;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
use zerocopy::FromZeroes;
use super::handle_input;
use crate::virtio;
use crate::virtio::async_device::AsyncQueueState;
use crate::virtio::console::ConsoleError;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
type ControlMsgBytes = VecDeque<u8>;
#[derive(Clone, Debug, Default, FromZeroes, FromBytes, AsBytes)]
#[repr(C)]
struct ControlMsg {
id: Le32,
event: Le16,
value: Le16,
}
impl ControlMsg {
fn new(id: u32, event: ControlEvent, value: u16) -> ControlMsg {
ControlMsg {
id: Le32::from(id),
event: Le16::from(event as u16),
value: Le16::from(value),
}
}
}
#[derive(Debug, PartialEq, enumn::N)]
enum ControlEvent {
DeviceReady = 0,
DeviceAdd = 1,
DeviceRemove = 2,
PortReady = 3,
ConsolePort = 4,
Resize = 5,
PortOpen = 6,
PortName = 7,
}
impl TryFrom<u16> for ControlEvent {
type Error = anyhow::Error;
fn try_from(value: u16) -> Result<Self> {
match ControlEvent::n(value) {
Some(event) => Ok(event),
None => Err(anyhow!("unsupported event {}", value)),
}
}
}
fn process_tx_ctrl_msg(
reader: &mut Reader,
ports: &[ConsolePortInfo],
) -> Result<Vec<ControlMsgBytes>> {
let mut messages = Vec::<ControlMsgBytes>::new();
let ports_num = ports.len() as u32;
let ctrl_msg: ControlMsg = reader.read_obj().context("failed to read from reader")?;
let id = ctrl_msg.id.to_native();
let event = ControlEvent::try_from(ctrl_msg.event.to_native())?;
let value: u16 = ctrl_msg.value.to_native();
if id >= ports_num && event != ControlEvent::DeviceReady {
return Err(anyhow!("console: id {} out of range", id));
}
match event {
ControlEvent::DeviceReady => {
// value of 1 indicates success, and 0 indicates failure
if value == 1 {
for id in 0..ports_num {
let msg = ControlMsg::new(id, ControlEvent::DeviceAdd, 0);
let _ = msg.as_bytes();
messages.push(msg.as_bytes().to_owned().into());
let name = ports[id as usize].name.clone();
let msg = ControlMsg::new(id, ControlEvent::PortName, 0);
let mut reply: ControlMsgBytes = msg.as_bytes().to_owned().into();
reply.extend(name.as_bytes());
messages.push(reply);
}
} else {
error!("console: received event {:?} value {}", event, value);
}
}
ControlEvent::PortReady => {
// value of 1 indicates success, and 0 indicates failure
if value == 1 {
let msg = ControlMsg::new(id, ControlEvent::PortOpen, 1);
messages.push(msg.as_bytes().to_owned().into());
let is_console = ports[id as usize].console;
if is_console {
let msg = ControlMsg::new(id, ControlEvent::ConsolePort, 1);
messages.push(msg.as_bytes().to_owned().into());
}
} else {
error!("console: received event {:?} value {}", event, value);
}
}
ControlEvent::PortOpen => match value {
// Currently, port state change is not supported, default is open.
// And only print debug info here.
0 => debug!("console port{} close", id),
1 => debug!("console port{} open", id),
_ => error!("console port{} open {}", id, value),
},
_ => {
return Err(anyhow!("console: unexpected control event {:?}", event));
}
}
Ok(messages)
}
fn process_tx_ctrl_queue(
queue: &Arc<Mutex<Queue>>,
doorbell: &Interrupt,
ports: &[ConsolePortInfo],
) -> Vec<ControlMsgBytes> {
let mut needs_interrupt = false;
let mut messages = Vec::<ControlMsgBytes>::new();
let mut queue = queue.try_lock().expect("Lock should not be unavailable");
while let Some(mut avail_desc) = queue.pop() {
match process_tx_ctrl_msg(&mut avail_desc.reader, ports) {
Ok(mut msg) => messages.append(&mut msg),
Err(e) => {
error!("console: failed to handle control msg: {}", e);
}
}
queue.add_used(avail_desc, 0);
needs_interrupt = true;
}
if needs_interrupt {
queue.trigger_interrupt(doorbell);
}
messages
}
async fn run_tx_ctrl_queue(
queue: &Arc<Mutex<Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
sender: &mut mpsc::UnboundedSender<Vec<ControlMsgBytes>>,
ports: Vec<ConsolePortInfo>,
) {
loop {
if let Err(e) = kick_evt.next_val().await {
error!("Failed to read kick event for tx queue: {}", e);
break;
}
let messages = process_tx_ctrl_queue(queue, &doorbell, &ports);
if let Err(e) = sender.send(messages).await {
error!("console: failed to send control msg: {}", e);
break;
}
}
}
async fn run_rx_ctrl_queue(
queue: &Arc<Mutex<Queue>>,
doorbell: Interrupt,
kick_evt: EventAsync,
receiver: &mut mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>,
) {
loop {
let messages = receiver.next().await;
if let Some(messages) = messages {
for mut msg in messages.into_iter() {
while !msg.is_empty() {
match handle_input(&doorbell, &mut msg, queue) {
Ok(()) => {}
Err(ConsoleError::RxDescriptorsExhausted) => {
// Wait until a descriptor becomes available and try again.
if let Err(e) = kick_evt.next_val().await {
error!("Failed to read kick event for rx-ctrl queue: {}", e);
return;
}
}
}
}
}
}
}
}
/// Each port info for multi-port virtio-console
#[derive(Default, Clone)]
pub struct ConsolePortInfo {
pub console: bool,
pub name: String,
}
/// Control port for multi-port virtio-console
pub struct ControlPort {
sender: AsyncQueueState<mpsc::UnboundedSender<Vec<ControlMsgBytes>>>,
receiver: AsyncQueueState<mpsc::UnboundedReceiver<Vec<ControlMsgBytes>>>,
ports: Vec<ConsolePortInfo>,
}
impl ControlPort {
/// Create a control port with the given port info
pub fn new(ports: Vec<ConsolePortInfo>) -> ControlPort {
let (sender, receiver) = mpsc::unbounded::<Vec<ControlMsgBytes>>();
ControlPort {
sender: AsyncQueueState::Stopped(sender),
receiver: AsyncQueueState::Stopped(receiver),
ports,
}
}
/// Start the control receiveq
pub fn start_receive_queue(
&mut self,
ex: &Executor,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
) -> Result<()> {
let kick_evt = queue
.lock()
.event()
.try_clone()
.context("Failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
let receiver = &mut self.receiver;
let rx_future = |mut receiver, abort| {
Ok(async move {
select2(
run_rx_ctrl_queue(&queue, doorbell, kick_evt, &mut receiver).boxed_local(),
abort,
)
.await;
receiver
})
};
receiver.start(ex, rx_future)
}
/// Stop the control receiveq
pub fn stop_receive_queue(&mut self) -> anyhow::Result<bool> {
self.receiver
.stop()
.context("failed to stop control rx queue")
}
/// Start the control transmitq
pub fn start_transmit_queue(
&mut self,
ex: &Executor,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
) -> Result<()> {
let kick_evt = queue
.lock()
.event()
.try_clone()
.context("Failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
let sender = &mut self.sender;
let ports = self.ports.clone();
let tx_future = |mut sender, abort| {
Ok(async move {
select2(
run_tx_ctrl_queue(&queue, doorbell, kick_evt, &mut sender, ports).boxed_local(),
abort,
)
.await;
sender
})
};
sender.start(ex, tx_future)
}
/// Stop the control transmitq
pub fn stop_transmit_queue(&mut self) -> anyhow::Result<bool> {
self.sender
.stop()
.context("failed to stop control tx queue")
}
}

View file

@ -0,0 +1,56 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Virtio console device output handling.
use std::io;
use std::io::Read;
use base::error;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::Reader;
/// Writes the available data from the reader into the given output queue.
///
/// # Arguments
///
/// * `reader` - The Reader with the data we want to write.
/// * `output` - The output sink we are going to write the data to.
fn process_transmit_request(reader: &mut Reader, output: &mut dyn io::Write) -> io::Result<()> {
let len = reader.available_bytes();
let mut data = vec![0u8; len];
reader.read_exact(&mut data)?;
output.write_all(&data)?;
output.flush()?;
Ok(())
}
/// Processes the data taken from the given transmit queue into the output sink.
///
/// # Arguments
///
/// * `interrupt` - Interrupt used to signal (if required) that the queue has been used
/// * `transmit_queue` - The transmit virtio Queue
/// * `output` - The output sink we are going to write the data into
pub fn process_transmit_queue(
interrupt: &Interrupt,
transmit_queue: &mut Queue,
output: &mut dyn io::Write,
) {
let mut needs_interrupt = false;
while let Some(mut avail_desc) = transmit_queue.pop() {
if let Err(e) = process_transmit_request(&mut avail_desc.reader, output) {
error!("console: process_transmit_request failed: {}", e);
}
transmit_queue.add_used(avail_desc, 0);
needs_interrupt = true;
}
if needs_interrupt {
transmit_queue.trigger_interrupt(interrupt);
}
}

View file

@ -0,0 +1,150 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Virtio console device per-port functionality.
use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::Context;
use base::AsRawDescriptor;
use base::Descriptor;
use base::Event;
use base::RawDescriptor;
use base::WorkerThread;
use serde::Deserialize;
use serde::Serialize;
use sync::Mutex;
use crate::serial::sys::InStreamType;
use crate::virtio::console::sys::spawn_input_thread;
/// Each port info for multi-port virtio-console
#[derive(Clone, Debug)]
pub struct ConsolePortInfo {
pub console: bool,
pub name: Option<String>,
}
impl ConsolePortInfo {
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
}
pub struct ConsolePort {
pub(crate) input: Option<InStreamType>,
pub(crate) output: Option<Box<dyn std::io::Write + Send>>,
info: Option<ConsolePortInfo>,
// input_buffer is shared with the input_thread while it is running.
input_buffer: Arc<Mutex<VecDeque<u8>>>,
// `in_avail_evt` will be signaled by the input thread to notify the worker when new input is
// available in `input_buffer`.
in_avail_evt: Event,
input_thread: Option<WorkerThread<InStreamType>>,
keep_descriptors: Vec<Descriptor>,
}
#[derive(Serialize, Deserialize)]
pub struct ConsolePortSnapshot {
input_buffer: Vec<u8>,
}
impl ConsolePort {
pub fn new(
input: Option<InStreamType>,
output: Option<Box<dyn std::io::Write + Send>>,
info: Option<ConsolePortInfo>,
mut keep_rds: Vec<RawDescriptor>,
) -> Self {
let input_buffer = Arc::new(Mutex::new(VecDeque::new()));
let in_avail_evt = Event::new().expect("Event::new() failed");
keep_rds.push(in_avail_evt.as_raw_descriptor());
ConsolePort {
input,
output,
info,
input_buffer,
in_avail_evt,
input_thread: None,
keep_descriptors: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),
}
}
pub fn clone_in_avail_evt(&self) -> anyhow::Result<Event> {
self.in_avail_evt
.try_clone()
.context("clone_in_avail_evt failed")
}
pub fn clone_input_buffer(&self) -> Arc<Mutex<VecDeque<u8>>> {
self.input_buffer.clone()
}
pub fn take_output(&mut self) -> Option<Box<dyn std::io::Write + Send>> {
self.output.take()
}
pub fn port_info(&self) -> Option<&ConsolePortInfo> {
self.info.as_ref()
}
pub fn start_input_thread(&mut self) {
// Spawn a separate thread to poll input.
// A thread is used because io::Read only provides a blocking interface, and there is no
// generic way to add an io::Read instance to a poll context (it may not be backed by a
// file descriptor). Moving the blocking read call to a separate thread and
// sending data back to the main worker thread with an event for
// notification bridges this gap.
if let Some(input) = self.input.take() {
assert!(self.input_thread.is_none());
let thread_in_avail_evt = self
.clone_in_avail_evt()
.expect("failed creating input available Event pair");
let thread = spawn_input_thread(input, thread_in_avail_evt, self.input_buffer.clone());
self.input_thread = Some(thread);
}
}
pub fn stop_input_thread(&mut self) {
if let Some(input_thread) = self.input_thread.take() {
let input = input_thread.stop();
self.input = Some(input);
}
}
pub fn snapshot(&mut self) -> ConsolePortSnapshot {
// This is only guaranteed to return a consistent state while the input thread is stopped.
self.stop_input_thread();
let input_buffer = self.input_buffer.lock().iter().copied().collect();
self.start_input_thread();
ConsolePortSnapshot { input_buffer }
}
pub fn restore(&mut self, snap: &ConsolePortSnapshot) {
self.stop_input_thread();
// Set the input buffer, discarding any currently buffered data.
let mut input_buffer = self.input_buffer.lock();
input_buffer.clear();
input_buffer.extend(snap.input_buffer.iter());
drop(input_buffer);
self.start_input_thread();
}
pub fn keep_rds(&self) -> Vec<RawDescriptor> {
self.keep_descriptors
.iter()
.map(|descr| descr.as_raw_descriptor())
.collect()
}
}

View file

@ -12,5 +12,4 @@ cfg_if::cfg_if! {
}
}
pub(in crate::virtio::console) use platform::read_input;
pub(in crate::virtio::console) use platform::spawn_input_thread;

View file

@ -8,6 +8,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use anyhow::Context;
use base::error;
use base::Event;
use base::EventToken;
@ -20,6 +21,9 @@ use sync::Mutex;
use crate::serial::sys::InStreamType;
use crate::serial_device::SerialInput;
use crate::serial_device::SerialOptions;
use crate::virtio::console::device::ConsoleDevice;
use crate::virtio::console::port::ConsolePort;
use crate::virtio::console::port::ConsolePortInfo;
use crate::virtio::console::Console;
use crate::virtio::ProtectionType;
use crate::SerialDevice;
@ -29,91 +33,114 @@ impl SerialDevice for Console {
protection_type: ProtectionType,
_event: Event,
input: Option<Box<dyn SerialInput>>,
out: Option<Box<dyn io::Write + Send>>,
output: Option<Box<dyn io::Write + Send>>,
// TODO(b/171331752): connect filesync functionality.
_sync: Option<Box<dyn FileSync + Send>>,
options: SerialOptions,
keep_rds: Vec<RawDescriptor>,
) -> Console {
Console::new(protection_type, input, out, keep_rds, options.pci_address)
Console::new(
protection_type,
input,
output,
keep_rds,
options.pci_address,
)
}
}
fn is_a_fatal_input_error(e: &io::Error) -> bool {
e.kind() != io::ErrorKind::Interrupted
impl SerialDevice for ConsoleDevice {
fn new(
protection_type: ProtectionType,
_event: Event,
input: Option<Box<dyn SerialInput>>,
output: Option<Box<dyn io::Write + Send>>,
_sync: Option<Box<dyn FileSync + Send>>,
options: SerialOptions,
keep_rds: Vec<RawDescriptor>,
) -> ConsoleDevice {
let info = ConsolePortInfo {
name: options.name,
console: options.console,
};
let port = ConsolePort::new(input, output, Some(info), keep_rds);
ConsoleDevice::new_single_port(protection_type, port)
}
}
/// Starts a thread that reads rx and sends the input back via the returned buffer.
impl SerialDevice for ConsolePort {
fn new(
_protection_type: ProtectionType,
_event: Event,
input: Option<Box<dyn SerialInput>>,
output: Option<Box<dyn io::Write + Send>>,
// TODO(b/171331752): connect filesync functionality.
_sync: Option<Box<dyn FileSync + Send>>,
options: SerialOptions,
keep_rds: Vec<RawDescriptor>,
) -> ConsolePort {
let info = ConsolePortInfo {
name: options.name,
console: options.console,
};
ConsolePort::new(input, output, Some(info), keep_rds)
}
}
/// Starts a thread that reads input and sends the input back via the provided buffer.
///
/// The caller should listen on `in_avail_evt` for events. When `in_avail_evt` signals that data
/// is available, the caller should lock the returned `Mutex` and read data out of the inner
/// is available, the caller should lock `input_buffer` and read data out of the inner
/// `VecDeque`. The data should be removed from the beginning of the `VecDeque` as it is processed.
///
/// # Arguments
///
/// * `rx` - Data source that the reader thread will wait on to send data back to the buffer
/// * `input` - Data source that the reader thread will wait on to send data back to the buffer
/// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
pub(in crate::virtio::console) fn spawn_input_thread(
mut rx: InStreamType,
in_avail_evt: &Event,
input_buffer: VecDeque<u8>,
) -> (Arc<Mutex<VecDeque<u8>>>, WorkerThread<InStreamType>) {
let buffer = Arc::new(Mutex::new(input_buffer));
let buffer_cloned = buffer.clone();
let thread_in_avail_evt = in_avail_evt
.try_clone()
.expect("failed to clone in_avail_evt");
let res = WorkerThread::start("v_console_input", move |kill_evt| {
mut input: InStreamType,
in_avail_evt: Event,
input_buffer: Arc<Mutex<VecDeque<u8>>>,
) -> WorkerThread<InStreamType> {
WorkerThread::start("v_console_input", move |kill_evt| {
// If there is already data, signal immediately.
if !buffer.lock().is_empty() {
thread_in_avail_evt.signal().unwrap();
if !input_buffer.lock().is_empty() {
in_avail_evt.signal().unwrap();
}
read_input(&mut rx, &thread_in_avail_evt, buffer, kill_evt);
rx
});
(buffer_cloned, res)
if let Err(e) = read_input(&mut input, &in_avail_evt, input_buffer, kill_evt) {
error!("console input thread exited with error: {:#}", e);
}
input
})
}
pub(in crate::virtio::console) fn read_input(
rx: &mut InStreamType,
fn read_input(
input: &mut InStreamType,
thread_in_avail_evt: &Event,
buffer: Arc<Mutex<VecDeque<u8>>>,
kill_evt: Event,
) {
) -> anyhow::Result<()> {
#[derive(EventToken)]
enum Token {
ConsoleEvent,
Kill,
}
let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&kill_evt, Token::Kill),
(rx.get_read_notifier(), Token::ConsoleEvent),
]) {
Ok(ctx) => ctx,
Err(e) => {
error!("failed creating WaitContext {:?}", e);
return;
}
};
(input.get_read_notifier(), Token::ConsoleEvent),
])
.context("failed creating WaitContext")?;
let mut kill_timeout = None;
let mut rx_buf = [0u8; 1 << 12];
'wait: loop {
let events = match wait_ctx.wait() {
Ok(events) => events,
Err(e) => {
error!("Failed to wait for events. {}", e);
return;
}
};
let events = wait_ctx.wait().context("Failed to wait for events")?;
for event in events.iter() {
match event.token {
Token::Kill => {
// Ignore the kill event until there are no other events to process so that
// we drain `rx` as much as possible. The next `wait_ctx.wait()` call will
// we drain `input` as much as possible. The next `wait_ctx.wait()` call will
// immediately re-entry this case since we don't call `kill_evt.wait()`.
if events.iter().all(|e| matches!(e.token, Token::Kill)) {
break 'wait;
@ -135,25 +162,22 @@ pub(in crate::virtio::console) fn read_input(
}
}
Token::ConsoleEvent => {
match rx.read(&mut rx_buf) {
match input.read(&mut rx_buf) {
Ok(0) => break 'wait, // Assume the stream of input has ended.
Ok(size) => {
buffer.lock().extend(&rx_buf[0..size]);
thread_in_avail_evt.signal().unwrap();
}
// Being interrupted is not an error, but everything else is.
Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => {
// Being interrupted is not an error, but everything else is.
if is_a_fatal_input_error(&e) {
error!(
"failed to read for bytes to queue into console device: {}",
e
);
break 'wait;
}
return Err(e).context("failed to read console input");
}
}
}
}
}
}
Ok(())
}

View file

@ -90,23 +90,13 @@ fn is_a_fatal_input_error(e: &io::Error) -> bool {
/// * `in_avail_evt` - Event triggered by the thread when new input is available on the buffer
pub(in crate::virtio::console) fn spawn_input_thread(
mut rx: Box<named_pipes::PipeConnection>,
in_avail_evt: &Event,
input_buffer: VecDeque<u8>,
) -> (
Arc<Mutex<VecDeque<u8>>>,
WorkerThread<Box<named_pipes::PipeConnection>>,
) {
let buffer = Arc::new(Mutex::new(input_buffer));
let buffer_cloned = buffer.clone();
let thread_in_avail_evt = in_avail_evt
.try_clone()
.expect("failed to clone in_avail_evt");
let res = WorkerThread::start("v_console_input", move |kill_evt| {
in_avail_evt: Event,
input_buffer: Arc<Mutex<VecDeque<u8>>>,
) -> WorkerThread<Box<named_pipes::PipeConnection>> {
WorkerThread::start("v_console_input", move |kill_evt| {
// If there is already data, signal immediately.
if !buffer.lock().is_empty() {
thread_in_avail_evt.signal().unwrap();
if !input_buffer.lock().is_empty() {
in_avail_evt.signal().unwrap();
}
match rx.wait_for_client_connection_overlapped_blocking(&kill_evt) {
@ -115,13 +105,12 @@ pub(in crate::virtio::console) fn spawn_input_thread(
Ok(()) => (),
}
read_input(&mut rx, &thread_in_avail_evt, buffer, kill_evt);
read_input(&mut rx, &in_avail_evt, input_buffer, kill_evt);
rx
});
(buffer_cloned, res)
})
}
pub(in crate::virtio::console) fn read_input(
fn read_input(
rx: &mut Box<named_pipes::PipeConnection>,
thread_in_avail_evt: &Event,
buffer: Arc<Mutex<VecDeque<u8>>>,

View file

@ -0,0 +1,442 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Virtio console device worker thread.
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::mpsc;
use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
use base::error;
use base::Event;
use base::EventToken;
use base::WaitContext;
use base::WorkerThread;
use sync::Mutex;
use crate::virtio::console::control::process_control_receive_queue;
use crate::virtio::console::control::process_control_transmit_queue;
use crate::virtio::console::control::ControlMsgBytes;
use crate::virtio::console::input::process_receive_queue;
use crate::virtio::console::output::process_transmit_queue;
use crate::virtio::console::port::ConsolePort;
use crate::virtio::console::port::ConsolePortInfo;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
const PORT0_RECEIVEQ_IDX: usize = 0;
const PORT0_TRANSMITQ_IDX: usize = 1;
const CONTROL_RECEIVEQ_IDX: usize = 2;
const CONTROL_TRANSMITQ_IDX: usize = 3;
const PORT1_RECEIVEQ_IDX: usize = 4;
const PORT1_TRANSMITQ_IDX: usize = 5;
pub struct WorkerPort {
info: Option<ConsolePortInfo>,
in_avail_evt: Event,
input_buffer: Arc<Mutex<VecDeque<u8>>>,
output: Box<dyn std::io::Write + Send>,
}
impl WorkerPort {
pub fn from_console_port(port: &mut ConsolePort) -> WorkerPort {
let in_avail_evt = port.clone_in_avail_evt().unwrap();
let input_buffer = port.clone_input_buffer();
let output = port
.take_output()
.unwrap_or_else(|| Box::new(std::io::sink()));
let info = port.port_info().cloned();
WorkerPort {
info,
in_avail_evt,
input_buffer,
output,
}
}
pub fn is_console(&self) -> bool {
self.info
.as_ref()
.map(|info| info.console)
.unwrap_or_default()
}
pub fn name(&self) -> Option<&str> {
self.info.as_ref().and_then(ConsolePortInfo::name)
}
}
#[derive(EventToken)]
enum Token {
ReceiveQueueAvailable(u32),
TransmitQueueAvailable(u32),
InputAvailable(u32),
ControlReceiveQueueAvailable,
ControlTransmitQueueAvailable,
InterruptResample,
WorkerRequest,
Kill,
}
pub enum WorkerRequest {
StartQueue {
idx: usize,
queue: Queue,
response_sender: mpsc::SyncSender<anyhow::Result<()>>,
},
StopQueue {
idx: usize,
response_sender: mpsc::SyncSender<Option<Queue>>,
},
}
pub struct Worker {
wait_ctx: WaitContext<Token>,
interrupt: Interrupt,
// Currently running queues.
queues: BTreeMap<usize, Queue>,
// Console ports indexed by port ID. At least port 0 will exist, and other ports may be
// available if `VIRTIO_CONSOLE_F_MULTIPORT` is enabled.
ports: Vec<WorkerPort>,
// Device-to-driver messages to be received by the driver via the control receiveq.
pending_receive_control_msgs: VecDeque<ControlMsgBytes>,
worker_receiver: mpsc::Receiver<WorkerRequest>,
worker_event: Event,
}
impl Worker {
pub fn new(
interrupt: Interrupt,
ports: Vec<WorkerPort>,
worker_receiver: mpsc::Receiver<WorkerRequest>,
worker_event: Event,
) -> anyhow::Result<Self> {
let wait_ctx = WaitContext::new().context("WaitContext::new() failed")?;
wait_ctx.add(&worker_event, Token::WorkerRequest)?;
for (index, port) in ports.iter().enumerate() {
let port_id = index as u32;
wait_ctx.add(&port.in_avail_evt, Token::InputAvailable(port_id))?;
}
if let Some(resample_evt) = interrupt.get_resample_evt() {
wait_ctx.add(resample_evt, Token::InterruptResample)?;
}
Ok(Worker {
wait_ctx,
interrupt,
queues: BTreeMap::new(),
ports,
pending_receive_control_msgs: VecDeque::new(),
worker_receiver,
worker_event,
})
}
pub fn run(&mut self, kill_evt: &Event) -> anyhow::Result<()> {
self.wait_ctx.add(kill_evt, Token::Kill)?;
let res = self.run_loop();
self.wait_ctx.delete(kill_evt)?;
res
}
fn run_loop(&mut self) -> anyhow::Result<()> {
let mut running = true;
while running {
let events = self.wait_ctx.wait()?;
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::TransmitQueueAvailable(port_id) => {
if let (Some(port), Some(transmitq)) = (
self.ports.get_mut(port_id as usize),
transmitq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx)),
) {
transmitq
.event()
.wait()
.context("failed reading transmit queue Event")?;
process_transmit_queue(&self.interrupt, transmitq, &mut port.output);
}
}
Token::ReceiveQueueAvailable(port_id) | Token::InputAvailable(port_id) => {
let port = self.ports.get_mut(port_id as usize);
let receiveq =
receiveq_idx(port_id).and_then(|idx| self.queues.get_mut(&idx));
let event = if matches!(event.token, Token::ReceiveQueueAvailable(..)) {
receiveq.as_ref().map(|q| q.event())
} else {
port.as_ref().map(|p| &p.in_avail_evt)
};
if let Some(event) = event {
event.wait().context("failed to clear receive event")?;
}
if let (Some(port), Some(receiveq)) = (port, receiveq) {
let mut input_buffer = port.input_buffer.lock();
process_receive_queue(&self.interrupt, &mut input_buffer, receiveq);
}
}
Token::ControlReceiveQueueAvailable => {
if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
ctrl_receiveq
.event()
.wait()
.context("failed waiting on control event")?;
process_control_receive_queue(
ctrl_receiveq,
&self.interrupt,
&mut self.pending_receive_control_msgs,
);
}
}
Token::ControlTransmitQueueAvailable => {
if let Some(ctrl_transmitq) = self.queues.get_mut(&CONTROL_TRANSMITQ_IDX) {
ctrl_transmitq
.event()
.wait()
.context("failed waiting on control event")?;
process_control_transmit_queue(
ctrl_transmitq,
&self.interrupt,
&self.ports,
&mut self.pending_receive_control_msgs,
);
}
// Attempt to send any new replies if there is space in the receiveq.
if let Some(ctrl_receiveq) = self.queues.get_mut(&CONTROL_RECEIVEQ_IDX) {
process_control_receive_queue(
ctrl_receiveq,
&self.interrupt,
&mut self.pending_receive_control_msgs,
)
}
}
Token::InterruptResample => {
self.interrupt.interrupt_resample();
}
Token::WorkerRequest => {
self.worker_event.wait()?;
self.process_worker_requests();
}
Token::Kill => running = false,
}
}
}
Ok(())
}
fn process_worker_requests(&mut self) {
while let Ok(request) = self.worker_receiver.try_recv() {
match request {
WorkerRequest::StartQueue {
idx,
queue,
response_sender,
} => {
let res = self.start_queue(idx, queue);
let _ = response_sender.send(res);
}
WorkerRequest::StopQueue {
idx,
response_sender,
} => {
let res = self.stop_queue(idx);
let _ = response_sender.send(res);
}
}
}
}
fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
if let Some(port_id) = receiveq_port_id(idx) {
self.wait_ctx
.add(queue.event(), Token::ReceiveQueueAvailable(port_id))?;
} else if let Some(port_id) = transmitq_port_id(idx) {
self.wait_ctx
.add(queue.event(), Token::TransmitQueueAvailable(port_id))?;
} else if idx == CONTROL_RECEIVEQ_IDX {
self.wait_ctx
.add(queue.event(), Token::ControlReceiveQueueAvailable)?;
} else if idx == CONTROL_TRANSMITQ_IDX {
self.wait_ctx
.add(queue.event(), Token::ControlTransmitQueueAvailable)?;
} else {
return Err(anyhow!("unhandled queue idx {idx}"));
}
let prev = self.queues.insert(idx, queue);
assert!(prev.is_none());
Ok(())
}
fn stop_queue(&mut self, idx: usize) -> Option<Queue> {
if let Some(queue) = self.queues.remove(&idx) {
let _ = self.wait_ctx.delete(queue.event());
Some(queue)
} else {
None
}
}
}
pub struct WorkerHandle {
worker_thread: WorkerThread<()>,
worker_sender: mpsc::Sender<WorkerRequest>,
worker_event: Event,
}
impl WorkerHandle {
pub fn new(interrupt: Interrupt, ports: Vec<WorkerPort>) -> anyhow::Result<Self> {
let worker_event = Event::new().context("Event::new")?;
let worker_event_clone = worker_event.try_clone().context("Event::try_clone")?;
let (worker_sender, worker_receiver) = mpsc::channel();
let worker_thread = WorkerThread::start("v_console", move |kill_evt| {
let mut worker = Worker::new(interrupt, ports, worker_receiver, worker_event_clone)
.expect("console Worker::new() failed");
if let Err(e) = worker.run(&kill_evt) {
error!("console worker failed: {:#}", e);
}
});
Ok(WorkerHandle {
worker_thread,
worker_sender,
worker_event,
})
}
pub fn start_queue(&mut self, idx: usize, queue: Queue) -> anyhow::Result<()> {
let (response_sender, response_receiver) = mpsc::sync_channel(0);
self.worker_sender
.send(WorkerRequest::StartQueue {
idx,
queue,
response_sender,
})
.context("mpsc::Sender::send")?;
self.worker_event.signal().context("Event::signal")?;
response_receiver.recv().context("mpsc::Receiver::recv")?
}
pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Option<Queue>> {
let (response_sender, response_receiver) = mpsc::sync_channel(0);
self.worker_sender
.send(WorkerRequest::StopQueue {
idx,
response_sender,
})
.context("mpsc::Sender::send")?;
self.worker_event.signal().context("Event::signal")?;
response_receiver.recv().context("mpsc::Receiver::recv")
}
}
impl Drop for WorkerHandle {
fn drop(&mut self) {
let _ = self.worker_thread.signal();
}
}
fn receiveq_idx(port_id: u32) -> Option<usize> {
if port_id == 0 {
Some(PORT0_RECEIVEQ_IDX)
} else {
PORT1_RECEIVEQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
}
}
fn transmitq_idx(port_id: u32) -> Option<usize> {
if port_id == 0 {
Some(PORT0_TRANSMITQ_IDX)
} else {
PORT1_TRANSMITQ_IDX.checked_add((port_id - 1).checked_mul(2)?.try_into().ok()?)
}
}
fn receiveq_port_id(queue_idx: usize) -> Option<u32> {
if queue_idx == PORT0_RECEIVEQ_IDX {
Some(0)
} else if queue_idx >= PORT1_RECEIVEQ_IDX && (queue_idx & 1) == 0 {
((queue_idx - PORT1_RECEIVEQ_IDX) / 2)
.checked_add(1)?
.try_into()
.ok()
} else {
None
}
}
fn transmitq_port_id(queue_idx: usize) -> Option<u32> {
if queue_idx == PORT0_TRANSMITQ_IDX {
Some(0)
} else if queue_idx >= PORT1_TRANSMITQ_IDX && (queue_idx & 1) == 1 {
((queue_idx - PORT1_TRANSMITQ_IDX) / 2)
.checked_add(1)?
.try_into()
.ok()
} else {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_receiveq_idx() {
assert_eq!(receiveq_idx(0), Some(0));
assert_eq!(receiveq_idx(1), Some(4));
assert_eq!(receiveq_idx(2), Some(6));
assert_eq!(receiveq_idx(3), Some(8));
}
#[test]
fn test_transmitq_idx() {
assert_eq!(transmitq_idx(0), Some(1));
assert_eq!(transmitq_idx(1), Some(5));
assert_eq!(transmitq_idx(2), Some(7));
assert_eq!(transmitq_idx(3), Some(9));
}
#[test]
fn test_receiveq_port_id() {
assert_eq!(receiveq_port_id(0), Some(0));
assert_eq!(receiveq_port_id(1), None); // port0 transmitq
assert_eq!(receiveq_port_id(2), None); // ctrl receiveq
assert_eq!(receiveq_port_id(3), None); // ctrl transmitq
assert_eq!(receiveq_port_id(4), Some(1));
assert_eq!(receiveq_port_id(5), None);
assert_eq!(receiveq_port_id(6), Some(2));
assert_eq!(receiveq_port_id(7), None);
assert_eq!(receiveq_port_id(8), Some(3));
assert_eq!(receiveq_port_id(9), None);
}
#[test]
fn test_transmitq_port_id() {
assert_eq!(transmitq_port_id(0), None); // port0 receiveq
assert_eq!(transmitq_port_id(1), Some(0));
assert_eq!(transmitq_port_id(2), None); // ctrl receiveq
assert_eq!(transmitq_port_id(3), None); // ctrl transmitq
assert_eq!(transmitq_port_id(4), None); // port1 receiveq
assert_eq!(transmitq_port_id(5), Some(1));
assert_eq!(transmitq_port_id(6), None);
assert_eq!(transmitq_port_id(7), Some(2));
assert_eq!(transmitq_port_id(8), None);
assert_eq!(transmitq_port_id(9), Some(3));
}
}

View file

@ -252,7 +252,46 @@ pub mod wl {
}
pub mod console {
use data_model::Le16;
use data_model::Le32;
use zerocopy::AsBytes;
use zerocopy::FromBytes;
use zerocopy::FromZeroes;
pub const VIRTIO_CONSOLE_F_SIZE: u32 = 0;
pub const VIRTIO_CONSOLE_F_MULTIPORT: u32 = 1;
pub const VIRTIO_CONSOLE_F_EMERG_WRITE: u32 = 2;
#[derive(Copy, Clone, Debug, Default, AsBytes, FromZeroes, FromBytes)]
#[repr(C)]
pub struct virtio_console_config {
pub cols: Le16,
pub rows: Le16,
pub max_nr_ports: Le32,
pub emerg_wr: Le32,
}
#[derive(Copy, Clone, Debug, Default, FromZeroes, FromBytes, AsBytes)]
#[repr(C)]
pub struct virtio_console_control {
pub id: Le32,
pub event: Le16,
pub value: Le16,
}
#[derive(Copy, Clone, Debug, Default, FromZeroes, FromBytes, AsBytes)]
#[repr(C)]
pub struct virtio_console_resize {
pub cols: Le16,
pub rows: Le16,
}
pub const VIRTIO_CONSOLE_DEVICE_READY: u16 = 0;
pub const VIRTIO_CONSOLE_DEVICE_ADD: u16 = 1;
pub const VIRTIO_CONSOLE_DEVICE_REMOVE: u16 = 2;
pub const VIRTIO_CONSOLE_PORT_READY: u16 = 3;
pub const VIRTIO_CONSOLE_CONSOLE_PORT: u16 = 4;
pub const VIRTIO_CONSOLE_RESIZE: u16 = 5;
pub const VIRTIO_CONSOLE_PORT_OPEN: u16 = 6;
pub const VIRTIO_CONSOLE_PORT_NAME: u16 = 7;
}

View file

@ -4,7 +4,6 @@
//! Implements virtio devices, queues, and transport mechanisms.
mod async_device;
mod async_utils;
#[cfg(feature = "balloon")]
mod balloon;

View file

@ -3,8 +3,8 @@
// found in the LICENSE file.
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use argh::FromArgs;
@ -13,21 +13,14 @@ use base::Event;
use base::RawDescriptor;
use base::Terminal;
use cros_async::Executor;
use data_model::Le32;
use hypervisor::ProtectionType;
use sync::Mutex;
use vm_memory::GuestMemory;
use vmm_vhost::message::VhostUserProtocolFeatures;
use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
use zerocopy::AsBytes;
use crate::virtio;
use crate::virtio::console::asynchronous::ConsoleDevice;
use crate::virtio::console::asynchronous::ConsolePort;
use crate::virtio::console::virtio_console_config;
use crate::virtio::copy_config;
use crate::virtio::console::device::ConsoleDevice;
use crate::virtio::console::port::ConsolePort;
use crate::virtio::vhost::user::device::handler::DeviceRequestHandler;
use crate::virtio::vhost::user::device::handler::Error as DeviceError;
use crate::virtio::vhost::user::device::handler::VhostUserDevice;
use crate::virtio::vhost::user::device::listener::sys::VhostUserListener;
use crate::virtio::vhost::user::device::listener::VhostUserListenerTrait;
@ -59,7 +52,7 @@ impl Drop for VhostUserConsoleDevice {
}
impl VhostUserDeviceBuilder for VhostUserConsoleDevice {
fn build(self: Box<Self>, ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
fn build(mut self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {
if self.raw_stdin {
// Set stdin() to raw mode so we can send over individual keystrokes unbuffered
std::io::stdin()
@ -67,14 +60,9 @@ impl VhostUserDeviceBuilder for VhostUserConsoleDevice {
.context("failed to set terminal in raw mode")?;
}
let queue_num = self.console.max_queues();
let active_queues = vec![None; queue_num];
self.console.start_input_threads();
let backend = ConsoleBackend {
device: *self,
ex: ex.clone(),
active_queues,
};
let backend = ConsoleBackend { device: *self };
let handler = DeviceRequestHandler::new(backend);
Ok(Box::new(handler))
@ -83,8 +71,6 @@ impl VhostUserDeviceBuilder for VhostUserConsoleDevice {
struct ConsoleBackend {
device: VhostUserConsoleDevice,
ex: Executor,
active_queues: Vec<Option<Arc<Mutex<Queue>>>>,
}
impl VhostUserDevice for ConsoleBackend {
@ -93,7 +79,7 @@ impl VhostUserDevice for ConsoleBackend {
}
fn features(&self) -> u64 {
self.device.console.avail_features() | 1 << VHOST_USER_F_PROTOCOL_FEATURES
self.device.console.features() | 1 << VHOST_USER_F_PROTOCOL_FEATURES
}
fn protocol_features(&self) -> VhostUserProtocolFeatures {
@ -101,52 +87,30 @@ impl VhostUserDevice for ConsoleBackend {
}
fn read_config(&self, offset: u64, data: &mut [u8]) {
let max_nr_ports = self.device.console.max_ports();
let config = virtio_console_config {
max_nr_ports: Le32::from(max_nr_ports as u32),
..Default::default()
};
copy_config(data, 0, config.as_bytes(), offset);
self.device.console.read_config(offset, data);
}
fn reset(&mut self) {
for queue_num in 0..self.max_queue_num() {
if let Err(e) = self.stop_queue(queue_num) {
error!("Failed to stop_queue during reset: {}", e);
}
if let Err(e) = self.device.console.reset() {
error!("console reset failed: {:#}", e);
}
}
fn start_queue(
&mut self,
idx: usize,
queue: virtio::Queue,
queue: Queue,
_mem: GuestMemory,
doorbell: Interrupt,
) -> anyhow::Result<()> {
let queue = Arc::new(Mutex::new(queue));
let res = self
.device
.console
.start_queue(&self.ex, idx, queue.clone(), doorbell);
self.active_queues[idx].replace(queue);
res
self.device.console.start_queue(idx, queue, doorbell)
}
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {
if let Err(e) = self.device.console.stop_queue(idx) {
error!("error while stopping queue {}: {}", idx, e);
}
if let Some(active_queue) = self.active_queues[idx].take() {
let queue = Arc::try_unwrap(active_queue)
.expect("failed to recover queue from worker")
.into_inner();
Ok(queue)
} else {
Err(anyhow::Error::new(DeviceError::WorkerNotFound))
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
match self.device.console.stop_queue(idx) {
Ok(Some(queue)) => Ok(queue),
Ok(None) => Err(anyhow!("queue {idx} not started")),
Err(e) => Err(e).with_context(|| format!("failed to stop queue {idx}")),
}
}
}
@ -176,7 +140,7 @@ fn create_vu_multi_port_device(
params: &[SerialParameters],
keep_rds: &mut Vec<RawDescriptor>,
) -> anyhow::Result<VhostUserConsoleDevice> {
let mut ports = params
let ports = params
.iter()
.map(|x| {
let port = x
@ -193,8 +157,7 @@ fn create_vu_multi_port_device(
})
.collect::<anyhow::Result<Vec<_>>>()?;
let port0 = ports.remove(0);
let device = ConsoleDevice::new_multi_port(ProtectionType::Unprotected, port0, ports);
let device = ConsoleDevice::new_multi_port(ProtectionType::Unprotected, ports);
Ok(VhostUserConsoleDevice {
console: device,

View file

@ -285,7 +285,7 @@ macro_rules! suspendable_virtio_tests {
let mut queue = QueueConfig::new(queue_size, 0);
queue.set_ready(true);
let queue = queue
.activate(mem, Event::new().unwrap())
.activate(mem, base::Event::new().unwrap())
.expect("QueueConfig::activate");
queues.insert(i, queue);
}
@ -357,7 +357,7 @@ macro_rules! suspendable_virtio_tests {
device
.virtio_wake(Some((mem.clone(), interrupt.clone(), sleep_result)))
.expect("failed to wake");
let (_, device) = &mut $dev();
let (_ctx2, mut device) = $dev();
device
.virtio_restore(snap.clone())
.expect("failed to restore");

View file

@ -25,13 +25,11 @@ use base::linux::MemfdSeals;
use base::sys::SharedMemoryLinux;
use base::ReadNotifier;
use base::*;
use devices::serial_device::SerialHardware;
use devices::serial_device::SerialParameters;
use devices::serial_device::SerialType;
use devices::vfio::VfioContainerManager;
use devices::virtio;
use devices::virtio::block::DiskOption;
use devices::virtio::console::asynchronous::AsyncConsole;
#[cfg(any(feature = "video-decoder", feature = "video-encoder"))]
use devices::virtio::device_constants::video::VideoBackendType;
#[cfg(any(feature = "video-decoder", feature = "video-encoder"))]
@ -1382,17 +1380,10 @@ impl VirtioDeviceBuilder for &SerialParameters {
let mut keep_rds = Vec::new();
let evt = Event::new().context("failed to create event")?;
if self.hardware == SerialHardware::LegacyVirtioConsole {
Ok(Box::new(
self.create_serial_device::<Console>(protection_type, &evt, &mut keep_rds)
.context("failed to create console device")?,
))
} else {
Ok(Box::new(
self.create_serial_device::<AsyncConsole>(protection_type, &evt, &mut keep_rds)
.context("failed to create console device")?,
))
}
Ok(Box::new(
self.create_serial_device::<Console>(protection_type, &evt, &mut keep_rds)
.context("failed to create console device")?,
))
}
fn create_vhost_user_device(