base_tokio: tokio compatible Event and Tube types

BUG=b:338274203

Change-Id: I6fbce1386c2c086d7a53a706018d959f4fadfada
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5507771
Reviewed-by: Noah Gold <nkgold@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Commit-Queue: Frederick Mayle <fmayle@google.com>
This commit is contained in:
Frederick Mayle 2024-04-25 16:26:38 -07:00 committed by crosvm LUCI
parent 01f9a34a0b
commit 7ec3293ab4
14 changed files with 404 additions and 1 deletions

27
Cargo.lock generated
View file

@ -301,6 +301,21 @@ dependencies = [
"syn 2.0.37",
]
[[package]]
name = "base_tokio"
version = "0.1.0"
dependencies = [
"anyhow",
"base",
"cfg-if",
"futures",
"libc",
"serde",
"sync",
"tokio",
"winapi",
]
[[package]]
name = "bindgen"
version = "0.63.0"
@ -2886,9 +2901,21 @@ dependencies = [
"num_cpus",
"pin-project-lite",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote 1.0.33",
"syn 2.0.37",
]
[[package]]
name = "toml"
version = "0.5.9"

View file

@ -45,6 +45,7 @@ members = [
"audio_util",
"audio_streams_conformance_test",
"base",
"base_tokio",
"bit_field",
"broker_ipc",
"common/audio_streams",
@ -114,6 +115,9 @@ exclude = [
"media/libvda",
]
[workspace.dependencies]
tokio = { version = "1.29.1", features = ["net", "rt-multi-thread", "time", "sync", "macros"] }
[features]
## Default features of crosvm. This selection is somewhat arbitrary for historical reasons.
default = ["audio", "balloon", "config-file", "document-features", "gpu", "qcow", "usb", "libvda-stub", "net", "slirp"]

View file

@ -94,6 +94,10 @@ impl Tube {
}
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
// WARNING: The `cros_async` and `base_tokio` tube wrappers both assume that, if the tube
// is readable, then a call to `Tube::recv` will not block (which ought to be true since we
// use SOCK_SEQPACKET and a single recvmsg call currently).
let msg_size = handle_eintr!(self.socket.inner().peek_size()).map_err(Error::Recv)?;
// This buffer is the right size, as the size received in peek_size() represents the size
// of only the message itself and not the file descriptors. The descriptors are stored

19
base_tokio/Cargo.toml Normal file
View file

@ -0,0 +1,19 @@
[package]
name = "base_tokio"
version = "0.1.0"
authors = ["The ChromiumOS Authors"]
edition = "2021"
[dependencies]
anyhow = "*"
cfg-if = "1.0.0"
futures = { version = "0.3" }
libc = "*"
serde = { version = "1" }
tokio = { workspace = true }
base = { path = "../base" }
sync = { path = "../common/sync" }
[target.'cfg(windows)'.dependencies]
winapi = "*"

31
base_tokio/src/event.rs Normal file
View file

@ -0,0 +1,31 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#[cfg(test)]
mod tests {
use base::Event;
use crate::EventTokio;
#[tokio::test]
async fn already_signaled() {
let event = Event::new().unwrap();
let async_event = EventTokio::new(event.try_clone().unwrap()).unwrap();
event.signal().unwrap();
async_event.wait().await.unwrap();
}
#[tokio::test]
async fn signaled_after_delay() {
let event = Event::new().unwrap();
let async_event = EventTokio::new(event.try_clone().unwrap()).unwrap();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
event.signal().unwrap();
});
async_event.wait().await.unwrap();
}
}

23
base_tokio/src/lib.rs Normal file
View file

@ -0,0 +1,23 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Tokio versions of type's defined in crosvm's `base` crate.
mod sys {
cfg_if::cfg_if! {
if #[cfg(any(target_os = "android", target_os = "linux"))] {
pub mod linux;
pub use linux::*;
} else if #[cfg(windows)] {
pub mod windows;
pub use windows::*;
}
}
}
mod event;
mod tube;
pub use sys::event::EventTokio;
pub use sys::tube::TubeTokio;

View file

