mirror of
https://chromium.googlesource.com/crosvm/crosvm
synced 2024-11-24 12:34:31 +00:00
base: unix: merge EpollContext into PollContext
The main distinction between PollContext and EpollContext was that the latter was safe to use from multiple threads. This was not true of the more widely-used PollContext for two reasons: 1. The `events` array was stored inside the `PollContext` structure, whereas `EpollContext` required the caller to pass their own storage for events. 2. `PollContext` had a hangup detection feature used to debug busy looping cases that result from a failure to remove hungup file descriptors from the context. Point 1 is resolved by returning a `SmallVec` of events that avoids allocation for the normal case (and in fact it should avoid allocation in all cases on Linux, where the maximum number of events returned from a single `epoll_wait()` call is limited to the same size as the preallocated `SmallVec`). This simplifies the API and also means that there is no need for per-context storage. Point 2 can't easily be resolved, since it would require the `wait` call to mutate shared state (this could be done by adding a mutex around the shared data, but that seems like too much overhead for the value of the feature). Instead, this patch just removes the hangup detection code. BUG=b:213153157 TEST=tools/dev_container tools/presubmit --all Change-Id: Ia48c46de96976da27cb5387e3e5e8fcf92d0e85b Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3633111 Commit-Queue: Daniel Verkamp <dverkamp@chromium.org> Reviewed-by: Noah Gold <nkgold@google.com> Tested-by: kokoro <noreply+kokoro@google.com>
This commit is contained in:
parent
33adade230
commit
3e8054ee7c
9 changed files with 103 additions and 453 deletions
|
@ -50,7 +50,7 @@ cfg_if::cfg_if! {
|
|||
|
||||
pub use unix::net;
|
||||
|
||||
// File related exports.
|
||||
// File related exports.
|
||||
pub use platform::{FileFlags, get_max_open_files};
|
||||
|
||||
// memory/mmap related exports.
|
||||
|
@ -65,13 +65,11 @@ cfg_if::cfg_if! {
|
|||
validate_raw_descriptor, clear_descriptor_cloexec,
|
||||
};
|
||||
|
||||
|
||||
// Event/signal related exports.
|
||||
pub use platform::{
|
||||
block_signal, clear_signal, get_blocked_signals, new_pipe_full,
|
||||
register_rt_signal_handler, signal, unblock_signal, Killable, SIGRTMIN,
|
||||
WatchingEvents, EpollContext, EpollEvents, AcpiNotifyEvent, NetlinkGenericSocket,
|
||||
SignalFd, Terminal, EventFd,
|
||||
WatchingEvents, AcpiNotifyEvent, NetlinkGenericSocket, SignalFd, Terminal, EventFd,
|
||||
};
|
||||
|
||||
pub use platform::{
|
||||
|
|
|
@ -62,7 +62,7 @@ pub use get_filesystem_type::*;
|
|||
pub use ioctl::*;
|
||||
pub use mmap::*;
|
||||
pub use netlink::*;
|
||||
pub use poll::{EpollContext, EpollEvents, PollContext as EventContext, PollToken, WatchingEvents};
|
||||
pub use poll::{PollContext as EventContext, PollToken, WatchingEvents};
|
||||
pub use priority::*;
|
||||
pub use sched::*;
|
||||
pub use scoped_signal_handler::*;
|
||||
|
|
|
@ -3,21 +3,13 @@
|
|||
// found in the LICENSE file.
|
||||
|
||||
use std::{
|
||||
cell::{Cell, Ref, RefCell},
|
||||
cmp::min,
|
||||
fs::File,
|
||||
i32, i64,
|
||||
marker::PhantomData,
|
||||
ptr::null_mut,
|
||||
slice, thread,
|
||||
time::Duration,
|
||||
cmp::min, fs::File, marker::PhantomData, mem::MaybeUninit, ptr::null_mut, time::Duration,
|
||||
};
|
||||
|
||||
use libc::{
|
||||
c_int, epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLHUP, EPOLLIN, EPOLLOUT,
|
||||
EPOLLRDHUP, EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD,
|
||||
};
|
||||
use log::warn;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
use super::{errno_result, Result};
|
||||
|
@ -27,29 +19,15 @@ use crate::{
|
|||
|
||||
const POLL_CONTEXT_MAX_EVENTS: usize = 16;
|
||||
|
||||
fn convert_to_watching_events(event_type: EventType) -> WatchingEvents {
|
||||
match event_type {
|
||||
EventType::None => WatchingEvents::empty(),
|
||||
EventType::Read => WatchingEvents::empty().set_read(),
|
||||
EventType::Write => WatchingEvents::empty().set_write(),
|
||||
EventType::ReadWrite => WatchingEvents::empty().set_read().set_write(),
|
||||
}
|
||||
}
|
||||
|
||||
/// EpollEvents wraps raw epoll_events, it should only be used with EpollContext.
|
||||
pub struct EpollEvents(RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>);
|
||||
|
||||
impl EpollEvents {
|
||||
pub fn new() -> EpollEvents {
|
||||
EpollEvents(RefCell::new(
|
||||
[epoll_event { events: 0, u64: 0 }; POLL_CONTEXT_MAX_EVENTS],
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EpollEvents {
|
||||
fn default() -> EpollEvents {
|
||||
Self::new()
|
||||
impl From<EventType> for u32 {
|
||||
fn from(et: EventType) -> u32 {
|
||||
let v = match et {
|
||||
EventType::None => 0,
|
||||
EventType::Read => EPOLLIN,
|
||||
EventType::Write => EPOLLOUT,
|
||||
EventType::ReadWrite => EPOLLIN | EPOLLOUT,
|
||||
};
|
||||
v as u32
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,115 +106,6 @@ impl PollToken for () {
|
|||
fn from_raw_token(_data: u64) -> Self {}
|
||||
}
|
||||
|
||||
/// An event returned by `PollContext::wait`.
|
||||
pub struct PollEvent<'a, T> {
|
||||
event: &'a epoll_event,
|
||||
token: PhantomData<T>, // Needed to satisfy usage of T
|
||||
}
|
||||
|
||||
impl<'a, T: PollToken> PollEvent<'a, T> {
|
||||
/// Gets the token associated in `PollContext::add` with this event.
|
||||
pub fn token(&self) -> T {
|
||||
T::from_raw_token(self.event.u64)
|
||||
}
|
||||
|
||||
/// True if the `fd` associated with this token in `PollContext::add` is readable.
|
||||
pub fn readable(&self) -> bool {
|
||||
self.event.events & (EPOLLIN as u32) != 0
|
||||
}
|
||||
|
||||
/// True if the `fd` associated with this token in `PollContext::add` is writable.
|
||||
pub fn writable(&self) -> bool {
|
||||
self.event.events & (EPOLLOUT as u32) != 0
|
||||
}
|
||||
|
||||
/// True if the `fd` associated with this token in `PollContext::add` has been hungup on.
|
||||
pub fn hungup(&self) -> bool {
|
||||
self.event.events & ((EPOLLHUP | EPOLLRDHUP) as u32) != 0
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator over some (sub)set of events returned by `PollContext::wait`.
|
||||
pub struct PollEventIter<'a, I, T>
|
||||
where
|
||||
I: Iterator<Item = &'a epoll_event>,
|
||||
{
|
||||
mask: u32,
|
||||
iter: I,
|
||||
tokens: PhantomData<[T]>, // Needed to satisfy usage of T
|
||||
}
|
||||
|
||||
impl<'a, I, T> Iterator for PollEventIter<'a, I, T>
|
||||
where
|
||||
I: Iterator<Item = &'a epoll_event>,
|
||||
T: PollToken,
|
||||
{
|
||||
type Item = PollEvent<'a, T>;
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let mask = self.mask;
|
||||
self.iter
|
||||
.find(|event| (event.events & mask) != 0)
|
||||
.map(|event| PollEvent {
|
||||
event,
|
||||
token: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The list of event returned by `PollContext::wait`.
|
||||
pub struct PollEvents<'a, T> {
|
||||
count: usize,
|
||||
events: Ref<'a, [epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
|
||||
tokens: PhantomData<[T]>, // Needed to satisfy usage of T
|
||||
}
|
||||
|
||||
impl<'a, T: PollToken> PollEvents<'a, T> {
|
||||
/// Iterates over each event.
|
||||
pub fn iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
|
||||
PollEventIter {
|
||||
mask: 0xffff_ffff,
|
||||
iter: self.events[..self.count].iter(),
|
||||
tokens: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates over each readable event.
|
||||
pub fn iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
|
||||
PollEventIter {
|
||||
mask: EPOLLIN as u32,
|
||||
iter: self.events[..self.count].iter(),
|
||||
tokens: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates over each writable event.
|
||||
pub fn iter_writable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
|
||||
PollEventIter {
|
||||
mask: EPOLLOUT as u32,
|
||||
iter: self.events[..self.count].iter(),
|
||||
tokens: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterates over each hungup event.
|
||||
pub fn iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
|
||||
PollEventIter {
|
||||
mask: (EPOLLHUP | EPOLLRDHUP) as u32,
|
||||
iter: self.events[..self.count].iter(),
|
||||
tokens: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: PollToken> IntoIterator for &'a PollEvents<'_, T> {
|
||||
type Item = PollEvent<'a, T>;
|
||||
type IntoIter = PollEventIter<'a, slice::Iter<'a, epoll_event>, T>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// Watching events taken by PollContext.
|
||||
pub struct WatchingEvents(u32);
|
||||
|
||||
|
@ -271,37 +140,36 @@ impl WatchingEvents {
|
|||
}
|
||||
}
|
||||
|
||||
/// EpollContext wraps linux epoll. It provides similar interface to PollContext.
|
||||
/// It is thread safe while PollContext is not. It requires user to pass in a reference of
|
||||
/// EpollEvents while PollContext does not. Always use PollContext if you don't need to access the
|
||||
/// same epoll from different threads.
|
||||
pub struct EpollContext<T> {
|
||||
/// Used to poll multiple objects that have file descriptors.
|
||||
///
|
||||
/// See [`crate::WaitContext`] for an example that uses the cross-platform wrapper.
|
||||
pub struct PollContext<T> {
|
||||
epoll_ctx: File,
|
||||
// Needed to satisfy usage of T
|
||||
tokens: PhantomData<[T]>,
|
||||
}
|
||||
|
||||
impl<T: PollToken> EpollContext<T> {
|
||||
/// Creates a new `EpollContext`.
|
||||
pub fn new() -> Result<EpollContext<T>> {
|
||||
impl<T: PollToken> PollContext<T> {
|
||||
/// Creates a new `PollContext`.
|
||||
pub fn new() -> Result<PollContext<T>> {
|
||||
// Safe because we check the return value.
|
||||
let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
|
||||
if epoll_fd < 0 {
|
||||
return errno_result();
|
||||
}
|
||||
Ok(EpollContext {
|
||||
Ok(PollContext {
|
||||
epoll_ctx: unsafe { File::from_raw_descriptor(epoll_fd) },
|
||||
tokens: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new `EpollContext` and adds the slice of `fd` and `token` tuples to the new
|
||||
/// Creates a new `PollContext` and adds the slice of `fd` and `token` tuples to the new
|
||||
/// context.
|
||||
///
|
||||
/// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
|
||||
/// return the error instead of the new context.
|
||||
pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<EpollContext<T>> {
|
||||
let ctx = EpollContext::new()?;
|
||||
pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<PollContext<T>> {
|
||||
let ctx = PollContext::new()?;
|
||||
ctx.add_many(fd_tokens)?;
|
||||
Ok(ctx)
|
||||
}
|
||||
|
@ -325,23 +193,24 @@ impl<T: PollToken> EpollContext<T> {
|
|||
/// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
|
||||
/// FD number) added to this context, events will not be reported by `wait` anymore.
|
||||
pub fn add(&self, fd: &dyn AsRawDescriptor, token: T) -> Result<()> {
|
||||
self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
|
||||
self.add_for_event(fd, EventType::Read, token)
|
||||
}
|
||||
|
||||
/// Adds the given `fd` to this context, watching for the specified events and associates the
|
||||
/// given 'token' with those events.
|
||||
/// Adds the given `descriptor` to this context, watching for the specified events and
|
||||
/// associates the given 'token' with those events.
|
||||
///
|
||||
/// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
|
||||
/// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
|
||||
/// FD number) added to this context, events will not be reported by `wait` anymore.
|
||||
pub fn add_fd_with_events(
|
||||
/// A `descriptor` can only be added once and does not need to be kept open. If the `descriptor`
|
||||
/// is dropped and there were no duplicated file descriptors (i.e. adding the same descriptor
|
||||
/// with a different FD number) added to this context, events will not be reported by `wait`
|
||||
/// anymore.
|
||||
pub fn add_for_event(
|
||||
&self,
|
||||
fd: &dyn AsRawDescriptor,
|
||||
events: WatchingEvents,
|
||||
descriptor: &dyn AsRawDescriptor,
|
||||
event_type: EventType,
|
||||
token: T,
|
||||
) -> Result<()> {
|
||||
let mut evt = epoll_event {
|
||||
events: events.get_raw(),
|
||||
events: event_type.into(),
|
||||
u64: token.as_raw_token(),
|
||||
};
|
||||
// Safe because we give a valid epoll FD and FD to watch, as well as a valid epoll_event
|
||||
|
@ -350,7 +219,7 @@ impl<T: PollToken> EpollContext<T> {
|
|||
epoll_ctl(
|
||||
self.epoll_ctx.as_raw_descriptor(),
|
||||
EPOLL_CTL_ADD,
|
||||
fd.as_raw_descriptor(),
|
||||
descriptor.as_raw_descriptor(),
|
||||
&mut evt,
|
||||
)
|
||||
};
|
||||
|
@ -361,10 +230,10 @@ impl<T: PollToken> EpollContext<T> {
|
|||
}
|
||||
|
||||
/// If `fd` was previously added to this context, the watched events will be replaced with
|
||||
/// `events` and the token associated with it will be replaced with the given `token`.
|
||||
pub fn modify(&self, fd: &dyn AsRawDescriptor, events: WatchingEvents, token: T) -> Result<()> {
|
||||
/// `event_type` and the token associated with it will be replaced with the given `token`.
|
||||
pub fn modify(&self, fd: &dyn AsRawDescriptor, event_type: EventType, token: T) -> Result<()> {
|
||||
let mut evt = epoll_event {
|
||||
events: events.0,
|
||||
events: event_type.into(),
|
||||
u64: token.as_raw_token(),
|
||||
};
|
||||
// Safe because we give a valid epoll FD and FD to modify, as well as a valid epoll_event
|
||||
|
@ -413,19 +282,23 @@ impl<T: PollToken> EpollContext<T> {
|
|||
/// return immediately. The consequence of not handling an event perpetually while calling
|
||||
/// `wait` is that the caller's loop will degenerate into busy-loop polling, pinning a CPU to
|
||||
/// ~100% usage.
|
||||
pub fn wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>> {
|
||||
self.wait_timeout(events, Duration::new(i64::MAX as u64, 0))
|
||||
pub fn wait(&self) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
|
||||
self.wait_timeout(Duration::new(i64::MAX as u64, 0))
|
||||
}
|
||||
|
||||
/// Like `wait` except will only block for a maximum of the given `timeout`.
|
||||
///
|
||||
/// This may return earlier than `timeout` with zero events if the duration indicated exceeds
|
||||
/// system limits.
|
||||
pub fn wait_timeout<'a>(
|
||||
&self,
|
||||
events: &'a EpollEvents,
|
||||
timeout: Duration,
|
||||
) -> Result<PollEvents<'a, T>> {
|
||||
pub fn wait_timeout(&self, timeout: Duration) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
|
||||
// SAFETY:
|
||||
// `MaybeUninit<T>` has the same layout as plain `T` (`epoll_event` in our case).
|
||||
// We submit an uninitialized array to the `epoll_wait` system call, which returns how many
|
||||
// elements it initialized, and then we convert only the initialized `MaybeUninit` values
|
||||
// into `epoll_event` structures after the call.
|
||||
let mut epoll_events: [MaybeUninit<epoll_event>; POLL_CONTEXT_MAX_EVENTS] =
|
||||
unsafe { MaybeUninit::uninit().assume_init() };
|
||||
|
||||
let timeout_millis = if timeout.as_secs() as i64 == i64::max_value() {
|
||||
// We make the convenient assumption that 2^63 seconds is an effectively unbounded time
|
||||
// frame. This is meant to mesh with `wait` calling us with no timeout.
|
||||
|
@ -441,14 +314,15 @@ impl<T: PollToken> EpollContext<T> {
|
|||
min(i32::max_value() as u64, millis) as i32
|
||||
};
|
||||
let ret = {
|
||||
let mut epoll_events = events.0.borrow_mut();
|
||||
let max_events = epoll_events.len() as c_int;
|
||||
// Safe because we give an epoll context and a properly sized epoll_events array
|
||||
// pointer, which we trust the kernel to fill in properly.
|
||||
// pointer, which we trust the kernel to fill in properly. The `transmute` is safe,
|
||||
// since `MaybeUninit<T>` has the same layout as `T`, and the `epoll_wait` syscall will
|
||||
// initialize as many elements of the `epoll_events` array as it returns.
|
||||
unsafe {
|
||||
handle_eintr_errno!(epoll_wait(
|
||||
self.epoll_ctx.as_raw_descriptor(),
|
||||
&mut epoll_events[0],
|
||||
std::mem::transmute(&mut epoll_events[0]),
|
||||
max_events,
|
||||
timeout_millis
|
||||
))
|
||||
|
@ -457,206 +331,24 @@ impl<T: PollToken> EpollContext<T> {
|
|||
if ret < 0 {
|
||||
return errno_result();
|
||||
}
|
||||
let epoll_events = events.0.borrow();
|
||||
let events = PollEvents {
|
||||
count: ret as usize,
|
||||
events: epoll_events,
|
||||
tokens: PhantomData,
|
||||
};
|
||||
Ok(events)
|
||||
}
|
||||
}
|
||||
let count = ret as usize;
|
||||
|
||||
impl<T: PollToken> AsRawDescriptor for EpollContext<T> {
|
||||
fn as_raw_descriptor(&self) -> RawDescriptor {
|
||||
self.epoll_ctx.as_raw_descriptor()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: PollToken> IntoRawDescriptor for EpollContext<T> {
|
||||
fn into_raw_descriptor(self) -> RawDescriptor {
|
||||
self.epoll_ctx.into_raw_descriptor()
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to poll multiple objects that have file descriptors.
|
||||
///
|
||||
/// See [`crate::WaitContext`] for an example that uses the cross-platform wrapper.
|
||||
pub struct PollContext<T> {
|
||||
epoll_ctx: EpollContext<T>,
|
||||
|
||||
// We use a RefCell here so that the `wait` method only requires an immutable self reference
|
||||
// while returning the events (encapsulated by PollEvents). Without the RefCell, `wait` would
|
||||
// hold a mutable reference that lives as long as its returned reference (i.e. the PollEvents),
|
||||
// even though that reference is immutable. This is terribly inconvenient for the caller because
|
||||
// the borrow checking would prevent them from using `delete` and `add` while the events are in
|
||||
// scope.
|
||||
events: EpollEvents,
|
||||
|
||||
// Hangup busy loop detection variables. See `check_for_hungup_busy_loop`.
|
||||
hangups: Cell<usize>,
|
||||
max_hangups: Cell<usize>,
|
||||
}
|
||||
|
||||
impl<T: PollToken> PollContext<T> {
|
||||
/// Creates a new `PollContext`.
|
||||
pub fn new() -> Result<PollContext<T>> {
|
||||
Ok(PollContext {
|
||||
epoll_ctx: EpollContext::new()?,
|
||||
events: EpollEvents::new(),
|
||||
hangups: Cell::new(0),
|
||||
max_hangups: Cell::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a new `PollContext` and adds the slice of `fd` and `token` tuples to the new
|
||||
/// context.
|
||||
///
|
||||
/// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will
|
||||
/// return the error instead of the new context.
|
||||
pub fn build_with(fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<PollContext<T>> {
|
||||
let ctx = PollContext::new()?;
|
||||
ctx.add_many(fd_tokens)?;
|
||||
Ok(ctx)
|
||||
}
|
||||
|
||||
/// Adds the given slice of `fd` and `token` tuples to this context.
|
||||
///
|
||||
/// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors,
|
||||
/// this method will stop adding `fd`s and return the first error, leaving this context in an
|
||||
/// undefined state.
|
||||
pub fn add_many(&self, fd_tokens: &[(&dyn AsRawDescriptor, T)]) -> Result<()> {
|
||||
for (fd, token) in fd_tokens {
|
||||
self.add(*fd, T::from_raw_token(token.as_raw_token()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Adds the given `fd` to this context and associates the given `token` with the `fd`'s
|
||||
/// readable events.
|
||||
///
|
||||
/// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
|
||||
/// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
|
||||
/// FD number) added to this context, events will not be reported by `wait` anymore.
|
||||
pub fn add(&self, fd: &dyn AsRawDescriptor, token: T) -> Result<()> {
|
||||
self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token)
|
||||
}
|
||||
|
||||
/// Adds the given `descriptor` to this context, watching for the specified events and
|
||||
/// associates the given 'token' with those events.
|
||||
///
|
||||
/// A `descriptor` can only be added once and does not need to be kept open. If the `descriptor`
|
||||
/// is dropped and there were no duplicated file descriptors (i.e. adding the same descriptor
|
||||
/// with a different FD number) added to this context, events will not be reported by `wait`
|
||||
/// anymore.
|
||||
pub fn add_for_event(
|
||||
&self,
|
||||
descriptor: &dyn AsRawDescriptor,
|
||||
event_type: EventType,
|
||||
token: T,
|
||||
) -> Result<()> {
|
||||
self.add_fd_with_events(descriptor, convert_to_watching_events(event_type), token)
|
||||
}
|
||||
|
||||
fn add_fd_with_events(
|
||||
&self,
|
||||
fd: &dyn AsRawDescriptor,
|
||||
events: WatchingEvents,
|
||||
token: T,
|
||||
) -> Result<()> {
|
||||
self.epoll_ctx.add_fd_with_events(fd, events, token)?;
|
||||
self.hangups.set(0);
|
||||
self.max_hangups.set(self.max_hangups.get() + 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// If `fd` was previously added to this context, the watched events will be replaced with
|
||||
/// `event_type` and the token associated with it will be replaced with the given `token`.
|
||||
pub fn modify(&self, fd: &dyn AsRawDescriptor, event_type: EventType, token: T) -> Result<()> {
|
||||
self.epoll_ctx
|
||||
.modify(fd, convert_to_watching_events(event_type), token)
|
||||
}
|
||||
|
||||
/// Deletes the given `fd` from this context.
|
||||
///
|
||||
/// If an `fd`'s token shows up in the list of hangup events, it should be removed using this
|
||||
/// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`.
|
||||
/// Failure to do so will cause the `wait` method to always return immediately, causing ~100%
|
||||
/// CPU load.
|
||||
pub fn delete(&self, fd: &dyn AsRawDescriptor) -> Result<()> {
|
||||
self.epoll_ctx.delete(fd)?;
|
||||
self.hangups.set(0);
|
||||
self.max_hangups.set(self.max_hangups.get() - 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// This method determines if the user of wait is misusing the `PollContext` by leaving FDs
|
||||
// in this `PollContext` that have been shutdown or hungup on. Such an FD will cause `wait` to
|
||||
// return instantly with a hungup event. If that FD is perpetually left in this context, a busy
|
||||
// loop burning ~100% of one CPU will silently occur with no human visible malfunction.
|
||||
//
|
||||
// How do we know if the client of this context is ignoring hangups? A naive implementation
|
||||
// would trigger if consecutive wait calls yield hangup events, but there are legitimate cases
|
||||
// for this, such as two distinct sockets becoming hungup across two consecutive wait calls. A
|
||||
// smarter implementation would only trigger if `delete` wasn't called between waits that
|
||||
// yielded hangups. Sadly `delete` isn't the only way to remove an FD from this context. The
|
||||
// other way is for the client to close the hungup FD, which automatically removes it from this
|
||||
// context. Assuming that the client always uses close, this implementation would too eagerly
|
||||
// trigger.
|
||||
//
|
||||
// The implementation used here keeps an upper bound of FDs in this context using a counter
|
||||
// hooked into add/delete (which is imprecise because close can also remove FDs without us
|
||||
// knowing). The number of consecutive (no add or delete in between) hangups yielded by wait
|
||||
// calls is counted and compared to the upper bound. If the upper bound is exceeded by the
|
||||
// consecutive hangups, the implementation triggers the check and logs.
|
||||
//
|
||||
// This implementation has false negatives because the upper bound can be completely too high,
|
||||
// in the worst case caused by only using close instead of delete. However, this method has the
|
||||
// advantage of always triggering eventually genuine busy loop cases, requires no dynamic
|
||||
// allocations, is fast and constant time to compute, and has no false positives.
|
||||
fn check_for_hungup_busy_loop(&self, new_hangups: usize) {
|
||||
let old_hangups = self.hangups.get();
|
||||
let max_hangups = self.max_hangups.get();
|
||||
if old_hangups <= max_hangups && old_hangups + new_hangups > max_hangups {
|
||||
warn!(
|
||||
"busy poll wait loop with hungup FDs detected on thread {}",
|
||||
thread::current().name().unwrap_or("")
|
||||
);
|
||||
// This panic is helpful for tests of this functionality.
|
||||
#[cfg(test)]
|
||||
panic!("hungup busy loop detected");
|
||||
}
|
||||
self.hangups.set(old_hangups + new_hangups);
|
||||
}
|
||||
|
||||
/// Waits for any events to occur in FDs that were previously added to this context.
|
||||
///
|
||||
/// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading
|
||||
/// for readable events and not closing for hungup events), subsequent calls to `wait` will
|
||||
/// return immediately. The consequence of not handling an event perpetually while calling
|
||||
/// `wait` is that the caller's loop will degenerate into busy-loop polling, pinning a CPU to
|
||||
/// ~100% usage.
|
||||
pub fn wait(&self) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
|
||||
self.wait_timeout(Duration::new(i64::MAX as u64, 0))
|
||||
}
|
||||
|
||||
/// Like `wait` except will only block for a maximum of the given `timeout`.
|
||||
///
|
||||
/// This may return earlier than `timeout` with zero events if the duration indicated exceeds
|
||||
/// system limits.
|
||||
pub fn wait_timeout(&self, timeout: Duration) -> Result<SmallVec<[TriggeredEvent<T>; 16]>> {
|
||||
let events = self.epoll_ctx.wait_timeout(&self.events, timeout)?;
|
||||
let hangups = events.iter_hungup().count();
|
||||
self.check_for_hungup_busy_loop(hangups);
|
||||
Ok(events
|
||||
let events = epoll_events[0..count]
|
||||
.iter()
|
||||
.map(|event| TriggeredEvent {
|
||||
token: event.token(),
|
||||
is_readable: event.readable(),
|
||||
is_writable: event.writable(),
|
||||
is_hungup: event.hungup(),
|
||||
.map(|e| {
|
||||
// SAFETY:
|
||||
// Converting `MaybeUninit<epoll_event>` into `epoll_event` is safe here, since we
|
||||
// are only iterating over elements that the `epoll_wait` system call initialized.
|
||||
let e = unsafe { e.assume_init() };
|
||||
TriggeredEvent {
|
||||
token: T::from_raw_token(e.u64),
|
||||
is_readable: e.events & (EPOLLIN as u32) != 0,
|
||||
is_writable: e.events & (EPOLLOUT as u32) != 0,
|
||||
is_hungup: e.events & ((EPOLLHUP | EPOLLRDHUP) as u32) != 0,
|
||||
}
|
||||
})
|
||||
.collect())
|
||||
.collect();
|
||||
Ok(events)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -676,7 +368,7 @@ impl<T: PollToken> IntoRawDescriptor for PollContext<T> {
|
|||
mod tests {
|
||||
use super::{super::Event, *};
|
||||
use base_poll_token_derive::PollToken;
|
||||
use std::{os::unix::net::UnixStream, time::Instant};
|
||||
use std::time::Instant;
|
||||
|
||||
#[test]
|
||||
fn poll_context() {
|
||||
|
@ -726,23 +418,6 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn poll_context_hungup() {
|
||||
let (s1, s2) = UnixStream::pair().unwrap();
|
||||
let ctx: PollContext<u32> = PollContext::new().unwrap();
|
||||
ctx.add(&s1, 1).unwrap();
|
||||
|
||||
// Causes s1 to receive hangup events, which we purposefully ignore to trip the detection
|
||||
// logic in `PollContext`.
|
||||
drop(s2);
|
||||
|
||||
// Should easily panic within this many iterations.
|
||||
for _ in 0..1000 {
|
||||
ctx.wait().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_context_timeout() {
|
||||
let ctx: PollContext<u32> = PollContext::new().unwrap();
|
||||
|
|
|
@ -24,8 +24,8 @@ use std::{
|
|||
|
||||
use async_task::Task;
|
||||
use base::{
|
||||
add_fd_flags, warn, AsRawDescriptor, AsRawDescriptors, EpollContext, EpollEvents, Event,
|
||||
RawDescriptor, WatchingEvents,
|
||||
add_fd_flags, warn, AsRawDescriptor, AsRawDescriptors, Event, EventType, RawDescriptor,
|
||||
WaitContext,
|
||||
};
|
||||
use futures::task::noop_waker;
|
||||
use pin_utils::pin_mut;
|
||||
|
@ -115,10 +115,7 @@ impl<F: AsRawDescriptor> RegisteredSource<F> {
|
|||
pub fn wait_readable(&self) -> Result<PendingOperation> {
|
||||
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
|
||||
|
||||
let token = ex.add_operation(
|
||||
self.source.as_raw_descriptor(),
|
||||
WatchingEvents::empty().set_read(),
|
||||
)?;
|
||||
let token = ex.add_operation(self.source.as_raw_descriptor(), EventType::Read)?;
|
||||
|
||||
Ok(PendingOperation {
|
||||
token: Some(token),
|
||||
|
@ -131,10 +128,7 @@ impl<F: AsRawDescriptor> RegisteredSource<F> {
|
|||
pub fn wait_writable(&self) -> Result<PendingOperation> {
|
||||
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
|
||||
|
||||
let token = ex.add_operation(
|
||||
self.source.as_raw_descriptor(),
|
||||
WatchingEvents::empty().set_write(),
|
||||
)?;
|
||||
let token = ex.add_operation(self.source.as_raw_descriptor(), EventType::Write)?;
|
||||
|
||||
Ok(PendingOperation {
|
||||
token: Some(token),
|
||||
|
@ -230,10 +224,7 @@ async fn notify_task(notify: Event, raw: Weak<RawExecutor>) {
|
|||
|
||||
if let Some(ex) = raw.upgrade() {
|
||||
let token = ex
|
||||
.add_operation(
|
||||
notify.as_raw_descriptor(),
|
||||
WatchingEvents::empty().set_read(),
|
||||
)
|
||||
.add_operation(notify.as_raw_descriptor(), EventType::Read)
|
||||
.expect("Failed to add notify Event to PollCtx");
|
||||
|
||||
// We don't want to hold an active reference to the executor in the .await below.
|
||||
|
@ -269,7 +260,7 @@ const WOKEN: i32 = 0x3e4d_3276u32 as i32;
|
|||
|
||||
struct RawExecutor {
|
||||
queue: RunnableQueue,
|
||||
poll_ctx: EpollContext<usize>,
|
||||
poll_ctx: WaitContext<usize>,
|
||||
ops: Mutex<Slab<OpStatus>>,
|
||||
blocking_pool: BlockingPool,
|
||||
state: AtomicI32,
|
||||
|
@ -287,7 +278,7 @@ impl RawExecutor {
|
|||
let notify = notify.try_clone().map_err(Error::CloneEvent)?;
|
||||
Ok(RawExecutor {
|
||||
queue: RunnableQueue::new(),
|
||||
poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
|
||||
poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?,
|
||||
ops: Mutex::new(Slab::with_capacity(64)),
|
||||
blocking_pool: Default::default(),
|
||||
state: AtomicI32::new(PROCESSING),
|
||||
|
@ -296,7 +287,7 @@ impl RawExecutor {
|
|||
})
|
||||
}
|
||||
|
||||
fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
|
||||
fn add_operation(&self, fd: RawFd, event_type: EventType) -> Result<WakerToken> {
|
||||
let duped_fd = unsafe {
|
||||
// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
|
||||
// will only be added to the poll loop.
|
||||
|
@ -306,7 +297,7 @@ impl RawExecutor {
|
|||
let entry = ops.vacant_entry();
|
||||
let next_token = entry.key();
|
||||
self.poll_ctx
|
||||
.add_fd_with_events(&duped_fd, events, next_token)
|
||||
.add_for_event(&duped_fd, event_type, next_token)
|
||||
.map_err(Error::SubmittingWaker)?;
|
||||
entry.insert(OpStatus::Pending(OpData {
|
||||
file: duped_fd,
|
||||
|
@ -367,7 +358,6 @@ impl RawExecutor {
|
|||
}
|
||||
|
||||
fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
|
||||
let events = EpollEvents::new();
|
||||
pin_mut!(done);
|
||||
|
||||
loop {
|
||||
|
@ -392,16 +382,13 @@ impl RawExecutor {
|
|||
continue;
|
||||
}
|
||||
|
||||
let events = self
|
||||
.poll_ctx
|
||||
.wait(&events)
|
||||
.map_err(Error::PollContextError)?;
|
||||
let events = self.poll_ctx.wait().map_err(Error::PollContextError)?;
|
||||
|
||||
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
|
||||
// writing to the eventfd.
|
||||
self.state.store(PROCESSING, Ordering::Release);
|
||||
for e in events.iter() {
|
||||
let token = e.token();
|
||||
let token = e.token;
|
||||
let mut ops = self.ops.lock();
|
||||
|
||||
// The op could have been canceled and removed by another thread so ignore it if it
|
||||
|
|
|
@ -13,7 +13,7 @@ use crate::utils::AsyncJobQueue;
|
|||
use crate::utils::{EventHandler, EventLoop, FailHandle};
|
||||
|
||||
use anyhow::Context;
|
||||
use base::{error, AsRawDescriptor, RawDescriptor, Tube, WatchingEvents};
|
||||
use base::{error, AsRawDescriptor, EventType, RawDescriptor, Tube};
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use std::time::Duration;
|
||||
|
@ -73,7 +73,7 @@ impl HostBackendDeviceProvider {
|
|||
event_loop
|
||||
.add_event(
|
||||
&*inner.control_tube.lock(),
|
||||
WatchingEvents::empty().set_read(),
|
||||
EventType::Read,
|
||||
Arc::downgrade(&handler),
|
||||
)
|
||||
.map_err(Error::AddToEventLoop)?;
|
||||
|
@ -171,7 +171,7 @@ impl ProviderInner {
|
|||
|
||||
if let Err(e) = self.event_loop.add_event(
|
||||
&*arc_mutex_device.lock(),
|
||||
WatchingEvents::empty().set_read().set_write(),
|
||||
EventType::ReadWrite,
|
||||
Arc::downgrade(&event_handler),
|
||||
) {
|
||||
error!("failed to add USB device fd to event handler: {}", e);
|
||||
|
|
|
@ -6,7 +6,7 @@ use super::interrupter::Interrupter;
|
|||
use crate::utils::{EventHandler, EventLoop};
|
||||
|
||||
use anyhow::Context;
|
||||
use base::{error, Event, WatchingEvents};
|
||||
use base::{error, Event, EventType};
|
||||
use std::sync::Arc;
|
||||
use sync::Mutex;
|
||||
|
||||
|
@ -30,7 +30,7 @@ impl IntrResampleHandler {
|
|||
let tmp_handler: Arc<dyn EventHandler> = handler.clone();
|
||||
if let Err(e) = event_loop.add_event(
|
||||
&handler.resample_evt,
|
||||
WatchingEvents::empty().set_read(),
|
||||
EventType::Read,
|
||||
Arc::downgrade(&tmp_handler),
|
||||
) {
|
||||
error!("cannot add intr resample handler to event loop: {}", e);
|
||||
|
|
|
@ -10,7 +10,7 @@ use std::sync::{Arc, MutexGuard};
|
|||
use sync::Mutex;
|
||||
|
||||
use anyhow::Context;
|
||||
use base::{error, Error as SysError, Event, WatchingEvents};
|
||||
use base::{error, Error as SysError, Event, EventType};
|
||||
use remain::sorted;
|
||||
use thiserror::Error;
|
||||
use vm_memory::{GuestAddress, GuestMemory};
|
||||
|
@ -107,7 +107,7 @@ where
|
|||
event_loop
|
||||
.add_event(
|
||||
&controller.event,
|
||||
WatchingEvents::empty().set_read(),
|
||||
EventType::Read,
|
||||
Arc::downgrade(&event_handler),
|
||||
)
|
||||
.map_err(Error::AddEvent)?;
|
||||
|
|
|
@ -6,7 +6,7 @@ use super::{Error, Result};
|
|||
use super::{EventHandler, EventLoop};
|
||||
|
||||
use anyhow::Context;
|
||||
use base::{Event, WatchingEvents};
|
||||
use base::{Event, EventType};
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
use sync::Mutex;
|
||||
|
@ -26,11 +26,7 @@ impl AsyncJobQueue {
|
|||
evt,
|
||||
});
|
||||
let handler: Arc<dyn EventHandler> = queue.clone();
|
||||
event_loop.add_event(
|
||||
&queue.evt,
|
||||
WatchingEvents::empty().set_read(),
|
||||
Arc::downgrade(&handler),
|
||||
)?;
|
||||
event_loop.add_event(&queue.evt, EventType::Read, Arc::downgrade(&handler))?;
|
||||
Ok(queue)
|
||||
}
|
||||
|
||||
|
|
|
@ -4,8 +4,7 @@
|
|||
|
||||
use super::error::{Error, Result};
|
||||
use base::{
|
||||
error, warn, AsRawDescriptor, Descriptor, EpollContext, EpollEvents, Event, RawDescriptor,
|
||||
WatchingEvents,
|
||||
error, warn, AsRawDescriptor, Descriptor, Event, EventType, RawDescriptor, WaitContext,
|
||||
};
|
||||
use std::collections::BTreeMap;
|
||||
use std::mem::drop;
|
||||
|
@ -37,11 +36,11 @@ impl FailHandle for Option<Arc<dyn FailHandle>> {
|
|||
}
|
||||
}
|
||||
|
||||
/// EpollEventLoop is an event loop blocked on a set of fds. When a monitered events is triggered,
|
||||
/// EventLoop is an event loop blocked on a set of fds. When a monitored event is triggered, the
|
||||
/// event loop will invoke the mapped handler.
|
||||
pub struct EventLoop {
|
||||
fail_handle: Option<Arc<dyn FailHandle>>,
|
||||
poll_ctx: Arc<EpollContext<Descriptor>>,
|
||||
poll_ctx: Arc<WaitContext<Descriptor>>,
|
||||
handlers: Arc<Mutex<BTreeMap<RawDescriptor, Weak<dyn EventHandler>>>>,
|
||||
stop_evt: Event,
|
||||
}
|
||||
|
@ -63,7 +62,7 @@ impl EventLoop {
|
|||
|
||||
let fd_callbacks: Arc<Mutex<BTreeMap<RawDescriptor, Weak<dyn EventHandler>>>> =
|
||||
Arc::new(Mutex::new(BTreeMap::new()));
|
||||
let poll_ctx: EpollContext<Descriptor> = EpollContext::new()
|
||||
let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
|
||||
.and_then(|pc| {
|
||||
pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
|
||||
.and(Ok(pc))
|
||||
|
@ -81,13 +80,12 @@ impl EventLoop {
|
|||
let handle = thread::Builder::new()
|
||||
.name(name)
|
||||
.spawn(move || {
|
||||
let event_loop = EpollEvents::new();
|
||||
loop {
|
||||
if fail_handle.failed() {
|
||||
error!("xhci controller already failed, stopping event ring");
|
||||
return;
|
||||
}
|
||||
let events = match poll_ctx.wait(&event_loop) {
|
||||
let events = match poll_ctx.wait() {
|
||||
Ok(events) => events,
|
||||
Err(e) => {
|
||||
error!("cannot poll {:?}", e);
|
||||
|
@ -96,7 +94,7 @@ impl EventLoop {
|
|||
}
|
||||
};
|
||||
for event in &events {
|
||||
let fd = event.token().as_raw_descriptor();
|
||||
let fd = event.token.as_raw_descriptor();
|
||||
if fd == stop_evt.as_raw_descriptor() {
|
||||
return;
|
||||
}
|
||||
|
@ -112,7 +110,7 @@ impl EventLoop {
|
|||
|
||||
// If the file descriptor is hung up, remove it after calling the handler
|
||||
// one final time.
|
||||
let mut remove = event.hungup();
|
||||
let mut remove = event.is_hungup;
|
||||
|
||||
if let Some(handler) = weak_handler.upgrade() {
|
||||
// Drop lock before triggering the event.
|
||||
|
@ -128,7 +126,7 @@ impl EventLoop {
|
|||
}
|
||||
|
||||
if remove {
|
||||
let _ = poll_ctx.delete(&event.token());
|
||||
let _ = poll_ctx.delete(&event.token);
|
||||
let _ = locked.remove(&fd);
|
||||
}
|
||||
}
|
||||
|
@ -148,7 +146,7 @@ impl EventLoop {
|
|||
pub fn add_event(
|
||||
&self,
|
||||
descriptor: &dyn AsRawDescriptor,
|
||||
events: WatchingEvents,
|
||||
event_type: EventType,
|
||||
handler: Weak<dyn EventHandler>,
|
||||
) -> Result<()> {
|
||||
if self.fail_handle.failed() {
|
||||
|
@ -159,9 +157,9 @@ impl EventLoop {
|
|||
.insert(descriptor.as_raw_descriptor(), handler);
|
||||
// This might fail due to epoll syscall. Check epoll_ctl(2).
|
||||
self.poll_ctx
|
||||
.add_fd_with_events(
|
||||
.add_for_event(
|
||||
descriptor,
|
||||
events,
|
||||
event_type,
|
||||
Descriptor(descriptor.as_raw_descriptor()),
|
||||
)
|
||||
.map_err(Error::WaitContextAddDescriptor)
|
||||
|
@ -230,12 +228,8 @@ mod tests {
|
|||
evt,
|
||||
});
|
||||
let t: Arc<dyn EventHandler> = h.clone();
|
||||
l.add_event(
|
||||
&h.evt,
|
||||
WatchingEvents::empty().set_read(),
|
||||
Arc::downgrade(&t),
|
||||
)
|
||||
.unwrap();
|
||||
l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
|
||||
.unwrap();
|
||||
self_evt.write(1).unwrap();
|
||||
{
|
||||
let mut val = h.val.lock().unwrap();
|
||||
|
|
Loading…
Reference in a new issue