cros_async: upstream Windows support.

Major changes:
* Adds Windows implementations
* Refactors cros_async to use the styleguide for cross platform code.
* Adds platform specific Descriptor impls to Timer & Event (short term;
  this will go away in the next CL in the series).

Minor adjustments:
* The doctests for the Executor were passing the wrong type of seek
  parameter to write_from_vec. It was disabling seeking (None) when it
  should have been  seeking to zero (Some(0)).

BUG=b:213147081
TEST=tested by Windows & Linux bots.

Change-Id: Id7e025ceb9f1be4a165de1e9ba824cf60dd076ff
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3579735
Reviewed-by: Alexandre Courbot <acourbot@chromium.org>
Reviewed-by: Keiichi Watanabe <keiichiw@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Noah Gold <nkgold@google.com>
This commit is contained in:
Noah Gold 2022-04-12 21:36:24 -07:00 committed by Chromeos LUCI
parent 1def2e61d9
commit 68bbe92339
37 changed files with 2579 additions and 340 deletions

View file

@ -13,6 +13,9 @@ use crate::{generate_scoped_event, platform::EventFd, RawDescriptor, Result};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawHandle, RawHandle};
/// See [EventFd](crate::platform::EventFd) for struct- and method-level
/// documentation.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
@ -65,4 +68,11 @@ impl AsRawFd for Event {
}
}
#[cfg(windows)]
impl AsRawHandle for Event {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
generate_scoped_event!(Event);

View file

@ -12,6 +12,9 @@ use sync::Mutex;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawHandle, RawHandle};
/// See [TimerFd](crate::platform::TimerFd) for struct- and method-level
/// documentation.
pub struct Timer(pub TimerFd);
@ -21,6 +24,13 @@ impl Timer {
}
}
#[cfg(windows)]
impl AsRawHandle for Timer {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
#[cfg(unix)]
impl AsRawFd for Timer {
fn as_raw_fd(&self) -> RawFd {

View file

@ -7,9 +7,9 @@ edition = "2021"
[dependencies]
async-trait = "0.1.36"
async-task = "4"
cfg-if = "1.0.0"
data_model = { path = "../common/data_model" } # provided by ebuild
intrusive-collections = "0.9"
io_uring = { path = "../io_uring" } # provided by ebuild
libc = "*"
once_cell = "1.7.2"
paste = "1.0"
@ -23,6 +23,14 @@ audio_streams = { path = "../common/audio_streams" } # provided by ebuild
anyhow = "1.0"
serde = "*"
[target.'cfg(unix)'.dependencies]
io_uring = { path = "../io_uring" } # provided by ebuild
[target.'cfg(windows)'.dependencies]
winapi = "*"
win_util = { path = "../win_util" }
smallvec = "*"
[dependencies.futures]
version = "*"
default-features = false

View file

@ -6,12 +6,8 @@ use crate::{Executor, IntoAsync};
use base::{AsRawDescriptor, RecvTube, SendTube, Tube, TubeResult};
use serde::{de::DeserializeOwned, Serialize};
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg_attr(windows, path = "win/async_types.rs")]
#[cfg_attr(not(windows), path = "unix/async_types.rs")]
mod async_types;
pub use async_types::*;
pub use crate::sys::async_types::*;
/// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor
/// trait object family.
@ -21,16 +17,18 @@ pub trait DescriptorIntoAsync: AsRawDescriptor {}
/// DescriptorIntoAsync (to signify it is suitable for use with async
/// operations), and then wrapped with this type.
pub struct DescriptorAdapter<T: DescriptorIntoAsync>(pub T);
impl<T> AsRawFd for DescriptorAdapter<T>
where
T: DescriptorIntoAsync,
{
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_descriptor()
}
}
impl<T> IntoAsync for DescriptorAdapter<T> where T: DescriptorIntoAsync {}
// NOTE: A StreamChannel can either be used fully in async mode, or not in async
// mode. Mixing modes will break StreamChannel's internal read/write
// notification system.
//
// TODO(b/213153157): this type isn't properly available upstream yet. Once it
// is, we can re-enable these implementations.
// impl DescriptorIntoAsync for StreamChannel {}
// impl DescriptorIntoAsync for &StreamChannel {}
impl IntoAsync for Tube {}
impl IntoAsync for SendTube {}
impl IntoAsync for RecvTube {}

View file

@ -11,11 +11,14 @@ use std::os::unix::net::UnixStream;
use std::{io::Result, time::Duration};
use super::{AsyncWrapper, IntoAsync, IoSourceExt, TimerAsync};
use crate::{IntoAsync, IoSourceExt, TimerAsync};
use async_trait::async_trait;
use audio_streams::async_api::{
AsyncStream, AudioStreamsExecutor, ReadAsync, ReadWriteAsync, WriteAsync,
};
use audio_streams::async_api::{AudioStreamsExecutor, ReadAsync, ReadWriteAsync, WriteAsync};
#[cfg(unix)]
use super::AsyncWrapper;
#[cfg(unix)]
use audio_streams::async_api::AsyncStream;
/// A wrapper around IoSourceExt that is compatible with the audio_streams traits.
pub struct IoSourceWrapper<T: IntoAsync + Send> {

View file

@ -2,8 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod block_on;
pub mod sys;
mod cancellable_pool;
mod pool;
pub use block_on::*;
pub use cancellable_pool::*;
pub use pool::*;

View file

@ -0,0 +1,463 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Provides an async blocking pool whose tasks can be cancelled.
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use crate::BlockingPool;
use async_task::Task;
use once_cell::sync::Lazy;
use sync::{Condvar, Mutex};
use thiserror::Error as ThisError;
/// Global executor.
///
/// This is convenient, though not preferred. Pros/cons:
/// + It avoids passing executor all the way to each call sites.
/// + The call site can assume that executor will never shutdown.
/// + Provides similar functionality as async_task with a few improvements
/// around ability to cancel.
/// - Globals are harder to reason about.
static EXECUTOR: Lazy<CancellableBlockingPool> =
Lazy::new(|| CancellableBlockingPool::new(256, Duration::from_secs(10)));
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(PartialEq, PartialOrd)]
enum WindDownStates {
Armed,
Disarmed,
ShuttingDown,
ShutDown,
}
impl Default for WindDownStates {
fn default() -> Self {
WindDownStates::Armed
}
}
#[derive(Default)]
struct State {
wind_down: WindDownStates,
/// Helps to generate unique id to associate `cancel` with task.
current_cancellable_id: u64,
/// A map of all the `cancel` routines of queued/in-flight tasks.
cancellables: HashMap<u64, Box<dyn Fn() + Send + 'static>>,
}
#[derive(Debug, Clone, Copy)]
pub enum TimeoutAction {
/// Do nothing on timeout.
None,
/// Panic the thread on timeout.
Panic,
}
#[derive(ThisError, Debug, PartialEq)]
pub enum Error {
#[error("Timeout occurred while trying to join threads")]
Timedout,
#[error("Shutdown is in progress")]
ShutdownInProgress,
#[error("Already shut down")]
AlreadyShutdown,
}
struct Inner {
blocking_pool: BlockingPool,
state: Mutex<State>,
/// This condvar gets notified when `cancellables` is empty after removing an
/// entry.
cancellables_cv: Condvar,
}
impl Inner {
pub fn spawn<F, R>(self: &Arc<Self>, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.blocking_pool.spawn(f)
}
/// Adds cancel to a cancellables and returns an `id` with which `cancel` can be
/// accessed/removed.
fn add_cancellable(&self, cancel: Box<dyn Fn() + Send + 'static>) -> u64 {
let mut state = self.state.lock();
let id = state.current_cancellable_id;
state.current_cancellable_id += 1;
state.cancellables.insert(id, cancel);
id
}
}
/// A thread pool for running work that may block.
///
/// This is a wrapper around `BlockingPool` with an ability to cancel queued tasks.
/// See [BlockingPool] for more info.
///
/// # Examples
///
/// Spawn a task to run in the `CancellableBlockingPool` and await on its result.
///
/// ```edition2018
/// use cros_async::CancellableBlockingPool;
///
/// # async fn do_it() {
/// let pool = CancellableBlockingPool::default();
/// let CANCELLED = 0;
///
/// let res = pool.spawn(move || {
/// // Do some CPU-intensive or blocking work here.
///
/// 42
/// }, move || CANCELLED).await;
///
/// assert_eq!(res, 42);
/// # }
/// # futures::executor::block_on(do_it());
/// ```
#[derive(Clone)]
pub struct CancellableBlockingPool {
inner: Arc<Inner>,
}
impl CancellableBlockingPool {
const RETRY_COUNT: usize = 10;
const SLEEP_DURATION: Duration = Duration::from_millis(100);
/// Create a new `CancellableBlockingPool`.
///
/// When we try to shutdown or drop `CancellableBlockingPool`, it may happen that a hung thread
/// might prevent `CancellableBlockingPool` pool from getting dropped. On failure to shutdown in
/// `watchdog_opts.timeout` duration, `CancellableBlockingPool` can take an action specified by
/// `watchdog_opts.action`.
///
/// See also: [BlockingPool::new()](BlockingPool::new)
pub fn new(max_threads: usize, keepalive: Duration) -> CancellableBlockingPool {
CancellableBlockingPool {
inner: Arc::new(Inner {
blocking_pool: BlockingPool::new(max_threads, keepalive),
state: Default::default(),
cancellables_cv: Condvar::new(),
}),
}
}
/// Like [Self::new] but with pre-allocating capacity for up to `max_threads`.
pub fn with_capacity(max_threads: usize, keepalive: Duration) -> CancellableBlockingPool {
CancellableBlockingPool {
inner: Arc::new(Inner {
blocking_pool: BlockingPool::with_capacity(max_threads, keepalive),
state: Mutex::new(State::default()),
cancellables_cv: Condvar::new(),
}),
}
}
/// Spawn a task to run in the `CancellableBlockingPool`.
///
/// Callers may `await` the returned `Task` to be notified when the work is completed.
///
/// `cancel` helps to cancel a queued or in-flight operation `f`.
/// `cancel` may be called more than once if `f` doesn't respond to `cancel`.
/// `cancel` is not called if `f` completes successfully. For example,
/// # Examples
///
/// ```edition2018
/// use {cros_async::CancellableBlockingPool, std::sync::{Arc, Mutex, Condvar}};
///
/// # async fn cancel_it() {
/// let pool = CancellableBlockingPool::default();
/// let cancelled: i32 = 1;
/// let success: i32 = 2;
///
/// let shared = Arc::new((Mutex::new(0), Condvar::new()));
/// let shared2 = shared.clone();
/// let shared3 = shared.clone();
///
/// let res = pool
/// .spawn(
/// move || {
/// let guard = shared.0.lock().unwrap();
/// let mut guard = shared.1.wait_while(guard, |state| *state == 0).unwrap();
/// if *guard != cancelled {
/// *guard = success;
/// }
/// },
/// move || {
/// *shared2.0.lock().unwrap() = cancelled;
/// shared2.1.notify_all();
/// },
/// )
/// .await;
/// pool.shutdown();
///
/// assert_eq!(*shared3.0.lock().unwrap(), cancelled);
/// # }
/// ```
pub fn spawn<F, R, G>(&self, f: F, cancel: G) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
G: Fn() -> R + Send + 'static,
{
let inner = self.inner.clone();
let cancelled = Arc::new(Mutex::new(None));
let cancelled_spawn = cancelled.clone();
let id = inner.add_cancellable(Box::new(move || {
let mut c = cancelled.lock();
*c = Some(cancel());
}));
self.inner.spawn(move || {
if let Some(res) = cancelled_spawn.lock().take() {
return res;
}
let ret = f();
let mut state = inner.state.lock();
state.cancellables.remove(&id);
if state.cancellables.is_empty() {
inner.cancellables_cv.notify_one();
}
ret
})
}
/// Iterates over all the queued tasks and marks them as cancelled.
fn drain_cancellables(&self) {
let mut state = self.inner.state.lock();
// Iterate a few times to try cancelling all the tasks.
for _ in 0..Self::RETRY_COUNT {
// Nothing left to do.
if state.cancellables.is_empty() {
return;
}
// We only cancel the task and do not remove it from the cancellables. It is runner's
// job to remove from state.cancellables.
for cancel in state.cancellables.values() {
cancel();
}
// Hold the state lock in a block before sleeping so that woken up threads can get to
// hold the lock.
// Wait for a while so that the threads get a chance complete task in flight.
let (state1, _cv_timeout) = self
.inner
.cancellables_cv
.wait_timeout(state, Self::SLEEP_DURATION);
state = state1;
}
}
/// Marks all the queued and in-flight tasks as cancelled. Any tasks queued after `disarm`ing
/// will be cancelled.
/// Does not wait for all the tasks to get cancelled.
pub fn disarm(&self) {
{
let mut state = self.inner.state.lock();
if state.wind_down >= WindDownStates::Disarmed {
return;
}
// At this point any new incoming request will be cancelled when run.
state.wind_down = WindDownStates::Disarmed;
}
self.drain_cancellables();
}
/// Shut down the `CancellableBlockingPool`.
///
/// This will block until all work that has been started by the worker threads is finished. Any
/// work that was added to the `CancellableBlockingPool` but not yet picked up by a worker
/// thread will not complete and `await`ing on the `Task` for that work will panic.
///
pub fn shutdown(&self) -> Result<(), Error> {
self.disarm();
{
let mut state = self.inner.state.lock();
if state.wind_down == WindDownStates::ShuttingDown {
return Err(Error::ShutdownInProgress);
}
if state.wind_down == WindDownStates::ShutDown {
return Err(Error::AlreadyShutdown);
}
state.wind_down = WindDownStates::ShuttingDown;
}
let res = self.inner.blocking_pool.shutdown(/* deadline: */ Some(
Instant::now() + DEFAULT_SHUTDOWN_TIMEOUT,
));
self.inner.state.lock().wind_down = WindDownStates::ShutDown;
match res {
Ok(_) => Ok(()),
Err(_) => Err(Error::Timedout),
}
}
}
impl Default for CancellableBlockingPool {
fn default() -> CancellableBlockingPool {
CancellableBlockingPool::new(256, Duration::from_secs(10))
}
}
impl Drop for CancellableBlockingPool {
fn drop(&mut self) {
let _ = self.shutdown();
}
}
/// Spawn a task to run in the `CancellableBlockingPool` static executor.
///
/// `cancel` in-flight operation. cancel is called on operation during `disarm` or during
/// `shutdown`. Cancel may be called multiple times if running task doesn't get cancelled on first
/// attempt.
///
/// Callers may `await` the returned `Task` to be notified when the work is completed.
///
/// See also: `spawn`.
pub fn unblock<F, R, G>(f: F, cancel: G) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
G: Fn() -> R + Send + 'static,
{
EXECUTOR.spawn(f, cancel)
}
/// Marks all the queued and in-flight tasks as cancelled. Any tasks queued after `disarm`ing
/// will be cancelled.
/// Doesn't not wait for all the tasks to get cancelled.
pub fn unblock_disarm() {
EXECUTOR.disarm()
}
#[cfg(test)]
mod test {
use std::{sync::Arc, thread, time::Duration};
use futures::executor::block_on;
use sync::{Condvar, Mutex};
use crate::{blocking::Error, CancellableBlockingPool};
use std::sync::Barrier;
#[test]
fn disarm_with_pending_work() {
// Create a pool with only one thread.
let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));
let mu = Arc::new(Mutex::new(false));
let cv = Arc::new(Condvar::new());
let blocker_is_running = Arc::new(Barrier::new(2));
// First spawn a thread that blocks the pool.
let task_mu = mu.clone();
let task_cv = cv.clone();
let task_blocker_is_running = blocker_is_running.clone();
pool.spawn(
move || {
task_blocker_is_running.wait();
let mut ready = task_mu.lock();
while !*ready {
ready = task_cv.wait(ready);
}
},
move || {},
)
.detach();
// Wait for the worker to start running the blocking thread.
blocker_is_running.wait();
// This task will never finish because we will disarm the pool first.
let unfinished = pool.spawn(|| 5, || 0);
// Disarming should cancel the task.
let _ = pool.disarm();
// Shutdown the blocking thread. This will allow a worker to pick up the task that has
// to be cancelled.
*mu.lock() = true;
cv.notify_all();
// We expect the cancelled value to be returned.
assert_eq!(block_on(unfinished), 0);
// Now the pool is empty and can be shutdown without blocking.
let _ = pool.shutdown().unwrap();
}
#[test]
fn shutdown_with_blocked_work_should_panic() {
let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));
let running = Arc::new((Mutex::new(false), Condvar::new()));
let running1 = running.clone();
pool.spawn(
move || {
*running1.0.lock() = true;
running1.1.notify_one();
thread::sleep(Duration::from_secs(10000));
},
move || {},
)
.detach();
let mut is_running = running.0.lock();
while !*is_running {
is_running = running.1.wait(is_running);
}
assert_eq!(pool.shutdown().err().unwrap(), Error::Timedout);
}
#[test]
fn multiple_shutdown_returns_error() {
let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));
let _ = pool.shutdown();
assert_eq!(pool.shutdown(), Err(Error::AlreadyShutdown));
}
#[test]
fn shutdown_in_progress() {
let pool = CancellableBlockingPool::new(1, Duration::from_secs(10));
let running = Arc::new((Mutex::new(false), Condvar::new()));
let running1 = running.clone();
pool.spawn(
move || {
*running1.0.lock() = true;
running1.1.notify_one();
thread::sleep(Duration::from_secs(10000));
},
move || {},
)
.detach();
let mut is_running = running.0.lock();
while !*is_running {
is_running = running.1.wait(is_running);
}
let pool_clone = pool.clone();
thread::spawn(move || {
while !pool_clone.inner.blocking_pool.shutting_down() {}
assert_eq!(pool_clone.shutdown(), Err(Error::ShutdownInProgress));
});
assert_eq!(pool.shutdown().err().unwrap(), Error::Timedout);
}
}

