Refactoring: Move common/cros_async to cros_async

This runs the script added in https://crrev.com/c/3533607

BUG=b:22320646
TEST=presubmit

Change-Id: I2e7efdb35508d45281f046e64c24aa43e27f2000
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3533608
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Author: Dennis Kempin
Date: 2022-03-17 11:26:01 -07:00
Parent: dca90f49bd
Commit: 55c6a3b5cd
31 changed files with 9787 additions and 6 deletions


@@ -45,6 +45,7 @@ members = [
"arch",
"base",
"bit_field",
"cros_async",
"crosvm-fuzz",
"crosvm_control",
"crosvm_plugin",
@@ -192,7 +193,7 @@ assertions = { path = "common/assertions" }
audio_streams = { path = "common/audio_streams" }
base = { path = "base" }
sys_util_core = { path = "common/sys_util_core" }
cros_async = { path = "common/cros_async" }
cros_async = { path = "cros_async" }
cros_fuzz = { path = "common/cros-fuzz" } # ignored by ebuild
data_model = { path = "common/data_model" }
libcras = { path = "libcras_stub" } # ignored by ebuild


@@ -9,7 +9,7 @@ chromeos = ["sys_util/chromeos"]
[dependencies]
audio_streams = { path = "../common/audio_streams" }
cros_async = { path = "../common/cros_async" }
cros_async = { path = "../cros_async" }
data_model = { path = "../common/data_model" }
libc = "*"
remain = "0.2"

cros_async/Cargo.toml (new file)

@@ -0,0 +1,36 @@
[package]
name = "cros_async"
version = "0.1.0"
authors = ["The Chromium OS Authors"]
edition = "2021"
[dependencies]
async-trait = "0.1.36"
async-task = "4"
data_model = { path = "../common/data_model" } # provided by ebuild
intrusive-collections = "0.9"
io_uring = { path = "../common/io_uring" } # provided by ebuild
libc = "*"
once_cell = "1.7.2"
paste = "1.0"
pin-utils = "0.1.0-alpha.4"
remain = "0.2"
slab = "0.4"
sync = { path = "../common/sync" } # provided by ebuild
sys_util = { path = "../common/sys_util" } # provided by ebuild
thiserror = "1.0.20"
audio_streams = { path = "../common/audio_streams" } # provided by ebuild
anyhow = "1.0"
[dependencies.futures]
version = "*"
default-features = false
features = ["alloc"]
[dev-dependencies]
futures = { version = "*", features = ["executor"] }
futures-executor = { version = "0.3", features = ["thread-pool"] }
futures-util = "0.3"
tempfile = "3"

cros_async/DEPRECATED.md (new file)

@@ -0,0 +1,4 @@
Use crosvm/cros_async instead.
Code in this directory is not used by crosvm; it is only used in ChromeOS and will move to a
separate ChromeOS repository soon.


@@ -0,0 +1,68 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Implements the interface required by `audio_streams` using the cros_async Executor.
//!
//! It implements the `AudioStreamsExecutor` trait for `Executor`, so it can be passed into
//! the audio_streams API.
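//!
//! # Example
//!
//! A minimal sketch of using the `AudioStreamsExecutor` impl below:
//!
//! ```ignore
//! use std::time::Duration;
//! use audio_streams::async_api::AudioStreamsExecutor;
//! use cros_async::Executor;
//!
//! let ex = Executor::new().unwrap();
//! // `delay` comes from the `AudioStreamsExecutor` impl in this module.
//! ex.run_until(ex.delay(Duration::from_millis(10))).unwrap().unwrap();
//! ```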
#[cfg(unix)]
use std::os::unix::net::UnixStream;
use std::{io::Result, time::Duration};
use super::{AsyncWrapper, IntoAsync, IoSourceExt, TimerAsync};
use async_trait::async_trait;
use audio_streams::async_api::{
AsyncStream, AudioStreamsExecutor, ReadAsync, ReadWriteAsync, WriteAsync,
};
/// A wrapper around IoSourceExt that is compatible with the audio_streams traits.
pub struct IoSourceWrapper<T: IntoAsync + Send> {
source: Box<dyn IoSourceExt<T> + Send>,
}
#[async_trait(?Send)]
impl<T: IntoAsync + Send> ReadAsync for IoSourceWrapper<T> {
async fn read_to_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> Result<(usize, Vec<u8>)> {
self.source
.read_to_vec(file_offset, vec)
.await
.map_err(Into::into)
}
}
#[async_trait(?Send)]
impl<T: IntoAsync + Send> WriteAsync for IoSourceWrapper<T> {
async fn write_from_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> Result<(usize, Vec<u8>)> {
self.source
.write_from_vec(file_offset, vec)
.await
.map_err(Into::into)
}
}
#[async_trait(?Send)]
impl<T: IntoAsync + Send> ReadWriteAsync for IoSourceWrapper<T> {}
#[async_trait(?Send)]
impl AudioStreamsExecutor for super::Executor {
#[cfg(unix)]
fn async_unix_stream(&self, stream: UnixStream) -> Result<AsyncStream> {
return Ok(Box::new(IoSourceWrapper {
source: self.async_from(AsyncWrapper::new(stream))?,
}));
}
async fn delay(&self, dur: Duration) -> Result<()> {
TimerAsync::sleep(self, dur).await.map_err(Into::into)
}
}


@@ -0,0 +1,9 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod block_on;
mod pool;
pub use block_on::*;
pub use pool::*;


@@ -0,0 +1,202 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
future::Future,
ptr,
sync::{
atomic::{AtomicI32, Ordering},
Arc,
},
task::{Context, Poll},
};
use futures::{
pin_mut,
task::{waker_ref, ArcWake},
};
// Randomly generated values to indicate the state of the current thread.
const WAITING: i32 = 0x25de_74d1;
const WOKEN: i32 = 0x72d3_2c9f;
const FUTEX_WAIT_PRIVATE: libc::c_int = libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG;
const FUTEX_WAKE_PRIVATE: libc::c_int = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
thread_local!(static PER_THREAD_WAKER: Arc<Waker> = Arc::new(Waker(AtomicI32::new(WAITING))));
#[repr(transparent)]
struct Waker(AtomicI32);
extern "C" {
#[cfg_attr(target_os = "android", link_name = "__errno")]
#[cfg_attr(target_os = "linux", link_name = "__errno_location")]
fn errno_location() -> *mut libc::c_int;
}
impl ArcWake for Waker {
fn wake_by_ref(arc_self: &Arc<Self>) {
let state = arc_self.0.swap(WOKEN, Ordering::Release);
if state == WAITING {
// The thread hasn't already been woken up so wake it up now. Safe because this doesn't
// modify any memory and we check the return value.
let res = unsafe {
libc::syscall(
libc::SYS_futex,
&arc_self.0,
FUTEX_WAKE_PRIVATE,
libc::INT_MAX, // val
ptr::null() as *const libc::timespec, // timeout
ptr::null() as *const libc::c_int, // uaddr2
0_i32, // val3
)
};
if res < 0 {
panic!("unexpected error from FUTEX_WAKE_PRIVATE: {}", unsafe {
*errno_location()
});
}
}
}
}
/// Run a future to completion on the current thread.
///
/// This method will block the current thread until `f` completes. Useful when you need to call an
/// async fn from a non-async context.
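///
/// # Example
///
/// A minimal sketch of calling an async block from synchronous code:
///
/// ```
/// let answer = cros_async::block_on(async { 2 + 2 });
/// assert_eq!(answer, 4);
/// ```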
pub fn block_on<F: Future>(f: F) -> F::Output {
pin_mut!(f);
PER_THREAD_WAKER.with(|thread_waker| {
let waker = waker_ref(thread_waker);
let mut cx = Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f.as_mut().poll(&mut cx) {
return t;
}
let state = thread_waker.0.swap(WAITING, Ordering::Acquire);
if state == WAITING {
// If we weren't already woken up then wait until we are. Safe because this doesn't
// modify any memory and we check the return value.
let res = unsafe {
libc::syscall(
libc::SYS_futex,
&thread_waker.0,
FUTEX_WAIT_PRIVATE,
state,
ptr::null() as *const libc::timespec, // timeout
ptr::null() as *const libc::c_int, // uaddr2
0_i32, // val3
)
};
if res < 0 {
// Safe because libc guarantees that this is a valid pointer.
match unsafe { *errno_location() } {
libc::EAGAIN | libc::EINTR => {}
e => panic!("unexpected error from FUTEX_WAIT_PRIVATE: {}", e),
}
}
// Clear the state to prevent unnecessary extra loop iterations and also to allow
// nested usage of `block_on`.
thread_waker.0.store(WAITING, Ordering::Release);
}
}
})
}
#[cfg(test)]
mod test {
use super::*;
use std::{
future::Future,
pin::Pin,
sync::{
mpsc::{channel, Sender},
Arc,
},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
use super::super::super::sync::SpinLock;
struct TimerState {
fired: bool,
waker: Option<Waker>,
}
struct Timer {
state: Arc<SpinLock<TimerState>>,
}
impl Future for Timer {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut state = self.state.lock();
if state.fired {
return Poll::Ready(());
}
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
fn start_timer(dur: Duration, notify: Option<Sender<()>>) -> Timer {
let state = Arc::new(SpinLock::new(TimerState {
fired: false,
waker: None,
}));
let thread_state = Arc::clone(&state);
thread::spawn(move || {
thread::sleep(dur);
let mut ts = thread_state.lock();
ts.fired = true;
if let Some(waker) = ts.waker.take() {
waker.wake();
}
drop(ts);
if let Some(tx) = notify {
tx.send(()).expect("Failed to send completion notification");
}
});
Timer { state }
}
#[test]
fn it_works() {
block_on(start_timer(Duration::from_millis(100), None));
}
#[test]
fn nested() {
async fn inner() {
block_on(start_timer(Duration::from_millis(100), None));
}
block_on(inner());
}
#[test]
fn ready_before_poll() {
let (tx, rx) = channel();
let timer = start_timer(Duration::from_millis(50), Some(tx));
rx.recv()
.expect("Failed to receive completion notification");
// We know the timer has already fired so the poll should complete immediately.
block_on(timer);
}
}


@@ -0,0 +1,524 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
collections::VecDeque,
mem,
sync::{
mpsc::{channel, Receiver, Sender},
Arc,
},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use async_task::{Runnable, Task};
use slab::Slab;
use sync::{Condvar, Mutex};
use sys_util::{error, warn};
const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
struct State {
tasks: VecDeque<Runnable>,
num_threads: usize,
num_idle: usize,
num_notified: usize,
worker_threads: Slab<JoinHandle<()>>,
exited_threads: Option<Receiver<usize>>,
exit: Sender<usize>,
shutting_down: bool,
}
fn run_blocking_thread(idx: usize, inner: Arc<Inner>, exit: Sender<usize>) {
let mut state = inner.state.lock();
while !state.shutting_down {
if let Some(runnable) = state.tasks.pop_front() {
drop(state);
runnable.run();
state = inner.state.lock();
continue;
}
// No more tasks so wait for more work.
state.num_idle += 1;
let (guard, result) = inner
.condvar
.wait_timeout_while(state, inner.keepalive, |s| {
!s.shutting_down && s.num_notified == 0
});
state = guard;
// If `state.num_notified > 0` then this was a real wakeup.
if state.num_notified > 0 {
state.num_notified -= 1;
continue;
}
// Only decrement the idle count if we timed out. Otherwise, it was decremented when new
// work was added to `state.tasks`.
if result.timed_out() {
state.num_idle = state
.num_idle
.checked_sub(1)
.expect("`num_idle` underflow on timeout");
break;
}
}
state.num_threads -= 1;
// If we're shutting down then the BlockingPool will take care of joining all the threads.
// Otherwise, we need to join the last worker thread that exited here.
let last_exited_thread = if let Some(exited_threads) = state.exited_threads.as_mut() {
exited_threads
.try_recv()
.map(|idx| state.worker_threads.remove(idx))
.ok()
} else {
None
};
// Drop the lock before trying to join the last exited thread.
drop(state);
if let Some(handle) = last_exited_thread {
let _ = handle.join();
}
if let Err(e) = exit.send(idx) {
error!("Failed to send thread exit event on channel: {}", e);
}
}
struct Inner {
state: Mutex<State>,
condvar: Condvar,
max_threads: usize,
keepalive: Duration,
}
impl Inner {
fn schedule(self: &Arc<Inner>, runnable: Runnable) {
let mut state = self.state.lock();
// If we're shutting down then nothing is going to run this task.
if state.shutting_down {
return;
}
state.tasks.push_back(runnable);
if state.num_idle == 0 {
// There are no idle threads. Spawn a new one if possible.
if state.num_threads < self.max_threads {
state.num_threads += 1;
let exit = state.exit.clone();
let entry = state.worker_threads.vacant_entry();
let idx = entry.key();
let inner = self.clone();
entry.insert(
thread::Builder::new()
.name(format!("blockingPool{}", idx))
.spawn(move || run_blocking_thread(idx, inner, exit))
.unwrap(),
);
}
} else {
// We have idle threads, wake one up.
state.num_idle -= 1;
state.num_notified += 1;
self.condvar.notify_one();
}
}
}
#[derive(Debug, thiserror::Error)]
#[error("{0} BlockingPool threads did not exit in time and will be detached")]
pub struct ShutdownTimedOut(usize);
/// A thread pool for running work that may block.
///
/// It is generally discouraged to do any blocking work inside an async function. However, this is
/// sometimes unavoidable when dealing with interfaces that don't provide async variants. In this
/// case callers may use the `BlockingPool` to run the blocking work on a different thread and
/// `await` for its result to finish, which will prevent blocking the main thread of the
/// application.
///
/// Since the blocking work is sent to another thread, users should be careful when using the
/// `BlockingPool` for latency-sensitive operations. Additionally, the `BlockingPool` is intended to
/// be used for work that will eventually complete on its own. Users who want to spawn a thread
/// should just use `thread::spawn` directly.
///
/// There is no way to cancel work once it has been picked up by one of the worker threads in the
/// `BlockingPool`. Dropping or shutting down the pool will block up to a timeout (default 10
/// seconds) to wait for any active blocking work to finish. Any threads running tasks that have not
/// completed by that time will be detached.
///
/// # Examples
///
/// Spawn a task to run in the `BlockingPool` and await on its result.
///
/// ```edition2018
/// use cros_async::BlockingPool;
///
/// # async fn do_it() {
/// let pool = BlockingPool::default();
///
/// let res = pool.spawn(move || {
/// // Do some CPU-intensive or blocking work here.
///
/// 42
/// }).await;
///
/// assert_eq!(res, 42);
/// # }
/// # cros_async::block_on(do_it());
/// ```
pub struct BlockingPool {
inner: Arc<Inner>,
}
impl BlockingPool {
/// Create a new `BlockingPool`.
///
/// The `BlockingPool` will never spawn more than `max_threads` threads to do work, regardless
/// of the number of tasks that are added to it. This value should be set relatively low (for
/// example, the number of CPUs on the machine) if the pool is intended to run CPU-intensive
/// work or it should be set relatively high (128 or more) if the pool is intended to be used
/// for various IO operations that cannot be completed asynchronously. The default value is 256.
///
/// Worker threads are spawned on demand when new work is added to the pool and will
/// automatically exit after being idle for some time so there is no overhead for setting
/// `max_threads` to a large value when there is little to no work assigned to the
/// `BlockingPool`. `keepalive` determines the idle duration after which the worker thread will
/// exit. The default value is 10 seconds.
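///
/// # Example
///
/// A minimal sketch: a small pool for CPU-bound work.
///
/// ```edition2018
/// use std::time::Duration;
/// use cros_async::BlockingPool;
///
/// // At most 4 worker threads; idle workers exit after 10 seconds.
/// let pool = BlockingPool::new(4, Duration::from_secs(10));
/// ```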
pub fn new(max_threads: usize, keepalive: Duration) -> BlockingPool {
let (exit, exited_threads) = channel();
BlockingPool {
inner: Arc::new(Inner {
state: Mutex::new(State {
tasks: VecDeque::new(),
num_threads: 0,
num_idle: 0,
num_notified: 0,
worker_threads: Slab::new(),
exited_threads: Some(exited_threads),
exit,
shutting_down: false,
}),
condvar: Condvar::new(),
max_threads,
keepalive,
}),
}
}
/// Like `new`, but pre-allocates capacity for up to `max_threads` worker threads.
pub fn with_capacity(max_threads: usize, keepalive: Duration) -> BlockingPool {
let (exit, exited_threads) = channel();
BlockingPool {
inner: Arc::new(Inner {
state: Mutex::new(State {
tasks: VecDeque::new(),
num_threads: 0,
num_idle: 0,
num_notified: 0,
worker_threads: Slab::with_capacity(max_threads),
exited_threads: Some(exited_threads),
exit,
shutting_down: false,
}),
condvar: Condvar::new(),
max_threads,
keepalive,
}),
}
}
/// Spawn a task to run in the `BlockingPool`.
///
/// Callers may `await` the returned `Task` to be notified when the work is completed.
///
/// # Panics
///
/// `await`ing a `Task` after dropping the `BlockingPool` or calling `BlockingPool::shutdown`
/// will panic if the work was not completed before the pool was shut down.
pub fn spawn<F, R>(&self, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let raw = Arc::downgrade(&self.inner);
let schedule = move |runnable| {
if let Some(i) = raw.upgrade() {
i.schedule(runnable);
}
};
let (runnable, task) = async_task::spawn(async move { f() }, schedule);
runnable.schedule();
task
}
/// Shut down the `BlockingPool`.
///
/// If `deadline` is provided then this will block until either all worker threads exit or the
/// deadline is exceeded. If `deadline` is not given then this will block indefinitely until all
/// worker threads exit. Any work that was added to the `BlockingPool` but not yet picked up by
/// a worker thread will not complete and `await`ing on the `Task` for that work will panic.
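///
/// # Example
///
/// A minimal sketch that gives the workers up to 10 seconds to finish:
///
/// ```edition2018
/// use std::time::{Duration, Instant};
/// use cros_async::BlockingPool;
///
/// let pool = BlockingPool::default();
/// pool.shutdown(Some(Instant::now() + Duration::from_secs(10)))
///     .expect("worker threads did not exit in time");
/// ```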
pub fn shutdown(&self, deadline: Option<Instant>) -> Result<(), ShutdownTimedOut> {
let mut state = self.inner.state.lock();
if state.shutting_down {
// We've already shut down this BlockingPool.
return Ok(());
}
state.shutting_down = true;
let exited_threads = state.exited_threads.take().expect("exited_threads missing");
let unfinished_tasks = std::mem::take(&mut state.tasks);
let mut worker_threads = mem::replace(&mut state.worker_threads, Slab::new());
drop(state);
self.inner.condvar.notify_all();
// Cancel any unfinished work after releasing the lock.
drop(unfinished_tasks);
// Now wait for all worker threads to exit.
if let Some(deadline) = deadline {
let mut now = Instant::now();
while now < deadline && !worker_threads.is_empty() {
if let Ok(idx) = exited_threads.recv_timeout(deadline - now) {
let _ = worker_threads.remove(idx).join();
}
now = Instant::now();
}
// Any threads that have not yet joined will just be detached.
if !worker_threads.is_empty() {
return Err(ShutdownTimedOut(worker_threads.len()));
}
Ok(())
} else {
// Block indefinitely until all worker threads exit.
for handle in worker_threads.drain() {
let _ = handle.join();
}
Ok(())
}
}
}
impl Default for BlockingPool {
fn default() -> BlockingPool {
BlockingPool::new(256, Duration::from_secs(10))
}
}
impl Drop for BlockingPool {
fn drop(&mut self) {
if let Err(e) = self.shutdown(Some(Instant::now() + DEFAULT_SHUTDOWN_TIMEOUT)) {
warn!("{}", e);
}
}
}
#[cfg(test)]
mod test {
use std::{
sync::{Arc, Barrier},
thread,
time::{Duration, Instant},
};
use futures::{stream::FuturesUnordered, StreamExt};
use sync::{Condvar, Mutex};
use super::super::super::{block_on, BlockingPool};
#[test]
fn blocking_sleep() {
let pool = BlockingPool::default();
let res = block_on(pool.spawn(|| 42));
assert_eq!(res, 42);
}
#[test]
fn fast_tasks_with_short_keepalive() {
let pool = BlockingPool::new(256, Duration::from_millis(1));
let streams = FuturesUnordered::new();
for _ in 0..2 {
for _ in 0..256 {
let task = pool.spawn(|| ());
streams.push(task);
}
thread::sleep(Duration::from_millis(1));
}
block_on(streams.collect::<Vec<_>>());
// The test passes if there are no panics, which would happen if one of the worker threads
// triggered an underflow on `pool.inner.state.num_idle`.
}
#[test]
fn more_tasks_than_threads() {
let pool = BlockingPool::new(4, Duration::from_secs(10));
let stream = (0..19)
.map(|_| pool.spawn(|| thread::sleep(Duration::from_millis(5))))
.collect::<FuturesUnordered<_>>();
let results = block_on(stream.collect::<Vec<_>>());
assert_eq!(results.len(), 19);
}
#[test]
fn shutdown() {
let pool = BlockingPool::default();
let stream = (0..19)
.map(|_| pool.spawn(|| thread::sleep(Duration::from_millis(5))))
.collect::<FuturesUnordered<_>>();
let results = block_on(stream.collect::<Vec<_>>());
assert_eq!(results.len(), 19);
pool.shutdown(Some(Instant::now() + Duration::from_secs(10)))
.unwrap();
let state = pool.inner.state.lock();
assert_eq!(state.num_threads, 0);
}
#[test]
fn keepalive_timeout() {
// Set the keepalive to a very low value so that threads will exit soon after they run out
// of work.
let pool = BlockingPool::new(7, Duration::from_millis(1));
let stream = (0..19)
.map(|_| pool.spawn(|| thread::sleep(Duration::from_millis(5))))
.collect::<FuturesUnordered<_>>();
let results = block_on(stream.collect::<Vec<_>>());
assert_eq!(results.len(), 19);
// Wait for all threads to exit.
let deadline = Instant::now() + Duration::from_secs(10);
while Instant::now() < deadline {
thread::sleep(Duration::from_millis(100));
let state = pool.inner.state.lock();
if state.num_threads == 0 {
break;
}
}
{
let state = pool.inner.state.lock();
assert_eq!(state.num_threads, 0);
assert_eq!(state.num_idle, 0);
}
}
#[test]
#[should_panic]
fn shutdown_with_pending_work() {
let pool = BlockingPool::new(1, Duration::from_secs(10));
let mu = Arc::new(Mutex::new(false));
let cv = Arc::new(Condvar::new());
// First spawn a thread that blocks the pool.
let task_mu = mu.clone();
let task_cv = cv.clone();
pool.spawn(move || {
let mut ready = task_mu.lock();
while !*ready {
ready = task_cv.wait(ready);
}
})
.detach();
// This task will never finish because we will shut down the pool first.
let unfinished = pool.spawn(|| 5);
// Spawn a thread to unblock the work we started earlier once it sees that the pool is
// shutting down.
let inner = pool.inner.clone();
thread::spawn(move || {
let mut state = inner.state.lock();
while !state.shutting_down {
state = inner.condvar.wait(state);
}
*mu.lock() = true;
cv.notify_all();
});
pool.shutdown(None).unwrap();
// This should panic.
assert_eq!(block_on(unfinished), 5);
}
#[test]
fn unfinished_worker_thread() {
let pool = BlockingPool::default();
let ready = Arc::new(Mutex::new(false));
let cv = Arc::new(Condvar::new());
let barrier = Arc::new(Barrier::new(2));
let thread_ready = ready.clone();
let thread_barrier = barrier.clone();
let thread_cv = cv.clone();
let task = pool.spawn(move || {
thread_barrier.wait();
let mut ready = thread_ready.lock();
while !*ready {
ready = thread_cv.wait(ready);
}
});
// Wait to shut down the pool until after the worker thread has started.
barrier.wait();
pool.shutdown(Some(Instant::now() + Duration::from_millis(5)))
.unwrap_err();
let num_threads = pool.inner.state.lock().num_threads;
assert_eq!(num_threads, 1);
// Now wake up the blocked task so we don't leak the thread.
*ready.lock() = true;
cv.notify_all();
block_on(task);
let deadline = Instant::now() + Duration::from_secs(10);
while Instant::now() < deadline {
thread::sleep(Duration::from_millis(100));
let state = pool.inner.state.lock();
if state.num_threads == 0 {
break;
}
}
{
let state = pool.inner.state.lock();
assert_eq!(state.num_threads, 0);
assert_eq!(state.num_idle, 0);
}
}
}


@@ -0,0 +1,91 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Need non-snake case so the macro can re-use type names for variables.
#![allow(non_snake_case)]
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::future::{maybe_done, MaybeDone};
use pin_utils::unsafe_pinned;
// Macro-generate future combinators to allow for running different numbers of top-level futures in
// this FutureList. Generates the implementation of `FutureList` for the completion types. For an
explicit example of the pattern this is modeled after, see `UnitFutures`.
macro_rules! generate {
($(
$(#[$doc:meta])*
($Complete:ident, <$($Fut:ident),*>),
)*) => ($(
#[must_use = "Combinations of futures don't do anything unless run in an executor."]
pub(crate) struct $Complete<$($Fut: Future),*> {
$($Fut: MaybeDone<$Fut>,)*
}
impl<$($Fut),*> $Complete<$($Fut),*>
where $(
$Fut: Future,
)*
{
// Safety:
// * No Drop impl
// * No Unpin impl
// * Not #[repr(packed)]
$(
unsafe_pinned!($Fut: MaybeDone<$Fut>);
)*
pub(crate) fn new($($Fut: $Fut),*) -> $Complete<$($Fut),*> {
$(
let $Fut = maybe_done($Fut);
)*
$Complete {
$($Fut),*
}
}
}
impl<$($Fut),*> Future for $Complete<$($Fut),*>
where $(
$Fut: Future,
)*
{
type Output = ($($Fut::Output),*);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut complete = true;
$(
complete &= self.as_mut().$Fut().poll(cx).is_ready();
)*
if complete {
$(
let $Fut = self.as_mut().$Fut().take_output().unwrap();
)*
Poll::Ready(($($Fut), *))
} else {
Poll::Pending
}
}
}
)*)
}
generate! {
/// _Future for the [`complete2`] function.
(Complete2, <_Fut1, _Fut2>),
/// _Future for the [`complete3`] function.
(Complete3, <_Fut1, _Fut2, _Fut3>),
/// _Future for the [`complete4`] function.
(Complete4, <_Fut1, _Fut2, _Fut3, _Fut4>),
/// _Future for the [`complete5`] function.
(Complete5, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5>),
}
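// Usage sketch: the public `complete3` wrapper (re-exported at the crate root,
// as shown in the `Executor` docs) drives all of its futures to completion and
// yields their outputs as a tuple. Assumes `fut_a`, `fut_b`, and `fut_c` are
// futures:
//
//     let (a, b, c) = cros_async::complete3(fut_a, fut_b, fut_c).await;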

cros_async/src/event.rs (new file)

@@ -0,0 +1,85 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use sys_util::EventFd;
use super::{AsyncResult, Executor, IntoAsync, IoSourceExt};
/// An async version of `sys_util::EventFd`.
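///
/// # Example
///
/// A minimal sketch, mirroring the tests below:
///
/// ```ignore
/// use cros_async::{EventAsync, Executor};
/// use sys_util::EventFd;
///
/// let ex = Executor::new().unwrap();
/// let eventfd = EventFd::new().unwrap();
/// eventfd.write(0xaa).unwrap();
/// let event = EventAsync::new(eventfd, &ex).unwrap();
/// // `next_val` reads the value written to the eventfd.
/// let val = ex.run_until(event.next_val()).unwrap().unwrap();
/// assert_eq!(val, 0xaa);
/// ```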
pub struct EventAsync {
io_source: Box<dyn IoSourceExt<EventFd>>,
}
impl EventAsync {
pub fn new(event: EventFd, ex: &Executor) -> AsyncResult<EventAsync> {
ex.async_from(event)
.map(|io_source| EventAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_poll(event: EventFd, ex: &super::FdExecutor) -> AsyncResult<EventAsync> {
super::executor::async_poll_from(event, ex).map(|io_source| EventAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_uring(event: EventFd, ex: &super::URingExecutor) -> AsyncResult<EventAsync> {
super::executor::async_uring_from(event, ex).map(|io_source| EventAsync { io_source })
}
/// Gets the next value from the eventfd.
#[allow(dead_code)]
pub async fn next_val(&self) -> AsyncResult<u64> {
self.io_source.read_u64().await
}
}
impl IntoAsync for EventFd {}
#[cfg(test)]
mod tests {
use super::*;
use super::super::{uring_executor::use_uring, Executor, FdExecutor, URingExecutor};
#[test]
fn next_val_reads_value() {
async fn go(event: EventFd, ex: &Executor) -> u64 {
let event_async = EventAsync::new(event, ex).unwrap();
event_async.next_val().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let ex = Executor::new().unwrap();
let val = ex.run_until(go(eventfd, &ex)).unwrap();
assert_eq!(val, 0xaa);
}
#[test]
fn next_val_reads_value_poll_and_ring() {
if !use_uring() {
return;
}
async fn go(event_async: EventAsync) -> u64 {
event_async.next_val().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let uring_ex = URingExecutor::new().unwrap();
let val = uring_ex
.run_until(go(EventAsync::new_uring(eventfd, &uring_ex).unwrap()))
.unwrap();
assert_eq!(val, 0xaa);
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let poll_ex = FdExecutor::new().unwrap();
let val = poll_ex
.run_until(go(EventAsync::new_poll(eventfd, &poll_ex).unwrap()))
.unwrap();
assert_eq!(val, 0xaa);
}
}

cros_async/src/executor.rs (new file)

@@ -0,0 +1,344 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::future::Future;
use async_task::Task;
use super::{
poll_source::Error as PollError, uring_executor::use_uring, AsyncResult, FdExecutor, IntoAsync,
IoSourceExt, PollSource, URingExecutor, UringSource,
};
pub(crate) fn async_uring_from<'a, F: IntoAsync + Send + 'a>(
f: F,
ex: &URingExecutor,
) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
Ok(UringSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
}
/// Creates a concrete `IoSourceExt` using the fd_executor.
pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
f: F,
ex: &FdExecutor,
) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
Ok(PollSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
}
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
///
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
///
/// ```
/// use std::cmp::min;
/// use std::error::Error;
/// use std::fs::{File, OpenOptions};
///
/// use cros_async::{AsyncResult, Executor, IoSourceExt, complete3};
/// const CHUNK_SIZE: usize = 32;
///
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
/// while data.len() > 0 {
/// let (count, mut buf) = f.write_from_vec(None, data).await?;
///
/// data = buf.split_off(count);
/// }
///
/// Ok(())
/// }
///
/// // Transfer `len` bytes of data from `from` to `to`.
/// async fn transfer_data(
/// from: Box<dyn IoSourceExt<File>>,
/// to: Box<dyn IoSourceExt<File>>,
/// len: usize,
/// ) -> AsyncResult<usize> {
/// let mut rem = len;
///
/// while rem > 0 {
/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
/// let (count, mut data) = from.read_to_vec(None, buf).await?;
///
/// if count == 0 {
/// // End of file. Return the number of bytes transferred.
/// return Ok(len - rem);
/// }
///
/// data.truncate(count);
/// write_file(&*to, data).await?;
///
/// rem = rem.saturating_sub(count);
/// }
///
/// Ok(len)
/// }
///
/// # fn do_it() -> Result<(), Box<dyn Error>> {
/// let ex = Executor::new()?;
///
/// let (rx, tx) = sys_util::pipe(true)?;
/// let zero = File::open("/dev/zero")?;
/// let zero_bytes = CHUNK_SIZE * 7;
/// let zero_to_pipe = transfer_data(
/// ex.async_from(zero)?,
/// ex.async_from(tx.try_clone()?)?,
/// zero_bytes,
/// );
///
/// let rand = File::open("/dev/urandom")?;
/// let rand_bytes = CHUNK_SIZE * 19;
/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
///
/// let null = OpenOptions::new().write(true).open("/dev/null")?;
/// let null_bytes = zero_bytes + rand_bytes;
/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
///
/// ex.run_until(complete3(
/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
/// ))?;
///
/// # Ok(())
/// # }
///
/// # do_it().unwrap();
/// ```
#[derive(Clone)]
pub enum Executor {
Uring(URingExecutor),
Fd(FdExecutor),
}
impl Executor {
/// Create a new `Executor`.
pub fn new() -> AsyncResult<Self> {
if use_uring() {
Ok(URingExecutor::new().map(Executor::Uring)?)
} else {
Ok(FdExecutor::new()
.map(Executor::Fd)
.map_err(PollError::Executor)?)
}
}
/// Create a new `Box<dyn IoSourceExt<F>>` associated with `self`. Callers may then use the
/// returned `IoSourceExt` to directly start async operations without needing a separate
/// reference to the executor.
pub fn async_from<'a, F: IntoAsync + Send + 'a>(
&self,
f: F,
) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
match self {
Executor::Uring(ex) => async_uring_from(f, ex),
Executor::Fd(ex) => async_poll_from(f, ex),
}
}
/// Spawn a new future for this executor to run to completion. Callers may use the returned
/// `Task` to await on the result of `f`. Dropping the returned `Task` will cancel `f`,
/// preventing it from being polled again. To drop a `Task` without canceling the future
/// associated with it use `Task::detach`. To cancel a task gracefully and wait until it is
/// fully destroyed, use `Task::cancel`.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn() -> AsyncResult<()> {
/// # use std::thread;
///
/// # use cros_async::Executor;
/// use futures::executor::block_on;
///
/// # let ex = Executor::new()?;
///
/// # // Spawn a thread that runs the executor.
/// # let ex2 = ex.clone();
/// # thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn().unwrap();
/// ```
pub fn spawn<F>(&self, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Executor::Uring(ex) => ex.spawn(f),
Executor::Fd(ex) => ex.spawn(f),
}
}
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
/// thread where `run()` or `run_until()` is called.
///
/// # Panics
///
/// `Executor::run` and `Executor::run_until` will panic if they try to poll a future that was
/// added by calling `spawn_local` from a different thread.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn_local() -> AsyncResult<()> {
/// # use cros_async::Executor;
///
/// # let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn_local().unwrap();
/// ```
pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
match self {
Executor::Uring(ex) => ex.spawn_local(f),
Executor::Fd(ex) => ex.spawn_local(f),
}
}
/// Run the provided closure on a dedicated thread where blocking is allowed.
///
/// Callers may `await` on the returned `Task` to wait for the result of `f`. Dropping or
/// canceling the returned `Task` may not cancel the operation if it was already started on a
/// worker thread.
///
/// # Panics
///
/// `await`ing the `Task` after the `Executor` is dropped will panic if the work was not already
/// completed.
///
/// # Examples
///
/// ```edition2018
/// # use cros_async::Executor;
///
/// # async fn do_it(ex: &Executor) {
/// let res = ex.spawn_blocking(move || {
/// // Do some CPU-intensive or blocking work here.
///
/// 42
/// }).await;
///
/// assert_eq!(res, 42);
/// # }
///
/// # let ex = Executor::new().unwrap();
/// # ex.run_until(do_it(&ex)).unwrap();
/// ```
pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
match self {
Executor::Uring(ex) => ex.spawn_blocking(f),
Executor::Fd(ex) => ex.spawn_blocking(f),
}
}
/// Run the executor indefinitely, driving all spawned futures to completion. This method will
/// block the current thread and only return in the case of an error.
///
/// # Panics
///
/// Once this method has been called on a thread, it may only be called on that thread from that
/// point on. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run() -> AsyncResult<()> {
/// use std::thread;
///
/// use cros_async::Executor;
/// use futures::executor::block_on;
///
/// let ex = Executor::new()?;
///
/// // Spawn a thread that runs the executor.
/// let ex2 = ex.clone();
/// thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run().unwrap();
/// ```
pub fn run(&self) -> AsyncResult<()> {
match self {
Executor::Uring(ex) => ex.run()?,
Executor::Fd(ex) => ex.run().map_err(PollError::Executor)?,
}
Ok(())
}
/// Drive all futures spawned in this executor until `f` completes. This method will block the
/// current thread only until `f` is complete and there may still be unfinished futures in the
/// executor.
///
/// # Panics
///
/// Once this method has been called on a thread, from then onwards it may only be called on
/// that thread. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run_until() -> AsyncResult<()> {
/// use cros_async::Executor;
///
/// let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run_until().unwrap();
/// ```
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
match self {
Executor::Uring(ex) => Ok(ex.run_until(f)?),
Executor::Fd(ex) => Ok(ex.run_until(f).map_err(PollError::Executor)?),
}
}
}


@@ -0,0 +1,634 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! The executor runs all given futures to completion. Futures register wakers associated with file
//! descriptors. The wakers will be called when the FD becomes readable or writable depending on
//! the situation.
//!
//! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
//! utility functions to combine futures.
use std::{
fs::File,
future::Future,
io, mem,
os::unix::io::{AsRawFd, FromRawFd, RawFd},
pin::Pin,
sync::{
atomic::{AtomicI32, Ordering},
Arc, Weak,
},
task::{Context, Poll, Waker},
};
use async_task::Task;
use futures::task::noop_waker;
use pin_utils::pin_mut;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use sys_util::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
use thiserror::Error as ThisError;
use super::{
queue::RunnableQueue,
waker::{new_waker, WakerToken, WeakWake},
BlockingPool,
};
#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
/// Failed to clone the EventFd for waking the executor.
#[error("Failed to clone the EventFd for waking the executor: {0}")]
CloneEventFd(sys_util::Error),
/// Failed to create the EventFd for waking the executor.
#[error("Failed to create the EventFd for waking the executor: {0}")]
CreateEventFd(sys_util::Error),
/// Creating a context to wait on FDs failed.
#[error("An error creating the fd waiting context: {0}")]
CreatingContext(sys_util::Error),
/// Failed to copy the FD for the polling context.
#[error("Failed to copy the FD for the polling context: {0}")]
DuplicatingFd(sys_util::Error),
/// The Executor is gone.
#[error("The FDExecutor is gone")]
ExecutorGone,
/// PollContext failure.
#[error("PollContext failure: {0}")]
PollContextError(sys_util::Error),
/// An error occurred when setting the FD non-blocking.
#[error("An error occurred setting the FD non-blocking: {0}.")]
SettingNonBlocking(sys_util::Error),
/// Failed to submit the waker to the polling context.
#[error("An error adding to the Aio context: {0}")]
SubmittingWaker(sys_util::Error),
/// A Waker was canceled, but the operation isn't running.
#[error("Unknown waker")]
UnknownWaker,
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
CloneEventFd(e) => e.into(),
CreateEventFd(e) => e.into(),
DuplicatingFd(e) => e.into(),
ExecutorGone => io::Error::new(io::ErrorKind::Other, e),
CreatingContext(e) => e.into(),
PollContextError(e) => e.into(),
SettingNonBlocking(e) => e.into(),
SubmittingWaker(e) => e.into(),
UnknownWaker => io::Error::new(io::ErrorKind::Other, e),
}
}
}
// A poll operation that has been submitted and is potentially being waited on.
struct OpData {
file: File,
waker: Option<Waker>,
}
// The current status of a submitted operation.
enum OpStatus {
Pending(OpData),
Completed,
}
// An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the
// associated executor.
pub struct RegisteredSource<F> {
source: F,
ex: Weak<RawExecutor>,
}
impl<F: AsRawFd> RegisteredSource<F> {
// Start an asynchronous operation to wait for this source to become readable. The returned
// future will not be ready until the source is readable.
pub fn wait_readable(&self) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token =
ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?;
Ok(PendingOperation {
token: Some(token),
ex: self.ex.clone(),
})
}
// Start an asynchronous operation to wait for this source to become writable. The returned
// future will not be ready until the source is writable.
pub fn wait_writable(&self) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token =
ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?;
Ok(PendingOperation {
token: Some(token),
ex: self.ex.clone(),
})
}
}
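// Usage sketch, mirroring `test_it` in the tests below (`ex` is an
// `FdExecutor` and `rx` the read end of a pipe):
//
//     let source = ex.register_source(rx)?;
//     let pending = source.wait_readable()?;
//     // `pending` is a `PendingOperation` future that completes once the fd
//     // reports readable.
//     pending.await?;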
impl<F> RegisteredSource<F> {
// Consume this RegisteredSource and return the inner IO source.
pub fn into_source(self) -> F {
self.source
}
}
impl<F> AsRef<F> for RegisteredSource<F> {
fn as_ref(&self) -> &F {
&self.source
}
}
impl<F> AsMut<F> for RegisteredSource<F> {
fn as_mut(&mut self) -> &mut F {
&mut self.source
}
}
/// A token returned from `add_operation` that can be used to cancel the waker before it completes.
/// Used to manage getting the result from the underlying executor for a completed operation.
/// Dropping a `PendingOperation` will get the result from the executor.
pub struct PendingOperation {
token: Option<WakerToken>,
ex: Weak<RawExecutor>,
}
impl Future for PendingOperation {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let token = self
.token
.as_ref()
.expect("PendingOperation polled after returning Poll::Ready");
if let Some(ex) = self.ex.upgrade() {
if ex.is_ready(token, cx) {
self.token = None;
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
} else {
Poll::Ready(Err(Error::ExecutorGone))
}
}
}
impl Drop for PendingOperation {
fn drop(&mut self) {
if let Some(token) = self.token.take() {
if let Some(ex) = self.ex.upgrade() {
let _ = ex.cancel_operation(token);
}
}
}
}
// This function exists to guarantee that non-epoll futures will not starve until an epoll future is
// ready to be polled again. The mechanism is very similar to the self-pipe trick used by C programs
// to reliably mix select / poll with signal handling. This is how it works:
//
// * RawExecutor::new creates an eventfd, dupes it, and spawns this async function with the duped fd.
// * The first time notify_task is polled it tries to read from the eventfd and if that fails, waits
// for the fd to become readable.
// * Meanwhile the RawExecutor keeps the original fd for the eventfd.
// * Whenever RawExecutor::wake is called it will write to the eventfd if it determines that the
// executor thread is currently blocked inside an io_epoll_enter call. This can happen when a
// non-epoll future becomes ready to poll.
// * The write to the eventfd causes the fd to become readable, which then allows the epoll() call
// to return with at least one readable fd.
// * The executor then polls the non-epoll future that became ready, any epoll futures that
// completed, and the notify_task function, which then queues up another read on the eventfd and
// the process can repeat.
async fn notify_task(notify: EventFd, raw: Weak<RawExecutor>) {
add_fd_flags(notify.as_raw_fd(), libc::O_NONBLOCK)
.expect("Failed to set notify EventFd as non-blocking");
loop {
match notify.read() {
Ok(_) => {}
Err(e) if e.errno() == libc::EWOULDBLOCK => {}
Err(e) => panic!("Unexpected error while reading notify EventFd: {}", e),
}
if let Some(ex) = raw.upgrade() {
let token = ex
.add_operation(notify.as_raw_fd(), WatchingEvents::empty().set_read())
.expect("Failed to add notify EventFd to PollCtx");
// We don't want to hold an active reference to the executor in the .await below.
mem::drop(ex);
let op = PendingOperation {
token: Some(token),
ex: raw.clone(),
};
match op.await {
Ok(()) => {}
Err(Error::ExecutorGone) => break,
Err(e) => panic!("Unexpected error while waiting for notify EventFd: {}", e),
}
} else {
// The executor is gone so we should also exit.
break;
}
}
}
// Indicates that the executor is either within or about to make a PollContext::wait() call. When a
// waker sees this value, it will write to the notify EventFd, which will cause the
// PollContext::wait() call to return.
const WAITING: i32 = 0x1d5b_c019u32 as i32;
// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;
// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
struct RawExecutor {
queue: RunnableQueue,
poll_ctx: EpollContext<usize>,
ops: Mutex<Slab<OpStatus>>,
blocking_pool: BlockingPool,
state: AtomicI32,
notify: EventFd,
}
impl RawExecutor {
fn new(notify: EventFd) -> Result<Self> {
Ok(RawExecutor {
queue: RunnableQueue::new(),
poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
ops: Mutex::new(Slab::with_capacity(64)),
blocking_pool: Default::default(),
state: AtomicI32::new(PROCESSING),
notify,
})
}
fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
let duped_fd = unsafe {
// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
// will only be added to the poll loop.
File::from_raw_fd(dup_fd(fd)?)
};
let mut ops = self.ops.lock();
let entry = ops.vacant_entry();
let next_token = entry.key();
self.poll_ctx
.add_fd_with_events(&duped_fd, events, next_token)
.map_err(Error::SubmittingWaker)?;
entry.insert(OpStatus::Pending(OpData {
file: duped_fd,
waker: None,
}));
Ok(WakerToken(next_token))
}
fn wake(&self) {
let oldstate = self.state.swap(WOKEN, Ordering::Release);
if oldstate == WAITING {
if let Err(e) = self.notify.write(1) {
warn!("Failed to notify executor that a future is ready: {}", e);
}
}
}
fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
};
let (runnable, task) = async_task::spawn(f, schedule);
runnable.schedule();
task
}
fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
};
let (runnable, task) = async_task::spawn_local(f, schedule);
runnable.schedule();
task
}
fn spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.blocking_pool.spawn(f)
}
fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
let events = EpollEvents::new();
pin_mut!(done);
loop {
self.state.store(PROCESSING, Ordering::Release);
for runnable in self.queue.iter() {
runnable.run();
}
if let Poll::Ready(val) = done.as_mut().poll(cx) {
return Ok(val);
}
let oldstate = self.state.compare_exchange(
PROCESSING,
WAITING,
Ordering::Acquire,
Ordering::Acquire,
);
if let Err(oldstate) = oldstate {
debug_assert_eq!(oldstate, WOKEN);
// One or more futures have become runnable.
continue;
}
let events = self
.poll_ctx
.wait(&events)
.map_err(Error::PollContextError)?;
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
// writing to the eventfd.
self.state.store(PROCESSING, Ordering::Release);
for e in events.iter() {
let token = e.token();
let mut ops = self.ops.lock();
// The op could have been canceled and removed by another thread so ignore it if it
// doesn't exist.
if let Some(op) = ops.get_mut(token) {
let (file, waker) = match mem::replace(op, OpStatus::Completed) {
OpStatus::Pending(OpData { file, waker }) => (file, waker),
OpStatus::Completed => panic!("poll operation completed more than once"),
};
mem::drop(ops);
self.poll_ctx
.delete(&file)
.map_err(Error::PollContextError)?;
if let Some(waker) = waker {
waker.wake();
}
}
}
}
}
fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
let mut ops = self.ops.lock();
let op = ops
.get_mut(token.0)
.expect("`is_ready` called on unknown operation");
match op {
OpStatus::Pending(data) => {
data.waker = Some(cx.waker().clone());
false
}
OpStatus::Completed => {
ops.remove(token.0);
true
}
}
}
// Remove the waker for the given token if it hasn't fired yet.
fn cancel_operation(&self, token: WakerToken) -> Result<()> {
match self.ops.lock().remove(token.0) {
OpStatus::Pending(data) => self
.poll_ctx
.delete(&data.file)
.map_err(Error::PollContextError),
OpStatus::Completed => Ok(()),
}
}
}
impl WeakWake for RawExecutor {
fn wake_by_ref(weak_self: &Weak<Self>) {
if let Some(arc_self) = weak_self.upgrade() {
RawExecutor::wake(&arc_self);
}
}
}
impl Drop for RawExecutor {
fn drop(&mut self) {
// Wake up the notify_task. We set the state to WAITING here so that wake() will write to
// the eventfd.
self.state.store(WAITING, Ordering::Release);
self.wake();
// Wake up any futures still waiting on poll operations as they are just going to get an
// ExecutorGone error now.
for op in self.ops.get_mut().drain() {
match op {
OpStatus::Pending(mut data) => {
if let Some(waker) = data.waker.take() {
waker.wake();
}
if let Err(e) = self.poll_ctx.delete(&data.file) {
warn!("Failed to remove file from EpollCtx: {}", e);
}
}
OpStatus::Completed => {}
}
}
// Now run the executor one more time to drive any remaining futures to completion.
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
if let Err(e) = self.run(&mut cx, async {}) {
warn!("Failed to drive FdExecutor to completion: {}", e);
}
}
}
#[derive(Clone)]
pub struct FdExecutor {
raw: Arc<RawExecutor>,
}
impl FdExecutor {
pub fn new() -> Result<FdExecutor> {
let notify = EventFd::new().map_err(Error::CreateEventFd)?;
let raw = notify
.try_clone()
.map_err(Error::CloneEventFd)
.and_then(RawExecutor::new)
.map(Arc::new)?;
raw.spawn(notify_task(notify, Arc::downgrade(&raw)))
.detach();
Ok(FdExecutor { raw })
}
pub fn spawn<F>(&self, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.raw.spawn(f)
}
pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
self.raw.spawn_local(f)
}
pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.raw.spawn_blocking(f)
}
pub fn run(&self) -> Result<()> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker);
self.raw.run(&mut cx, super::empty::<()>())
}
pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut ctx = Context::from_waker(&waker);
self.raw.run(&mut ctx, f)
}
pub(crate) fn register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>> {
add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
Ok(RegisteredSource {
source: f,
ex: Arc::downgrade(&self.raw),
})
}
}
// Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
if ret < 0 {
Err(Error::DuplicatingFd(sys_util::Error::last()))
} else {
Ok(ret)
}
}
#[cfg(test)]
mod test {
use std::{
cell::RefCell,
io::{Read, Write},
rc::Rc,
};
use futures::future::Either;
use super::*;
#[test]
fn test_it() {
async fn do_test(ex: &FdExecutor) {
let (r, _w) = sys_util::pipe(true).unwrap();
let done = Box::pin(async { 5usize });
let source = ex.register_source(r).unwrap();
let pending = source.wait_readable().unwrap();
match futures::future::select(pending, done).await {
Either::Right((5, pending)) => std::mem::drop(pending),
_ => panic!("unexpected select result"),
}
}
let ex = FdExecutor::new().unwrap();
ex.run_until(do_test(&ex)).unwrap();
// Example of starting the framework and running a future:
async fn my_async(x: Rc<RefCell<u64>>) {
x.replace(4);
}
let x = Rc::new(RefCell::new(0));
super::super::run_one_poll(my_async(x.clone())).unwrap();
assert_eq!(*x.borrow(), 4);
}
#[test]
fn drop_before_completion() {
const VALUE: u64 = 0x66ae_cb65_12fb_d260;
async fn write_value(mut tx: File) {
let buf = VALUE.to_ne_bytes();
tx.write_all(&buf[..]).expect("Failed to write to pipe");
}
async fn check_op(op: PendingOperation) {
let err = op.await.expect_err("Task completed successfully");
match err {
Error::ExecutorGone => {}
e => panic!("Unexpected error from task: {}", e),
}
}
let (mut rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let ex = FdExecutor::new().unwrap();
let source = ex.register_source(tx.try_clone().unwrap()).unwrap();
let op = source.wait_writable().unwrap();
ex.spawn_local(write_value(tx)).detach();
ex.spawn_local(check_op(op)).detach();
// Now drop the executor. It should still run until the write to the pipe is complete.
mem::drop(ex);
let mut buf = 0u64.to_ne_bytes();
rx.read_exact(&mut buf[..])
.expect("Failed to read from pipe");
assert_eq!(u64::from_ne_bytes(buf), VALUE);
}
}

cros_async/src/io_ext.rs (new file)

@@ -0,0 +1,479 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! # `IoSourceExt`
//!
//! User functions to asynchronously access files.
//! Using `IoSource` directly is inconvenient and requires dealing with state
//! machines for the backing uring, future libraries, etc. `IoSourceExt` instead
//! provides users with a future that can be `await`ed from an async context.
//!
//! Each member of `IoSourceExt` returns a future for the supported operation. One or more
//! operations can be pending at a time.
//!
//! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the
//! `URingExecutor` documentation for an explanation of why.
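//!
//! # Example
//!
//! A minimal sketch, mirroring the `Executor` doctest (error handling elided):
//!
//! ```ignore
//! use std::fs::File;
//! use cros_async::Executor;
//!
//! let ex = Executor::new().unwrap();
//! let f = File::open("/dev/zero").unwrap();
//! // `async_from` yields a Box<dyn IoSourceExt<File>>.
//! let source = ex.async_from(f).unwrap();
//! let (len, _buf) = ex
//!     .run_until(source.read_to_vec(None, vec![0u8; 16]))
//!     .unwrap()
//!     .unwrap();
//! assert_eq!(len, 16);
//! ```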
use std::{
fs::File,
io,
ops::{Deref, DerefMut},
os::unix::io::{AsRawFd, RawFd},
sync::Arc,
};
use async_trait::async_trait;
use remain::sorted;
use sys_util::net::UnixSeqpacket;
use thiserror::Error as ThisError;
use super::{BackingMemory, MemRegion};
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
/// An error with a polled(FD) source.
#[error("An error with a poll source: {0}")]
Poll(#[from] super::poll_source::Error),
/// An error with a uring source.
#[error("An error with a uring source: {0}")]
Uring(#[from] super::uring_executor::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
Poll(e) => e.into(),
Uring(e) => e.into(),
}
}
}
/// Ergonomic methods for async reads.
#[async_trait(?Send)]
pub trait ReadAsync {
/// Reads from the IO source at `file_offset` and fills the given `vec`.
async fn read_to_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> Result<(usize, Vec<u8>)>;
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> Result<usize>;
/// Wait for the FD of `self` to be readable.
async fn wait_readable(&self) -> Result<()>;
/// Reads a single u64 from the current offset.
async fn read_u64(&self) -> Result<u64>;
}
/// Ergonomic methods for async writes.
#[async_trait(?Send)]
pub trait WriteAsync {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> Result<(usize, Vec<u8>)>;
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
async fn write_from_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> Result<usize>;
/// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>;
/// Sync all completed write operations to the backing storage.
async fn fsync(&self) -> Result<()>;
}
/// Subtrait for general async IO.
#[async_trait(?Send)]
pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F;
/// Provides a mutable ref to the underlying IO source.
fn as_source_mut(&mut self) -> &mut F;
/// Provides a ref to the underlying IO source.
fn as_source(&self) -> &F;
}
/// Marker trait signifying that the implementor is suitable for use with
/// cros_async. Examples of this include `File` and `sys_util::net::UnixSeqpacket`.
///
/// (Note: it'd be really nice to implement a TryFrom for any implementors, and
/// remove our factory functions. Unfortunately
/// <https://github.com/rust-lang/rust/issues/50133> makes that too painful.)
pub trait IntoAsync: AsRawFd {}
impl IntoAsync for File {}
impl IntoAsync for UnixSeqpacket {}
impl IntoAsync for &UnixSeqpacket {}
/// Simple wrapper struct to implement IntoAsync on foreign types.
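///
/// # Example
///
/// A minimal sketch, mirroring the audio_streams integration in this change
/// (the socket path is hypothetical):
///
/// ```ignore
/// use std::os::unix::net::UnixStream;
/// use cros_async::{AsyncWrapper, Executor};
///
/// let ex = Executor::new().unwrap();
/// let stream = UnixStream::connect("/tmp/test.sock").unwrap();
/// // `UnixStream` implements `AsRawFd` but not `IntoAsync`; `AsyncWrapper`
/// // bridges the gap so the executor can treat it as an async IO source.
/// let source = ex.async_from(AsyncWrapper::new(stream)).unwrap();
/// ```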
pub struct AsyncWrapper<T>(T);
impl<T> AsyncWrapper<T> {
/// Create a new `AsyncWrapper` that wraps `val`.
pub fn new(val: T) -> Self {
AsyncWrapper(val)
}
/// Consumes the `AsyncWrapper`, returning the inner struct.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> Deref for AsyncWrapper<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T> DerefMut for AsyncWrapper<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.0
}
}
impl<T: AsRawFd> AsRawFd for AsyncWrapper<T> {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
impl<T: AsRawFd> IntoAsync for AsyncWrapper<T> {}
#[cfg(test)]
mod tests {
use std::{
fs::{File, OpenOptions},
future::Future,
os::unix::io::AsRawFd,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
thread,
};
use sync::Mutex;
use super::{
super::{
executor::{async_poll_from, async_uring_from},
mem::VecIoWrapper,
uring_executor::use_uring,
Executor, FdExecutor, MemRegion, PollSource, URingExecutor, UringSource,
},
*,
};
struct State {
should_quit: bool,
waker: Option<Waker>,
}
impl State {
fn wake(&mut self) {
self.should_quit = true;
let waker = self.waker.take();
if let Some(waker) = waker {
waker.wake();
}
}
}
struct Quit {
state: Arc<Mutex<State>>,
}
impl Future for Quit {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
let mut state = self.state.lock();
if state.should_quit {
return Poll::Ready(());
}
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
#[test]
fn await_uring_from_poll() {
if !use_uring() {
return;
}
// Start a uring operation and then await the result from an FdExecutor.
async fn go(source: UringSource<File>) {
let v = vec![0xa4u8; 16];
let (len, vec) = source.read_to_vec(None, v).await.unwrap();
assert_eq!(len, 16);
assert!(vec.iter().all(|&b| b == 0));
}
let state = Arc::new(Mutex::new(State {
should_quit: false,
waker: None,
}));
let uring_ex = URingExecutor::new().unwrap();
let f = File::open("/dev/zero").unwrap();
let source = UringSource::new(f, &uring_ex).unwrap();
let quit = Quit {
state: state.clone(),
};
let handle = thread::spawn(move || uring_ex.run_until(quit));
let poll_ex = FdExecutor::new().unwrap();
poll_ex.run_until(go(source)).unwrap();
state.lock().wake();
handle.join().unwrap().unwrap();
}
#[test]
fn await_poll_from_uring() {
if !use_uring() {
return;
}
// Start a poll operation and then await the result from a URingExecutor.
async fn go(source: PollSource<File>) {
let v = vec![0x2cu8; 16];
let (len, vec) = source.read_to_vec(None, v).await.unwrap();
assert_eq!(len, 16);
assert!(vec.iter().all(|&b| b == 0));
}
let state = Arc::new(Mutex::new(State {
should_quit: false,
waker: None,
}));
let poll_ex = FdExecutor::new().unwrap();
let f = File::open("/dev/zero").unwrap();
let source = PollSource::new(f, &poll_ex).unwrap();
let quit = Quit {
state: state.clone(),
};
let handle = thread::spawn(move || poll_ex.run_until(quit));
let uring_ex = URingExecutor::new().unwrap();
uring_ex.run_until(go(source)).unwrap();
state.lock().wake();
handle.join().unwrap().unwrap();
}
#[test]
fn readvec() {
if !use_uring() {
return;
}
async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.read_to_vec(None, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
assert!(ret_v.iter().all(|&b| b == 0));
}
let f = File::open("/dev/zero").unwrap();
let uring_ex = URingExecutor::new().unwrap();
let uring_source = async_uring_from(f, &uring_ex).unwrap();
uring_ex.run_until(go(uring_source)).unwrap();
let f = File::open("/dev/zero").unwrap();
let poll_ex = FdExecutor::new().unwrap();
let poll_source = async_poll_from(f, &poll_ex).unwrap();
poll_ex.run_until(go(poll_source)).unwrap();
}
#[test]
fn writevec() {
if !use_uring() {
return;
}
async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.write_from_vec(None, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
}
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let ex = URingExecutor::new().unwrap();
let uring_source = async_uring_from(f, &ex).unwrap();
ex.run_until(go(uring_source)).unwrap();
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let poll_ex = FdExecutor::new().unwrap();
let poll_source = async_poll_from(f, &poll_ex).unwrap();
poll_ex.run_until(go(poll_source)).unwrap();
}
#[test]
fn readmem() {
if !use_uring() {
return;
}
async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
let ret = async_source
.read_to_mem(
None,
Arc::<VecIoWrapper>::clone(&mem),
&[
MemRegion { offset: 0, len: 32 },
MemRegion {
offset: 200,
len: 56,
},
],
)
.await
.unwrap();
assert_eq!(ret, 32 + 56);
let vec: Vec<u8> = match Arc::try_unwrap(mem) {
Ok(v) => v.into(),
Err(_) => panic!("Too many vec refs"),
};
assert!(vec.iter().take(32).all(|&b| b == 0));
assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55));
assert!(vec.iter().skip(200).take(56).all(|&b| b == 0));
assert!(vec.iter().skip(256).all(|&b| b == 0x55));
}
let f = File::open("/dev/zero").unwrap();
let ex = URingExecutor::new().unwrap();
let uring_source = async_uring_from(f, &ex).unwrap();
ex.run_until(go(uring_source)).unwrap();
let f = File::open("/dev/zero").unwrap();
let poll_ex = FdExecutor::new().unwrap();
let poll_source = async_poll_from(f, &poll_ex).unwrap();
poll_ex.run_until(go(poll_source)).unwrap();
}
#[test]
fn writemem() {
if !use_uring() {
return;
}
async fn go<F: AsRawFd>(async_source: Box<dyn IoSourceExt<F>>) {
let mem = Arc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
let ret = async_source
.write_from_mem(
None,
Arc::<VecIoWrapper>::clone(&mem),
&[MemRegion { offset: 0, len: 32 }],
)
.await
.unwrap();
assert_eq!(ret, 32);
}
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let ex = URingExecutor::new().unwrap();
let uring_source = async_uring_from(f, &ex).unwrap();
ex.run_until(go(uring_source)).unwrap();
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let poll_ex = FdExecutor::new().unwrap();
let poll_source = async_poll_from(f, &poll_ex).unwrap();
poll_ex.run_until(go(poll_source)).unwrap();
}
#[test]
fn read_u64s() {
if !use_uring() {
return;
}
async fn go(async_source: File, ex: URingExecutor) -> u64 {
let source = async_uring_from(async_source, &ex).unwrap();
source.read_u64().await.unwrap()
}
let f = File::open("/dev/zero").unwrap();
let ex = URingExecutor::new().unwrap();
let val = ex.run_until(go(f, ex.clone())).unwrap();
assert_eq!(val, 0);
}
#[test]
fn read_eventfds() {
if !use_uring() {
return;
}
use sys_util::EventFd;
async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) -> u64 {
source.read_u64().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0x55).unwrap();
let ex = URingExecutor::new().unwrap();
let uring_source = async_uring_from(eventfd, &ex).unwrap();
let val = ex.run_until(go(uring_source)).unwrap();
assert_eq!(val, 0x55);
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let poll_ex = FdExecutor::new().unwrap();
let poll_source = async_poll_from(eventfd, &poll_ex).unwrap();
let val = poll_ex.run_until(go(poll_source)).unwrap();
assert_eq!(val, 0xaa);
}
#[test]
fn fsync() {
if !use_uring() {
return;
}
async fn go<F: AsRawFd>(source: Box<dyn IoSourceExt<F>>) {
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = source.write_from_vec(None, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
source.fsync().await.unwrap();
}
let f = tempfile::tempfile().unwrap();
let ex = Executor::new().unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
}
}

540
cros_async/src/lib.rs Normal file

@ -0,0 +1,540 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! An Executor and future combinators based on operations that block on file descriptors.
//!
//! This crate is meant to be used with the `futures-rs` crate that provides further combinators
//! and utility functions to combine and manage futures. All futures will run until they block on a
//! file descriptor becoming readable or writable. Facilities are provided to register future
//! wakers based on such events.
//!
//! # Running top-level futures.
//!
//! Use helper functions based on the desired behavior of your application.
//!
//! ## Running one future.
//!
//! If there is only one top-level future to run, use the [`run_one`](fn.run_one.html) function.
//!
//! ## Completing one of several futures.
//!
//! If there are several top level tasks that should run until any one completes, use the "select"
//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
//! function will return when the first future completes. The uncompleted futures will also be
//! returned so they can be run further or otherwise cleaned up. These functions are inspired by
//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
//!
//! ## Completing all of several futures.
//!
//! If there are several top level tasks that all need to be completed, use the "complete" family
//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
//! function will return only once all the futures passed to it have completed. These functions are
//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
//! [`complete5`](fn.complete5.html).
//!
//! # Implementing new FD-based futures.
//!
//! For the uring executor, new futures should provide an implementation of the `IoSource` trait.
//! For the FD executor, new futures can use the existing ability to poll a source to build async
//! functionality on top of.
//!
//! # Implementations
//!
//! Currently there are two paths for using asynchronous IO. One uses a PollContext and drives
//! futures based on the FDs signaling that they are ready for the operation. This method will exist so
//! long as kernels < 5.4 are supported.
//! The other method submits operations to io_uring and is signaled when they complete. This is more
//! efficient, but only supported on kernel 5.4+.
//! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen
//! automatically.
//!
//! # Examples
//!
//! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if
//! all systems have support for io_uring.
pub mod audio_streams_async;
mod blocking;
mod complete;
mod event;
mod executor;
mod fd_executor;
mod io_ext;
pub mod mem;
mod poll_source;
mod queue;
mod select;
pub mod sync;
mod timer;
mod uring_executor;
mod uring_source;
mod waker;
pub use blocking::{block_on, BlockingPool};
pub use event::EventAsync;
pub use executor::Executor;
pub use fd_executor::FdExecutor;
pub use io_ext::{
AsyncWrapper, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult,
WriteAsync,
};
pub use mem::{BackingMemory, MemRegion};
pub use poll_source::PollSource;
pub use select::SelectResult;
pub use sys_util;
pub use timer::TimerAsync;
pub use uring_executor::URingExecutor;
pub use uring_source::UringSource;
use std::{
future::Future,
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use remain::sorted;
use thiserror::Error as ThisError;
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
/// Error from the FD executor.
#[error("Failure in the FD executor: {0}")]
FdExecutor(fd_executor::Error),
/// Error from TimerAsync.
#[error("Failure in TimerAsync: {0}")]
TimerAsync(AsyncError),
/// Error from TimerFd.
#[error("Failure in TimerFd: {0}")]
TimerFd(sys_util::Error),
/// Error from the uring executor.
#[error("Failure in the uring executor: {0}")]
URingExecutor(uring_executor::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
FdExecutor(e) => e.into(),
URingExecutor(e) => e.into(),
TimerFd(e) => e.into(),
TimerAsync(e) => e.into(),
}
}
}
/// A `Future` that never completes.
pub struct Empty<T> {
phantom: PhantomData<T>,
}
impl<T> Future for Empty<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<T> {
Poll::Pending
}
}
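/// Creates a future that never completes, mainly useful as a placeholder arm for the
/// `select` combinators.
///
/// # Example
///
/// ```
/// use cros_async::{empty, run_one, select2, SelectResult};
/// use futures::pin_mut;
///
/// let first = async { 5 };
/// let never = empty::<()>();
/// pin_mut!(first);
/// pin_mut!(never);
/// match run_one(select2(first, never)) {
/// Ok((SelectResult::Finished(5), SelectResult::Pending(_))) => (),
/// _ => panic!("Select didn't return the first future"),
/// };
/// ```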
pub fn empty<T>() -> Empty<T> {
Empty {
phantom: PhantomData,
}
}
/// Creates an Executor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::run_one;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one(fut).unwrap());
/// ```
pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
if uring_executor::use_uring() {
run_one_uring(fut)
} else {
run_one_poll(fut)
}
}
/// Creates a URingExecutor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::run_one_uring;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one_uring(fut).unwrap());
/// ```
pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
URingExecutor::new()
.and_then(|ex| ex.run_until(fut))
.map_err(Error::URingExecutor)
}
/// Creates a FdExecutor that runs one future to completion.
///
/// # Example
///
/// ```
/// use cros_async::run_one_poll;
///
/// let fut = async { 55 };
/// assert_eq!(55, run_one_poll(fut).unwrap());
/// ```
pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
FdExecutor::new()
.and_then(|ex| ex.run_until(fut))
.map_err(Error::FdExecutor)
}
// Select helpers to run until any future completes.
/// Creates a combinator that runs the two given futures until one completes, returning a tuple
/// containing the result of the finished future and the still pending future.
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select2, run_one};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {5};
/// let second = async {let () = pending().await;};
/// pin_mut!(first);
/// pin_mut!(second);
/// match run_one(select2(first, second)) {
/// Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (),
/// _ => panic!("Select didn't return the first future"),
/// };
/// ```
pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
f1: F1,
f2: F2,
) -> (SelectResult<F1>, SelectResult<F2>) {
select::Select2::new(f1, f2).await
}
/// Creates a combinator that runs the three given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select3, run_one};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {4};
/// let second = async {let () = pending().await;};
/// let third = async {5};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// match run_one(select3(first, second, third)) {
/// Ok((SelectResult::Finished(4),
/// SelectResult::Pending(_second),
/// SelectResult::Finished(5))) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
f1: F1,
f2: F2,
f3: F3,
) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
select::Select3::new(f1, f2, f3).await
}
/// Creates a combinator that runs the four given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select4, run_one};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {4};
/// let second = async {let () = pending().await;};
/// let third = async {5};
/// let fourth = async {let () = pending().await;};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// match run_one(select4(first, second, third, fourth)) {
/// Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
/// SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select4<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
) {
select::Select4::new(f1, f2, f3, f4).await
}
/// Creates a combinator that runs the five given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select5, run_one};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {4};
/// let second = async {let () = pending().await;};
/// let third = async {5};
/// let fourth = async {let () = pending().await;};
/// let fifth = async {6};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// pin_mut!(fifth);
/// match run_one(select5(first, second, third, fourth, fifth)) {
/// Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
/// SelectResult::Finished(5), SelectResult::Pending(_fourth),
/// SelectResult::Finished(6))) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select5<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
) {
select::Select5::new(f1, f2, f3, f4, f5).await
}
/// Creates a combinator that runs the six given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select6, run_one};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {1};
/// let second = async {let () = pending().await;};
/// let third = async {3};
/// let fourth = async {let () = pending().await;};
/// let fifth = async {5};
/// let sixth = async {6};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// pin_mut!(fifth);
/// pin_mut!(sixth);
/// match run_one(select6(first, second, third, fourth, fifth, sixth)) {
/// Ok((SelectResult::Finished(1), SelectResult::Pending(_second),
/// SelectResult::Finished(3), SelectResult::Pending(_fourth),
/// SelectResult::Finished(5), SelectResult::Finished(6))) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select6<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
) {
select::Select6::new(f1, f2, f3, f4, f5, f6).await
}
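/// Creates a combinator that runs the seven given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select7, run_one};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {1};
/// let second = async {let () = pending().await;};
/// let third = async {3};
/// let fourth = async {let () = pending().await;};
/// let fifth = async {5};
/// let sixth = async {6};
/// let seventh = async {7};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// pin_mut!(fifth);
/// pin_mut!(sixth);
/// pin_mut!(seventh);
/// match run_one(select7(first, second, third, fourth, fifth, sixth, seventh)) {
/// Ok((SelectResult::Finished(1), SelectResult::Pending(_second),
/// SelectResult::Finished(3), SelectResult::Pending(_fourth),
/// SelectResult::Finished(5), SelectResult::Finished(6),
/// SelectResult::Finished(7))) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```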
pub async fn select7<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
) {
select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
}
// Combination helpers to run until all futures are complete.
/// Creates a combinator that runs the two given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete2, run_one};
///
/// let first = async {5};
/// let second = async {6};
/// assert_eq!(run_one(complete2(first, second)).unwrap_or((0,0)), (5,6));
/// ```
pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
where
F1: Future,
F2: Future,
{
complete::Complete2::new(f1, f2).await
}
/// Creates a combinator that runs the three given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete3, run_one};
///
/// let first = async {5};
/// let second = async {6};
/// let third = async {7};
/// assert_eq!(run_one(complete3(first, second, third)).unwrap_or((0,0,0)), (5,6,7));
/// ```
pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
where
F1: Future,
F2: Future,
F3: Future,
{
complete::Complete3::new(f1, f2, f3).await
}
/// Creates a combinator that runs the four given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete4, run_one};
///
/// let first = async {5};
/// let second = async {6};
/// let third = async {7};
/// let fourth = async {8};
/// assert_eq!(run_one(complete4(first, second, third, fourth)).unwrap_or((0,0,0,0)), (5,6,7,8));
/// ```
pub async fn complete4<F1, F2, F3, F4>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
) -> (F1::Output, F2::Output, F3::Output, F4::Output)
where
F1: Future,
F2: Future,
F3: Future,
F4: Future,
{
complete::Complete4::new(f1, f2, f3, f4).await
}
/// Creates a combinator that runs the five given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete5, run_one};
///
/// let first = async {5};
/// let second = async {6};
/// let third = async {7};
/// let fourth = async {8};
/// let fifth = async {9};
/// assert_eq!(run_one(complete5(first, second, third, fourth, fifth)).unwrap_or((0,0,0,0,0)),
/// (5,6,7,8,9));
/// ```
pub async fn complete5<F1, F2, F3, F4, F5>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
where
F1: Future,
F2: Future,
F3: Future,
F4: Future,
F5: Future,
{
complete::Complete5::new(f1, f2, f3, f4, f5).await
}

98
cros_async/src/mem.rs Normal file

@ -0,0 +1,98 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use data_model::VolatileSlice;
use remain::sorted;
use thiserror::Error as ThisError;
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
/// Invalid offset or length given for an iovec in backing memory.
#[error("Invalid offset/len for getting a slice from {0} with len {1}.")]
InvalidOffset(u64, usize),
}
pub type Result<T> = std::result::Result<T, Error>;
/// Used to index subslices of backing memory. Like an iovec, but relative to the start of the
/// memory region instead of an absolute pointer.
/// The backing memory referenced by the region can be an array, an mmapped file, or guest memory.
/// The offset is a u64 to allow having file or guest offsets >4GB when run on a 32bit host.
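///
/// # Example
///
/// ```
/// use cros_async::MemRegion;
///
/// // Describes bytes [512, 512 + 128) of the backing memory, e.g. for `read_to_mem`.
/// let region = MemRegion { offset: 512, len: 128 };
/// assert_eq!(region.len, 128);
/// ```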
#[derive(Copy, Clone, Debug)]
pub struct MemRegion {
pub offset: u64,
pub len: usize,
}
/// Trait for memory that can yield iovecs into the backing memory.
/// It must be OK to modify the backing memory without owning a mutable reference. For example,
/// this is safe for GuestMemory and VolatileSlices in crosvm as those types guarantee they are
/// dealt with as volatile.
pub unsafe trait BackingMemory {
/// Returns VolatileSlice pointing to the backing memory. This is most commonly unsafe.
/// To implement this safely the implementor must guarantee that the backing memory can be
/// modified out of band without affecting safety guarantees.
fn get_volatile_slice(&self, mem_range: MemRegion) -> Result<VolatileSlice>;
}
/// Wrapper to be used for passing a Vec in as backing memory for asynchronous operations. The
/// wrapper owns a Vec according to the borrow checker. It is loaning this vec out to the kernel (or
/// other modifiers) through the `BackingMemory` trait. This allows multiple modifiers of the array
/// in the `Vec` while this struct is alive. Only the data in the `Vec` is loaned to the kernel, not
/// the data structure itself; the length, capacity, and pointer to memory cannot be modified.
/// To ensure that those operations can be done safely, no access is allowed to the `Vec`'s memory
/// starting at the time that `VecIoWrapper` is constructed until the time it is turned back into a
/// `Vec` via its `From<VecIoWrapper>` impl. The returned `Vec` is guaranteed to be valid as any combination of bits
/// in a `Vec` of `u8` is valid.
pub(crate) struct VecIoWrapper {
inner: Box<[u8]>,
}
impl From<Vec<u8>> for VecIoWrapper {
fn from(vec: Vec<u8>) -> Self {
VecIoWrapper { inner: vec.into() }
}
}
impl From<VecIoWrapper> for Vec<u8> {
fn from(v: VecIoWrapper) -> Vec<u8> {
v.inner.into()
}
}
impl VecIoWrapper {
/// Get the length of the Vec that is wrapped.
pub fn len(&self) -> usize {
self.inner.len()
}
// Check that the offsets are all valid in the backing vec.
fn check_addrs(&self, mem_range: &MemRegion) -> Result<()> {
let end = mem_range
.offset
.checked_add(mem_range.len as u64)
.ok_or(Error::InvalidOffset(mem_range.offset, mem_range.len))?;
if end > self.inner.len() as u64 {
return Err(Error::InvalidOffset(mem_range.offset, mem_range.len));
}
Ok(())
}
}
// Safe to implement BackingMemory as the vec is only accessible inside the wrapper and these iovecs
// are the only things allowed to modify it. Nothing else can get a reference to the vec until all
// iovecs are dropped because they borrow Self. Nothing can borrow the owned inner vec until self
// is consumed by `into`, which can't happen if there are outstanding mut borrows.
unsafe impl BackingMemory for VecIoWrapper {
fn get_volatile_slice(&self, mem_range: MemRegion) -> Result<VolatileSlice<'_>> {
self.check_addrs(&mem_range)?;
// Safe because the mem_range range is valid in the backing memory as checked above.
unsafe {
Ok(VolatileSlice::from_raw_parts(
self.inner.as_ptr().add(mem_range.offset as usize) as *mut _,
mem_range.len,
))
}
}
}

450
cros_async/src/poll_source.rs Normal file

@ -0,0 +1,450 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! A wrapped IO source that uses FdExecutor to drive asynchronous completion. Used from
//! `IoSourceExt::new` when uring isn't available in the kernel.
use std::{
io,
ops::{Deref, DerefMut},
os::unix::io::AsRawFd,
sync::Arc,
};
use async_trait::async_trait;
use data_model::VolatileSlice;
use remain::sorted;
use thiserror::Error as ThisError;
use super::{
fd_executor::{self, FdExecutor, RegisteredSource},
mem::{BackingMemory, MemRegion},
AsyncError, AsyncResult, IoSourceExt, ReadAsync, WriteAsync,
};
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
/// An error occurred attempting to register a waker with the executor.
#[error("An error occurred attempting to register a waker with the executor: {0}.")]
AddingWaker(fd_executor::Error),
/// An executor error occurred.
#[error("An executor error occurred: {0}")]
Executor(fd_executor::Error),
/// An error occurred when executing fallocate synchronously.
#[error("An error occurred when executing fallocate synchronously: {0}")]
Fallocate(sys_util::Error),
/// An error occurred when executing fsync synchronously.
#[error("An error occurred when executing fsync synchronously: {0}")]
Fsync(sys_util::Error),
/// An error occurred when reading the FD.
#[error("An error occurred when reading the FD: {0}.")]
Read(sys_util::Error),
/// Can't seek file.
#[error("An error occurred when seeking the FD: {0}.")]
Seeking(sys_util::Error),
/// An error occurred when writing the FD.
#[error("An error occurred when writing the FD: {0}.")]
Write(sys_util::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
AddingWaker(e) => e.into(),
Executor(e) => e.into(),
Fallocate(e) => e.into(),
Fsync(e) => e.into(),
Read(e) => e.into(),
Seeking(e) => e.into(),
Write(e) => e.into(),
}
}
}
/// Async wrapper for an IO source that uses the FD executor to drive async operations.
/// Used by `IoSourceExt::new` when uring isn't available.
pub struct PollSource<F>(RegisteredSource<F>);
impl<F: AsRawFd> PollSource<F> {
/// Create a new `PollSource` from the given IO source.
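///
/// # Example
///
/// A minimal sketch that drives a read with the poll backend directly:
///
/// ```
/// use std::fs::File;
/// use cros_async::{FdExecutor, PollSource, ReadAsync};
///
/// let ex = FdExecutor::new().unwrap();
/// let source = PollSource::new(File::open("/dev/zero").unwrap(), &ex).unwrap();
/// let fut = source.read_to_vec(None, vec![0xffu8; 16]);
/// let (len, buf) = ex.run_until(fut).unwrap().unwrap();
/// assert_eq!(len, 16);
/// assert!(buf.iter().all(|&b| b == 0));
/// ```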
pub fn new(f: F, ex: &FdExecutor) -> Result<Self> {
ex.register_source(f)
.map(PollSource)
.map_err(Error::Executor)
}
/// Return the inner source.
pub fn into_source(self) -> F {
self.0.into_source()
}
}
impl<F: AsRawFd> Deref for PollSource<F> {
type Target = F;
fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}
impl<F: AsRawFd> DerefMut for PollSource<F> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut()
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> ReadAsync for PollSource<F> {
/// Reads from the iosource at `file_offset` and fills the given `vec`.
async fn read_to_vec<'a>(
&'a self,
file_offset: Option<u64>,
mut vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
loop {
// Safe because this will only modify `vec` and we check the return value.
let res = if let Some(offset) = file_offset {
unsafe {
libc::pread64(
self.as_raw_fd(),
vec.as_mut_ptr() as *mut libc::c_void,
vec.len(),
offset as libc::off64_t,
)
}
} else {
unsafe {
libc::read(
self.as_raw_fd(),
vec.as_mut_ptr() as *mut libc::c_void,
vec.len(),
)
}
};
if res >= 0 {
return Ok((res as usize, vec));
}
match sys_util::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
}
e => return Err(Error::Read(e).into()),
}
}
}
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let mut iovecs = mem_offsets
.iter()
.filter_map(|&mem_vec| mem.get_volatile_slice(mem_vec).ok())
.collect::<Vec<VolatileSlice>>();
loop {
// Safe because we trust the kernel not to write past the length given and each length is
// guaranteed to be valid for its pointer by `get_volatile_slice`.
let res = if let Some(offset) = file_offset {
unsafe {
libc::preadv64(
self.as_raw_fd(),
iovecs.as_mut_ptr() as *mut _,
iovecs.len() as i32,
offset as libc::off64_t,
)
}
} else {
unsafe {
libc::readv(
self.as_raw_fd(),
iovecs.as_mut_ptr() as *mut _,
iovecs.len() as i32,
)
}
};
if res >= 0 {
return Ok(res as usize);
}
match sys_util::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
}
e => return Err(Error::Read(e).into()),
}
}
}
/// Wait for the FD of `self` to be readable.
async fn wait_readable(&self) -> AsyncResult<()> {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
Ok(())
}
async fn read_u64(&self) -> AsyncResult<u64> {
let mut buf = 0u64.to_ne_bytes();
loop {
// Safe because this will only modify `buf` and we check the return value.
let res = unsafe {
libc::read(
self.as_raw_fd(),
buf.as_mut_ptr() as *mut libc::c_void,
buf.len(),
)
};
if res >= 0 {
return Ok(u64::from_ne_bytes(buf));
}
match sys_util::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_readable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
}
e => return Err(Error::Read(e).into()),
}
}
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> WriteAsync for PollSource<F> {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
loop {
// Safe because this will not modify any memory and we check the return value.
let res = if let Some(offset) = file_offset {
unsafe {
libc::pwrite64(
self.as_raw_fd(),
vec.as_ptr() as *const libc::c_void,
vec.len(),
offset as libc::off64_t,
)
}
} else {
unsafe {
libc::write(
self.as_raw_fd(),
vec.as_ptr() as *const libc::c_void,
vec.len(),
)
}
};
if res >= 0 {
return Ok((res as usize, vec));
}
match sys_util::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
}
e => return Err(Error::Write(e).into()),
}
}
}
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
async fn write_from_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let iovecs = mem_offsets
.iter()
.map(|&mem_vec| mem.get_volatile_slice(mem_vec))
.filter_map(|r| r.ok())
.collect::<Vec<VolatileSlice>>();
loop {
// Safe because we trust the kernel not to read past the length given and each length is
// guaranteed to be valid for its pointer by `get_volatile_slice`.
let res = if let Some(offset) = file_offset {
unsafe {
libc::pwritev64(
self.as_raw_fd(),
iovecs.as_ptr() as *mut _,
iovecs.len() as i32,
offset as libc::off64_t,
)
}
} else {
unsafe {
libc::writev(
self.as_raw_fd(),
iovecs.as_ptr() as *mut _,
iovecs.len() as i32,
)
}
};
if res >= 0 {
return Ok(res as usize);
}
match sys_util::Error::last() {
e if e.errno() == libc::EWOULDBLOCK => {
let op = self.0.wait_writable().map_err(Error::AddingWaker)?;
op.await.map_err(Error::Executor)?;
}
e => return Err(Error::Write(e).into()),
}
}
}
/// See `fallocate(2)` for details.
async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
let ret = unsafe {
libc::fallocate64(
self.as_raw_fd(),
mode as libc::c_int,
file_offset as libc::off64_t,
len as libc::off64_t,
)
};
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last())))
}
}
/// Sync all completed write operations to the backing storage.
async fn fsync(&self) -> AsyncResult<()> {
let ret = unsafe { libc::fsync(self.as_raw_fd()) };
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last())))
}
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> IoSourceExt<F> for PollSource<F> {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F {
self.0.into_source()
}
/// Provides a mutable ref to the underlying IO source.
fn as_source_mut(&mut self) -> &mut F {
self
}
/// Provides a ref to the underlying IO source.
fn as_source(&self) -> &F {
self
}
}
#[cfg(test)]
mod tests {
use std::{
fs::{File, OpenOptions},
path::PathBuf,
};
use super::*;
#[test]
fn readvec() {
async fn go(ex: &FdExecutor) {
let f = File::open("/dev/zero").unwrap();
let async_source = PollSource::new(f, ex).unwrap();
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.read_to_vec(None, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
assert!(ret_v.iter().all(|&b| b == 0));
}
let ex = FdExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn writevec() {
async fn go(ex: &FdExecutor) {
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let async_source = PollSource::new(f, ex).unwrap();
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.write_from_vec(None, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
}
let ex = FdExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn fallocate() {
async fn go(ex: &FdExecutor) {
let dir = tempfile::TempDir::new().unwrap();
let mut file_path = PathBuf::from(dir.path());
file_path.push("test");
let f = OpenOptions::new()
.create(true)
.write(true)
.open(&file_path)
.unwrap();
let source = PollSource::new(f, ex).unwrap();
source.fallocate(0, 4096, 0).await.unwrap();
let meta_data = std::fs::metadata(&file_path).unwrap();
assert_eq!(meta_data.len(), 4096);
}
let ex = FdExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn memory_leak() {
// This test needs to run under ASAN to detect memory leaks.
async fn owns_poll_source(source: PollSource<File>) {
let _ = source.wait_readable().await;
}
let (rx, _tx) = sys_util::pipe(true).unwrap();
let ex = FdExecutor::new().unwrap();
let source = PollSource::new(rx, &ex).unwrap();
ex.spawn_local(owns_poll_source(source)).detach();
// Drop `ex` without running. This would cause a memory leak if PollSource owned a strong
// reference to the executor because it owns a reference to the future that owns PollSource
// (via its Runnable). The strong reference prevents the drop impl from running, which would
// otherwise poll the future and have it return with an error.
}
}

66
cros_async/src/queue.rs Normal file

@ -0,0 +1,66 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::VecDeque;
use async_task::Runnable;
use sync::Mutex;
/// A queue of `Runnables`. Intended to be used by executors to keep track of futures that have been
/// scheduled to run.
pub struct RunnableQueue {
runnables: Mutex<VecDeque<Runnable>>,
}
impl RunnableQueue {
/// Create a new, empty `RunnableQueue`.
pub fn new() -> RunnableQueue {
RunnableQueue {
runnables: Mutex::new(VecDeque::new()),
}
}
/// Schedule `runnable` to run in the future by adding it to this `RunnableQueue`.
pub fn push_back(&self, runnable: Runnable) {
self.runnables.lock().push_back(runnable);
}
/// Remove and return the first `Runnable` in this `RunnableQueue` or `None` if it is empty.
pub fn pop_front(&self) -> Option<Runnable> {
self.runnables.lock().pop_front()
}
/// Create an iterator over this `RunnableQueue` that repeatedly calls `pop_front()` until it is
/// empty.
pub fn iter(&self) -> RunnableQueueIter {
self.into_iter()
}
}
impl Default for RunnableQueue {
fn default() -> Self {
Self::new()
}
}
impl<'q> IntoIterator for &'q RunnableQueue {
type Item = Runnable;
type IntoIter = RunnableQueueIter<'q>;
fn into_iter(self) -> Self::IntoIter {
RunnableQueueIter { queue: self }
}
}
/// An iterator over a `RunnableQueue`.
pub struct RunnableQueueIter<'q> {
queue: &'q RunnableQueue,
}
impl<'q> Iterator for RunnableQueueIter<'q> {
type Item = Runnable;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop_front()
}
}
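// A minimal usage sketch (illustrative only, since this module is crate-private; a real
// executor also parks and wakes threads around the drain loop below):
//
//     use std::sync::Arc;
//
//     let queue = Arc::new(RunnableQueue::new());
//     let q = Arc::clone(&queue);
//     let (runnable, task) = async_task::spawn(async { 7 }, move |r| q.push_back(r));
//     task.detach();
//     runnable.schedule();
//     for runnable in queue.iter() {
//         runnable.run();
//     }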

92
cros_async/src/select.rs Normal file

@ -0,0 +1,92 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// Need non-snake case so the macro can re-use type names for variables.
#![allow(non_snake_case)]
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::future::{maybe_done, FutureExt, MaybeDone};
pub enum SelectResult<F: Future> {
Pending(F),
Finished(F::Output),
}
// Macro-generate future combinators to allow for running different numbers of top-level futures in
// this FutureList. Generates the implementation of `FutureList` for the select types. For an
// explicit example of the pattern this is modeled after, see `UnitFutures`.
macro_rules! generate {
($(
$(#[$doc:meta])*
($Select:ident, <$($Fut:ident),*>),
)*) => ($(
paste::item! {
pub(crate) struct $Select<$($Fut: Future + Unpin),*> {
$($Fut: MaybeDone<$Fut>,)*
}
}
impl<$($Fut: Future + Unpin),*> $Select<$($Fut),*> {
paste::item! {
pub(crate) fn new($($Fut: $Fut),*) -> $Select<$($Fut),*> {
$Select {
$($Fut: maybe_done($Fut),)*
}
}
}
}
impl<$($Fut: Future + Unpin),*> Future for $Select<$($Fut),*> {
type Output = ($(SelectResult<$Fut>),*);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut complete = false;
$(
let $Fut = Pin::new(&mut self.$Fut);
// The future impls `Unpin`, use `poll_unpin` to avoid wrapping it in
// `Pin` to call `poll`.
complete |= self.$Fut.poll_unpin(cx).is_ready();
)*
if complete {
Poll::Ready(($(
match std::mem::replace(&mut self.$Fut, MaybeDone::Gone) {
MaybeDone::Future(f) => SelectResult::Pending(f),
MaybeDone::Done(o) => SelectResult::Finished(o),
MaybeDone::Gone => unreachable!(),
}
), *))
} else {
Poll::Pending
}
}
}
)*)
}
generate! {
/// _Future for the [`select2`] function.
(Select2, <_Fut1, _Fut2>),
/// _Future for the [`select3`] function.
(Select3, <_Fut1, _Fut2, _Fut3>),
/// _Future for the [`select4`] function.
(Select4, <_Fut1, _Fut2, _Fut3, _Fut4>),
/// _Future for the [`select5`] function.
(Select5, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5>),
/// _Future for the [`select6`] function.
(Select6, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5, _Fut6>),
/// _Future for the [`select7`] function.
(Select7, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5, _Fut6, _Fut7>),
}

12
cros_async/src/sync.rs Normal file

@ -0,0 +1,12 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
mod cv;
mod mu;
mod spin;
mod waiter;
pub use cv::Condvar;
pub use mu::Mutex;
pub use spin::SpinLock;

1179
cros_async/src/sync/cv.rs Normal file

File diff suppressed because it is too large

2305
cros_async/src/sync/mu.rs Normal file

File diff suppressed because it is too large

284
cros_async/src/sync/spin.rs Normal file

@ -0,0 +1,284 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
cell::UnsafeCell,
hint,
ops::{Deref, DerefMut},
sync::atomic::{AtomicBool, Ordering},
};
const UNLOCKED: bool = false;
const LOCKED: bool = true;
/// A primitive that provides safe, mutable access to a shared resource.
///
/// Unlike `Mutex`, a `SpinLock` will not voluntarily yield its CPU time until the resource is
/// available and will instead keep spinning until the resource is acquired. For the vast majority
/// of cases, `Mutex` is a better choice than `SpinLock`. If a `SpinLock` must be used then users
/// should try to do as little work as possible while holding the `SpinLock` and avoid any sort of
/// blocking at all costs as it can severely penalize performance.
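///
/// # Example
///
/// A short example of guarding a counter; the lock is released when the guard drops:
///
/// ```
/// use cros_async::sync::SpinLock;
///
/// let counter = SpinLock::new(0u32);
/// {
/// let mut guard = counter.lock();
/// *guard += 1;
/// }
/// assert_eq!(*counter.lock(), 1);
/// ```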
///
/// # Poisoning
///
/// This `SpinLock` does not implement lock poisoning so it is possible for threads to access
/// poisoned data if a thread panics while holding the lock. If lock poisoning is needed, it can be
/// implemented by wrapping the `SpinLock` in a new type that implements poisoning. See the
/// implementation of `std::sync::Mutex` for an example of how to do this.
#[repr(align(128))]
pub struct SpinLock<T: ?Sized> {
lock: AtomicBool,
value: UnsafeCell<T>,
}
impl<T> SpinLock<T> {
/// Creates a new, unlocked `SpinLock` that's ready for use.
pub fn new(value: T) -> SpinLock<T> {
SpinLock {
lock: AtomicBool::new(UNLOCKED),
value: UnsafeCell::new(value),
}
}
/// Consumes the `SpinLock` and returns the value guarded by it. This method doesn't perform any
/// locking as the compiler guarantees that there are no references to `self`.
pub fn into_inner(self) -> T {
// No need to take the lock because the compiler can statically guarantee
// that there are no references to the SpinLock.
self.value.into_inner()
}
}
impl<T: ?Sized> SpinLock<T> {
/// Acquires exclusive, mutable access to the resource protected by the `SpinLock`, blocking the
/// current thread until it is able to do so. Upon returning, the current thread will be the
/// only thread with access to the resource. The `SpinLock` will be released when the returned
/// `SpinLockGuard` is dropped. Attempting to call `lock` while already holding the `SpinLock`
/// will cause a deadlock.
pub fn lock(&self) -> SpinLockGuard<T> {
loop {
let state = self.lock.load(Ordering::Relaxed);
if state == UNLOCKED
&& self
.lock
.compare_exchange_weak(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
break;
}
hint::spin_loop();
}
SpinLockGuard {
lock: self,
value: unsafe { &mut *self.value.get() },
}
}
fn unlock(&self) {
// Don't need to compare and swap because we exclusively hold the lock.
self.lock.store(UNLOCKED, Ordering::Release);
}
/// Returns a mutable reference to the contained value. This method doesn't perform any locking
/// as the compiler will statically guarantee that there are no other references to `self`.
pub fn get_mut(&mut self) -> &mut T {
// Safe because the compiler can statically guarantee that there are no other references to
// `self`. This is also why we don't need to acquire the lock.
unsafe { &mut *self.value.get() }
}
}
unsafe impl<T: ?Sized + Send> Send for SpinLock<T> {}
unsafe impl<T: ?Sized + Send> Sync for SpinLock<T> {}
impl<T: ?Sized + Default> Default for SpinLock<T> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<T> From<T> for SpinLock<T> {
fn from(source: T) -> Self {
Self::new(source)
}
}
/// An RAII implementation of a "scoped lock" for a `SpinLock`. When this structure is dropped, the
/// lock will be released. The resource protected by the `SpinLock` can be accessed via the `Deref`
/// and `DerefMut` implementations of this structure.
pub struct SpinLockGuard<'a, T: 'a + ?Sized> {
lock: &'a SpinLock<T>,
value: &'a mut T,
}
impl<'a, T: ?Sized> Deref for SpinLockGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.value
}
}
impl<'a, T: ?Sized> DerefMut for SpinLockGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
self.value
}
}
impl<'a, T: ?Sized> Drop for SpinLockGuard<'a, T> {
fn drop(&mut self) {
self.lock.unlock();
}
}
#[cfg(test)]
mod test {
use super::*;
use std::{
mem,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread,
};
#[derive(PartialEq, Eq, Debug)]
struct NonCopy(u32);
#[test]
fn it_works() {
let sl = SpinLock::new(NonCopy(13));
assert_eq!(*sl.lock(), NonCopy(13));
}
#[test]
fn smoke() {
let sl = SpinLock::new(NonCopy(7));
mem::drop(sl.lock());
mem::drop(sl.lock());
}
#[test]
fn send() {
let sl = SpinLock::new(NonCopy(19));
thread::spawn(move || {
let value = sl.lock();
assert_eq!(*value, NonCopy(19));
})
.join()
.unwrap();
}
#[test]
fn high_contention() {
const THREADS: usize = 23;
const ITERATIONS: usize = 101;
let mut threads = Vec::with_capacity(THREADS);
let sl = Arc::new(SpinLock::new(0usize));
for _ in 0..THREADS {
let sl2 = sl.clone();
threads.push(thread::spawn(move || {
for _ in 0..ITERATIONS {
*sl2.lock() += 1;
}
}));
}
for t in threads.into_iter() {
t.join().unwrap();
}
assert_eq!(*sl.lock(), THREADS * ITERATIONS);
}
#[test]
fn get_mut() {
let mut sl = SpinLock::new(NonCopy(13));
*sl.get_mut() = NonCopy(17);
assert_eq!(sl.into_inner(), NonCopy(17));
}
#[test]
fn into_inner() {
let sl = SpinLock::new(NonCopy(29));
assert_eq!(sl.into_inner(), NonCopy(29));
}
#[test]
fn into_inner_drop() {
struct NeedsDrop(Arc<AtomicUsize>);
impl Drop for NeedsDrop {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::AcqRel);
}
}
let value = Arc::new(AtomicUsize::new(0));
let needs_drop = SpinLock::new(NeedsDrop(value.clone()));
assert_eq!(value.load(Ordering::Acquire), 0);
{
let inner = needs_drop.into_inner();
assert_eq!(inner.0.load(Ordering::Acquire), 0);
}
assert_eq!(value.load(Ordering::Acquire), 1);
}
#[test]
fn arc_nested() {
// Tests nested spin locks and access to underlying data.
let sl = SpinLock::new(1);
let arc = Arc::new(SpinLock::new(sl));
thread::spawn(move || {
let nested = arc.lock();
let lock2 = nested.lock();
assert_eq!(*lock2, 1);
})
.join()
.unwrap();
}
#[test]
fn arc_access_in_unwind() {
let arc = Arc::new(SpinLock::new(1));
let arc2 = arc.clone();
thread::spawn(move || {
struct Unwinder {
i: Arc<SpinLock<i32>>,
}
impl Drop for Unwinder {
fn drop(&mut self) {
*self.i.lock() += 1;
}
}
let _u = Unwinder { i: arc2 };
panic!();
})
.join()
.expect_err("thread did not panic");
let lock = arc.lock();
assert_eq!(*lock, 2);
}
#[test]
fn unsized_value() {
let sl: &SpinLock<[i32]> = &SpinLock::new([1, 2, 3]);
{
let b = &mut *sl.lock();
b[0] = 4;
b[2] = 5;
}
let expected: &[i32] = &[4, 2, 5];
assert_eq!(&*sl.lock(), expected);
}
}

288
cros_async/src/sync/waiter.rs Normal file

@ -0,0 +1,288 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
cell::UnsafeCell,
future::Future,
mem,
pin::Pin,
ptr::NonNull,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
},
task::{Context, Poll, Waker},
};
use intrusive_collections::{
intrusive_adapter,
linked_list::{LinkedList, LinkedListOps},
DefaultLinkOps, LinkOps,
};
use super::super::sync::SpinLock;
// An atomic version of a LinkedListLink. See https://github.com/Amanieu/intrusive-rs/issues/47 for
// more details.
#[repr(align(128))]
pub struct AtomicLink {
prev: UnsafeCell<Option<NonNull<AtomicLink>>>,
next: UnsafeCell<Option<NonNull<AtomicLink>>>,
linked: AtomicBool,
}
impl AtomicLink {
fn new() -> AtomicLink {
AtomicLink {
linked: AtomicBool::new(false),
prev: UnsafeCell::new(None),
next: UnsafeCell::new(None),
}
}
fn is_linked(&self) -> bool {
self.linked.load(Ordering::Relaxed)
}
}
impl DefaultLinkOps for AtomicLink {
type Ops = AtomicLinkOps;
const NEW: Self::Ops = AtomicLinkOps;
}
// Safe because the only way to mutate `AtomicLink` is via the `LinkedListOps` trait whose methods
// are all unsafe and require that the caller has first called `acquire_link` (and had it return
// true) to use them safely.
unsafe impl Send for AtomicLink {}
unsafe impl Sync for AtomicLink {}
#[derive(Copy, Clone, Default)]
pub struct AtomicLinkOps;
unsafe impl LinkOps for AtomicLinkOps {
type LinkPtr = NonNull<AtomicLink>;
unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool {
!ptr.as_ref().linked.swap(true, Ordering::Acquire)
}
unsafe fn release_link(&mut self, ptr: Self::LinkPtr) {
ptr.as_ref().linked.store(false, Ordering::Release)
}
}
unsafe impl LinkedListOps for AtomicLinkOps {
unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
*ptr.as_ref().next.get()
}
unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
*ptr.as_ref().prev.get()
}
unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) {
*ptr.as_ref().next.get() = next;
}
unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) {
*ptr.as_ref().prev.get() = prev;
}
}
#[derive(Clone, Copy)]
pub enum Kind {
Shared,
Exclusive,
}
enum State {
Init,
Waiting(Waker),
Woken,
Finished,
Processing,
}
// Indicates the queue to which the waiter belongs. It is the responsibility of the Mutex and
// Condvar implementations to update this value when adding/removing a Waiter from their respective
// waiter lists.
#[repr(u8)]
#[derive(Debug, Eq, PartialEq)]
pub enum WaitingFor {
// The waiter is either not linked into a waiter list or it is linked into a temporary list.
None = 0,
// The waiter is linked into the Mutex's waiter list.
Mutex = 1,
// The waiter is linked into the Condvar's waiter list.
Condvar = 2,
}
// Represents a thread currently blocked on a Condvar or on acquiring a Mutex.
pub struct Waiter {
link: AtomicLink,
state: SpinLock<State>,
cancel: fn(usize, &Waiter, bool),
cancel_data: usize,
kind: Kind,
waiting_for: AtomicU8,
}
impl Waiter {
// Create a new, initialized Waiter.
//
// `kind` should indicate whether this waiter represents a thread that is waiting for a shared
// lock or an exclusive lock.
//
// `cancel` is the function that is called when a `WaitFuture` (returned by the `wait()`
// function) is dropped before it can complete. `cancel_data` is used as the first parameter of
// the `cancel` function. The second parameter is the `Waiter` that was canceled and the third
// parameter indicates whether the `WaitFuture` was dropped after it was woken (but before it
// was polled to completion). A value of `false` for the third parameter may already be stale
// by the time the cancel function runs and so does not guarantee that the waiter was not woken.
// In this case, implementations should still check if the Waiter was woken. However, a value of
// `true` guarantees that the waiter was already woken up so no additional checks are necessary.
// In this case, the cancel implementation should wake up the next waiter in its wait list, if
// any.
//
// `waiting_for` indicates the waiter list to which this `Waiter` will be added. See the
// documentation of the `WaitingFor` enum for the meaning of the different values.
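//
// A minimal construction sketch (illustrative only; the real cancel functions live in the
// `Mutex` and `Condvar` implementations, which wake or clean up neighboring waiters):
//
//     fn cancel(_data: usize, _waiter: &Waiter, _woken: bool) {}
//     let waiter = Waiter::new(Kind::Exclusive, cancel, 0, WaitingFor::None);
//     // `waiter.wait()` yields a `WaitFuture` that resolves once `waiter.wake()` runs.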
pub fn new(
kind: Kind,
cancel: fn(usize, &Waiter, bool),
cancel_data: usize,
waiting_for: WaitingFor,
) -> Waiter {
Waiter {
link: AtomicLink::new(),
state: SpinLock::new(State::Init),
cancel,
cancel_data,
kind,
waiting_for: AtomicU8::new(waiting_for as u8),
}
}
// The kind of lock that this `Waiter` is waiting to acquire.
pub fn kind(&self) -> Kind {
self.kind
}
// Returns true if this `Waiter` is currently linked into a waiter list.
pub fn is_linked(&self) -> bool {
self.link.is_linked()
}
// Indicates the waiter list to which this `Waiter` belongs.
pub fn is_waiting_for(&self) -> WaitingFor {
match self.waiting_for.load(Ordering::Acquire) {
0 => WaitingFor::None,
1 => WaitingFor::Mutex,
2 => WaitingFor::Condvar,
v => panic!("Unknown value for `WaitingFor`: {}", v),
}
}
// Change the waiter list to which this `Waiter` belongs. This will panic if called when the
// `Waiter` is still linked into a waiter list.
pub fn set_waiting_for(&self, waiting_for: WaitingFor) {
self.waiting_for.store(waiting_for as u8, Ordering::Release);
}
// Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a
// waiter list.
pub fn reset(&self, waiting_for: WaitingFor) {
debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked");
self.set_waiting_for(waiting_for);
let mut state = self.state.lock();
if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) {
mem::drop(state);
mem::drop(waker);
}
}
// Wait until woken up by another thread.
pub fn wait(&self) -> WaitFuture<'_> {
WaitFuture { waiter: self }
}
// Wake up the thread associated with this `Waiter`. Panics if `is_waiting_for()` does not return
// `WaitingFor::None` or if `is_linked()` returns true.
pub fn wake(&self) {
debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked");
debug_assert_eq!(self.is_waiting_for(), WaitingFor::None);
let mut state = self.state.lock();
if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) {
mem::drop(state);
waker.wake();
}
}
}
pub struct WaitFuture<'w> {
waiter: &'w Waiter,
}
impl<'w> Future for WaitFuture<'w> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut state = self.waiter.state.lock();
match mem::replace(&mut *state, State::Processing) {
State::Init => {
*state = State::Waiting(cx.waker().clone());
Poll::Pending
}
State::Waiting(old_waker) => {
*state = State::Waiting(cx.waker().clone());
mem::drop(state);
mem::drop(old_waker);
Poll::Pending
}
State::Woken => {
*state = State::Finished;
Poll::Ready(())
}
State::Finished => {
panic!("Future polled after returning Poll::Ready");
}
State::Processing => {
panic!("Unexpected waker state");
}
}
}
}
impl<'w> Drop for WaitFuture<'w> {
fn drop(&mut self) {
let state = self.waiter.state.lock();
match *state {
State::Finished => {}
State::Processing => panic!("Unexpected waker state"),
State::Woken => {
mem::drop(state);
// We were woken but not polled. Wake up the next waiter.
(self.waiter.cancel)(self.waiter.cancel_data, self.waiter, true);
}
_ => {
mem::drop(state);
// Not woken. No need to wake up any waiters.
(self.waiter.cancel)(self.waiter.cancel_data, self.waiter, false);
}
}
}
}
intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink });
pub type WaiterList = LinkedList<WaiterAdapter>;

126
cros_async/src/timer.rs Normal file

@ -0,0 +1,126 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::time::Duration;
use sys_util::{Result as SysResult, TimerFd};
use super::{AsyncResult, Error, Executor, IntoAsync, IoSourceExt};
#[cfg(test)]
use super::{FdExecutor, URingExecutor};
/// An async version of sys_util::TimerFd.
pub struct TimerAsync {
io_source: Box<dyn IoSourceExt<TimerFd>>,
}
impl TimerAsync {
pub fn new(timer: TimerFd, ex: &Executor) -> AsyncResult<TimerAsync> {
ex.async_from(timer)
.map(|io_source| TimerAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_poll(timer: TimerFd, ex: &FdExecutor) -> AsyncResult<TimerAsync> {
super::executor::async_poll_from(timer, ex).map(|io_source| TimerAsync { io_source })
}
#[cfg(test)]
pub(crate) fn new_uring(timer: TimerFd, ex: &URingExecutor) -> AsyncResult<TimerAsync> {
super::executor::async_uring_from(timer, ex).map(|io_source| TimerAsync { io_source })
}
/// Gets the next value from the timer.
pub async fn next_val(&self) -> AsyncResult<u64> {
self.io_source.read_u64().await
}
/// Asynchronously sleep for the given duration.
pub async fn sleep(ex: &Executor, dur: Duration) -> std::result::Result<(), Error> {
let tfd = TimerFd::new().map_err(Error::TimerFd)?;
tfd.reset(dur, None).map_err(Error::TimerFd)?;
let t = TimerAsync::new(tfd, ex).map_err(Error::TimerAsync)?;
t.next_val().await.map_err(Error::TimerAsync)?;
Ok(())
}
/// Sets the timer to expire after `dur`. If `interval` is not `None` it represents
/// the period for repeated expirations after the initial expiration. Otherwise
/// the timer will expire just once. Cancels any existing duration and repeating interval.
pub fn reset(&mut self, dur: Duration, interval: Option<Duration>) -> SysResult<()> {
self.io_source.as_source_mut().reset(dur, interval)
}
}
impl IntoAsync for TimerFd {}
#[cfg(test)]
mod tests {
use super::{super::uring_executor::use_uring, *};
use std::time::{Duration, Instant};
#[test]
fn one_shot() {
if !use_uring() {
return;
}
async fn this_test(ex: &URingExecutor) {
let tfd = TimerFd::new().expect("failed to create timerfd");
assert_eq!(tfd.is_armed().unwrap(), false);
let dur = Duration::from_millis(200);
let now = Instant::now();
tfd.reset(dur, None).expect("failed to arm timer");
assert_eq!(tfd.is_armed().unwrap(), true);
let t = TimerAsync::new_uring(tfd, ex).unwrap();
let count = t.next_val().await.expect("unable to wait for timer");
assert_eq!(count, 1);
assert!(now.elapsed() >= dur);
}
let ex = URingExecutor::new().unwrap();
ex.run_until(this_test(&ex)).unwrap();
}
#[test]
fn one_shot_fd() {
async fn this_test(ex: &FdExecutor) {
let tfd = TimerFd::new().expect("failed to create timerfd");
assert_eq!(tfd.is_armed().unwrap(), false);
let dur = Duration::from_millis(200);
let now = Instant::now();
tfd.reset(dur, None).expect("failed to arm timer");
assert_eq!(tfd.is_armed().unwrap(), true);
let t = TimerAsync::new_poll(tfd, ex).unwrap();
let count = t.next_val().await.expect("unable to wait for timer");
assert_eq!(count, 1);
assert!(now.elapsed() >= dur);
}
let ex = FdExecutor::new().unwrap();
ex.run_until(this_test(&ex)).unwrap();
}
#[test]
fn timer() {
async fn this_test(ex: &Executor) {
let dur = Duration::from_millis(200);
let now = Instant::now();
TimerAsync::sleep(ex, dur).await.expect("unable to sleep");
assert!(now.elapsed() >= dur);
}
let ex = Executor::new().expect("creating an executor failed");
ex.run_until(this_test(&ex)).unwrap();
}
}
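A usage sketch (not part of the diff) of the portable path exercised by the `timer` test above; it assumes `Executor` and `TimerAsync` are re-exported at the crate root, as the imports elsewhere in this change suggest:

use std::time::Duration;

use cros_async::{Executor, TimerAsync};

fn main() {
    let ex = Executor::new().expect("failed to create executor");
    ex.run_until(async {
        // sleep() arms a fresh TimerFd and awaits its first expiration.
        TimerAsync::sleep(&ex, Duration::from_millis(10))
            .await
            .expect("sleep failed");
    })
    .expect("executor run_until failed");
}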

File diff suppressed because it is too large


cros_async/src/uring_source.rs Normal file
@@ -0,0 +1,643 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
convert::TryInto,
io,
ops::{Deref, DerefMut},
os::unix::io::AsRawFd,
sync::Arc,
};
use async_trait::async_trait;
use super::{
mem::{BackingMemory, MemRegion, VecIoWrapper},
uring_executor::{Error, RegisteredSource, Result, URingExecutor},
AsyncError, AsyncResult,
};
/// `UringSource` wraps FD-backed IO sources for use with io_uring. It is a thin wrapper around
/// registering an IO source with the uring that provides an `IoSource` implementation.
/// Most useful functions are provided by `IoSourceExt`.
pub struct UringSource<F: AsRawFd> {
registered_source: RegisteredSource,
source: F,
}
impl<F: AsRawFd> UringSource<F> {
/// Creates a new `UringSource` that wraps the given `io_source` object.
pub fn new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>> {
let r = ex.register_source(&io_source)?;
Ok(UringSource {
registered_source: r,
source: io_source,
})
}
/// Consume `self` and return the object used to create it.
pub fn into_source(self) -> F {
self.source
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> super::ReadAsync for UringSource<F> {
/// Reads from the IO source at `file_offset` and fills the given `vec`.
async fn read_to_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let buf = Arc::new(VecIoWrapper::from(vec));
let op = self.registered_source.start_read_to_mem(
file_offset.unwrap_or(0),
buf.clone(),
&[MemRegion {
offset: 0,
len: buf.len(),
}],
)?;
let len = op.await?;
let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
v.into()
} else {
panic!("too many refs on buf");
};
Ok((len as usize, bytes))
}
/// Wait for the FD of `self` to be readable.
async fn wait_readable(&self) -> AsyncResult<()> {
let op = self.registered_source.poll_fd_readable()?;
op.await?;
Ok(())
}
/// Reads a single u64 (e.g. from an eventfd).
async fn read_u64(&self) -> AsyncResult<u64> {
// This doesn't just forward to read_to_vec to avoid an unnecessary extra allocation from
// async-trait.
let buf = Arc::new(VecIoWrapper::from(0u64.to_ne_bytes().to_vec()));
let op = self.registered_source.start_read_to_mem(
0,
buf.clone(),
&[MemRegion {
offset: 0,
len: buf.len(),
}],
)?;
let len = op.await?;
if len != buf.len() as u32 {
Err(AsyncError::Uring(Error::Io(io::Error::new(
io::ErrorKind::Other,
format!("expected to read {} bytes, but read {}", buf.len(), len),
))))
} else {
let bytes: Vec<u8> = if let Ok(v) = Arc::try_unwrap(buf) {
v.into()
} else {
panic!("too many refs on buf");
};
// Will never panic because bytes is of the appropriate size.
Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap()))
}
}
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let op =
self.registered_source
.start_read_to_mem(file_offset.unwrap_or(0), mem, mem_offsets)?;
let len = op.await?;
Ok(len as usize)
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> super::WriteAsync for UringSource<F> {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let buf = Arc::new(VecIoWrapper::from(vec));
let op = self.registered_source.start_write_from_mem(
file_offset.unwrap_or(0),
buf.clone(),
&[MemRegion {
offset: 0,
len: buf.len(),
}],
)?;
let len = op.await?;
let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
v.into()
} else {
panic!("too many refs on buf");
};
Ok((len as usize, bytes))
}
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
async fn write_from_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let op = self.registered_source.start_write_from_mem(
file_offset.unwrap_or(0),
mem,
mem_offsets,
)?;
let len = op.await?;
Ok(len as usize)
}
/// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
let op = self
.registered_source
.start_fallocate(file_offset, len, mode)?;
let _ = op.await?;
Ok(())
}
/// Sync all completed write operations to the backing storage.
async fn fsync(&self) -> AsyncResult<()> {
let op = self.registered_source.start_fsync()?;
let _ = op.await?;
Ok(())
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> super::IoSourceExt<F> for UringSource<F> {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F {
self.source
}
/// Provides a ref to the underlying IO source.
fn as_source(&self) -> &F {
&self.source
}
/// Provides a mutable ref to the underlying IO source.
fn as_source_mut(&mut self) -> &mut F {
&mut self.source
}
}
impl<F: AsRawFd> Deref for UringSource<F> {
type Target = F;
fn deref(&self) -> &Self::Target {
&self.source
}
}
impl<F: AsRawFd> DerefMut for UringSource<F> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.source
}
}
#[cfg(test)]
mod tests {
use std::{
fs::{File, OpenOptions},
os::unix::io::AsRawFd,
path::PathBuf,
};
use super::super::{
io_ext::{ReadAsync, WriteAsync},
uring_executor::use_uring,
UringSource,
};
use super::*;
#[test]
fn read_to_mem() {
if !use_uring() {
return;
}
use super::super::mem::VecIoWrapper;
use std::io::Write;
use tempfile::tempfile;
let ex = URingExecutor::new().unwrap();
// Use a temporary file as the IO source; `File` implements AsRawFd.
let mut source = tempfile().unwrap();
let data = vec![0x55; 8192];
source.write_all(&data).unwrap();
let io_obj = UringSource::new(source, &ex).unwrap();
// Start with memory filled with 0x44s.
let buf: Arc<VecIoWrapper> = Arc::new(VecIoWrapper::from(vec![0x44; 8192]));
let fut = io_obj.read_to_mem(
None,
Arc::<VecIoWrapper>::clone(&buf),
&[MemRegion {
offset: 0,
len: 8192,
}],
);
assert_eq!(8192, ex.run_until(fut).unwrap().unwrap());
let vec: Vec<u8> = match Arc::try_unwrap(buf) {
Ok(v) => v.into(),
Err(_) => panic!("Too many vec refs"),
};
assert!(vec.iter().all(|&b| b == 0x55));
}
#[test]
fn readvec() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = File::open("/dev/zero").unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = source.read_to_vec(None, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
assert!(ret_v.iter().all(|&b| b == 0));
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn readmulti() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = File::open("/dev/zero").unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 32];
let v2 = vec![0x55u8; 32];
let (ret, ret2) = futures::future::join(
source.read_to_vec(None, v),
source.read_to_vec(Some(32), v2),
)
.await;
assert!(ret.unwrap().1.iter().all(|&b| b == 0));
assert!(ret2.unwrap().1.iter().all(|&b| b == 0));
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
async fn read_u64<T: AsRawFd>(source: &UringSource<T>) -> u64 {
// Init a vec that translates to u64::MAX.
let u64_mem = vec![0xffu8; std::mem::size_of::<u64>()];
let (ret, u64_mem) = source.read_to_vec(None, u64_mem).await.unwrap();
assert_eq!(ret as usize, std::mem::size_of::<u64>());
let mut val = 0u64.to_ne_bytes();
val.copy_from_slice(&u64_mem);
u64::from_ne_bytes(val)
}
#[test]
fn u64_from_file() {
if !use_uring() {
return;
}
let f = File::open("/dev/zero").unwrap();
let ex = URingExecutor::new().unwrap();
let source = UringSource::new(f, &ex).unwrap();
assert_eq!(0u64, ex.run_until(read_u64(&source)).unwrap());
}
#[test]
fn event() {
if !use_uring() {
return;
}
use sys_util::EventFd;
async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) {
let wait = UringSource::new(wait, ex).unwrap();
ev.write(55).unwrap();
read_u64(&wait).await;
ev.write(66).unwrap();
read_u64(&wait).await;
ev.write(77).unwrap();
read_u64(&wait).await;
}
async fn read_events(ev: EventFd, signal: EventFd, ex: &URingExecutor) {
let source = UringSource::new(ev, ex).unwrap();
assert_eq!(read_u64(&source).await, 55);
signal.write(1).unwrap();
assert_eq!(read_u64(&source).await, 66);
signal.write(1).unwrap();
assert_eq!(read_u64(&source).await, 77);
signal.write(1).unwrap();
}
let event = EventFd::new().unwrap();
let signal_wait = EventFd::new().unwrap();
let ex = URingExecutor::new().unwrap();
let write_task = write_event(
event.try_clone().unwrap(),
signal_wait.try_clone().unwrap(),
&ex,
);
let read_task = read_events(event, signal_wait, &ex);
ex.run_until(futures::future::join(read_task, write_task))
.unwrap();
}
#[test]
fn pend_on_pipe() {
if !use_uring() {
return;
}
use std::io::Write;
use futures::future::Either;
async fn do_test(ex: &URingExecutor) {
let (read_source, mut w) = sys_util::pipe(true).unwrap();
let source = UringSource::new(read_source, ex).unwrap();
let done = Box::pin(async { 5usize });
let pending = Box::pin(read_u64(&source));
match futures::future::select(pending, done).await {
Either::Right((5, pending)) => {
// Write to the pipe so that the kernel will release the memory associated with
// the uring read operation.
w.write_all(&[0]).expect("failed to write to pipe");
::std::mem::drop(pending);
}
_ => panic!("unexpected select result"),
};
}
let ex = URingExecutor::new().unwrap();
ex.run_until(do_test(&ex)).unwrap();
}
#[test]
fn readmem() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = File::open("/dev/zero").unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 64];
let vw = Arc::new(VecIoWrapper::from(v));
let ret = source
.read_to_mem(
None,
Arc::<VecIoWrapper>::clone(&vw),
&[MemRegion { offset: 0, len: 32 }],
)
.await
.unwrap();
assert_eq!(32, ret);
let vec: Vec<u8> = match Arc::try_unwrap(vw) {
Ok(v) => v.into(),
Err(_) => panic!("Too many vec refs"),
};
assert!(vec.iter().take(32).all(|&b| b == 0));
assert!(vec.iter().skip(32).all(|&b| b == 0x55));
// test second half of memory too.
let v = vec![0x55u8; 64];
let vw = Arc::new(VecIoWrapper::from(v));
let ret = source
.read_to_mem(
None,
Arc::<VecIoWrapper>::clone(&vw),
&[MemRegion {
offset: 32,
len: 32,
}],
)
.await
.unwrap();
assert_eq!(32, ret);
let v: Vec<u8> = match Arc::try_unwrap(vw) {
Ok(v) => v.into(),
Err(_) => panic!("Too many vec refs"),
};
assert!(v.iter().take(32).all(|&b| b == 0x55));
assert!(v.iter().skip(32).all(|&b| b == 0));
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn range_error() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = File::open("/dev/zero").unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 64];
let vw = Arc::new(VecIoWrapper::from(v));
let ret = source
.read_to_mem(
None,
Arc::<VecIoWrapper>::clone(&vw),
&[MemRegion {
offset: 32,
len: 33,
}],
)
.await;
assert!(ret.is_err());
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn fallocate() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let dir = tempfile::TempDir::new().unwrap();
let mut file_path = PathBuf::from(dir.path());
file_path.push("test");
let f = OpenOptions::new()
.create(true)
.write(true)
.open(&file_path)
.unwrap();
let source = UringSource::new(f, ex).unwrap();
if let Err(e) = source.fallocate(0, 4096, 0).await {
match e {
super::super::io_ext::Error::Uring(
super::super::uring_executor::Error::Io(io_err),
) => {
if io_err.kind() == std::io::ErrorKind::InvalidInput {
// Skip the test on kernels before fallocate support.
return;
}
}
_ => panic!("Unexpected uring error on fallocate: {}", e),
}
}
let meta_data = std::fs::metadata(&file_path).unwrap();
assert_eq!(meta_data.len(), 4096);
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn fsync() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = tempfile::tempfile().unwrap();
let source = UringSource::new(f, ex).unwrap();
source.fsync().await.unwrap();
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn wait_read() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = File::open("/dev/zero").unwrap();
let source = UringSource::new(f, ex).unwrap();
source.wait_readable().await.unwrap();
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn writemem() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = OpenOptions::new()
.create(true)
.write(true)
.open("/tmp/write_from_vec")
.unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 64];
let vw = Arc::new(super::super::mem::VecIoWrapper::from(v));
let ret = source
.write_from_mem(None, vw, &[MemRegion { offset: 0, len: 32 }])
.await
.unwrap();
assert_eq!(32, ret);
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn writevec() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open("/tmp/write_from_vec")
.unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let (ret, ret_v) = source.write_from_vec(None, v).await.unwrap();
assert_eq!(32, ret);
assert_eq!(v_ptr, ret_v.as_ptr());
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
#[test]
fn writemulti() {
if !use_uring() {
return;
}
async fn go(ex: &URingExecutor) {
let f = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open("/tmp/write_from_vec")
.unwrap();
let source = UringSource::new(f, ex).unwrap();
let v = vec![0x55u8; 32];
let v2 = vec![0x55u8; 32];
let (r, r2) = futures::future::join(
source.write_from_vec(None, v),
source.write_from_vec(Some(32), v2),
)
.await;
assert_eq!(32, r.unwrap().0);
assert_eq!(32, r2.unwrap().0);
}
let ex = URingExecutor::new().unwrap();
ex.run_until(go(&ex)).unwrap();
}
}
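A sketch of driving `UringSource` directly, mirroring the tests above. The `cros_async::...` import paths are assumptions based on the crate layout; real callers should prefer the backend-agnostic `Executor`, and note the tests gate on `use_uring()` because io_uring may be unavailable on the running kernel:

use std::fs::File;

// Assumed re-exports; the tests reach these via crate-internal paths.
use cros_async::{ReadAsync, URingExecutor, UringSource};

fn main() {
    let ex = URingExecutor::new().expect("failed to create uring executor");
    ex.run_until(async {
        let f = File::open("/dev/zero").unwrap();
        let source = UringSource::new(f, &ex).unwrap();
        // The buffer is handed to the kernel and returned with the byte count.
        let (n, buf) = source.read_to_vec(None, vec![0x55u8; 16]).await.unwrap();
        assert_eq!(n, 16);
        assert!(buf.iter().all(|&b| b == 0));
    })
    .expect("executor run_until failed");
}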

cros_async/src/waker.rs Normal file

@@ -0,0 +1,70 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
mem::{drop, ManuallyDrop},
sync::Weak,
task::{RawWaker, RawWakerVTable, Waker},
};
/// Wrapper around a usize used as a token to uniquely identify a pending waker.
#[derive(Debug)]
pub(crate) struct WakerToken(pub(crate) usize);
/// Like `futures::task::ArcWake` but uses `Weak<T>` instead of `Arc<T>`.
pub(crate) trait WeakWake: Send + Sync {
fn wake_by_ref(weak_self: &Weak<Self>);
fn wake(weak_self: Weak<Self>) {
Self::wake_by_ref(&weak_self)
}
}
fn waker_vtable<W: WeakWake>() -> &'static RawWakerVTable {
&RawWakerVTable::new(
clone_weak_raw::<W>,
wake_weak_raw::<W>,
wake_by_ref_weak_raw::<W>,
drop_weak_raw::<W>,
)
}
unsafe fn clone_weak_raw<W: WeakWake>(data: *const ()) -> RawWaker {
// Get a handle to the Weak<T> but wrap it in a ManuallyDrop so that we don't reduce the
// refcount at the end of this function.
let weak = ManuallyDrop::new(Weak::<W>::from_raw(data as *const W));
// Now increase the weak count and keep it in a ManuallyDrop so that it doesn't get decreased
// at the end of this function.
let _weak_clone: ManuallyDrop<_> = weak.clone();
RawWaker::new(data, waker_vtable::<W>())
}
unsafe fn wake_weak_raw<W: WeakWake>(data: *const ()) {
let weak: Weak<W> = Weak::from_raw(data as *const W);
WeakWake::wake(weak)
}
unsafe fn wake_by_ref_weak_raw<W: WeakWake>(data: *const ()) {
// Get a handle to the Weak<T> but wrap it in a ManuallyDrop so that we don't reduce the
// refcount at the end of this function.
let weak = ManuallyDrop::new(Weak::<W>::from_raw(data as *const W));
WeakWake::wake_by_ref(&weak)
}
unsafe fn drop_weak_raw<W: WeakWake>(data: *const ()) {
drop(Weak::from_raw(data as *const W))
}
pub(crate) fn new_waker<W: WeakWake>(w: Weak<W>) -> Waker {
unsafe {
Waker::from_raw(RawWaker::new(
w.into_raw() as *const (),
waker_vtable::<W>(),
))
}
}
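Since `WeakWake` and `new_waker` are `pub(crate)`, the following is an in-crate sketch only, and `MiniExecutor` is a hypothetical type. It illustrates the property the vtable above exists for: a waker holds a `Weak`, so an outstanding waker never keeps the executor alive:

use std::sync::{Arc, Mutex, Weak};

// Hypothetical executor stand-in; real executors keep run queues, fds, etc.
struct MiniExecutor {
    woken: Mutex<bool>,
}

impl WeakWake for MiniExecutor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        // If the executor has already been dropped, the wake is a no-op.
        if let Some(ex) = weak_self.upgrade() {
            *ex.woken.lock().unwrap() = true;
        }
    }
}

fn demo() {
    let ex = Arc::new(MiniExecutor { woken: Mutex::new(false) });
    // new_waker takes a Weak, so this waker does not extend `ex`'s lifetime.
    let waker = new_waker(Arc::downgrade(&ex));
    waker.wake_by_ref();
    assert!(*ex.woken.lock().unwrap());
}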


@@ -28,7 +28,7 @@ audio_streams = "*"
balloon_control = { path = "../common/balloon_control" }
base = { path = "../base" }
bit_field = { path = "../bit_field" }
cros_async = { path = "../common/cros_async" }
cros_async = { path = "../cros_async" }
data_model = { path = "../common/data_model" }
dbus = { version = "0.9", optional = true }
disk = { path = "../disk" }


@@ -20,7 +20,7 @@ remain = "*"
tempfile = "3"
thiserror = "*"
uuid = { version = "0.8.2", features = ["v4"], optional = true }
cros_async = { path = "../common/cros_async" }
cros_async = { path = "../cros_async" }
data_model = { path = "../common/data_model" }
protos = { path = "../protos", features = ["composite-disk"], optional = true }
vm_memory = { path = "../vm_memory" }


@@ -9,6 +9,6 @@ libc = "*"
data_model = { path = "../common/data_model" }
net_sys = { path = "../net_sys" }
base = { path = "../base" }
cros_async = { path = "../common/cros_async" }
cros_async = { path = "../cros_async" }
remain = "*"
thiserror = "*"

View file

@@ -6,7 +6,7 @@ edition = "2021"
include = ["src/**/*", "Cargo.toml"]
[dependencies]
cros_async = { path = "../common/cros_async" }
cros_async = { path = "../cros_async" }
data_model = { path = "../common/data_model" }
libc = "*"
base = { path = "../base" }