From b3fca20e24b8559d5de157342b89ca44785a3928 Mon Sep 17 00:00:00 2001 From: Noah Gold Date: Thu, 5 Nov 2020 19:10:36 -0800 Subject: [PATCH] Refactor cros_async interface to avoid RawFd. Instead of creating IoSourceExt from AsRawFd implementers, we've switched to creating from a marker trait `IntoAsync`. This lets us use other types like RawDescriptor easily with this crate. By using the marker, we also provide some type safety by requiring consumers of IoSourceExt to declare that their type is expected to work with async operations. This way we can provide stronger guarantees that an async IO trait object will behave in a reasonable way. This CL also purges the cros_async -> base and io_uring -> base references, and provides the base types needed to add new async primitives to base. BUG=none TEST=builds Change-Id: I0b0ce6ca7938b22ae8e8fb4e604439f0292678f2 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2504481 Tested-by: kokoro Commit-Queue: Noah Gold Reviewed-by: Chirantan Ekbote --- Cargo.lock | 5 +- base/Cargo.toml | 1 + base/src/async_types.rs | 25 +++++++++ base/src/event.rs | 2 +- base/src/lib.rs | 2 + base/src/wait_context.rs | 2 +- cros_async/Cargo.toml | 3 +- cros_async/src/event.rs | 65 ++++++++++++++---------- cros_async/src/fd_executor.rs | 14 ++--- cros_async/src/io_ext.rs | 54 +++++++++++++------- cros_async/src/lib.rs | 3 +- cros_async/src/poll_source.rs | 44 ++++++++-------- cros_async/src/uring_executor.rs | 18 +++---- cros_async/src/uring_futures/read_vec.rs | 12 ++--- devices/src/virtio/queue.rs | 6 +-- disk/src/disk.rs | 2 +- io_uring/Cargo.toml | 2 +- io_uring/src/uring.rs | 42 +++++++-------- msg_socket/src/lib.rs | 6 +-- sys_util/src/net.rs | 6 +++ 20 files changed, 189 insertions(+), 125 deletions(-) create mode 100644 base/src/async_types.rs diff --git a/Cargo.lock b/Cargo.lock index fef00b4f39..facf50de50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,7 @@ dependencies = [ name = "base" version = "0.1.0" dependencies = [ + "cros_async 0.1.0", "data_model 0.1.0", "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", "sync 0.1.0", @@ -127,13 +128,13 @@ name = "cros_async" version = "0.1.0" dependencies = [ "async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", - "base 0.1.0", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "io_uring 0.1.0", "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", "paste 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "sys_util 0.1.0", "syscall_defines 0.1.0", "thiserror 1.0.20 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -427,8 +428,8 @@ dependencies = [ name = "io_uring" version = "0.1.0" dependencies = [ - "base 0.1.0", "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", + "sys_util 0.1.0", "syscall_defines 0.1.0", ] diff --git a/base/Cargo.toml b/base/Cargo.toml index c4e19de8d3..2520d42568 100644 --- a/base/Cargo.toml +++ b/base/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" chromeos = ["sys_util/chromeos"] [dependencies] +cros_async = { path = "../cros_async" } data_model = { path = "../data_model" } libc = "*" sync = { path = "../sync" } diff --git a/base/src/async_types.rs b/base/src/async_types.rs new file mode 100644 index 0000000000..4fbe53e93e --- /dev/null +++ b/base/src/async_types.rs @@ -0,0 +1,25 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use crate::AsRawDescriptor; +use cros_async::IntoAsync; +use std::os::unix::io::{AsRawFd, RawFd}; + +/// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor +/// trait object family. +pub trait DescriptorIntoAsync: AsRawDescriptor {} + +/// To use an IO struct with cros_async, the type must be marked with +/// DescriptorIntoAsync (to signify it is suitable for use with async +/// operations), and then wrapped with this type. +pub struct DescriptorAdapter(pub T); +impl AsRawFd for DescriptorAdapter +where + T: DescriptorIntoAsync, +{ + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_descriptor() + } +} +impl IntoAsync for DescriptorAdapter where T: DescriptorIntoAsync {} diff --git a/base/src/event.rs b/base/src/event.rs index fa01dc6cc8..abd9433c31 100644 --- a/base/src/event.rs +++ b/base/src/event.rs @@ -15,7 +15,7 @@ pub use sys_util::EventReadResult; /// See [EventFd](sys_util::EventFd) for struct- and method-level /// documentation. #[derive(Debug, PartialEq, Eq)] -pub struct Event(pub(self) EventFd); +pub struct Event(pub EventFd); impl Event { pub fn new() -> Result { EventFd::new().map(|eventfd| Event(eventfd)) diff --git a/base/src/lib.rs b/base/src/lib.rs index 9a9ca2b53f..6a47fe86e7 100644 --- a/base/src/lib.rs +++ b/base/src/lib.rs @@ -4,6 +4,7 @@ pub use sys_util::*; +mod async_types; mod event; mod ioctl; mod mmap; @@ -11,6 +12,7 @@ mod shm; mod timer; mod wait_context; +pub use async_types::*; pub use event::{Event, EventReadResult, ScopedEvent}; pub use ioctl::{ ioctl, ioctl_with_mut_ptr, ioctl_with_mut_ref, ioctl_with_ptr, ioctl_with_ref, ioctl_with_val, diff --git a/base/src/wait_context.rs b/base/src/wait_context.rs index 39371f13ec..86a048805d 100644 --- a/base/src/wait_context.rs +++ b/base/src/wait_context.rs @@ -42,7 +42,7 @@ pub enum EventType { /// /// ``` /// # use base::{ -/// Result, Event, WaitContext, EventMethods, EventTrigger, +/// Result, Event, WaitContext, /// }; /// # fn test() -> Result<()> { /// let evt1 = Event::new()?; diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml index b502d24883..e2356783ef 100644 --- a/cros_async/Cargo.toml +++ b/cros_async/Cargo.toml @@ -10,7 +10,7 @@ io_uring = { path = "../io_uring" } libc = "*" paste = "*" pin-utils = "0.1.0-alpha.4" -base = { path = "../base" } +sys_util = { path = "../sys_util" } syscall_defines = { path = "../syscall_defines" } slab = "0.4" thiserror = "1.0.20" @@ -21,6 +21,5 @@ default-features = false features = ["alloc"] [dev-dependencies] -base = { path = "../base" } tempfile = { path = "../tempfile" } vm_memory = { path = "../vm_memory" } diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs index fcd51950bc..03f419b355 100644 --- a/cros_async/src/event.rs +++ b/cros_async/src/event.rs @@ -2,27 +2,29 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use crate::{new, AsyncResult, IoSourceExt}; -use std::os::unix::io::AsRawFd; +use crate::io_ext::async_from; +use crate::{AsyncError, AsyncResult, IntoAsync, IoSourceExt}; +use std::convert::TryFrom; +use sys_util::EventFd; -/// An async version of sys_util::EventFd. -pub struct EventAsync { - io_source: Box + 'static>, +/// An async version of `sys_util::EventFd`. +pub struct EventAsync { + io_source: Box>, } -impl EventAsync { - /// Creates a new EventAsync wrapper around the provided eventfd. - #[allow(dead_code)] - #[allow(clippy::new_ret_no_self)] - pub fn new(g: G) -> AsyncResult> { - Ok(EventAsync { io_source: new(g)? }) +impl EventAsync { + #[cfg(test)] + pub(crate) fn new_poll(event: EventFd) -> AsyncResult { + Ok(EventAsync { + io_source: crate::io_ext::async_poll_from(event)?, + }) } - /// Like new, but allows the source to be constructed directly. Used for - /// testing only. #[cfg(test)] - pub(crate) fn new_from_source(io_source: Box + 'static>) -> EventAsync { - EventAsync { io_source } + pub(crate) fn new_uring(event: EventFd) -> AsyncResult { + Ok(EventAsync { + io_source: crate::io_ext::async_uring_from(event)?, + }) } /// Gets the next value from the eventfd. @@ -32,21 +34,33 @@ impl EventAsync { } } +impl TryFrom for EventAsync { + type Error = AsyncError; + + /// Creates a new EventAsync wrapper around the provided eventfd. + fn try_from(event: EventFd) -> AsyncResult { + Ok(EventAsync { + io_source: async_from(event)?, + }) + } +} + +impl IntoAsync for EventFd {} + #[cfg(test)] mod tests { use super::*; - use crate::io_ext::{new_poll, new_uring}; - use base::Event; use futures::pin_mut; + use std::convert::TryInto; #[test] fn next_val_reads_value() { - async fn go(event: Event) -> u64 { - let event_async = EventAsync::new(event).unwrap(); + async fn go(event: EventFd) -> u64 { + let event_async: EventAsync = event.try_into().unwrap(); event_async.next_val().await.unwrap() } - let eventfd = Event::new().unwrap(); + let eventfd = EventFd::new().unwrap(); eventfd.write(0xaa).unwrap(); let fut = go(eventfd); pin_mut!(fut); @@ -56,21 +70,20 @@ mod tests { #[test] fn next_val_reads_value_poll_and_ring() { - async fn go(source: Box + 'static>) -> u64 { - let event_async = EventAsync::new_from_source(source); + async fn go(event_async: EventAsync) -> u64 { event_async.next_val().await.unwrap() } - let eventfd = Event::new().unwrap(); + let eventfd = EventFd::new().unwrap(); eventfd.write(0xaa).unwrap(); - let fut = go(new_poll(eventfd).unwrap()); + let fut = go(EventAsync::new_uring(eventfd).unwrap()); pin_mut!(fut); let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); assert_eq!(val, 0xaa); - let eventfd = Event::new().unwrap(); + let eventfd = EventFd::new().unwrap(); eventfd.write(0xaa).unwrap(); - let fut = go(new_uring(eventfd).unwrap()); + let fut = go(EventAsync::new_poll(eventfd).unwrap()); pin_mut!(fut); let val = crate::run_executor(crate::RunOne::new(fut)).unwrap(); assert_eq!(val, 0xaa); diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs index 4e20b2fc3c..515391e903 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/fd_executor.rs @@ -21,7 +21,7 @@ use std::task::Waker; use slab::Slab; -use base::{PollContext, WatchingEvents}; +use sys_util::{PollContext, WatchingEvents}; use crate::executor::{ExecutableFuture, Executor, FutureList}; use crate::WakerToken; @@ -31,15 +31,15 @@ pub enum Error { /// Attempts to create two Executors on the same thread fail. AttemptedDuplicateExecutor, /// Failed to copy the FD for the polling context. - DuplicatingFd(base::Error), + DuplicatingFd(sys_util::Error), /// Failed accessing the thread local storage for wakers. InvalidContext, /// Creating a context to wait on FDs failed. - CreatingContext(base::Error), + CreatingContext(sys_util::Error), /// PollContext failure. - PollContextError(base::Error), + PollContextError(sys_util::Error), /// Failed to submit the waker to the polling context. - SubmittingWaker(base::Error), + SubmittingWaker(sys_util::Error), /// A Waker was canceled, but the operation isn't running. UnknownWaker, } @@ -274,7 +274,7 @@ impl Drop for FdExecutor { unsafe fn dup_fd(fd: RawFd) -> Result { let ret = libc::dup(fd); if ret < 0 { - Err(Error::DuplicatingFd(base::Error::last())) + Err(Error::DuplicatingFd(sys_util::Error::last())) } else { Ok(ret) } @@ -338,7 +338,7 @@ mod test { #[test] fn test_it() { async fn do_test() { - let (r, _w) = base::pipe(true).unwrap(); + let (r, _w) = sys_util::pipe(true).unwrap(); let done = Box::pin(async { 5usize }); let pending = Box::pin(TestFut::new(r)); match futures::future::select(pending, done).await { diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index 0a946e3c12..5743d3dff4 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -18,11 +18,13 @@ use crate::poll_source::PollSource; use crate::UringSource; use async_trait::async_trait; +use std::fs::File; use std::os::unix::io::AsRawFd; use std::rc::Rc; use thiserror::Error as ThisError; use crate::uring_mem::{BackingMemory, MemRegion}; +use sys_util::net::UnixSeqpacket; #[derive(ThisError, Debug)] pub enum Error { @@ -94,27 +96,41 @@ pub trait IoSourceExt: ReadAsync + WriteAsync { fn as_source(&self) -> &F; } +/// Marker trait signifying that the implementor is suitable for use with +/// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket. +/// +/// (Note: it'd be really nice to implement a TryFrom for any implementors, and +/// remove our factory functions. Unfortunately +/// https://github.com/rust-lang/rust/issues/50133 makes that too painful.) +pub trait IntoAsync: AsRawFd {} + +impl IntoAsync for File {} +impl IntoAsync for UnixSeqpacket {} +impl IntoAsync for &UnixSeqpacket {} + /// Creates a concrete `IoSourceExt` that uses uring if available or falls back to the fd_executor if not. /// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when /// having readvwritev performed through io_uring. To deal with EventFd or TimerFd, use /// `IoSourceExt::read_u64`. -pub fn new<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { +pub fn async_from<'a, F: IntoAsync + 'a>(f: F) -> Result + 'a>> { if crate::uring_executor::use_uring() { - new_uring(f) + async_uring_from(f) } else { - new_poll(f) + async_poll_from(f) } } /// Creates a concrete `IoSourceExt` using Uring. -pub(crate) fn new_uring<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { +pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>( + f: F, +) -> Result + 'a>> { UringSource::new(f) .map(|u| Box::new(u) as Box>) .map_err(Error::Uring) } /// Creates a concrete `IoSourceExt` using the fd_executor. -pub(crate) fn new_poll<'a, F: AsRawFd + 'a>(f: F) -> Result + 'a>> { +pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(f: F) -> Result + 'a>> { PollSource::new(f) .map(|u| Box::new(u) as Box>) .map_err(Error::Poll) @@ -145,7 +161,7 @@ mod tests { } let f = File::open("/dev/zero").unwrap(); - let uring_source = new_uring(f).unwrap(); + let uring_source = async_uring_from(f).unwrap(); let fut = go(uring_source); pin_mut!(fut); crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) @@ -154,7 +170,7 @@ mod tests { .unwrap(); let f = File::open("/dev/zero").unwrap(); - let poll_source = new_poll(f).unwrap(); + let poll_source = async_poll_from(f).unwrap(); let fut = go(poll_source); pin_mut!(fut); crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) @@ -175,7 +191,7 @@ mod tests { } let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let uring_source = new_uring(f).unwrap(); + let uring_source = async_uring_from(f).unwrap(); let fut = go(uring_source); pin_mut!(fut); crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) @@ -184,7 +200,7 @@ mod tests { .unwrap(); let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let poll_source = new_poll(f).unwrap(); + let poll_source = async_poll_from(f).unwrap(); let fut = go(poll_source); pin_mut!(fut); crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) @@ -223,7 +239,7 @@ mod tests { } let f = File::open("/dev/zero").unwrap(); - let uring_source = new_uring(f).unwrap(); + let uring_source = async_uring_from(f).unwrap(); let fut = go(uring_source); pin_mut!(fut); crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) @@ -232,7 +248,7 @@ mod tests { .unwrap(); let f = File::open("/dev/zero").unwrap(); - let poll_source = new_poll(f).unwrap(); + let poll_source = async_poll_from(f).unwrap(); let fut = go(poll_source); pin_mut!(fut); crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) @@ -257,7 +273,7 @@ mod tests { } let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let uring_source = new_uring(f).unwrap(); + let uring_source = async_uring_from(f).unwrap(); let fut = go(uring_source); pin_mut!(fut); crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) @@ -266,7 +282,7 @@ mod tests { .unwrap(); let f = OpenOptions::new().write(true).open("/dev/null").unwrap(); - let poll_source = new_poll(f).unwrap(); + let poll_source = async_poll_from(f).unwrap(); let fut = go(poll_source); pin_mut!(fut); crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) @@ -277,8 +293,8 @@ mod tests { #[test] fn read_u64s() { - async fn go(async_source: F) -> u64 { - let source = new(async_source).unwrap(); + async fn go(async_source: File) -> u64 { + let source = async_from(async_source).unwrap(); source.read_u64().await.unwrap() } @@ -294,7 +310,7 @@ mod tests { #[test] fn read_eventfds() { - use base::EventFd; + use sys_util::EventFd; async fn go(source: Box>) -> u64 { source.read_u64().await.unwrap() @@ -302,7 +318,7 @@ mod tests { let eventfd = EventFd::new().unwrap(); eventfd.write(0x55).unwrap(); - let fut = go(new(eventfd).unwrap()); + let fut = go(async_uring_from(eventfd).unwrap()); pin_mut!(fut); let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut)) .unwrap() @@ -312,7 +328,7 @@ mod tests { let eventfd = EventFd::new().unwrap(); eventfd.write(0xaa).unwrap(); - let fut = go(new_poll(eventfd).unwrap()); + let fut = go(async_poll_from(eventfd).unwrap()); pin_mut!(fut); let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut)) .unwrap() @@ -334,7 +350,7 @@ mod tests { } let f = tempfile::tempfile().unwrap(); - let source = new(f).unwrap(); + let source = async_from(f).unwrap(); let fut = go(source); pin_mut!(fut); diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 7438ec818c..7bde5116b6 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -74,7 +74,8 @@ mod waker; pub use event::EventAsync; pub use executor::Executor; pub use io_ext::{ - new, Error as AsyncError, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync, + async_from, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult, + WriteAsync, }; pub use poll_source::PollSource; pub use select::SelectResult; diff --git a/cros_async/src/poll_source.rs b/cros_async/src/poll_source.rs index ed9404c3d6..f100368669 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/poll_source.rs @@ -21,7 +21,7 @@ use crate::uring_mem::{BackingMemory, BorrowedIoVec, MemRegion}; use crate::AsyncError; use crate::AsyncResult; use crate::{IoSourceExt, ReadAsync, WriteAsync}; -use base::{self, add_fd_flags}; +use sys_util::{self, add_fd_flags}; use thiserror::Error as ThisError; #[derive(ThisError, Debug)] @@ -31,23 +31,23 @@ pub enum Error { AddingWaker(fd_executor::Error), /// An error occurred when executing fallocate synchronously. #[error("An error occurred when executing fallocate synchronously: {0}")] - Fallocate(base::Error), + Fallocate(sys_util::Error), /// An error occurred when executing fsync synchronously. #[error("An error occurred when executing fsync synchronously: {0}")] - Fsync(base::Error), + Fsync(sys_util::Error), /// An error occurred when reading the FD. /// #[error("An error occurred when reading the FD: {0}.")] - Read(base::Error), + Read(sys_util::Error), /// Can't seek file. #[error("An error occurred when seeking the FD: {0}.")] - Seeking(base::Error), + Seeking(sys_util::Error), /// An error occurred when setting the FD non-blocking. #[error("An error occurred setting the FD non-blocking: {0}.")] - SettingNonBlocking(base::Error), + SettingNonBlocking(sys_util::Error), /// An error occurred when writing the FD. #[error("An error occurred when writing the FD: {0}.")] - Write(base::Error), + Write(sys_util::Error), } pub type Result = std::result::Result; @@ -186,7 +186,7 @@ impl WriteAsync for PollSource { if ret == 0 { Ok(()) } else { - Err(AsyncError::Poll(Error::Fallocate(base::Error::last()))) + Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last()))) } } @@ -196,7 +196,7 @@ impl WriteAsync for PollSource { if ret == 0 { Ok(()) } else { - Err(AsyncError::Poll(Error::Fsync(base::Error::last()))) + Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last()))) } } } @@ -236,7 +236,7 @@ impl<'a, F: AsRawFd> Future for PollReadVec<'a, F> { type Output = Result<(usize, Vec)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - fn do_read(fd: RawFd, file_offset: u64, buf: &mut [u8]) -> base::Result { + fn do_read(fd: RawFd, file_offset: u64, buf: &mut [u8]) -> sys_util::Result { // Safe because we trust the kernel not to write past the length given and the length is // guaranteed to be valid from the pointer by the mut slice. let ret = unsafe { @@ -249,7 +249,7 @@ impl<'a, F: AsRawFd> Future for PollReadVec<'a, F> { }; match ret { n if n >= 0 => Ok(n as usize), - _ => base::errno_result(), + _ => sys_util::errno_result(), } } @@ -293,13 +293,13 @@ impl<'a, F: AsRawFd> Future for PollReadU64<'a, F> { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - fn do_read(fd: RawFd, buf: &mut [u8]) -> base::Result { + fn do_read(fd: RawFd, buf: &mut [u8]) -> sys_util::Result { // Safe because we trust the kernel not to write past the length given and the length is // guaranteed to be valid from the pointer by the mut slice. let ret = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) }; match ret { n if n >= 0 => Ok(n as usize), - _ => base::errno_result(), + _ => sys_util::errno_result(), } } @@ -343,10 +343,10 @@ impl<'a, F: AsRawFd> Future for PollWaitReadable<'a, F> { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - fn do_poll(fd: RawFd) -> base::Result { + fn do_poll(fd: RawFd) -> sys_util::Result { let poll_arg = libc::pollfd { fd, - events: base::WatchingEvents::empty().set_read().get_raw() as i16, + events: sys_util::WatchingEvents::empty().set_read().get_raw() as i16, revents: 0, }; @@ -354,7 +354,7 @@ impl<'a, F: AsRawFd> Future for PollWaitReadable<'a, F> { let ret = unsafe { libc::poll(&poll_arg as *const _ as *mut _, 1, 0) }; match ret { n if n >= 0 => Ok(n as usize), - _ => base::errno_result(), + _ => sys_util::errno_result(), } } @@ -390,7 +390,7 @@ impl<'a, F: AsRawFd> Future for PollWriteVec<'a, F> { type Output = Result<(usize, Vec)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - fn do_write(fd: RawFd, file_offset: u64, buf: &[u8]) -> base::Result { + fn do_write(fd: RawFd, file_offset: u64, buf: &[u8]) -> sys_util::Result { // Safe because we trust the kernel not to write to the buffer, only read from it and // the length is guaranteed to be valid from the pointer by the mut slice. let ret = unsafe { @@ -403,7 +403,7 @@ impl<'a, F: AsRawFd> Future for PollWriteVec<'a, F> { }; match ret { n if n >= 0 => Ok(n as usize), - _ => base::errno_result(), + _ => sys_util::errno_result(), } } @@ -455,7 +455,7 @@ impl<'a, F: AsRawFd> Future for PollReadMem<'a, F> { file_offset: u64, mem: &dyn BackingMemory, mem_offsets: &[MemRegion], - ) -> base::Result { + ) -> sys_util::Result { let mut iovecs = mem_offsets .iter() .filter_map(|&mem_vec| mem.get_iovec(mem_vec).ok()) @@ -472,7 +472,7 @@ impl<'a, F: AsRawFd> Future for PollReadMem<'a, F> { }; match ret { n if n >= 0 => Ok(n as usize), - _ => base::errno_result(), + _ => sys_util::errno_result(), } } @@ -523,7 +523,7 @@ impl<'a, F: AsRawFd> Future for PollWriteMem<'a, F> { file_offset: u64, mem: &dyn BackingMemory, mem_offsets: &[MemRegion], - ) -> base::Result { + ) -> sys_util::Result { let iovecs = mem_offsets .iter() .map(|&mem_vec| mem.get_iovec(mem_vec)) @@ -541,7 +541,7 @@ impl<'a, F: AsRawFd> Future for PollWriteMem<'a, F> { }; match ret { n if n >= 0 => Ok(n as usize), - _ => base::errno_result(), + _ => sys_util::errno_result(), } } diff --git a/cros_async/src/uring_executor.rs b/cros_async/src/uring_executor.rs index 3d27eda046..462582f6f7 100644 --- a/cros_async/src/uring_executor.rs +++ b/cros_async/src/uring_executor.rs @@ -68,8 +68,8 @@ use std::task::{Context, Poll}; use futures::pin_mut; use slab::Slab; -use base::WatchingEvents; use io_uring::URingContext; +use sys_util::WatchingEvents; use crate::executor::{ExecutableFuture, Executor, FutureList}; use crate::uring_mem::{BackingMemory, MemRegion}; @@ -80,7 +80,7 @@ pub enum Error { /// Attempts to create two Executors on the same thread fail. AttemptedDuplicateExecutor, /// Failed to copy the FD for the polling context. - DuplicatingFd(base::Error), + DuplicatingFd(sys_util::Error), /// Failed accessing the thread local storage for wakers. InvalidContext, /// Invalid offset or length given for an iovec in backing memory. @@ -346,7 +346,7 @@ impl RingWakerState { fn submit_poll( &mut self, source: &RegisteredSource, - events: &base::WatchingEvents, + events: &sys_util::WatchingEvents, ) -> Result { let src = self .registered_sources @@ -679,7 +679,7 @@ impl Drop for URingExecutor { unsafe fn dup_fd(fd: RawFd) -> Result { let ret = libc::dup(fd); if ret < 0 { - Err(Error::DuplicatingFd(base::Error::last())) + Err(Error::DuplicatingFd(sys_util::Error::last())) } else { Ok(ret) } @@ -733,7 +733,7 @@ mod tests { let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc; // Use pipes to create a future that will block forever. - let (rx, mut tx) = base::pipe(true).unwrap(); + let (rx, mut tx) = sys_util::pipe(true).unwrap(); // Set up the TLS for the uring_executor by creating one. let _ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap(); @@ -771,7 +771,7 @@ mod tests { let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc; // Use pipes to create a future that will block forever. - let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed"); + let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed"); // Set up the TLS for the uring_executor by creating one. let _ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap(); @@ -797,7 +797,7 @@ mod tests { // Finishing the operation should put the Rc count back to 1. // write to the pipe to wake the read pipe and then wait for the uring result in the // executor. - let mut buf = vec![0u8; base::round_up_to_page_size(1)]; + let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)]; rx.read(&mut buf).expect("read to empty failed"); RingWakerState::wait_wake_event().expect("Failed to wait for read pipe ready"); assert_eq!(Rc::strong_count(&bm), 1); @@ -806,7 +806,7 @@ mod tests { #[test] fn registered_source_outlives_executor() { let bm = Rc::new(VecIoWrapper::from(vec![0u8; 4096])) as Rc; - let (rx, tx) = base::pipe(true).unwrap(); + let (rx, tx) = sys_util::pipe(true).unwrap(); // Register a source before creating the executor. let rx_source = register_source(&rx).expect("register source failed"); @@ -854,7 +854,7 @@ mod tests { let bm = Rc::new(VecIoWrapper::from(vec![0u8; 16])) as Rc; - let (rx, tx) = base::pipe(true).expect("Pipe failed"); + let (rx, tx) = sys_util::pipe(true).expect("Pipe failed"); let mut ex = URingExecutor::new(crate::executor::UnitFutures::new()).unwrap(); diff --git a/cros_async/src/uring_futures/read_vec.rs b/cros_async/src/uring_futures/read_vec.rs index f80472b21d..09d851d85b 100644 --- a/cros_async/src/uring_futures/read_vec.rs +++ b/cros_async/src/uring_futures/read_vec.rs @@ -144,9 +144,9 @@ mod tests { #[test] fn event() { - use base::Event; + use sys_util::EventFd; - async fn write_event(ev: Event, wait: Event) { + async fn write_event(ev: EventFd, wait: EventFd) { let wait = UringSource::new(wait).unwrap(); ev.write(55).unwrap(); read_u64(&wait).await; @@ -156,7 +156,7 @@ mod tests { read_u64(&wait).await; } - async fn read_events(ev: Event, signal: Event) { + async fn read_events(ev: EventFd, signal: EventFd) { let source = UringSource::new(ev).unwrap(); assert_eq!(read_u64(&source).await, 55); signal.write(1).unwrap(); @@ -166,8 +166,8 @@ mod tests { signal.write(1).unwrap(); } - let event = Event::new().unwrap(); - let signal_wait = Event::new().unwrap(); + let event = EventFd::new().unwrap(); + let signal_wait = EventFd::new().unwrap(); let write_task = write_event(event.try_clone().unwrap(), signal_wait.try_clone().unwrap()); let read_task = read_events(event, signal_wait); let joined = futures::future::join(read_task, write_task); @@ -180,7 +180,7 @@ mod tests { use futures::future::Either; async fn do_test() { - let (read_source, _w) = base::pipe(true).unwrap(); + let (read_source, _w) = sys_util::pipe(true).unwrap(); let source = UringSource::new(read_source).unwrap(); let done = async { 5usize }; let pending = read_u64(&source); diff --git a/devices/src/virtio/queue.rs b/devices/src/virtio/queue.rs index 8c805e05c9..45c84bba37 100644 --- a/devices/src/virtio/queue.rs +++ b/devices/src/virtio/queue.rs @@ -8,7 +8,7 @@ use std::num::Wrapping; use std::rc::Rc; use std::sync::atomic::{fence, Ordering}; -use base::{error, AsRawDescriptor}; +use base::error; use cros_async::{AsyncError, EventAsync}; use virtio_sys::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{GuestAddress, GuestMemory}; @@ -385,10 +385,10 @@ impl Queue { /// Asynchronously read the next descriptor chain from the queue. /// Returns a `DescriptorChain` when it is `await`ed. - pub async fn next_async( + pub async fn next_async( &mut self, mem: &GuestMemory, - eventfd: &mut EventAsync, + eventfd: &mut EventAsync, ) -> std::result::Result { loop { // Check if there are more descriptors available. diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 70d58b280d..3fa5fcaf9f 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -382,7 +382,7 @@ pub struct SingleFileDisk { impl TryFrom for SingleFileDisk { type Error = Error; fn try_from(inner: File) -> Result { - cros_async::new(inner) + cros_async::async_from(inner) .map_err(Error::CreateSingleFileDisk) .map(|inner| SingleFileDisk { inner }) } diff --git a/io_uring/Cargo.toml b/io_uring/Cargo.toml index ad3a815aae..1b633d396d 100644 --- a/io_uring/Cargo.toml +++ b/io_uring/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] libc = "*" syscall_defines = { path = "../syscall_defines" } -base = { path = "../base" } +sys_util = { path = "../sys_util" } [dev-dependencies] tempfile = { path = "../tempfile" } diff --git a/io_uring/src/uring.rs b/io_uring/src/uring.rs index cc6ed0f132..48461c3dec 100644 --- a/io_uring/src/uring.rs +++ b/io_uring/src/uring.rs @@ -14,7 +14,7 @@ use std::pin::Pin; use std::ptr::null_mut; use std::sync::atomic::{AtomicU32, Ordering}; -use base::{MappedRegion, MemoryMapping, MemoryMappingBuilder, WatchingEvents}; +use sys_util::{MappedRegion, MemoryMapping, Protection, WatchingEvents}; use crate::bindings::*; use crate::syscalls::*; @@ -30,11 +30,11 @@ pub enum Error { /// The call to `io_uring_setup` failed with the given errno. Setup(libc::c_int), /// Failed to map the completion ring. - MappingCompleteRing(base::MmapError), + MappingCompleteRing(sys_util::MmapError), /// Failed to map the submit ring. - MappingSubmitRing(base::MmapError), + MappingSubmitRing(sys_util::MmapError), /// Failed to map submit entries. - MappingSubmitEntries(base::MmapError), + MappingSubmitEntries(sys_util::MmapError), /// Too many ops are already queued. NoSpace, } @@ -78,7 +78,7 @@ pub struct URingStats { /// # use std::fs::File; /// # use std::os::unix::io::AsRawFd; /// # use std::path::Path; -/// # use base::WatchingEvents; +/// # use sys_util::WatchingEvents; /// # use io_uring::URingContext; /// let f = File::open(Path::new("/dev/zero")).unwrap(); /// let mut uring = URingContext::new(16).unwrap(); @@ -121,40 +121,40 @@ impl URingContext { // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error // is checked. let submit_ring = SubmitQueueState::new( - MemoryMappingBuilder::new( + MemoryMapping::from_fd_offset_protection_populate( + &ring_file, ring_params.sq_off.array as usize + ring_params.sq_entries as usize * std::mem::size_of::(), + u64::from(IORING_OFF_SQ_RING), + Protection::read_write(), + true, ) - .from_descriptor(&ring_file) - .offset(u64::from(IORING_OFF_SQ_RING)) - .populate() - .build() .map_err(Error::MappingSubmitRing)?, &ring_params, ); let num_sqe = ring_params.sq_entries as usize; let submit_queue_entries = SubmitQueueEntries { - mmap: MemoryMappingBuilder::new( + mmap: MemoryMapping::from_fd_offset_protection_populate( + &ring_file, ring_params.sq_entries as usize * std::mem::size_of::(), + u64::from(IORING_OFF_SQES), + Protection::read_write(), + true, ) - .from_descriptor(&ring_file) - .offset(u64::from(IORING_OFF_SQES)) - .populate() - .build() .map_err(Error::MappingSubmitEntries)?, len: num_sqe, }; let complete_ring = CompleteQueueState::new( - MemoryMappingBuilder::new( + MemoryMapping::from_fd_offset_protection_populate( + &ring_file, ring_params.cq_off.cqes as usize + ring_params.cq_entries as usize * std::mem::size_of::(), + u64::from(IORING_OFF_CQ_RING), + Protection::read_write(), + true, ) - .from_descriptor(&ring_file) - .offset(u64::from(IORING_OFF_CQ_RING)) - .populate() - .build() .map_err(Error::MappingCompleteRing)?, &ring_params, ); @@ -735,7 +735,7 @@ mod tests { use std::path::{Path, PathBuf}; use std::time::Duration; - use base::PollContext; + use sys_util::PollContext; use tempfile::{tempfile, TempDir}; use super::*; diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs index 296058726f..bb73ace7c8 100644 --- a/msg_socket/src/lib.rs +++ b/msg_socket/src/lib.rs @@ -6,8 +6,8 @@ mod msg_on_socket; mod serializable_descriptors; use base::{ - handle_eintr, net::UnixSeqpacket, AsRawDescriptor, Error as SysError, Fd, RawDescriptor, - ScmSocket, UnsyncMarker, + handle_eintr, net::UnixSeqpacket, AsRawDescriptor, Error as SysError, RawDescriptor, ScmSocket, + UnsyncMarker, }; use std::io::{IoSlice, Result}; use std::marker::PhantomData; @@ -215,7 +215,7 @@ impl<'a, I: MsgOnSocket, O: MsgOnSocket> AsyncReceiver<'a, I, O> { } pub async fn next(&mut self) -> MsgResult { - let p = cros_async::new(Fd(self.inner.as_raw_descriptor())).unwrap(); + let p = cros_async::async_from(&self.inner.sock).unwrap(); p.wait_readable().await.unwrap(); self.inner.recv() } diff --git a/sys_util/src/net.rs b/sys_util/src/net.rs index aab0a95fe1..5cc87f560c 100644 --- a/sys_util/src/net.rs +++ b/sys_util/src/net.rs @@ -339,6 +339,12 @@ impl AsRawFd for UnixSeqpacket { } } +impl AsRawFd for &UnixSeqpacket { + fn as_raw_fd(&self) -> RawFd { + self.fd + } +} + impl AsRawDescriptor for UnixSeqpacket { fn as_raw_descriptor(&self) -> RawDescriptor { self.fd