cros_async: Combine platform-specific Executors

Instead of a dispatch enum defined for windows and a dispatch enum
defined for linux, there is now one dispatch enum with branches defined
for the two platforms. This makes it so defining the tokio executor
later only has to be added in one place.

Bug: b/320603688
Test: tools/dev_container tools/presubmit
Test: tools/dev_container tools/presubmit clippy
Change-Id: I6eaf46710bbb54a9684cb1622da36c3026906a5c
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5208332
Commit-Queue: Cody Schuffelen <schuffelen@google.com>
Reviewed-by: Frederick Mayle <fmayle@google.com>
This commit is contained in:
A. Cody Schuffelen 2024-01-17 16:27:46 -08:00 committed by crosvm LUCI
parent 895c6c79f4
commit b7ca74d516
15 changed files with 300 additions and 879 deletions

View file

@ -4,8 +4,18 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
#[cfg(any(target_os = "android", target_os = "linux"))]
use base::warn;
#[cfg(any(target_os = "android", target_os = "linux"))]
use base::AsRawDescriptors;
#[cfg(any(target_os = "android", target_os = "linux"))]
use base::RawDescriptor;
use once_cell::sync::OnceCell;
use crate::common_executor;
use crate::common_executor::RawExecutor;
#[cfg(any(target_os = "android", target_os = "linux"))]
use crate::sys::linux;
#[cfg(windows)]
@ -30,12 +40,6 @@ pub enum ExecutorKind {
SysVariants(ExecutorKindSys),
}
impl Default for ExecutorKind {
fn default() -> ExecutorKind {
ExecutorKind::SysVariants(ExecutorKindSys::default())
}
}
impl From<ExecutorKindSys> for ExecutorKind {
fn from(e: ExecutorKindSys) -> ExecutorKind {
ExecutorKind::SysVariants(e)
@ -51,6 +55,33 @@ impl From<ExecutorKind> for ExecutorKindSys {
}
}
/// If set, [`Executor::new()`] is created with `ExecutorKindSys` of `DEFAULT_EXECUTOR_KIND`.
static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new();
impl Default for ExecutorKind {
fn default() -> Self {
#[cfg(any(target_os = "android", target_os = "linux"))]
let default_fn = || ExecutorKindSys::Fd.into();
#[cfg(windows)]
let default_fn = || ExecutorKindSys::Handle.into();
*DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)
}
}
/// The error type for [`Executor::set_default_executor_kind()`].
#[derive(thiserror::Error, Debug)]
pub enum SetDefaultExecutorKindError {
/// The default executor kind is set more than once.
#[error("The default executor kind is already set to {0:?}")]
SetMoreThanOnce(ExecutorKind),
#[cfg(any(target_os = "android", target_os = "linux"))]
/// io_uring is unavailable. The reason might be the lack of the kernel support,
/// but is not limited to that.
#[error("io_uring is unavailable: {0}")]
UringUnavailable(linux::uring_executor::Error),
}
/// Reference to a task managed by the executor.
///
/// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to
@ -107,6 +138,27 @@ impl<R: 'static> Future for TaskHandle<R> {
}
}
pub(crate) trait ExecutorTrait {
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static;
fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static;
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static;
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
}
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
@ -203,11 +255,108 @@ impl<R: 'static> Future for TaskHandle<R> {
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # do_it().unwrap();
/// ```
pub(crate) trait ExecutorTrait {
#[derive(Clone)]
pub enum Executor {
#[cfg(any(target_os = "android", target_os = "linux"))]
Fd(Arc<RawExecutor<linux::EpollReactor>>),
#[cfg(any(target_os = "android", target_os = "linux"))]
Uring(Arc<RawExecutor<linux::UringReactor>>),
#[cfg(windows)]
Handle(Arc<RawExecutor<windows::HandleReactor>>),
#[cfg(windows)]
Overlapped(Arc<RawExecutor<windows::HandleReactor>>),
}
impl Executor {
/// Create a new `Executor`.
pub fn new() -> AsyncResult<Self> {
Executor::with_executor_kind(ExecutorKind::default())
}
/// Create a new `Executor` of the given `ExecutorKind`.
pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
Ok(match kind {
#[cfg(any(target_os = "android", target_os = "linux"))]
ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),
#[cfg(any(target_os = "android", target_os = "linux"))]
ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {
Executor::Uring(RawExecutor::new()?)
}
#[cfg(windows)]
ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {
Executor::Handle(RawExecutor::new()?)
}
#[cfg(windows)]
ExecutorKind::SysVariants(ExecutorKindSys::Overlapped) => {
Executor::Overlapped(RawExecutor::new()?)
}
})
}
/// Create a new `Executor` of the given `ExecutorKind`.
pub fn with_kind_and_concurrency(kind: ExecutorKind, _concurrency: u32) -> AsyncResult<Self> {
Ok(match kind {
#[cfg(windows)]
ExecutorKind::SysVariants(ExecutorKindSys::Overlapped) => {
let reactor = windows::HandleReactor::new_with(_concurrency)?;
Executor::Overlapped(RawExecutor::new_with(reactor)?)
}
_ => Executor::with_executor_kind(kind)?,
})
}
/// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
pub fn set_default_executor_kind(
executor_kind: ExecutorKind,
) -> Result<(), SetDefaultExecutorKindError> {
#[cfg(any(target_os = "android", target_os = "linux"))]
if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {
linux::uring_executor::check_uring_availability()
.map_err(SetDefaultExecutorKindError::UringUnavailable)?;
if !crate::is_uring_stable() {
warn!(
"Enabling io_uring executor on the kernel version where io_uring is unstable"
);
}
}
DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
// `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
SetDefaultExecutorKindError::SetMoreThanOnce(
*DEFAULT_EXECUTOR_KIND
.get()
.expect("Failed to get DEFAULT_EXECUTOR_KIND"),
))
}
/// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
/// `IoSource` to directly start async operations without needing a separate reference to the
/// executor.
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;
pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
match self {
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Fd(ex) => ex.async_from(f),
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Uring(ex) => ex.async_from(f),
#[cfg(windows)]
Executor::Handle(ex) => ex.async_from(f),
#[cfg(windows)]
Executor::Overlapped(ex) => ex.async_from(f),
}
}
/// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
/// If the executor is not overlapped, then Handle source is returned.
/// returned `IoSource` to directly start async operations without needing a separate reference
/// to the executor.
#[cfg(windows)]
pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
match self {
Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(
f, ex, false,
)?)),
_ => self.async_from(f),
}
}
/// Spawn a new future for this executor to run to completion. Callers may use the returned
/// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
@ -239,10 +388,66 @@ pub(crate) trait ExecutorTrait {
///
/// # example_spawn().unwrap();
/// ```
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static;
F::Output: Send + 'static,
{
match self {
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Fd(ex) => ex.spawn(f),
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Uring(ex) => ex.spawn(f),
#[cfg(windows)]
Executor::Handle(ex) => ex.spawn(f),
#[cfg(windows)]
Executor::Overlapped(ex) => ex.spawn(f),
}
}
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
/// thread where `run()` or `run_until()` is called.
///
/// # Panics
///
/// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
/// added by calling `spawn_local` from a different thread.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn_local() -> AsyncResult<()> {
/// # use cros_async::Executor;
///
/// # let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// Ok(())
/// # }
///
/// # example_spawn_local().unwrap();
/// ```
pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
match self {
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Fd(ex) => ex.spawn_local(f),
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Uring(ex) => ex.spawn_local(f),
#[cfg(windows)]
Executor::Handle(ex) => ex.spawn_local(f),
#[cfg(windows)]
Executor::Overlapped(ex) => ex.spawn_local(f),
}
}
/// Run the provided closure on a dedicated thread where blocking is allowed.
///
@ -273,42 +478,59 @@ pub(crate) trait ExecutorTrait {
/// # let ex = Executor::new().unwrap();
/// # ex.run_until(do_it(&ex)).unwrap();
/// ```
fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static;
R: Send + 'static,
{
match self {
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Fd(ex) => ex.spawn_blocking(f),
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Uring(ex) => ex.spawn_blocking(f),
#[cfg(windows)]
Executor::Handle(ex) => ex.spawn_blocking(f),
#[cfg(windows)]
Executor::Overlapped(ex) => ex.spawn_blocking(f),
}
}
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
/// thread where `run()` or `run_until()` is called.
/// Run the executor indefinitely, driving all spawned futures to completion. This method will
/// block the current thread and only return in the case of an error.
///
/// # Panics
///
/// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
/// added by calling `spawn_local` from a different thread.
/// Once this method has been called on a thread, it may only be called on that thread from that
/// point on. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn_local() -> AsyncResult<()> {
/// # use cros_async::Executor;
/// # fn example_run() -> AsyncResult<()> {
/// use std::thread;
///
/// # let ex = Executor::new()?;
/// use cros_async::Executor;
/// use futures::executor::block_on;
///
/// let task = ex.spawn_local(async { 7 + 13 });
/// let ex = Executor::new()?;
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// Ok(())
/// // Spawn a thread that runs the executor.
/// let ex2 = ex.clone();
/// thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn_local().unwrap();
/// # example_run().unwrap();
/// ```
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static;
pub fn run(&self) -> AsyncResult<()> {
self.run_until(std::future::pending())
}
/// Drive all futures spawned in this executor until `f` completes. This method will block the
/// current thread only until `f` is complete and there may still be unfinished futures in the
@ -337,5 +559,26 @@ pub(crate) trait ExecutorTrait {
///
/// # example_run_until().unwrap();
/// ```
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
match self {
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Fd(ex) => ex.run_until(f),
#[cfg(any(target_os = "android", target_os = "linux"))]
Executor::Uring(ex) => ex.run_until(f),
#[cfg(windows)]
Executor::Handle(ex) => ex.run_until(f),
#[cfg(windows)]
Executor::Overlapped(ex) => ex.run_until(f),
}
}
}
#[cfg(any(target_os = "android", target_os = "linux"))]
impl AsRawDescriptors for Executor {
fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
match self {
Executor::Fd(ex) => ex.as_raw_descriptors(),
Executor::Uring(ex) => ex.as_raw_descriptors(),
}
}
}

