cros_async: Fix underflow in BlockingPool worker threads

The `num_idle` field of the shared state between BlockingPool worker
threads can underflow in the following case:

* state.num_idle == 2.
* We spawn 2 new tasks into the BlockingPool.
* Both idle worker threads are woken up.  `state.num_idle` goes to 0.
* The first worker thread wakes up and pulls a task from the queue.
  That task finishes very quickly so the worker thread pulls the second
  task from the queue before the second worker thread is scheduled.
* The second worker thread is scheduled.  It sees that
  `s.tasks.is_empty() == true` so it goes back to waiting on the
  Condvar.
* The second worker thread's wait times out and it tries to decrement
  `state.num_idle` leading to underflow.

Fix this by adding a `num_notified` field to the shared worker state.
This field acts like a counter for the number of idle worker threads
that have been woken up.

When an idle thread is waiting on a Condvar, rather than checking if the
task queue is empty, it will instead check if num_notified > 0.  When an
idle worker thread observes that num_notified > 0 it decrements it by 1
and then goes back to processing tasks from the queue.  num_idle is only
decremented when num_notified is 0.

Change the num_idle decrement to a checked_sub so that we can catch it
even when -Coverflow_checks=off.  Also add a test for this case.  This
test consistently panics without the num_notified changes.

BUG=none
TEST=unit tests

Change-Id: Ia1b348605e0d02415635cdd023db1c10201ab661
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3139159
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Richard Zhang <rizhang@google.com>
Reviewed-by: Noah Gold <nkgold@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Commit-Queue: Chirantan Ekbote <chirantan@chromium.org>
This commit is contained in:
Chirantan Ekbote 2021-09-02 18:38:44 +09:00 committed by Commit Bot
parent 6d9243865b
commit 2590740b08

View file

@ -24,6 +24,7 @@ struct State {
tasks: VecDeque<Runnable>,
num_threads: usize,
num_idle: usize,
num_notified: usize,
worker_threads: Slab<JoinHandle<()>>,
exited_threads: Option<Receiver<usize>>,
exit: Sender<usize>,
@ -46,14 +47,23 @@ fn run_blocking_thread(idx: usize, inner: Arc<Inner>, exit: Sender<usize>) {
let (guard, result) = inner
.condvar
.wait_timeout_while(state, inner.keepalive, |s| {
!s.shutting_down && s.tasks.is_empty()
!s.shutting_down && s.num_notified == 0
});
state = guard;
// If `state.num_notified > 0` then this was a real wakeup.
if state.num_notified > 0 {
state.num_notified -= 1;
continue;
}
// Only decrement the idle count if we timed out. Otherwise, it was decremented when new
// work was added to `state.tasks`.
if result.timed_out() {
state.num_idle -= 1;
state.num_idle = state
.num_idle
.checked_sub(1)
.expect("`num_idle` underflow on timeout");
break;
}
}
@ -119,6 +129,7 @@ impl Inner {
} else {
// We have idle threads, wake one up.
state.num_idle -= 1;
state.num_notified += 1;
self.condvar.notify_one();
}
}
@ -192,6 +203,7 @@ impl BlockingPool {
tasks: VecDeque::new(),
num_threads: 0,
num_idle: 0,
num_notified: 0,
worker_threads: Slab::new(),
exited_threads: Some(exited_threads),
exit,
@ -213,6 +225,7 @@ impl BlockingPool {
tasks: VecDeque::new(),
num_threads: 0,
num_idle: 0,
num_notified: 0,
worker_threads: Slab::with_capacity(max_threads),
exited_threads: Some(exited_threads),
exit,
@ -338,6 +351,26 @@ mod test {
assert_eq!(res, 42);
}
#[test]
fn fast_tasks_with_short_keepalive() {
let pool = BlockingPool::new(256, Duration::from_millis(1));
let streams = FuturesUnordered::new();
for _ in 0..2 {
for _ in 0..256 {
let task = pool.spawn(|| ());
streams.push(task);
}
thread::sleep(Duration::from_millis(1));
}
block_on(streams.collect::<Vec<_>>());
// The test passes if there are no panics, which would happen if one of the worker threads
// triggered an underflow on `pool.inner.state.num_idle`.
}
#[test]
fn more_tasks_than_threads() {
let pool = BlockingPool::new(4, Duration::from_secs(10));