devices: virtio: make Queue own its Event

This cleans up the activate() signature and avoids the need to pass two
related (Queue, Event) variables everywhere a queue is needed.

No functional change intended.

BUG=None
TEST=tools/dev_container tools/presubmit

Change-Id: I9e5259428b39ad3a677fbb8a0bf574b3f15a7f35
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4738991
Reviewed-by: Frederick Mayle <fmayle@google.com>
Commit-Queue: Daniel Verkamp <dverkamp@chromium.org>
This commit is contained in:
Daniel Verkamp 2023-08-01 15:56:22 -07:00 committed by crosvm LUCI
parent f931125120
commit c3622e7f8d
49 changed files with 538 additions and 531 deletions

View file

@ -851,16 +851,16 @@ async fn handle_pending_adjusted_responses(
/// Represents queues & events for the balloon device.
struct BalloonQueues {
inflate: (Queue, Event),
deflate: (Queue, Event),
stats: Option<(Queue, Event)>,
reporting: Option<(Queue, Event)>,
events: Option<(Queue, Event)>,
ws: (Option<(Queue, Event)>, Option<(Queue, Event)>),
inflate: Queue,
deflate: Queue,
stats: Option<Queue>,
reporting: Option<Queue>,
events: Option<Queue>,
ws: (Option<Queue>, Option<Queue>),
}
impl BalloonQueues {
fn new(inflate: (Queue, Event), deflate: (Queue, Event)) -> Self {
fn new(inflate: Queue, deflate: Queue) -> Self {
BalloonQueues {
inflate,
deflate,
@ -935,12 +935,12 @@ struct WorkerReturn {
// The main worker thread. Initialized the asynchronous worker tasks and passes them to the executor
// to be processed.
fn run_worker(
inflate_queue: (Queue, Event),
deflate_queue: (Queue, Event),
stats_queue: Option<(Queue, Event)>,
reporting_queue: Option<(Queue, Event)>,
events_queue: Option<(Queue, Event)>,
ws_queues: (Option<(Queue, Event)>, Option<(Queue, Event)>),
inflate_queue: Queue,
deflate_queue: Queue,
stats_queue: Option<Queue>,
reporting_queue: Option<Queue>,
events_queue: Option<Queue>,
ws_queues: (Option<Queue>, Option<Queue>),
command_tube: Tube,
#[cfg(windows)] vm_memory_client: VmMemoryClient,
release_memory_tube: Option<Tube>,
@ -965,9 +965,13 @@ fn run_worker(
let paused_queues = {
// The first queue is used for inflate messages
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let inflate_queue_evt = inflate_queue
.event()
.try_clone()
.expect("failed to clone queue event");
let inflate = handle_queue(
inflate_queue.0,
EventAsync::new(inflate_queue.1, &ex).expect("failed to create async event"),
inflate_queue,
EventAsync::new(inflate_queue_evt, &ex).expect("failed to create async event"),
release_memory_tube.as_ref(),
interrupt.clone(),
|guest_address, len| {
@ -987,9 +991,13 @@ fn run_worker(
// The second queue is used for deflate messages
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let deflate_queue_evt = deflate_queue
.event()
.try_clone()
.expect("failed to clone queue event");
let deflate = handle_queue(
deflate_queue.0,
EventAsync::new(deflate_queue.1, &ex).expect("failed to create async event"),
deflate_queue,
EventAsync::new(deflate_queue_evt, &ex).expect("failed to create async event"),
None,
interrupt.clone(),
|guest_address, len| {
@ -1008,8 +1016,12 @@ fn run_worker(
// The next queue is used for stats messages if VIRTIO_BALLOON_F_STATS_VQ is negotiated.
let (stats_tx, stats_rx) = mpsc::channel::<()>(1);
let has_stats_queue = stats_queue.is_some();
let stats = if let Some((stats_queue, stats_queue_evt)) = stats_queue {
let stats = if let Some(stats_queue) = stats_queue {
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let stats_queue_evt = stats_queue
.event()
.try_clone()
.expect("failed to clone queue event");
handle_stats_queue(
stats_queue,
EventAsync::new(stats_queue_evt, &ex).expect("failed to create async event"),
@ -1030,8 +1042,12 @@ fn run_worker(
// The next queue is used for reporting messages
let has_reporting_queue = reporting_queue.is_some();
let reporting = if let Some((reporting_queue, reporting_queue_evt)) = reporting_queue {
let reporting = if let Some(reporting_queue) = reporting_queue {
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let reporting_queue_evt = reporting_queue
.event()
.try_clone()
.expect("failed to clone queue event");
handle_reporting_queue(
reporting_queue,
EventAsync::new(reporting_queue_evt, &ex).expect("failed to create async event"),
@ -1059,8 +1075,12 @@ fn run_worker(
// If VIRTIO_BALLOON_F_WS_REPORTING is set 2 queues must handled - one for WS data and one
// for WS notifications.
let has_ws_data_queue = ws_queues.0.is_some();
let ws_data = if let Some((ws_data_queue, ws_data_queue_evt)) = ws_queues.0 {
let ws_data = if let Some(ws_data_queue) = ws_queues.0 {
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let ws_data_queue_evt = ws_data_queue
.event()
.try_clone()
.expect("failed to clone queue event");
handle_ws_data_queue(
ws_data_queue,
EventAsync::new(ws_data_queue_evt, &ex).expect("failed to create async event"),
@ -1080,8 +1100,12 @@ fn run_worker(
let (ws_op_tx, ws_op_rx) = mpsc::channel::<WSOp>(1);
let has_ws_op_queue = ws_queues.1.is_some();
let ws_op = if let Some((ws_op_queue, ws_op_queue_evt)) = ws_queues.1 {
let ws_op = if let Some(ws_op_queue) = ws_queues.1 {
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let ws_op_queue_evt = ws_op_queue
.event()
.try_clone()
.expect("failed to clone queue event");
handle_ws_op_queue(
ws_op_queue,
EventAsync::new(ws_op_queue_evt, &ex).expect("failed to create async event"),
@ -1128,8 +1152,12 @@ fn run_worker(
// The next queue is used for events if VIRTIO_BALLOON_F_EVENTS_VQ is negotiated.
let has_events_queue = events_queue.is_some();
let events = if let Some((events_queue, events_queue_evt)) = events_queue {
let events = if let Some(events_queue) = events_queue {
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let events_queue_evt = events_queue
.event()
.try_clone()
.expect("failed to clone queue event");
handle_events_queue(
events_queue,
EventAsync::new(events_queue_evt, &ex).expect("failed to create async event"),
@ -1406,7 +1434,7 @@ impl Balloon {
/// are not negotiated) into a structure that is easier to work with.
fn get_queues_from_map(
&self,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<BalloonQueues> {
let expected_queues = Balloon::num_expected_queues(self.acked_features);
if queues.len() != expected_queues {
@ -1560,7 +1588,7 @@ impl VirtioDevice for Balloon {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
let queues = self.get_queues_from_map(queues)?;
self.start_worker(mem, interrupt, queues)
@ -1588,7 +1616,7 @@ impl VirtioDevice for Balloon {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
if queues.len() < 2 {

View file

@ -492,7 +492,6 @@ pub enum WorkerCmd {
StartQueue {
index: usize,
queue: Queue,
kick_evt: Event,
interrupt: Interrupt,
},
StopQueue {
@ -563,8 +562,9 @@ pub async fn run_worker(
worker_cmd = worker_rx.next() => {
match worker_cmd {
None => anyhow::bail!("worker control channel unexpectedly closed"),
Some(WorkerCmd::StartQueue{index, queue, kick_evt, interrupt}) => {
Some(WorkerCmd::StartQueue{index, queue, interrupt}) => {
let (tx, rx) = oneshot::channel();
let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
let (handle_queue_future, remote_handle) = handle_queue(
Rc::clone(disk_state),
Rc::new(RefCell::new(queue)),
@ -992,7 +992,7 @@ impl VirtioDevice for BlockAsync {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
assert!(self.num_activated_queues.is_none());
self.num_activated_queues = Some(queues.len());
@ -1037,12 +1037,11 @@ impl VirtioDevice for BlockAsync {
let (worker_tx, worker_rx) = mpsc::unbounded();
// Add commands to start all the queues before starting the worker.
for (index, (queue, event)) in queues.into_iter() {
for (index, queue) in queues.into_iter() {
worker_tx
.unbounded_send(WorkerCmd::StartQueue {
index,
queue,
kick_evt: event,
interrupt: interrupt.clone(),
})
.unwrap_or_else(|_| panic!("worker channel closed early"));
@ -1156,7 +1155,7 @@ impl VirtioDevice for BlockAsync {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
// TODO: activate is just what we want at the moment, but we should probably move
@ -1656,18 +1655,20 @@ mod tests {
// activate with queues of an arbitrary size.
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0.activate(&mem).expect("QueueConfig::activate");
let e0 = Event::new().unwrap();
let q0 = q0
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1.activate(&mem).expect("QueueConfig::activate");
let e1 = Event::new().unwrap();
let q1 = q1
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
b.activate(
mem.clone(),
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
BTreeMap::from([(0, (q0, e0)), (1, (q1, e1))]),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("activate should succeed");
// assert resources are consumed
@ -1695,18 +1696,20 @@ mod tests {
// re-activate should succeed
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0.activate(&mem).expect("QueueConfig::activate");
let e0 = Event::new().unwrap();
let q0 = q0
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1.activate(&mem).expect("QueueConfig::activate");
let e1 = Event::new().unwrap();
let q1 = q1
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
b.activate(
mem,
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
BTreeMap::from([(0, (q0, e0)), (1, (q1, e1))]),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("re-activate should succeed");
}
@ -1767,21 +1770,19 @@ mod tests {
// activate with queues of an arbitrary size.
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0.activate(&mem).expect("QueueConfig::activate");
let e0 = Event::new().unwrap();
let q0 = q0
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1.activate(&mem).expect("QueueConfig::activate");
let e1 = Event::new().unwrap();
let q1 = q1
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let interrupt = Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR);
b.activate(
mem,
interrupt.clone(),
BTreeMap::from([(0, (q0, e0)), (1, (q1, e1))]),
)
.expect("activate should succeed");
b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
.expect("activate should succeed");
// assert the original size first
assert_eq!(
@ -1878,18 +1879,20 @@ mod tests {
// activate with queues of an arbitrary size.
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0.activate(&mem).expect("QueueConfig::activate");
let e0 = Event::new().unwrap();
let q0 = q0
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1.activate(&mem).expect("QueueConfig::activate");
let e1 = Event::new().unwrap();
let q1 = q1
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
b.activate(
mem.clone(),
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
BTreeMap::from([(0, (q0, e0)), (1, (q1, e1))]),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("activate should succeed");
@ -1906,18 +1909,20 @@ mod tests {
// activate should succeed
let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q0.set_ready(true);
let q0 = q0.activate(&mem).expect("QueueConfig::activate");
let e0 = Event::new().unwrap();
let q0 = q0
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
q1.set_ready(true);
let q1 = q1.activate(&mem).expect("QueueConfig::activate");
let e1 = Event::new().unwrap();
let q1 = q1
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
b.activate(
mem,
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
BTreeMap::from([(0, (q0, e0)), (1, (q1, e1))]),
BTreeMap::from([(0, q0), (1, q1)]),
)
.expect("activate should succeed");

View file

@ -170,9 +170,7 @@ struct Worker {
kill_evt: Event,
in_avail_evt: Event,
receive_queue: Arc<Mutex<Queue>>,
receive_evt: Event,
transmit_queue: Arc<Mutex<Queue>>,
transmit_evt: Event,
}
impl Worker {
@ -187,8 +185,14 @@ impl Worker {
}
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&self.transmit_evt, Token::TransmitQueueAvailable),
(&self.receive_evt, Token::ReceiveQueueAvailable),
(
self.transmit_queue.lock().event(),
Token::TransmitQueueAvailable,
),
(
self.receive_queue.lock().event(),
Token::ReceiveQueueAvailable,
),
(&self.in_avail_evt, Token::InputAvailable),
(&self.kill_evt, Token::Kill),
])?;
@ -203,7 +207,9 @@ impl Worker {
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::TransmitQueueAvailable => {
self.transmit_evt
self.transmit_queue
.lock()
.event()
.wait()
.context("failed reading transmit queue Event")?;
process_transmit_queue(
@ -213,7 +219,9 @@ impl Worker {
);
}
Token::ReceiveQueueAvailable => {
self.receive_evt
self.receive_queue
.lock()
.event()
.wait()
.context("failed reading receive queue Event")?;
if let Some(in_buf_ref) = self.input.as_ref() {
@ -326,14 +334,14 @@ impl VirtioDevice for Console {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() < 2 {
return Err(anyhow!("expected 2 queues, got {}", queues.len()));
}
let (receive_queue, receive_evt) = queues.remove(&0).unwrap();
let (transmit_queue, transmit_evt) = queues.remove(&1).unwrap();
let receive_queue = queues.remove(&0).unwrap();
let transmit_queue = queues.remove(&1).unwrap();
if self.in_avail_evt.is_none() {
self.in_avail_evt = Some(Event::new().context("failed creating Event")?);
@ -374,10 +382,8 @@ impl VirtioDevice for Console {
kill_evt,
// Device -> driver
receive_queue: Arc::new(Mutex::new(receive_queue)),
receive_evt,
// Driver -> device
transmit_queue: Arc::new(Mutex::new(transmit_queue)),
transmit_evt,
};
if let Err(e) = worker.run() {
error!("console run failure: {:?}", e);
@ -428,7 +434,7 @@ impl VirtioDevice for Console {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
match queues_state {
None => Ok(()),

View file

@ -129,13 +129,17 @@ impl ConsoleDevice {
ex: &Executor,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
let input_queue = match self.input.as_mut() {
Some(input_queue) => input_queue,
None => return Ok(()),
};
let kick_evt = queue
.lock()
.event()
.try_clone()
.context("Failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
@ -172,8 +176,12 @@ impl ConsoleDevice {
ex: &Executor,
queue: Arc<Mutex<virtio::Queue>>,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
let kick_evt = queue
.lock()
.event()
.try_clone()
.context("Failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("Failed to create EventAsync for kick_evt")?;
@ -285,7 +293,7 @@ impl VirtioDevice for AsyncConsole {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() < 2 {
return Err(anyhow!("expected 2 queues, got {}", queues.len()));
@ -308,8 +316,8 @@ impl VirtioDevice for AsyncConsole {
};
let ex = Executor::new().expect("failed to create an executor");
let (receive_queue, receive_evt) = queues.remove(&0).unwrap();
let (transmit_queue, transmit_evt) = queues.remove(&1).unwrap();
let receive_queue = queues.remove(&0).unwrap();
let transmit_queue = queues.remove(&1).unwrap();
self.state =
VirtioConsoleState::Running(WorkerThread::start("v_console", move |kill_evt| {
@ -317,9 +325,9 @@ impl VirtioDevice for AsyncConsole {
let receive_queue = Arc::new(Mutex::new(receive_queue));
let transmit_queue = Arc::new(Mutex::new(transmit_queue));
console.start_receive_queue(&ex, receive_queue, interrupt.clone(), receive_evt)?;
console.start_receive_queue(&ex, receive_queue, interrupt.clone())?;
console.start_transmit_queue(&ex, transmit_queue, interrupt, transmit_evt)?;
console.start_transmit_queue(&ex, transmit_queue, interrupt)?;
// Run until the kill event is signaled and cancel all tasks.
ex.run_until(async {

View file

@ -11,7 +11,6 @@ use base::error;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::RawDescriptor;
use base::Tube;
use base::WorkerThread;
@ -208,7 +207,7 @@ impl VirtioDevice for Fs {
&mut self,
_guest_mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != self.queue_sizes.len() {
return Err(anyhow!(
@ -250,7 +249,7 @@ impl VirtioDevice for Fs {
self.workers = queues
.into_iter()
.map(|(idx, (queue, evt))| {
.map(|(idx, queue)| {
let server = server.clone();
let irq = interrupt.clone();
let socket = Arc::clone(&socket);
@ -258,7 +257,7 @@ impl VirtioDevice for Fs {
let worker =
WorkerThread::start(format!("v_fs:{}:{}", self.tag, idx), move |kill_evt| {
let mut worker = Worker::new(queue, server, irq, socket, slot);
worker.run(evt, kill_evt, watch_resample_event)
worker.run(kill_evt, watch_resample_event)
});
if watch_resample_event {

View file

@ -2,13 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::cell::RefCell;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use std::sync::Arc;
use base::error;
@ -141,7 +139,7 @@ impl fuse::Mapper for Mapper {
}
pub struct Worker<F: FileSystem + Sync> {
queue: Rc<RefCell<Queue>>,
queue: Queue,
server: Arc<fuse::Server<F>>,
irq: Interrupt,
tube: Arc<Mutex<Tube>>,
@ -150,13 +148,12 @@ pub struct Worker<F: FileSystem + Sync> {
pub fn process_fs_queue<F: FileSystem + Sync>(
interrupt: &Interrupt,
queue: &Rc<RefCell<Queue>>,
queue: &mut Queue,
server: &Arc<fuse::Server<F>>,
tube: &Arc<Mutex<Tube>>,
slot: u32,
) -> Result<()> {
let mapper = Mapper::new(Arc::clone(tube), slot);
let mut queue = queue.borrow_mut();
while let Some(mut avail_desc) = queue.pop() {
let total =
server.handle_message(&mut avail_desc.reader, &mut avail_desc.writer, &mapper)?;
@ -177,7 +174,7 @@ impl<F: FileSystem + Sync> Worker<F> {
slot: u32,
) -> Worker<F> {
Worker {
queue: Rc::new(RefCell::new(queue)),
queue,
server,
irq,
tube,
@ -185,12 +182,7 @@ impl<F: FileSystem + Sync> Worker<F> {
}
}
pub fn run(
&mut self,
queue_evt: Event,
kill_evt: Event,
watch_resample_event: bool,
) -> Result<()> {
pub fn run(&mut self, kill_evt: Event, watch_resample_event: bool) -> Result<()> {
// We need to set the no setuid fixup secure bit so that we don't drop capabilities when
// changing the thread uid/gid. Without this, creating new entries can fail in some corner
// cases.
@ -221,9 +213,11 @@ impl<F: FileSystem + Sync> Worker<F> {
Kill,
}
let wait_ctx =
WaitContext::build_with(&[(&queue_evt, Token::QueueReady), (&kill_evt, Token::Kill)])
.map_err(Error::CreateWaitContext)?;
let wait_ctx = WaitContext::build_with(&[
(self.queue.event(), Token::QueueReady),
(&kill_evt, Token::Kill),
])
.map_err(Error::CreateWaitContext)?;
if watch_resample_event {
if let Some(resample_evt) = self.irq.get_resample_evt() {
@ -238,10 +232,10 @@ impl<F: FileSystem + Sync> Worker<F> {
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::QueueReady => {
queue_evt.wait().map_err(Error::ReadQueueEvent)?;
self.queue.event().wait().map_err(Error::ReadQueueEvent)?;
if let Err(e) = process_fs_queue(
&self.irq,
&self.queue,
&mut self.queue,
&self.server,
&self.tube,
self.slot,

View file

@ -1501,7 +1501,7 @@ impl VirtioDevice for Gpu {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
return Err(anyhow!(
@ -1511,9 +1511,17 @@ impl VirtioDevice for Gpu {
));
}
let (ctrl_queue, ctrl_evt) = queues.remove(&0).unwrap();
let ctrl_queue = queues.remove(&0).unwrap();
let ctrl_evt = ctrl_queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let ctrl_queue = SharedQueueReader::new(ctrl_queue, interrupt.clone());
let (cursor_queue, cursor_evt) = queues.remove(&1).unwrap();
let cursor_queue = queues.remove(&1).unwrap();
let cursor_evt = cursor_queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let cursor_queue = LocalQueueReader::new(cursor_queue, interrupt.clone());
self.worker_thread

View file

@ -433,7 +433,7 @@ impl<T: EventSource> Worker<T> {
// Allow error! and early return anywhere in function
#[allow(clippy::needless_return)]
fn run(&mut self, event_queue_evt: Event, status_queue_evt: Event, kill_evt: Event) {
fn run(&mut self, kill_evt: Event) {
if let Err(e) = self.event_source.init() {
error!("failed initializing event source: {}", e);
return;
@ -448,8 +448,8 @@ impl<T: EventSource> Worker<T> {
Kill,
}
let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
(&event_queue_evt, Token::EventQAvailable),
(&status_queue_evt, Token::StatusQAvailable),
(self.event_queue.event(), Token::EventQAvailable),
(self.status_queue.event(), Token::StatusQAvailable),
(&self.event_source, Token::InputEventsAvailable),
(&kill_evt, Token::Kill),
]) {
@ -483,14 +483,14 @@ impl<T: EventSource> Worker<T> {
for wait_event in wait_events.iter().filter(|e| e.is_readable) {
match wait_event.token {
Token::EventQAvailable => {
if let Err(e) = event_queue_evt.wait() {
if let Err(e) = self.event_queue.event().wait() {
error!("failed reading event queue Event: {}", e);
break 'wait;
}
eventq_needs_interrupt |= self.send_events();
}
Token::StatusQAvailable => {
if let Err(e) = status_queue_evt.wait() {
if let Err(e) = self.status_queue.event().wait() {
error!("failed reading status queue Event: {}", e);
break 'wait;
}
@ -578,13 +578,13 @@ where
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != 2 {
return Err(anyhow!("expected 2 queues, got {}", queues.len()));
}
let (event_queue, event_queue_evt) = queues.remove(&0).unwrap();
let (status_queue, status_queue_evt) = queues.remove(&1).unwrap();
let event_queue = queues.remove(&0).unwrap();
let status_queue = queues.remove(&1).unwrap();
let source = self
.source
@ -597,7 +597,7 @@ where
event_queue,
status_queue,
};
worker.run(event_queue_evt, status_queue_evt, kill_evt);
worker.run(kill_evt);
worker
}));
@ -626,7 +626,7 @@ where
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
self.activate(mem, interrupt, queues)?;

View file

@ -620,7 +620,7 @@ async fn request_queue(
fn run(
state: State,
iommu_device_tube: Tube,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
kill_evt: Event,
interrupt: Interrupt,
translate_response_senders: Option<BTreeMap<u32, Tube>>,
@ -629,7 +629,11 @@ fn run(
let state = Rc::new(RefCell::new(state));
let ex = Executor::new().expect("Failed to create an executor");
let (req_queue, req_evt) = queues.remove(&0).unwrap();
let req_queue = queues.remove(&0).unwrap();
let req_evt = req_queue
.event()
.try_clone()
.expect("Failed to clone queue event");
let req_evt = EventAsync::new(req_evt, &ex).expect("Failed to create async event for queue");
let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone());
@ -796,7 +800,7 @@ impl VirtioDevice for Iommu {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
return Err(anyhow!(

View file

@ -389,13 +389,7 @@ where
)
}
fn run(
&mut self,
rx_queue_evt: Event,
tx_queue_evt: Event,
ctrl_queue_evt: Option<Event>,
handle_interrupt_resample: bool,
) -> Result<(), NetError> {
fn run(&mut self, handle_interrupt_resample: bool) -> Result<(), NetError> {
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
// This doesn't use get_read_notifier() because of overlapped io; we
// have overlapped wrapper separate from the TAP so that we can pass
@ -408,15 +402,15 @@ where
),
#[cfg(unix)]
(self.tap.get_read_notifier(), Token::RxTap),
(&rx_queue_evt, Token::RxQueue),
(&tx_queue_evt, Token::TxQueue),
(self.rx_queue.event(), Token::RxQueue),
(self.tx_queue.event(), Token::TxQueue),
(&self.kill_evt, Token::Kill),
])
.map_err(NetError::CreateWaitContext)?;
if let Some(ctrl_evt) = &ctrl_queue_evt {
if let Some(ctrl_queue) = &self.ctrl_queue {
wait_ctx
.add(ctrl_evt, Token::CtrlQueue)
.add(ctrl_queue.event(), Token::CtrlQueue)
.map_err(NetError::CreateWaitContext)?;
}
@ -440,7 +434,7 @@ where
}
Token::RxQueue => {
let _trace = cros_tracing::trace_event!(VirtioNet, "handle RxQueue event");
if let Err(e) = rx_queue_evt.wait() {
if let Err(e) = self.rx_queue.event().wait() {
error!("net: error reading rx queue Event: {}", e);
break 'wait;
}
@ -449,7 +443,7 @@ where
}
Token::TxQueue => {
let _trace = cros_tracing::trace_event!(VirtioNet, "handle TxQueue event");
if let Err(e) = tx_queue_evt.wait() {
if let Err(e) = self.tx_queue.event().wait() {
error!("net: error reading tx queue Event: {}", e);
break 'wait;
}
@ -458,7 +452,7 @@ where
Token::CtrlQueue => {
let _trace =
cros_tracing::trace_event!(VirtioNet, "handle CtrlQueue event");
if let Some(ctrl_evt) = &ctrl_queue_evt {
if let Some(ctrl_evt) = self.ctrl_queue.as_ref().map(|q| q.event()) {
if let Err(e) = ctrl_evt.wait() {
error!("net: error reading ctrl queue Event: {}", e);
break 'wait;
@ -724,7 +718,7 @@ where
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
let ctrl_vq_enabled = self.acked_features & (1 << virtio_net::VIRTIO_NET_F_CTRL_VQ) != 0;
let mq_enabled = self.acked_features & (1 << virtio_net::VIRTIO_NET_F_MQ) != 0;
@ -762,13 +756,12 @@ where
let interrupt = interrupt.clone();
let first_queue = i == 0;
// Queues alternate between rx0, tx0, rx1, tx1, ..., rxN, txN, ctrl.
let (rx_queue, rx_queue_evt) = queues.pop_first().unwrap().1;
let (tx_queue, tx_queue_evt) = queues.pop_first().unwrap().1;
let (ctrl_queue, ctrl_queue_evt) = if first_queue && ctrl_vq_enabled {
let (queue, evt) = queues.pop_last().unwrap().1;
(Some(queue), Some(evt))
let rx_queue = queues.pop_first().unwrap().1;
let tx_queue = queues.pop_first().unwrap().1;
let ctrl_queue = if first_queue && ctrl_vq_enabled {
Some(queues.pop_last().unwrap().1)
} else {
(None, None)
None
};
// Handle interrupt resampling on the first queue's thread.
let handle_interrupt_resample = first_queue;
@ -795,12 +788,7 @@ where
deferred_rx: false,
kill_evt,
};
let result = worker.run(
rx_queue_evt,
tx_queue_evt,
ctrl_queue_evt,
handle_interrupt_resample,
);
let result = worker.run(handle_interrupt_resample);
if let Err(e) = result {
error!("net worker thread exited with error: {}", e);
}
@ -836,7 +824,7 @@ where
fn virtio_wake(
&mut self,
device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
match device_state {
None => Ok(()),

View file

@ -72,7 +72,6 @@ pub type P9Result<T> = result::Result<T, P9Error>;
struct Worker {
interrupt: Interrupt,
queue: Queue,
queue_evt: Event,
server: p9::Server,
}
@ -104,7 +103,7 @@ impl Worker {
}
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&self.queue_evt, Token::QueueReady),
(self.queue.event(), Token::QueueReady),
(&kill_evt, Token::Kill),
])
.map_err(P9Error::CreateWaitContext)?;
@ -119,7 +118,7 @@ impl Worker {
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::QueueReady => {
self.queue_evt.wait().map_err(P9Error::ReadQueueEvent)?;
self.queue.event().wait().map_err(P9Error::ReadQueueEvent)?;
self.process_queue()?;
}
Token::InterruptResample => {
@ -207,13 +206,13 @@ impl VirtioDevice for P9 {
&mut self,
_guest_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != 1 {
return Err(anyhow!("expected 1 queue, got {}", queues.len()));
}
let (queue, queue_evt) = queues.remove(&0).unwrap();
let queue = queues.remove(&0).unwrap();
let server = self.server.take().context("missing server")?;
@ -221,7 +220,6 @@ impl VirtioDevice for P9 {
let mut worker = Worker {
interrupt,
queue,
queue_evt,
server,
};

View file

@ -178,7 +178,6 @@ async fn handle_queue(
}
fn run_worker(
queue_evt: Event,
queue: &mut Queue,
pmem_device_tube: Tube,
interrupt: Interrupt,
@ -188,6 +187,10 @@ fn run_worker(
) {
let ex = Executor::new().unwrap();
let queue_evt = queue
.event()
.try_clone()
.expect("failed to clone queue event");
let queue_evt = EventAsync::new(queue_evt, &ex).expect("failed to set up the queue event");
// Process requests from the virtio queue.
@ -292,13 +295,13 @@ impl VirtioDevice for Pmem {
&mut self,
_memory: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != 1 {
return Err(anyhow!("expected 1 queue, got {}", queues.len()));
}
let (mut queue, queue_event) = queues.remove(&0).unwrap();
let mut queue = queues.remove(&0).unwrap();
let mapping_arena_slot = self.mapping_arena_slot;
// We checked that this fits in a usize in `Pmem::new`.
@ -311,7 +314,6 @@ impl VirtioDevice for Pmem {
self.worker_thread = Some(WorkerThread::start("v_pmem", move |kill_event| {
run_worker(
queue_event,
&mut queue,
pmem_device_tube,
interrupt,
@ -343,7 +345,7 @@ impl VirtioDevice for Pmem {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
self.activate(mem, interrupt, queues)?;

View file

@ -251,7 +251,7 @@ impl PvClock {
fn start_worker(
&mut self,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
pvclock_worker: PvClockWorker,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
@ -262,7 +262,7 @@ impl PvClock {
));
}
let (set_pvclock_page_queue, set_pvclock_page_queue_evt) = queues.remove(&0).unwrap();
let set_pvclock_page_queue = queues.remove(&0).unwrap();
let suspend_tube = self
.suspend_tube
@ -274,7 +274,6 @@ impl PvClock {
move |kill_evt| {
run_worker(
pvclock_worker,
set_pvclock_page_queue_evt,
set_pvclock_page_queue,
suspend_tube,
interrupt,
@ -527,7 +526,6 @@ struct WorkerReturn {
// TODO(b/237300012): asyncify this device.
fn run_worker(
mut worker: PvClockWorker,
set_pvclock_page_queue_evt: Event,
mut set_pvclock_page_queue: Queue,
suspend_tube: Tube,
interrupt: Interrupt,
@ -542,7 +540,7 @@ fn run_worker(
}
let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
(&set_pvclock_page_queue_evt, Token::SetPvClockPageQueue),
(set_pvclock_page_queue.event(), Token::SetPvClockPageQueue),
(suspend_tube.get_read_notifier(), Token::SuspendResume),
// TODO(b/242743502): Can also close on Tube closure for Unix once CloseNotifier is
// implemented for Tube.
@ -586,7 +584,7 @@ fn run_worker(
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::SetPvClockPageQueue => {
let _ = set_pvclock_page_queue_evt.wait();
let _ = set_pvclock_page_queue.event().wait();
let desc_chain = match set_pvclock_page_queue.pop() {
Some(desc_chain) => desc_chain,
None => {
@ -729,7 +727,7 @@ impl VirtioDevice for PvClock {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
let tsc_frequency = self.tsc_frequency;
let total_suspend_ns = self.total_suspend_ns.clone();
@ -761,7 +759,7 @@ impl VirtioDevice for PvClock {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
let worker_snap = self
@ -839,10 +837,7 @@ mod tests {
.activate(
mem.clone(),
make_interrupt(),
BTreeMap::from([(
0,
(fake_queue.activate(&mem).unwrap(), Event::new().unwrap()),
)]),
BTreeMap::from([(0, fake_queue.activate(&mem, Event::new().unwrap()).unwrap())]),
)
.expect("activate should succeed");
@ -866,10 +861,7 @@ mod tests {
let mut wake_queues = BTreeMap::new();
let mut fake_queue = QueueConfig::new(TEST_QUEUE_SIZE, 0);
fake_queue.set_ready(true);
wake_queues.insert(
0,
(fake_queue.activate(mem).unwrap(), Event::new().unwrap()),
);
wake_queues.insert(0, fake_queue.activate(mem, Event::new().unwrap()).unwrap());
let queues_state = (mem.clone(), make_interrupt(), wake_queues);
pvclock_device
.virtio_wake(Some(queues_state))

View file

@ -18,6 +18,7 @@ use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use base::warn;
use base::Event;
use cros_async::AsyncError;
use cros_async::EventAsync;
use futures::channel::oneshot;
@ -262,7 +263,7 @@ impl QueueConfig {
}
/// Convert the queue configuration into an active queue.
pub fn activate(&mut self, mem: &GuestMemory) -> Result<Queue> {
pub fn activate(&mut self, mem: &GuestMemory, event: Event) -> Result<Queue> {
if !self.ready {
bail!("attempted to activate a non-ready queue");
}
@ -272,10 +273,12 @@ impl QueueConfig {
}
// If VIRTIO_F_RING_PACKED feature bit is set, create a packed queue, otherwise create a split queue
let queue: Queue = if ((self.acked_features >> VIRTIO_F_RING_PACKED) & 1) != 0 {
let pq = PackedQueue::new(self, mem).context("Failed to create a packed queue.")?;
let pq =
PackedQueue::new(self, mem, event).context("Failed to create a packed queue.")?;
Queue::PackedVirtQueue(pq)
} else {
let sq = SplitQueue::new(self, mem).context("Failed to create a split queue.")?;
let sq =
SplitQueue::new(self, mem, event).context("Failed to create a split queue.")?;
Queue::SplitVirtQueue(sq)
};
@ -445,11 +448,12 @@ impl Queue {
queue_config: &QueueConfig,
queue_value: serde_json::Value,
mem: &GuestMemory,
event: Event,
) -> anyhow::Result<Queue> {
if queue_config.acked_features & 1 << VIRTIO_F_RING_PACKED != 0 {
PackedQueue::restore(queue_value, mem).map(Queue::PackedVirtQueue)
PackedQueue::restore(queue_value, mem, event).map(Queue::PackedVirtQueue)
} else {
SplitQueue::restore(queue_value, mem).map(Queue::SplitVirtQueue)
SplitQueue::restore(queue_value, mem, event).map(Queue::SplitVirtQueue)
}
}
@ -484,6 +488,12 @@ impl Queue {
u16,
);
define_queue_method!(
/// Get a reference to the queue's event.
event,
&Event,
);
define_queue_method!(
/// Reset queue's counters.
/// This method doesn't change the queue's metadata so it's reusable without initializing it

View file

@ -14,6 +14,7 @@ use anyhow::Context;
use anyhow::Result;
use base::error;
use base::warn;
use base::Event;
use serde::Deserialize;
use serde::Serialize;
use sync::Mutex;
@ -86,10 +87,12 @@ impl Default for PackedQueueIndex {
}
}
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct PackedQueue {
mem: GuestMemory,
event: Event,
// The queue size in elements the driver selected
size: u16,
@ -137,7 +140,7 @@ pub struct PackedQueueSnapshot {
impl PackedQueue {
/// Constructs an empty virtio queue with the given `max_size`.
pub fn new(config: &QueueConfig, mem: &GuestMemory) -> Result<Self> {
pub fn new(config: &QueueConfig, mem: &GuestMemory, event: Event) -> Result<Self> {
let size = config.size();
let desc_table = config.desc_table();
@ -165,6 +168,7 @@ impl PackedQueue {
Ok(PackedQueue {
mem: mem.clone(),
event,
size,
vector: config.vector(),
desc_table: config.desc_table(),
@ -208,6 +212,11 @@ impl PackedQueue {
self.device_event_suppression
}
/// Get a reference to the queue's "kick event"
pub fn event(&self) -> &Event {
&self.event
}
fn area_sizes(
queue_size: u16,
desc_table: GuestAddress,
@ -491,7 +500,11 @@ impl PackedQueue {
/// TODO: b/290307056 - Implement restore for packed virtqueue,
/// add tests to validate.
pub fn restore(_queue_value: serde_json::Value, _mem: &GuestMemory) -> Result<PackedQueue> {
pub fn restore(
_queue_value: serde_json::Value,
_mem: &GuestMemory,
_event: Event,
) -> Result<PackedQueue> {
bail!("Restore for packed virtqueue not implemented.");
}
}

View file

@ -12,6 +12,7 @@ use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use base::error;
use base::Event;
use data_model::Le32;
use serde::Deserialize;
use serde::Serialize;
@ -41,6 +42,8 @@ const VIRTQ_AVAIL_F_NO_INTERRUPT: u16 = 0x1;
pub struct SplitQueue {
mem: GuestMemory,
event: Event,
/// The queue size in elements the driver selected. This is always guaranteed to be a power of
/// two, as required for split virtqueues.
size: u16,
@ -96,7 +99,7 @@ struct virtq_used_elem {
impl SplitQueue {
/// Constructs an activated split virtio queue with the given configuration.
pub fn new(config: &QueueConfig, mem: &GuestMemory) -> Result<SplitQueue> {
pub fn new(config: &QueueConfig, mem: &GuestMemory, event: Event) -> Result<SplitQueue> {
let size = config.size();
if !size.is_power_of_two() {
bail!("split queue size {size} is not a power of 2");
@ -125,6 +128,7 @@ impl SplitQueue {
Ok(SplitQueue {
mem: mem.clone(),
event,
size,
vector: config.vector(),
desc_table: config.desc_table(),
@ -167,6 +171,11 @@ impl SplitQueue {
self.used_ring
}
/// Get a reference to the queue's "kick event"
pub fn event(&self) -> &Event {
&self.event
}
// Return `index` modulo the currently configured queue size.
fn wrap_queue_index(&self, index: Wrapping<u16>) -> u16 {
// We know that `self.size` is a power of two (enforced by `new()`), so the modulus can
@ -557,10 +566,12 @@ impl SplitQueue {
pub fn restore(
queue_value: serde_json::Value,
mem: &GuestMemory,
event: Event,
) -> anyhow::Result<SplitQueue> {
let s: SplitQueueSnapshot = serde_json::from_value(queue_value)?;
let queue = SplitQueue {
mem: mem.clone(),
event,
size: s.size,
vector: s.vector,
desc_table: s.desc_table,
@ -682,7 +693,9 @@ mod tests {
queue.ack_features((1u64) << VIRTIO_RING_F_EVENT_IDX);
queue.set_ready(true);
queue.activate(mem).expect("QueueConfig::activate failed")
queue
.activate(mem, Event::new().unwrap())
.expect("QueueConfig::activate failed")
}
fn fake_desc_chain(mem: &GuestMemory) -> DescriptorChain {

View file

@ -35,7 +35,6 @@ pub type Result<T> = std::result::Result<T, RngError>;
struct Worker {
interrupt: Interrupt,
queue: Queue,
queue_evt: Event,
}
impl Worker {
@ -74,7 +73,7 @@ impl Worker {
}
let wait_ctx: WaitContext<Token> = match WaitContext::build_with(&[
(&self.queue_evt, Token::QueueAvailable),
(self.queue.event(), Token::QueueAvailable),
(&kill_evt, Token::Kill),
]) {
Ok(pc) => pc,
@ -105,7 +104,7 @@ impl Worker {
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::QueueAvailable => {
if let Err(e) = self.queue_evt.wait() {
if let Err(e) = self.queue.event().wait() {
error!("failed reading queue Event: {}", e);
break 'wait;
}
@ -165,20 +164,16 @@ impl VirtioDevice for Rng {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != 1 {
return Err(anyhow!("expected 1 queue, got {}", queues.len()));
}
let (queue, queue_evt) = queues.remove(&0).unwrap();
let queue = queues.remove(&0).unwrap();
self.worker_thread = Some(WorkerThread::start("v_rng", move |kill_evt| {
let worker = Worker {
interrupt,
queue,
queue_evt,
};
let worker = Worker { interrupt, queue };
worker.run(kill_evt)
}));
@ -206,7 +201,7 @@ impl VirtioDevice for Rng {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
self.activate(mem, interrupt, queues)?;

View file

@ -429,7 +429,7 @@ impl VirtioDevice for VirtioSnd {
&mut self,
_guest_mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != self.queue_sizes.len() {
return Err(anyhow!(
@ -474,7 +474,7 @@ enum LoopState {
fn run_worker(
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
snd_data: SndData,
kill_evt: Event,
stream_info_builders: Vec<StreamInfoBuilder>,
@ -496,8 +496,9 @@ fn run_worker(
let streams = Rc::new(AsyncRwLock::new(streams));
let mut queues: Vec<(Queue, EventAsync)> = queues
.into_iter()
.map(|(_, (q, e))| {
.into_values()
.map(|q| {
let e = q.event().try_clone().expect("Failed to clone queue event");
(
q,
EventAsync::new(e, &ex).expect("Failed to create async event for queue"),

View file

@ -23,7 +23,6 @@ use anyhow::anyhow;
use anyhow::Context;
use base::error;
use base::Error as BaseError;
use base::Event;
use base::RawDescriptor;
use base::WorkerThread;
use data_model::Le32;
@ -109,7 +108,7 @@ impl VirtioDevice for Sound {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if self.worker_thread.is_some() {
return Err(anyhow!("virtio-snd: Device is already active"));
@ -120,10 +119,10 @@ impl VirtioDevice for Sound {
queues.len(),
));
}
let (control_queue, control_queue_evt) = queues.remove(&0).unwrap();
let (event_queue, event_queue_evt) = queues.remove(&1).unwrap();
let (tx_queue, tx_queue_evt) = queues.remove(&2).unwrap();
let (rx_queue, rx_queue_evt) = queues.remove(&3).unwrap();
let control_queue = queues.remove(&0).unwrap();
let event_queue = queues.remove(&1).unwrap();
let tx_queue = queues.remove(&2).unwrap();
let rx_queue = queues.remove(&3).unwrap();
let vios_client = self.vios_client.clone();
vios_client
@ -137,13 +136,9 @@ impl VirtioDevice for Sound {
vios_client,
interrupt,
Arc::new(Mutex::new(control_queue)),
control_queue_evt,
event_queue,
event_queue_evt,
Arc::new(Mutex::new(tx_queue)),
tx_queue_evt,
Arc::new(Mutex::new(rx_queue)),
rx_queue_evt,
) {
Ok(mut worker) => match worker.control_loop(kill_evt) {
Ok(_) => true,

View file

@ -30,9 +30,7 @@ pub struct Worker {
// Lock order: Must never hold more than one queue lock at the same time.
interrupt: Interrupt,
control_queue: Arc<Mutex<Queue>>,
control_queue_evt: Event,
event_queue: Queue,
event_queue_evt: Event,
vios_client: Arc<VioSClient>,
streams: Vec<StreamProxy>,
io_thread: Option<thread::JoinHandle<Result<()>>>,
@ -45,13 +43,9 @@ impl Worker {
vios_client: Arc<VioSClient>,
interrupt: Interrupt,
control_queue: Arc<Mutex<Queue>>,
control_queue_evt: Event,
event_queue: Queue,
event_queue_evt: Event,
tx_queue: Arc<Mutex<Queue>>,
tx_queue_evt: Event,
rx_queue: Arc<Mutex<Queue>>,
rx_queue_evt: Event,
) -> Result<Worker> {
let mut streams: Vec<StreamProxy> = Vec::with_capacity(vios_client.num_streams() as usize);
{
@ -83,23 +77,13 @@ impl Worker {
.spawn(move || {
try_set_real_time_priority();
io_loop(
interrupt_clone,
tx_queue,
tx_queue_evt,
rx_queue,
rx_queue_evt,
senders,
kill_io,
)
io_loop(interrupt_clone, tx_queue, rx_queue, senders, kill_io)
})
.map_err(SoundError::CreateThread)?;
Ok(Worker {
interrupt,
control_queue,
control_queue_evt,
event_queue,
event_queue_evt,
vios_client,
streams,
io_thread: Some(io_thread),
@ -123,8 +107,8 @@ impl Worker {
Kill,
}
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&self.control_queue_evt, Token::ControlQAvailable),
(&self.event_queue_evt, Token::EventQAvailable),
(self.control_queue.lock().event(), Token::ControlQAvailable),
(self.event_queue.event(), Token::EventQAvailable),
(&event_notifier, Token::EventTriggered),
(&kill_evt, Token::Kill),
])
@ -141,7 +125,9 @@ impl Worker {
for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
match wait_evt.token {
Token::ControlQAvailable => {
self.control_queue_evt
self.control_queue
.lock()
.event()
.wait()
.map_err(SoundError::QueueEvt)?;
self.process_controlq_buffers()?;
@ -150,7 +136,10 @@ impl Worker {
// Just read from the event object to make sure the producer of such events
// never blocks. The buffers will only be used when actual virtio-snd
// events are triggered.
self.event_queue_evt.wait().map_err(SoundError::QueueEvt)?;
self.event_queue
.event()
.wait()
.map_err(SoundError::QueueEvt)?;
}
Token::EventTriggered => {
event_notifier.wait().map_err(SoundError::QueueEvt)?;
@ -505,9 +494,7 @@ impl Drop for Worker {
fn io_loop(
interrupt: Interrupt,
tx_queue: Arc<Mutex<Queue>>,
tx_queue_evt: Event,
rx_queue: Arc<Mutex<Queue>>,
rx_queue_evt: Event,
senders: Vec<Sender<Box<StreamMsg>>>,
kill_evt: Event,
) -> Result<()> {
@ -518,8 +505,8 @@ fn io_loop(
Kill,
}
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&tx_queue_evt, Token::TxQAvailable),
(&rx_queue_evt, Token::RxQAvailable),
(tx_queue.lock().event(), Token::TxQAvailable),
(rx_queue.lock().event(), Token::RxQAvailable),
(&kill_evt, Token::Kill),
])
.map_err(SoundError::WaitCtx)?;
@ -529,11 +516,19 @@ fn io_loop(
for wait_evt in wait_events.iter().filter(|e| e.is_readable) {
let queue = match wait_evt.token {
Token::TxQAvailable => {
tx_queue_evt.wait().map_err(SoundError::QueueEvt)?;
tx_queue
.lock()
.event()
.wait()
.map_err(SoundError::QueueEvt)?;
&tx_queue
}
Token::RxQAvailable => {
rx_queue_evt.wait().map_err(SoundError::QueueEvt)?;
rx_queue
.lock()
.event()
.wait()
.map_err(SoundError::QueueEvt)?;
&rx_queue
}
Token::Kill => {

View file

@ -40,7 +40,6 @@ const TPM_BUFSIZE: usize = 4096;
struct Worker {
interrupt: Interrupt,
queue: Queue,
queue_evt: Event,
backend: Box<dyn TpmBackend>,
}
@ -111,7 +110,7 @@ impl Worker {
}
let wait_ctx = match WaitContext::build_with(&[
(&self.queue_evt, Token::QueueAvailable),
(self.queue.event(), Token::QueueAvailable),
(&kill_evt, Token::Kill),
])
.and_then(|wc| {
@ -140,7 +139,7 @@ impl Worker {
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::QueueAvailable => {
if let Err(e) = self.queue_evt.wait() {
if let Err(e) = self.queue.event().wait() {
error!("vtpm failed reading queue Event: {}", e);
break 'wait;
}
@ -197,19 +196,18 @@ impl VirtioDevice for Tpm {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != 1 {
return Err(anyhow!("expected 1 queue, got {}", queues.len()));
}
let (queue, queue_evt) = queues.pop_first().unwrap().1;
let queue = queues.pop_first().unwrap().1;
let backend = self.backend.take().context("no backend in vtpm")?;
let worker = Worker {
interrupt,
queue,
queue_evt,
backend,
};

View file

@ -184,7 +184,7 @@ where
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != NUM_QUEUES {
return Err(anyhow!(
@ -410,19 +410,21 @@ pub mod tests {
let mut q0 = QueueConfig::new(1, 0);
q0.set_ready(true);
let q0 = q0.activate(&guest_memory).expect("QueueConfig::activate");
let e0 = Event::new().unwrap();
let q0 = q0
.activate(&guest_memory, Event::new().unwrap())
.expect("QueueConfig::activate");
let mut q1 = QueueConfig::new(1, 0);
q1.set_ready(true);
let q1 = q1.activate(&guest_memory).expect("QueueConfig::activate");
let e1 = Event::new().unwrap();
let q1 = q1
.activate(&guest_memory, Event::new().unwrap())
.expect("QueueConfig::activate");
// Just testing that we don't panic, for now
let _ = net.activate(
guest_memory,
Interrupt::new(IrqLevelEvent::new().unwrap(), None, VIRTIO_MSI_NO_VECTOR),
BTreeMap::from([(0, (q0, e0)), (1, (q1, e1))]),
BTreeMap::from([(0, q0), (1, q1)]),
);
}
}

View file

@ -104,7 +104,7 @@ impl VirtioDevice for Scmi {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != NUM_QUEUES {
return Err(anyhow!(

View file

@ -238,7 +238,6 @@ impl VhostUserBackend for BlockBackend {
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
// `start_worker` will return early if the worker has already started.
self.start_worker();
@ -250,7 +249,6 @@ impl VhostUserBackend for BlockBackend {
.unbounded_send(WorkerCmd::StartQueue {
index: idx,
queue,
kick_evt,
interrupt: doorbell,
})
.unwrap_or_else(|_| panic!("worker channel closed early"));

View file

@ -163,29 +163,24 @@ impl VhostUserBackend for ConsoleBackend {
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
let queue = Arc::new(Mutex::new(queue));
match idx {
// ReceiveQueue
0 => {
let res = self.device.console.start_receive_queue(
&self.ex,
queue.clone(),
doorbell,
kick_evt,
);
let res =
self.device
.console
.start_receive_queue(&self.ex, queue.clone(), doorbell);
self.active_in_queue = Some(queue);
res
}
// TransmitQueue
1 => {
let res = self.device.console.start_transmit_queue(
&self.ex,
queue.clone(),
doorbell,
kick_evt,
);
let res =
self.device
.console
.start_transmit_queue(&self.ex, queue.clone(), doorbell);
self.active_out_queue = Some(queue);
res
}

View file

@ -16,7 +16,6 @@ use argh::FromArgs;
use base::error;
use base::warn;
use base::AsRawDescriptors;
use base::Event;
use base::RawDescriptor;
use base::Tube;
use cros_async::EventAsync;
@ -63,7 +62,7 @@ async fn handle_fs_queue(
error!("Failed to read kick event for fs queue: {}", e);
break;
}
if let Err(e) = process_fs_queue(&doorbell, &queue, &server, &tube, slot) {
if let Err(e) = process_fs_queue(&doorbell, &mut queue.borrow_mut(), &server, &tube, slot) {
error!("Process FS queue failed: {}", e);
break;
}
@ -180,13 +179,16 @@ impl VhostUserBackend for FsBackend {
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
self.stop_queue(idx)?;
}
let kick_evt = queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();

View file

@ -13,7 +13,6 @@ use anyhow::bail;
use anyhow::Context;
use base::error;
use base::warn;
use base::Event;
use base::Tube;
use cros_async::EventAsync;
use cros_async::Executor;
@ -148,7 +147,6 @@ impl VhostUserBackend for GpuBackend {
queue: Queue,
mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
if self.queue_workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -163,6 +161,10 @@ impl VhostUserBackend for GpuBackend {
_ => bail!("attempted to start unknown queue: {}", idx),
}
let kick_evt = queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;

View file

@ -163,7 +163,6 @@ pub trait VhostUserBackend {
queue: Queue,
mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()>;
/// Indicates that the backend should stop processing requests for virtio queue number `idx`.
@ -216,9 +215,6 @@ struct Vring {
enabled: bool,
// Active queue that is only `Some` when the device is sleeping.
paused_queue: Option<Queue>,
// This queue kick_evt is saved so that the queues handlers can start back up with the
// same events when it is woken up.
kick_evt: Option<Event>,
}
#[derive(Serialize, Deserialize)]
@ -237,7 +233,6 @@ impl Vring {
doorbell: None,
enabled: false,
paused_queue: None,
kick_evt: None,
}
}
@ -246,7 +241,6 @@ impl Vring {
self.doorbell = None;
self.enabled = false;
self.paused_queue = None;
self.kick_evt = None;
}
fn snapshot(&self) -> anyhow::Result<VringSnapshot> {
@ -261,12 +255,24 @@ impl Vring {
})
}
fn restore(&mut self, vring_snapshot: VringSnapshot, mem: &GuestMemory) -> anyhow::Result<()> {
fn restore(
&mut self,
vring_snapshot: VringSnapshot,
mem: &GuestMemory,
event: Option<Event>,
) -> anyhow::Result<()> {
self.queue.restore(vring_snapshot.queue)?;
self.enabled = vring_snapshot.enabled;
self.paused_queue = vring_snapshot
.paused_queue
.map(|value| Queue::restore(&self.queue, value, mem))
.map(|value| {
Queue::restore(
&self.queue,
value,
mem,
event.context("missing queue event")?,
)
})
.transpose()?;
Ok(())
}
@ -586,8 +592,6 @@ impl VhostUserSlaveReqHandlerMut for DeviceRequestHandler {
}
let kick_evt = self.ops.set_vring_kick(index, file)?;
// Save kick_evts so they can be re-used when waking up the device.
vring.kick_evt = Some(kick_evt.try_clone().expect("Failed to clone kick_evt"));
// Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
vring.queue.ack_features(self.backend.acked_features());
@ -599,7 +603,7 @@ impl VhostUserSlaveReqHandlerMut for DeviceRequestHandler {
.cloned()
.ok_or(VhostError::InvalidOperation)?;
let queue = match vring.queue.activate(&mem) {
let queue = match vring.queue.activate(&mem, kick_evt) {
Ok(queue) => queue,
Err(e) => {
error!("failed to activate vring: {:#}", e);
@ -611,7 +615,7 @@ impl VhostUserSlaveReqHandlerMut for DeviceRequestHandler {
if let Err(e) = self
.backend
.start_queue(index as usize, queue, mem, doorbell, kick_evt)
.start_queue(index as usize, queue, mem, doorbell)
{
error!("Failed to start queue {}: {}", index, e);
return Err(VhostError::SlaveInternalError);
@ -739,17 +743,8 @@ impl VhostUserSlaveReqHandlerMut for DeviceRequestHandler {
if let Some(queue) = vring.paused_queue.take() {
let mem = self.mem.clone().ok_or(VhostError::SlaveInternalError)?;
let doorbell = vring.doorbell.clone().expect("Failed to clone doorbell");
let kick_evt = vring
.kick_evt
.as_ref()
.ok_or(VhostError::SlaveInternalError)?
.try_clone()
.expect("Failed to clone kick_evt");
if let Err(e) = self
.backend
.start_queue(index, queue, mem, doorbell, kick_evt)
{
if let Err(e) = self.backend.start_queue(index, queue, mem, doorbell) {
error!("Failed to start queue {}: {}", index, e);
return Err(VhostError::SlaveInternalError);
}
@ -787,28 +782,30 @@ impl VhostUserSlaveReqHandlerMut for DeviceRequestHandler {
let snapshotted_vrings = device_request_handler_snapshot.vrings;
assert_eq!(snapshotted_vrings.len(), self.vrings.len());
for (vring, snapshotted_vring) in self.vrings.iter_mut().zip(snapshotted_vrings.into_iter())
{
vring
.restore(snapshotted_vring, mem)
.map_err(VhostError::RestoreError)?;
}
// `queue_evts` should only be `Some` if the snapshotted device is already activated.
// This wire up the doorbell events.
if let Some(queue_evts) = queue_evts {
// TODO(b/288596005): It is assumed that the index of `queue_evts` should map to the
// index of `self.vrings`. However, this assumption may break in the future, so a Map
// of indexes to queue_evt should be used to support sparse activated queues.
for (index, queue_evt_fd) in queue_evts.into_iter().enumerate() {
if let Some(vring) = self.vrings.get_mut(index) {
let kick_evt = self.ops.set_vring_kick(index as u8, Some(queue_evt_fd))?;
// Save kick_evts so they can be re-used when waking up the device.
vring.kick_evt = Some(kick_evt);
} else {
return Err(VhostError::VringIndexNotFound(index));
}
}
let mut queue_evts_iter = queue_evts.map(Vec::into_iter);
for (index, (vring, snapshotted_vring)) in self
.vrings
.iter_mut()
.zip(snapshotted_vrings.into_iter())
.enumerate()
{
let queue_evt = if let Some(queue_evts_iter) = &mut queue_evts_iter {
// TODO(b/288596005): It is assumed that the index of `queue_evts` should map to the
// index of `self.vrings`. However, this assumption may break in the future, so a
// Map of indexes to queue_evt should be used to support sparse activated queues.
let queue_evt_file = queue_evts_iter
.next()
.ok_or(VhostError::VringIndexNotFound(index))?;
Some(self.ops.set_vring_kick(index as u8, Some(queue_evt_file))?)
} else {
None
};
vring
.restore(snapshotted_vring, mem, queue_evt)
.map_err(VhostError::RestoreError)?;
}
self.backend
@ -976,6 +973,7 @@ mod tests {
use anyhow::anyhow;
use anyhow::bail;
use base::Event;
#[cfg(unix)]
use tempfile::Builder;
#[cfg(unix)]
@ -1075,7 +1073,6 @@ mod tests {
queue: Queue,
_mem: GuestMemory,
_doorbell: Interrupt,
_kick_evt: Event,
) -> anyhow::Result<()> {
self.active_queues[idx] = Some(queue);
Ok(())
@ -1144,12 +1141,13 @@ mod tests {
println!("activate_mem_table: queue_index={}", idx);
let mut queue = QueueConfig::new(0x10, 0);
queue.set_ready(true);
let queue = queue.activate(&mem).expect("QueueConfig::activate");
let queue_evt = Event::new().unwrap();
let queue = queue
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let irqfd = Event::new().unwrap();
vmm_handler
.activate_vring(&mem, idx, &queue, &queue_evt, &irqfd)
.activate_vring(&mem, idx, &queue, &irqfd)
.unwrap();
}
@ -1228,12 +1226,13 @@ mod tests {
println!("activate_mem_table: queue_index={}", idx);
let mut queue = QueueConfig::new(0x10, 0);
queue.set_ready(true);
let queue = queue.activate(&mem).expect("QueueConfig::activate");
let queue_evt = Event::new().unwrap();
let queue = queue
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let irqfd = Event::new().unwrap();
vmm_handler
.activate_vring(&mem, idx, &queue, &queue_evt, &irqfd)
.activate_vring(&mem, idx, &queue, &irqfd)
.unwrap();
}
}

View file

@ -9,7 +9,6 @@ use anyhow::bail;
use anyhow::Context;
use base::error;
use base::AsRawDescriptors;
use base::Event;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::IntoAsync;
@ -116,7 +115,7 @@ pub struct NetBackend<T: TapT + IntoAsync> {
acked_protocol_features: VhostUserProtocolFeatures,
mtu: u16,
#[cfg(all(windows, feature = "slirp"))]
slirp_kill_event: Event,
slirp_kill_event: base::Event,
workers: [Option<(TaskHandle<Queue>, oneshot::Sender<()>)>; MAX_QUEUE_NUM],
}
@ -203,9 +202,8 @@ where
queue: virtio::Queue,
mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
sys::start_queue(self, idx, queue, mem, doorbell, kick_evt)
sys::start_queue(self, idx, queue, mem, doorbell)
}
fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {

View file

@ -14,7 +14,6 @@ use base::error;
use base::info;
use base::validate_raw_descriptor;
use base::warn;
use base::Event;
use base::RawDescriptor;
use cros_async::EventAsync;
use cros_async::Executor;
@ -184,7 +183,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
if backend.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -195,6 +193,10 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
// Safe because the executor is initialized in main() below.
let ex = ex.get().expect("Executor not initialized");
let kick_evt = queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
let tap = backend

View file

@ -175,7 +175,6 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
if backend.workers.get(idx).is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -189,6 +188,10 @@ pub(in crate::virtio::vhost::user::device::net) fn start_queue<T: 'static + Into
// Safe because the executor is initialized in main() below.
let ex = ex.get().expect("Executor not initialized");
let kick_evt = queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let kick_evt =
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
let tap = backend

View file

@ -11,7 +11,6 @@ use anyhow::bail;
use anyhow::Context;
use base::error;
use base::warn;
use base::Event;
use cros_async::sync::RwLock as AsyncRwLock;
use cros_async::EventAsync;
use cros_async::Executor;
@ -191,7 +190,6 @@ impl VhostUserBackend for SndBackend {
queue: virtio::Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
@ -201,6 +199,10 @@ impl VhostUserBackend for SndBackend {
// Safe because the executor is initialized in main() below.
let ex = SND_EXECUTOR.get().expect("Executor not initialized");
let kick_evt = queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let mut kick_evt =
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();

View file

@ -434,6 +434,7 @@ mod test {
use std::io::Read;
use std::io::Write;
use base::Event;
use vm_memory::GuestMemory;
use super::*;
@ -465,7 +466,9 @@ mod test {
queue.set_avail_ring(IOVA(addrs.avail));
queue.set_used_ring(IOVA(addrs.used));
queue.set_ready(true);
queue.activate(mem).expect("QueueConfig::activate")
queue
.activate(mem, Event::new().unwrap())
.expect("QueueConfig::activate")
}
fn device_write(q: &mut DeviceQueue, data: &[u8]) -> usize {

View file

@ -17,7 +17,6 @@ use argh::FromArgs;
use base::clone_descriptor;
use base::error;
use base::warn;
use base::Event;
use base::FromRawDescriptor;
use base::SafeDescriptor;
use base::Tube;
@ -66,7 +65,11 @@ async fn run_out_queue(
break;
}
wl::process_out_queue(&doorbell, &queue, &mut wlstate.borrow_mut());
wl::process_out_queue(
&doorbell,
&mut queue.borrow_mut(),
&mut wlstate.borrow_mut(),
);
}
}
@ -86,8 +89,11 @@ async fn run_in_queue(
break;
}
if wl::process_in_queue(&doorbell, &queue, &mut wlstate.borrow_mut())
== Err(wl::DescriptorsExhausted)
if wl::process_in_queue(
&doorbell,
&mut queue.borrow_mut(),
&mut wlstate.borrow_mut(),
) == Err(wl::DescriptorsExhausted)
{
if let Err(e) = kick_evt.next_val().await {
error!("Failed to read kick event for in queue: {}", e);
@ -204,13 +210,16 @@ impl VhostUserBackend for WlBackend {
queue: Queue,
_mem: GuestMemory,
doorbell: Interrupt,
kick_evt: Event,
) -> anyhow::Result<()> {
if self.workers[idx].is_some() {
warn!("Starting new queue handler without stopping old handler");
self.stop_queue(idx)?;
}
let kick_evt = queue
.event()
.try_clone()
.context("failed to clone queue event")?;
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;

View file

@ -436,12 +436,7 @@ impl Worker {
// - Process messages from the device over Virtio, from the sibling over a unix domain socket,
// from the main thread in this device over a tube and from the main crosvm process over a
// tube.
fn run(
&mut self,
rx_queue_evt: Event,
tx_queue_evt: Event,
kill_evt: Event,
) -> Result<ExitReason> {
fn run(&mut self, kill_evt: Event) -> Result<ExitReason> {
let fault_event = self
.iommu
.lock()
@ -460,8 +455,8 @@ impl Worker {
// TODO(abhishekbh): Should interrupt.signal_config_changed be called here ?.
let mut wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&self.slave_req_helper, Token::SiblingSocket),
(&rx_queue_evt, Token::RxQueue),
(&tx_queue_evt, Token::TxQueue),
(self.rx_queue.event(), Token::RxQueue),
(self.tx_queue.event(), Token::TxQueue),
(&kill_evt, Token::Kill),
(&fault_event, Token::IommuFault),
])
@ -496,7 +491,7 @@ impl Worker {
}
}
Token::RxQueue => {
if let Err(e) = rx_queue_evt.wait() {
if let Err(e) = self.rx_queue.event().wait() {
bail!("error reading rx queue Event: {}", e);
}
@ -508,7 +503,7 @@ impl Worker {
}
}
Token::TxQueue => {
if let Err(e) = tx_queue_evt.wait() {
if let Err(e) = self.tx_queue.event().wait() {
bail!("error reading tx queue event: {}", e);
}
self.process_tx()
@ -1255,8 +1250,6 @@ enum State {
interrupt: Interrupt,
rx_queue: Queue,
tx_queue: Queue,
rx_queue_evt: Event,
tx_queue_evt: Event,
iommu: Arc<Mutex<IpcMemoryMapper>>,
},
@ -1470,43 +1463,30 @@ impl VirtioVhostUser {
let old_state: State = std::mem::replace(&mut *state, State::Invalid);
// Retrieve values stored in the state value.
let (
vm_memory_client,
listener,
mem,
interrupt,
rx_queue,
tx_queue,
rx_queue_evt,
tx_queue_evt,
iommu,
) = match old_state {
State::Activated {
vm_memory_client,
listener,
mem,
interrupt,
rx_queue,
tx_queue,
rx_queue_evt,
tx_queue_evt,
iommu,
} => (
vm_memory_client,
listener,
mem,
interrupt,
rx_queue,
tx_queue,
rx_queue_evt,
tx_queue_evt,
iommu,
),
s => {
// Unreachable because we've checked the state at the beginning of this function.
unreachable!("invalid state: {}", s)
}
};
let (vm_memory_client, listener, mem, interrupt, rx_queue, tx_queue, iommu) =
match old_state {
State::Activated {
vm_memory_client,
listener,
mem,
interrupt,
rx_queue,
tx_queue,
iommu,
} => (
vm_memory_client,
listener,
mem,
interrupt,
rx_queue,
tx_queue,
iommu,
),
s => {
// Unreachable because we've checked the state at the beginning of this function.
unreachable!("invalid state: {}", s)
}
};
// Safe because a PCI bar is guaranteed to be allocated at this point.
let io_pci_bar = self.io_pci_bar.expect("PCI bar unallocated");
@ -1557,11 +1537,7 @@ impl VirtioVhostUser {
pending_unmap: None,
};
let run_result = worker.run(
rx_queue_evt.try_clone().unwrap(),
tx_queue_evt.try_clone().unwrap(),
kill_evt,
);
let run_result = worker.run(kill_evt);
if let Err(e) = worker.release_exported_regions() {
error!("failed to release exported memory: {:?}", e);
@ -1609,8 +1585,6 @@ impl VirtioVhostUser {
interrupt,
rx_queue,
tx_queue,
rx_queue_evt,
tx_queue_evt,
iommu,
};
@ -1771,14 +1745,14 @@ impl VirtioDevice for VirtioVhostUser {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != NUM_PROXY_DEVICE_QUEUES {
return Err(anyhow!("bad queue length: {}", queues.len()));
}
let (rx_queue, rx_queue_evt) = queues.pop_first().unwrap().1;
let (tx_queue, tx_queue_evt) = queues.pop_first().unwrap().1;
let rx_queue = queues.pop_first().unwrap().1;
let tx_queue = queues.pop_first().unwrap().1;
let mut state = self.state.lock();
// Use `State::Invalid` as the intermediate state here.
@ -1796,8 +1770,6 @@ impl VirtioDevice for VirtioVhostUser {
interrupt,
rx_queue,
tx_queue,
rx_queue_evt,
tx_queue_evt,
iommu: self.iommu.take().unwrap(),
};
}

View file

@ -221,7 +221,6 @@ impl VhostUserHandler {
mem: &GuestMemory,
queue_index: usize,
queue: &Queue,
queue_evt: &Event,
irqfd: &Event,
) -> Result<()> {
self.vu
@ -254,7 +253,7 @@ impl VhostUserHandler {
.set_vring_call(queue_index, irqfd)
.map_err(Error::SetVringCall)?;
self.vu
.set_vring_kick(queue_index, queue_evt)
.set_vring_kick(queue_index, queue.event())
.map_err(Error::SetVringKick)?;
self.vu
.set_vring_enable(queue_index, true)
@ -268,7 +267,7 @@ impl VhostUserHandler {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
label: &str,
) -> Result<WorkerThread<()>> {
self.set_mem_table(&mem)?;
@ -280,11 +279,11 @@ impl VhostUserHandler {
let msix_config = msix_config_opt.lock();
let non_msix_evt = Event::new().map_err(Error::CreateEvent)?;
for (&queue_index, (queue, queue_evt)) in queues.iter() {
for (&queue_index, queue) in queues.iter() {
let irqfd = msix_config
.get_irqfd(queue.vector() as usize)
.unwrap_or(&non_msix_evt);
self.activate_vring(&mem, queue_index, queue, queue_evt, irqfd)?;
self.activate_vring(&mem, queue_index, queue, irqfd)?;
}
drop(msix_config);

View file

@ -161,7 +161,7 @@ impl VirtioDevice for VhostUserVirtioDevice {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
let worker_thread = self
.handler
@ -216,7 +216,7 @@ impl VirtioDevice for VhostUserVirtioDevice {
&mut self,
// Vhost user doesn't need to pass queue_states back to the device process, since it will
// already have it.
_queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
_queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
self.handler
.borrow_mut()

View file

@ -171,7 +171,7 @@ impl VirtioDevice for Vsock {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != NUM_QUEUES {
return Err(anyhow!(
@ -188,7 +188,7 @@ impl VirtioDevice for Vsock {
// The third vq is an event-only vq that is not handled by the vhost
// subsystem (but still needs to exist). Split it off here.
let mut event_queue = queues.remove(&2).unwrap().0;
let mut event_queue = queues.remove(&2).unwrap();
// Send TRANSPORT_RESET event if needed.
if self.needs_transport_reset {
self.needs_transport_reset = false;
@ -265,11 +265,7 @@ impl VirtioDevice for Vsock {
.vhost_handle
.stop()
.context("failed to stop vrings")?;
let mut queues: BTreeMap<usize, Queue> = worker
.queues
.into_iter()
.map(|(i, (q, _))| (i, q))
.collect();
let mut queues: BTreeMap<usize, Queue> = worker.queues;
let mut vrings_base = Vec::new();
for (pos, _) in queues.iter() {
let vring_base = VringBase {
@ -291,7 +287,7 @@ impl VirtioDevice for Vsock {
fn virtio_wake(
&mut self,
device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
match device_state {
None => Ok(()),

View file

@ -33,7 +33,7 @@ pub struct VringBase {
/// Worker that takes care of running the vhost device.
pub struct Worker<T: Vhost> {
interrupt: Interrupt,
pub queues: BTreeMap<usize, (Queue, Event)>,
pub queues: BTreeMap<usize, Queue>,
pub vhost_handle: T,
pub vhost_interrupt: Vec<Event>,
acked_features: u64,
@ -43,7 +43,7 @@ pub struct Worker<T: Vhost> {
impl<T: Vhost> Worker<T> {
pub fn new(
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
vhost_handle: T,
vhost_interrupt: Vec<Event>,
interrupt: Interrupt,
@ -98,7 +98,7 @@ impl<T: Vhost> Worker<T> {
.set_mem_table(&mem)
.map_err(Error::VhostSetMemTable)?;
for (&queue_index, (queue, queue_evt)) in self.queues.iter() {
for (&queue_index, queue) in self.queues.iter() {
self.vhost_handle
.set_vring_num(queue_index, queue.size())
.map_err(Error::VhostSetVringNum)?;
@ -135,7 +135,7 @@ impl<T: Vhost> Worker<T> {
}
self.set_vring_call_for_entry(queue_index, queue.vector() as usize)?;
self.vhost_handle
.set_vring_kick(queue_index, queue_evt)
.set_vring_kick(queue_index, queue.event())
.map_err(Error::VhostSetVringKick)?;
}
@ -184,7 +184,7 @@ impl<T: Vhost> Worker<T> {
.wait()
.map_err(Error::VhostIrqRead)?;
self.interrupt
.signal_used_queue(self.queues[&index].0.vector());
.signal_used_queue(self.queues[&index].vector());
}
Token::InterruptResample => {
self.interrupt.interrupt_resample();
@ -198,7 +198,7 @@ impl<T: Vhost> Worker<T> {
match socket.recv() {
Ok(VhostDevRequest::MsixEntryChanged(index)) => {
let mut qindex = 0;
for (&queue_index, (queue, _evt)) in self.queues.iter() {
for (&queue_index, queue) in self.queues.iter() {
if queue.vector() == index as u16 {
qindex = queue_index;
break;
@ -289,7 +289,7 @@ impl<T: Vhost> Worker<T> {
.map_err(Error::VhostSetVringCall)?;
}
} else {
for (&queue_index, (queue, _evt)) in self.queues.iter() {
for (&queue_index, queue) in self.queues.iter() {
let vector = queue.vector() as usize;
if !msix_config.table_masked(vector) {
if let Some(irqfd) = msix_config.get_irqfd(vector) {

View file

@ -202,7 +202,7 @@ impl VirtioDevice for VideoDevice {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
return Err(anyhow!(
@ -217,8 +217,8 @@ impl VirtioDevice for VideoDevice {
.context("failed to create kill Event pair")?;
self.kill_evt = Some(self_kill_evt);
let (cmd_queue, cmd_evt) = queues.pop_first().unwrap().1;
let (event_queue, event_evt) = queues.pop_first().unwrap().1;
let cmd_queue = queues.pop_first().unwrap().1;
let event_queue = queues.pop_first().unwrap().1;
let backend = self.backend;
let resource_bridge = self
.resource_bridge
@ -240,7 +240,7 @@ impl VirtioDevice for VideoDevice {
}
};
if let Err(e) = worker.run(device, &cmd_evt, &event_evt, &kill_evt) {
if let Err(e) = worker.run(device, &kill_evt) {
error!("Failed to start decoder worker: {}", e);
};
// Don't return any information since the return value is never checked.
@ -292,7 +292,7 @@ impl VirtioDevice for VideoDevice {
}
};
if let Err(e) = worker.run(device, &cmd_evt, &event_evt, &kill_evt) {
if let Err(e) = worker.run(device, &kill_evt) {
error!("Failed to start encoder worker: {}", e);
}
}),

View file

@ -298,19 +298,11 @@ impl Worker {
/// # Arguments
///
/// * `device` - Instance of backend device
/// * `cmd_evt` - Driver-to-device kick event for the command queue
/// * `event_evt` - Driver-to-device kick event for the event queue
/// * `kill_evt` - `Event` notified to make `run` stop and return
pub fn run(
&mut self,
mut device: Box<dyn Device>,
cmd_evt: &Event,
event_evt: &Event,
kill_evt: &Event,
) -> Result<()> {
pub fn run(&mut self, mut device: Box<dyn Device>, kill_evt: &Event) -> Result<()> {
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(cmd_evt, Token::CmdQueue),
(event_evt, Token::EventQueue),
(self.cmd_queue.event(), Token::CmdQueue),
(self.event_queue.event(), Token::EventQueue),
(kill_evt, Token::Kill),
])
.and_then(|wc| {
@ -329,11 +321,11 @@ impl Worker {
for wait_event in wait_events.iter().filter(|e| e.is_readable) {
match wait_event.token {
Token::CmdQueue => {
let _ = cmd_evt.wait();
let _ = self.cmd_queue.event().wait();
self.handle_command_queue(device.as_mut(), &wait_ctx)?;
}
Token::EventQueue => {
let _ = event_evt.wait();
let _ = self.event_queue.event().wait();
}
Token::Event { id } => {
self.handle_event(device.as_mut(), id)?;

View file

@ -118,7 +118,7 @@ pub trait VirtioDevice: Send {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
queues: BTreeMap<usize, (Queue, Event)>,
queues: BTreeMap<usize, Queue>,
) -> Result<()>;
/// Optionally deactivates this device. If the reset method is
@ -223,7 +223,7 @@ pub trait VirtioDevice: Send {
/// is an error.
fn virtio_wake(
&mut self,
_queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
_queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
anyhow::bail!("virtio_wake not implemented for {}", self.debug_label());
}

View file

@ -158,15 +158,15 @@ impl VirtioMmioDevice {
.enumerate()
.filter(|(_, (q, _))| q.ready())
.map(|(queue_index, (queue, evt))| {
let queue_evt = evt.try_clone().context("failed to clone queue_evt")?;
Ok((
queue_index,
(
queue.activate(&mem).context("failed to activate queue")?,
evt.try_clone().context("failed to clone queue_evt")?,
),
queue
.activate(&mem, queue_evt)
.context("failed to activate queue")?,
))
})
.collect::<anyhow::Result<BTreeMap<usize, (Queue, Event)>>>()?;
.collect::<anyhow::Result<BTreeMap<usize, Queue>>>()?;
if let Err(e) = self.device.activate(mem, interrupt, queues) {
error!("{} activate failed: {:#}", self.debug_label(), e);

View file

@ -246,7 +246,6 @@ impl VirtioPciCommonConfig {
mod tests {
use std::collections::BTreeMap;
use base::Event;
use base::RawDescriptor;
use vm_memory::GuestMemory;
@ -270,7 +269,7 @@ mod tests {
&mut self,
_mem: GuestMemory,
_interrupt: Interrupt,
_queues: BTreeMap<usize, (Queue, Event)>,
_queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
Ok(())
}

View file

@ -546,17 +546,15 @@ impl VirtioPciDevice {
.context("failed to register ioevent")?;
evt.ioevent_registered = true;
}
let queue_evt = evt.event.try_clone().context("failed to clone queue_evt")?;
Ok((
queue_index,
(
queue
.activate(&self.mem)
.context("failed to activate queue")?,
evt.event.try_clone().context("failed to clone queue_evt")?,
),
queue
.activate(&self.mem, queue_evt)
.context("failed to activate queue")?,
))
})
.collect::<anyhow::Result<BTreeMap<usize, (Queue, Event)>>>()?;
.collect::<anyhow::Result<BTreeMap<usize, Queue>>>()?;
if let Some(iommu) = &self.iommu {
self.device.set_iommu(iommu);
@ -1130,28 +1128,13 @@ impl Suspendable for VirtioPciDevice {
.expect("virtio_wake failed, can't recover");
}
Some(SleepState::Active { activated_queues }) => {
let mut queues_to_wake = BTreeMap::new();
for (index, queue) in activated_queues.into_iter() {
queues_to_wake.insert(
index,
(
queue,
self.queue_evts[index]
.event
.try_clone()
.expect("failed to clone event"),
),
);
}
self.device
.virtio_wake(Some((
self.mem.clone(),
self.interrupt
.clone()
.expect("interrupt missing for already active queues"),
queues_to_wake,
activated_queues,
)))
.expect("virtio_wake failed, can't recover");
}
@ -1237,9 +1220,16 @@ impl Suspendable for VirtioPciDevice {
.queues
.get(index)
.with_context(|| format!("missing queue config for activated queue {index}"))?;
let queue_evt = self
.queue_evts
.get(index)
.with_context(|| format!("missing queue event for activated queue {index}"))?
.event
.try_clone()
.context("failed to clone queue event")?;
activated_queues.insert(
index,
Queue::restore(queue_config, queue_snapshot, &self.mem)?,
Queue::restore(queue_config, queue_snapshot, &self.mem, queue_evt)?,
);
}

View file

@ -191,9 +191,9 @@ impl Vsock {
interrupt: Interrupt,
mut queues: VsockQueues,
) -> anyhow::Result<()> {
let (rx_queue, rx_queue_evt) = queues.rx;
let (tx_queue, tx_queue_evt) = queues.tx;
let (event_queue, event_queue_evt) = queues.event;
let rx_queue = queues.rx;
let tx_queue = queues.tx;
let event_queue = queues.event;
let host_guid = self.host_guid.clone();
let guest_cid = self.guest_cid;
@ -201,15 +201,7 @@ impl Vsock {
"userspace_virtio_vsock",
move |kill_evt| {
let mut worker = Worker::new(mem, interrupt, host_guid, guest_cid);
let result = worker.run(
rx_queue,
tx_queue,
event_queue,
rx_queue_evt,
tx_queue_evt,
event_queue_evt,
kill_evt,
);
let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);
match result {
Err(e) => {
@ -254,7 +246,7 @@ impl VirtioDevice for Vsock {
&mut self,
mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
return Err(anyhow!(
@ -288,7 +280,7 @@ impl VirtioDevice for Vsock {
fn virtio_wake(
&mut self,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
if let Some((mem, interrupt, queues)) = queues_state {
self.start_worker(
@ -1337,11 +1329,13 @@ impl Worker {
rx_queue: Queue,
tx_queue: Queue,
event_queue: Queue,
rx_queue_evt: Event,
tx_queue_evt: Event,
event_queue_evt: Event,
kill_evt: Event,
) -> Result<Option<PausedQueues>> {
let rx_queue_evt = rx_queue
.event()
.try_clone()
.map_err(VsockError::CloneDescriptor)?;
// Note that this mutex won't ever be contended because the HandleExecutor is single
// threaded. We need the mutex for compile time correctness, but technically it is not
// actually providing mandatory locking, at least not at the moment. If we later use a
@ -1369,8 +1363,14 @@ impl Worker {
let (send, recv) = mpsc::channel(CHANNEL_SIZE);
let tx_evt_async =
EventAsync::new(tx_queue_evt, &ex).expect("Failed to set up the tx queue event");
let tx_evt_async = EventAsync::new(
tx_queue
.event()
.try_clone()
.map_err(VsockError::CloneDescriptor)?,
&ex,
)
.expect("Failed to set up the tx queue event");
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);
let tx_handler = tx_handler.fuse();
@ -1382,8 +1382,14 @@ impl Worker {
let packet_handler = packet_handler.fuse();
pin_mut!(packet_handler);
let event_evt_async = EventAsync::new(event_queue_evt, &ex)
.expect("Failed to set up the event queue event");
let event_evt_async = EventAsync::new(
event_queue
.event()
.try_clone()
.map_err(VsockError::CloneDescriptor)?,
&ex,
)
.expect("Failed to set up the event queue event");
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
let event_handler = self.process_event_queue(event_queue, event_evt_async, stop_rx);
let event_handler = event_handler.fuse();
@ -1457,14 +1463,14 @@ impl Worker {
/// Queues & events for the vsock device.
struct VsockQueues {
rx: (Queue, Event),
tx: (Queue, Event),
event: (Queue, Event),
rx: Queue,
tx: Queue,
event: Queue,
}
impl TryFrom<BTreeMap<usize, (Queue, Event)>> for VsockQueues {
impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {
type Error = anyhow::Error;
fn try_from(mut queues: BTreeMap<usize, (Queue, Event)>) -> result::Result<Self, Self::Error> {
fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {
if queues.len() < 3 {
anyhow::bail!(
"{} queues were found, but an activated vsock must have at 3 active queues.",

View file

@ -1690,14 +1690,13 @@ pub struct DescriptorsExhausted;
/// Handle incoming events and forward them to the VM over the input queue.
pub fn process_in_queue(
interrupt: &Interrupt,
in_queue: &Rc<RefCell<Queue>>,
in_queue: &mut Queue,
state: &mut WlState,
) -> ::std::result::Result<(), DescriptorsExhausted> {
state.process_wait_context();
let mut needs_interrupt = false;
let mut exhausted_queue = false;
let mut in_queue = in_queue.borrow_mut();
loop {
let mut desc = if let Some(d) = in_queue.peek() {
d
@ -1740,13 +1739,8 @@ pub fn process_in_queue(
}
/// Handle messages from the output queue and forward them to the display sever, if necessary.
pub fn process_out_queue(
interrupt: &Interrupt,
out_queue: &Rc<RefCell<Queue>>,
state: &mut WlState,
) {
pub fn process_out_queue(interrupt: &Interrupt, out_queue: &mut Queue, state: &mut WlState) {
let mut needs_interrupt = false;
let mut out_queue = out_queue.borrow_mut();
while let Some(mut desc) = out_queue.pop() {
let resp = match state.execute(&mut desc.reader) {
Ok(r) => r,
@ -1772,18 +1766,16 @@ pub fn process_out_queue(
struct Worker {
interrupt: Interrupt,
in_queue: Rc<RefCell<Queue>>,
in_queue_evt: Event,
out_queue: Rc<RefCell<Queue>>,
out_queue_evt: Event,
in_queue: Queue,
out_queue: Queue,
state: WlState,
}
impl Worker {
fn new(
interrupt: Interrupt,
in_queue: (Queue, Event),
out_queue: (Queue, Event),
in_queue: Queue,
out_queue: Queue,
wayland_paths: BTreeMap<String, PathBuf>,
mapper: Box<dyn SharedMemoryMapper>,
use_transition_flags: bool,
@ -1794,10 +1786,8 @@ impl Worker {
) -> Worker {
Worker {
interrupt,
in_queue: Rc::new(RefCell::new(in_queue.0)),
in_queue_evt: in_queue.1,
out_queue: Rc::new(RefCell::new(out_queue.0)),
out_queue_evt: out_queue.1,
in_queue,
out_queue,
state: WlState::new(
wayland_paths,
mapper,
@ -1822,8 +1812,8 @@ impl Worker {
}
let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[
(&self.in_queue_evt, Token::InQueue),
(&self.out_queue_evt, Token::OutQueue),
(self.in_queue.event(), Token::InQueue),
(self.out_queue.event(), Token::OutQueue),
(&kill_evt, Token::Kill),
(&self.state.wait_ctx, Token::State),
])
@ -1848,7 +1838,7 @@ impl Worker {
for event in &events {
match event.token {
Token::InQueue => {
let _ = self.in_queue_evt.wait();
let _ = self.in_queue.event().wait();
if !watching_state_ctx {
if let Err(e) =
wait_ctx.modify(&self.state.wait_ctx, EventType::Read, Token::State)
@ -1860,13 +1850,13 @@ impl Worker {
}
}
Token::OutQueue => {
let _ = self.out_queue_evt.wait();
process_out_queue(&self.interrupt, &self.out_queue, &mut self.state);
let _ = self.out_queue.event().wait();
process_out_queue(&self.interrupt, &mut self.out_queue, &mut self.state);
}
Token::Kill => break 'wait,
Token::State => {
if let Err(DescriptorsExhausted) =
process_in_queue(&self.interrupt, &self.in_queue, &mut self.state)
process_in_queue(&self.interrupt, &mut self.in_queue, &mut self.state)
{
if let Err(e) =
wait_ctx.modify(&self.state.wait_ctx, EventType::None, Token::State)
@ -1886,15 +1876,9 @@ impl Worker {
}
}
}
let in_queue = match Rc::try_unwrap(self.in_queue) {
Ok(queue_cell) => queue_cell.into_inner(),
Err(_) => panic!("failed to recover queue from worker"),
};
let out_queue = match Rc::try_unwrap(self.out_queue) {
Ok(queue_cell) => queue_cell.into_inner(),
Err(_) => panic!("failed to recover queue from worker"),
};
let in_queue = self.in_queue;
let out_queue = self.out_queue;
Ok(vec![in_queue, out_queue])
}
@ -1982,7 +1966,7 @@ impl VirtioDevice for Wl {
&mut self,
_mem: GuestMemory,
interrupt: Interrupt,
mut queues: BTreeMap<usize, (Queue, Event)>,
mut queues: BTreeMap<usize, Queue>,
) -> anyhow::Result<()> {
if queues.len() != QUEUE_SIZES.len() {
return Err(anyhow!(
@ -2055,7 +2039,7 @@ impl VirtioDevice for Wl {
fn virtio_wake(
&mut self,
device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, (Queue, Event)>)>,
device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
) -> anyhow::Result<()> {
match device_state {
None => Ok(()),

View file

@ -81,9 +81,10 @@ fuzz_target!(|bytes| {
let mut q = QueueConfig::new(QUEUE_SIZE, 0);
q.set_size(QUEUE_SIZE / 2);
q.set_ready(true);
let q = q.activate(&mem).expect("QueueConfig::activate");
let queue_evt = Event::new().unwrap();
let q = q
.activate(&mem, Event::new().unwrap())
.expect("QueueConfig::activate");
let queue_evt = q.event().try_clone().unwrap();
let features = base_features(ProtectionType::Unprotected);
@ -112,7 +113,7 @@ fuzz_target!(|bytes| {
None, // msix_config
0xFFFF, // VIRTIO_MSI_NO_VECTOR
),
BTreeMap::from([(0, (q, queue_evt.try_clone().unwrap()))]),
BTreeMap::from([(0, q)]),
)
.unwrap();

View file

@ -9,6 +9,7 @@ use std::io::Read;
use std::io::Write;
use std::mem::size_of;
use base::Event;
use crosvm_fuzz::fuzz_target;
use crosvm_fuzz::rand::FuzzRng;
use devices::virtio::QueueConfig;
@ -74,7 +75,7 @@ fuzz_target!(|data: &[u8]| {
q.set_ready(true);
GUEST_MEM.with(|mem| {
let mut q = if let Ok(q) = q.activate(mem) {
let mut q = if let Ok(q) = q.activate(mem, Event::new().unwrap()) {
q
} else {
return;