diff --git a/cros_async/src/blocking/pool.rs b/cros_async/src/blocking/pool.rs index 2109fd7911..f7183f7c86 100644 --- a/cros_async/src/blocking/pool.rs +++ b/cros_async/src/blocking/pool.rs @@ -24,6 +24,7 @@ struct State { tasks: VecDeque, num_threads: usize, num_idle: usize, + num_notified: usize, worker_threads: Slab>, exited_threads: Option>, exit: Sender, @@ -46,14 +47,23 @@ fn run_blocking_thread(idx: usize, inner: Arc, exit: Sender) { let (guard, result) = inner .condvar .wait_timeout_while(state, inner.keepalive, |s| { - !s.shutting_down && s.tasks.is_empty() + !s.shutting_down && s.num_notified == 0 }); state = guard; + // If `state.num_notified > 0` then this was a real wakeup. + if state.num_notified > 0 { + state.num_notified -= 1; + continue; + } + // Only decrement the idle count if we timed out. Otherwise, it was decremented when new // work was added to `state.tasks`. if result.timed_out() { - state.num_idle -= 1; + state.num_idle = state + .num_idle + .checked_sub(1) + .expect("`num_idle` underflow on timeout"); break; } } @@ -119,6 +129,7 @@ impl Inner { } else { // We have idle threads, wake one up. state.num_idle -= 1; + state.num_notified += 1; self.condvar.notify_one(); } } @@ -192,6 +203,7 @@ impl BlockingPool { tasks: VecDeque::new(), num_threads: 0, num_idle: 0, + num_notified: 0, worker_threads: Slab::new(), exited_threads: Some(exited_threads), exit, @@ -213,6 +225,7 @@ impl BlockingPool { tasks: VecDeque::new(), num_threads: 0, num_idle: 0, + num_notified: 0, worker_threads: Slab::with_capacity(max_threads), exited_threads: Some(exited_threads), exit, @@ -338,6 +351,26 @@ mod test { assert_eq!(res, 42); } + #[test] + fn fast_tasks_with_short_keepalive() { + let pool = BlockingPool::new(256, Duration::from_millis(1)); + + let streams = FuturesUnordered::new(); + for _ in 0..2 { + for _ in 0..256 { + let task = pool.spawn(|| ()); + streams.push(task); + } + + thread::sleep(Duration::from_millis(1)); + } + + block_on(streams.collect::>()); + + // The test passes if there are no panics, which would happen if one of the worker threads + // triggered an underflow on `pool.inner.state.num_idle`. + } + #[test] fn more_tasks_than_threads() { let pool = BlockingPool::new(4, Duration::from_secs(10));