View file

@ -214,7 +214,7 @@ mod tests {
}
let f = tmpfile_with_contents("data".as_bytes());
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
}
@ -232,7 +232,7 @@ mod tests {
}
let mut f = tmpfile_with_contents(&[]);
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f.try_clone().unwrap()).unwrap();
ex.run_until(go(source)).unwrap();
@ -266,7 +266,7 @@ mod tests {
}
let f = tmpfile_with_contents("data".as_bytes());
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
}
@ -292,7 +292,7 @@ mod tests {
}
let mut f = tmpfile_with_contents(&[]);
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f.try_clone().unwrap()).unwrap();
ex.run_until(go(source)).unwrap();
@ -315,7 +315,7 @@ mod tests {
}
let f = tempfile::tempfile().unwrap();
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
@ -346,7 +346,7 @@ mod tests {
f.write_all(&[0xBB; 32]).unwrap();
f.rewind().unwrap();
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
@ -369,7 +369,7 @@ mod tests {
}
let f = tempfile::tempfile().unwrap();
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
@ -393,7 +393,7 @@ mod tests {
f.write_all(&[0xffu8; 32]).unwrap();
f.rewind().unwrap();
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f).unwrap();
ex.run_until(go(source)).unwrap();
@ -419,7 +419,7 @@ mod tests {
}
let mut f = tempfile::tempfile().unwrap();
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let source = ex.async_from(f.try_clone().unwrap()).unwrap();
ex.run_until(go(source)).unwrap();

View file

@ -69,7 +69,6 @@ pub mod sync;
pub mod sys;
#[cfg(any(target_os = "android", target_os = "linux"))]
pub use sys::linux::uring_executor::is_uring_stable;
pub use sys::Executor;
mod common_executor;
mod timer;
mod waker;
@ -88,6 +87,7 @@ pub use blocking::BlockingPool;
pub use blocking::CancellableBlockingPool;
pub use blocking::TimeoutAction;
pub use event::EventAsync;
pub use executor::Executor;
pub use executor::ExecutorKind;
pub(crate) use executor::ExecutorTrait;
pub use executor::TaskHandle;

View file

@ -14,6 +14,4 @@ cfg_if::cfg_if! {
pub use platform::async_types;
pub use platform::event;
pub use platform::executor::Executor;
pub use platform::executor::ExecutorKindSys;
pub use platform::executor::SetDefaultExecutorKindError;

View file

@ -2,132 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::future::Future;
use std::sync::Arc;
use base::debug;
use base::warn;
use base::AsRawDescriptors;
use base::RawDescriptor;
use once_cell::sync::OnceCell;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error as ThisError;
use super::fd_executor::EpollReactor;
use super::uring_executor::check_uring_availability;
use super::uring_executor::is_uring_stable;
use super::uring_executor::Error as UringError;
use super::uring_executor::UringReactor;
use crate::common_executor::RawExecutor;
use crate::AsyncResult;
use crate::ExecutorTrait;
use crate::IntoAsync;
use crate::IoSource;
use crate::TaskHandle;
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
///
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
/// interface duplication, but as far as we understand that is unavoidable.
///
/// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
/// for further details.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
///
/// ```
/// use std::cmp::min;
/// use std::error::Error;
/// use std::fs::{File, OpenOptions};
///
/// use cros_async::{AsyncResult, Executor, IoSource, complete3};
/// const CHUNK_SIZE: usize = 32;
///
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
/// while data.len() > 0 {
/// let (count, mut buf) = f.write_from_vec(None, data).await?;
///
/// data = buf.split_off(count);
/// }
///
/// Ok(())
/// }
///
/// // Transfer `len` bytes of data from `from` to `to`.
/// async fn transfer_data(
/// from: IoSource<File>,
/// to: IoSource<File>,
/// len: usize,
/// ) -> AsyncResult<usize> {
/// let mut rem = len;
///
/// while rem > 0 {
/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
/// let (count, mut data) = from.read_to_vec(None, buf).await?;
///
/// if count == 0 {
/// // End of file. Return the number of bytes transferred.
/// return Ok(len - rem);
/// }
///
/// data.truncate(count);
/// write_file(&to, data).await?;
///
/// rem = rem.saturating_sub(count);
/// }
///
/// Ok(len)
/// }
///
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # fn do_it() -> Result<(), Box<dyn Error>> {
/// let ex = Executor::new()?;
///
/// let (rx, tx) = base::linux::pipe()?;
/// let zero = File::open("/dev/zero")?;
/// let zero_bytes = CHUNK_SIZE * 7;
/// let zero_to_pipe = transfer_data(
/// ex.async_from(zero)?,
/// ex.async_from(tx.try_clone()?)?,
/// zero_bytes,
/// );
///
/// let rand = File::open("/dev/urandom")?;
/// let rand_bytes = CHUNK_SIZE * 19;
/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
///
/// let null = OpenOptions::new().write(true).open("/dev/null")?;
/// let null_bytes = zero_bytes + rand_bytes;
/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
///
/// ex.run_until(complete3(
/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
/// ))?;
///
/// # Ok(())
/// # }
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # do_it().unwrap();
/// ```
#[derive(Clone)]
pub enum Executor {
Uring(Arc<RawExecutor<UringReactor>>),
Fd(Arc<RawExecutor<EpollReactor>>),
}
/// An enum to express the kind of the backend of `Executor`
#[derive(
@ -140,311 +16,3 @@ pub enum ExecutorKindSys {
#[serde(rename = "epoll")]
Fd,
}
/// If set, [`ExecutorKindSys::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
/// If not set, [`ExecutorKindSys::default()`] returns a statically-chosen default value, and
/// [`ExecutorKindSys::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKindSys> = OnceCell::new();
impl Default for ExecutorKindSys {
fn default() -> Self {
*DEFAULT_EXECUTOR_KIND.get_or_init(|| ExecutorKindSys::Fd)
}
}
/// The error type for [`Executor::set_default_executor_kind()`].
#[derive(Debug, ThisError)]
pub enum SetDefaultExecutorKindError {
/// The default executor kind is set more than once.
#[error("The default executor kind is already set to {0:?}")]
SetMoreThanOnce(ExecutorKindSys),
/// io_uring is unavailable. The reason might be the lack of the kernel support,
/// but is not limited to that.
#[error("io_uring is unavailable: {0}")]
UringUnavailable(UringError),
}
impl Executor {
/// Create a new `Executor`.
pub fn new() -> AsyncResult<Self> {
Executor::with_executor_kind(ExecutorKindSys::default())
}
/// Create a new `Executor` of the given `ExecutorKind`.
pub fn with_executor_kind(kind: ExecutorKindSys) -> AsyncResult<Self> {
match kind {
ExecutorKindSys::Uring => RawExecutor::new().map(Executor::Uring),
ExecutorKindSys::Fd => RawExecutor::new().map(Executor::Fd),
}
}
/// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
/// If a call is the first call, it sets the default, and `set_default_executor_kind`
/// returns `Ok(())`. Otherwise, it returns `SetDefaultExecutorKindError::SetMoreThanOnce`
/// which contains the existing ExecutorKind value configured by the first call.
pub fn set_default_executor_kind(
executor_kind: ExecutorKindSys,
) -> Result<(), SetDefaultExecutorKindError> {
if executor_kind == ExecutorKindSys::Uring {
check_uring_availability().map_err(SetDefaultExecutorKindError::UringUnavailable)?;
if !is_uring_stable() {
warn!(
"Enabling io_uring executor on the kernel version where io_uring is unstable"
);
}
}
debug!("setting the default executor to {:?}", executor_kind);
DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
// `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
SetDefaultExecutorKindError::SetMoreThanOnce(
*DEFAULT_EXECUTOR_KIND
.get()
.expect("Failed to get DEFAULT_EXECUTOR_KIND"),
))
}
/// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
/// `IoSource` to directly start async operations without needing a separate reference to the
/// executor.
pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
ExecutorTrait::async_from(self, f)
}
/// Spawn a new future for this executor to run to completion. Callers may use the returned
/// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
/// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
/// future associated with it use `TaskHandle::detach`.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn() -> AsyncResult<()> {
/// # use std::thread;
///
/// # use cros_async::Executor;
/// use futures::executor::block_on;
///
/// # let ex = Executor::new()?;
///
/// # // Spawn a thread that runs the executor.
/// # let ex2 = ex.clone();
/// # thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn().unwrap();
/// ```
pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
ExecutorTrait::spawn(self, f)
}
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
/// thread where `run()` or `run_until()` is called.
///
/// # Panics
///
/// `Executor::run` and `Executor::run_until` will panic if they try to poll a future that was
/// added by calling `spawn_local` from a different thread.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn_local() -> AsyncResult<()> {
/// # use cros_async::Executor;
///
/// # let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn_local().unwrap();
/// ```
pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
ExecutorTrait::spawn_local(self, f)
}
/// Run the provided closure on a dedicated thread where blocking is allowed.
///
/// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
/// the returned `TaskHandle` may not cancel the operation if it was already started on a
/// worker thread.
///
/// # Panics
///
/// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
/// already completed.
///
/// # Examples
///
/// ```edition2018
/// # use cros_async::Executor;
///
/// # async fn do_it(ex: &Executor) {
/// let res = ex.spawn_blocking(move || {
/// // Do some CPU-intensive or blocking work here.
///
/// 42
/// }).await;
///
/// assert_eq!(res, 42);
/// # }
///
/// # let ex = Executor::new().unwrap();
/// # ex.run_until(do_it(&ex)).unwrap();
/// ```
pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
ExecutorTrait::spawn_blocking(self, f)
}
/// Run the executor indefinitely, driving all spawned futures to completion. This method will
/// block the current thread and only return in the case of an error.
///
/// # Panics
///
/// Once this method has been called on a thread, it may only be called on that thread from that
/// point on. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run() -> AsyncResult<()> {
/// use std::thread;
///
/// use cros_async::Executor;
/// use futures::executor::block_on;
///
/// let ex = Executor::new()?;
///
/// // Spawn a thread that runs the executor.
/// let ex2 = ex.clone();
/// thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run().unwrap();
/// ```
pub fn run(&self) -> AsyncResult<()> {
self.run_until(std::future::pending())
}
/// Drive all futures spawned in this executor until `f` completes. This method will block the
/// current thread only until `f` is complete and there may still be unfinished futures in the
/// executor.
///
/// # Panics
///
/// Once this method has been called on a thread, from then onwards it may only be called on
/// that thread. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run_until() -> AsyncResult<()> {
/// use cros_async::Executor;
///
/// let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run_until().unwrap();
/// ```
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
ExecutorTrait::run_until(self, f)
}
}
impl ExecutorTrait for Executor {
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
match self {
Executor::Uring(ex) => ex.async_from(f),
Executor::Fd(ex) => ex.async_from(f),
}
}
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Executor::Uring(ex) => ex.spawn(f),
Executor::Fd(ex) => ex.spawn(f),
}
}
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
match self {
Executor::Uring(ex) => ex.spawn_local(f),
Executor::Fd(ex) => ex.spawn_local(f),
}
}
fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
match self {
Executor::Uring(ex) => ex.spawn_blocking(f),
Executor::Fd(ex) => ex.spawn_blocking(f),
}
}
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
match self {
Executor::Uring(ex) => Ok(ex.run_until(f)?),
Executor::Fd(ex) => Ok(ex.run_until(f)?),
}
}
}
impl AsRawDescriptors for Executor {
fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
match self {
Executor::Uring(ex) => ex.as_raw_descriptors(),
Executor::Fd(ex) => ex.as_raw_descriptors(),
}
}
}

