From b7ca74d51602463fb21ec711d24429c8884659bc Mon Sep 17 00:00:00 2001 From: "A. Cody Schuffelen" Date: Wed, 17 Jan 2024 16:27:46 -0800 Subject: [PATCH] cros_async: Combine platform-specific Executors Instead of a dispatch enum defined for windows and a dispatch enum defined for linux, there is now one dispatch enum with branches defined for the two platforms. This makes it so defining the tokio executor later only has to be added in one place. Bug: b/320603688 Test: tools/dev_container tools/presubmit Test: tools/dev_container tools/presubmit clippy Change-Id: I6eaf46710bbb54a9684cb1622da36c3026906a5c Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5208332 Commit-Queue: Cody Schuffelen Reviewed-by: Frederick Mayle --- cros_async/src/executor.rs | 303 ++++++++++-- cros_async/src/io_source.rs | 18 +- cros_async/src/lib.rs | 2 +- cros_async/src/sys.rs | 2 - cros_async/src/sys/linux/executor.rs | 432 ------------------ cros_async/src/sys/linux/uring_source.rs | 8 +- cros_async/src/sys/windows/executor.rs | 388 ---------------- cros_async/src/sys/windows/handle_executor.rs | 3 +- cros_async/tests/executor.rs | 4 +- devices/src/virtio/block/sys/linux.rs | 3 +- devices/src/virtio/block/sys/windows.rs | 2 +- devices/src/virtio/scsi/device.rs | 6 +- .../vhost/user/device/block/sys/windows.rs | 2 +- src/crosvm/sys/linux.rs | 4 +- src/main.rs | 2 +- 15 files changed, 300 insertions(+), 879 deletions(-) diff --git a/cros_async/src/executor.rs b/cros_async/src/executor.rs index c39add7920..2f1e7166be 100644 --- a/cros_async/src/executor.rs +++ b/cros_async/src/executor.rs @@ -4,8 +4,18 @@ use std::future::Future; use std::pin::Pin; +use std::sync::Arc; + +#[cfg(any(target_os = "android", target_os = "linux"))] +use base::warn; +#[cfg(any(target_os = "android", target_os = "linux"))] +use base::AsRawDescriptors; +#[cfg(any(target_os = "android", target_os = "linux"))] +use base::RawDescriptor; +use once_cell::sync::OnceCell; use crate::common_executor; +use crate::common_executor::RawExecutor; #[cfg(any(target_os = "android", target_os = "linux"))] use crate::sys::linux; #[cfg(windows)] @@ -30,12 +40,6 @@ pub enum ExecutorKind { SysVariants(ExecutorKindSys), } -impl Default for ExecutorKind { - fn default() -> ExecutorKind { - ExecutorKind::SysVariants(ExecutorKindSys::default()) - } -} - impl From for ExecutorKind { fn from(e: ExecutorKindSys) -> ExecutorKind { ExecutorKind::SysVariants(e) @@ -51,6 +55,33 @@ impl From for ExecutorKindSys { } } +/// If set, [`Executor::new()`] is created with `ExecutorKindSys` of `DEFAULT_EXECUTOR_KIND`. +static DEFAULT_EXECUTOR_KIND: OnceCell = OnceCell::new(); + +impl Default for ExecutorKind { + fn default() -> Self { + #[cfg(any(target_os = "android", target_os = "linux"))] + let default_fn = || ExecutorKindSys::Fd.into(); + #[cfg(windows)] + let default_fn = || ExecutorKindSys::Handle.into(); + *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn) + } +} + +/// The error type for [`Executor::set_default_executor_kind()`]. +#[derive(thiserror::Error, Debug)] +pub enum SetDefaultExecutorKindError { + /// The default executor kind is set more than once. + #[error("The default executor kind is already set to {0:?}")] + SetMoreThanOnce(ExecutorKind), + + #[cfg(any(target_os = "android", target_os = "linux"))] + /// io_uring is unavailable. The reason might be the lack of the kernel support, + /// but is not limited to that. + #[error("io_uring is unavailable: {0}")] + UringUnavailable(linux::uring_executor::Error), +} + /// Reference to a task managed by the executor. /// /// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to @@ -107,6 +138,27 @@ impl Future for TaskHandle { } } +pub(crate) trait ExecutorTrait { + fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult>; + + fn spawn(&self, f: F) -> TaskHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static; + + fn spawn_blocking(&self, f: F) -> TaskHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; + + fn spawn_local(&self, f: F) -> TaskHandle + where + F: Future + 'static, + F::Output: 'static; + + fn run_until(&self, f: F) -> AsyncResult; +} + /// An executor for scheduling tasks that poll futures to completion. /// /// All asynchronous operations must run within an executor, which is capable of spawning futures as @@ -203,11 +255,108 @@ impl Future for TaskHandle { /// #[cfg(any(target_os = "android", target_os = "linux"))] /// # do_it().unwrap(); /// ``` -pub(crate) trait ExecutorTrait { +#[derive(Clone)] +pub enum Executor { + #[cfg(any(target_os = "android", target_os = "linux"))] + Fd(Arc>), + #[cfg(any(target_os = "android", target_os = "linux"))] + Uring(Arc>), + #[cfg(windows)] + Handle(Arc>), + #[cfg(windows)] + Overlapped(Arc>), +} + +impl Executor { + /// Create a new `Executor`. + pub fn new() -> AsyncResult { + Executor::with_executor_kind(ExecutorKind::default()) + } + + /// Create a new `Executor` of the given `ExecutorKind`. + pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult { + Ok(match kind { + #[cfg(any(target_os = "android", target_os = "linux"))] + ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?), + #[cfg(any(target_os = "android", target_os = "linux"))] + ExecutorKind::SysVariants(ExecutorKindSys::Uring) => { + Executor::Uring(RawExecutor::new()?) + } + #[cfg(windows)] + ExecutorKind::SysVariants(ExecutorKindSys::Handle) => { + Executor::Handle(RawExecutor::new()?) + } + #[cfg(windows)] + ExecutorKind::SysVariants(ExecutorKindSys::Overlapped) => { + Executor::Overlapped(RawExecutor::new()?) + } + }) + } + + /// Create a new `Executor` of the given `ExecutorKind`. + pub fn with_kind_and_concurrency(kind: ExecutorKind, _concurrency: u32) -> AsyncResult { + Ok(match kind { + #[cfg(windows)] + ExecutorKind::SysVariants(ExecutorKindSys::Overlapped) => { + let reactor = windows::HandleReactor::new_with(_concurrency)?; + Executor::Overlapped(RawExecutor::new_with(reactor)?) + } + _ => Executor::with_executor_kind(kind)?, + }) + } + + /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once. + pub fn set_default_executor_kind( + executor_kind: ExecutorKind, + ) -> Result<(), SetDefaultExecutorKindError> { + #[cfg(any(target_os = "android", target_os = "linux"))] + if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) { + linux::uring_executor::check_uring_availability() + .map_err(SetDefaultExecutorKindError::UringUnavailable)?; + if !crate::is_uring_stable() { + warn!( + "Enabling io_uring executor on the kernel version where io_uring is unstable" + ); + } + } + DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_| + // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set. + SetDefaultExecutorKindError::SetMoreThanOnce( + *DEFAULT_EXECUTOR_KIND + .get() + .expect("Failed to get DEFAULT_EXECUTOR_KIND"), + )) + } + /// Create a new `IoSource` associated with `self`. Callers may then use the returned /// `IoSource` to directly start async operations without needing a separate reference to the /// executor. - fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult>; + pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { + match self { + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Fd(ex) => ex.async_from(f), + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Uring(ex) => ex.async_from(f), + #[cfg(windows)] + Executor::Handle(ex) => ex.async_from(f), + #[cfg(windows)] + Executor::Overlapped(ex) => ex.async_from(f), + } + } + + /// Create a new overlapped `IoSource` associated with `self`. Callers may then use the + /// If the executor is not overlapped, then Handle source is returned. + /// returned `IoSource` to directly start async operations without needing a separate reference + /// to the executor. + #[cfg(windows)] + pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { + match self { + Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new( + f, ex, false, + )?)), + _ => self.async_from(f), + } + } /// Spawn a new future for this executor to run to completion. Callers may use the returned /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel @@ -239,10 +388,66 @@ pub(crate) trait ExecutorTrait { /// /// # example_spawn().unwrap(); /// ``` - fn spawn(&self, f: F) -> TaskHandle + pub fn spawn(&self, f: F) -> TaskHandle where F: Future + Send + 'static, - F::Output: Send + 'static; + F::Output: Send + 'static, + { + match self { + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Fd(ex) => ex.spawn(f), + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Uring(ex) => ex.spawn(f), + #[cfg(windows)] + Executor::Handle(ex) => ex.spawn(f), + #[cfg(windows)] + Executor::Overlapped(ex) => ex.spawn(f), + } + } + + /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without + /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same + /// thread where `run()` or `run_until()` is called. + /// + /// # Panics + /// + /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was + /// added by calling `spawn_local` from a different thread. + /// + /// # Examples + /// + /// ``` + /// # use cros_async::AsyncResult; + /// # fn example_spawn_local() -> AsyncResult<()> { + /// # use cros_async::Executor; + /// + /// # let ex = Executor::new()?; + /// + /// let task = ex.spawn_local(async { 7 + 13 }); + /// + /// let result = ex.run_until(task)?; + /// assert_eq!(result, 20); + /// Ok(()) + /// # } + /// + /// # example_spawn_local().unwrap(); + /// ``` + pub fn spawn_local(&self, f: F) -> TaskHandle + where + F: Future + 'static, + F::Output: 'static, + { + match self { + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Fd(ex) => ex.spawn_local(f), + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Uring(ex) => ex.spawn_local(f), + #[cfg(windows)] + Executor::Handle(ex) => ex.spawn_local(f), + #[cfg(windows)] + Executor::Overlapped(ex) => ex.spawn_local(f), + } + } /// Run the provided closure on a dedicated thread where blocking is allowed. /// @@ -273,42 +478,59 @@ pub(crate) trait ExecutorTrait { /// # let ex = Executor::new().unwrap(); /// # ex.run_until(do_it(&ex)).unwrap(); /// ``` - fn spawn_blocking(&self, f: F) -> TaskHandle + pub fn spawn_blocking(&self, f: F) -> TaskHandle where F: FnOnce() -> R + Send + 'static, - R: Send + 'static; + R: Send + 'static, + { + match self { + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Fd(ex) => ex.spawn_blocking(f), + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Uring(ex) => ex.spawn_blocking(f), + #[cfg(windows)] + Executor::Handle(ex) => ex.spawn_blocking(f), + #[cfg(windows)] + Executor::Overlapped(ex) => ex.spawn_blocking(f), + } + } - /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without - /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same - /// thread where `run()` or `run_until()` is called. + /// Run the executor indefinitely, driving all spawned futures to completion. This method will + /// block the current thread and only return in the case of an error. /// /// # Panics /// - /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was - /// added by calling `spawn_local` from a different thread. + /// Once this method has been called on a thread, it may only be called on that thread from that + /// point on. Attempting to call it from another thread will panic. /// /// # Examples /// /// ``` /// # use cros_async::AsyncResult; - /// # fn example_spawn_local() -> AsyncResult<()> { - /// # use cros_async::Executor; + /// # fn example_run() -> AsyncResult<()> { + /// use std::thread; /// - /// # let ex = Executor::new()?; + /// use cros_async::Executor; + /// use futures::executor::block_on; /// - /// let task = ex.spawn_local(async { 7 + 13 }); + /// let ex = Executor::new()?; /// - /// let result = ex.run_until(task)?; - /// assert_eq!(result, 20); - /// Ok(()) + /// // Spawn a thread that runs the executor. + /// let ex2 = ex.clone(); + /// thread::spawn(move || ex2.run()); + /// + /// let task = ex.spawn(async { 7 + 13 }); + /// + /// let result = block_on(task); + /// assert_eq!(result, 20); + /// # Ok(()) /// # } /// - /// # example_spawn_local().unwrap(); + /// # example_run().unwrap(); /// ``` - fn spawn_local(&self, f: F) -> TaskHandle - where - F: Future + 'static, - F::Output: 'static; + pub fn run(&self) -> AsyncResult<()> { + self.run_until(std::future::pending()) + } /// Drive all futures spawned in this executor until `f` completes. This method will block the /// current thread only until `f` is complete and there may still be unfinished futures in the @@ -337,5 +559,26 @@ pub(crate) trait ExecutorTrait { /// /// # example_run_until().unwrap(); /// ``` - fn run_until(&self, f: F) -> AsyncResult; + pub fn run_until(&self, f: F) -> AsyncResult { + match self { + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Fd(ex) => ex.run_until(f), + #[cfg(any(target_os = "android", target_os = "linux"))] + Executor::Uring(ex) => ex.run_until(f), + #[cfg(windows)] + Executor::Handle(ex) => ex.run_until(f), + #[cfg(windows)] + Executor::Overlapped(ex) => ex.run_until(f), + } + } +} + +#[cfg(any(target_os = "android", target_os = "linux"))] +impl AsRawDescriptors for Executor { + fn as_raw_descriptors(&self) -> Vec { + match self { + Executor::Fd(ex) => ex.as_raw_descriptors(), + Executor::Uring(ex) => ex.as_raw_descriptors(), + } + } } diff --git a/cros_async/src/io_source.rs b/cros_async/src/io_source.rs index c61a337321..efc4c03b51 100644 --- a/cros_async/src/io_source.rs +++ b/cros_async/src/io_source.rs @@ -214,7 +214,7 @@ mod tests { } let f = tmpfile_with_contents("data".as_bytes()); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f).unwrap(); ex.run_until(go(source)).unwrap(); } @@ -232,7 +232,7 @@ mod tests { } let mut f = tmpfile_with_contents(&[]); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f.try_clone().unwrap()).unwrap(); ex.run_until(go(source)).unwrap(); @@ -266,7 +266,7 @@ mod tests { } let f = tmpfile_with_contents("data".as_bytes()); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f).unwrap(); ex.run_until(go(source)).unwrap(); } @@ -292,7 +292,7 @@ mod tests { } let mut f = tmpfile_with_contents(&[]); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f.try_clone().unwrap()).unwrap(); ex.run_until(go(source)).unwrap(); @@ -315,7 +315,7 @@ mod tests { } let f = tempfile::tempfile().unwrap(); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f).unwrap(); ex.run_until(go(source)).unwrap(); @@ -346,7 +346,7 @@ mod tests { f.write_all(&[0xBB; 32]).unwrap(); f.rewind().unwrap(); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f).unwrap(); ex.run_until(go(source)).unwrap(); @@ -369,7 +369,7 @@ mod tests { } let f = tempfile::tempfile().unwrap(); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f).unwrap(); ex.run_until(go(source)).unwrap(); @@ -393,7 +393,7 @@ mod tests { f.write_all(&[0xffu8; 32]).unwrap(); f.rewind().unwrap(); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f).unwrap(); ex.run_until(go(source)).unwrap(); @@ -419,7 +419,7 @@ mod tests { } let mut f = tempfile::tempfile().unwrap(); - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let source = ex.async_from(f.try_clone().unwrap()).unwrap(); ex.run_until(go(source)).unwrap(); diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 3506bebef1..70ac127d17 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -69,7 +69,6 @@ pub mod sync; pub mod sys; #[cfg(any(target_os = "android", target_os = "linux"))] pub use sys::linux::uring_executor::is_uring_stable; -pub use sys::Executor; mod common_executor; mod timer; mod waker; @@ -88,6 +87,7 @@ pub use blocking::BlockingPool; pub use blocking::CancellableBlockingPool; pub use blocking::TimeoutAction; pub use event::EventAsync; +pub use executor::Executor; pub use executor::ExecutorKind; pub(crate) use executor::ExecutorTrait; pub use executor::TaskHandle; diff --git a/cros_async/src/sys.rs b/cros_async/src/sys.rs index 60705799b3..ff1ab59307 100644 --- a/cros_async/src/sys.rs +++ b/cros_async/src/sys.rs @@ -14,6 +14,4 @@ cfg_if::cfg_if! { pub use platform::async_types; pub use platform::event; -pub use platform::executor::Executor; pub use platform::executor::ExecutorKindSys; -pub use platform::executor::SetDefaultExecutorKindError; diff --git a/cros_async/src/sys/linux/executor.rs b/cros_async/src/sys/linux/executor.rs index b5c434007c..a08ff255ac 100644 --- a/cros_async/src/sys/linux/executor.rs +++ b/cros_async/src/sys/linux/executor.rs @@ -2,132 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use std::future::Future; -use std::sync::Arc; - -use base::debug; -use base::warn; -use base::AsRawDescriptors; -use base::RawDescriptor; -use once_cell::sync::OnceCell; use serde::Deserialize; use serde::Serialize; -use thiserror::Error as ThisError; - -use super::fd_executor::EpollReactor; -use super::uring_executor::check_uring_availability; -use super::uring_executor::is_uring_stable; -use super::uring_executor::Error as UringError; -use super::uring_executor::UringReactor; -use crate::common_executor::RawExecutor; -use crate::AsyncResult; -use crate::ExecutorTrait; -use crate::IntoAsync; -use crate::IoSource; -use crate::TaskHandle; - -/// An executor for scheduling tasks that poll futures to completion. -/// -/// All asynchronous operations must run within an executor, which is capable of spawning futures as -/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations. -/// -/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only -/// create a new reference, not a new executor. -/// -/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be -/// represented on the POSIX side as an enum, rather than a trait. This leads to some code & -/// interface duplication, but as far as we understand that is unavoidable. -/// -/// See -/// for further details. -/// -/// # Examples -/// -/// Concurrently wait for multiple files to become readable/writable and then read/write the data. -/// -/// ``` -/// use std::cmp::min; -/// use std::error::Error; -/// use std::fs::{File, OpenOptions}; -/// -/// use cros_async::{AsyncResult, Executor, IoSource, complete3}; -/// const CHUNK_SIZE: usize = 32; -/// -/// // Write all bytes from `data` to `f`. -/// async fn write_file(f: &IoSource, mut data: Vec) -> AsyncResult<()> { -/// while data.len() > 0 { -/// let (count, mut buf) = f.write_from_vec(None, data).await?; -/// -/// data = buf.split_off(count); -/// } -/// -/// Ok(()) -/// } -/// -/// // Transfer `len` bytes of data from `from` to `to`. -/// async fn transfer_data( -/// from: IoSource, -/// to: IoSource, -/// len: usize, -/// ) -> AsyncResult { -/// let mut rem = len; -/// -/// while rem > 0 { -/// let buf = vec![0u8; min(rem, CHUNK_SIZE)]; -/// let (count, mut data) = from.read_to_vec(None, buf).await?; -/// -/// if count == 0 { -/// // End of file. Return the number of bytes transferred. -/// return Ok(len - rem); -/// } -/// -/// data.truncate(count); -/// write_file(&to, data).await?; -/// -/// rem = rem.saturating_sub(count); -/// } -/// -/// Ok(len) -/// } -/// -/// #[cfg(any(target_os = "android", target_os = "linux"))] -/// # fn do_it() -> Result<(), Box> { -/// let ex = Executor::new()?; -/// -/// let (rx, tx) = base::linux::pipe()?; -/// let zero = File::open("/dev/zero")?; -/// let zero_bytes = CHUNK_SIZE * 7; -/// let zero_to_pipe = transfer_data( -/// ex.async_from(zero)?, -/// ex.async_from(tx.try_clone()?)?, -/// zero_bytes, -/// ); -/// -/// let rand = File::open("/dev/urandom")?; -/// let rand_bytes = CHUNK_SIZE * 19; -/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes); -/// -/// let null = OpenOptions::new().write(true).open("/dev/null")?; -/// let null_bytes = zero_bytes + rand_bytes; -/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes); -/// -/// ex.run_until(complete3( -/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) }, -/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) }, -/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) }, -/// ))?; -/// -/// # Ok(()) -/// # } -/// #[cfg(any(target_os = "android", target_os = "linux"))] -/// # do_it().unwrap(); -/// ``` - -#[derive(Clone)] -pub enum Executor { - Uring(Arc>), - Fd(Arc>), -} /// An enum to express the kind of the backend of `Executor` #[derive( @@ -140,311 +16,3 @@ pub enum ExecutorKindSys { #[serde(rename = "epoll")] Fd, } - -/// If set, [`ExecutorKindSys::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`. -/// If not set, [`ExecutorKindSys::default()`] returns a statically-chosen default value, and -/// [`ExecutorKindSys::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value. -static DEFAULT_EXECUTOR_KIND: OnceCell = OnceCell::new(); - -impl Default for ExecutorKindSys { - fn default() -> Self { - *DEFAULT_EXECUTOR_KIND.get_or_init(|| ExecutorKindSys::Fd) - } -} - -/// The error type for [`Executor::set_default_executor_kind()`]. -#[derive(Debug, ThisError)] -pub enum SetDefaultExecutorKindError { - /// The default executor kind is set more than once. - #[error("The default executor kind is already set to {0:?}")] - SetMoreThanOnce(ExecutorKindSys), - - /// io_uring is unavailable. The reason might be the lack of the kernel support, - /// but is not limited to that. - #[error("io_uring is unavailable: {0}")] - UringUnavailable(UringError), -} - -impl Executor { - /// Create a new `Executor`. - pub fn new() -> AsyncResult { - Executor::with_executor_kind(ExecutorKindSys::default()) - } - - /// Create a new `Executor` of the given `ExecutorKind`. - pub fn with_executor_kind(kind: ExecutorKindSys) -> AsyncResult { - match kind { - ExecutorKindSys::Uring => RawExecutor::new().map(Executor::Uring), - ExecutorKindSys::Fd => RawExecutor::new().map(Executor::Fd), - } - } - - /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once. - /// If a call is the first call, it sets the default, and `set_default_executor_kind` - /// returns `Ok(())`. Otherwise, it returns `SetDefaultExecutorKindError::SetMoreThanOnce` - /// which contains the existing ExecutorKind value configured by the first call. - pub fn set_default_executor_kind( - executor_kind: ExecutorKindSys, - ) -> Result<(), SetDefaultExecutorKindError> { - if executor_kind == ExecutorKindSys::Uring { - check_uring_availability().map_err(SetDefaultExecutorKindError::UringUnavailable)?; - if !is_uring_stable() { - warn!( - "Enabling io_uring executor on the kernel version where io_uring is unstable" - ); - } - } - - debug!("setting the default executor to {:?}", executor_kind); - DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_| - // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set. - SetDefaultExecutorKindError::SetMoreThanOnce( - *DEFAULT_EXECUTOR_KIND - .get() - .expect("Failed to get DEFAULT_EXECUTOR_KIND"), - )) - } - - /// Create a new `IoSource` associated with `self`. Callers may then use the returned - /// `IoSource` to directly start async operations without needing a separate reference to the - /// executor. - pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { - ExecutorTrait::async_from(self, f) - } - - /// Spawn a new future for this executor to run to completion. Callers may use the returned - /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel - /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the - /// future associated with it use `TaskHandle::detach`. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_spawn() -> AsyncResult<()> { - /// # use std::thread; - /// - /// # use cros_async::Executor; - /// use futures::executor::block_on; - /// - /// # let ex = Executor::new()?; - /// - /// # // Spawn a thread that runs the executor. - /// # let ex2 = ex.clone(); - /// # thread::spawn(move || ex2.run()); - /// - /// let task = ex.spawn(async { 7 + 13 }); - /// - /// let result = block_on(task); - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_spawn().unwrap(); - /// ``` - pub fn spawn(&self, f: F) -> TaskHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - ExecutorTrait::spawn(self, f) - } - - /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without - /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same - /// thread where `run()` or `run_until()` is called. - /// - /// # Panics - /// - /// `Executor::run` and `Executor::run_until` will panic if they try to poll a future that was - /// added by calling `spawn_local` from a different thread. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_spawn_local() -> AsyncResult<()> { - /// # use cros_async::Executor; - /// - /// # let ex = Executor::new()?; - /// - /// let task = ex.spawn_local(async { 7 + 13 }); - /// - /// let result = ex.run_until(task)?; - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_spawn_local().unwrap(); - /// ``` - pub fn spawn_local(&self, f: F) -> TaskHandle - where - F: Future + 'static, - F::Output: 'static, - { - ExecutorTrait::spawn_local(self, f) - } - - /// Run the provided closure on a dedicated thread where blocking is allowed. - /// - /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping - /// the returned `TaskHandle` may not cancel the operation if it was already started on a - /// worker thread. - /// - /// # Panics - /// - /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not - /// already completed. - /// - /// # Examples - /// - /// ```edition2018 - /// # use cros_async::Executor; - /// - /// # async fn do_it(ex: &Executor) { - /// let res = ex.spawn_blocking(move || { - /// // Do some CPU-intensive or blocking work here. - /// - /// 42 - /// }).await; - /// - /// assert_eq!(res, 42); - /// # } - /// - /// # let ex = Executor::new().unwrap(); - /// # ex.run_until(do_it(&ex)).unwrap(); - /// ``` - pub fn spawn_blocking(&self, f: F) -> TaskHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - ExecutorTrait::spawn_blocking(self, f) - } - - /// Run the executor indefinitely, driving all spawned futures to completion. This method will - /// block the current thread and only return in the case of an error. - /// - /// # Panics - /// - /// Once this method has been called on a thread, it may only be called on that thread from that - /// point on. Attempting to call it from another thread will panic. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_run() -> AsyncResult<()> { - /// use std::thread; - /// - /// use cros_async::Executor; - /// use futures::executor::block_on; - /// - /// let ex = Executor::new()?; - /// - /// // Spawn a thread that runs the executor. - /// let ex2 = ex.clone(); - /// thread::spawn(move || ex2.run()); - /// - /// let task = ex.spawn(async { 7 + 13 }); - /// - /// let result = block_on(task); - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_run().unwrap(); - /// ``` - pub fn run(&self) -> AsyncResult<()> { - self.run_until(std::future::pending()) - } - - /// Drive all futures spawned in this executor until `f` completes. This method will block the - /// current thread only until `f` is complete and there may still be unfinished futures in the - /// executor. - /// - /// # Panics - /// - /// Once this method has been called on a thread, from then onwards it may only be called on - /// that thread. Attempting to call it from another thread will panic. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_run_until() -> AsyncResult<()> { - /// use cros_async::Executor; - /// - /// let ex = Executor::new()?; - /// - /// let task = ex.spawn_local(async { 7 + 13 }); - /// - /// let result = ex.run_until(task)?; - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_run_until().unwrap(); - /// ``` - pub fn run_until(&self, f: F) -> AsyncResult { - ExecutorTrait::run_until(self, f) - } -} - -impl ExecutorTrait for Executor { - fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { - match self { - Executor::Uring(ex) => ex.async_from(f), - Executor::Fd(ex) => ex.async_from(f), - } - } - - fn spawn(&self, f: F) -> TaskHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match self { - Executor::Uring(ex) => ex.spawn(f), - Executor::Fd(ex) => ex.spawn(f), - } - } - - fn spawn_local(&self, f: F) -> TaskHandle - where - F: Future + 'static, - F::Output: 'static, - { - match self { - Executor::Uring(ex) => ex.spawn_local(f), - Executor::Fd(ex) => ex.spawn_local(f), - } - } - - fn spawn_blocking(&self, f: F) -> TaskHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - match self { - Executor::Uring(ex) => ex.spawn_blocking(f), - Executor::Fd(ex) => ex.spawn_blocking(f), - } - } - - fn run_until(&self, f: F) -> AsyncResult { - match self { - Executor::Uring(ex) => Ok(ex.run_until(f)?), - Executor::Fd(ex) => Ok(ex.run_until(f)?), - } - } -} - -impl AsRawDescriptors for Executor { - fn as_raw_descriptors(&self) -> Vec { - match self { - Executor::Uring(ex) => ex.as_raw_descriptors(), - Executor::Fd(ex) => ex.as_raw_descriptors(), - } - } -} diff --git a/cros_async/src/sys/linux/uring_source.rs b/cros_async/src/sys/linux/uring_source.rs index 7a21574d75..d01762a6a8 100644 --- a/cros_async/src/sys/linux/uring_source.rs +++ b/cros_async/src/sys/linux/uring_source.rs @@ -386,7 +386,7 @@ mod tests { waker: None, })); - let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring).unwrap(); + let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap(); let f = File::open("/dev/zero").unwrap(); let source = uring_ex.async_from(f).unwrap(); @@ -395,7 +395,7 @@ mod tests { }; let handle = std::thread::spawn(move || uring_ex.run_until(quit)); - let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd).unwrap(); + let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap(); poll_ex.run_until(go(source)).unwrap(); state.lock().wake(); @@ -421,7 +421,7 @@ mod tests { waker: None, })); - let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd).unwrap(); + let poll_ex = Executor::with_executor_kind(ExecutorKindSys::Fd.into()).unwrap(); let f = File::open("/dev/zero").unwrap(); let source = poll_ex.async_from(f).unwrap(); @@ -430,7 +430,7 @@ mod tests { }; let handle = std::thread::spawn(move || poll_ex.run_until(quit)); - let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring).unwrap(); + let uring_ex = Executor::with_executor_kind(ExecutorKindSys::Uring.into()).unwrap(); uring_ex.run_until(go(source)).unwrap(); state.lock().wake(); diff --git a/cros_async/src/sys/windows/executor.rs b/cros_async/src/sys/windows/executor.rs index 34348206f5..3d9053b22c 100644 --- a/cros_async/src/sys/windows/executor.rs +++ b/cros_async/src/sys/windows/executor.rs @@ -2,126 +2,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use std::future::Future; -use std::sync::Arc; - -use once_cell::sync::OnceCell; use serde::Deserialize; use serde::Serialize; -use thiserror::Error as ThisError; - -use super::HandleReactor; -use crate::common_executor::RawExecutor; -use crate::AsyncResult; -use crate::ExecutorTrait; -use crate::IntoAsync; -use crate::IoSource; -use crate::TaskHandle; - -pub const DEFAULT_IO_CONCURRENCY: u32 = 1; - -/// An executor for scheduling tasks that poll futures to completion. -/// -/// All asynchronous operations must run within an executor, which is capable of spawning futures as -/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations. -/// -/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only -/// create a new reference, not a new executor. -/// -/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be -/// represented on the POSIX side as an enum, rather than a trait. This leads to some code & -/// interface duplication, but as far as we understand that is unavoidable. -/// -/// See -/// for further details. -/// -/// # Examples -/// -/// Concurrently wait for multiple files to become readable/writable and then read/write the data. -/// -/// ``` -/// use std::cmp::min; -/// use std::error::Error; -/// use std::fs::{File, OpenOptions}; -/// -/// use cros_async::{AsyncResult, Executor, IoSource, complete3}; -/// const CHUNK_SIZE: usize = 32; -/// -/// // Write all bytes from `data` to `f`. -/// async fn write_file(f: &IoSource, mut data: Vec) -> AsyncResult<()> { -/// while data.len() > 0 { -/// let (count, mut buf) = f.write_from_vec(Some(0), data).await?; -/// -/// data = buf.split_off(count); -/// } -/// -/// Ok(()) -/// } -/// -/// // Transfer `len` bytes of data from `from` to `to`. -/// async fn transfer_data( -/// from: IoSource, -/// to: IoSource, -/// len: usize, -/// ) -> AsyncResult { -/// let mut rem = len; -/// -/// while rem > 0 { -/// let buf = vec![0u8; min(rem, CHUNK_SIZE)]; -/// let (count, mut data) = from.read_to_vec(Some(0), buf).await?; -/// -/// if count == 0 { -/// // End of file. Return the number of bytes transferred. -/// return Ok(len - rem); -/// } -/// -/// data.truncate(count); -/// write_file(&*to, data).await?; -/// -/// rem = rem.saturating_sub(count); -/// } -/// -/// Ok(len) -/// } -/// -/// #[cfg(any(target_os = "android", target_os = "linux"))] -/// # fn do_it() -> Result<(), Box> { -/// let ex = Executor::new()?; -/// -/// let (rx, tx) = base::pipe()?; -/// let zero = File::open("/dev/zero")?; -/// let zero_bytes = CHUNK_SIZE * 7; -/// let zero_to_pipe = transfer_data( -/// ex.async_from(zero)?, -/// ex.async_from(tx.try_clone()?)?, -/// zero_bytes, -/// ); -/// -/// let rand = File::open("/dev/urandom")?; -/// let rand_bytes = CHUNK_SIZE * 19; -/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes); -/// -/// let null = OpenOptions::new().write(true).open("/dev/null")?; -/// let null_bytes = zero_bytes + rand_bytes; -/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes); -/// -/// ex.run_until(complete3( -/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) }, -/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) }, -/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) }, -/// ))?; -/// -/// # Ok(()) -/// # } -/// #[cfg(any(target_os = "android", target_os = "linux"))] -/// # do_it().unwrap(); -/// ``` - -#[derive(Clone)] -pub enum Executor { - Handle(Arc>), - Overlapped(Arc>), -} /// An enum to express the kind of the backend of `Executor` #[derive( @@ -132,273 +14,3 @@ pub enum ExecutorKindSys { Handle, Overlapped, } - -/// If set, [`Executor::new()`] is created with `ExecutorKindSys` of `DEFAULT_EXECUTOR_KIND`. -static DEFAULT_EXECUTOR_KIND: OnceCell = OnceCell::new(); - -impl Default for ExecutorKindSys { - fn default() -> Self { - DEFAULT_EXECUTOR_KIND - .get() - .copied() - .unwrap_or(ExecutorKindSys::Handle) - } -} - -/// The error type for [`Executor::set_default_executor_kind()`]. -#[derive(ThisError, Debug)] -pub enum SetDefaultExecutorKindError { - /// The default executor kind is set more than once. - #[error("The default executor kind is already set to {0:?}")] - SetMoreThanOnce(ExecutorKindSys), -} - -impl Executor { - /// Create a new `Executor`. - pub fn new() -> AsyncResult { - Executor::with_executor_kind(ExecutorKindSys::default()) - } - - /// Create a new `Executor` of the given `ExecutorKindSys`. - pub fn with_executor_kind(kind: ExecutorKindSys) -> AsyncResult { - match kind { - ExecutorKindSys::Handle => Ok(Executor::Handle(RawExecutor::::new()?)), - ExecutorKindSys::Overlapped => { - Ok(Executor::Overlapped(RawExecutor::::new()?)) - } - } - } - - /// Create a new `Executor` of the given `ExecutorKind`. - pub fn with_kind_and_concurrency(kind: ExecutorKindSys, concurrency: u32) -> AsyncResult { - match kind { - ExecutorKindSys::Handle => Ok(Executor::Handle(RawExecutor::::new()?)), - ExecutorKindSys::Overlapped => Ok(Executor::Overlapped( - RawExecutor::::new_with(HandleReactor::new_with(concurrency)?)?, - )), - } - } - - /// Create a new `IoSource` associated with `self`. Callers may then use the returned - /// `IoSource` to directly start async operations without needing a separate reference to the - /// executor. - pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { - ExecutorTrait::async_from(self, f) - } - - /// Create a new overlapped `IoSource` associated with `self`. Callers may then use the - /// If the executor is not overlapped, then Handle source is returned. - /// returned `IoSource` to directly start async operations without needing a separate reference - /// to the executor. - pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { - match self { - Executor::Handle(ex) => ex.async_from(f), - Executor::Overlapped(ex) => Ok(IoSource::Overlapped(super::OverlappedSource::new( - f, ex, false, - )?)), - } - } - - /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once. - /// If a call is the first call, it sets the default, and `set_default_executor_kind` - /// returns `Ok(())`. Otherwise, it returns `SetDefaultExecutorKindError::SetMoreThanOnce` - /// which contains the existing ExecutorKind value configured by the first call. - pub fn set_default_executor_kind( - executor_kind: ExecutorKindSys, - ) -> Result<(), SetDefaultExecutorKindError> { - DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_| - // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set. - SetDefaultExecutorKindError::SetMoreThanOnce( - *DEFAULT_EXECUTOR_KIND - .get() - .expect("Failed to get DEFAULT_EXECUTOR_KIND"), - )) - } - - /// Spawn a new future for this executor to run to completion. Callers may use the returned - /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel - /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the - /// future associated with it use `TaskHandle::detach`. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_spawn() -> AsyncResult<()> { - /// # use std::thread; - /// - /// # use cros_async::Executor; - /// use futures::executor::block_on; - /// - /// # let ex = Executor::new()?; - /// - /// # // Spawn a thread that runs the executor. - /// # let ex2 = ex.clone(); - /// # thread::spawn(move || ex2.run()); - /// - /// let task = ex.spawn(async { 7 + 13 }); - /// - /// let result = block_on(task); - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_spawn().unwrap(); - /// ``` - pub fn spawn(&self, f: F) -> TaskHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - ExecutorTrait::spawn(self, f) - } - - /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without - /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same - /// thread where `run()` or `run_until()` is called. - /// - /// # Panics - /// - /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was - /// added by calling `spawn_local` from a different thread. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_spawn_local() -> AsyncResult<()> { - /// # use cros_async::Executor; - /// - /// # let ex = Executor::new()?; - /// - /// let task = ex.spawn_local(async { 7 + 13 }); - /// - /// let result = ex.run_until(task)?; - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_spawn_local().unwrap(); - /// ``` - pub fn spawn_local(&self, f: F) -> TaskHandle - where - F: Future + 'static, - F::Output: 'static, - { - ExecutorTrait::spawn_local(self, f) - } - - /// Run the executor indefinitely, driving all spawned futures to completion. This method will - /// block the current thread and only return in the case of an error. - /// - /// # Panics - /// - /// Once this method has been called on a thread, it may only be called on that thread from that - /// point on. Attempting to call it from another thread will panic. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_run() -> AsyncResult<()> { - /// use std::thread; - /// - /// use cros_async::Executor; - /// use futures::executor::block_on; - /// - /// let ex = Executor::new()?; - /// - /// // Spawn a thread that runs the executor. - /// let ex2 = ex.clone(); - /// thread::spawn(move || ex2.run()); - /// - /// let task = ex.spawn(async { 7 + 13 }); - /// - /// let result = block_on(task); - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_run().unwrap(); - /// ``` - pub fn run(&self) -> AsyncResult<()> { - ExecutorTrait::run_until(self, std::future::pending()) - } - - /// Drive all futures spawned in this executor until `f` completes. This method will block the - /// current thread only until `f` is complete and there may still be unfinished futures in the - /// executor. - /// - /// # Panics - /// - /// Once this method has been called on a thread, from then onwards it may only be called on - /// that thread. Attempting to call it from another thread will panic. - /// - /// # Examples - /// - /// ``` - /// # use cros_async::AsyncResult; - /// # fn example_run_until() -> AsyncResult<()> { - /// use cros_async::Executor; - /// - /// let ex = Executor::new()?; - /// - /// let task = ex.spawn_local(async { 7 + 13 }); - /// - /// let result = ex.run_until(task)?; - /// assert_eq!(result, 20); - /// # Ok(()) - /// # } - /// - /// # example_run_until().unwrap(); - /// ``` - pub fn run_until(&self, f: F) -> AsyncResult { - ExecutorTrait::run_until(self, f) - } -} - -impl ExecutorTrait for Executor { - fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult> { - match self { - Executor::Handle(ex) => ex.async_from(f), - Executor::Overlapped(ex) => ex.async_from(f), - } - } - - fn spawn_blocking(&self, _f: F) -> TaskHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - todo!() - } - - fn spawn(&self, f: F) -> TaskHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match self { - Executor::Handle(ex) => ex.spawn(f), - Executor::Overlapped(ex) => ex.spawn(f), - } - } - - fn spawn_local(&self, f: F) -> TaskHandle - where - F: Future + 'static, - F::Output: 'static, - { - match self { - Executor::Handle(ex) => ex.spawn_local(f), - Executor::Overlapped(ex) => ex.spawn_local(f), - } - } - - fn run_until(&self, f: F) -> AsyncResult { - match self { - Executor::Handle(ex) => ex.run_until(f), - Executor::Overlapped(ex) => ex.run_until(f), - } - } -} diff --git a/cros_async/src/sys/windows/handle_executor.rs b/cros_async/src/sys/windows/handle_executor.rs index 251bd5cf02..6147308016 100644 --- a/cros_async/src/sys/windows/handle_executor.rs +++ b/cros_async/src/sys/windows/handle_executor.rs @@ -27,7 +27,6 @@ use winapi::um::minwinbase::OVERLAPPED; use crate::common_executor; use crate::common_executor::RawExecutor; use crate::common_executor::RawTaskHandle; -use crate::sys::windows::executor::DEFAULT_IO_CONCURRENCY; use crate::sys::windows::io_completion_port::CompletionPacket; use crate::sys::windows::io_completion_port::IoCompletionPort; use crate::waker::WakerToken; @@ -37,6 +36,8 @@ use crate::AsyncResult; use crate::IoSource; use crate::TaskHandle; +const DEFAULT_IO_CONCURRENCY: u32 = 1; + #[derive(Debug, ThisError)] pub enum Error { #[error("IO completion port operation failed: {0}")] diff --git a/cros_async/tests/executor.rs b/cros_async/tests/executor.rs index dd269762ea..84b009cfea 100644 --- a/cros_async/tests/executor.rs +++ b/cros_async/tests/executor.rs @@ -22,7 +22,7 @@ fn all_kinds() -> Vec { #[test] fn cancel_pending_task() { for kind in all_kinds() { - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let task = ex.spawn(std::future::pending::<()>()); assert_eq!(ex.run_until(task.cancel()).unwrap(), None); } @@ -34,7 +34,7 @@ fn cancel_pending_task() { #[test] fn cancel_ready_task() { for kind in all_kinds() { - let ex = Executor::with_executor_kind(kind.into()).unwrap(); + let ex = Executor::with_executor_kind(kind).unwrap(); let (s, r) = futures::channel::oneshot::channel(); let mut s = Some(s); let task = ex.spawn(futures::future::poll_fn(move |_| { diff --git a/devices/src/virtio/block/sys/linux.rs b/devices/src/virtio/block/sys/linux.rs index 3cf7796917..56b3949a85 100644 --- a/devices/src/virtio/block/sys/linux.rs +++ b/devices/src/virtio/block/sys/linux.rs @@ -61,7 +61,6 @@ impl DiskOption { impl BlockAsync { pub fn create_executor(&self) -> Executor { - Executor::with_executor_kind(self.executor_kind.into()) - .expect("Failed to create an executor") + Executor::with_executor_kind(self.executor_kind).expect("Failed to create an executor") } } diff --git a/devices/src/virtio/block/sys/windows.rs b/devices/src/virtio/block/sys/windows.rs index cebadbb72a..4c215290f0 100644 --- a/devices/src/virtio/block/sys/windows.rs +++ b/devices/src/virtio/block/sys/windows.rs @@ -66,7 +66,7 @@ impl DiskOption { impl BlockAsync { pub fn create_executor(&self) -> Executor { - Executor::with_kind_and_concurrency(self.executor_kind.into(), self.io_concurrency) + Executor::with_kind_and_concurrency(self.executor_kind, self.io_concurrency) .expect("Failed to create an executor") } } diff --git a/devices/src/virtio/scsi/device.rs b/devices/src/virtio/scsi/device.rs index e028cf9cdc..4a8576e415 100644 --- a/devices/src/virtio/scsi/device.rs +++ b/devices/src/virtio/scsi/device.rs @@ -678,8 +678,8 @@ impl VirtioDevice for Controller { let intr = interrupt.clone(); let worker_thread = WorkerThread::start("v_scsi_ctrlq", move |kill_evt| { - let ex = Executor::with_executor_kind(executor_kind.into()) - .expect("Failed to create an executor"); + let ex = + Executor::with_executor_kind(executor_kind).expect("Failed to create an executor"); if let Err(err) = ex .run_until(run_worker( &ex, @@ -701,7 +701,7 @@ impl VirtioDevice for Controller { let interrupt = interrupt.clone(); let worker_thread = WorkerThread::start(format!("v_scsi_req_{}", i + 2), move |kill_evt| { - let ex = Executor::with_executor_kind(executor_kind.into()) + let ex = Executor::with_executor_kind(executor_kind) .expect("Failed to create an executor"); let async_logical_unit = targets .0 diff --git a/devices/src/virtio/vhost/user/device/block/sys/windows.rs b/devices/src/virtio/vhost/user/device/block/sys/windows.rs index 9e02df1ea5..058c823a7e 100644 --- a/devices/src/virtio/vhost/user/device/block/sys/windows.rs +++ b/devices/src/virtio/vhost/user/device/block/sys/windows.rs @@ -70,7 +70,7 @@ pub fn start_device(opts: Options) -> anyhow::Result<()> { let kind = disk_option .async_executor .unwrap_or(ExecutorKindSys::Handle.into()); - let ex = Executor::with_executor_kind(kind.into()).context("failed to create executor")?; + let ex = Executor::with_executor_kind(kind).context("failed to create executor")?; let block = Box::new(BlockAsync::new( base_features(ProtectionType::Unprotected), diff --git a/src/crosvm/sys/linux.rs b/src/crosvm/sys/linux.rs index 2807c595b7..92596e166a 100644 --- a/src/crosvm/sys/linux.rs +++ b/src/crosvm/sys/linux.rs @@ -1628,7 +1628,7 @@ fn get_default_hypervisor() -> Option { pub fn run_config(cfg: Config) -> Result { if let Some(async_executor) = cfg.async_executor { - Executor::set_default_executor_kind(async_executor.into()) + Executor::set_default_executor_kind(async_executor) .context("Failed to set the default async executor")?; } @@ -4514,7 +4514,7 @@ fn start_vhost_user_control_server( pub fn start_devices(opts: DevicesCommand) -> anyhow::Result<()> { if let Some(async_executor) = opts.async_executor { - Executor::set_default_executor_kind(async_executor.into()) + Executor::set_default_executor_kind(async_executor) .context("Failed to set the default async executor")?; } diff --git a/src/main.rs b/src/main.rs index 499729d0ce..c65fab97d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -503,7 +503,7 @@ fn create_qcow2(cmd: cmdline::CreateQcow2Command) -> std::result::Result<(), ()> fn start_device(opts: cmdline::DeviceCommand) -> std::result::Result<(), ()> { if let Some(async_executor) = opts.async_executor { - cros_async::Executor::set_default_executor_kind(async_executor.into()) + cros_async::Executor::set_default_executor_kind(async_executor) .map_err(|e| error!("Failed to set the default async executor: {:#}", e))?; }