diff --git a/base/src/lib.rs b/base/src/lib.rs
index 66735d8065..054a11fa3e 100644
--- a/base/src/lib.rs
+++ b/base/src/lib.rs
@@ -50,7 +50,7 @@ cfg_if::cfg_if! {
         pub use unix::net;
 
-        // File related exports. 
+        // File related exports.
         pub use platform::{FileFlags, get_max_open_files};
 
         // memory/mmap related exports.
@@ -65,13 +65,11 @@ cfg_if::cfg_if! {
             validate_raw_descriptor, clear_descriptor_cloexec,
         };
-
         // Event/signal related exports.
         pub use platform::{
             block_signal, clear_signal, get_blocked_signals, new_pipe_full,
             register_rt_signal_handler, signal, unblock_signal, Killable, SIGRTMIN,
-            WatchingEvents, EpollContext, EpollEvents, AcpiNotifyEvent, NetlinkGenericSocket,
-            SignalFd, Terminal, EventFd,
+            WatchingEvents, AcpiNotifyEvent, NetlinkGenericSocket, SignalFd, Terminal, EventFd,
         };
 
         pub use platform::{
diff --git a/base/src/sys/unix/mod.rs b/base/src/sys/unix/mod.rs
index 162479a306..33a5454a12 100644
--- a/base/src/sys/unix/mod.rs
+++ b/base/src/sys/unix/mod.rs
@@ -62,7 +62,7 @@ pub use get_filesystem_type::*;
 pub use ioctl::*;
 pub use mmap::*;
 pub use netlink::*;
-pub use poll::{EpollContext, EpollEvents, PollContext as EventContext, PollToken, WatchingEvents};
+pub use poll::{PollContext as EventContext, PollToken, WatchingEvents};
 pub use priority::*;
 pub use sched::*;
 pub use scoped_signal_handler::*;
diff --git a/base/src/sys/unix/poll.rs b/base/src/sys/unix/poll.rs
index d47f93ffc9..58cfa20c51 100644
--- a/base/src/sys/unix/poll.rs
+++ b/base/src/sys/unix/poll.rs
@@ -3,21 +3,13 @@
 // found in the LICENSE file.
 
 use std::{
-    cell::{Cell, Ref, RefCell},
-    cmp::min,
-    fs::File,
-    i32, i64,
-    marker::PhantomData,
-    ptr::null_mut,
-    slice, thread,
-    time::Duration,
+    cmp::min, fs::File, marker::PhantomData, mem::MaybeUninit, ptr::null_mut, time::Duration,
 };
 
 use libc::{
     c_int, epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLHUP, EPOLLIN, EPOLLOUT,
     EPOLLRDHUP, EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD,
 };
-use log::warn;
 use smallvec::SmallVec;
 
 use super::{errno_result, Result};
@@ -27,29 +19,15 @@ use crate::{
 
 const POLL_CONTEXT_MAX_EVENTS: usize = 16;
 
-fn convert_to_watching_events(event_type: EventType) -> WatchingEvents {
-    match event_type {
-        EventType::None => WatchingEvents::empty(),
-        EventType::Read => WatchingEvents::empty().set_read(),
-        EventType::Write => WatchingEvents::empty().set_write(),
-        EventType::ReadWrite => WatchingEvents::empty().set_read().set_write(),
-    }
-}
-
-/// EpollEvents wraps raw epoll_events, it should only be used with EpollContext.
-pub struct EpollEvents(RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>);
-
-impl EpollEvents {
-    pub fn new() -> EpollEvents {
-        EpollEvents(RefCell::new(
-            [epoll_event { events: 0, u64: 0 }; POLL_CONTEXT_MAX_EVENTS],
-        ))
-    }
-}
-
-impl Default for EpollEvents {
-    fn default() -> EpollEvents {
-        Self::new()
+impl From<EventType> for u32 {
+    fn from(et: EventType) -> u32 {
+        let v = match et {
+            EventType::None => 0,
+            EventType::Read => EPOLLIN,
+            EventType::Write => EPOLLOUT,
+            EventType::ReadWrite => EPOLLIN | EPOLLOUT,
+        };
+        v as u32
     }
 }
@@ -128,115 +106,6 @@ impl PollToken for () {
     fn from_raw_token(_data: u64) -> Self {}
 }
 
-/// An event returned by `PollContext::wait`.
-pub struct PollEvent<'a, T> {
-    event: &'a epoll_event,
-    token: PhantomData<T>, // Needed to satisfy usage of T
-}
-
-impl<'a, T: PollToken> PollEvent<'a, T> {
-    /// Gets the token associated in `PollContext::add` with this event.
-    pub fn token(&self) -> T {
-        T::from_raw_token(self.event.u64)
-    }
-
-    /// True if the `fd` associated with this token in `PollContext::add` is readable.
-    pub fn readable(&self) -> bool {
-        self.event.events & (EPOLLIN as u32) != 0
-    }
-
-    /// True if the `fd` associated with this token in `PollContext::add` is writable.
-    pub fn writable(&self) -> bool {
-        self.event.events & (EPOLLOUT as u32) != 0
-    }
-
-    /// True if the `fd` associated with this token in `PollContext::add` has been hungup on.
-    pub fn hungup(&self) -> bool {
-        self.event.events & ((EPOLLHUP | EPOLLRDHUP) as u32) != 0
-    }
-}
-
-/// An iterator over some (sub)set of events returned by `PollContext::wait`.
-pub struct PollEventIter<'a, I, T>
-where
-    I: Iterator<Item = &'a epoll_event>,
-{
-    mask: u32,
-    iter: I,
-    tokens: PhantomData<[T]>, // Needed to satisfy usage of T
-}
-
-impl<'a, I, T> Iterator for PollEventIter<'a, I, T>
-where
-    I: Iterator<Item = &'a epoll_event>,
-    T: PollToken,
-{
-    type Item = PollEvent<'a, T>;
-    fn next(&mut self) -> Option<Self::Item> {
-        let mask = self.mask;
-        self.iter
-            .find(|event| (event.events & mask) != 0)
-            .map(|event| PollEvent {
-                event,
-                token: PhantomData,
-            })
-    }
-}
-
-/// The list of event returned by `PollContext::wait`.
-pub struct PollEvents<'a, T> {
-    count: usize,
-    events: Ref<'a, [epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
-    tokens: PhantomData<[T]>, // Needed to satisfy usage of T
-}
-
-impl<'a, T: PollToken> PollEvents<'a, T> {
-    /// Iterates over each event.
-    pub fn iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
-        PollEventIter {
-            mask: 0xffff_ffff,
-            iter: self.events[..self.count].iter(),
-            tokens: PhantomData,
-        }
-    }
-
-    /// Iterates over each readable event.
-    pub fn iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
-        PollEventIter {
-            mask: EPOLLIN as u32,
-            iter: self.events[..self.count].iter(),
-            tokens: PhantomData,
-        }
-    }
-
-    /// Iterates over each writable event.
-    pub fn iter_writable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
-        PollEventIter {
-            mask: EPOLLOUT as u32,
-            iter: self.events[..self.count].iter(),
-            tokens: PhantomData,
-        }
-    }
-
-    /// Iterates over each hungup event.
-    pub fn iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
-        PollEventIter {
-            mask: (EPOLLHUP | EPOLLRDHUP) as u32,
-            iter: self.events[..self.count].iter(),
-            tokens: PhantomData,
-        }
-    }
-}
-
-impl<'a, T: PollToken> IntoIterator for &'a PollEvents<'_, T> {
-    type Item = PollEvent<'a, T>;
-    type IntoIter = PollEventIter<'a, slice::Iter<'a, epoll_event>, T>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.iter()
-    }
-}
-
 /// Watching events taken by PollContext.
 pub struct WatchingEvents(u32);
 
@@ -271,37 +140,36 @@ impl WatchingEvents {
     }
 }
 
-/// EpollContext wraps linux epoll. It provides similar interface to PollContext.
-/// It is thread safe while PollContext is not. It requires user to pass in a reference of
-/// EpollEvents while PollContext does not. Always use PollContext if you don't need to access the
-/// same epoll from different threads.
-pub struct EpollContext<T: PollToken> {
+/// Used to poll multiple objects that have file descriptors.
+///
+/// See [`crate::WaitContext`] for an example that uses the cross-platform wrapper.
+pub struct PollContext<T: PollToken> {
     epoll_ctx: File,
     // Needed to satisfy usage of T
     tokens: PhantomData<[T]>,
 }
 
-impl<T: PollToken> EpollContext<T> {
-    /// Creates a new `EpollContext`.
-    pub fn new() -> Result<EpollContext<T>> {
+impl<T: PollToken> PollContext<T> {
+    /// Creates a new `PollContext`.
+    pub fn new() -> Result<PollContext<T>> {
         // Safe because we check the return value.
         let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
         if epoll_fd < 0 {
             return errno_result();
         }
-        Ok(EpollContext {
+        Ok(PollContext {
             epoll_ctx: unsafe { File::from_raw_descriptor(epoll_fd) },
             tokens: PhantomData,
         })
     }
 
-    /// Creates a new `EpollContext` and adds the slice of `fd` and `token` tuples to the new
+    /// Creates a new `PollContext` and adds the slice of `fd` and `token` tuples to the new
     /// context.
     ///
     /// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
     /// return the error instead of the new context.
-    pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<EpollContext<T>> {
-        let ctx = EpollContext::new()?;
+    pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<PollContext<T>> {
+        let ctx = PollContext::new()?;
         ctx.add_many(fd_tokens)?;
         Ok(ctx)
     }
@@ -325,23 +193,24 @@
     /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
     /// FD number) added to this context, events will not be reported by `wait` anymore.
     pub fn add(&self, fd: &dyn AsRawDescriptor, token: T) -> Result<()> {
-        self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
+        self.add_for_event(fd, EventType::Read, token)
     }
 
-    /// Adds the given `fd` to this context, watching for the specified events and associates the
-    /// given 'token' with those events.
+    /// Adds the given `descriptor` to this context, watching for the specified events and
+    /// associates the given 'token' with those events.
     ///
-    /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
-    /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
-    /// FD number) added to this context, events will not be reported by `wait` anymore.
-    pub fn add_fd_with_events(
+    /// A `descriptor` can only be added once and does not need to be kept open. If the `descriptor`
+    /// is dropped and there were no duplicated file descriptors (i.e. adding the same descriptor
+    /// with a different FD number) added to this context, events will not be reported by `wait`
+    /// anymore.
+    pub fn add_for_event(
         &self,
-        fd: &dyn AsRawDescriptor,
-        events: WatchingEvents,
+        descriptor: &dyn AsRawDescriptor,
+        event_type: EventType,
         token: T,
     ) -> Result<()> {
         let mut evt = epoll_event {
-            events: events.get_raw(),
+            events: event_type.into(),
             u64: token.as_raw_token(),
         };
         // Safe because we give a valid epoll FD and FD to watch, as well as a valid epoll_event
@@ -350,7 +219,7 @@
             epoll_ctl(
                 self.epoll_ctx.as_raw_descriptor(),
                 EPOLL_CTL_ADD,
-                fd.as_raw_descriptor(),
+                descriptor.as_raw_descriptor(),
                 &mut evt,
             )
         };
@@ -361,10 +230,10 @@
     }
 
     /// If `fd` was previously added to this context, the watched events will be replaced with
-    /// `events` and the token associated with it will be replaced with the given `token`.
-    pub fn modify(&self, fd: &dyn AsRawDescriptor, events: WatchingEvents, token: T) -> Result<()> {
+    /// `event_type` and the token associated with it will be replaced with the given `token`.
+    pub fn modify(&self, fd: &dyn AsRawDescriptor, event_type: EventType, token: T) -> Result<()> {
         let mut evt = epoll_event {
-            events: events.0,
+            events: event_type.into(),
             u64: token.as_raw_token(),
         };
         // Safe because we give a valid epoll FD and FD to modify, as well as a valid epoll_event
@@ -413,19 +282,23 @@
     /// return immediately. The consequence of not handling an event perpetually while calling
     /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to
     /// ~100% usage.
-    pub fn wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>> {
-        self.wait_timeout(events, Duration::new(i64::MAX as u64, 0))
+    pub fn wait(&self) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
+        self.wait_timeout(Duration::new(i64::MAX as u64, 0))
     }
 
     /// Like `wait` except will only block for a maximum of the given `timeout`.
     ///
     /// This may return earlier than `timeout` with zero events if the duration indicated exceeds
     /// system limits.
-    pub fn wait_timeout<'a>(
-        &self,
-        events: &'a EpollEvents,
-        timeout: Duration,
-    ) -> Result<PollEvents<'a, T>> {
+    pub fn wait_timeout(&self, timeout: Duration) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
+        // SAFETY:
+        // `MaybeUninit` has the same layout as plain `T` (`epoll_event` in our case).
+        // We submit an uninitialized array to the `epoll_wait` system call, which returns how many
+        // elements it initialized, and then we convert only the initialized `MaybeUninit` values
+        // into `epoll_event` structures after the call.
+        let mut epoll_events: [MaybeUninit<epoll_event>; POLL_CONTEXT_MAX_EVENTS] =
+            unsafe { MaybeUninit::uninit().assume_init() };
+
         let timeout_millis = if timeout.as_secs() as i64 == i64::max_value() {
             // We make the convenient assumption that 2^63 seconds is an effectively unbounded time
            // frame. This is meant to mesh with `wait` calling us with no timeout.
@@ -441,14 +314,15 @@
             min(i32::max_value() as u64, millis) as i32
         };
         let ret = {
-            let mut epoll_events = events.0.borrow_mut();
             let max_events = epoll_events.len() as c_int;
             // Safe because we give an epoll context and a properly sized epoll_events array
-            // pointer, which we trust the kernel to fill in properly.
+            // pointer, which we trust the kernel to fill in properly. The `transmute` is safe,
+            // since `MaybeUninit` has the same layout as `T`, and the `epoll_wait` syscall will
+            // initialize as many elements of the `epoll_events` array as it returns.
             unsafe {
                 handle_eintr_errno!(epoll_wait(
                     self.epoll_ctx.as_raw_descriptor(),
-                    &mut epoll_events[0],
+                    std::mem::transmute(&mut epoll_events[0]),
                     max_events,
                     timeout_millis
                 ))
@@ -457,206 +331,24 @@
         if ret < 0 {
             return errno_result();
         }
-        let epoll_events = events.0.borrow();
-        let events = PollEvents {
-            count: ret as usize,
-            events: epoll_events,
-            tokens: PhantomData,
-        };
-        Ok(events)
-    }
-}
+        let count = ret as usize;
 
-impl<T: PollToken> AsRawDescriptor for EpollContext<T> {
-    fn as_raw_descriptor(&self) -> RawDescriptor {
-        self.epoll_ctx.as_raw_descriptor()
-    }
-}
-
-impl<T: PollToken> IntoRawDescriptor for EpollContext<T> {
-    fn into_raw_descriptor(self) -> RawDescriptor {
-        self.epoll_ctx.into_raw_descriptor()
-    }
-}
-
-/// Used to poll multiple objects that have file descriptors.
-///
-/// See [`crate::WaitContext`] for an example that uses the cross-platform wrapper.
-pub struct PollContext<T: PollToken> {
-    epoll_ctx: EpollContext<T>,
-
-    // We use a RefCell here so that the `wait` method only requires an immutable self reference
-    // while returning the events (encapsulated by PollEvents). Without the RefCell, `wait` would
-    // hold a mutable reference that lives as long as its returned reference (i.e. the PollEvents),
-    // even though that reference is immutable. This is terribly inconvenient for the caller because
-    // the borrow checking would prevent them from using `delete` and `add` while the events are in
-    // scope.
-    events: EpollEvents,
-
-    // Hangup busy loop detection variables. See `check_for_hungup_busy_loop`.
-    hangups: Cell<usize>,
-    max_hangups: Cell<usize>,
-}
-
-impl<T: PollToken> PollContext<T> {
-    /// Creates a new `PollContext`.
-    pub fn new() -> Result<PollContext<T>> {
-        Ok(PollContext {
-            epoll_ctx: EpollContext::new()?,
-            events: EpollEvents::new(),
-            hangups: Cell::new(0),
-            max_hangups: Cell::new(0),
-        })
-    }
-
-    /// Creates a new `PollContext` and adds the slice of `fd` and `token` tuples to the new
-    /// context.
-    ///
-    /// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
-    /// return the error instead of the new context.
-    pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<PollContext<T>> {
-        let ctx = PollContext::new()?;
-        ctx.add_many(fd_tokens)?;
-        Ok(ctx)
-    }
-
-    /// Adds the given slice of `fd` and `token` tuples to this context.
-    ///
-    /// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors,
-    /// this method will stop adding `fd`s and return the first error, leaving this context in a
-    /// undefined state.
-    pub fn add_many(&self, fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<()> {
-        for (fd, token) in fd_tokens {
-            self.add(*fd, T::from_raw_token(token.as_raw_token()))?;
-        }
-        Ok(())
-    }
-
-    /// Adds the given `fd` to this context and associates the given `token` with the `fd`'s
-    /// readable events.
-    ///
-    /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
-    /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
-    /// FD number) added to this context, events will not be reported by `wait` anymore.
-    pub fn add(&self, fd: &dyn AsRawDescriptor, token: T) -> Result<()> {
-        self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
-    }
-
-    /// Adds the given `descriptor` to this context, watching for the specified events and
-    /// associates the given 'token' with those events.
-    ///
-    /// A `descriptor` can only be added once and does not need to be kept open. If the `descriptor`
-    /// is dropped and there were no duplicated file descriptors (i.e. adding the same descriptor
-    /// with a different FD number) added to this context, events will not be reported by `wait`
-    /// anymore.
-    pub fn add_for_event(
-        &self,
-        descriptor: &dyn AsRawDescriptor,
-        event_type: EventType,
-        token: T,
-    ) -> Result<()> {
-        self.add_fd_with_events(descriptor, convert_to_watching_events(event_type), token)
-    }
-
-    fn add_fd_with_events(
-        &self,
-        fd: &dyn AsRawDescriptor,
-        events: WatchingEvents,
-        token: T,
-    ) -> Result<()> {
-        self.epoll_ctx.add_fd_with_events(fd, events, token)?;
-        self.hangups.set(0);
-        self.max_hangups.set(self.max_hangups.get() + 1);
-        Ok(())
-    }
-
-    /// If `fd` was previously added to this context, the watched events will be replaced with
-    /// `event_type` and the token associated with it will be replaced with the given `token`.
-    pub fn modify(&self, fd: &dyn AsRawDescriptor, event_type: EventType, token: T) -> Result<()> {
-        self.epoll_ctx
-            .modify(fd, convert_to_watching_events(event_type), token)
-    }
-
-    /// Deletes the given `fd` from this context.
-    ///
-    /// If an `fd`'s token shows up in the list of hangup events, it should be removed using this
-    /// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`.
-    /// Failure to do so will cause the `wait` method to always return immediately, causing ~100%
-    /// CPU load.
-    pub fn delete(&self, fd: &dyn AsRawDescriptor) -> Result<()> {
-        self.epoll_ctx.delete(fd)?;
-        self.hangups.set(0);
-        self.max_hangups.set(self.max_hangups.get() - 1);
-        Ok(())
-    }
-
-    // This method determines if the the user of wait is misusing the `PollContext` by leaving FDs
-    // in this `PollContext` that have been shutdown or hungup on. Such an FD will cause `wait` to
-    // return instantly with a hungup event. If that FD is perpetually left in this context, a busy
-    // loop burning ~100% of one CPU will silently occur with no human visible malfunction.
-    //
-    // How do we know if the client of this context is ignoring hangups? A naive implementation
-    // would trigger if consecutive wait calls yield hangup events, but there are legitimate cases
-    // for this, such as two distinct sockets becoming hungup across two consecutive wait calls. A
-    // smarter implementation would only trigger if `delete` wasn't called between waits that
-    // yielded hangups. Sadly `delete` isn't the only way to remove an FD from this context. The
-    // other way is for the client to close the hungup FD, which automatically removes it from this
-    // context. Assuming that the client always uses close, this implementation would too eagerly
-    // trigger.
-    //
-    // The implementation used here keeps an upper bound of FDs in this context using a counter
-    // hooked into add/delete (which is imprecise because close can also remove FDs without us
-    // knowing). The number of consecutive (no add or delete in between) hangups yielded by wait
-    // calls is counted and compared to the upper bound. If the upper bound is exceeded by the
-    // consecutive hangups, the implementation triggers the check and logs.
-    //
-    // This implementation has false negatives because the upper bound can be completely too high,
-    // in the worst case caused by only using close instead of delete. However, this method has the
-    // advantage of always triggering eventually genuine busy loop cases, requires no dynamic
-    // allocations, is fast and constant time to compute, and has no false positives.
-    fn check_for_hungup_busy_loop(&self, new_hangups: usize) {
-        let old_hangups = self.hangups.get();
-        let max_hangups = self.max_hangups.get();
-        if old_hangups <= max_hangups && old_hangups + new_hangups > max_hangups {
-            warn!(
-                "busy poll wait loop with hungup FDs detected on thread {}",
-                thread::current().name().unwrap_or("")
-            );
-            // This panic is helpful for tests of this functionality.
-            #[cfg(test)]
-            panic!("hungup busy loop detected");
-        }
-        self.hangups.set(old_hangups + new_hangups);
-    }
-
-    /// Waits for any events to occur in FDs that were previously added to this context.
-    ///
-    /// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading
-    /// for readable events and not closing for hungup events), subsequent calls to `wait` will
-    /// return immediately. The consequence of not handling an event perpetually while calling
-    /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to
-    /// ~100% usage.
-    pub fn wait(&self) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
-        self.wait_timeout(Duration::new(i64::MAX as u64, 0))
-    }
-
-    /// Like `wait` except will only block for a maximum of the given `timeout`.
-    ///
-    /// This may return earlier than `timeout` with zero events if the duration indicated exceeds
-    /// system limits.
-    pub fn wait_timeout(&self, timeout: Duration) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
-        let events = self.epoll_ctx.wait_timeout(&self.events, timeout)?;
-        let hangups = events.iter_hungup().count();
-        self.check_for_hungup_busy_loop(hangups);
-        Ok(events
+        let events = epoll_events[0..count]
             .iter()
-            .map(|event| TriggeredEvent {
-                token: event.token(),
-                is_readable: event.readable(),
-                is_writable: event.writable(),
-                is_hungup: event.hungup(),
+            .map(|e| {
+                // SAFETY:
+                // Converting `MaybeUninit` into `epoll_event` is safe here, since we
+                // are only iterating over elements that the `epoll_wait` system call initialized.
+                let e = unsafe { e.assume_init() };
+                TriggeredEvent {
+                    token: T::from_raw_token(e.u64),
+                    is_readable: e.events & (EPOLLIN as u32) != 0,
+                    is_writable: e.events & (EPOLLOUT as u32) != 0,
+                    is_hungup: e.events & ((EPOLLHUP | EPOLLRDHUP) as u32) != 0,
+                }
             })
-            .collect())
+            .collect();
+        Ok(events)
     }
 }
@@ -676,7 +368,7 @@ impl<T: PollToken> IntoRawDescriptor for PollContext<T> {
 mod tests {
     use super::{super::Event, *};
     use base_poll_token_derive::PollToken;
-    use std::{os::unix::net::UnixStream, time::Instant};
+    use std::time::Instant;
 
     #[test]
     fn poll_context() {
@@ -726,23 +418,6 @@ mod tests {
         }
     }
 
-    #[test]
-    #[should_panic]
-    fn poll_context_hungup() {
-        let (s1, s2) = UnixStream::pair().unwrap();
-        let ctx: PollContext<u32> = PollContext::new().unwrap();
-        ctx.add(&s1, 1).unwrap();
-
-        // Causes s1 to receive hangup events, which we purposefully ignore to trip the detection
-        // logic in `PollContext`.
-        drop(s2);
-
-        // Should easily panic within this many iterations.
-        for _ in 0..1000 {
-            ctx.wait().unwrap();
-        }
-    }
-
     #[test]
     fn poll_context_timeout() {
         let ctx: PollContext<u32> = PollContext::new().unwrap();
diff --git a/cros_async/src/sys/unix/fd_executor.rs b/cros_async/src/sys/unix/fd_executor.rs
index 6f4515f73f..5b51d21c93 100644
--- a/cros_async/src/sys/unix/fd_executor.rs
+++ b/cros_async/src/sys/unix/fd_executor.rs
@@ -24,8 +24,8 @@ use std::{
 
 use async_task::Task;
 use base::{
-    add_fd_flags, warn, AsRawDescriptor, AsRawDescriptors, EpollContext, EpollEvents, Event,
-    RawDescriptor, WatchingEvents,
+    add_fd_flags, warn, AsRawDescriptor, AsRawDescriptors, Event, EventType, RawDescriptor,
+    WaitContext,
 };
 use futures::task::noop_waker;
 use pin_utils::pin_mut;
@@ -115,10 +115,7 @@ impl RegisteredSource {
     pub fn wait_readable(&self) -> Result<PendingOperation> {
         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
 
-        let token = ex.add_operation(
-            self.source.as_raw_descriptor(),
-            WatchingEvents::empty().set_read(),
-        )?;
+        let token = ex.add_operation(self.source.as_raw_descriptor(), EventType::Read)?;
 
         Ok(PendingOperation {
             token: Some(token),
@@ -131,10 +128,7 @@ impl RegisteredSource {
     pub fn wait_writable(&self) -> Result<PendingOperation> {
         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
 
-        let token = ex.add_operation(
-            self.source.as_raw_descriptor(),
-            WatchingEvents::empty().set_write(),
-        )?;
+        let token = ex.add_operation(self.source.as_raw_descriptor(), EventType::Write)?;
 
         Ok(PendingOperation {
             token: Some(token),
@@ -230,10 +224,7 @@ async fn notify_task(notify: Event, raw: Weak<RawExecutor>) {
 
     if let Some(ex) = raw.upgrade() {
         let token = ex
-            .add_operation(
-                notify.as_raw_descriptor(),
-                WatchingEvents::empty().set_read(),
-            )
+            .add_operation(notify.as_raw_descriptor(), EventType::Read)
             .expect("Failed to add notify Event to PollCtx");
 
         // We don't want to hold an active reference to the executor in the .await below.
@@ -269,7 +260,7 @@ const WOKEN: i32 = 0x3e4d_3276u32 as i32;
 
 struct RawExecutor {
     queue: RunnableQueue,
-    poll_ctx: EpollContext<usize>,
+    poll_ctx: WaitContext<usize>,
     ops: Mutex<Slab<OpStatus>>,
     blocking_pool: BlockingPool,
     state: AtomicI32,
@@ -287,7 +278,7 @@ impl RawExecutor {
         let notify = notify.try_clone().map_err(Error::CloneEvent)?;
         Ok(RawExecutor {
             queue: RunnableQueue::new(),
-            poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
+            poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?,
             ops: Mutex::new(Slab::with_capacity(64)),
            blocking_pool: Default::default(),
             state: AtomicI32::new(PROCESSING),
@@ -296,7 +287,7 @@
         })
     }
 
-    fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
+    fn add_operation(&self, fd: RawFd, event_type: EventType) -> Result<WakerToken> {
         let duped_fd = unsafe {
             // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
             // will only be added to the poll loop.
@@ -306,7 +297,7 @@
         let entry = ops.vacant_entry();
         let next_token = entry.key();
         self.poll_ctx
-            .add_fd_with_events(&duped_fd, events, next_token)
+            .add_for_event(&duped_fd, event_type, next_token)
             .map_err(Error::SubmittingWaker)?;
         entry.insert(OpStatus::Pending(OpData {
             file: duped_fd,
@@ -367,7 +358,7 @@
     }
 
     fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
-        let events = EpollEvents::new();
         pin_mut!(done);
 
         loop {
@@ -392,16 +382,13 @@
                 continue;
             }
 
-            let events = self
-                .poll_ctx
-                .wait(&events)
-                .map_err(Error::PollContextError)?;
+            let events = self.poll_ctx.wait().map_err(Error::PollContextError)?;
 
             // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
             // writing to the eventfd.
             self.state.store(PROCESSING, Ordering::Release);
 
            for e in events.iter() {
-                let token = e.token();
+                let token = e.token;
                 let mut ops = self.ops.lock();
                 // The op could have been canceled and removed by another thread so ignore it if it
diff --git a/devices/src/usb/host_backend/host_backend_device_provider.rs b/devices/src/usb/host_backend/host_backend_device_provider.rs
index 541d161a74..4c50b6f7aa 100644
--- a/devices/src/usb/host_backend/host_backend_device_provider.rs
+++ b/devices/src/usb/host_backend/host_backend_device_provider.rs
@@ -13,7 +13,7 @@ use crate::utils::AsyncJobQueue;
 use crate::utils::{EventHandler, EventLoop, FailHandle};
 
 use anyhow::Context;
-use base::{error, AsRawDescriptor, RawDescriptor, Tube, WatchingEvents};
+use base::{error, AsRawDescriptor, EventType, RawDescriptor, Tube};
 use std::collections::HashMap;
 use std::mem;
 use std::time::Duration;
@@ -73,7 +73,7 @@ impl HostBackendDeviceProvider {
         event_loop
             .add_event(
                 &*inner.control_tube.lock(),
-                WatchingEvents::empty().set_read(),
+                EventType::Read,
                 Arc::downgrade(&handler),
             )
             .map_err(Error::AddToEventLoop)?;
@@ -171,7 +171,7 @@ impl ProviderInner {
 
         if let Err(e) = self.event_loop.add_event(
             &*arc_mutex_device.lock(),
-            WatchingEvents::empty().set_read().set_write(),
+            EventType::ReadWrite,
             Arc::downgrade(&event_handler),
         ) {
             error!("failed to add USB device fd to event handler: {}", e);
diff --git a/devices/src/usb/xhci/intr_resample_handler.rs b/devices/src/usb/xhci/intr_resample_handler.rs
index 2526226cf3..0b99f2b6ba 100644
--- a/devices/src/usb/xhci/intr_resample_handler.rs
+++ b/devices/src/usb/xhci/intr_resample_handler.rs
@@ -6,7 +6,7 @@ use super::interrupter::Interrupter;
 use crate::utils::{EventHandler, EventLoop};
 
 use anyhow::Context;
-use base::{error, Event, WatchingEvents};
+use base::{error, Event, EventType};
 use std::sync::Arc;
 use sync::Mutex;
 
@@ -30,7 +30,7 @@ impl IntrResampleHandler {
         let tmp_handler: Arc<dyn EventHandler> = handler.clone();
         if let Err(e) = event_loop.add_event(
             &handler.resample_evt,
-            WatchingEvents::empty().set_read(),
+            EventType::Read,
             Arc::downgrade(&tmp_handler),
         ) {
             error!("cannot add intr resample handler to event loop: {}", e);
diff --git a/devices/src/usb/xhci/ring_buffer_controller.rs b/devices/src/usb/xhci/ring_buffer_controller.rs
index 49ff719a79..7fbb03e118 100644
--- a/devices/src/usb/xhci/ring_buffer_controller.rs
+++ b/devices/src/usb/xhci/ring_buffer_controller.rs
@@ -10,7 +10,7 @@ use std::sync::{Arc, MutexGuard};
 use sync::Mutex;
 
 use anyhow::Context;
-use base::{error, Error as SysError, Event, WatchingEvents};
+use base::{error, Error as SysError, Event, EventType};
 use remain::sorted;
 use thiserror::Error;
 use vm_memory::{GuestAddress, GuestMemory};
@@ -107,7 +107,7 @@ where
         event_loop
             .add_event(
                 &controller.event,
-                WatchingEvents::empty().set_read(),
+                EventType::Read,
                 Arc::downgrade(&event_handler),
             )
             .map_err(Error::AddEvent)?;
diff --git a/devices/src/utils/async_job_queue.rs b/devices/src/utils/async_job_queue.rs
index 72956118e4..8ec8fb6c46 100644
--- a/devices/src/utils/async_job_queue.rs
+++ b/devices/src/utils/async_job_queue.rs
@@ -6,7 +6,7 @@
 use super::{Error, Result};
 use super::{EventHandler, EventLoop};
 use anyhow::Context;
-use base::{Event, WatchingEvents};
+use base::{Event, EventType};
 use std::mem;
 use std::sync::Arc;
 use sync::Mutex;
@@ -26,11 +26,7 @@ impl AsyncJobQueue {
             evt,
         });
         let handler: Arc<dyn EventHandler> = queue.clone();
-        event_loop.add_event(
-            &queue.evt,
-            WatchingEvents::empty().set_read(),
-            Arc::downgrade(&handler),
-        )?;
+        event_loop.add_event(&queue.evt, EventType::Read, Arc::downgrade(&handler))?;
 
         Ok(queue)
     }
diff --git a/devices/src/utils/event_loop.rs b/devices/src/utils/event_loop.rs
index f2bb8fda25..01b6208045 100644
--- a/devices/src/utils/event_loop.rs
+++ b/devices/src/utils/event_loop.rs
@@ -4,8 +4,7 @@
 use super::error::{Error, Result};
 
 use base::{
-    error, warn, AsRawDescriptor, Descriptor, EpollContext, EpollEvents, Event, RawDescriptor,
-    WatchingEvents,
+    error, warn, AsRawDescriptor, Descriptor, Event, EventType, RawDescriptor, WaitContext,
 };
 use std::collections::BTreeMap;
 use std::mem::drop;
@@ -37,11 +36,11 @@ impl FailHandle for Option<Arc<dyn FailHandle>> {
     }
 }
 
-/// EpollEventLoop is an event loop blocked on a set of fds. When a monitered events is triggered,
+/// EventLoop is an event loop blocked on a set of fds. When a monitored event is triggered,
 /// event loop will invoke the mapped handler.
 pub struct EventLoop {
     fail_handle: Option<Arc<dyn FailHandle>>,
-    poll_ctx: Arc<EpollContext<Descriptor>>,
+    poll_ctx: Arc<WaitContext<Descriptor>>,
     handlers: Arc<Mutex<BTreeMap<RawDescriptor, Weak<dyn EventHandler>>>>,
     stop_evt: Event,
 }
@@ -63,7 +62,7 @@ impl EventLoop {
         let fd_callbacks: Arc<Mutex<BTreeMap<RawDescriptor, Weak<dyn EventHandler>>>> =
             Arc::new(Mutex::new(BTreeMap::new()));
-        let poll_ctx: EpollContext<Descriptor> = EpollContext::new()
+        let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
             .and_then(|pc| {
                 pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
                     .and(Ok(pc))
@@ -81,13 +80,12 @@
         let handle = thread::Builder::new()
             .name(name)
             .spawn(move || {
-                let event_loop = EpollEvents::new();
                 loop {
                     if fail_handle.failed() {
                         error!("xhci controller already failed, stopping event ring");
                         return;
                     }
-                    let events = match poll_ctx.wait(&event_loop) {
+                    let events = match poll_ctx.wait() {
                        Ok(events) => events,
                         Err(e) => {
                             error!("cannot poll {:?}", e);
@@ -96,7 +94,7 @@
                         }
                     };
                     for event in &events {
-                        let fd = event.token().as_raw_descriptor();
+                        let fd = event.token.as_raw_descriptor();
                         if fd == stop_evt.as_raw_descriptor() {
                             return;
                         }
@@ -112,7 +110,7 @@
 
                         // If the file descriptor is hung up, remove it after calling the handler
                         // one final time.
-                        let mut remove = event.hungup();
+                        let mut remove = event.is_hungup;
 
                         if let Some(handler) = weak_handler.upgrade() {
                             // Drop lock before triggering the event.
@@ -128,7 +126,7 @@
                         }
 
                         if remove {
-                            let _ = poll_ctx.delete(&event.token());
+                            let _ = poll_ctx.delete(&event.token);
                             let _ = locked.remove(&fd);
                         }
                     }
@@ -148,7 +146,7 @@ impl EventLoop {
     pub fn add_event(
         &self,
         descriptor: &dyn AsRawDescriptor,
-        events: WatchingEvents,
+        event_type: EventType,
         handler: Weak<dyn EventHandler>,
     ) -> Result<()> {
         if self.fail_handle.failed() {
@@ -159,9 +157,9 @@ impl EventLoop {
             .insert(descriptor.as_raw_descriptor(), handler);
         // This might fail due to epoll syscall. Check epoll_ctl(2).
         self.poll_ctx
-            .add_fd_with_events(
+            .add_for_event(
                 descriptor,
-                events,
+                event_type,
                 Descriptor(descriptor.as_raw_descriptor()),
             )
             .map_err(Error::WaitContextAddDescriptor)
@@ -230,12 +228,8 @@ mod tests {
             evt,
         });
         let t: Arc<dyn EventHandler> = h.clone();
-        l.add_event(
-            &h.evt,
-            WatchingEvents::empty().set_read(),
-            Arc::downgrade(&t),
-        )
-        .unwrap();
+        l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
+            .unwrap();
         self_evt.write(1).unwrap();
         {
             let mut val = h.val.lock().unwrap();
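
Reviewer note (not part of the patch): below is a minimal sketch of how a call site migrates from the removed EpollContext / EpollEvents / WatchingEvents API to the WaitContext / EventType interface this change standardizes on. The u32 token type, the single Event, and the error handling are illustrative assumptions rather than code taken from the tree.

    use base::{Event, EventType, WaitContext};

    fn service_event(evt: &Event) -> base::Result<()> {
        // Old pattern (removed above):
        //   let ctx: EpollContext<u32> = EpollContext::new()?;
        //   ctx.add_fd_with_events(evt, WatchingEvents::empty().set_read(), 1)?;
        //   let events = ctx.wait(&EpollEvents::new())?;
        //
        // New pattern: the context owns its event storage and returns TriggeredEvent values.
        let ctx: WaitContext<u32> = WaitContext::new()?;
        ctx.add_for_event(evt, EventType::Read, 1)?;

        for event in ctx.wait()?.iter() {
            // Events are level-triggered; handle or delete hung-up descriptors so repeated
            // wait() calls do not degenerate into a busy loop.
            if event.is_hungup {
                ctx.delete(evt)?;
            } else if event.is_readable && event.token == 1 {
                evt.read()?;
            }
        }
        Ok(())
    }

Note that the hangup-busy-loop detector (check_for_hungup_busy_loop) and its poll_context_hungup test are removed by this change, so callers are now responsible for deleting or closing hung-up descriptors themselves.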