View file

@ -386,7 +386,7 @@ mod tests {
waker: None,
}));
let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring).unwrap();
let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
let f = File::open("/dev/zero").unwrap();
let source = uring_ex.async_from(f).unwrap();
@ -395,7 +395,7 @@ mod tests {
};
let handle = std::thread::spawn(move || uring_ex.run_until(quit));
let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd).unwrap();
let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
poll_ex.run_until(go(source)).unwrap();
state.lock().wake();
@ -421,7 +421,7 @@ mod tests {
waker: None,
}));
let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd).unwrap();
let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap();
let f = File::open("/dev/zero").unwrap();
let source = poll_ex.async_from(f).unwrap();
@ -430,7 +430,7 @@ mod tests {
};
let handle = std::thread::spawn(move || poll_ex.run_until(quit));
let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring).unwrap();
let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap();
uring_ex.run_until(go(source)).unwrap();
state.lock().wake();

View file

@ -2,126 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::future::Future;
use std::sync::Arc;
use once_cell::sync::OnceCell;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error as ThisError;
use super::HandleReactor;
use crate::common_executor::RawExecutor;
use crate::AsyncResult;
use crate::ExecutorTrait;
use crate::IntoAsync;
use crate::IoSource;
use crate::TaskHandle;
pub const DEFAULT_IO_CONCURRENCY: u32 = 1;
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
///
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
/// interface duplication, but as far as we understand that is unavoidable.
///
/// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
/// for further details.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
///
/// ```
/// use std::cmp::min;
/// use std::error::Error;
/// use std::fs::{File, OpenOptions};
///
/// use cros_async::{AsyncResult, Executor, IoSource, complete3};
/// const CHUNK_SIZE: usize = 32;
///
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
/// while data.len() > 0 {
/// let (count, mut buf) = f.write_from_vec(Some(0), data).await?;
///
/// data = buf.split_off(count);
/// }
///
/// Ok(())
/// }
///
/// // Transfer `len` bytes of data from `from` to `to`.
/// async fn transfer_data(
/// from: IoSource<File>,
/// to: IoSource<File>,
/// len: usize,
/// ) -> AsyncResult<usize> {
/// let mut rem = len;
///
/// while rem > 0 {
/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
/// let (count, mut data) = from.read_to_vec(Some(0), buf).await?;
///
/// if count == 0 {
/// // End of file. Return the number of bytes transferred.
/// return Ok(len - rem);
/// }
///
/// data.truncate(count);
/// write_file(&*to, data).await?;
///
/// rem = rem.saturating_sub(count);
/// }
///
/// Ok(len)
/// }
///
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # fn do_it() -> Result<(), Box<dyn Error>> {
/// let ex = Executor::new()?;
///
/// let (rx, tx) = base::pipe()?;
/// let zero = File::open("/dev/zero")?;
/// let zero_bytes = CHUNK_SIZE * 7;
/// let zero_to_pipe = transfer_data(
/// ex.async_from(zero)?,
/// ex.async_from(tx.try_clone()?)?,
/// zero_bytes,
/// );
///
/// let rand = File::open("/dev/urandom")?;
/// let rand_bytes = CHUNK_SIZE * 19;
/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
///
/// let null = OpenOptions::new().write(true).open("/dev/null")?;
/// let null_bytes = zero_bytes + rand_bytes;
/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
///
/// ex.run_until(complete3(
/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
/// ))?;
///
/// # Ok(())
/// # }
/// #[cfg(any(target_os = "android", target_os = "linux"))]
/// # do_it().unwrap();
/// ```
#[derive(Clone)]
pub enum Executor {
Handle(Arc<RawExecutor<HandleReactor>>),
Overlapped(Arc<RawExecutor<HandleReactor>>),
}
/// An enum to express the kind of the backend of `Executor`
#[derive(
@ -132,273 +14,3 @@ pub enum ExecutorKindSys {
Handle,
Overlapped,
}
/// If set, [`Executor::new()`] is created with `ExecutorKindSys` of `DEFAULT_EXECUTOR_KIND`.
static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKindSys> = OnceCell::new();
impl Default for ExecutorKindSys {
fn default() -> Self {
DEFAULT_EXECUTOR_KIND
.get()
.copied()
.unwrap_or(ExecutorKindSys::Handle)
}
}
/// The error type for [`Executor::set_default_executor_kind()`].
#[derive(ThisError, Debug)]
pub enum SetDefaultExecutorKindError {
/// The default executor kind is set more than once.
#[error("The default executor kind is already set to {0:?}")]
SetMoreThanOnce(ExecutorKindSys),
}
impl Executor {
/// Create a new `Executor`.
pub fn new() -> AsyncResult<Self> {
Executor::with_executor_kind(ExecutorKindSys::default())
}
/// Create a new `Executor` of the given `ExecutorKindSys`.
pub fn with_executor_kind(kind: ExecutorKindSys) -> AsyncResult<Self> {
match kind {
ExecutorKindSys::Handle => Ok(Executor::Handle(RawExecutor::<HandleReactor>::new()?)),
ExecutorKindSys::Overlapped => {
Ok(Executor::Overlapped(RawExecutor::<HandleReactor>::new()?))
}
}
}
/// Create a new `Executor` of the given `ExecutorKind`.
pub fn with_kind_and_concurrency(kind: ExecutorKindSys, concurrency: u32) -> AsyncResult<Self> {
match kind {
ExecutorKindSys::Handle => Ok(Executor::Handle(RawExecutor::<HandleReactor>::new()?)),
ExecutorKindSys::Overlapped => Ok(Executor::Overlapped(
RawExecutor::<HandleReactor>::new_with(HandleReactor::new_with(concurrency)?)?,
)),
}
}
/// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
/// `IoSource` to directly start async operations without needing a separate reference to the
/// executor.
pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
ExecutorTrait::async_from(self, f)
}
/// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
/// If the executor is not overlapped, then Handle source is returned.
/// returned `IoSource` to directly start async operations without needing a separate reference
/// to the executor.
pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
match self {
Executor::Handle(ex) => ex.async_from(f),
Executor::Overlapped(ex) => Ok(IoSource::Overlapped(super::OverlappedSource::new(
f, ex, false,
)?)),
}
}
/// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
/// If a call is the first call, it sets the default, and `set_default_executor_kind`
/// returns `Ok(())`. Otherwise, it returns `SetDefaultExecutorKindError::SetMoreThanOnce`
/// which contains the existing ExecutorKind value configured by the first call.
pub fn set_default_executor_kind(
executor_kind: ExecutorKindSys,
) -> Result<(), SetDefaultExecutorKindError> {
DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
// `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
SetDefaultExecutorKindError::SetMoreThanOnce(
*DEFAULT_EXECUTOR_KIND
.get()
.expect("Failed to get DEFAULT_EXECUTOR_KIND"),
))
}
/// Spawn a new future for this executor to run to completion. Callers may use the returned
/// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
/// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
/// future associated with it use `TaskHandle::detach`.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn() -> AsyncResult<()> {
/// # use std::thread;
///
/// # use cros_async::Executor;
/// use futures::executor::block_on;
///
/// # let ex = Executor::new()?;
///
/// # // Spawn a thread that runs the executor.
/// # let ex2 = ex.clone();
/// # thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn().unwrap();
/// ```
pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
ExecutorTrait::spawn(self, f)
}
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
/// thread where `run()` or `run_until()` is called.
///
/// # Panics
///
/// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
/// added by calling `spawn_local` from a different thread.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_spawn_local() -> AsyncResult<()> {
/// # use cros_async::Executor;
///
/// # let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_spawn_local().unwrap();
/// ```
pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
ExecutorTrait::spawn_local(self, f)
}
/// Run the executor indefinitely, driving all spawned futures to completion. This method will
/// block the current thread and only return in the case of an error.
///
/// # Panics
///
/// Once this method has been called on a thread, it may only be called on that thread from that
/// point on. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run() -> AsyncResult<()> {
/// use std::thread;
///
/// use cros_async::Executor;
/// use futures::executor::block_on;
///
/// let ex = Executor::new()?;
///
/// // Spawn a thread that runs the executor.
/// let ex2 = ex.clone();
/// thread::spawn(move || ex2.run());
///
/// let task = ex.spawn(async { 7 + 13 });
///
/// let result = block_on(task);
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run().unwrap();
/// ```
pub fn run(&self) -> AsyncResult<()> {
ExecutorTrait::run_until(self, std::future::pending())
}
/// Drive all futures spawned in this executor until `f` completes. This method will block the
/// current thread only until `f` is complete and there may still be unfinished futures in the
/// executor.
///
/// # Panics
///
/// Once this method has been called on a thread, from then onwards it may only be called on
/// that thread. Attempting to call it from another thread will panic.
///
/// # Examples
///
/// ```
/// # use cros_async::AsyncResult;
/// # fn example_run_until() -> AsyncResult<()> {
/// use cros_async::Executor;
///
/// let ex = Executor::new()?;
///
/// let task = ex.spawn_local(async { 7 + 13 });
///
/// let result = ex.run_until(task)?;
/// assert_eq!(result, 20);
/// # Ok(())
/// # }
///
/// # example_run_until().unwrap();
/// ```
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
ExecutorTrait::run_until(self, f)
}
}
impl ExecutorTrait for Executor {
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
match self {
Executor::Handle(ex) => ex.async_from(f),
Executor::Overlapped(ex) => ex.async_from(f),
}
}
fn spawn_blocking<F, R>(&self, _f: F) -> TaskHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
todo!()
}
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Executor::Handle(ex) => ex.spawn(f),
Executor::Overlapped(ex) => ex.spawn(f),
}
}
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
match self {
Executor::Handle(ex) => ex.spawn_local(f),
Executor::Overlapped(ex) => ex.spawn_local(f),
}
}
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
match self {
Executor::Handle(ex) => ex.run_until(f),
Executor::Overlapped(ex) => ex.run_until(f),
}
}
}

