cros_async: make AsyncEvent use base::Event

EventFd is the original sys_util era Unix-specific Event. We need to use
base::Event, which is the cross platform event that works on Windows.

Thanks to acourbot@ for suggesting the splits in this series.

BUG=b:213147081
TEST=see final CL in series.

Change-Id: I174313cb544c5fc3768810365a42e6eaac1ca91a
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3583611
Reviewed-by: Alexandre Courbot <acourbot@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Noah Gold <nkgold@google.com>
This commit is contained in:
Noah Gold 2022-04-12 21:36:23 -07:00 committed by Chromeos LUCI
parent ab11843e86
commit 084aa95af1
19 changed files with 62 additions and 57 deletions

View file

@ -2,13 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
mem,
ops::Deref,
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd},
ptr,
time::Duration,
};
use std::{mem, ops::Deref, ptr, time::Duration};
use serde::{Deserialize, Serialize};
@ -16,6 +10,9 @@ use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, IntoRawDescriptor};
pub use crate::platform::EventReadResult;
use crate::{generate_scoped_event, platform::EventFd, RawDescriptor, Result};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
/// See [EventFd](crate::platform::EventFd) for struct- and method-level
/// documentation.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
@ -61,4 +58,11 @@ impl IntoRawDescriptor for Event {
}
}
#[cfg(unix)]
impl AsRawFd for Event {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
generate_scoped_event!(Event);

View file

@ -2,9 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use base::EventFd;
use super::{AsyncResult, Executor, IntoAsync, IoSourceExt};
use base::Event as EventFd;
/// An async version of `base::EventFd`.
pub struct EventAsync {
@ -83,3 +82,5 @@ mod tests {
assert_eq!(val, 0xaa);
}
}
// Safe because an `EventFd` is used underneath, which is safe to pass between threads.
unsafe impl Send for EventAsync {}

View file

@ -175,7 +175,6 @@ mod tests {
task::{Context, Poll, Waker},
thread,
};
use sync::Mutex;
use super::{
@ -187,6 +186,7 @@ mod tests {
},
*,
};
use base::Event;
struct State {
should_quit: bool,
@ -434,20 +434,19 @@ mod tests {
if !use_uring() {
return;
}
use base::EventFd;
async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 {
source.read_u64().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
let eventfd = Event::new().unwrap();
eventfd.write(0x55).unwrap();
let ex = URingExecutor::new().unwrap();
let uring_source = async_uring_from(eventfd, &ex).unwrap();
let val = ex.run_until(go(uring_source)).unwrap();
assert_eq!(val, 0x55);
let eventfd = EventFd::new().unwrap();
let eventfd = Event::new().unwrap();
eventfd.write(0xaa).unwrap();
let poll_ex = FdExecutor::new().unwrap();
let poll_source = async_poll_from(eventfd, &poll_ex).unwrap();

View file

@ -16,7 +16,7 @@ use super::{Interrupt, SignalableInterrupt};
/// Async task that waits for a signal from `event`. Once this event is readable, exit. Exiting
/// this future will cause the main loop to break and the worker thread to exit.
pub async fn await_and_exit(ex: &Executor, event: Event) -> Result<()> {
let event_async = EventAsync::new(event.0, ex).context("failed to create EventAsync")?;
let event_async = EventAsync::new(event, ex).context("failed to create EventAsync")?;
let _ = event_async.next_val().await;
Ok(())
}
@ -30,7 +30,7 @@ pub async fn handle_irq_resample(ex: &Executor, interrupt: Rc<RefCell<Interrupt>
let resample_evt = resample_evt
.try_clone()
.context("resample_evt.try_clone() failed")?;
Some(EventAsync::new(resample_evt.0, ex).context("failed to create async resample event")?)
Some(EventAsync::new(resample_evt, ex).context("failed to create async resample event")?)
} else {
None
};

View file

@ -465,8 +465,8 @@ fn run_worker(
// We need a block to release all references to command_tube at the end before returning it.
{
// The first queue is used for inflate messages
let inflate_event = EventAsync::new(queue_evts.remove(0).0, &ex)
.expect("failed to set up the inflate event");
let inflate_event =
EventAsync::new(queue_evts.remove(0), &ex).expect("failed to set up the inflate event");
let inflate = handle_queue(
&mem,
queues.remove(0),
@ -482,8 +482,8 @@ fn run_worker(
pin_mut!(inflate);
// The second queue is used for deflate messages
let deflate_event = EventAsync::new(queue_evts.remove(0).0, &ex)
.expect("failed to set up the deflate event");
let deflate_event =
EventAsync::new(queue_evts.remove(0), &ex).expect("failed to set up the deflate event");
let deflate = handle_queue(
&mem,
queues.remove(0),
@ -499,7 +499,7 @@ fn run_worker(
// stats results that were queued during an error condition.
let (stats_tx, stats_rx) = mpsc::channel::<u64>(1);
let stats_event =
EventAsync::new(queue_evts.remove(0).0, &ex).expect("failed to set up the stats event");
EventAsync::new(queue_evts.remove(0), &ex).expect("failed to set up the stats event");
let stats = handle_stats_queue(
&mem,
queues.remove(0),
@ -525,7 +525,7 @@ fn run_worker(
pin_mut!(kill);
let res = if !queues.is_empty() {
let events_event = EventAsync::new(queue_evts.remove(0).0, &ex)
let events_event = EventAsync::new(queue_evts.remove(0), &ex)
.expect("failed to set up the events event");
let events = handle_events_queue(
&mem,

View file

@ -404,27 +404,28 @@ fn run_worker(
.expect("Failed to create an async timer"),
));
let queue_handlers =
queues
.into_iter()
.map(|q| Rc::new(RefCell::new(q)))
.zip(queue_evts.into_iter().map(|e| {
EventAsync::new(e.0, &ex).expect("Failed to create async event for queue")
}))
.map(|(queue, event)| {
handle_queue(
ex.clone(),
mem.clone(),
Rc::clone(disk_state),
Rc::clone(&queue),
event,
Rc::clone(&interrupt),
Rc::clone(&flush_timer),
Rc::clone(&flush_timer_armed),
)
})
.collect::<FuturesUnordered<_>>()
.into_future();
let queue_handlers = queues
.into_iter()
.map(|q| Rc::new(RefCell::new(q)))
.zip(
queue_evts
.into_iter()
.map(|e| EventAsync::new(e, &ex).expect("Failed to create async event for queue")),
)
.map(|(queue, event)| {
handle_queue(
ex.clone(),
mem.clone(),
Rc::clone(disk_state),
Rc::clone(&queue),
event,
Rc::clone(&interrupt),
Rc::clone(&flush_timer),
Rc::clone(&flush_timer_armed),
)
})
.collect::<FuturesUnordered<_>>()
.into_future();
// Flushes the disk periodically.
let flush_timer = TimerAsync::new(timer.0, &ex).expect("Failed to create an async timer");

View file

@ -600,7 +600,7 @@ impl Worker {
let mut evts_async: Vec<EventAsync> = queue_evts
.into_iter()
.map(|e| EventAsync::new(e.0, &ex).expect("Failed to create async event for queue"))
.map(|e| EventAsync::new(e, &ex).expect("Failed to create async event for queue"))
.collect();
let interrupt = Rc::new(RefCell::new(interrupt));
let interrupt_ref = &*interrupt.borrow();

View file

@ -189,7 +189,7 @@ fn run_worker(
let ex = Executor::new().unwrap();
let queue_evt = EventAsync::new(queue_evt.0, &ex).expect("failed to set up the queue event");
let queue_evt = EventAsync::new(queue_evt, &ex).expect("failed to set up the queue event");
// Process requests from the virtio queue.
let queue_fut = handle_queue(

View file

@ -688,7 +688,7 @@ fn run_worker(
let mut evts_async: Vec<EventAsync> = queue_evts
.into_iter()
.map(|e| EventAsync::new(e.0, &ex).expect("Failed to create async event for queue"))
.map(|e| EventAsync::new(e, &ex).expect("Failed to create async event for queue"))
.collect();
let ctrl_queue_evt = evts_async.remove(0);

View file

@ -210,7 +210,7 @@ impl VhostUserBackend for BlockBackend {
// Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
queue.ack_features(self.acked_features);
let kick_evt = EventAsync::new(kick_evt.0, &self.ex)
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();

View file

@ -188,7 +188,7 @@ impl VhostUserBackend for ConsoleBackend {
// Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
queue.ack_features(self.acked_features);
let kick_evt = EventAsync::new(kick_evt.0, &self.ex)
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("Failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();
match idx {
@ -214,7 +214,7 @@ impl VhostUserBackend for ConsoleBackend {
.ok_or_else(|| anyhow!("input channel unavailable"))?;
// Create the async 'in' event so we can await on it.
let in_avail_async_evt = EventAsync::new(in_avail_evt.0, &self.ex)
let in_avail_async_evt = EventAsync::new(in_avail_evt, &self.ex)
.context("Failed to create EventAsync for in_avail_evt")?;
self.ex

View file

@ -158,7 +158,7 @@ impl VhostUserBackend for CrasSndBackend {
queue.ack_features(self.acked_features);
let kick_evt =
EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?;
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();
match idx {
0 => {

View file

@ -228,7 +228,7 @@ impl VhostUserBackend for FsBackend {
// Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
queue.ack_features(self.acked_features);
let kick_evt = EventAsync::new(kick_evt.0, &self.ex)
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;
let (handle, registration) = AbortHandle::new_pair();
let (_, fs_device_tube) = Tube::pair()?;

View file

@ -253,7 +253,7 @@ impl VhostUserBackend for GpuBackend {
_ => bail!("attempted to start unknown queue: {}", idx),
}
let kick_evt = EventAsync::new(kick_evt.0, &self.ex)
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;
let reader = SharedReader {

View file

@ -169,7 +169,7 @@ pub(super) fn start_queue<T: 'static + IntoAsync + TapT>(
let ex = ex.get().expect("Executor not initialized");
let kick_evt =
EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?;
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
let tap = backend
.tap
.try_clone()

View file

@ -378,7 +378,7 @@ impl VhostUserSlaveReqHandlerMut for VsockBackend {
let kernel_evt = Event::new().map_err(|_| Error::SlaveInternalError)?;
let task_evt = EventAsync::new(
kernel_evt.try_clone().expect("failed to clone event").0,
kernel_evt.try_clone().expect("failed to clone event"),
&self.ex,
)
.map_err(|_| Error::SlaveInternalError)?;

View file

@ -65,11 +65,11 @@ fn run_worker(
tx_queue: Arc<Mutex<UserQueue>>,
tx_irq: Event,
) -> Result<()> {
let rx_irq = EventAsync::new(rx_irq.0, &ex).context("failed to create async event")?;
let rx_irq = EventAsync::new(rx_irq, &ex).context("failed to create async event")?;
let rxq = process_rxq(rx_irq, rx_queue, rx_sender, rx_evt);
pin_mut!(rxq);
let tx_irq = EventAsync::new(tx_irq.0, &ex).context("failed to create async event")?;
let tx_irq = EventAsync::new(tx_irq, &ex).context("failed to create async event")?;
let txq = process_txq(tx_irq, Arc::clone(&tx_queue));
pin_mut!(txq);

View file

@ -131,7 +131,7 @@ pub(super) fn start_queue<T: 'static + IntoAsync + TapT>(
let ex = ex.get().expect("Executor not initialized");
let kick_evt =
EventAsync::new(kick_evt.0, ex).context("failed to create EventAsync for kick_evt")?;
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
let tap = backend
.tap
.try_clone()

View file

@ -178,7 +178,7 @@ impl VhostUserBackend for WlBackend {
// Enable any virtqueue features that were negotiated (like VIRTIO_RING_F_EVENT_IDX).
queue.ack_features(self.acked_features);
let kick_evt = EventAsync::new(kick_evt.0, &self.ex)
let kick_evt = EventAsync::new(kick_evt, &self.ex)
.context("failed to create EventAsync for kick_evt")?;
// We use this de-structuring let binding to separate borrows so that the compiler doesn't