diff --git a/cros_async/src/sys/unix/executor.rs b/cros_async/src/sys/unix/executor.rs index 075d861865..358cabe462 100644 --- a/cros_async/src/sys/unix/executor.rs +++ b/cros_async/src/sys/unix/executor.rs @@ -6,6 +6,8 @@ use std::future::Future; use async_task::Task; +use base::{AsRawDescriptors, RawDescriptor}; + use super::{ poll_source::Error as PollError, uring_executor::use_uring, FdExecutor, PollSource, URingExecutor, UringSource, @@ -352,3 +354,12 @@ impl Executor { } } } + +impl AsRawDescriptors for Executor { + fn as_raw_descriptors(&self) -> Vec { + match self { + Executor::Uring(ex) => ex.as_raw_descriptors(), + Executor::Fd(ex) => ex.as_raw_descriptors(), + } + } +} diff --git a/cros_async/src/sys/unix/fd_executor.rs b/cros_async/src/sys/unix/fd_executor.rs index 27e881fe32..6f4515f73f 100644 --- a/cros_async/src/sys/unix/fd_executor.rs +++ b/cros_async/src/sys/unix/fd_executor.rs @@ -23,7 +23,10 @@ use std::{ }; use async_task::Task; -use base::{add_fd_flags, warn, AsRawDescriptor, EpollContext, EpollEvents, Event, WatchingEvents}; +use base::{ + add_fd_flags, warn, AsRawDescriptor, AsRawDescriptors, EpollContext, EpollEvents, Event, + RawDescriptor, WatchingEvents, +}; use futures::task::noop_waker; use pin_utils::pin_mut; use remain::sorted; @@ -271,10 +274,17 @@ struct RawExecutor { blocking_pool: BlockingPool, state: AtomicI32, notify: Event, + // Descriptor of the original event that was cloned to create notify. + // This is only needed for the AsRawDescriptors implementation. + notify_dup: RawDescriptor, } impl RawExecutor { - fn new(notify: Event) -> Result { + fn new(notify: &Event) -> Result { + // Save the original descriptor before cloning. This descriptor will be used when creating + // the notify task, so we need to preserve it for AsRawDescriptors. + let notify_dup = notify.as_raw_descriptor(); + let notify = notify.try_clone().map_err(Error::CloneEvent)?; Ok(RawExecutor { queue: RunnableQueue::new(), poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?, @@ -282,6 +292,7 @@ impl RawExecutor { blocking_pool: Default::default(), state: AtomicI32::new(PROCESSING), notify, + notify_dup, }) } @@ -445,6 +456,16 @@ impl RawExecutor { } } +impl AsRawDescriptors for RawExecutor { + fn as_raw_descriptors(&self) -> Vec { + vec![ + self.poll_ctx.as_raw_descriptor(), + self.notify.as_raw_descriptor(), + self.notify_dup, + ] + } +} + impl WeakWake for RawExecutor { fn wake_by_ref(weak_self: &Weak) { if let Some(arc_self) = weak_self.upgrade() { @@ -494,11 +515,7 @@ pub struct FdExecutor { impl FdExecutor { pub fn new() -> Result { let notify = Event::new().map_err(Error::CreateEvent)?; - let raw = notify - .try_clone() - .map_err(Error::CloneEvent) - .and_then(RawExecutor::new) - .map(Arc::new)?; + let raw = RawExecutor::new(¬ify).map(Arc::new)?; raw.spawn(notify_task(notify, Arc::downgrade(&raw))) .detach(); @@ -553,6 +570,12 @@ impl FdExecutor { } } +impl AsRawDescriptors for FdExecutor { + fn as_raw_descriptors(&self) -> Vec { + self.raw.as_raw_descriptors() + } +} + // Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while // waiting in TLS to be added to the main polling context. unsafe fn dup_fd(fd: RawFd) -> Result { diff --git a/cros_async/src/sys/unix/uring_executor.rs b/cros_async/src/sys/unix/uring_executor.rs index 4e78f23a8c..5d961a8896 100644 --- a/cros_async/src/sys/unix/uring_executor.rs +++ b/cros_async/src/sys/unix/uring_executor.rs @@ -73,7 +73,7 @@ use std::{ }; use async_task::Task; -use base::{warn, AsRawDescriptor, WatchingEvents}; +use base::{warn, AsRawDescriptor, RawDescriptor, WatchingEvents}; use futures::task::noop_waker; use io_uring::URingContext; use once_cell::sync::Lazy; @@ -726,6 +726,12 @@ impl RawExecutor { } } +impl AsRawDescriptor for RawExecutor { + fn as_raw_descriptor(&self) -> RawDescriptor { + self.ctx.as_raw_descriptor() + } +} + impl WeakWake for RawExecutor { fn wake_by_ref(weak_self: &Weak) { if let Some(arc_self) = weak_self.upgrade() { @@ -850,6 +856,12 @@ impl URingExecutor { } } +impl AsRawDescriptor for URingExecutor { + fn as_raw_descriptor(&self) -> RawDescriptor { + self.raw.as_raw_descriptor() + } +} + // Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while // waiting in TLS to be added to the main polling context. unsafe fn dup_fd(fd: RawFd) -> Result { diff --git a/devices/src/virtio/vhost/user/device/fs.rs b/devices/src/virtio/vhost/user/device/fs.rs index a310ee7f58..10daccc2bb 100644 --- a/devices/src/virtio/vhost/user/device/fs.rs +++ b/devices/src/virtio/vhost/user/device/fs.rs @@ -10,7 +10,10 @@ use std::sync::Arc; use anyhow::{anyhow, bail, Context}; use argh::FromArgs; -use base::{error, get_max_open_files, warn, Event, RawDescriptor, Tube, UnlinkUnixListener}; +use base::{ + error, get_max_open_files, warn, AsRawDescriptors, Event, RawDescriptor, Tube, + UnlinkUnixListener, +}; use cros_async::{EventAsync, Executor}; use data_model::{DataInit, Le32}; use fuse::Server; @@ -142,10 +145,13 @@ impl FsBackend { let mut keep_rds: Vec = [0, 1, 2].to_vec(); keep_rds.append(&mut fs.keep_rds()); + let ex = ex.clone(); + keep_rds.extend(ex.as_raw_descriptors()); + let server = Arc::new(Server::new(fs)); Ok(FsBackend { - ex: ex.clone(), + ex, server, tag: fs_tag, avail_features,