Refactor cros_async interface to avoid RawFd.

Instead of creating IoSourceExt from AsRawFd implementers, we've
switched to creating from a marker trait `IntoAsync`. This lets us use
other types like RawDescriptor easily with this crate.  By using the
marker, we also provide some type safety by requiring consumers of
IoSourceExt to declare that their type is expected to work with async
operations. This way we can provide stronger guarantees that an async
IO trait object will behave in a reasonable way.

This CL also purges the cros_async -> base and io_uring -> base
references, and provides the base types needed to add new async
primitives to base.

BUG=none
TEST=builds

Change-Id: I0b0ce6ca7938b22ae8e8fb4e604439f0292678f2
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2504481
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Noah Gold <nkgold@google.com>
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
This commit is contained in:
Noah Gold 2020-11-05 19:10:36 -08:00 committed by Commit Bot
parent 290d5c49ce
commit b3fca20e24
20 changed files with 189 additions and 125 deletions

5
Cargo.lock generated
View file

@ -77,6 +77,7 @@ dependencies = [
name = "base"
version = "0.1.0"
dependencies = [
"cros_async 0.1.0",
"data_model 0.1.0",
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
"sync 0.1.0",
@ -127,13 +128,13 @@ name = "cros_async"
version = "0.1.0"
dependencies = [
"async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
"base 0.1.0",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"io_uring 0.1.0",
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
"paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sys_util 0.1.0",
"syscall_defines 0.1.0",
"thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -427,8 +428,8 @@ dependencies = [
name = "io_uring"
version = "0.1.0"
dependencies = [
"base 0.1.0",
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
"sys_util 0.1.0",
"syscall_defines 0.1.0",
]

View file

@ -8,6 +8,7 @@ edition = "2018"
chromeos = ["sys_util/chromeos"]
[dependencies]
cros_async = { path = "../cros_async" }
data_model = { path = "../data_model" }
libc = "*"
sync = { path = "../sync" }

25
base/src/async_types.rs Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::AsRawDescriptor;
use cros_async::IntoAsync;
use std::os::unix::io::{AsRawFd, RawFd};
/// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor
/// trait object family.
pub trait DescriptorIntoAsync: AsRawDescriptor {}
/// To use an IO struct with cros_async, the type must be marked with
/// DescriptorIntoAsync (to signify it is suitable for use with async
/// operations), and then wrapped with this type.
pub struct DescriptorAdapter<T: DescriptorIntoAsync>(pub T);
impl<T> AsRawFd for DescriptorAdapter<T>
where
T: DescriptorIntoAsync,
{
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_descriptor()
}
}
impl<T> IntoAsync for DescriptorAdapter<T> where T: DescriptorIntoAsync {}

View file

@ -15,7 +15,7 @@ pub use sys_util::EventReadResult;
/// See [EventFd](sys_util::EventFd) for struct- and method-level
/// documentation.
#[derive(Debug, PartialEq, Eq)]
pub struct Event(pub(self) EventFd);
pub struct Event(pub EventFd);
impl Event {
pub fn new() -> Result<Event> {
EventFd::new().map(|eventfd| Event(eventfd))

View file

@ -4,6 +4,7 @@
pub use sys_util::*;
mod async_types;
mod event;
mod ioctl;
mod mmap;
@ -11,6 +12,7 @@ mod shm;
mod timer;
mod wait_context;
pub use async_types::*;
pub use event::{Event, EventReadResult, ScopedEvent};
pub use ioctl::{
ioctl, ioctl_with_mut_ptr, ioctl_with_mut_ref, ioctl_with_ptr, ioctl_with_ref, ioctl_with_val,

View file

@ -42,7 +42,7 @@ pub enum EventType {
///
/// ```
/// # use base::{
/// Result, Event, WaitContext, EventMethods, EventTrigger,
/// Result, Event, WaitContext,
/// };
/// # fn test() -> Result<()> {
/// let evt1 = Event::new()?;

View file

@ -10,7 +10,7 @@ io_uring = { path = "../io_uring" }
libc = "*"
paste = "*"
pin-utils = "0.1.0-alpha.4"
base = { path = "../base" }
sys_util = { path = "../sys_util" }
syscall_defines = { path = "../syscall_defines" }
slab = "0.4"
thiserror = "1.0.20"
@ -21,6 +21,5 @@ default-features = false
features = ["alloc"]
[dev-dependencies]
base = { path = "../base" }
tempfile = { path = "../tempfile" }
vm_memory = { path = "../vm_memory" }

View file

@ -2,27 +2,29 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{new, AsyncResult, IoSourceExt};
use std::os::unix::io::AsRawFd;
use crate::io_ext::async_from;
use crate::{AsyncError, AsyncResult, IntoAsync, IoSourceExt};
use std::convert::TryFrom;
use sys_util::EventFd;
/// An async version of sys_util::EventFd.
pub struct EventAsync<F> {
io_source: Box<dyn IoSourceExt<F> + 'static>,
/// An async version of `sys_util::EventFd`.
pub struct EventAsync {
io_source: Box<dyn IoSourceExt<EventFd>>,
}
impl<F> EventAsync<F> {
/// Creates a new EventAsync wrapper around the provided eventfd.
#[allow(dead_code)]
#[allow(clippy::new_ret_no_self)]
pub fn new<G: AsRawFd + 'static>(g: G) -> AsyncResult<EventAsync<G>> {
Ok(EventAsync { io_source: new(g)? })
impl EventAsync {
#[cfg(test)]
pub(crate) fn new_poll(event: EventFd) -> AsyncResult<EventAsync> {
Ok(EventAsync {
io_source: crate::io_ext::async_poll_from(event)?,
})
}
/// Like new, but allows the source to be constructed directly. Used for
/// testing only.
#[cfg(test)]
pub(crate) fn new_from_source(io_source: Box<dyn IoSourceExt<F> + 'static>) -> EventAsync<F> {
EventAsync { io_source }
pub(crate) fn new_uring(event: EventFd) -> AsyncResult<EventAsync> {
Ok(EventAsync {
io_source: crate::io_ext::async_uring_from(event)?,
})
}
/// Gets the next value from the eventfd.
@ -32,21 +34,33 @@ impl<F> EventAsync<F> {
}
}
impl TryFrom<EventFd> for EventAsync {
type Error = AsyncError;
/// Creates a new EventAsync wrapper around the provided eventfd.
fn try_from(event: EventFd) -> AsyncResult<Self> {
Ok(EventAsync {
io_source: async_from(event)?,
})
}
}
impl IntoAsync for EventFd {}
#[cfg(test)]
mod tests {
use super::*;
use crate::io_ext::{new_poll, new_uring};
use base::Event;
use futures::pin_mut;
use std::convert::TryInto;
#[test]
fn next_val_reads_value() {
async fn go(event: Event) -> u64 {
let event_async = EventAsync::new(event).unwrap();
async fn go(event: EventFd) -> u64 {
let event_async: EventAsync = event.try_into().unwrap();
event_async.next_val().await.unwrap()
}
let eventfd = Event::new().unwrap();
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let fut = go(eventfd);
pin_mut!(fut);
@ -56,21 +70,20 @@ mod tests {
#[test]
fn next_val_reads_value_poll_and_ring() {
async fn go(source: Box<dyn IoSourceExt<Event> + 'static>) -> u64 {
let event_async = EventAsync::new_from_source(source);
async fn go(event_async: EventAsync) -> u64 {
event_async.next_val().await.unwrap()
}
let eventfd = Event::new().unwrap();
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let fut = go(new_poll(eventfd).unwrap());
let fut = go(EventAsync::new_uring(eventfd).unwrap());
pin_mut!(fut);
let val = crate::run_executor(crate::RunOne::new(fut)).unwrap();
assert_eq!(val, 0xaa);
let eventfd = Event::new().unwrap();
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let fut = go(new_uring(eventfd).unwrap());
let fut = go(EventAsync::new_poll(eventfd).unwrap());
pin_mut!(fut);
let val = crate::run_executor(crate::RunOne::new(fut)).unwrap();
assert_eq!(val, 0xaa);

View file

@ -21,7 +21,7 @@ use std::task::Waker;
use slab::Slab;
use base::{PollContext, WatchingEvents};
use sys_util::{PollContext, WatchingEvents};
use crate::executor::{ExecutableFuture, Executor, FutureList};
use crate::WakerToken;
@ -31,15 +31,15 @@ pub enum Error {
/// Attempts to create two Executors on the same thread fail.
AttemptedDuplicateExecutor,
/// Failed to copy the FD for the polling context.
DuplicatingFd(base::Error),
DuplicatingFd(sys_util::Error),
/// Failed accessing the thread local storage for wakers.
InvalidContext,
/// Creating a context to wait on FDs failed.
CreatingContext(base::Error),
CreatingContext(sys_util::Error),
/// PollContext failure.
PollContextError(base::Error),
PollContextError(sys_util::Error),
/// Failed to submit the waker to the polling context.
SubmittingWaker(base::Error),
SubmittingWaker(sys_util::Error),
/// A Waker was canceled, but the operation isn't running.
UnknownWaker,
}
@ -274,7 +274,7 @@ impl<T: FutureList> Drop for FdExecutor<T> {
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::dup(fd);
if ret < 0 {
Err(Error::DuplicatingFd(base::Error::last()))
Err(Error::DuplicatingFd(sys_util::Error::last()))
} else {
Ok(ret)
}
@ -338,7 +338,7 @@ mod test {
#[test]
fn test_it() {
async fn do_test() {
let (r, _w) = base::pipe(true).unwrap();
let (r, _w) = sys_util::pipe(true).unwrap();
let done = Box::pin(async { 5usize });
let pending = Box::pin(TestFut::new(r));
match futures::future::select(pending, done).await {

View file

@ -18,11 +18,13 @@
use crate::poll_source::PollSource;
use crate::UringSource;
use async_trait::async_trait;
use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use thiserror::Error as ThisError;
use crate::uring_mem::{BackingMemory, MemRegion};
use sys_util::net::UnixSeqpacket;
#[derive(ThisError, Debug)]
pub enum Error {
@ -94,27 +96,41 @@ pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
fn as_source(&self) -> &F;
}
/// Marker trait signifying that the implementor is suitable for use with
/// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket.
///
/// (Note: it'd be really nice to implement a TryFrom for any implementors, and
/// remove our factory functions. Unfortunately
/// https://github.com/rust-lang/rust/issues/50133 makes that too painful.)
pub trait IntoAsync: AsRawFd {}
impl IntoAsync for File {}
impl IntoAsync for UnixSeqpacket {}
impl IntoAsync for &UnixSeqpacket {}
/// Creates a concrete `IoSourceExt` that uses uring if available or falls back to the fd_executor if not.
/// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when
/// having readvwritev performed through io_uring. To deal with EventFd or TimerFd, use
/// `IoSourceExt::read_u64`.
pub fn new<'a, F: AsRawFd + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
pub fn async_from<'a, F: IntoAsync + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
if crate::uring_executor::use_uring() {
new_uring(f)
async_uring_from(f)
} else {
new_poll(f)
async_poll_from(f)
}
}
/// Creates a concrete `IoSourceExt` using Uring.
pub(crate) fn new_uring<'a, F: AsRawFd + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>(
f: F,
) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
UringSource::new(f)
.map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)
.map_err(Error::Uring)
}
/// Creates a concrete `IoSourceExt` using the fd_executor.
pub(crate) fn new_poll<'a, F: AsRawFd + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
PollSource::new(f)
.map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)
.map_err(Error::Poll)
@ -145,7 +161,7 @@ mod tests {
}
let f = File::open("/dev/zero").unwrap();
let uring_source = new_uring(f).unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
@ -154,7 +170,7 @@ mod tests {
.unwrap();
let f = File::open("/dev/zero").unwrap();
let poll_source = new_poll(f).unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
@ -175,7 +191,7 @@ mod tests {
}
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let uring_source = new_uring(f).unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
@ -184,7 +200,7 @@ mod tests {
.unwrap();
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let poll_source = new_poll(f).unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
@ -223,7 +239,7 @@ mod tests {
}
let f = File::open("/dev/zero").unwrap();
let uring_source = new_uring(f).unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
@ -232,7 +248,7 @@ mod tests {
.unwrap();
let f = File::open("/dev/zero").unwrap();
let poll_source = new_poll(f).unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
@ -257,7 +273,7 @@ mod tests {
}
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let uring_source = new_uring(f).unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
@ -266,7 +282,7 @@ mod tests {
.unwrap();
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let poll_source = new_poll(f).unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
@ -277,8 +293,8 @@ mod tests {
#[test]
fn read_u64s() {
async fn go<F: AsRawFd + Unpin>(async_source: F) -> u64 {
let source = new(async_source).unwrap();
async fn go(async_source: File) -> u64 {
let source = async_from(async_source).unwrap();
source.read_u64().await.unwrap()
}
@ -294,7 +310,7 @@ mod tests {
#[test]
fn read_eventfds() {
use base::EventFd;
use sys_util::EventFd;
async fn go<F: AsRawFd + Unpin>(source: Box<dyn IoSourceExt<F>>) -> u64 {
source.read_u64().await.unwrap()
@ -302,7 +318,7 @@ mod tests {
let eventfd = EventFd::new().unwrap();
eventfd.write(0x55).unwrap();
let fut = go(new(eventfd).unwrap());
let fut = go(async_uring_from(eventfd).unwrap());
pin_mut!(fut);
let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
@ -312,7 +328,7 @@ mod tests {
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let fut = go(new_poll(eventfd).unwrap());
let fut = go(async_poll_from(eventfd).unwrap());
pin_mut!(fut);
let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
@ -334,7 +350,7 @@ mod tests {
}
let f = tempfile::tempfile().unwrap();
let source = new(f).unwrap();
let source = async_from(f).unwrap();
let fut = go(source);
pin_mut!(fut);

View file

@ -74,7 +74,8 @@ mod waker;
pub use event::EventAsync;
pub use executor::Executor;
pub use io_ext::{
new, Error as AsyncError, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync,
async_from, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult,
WriteAsync,
};
pub use poll_source::PollSource;
pub use select::SelectResult;

View file

@ -21,7 +21,7 @@ use crate::uring_mem::{BackingMemory, BorrowedIoVec, MemRegion};
use crate::AsyncError;
use crate::AsyncResult;
use crate::{IoSourceExt, ReadAsync, WriteAsync};
use base::{self, add_fd_flags};
use sys_util::{self, add_fd_flags};
use thiserror::Error as ThisError;
#[derive(ThisError, Debug)]
@ -31,23 +31,23 @@ pub enum Error {
AddingWaker(fd_executor::Error),
/// An error occurred when executing fallocate synchronously.
#[error("An error occurred when executing fallocate synchronously: {0}")]
Fallocate(base::Error),
Fallocate(sys_util::Error),
/// An error occurred when executing fsync synchronously.
#[error("An error occurred when executing fsync synchronously: {0}")]
Fsync(base::Error),
Fsync(sys_util::Error),
/// An error occurred when reading the FD.
///
#[error("An error occurred when reading the FD: {0}.")]
Read(base::Error),
Read(sys_util::Error),
/// Can't seek file.
#[error("An error occurred when seeking the FD: {0}.")]
Seeking(base::Error),
Seeking(sys_util::Error),
/// An error occurred when setting the FD non-blocking.
#[error("An error occurred setting the FD non-blocking: {0}.")]
SettingNonBlocking(base::Error),
SettingNonBlocking(sys_util::Error),
/// An error occurred when writing the FD.
#[error("An error occurred when writing the FD: {0}.")]
Write(base::Error),
Write(sys_util::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
@ -186,7 +186,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fallocate(base::Error::last())))
Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last())))
}
}
@ -196,7 +196,7 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fsync(base::Error::last())))
Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last())))
}
}
}
@ -236,7 +236,7 @@ impl<'a, F: AsRawFd> Future for PollReadVec<'a, F> {
type Output = Result<(usize, Vec<u8>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_read(fd: RawFd, file_offset: u64, buf: &mut [u8]) -> base::Result<usize> {
fn do_read(fd: RawFd, file_offset: u64, buf: &mut [u8]) -> sys_util::Result<usize> {
// Safe because we trust the kernel not to write past the length given and the length is
// guaranteed to be valid from the pointer by the mut slice.
let ret = unsafe {
@ -249,7 +249,7 @@ impl<'a, F: AsRawFd> Future for PollReadVec<'a, F> {
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => base::errno_result(),
_ => sys_util::errno_result(),
}
}
@ -293,13 +293,13 @@ impl<'a, F: AsRawFd> Future for PollReadU64<'a, F> {
type Output = Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_read(fd: RawFd, buf: &mut [u8]) -> base::Result<usize> {
fn do_read(fd: RawFd, buf: &mut [u8]) -> sys_util::Result<usize> {
// Safe because we trust the kernel not to write past the length given and the length is
// guaranteed to be valid from the pointer by the mut slice.
let ret = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
match ret {
n if n >= 0 => Ok(n as usize),
_ => base::errno_result(),
_ => sys_util::errno_result(),
}
}
@ -343,10 +343,10 @@ impl<'a, F: AsRawFd> Future for PollWaitReadable<'a, F> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_poll(fd: RawFd) -> base::Result<usize> {
fn do_poll(fd: RawFd) -> sys_util::Result<usize> {
let poll_arg = libc::pollfd {
fd,
events: base::WatchingEvents::empty().set_read().get_raw() as i16,
events: sys_util::WatchingEvents::empty().set_read().get_raw() as i16,
revents: 0,
};
@ -354,7 +354,7 @@ impl<'a, F: AsRawFd> Future for PollWaitReadable<'a, F> {
let ret = unsafe { libc::poll(&poll_arg as *const _ as *mut _, 1, 0) };
match ret {
n if n >= 0 => Ok(n as usize),
_ => base::errno_result(),
_ => sys_util::errno_result(),
}
}
@ -390,7 +390,7 @@ impl<'a, F: AsRawFd> Future for PollWriteVec<'a, F> {
type Output = Result<(usize, Vec<u8>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_write(fd: RawFd, file_offset: u64, buf: &[u8]) -> base::Result<usize> {
fn do_write(fd: RawFd, file_offset: u64, buf: &[u8]) -> sys_util::Result<usize> {
// Safe because we trust the kernel not to write to the buffer, only read from it and
// the length is guaranteed to be valid from the pointer by the mut slice.
let ret = unsafe {
@ -403,7 +403,7 @@ impl<'a, F: AsRawFd> Future for PollWriteVec<'a, F> {
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => base::errno_result(),
_ => sys_util::errno_result(),
}
}
@ -455,7 +455,7 @@ impl<'a, F: AsRawFd> Future for PollReadMem<'a, F> {
file_offset: u64,
mem: &dyn BackingMemory,
mem_offsets: &[MemRegion],
) -> base::Result<usize> {
) -> sys_util::Result<usize> {
let mut iovecs = mem_offsets
.iter()
.filter_map(|&mem_vec| mem.get_iovec(mem_vec).ok())
@ -472,7 +472,7 @@ impl<'a, F: AsRawFd> Future for PollReadMem<'a, F> {
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => base::errno_result(),
_ => sys_util::errno_result(),
}
}
@ -523,7 +523,7 @@ impl<'a, F: AsRawFd> Future for PollWriteMem<'a, F> {
file_offset: u64,
mem: &dyn BackingMemory,
mem_offsets: &[MemRegion],
) -> base::Result<usize> {
) -> sys_util::Result<usize> {
let iovecs = mem_offsets
.iter()
.map(|&mem_vec| mem.get_iovec(mem_vec))
@ -541,7 +541,7 @@ impl<'a, F: AsRawFd> Future for PollWriteMem<'a, F> {
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => base::errno_result(),
_ => sys_util::errno_result(),
}
}

View file

@ -68,8 +68,8 @@ use std::task::{Context, Poll};
use futures::pin_mut;
use slab::Slab;
use base::WatchingEvents;
use io_uring::URingContext;
use sys_util::WatchingEvents;
use crate::executor::{ExecutableFuture, Executor, FutureList};
use crate::uring_mem::{BackingMemory, MemRegion};
@ -80,7 +80,7 @@ pub enum Error {
/// Attempts to create two Executors on the same thread fail.
AttemptedDuplicateExecutor,
/// Failed to copy the FD for the polling context.
DuplicatingFd(base::Error),
DuplicatingFd(sys_util::Error),
/// Failed accessing the thread local storage for wakers.
InvalidContext,
/// Invalid offset or length given for an iovec in backing memory.
@ -346,7 +346,7 @@ impl RingWakerState {
fn submit_poll(
&mut self,
source: &RegisteredSource,
events: &base::WatchingEvents,
events: &sys_util::WatchingEvents,
) -> Result<WakerToken> {
let src = self
.registered_sources
@ -679,7 +679,7 @@ impl<T: FutureList> Drop for URingExecutor<T> {
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::dup(fd);
if ret < 0 {
Err(Error::DuplicatingFd(base::Error::last()))
Err(Error::DuplicatingFd(sys_util::Error::last()))
} else {
Ok(ret)
}
@ -733,7 +733,7 @@ mod tests {
let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc<dyn BackingMemory>;
// Use pipes to create a future that will block forever.
let (rx, mut tx) = base::pipe(true).unwrap();
let (rx, mut tx) = sys_util::pipe(true).unwrap();
// Set up the TLS for the uring_executor by creating one.
let _ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap();
@ -771,7 +771,7 @@ mod tests {
let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc<dyn BackingMemory>;
// Use pipes to create a future that will block forever.
let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");
let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed");
// Set up the TLS for the uring_executor by creating one.
let _ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap();
@ -797,7 +797,7 @@ mod tests {
// Finishing the operation should put the Rc count back to 1.
// write to the pipe to wake the read pipe and then wait for the uring result in the
// executor.
let mut buf = vec![0u8; base::round_up_to_page_size(1)];
let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
rx.read(&mut buf).expect("read to empty failed");
RingWakerState::wait_wake_event().expect("Failed to wait for read pipe ready");
assert_eq!(Rc::strong_count(&bm), 1);
@ -806,7 +806,7 @@ mod tests {
#[test]
fn registered_source_outlives_executor() {
let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc<dyn BackingMemory>;
let (rx, tx) = base::pipe(true).unwrap();
let (rx, tx) = sys_util::pipe(true).unwrap();
// Register a source before creating the executor.
let rx_source = register_source(&rx).expect("register source failed");
@ -854,7 +854,7 @@ mod tests {
let bm = Rc::new(VecIoWrapper::from(vec![0u8; 16])) as Rc<dyn BackingMemory>;
let (rx, tx) = base::pipe(true).expect("Pipe failed");
let (rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let mut ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap();

View file

@ -144,9 +144,9 @@ mod tests {
#[test]
fn event() {
use base::Event;
use sys_util::EventFd;
async fn write_event(ev: Event, wait: Event) {
async fn write_event(ev: EventFd, wait: EventFd) {
let wait = UringSource::new(wait).unwrap();
ev.write(55).unwrap();
read_u64(&wait).await;
@ -156,7 +156,7 @@ mod tests {
read_u64(&wait).await;
}
async fn read_events(ev: Event, signal: Event) {
async fn read_events(ev: EventFd, signal: EventFd) {
let source = UringSource::new(ev).unwrap();
assert_eq!(read_u64(&source).await, 55);
signal.write(1).unwrap();
@ -166,8 +166,8 @@ mod tests {
signal.write(1).unwrap();
}
let event = Event::new().unwrap();
let signal_wait = Event::new().unwrap();
let event = EventFd::new().unwrap();
let signal_wait = EventFd::new().unwrap();
let write_task = write_event(event.try_clone().unwrap(), signal_wait.try_clone().unwrap());
let read_task = read_events(event, signal_wait);
let joined = futures::future::join(read_task, write_task);
@ -180,7 +180,7 @@ mod tests {
use futures::future::Either;
async fn do_test() {
let (read_source, _w) = base::pipe(true).unwrap();
let (read_source, _w) = sys_util::pipe(true).unwrap();
let source = UringSource::new(read_source).unwrap();
let done = async { 5usize };
let pending = read_u64(&source);

View file

@ -8,7 +8,7 @@ use std::num::Wrapping;
use std::rc::Rc;
use std::sync::atomic::{fence, Ordering};
use base::{error, AsRawDescriptor};
use base::error;
use cros_async::{AsyncError, EventAsync};
use virtio_sys::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use vm_memory::{GuestAddress, GuestMemory};
@ -385,10 +385,10 @@ impl Queue {
/// Asynchronously read the next descriptor chain from the queue.
/// Returns a `DescriptorChain` when it is `await`ed.
pub async fn next_async<F: AsRawDescriptor + Unpin>(
pub async fn next_async(
&mut self,
mem: &GuestMemory,
eventfd: &mut EventAsync<F>,
eventfd: &mut EventAsync,
) -> std::result::Result<DescriptorChain, AsyncError> {
loop {
// Check if there are more descriptors available.

View file

@ -382,7 +382,7 @@ pub struct SingleFileDisk {
impl TryFrom<File> for SingleFileDisk {
type Error = Error;
fn try_from(inner: File) -> Result<Self> {
cros_async::new(inner)
cros_async::async_from(inner)
.map_err(Error::CreateSingleFileDisk)
.map(|inner| SingleFileDisk { inner })
}

View file

@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
libc = "*"
syscall_defines = { path = "../syscall_defines" }
base = { path = "../base" }
sys_util = { path = "../sys_util" }
[dev-dependencies]
tempfile = { path = "../tempfile" }

View file

@ -14,7 +14,7 @@ use std::pin::Pin;
use std::ptr::null_mut;
use std::sync::atomic::{AtomicU32, Ordering};
use base::{MappedRegion, MemoryMapping, MemoryMappingBuilder, WatchingEvents};
use sys_util::{MappedRegion, MemoryMapping, Protection, WatchingEvents};
use crate::bindings::*;
use crate::syscalls::*;
@ -30,11 +30,11 @@ pub enum Error {
/// The call to `io_uring_setup` failed with the given errno.
Setup(libc::c_int),
/// Failed to map the completion ring.
MappingCompleteRing(base::MmapError),
MappingCompleteRing(sys_util::MmapError),
/// Failed to map the submit ring.
MappingSubmitRing(base::MmapError),
MappingSubmitRing(sys_util::MmapError),
/// Failed to map submit entries.
MappingSubmitEntries(base::MmapError),
MappingSubmitEntries(sys_util::MmapError),
/// Too many ops are already queued.
NoSpace,
}
@ -78,7 +78,7 @@ pub struct URingStats {
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::WatchingEvents;
/// # use sys_util::WatchingEvents;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let mut uring = URingContext::new(16).unwrap();
@ -121,40 +121,40 @@ impl URingContext {
// Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
// is checked.
let submit_ring = SubmitQueueState::new(
MemoryMappingBuilder::new(
MemoryMapping::from_fd_offset_protection_populate(
&ring_file,
ring_params.sq_off.array as usize
+ ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
u64::from(IORING_OFF_SQ_RING),
Protection::read_write(),
true,
)
.from_descriptor(&ring_file)
.offset(u64::from(IORING_OFF_SQ_RING))
.populate()
.build()
.map_err(Error::MappingSubmitRing)?,
&ring_params,
);
let num_sqe = ring_params.sq_entries as usize;
let submit_queue_entries = SubmitQueueEntries {
mmap: MemoryMappingBuilder::new(
mmap: MemoryMapping::from_fd_offset_protection_populate(
&ring_file,
ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
u64::from(IORING_OFF_SQES),
Protection::read_write(),
true,
)
.from_descriptor(&ring_file)
.offset(u64::from(IORING_OFF_SQES))
.populate()
.build()
.map_err(Error::MappingSubmitEntries)?,
len: num_sqe,
};
let complete_ring = CompleteQueueState::new(
MemoryMappingBuilder::new(
MemoryMapping::from_fd_offset_protection_populate(
&ring_file,
ring_params.cq_off.cqes as usize
+ ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
u64::from(IORING_OFF_CQ_RING),
Protection::read_write(),
true,
)
.from_descriptor(&ring_file)
.offset(u64::from(IORING_OFF_CQ_RING))
.populate()
.build()
.map_err(Error::MappingCompleteRing)?,
&ring_params,
);
@ -735,7 +735,7 @@ mod tests {
use std::path::{Path, PathBuf};
use std::time::Duration;
use base::PollContext;
use sys_util::PollContext;
use tempfile::{tempfile, TempDir};
use super::*;

View file

@ -6,8 +6,8 @@ mod msg_on_socket;
mod serializable_descriptors;
use base::{
handle_eintr, net::UnixSeqpacket, AsRawDescriptor, Error as SysError, Fd, RawDescriptor,
ScmSocket, UnsyncMarker,
handle_eintr, net::UnixSeqpacket, AsRawDescriptor, Error as SysError, RawDescriptor, ScmSocket,
UnsyncMarker,
};
use std::io::{IoSlice, Result};
use std::marker::PhantomData;
@ -215,7 +215,7 @@ impl<'a, I: MsgOnSocket, O: MsgOnSocket> AsyncReceiver<'a, I, O> {
}
pub async fn next(&mut self) -> MsgResult<O> {
let p = cros_async::new(Fd(self.inner.as_raw_descriptor())).unwrap();
let p = cros_async::async_from(&self.inner.sock).unwrap();
p.wait_readable().await.unwrap();
self.inner.recv()
}

View file

@ -339,6 +339,12 @@ impl AsRawFd for UnixSeqpacket {
}
}
impl AsRawFd for &UnixSeqpacket {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
impl AsRawDescriptor for UnixSeqpacket {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.fd