From 68bbe92339cf96efb2832bb8ddd61e4921a044a4 Mon Sep 17 00:00:00 2001 From: Noah Gold Date: Tue, 12 Apr 2022 21:36:24 -0700 Subject: [PATCH] cros_async: upstream Windows support. Major changes: * Adds Windows implementations * Refactors cros_async to use the styleguide for cross platform code. * Adds platform specific Descriptor impls to Timer & Event (short term; this will go away in the next CL in the series). Minor adjustments: * The doctests for the Executor were passing the wrong type of seek parameter to write_from_vec. It was disabling seeking (None) when it should have been seeking to zero (Some(0)). BUG=b:213147081 TEST=tested by Windows & Linux bots. Change-Id: Id7e025ceb9f1be4a165de1e9ba824cf60dd076ff Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3579735 Reviewed-by: Alexandre Courbot Reviewed-by: Keiichi Watanabe Tested-by: kokoro Commit-Queue: Noah Gold --- base/src/event.rs | 10 + base/src/timer.rs | 10 + cros_async/Cargo.toml | 10 +- cros_async/src/async_types.rs | 24 +- cros_async/src/audio_streams_async.rs | 11 +- cros_async/src/blocking.rs | 6 +- cros_async/src/blocking/cancellable_pool.rs | 463 +++++++++++++ cros_async/src/blocking/pool.rs | 39 +- cros_async/src/blocking/sys.rs | 6 + cros_async/src/blocking/sys/unix.rs | 5 + .../src/blocking/{ => sys/unix}/block_on.rs | 2 +- cros_async/src/event.rs | 82 +-- cros_async/src/io_ext.rs | 105 ++- cros_async/src/lib.rs | 101 +-- cros_async/src/mem.rs | 3 +- cros_async/src/select.rs | 1 - cros_async/src/sync/cv.rs | 2 + cros_async/src/sync/mu.rs | 2 + cros_async/src/sys.rs | 13 + cros_async/src/sys/unix.rs | 81 +++ cros_async/src/{ => sys}/unix/async_types.rs | 13 +- cros_async/src/sys/unix/event.rs | 78 +++ cros_async/src/{ => sys/unix}/executor.rs | 24 +- cros_async/src/{ => sys/unix}/fd_executor.rs | 6 +- cros_async/src/{ => sys/unix}/poll_source.rs | 19 +- cros_async/src/sys/unix/timer.rs | 83 +++ .../src/{ => sys/unix}/uring_executor.rs | 13 +- cros_async/src/{ => sys/unix}/uring_source.rs | 37 +- cros_async/src/sys/windows.rs | 52 ++ .../src/{win => sys/windows}/async_types.rs | 27 +- cros_async/src/sys/windows/event.rs | 37 + cros_async/src/sys/windows/executor.rs | 291 ++++++++ cros_async/src/sys/windows/handle_executor.rs | 198 ++++++ cros_async/src/sys/windows/handle_source.rs | 633 ++++++++++++++++++ cros_async/src/sys/windows/timer.rs | 28 + cros_async/src/sys/windows/wait_for_handle.rs | 309 +++++++++ cros_async/src/timer.rs | 95 +-- 37 files changed, 2579 insertions(+), 340 deletions(-) create mode 100644 cros_async/src/blocking/cancellable_pool.rs create mode 100644 cros_async/src/blocking/sys.rs create mode 100644 cros_async/src/blocking/sys/unix.rs rename cros_async/src/blocking/{ => sys/unix}/block_on.rs (99%) create mode 100644 cros_async/src/sys.rs create mode 100644 cros_async/src/sys/unix.rs rename cros_async/src/{ => sys}/unix/async_types.rs (79%) create mode 100644 cros_async/src/sys/unix/event.rs rename cros_async/src/{ => sys/unix}/executor.rs (92%) rename cros_async/src/{ => sys/unix}/fd_executor.rs (99%) rename cros_async/src/{ => sys/unix}/poll_source.rs (97%) create mode 100644 cros_async/src/sys/unix/timer.rs rename cros_async/src/{ => sys/unix}/uring_executor.rs (99%) rename cros_async/src/{ => sys/unix}/uring_source.rs (95%) create mode 100644 cros_async/src/sys/windows.rs rename cros_async/src/{win => sys/windows}/async_types.rs (70%) create mode 100644 cros_async/src/sys/windows/event.rs create mode 100644 cros_async/src/sys/windows/executor.rs create 
mode 100644 cros_async/src/sys/windows/handle_executor.rs create mode 100644 cros_async/src/sys/windows/handle_source.rs create mode 100644 cros_async/src/sys/windows/timer.rs create mode 100644 cros_async/src/sys/windows/wait_for_handle.rs diff --git a/base/src/event.rs b/base/src/event.rs index cefeb77c83..2bb28b045c 100644 --- a/base/src/event.rs +++ b/base/src/event.rs @@ -13,6 +13,9 @@ use crate::{generate_scoped_event, platform::EventFd, RawDescriptor, Result}; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, RawHandle}; + /// See [EventFd](crate::platform::EventFd) for struct- and method-level /// documentation. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -65,4 +68,11 @@ impl AsRawFd for Event { } } +#[cfg(windows)] +impl AsRawHandle for Event { + fn as_raw_handle(&self) -> RawHandle { + self.0.as_raw_handle() + } +} + generate_scoped_event!(Event); diff --git a/base/src/timer.rs b/base/src/timer.rs index af5370f2f5..755451bb50 100644 --- a/base/src/timer.rs +++ b/base/src/timer.rs @@ -12,6 +12,9 @@ use sync::Mutex; #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, RawHandle}; + /// See [TimerFd](crate::platform::TimerFd) for struct- and method-level /// documentation. pub struct Timer(pub TimerFd); @@ -21,6 +24,13 @@ impl Timer { } } +#[cfg(windows)] +impl AsRawHandle for Timer { + fn as_raw_handle(&self) -> RawHandle { + self.0.as_raw_handle() + } +} + #[cfg(unix)] impl AsRawFd for Timer { fn as_raw_fd(&self) -> RawFd { diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml index b081b74891..0bbe40223c 100644 --- a/cros_async/Cargo.toml +++ b/cros_async/Cargo.toml @@ -7,9 +7,9 @@ edition = "2021" [dependencies] async-trait = "0.1.36" async-task = "4" +cfg-if = "1.0.0" data_model = { path = "../common/data_model" } # provided by ebuild intrusive-collections = "0.9" -io_uring = { path = "../io_uring" } # provided by ebuild libc = "*" once_cell = "1.7.2" paste = "1.0" @@ -23,6 +23,14 @@ audio_streams = { path = "../common/audio_streams" } # provided by ebuild anyhow = "1.0" serde = "*" +[target.'cfg(unix)'.dependencies] +io_uring = { path = "../io_uring" } # provided by ebuild + +[target.'cfg(windows)'.dependencies] +winapi = "*" +win_util = { path = "../win_util" } +smallvec = "*" + [dependencies.futures] version = "*" default-features = false diff --git a/cros_async/src/async_types.rs b/cros_async/src/async_types.rs index 8054de1216..1b3cad2c3f 100644 --- a/cros_async/src/async_types.rs +++ b/cros_async/src/async_types.rs @@ -6,12 +6,8 @@ use crate::{Executor, IntoAsync}; use base::{AsRawDescriptor, RecvTube, SendTube, Tube, TubeResult}; use serde::{de::DeserializeOwned, Serialize}; use std::io; -use std::os::unix::io::{AsRawFd, RawFd}; -#[cfg_attr(windows, path = "win/async_types.rs")] -#[cfg_attr(not(windows), path = "unix/async_types.rs")] -mod async_types; -pub use async_types::*; +pub use crate::sys::async_types::*; /// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor /// trait object family. @@ -21,16 +17,18 @@ pub trait DescriptorIntoAsync: AsRawDescriptor {} /// DescriptorIntoAsync (to signify it is suitable for use with async /// operations), and then wrapped with this type. 
 pub struct DescriptorAdapter<T: DescriptorIntoAsync>(pub T);
-impl<T> AsRawFd for DescriptorAdapter<T>
-where
-    T: DescriptorIntoAsync,
-{
-    fn as_raw_fd(&self) -> RawFd {
-        self.0.as_raw_descriptor()
-    }
-}
+
 impl<T> IntoAsync for DescriptorAdapter<T> where T: DescriptorIntoAsync {}
+// NOTE: A StreamChannel can either be used fully in async mode, or not in async
+// mode. Mixing modes will break StreamChannel's internal read/write
+// notification system.
+//
+// TODO(b/213153157): this type isn't properly available upstream yet. Once it
+// is, we can re-enable these implementations.
+// impl DescriptorIntoAsync for StreamChannel {}
+// impl DescriptorIntoAsync for &StreamChannel {}
+
 impl IntoAsync for Tube {}
 impl IntoAsync for SendTube {}
 impl IntoAsync for RecvTube {}
diff --git a/cros_async/src/audio_streams_async.rs b/cros_async/src/audio_streams_async.rs
index 7350066585..2816b136ba 100644
--- a/cros_async/src/audio_streams_async.rs
+++ b/cros_async/src/audio_streams_async.rs
@@ -11,11 +11,14 @@
 use std::os::unix::net::UnixStream;
 use std::{io::Result, time::Duration};
 
-use super::{AsyncWrapper, IntoAsync, IoSourceExt, TimerAsync};
+use crate::{IntoAsync, IoSourceExt, TimerAsync};
 use async_trait::async_trait;
-use audio_streams::async_api::{
-    AsyncStream, AudioStreamsExecutor, ReadAsync, ReadWriteAsync, WriteAsync,
-};
+use audio_streams::async_api::{AudioStreamsExecutor, ReadAsync, ReadWriteAsync, WriteAsync};
+
+#[cfg(unix)]
+use super::AsyncWrapper;
+#[cfg(unix)]
+use audio_streams::async_api::AsyncStream;
 
 /// A wrapper around IoSourceExt that is compatible with the audio_streams traits.
 pub struct IoSourceWrapper<T: IntoAsync> {
diff --git a/cros_async/src/blocking.rs b/cros_async/src/blocking.rs
index f6430a78b7..e9d8421cf6 100644
--- a/cros_async/src/blocking.rs
+++ b/cros_async/src/blocking.rs
@@ -2,8 +2,10 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-mod block_on;
+pub mod sys;
+
+mod cancellable_pool;
 mod pool;
 
-pub use block_on::*;
+pub use cancellable_pool::*;
 pub use pool::*;
diff --git a/cros_async/src/blocking/cancellable_pool.rs b/cros_async/src/blocking/cancellable_pool.rs
new file mode 100644
index 0000000000..53f594a9dc
--- /dev/null
+++ b/cros_async/src/blocking/cancellable_pool.rs
@@ -0,0 +1,463 @@
+// Copyright 2022 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! Provides an async blocking pool whose tasks can be cancelled.
+
+use std::{
+    collections::HashMap,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use crate::BlockingPool;
+use async_task::Task;
+use once_cell::sync::Lazy;
+use sync::{Condvar, Mutex};
+use thiserror::Error as ThisError;
+
+/// Global executor.
+///
+/// This is convenient, though not preferred. Pros/cons:
+/// + It avoids passing the executor all the way to each call site.
+/// + The call site can assume that the executor will never shut down.
+/// + Provides functionality similar to async_task, with a few improvements
+///   around the ability to cancel.
+/// - Globals are harder to reason about.
+static EXECUTOR: Lazy<CancellableBlockingPool> =
+    Lazy::new(|| CancellableBlockingPool::new(256, Duration::from_secs(10)));
+
+const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
+
+#[derive(PartialEq, PartialOrd)]
+enum WindDownStates {
+    Armed,
+    Disarmed,
+    ShuttingDown,
+    ShutDown,
+}
+
+impl Default for WindDownStates {
+    fn default() -> Self {
+        WindDownStates::Armed
+    }
+}
+
+#[derive(Default)]
+struct State {
+    wind_down: WindDownStates,
+
+    /// Helps to generate a unique id to associate a `cancel` with its task.
+    current_cancellable_id: u64,
+
+    /// A map of all the `cancel` routines of queued/in-flight tasks.
+    cancellables: HashMap<u64, Box<dyn Fn() + Send>>,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum TimeoutAction {
+    /// Do nothing on timeout.
+    None,
+    /// Panic the thread on timeout.
+    Panic,
+}
+
+#[derive(ThisError, Debug, PartialEq)]
+pub enum Error {
+    #[error("Timeout occurred while trying to join threads")]
+    Timedout,
+    #[error("Shutdown is in progress")]
+    ShutdownInProgress,
+    #[error("Already shut down")]
+    AlreadyShutdown,
+}
+
+struct Inner {
+    blocking_pool: BlockingPool,
+    state: Mutex<State>,
+
+    /// This condvar gets notified when `cancellables` is empty after removing an
+    /// entry.
+    cancellables_cv: Condvar,
+}
+
+impl Inner {
+    pub fn spawn<F, R>(self: &Arc<Self>, f: F) -> Task<R>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        self.blocking_pool.spawn(f)
+    }
+
+    /// Adds `cancel` to the cancellables map and returns an `id` with which `cancel` can be
+    /// accessed/removed.
+    fn add_cancellable(&self, cancel: Box<dyn Fn() + Send>) -> u64 {
+        let mut state = self.state.lock();
+        let id = state.current_cancellable_id;
+        state.current_cancellable_id += 1;
+        state.cancellables.insert(id, cancel);
+        id
+    }
+}
+
+/// A thread pool for running work that may block.
+///
+/// This is a wrapper around `BlockingPool` with the ability to cancel queued tasks.
+/// See [BlockingPool] for more info.
+///
+/// # Examples
+///
+/// Spawn a task to run in the `CancellableBlockingPool` and await on its result.
+///
+/// ```edition2018
+/// use cros_async::CancellableBlockingPool;
+///
+/// # async fn do_it() {
+/// let pool = CancellableBlockingPool::default();
+/// let CANCELLED = 0;
+///
+/// let res = pool.spawn(move || {
+///     // Do some CPU-intensive or blocking work here.
+///
+///     42
+/// }, move || CANCELLED).await;
+///
+/// assert_eq!(res, 42);
+/// # }
+/// # futures::executor::block_on(do_it());
+/// ```
+#[derive(Clone)]
+pub struct CancellableBlockingPool {
+    inner: Arc<Inner>,
+}
+
+impl CancellableBlockingPool {
+    const RETRY_COUNT: usize = 10;
+    const SLEEP_DURATION: Duration = Duration::from_millis(100);
+
+    /// Create a new `CancellableBlockingPool`.
+    ///
+    /// When we try to shut down or drop a `CancellableBlockingPool`, a hung thread may prevent
+    /// the pool from getting dropped. On failure to shut down within `watchdog_opts.timeout`,
+    /// the `CancellableBlockingPool` can take the action specified by `watchdog_opts.action`.
+    ///
+    /// See also: [BlockingPool::new()](BlockingPool::new)
+    pub fn new(max_threads: usize, keepalive: Duration) -> CancellableBlockingPool {
+        CancellableBlockingPool {
+            inner: Arc::new(Inner {
+                blocking_pool: BlockingPool::new(max_threads, keepalive),
+                state: Default::default(),
+                cancellables_cv: Condvar::new(),
+            }),
+        }
+    }
+
+    /// Like [Self::new] but pre-allocates capacity for up to `max_threads`.
+    pub fn with_capacity(max_threads: usize, keepalive: Duration) -> CancellableBlockingPool {
+        CancellableBlockingPool {
+            inner: Arc::new(Inner {
+                blocking_pool: BlockingPool::with_capacity(max_threads, keepalive),
+                state: Mutex::new(State::default()),
+                cancellables_cv: Condvar::new(),
+            }),
+        }
+    }
+
+    /// Spawn a task to run in the `CancellableBlockingPool`.
+    ///
+    /// Callers may `await` the returned `Task` to be notified when the work is completed.
+    ///
+    /// `cancel` helps to cancel a queued or in-flight operation `f`.
+    /// `cancel` may be called more than once if `f` doesn't respond to `cancel`.
+    /// `cancel` is not called if `f` completes successfully.
+    ///
+    /// # Examples
+    ///
+    /// ```edition2018
+    /// use {cros_async::CancellableBlockingPool, std::sync::{Arc, Mutex, Condvar}};
+    ///
+    /// # async fn cancel_it() {
+    /// let pool = CancellableBlockingPool::default();
+    /// let cancelled: i32 = 1;
+    /// let success: i32 = 2;
+    ///
+    /// let shared = Arc::new((Mutex::new(0), Condvar::new()));
+    /// let shared2 = shared.clone();
+    /// let shared3 = shared.clone();
+    ///
+    /// let res = pool
+    ///     .spawn(
+    ///         move || {
+    ///             let guard = shared.0.lock().unwrap();
+    ///             let mut guard = shared.1.wait_while(guard, |state| *state == 0).unwrap();
+    ///             if *guard != cancelled {
+    ///                 *guard = success;
+    ///             }
+    ///         },
+    ///         move || {
+    ///             *shared2.0.lock().unwrap() = cancelled;
+    ///             shared2.1.notify_all();
+    ///         },
+    ///     )
+    ///     .await;
+    /// pool.shutdown();
+    ///
+    /// assert_eq!(*shared3.0.lock().unwrap(), cancelled);
+    /// # }
+    /// ```
+    pub fn spawn<F, R, G>(&self, f: F, cancel: G) -> Task<R>
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+        G: Fn() -> R + Send + 'static,
+    {
+        let inner = self.inner.clone();
+        let cancelled = Arc::new(Mutex::new(None));
+        let cancelled_spawn = cancelled.clone();
+        let id = inner.add_cancellable(Box::new(move || {
+            let mut c = cancelled.lock();
+            *c = Some(cancel());
+        }));
+
+        self.inner.spawn(move || {
+            if let Some(res) = cancelled_spawn.lock().take() {
+                return res;
+            }
+            let ret = f();
+            let mut state = inner.state.lock();
+            state.cancellables.remove(&id);
+            if state.cancellables.is_empty() {
+                inner.cancellables_cv.notify_one();
+            }
+            ret
+        })
+    }
+
+    /// Iterates over all the queued tasks and marks them as cancelled.
+    fn drain_cancellables(&self) {
+        let mut state = self.inner.state.lock();
+        // Iterate a few times to try cancelling all the tasks.
+        for _ in 0..Self::RETRY_COUNT {
+            // Nothing left to do.
+            if state.cancellables.is_empty() {
+                return;
+            }
+
+            // We only cancel the task and do not remove it from the cancellables. It is the
+            // runner's job to remove it from state.cancellables.
+            for cancel in state.cancellables.values() {
+                cancel();
+            }
+            // Hold the state lock in a block before sleeping so that woken-up threads can
+            // acquire the lock.
+            // Wait for a while so that the threads get a chance to complete tasks in flight.
+            let (state1, _cv_timeout) = self
+                .inner
+                .cancellables_cv
+                .wait_timeout(state, Self::SLEEP_DURATION);
+            state = state1;
+        }
+    }
+
+    /// Marks all the queued and in-flight tasks as cancelled. Any tasks queued after `disarm`ing
+    /// will be cancelled.
+    /// Does not wait for all the tasks to get cancelled.
+    pub fn disarm(&self) {
+        {
+            let mut state = self.inner.state.lock();
+
+            if state.wind_down >= WindDownStates::Disarmed {
+                return;
+            }
+
+            // At this point any new incoming request will be cancelled when run.
+            state.wind_down = WindDownStates::Disarmed;
+        }
+        self.drain_cancellables();
+    }
+
+    /// Shut down the `CancellableBlockingPool`.
+    ///
+    /// This will block until all work that has been started by the worker threads is finished.
+    /// Any work that was added to the `CancellableBlockingPool` but not yet picked up by a worker
+    /// thread will not complete, and `await`ing on the `Task` for that work will panic.
+    pub fn shutdown(&self) -> Result<(), Error> {
+        self.disarm();
+        {
+            let mut state = self.inner.state.lock();
+            if state.wind_down == WindDownStates::ShuttingDown {
+                return Err(Error::ShutdownInProgress);
+            }
+            if state.wind_down == WindDownStates::ShutDown {
+                return Err(Error::AlreadyShutdown);
+            }
+            state.wind_down = WindDownStates::ShuttingDown;
+        }
+
+        let res = self.inner.blocking_pool.shutdown(/* deadline: */ Some(
+            Instant::now() + DEFAULT_SHUTDOWN_TIMEOUT,
+        ));
+
+        self.inner.state.lock().wind_down = WindDownStates::ShutDown;
+        match res {
+            Ok(_) => Ok(()),
+            Err(_) => Err(Error::Timedout),
+        }
+    }
+}
+
+impl Default for CancellableBlockingPool {
+    fn default() -> CancellableBlockingPool {
+        CancellableBlockingPool::new(256, Duration::from_secs(10))
+    }
+}
+
+impl Drop for CancellableBlockingPool {
+    fn drop(&mut self) {
+        let _ = self.shutdown();
+    }
+}
+
+/// Spawn a task to run in the `CancellableBlockingPool` static executor.
+///
+/// `cancel` cancels a queued or in-flight operation. It is called during `disarm` or during
+/// `shutdown`, and may be called multiple times if the running task doesn't get cancelled on the
+/// first attempt.
+///
+/// Callers may `await` the returned `Task` to be notified when the work is completed.
+///
+/// See also: `spawn`.
+pub fn unblock<F, R, G>(f: F, cancel: G) -> Task<R>
+where
+    F: FnOnce() -> R + Send + 'static,
+    R: Send + 'static,
+    G: Fn() -> R + Send + 'static,
+{
+    EXECUTOR.spawn(f, cancel)
+}
+
+/// Marks all the queued and in-flight tasks as cancelled. Any tasks queued after `disarm`ing
+/// will be cancelled.
+/// Does not wait for all the tasks to get cancelled.
+pub fn unblock_disarm() {
+    EXECUTOR.disarm()
+}
+
+#[cfg(test)]
+mod test {
+    use std::{sync::Arc, thread, time::Duration};
+
+    use futures::executor::block_on;
+    use sync::{Condvar, Mutex};
+
+    use crate::{blocking::Error, CancellableBlockingPool};
+    use std::sync::Barrier;
+
+    #[test]
+    fn disarm_with_pending_work() {
+        // Create a pool with only one thread.
+        let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));
+
+        let mu = Arc::new(Mutex::new(false));
+        let cv = Arc::new(Condvar::new());
+        let blocker_is_running = Arc::new(Barrier::new(2));
+
+        // First spawn a thread that blocks the pool.
+        let task_mu = mu.clone();
+        let task_cv = cv.clone();
+        let task_blocker_is_running = blocker_is_running.clone();
+        pool.spawn(
+            move || {
+                task_blocker_is_running.wait();
+                let mut ready = task_mu.lock();
+                while !*ready {
+                    ready = task_cv.wait(ready);
+                }
+            },
+            move || {},
+        )
+        .detach();
+
+        // Wait for the worker to start running the blocking thread.
+        blocker_is_running.wait();
+
+        // This task will never finish because we will disarm the pool first.
+        let unfinished = pool.spawn(|| 5, || 0);
+
+        // Disarming should cancel the task.
+        let _ = pool.disarm();
+
+        // Shut down the blocking thread. This will allow a worker to pick up the task that has
+        // to be cancelled.
+        *mu.lock() = true;
+        cv.notify_all();
+
+        // We expect the cancelled value to be returned.
+ assert_eq!(block_on(unfinished), 0); + + // Now the pool is empty and can be shutdown without blocking. + let _ = pool.shutdown().unwrap(); + } + + #[test] + fn shutdown_with_blocked_work_should_panic() { + let pool = CancellableBlockingPool::new(1, Duration::from_secs(10)); + + let running = Arc::new((Mutex::new(false), Condvar::new())); + let running1 = running.clone(); + pool.spawn( + move || { + *running1.0.lock() = true; + running1.1.notify_one(); + thread::sleep(Duration::from_secs(10000)); + }, + move || {}, + ) + .detach(); + + let mut is_running = running.0.lock(); + while !*is_running { + is_running = running.1.wait(is_running); + } + + assert_eq!(pool.shutdown().err().unwrap(), Error::Timedout); + } + + #[test] + fn multiple_shutdown_returns_error() { + let pool = CancellableBlockingPool::new(1, Duration::from_secs(10)); + let _ = pool.shutdown(); + assert_eq!(pool.shutdown(), Err(Error::AlreadyShutdown)); + } + + #[test] + fn shutdown_in_progress() { + let pool = CancellableBlockingPool::new(1, Duration::from_secs(10)); + + let running = Arc::new((Mutex::new(false), Condvar::new())); + let running1 = running.clone(); + pool.spawn( + move || { + *running1.0.lock() = true; + running1.1.notify_one(); + thread::sleep(Duration::from_secs(10000)); + }, + move || {}, + ) + .detach(); + + let mut is_running = running.0.lock(); + while !*is_running { + is_running = running.1.wait(is_running); + } + + let pool_clone = pool.clone(); + thread::spawn(move || { + while !pool_clone.inner.blocking_pool.shutting_down() {} + assert_eq!(pool_clone.shutdown(), Err(Error::ShutdownInProgress)); + }); + assert_eq!(pool.shutdown().err().unwrap(), Error::Timedout); + } +} diff --git a/cros_async/src/blocking/pool.rs b/cros_async/src/blocking/pool.rs index 26df32e974..0d940756b0 100644 --- a/cros_async/src/blocking/pool.rs +++ b/cros_async/src/blocking/pool.rs @@ -135,6 +135,24 @@ impl Inner { self.condvar.notify_one(); } } + + pub fn spawn(self: &Arc, f: F) -> Task + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let raw = Arc::downgrade(self); + let schedule = move |runnable| { + if let Some(i) = raw.upgrade() { + i.schedule(runnable); + } + }; + + let (runnable, task) = async_task::spawn(async move { f() }, schedule); + runnable.schedule(); + + task + } } #[derive(Debug, thiserror::Error)] @@ -253,17 +271,7 @@ impl BlockingPool { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let raw = Arc::downgrade(&self.inner); - let schedule = move |runnable| { - if let Some(i) = raw.upgrade() { - i.schedule(runnable); - } - }; - - let (runnable, task) = async_task::spawn(async move { f() }, schedule); - runnable.schedule(); - - task + self.inner.spawn(f) } /// Shut down the `BlockingPool`. @@ -316,6 +324,11 @@ impl BlockingPool { Ok(()) } } + + #[cfg(test)] + pub(crate) fn shutting_down(&self) -> bool { + self.inner.state.lock().shutting_down + } } impl Default for BlockingPool { @@ -340,10 +353,10 @@ mod test { time::{Duration, Instant}, }; - use futures::{stream::FuturesUnordered, StreamExt}; + use futures::{executor::block_on, stream::FuturesUnordered, StreamExt}; use sync::{Condvar, Mutex}; - use super::super::super::{block_on, BlockingPool}; + use super::super::super::BlockingPool; #[test] fn blocking_sleep() { diff --git a/cros_async/src/blocking/sys.rs b/cros_async/src/blocking/sys.rs new file mode 100644 index 0000000000..69884f11f6 --- /dev/null +++ b/cros_async/src/blocking/sys.rs @@ -0,0 +1,6 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#[cfg(unix)] +pub mod unix; diff --git a/cros_async/src/blocking/sys/unix.rs b/cros_async/src/blocking/sys/unix.rs new file mode 100644 index 0000000000..8aa965e4f4 --- /dev/null +++ b/cros_async/src/blocking/sys/unix.rs @@ -0,0 +1,5 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +pub mod block_on; diff --git a/cros_async/src/blocking/block_on.rs b/cros_async/src/blocking/sys/unix/block_on.rs similarity index 99% rename from cros_async/src/blocking/block_on.rs rename to cros_async/src/blocking/sys/unix/block_on.rs index 2cb0b1e24e..e422fbfe4a 100644 --- a/cros_async/src/blocking/block_on.rs +++ b/cros_async/src/blocking/sys/unix/block_on.rs @@ -125,7 +125,7 @@ mod test { time::Duration, }; - use super::super::super::sync::SpinLock; + use crate::sync::SpinLock; struct TimerState { fired: bool, diff --git a/cros_async/src/event.rs b/cros_async/src/event.rs index 703205f8f9..11a3c42932 100644 --- a/cros_async/src/event.rs +++ b/cros_async/src/event.rs @@ -2,85 +2,23 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use super::{AsyncResult, Executor, IntoAsync, IoSourceExt}; -use base::Event as EventFd; +use crate::{IntoAsync, IoSourceExt}; +use base::Event; -/// An async version of `base::EventFd`. +/// An async version of `base::Event`. pub struct EventAsync { - io_source: Box>, + pub(crate) io_source: Box>, + #[cfg(windows)] + pub(crate) reset_after_read: bool, } impl EventAsync { - pub fn new(event: EventFd, ex: &Executor) -> AsyncResult { - ex.async_from(event) - .map(|io_source| EventAsync { io_source }) - } - - #[cfg(test)] - pub(crate) fn new_poll(event: EventFd, ex: &super::FdExecutor) -> AsyncResult { - super::executor::async_poll_from(event, ex).map(|io_source| EventAsync { io_source }) - } - - #[cfg(test)] - pub(crate) fn new_uring(event: EventFd, ex: &super::URingExecutor) -> AsyncResult { - super::executor::async_uring_from(event, ex).map(|io_source| EventAsync { io_source }) - } - - /// Gets the next value from the eventfd. 
- #[allow(dead_code)] - pub async fn next_val(&self) -> AsyncResult { - self.io_source.read_u64().await + pub fn get_io_source_ref(&self) -> &dyn IoSourceExt { + self.io_source.as_ref() } } -impl IntoAsync for EventFd {} +impl IntoAsync for Event {} -#[cfg(test)] -mod tests { - use super::*; - - use super::super::{uring_executor::use_uring, Executor, FdExecutor, URingExecutor}; - - #[test] - fn next_val_reads_value() { - async fn go(event: EventFd, ex: &Executor) -> u64 { - let event_async = EventAsync::new(event, ex).unwrap(); - event_async.next_val().await.unwrap() - } - - let eventfd = EventFd::new().unwrap(); - eventfd.write(0xaa).unwrap(); - let ex = Executor::new().unwrap(); - let val = ex.run_until(go(eventfd, &ex)).unwrap(); - assert_eq!(val, 0xaa); - } - - #[test] - fn next_val_reads_value_poll_and_ring() { - if !use_uring() { - return; - } - - async fn go(event_async: EventAsync) -> u64 { - event_async.next_val().await.unwrap() - } - - let eventfd = EventFd::new().unwrap(); - eventfd.write(0xaa).unwrap(); - let uring_ex = URingExecutor::new().unwrap(); - let val = uring_ex - .run_until(go(EventAsync::new_uring(eventfd, &uring_ex).unwrap())) - .unwrap(); - assert_eq!(val, 0xaa); - - let eventfd = EventFd::new().unwrap(); - eventfd.write(0xaa).unwrap(); - let poll_ex = FdExecutor::new().unwrap(); - let val = poll_ex - .run_until(go(EventAsync::new_poll(eventfd, &poll_ex).unwrap())) - .unwrap(); - assert_eq!(val, 0xaa); - } -} -// Safe because an `EventFd` is used underneath, which is safe to pass between threads. +// Safe because an `Event` is used underneath, which is safe to pass between threads. unsafe impl Send for EventAsync {} diff --git a/cros_async/src/io_ext.rs b/cros_async/src/io_ext.rs index b1486b1083..990c99fe31 100644 --- a/cros_async/src/io_ext.rs +++ b/cros_async/src/io_ext.rs @@ -19,29 +19,64 @@ use std::{ fs::File, io, ops::{Deref, DerefMut}, - os::unix::io::{AsRawFd, RawFd}, sync::Arc, }; use async_trait::async_trait; -use base::UnixSeqpacket; use remain::sorted; use thiserror::Error as ThisError; use super::{BackingMemory, MemRegion}; +#[cfg(unix)] +use base::UnixSeqpacket; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; + +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, RawHandle}; + +#[cfg(unix)] #[sorted] #[derive(ThisError, Debug)] pub enum Error { /// An error with a polled(FD) source. #[error("An error with a poll source: {0}")] - Poll(#[from] super::poll_source::Error), + Poll(crate::sys::unix::poll_source::Error), /// An error with a uring source. 
#[error("An error with a uring source: {0}")] - Uring(#[from] super::uring_executor::Error), + Uring(crate::sys::unix::uring_executor::Error), } + +#[cfg(windows)] +#[sorted] +#[derive(ThisError, Debug)] +pub enum Error { + #[error("An error with an EventAsync: {0}")] + EventAsync(base::Error), + #[error("An error with a handle executor: {0}")] + HandleExecutor(crate::sys::windows::handle_executor::Error), + #[error("An error with a handle source: {0}")] + HandleSource(crate::sys::windows::handle_source::Error), +} + pub type Result = std::result::Result; +#[cfg(unix)] +impl From for Error { + fn from(err: crate::sys::unix::uring_executor::Error) -> Self { + Error::Uring(err) + } +} + +#[cfg(unix)] +impl From for Error { + fn from(err: crate::sys::unix::poll_source::Error) -> Self { + Error::Poll(err) + } +} + +#[cfg(unix)] impl From for io::Error { fn from(e: Error) -> Self { use Error::*; @@ -52,6 +87,32 @@ impl From for io::Error { } } +#[cfg(windows)] +impl From for io::Error { + fn from(e: Error) -> Self { + use Error::*; + match e { + EventAsync(e) => e.into(), + HandleExecutor(e) => e.into(), + HandleSource(e) => e.into(), + } + } +} + +#[cfg(windows)] +impl From for Error { + fn from(err: crate::sys::windows::handle_source::Error) -> Self { + Error::HandleSource(err) + } +} + +#[cfg(windows)] +impl From for Error { + fn from(err: crate::sys::windows::handle_executor::Error) -> Self { + Error::HandleExecutor(err) + } +} + /// Ergonomic methods for async reads. #[async_trait(?Send)] pub trait ReadAsync { @@ -136,6 +197,10 @@ pub trait IoSourceExt: ReadAsync + WriteAsync { /// Provides a ref to the underlying IO source. fn as_source(&self) -> &F; + + /// Waits on a waitable handle. Needed for + /// Windows currently, and subject to a potential future upstream. + async fn wait_for_handle(&self) -> Result; } /// Marker trait signifying that the implementor is suitable for use with @@ -144,10 +209,15 @@ pub trait IoSourceExt: ReadAsync + WriteAsync { /// (Note: it'd be really nice to implement a TryFrom for any implementors, and /// remove our factory functions. Unfortunately /// makes that too painful.) +#[cfg(unix)] pub trait IntoAsync: AsRawFd {} +#[cfg(windows)] +pub trait IntoAsync: AsRawHandle {} impl IntoAsync for File {} +#[cfg(unix)] impl IntoAsync for UnixSeqpacket {} +#[cfg(unix)] impl IntoAsync for &UnixSeqpacket {} /// Simple wrapper struct to implement IntoAsync on foreign types. 
@@ -179,16 +249,29 @@ impl DerefMut for AsyncWrapper { } } +#[cfg(unix)] impl AsRawFd for AsyncWrapper { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() } } +#[cfg(windows)] +impl AsRawHandle for AsyncWrapper { + fn as_raw_handle(&self) -> RawHandle { + self.0.as_raw_handle() + } +} + +#[cfg(unix)] impl IntoAsync for AsyncWrapper {} -#[cfg(test)] +#[cfg(windows)] +impl IntoAsync for AsyncWrapper {} + +#[cfg(all(test, unix))] mod tests { + use base::Event; use std::{ fs::{File, OpenOptions}, future::Future, @@ -200,16 +283,16 @@ mod tests { }; use sync::Mutex; - use super::{ - super::{ + use super::*; + use crate::{ + mem::VecIoWrapper, + sys::unix::{ executor::{async_poll_from, async_uring_from}, - mem::VecIoWrapper, uring_executor::use_uring, - Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource, + FdExecutor, PollSource, URingExecutor, UringSource, }, - *, + Executor, MemRegion, }; - use base::Event; struct State { should_quit: bool, diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 5afc57a473..d3f9cf0d74 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -63,44 +63,41 @@ pub mod audio_streams_async; mod blocking; mod complete; mod event; -mod executor; -mod fd_executor; mod io_ext; pub mod mem; -mod poll_source; mod queue; mod select; pub mod sync; +pub mod sys; +pub use sys::Executor; mod timer; -mod uring_executor; -mod uring_source; mod waker; pub use async_types::*; -pub use base; -pub use blocking::{block_on, BlockingPool}; +pub use base::Event; +#[cfg(unix)] +pub use blocking::sys::unix::block_on::block_on; +pub use blocking::BlockingPool; pub use event::EventAsync; -pub use executor::Executor; -pub use fd_executor::FdExecutor; +#[cfg(windows)] +pub use futures::executor::block_on; pub use io_ext::{ AllocateMode, AsyncWrapper, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync, }; pub use mem::{BackingMemory, MemRegion}; -pub use poll_source::PollSource; pub use select::SelectResult; +pub use sys::run_one; pub use timer::TimerAsync; -pub use uring_executor::URingExecutor; -pub use uring_source::UringSource; use std::{ future::Future, - io, marker::PhantomData, pin::Pin, task::{Context, Poll}, }; +pub use blocking::{unblock, unblock_disarm, CancellableBlockingPool, TimeoutAction}; use remain::sorted; use thiserror::Error as ThisError; @@ -108,32 +105,26 @@ use thiserror::Error as ThisError; #[derive(ThisError, Debug)] pub enum Error { /// Error from the FD executor. + #[cfg(unix)] #[error("Failure in the FD executor: {0}")] - FdExecutor(fd_executor::Error), + FdExecutor(sys::unix::fd_executor::Error), + /// Error from the handle executor. + #[cfg(windows)] + #[error("Failure in the handle executor: {0}")] + HandleExecutor(sys::windows::handle_executor::Error), + /// Error from Timer. + #[error("Failure in Timer: {0}")] + Timer(base::Error), /// Error from TimerFd. #[error("Failure in TimerAsync: {0}")] TimerAsync(AsyncError), - /// Error from TimerFd. - #[error("Failure in TimerFd: {0}")] - TimerFd(base::Error), /// Error from the uring executor. + #[cfg(unix)] #[error("Failure in the uring executor: {0}")] - URingExecutor(uring_executor::Error), + URingExecutor(sys::unix::uring_executor::Error), } pub type Result = std::result::Result; -impl From for io::Error { - fn from(e: Error) -> Self { - use Error::*; - match e { - FdExecutor(e) => e.into(), - URingExecutor(e) => e.into(), - TimerFd(e) => e.into(), - TimerAsync(e) => e.into(), - } - } -} - // A Future that never completes. 
pub struct Empty { phantom: PhantomData, @@ -153,56 +144,6 @@ pub fn empty() -> Empty { } } -/// Creates an Executor that runs one future to completion. -/// -/// # Example -/// -/// ``` -/// use cros_async::run_one; -/// -/// let fut = async { 55 }; -/// assert_eq!(55, run_one(fut).unwrap()); -/// ``` -pub fn run_one(fut: F) -> Result { - if uring_executor::use_uring() { - run_one_uring(fut) - } else { - run_one_poll(fut) - } -} - -/// Creates a URingExecutor that runs one future to completion. -/// -/// # Example -/// -/// ``` -/// use cros_async::run_one_uring; -/// -/// let fut = async { 55 }; -/// assert_eq!(55, run_one_uring(fut).unwrap()); -/// ``` -pub fn run_one_uring(fut: F) -> Result { - URingExecutor::new() - .and_then(|ex| ex.run_until(fut)) - .map_err(Error::URingExecutor) -} - -/// Creates a FdExecutor that runs one future to completion. -/// -/// # Example -/// -/// ``` -/// use cros_async::run_one_poll; -/// -/// let fut = async { 55 }; -/// assert_eq!(55, run_one_poll(fut).unwrap()); -/// ``` -pub fn run_one_poll(fut: F) -> Result { - FdExecutor::new() - .and_then(|ex| ex.run_until(fut)) - .map_err(Error::FdExecutor) -} - // Select helpers to run until any future completes. /// Creates a combinator that runs the two given futures until one completes, returning a tuple diff --git a/cros_async/src/mem.rs b/cros_async/src/mem.rs index 691e629fec..88eee3ac57 100644 --- a/cros_async/src/mem.rs +++ b/cros_async/src/mem.rs @@ -25,7 +25,7 @@ pub struct MemRegion { pub len: usize, } -/// Trait for memory that can yeild both iovecs in to the backing memory. +/// Trait for memory that can yield both iovecs in to the backing memory. /// Must be OK to modify the backing memory without owning a mut able reference. For example, /// this is safe for GuestMemory and VolatileSlices in crosvm as those types guarantee they are /// dealt with as volatile. @@ -63,6 +63,7 @@ impl From for Vec { impl VecIoWrapper { /// Get the length of the Vec that is wrapped. + #[cfg_attr(windows, allow(dead_code))] pub fn len(&self) -> usize { self.inner.len() } diff --git a/cros_async/src/select.rs b/cros_async/src/select.rs index acf83e7683..a78a0d92bc 100644 --- a/cros_async/src/select.rs +++ b/cros_async/src/select.rs @@ -26,7 +26,6 @@ macro_rules! generate { $(#[$doc:meta])* ($Select:ident, <$($Fut:ident),*>), )*) => ($( - paste::item! { pub(crate) struct $Select<$($Fut: Future + Unpin),*> { $($Fut: MaybeDone<$Fut>,)* diff --git a/cros_async/src/sync/cv.rs b/cros_async/src/sync/cv.rs index 46b96dd188..b5455833f2 100644 --- a/cros_async/src/sync/cv.rs +++ b/cros_async/src/sync/cv.rs @@ -449,6 +449,8 @@ fn cancel_waiter(cv: usize, waiter: &Waiter, wake_next: bool) { unsafe { (*condvar).cancel_waiter(waiter, wake_next) } } +// TODO(b/194338842): Fix tests for windows +#[cfg(unix)] #[cfg(test)] mod test { use super::*; diff --git a/cros_async/src/sync/mu.rs b/cros_async/src/sync/mu.rs index e1f408dfa2..0aead01c20 100644 --- a/cros_async/src/sync/mu.rs +++ b/cros_async/src/sync/mu.rs @@ -883,6 +883,8 @@ impl<'a, T: ?Sized> Drop for MutexReadGuard<'a, T> { } } +// TODO(b/194338842): Fix tests for windows +#[cfg(unix)] #[cfg(test)] mod test { use super::*; diff --git a/cros_async/src/sys.rs b/cros_async/src/sys.rs new file mode 100644 index 0000000000..5dfbfc2aa3 --- /dev/null +++ b/cros_async/src/sys.rs @@ -0,0 +1,13 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +cfg_if::cfg_if! 
{ + if #[cfg(unix)] { + pub mod unix; + pub use unix::{async_types, event, executor::Executor, run_one}; + } else if #[cfg(windows)] { + pub mod windows; + pub use windows::{async_types, event, executor::Executor, run_one}; + } +} diff --git a/cros_async/src/sys/unix.rs b/cros_async/src/sys/unix.rs new file mode 100644 index 0000000000..04bfb26626 --- /dev/null +++ b/cros_async/src/sys/unix.rs @@ -0,0 +1,81 @@ +// Copyright 2021 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +pub mod async_types; +pub mod event; +pub mod executor; +pub mod fd_executor; +pub mod poll_source; +pub mod uring_executor; +pub mod uring_source; +pub use fd_executor::FdExecutor; +pub use poll_source::PollSource; +pub use uring_executor::URingExecutor; +pub use uring_source::UringSource; +mod timer; + +use crate::{Error, Result}; +use std::future::Future; + +/// Creates a URingExecutor that runs one future to completion. +/// +/// # Example +/// +/// ``` +/// use cros_async::sys::unix::run_one_uring; +/// +/// let fut = async { 55 }; +/// assert_eq!(55, run_one_uring(fut).unwrap()); +/// ``` +pub fn run_one_uring(fut: F) -> Result { + URingExecutor::new() + .and_then(|ex| ex.run_until(fut)) + .map_err(Error::URingExecutor) +} + +/// Creates a FdExecutor that runs one future to completion. +/// +/// # Example +/// +/// ``` +/// use cros_async::sys::unix::run_one_poll; +/// +/// let fut = async { 55 }; +/// assert_eq!(55, run_one_poll(fut).unwrap()); +/// ``` +pub fn run_one_poll(fut: F) -> Result { + FdExecutor::new() + .and_then(|ex| ex.run_until(fut)) + .map_err(Error::FdExecutor) +} + +/// Creates an Executor that runs one future to completion. +/// +/// # Example +/// +/// ``` +/// use cros_async::sys::unix::run_one; +/// +/// let fut = async { 55 }; +/// assert_eq!(55, run_one(fut).unwrap()); +/// ``` +pub fn run_one(fut: F) -> Result { + if uring_executor::use_uring() { + run_one_uring(fut) + } else { + run_one_poll(fut) + } +} + +impl From for std::io::Error { + fn from(e: Error) -> Self { + use Error::*; + match e { + FdExecutor(e) => e.into(), + URingExecutor(e) => e.into(), + Timer(e) => e.into(), + TimerAsync(e) => e.into(), + } + } +} diff --git a/cros_async/src/unix/async_types.rs b/cros_async/src/sys/unix/async_types.rs similarity index 79% rename from cros_async/src/unix/async_types.rs rename to cros_async/src/sys/unix/async_types.rs index c1653c2442..6d7c12d5c7 100644 --- a/cros_async/src/unix/async_types.rs +++ b/cros_async/src/sys/unix/async_types.rs @@ -1,11 +1,13 @@ // Copyright 2022 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use crate::{Executor, IoSourceExt}; + +use crate::{DescriptorAdapter, DescriptorIntoAsync, Executor, IoSourceExt}; use base::{Tube, TubeResult}; use serde::{de::DeserializeOwned, Serialize}; use std::io; use std::ops::Deref; +use std::os::unix::io::{AsRawFd, RawFd}; pub struct AsyncTube { inner: Box>, @@ -40,3 +42,12 @@ impl From for Tube { at.inner.into_source() } } + +impl AsRawFd for DescriptorAdapter +where + T: DescriptorIntoAsync, +{ + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_descriptor() + } +} diff --git a/cros_async/src/sys/unix/event.rs b/cros_async/src/sys/unix/event.rs new file mode 100644 index 0000000000..30d4aca78b --- /dev/null +++ b/cros_async/src/sys/unix/event.rs @@ -0,0 +1,78 @@ +// Copyright 2022 The Chromium OS Authors. 
All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#[cfg(test)] +use super::{FdExecutor, URingExecutor}; +use crate::{AsyncResult, EventAsync, Executor}; +use base::Event as EventFd; + +impl EventAsync { + pub fn new(event: EventFd, ex: &Executor) -> AsyncResult { + ex.async_from(event) + .map(|io_source| EventAsync { io_source }) + } + + /// Gets the next value from the eventfd. + pub async fn next_val(&self) -> AsyncResult { + self.io_source.read_u64().await + } + + #[cfg(test)] + pub(crate) fn new_poll(event: EventFd, ex: &FdExecutor) -> AsyncResult { + super::executor::async_poll_from(event, ex).map(|io_source| EventAsync { io_source }) + } + + #[cfg(test)] + pub(crate) fn new_uring(event: EventFd, ex: &URingExecutor) -> AsyncResult { + super::executor::async_uring_from(event, ex).map(|io_source| EventAsync { io_source }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::sys::unix::uring_executor::use_uring; + + #[test] + fn next_val_reads_value() { + async fn go(event: EventFd, ex: &Executor) -> u64 { + let event_async = EventAsync::new(event, ex).unwrap(); + event_async.next_val().await.unwrap() + } + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let ex = Executor::new().unwrap(); + let val = ex.run_until(go(eventfd, &ex)).unwrap(); + assert_eq!(val, 0xaa); + } + + #[test] + fn next_val_reads_value_poll_and_ring() { + if !use_uring() { + return; + } + + async fn go(event_async: EventAsync) -> u64 { + event_async.next_val().await.unwrap() + } + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let uring_ex = URingExecutor::new().unwrap(); + let val = uring_ex + .run_until(go(EventAsync::new_uring(eventfd, &uring_ex).unwrap())) + .unwrap(); + assert_eq!(val, 0xaa); + + let eventfd = EventFd::new().unwrap(); + eventfd.write(0xaa).unwrap(); + let poll_ex = FdExecutor::new().unwrap(); + let val = poll_ex + .run_until(go(EventAsync::new_poll(eventfd, &poll_ex).unwrap())) + .unwrap(); + assert_eq!(val, 0xaa); + } +} diff --git a/cros_async/src/executor.rs b/cros_async/src/sys/unix/executor.rs similarity index 92% rename from cros_async/src/executor.rs rename to cros_async/src/sys/unix/executor.rs index e79790901b..075d861865 100644 --- a/cros_async/src/executor.rs +++ b/cros_async/src/sys/unix/executor.rs @@ -7,14 +7,16 @@ use std::future::Future; use async_task::Task; use super::{ - poll_source::Error as PollError, uring_executor::use_uring, AsyncResult, FdExecutor, IntoAsync, - IoSourceExt, PollSource, URingExecutor, UringSource, + poll_source::Error as PollError, uring_executor::use_uring, FdExecutor, PollSource, + URingExecutor, UringSource, }; +use crate::{AsyncResult, IntoAsync, IoSourceExt}; + pub(crate) fn async_uring_from<'a, F: IntoAsync + Send + 'a>( f: F, ex: &URingExecutor, -) -> AsyncResult + Send + 'a>> { +) -> AsyncResult + 'a + Send>> { Ok(UringSource::new(f, ex).map(|u| Box::new(u) as Box + Send>)?) } @@ -34,6 +36,13 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>( /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only /// create a new reference, not a new executor. /// +/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be +/// represented on the POSIX side as an enum, rather than a trait. This leads to some code & +/// interface duplication, but as far as we understand that is unavoidable. 
+/// +/// See https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75 +/// for further details. +/// /// # Examples /// /// Concurrently wait for multiple files to become readable/writable and then read/write the data. @@ -49,7 +58,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>( /// // Write all bytes from `data` to `f`. /// async fn write_file(f: &dyn IoSourceExt, mut data: Vec) -> AsyncResult<()> { /// while data.len() > 0 { -/// let (count, mut buf) = f.write_from_vec(None, data).await?; +/// let (count, mut buf) = f.write_from_vec(Some(0), data).await?; /// /// data = buf.split_off(count); /// } @@ -67,7 +76,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>( /// /// while rem > 0 { /// let buf = vec![0u8; min(rem, CHUNK_SIZE)]; -/// let (count, mut data) = from.read_to_vec(None, buf).await?; +/// let (count, mut data) = from.read_to_vec(Some(0), buf).await?; /// /// if count == 0 { /// // End of file. Return the number of bytes transferred. @@ -83,10 +92,11 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>( /// Ok(len) /// } /// +/// #[cfg(unix)] /// # fn do_it() -> Result<(), Box> { /// let ex = Executor::new()?; /// -/// let (rx, tx) = base::pipe(true)?; +/// let (rx, tx) = base::unix::pipe(true)?; /// let zero = File::open("/dev/zero")?; /// let zero_bytes = CHUNK_SIZE * 7; /// let zero_to_pipe = transfer_data( @@ -111,7 +121,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>( /// /// # Ok(()) /// # } -/// +/// #[cfg(unix)] /// # do_it().unwrap(); /// ``` diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/sys/unix/fd_executor.rs similarity index 99% rename from cros_async/src/fd_executor.rs rename to cros_async/src/sys/unix/fd_executor.rs index 85ee796373..15f1e0a41a 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/sys/unix/fd_executor.rs @@ -23,7 +23,7 @@ use std::{ }; use async_task::Task; -use base::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents}; +use base::{add_fd_flags, warn, EpollContext, EpollEvents, Event as EventFd, WatchingEvents}; use futures::task::noop_waker; use pin_utils::pin_mut; use remain::sorted; @@ -31,7 +31,7 @@ use slab::Slab; use sync::Mutex; use thiserror::Error as ThisError; -use super::{ +use crate::{ queue::RunnableQueue, waker::{new_waker, WakerToken, WeakWake}, BlockingPool, @@ -527,7 +527,7 @@ impl FdExecutor { let waker = new_waker(Arc::downgrade(&self.raw)); let mut cx = Context::from_waker(&waker); - self.raw.run(&mut cx, super::empty::<()>()) + self.raw.run(&mut cx, crate::empty::<()>()) } pub fn run_until(&self, f: F) -> Result { diff --git a/cros_async/src/poll_source.rs b/cros_async/src/sys/unix/poll_source.rs similarity index 97% rename from cros_async/src/poll_source.rs rename to cros_async/src/sys/unix/poll_source.rs index 3477041dd9..c092cff24d 100644 --- a/cros_async/src/poll_source.rs +++ b/cros_async/src/sys/unix/poll_source.rs @@ -17,14 +17,12 @@ use data_model::VolatileSlice; use remain::sorted; use thiserror::Error as ThisError; -use crate::AllocateMode; - -use super::{ - fd_executor::{ - FdExecutor, RegisteredSource, {self}, - }, +use super::fd_executor::{ + FdExecutor, RegisteredSource, {self}, +}; +use crate::{ mem::{BackingMemory, MemRegion}, - AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync, + AllocateMode, AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync, }; #[sorted] @@ -320,10 +318,11 @@ impl WriteAsync for PollSource { /// See 
`fallocate(2)` for details. async fn fallocate(&self, file_offset: u64, len: u64, mode: AllocateMode) -> AsyncResult<()> { + let mode_u32: u32 = mode.into(); let ret = unsafe { libc::fallocate64( self.as_raw_fd(), - mode as libc::c_int, + mode_u32 as libc::c_int, file_offset as libc::off64_t, len as libc::off64_t, ) @@ -362,6 +361,10 @@ impl IoSourceExt for PollSource { fn as_source(&self) -> &F { self } + + async fn wait_for_handle(&self) -> AsyncResult { + self.read_u64().await + } } #[cfg(test)] diff --git a/cros_async/src/sys/unix/timer.rs b/cros_async/src/sys/unix/timer.rs new file mode 100644 index 0000000000..22457de4bb --- /dev/null +++ b/cros_async/src/sys/unix/timer.rs @@ -0,0 +1,83 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// For the moment, the only platform specific code is related to tests. +#![cfg(test)] + +use super::{FdExecutor, URingExecutor}; +use crate::{sys::unix::executor, AsyncResult, TimerAsync}; +use base::Timer; + +impl TimerAsync { + pub(crate) fn new_poll(timer: Timer, ex: &FdExecutor) -> AsyncResult { + executor::async_poll_from(timer, ex).map(|io_source| TimerAsync { io_source }) + } + + pub(crate) fn new_uring(timer: Timer, ex: &URingExecutor) -> AsyncResult { + executor::async_uring_from(timer, ex).map(|io_source| TimerAsync { io_source }) + } +} + +mod tests { + use super::*; + use crate::{sys::unix::uring_executor::use_uring, Executor}; + use std::time::{Duration, Instant}; + + #[test] + fn timer() { + async fn this_test(ex: &Executor) { + let dur = Duration::from_millis(200); + let now = Instant::now(); + TimerAsync::sleep(ex, dur).await.expect("unable to sleep"); + assert!(now.elapsed() >= dur); + } + + let ex = Executor::new().expect("creating an executor failed"); + ex.run_until(this_test(&ex)).unwrap(); + } + + #[test] + fn one_shot() { + if !use_uring() { + return; + } + + async fn this_test(ex: &URingExecutor) { + let mut tfd = Timer::new().expect("failed to create timerfd"); + + let dur = Duration::from_millis(200); + let now = Instant::now(); + tfd.reset(dur, None).expect("failed to arm timer"); + + let t = TimerAsync::new_uring(tfd, ex).unwrap(); + let count = t.next_val().await.expect("unable to wait for timer"); + + assert_eq!(count, 1); + assert!(now.elapsed() >= dur); + } + + let ex = URingExecutor::new().unwrap(); + ex.run_until(this_test(&ex)).unwrap(); + } + + #[test] + fn one_shot_fd() { + async fn this_test(ex: &FdExecutor) { + let mut tfd = Timer::new().expect("failed to create timerfd"); + + let dur = Duration::from_millis(200); + let now = Instant::now(); + tfd.reset(dur, None).expect("failed to arm timer"); + + let t = TimerAsync::new_poll(tfd, ex).unwrap(); + let count = t.next_val().await.expect("unable to wait for timer"); + + assert_eq!(count, 1); + assert!(now.elapsed() >= dur); + } + + let ex = FdExecutor::new().unwrap(); + ex.run_until(this_test(&ex)).unwrap(); + } +} diff --git a/cros_async/src/uring_executor.rs b/cros_async/src/sys/unix/uring_executor.rs similarity index 99% rename from cros_async/src/uring_executor.rs rename to cros_async/src/sys/unix/uring_executor.rs index 743939b755..446e8c9a32 100644 --- a/cros_async/src/uring_executor.rs +++ b/cros_async/src/sys/unix/uring_executor.rs @@ -83,7 +83,7 @@ use slab::Slab; use sync::Mutex; use thiserror::Error as ThisError; -use super::{ +use crate::{ mem::{BackingMemory, MemRegion}, queue::RunnableQueue, waker::{new_waker, WakerToken, 
WeakWake}, @@ -542,7 +542,6 @@ impl RawExecutor { self.ctx .add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token)) .map_err(Error::SubmittingOp)?; - entry.insert(OpStatus::Pending(OpData { _file: src, _mem: None, @@ -600,7 +599,6 @@ impl RawExecutor { self.ctx .add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token)) .map_err(Error::SubmittingOp)?; - entry.insert(OpStatus::Pending(OpData { _file: src, _mem: None, @@ -807,7 +805,7 @@ impl URingExecutor { let waker = new_waker(Arc::downgrade(&self.raw)); let mut cx = Context::from_waker(&waker); - self.raw.run(&mut cx, super::empty::<()>()) + self.raw.run(&mut cx, crate::empty::<()>()) } pub fn run_until(&self, f: F) -> Result { @@ -908,13 +906,10 @@ mod tests { task::{Context, Poll}, }; + use super::*; + use crate::mem::{BackingMemory, MemRegion, VecIoWrapper}; use futures::executor::block_on; - use super::{ - super::mem::{BackingMemory, MemRegion, VecIoWrapper}, - *, - }; - // A future that returns ready when the uring queue is empty. struct UringQueueEmpty<'a> { ex: &'a URingExecutor, diff --git a/cros_async/src/uring_source.rs b/cros_async/src/sys/unix/uring_source.rs similarity index 95% rename from cros_async/src/uring_source.rs rename to cros_async/src/sys/unix/uring_source.rs index bd8bd4e2c2..bc7c407b1d 100644 --- a/cros_async/src/uring_source.rs +++ b/cros_async/src/sys/unix/uring_source.rs @@ -12,12 +12,10 @@ use std::{ use async_trait::async_trait; -use crate::AllocateMode; - -use super::{ +use super::uring_executor::{Error, RegisteredSource, Result, URingExecutor}; +use crate::{ mem::{BackingMemory, MemRegion, VecIoWrapper}, - uring_executor::{Error, RegisteredSource, Result, URingExecutor}, - AsyncError, AsyncResult, + AllocateMode, AsyncError, AsyncResult, ReadAsync, WriteAsync, }; /// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around @@ -45,7 +43,7 @@ impl UringSource { } #[async_trait(?Send)] -impl super::ReadAsync for UringSource { +impl ReadAsync for UringSource { /// Reads from the iosource at `file_offset` and fill the given `vec`. async fn read_to_vec<'a>( &'a self, @@ -125,7 +123,7 @@ impl super::ReadAsync for UringSource { } #[async_trait(?Send)] -impl super::WriteAsync for UringSource { +impl WriteAsync for UringSource { /// Writes from the given `vec` to the file starting at `file_offset`. async fn write_from_vec<'a>( &'a self, @@ -185,7 +183,7 @@ impl super::WriteAsync for UringSource { } #[async_trait(?Send)] -impl super::IoSourceExt for UringSource { +impl crate::IoSourceExt for UringSource { /// Yields the underlying IO source. 
fn into_source(self: Box) -> F { self.source @@ -200,6 +198,10 @@ impl super::IoSourceExt for UringSource { fn as_source_mut(&mut self) -> &mut F { &mut self.source } + + async fn wait_for_handle(&self) -> AsyncResult { + self.read_u64().await + } } impl Deref for UringSource { @@ -224,11 +226,8 @@ mod tests { path::PathBuf, }; - use super::super::{ - io_ext::{ReadAsync, WriteAsync}, - uring_executor::use_uring, - UringSource, - }; + use super::super::{uring_executor::use_uring, UringSource}; + use crate::io_ext::{ReadAsync, WriteAsync}; use super::*; @@ -238,7 +237,7 @@ mod tests { return; } - use super::super::mem::VecIoWrapper; + use crate::mem::VecIoWrapper; use std::io::Write; use tempfile::tempfile; @@ -345,7 +344,7 @@ mod tests { return; } - use base::EventFd; + use base::Event as EventFd; async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) { let wait = UringSource::new(wait, ex).unwrap(); @@ -511,9 +510,9 @@ mod tests { let source = UringSource::new(f, ex).unwrap(); if let Err(e) = source.fallocate(0, 4096, AllocateMode::Default).await { match e { - super::super::io_ext::Error::Uring( - super::super::uring_executor::Error::Io(io_err), - ) => { + crate::io_ext::Error::Uring(super::super::uring_executor::Error::Io( + io_err, + )) => { if io_err.kind() == std::io::ErrorKind::InvalidInput { // Skip the test on kernels before fallocate support. return; @@ -577,7 +576,7 @@ mod tests { .unwrap(); let source = UringSource::new(f, ex).unwrap(); let v = vec![0x55u8; 64]; - let vw = Arc::new(super::super::mem::VecIoWrapper::from(v)); + let vw = Arc::new(crate::mem::VecIoWrapper::from(v)); let ret = source .write_from_mem(None, vw, &[MemRegion { offset: 0, len: 32 }]) .await diff --git a/cros_async/src/sys/windows.rs b/cros_async/src/sys/windows.rs new file mode 100644 index 0000000000..8fe7ef672d --- /dev/null +++ b/cros_async/src/sys/windows.rs @@ -0,0 +1,52 @@ +// Copyright 2021 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +pub mod async_types; +pub mod event; +pub mod executor; +pub mod handle_executor; +pub mod handle_source; +mod timer; +pub mod wait_for_handle; +pub(crate) use wait_for_handle::WaitForHandle; +pub use { + handle_executor::HandleExecutor, + handle_source::{HandleSource, HandleWrapper}, +}; + +use crate::{Error, Result}; +use std::future::Future; + +pub fn run_one_handle(fut: F) -> Result { + let ex = HandleExecutor::new(); + ex.run_until(fut).map_err(Error::HandleExecutor) +} + +/// Creates an Executor that runs one future to completion. +/// +/// # Example +/// +/// ``` +/// #[cfg(unix)] +/// { +/// use cros_async::run_one; +/// +/// let fut = async { 55 }; +/// assert_eq!(55, run_one(fut).unwrap()); +/// } +/// ``` +pub fn run_one(fut: F) -> Result { + run_one_handle(fut) +} + +impl From for std::io::Error { + fn from(e: Error) -> Self { + use Error::*; + match e { + HandleExecutor(e) => e.into(), + Timer(e) => e.into(), + TimerAsync(e) => e.into(), + } + } +} diff --git a/cros_async/src/win/async_types.rs b/cros_async/src/sys/windows/async_types.rs similarity index 70% rename from cros_async/src/win/async_types.rs rename to cros_async/src/sys/windows/async_types.rs index bf9c54b4fb..12e7f22812 100644 --- a/cros_async/src/win/async_types.rs +++ b/cros_async/src/sys/windows/async_types.rs @@ -2,11 +2,12 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. 
-use crate::{Executor, HandleWrapper};
-use base::{AsRawDescriptor, Descriptor, RecvTube, SendTube, Tube, TubeResult};
-use serde::de::DeserializeOwned;
+use super::HandleWrapper;
+use crate::{unblock, DescriptorAdapter, DescriptorIntoAsync, Executor};
+use base::{Descriptor, Tube, TubeError, TubeResult};
+use serde::{de::DeserializeOwned, Serialize};
 use std::io;
-use std::os::windows::io::AsRawHandle;
+use std::os::windows::io::{AsRawHandle, RawHandle};
 use std::sync::{Arc, Mutex};
 
 pub struct AsyncTube {
@@ -14,7 +15,7 @@
 }
 
 impl AsyncTube {
-    pub fn new(ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
+    pub fn new(_ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
         Ok(AsyncTube {
             inner: Arc::new(Mutex::new(tube)),
         })
@@ -28,7 +29,7 @@ impl AsyncTube {
         let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]);
         unblock(
             move || tube.lock().unwrap().recv(),
-            move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
+            move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)),
         )
         .await
     }
@@ -38,7 +39,7 @@ impl AsyncTube {
         let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]);
         unblock(
             move || tube.lock().unwrap().send(&msg),
-            move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
+            move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)),
         )
         .await
     }
@@ -52,7 +53,17 @@ impl From<AsyncTube> for Tube {
         //
         // This does however mean that into will block until all async
         // operations are complete.
-        let _ = at.inner.lock().unwrap();
+        std::mem::drop(at.inner.lock().unwrap());
         Arc::try_unwrap(at.inner).unwrap().into_inner().unwrap()
     }
 }
+
+#[cfg(windows)]
+impl<T> AsRawHandle for DescriptorAdapter<T>
+where
+    T: DescriptorIntoAsync,
+{
+    fn as_raw_handle(&self) -> RawHandle {
+        self.0.as_raw_descriptor()
+    }
+}
diff --git a/cros_async/src/sys/windows/event.rs b/cros_async/src/sys/windows/event.rs
new file mode 100644
index 0000000000..8d0aea7466
--- /dev/null
+++ b/cros_async/src/sys/windows/event.rs
@@ -0,0 +1,37 @@
+// Copyright 2022 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use crate::{AsyncError, AsyncResult, EventAsync, Executor};
+use base::Event;
+
+impl EventAsync {
+    pub fn new(event: Event, ex: &Executor) -> AsyncResult<EventAsync> {
+        ex.async_from(event).map(|io_source| EventAsync {
+            io_source,
+            reset_after_read: true,
+        })
+    }
+
+    /// For Windows events, especially those used in overlapped IO, we don't want to reset them
+    /// after "reading" from them because the signaling state is entirely managed by the kernel.
+    pub fn new_without_reset(event: Event, ex: &Executor) -> AsyncResult<EventAsync> {
+        ex.async_from(event).map(|io_source| EventAsync {
+            io_source,
+            reset_after_read: false,
+        })
+    }
+
+    /// Gets the next value from the event.
+    pub async fn next_val(&self) -> AsyncResult<u64> {
+        let res = self.io_source.wait_for_handle().await;
+
+        if self.reset_after_read {
+            self.io_source
+                .as_source()
+                .reset()
+                .map_err(AsyncError::EventAsync)?;
+        }
+        res
+    }
+}
diff --git a/cros_async/src/sys/windows/executor.rs b/cros_async/src/sys/windows/executor.rs
new file mode 100644
index 0000000000..586e3c0eb1
--- /dev/null
+++ b/cros_async/src/sys/windows/executor.rs
@@ -0,0 +1,291 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
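// Editorial sketch (not part of this patch): exercising the Windows
// `EventAsync` wrapper defined above. `EventAsync` and `Executor` are the
// crate-root re-exports used by the tests later in this patch; the assertion
// relies on wait_for_handle() reporting a placeholder value of 0 on Windows.
use base::Event;
use cros_async::{EventAsync, Executor};

fn main() {
    let ex = Executor::new().unwrap();
    let evt = Event::new().unwrap();
    // Signal first so that next_val() below resolves immediately.
    evt.write(1).unwrap();
    let async_evt = EventAsync::new(evt, &ex).unwrap();
    // On Windows next_val() resolves via wait_for_handle(), not an eventfd read.
    let val = ex.run_until(async_evt.next_val()).unwrap().unwrap();
    assert_eq!(val, 0);
}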
+
+use std::future::Future;
+
+use async_task::Task;
+
+use super::{HandleExecutor, HandleSource};
+use crate::{AsyncResult, IntoAsync, IoSourceExt};
+
+/// Creates a concrete `IoSourceExt` using the handle_executor.
+pub(crate) fn async_handle_from<'a, F: IntoAsync + 'a + Send>(
+    f: F,
+) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a + Send>> {
+    Ok(HandleSource::new(vec![f].into_boxed_slice())
+        .map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
+}
+
+/// An executor for scheduling tasks that poll futures to completion.
+///
+/// All asynchronous operations must run within an executor, which is capable of spawning futures as
+/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
+///
+/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
+/// create a new reference, not a new executor.
+///
+/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
+/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
+/// interface duplication, but as far as we understand that is unavoidable.
+///
+/// See https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75
+/// for further details.
+///
+/// # Examples
+///
+/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
+///
+/// ```
+/// use std::cmp::min;
+/// use std::error::Error;
+/// use std::fs::{File, OpenOptions};
+///
+/// use cros_async::{AsyncResult, Executor, IoSourceExt, complete3};
+/// const CHUNK_SIZE: usize = 32;
+///
+/// // Write all bytes from `data` to `f`.
+/// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
+///     while data.len() > 0 {
+///         let (count, mut buf) = f.write_from_vec(Some(0), data).await?;
+///
+///         data = buf.split_off(count);
+///     }
+///
+///     Ok(())
+/// }
+///
+/// // Transfer `len` bytes of data from `from` to `to`.
+/// async fn transfer_data(
+///     from: Box<dyn IoSourceExt<File>>,
+///     to: Box<dyn IoSourceExt<File>>,
+///     len: usize,
+/// ) -> AsyncResult<usize> {
+///     let mut rem = len;
+///
+///     while rem > 0 {
+///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
+///         let (count, mut data) = from.read_to_vec(Some(0), buf).await?;
+///
+///         if count == 0 {
+///             // End of file. Return the number of bytes transferred.
+/// return Ok(len - rem); +/// } +/// +/// data.truncate(count); +/// write_file(&*to, data).await?; +/// +/// rem = rem.saturating_sub(count); +/// } +/// +/// Ok(len) +/// } +/// +/// #[cfg(unix)] +/// # fn do_it() -> Result<(), Box> { +/// let ex = Executor::new()?; +/// +/// let (rx, tx) = sys_util::pipe(true)?; +/// let zero = File::open("/dev/zero")?; +/// let zero_bytes = CHUNK_SIZE * 7; +/// let zero_to_pipe = transfer_data( +/// ex.async_from(zero)?, +/// ex.async_from(tx.try_clone()?)?, +/// zero_bytes, +/// ); +/// +/// let rand = File::open("/dev/urandom")?; +/// let rand_bytes = CHUNK_SIZE * 19; +/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes); +/// +/// let null = OpenOptions::new().write(true).open("/dev/null")?; +/// let null_bytes = zero_bytes + rand_bytes; +/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes); +/// +/// ex.run_until(complete3( +/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) }, +/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) }, +/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) }, +/// ))?; +/// +/// # Ok(()) +/// # } +/// #[cfg(unix)] +/// # do_it().unwrap(); +/// ``` + +#[derive(Clone)] +pub enum Executor { + Handle(HandleExecutor), +} + +impl Executor { + /// Create a new `Executor`. + pub fn new() -> AsyncResult { + Ok(Executor::Handle(HandleExecutor::new())) + } + + /// Create a new `Box>` associated with `self`. Callers may then use the + /// returned `IoSourceExt` to directly start async operations without needing a separate + /// reference to the executor. + pub fn async_from<'a, F: IntoAsync + 'a + Send>( + &self, + f: F, + ) -> AsyncResult + 'a + Send>> { + match self { + Executor::Handle(_) => async_handle_from(f), + } + } + + /// Spawn a new future for this executor to run to completion. Callers may use the returned + /// `Task` to await on the result of `f`. Dropping the returned `Task` will cancel `f`, + /// preventing it from being polled again. To drop a `Task` without canceling the future + /// associated with it use `Task::detach`. To cancel a task gracefully and wait until it is + /// fully destroyed, use `Task::cancel`. + /// + /// # Examples + /// + /// ``` + /// # use cros_async::AsyncResult; + /// # fn example_spawn() -> AsyncResult<()> { + /// # use std::thread; + /// + /// # use cros_async::Executor; + /// use futures::executor::block_on; + /// + /// # let ex = Executor::new()?; + /// + /// # // Spawn a thread that runs the executor. + /// # let ex2 = ex.clone(); + /// # thread::spawn(move || ex2.run()); + /// + /// let task = ex.spawn(async { 7 + 13 }); + /// + /// let result = block_on(task); + /// assert_eq!(result, 20); + /// # Ok(()) + /// # } + /// + /// # example_spawn().unwrap(); + /// ``` + pub fn spawn(&self, f: F) -> Task + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Executor::Handle(ex) => ex.spawn(f), + } + } + + /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without + /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same + /// thread where `run()` or `run_until()` is called. + /// + /// # Panics + /// + /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was + /// added by calling `spawn_local` from a different thread. 
+ /// + /// # Examples + /// + /// ``` + /// # use cros_async::AsyncResult; + /// # fn example_spawn_local() -> AsyncResult<()> { + /// # use cros_async::Executor; + /// + /// # let ex = Executor::new()?; + /// + /// let task = ex.spawn_local(async { 7 + 13 }); + /// + /// let result = ex.run_until(task)?; + /// assert_eq!(result, 20); + /// # Ok(()) + /// # } + /// + /// # example_spawn_local().unwrap(); + /// ``` + pub fn spawn_local(&self, f: F) -> Task + where + F: Future + 'static, + F::Output: 'static, + { + match self { + Executor::Handle(ex) => ex.spawn_local(f), + } + } + + /// Run the executor indefinitely, driving all spawned futures to completion. This method will + /// block the current thread and only return in the case of an error. + /// + /// # Panics + /// + /// Once this method has been called on a thread, it may only be called on that thread from that + /// point on. Attempting to call it from another thread will panic. + /// + /// # Examples + /// + /// ``` + /// # use cros_async::AsyncResult; + /// # fn example_run() -> AsyncResult<()> { + /// use std::thread; + /// + /// use cros_async::Executor; + /// use futures::executor::block_on; + /// + /// let ex = Executor::new()?; + /// + /// // Spawn a thread that runs the executor. + /// let ex2 = ex.clone(); + /// thread::spawn(move || ex2.run()); + /// + /// let task = ex.spawn(async { 7 + 13 }); + /// + /// let result = block_on(task); + /// assert_eq!(result, 20); + /// # Ok(()) + /// # } + /// + /// # example_run().unwrap(); + /// ``` + pub fn run(&self) -> AsyncResult<()> { + match self { + Executor::Handle(ex) => ex.run()?, + } + + Ok(()) + } + + /// Drive all futures spawned in this executor until `f` completes. This method will block the + /// current thread only until `f` is complete and there may still be unfinished futures in the + /// executor. + /// + /// # Panics + /// + /// Once this method has been called on a thread, from then onwards it may only be called on + /// that thread. Attempting to call it from another thread will panic. + /// + /// # Examples + /// + /// ``` + /// # use cros_async::AsyncResult; + /// # fn example_run_until() -> AsyncResult<()> { + /// use cros_async::Executor; + /// + /// let ex = Executor::new()?; + /// + /// let task = ex.spawn_local(async { 7 + 13 }); + /// + /// let result = ex.run_until(task)?; + /// assert_eq!(result, 20); + /// # Ok(()) + /// # } + /// + /// # example_run_until().unwrap(); + /// ``` + pub fn run_until(&self, f: F) -> AsyncResult { + match self { + Executor::Handle(ex) => Ok(ex.run_until(f)?), + } + } +} diff --git a/cros_async/src/sys/windows/handle_executor.rs b/cros_async/src/sys/windows/handle_executor.rs new file mode 100644 index 0000000000..0afa626291 --- /dev/null +++ b/cros_async/src/sys/windows/handle_executor.rs @@ -0,0 +1,198 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
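// Editorial sketch (not part of this patch): the spawn_local/run_until pattern
// from the `Executor` docs above, composed into one program. Only the
// crate-root re-exports (`Executor`, `AsyncResult`) shown elsewhere in this
// patch are assumed.
use cros_async::{AsyncResult, Executor};

fn main() -> AsyncResult<()> {
    let ex = Executor::new()?;
    // Detach so dropping the Task handle does not cancel the background future.
    ex.spawn_local(async { /* background work */ }).detach();
    let result = ex.run_until(async { 7 + 13 })?;
    assert_eq!(result, 20);
    Ok(())
}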
+ +use crate::{ + queue::RunnableQueue, + waker::{new_waker, WeakWake}, +}; +use async_task::{Runnable, Task}; +use futures::task::{Context, Poll}; +use pin_utils::pin_mut; +use std::{ + future::Future, + io, + sync::{mpsc, Arc, Weak}, +}; +use sync::{Condvar, Mutex}; +use thiserror::Error as ThisError; + +#[derive(Debug, ThisError)] +pub enum Error { + #[error("Failed to get future from executor run.")] + FailedToReadFutureFromWakerChannel(mpsc::RecvError), +} + +impl From for io::Error { + fn from(e: Error) -> Self { + use Error::*; + match e { + FailedToReadFutureFromWakerChannel(e) => io::Error::new(io::ErrorKind::Other, e), + } + } +} + +pub type Result = std::result::Result; + +#[derive(Clone)] +pub struct HandleExecutor { + raw: Arc, +} + +impl HandleExecutor { + pub fn new() -> Self { + Self { + raw: Arc::new(RawExecutor::new()), + } + } + + pub fn spawn(&self, f: F) -> Task + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.raw.spawn(f) + } + + pub fn spawn_local(&self, f: F) -> Task + where + F: Future + 'static, + F::Output: 'static, + { + self.raw.spawn_local(f) + } + + pub fn run(&self) -> Result<()> { + let waker = new_waker(Arc::downgrade(&self.raw)); + let mut cx = Context::from_waker(&waker); + self.raw.run(&mut cx, crate::empty::<()>()) + } + + pub fn run_until(&self, f: F) -> Result { + let waker = new_waker(Arc::downgrade(&self.raw)); + let mut cx = Context::from_waker(&waker); + self.raw.run(&mut cx, f) + } +} + +struct RawExecutor { + queue: RunnableQueue, + woken: Mutex, + wakeup: Condvar, +} + +impl RawExecutor { + fn new() -> Self { + Self { + queue: RunnableQueue::new(), + woken: Mutex::new(false), + wakeup: Condvar::new(), + } + } + + fn make_schedule_fn(self: &Arc) -> impl Fn(Runnable) { + let raw = Arc::downgrade(self); + move |runnable| { + if let Some(r) = raw.upgrade() { + r.queue.push_back(runnable); + r.wake(); + } + } + } + + fn spawn(self: &Arc, f: F) -> Task + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (runnable, task) = async_task::spawn(f, self.make_schedule_fn()); + runnable.schedule(); + task + } + + fn spawn_local(self: &Arc, f: F) -> Task + where + F: Future + 'static, + F::Output: 'static, + { + let (runnable, task) = async_task::spawn_local(f, self.make_schedule_fn()); + runnable.schedule(); + task + } + + fn run(&self, cx: &mut Context, done: F) -> Result { + pin_mut!(done); + + loop { + for runnable in self.queue.iter() { + runnable.run(); + } + if let Poll::Ready(val) = done.as_mut().poll(cx) { + return Ok(val); + } + + self.wait() + } + } + + fn wait(&self) { + let mut woken = self.woken.lock(); + while !*woken { + woken = self.wakeup.wait(woken); + } + *woken = false; + } + + fn wake(self: &Arc) { + *self.woken.lock() = true; + self.wakeup.notify_one(); + } +} + +impl WeakWake for RawExecutor { + fn wake_by_ref(weak_self: &Weak) { + if let Some(arc_self) = weak_self.upgrade() { + RawExecutor::wake(&arc_self); + } + } +} + +#[cfg(test)] +mod test { + use super::*; + const FUT_MSG: i32 = 5; + use futures::{channel::mpsc as fut_mpsc, SinkExt, StreamExt}; + + #[test] + fn run_future() { + let (send, recv) = mpsc::channel(); + async fn this_test(send: mpsc::Sender) { + send.send(FUT_MSG).unwrap(); + } + + let ex = HandleExecutor::new(); + ex.run_until(this_test(send)).unwrap(); + assert_eq!(recv.recv().unwrap(), FUT_MSG); + } + + #[test] + fn spawn_future() { + let (send, recv) = fut_mpsc::channel(1); + let (send_done_signal, receive_done_signal) = mpsc::channel(); + + async fn 
message_sender(mut send: fut_mpsc::Sender) { + send.send(FUT_MSG).await.unwrap(); + } + + async fn message_receiver(mut recv: fut_mpsc::Receiver, done: mpsc::Sender) { + assert_eq!(recv.next().await.unwrap(), FUT_MSG); + done.send(true).unwrap(); + } + + let ex = HandleExecutor::new(); + ex.spawn(message_sender(send)).detach(); + ex.run_until(message_receiver(recv, send_done_signal)) + .unwrap(); + assert_eq!(receive_done_signal.recv().unwrap(), true); + } +} diff --git a/cros_async/src/sys/windows/handle_source.rs b/cros_async/src/sys/windows/handle_source.rs new file mode 100644 index 0000000000..94e88b6a12 --- /dev/null +++ b/cros_async/src/sys/windows/handle_source.rs @@ -0,0 +1,633 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use crate::{ + io_ext::AllocateMode, + mem::{BackingMemory, MemRegion}, + AsyncError, AsyncResult, CancellableBlockingPool, IoSourceExt, ReadAsync, WriteAsync, +}; +use async_trait::async_trait; +use base::{ + error, warn, AsRawDescriptor, Descriptor, Error as SysUtilError, FileReadWriteAtVolatile, + FileReadWriteVolatile, PunchHole, WriteZeroesAt, +}; +use data_model::VolatileSlice; +use smallvec::SmallVec; +use std::{ + fs::File, + io::{ + Read, Seek, SeekFrom, Write, {self}, + }, + mem::ManuallyDrop, + os::windows::io::{AsRawHandle, FromRawHandle}, + ptr::null_mut, + sync::Arc, + time::Duration, +}; +use sync::Mutex; +use thiserror::Error as ThisError; +use winapi::um::{ioapiset::CancelIoEx, processthreadsapi::GetCurrentThreadId}; + +#[derive(ThisError, Debug)] +pub enum Error { + #[error("An error occurred trying to seek: {0}.")] + IoSeekError(io::Error), + #[error("An error occurred trying to read: {0}.")] + IoReadError(io::Error), + #[error("An error occurred trying to write: {0}.")] + IoWriteError(io::Error), + #[error("An error occurred trying to flush: {0}.")] + IoFlushError(io::Error), + #[error("An error occurred trying to punch hole: {0}.")] + IoPunchHoleError(io::Error), + #[error("An error occurred trying to write zeroes: {0}.")] + IoWriteZeroesError(io::Error), + #[error("An error occurred trying to duplicate source handles: {0}.")] + HandleDuplicationFailed(io::Error), + #[error("An error occurred trying to wait on source handles: {0}.")] + HandleWaitFailed(base::Error), + #[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")] + BackingMemoryVolatileSliceFetchFailed(crate::mem::Error), + #[error("HandleSource is gone, so no handles are available to fulfill the IO request.")] + NoHandleSource, + #[error("Operation on HandleSource is cancelled.")] + OperationCancelled, + #[error("Operation on HandleSource was aborted (unexpected).")] + OperationAborted, +} + +impl From for io::Error { + fn from(e: Error) -> Self { + use Error::*; + match e { + IoSeekError(e) => e, + IoReadError(e) => e, + IoWriteError(e) => e, + IoFlushError(e) => e, + IoPunchHoleError(e) => e, + IoWriteZeroesError(e) => e, + HandleDuplicationFailed(e) => e, + HandleWaitFailed(e) => e.into(), + BackingMemoryVolatileSliceFetchFailed(e) => io::Error::new(io::ErrorKind::Other, e), + NoHandleSource => io::Error::new(io::ErrorKind::Other, NoHandleSource), + OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled), + OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted), + } + } +} + +pub type Result = std::result::Result; + +/// Used to shutdown IO running on a 
CancellableBlockingPool.
+pub struct HandleWrapper {
+    handles: Vec<Descriptor>,
+}
+
+impl HandleWrapper {
+    pub fn new(handles: Vec<Descriptor>) -> Arc<Mutex<Self>> {
+        Arc::new(Mutex::new(Self { handles }))
+    }
+
+    pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {
+        for handle in &self.handles {
+            // There isn't much we can do if cancel fails.
+            if unsafe { CancelIoEx(handle.as_raw_descriptor(), null_mut()) } == 0 {
+                warn!(
+                    "Cancel IO for handle:{:?} failed with {}",
+                    handle.as_raw_descriptor(),
+                    SysUtilError::last()
+                );
+            }
+        }
+        ret
+    }
+}
+
+/// Async IO source for Windows that uses a multi-threaded, multi-handle approach to provide fast IO
+/// operations. It demuxes IO requests across a set of handles that refer to the same underlying IO
+/// source, such as a file, and executes those requests across multiple threads. Benchmarks show
+/// that this is the fastest method to perform IO on Windows, especially for file reads.
+pub struct HandleSource<F: AsRawHandle> {
+    sources: Box<[F]>,
+    source_descriptors: Vec<Descriptor>,
+    blocking_pool: CancellableBlockingPool,
+}
+
+impl<F: AsRawHandle> HandleSource<F> {
+    /// Create a new `HandleSource` from the given IO source.
+    ///
+    /// Each HandleSource uses its own thread pool, with one thread per source supplied. These
+    /// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.
+    /// Long term, we may migrate away from this approach toward IOCP or overlapped IO.
+    ///
+    /// WARNING: every `source` in `sources` MUST be a unique file object (e.g. separate handles
+    /// each created by CreateFile), and point at the same file on disk. This is because IO
+    /// operations on the HandleSource are randomly distributed to each source.
+    ///
+    /// # Safety
+    /// The caller must guarantee that `F`'s handle is compatible with the underlying functions
+    /// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined
+    /// by this struct. Note that most winapis will fail with reasonable errors.
+    pub fn new(sources: Box<[F]>) -> Result<Self> {
+        let source_count = sources.len();
+        let mut source_descriptors = Vec::with_capacity(source_count);
+
+        // Safe because consumers of the descriptors are tied to the lifetime of HandleSource.
+        for source in sources.iter() {
+            source_descriptors.push(Descriptor(source.as_raw_handle()));
+        }
+
+        Ok(Self {
+            sources,
+            source_descriptors,
+            blocking_pool: CancellableBlockingPool::new(
+                // WARNING: this is a safety requirement! Threads are 1:1 with sources.
+                source_count,
+                Duration::from_secs(10),
+            ),
+        })
+    }
+
+    #[inline]
+    fn get_slices(
+        mem: &Arc<dyn BackingMemory + Send + Sync>,
+        mem_offsets: Vec<MemRegion>,
+    ) -> Result<SmallVec<[VolatileSlice; 16]>> {
+        mem_offsets
+            .into_iter()
+            .map(|region| {
+                mem.get_volatile_slice(region)
+                    .map_err(Error::BackingMemoryVolatileSliceFetchFailed)
+            })
+            .collect::<Result<SmallVec<_>>>()
+    }
+
+    // Returns a copy of all the source handles as a vector of descriptors.
+    fn as_descriptors(&self) -> Vec<Descriptor> {
+        self.sources
+            .iter()
+            .map(|i| Descriptor(i.as_raw_handle()))
+            .collect()
+    }
+}
+
+impl<F: AsRawHandle> Drop for HandleSource<F> {
+    fn drop(&mut self) {
+        if let Err(e) = self.blocking_pool.shutdown() {
+            error!("failed to clean up HandleSource: {}", e);
+        }
+    }
+}
+
+fn get_thread_file(descriptors: Vec<Descriptor>) -> ManuallyDrop<File> {
+    // Safe because all callers must exit *before* these handles will be closed (guaranteed by
+    // HandleSource's Drop impl.).
+ unsafe { + ManuallyDrop::new(File::from_raw_handle( + descriptors[GetCurrentThreadId() as usize % descriptors.len()].0, + )) + } +} + +#[async_trait(?Send)] +impl ReadAsync for HandleSource { + /// Reads from the iosource at `file_offset` and fill the given `vec`. + async fn read_to_vec<'a>( + &'a self, + file_offset: Option, + mut vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + let handles = HandleWrapper::new(self.as_descriptors()); + let descriptors = self.source_descriptors.clone(); + + self.blocking_pool + .spawn( + move || { + let mut file = get_thread_file(descriptors); + if let Some(file_offset) = file_offset { + file.seek(SeekFrom::Start(file_offset)) + .map_err(Error::IoSeekError)?; + } + Ok(( + file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?, + vec, + )) + }, + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + .map_err(AsyncError::HandleSource) + } + + /// Reads to the given `mem` at the given offsets from the file starting at `file_offset`. + async fn read_to_mem<'a>( + &'a self, + file_offset: Option, + mem: Arc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + let mem_offsets = mem_offsets.to_owned(); + let handles = HandleWrapper::new(self.as_descriptors()); + let descriptors = self.source_descriptors.clone(); + + self.blocking_pool + .spawn( + move || { + let mut file = get_thread_file(descriptors); + let memory_slices = Self::get_slices(&mem, mem_offsets)?; + + match file_offset { + Some(file_offset) => file + .read_vectored_at_volatile(memory_slices.as_slice(), file_offset) + .map_err(Error::IoReadError), + None => file + .read_vectored_volatile(memory_slices.as_slice()) + .map_err(Error::IoReadError), + } + }, + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + .map_err(AsyncError::HandleSource) + } + + /// Wait for the handle of `self` to be readable. + async fn wait_readable(&self) -> AsyncResult<()> { + unimplemented!() + } + + /// Reads a single u64 from the current offset. + async fn read_u64(&self) -> AsyncResult { + unimplemented!() + } +} + +#[async_trait(?Send)] +impl WriteAsync for HandleSource { + /// Writes from the given `vec` to the file starting at `file_offset`. + async fn write_from_vec<'a>( + &'a self, + file_offset: Option, + vec: Vec, + ) -> AsyncResult<(usize, Vec)> { + let handles = HandleWrapper::new(self.as_descriptors()); + let descriptors = self.source_descriptors.clone(); + + self.blocking_pool + .spawn( + move || { + let mut file = get_thread_file(descriptors); + if let Some(file_offset) = file_offset { + file.seek(SeekFrom::Start(file_offset)) + .map_err(Error::IoSeekError)?; + } + Ok(( + file.write(vec.as_slice()).map_err(Error::IoWriteError)?, + vec, + )) + }, + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + .map_err(AsyncError::HandleSource) + } + + /// Writes from the given `mem` from the given offsets to the file starting at `file_offset`. 
+ async fn write_from_mem<'a>( + &'a self, + file_offset: Option, + mem: Arc, + mem_offsets: &'a [MemRegion], + ) -> AsyncResult { + let mem_offsets = mem_offsets.to_owned(); + let handles = HandleWrapper::new(self.as_descriptors()); + let descriptors = self.source_descriptors.clone(); + + self.blocking_pool + .spawn( + move || { + let mut file = get_thread_file(descriptors); + let memory_slices = Self::get_slices(&mem, mem_offsets)?; + + match file_offset { + Some(file_offset) => file + .write_vectored_at_volatile(memory_slices.as_slice(), file_offset) + .map_err(Error::IoWriteError), + None => file + .write_vectored_volatile(memory_slices.as_slice()) + .map_err(Error::IoWriteError), + } + }, + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + .map_err(AsyncError::HandleSource) + } + + /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend. + async fn fallocate(&self, file_offset: u64, len: u64, mode: AllocateMode) -> AsyncResult<()> { + let handles = HandleWrapper::new(self.as_descriptors()); + let descriptors = self.source_descriptors.clone(); + self.blocking_pool + .spawn( + move || { + let mut file = get_thread_file(descriptors); + match mode { + AllocateMode::PunchHole => { + file.punch_hole(file_offset, len) + .map_err(Error::IoPunchHoleError)?; + } + // ZeroRange calls `punch_hole` which doesn't extend the File size if it needs to. + // Will fix if it becomes a problem. + AllocateMode::ZeroRange => { + file.write_zeroes_at(file_offset, len as usize) + .map_err(Error::IoWriteZeroesError)?; + } + } + Ok(()) + }, + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + .map_err(AsyncError::HandleSource) + } + + /// Sync all completed write operations to the backing storage. + async fn fsync(&self) -> AsyncResult<()> { + let handles = HandleWrapper::new(self.as_descriptors()); + let descriptors = self.source_descriptors.clone(); + + self.blocking_pool + .spawn( + move || { + let mut file = get_thread_file(descriptors); + file.flush().map_err(Error::IoFlushError) + }, + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + .map_err(AsyncError::HandleSource) + } +} + +/// Subtrait for general async IO. Some not supported on Windows when multiple +/// sources are present. +/// +/// Note that on Windows w/ multiple sources these functions do not make sense. +/// TODO(nkgold): decide on what these should mean. +#[async_trait(?Send)] +impl IoSourceExt for HandleSource { + /// Yields the underlying IO source. + fn into_source(self: Box) -> F { + unimplemented!("`into_source` is not supported on Windows.") + } + + /// Provides a mutable ref to the underlying IO source. + fn as_source_mut(&mut self) -> &mut F { + if self.sources.len() == 1 { + return &mut self.sources[0]; + } + // Unimplemented for multiple-source use-case + unimplemented!( + "`as_source_mut` doesn't support source len of {}", + self.sources.len() + ) + } + + /// Provides a ref to the underlying IO source. + /// + /// In the multi-source case, the 0th source will be returned. If sources are not + /// interchangeable, behavior is undefined. 
+ fn as_source(&self) -> &F { + return &self.sources[0]; + } + + async fn wait_for_handle(&self) -> AsyncResult { + let waiter = super::WaitForHandle::new(self); + match waiter.await { + Err(e) => Err(AsyncError::HandleSource(e)), + Ok(()) => Ok(0), + } + } +} + +#[cfg(test)] +mod tests { + use super::super::HandleExecutor; + use super::*; + use crate::mem::VecIoWrapper; + use std::fs; + use tempfile::{tempfile, NamedTempFile}; + + #[test] + fn test_read_vec() { + let mut f = tempfile().unwrap(); + f.write_all("data".as_bytes()).unwrap(); + f.flush().unwrap(); + f.seek(SeekFrom::Start(0)).unwrap(); + + async fn read_data(handle_src: &HandleSource) { + let buf: Vec = vec![0; 4]; + let (bytes_read, buf) = handle_src.read_to_vec(Some(0), buf).await.unwrap(); + assert_eq!(bytes_read, 4); + assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data"); + } + + let ex = HandleExecutor::new(); + let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + ex.run_until(read_data(&handle_src)).unwrap(); + } + + #[test] + fn test_read_mem() { + let mut f = tempfile().unwrap(); + f.write_all("data".as_bytes()).unwrap(); + f.flush().unwrap(); + f.seek(SeekFrom::Start(0)).unwrap(); + + async fn read_data(handle_src: &HandleSource) { + let mem = Arc::new(VecIoWrapper::from(vec![0; 4])); + let bytes_read = handle_src + .read_to_mem( + Some(0), + Arc::::clone(&mem), + &[ + MemRegion { offset: 0, len: 2 }, + MemRegion { offset: 2, len: 2 }, + ], + ) + .await + .unwrap(); + assert_eq!(bytes_read, 4); + let vec: Vec = match Arc::try_unwrap(mem) { + Ok(v) => v.into(), + Err(_) => panic!("Too many vec refs"), + }; + assert_eq!(std::str::from_utf8(vec.as_slice()).unwrap(), "data"); + } + + let ex = HandleExecutor::new(); + let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + ex.run_until(read_data(&handle_src)).unwrap(); + } + + #[test] + fn test_write_vec() { + let mut temp_file = NamedTempFile::new().unwrap(); + + async fn write_data(handle_src: &HandleSource) { + let mut buf: Vec = Vec::new(); + buf.extend_from_slice("data".as_bytes()); + + let (bytes_written, _) = handle_src.write_from_vec(Some(0), buf).await.unwrap(); + assert_eq!(bytes_written, 4); + } + + let ex = HandleExecutor::new(); + let f = fs::OpenOptions::new() + .write(true) + .open(temp_file.path()) + .unwrap(); + let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + ex.run_until(write_data(&handle_src)).unwrap(); + + let mut buf = vec![0; 4]; + temp_file.read_exact(&mut buf).unwrap(); + assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data"); + } + + #[test] + fn test_write_mem() { + let mut temp_file = NamedTempFile::new().unwrap(); + + async fn write_data(handle_src: &HandleSource) { + let mut buf: Vec = Vec::new(); + buf.extend_from_slice("data".as_bytes()); + let mem = Arc::new(VecIoWrapper::from(buf)); + let bytes_written = handle_src + .write_from_mem( + Some(0), + Arc::::clone(&mem), + &[ + MemRegion { offset: 0, len: 2 }, + MemRegion { offset: 2, len: 2 }, + ], + ) + .await + .unwrap(); + assert_eq!(bytes_written, 4); + match Arc::try_unwrap(mem) { + Ok(_) => (), + Err(_) => panic!("Too many vec refs"), + }; + } + + let ex = HandleExecutor::new(); + let f = fs::OpenOptions::new() + .write(true) + .open(temp_file.path()) + .unwrap(); + let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + ex.run_until(write_data(&handle_src)).unwrap(); + + let mut buf = vec![0; 4]; + temp_file.read_exact(&mut buf).unwrap(); + 
assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data"); + } + + #[test] + fn test_punch_holes() { + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file.write_all("abcdefghijk".as_bytes()).unwrap(); + temp_file.flush().unwrap(); + temp_file.seek(SeekFrom::Start(0)).unwrap(); + + async fn punch_hole(handle_src: &HandleSource) { + let offset = 1; + let len = 3; + handle_src + .fallocate(offset, len, AllocateMode::PunchHole) + .await + .unwrap(); + } + + let ex = HandleExecutor::new(); + let f = fs::OpenOptions::new() + .write(true) + .open(temp_file.path()) + .unwrap(); + let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + ex.run_until(punch_hole(&handle_src)).unwrap(); + + let mut buf = vec![0; 11]; + temp_file.read_exact(&mut buf).unwrap(); + assert_eq!( + std::str::from_utf8(buf.as_slice()).unwrap(), + "a\0\0\0efghijk" + ); + } + + /// Test should fail because punch hole should not be allowed to allocate more memory + #[test] + fn test_punch_holes_fail_out_of_bounds() { + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file.write_all("abcdefghijk".as_bytes()).unwrap(); + temp_file.flush().unwrap(); + temp_file.seek(SeekFrom::Start(0)).unwrap(); + + async fn punch_hole(handle_src: &HandleSource) { + let offset = 9; + let len = 4; + handle_src + .fallocate(offset, len, AllocateMode::PunchHole) + .await + .unwrap(); + } + + let ex = HandleExecutor::new(); + let f = fs::OpenOptions::new() + .write(true) + .open(temp_file.path()) + .unwrap(); + let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + ex.run_until(punch_hole(&handle_src)).unwrap(); + + let mut buf = vec![0; 13]; + assert!(temp_file.read_exact(&mut buf).is_err()); + } + + // TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the + // bounds of the file. Determine if we need to support this, since Windows doesn't do this yet. + // #[test] + // fn test_write_zeroes() { + // let mut temp_file = NamedTempFile::new().unwrap(); + // temp_file.write("abcdefghijk".as_bytes()).unwrap(); + // temp_file.flush().unwrap(); + // temp_file.seek(SeekFrom::Start(0)).unwrap(); + + // async fn punch_hole(handle_src: &HandleSource) { + // let offset = 9; + // let len = 4; + // handle_src + // .fallocate(offset, len, AllocateMode::ZeroRange) + // .await + // .unwrap(); + // } + + // let ex = HandleExecutor::new(); + // let f = fs::OpenOptions::new() + // .write(true) + // .open(temp_file.path()) + // .unwrap(); + // let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + // ex.run_until(punch_hole(&handle_src)).unwrap(); + + // let mut buf = vec![0; 13]; + // temp_file.read_exact(&mut buf).unwrap(); + // assert_eq!( + // std::str::from_utf8(buf.as_slice()).unwrap(), + // "abcdefghi\0\0\0\0" + // ); + // } +} diff --git a/cros_async/src/sys/windows/timer.rs b/cros_async/src/sys/windows/timer.rs new file mode 100644 index 0000000000..0f0d2244fe --- /dev/null +++ b/cros_async/src/sys/windows/timer.rs @@ -0,0 +1,28 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#[cfg(test)] +mod test { + use crate::{Executor, TimerAsync}; + use std::time::{Duration, Instant}; + + #[test] + fn timer() { + async fn this_test(ex: &Executor) { + // On Windows, SetWaitableTimer, the underlying timer API, is not + // guaranteed to sleep for *at least* the supplied duration, so here + // we permit early wakeups. 
+ let dur = Duration::from_millis(200); + let min_duration = Duration::from_millis(190); + + let now = Instant::now(); + TimerAsync::sleep(ex, dur).await.expect("unable to sleep"); + let actual_sleep_duration = now.elapsed(); + assert!(actual_sleep_duration >= min_duration); + } + + let ex = Executor::new().expect("creating an executor failed"); + ex.run_until(this_test(&ex)).unwrap(); + } +} diff --git a/cros_async/src/sys/windows/wait_for_handle.rs b/cros_async/src/sys/windows/wait_for_handle.rs new file mode 100644 index 0000000000..ffe0aacb43 --- /dev/null +++ b/cros_async/src/sys/windows/wait_for_handle.rs @@ -0,0 +1,309 @@ +// Copyright 2021 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use base::{error, warn, Descriptor}; +use std::{ + ffi::c_void, + future::Future, + marker::{PhantomData, PhantomPinned}, + os::windows::io::AsRawHandle, + pin::Pin, + ptr::null_mut, + sync::MutexGuard, + task::{Context, Poll, Waker}, +}; +use sync::Mutex; +use winapi::{ + shared::ntdef::FALSE, + um::{ + handleapi::INVALID_HANDLE_VALUE, + threadpoollegacyapiset::UnregisterWaitEx, + winbase::{RegisterWaitForSingleObject, INFINITE}, + winnt::{BOOLEAN, PVOID, WT_EXECUTEONLYONCE}, + }, +}; + +use crate::{ + sys::windows::{ + handle_source::{Error, Result}, + HandleSource, + }, + IoSourceExt, +}; + +/// Inner state shared between the future struct & the kernel invoked waiter callback. +struct WaitForHandleInner { + wait_state: WaitState, + wait_object: Descriptor, + waker: Option, +} +impl WaitForHandleInner { + fn new() -> WaitForHandleInner { + WaitForHandleInner { + wait_state: WaitState::New, + wait_object: Descriptor(null_mut::()), + waker: None, + } + } +} + +/// Future's state. +#[derive(Clone, Copy, PartialEq)] +enum WaitState { + New, + Sleeping, + Woken, + Aborted, + Finished, + Failed, +} + +/// Waits for a single handle valued HandleSource to be readable. +pub struct WaitForHandle<'a, T: AsRawHandle> { + handle: Descriptor, + inner: Mutex, + _marker: PhantomData<&'a HandleSource>, + _pinned_marker: PhantomPinned, +} + +impl<'a, T> WaitForHandle<'a, T> +where + T: AsRawHandle, +{ + pub fn new(handle_source: &'a HandleSource) -> WaitForHandle<'a, T> { + WaitForHandle { + handle: Descriptor(handle_source.as_source().as_raw_handle()), + inner: Mutex::new(WaitForHandleInner::new()), + _marker: PhantomData, + _pinned_marker: PhantomPinned, + } + } +} + +impl<'a, T> Future for WaitForHandle<'a, T> +where + T: AsRawHandle, +{ + type Output = Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner_for_callback = &self.inner as *const _ as *mut Mutex; + let mut inner = self.inner.lock(); + match inner.wait_state { + WaitState::New => { + // Safe because: + // a) the callback only runs when WaitForHandle is alive (we cancel it on + // drop). + // b) inner & its children are owned by WaitForHandle. 
+ let err = unsafe { + RegisterWaitForSingleObject( + &mut inner.wait_object as *mut _ as *mut *mut c_void, + self.handle.0, + Some(wait_for_handle_waker), + inner_for_callback as *mut c_void, + INFINITE, + WT_EXECUTEONLYONCE, + ) + }; + if err == 0 { + return Poll::Ready(Err(Error::HandleWaitFailed(base::Error::last()))); + } + + inner.wait_state = WaitState::Sleeping; + inner.waker = Some(cx.waker().clone()); + Poll::Pending + } + WaitState::Sleeping => { + // In case we are polled with a different waker which won't be woken by the existing + // waker, we'll have to update to the new waker. + if inner + .waker + .as_ref() + .map(|w| !w.will_wake(cx.waker())) + .unwrap_or(true) + { + inner.waker = Some(cx.waker().clone()); + } + Poll::Pending + } + WaitState::Woken => { + inner.wait_state = WaitState::Finished; + + // Safe because: + // a) we know a wait was registered and hasn't been unregistered yet. + // b) the callback is not queued because we set WT_EXECUTEONLYONCE, and we know + // it has already completed. + unsafe { unregister_wait(inner.wait_object) } + + Poll::Ready(Ok(())) + } + WaitState::Aborted => Poll::Ready(Err(Error::OperationAborted)), + WaitState::Finished => panic!("polled an already completed WaitForHandle future."), + WaitState::Failed => { + panic!("WaitForHandle future's waiter callback hit unexpected behavior.") + } + } + } +} + +impl<'a, T> Drop for WaitForHandle<'a, T> +where + T: AsRawHandle, +{ + fn drop(&mut self) { + // We cannot hold the lock over the call to unregister_wait, otherwise we could deadlock + // with the callback trying to access the same data. It is sufficient to just verify + // (without mutual exclusion beyond the data access itself) that we have exited the New + // state before attempting to unregister. This works because once we have exited New, we + // cannot ever re-enter that state, and we know for sure that inner.wait_object is a valid + // wait object. + let (current_state, wait_object) = { + let inner = self.inner.lock(); + (inner.wait_state, inner.wait_object) + }; + + // Safe because self.descriptor is valid in any state except New or Finished. + // + // Note: this method call is critical for supplying the safety guarantee relied upon by + // wait_for_handle_waker. Upon return, it ensures that wait_for_handle_waker is not running + // and won't be scheduled again, which makes it safe to drop self.inner_for_callback + // (wait_for_handle_waker has a non owning pointer to self.inner_for_callback). + if current_state != WaitState::New && current_state != WaitState::Finished { + unsafe { unregister_wait(wait_object) } + } + } +} + +/// Safe portion of the RegisterWaitForSingleObject callback. +fn process_wait_state_change( + mut state: MutexGuard, + wait_fired: bool, +) -> Option { + let mut waker = None; + state.wait_state = match state.wait_state { + WaitState::Sleeping => { + let new_state = if wait_fired { + WaitState::Woken + } else { + // This should never happen. + error!("wait_for_handle_waker did not wake due to wait firing."); + WaitState::Aborted + }; + + match state.waker.take() { + Some(w) => { + waker = Some(w); + new_state + } + None => { + error!("wait_for_handler_waker called, but no waker available."); + WaitState::Failed + } + } + } + _ => { + error!("wait_for_handle_waker called with state != sleeping."); + WaitState::Failed + } + }; + waker +} + +/// # Safety +/// a) inner_ptr is valid whenever this function can be called. 
This is guaranteed by WaitForHandle, +/// which cannot be dropped until this function has finished running & is no longer queued for +/// execution because the Drop impl calls UnregisterWaitEx, which blocks on that condition. +unsafe extern "system" fn wait_for_handle_waker(inner_ptr: PVOID, timer_or_wait_fired: BOOLEAN) { + let inner = inner_ptr as *const Mutex; + let inner_locked = (*inner).lock(); + let waker = process_wait_state_change( + inner_locked, + /* wait_fired= */ timer_or_wait_fired == FALSE, + ); + + // We wake *after* releasing the lock to avoid waking up a thread that then will go back to + // sleep because the lock it needs is currently held. + if let Some(w) = waker { + w.wake() + } +} + +/// # Safety +/// a) desc must be a valid wait handle from RegisterWaitForSingleObject. +unsafe fn unregister_wait(desc: Descriptor) { + if UnregisterWaitEx(desc.0, INVALID_HANDLE_VALUE) == 0 { + warn!( + "WaitForHandle: failed to clean up RegisterWaitForSingleObject wait handle: {}", + base::Error::last() + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + waker::{new_waker, WeakWake}, + EventAsync, Executor, + }; + use base::{thread::spawn_with_timeout, Event}; + use futures::pin_mut; + use std::{ + sync::{Arc, Weak}, + time::Duration, + }; + + struct FakeWaker {} + impl WeakWake for FakeWaker { + fn wake_by_ref(_weak_self: &Weak) { + // Do nothing. + } + } + + #[test] + fn test_unsignaled_event() { + async fn wait_on_unsignaled_event(evt: EventAsync) { + evt.next_val().await.unwrap(); + panic!("await should never terminate"); + } + + let fake_waker = Arc::new(FakeWaker {}); + let waker = new_waker(Arc::downgrade(&fake_waker)); + let mut cx = Context::from_waker(&waker); + + let ex = Executor::new().unwrap(); + let evt = Event::new().unwrap(); + let async_evt = EventAsync::new(evt, &ex).unwrap(); + + let fut = wait_on_unsignaled_event(async_evt); + pin_mut!(fut); + + // Assert we make it to the pending state. This means we've registered a wait. + assert_eq!(fut.poll(&mut cx), Poll::Pending); + + // If this test doesn't crash trying to drop the future, it is considered successful. + } + + #[test] + fn test_signaled_event() { + let join_handle = spawn_with_timeout(|| { + async fn wait_on_signaled_event(evt: EventAsync) { + evt.next_val().await.unwrap(); + } + + let ex = Executor::new().unwrap(); + let evt = Event::new().unwrap(); + evt.write(0).unwrap(); + let async_evt = EventAsync::new(evt, &ex).unwrap(); + + let fut = wait_on_signaled_event(async_evt); + pin_mut!(fut); + + ex.run_until(fut).unwrap(); + }); + join_handle + .try_join(Duration::from_secs(5)) + .expect("async wait never returned from signaled event."); + } +} diff --git a/cros_async/src/timer.rs b/cros_async/src/timer.rs index e281d0ab0f..269dbb303a 100644 --- a/cros_async/src/timer.rs +++ b/cros_async/src/timer.rs @@ -2,44 +2,30 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use base::{Result as SysResult, Timer as TimerFd}; +use crate::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt}; +use base::{Result as SysResult, Timer}; use std::time::Duration; -use super::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt}; - -#[cfg(test)] -use super::{FdExecutor, URingExecutor}; - -/// An async version of base::TimerFd. +/// An async version of base::Timer. 
pub struct TimerAsync { - io_source: Box>, + pub(crate) io_source: Box>, } impl TimerAsync { - pub fn new(timer: TimerFd, ex: &Executor) -> AsyncResult { + pub fn new(timer: Timer, ex: &Executor) -> AsyncResult { ex.async_from(timer) .map(|io_source| TimerAsync { io_source }) } - #[cfg(test)] - pub(crate) fn new_poll(timer: TimerFd, ex: &FdExecutor) -> AsyncResult { - super::executor::async_poll_from(timer, ex).map(|io_source| TimerAsync { io_source }) - } - - #[cfg(test)] - pub(crate) fn new_uring(timer: TimerFd, ex: &URingExecutor) -> AsyncResult { - super::executor::async_uring_from(timer, ex).map(|io_source| TimerAsync { io_source }) - } - /// Gets the next value from the timer. pub async fn next_val(&self) -> AsyncResult { - self.io_source.read_u64().await + self.io_source.wait_for_handle().await } /// Async sleep for the given duration pub async fn sleep(ex: &Executor, dur: Duration) -> std::result::Result<(), Error> { - let mut tfd = TimerFd::new().map_err(Error::TimerFd)?; - tfd.reset(dur, None).map_err(Error::TimerFd)?; + let mut tfd = Timer::new().map_err(Error::Timer)?; + tfd.reset(dur, None).map_err(Error::Timer)?; let t = TimerAsync::new(tfd, ex).map_err(Error::TimerAsync)?; t.next_val().await.map_err(Error::TimerAsync)?; Ok(()) @@ -53,67 +39,4 @@ impl TimerAsync { } } -impl IntoAsync for TimerFd {} - -#[cfg(test)] -mod tests { - use super::{super::uring_executor::use_uring, *}; - use std::time::{Duration, Instant}; - - #[test] - fn one_shot() { - if !use_uring() { - return; - } - - async fn this_test(ex: &URingExecutor) { - let mut tfd = TimerFd::new().expect("failed to create timerfd"); - - let dur = Duration::from_millis(200); - let now = Instant::now(); - tfd.reset(dur, None).expect("failed to arm timer"); - - let t = TimerAsync::new_uring(tfd, ex).unwrap(); - let count = t.next_val().await.expect("unable to wait for timer"); - - assert_eq!(count, 1); - assert!(now.elapsed() >= dur); - } - - let ex = URingExecutor::new().unwrap(); - ex.run_until(this_test(&ex)).unwrap(); - } - - #[test] - fn one_shot_fd() { - async fn this_test(ex: &FdExecutor) { - let mut tfd = TimerFd::new().expect("failed to create timerfd"); - - let dur = Duration::from_millis(200); - let now = Instant::now(); - tfd.reset(dur, None).expect("failed to arm timer"); - - let t = TimerAsync::new_poll(tfd, ex).unwrap(); - let count = t.next_val().await.expect("unable to wait for timer"); - - assert_eq!(count, 1); - assert!(now.elapsed() >= dur); - } - - let ex = FdExecutor::new().unwrap(); - ex.run_until(this_test(&ex)).unwrap(); - } - - #[test] - fn timer() { - async fn this_test(ex: &Executor) { - let dur = Duration::from_millis(200); - let now = Instant::now(); - TimerAsync::sleep(ex, dur).await.expect("unable to sleep"); - assert!(now.elapsed() >= dur); - } - - let ex = Executor::new().expect("creating an executor failed"); - ex.run_until(this_test(&ex)).unwrap(); - } -} +impl IntoAsync for Timer {}
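// Editorial sketch (not part of this patch): the refactored timer API in use.
// Assumes `TimerAsync` is re-exported at the crate root, as the new
// sys/windows/timer.rs test above suggests; the same code runs on either
// platform because Executor::async_from() wraps base::Timer for the host OS.
use cros_async::{Executor, TimerAsync};
use std::time::Duration;

fn main() {
    let ex = Executor::new().expect("creating an executor failed");
    ex.run_until(async {
        // TimerAsync::sleep() arms a base::Timer and awaits its next_val().
        TimerAsync::sleep(&ex, Duration::from_millis(10))
            .await
            .expect("unable to sleep");
    })
    .unwrap();
}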