View file

@ -135,6 +135,24 @@ impl Inner {
self.condvar.notify_one();
}
}
pub fn spawn<F, R>(self: &Arc<Self>, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(i) = raw.upgrade() {
i.schedule(runnable);
}
};
let (runnable, task) = async_task::spawn(async move { f() }, schedule);
runnable.schedule();
task
}
}
#[derive(Debug, thiserror::Error)]
@ -253,17 +271,7 @@ impl BlockingPool {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let raw = Arc::downgrade(&self.inner);
let schedule = move |runnable| {
if let Some(i) = raw.upgrade() {
i.schedule(runnable);
}
};
let (runnable, task) = async_task::spawn(async move { f() }, schedule);
runnable.schedule();
task
self.inner.spawn(f)
}
/// Shut down the `BlockingPool`.
@ -316,6 +324,11 @@ impl BlockingPool {
Ok(())
}
}
#[cfg(test)]
pub(crate) fn shutting_down(&self) -> bool {
self.inner.state.lock().shutting_down
}
}
impl Default for BlockingPool {
@ -340,10 +353,10 @@ mod test {
time::{Duration, Instant},
};
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};
use sync::{Condvar, Mutex};
use super::super::super::{block_on, BlockingPool};
use super::super::super::BlockingPool;
#[test]
fn blocking_sleep() {

View file

@ -0,0 +1,6 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#[cfg(unix)]
pub mod unix;

View file

@ -0,0 +1,5 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod block_on;

View file

@ -125,7 +125,7 @@ mod test {
time::Duration,
};
use super::super::super::sync::SpinLock;
use crate::sync::SpinLock;
struct TimerState {
fired: bool,

View file

@ -2,85 +2,23 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::{AsyncResult, Executor, IntoAsync, IoSourceExt};
use base::Event as EventFd;
use crate::{IntoAsync, IoSourceExt};
use base::Event;
/// An async version of `base::EventFd`.
/// An async version of `base::Event`.
pub struct EventAsync {
io_source: Box<dyn IoSourceExt<EventFd>>,
pub(crate) io_source: Box<dyn IoSourceExt<Event>>,
#[cfg(windows)]
pub(crate) reset_after_read: bool,
}
impl EventAsync {
pub fn new(event: EventFd, ex: &Executor) -> AsyncResult<EventAsync> {
ex.async_from(event)
.map(|io_source| EventAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_poll(event: EventFd, ex: &super::FdExecutor) -> AsyncResult<EventAsync> {
super::executor::async_poll_from(event, ex).map(|io_source| EventAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_uring(event: EventFd, ex: &super::URingExecutor) -> AsyncResult<EventAsync> {
super::executor::async_uring_from(event, ex).map(|io_source| EventAsync { io_source })
}
/// Gets the next value from the eventfd.
#[allow(dead_code)]
pub async fn next_val(&self) -> AsyncResult<u64> {
self.io_source.read_u64().await
pub fn get_io_source_ref(&self) -> &dyn IoSourceExt<Event> {
self.io_source.as_ref()
}
}
impl IntoAsync for EventFd {}
impl IntoAsync for Event {}
#[cfg(test)]
mod tests {
use super::*;
use super::super::{uring_executor::use_uring, Executor, FdExecutor, URingExecutor};
#[test]
fn next_val_reads_value() {
async fn go(event: EventFd, ex: &Executor) -> u64 {
let event_async = EventAsync::new(event, ex).unwrap();
event_async.next_val().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let ex = Executor::new().unwrap();
let val = ex.run_until(go(eventfd, &ex)).unwrap();
assert_eq!(val, 0xaa);
}
#[test]
fn next_val_reads_value_poll_and_ring() {
if !use_uring() {
return;
}
async fn go(event_async: EventAsync) -> u64 {
event_async.next_val().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let uring_ex = URingExecutor::new().unwrap();
let val = uring_ex
.run_until(go(EventAsync::new_uring(eventfd, &uring_ex).unwrap()))
.unwrap();
assert_eq!(val, 0xaa);
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let poll_ex = FdExecutor::new().unwrap();
let val = poll_ex
.run_until(go(EventAsync::new_poll(eventfd, &poll_ex).unwrap()))
.unwrap();
assert_eq!(val, 0xaa);
}
}
// Safe because an `EventFd` is used underneath, which is safe to pass between threads.
// Safe because an `Event` is used underneath, which is safe to pass between threads.
unsafe impl Send for EventAsync {}

View file

@ -19,29 +19,64 @@ use std::{
fs::File,
io,
ops::{Deref, DerefMut},
os::unix::io::{AsRawFd, RawFd},
sync::Arc,
};
use async_trait::async_trait;
use base::UnixSeqpacket;
use remain::sorted;
use thiserror::Error as ThisError;
use super::{BackingMemory, MemRegion};
#[cfg(unix)]
use base::UnixSeqpacket;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsRawHandle, RawHandle};
#[cfg(unix)]
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
/// An error with a polled(FD) source.
#[error("An error with a poll source: {0}")]
Poll(#[from] super::poll_source::Error),
Poll(crate::sys::unix::poll_source::Error),
/// An error with a uring source.
#[error("An error with a uring source: {0}")]
Uring(#[from] super::uring_executor::Error),
Uring(crate::sys::unix::uring_executor::Error),
}
#[cfg(windows)]
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
#[error("An error with an EventAsync: {0}")]
EventAsync(base::Error),
#[error("An error with a handle executor: {0}")]
HandleExecutor(crate::sys::windows::handle_executor::Error),
#[error("An error with a handle source: {0}")]
HandleSource(crate::sys::windows::handle_source::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
#[cfg(unix)]
impl From<crate::sys::unix::uring_executor::Error> for Error {
fn from(err: crate::sys::unix::uring_executor::Error) -> Self {
Error::Uring(err)
}
}
#[cfg(unix)]
impl From<crate::sys::unix::poll_source::Error> for Error {
fn from(err: crate::sys::unix::poll_source::Error) -> Self {
Error::Poll(err)
}
}
#[cfg(unix)]
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
@ -52,6 +87,32 @@ impl From<Error> for io::Error {
}
}
#[cfg(windows)]
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
EventAsync(e) => e.into(),
HandleExecutor(e) => e.into(),
HandleSource(e) => e.into(),
}
}
}
#[cfg(windows)]
impl From<crate::sys::windows::handle_source::Error> for Error {
fn from(err: crate::sys::windows::handle_source::Error) -> Self {
Error::HandleSource(err)
}
}
#[cfg(windows)]
impl From<crate::sys::windows::handle_executor::Error> for Error {
fn from(err: crate::sys::windows::handle_executor::Error) -> Self {
Error::HandleExecutor(err)
}
}
/// Ergonomic methods for async reads.
#[async_trait(?Send)]
pub trait ReadAsync {
@ -136,6 +197,10 @@ pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
/// Provides a ref to the underlying IO source.
fn as_source(&self) -> &F;
/// Waits on a waitable handle. Needed for
/// Windows currently, and subject to a potential future upstream.
async fn wait_for_handle(&self) -> Result<u64>;
}
/// Marker trait signifying that the implementor is suitable for use with
@ -144,10 +209,15 @@ pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
/// (Note: it'd be really nice to implement a TryFrom for any implementors, and
/// remove our factory functions. Unfortunately
/// <https://github.com/rust-lang/rust/issues/50133> makes that too painful.)
#[cfg(unix)]
pub trait IntoAsync: AsRawFd {}
#[cfg(windows)]
pub trait IntoAsync: AsRawHandle {}
impl IntoAsync for File {}
#[cfg(unix)]
impl IntoAsync for UnixSeqpacket {}
#[cfg(unix)]
impl IntoAsync for &UnixSeqpacket {}
/// Simple wrapper struct to implement IntoAsync on foreign types.
@ -179,16 +249,29 @@ impl<T> DerefMut for AsyncWrapper<T> {
}
}
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for AsyncWrapper<T> {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
#[cfg(windows)]
impl<T: AsRawHandle> AsRawHandle for AsyncWrapper<T> {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_handle()
}
}
#[cfg(unix)]
impl<T: AsRawFd> IntoAsync for AsyncWrapper<T> {}
#[cfg(test)]
#[cfg(windows)]
impl<T: AsRawHandle> IntoAsync for AsyncWrapper<T> {}
#[cfg(all(test, unix))]
mod tests {
use base::Event;
use std::{
fs::{File, OpenOptions},
future::Future,
@ -200,16 +283,16 @@ mod tests {
};
use sync::Mutex;
use super::{
super::{
use super::*;
use crate::{
mem::VecIoWrapper,
sys::unix::{
executor::{async_poll_from, async_uring_from},
mem::VecIoWrapper,
uring_executor::use_uring,
Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource,
FdExecutor, PollSource, URingExecutor, UringSource,
},
*,
Executor, MemRegion,
};
use base::Event;
struct State {
should_quit: bool,

View file

@ -63,44 +63,41 @@ pub mod audio_streams_async;
mod blocking;
mod complete;
mod event;
mod executor;
mod fd_executor;
mod io_ext;
pub mod mem;
mod poll_source;
mod queue;
mod select;
pub mod sync;
pub mod sys;
pub use sys::Executor;
mod timer;
mod uring_executor;
mod uring_source;
mod waker;
pub use async_types::*;
pub use base;
pub use blocking::{block_on, BlockingPool};
pub use base::Event;
#[cfg(unix)]
pub use blocking::sys::unix::block_on::block_on;
pub use blocking::BlockingPool;
pub use event::EventAsync;
pub use executor::Executor;
pub use fd_executor::FdExecutor;
#[cfg(windows)]
pub use futures::executor::block_on;
pub use io_ext::{
AllocateMode, AsyncWrapper, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync,
Result as AsyncResult, WriteAsync,
};
pub use mem::{BackingMemory, MemRegion};
pub use poll_source::PollSource;
pub use select::SelectResult;
pub use sys::run_one;
pub use timer::TimerAsync;
pub use uring_executor::URingExecutor;
pub use uring_source::UringSource;
use std::{
future::Future,
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
pub use blocking::{unblock, unblock_disarm, CancellableBlockingPool, TimeoutAction};
use remain::sorted;
use thiserror::Error as ThisError;
@ -108,32 +105,26 @@ use thiserror::Error as ThisError;
#[derive(ThisError, Debug)]
pub enum Error {
/// Error from the FD executor.
#[cfg(unix)]
#[error("Failure in the FD executor: {0}")]
FdExecutor(fd_executor::Error),
FdExecutor(sys::unix::fd_executor::Error),
/// Error from the handle executor.
#[cfg(windows)]
#[error("Failure in the handle executor: {0}")]
HandleExecutor(sys::windows::handle_executor::Error),
/// Error from Timer.
#[error("Failure in Timer: {0}")]
Timer(base::Error),
/// Error from TimerFd.
#[error("Failure in TimerAsync: {0}")]
TimerAsync(AsyncError),
/// Error from TimerFd.
#[error("Failure in TimerFd: {0}")]
TimerFd(base::Error),
/// Error from the uring executor.
#[cfg(unix)]
#[error("Failure in the uring executor: {0}")]
URingExecutor(uring_executor::Error),
URingExecutor(sys::unix::uring_executor::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
FdExecutor(e) => e.into(),
URingExecutor(e) => e.into(),
TimerFd(e) => e.into(),
TimerAsync(e) => e.into(),
}
}
}
// A Future that never completes.
pub struct Empty<T> {
phantom: PhantomData<T>,
@ -153,56 +144,6 @@ pub fn empty<T>() -> Empty<T> {
}
}
/// Creates an Executor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::run_one;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one(fut).unwrap());
/// ```
pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
if uring_executor::use_uring() {
run_one_uring(fut)
} else {
run_one_poll(fut)
}
}
/// Creates a URingExecutor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::run_one_uring;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one_uring(fut).unwrap());
/// ```
pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
URingExecutor::new()
.and_then(|ex| ex.run_until(fut))
.map_err(Error::URingExecutor)
}
/// Creates a FdExecutor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::run_one_poll;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one_poll(fut).unwrap());
/// ```
pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
FdExecutor::new()
.and_then(|ex| ex.run_until(fut))
.map_err(Error::FdExecutor)
}
// Select helpers to run until any future completes.
/// Creates a combinator that runs the two given futures until one completes, returning a tuple

View file

@ -25,7 +25,7 @@ pub struct MemRegion {
pub len: usize,
}
/// Trait for memory that can yeild both iovecs in to the backing memory.
/// Trait for memory that can yield both iovecs in to the backing memory.
/// Must be OK to modify the backing memory without owning a mut able reference. For example,
/// this is safe for GuestMemory and VolatileSlices in crosvm as those types guarantee they are
/// dealt with as volatile.
@ -63,6 +63,7 @@ impl From<VecIoWrapper> for Vec<u8> {
impl VecIoWrapper {
/// Get the length of the Vec that is wrapped.
#[cfg_attr(windows, allow(dead_code))]
pub fn len(&self) -> usize {
self.inner.len()
}

View file

@ -26,7 +26,6 @@ macro_rules! generate {
$(#[$doc:meta])*
($Select:ident, <$($Fut:ident),*>),
)*) => ($(
paste::item! {
pub(crate) struct $Select<$($Fut: Future + Unpin),*> {
$($Fut: MaybeDone<$Fut>,)*

View file

@ -449,6 +449,8 @@ fn cancel_waiter(cv: usize, waiter: &Waiter, wake_next: bool) {
unsafe { (*condvar).cancel_waiter(waiter, wake_next) }
}
// TODO(b/194338842): Fix tests for windows
#[cfg(unix)]
#[cfg(test)]
mod test {
use super::*;

View file

@ -883,6 +883,8 @@ impl<'a, T: ?Sized> Drop for MutexReadGuard<'a, T> {
}
}
// TODO(b/194338842): Fix tests for windows
#[cfg(unix)]
#[cfg(test)]
mod test {
use super::*;

13
cros_async/src/sys.rs Normal file
View file

@ -0,0 +1,13 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
cfg_if::cfg_if! {
if #[cfg(unix)] {
pub mod unix;
pub use unix::{async_types, event, executor::Executor, run_one};
} else if #[cfg(windows)] {
pub mod windows;
pub use windows::{async_types, event, executor::Executor, run_one};
}
}

View file

@ -0,0 +1,81 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod async_types;
pub mod event;
pub mod executor;
pub mod fd_executor;
pub mod poll_source;
pub mod uring_executor;
pub mod uring_source;
pub use fd_executor::FdExecutor;
pub use poll_source::PollSource;
pub use uring_executor::URingExecutor;
pub use uring_source::UringSource;
mod timer;
use crate::{Error, Result};
use std::future::Future;
/// Creates a URingExecutor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::sys::unix::run_one_uring;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one_uring(fut).unwrap());
/// ```
pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
URingExecutor::new()
.and_then(|ex| ex.run_until(fut))
.map_err(Error::URingExecutor)
}
/// Creates a FdExecutor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::sys::unix::run_one_poll;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one_poll(fut).unwrap());
/// ```
pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
FdExecutor::new()
.and_then(|ex| ex.run_until(fut))
.map_err(Error::FdExecutor)
}
/// Creates an Executor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::sys::unix::run_one;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one(fut).unwrap());
/// ```
pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
if uring_executor::use_uring() {
run_one_uring(fut)
} else {
run_one_poll(fut)
}
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
FdExecutor(e) => e.into(),
URingExecutor(e) => e.into(),
Timer(e) => e.into(),
TimerAsync(e) => e.into(),
}
}
}

View file

@ -1,11 +1,13 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{Executor, IoSourceExt};
use crate::{DescriptorAdapter, DescriptorIntoAsync, Executor, IoSourceExt};
use base::{Tube, TubeResult};
use serde::{de::DeserializeOwned, Serialize};
use std::io;
use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
pub struct AsyncTube {
inner: Box<dyn IoSourceExt<Tube>>,
@ -40,3 +42,12 @@ impl From<AsyncTube> for Tube {
at.inner.into_source()
}
}
impl<T> AsRawFd for DescriptorAdapter<T>
where
T: DescriptorIntoAsync,
{
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_descriptor()
}
}

View file

@ -0,0 +1,78 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#[cfg(test)]
use super::{FdExecutor, URingExecutor};
use crate::{AsyncResult, EventAsync, Executor};
use base::Event as EventFd;
impl EventAsync {
pub fn new(event: EventFd, ex: &Executor) -> AsyncResult<EventAsync> {
ex.async_from(event)
.map(|io_source| EventAsync { io_source })
}
/// Gets the next value from the eventfd.
pub async fn next_val(&self) -> AsyncResult<u64> {
self.io_source.read_u64().await
}
#[cfg(test)]
pub(crate) fn new_poll(event: EventFd, ex: &FdExecutor) -> AsyncResult<EventAsync> {
super::executor::async_poll_from(event, ex).map(|io_source| EventAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_uring(event: EventFd, ex: &URingExecutor) -> AsyncResult<EventAsync> {
super::executor::async_uring_from(event, ex).map(|io_source| EventAsync { io_source })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sys::unix::uring_executor::use_uring;
#[test]
fn next_val_reads_value() {
async fn go(event: EventFd, ex: &Executor) -> u64 {
let event_async = EventAsync::new(event, ex).unwrap();
event_async.next_val().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let ex = Executor::new().unwrap();
let val = ex.run_until(go(eventfd, &ex)).unwrap();
assert_eq!(val, 0xaa);
}
#[test]
fn next_val_reads_value_poll_and_ring() {
if !use_uring() {
return;
}
async fn go(event_async: EventAsync) -> u64 {
event_async.next_val().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let uring_ex = URingExecutor::new().unwrap();
let val = uring_ex
.run_until(go(EventAsync::new_uring(eventfd, &uring_ex).unwrap()))
.unwrap();
assert_eq!(val, 0xaa);
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let poll_ex = FdExecutor::new().unwrap();
let val = poll_ex
.run_until(go(EventAsync::new_poll(eventfd, &poll_ex).unwrap()))
.unwrap();
assert_eq!(val, 0xaa);
}
}

View file

@ -7,14 +7,16 @@ use std::future::Future;
use async_task::Task;
use super::{
poll_source::Error as PollError, uring_executor::use_uring, AsyncResult, FdExecutor, IntoAsync,
IoSourceExt, PollSource, URingExecutor, UringSource,
poll_source::Error as PollError, uring_executor::use_uring, FdExecutor, PollSource,
URingExecutor, UringSource,
};
use crate::{AsyncResult, IntoAsync, IoSourceExt};
pub(crate) fn async_uring_from<'a, F: IntoAsync + Send + 'a>(
f: F,
ex: &URingExecutor,
) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a + Send>> {
Ok(UringSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
}
@ -34,6 +36,13 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
/// interface duplication, but as far as we understand that is unavoidable.
///
/// See https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75
/// for further details.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
@ -49,7 +58,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
/// while data.len() > 0 {
/// let (count, mut buf) = f.write_from_vec(None, data).await?;
/// let (count, mut buf) = f.write_from_vec(Some(0), data).await?;
///
/// data = buf.split_off(count);
/// }
@ -67,7 +76,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
///
/// while rem > 0 {
/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
/// let (count, mut data) = from.read_to_vec(None, buf).await?;
/// let (count, mut data) = from.read_to_vec(Some(0), buf).await?;
///
/// if count == 0 {
/// // End of file. Return the number of bytes transferred.
@ -83,10 +92,11 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
/// Ok(len)
/// }
///
/// #[cfg(unix)]
/// # fn do_it() -> Result<(), Box<dyn Error>> {
/// let ex = Executor::new()?;
///
/// let (rx, tx) = base::pipe(true)?;
/// let (rx, tx) = base::unix::pipe(true)?;
/// let zero = File::open("/dev/zero")?;
/// let zero_bytes = CHUNK_SIZE * 7;
/// let zero_to_pipe = transfer_data(
@ -111,7 +121,7 @@ pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
///
/// # Ok(())
/// # }
///
/// #[cfg(unix)]
/// # do_it().unwrap();
/// ```

View file

@ -23,7 +23,7 @@ use std::{
};
use async_task::Task;
use base::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
use base::{add_fd_flags, warn, EpollContext, EpollEvents, Event as EventFd, WatchingEvents};
use futures::task::noop_waker;
use pin_utils::pin_mut;
use remain::sorted;
@ -31,7 +31,7 @@ use slab::Slab;
use sync::Mutex;
use thiserror::Error as ThisError;
use super::{
use crate::{
queue::RunnableQueue,
waker::{new_waker, WakerToken, WeakWake},
BlockingPool,
@ -527,7 +527,7 @@ impl FdExecutor {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker);
self.raw.run(&mut cx, super::empty::<()>())
self.raw.run(&mut cx, crate::empty::<()>())
}
pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {

View file

@ -17,14 +17,12 @@ use data_model::VolatileSlice;
use remain::sorted;
use thiserror::Error as ThisError;
use crate::AllocateMode;
use super::{
fd_executor::{
FdExecutor, RegisteredSource, {self},
},
use super::fd_executor::{
FdExecutor, RegisteredSource, {self},
};
use crate::{
mem::{BackingMemory, MemRegion},
AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync,
AllocateMode, AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync,
};
#[sorted]
@ -320,10 +318,11 @@ impl<F: AsRawFd> WriteAsync for PollSource<F> {
/// See `fallocate(2)` for details.
async fn fallocate(&self, file_offset: u64, len: u64, mode: AllocateMode) -> AsyncResult<()> {
let mode_u32: u32 = mode.into();
let ret = unsafe {
libc::fallocate64(
self.as_raw_fd(),
mode as libc::c_int,
mode_u32 as libc::c_int,
file_offset as libc::off64_t,
len as libc::off64_t,
)
@ -362,6 +361,10 @@ impl<F: AsRawFd> IoSourceExt<F> for PollSource<F> {
fn as_source(&self) -> &F {
self
}
async fn wait_for_handle(&self) -> AsyncResult<u64> {
self.read_u64().await
}
}
#[cfg(test)]

View file

@ -0,0 +1,83 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// For the moment, the only platform specific code is related to tests.
#![cfg(test)]
use super::{FdExecutor, URingExecutor};
use crate::{sys::unix::executor, AsyncResult, TimerAsync};
use base::Timer;
impl TimerAsync {
pub(crate) fn new_poll(timer: Timer, ex: &FdExecutor) -> AsyncResult<TimerAsync> {
executor::async_poll_from(timer, ex).map(|io_source| TimerAsync { io_source })
}
pub(crate) fn new_uring(timer: Timer, ex: &URingExecutor) -> AsyncResult<TimerAsync> {
executor::async_uring_from(timer, ex).map(|io_source| TimerAsync { io_source })
}
}
mod tests {
use super::*;
use crate::{sys::unix::uring_executor::use_uring, Executor};
use std::time::{Duration, Instant};
#[test]
fn timer() {
async fn this_test(ex: &Executor) {
let dur = Duration::from_millis(200);
let now = Instant::now();
TimerAsync::sleep(ex, dur).await.expect("unable to sleep");
assert!(now.elapsed() >= dur);
}
let ex = Executor::new().expect("creating an executor failed");
ex.run_until(this_test(&ex)).unwrap();
}
#[test]
fn one_shot() {
if !use_uring() {
return;
}
async fn this_test(ex: &URingExecutor) {
let mut tfd = Timer::new().expect("failed to create timerfd");
let dur = Duration::from_millis(200);
let now = Instant::now();
tfd.reset(dur, None).expect("failed to arm timer");
let t = TimerAsync::new_uring(tfd, ex).unwrap();
let count = t.next_val().await.expect("unable to wait for timer");
assert_eq!(count, 1);
assert!(now.elapsed() >= dur);
}
let ex = URingExecutor::new().unwrap();
ex.run_until(this_test(&ex)).unwrap();
}
#[test]
fn one_shot_fd() {
async fn this_test(ex: &FdExecutor) {
let mut tfd = Timer::new().expect("failed to create timerfd");
let dur = Duration::from_millis(200);
let now = Instant::now();
tfd.reset(dur, None).expect("failed to arm timer");
let t = TimerAsync::new_poll(tfd, ex).unwrap();
let count = t.next_val().await.expect("unable to wait for timer");
assert_eq!(count, 1);
assert!(now.elapsed() >= dur);
}
let ex = FdExecutor::new().unwrap();
ex.run_until(this_test(&ex)).unwrap();
}
}

View file

@ -83,7 +83,7 @@ use slab::Slab;
use sync::Mutex;
use thiserror::Error as ThisError;
use super::{
use crate::{
mem::{BackingMemory, MemRegion},
queue::RunnableQueue,
waker::{new_waker, WakerToken, WeakWake},
@ -542,7 +542,6 @@ impl RawExecutor {
self.ctx
.add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
@ -600,7 +599,6 @@ impl RawExecutor {
self.ctx
.add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
@ -807,7 +805,7 @@ impl URingExecutor {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker);
self.raw.run(&mut cx, super::empty::<()>())
self.raw.run(&mut cx, crate::empty::<()>())
}
pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
@ -908,13 +906,10 @@ mod tests {
task::{Context, Poll},
};
use super::*;
use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};
use futures::executor::block_on;
use super::{
super::mem::{BackingMemory, MemRegion, VecIoWrapper},
*,
};
// A future that returns ready when the uring queue is empty.
struct UringQueueEmpty<'a> {
ex: &'a URingExecutor,

View file

@ -12,12 +12,10 @@ use std::{
use async_trait::async_trait;
use crate::AllocateMode;
use super::{
use super::uring_executor::{Error, RegisteredSource, Result, URingExecutor};
use crate::{
mem::{BackingMemory, MemRegion, VecIoWrapper},
uring_executor::{Error, RegisteredSource, Result, URingExecutor},
AsyncError, AsyncResult,
AllocateMode, AsyncError, AsyncResult, ReadAsync, WriteAsync,
};
/// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
@ -45,7 +43,7 @@ impl<F: AsRawFd> UringSource<F> {
}
#[async_trait(?Send)]
impl<F: AsRawFd> super::ReadAsync for UringSource<F> {
impl<F: AsRawFd> ReadAsync for UringSource<F> {
/// Reads from the iosource at `file_offset` and fill the given `vec`.
async fn read_to_vec<'a>(
&'a self,
@ -125,7 +123,7 @@ impl<F: AsRawFd> super::ReadAsync for UringSource<F> {
}
#[async_trait(?Send)]
impl<F: AsRawFd> super::WriteAsync for UringSource<F> {
impl<F: AsRawFd> WriteAsync for UringSource<F> {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
@ -185,7 +183,7 @@ impl<F: AsRawFd> super::WriteAsync for UringSource<F> {
}
#[async_trait(?Send)]
impl<F: AsRawFd> super::IoSourceExt<F> for UringSource<F> {
impl<F: AsRawFd> crate::IoSourceExt<F> for UringSource<F> {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F {
self.source
@ -200,6 +198,10 @@ impl<F: AsRawFd> super::IoSourceExt<F> for UringSource<F> {
fn as_source_mut(&mut self) -> &mut F {
&mut self.source
}
async fn wait_for_handle(&self) -> AsyncResult<u64> {
self.read_u64().await
}
}
impl<F: AsRawFd> Deref for UringSource<F> {
@ -224,11 +226,8 @@ mod tests {
path::PathBuf,
};
use super::super::{
io_ext::{ReadAsync, WriteAsync},
uring_executor::use_uring,
UringSource,
};
use super::super::{uring_executor::use_uring, UringSource};
use crate::io_ext::{ReadAsync, WriteAsync};
use super::*;
@ -238,7 +237,7 @@ mod tests {
return;
}
use super::super::mem::VecIoWrapper;
use crate::mem::VecIoWrapper;
use std::io::Write;
use tempfile::tempfile;
@ -345,7 +344,7 @@ mod tests {
return;
}
use base::EventFd;
use base::Event as EventFd;
async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) {
let wait = UringSource::new(wait, ex).unwrap();
@ -511,9 +510,9 @@ mod tests {
let source = UringSource::new(f, ex).unwrap();
if let Err(e) = source.fallocate(0, 4096, AllocateMode::Default).await {
match e {
super::super::io_ext::Error::Uring(
super::super::uring_executor::Error::Io(io_err),
) => {
crate::io_ext::Error::Uring(super::super::uring_executor::Error::Io(
io_err,
)) => {
if io_err.kind() == std::io::ErrorKind::InvalidInput {
// Skip the test on kernels before fallocate support.
return;
@ -577,7 +576,7 @@ mod tests {
.unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 64];
let vw = Arc::new(super::super::mem::VecIoWrapper::from(v));
let vw = Arc::new(crate::mem::VecIoWrapper::from(v));
let ret = source
.write_from_mem(None, vw, &[MemRegion { offset: 0, len: 32 }])
.await

View file

@ -0,0 +1,52 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod async_types;
pub mod event;
pub mod executor;
pub mod handle_executor;
pub mod handle_source;
mod timer;
pub mod wait_for_handle;
pub(crate) use wait_for_handle::WaitForHandle;
pub use {
handle_executor::HandleExecutor,
handle_source::{HandleSource, HandleWrapper},
};
use crate::{Error, Result};
use std::future::Future;
pub fn run_one_handle<F: Future>(fut: F) -> Result<F::Output> {
let ex = HandleExecutor::new();
ex.run_until(fut).map_err(Error::HandleExecutor)
}
/// Creates an Executor that runs one future to completion.
///
/// # Example
///
/// ```
/// #[cfg(unix)]
/// {
/// use cros_async::run_one;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one(fut).unwrap());
/// }
/// ```
pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
run_one_handle(fut)
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
HandleExecutor(e) => e.into(),
Timer(e) => e.into(),
TimerAsync(e) => e.into(),
}
}
}

View file

@ -2,11 +2,12 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{Executor, HandleWrapper};
use base::{AsRawDescriptor, Descriptor, RecvTube, SendTube, Tube, TubeResult};
use serde::de::DeserializeOwned;
use super::HandleWrapper;
use crate::{unblock, DescriptorAdapter, DescriptorIntoAsync, Executor};
use base::{Descriptor, Tube, TubeError, TubeResult};
use serde::{de::DeserializeOwned, Serialize};
use std::io;
use std::os::windows::io::AsRawHandle;
use std::os::windows::io::{AsRawHandle, RawHandle};
use std::sync::{Arc, Mutex};
pub struct AsyncTube {
@ -14,7 +15,7 @@ pub struct AsyncTube {
}
impl AsyncTube {
pub fn new(ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
pub fn new(_ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
Ok(AsyncTube {
inner: Arc::new(Mutex::new(tube)),
})
@ -28,7 +29,7 @@ impl AsyncTube {
let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]);
unblock(
move || tube.lock().unwrap().recv(),
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)),
)
.await
}
@ -38,7 +39,7 @@ impl AsyncTube {
let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]);
unblock(
move || tube.lock().unwrap().send(&msg),
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)),
)
.await
}
@ -52,7 +53,17 @@ impl From<AsyncTube> for Tube {
//
// This does however mean that into will block until all async
// operations are complete.
let _ = at.inner.lock().unwrap();
std::mem::drop(at.inner.lock().unwrap());
Arc::try_unwrap(at.inner).unwrap().into_inner().unwrap()
}
}
#[cfg(windows)]
impl<T> AsRawHandle for DescriptorAdapter<T>
where
T: DescriptorIntoAsync,
{
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_descriptor()
}
}

View file

@ -0,0 +1,37 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{AsyncError, AsyncResult, EventAsync, Executor};
use base::Event;
impl EventAsync {
pub fn new(event: Event, ex: &Executor) -> AsyncResult<EventAsync> {
ex.async_from(event).map(|io_source| EventAsync {
io_source,
reset_after_read: true,
})
}
/// For Windows events, especially those used in overlapped IO, we don't want to reset them
/// after "reading" from them because the signaling state is entirely managed by the kernel.
pub fn new_without_reset(event: Event, ex: &Executor) -> AsyncResult<EventAsync> {
ex.async_from(event).map(|io_source| EventAsync {
io_source,
reset_after_read: false,
})
}
/// Gets the next value from the eventfd.
pub async fn next_val(&self) -> AsyncResult<u64> {
let res = self.io_source.wait_for_handle().await;
if self.reset_after_read {
self.io_source
.as_source()
.reset()
.map_err(AsyncError::EventAsync)?;
}
res
}
}

View file

@ -0,0 +1,291 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::future::Future;
use async_task::Task;
use super::{HandleExecutor, HandleSource};
use crate::{AsyncResult, IntoAsync, IoSourceExt};
/// Creates a concrete `IoSourceExt` using the handle_executor.
pub(crate) fn async_handle_from<'a, F: IntoAsync + 'a + Send>(
f: F,
) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a + Send>> {
Ok(HandleSource::new(vec![f].into_boxed_slice())
.map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
}
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
///
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
/// interface duplication, but as far as we understand that is unavoidable.
///
/// See https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75
/// for further details.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
///
/// ```
/// use std::cmp::min;
/// use std::error::Error;
/// use std::fs::{File, OpenOptions};
///
/// use cros_async::{AsyncResult, Executor, IoSourceExt, complete3};
/// const CHUNK_SIZE: usize = 32;
///
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
/// while data.len() > 0 {
/// let (count, mut buf) = f.write_from_vec(Some(0), data).await?;
///
/// data = buf.split_off(count);
/// }
///
/// Ok(())
/// }
///
/// // Transfer `len` bytes of data from `from` to `to`.
/// async fn transfer_data(
/// from: Box<dyn IoSourceExt<File>>,
/// to: Box<dyn IoSourceExt<File>>,
/// len: usize,
/// ) -> AsyncResult<usize> {
/// let mut rem = len;
///
/// while rem > 0 {
/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
/// let (count, mut data) = from.read_to_vec(Some(0), buf).await?;
///
/// if count == 0 {
/// // End of file. Return the number of bytes transferred.
/// return Ok(len - rem);
/// }
///
/// data.truncate(count);
/// write_file(&*to, data).await?;
///
/// rem = rem.saturating_sub(count);
/// }
///
/// Ok(len)
/// }
///
/// #[cfg(unix)]
/// # fn do_it() -> Result<(), Box<dyn Error>> {
/// let ex = Executor::new()?;
///
/// let (rx, tx) = sys_util::pipe(true)?;
/// let zero = File::open("/dev/zero")?;
/// let zero_bytes = CHUNK_SIZE * 7;
/// let zero_to_pipe = transfer_data(
/// ex.async_from(zero)?,
/// ex.async_from(tx.try_clone()?)?,
/// zero_bytes,
/// );
///
/// let rand = File::open("/dev/urandom")?;
/// let rand_bytes = CHUNK_SIZE * 19;
/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
///
/// let null = OpenOptions::new().write(true).open("/dev/null")?;
/// let null_bytes = zero_bytes + rand_bytes;
/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
///
/// ex.run_until(complete3(
/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
/// ))?;
///
/// # Ok(())
/// # }
/// #[cfg(unix)]
/// # do_it().unwrap();
/// ```
#[derive(Clone)]
pub enum Executor {
Handle(HandleExecutor),
}
impl Executor {
/// Create a new `Executor`.
pub fn new() -> AsyncResult<Self> {
Ok(Executor::Handle(HandleExecutor::new()))
}
/// Create a new `Box<dyn IoSourceExt<F>>` associated with `self`. Callers may then use the
/// returned `IoSourceExt` to directly start async operations without needing a separate
/// reference to the executor.
pub fn async_from<'a, F: IntoAsync + 'a + Send>(
&self,
f: F,
) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a + Send>> {
match self {
Executor::Handle(_) => async_handle_from(f),
}
}
/// Spawn a new future for this executor to run to completion. Callers may use the returned
/// `Task` to await on the result of `f`. Dropping the returned `Task` will cancel `f`,
/// preventing it from being polled again. To drop a `Task` without canceling the future
/// associated with it use `Task::detach`. To cancel a task gracefully and wait until it is
/// fully destroyed, use `Task::cancel`.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn() -> AsyncResult<()> {
/// # use std::thread;
///
/// # use cros_async::Executor;
/// use futures::executor::block_on;
///
/// # let ex = Executor::new()?;
///
/// # // Spawn a thread that runs the executor.
/// # let ex2 = ex.clone();
/// # thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn().unwrap();
/// ```
pub fn spawn<F>(&self, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Executor::Handle(ex) => ex.spawn(f),
}
}
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
/// thread where `run()` or `run_until()` is called.
///
/// # Panics
///
/// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
/// added by calling `spawn_local` from a different thread.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn_local() -> AsyncResult<()> {
/// # use cros_async::Executor;
///
/// # let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn_local().unwrap();
/// ```
pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
match self {
Executor::Handle(ex) => ex.spawn_local(f),
}
}
/// Run the executor indefinitely, driving all spawned futures to completion. This method will
/// block the current thread and only return in the case of an error.
///
/// # Panics
///
/// Once this method has been called on a thread, it may only be called on that thread from that
/// point on. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run() -> AsyncResult<()> {
/// use std::thread;
///
/// use cros_async::Executor;
/// use futures::executor::block_on;
///
/// let ex = Executor::new()?;
///
/// // Spawn a thread that runs the executor.
/// let ex2 = ex.clone();
/// thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run().unwrap();
/// ```
pub fn run(&self) -> AsyncResult<()> {
match self {
Executor::Handle(ex) => ex.run()?,
}
Ok(())
}
/// Drive all futures spawned in this executor until `f` completes. This method will block the
/// current thread only until `f` is complete and there may still be unfinished futures in the
/// executor.
///
/// # Panics
///
/// Once this method has been called on a thread, from then onwards it may only be called on
/// that thread. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run_until() -> AsyncResult<()> {
/// use cros_async::Executor;
///
/// let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run_until().unwrap();
/// ```
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
match self {
Executor::Handle(ex) => Ok(ex.run_until(f)?),
}
}
}

View file

@ -0,0 +1,198 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
queue::RunnableQueue,
waker::{new_waker, WeakWake},
};
use async_task::{Runnable, Task};
use futures::task::{Context, Poll};
use pin_utils::pin_mut;
use std::{
future::Future,
io,
sync::{mpsc, Arc, Weak},
};
use sync::{Condvar, Mutex};
use thiserror::Error as ThisError;
#[derive(Debug, ThisError)]
pub enum Error {
#[error("Failed to get future from executor run.")]
FailedToReadFutureFromWakerChannel(mpsc::RecvError),
}
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
FailedToReadFutureFromWakerChannel(e) => io::Error::new(io::ErrorKind::Other, e),
}
}
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)]
pub struct HandleExecutor {
raw: Arc<RawExecutor>,
}
impl HandleExecutor {
pub fn new() -> Self {
Self {
raw: Arc::new(RawExecutor::new()),
}
}
pub fn spawn<F>(&self, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.raw.spawn(f)
}
pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
self.raw.spawn_local(f)
}
pub fn run(&self) -> Result<()> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker);
self.raw.run(&mut cx, crate::empty::<()>())
}
pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker);
self.raw.run(&mut cx, f)
}
}
struct RawExecutor {
queue: RunnableQueue,
woken: Mutex<bool>,
wakeup: Condvar,
}
impl RawExecutor {
fn new() -> Self {
Self {
queue: RunnableQueue::new(),
woken: Mutex::new(false),
wakeup: Condvar::new(),
}
}
fn make_schedule_fn(self: &Arc<Self>) -> impl Fn(Runnable) {
let raw = Arc::downgrade(self);
move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
}
}
fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let (runnable, task) = async_task::spawn(f, self.make_schedule_fn());
runnable.schedule();
task
}
fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let (runnable, task) = async_task::spawn_local(f, self.make_schedule_fn());
runnable.schedule();
task
}
fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
pin_mut!(done);
loop {
for runnable in self.queue.iter() {
runnable.run();
}
if let Poll::Ready(val) = done.as_mut().poll(cx) {
return Ok(val);
}
self.wait()
}
}
fn wait(&self) {
let mut woken = self.woken.lock();
while !*woken {
woken = self.wakeup.wait(woken);
}
*woken = false;
}
fn wake(self: &Arc<Self>) {
*self.woken.lock() = true;
self.wakeup.notify_one();
}
}
impl WeakWake for RawExecutor {
fn wake_by_ref(weak_self: &Weak<Self>) {
if let Some(arc_self) = weak_self.upgrade() {
RawExecutor::wake(&arc_self);
}
}
}
#[cfg(test)]
mod test {
use super::*;
const FUT_MSG: i32 = 5;
use futures::{channel::mpsc as fut_mpsc, SinkExt, StreamExt};
#[test]
fn run_future() {
let (send, recv) = mpsc::channel();
async fn this_test(send: mpsc::Sender<i32>) {
send.send(FUT_MSG).unwrap();
}
let ex = HandleExecutor::new();
ex.run_until(this_test(send)).unwrap();
assert_eq!(recv.recv().unwrap(), FUT_MSG);
}
#[test]
fn spawn_future() {
let (send, recv) = fut_mpsc::channel(1);
let (send_done_signal, receive_done_signal) = mpsc::channel();
async fn message_sender(mut send: fut_mpsc::Sender<i32>) {
send.send(FUT_MSG).await.unwrap();
}
async fn message_receiver(mut recv: fut_mpsc::Receiver<i32>, done: mpsc::Sender<bool>) {
assert_eq!(recv.next().await.unwrap(), FUT_MSG);
done.send(true).unwrap();
}
let ex = HandleExecutor::new();
ex.spawn(message_sender(send)).detach();
ex.run_until(message_receiver(recv, send_done_signal))
.unwrap();
assert_eq!(receive_done_signal.recv().unwrap(), true);
}
}

View file

@ -0,0 +1,633 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{
io_ext::AllocateMode,
mem::{BackingMemory, MemRegion},
AsyncError, AsyncResult, CancellableBlockingPool, IoSourceExt, ReadAsync, WriteAsync,
};
use async_trait::async_trait;
use base::{
error, warn, AsRawDescriptor, Descriptor, Error as SysUtilError, FileReadWriteAtVolatile,
FileReadWriteVolatile, PunchHole, WriteZeroesAt,
};
use data_model::VolatileSlice;
use smallvec::SmallVec;
use std::{
fs::File,
io::{
Read, Seek, SeekFrom, Write, {self},
},
mem::ManuallyDrop,
os::windows::io::{AsRawHandle, FromRawHandle},
ptr::null_mut,
sync::Arc,
time::Duration,
};
use sync::Mutex;
use thiserror::Error as ThisError;
use winapi::um::{ioapiset::CancelIoEx, processthreadsapi::GetCurrentThreadId};
#[derive(ThisError, Debug)]
pub enum Error {
#[error("An error occurred trying to seek: {0}.")]
IoSeekError(io::Error),
#[error("An error occurred trying to read: {0}.")]
IoReadError(io::Error),
#[error("An error occurred trying to write: {0}.")]
IoWriteError(io::Error),
#[error("An error occurred trying to flush: {0}.")]
IoFlushError(io::Error),
#[error("An error occurred trying to punch hole: {0}.")]
IoPunchHoleError(io::Error),
#[error("An error occurred trying to write zeroes: {0}.")]
IoWriteZeroesError(io::Error),
#[error("An error occurred trying to duplicate source handles: {0}.")]
HandleDuplicationFailed(io::Error),
#[error("An error occurred trying to wait on source handles: {0}.")]
HandleWaitFailed(base::Error),
#[error("An error occurred trying to get a VolatileSlice into BackingMemory: {0}.")]
BackingMemoryVolatileSliceFetchFailed(crate::mem::Error),
#[error("HandleSource is gone, so no handles are available to fulfill the IO request.")]
NoHandleSource,
#[error("Operation on HandleSource is cancelled.")]
OperationCancelled,
#[error("Operation on HandleSource was aborted (unexpected).")]
OperationAborted,
}
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
IoSeekError(e) => e,
IoReadError(e) => e,
IoWriteError(e) => e,
IoFlushError(e) => e,
IoPunchHoleError(e) => e,
IoWriteZeroesError(e) => e,
HandleDuplicationFailed(e) => e,
HandleWaitFailed(e) => e.into(),
BackingMemoryVolatileSliceFetchFailed(e) => io::Error::new(io::ErrorKind::Other, e),
NoHandleSource => io::Error::new(io::ErrorKind::Other, NoHandleSource),
OperationCancelled => io::Error::new(io::ErrorKind::Interrupted, OperationCancelled),
OperationAborted => io::Error::new(io::ErrorKind::Interrupted, OperationAborted),
}
}
}
pub type Result<T> = std::result::Result<T, Error>;
/// Used to shutdown IO running on a CancellableBlockingPool.
pub struct HandleWrapper {
handles: Vec<Descriptor>,
}
impl HandleWrapper {
pub fn new(handles: Vec<Descriptor>) -> Arc<Mutex<HandleWrapper>> {
Arc::new(Mutex::new(Self { handles }))
}
pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {
for handle in &self.handles {
// There isn't much we can do if cancel fails.
if unsafe { CancelIoEx(handle.as_raw_descriptor(), null_mut()) } == 0 {
warn!(
"Cancel IO for handle:{:?} failed with {}",
handle.as_raw_descriptor(),
SysUtilError::last()
);
}
}
ret
}
}
/// Async IO source for Windows that uses a multi-threaded, multi-handle approach to provide fast IO
/// operations. It demuxes IO requests across a set of handles that refer to the same underlying IO
/// source, such as a file, and executes those requests across multiple threads. Benchmarks show
/// that this is the fastest method to perform IO on Windows, especially for file reads.
pub struct HandleSource<F: AsRawHandle> {
sources: Box<[F]>,
source_descriptors: Vec<Descriptor>,
blocking_pool: CancellableBlockingPool,
}
impl<F: AsRawHandle> HandleSource<F> {
/// Create a new `HandleSource` from the given IO source.
///
/// Each HandleSource uses its own thread pool, with one thread per source supplied. Since these
/// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.
/// Long term, we may migrate away from this approach toward IOCP or overlapped IO.
///
/// WARNING: every `source` in `sources` MUST be a unique file object (e.g. separate handles
/// each created by CreateFile), and point at the same file on disk. This is because IO
/// operations on the HandleSource are randomly distributed to each source.
///
/// # Safety
/// The caller must guarantee that `F`'s handle is compatible with the underlying functions
/// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined
/// by this struct. Note that most winapis will fail with reasonable errors.
pub fn new(sources: Box<[F]>) -> Result<Self> {
let source_count = sources.len();
let mut source_descriptors = Vec::with_capacity(source_count);
// Safe because consumers of the descriptors are tied to the lifetime of HandleSource.
for source in sources.iter() {
source_descriptors.push(Descriptor(source.as_raw_handle()));
}
Ok(Self {
sources,
source_descriptors,
blocking_pool: CancellableBlockingPool::new(
// WARNING: this is a safety requirement! Threads are 1:1 with sources.
source_count,
Duration::from_secs(10),
),
})
}
#[inline]
fn get_slices(
mem: &Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: Vec<MemRegion>,
) -> Result<SmallVec<[VolatileSlice; 16]>> {
mem_offsets
.into_iter()
.map(|region| {
mem.get_volatile_slice(region)
.map_err(Error::BackingMemoryVolatileSliceFetchFailed)
})
.collect::<Result<SmallVec<[VolatileSlice; 16]>>>()
}
// Returns a copy of all the source handles as a vector of descriptors.
fn as_descriptors(&self) -> Vec<Descriptor> {
self.sources
.iter()
.map(|i| Descriptor(i.as_raw_handle()))
.collect()
}
}
impl<F: AsRawHandle> Drop for HandleSource<F> {
fn drop(&mut self) {
if let Err(e) = self.blocking_pool.shutdown() {
error!("failed to clean up HandleSource: {}", e);
}
}
}
fn get_thread_file(descriptors: Vec<Descriptor>) -> ManuallyDrop<File> {
// Safe because all callers must exit *before* these handles will be closed (guaranteed by
// HandleSource's Drop impl.).
unsafe {
ManuallyDrop::new(File::from_raw_handle(
descriptors[GetCurrentThreadId() as usize % descriptors.len()].0,
))
}
}
#[async_trait(?Send)]
impl<F: AsRawHandle> ReadAsync for HandleSource<F> {
/// Reads from the iosource at `file_offset` and fill the given `vec`.
async fn read_to_vec<'a>(
&'a self,
file_offset: Option<u64>,
mut vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
self.blocking_pool
.spawn(
move || {
let mut file = get_thread_file(descriptors);
if let Some(file_offset) = file_offset {
file.seek(SeekFrom::Start(file_offset))
.map_err(Error::IoSeekError)?;
}
Ok((
file.read(vec.as_mut_slice()).map_err(Error::IoReadError)?,
vec,
))
},
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
.map_err(AsyncError::HandleSource)
}
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let mem_offsets = mem_offsets.to_owned();
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
self.blocking_pool
.spawn(
move || {
let mut file = get_thread_file(descriptors);
let memory_slices = Self::get_slices(&mem, mem_offsets)?;
match file_offset {
Some(file_offset) => file
.read_vectored_at_volatile(memory_slices.as_slice(), file_offset)
.map_err(Error::IoReadError),
None => file
.read_vectored_volatile(memory_slices.as_slice())
.map_err(Error::IoReadError),
}
},
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
.map_err(AsyncError::HandleSource)
}
/// Wait for the handle of `self` to be readable.
async fn wait_readable(&self) -> AsyncResult<()> {
unimplemented!()
}
/// Reads a single u64 from the current offset.
async fn read_u64(&self) -> AsyncResult<u64> {
unimplemented!()
}
}
#[async_trait(?Send)]
impl<F: AsRawHandle> WriteAsync for HandleSource<F> {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
self.blocking_pool
.spawn(
move || {
let mut file = get_thread_file(descriptors);
if let Some(file_offset) = file_offset {
file.seek(SeekFrom::Start(file_offset))
.map_err(Error::IoSeekError)?;
}
Ok((
file.write(vec.as_slice()).map_err(Error::IoWriteError)?,
vec,
))
},
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
.map_err(AsyncError::HandleSource)
}
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
async fn write_from_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let mem_offsets = mem_offsets.to_owned();
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
self.blocking_pool
.spawn(
move || {
let mut file = get_thread_file(descriptors);
let memory_slices = Self::get_slices(&mem, mem_offsets)?;
match file_offset {
Some(file_offset) => file
.write_vectored_at_volatile(memory_slices.as_slice(), file_offset)
.map_err(Error::IoWriteError),
None => file
.write_vectored_volatile(memory_slices.as_slice())
.map_err(Error::IoWriteError),
}
},
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
.map_err(AsyncError::HandleSource)
}
/// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
async fn fallocate(&self, file_offset: u64, len: u64, mode: AllocateMode) -> AsyncResult<()> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
self.blocking_pool
.spawn(
move || {
let mut file = get_thread_file(descriptors);
match mode {
AllocateMode::PunchHole => {
file.punch_hole(file_offset, len)
.map_err(Error::IoPunchHoleError)?;
}
// ZeroRange calls `punch_hole` which doesn't extend the File size if it needs to.
// Will fix if it becomes a problem.
AllocateMode::ZeroRange => {
file.write_zeroes_at(file_offset, len as usize)
.map_err(Error::IoWriteZeroesError)?;
}
}
Ok(())
},
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
.map_err(AsyncError::HandleSource)
}
/// Sync all completed write operations to the backing storage.
async fn fsync(&self) -> AsyncResult<()> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
self.blocking_pool
.spawn(
move || {
let mut file = get_thread_file(descriptors);
file.flush().map_err(Error::IoFlushError)
},
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
.map_err(AsyncError::HandleSource)
}
}
/// Subtrait for general async IO. Some not supported on Windows when multiple
/// sources are present.
///
/// Note that on Windows w/ multiple sources these functions do not make sense.
/// TODO(nkgold): decide on what these should mean.
#[async_trait(?Send)]
impl<F: AsRawHandle> IoSourceExt<F> for HandleSource<F> {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F {
unimplemented!("`into_source` is not supported on Windows.")
}
/// Provides a mutable ref to the underlying IO source.
fn as_source_mut(&mut self) -> &mut F {
if self.sources.len() == 1 {
return &mut self.sources[0];
}
// Unimplemented for multiple-source use-case
unimplemented!(
"`as_source_mut` doesn't support source len of {}",
self.sources.len()
)
}
/// Provides a ref to the underlying IO source.
///
/// In the multi-source case, the 0th source will be returned. If sources are not
/// interchangeable, behavior is undefined.
fn as_source(&self) -> &F {
return &self.sources[0];
}
async fn wait_for_handle(&self) -> AsyncResult<u64> {
let waiter = super::WaitForHandle::new(self);
match waiter.await {
Err(e) => Err(AsyncError::HandleSource(e)),
Ok(()) => Ok(0),
}
}
}
#[cfg(test)]
mod tests {
use super::super::HandleExecutor;
use super::*;
use crate::mem::VecIoWrapper;
use std::fs;
use tempfile::{tempfile, NamedTempFile};
#[test]
fn test_read_vec() {
let mut f = tempfile().unwrap();
f.write_all("data".as_bytes()).unwrap();
f.flush().unwrap();
f.seek(SeekFrom::Start(0)).unwrap();
async fn read_data(handle_src: &HandleSource<File>) {
let buf: Vec<u8> = vec![0; 4];
let (bytes_read, buf) = handle_src.read_to_vec(Some(0), buf).await.unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data");
}
let ex = HandleExecutor::new();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
ex.run_until(read_data(&handle_src)).unwrap();
}
#[test]
fn test_read_mem() {
let mut f = tempfile().unwrap();
f.write_all("data".as_bytes()).unwrap();
f.flush().unwrap();
f.seek(SeekFrom::Start(0)).unwrap();
async fn read_data(handle_src: &HandleSource<File>) {
let mem = Arc::new(VecIoWrapper::from(vec![0; 4]));
let bytes_read = handle_src
.read_to_mem(
Some(0),
Arc::<VecIoWrapper>::clone(&mem),
&[
MemRegion { offset: 0, len: 2 },
MemRegion { offset: 2, len: 2 },
],
)
.await
.unwrap();
assert_eq!(bytes_read, 4);
let vec: Vec<u8> = match Arc::try_unwrap(mem) {
Ok(v) => v.into(),
Err(_) => panic!("Too many vec refs"),
};
assert_eq!(std::str::from_utf8(vec.as_slice()).unwrap(), "data");
}
let ex = HandleExecutor::new();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
ex.run_until(read_data(&handle_src)).unwrap();
}
#[test]
fn test_write_vec() {
let mut temp_file = NamedTempFile::new().unwrap();
async fn write_data(handle_src: &HandleSource<File>) {
let mut buf: Vec<u8> = Vec::new();
buf.extend_from_slice("data".as_bytes());
let (bytes_written, _) = handle_src.write_from_vec(Some(0), buf).await.unwrap();
assert_eq!(bytes_written, 4);
}
let ex = HandleExecutor::new();
let f = fs::OpenOptions::new()
.write(true)
.open(temp_file.path())
.unwrap();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
ex.run_until(write_data(&handle_src)).unwrap();
let mut buf = vec![0; 4];
temp_file.read_exact(&mut buf).unwrap();
assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data");
}
#[test]
fn test_write_mem() {
let mut temp_file = NamedTempFile::new().unwrap();
async fn write_data(handle_src: &HandleSource<File>) {
let mut buf: Vec<u8> = Vec::new();
buf.extend_from_slice("data".as_bytes());
let mem = Arc::new(VecIoWrapper::from(buf));
let bytes_written = handle_src
.write_from_mem(
Some(0),
Arc::<VecIoWrapper>::clone(&mem),
&[
MemRegion { offset: 0, len: 2 },
MemRegion { offset: 2, len: 2 },
],
)
.await
.unwrap();
assert_eq!(bytes_written, 4);
match Arc::try_unwrap(mem) {
Ok(_) => (),
Err(_) => panic!("Too many vec refs"),
};
}
let ex = HandleExecutor::new();
let f = fs::OpenOptions::new()
.write(true)
.open(temp_file.path())
.unwrap();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
ex.run_until(write_data(&handle_src)).unwrap();
let mut buf = vec![0; 4];
temp_file.read_exact(&mut buf).unwrap();
assert_eq!(std::str::from_utf8(buf.as_slice()).unwrap(), "data");
}
#[test]
fn test_punch_holes() {
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all("abcdefghijk".as_bytes()).unwrap();
temp_file.flush().unwrap();
temp_file.seek(SeekFrom::Start(0)).unwrap();
async fn punch_hole(handle_src: &HandleSource<File>) {
let offset = 1;
let len = 3;
handle_src
.fallocate(offset, len, AllocateMode::PunchHole)
.await
.unwrap();
}
let ex = HandleExecutor::new();
let f = fs::OpenOptions::new()
.write(true)
.open(temp_file.path())
.unwrap();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
ex.run_until(punch_hole(&handle_src)).unwrap();
let mut buf = vec![0; 11];
temp_file.read_exact(&mut buf).unwrap();
assert_eq!(
std::str::from_utf8(buf.as_slice()).unwrap(),
"a\0\0\0efghijk"
);
}
/// Test should fail because punch hole should not be allowed to allocate more memory
#[test]
fn test_punch_holes_fail_out_of_bounds() {
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all("abcdefghijk".as_bytes()).unwrap();
temp_file.flush().unwrap();
temp_file.seek(SeekFrom::Start(0)).unwrap();
async fn punch_hole(handle_src: &HandleSource<File>) {
let offset = 9;
let len = 4;
handle_src
.fallocate(offset, len, AllocateMode::PunchHole)
.await
.unwrap();
}
let ex = HandleExecutor::new();
let f = fs::OpenOptions::new()
.write(true)
.open(temp_file.path())
.unwrap();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
ex.run_until(punch_hole(&handle_src)).unwrap();
let mut buf = vec![0; 13];
assert!(temp_file.read_exact(&mut buf).is_err());
}
// TODO(b/194338842): "ZeroRange" is supposed to allocate more memory if it goes out of the
// bounds of the file. Determine if we need to support this, since Windows doesn't do this yet.
// #[test]
// fn test_write_zeroes() {
// let mut temp_file = NamedTempFile::new().unwrap();
// temp_file.write("abcdefghijk".as_bytes()).unwrap();
// temp_file.flush().unwrap();
// temp_file.seek(SeekFrom::Start(0)).unwrap();
// async fn punch_hole(handle_src: &HandleSource<File>) {
// let offset = 9;
// let len = 4;
// handle_src
// .fallocate(offset, len, AllocateMode::ZeroRange)
// .await
// .unwrap();
// }
// let ex = HandleExecutor::new();
// let f = fs::OpenOptions::new()
// .write(true)
// .open(temp_file.path())
// .unwrap();
// let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
// ex.run_until(punch_hole(&handle_src)).unwrap();
// let mut buf = vec![0; 13];
// temp_file.read_exact(&mut buf).unwrap();
// assert_eq!(
// std::str::from_utf8(buf.as_slice()).unwrap(),
// "abcdefghi\0\0\0\0"
// );
// }
}

View file

@ -0,0 +1,28 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#[cfg(test)]
mod test {
use crate::{Executor, TimerAsync};
use std::time::{Duration, Instant};
#[test]
fn timer() {
async fn this_test(ex: &Executor) {
// On Windows, SetWaitableTimer, the underlying timer API, is not
// guaranteed to sleep for *at least* the supplied duration, so here
// we permit early wakeups.
let dur = Duration::from_millis(200);
let min_duration = Duration::from_millis(190);
let now = Instant::now();
TimerAsync::sleep(ex, dur).await.expect("unable to sleep");
let actual_sleep_duration = now.elapsed();
assert!(actual_sleep_duration >= min_duration);
}
let ex = Executor::new().expect("creating an executor failed");
ex.run_until(this_test(&ex)).unwrap();
}
}

View file

@ -0,0 +1,309 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use base::{error, warn, Descriptor};
use std::{
ffi::c_void,
future::Future,
marker::{PhantomData, PhantomPinned},
os::windows::io::AsRawHandle,
pin::Pin,
ptr::null_mut,
sync::MutexGuard,
task::{Context, Poll, Waker},
};
use sync::Mutex;
use winapi::{
shared::ntdef::FALSE,
um::{
handleapi::INVALID_HANDLE_VALUE,
threadpoollegacyapiset::UnregisterWaitEx,
winbase::{RegisterWaitForSingleObject, INFINITE},
winnt::{BOOLEAN, PVOID, WT_EXECUTEONLYONCE},
},
};
use crate::{
sys::windows::{
handle_source::{Error, Result},
HandleSource,
},
IoSourceExt,
};
/// Inner state shared between the future struct & the kernel invoked waiter callback.
struct WaitForHandleInner {
wait_state: WaitState,
wait_object: Descriptor,
waker: Option<Waker>,
}
impl WaitForHandleInner {
fn new() -> WaitForHandleInner {
WaitForHandleInner {
wait_state: WaitState::New,
wait_object: Descriptor(null_mut::<c_void>()),
waker: None,
}
}
}
/// Future's state.
#[derive(Clone, Copy, PartialEq)]
enum WaitState {
New,
Sleeping,
Woken,
Aborted,
Finished,
Failed,
}
/// Waits for a single handle valued HandleSource to be readable.
pub struct WaitForHandle<'a, T: AsRawHandle> {
handle: Descriptor,
inner: Mutex<WaitForHandleInner>,
_marker: PhantomData<&'a HandleSource<T>>,
_pinned_marker: PhantomPinned,
}
impl<'a, T> WaitForHandle<'a, T>
where
T: AsRawHandle,
{
pub fn new(handle_source: &'a HandleSource<T>) -> WaitForHandle<'a, T> {
WaitForHandle {
handle: Descriptor(handle_source.as_source().as_raw_handle()),
inner: Mutex::new(WaitForHandleInner::new()),
_marker: PhantomData,
_pinned_marker: PhantomPinned,
}
}
}
impl<'a, T> Future for WaitForHandle<'a, T>
where
T: AsRawHandle,
{
type Output = Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner_for_callback = &self.inner as *const _ as *mut Mutex<WaitForHandleInner>;
let mut inner = self.inner.lock();
match inner.wait_state {
WaitState::New => {
// Safe because:
// a) the callback only runs when WaitForHandle is alive (we cancel it on
// drop).
// b) inner & its children are owned by WaitForHandle.
let err = unsafe {
RegisterWaitForSingleObject(
&mut inner.wait_object as *mut _ as *mut *mut c_void,
self.handle.0,
Some(wait_for_handle_waker),
inner_for_callback as *mut c_void,
INFINITE,
WT_EXECUTEONLYONCE,
)
};
if err == 0 {
return Poll::Ready(Err(Error::HandleWaitFailed(base::Error::last())));
}
inner.wait_state = WaitState::Sleeping;
inner.waker = Some(cx.waker().clone());
Poll::Pending
}
WaitState::Sleeping => {
// In case we are polled with a different waker which won't be woken by the existing
// waker, we'll have to update to the new waker.
if inner
.waker
.as_ref()
.map(|w| !w.will_wake(cx.waker()))
.unwrap_or(true)
{
inner.waker = Some(cx.waker().clone());
}
Poll::Pending
}
WaitState::Woken => {
inner.wait_state = WaitState::Finished;
// Safe because:
// a) we know a wait was registered and hasn't been unregistered yet.
// b) the callback is not queued because we set WT_EXECUTEONLYONCE, and we know
// it has already completed.
unsafe { unregister_wait(inner.wait_object) }
Poll::Ready(Ok(()))
}
WaitState::Aborted => Poll::Ready(Err(Error::OperationAborted)),
WaitState::Finished => panic!("polled an already completed WaitForHandle future."),
WaitState::Failed => {
panic!("WaitForHandle future's waiter callback hit unexpected behavior.")
}
}
}
}
impl<'a, T> Drop for WaitForHandle<'a, T>
where
T: AsRawHandle,
{
fn drop(&mut self) {
// We cannot hold the lock over the call to unregister_wait, otherwise we could deadlock
// with the callback trying to access the same data. It is sufficient to just verify
// (without mutual exclusion beyond the data access itself) that we have exited the New
// state before attempting to unregister. This works because once we have exited New, we
// cannot ever re-enter that state, and we know for sure that inner.wait_object is a valid
// wait object.
let (current_state, wait_object) = {
let inner = self.inner.lock();
(inner.wait_state, inner.wait_object)
};
// Safe because self.descriptor is valid in any state except New or Finished.
//
// Note: this method call is critical for supplying the safety guarantee relied upon by
// wait_for_handle_waker. Upon return, it ensures that wait_for_handle_waker is not running
// and won't be scheduled again, which makes it safe to drop self.inner_for_callback
// (wait_for_handle_waker has a non owning pointer to self.inner_for_callback).
if current_state != WaitState::New && current_state != WaitState::Finished {
unsafe { unregister_wait(wait_object) }
}
}
}
/// Safe portion of the RegisterWaitForSingleObject callback.
fn process_wait_state_change(
mut state: MutexGuard<WaitForHandleInner>,
wait_fired: bool,
) -> Option<Waker> {
let mut waker = None;
state.wait_state = match state.wait_state {
WaitState::Sleeping => {
let new_state = if wait_fired {
WaitState::Woken
} else {
// This should never happen.
error!("wait_for_handle_waker did not wake due to wait firing.");
WaitState::Aborted
};
match state.waker.take() {
Some(w) => {
waker = Some(w);
new_state
}
None => {
error!("wait_for_handler_waker called, but no waker available.");
WaitState::Failed
}
}
}
_ => {
error!("wait_for_handle_waker called with state != sleeping.");
WaitState::Failed
}
};
waker
}
/// # Safety
/// a) inner_ptr is valid whenever this function can be called. This is guaranteed by WaitForHandle,
/// which cannot be dropped until this function has finished running & is no longer queued for
/// execution because the Drop impl calls UnregisterWaitEx, which blocks on that condition.
unsafe extern "system" fn wait_for_handle_waker(inner_ptr: PVOID, timer_or_wait_fired: BOOLEAN) {
let inner = inner_ptr as *const Mutex<WaitForHandleInner>;
let inner_locked = (*inner).lock();
let waker = process_wait_state_change(
inner_locked,
/* wait_fired= */ timer_or_wait_fired == FALSE,
);
// We wake *after* releasing the lock to avoid waking up a thread that then will go back to
// sleep because the lock it needs is currently held.
if let Some(w) = waker {
w.wake()
}
}
/// # Safety
/// a) desc must be a valid wait handle from RegisterWaitForSingleObject.
unsafe fn unregister_wait(desc: Descriptor) {
if UnregisterWaitEx(desc.0, INVALID_HANDLE_VALUE) == 0 {
warn!(
"WaitForHandle: failed to clean up RegisterWaitForSingleObject wait handle: {}",
base::Error::last()
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
waker::{new_waker, WeakWake},
EventAsync, Executor,
};
use base::{thread::spawn_with_timeout, Event};
use futures::pin_mut;
use std::{
sync::{Arc, Weak},
time::Duration,
};
struct FakeWaker {}
impl WeakWake for FakeWaker {
fn wake_by_ref(_weak_self: &Weak<Self>) {
// Do nothing.
}
}
#[test]
fn test_unsignaled_event() {
async fn wait_on_unsignaled_event(evt: EventAsync) {
evt.next_val().await.unwrap();
panic!("await should never terminate");
}
let fake_waker = Arc::new(FakeWaker {});
let waker = new_waker(Arc::downgrade(&fake_waker));
let mut cx = Context::from_waker(&waker);
let ex = Executor::new().unwrap();
let evt = Event::new().unwrap();
let async_evt = EventAsync::new(evt, &ex).unwrap();
let fut = wait_on_unsignaled_event(async_evt);
pin_mut!(fut);
// Assert we make it to the pending state. This means we've registered a wait.
assert_eq!(fut.poll(&mut cx), Poll::Pending);
// If this test doesn't crash trying to drop the future, it is considered successful.
}
#[test]
fn test_signaled_event() {
let join_handle = spawn_with_timeout(|| {
async fn wait_on_signaled_event(evt: EventAsync) {
evt.next_val().await.unwrap();
}
let ex = Executor::new().unwrap();
let evt = Event::new().unwrap();
evt.write(0).unwrap();
let async_evt = EventAsync::new(evt, &ex).unwrap();
let fut = wait_on_signaled_event(async_evt);
pin_mut!(fut);
ex.run_until(fut).unwrap();
});
join_handle
.try_join(Duration::from_secs(5))
.expect("async wait never returned from signaled event.");
}
}

View file

@ -2,44 +2,30 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use base::{Result as SysResult, Timer as TimerFd};
use crate::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt};
use base::{Result as SysResult, Timer};
use std::time::Duration;
use super::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt};
#[cfg(test)]
use super::{FdExecutor, URingExecutor};
/// An async version of base::TimerFd.
/// An async version of base::Timer.
pub struct TimerAsync {
io_source: Box<dyn IoSourceExt<TimerFd>>,
pub(crate) io_source: Box<dyn IoSourceExt<Timer>>,
}
impl TimerAsync {
pub fn new(timer: TimerFd, ex: &Executor) -> AsyncResult<TimerAsync> {
pub fn new(timer: Timer, ex: &Executor) -> AsyncResult<TimerAsync> {
ex.async_from(timer)
.map(|io_source| TimerAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_poll(timer: TimerFd, ex: &FdExecutor) -> AsyncResult<TimerAsync> {
super::executor::async_poll_from(timer, ex).map(|io_source| TimerAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_uring(timer: TimerFd, ex: &URingExecutor) -> AsyncResult<TimerAsync> {
super::executor::async_uring_from(timer, ex).map(|io_source| TimerAsync { io_source })
}
/// Gets the next value from the timer.
pub async fn next_val(&self) -> AsyncResult<u64> {
self.io_source.read_u64().await
self.io_source.wait_for_handle().await
}
/// Async sleep for the given duration
pub async fn sleep(ex: &Executor, dur: Duration) -> std::result::Result<(), Error> {
let mut tfd = TimerFd::new().map_err(Error::TimerFd)?;
tfd.reset(dur, None).map_err(Error::TimerFd)?;
let mut tfd = Timer::new().map_err(Error::Timer)?;
tfd.reset(dur, None).map_err(Error::Timer)?;
let t = TimerAsync::new(tfd, ex).map_err(Error::TimerAsync)?;
t.next_val().await.map_err(Error::TimerAsync)?;
Ok(())
@ -53,67 +39,4 @@ impl TimerAsync {
}
}
impl IntoAsync for TimerFd {}
#[cfg(test)]
mod tests {
use super::{super::uring_executor::use_uring, *};
use std::time::{Duration, Instant};
#[test]
fn one_shot() {
if !use_uring() {
return;
}
async fn this_test(ex: &URingExecutor) {
let mut tfd = TimerFd::new().expect("failed to create timerfd");
let dur = Duration::from_millis(200);
let now = Instant::now();
tfd.reset(dur, None).expect("failed to arm timer");
let t = TimerAsync::new_uring(tfd, ex).unwrap();
let count = t.next_val().await.expect("unable to wait for timer");
assert_eq!(count, 1);
assert!(now.elapsed() >= dur);
}
let ex = URingExecutor::new().unwrap();
ex.run_until(this_test(&ex)).unwrap();
}
#[test]
fn one_shot_fd() {
async fn this_test(ex: &FdExecutor) {
let mut tfd = TimerFd::new().expect("failed to create timerfd");
let dur = Duration::from_millis(200);
let now = Instant::now();
tfd.reset(dur, None).expect("failed to arm timer");
let t = TimerAsync::new_poll(tfd, ex).unwrap();
let count = t.next_val().await.expect("unable to wait for timer");
assert_eq!(count, 1);
assert!(now.elapsed() >= dur);
}
let ex = FdExecutor::new().unwrap();
ex.run_until(this_test(&ex)).unwrap();
}
#[test]
fn timer() {
async fn this_test(ex: &Executor) {
let dur = Duration::from_millis(200);
let now = Instant::now();
TimerAsync::sleep(ex, dur).await.expect("unable to sleep");
assert!(now.elapsed() >= dur);
}
let ex = Executor::new().expect("creating an executor failed");
ex.run_until(this_test(&ex)).unwrap();
}
}
impl IntoAsync for Timer {}