@ -0,0 +1,51 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::os::fd::AsRawFd;
use tokio::io::unix::AsyncFd;
/// An async version of `base::Event`.
pub struct EventTokio(AsyncFd<base::SafeDescriptor>);
impl EventTokio {
/// WARNING: Sets O_NONBLOCK internally, which will also affect all clones of the `Event`.
pub fn new(event: base::Event) -> anyhow::Result<Self> {
let fd: base::SafeDescriptor = event.into();
base::add_fd_flags(fd.as_raw_fd(), libc::O_NONBLOCK)?;
Ok(Self(AsyncFd::new(fd)?))
}
/// Blocks until the event is signaled and clears the signal.
///
/// It is undefined behavior to wait on an event from multiple threads or processes
/// simultaneously.
pub async fn wait(&self) -> std::io::Result<()> {
loop {
let mut guard = self.0.readable().await?;
match guard.try_io(|inner| {
let mut buf: u64 = 0;
// SAFETY: This is safe because we made this fd and the pointer we pass can not
// overflow because we give the syscall's size parameter properly.
let ret = unsafe {
libc::read(
inner.as_raw_fd(),
&mut buf as *mut u64 as *mut libc::c_void,
std::mem::size_of::<u64>(),
)
};
if ret < 0 {
return Err(std::io::Error::last_os_error());
}
if ret as usize != std::mem::size_of::<u64>() {
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
}
Ok(())
}) {
Ok(result) => return result,
Err(_would_block) => continue,
}
}
}
}

View file

@ -0,0 +1,6 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod event;
pub mod tube;

View file

@ -0,0 +1,82 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::os::fd::AsRawFd;
use tokio::io::unix::AsyncFd;
/// An async version of `base::Tube`.
pub struct TubeTokio(AsyncFd<base::Tube>);
impl TubeTokio {
pub fn new(tube: base::Tube) -> anyhow::Result<Self> {
base::add_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)?;
Ok(Self(AsyncFd::new(tube)?))
}
pub async fn into_inner(self) -> base::Tube {
let tube = self.0.into_inner();
base::clear_fd_flags(tube.as_raw_fd(), libc::O_NONBLOCK)
.expect("failed to clear O_NONBLOCK");
tube
}
pub async fn send<T: serde::Serialize + Send + 'static>(&self, msg: T) -> base::TubeResult<()> {
loop {
let mut guard = self.0.writable().await.map_err(base::TubeError::Send)?;
match guard.try_io(|inner| {
// Re-using the non-async send is potentially hazardous since it isn't explicitly
// written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
// single write syscall, it should be OK.
let r = inner.get_ref().send(&msg);
// Transpose the `std::io::Error` errors outside so that `try_io` can check them
// for `WouldBlock`.
match r {
Ok(x) => Ok(Ok(x)),
Err(base::TubeError::Send(e)) => Err(e),
Err(e) => Ok(Err(e)),
}
}) {
Ok(result) => {
return match result {
Ok(Ok(x)) => Ok(x),
Ok(Err(e)) => Err(e),
Err(e) => Err(base::TubeError::Send(e)),
}
}
Err(_would_block) => continue,
}
}
}
pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
) -> base::TubeResult<T> {
loop {
let mut guard = self.0.readable().await.map_err(base::TubeError::Recv)?;
match guard.try_io(|inner| {
// Re-using the non-async recv is potentially hazardous since it isn't explicitly
// written with O_NONBLOCK support. However, since it uses SOCK_SEQPACKET and a
// single read syscall, it should be OK.
let r = inner.get_ref().recv();
// Transpose the `std::io::Error` errors outside so that `try_io` can check them
// for `WouldBlock`.
match r {
Ok(x) => Ok(Ok(x)),
Err(base::TubeError::Recv(e)) => Err(e),
Err(e) => Ok(Err(e)),
}
}) {
Ok(result) => {
return match result {
Ok(Ok(x)) => Ok(x),
Ok(Err(e)) => Err(e),
Err(e) => Err(base::TubeError::Recv(e)),
}
}
Err(_would_block) => continue,
}
}
}
}

View file

@ -0,0 +1,25 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
/// An async version of `base::Event`.
pub struct EventTokio(base::Event);
impl EventTokio {
/// Must be a manual-reset event (i.e. created by `base::Event::new()`).
///
/// TODO: Add support for auto-reset events.
pub fn new(event: base::Event) -> anyhow::Result<Self> {
Ok(Self(event))
}
/// Blocks until the event is signaled and clears the signal.
///
/// It is undefined behavior to wait on an event from multiple threads or processes
/// simultaneously.
pub async fn wait(&self) -> std::io::Result<()> {
base::sys::windows::async_wait_for_single_object(&self.0).await?;
self.0.reset()?;
Ok(())
}
}

