From 084aa95af1fe8feae7a940885cd5549167d68ad2 Mon Sep 17 00:00:00 2001 From: Noah Gold Date: Tue, 12 Apr 2022 21:36:23 -0700 Subject: [PATCH] cros_async: make AsyncEvent use base::Event EventFd is the original sys_util era Unix-specific Event. We need to use base::Event, which is the cross platform event that works on Windows. Thanks to acourbot@ for suggesting the splits in this series. BUG=b:213147081 TEST=see final CL in series. Change-Id: I174313cb544c5fc3768810365a42e6eaac1ca91a Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3583611 Reviewed-by: Alexandre Courbot Tested-by: kokoro Commit-Queue: Noah Gold --- base/src/event.rs | 18 +++++--- cros_async/src/event.rs | 5 ++- cros_async/src/io_ext.rs | 7 ++- devices/src/virtio/async_utils.rs | 4 +- devices/src/virtio/balloon.rs | 12 +++--- devices/src/virtio/block/asynchronous.rs | 43 ++++++++++--------- devices/src/virtio/iommu.rs | 2 +- devices/src/virtio/pmem.rs | 2 +- devices/src/virtio/snd/cras_backend/mod.rs | 2 +- devices/src/virtio/vhost/user/device/block.rs | 2 +- .../src/virtio/vhost/user/device/console.rs | 4 +- .../src/virtio/vhost/user/device/cras_snd.rs | 2 +- devices/src/virtio/vhost/user/device/fs.rs | 2 +- devices/src/virtio/vhost/user/device/gpu.rs | 2 +- .../src/virtio/vhost/user/device/unix/net.rs | 2 +- devices/src/virtio/vhost/user/device/vsock.rs | 2 +- .../virtio/vhost/user/device/vvu/device.rs | 4 +- .../virtio/vhost/user/device/windows/net.rs | 2 +- devices/src/virtio/vhost/user/device/wl.rs | 2 +- 19 files changed, 62 insertions(+), 57 deletions(-) diff --git a/base/src/event.rs b/base/src/event.rs index f3f7ac73e6..cefeb77c83 100644 --- a/base/src/event.rs +++ b/base/src/event.rs @@ -2,13 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use std::{ - mem, - ops::Deref, - os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}, - ptr, - time::Duration, -}; +use std::{mem, ops::Deref, ptr, time::Duration}; use serde::{Deserialize, Serialize}; @@ -16,6 +10,9 @@ use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, IntoRawDescriptor}; pub use crate::platform::EventReadResult; use crate::{generate_scoped_event, platform::EventFd, RawDescriptor, Result}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + /// See [EventFd](crate::platform::EventFd) for struct- and method-level /// documentation. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -61,4 +58,11 @@ impl IntoRawDescriptor for Event { } } +#[cfg(unix)] +impl AsRawFd for Event { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + generate_scoped_event!(Event); diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs index 6f65d0cbcd..703205f8f9 100644 --- a/cros_async/src/event.rs +++ b/cros_async/src/event.rs @@ -2,9 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use base::EventFd; - use super::{AsyncResult, Executor, IntoAsync, IoSourceExt}; +use base::Event as EventFd; /// An async version of `base::EventFd`. pub struct EventAsync { @@ -83,3 +82,5 @@ mod tests { assert_eq!(val, 0xaa); } } +// Safe because an `EventFd` is used underneath, which is safe to pass between threads. +unsafe impl Send for EventAsync {} diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index 6b3e6e827e..d48ad10d20 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -175,7 +175,6 @@ mod tests { task::{Context, Poll, Waker}, thread, }; - use sync::Mutex; use super::{ @@ -187,6 +186,7 @@ mod tests { }, *, }; + use base::Event; struct State { should_quit: bool, @@ -434,20 +434,19 @@ mod tests { if !use_uring() { return; } - use base::EventFd; async fn go(source: Box>) -> u64 { source.read_u64().await.unwrap() } - let eventfd = EventFd::new().unwrap(); + let eventfd = Event::new().unwrap(); eventfd.write(0x55).unwrap(); let ex = URingExecutor::new().unwrap(); let uring_source = async_uring_from(eventfd, &ex).unwrap(); let val = ex.run_until(go(uring_source)).unwrap(); assert_eq!(val, 0x55); - let eventfd = EventFd::new().unwrap(); + let eventfd = Event::new().unwrap(); eventfd.write(0xaa).unwrap(); let poll_ex = FdExecutor::new().unwrap(); let poll_source = async_poll_from(eventfd, &poll_ex).unwrap(); diff --git a/devices/src/virtio/async_utils.rs b/devices/src/virtio/async_utils.rs index 9f2f2f62ae..2250a4ee66 100644 --- a/devices/src/virtio/async_utils.rs +++ b/devices/src/virtio/async_utils.rs @@ -16,7 +16,7 @@ use super::{Interrupt, SignalableInterrupt}; /// Async task that waits for a signal from `event`. Once this event is readable, exit. Exiting /// this future will cause the main loop to break and the worker thread to exit. pub async fn await_and_exit(ex: &Executor, event: Event) -> Result<()> { - let event_async = EventAsync::new(event.0, ex).context("failed to create EventAsync")?; + let event_async = EventAsync::new(event, ex).context("failed to create EventAsync")?; let _ = event_async.next_val().await; Ok(()) } @@ -30,7 +30,7 @@ pub async fn handle_irq_resample(ex: &Executor, interrupt: Rc let resample_evt = resample_evt .try_clone() .context("resample_evt.try_clone() failed")?; - Some(EventAsync::new(resample_evt.0, ex).context("failed to create async resample event")?) + Some(EventAsync::new(resample_evt, ex).context("failed to create async resample event")?) } else { None }; diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs index 82645b45c1..3ff0078c5d 100644 --- a/devices/src/virtio/balloon.rs +++ b/devices/src/virtio/balloon.rs @@ -465,8 +465,8 @@ fn run_worker( // We need a block to release all references to command_tube at the end before returning it. { // The first queue is used for inflate messages - let inflate_event = EventAsync::new(queue_evts.remove(0).0, &ex) - .expect("failed to set up the inflate event"); + let inflate_event = + EventAsync::new(queue_evts.remove(0), &ex).expect("failed to set up the inflate event"); let inflate = handle_queue( &mem, queues.remove(0), @@ -482,8 +482,8 @@ fn run_worker( pin_mut!(inflate); // The second queue is used for deflate messages - let deflate_event = EventAsync::new(queue_evts.remove(0).0, &ex) - .expect("failed to set up the deflate event"); + let deflate_event = + EventAsync::new(queue_evts.remove(0), &ex).expect("failed to set up the deflate event"); let deflate = handle_queue( &mem, queues.remove(0), @@ -499,7 +499,7 @@ fn run_worker( // stats results that were queued during an error condition. let (stats_tx, stats_rx) = mpsc::channel::(1); let stats_event = - EventAsync::new(queue_evts.remove(0).0, &ex).expect("failed to set up the stats event"); + EventAsync::new(queue_evts.remove(0), &ex).expect("failed to set up the stats event"); let stats = handle_stats_queue( &mem, queues.remove(0), @@ -525,7 +525,7 @@ fn run_worker( pin_mut!(kill); let res = if !queues.is_empty() { - let events_event = EventAsync::new(queue_evts.remove(0).0, &ex) + let events_event = EventAsync::new(queue_evts.remove(0), &ex) .expect("failed to set up the events event"); let events = handle_events_queue( &mem, diff --git a/devices/src/virtio/block/asynchronous.rs b/devices/src/virtio/block/asynchronous.rs index ffb6bd38b3..e0002cf161 100644 --- a/devices/src/virtio/block/asynchronous.rs +++ b/devices/src/virtio/block/asynchronous.rs @@ -404,27 +404,28 @@ fn run_worker( .expect("Failed to create an async timer"), )); - let queue_handlers = - queues - .into_iter() - .map(|q| Rc::new(RefCell::new(q))) - .zip(queue_evts.into_iter().map(|e| { - EventAsync::new(e.0, &ex).expect("Failed to create async event for queue") - })) - .map(|(queue, event)| { - handle_queue( - ex.clone(), - mem.clone(), - Rc::clone(disk_state), - Rc::clone(&queue), - event, - Rc::clone(&interrupt), - Rc::clone(&flush_timer), - Rc::clone(&flush_timer_armed), - ) - }) - .collect::>() - .into_future(); + let queue_handlers = queues + .into_iter() + .map(|q| Rc::new(RefCell::new(q))) + .zip( + queue_evts + .into_iter() + .map(|e| EventAsync::new(e, &ex).expect("Failed to create async event for queue")), + ) + .map(|(queue, event)| { + handle_queue( + ex.clone(), + mem.clone(), + Rc::clone(disk_state), + Rc::clone(&queue), + event, + Rc::clone(&interrupt), + Rc::clone(&flush_timer), + Rc::clone(&flush_timer_armed), + ) + }) + .collect::>() + .into_future(); // Flushes the disk periodically. let flush_timer = TimerAsync::new(timer.0, &ex).expect("Failed to create an async timer"); diff --git a/devices/src/virtio/iommu.rs b/devices/src/virtio/iommu.rs index 16b31821aa..003a263fb8 100644 --- a/devices/src/virtio/iommu.rs +++ b/devices/src/virtio/iommu.rs @@ -600,7 +600,7 @@ impl Worker { let mut evts_async: Vec = queue_evts .into_iter() - .map(|e| EventAsync::new(e.0, &ex).expect("Failed to create async event for queue")) + .map(|e| EventAsync::new(e, &ex).expect("Failed to create async event for queue")) .collect(); let interrupt = Rc::new(RefCell::new(interrupt)); let interrupt_ref = &*interrupt.borrow(); diff --git a/devices/src/virtio/pmem.rs b/devices/src/virtio/pmem.rs index 0301bd06c9..42734ff3ae 100644 --- a/devices/src/virtio/pmem.rs +++ b/devices/src/virtio/pmem.rs @@ -189,7 +189,7 @@ fn run_worker( let ex = Executor::new().unwrap(); - let queue_evt = EventAsync::new(queue_evt.0, &ex).expect("failed to set up the queue event"); + let queue_evt = EventAsync::new(queue_evt, &ex).expect("failed to set up the queue event"); // Process requests from the virtio queue. let queue_fut = handle_queue( diff --git a/devices/src/virtio/snd/cras_backend/mod.rs b/devices/src/virtio/snd/cras_backend/mod.rs index 3efdb4f319..c5f99bad92 100644 --- a/devices/src/virtio/snd/cras_backend/mod.rs +++ b/devices/src/virtio/snd/cras_backend/mod.rs @@ -688,7 +688,7 @@ fn run_worker( let mut evts_async: Vec = queue_evts .into_iter() - .map(|e| EventAsync::new(e.0, &ex).expect("Failed to create async event for queue")) + .map(|e| EventAsync::new(e, &ex).expect("Failed to create async event for queue")) .collect(); let ctrl_queue_evt = evts_async.remove(0); diff --git a/devices/src/virtio/vhost/user/device/block.rs b/devices/src/virtio/vhost/user/device/block.rs index 262e65610a..d7eb038f05 100644 --- a/devices/src/virtio/vhost/user/device/block.rs +++ b/devices/src/virtio/vhost/user/device/block.rs @@ -210,7 +210,7 @@ impl VhostUserBackend for BlockBackend { // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX). queue.ack_features(self.acked_features); - let kick_evt = EventAsync::new(kick_evt.0, &self.ex) + let kick_evt = EventAsync::new(kick_evt, &self.ex) .context("failed to create EventAsync for kick_evt")?; let (handle, registration) = AbortHandle::new_pair(); diff --git a/devices/src/virtio/vhost/user/device/console.rs b/devices/src/virtio/vhost/user/device/console.rs index 50d1f11a53..ec8cd8ec91 100644 --- a/devices/src/virtio/vhost/user/device/console.rs +++ b/devices/src/virtio/vhost/user/device/console.rs @@ -188,7 +188,7 @@ impl VhostUserBackend for ConsoleBackend { // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX). queue.ack_features(self.acked_features); - let kick_evt = EventAsync::new(kick_evt.0, &self.ex) + let kick_evt = EventAsync::new(kick_evt, &self.ex) .context("Failed to create EventAsync for kick_evt")?; let (handle, registration) = AbortHandle::new_pair(); match idx { @@ -214,7 +214,7 @@ impl VhostUserBackend for ConsoleBackend { .ok_or_else(|| anyhow!("input channel unavailable"))?; // Create the async 'in' event so we can await on it. - let in_avail_async_evt = EventAsync::new(in_avail_evt.0, &self.ex) + let in_avail_async_evt = EventAsync::new(in_avail_evt, &self.ex) .context("Failed to create EventAsync for in_avail_evt")?; self.ex diff --git a/devices/src/virtio/vhost/user/device/cras_snd.rs b/devices/src/virtio/vhost/user/device/cras_snd.rs index 57df226055..36381b04fc 100644 --- a/devices/src/virtio/vhost/user/device/cras_snd.rs +++ b/devices/src/virtio/vhost/user/device/cras_snd.rs @@ -158,7 +158,7 @@ impl VhostUserBackend for CrasSndBackend { queue.ack_features(self.acked_features); let kick_evt = - EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?; + EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?; let (handle, registration) = AbortHandle::new_pair(); match idx { 0 => { diff --git a/devices/src/virtio/vhost/user/device/fs.rs b/devices/src/virtio/vhost/user/device/fs.rs index 8b996c2ab5..a310ee7f58 100644 --- a/devices/src/virtio/vhost/user/device/fs.rs +++ b/devices/src/virtio/vhost/user/device/fs.rs @@ -228,7 +228,7 @@ impl VhostUserBackend for FsBackend { // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX). queue.ack_features(self.acked_features); - let kick_evt = EventAsync::new(kick_evt.0, &self.ex) + let kick_evt = EventAsync::new(kick_evt, &self.ex) .context("failed to create EventAsync for kick_evt")?; let (handle, registration) = AbortHandle::new_pair(); let (_, fs_device_tube) = Tube::pair()?; diff --git a/devices/src/virtio/vhost/user/device/gpu.rs b/devices/src/virtio/vhost/user/device/gpu.rs index 865d2cb698..c7b6c2fdb1 100644 --- a/devices/src/virtio/vhost/user/device/gpu.rs +++ b/devices/src/virtio/vhost/user/device/gpu.rs @@ -253,7 +253,7 @@ impl VhostUserBackend for GpuBackend { _ => bail!("attempted to start unknown queue: {}", idx), } - let kick_evt = EventAsync::new(kick_evt.0, &self.ex) + let kick_evt = EventAsync::new(kick_evt, &self.ex) .context("failed to create EventAsync for kick_evt")?; let reader = SharedReader { diff --git a/devices/src/virtio/vhost/user/device/unix/net.rs b/devices/src/virtio/vhost/user/device/unix/net.rs index aca8573c95..8df374d451 100644 --- a/devices/src/virtio/vhost/user/device/unix/net.rs +++ b/devices/src/virtio/vhost/user/device/unix/net.rs @@ -169,7 +169,7 @@ pub(super) fn start_queue( let ex = ex.get().expect("Executor not initialized"); let kick_evt = - EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?; + EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?; let tap = backend .tap .try_clone() diff --git a/devices/src/virtio/vhost/user/device/vsock.rs b/devices/src/virtio/vhost/user/device/vsock.rs index 1d06a22f79..cd22bfb4b4 100644 --- a/devices/src/virtio/vhost/user/device/vsock.rs +++ b/devices/src/virtio/vhost/user/device/vsock.rs @@ -378,7 +378,7 @@ impl VhostUserSlaveReqHandlerMut for VsockBackend { let kernel_evt = Event::new().map_err(|_| Error::SlaveInternalError)?; let task_evt = EventAsync::new( - kernel_evt.try_clone().expect("failed to clone event").0, + kernel_evt.try_clone().expect("failed to clone event"), &self.ex, ) .map_err(|_| Error::SlaveInternalError)?; diff --git a/devices/src/virtio/vhost/user/device/vvu/device.rs b/devices/src/virtio/vhost/user/device/vvu/device.rs index e28f5f8e94..dfc4729602 100644 --- a/devices/src/virtio/vhost/user/device/vvu/device.rs +++ b/devices/src/virtio/vhost/user/device/vvu/device.rs @@ -65,11 +65,11 @@ fn run_worker( tx_queue: Arc>, tx_irq: Event, ) -> Result<()> { - let rx_irq = EventAsync::new(rx_irq.0, &ex).context("failed to create async event")?; + let rx_irq = EventAsync::new(rx_irq, &ex).context("failed to create async event")?; let rxq = process_rxq(rx_irq, rx_queue, rx_sender, rx_evt); pin_mut!(rxq); - let tx_irq = EventAsync::new(tx_irq.0, &ex).context("failed to create async event")?; + let tx_irq = EventAsync::new(tx_irq, &ex).context("failed to create async event")?; let txq = process_txq(tx_irq, Arc::clone(&tx_queue)); pin_mut!(txq); diff --git a/devices/src/virtio/vhost/user/device/windows/net.rs b/devices/src/virtio/vhost/user/device/windows/net.rs index 67b5395cbc..e7205b0dba 100644 --- a/devices/src/virtio/vhost/user/device/windows/net.rs +++ b/devices/src/virtio/vhost/user/device/windows/net.rs @@ -131,7 +131,7 @@ pub(super) fn start_queue( let ex = ex.get().expect("Executor not initialized"); let kick_evt = - EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?; + EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?; let tap = backend .tap .try_clone() diff --git a/devices/src/virtio/vhost/user/device/wl.rs b/devices/src/virtio/vhost/user/device/wl.rs index 6b7fc32fce..56afdd15a9 100644 --- a/devices/src/virtio/vhost/user/device/wl.rs +++ b/devices/src/virtio/vhost/user/device/wl.rs @@ -178,7 +178,7 @@ impl VhostUserBackend for WlBackend { // Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX). queue.ack_features(self.acked_features); - let kick_evt = EventAsync::new(kick_evt.0, &self.ex) + let kick_evt = EventAsync::new(kick_evt, &self.ex) .context("failed to create EventAsync for kick_evt")?; // We use this de-structuring let binding to separate borrows so that the compiler doesn't