View file

@ -27,7 +27,6 @@ use winapi::um::minwinbase::OVERLAPPED;
use crate::common_executor;
use crate::common_executor::RawExecutor;
use crate::common_executor::RawTaskHandle;
use crate::sys::windows::executor::DEFAULT_IO_CONCURRENCY;
use crate::sys::windows::io_completion_port::CompletionPacket;
use crate::sys::windows::io_completion_port::IoCompletionPort;
use crate::waker::WakerToken;
@ -37,6 +36,8 @@ use crate::AsyncResult;
use crate::IoSource;
use crate::TaskHandle;
const DEFAULT_IO_CONCURRENCY: u32 = 1;
#[derive(Debug, ThisError)]
pub enum Error {
#[error("IO completion port operation failed: {0}")]

View file

@ -22,7 +22,7 @@ fn all_kinds() -> Vec<ExecutorKind> {
#[test]
fn cancel_pending_task() {
for kind in all_kinds() {
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let task = ex.spawn(std::future::pending::<()>());
assert_eq!(ex.run_until(task.cancel()).unwrap(), None);
}
@ -34,7 +34,7 @@ fn cancel_pending_task() {
#[test]
fn cancel_ready_task() {
for kind in all_kinds() {
let ex = Executor::with_executor_kind(kind.into()).unwrap();
let ex = Executor::with_executor_kind(kind).unwrap();
let (s, r) = futures::channel::oneshot::channel();
let mut s = Some(s);
let task = ex.spawn(futures::future::poll_fn(move |_| {

View file

@ -61,7 +61,6 @@ impl DiskOption {
impl BlockAsync {
pub fn create_executor(&self) -> Executor {
Executor::with_executor_kind(self.executor_kind.into())
.expect("Failed to create an executor")
Executor::with_executor_kind(self.executor_kind).expect("Failed to create an executor")
}
}

View file

@ -66,7 +66,7 @@ impl DiskOption {
impl BlockAsync {
pub fn create_executor(&self) -> Executor {
Executor::with_kind_and_concurrency(self.executor_kind.into(), self.io_concurrency)
Executor::with_kind_and_concurrency(self.executor_kind, self.io_concurrency)
.expect("Failed to create an executor")
}
}

View file

@ -678,8 +678,8 @@ impl VirtioDevice for Controller {
let intr = interrupt.clone();
let worker_thread = WorkerThread::start("v_scsi_ctrlq", move |kill_evt| {
let ex = Executor::with_executor_kind(executor_kind.into())
.expect("Failed to create an executor");
let ex =
Executor::with_executor_kind(executor_kind).expect("Failed to create an executor");
if let Err(err) = ex
.run_until(run_worker(
&ex,
@ -701,7 +701,7 @@ impl VirtioDevice for Controller {
let interrupt = interrupt.clone();
let worker_thread =
WorkerThread::start(format!("v_scsi_req_{}", i + 2), move |kill_evt| {
let ex = Executor::with_executor_kind(executor_kind.into())
let ex = Executor::with_executor_kind(executor_kind)
.expect("Failed to create an executor");
let async_logical_unit = targets
.0

View file

@ -70,7 +70,7 @@ pub fn start_device(opts: Options) -> anyhow::Result<()> {
let kind = disk_option
.async_executor
.unwrap_or(ExecutorKindSys::Handle.into());
let ex = Executor::with_executor_kind(kind.into()).context("failed to create executor")?;
let ex = Executor::with_executor_kind(kind).context("failed to create executor")?;
let block = Box::new(BlockAsync::new(
base_features(ProtectionType::Unprotected),

View file

@ -1628,7 +1628,7 @@ fn get_default_hypervisor() -> Option<HypervisorKind> {
pub fn run_config(cfg: Config) -> Result<ExitState> {
if let Some(async_executor) = cfg.async_executor {
Executor::set_default_executor_kind(async_executor.into())
Executor::set_default_executor_kind(async_executor)
.context("Failed to set the default async executor")?;
}
@ -4514,7 +4514,7 @@ fn start_vhost_user_control_server(
pub fn start_devices(opts: DevicesCommand) -> anyhow::Result<()> {
if let Some(async_executor) = opts.async_executor {
Executor::set_default_executor_kind(async_executor.into())
Executor::set_default_executor_kind(async_executor)
.context("Failed to set the default async executor")?;
}

View file

@ -503,7 +503,7 @@ fn create_qcow2(cmd: cmdline::CreateQcow2Command) -> std::result::Result<(), ()>
fn start_device(opts: cmdline::DeviceCommand) -> std::result::Result<(), ()> {
if let Some(async_executor) = opts.async_executor {
cros_async::Executor::set_default_executor_kind(async_executor.into())
cros_async::Executor::set_default_executor_kind(async_executor)
.map_err(|e| error!("Failed to set the default async executor: {:#}", e))?;
}