View file

@ -0,0 +1,6 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod event;
pub mod tube;

View file

@ -0,0 +1,100 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use base::warn;
use base::AsRawDescriptor;
use base::Descriptor;
use base::Error;
use base::Event;
use base::Tube;
use base::TubeError;
use base::TubeResult;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use winapi::um::ioapiset::CancelIoEx;
/// An async version of `Tube`.
///
/// Implementation note: We don't trust `base::Tube::recv` to behave in a non-blocking manner even
/// when the read notifier is signalled, so we offload the actual `send` and `recv` calls onto a
/// blocking thread.
pub struct TubeTokio {
worker: tokio::task::JoinHandle<Tube>,
cmd_tx: mpsc::Sender<Box<dyn FnOnce(&Tube) + Send>>,
// Clone of the tube's read notifier.
read_notifier: Event,
// Tube's RawDescriptor.
tube_descriptor: Descriptor,
}
impl TubeTokio {
pub fn new(mut tube: Tube) -> anyhow::Result<Self> {
let read_notifier = tube.get_read_notifier_event().try_clone()?;
let tube_descriptor = Descriptor(tube.as_raw_descriptor());
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Box<dyn FnOnce(&Tube) + Send>>(1);
let worker = tokio::task::spawn_blocking(move || {
while let Some(f) = cmd_rx.blocking_recv() {
f(&mut tube)
}
tube
});
Ok(Self {
worker,
cmd_tx,
read_notifier,
tube_descriptor,
})
}
pub async fn into_inner(self) -> Tube {
drop(self.cmd_tx);
// Attempt to cancel any blocking IO the worker thread is doing so that we don't get stuck
// here if a `recv` call blocked. This is racy since we don't know if the queue'd up IO
// requests have actually started yet.
//
// SAFETY: The descriptor should still be valid since we own the tube in the blocking task.
if unsafe { CancelIoEx(self.tube_descriptor.0, std::ptr::null_mut()) } == 0 {
warn!(
"Cancel IO for handle:{:?} failed with {}",
self.tube_descriptor.0,
Error::last()
);
}
self.worker.await.expect("failed to join tube worker")
}
pub async fn send<T: serde::Serialize + Send + 'static>(&self, msg: T) -> TubeResult<()> {
// It is unlikely the tube is full given crosvm usage patterns, so request the blocking
// send immediately.
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(Box::new(move |tube| {
let _ = tx.send(tube.send(&msg));
}))
.await
.expect("worker missing");
rx.await.map_err(|_| TubeError::OperationCancelled)??;
Ok(())
}
pub async fn recv<T: serde::de::DeserializeOwned + Send + 'static>(&self) -> TubeResult<T> {
// `Tube`'s read notifier event is a manual-reset event and `Tube::recv` wants to
// handle the reset, so we bypass `EventAsync`.
base::sys::windows::async_wait_for_single_object(&self.read_notifier)
.await
.map_err(TubeError::Recv)?;
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(Box::new(move |tube| {
let _ = tx.send(tube.recv());
}))
.await
.expect("worker missing");
rx.await.map_err(|_| TubeError::OperationCancelled)?
}
}

25
base_tokio/src/tube.rs Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2024 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#[cfg(test)]
mod tests {
use base::Tube;
use crate::TubeTokio;
#[tokio::test]
async fn recv_send() {
let (a, b) = Tube::pair().unwrap();
let b = TubeTokio::new(b).unwrap();
let blocking_task = tokio::task::spawn_blocking(move || {
a.send(&5u8).unwrap();
a.recv::<u8>().unwrap()
});
assert_eq!(b.recv::<u8>().await.unwrap(), 5u8);
b.send(&16u8).await.unwrap();
assert_eq!(blocking_task.await.unwrap(), 16u8);
}
}

View file

@ -26,7 +26,7 @@ anyhow = "1.0"
serde = "1"
serde_keyvalue = { path = "../serde_keyvalue", features = ["argh_derive"] } # provided by ebuild
static_assertions = "1.1"
tokio = { version = "1.29.1", optional = true, features = ["net", "rt-multi-thread"] }
tokio = { workspace = true, optional = true }
[target.'cfg(any(target_os = "android", target_os = "linux"))'.dependencies]
io_uring = { path = "../io_uring" } # provided by ebuild