cros_async: avoid unblock in Tube until readable.

Because unblock tasks cannot be canceled, and the ones started for
AsyncTube hold locks, this can cause code trying to convert the
AsyncTube back to a Tube to stall because the lock is held on the
unblock thread and won't ever be released until the global pool is
shutdown.

The right solution is to abandon unblock or make it cancellable.
Those are significant undertakings, so in the interim, we can
avoid starting unblock until the read notifier is triggered.
This way, we won't actually be in unblock, and the lock will
not be held.

BUG=b:294134741
TEST=snapshot no longer stalls on Windows.

Change-Id: I36bfd0ca5546c43f766c431a2fb97d2b87a679ae
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4749226
Reviewed-by: Frederick Mayle <fmayle@google.com>
Commit-Queue: Noah Gold <nkgold@google.com>
Reviewed-by: Richard Zhang <rizhang@google.com>
This commit is contained in:
Noah Gold 2023-08-03 21:29:26 -07:00 committed by crosvm LUCI
parent 2bfd77c093
commit 7a5226e6e9

View file

@ -8,6 +8,7 @@ use std::sync::Mutex;
use base::AsRawDescriptor;
use base::Descriptor;
use base::ReadNotifier;
use base::Tube;
use base::TubeError;
use base::TubeResult;
@ -16,16 +17,22 @@ use serde::Serialize;
use super::HandleWrapper;
use crate::unblock;
use crate::EventAsync;
use crate::Executor;
pub struct AsyncTube {
inner: Arc<Mutex<Tube>>,
read_notifier: EventAsync,
}
impl AsyncTube {
pub fn new(_ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
pub fn new(ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
let read_notifier = EventAsync::clone_raw_without_reset(tube.get_read_notifier(), ex)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let inner = Arc::new(Mutex::new(tube));
Ok(AsyncTube {
inner: Arc::new(Mutex::new(tube)),
inner,
read_notifier,
})
}
@ -33,6 +40,10 @@ impl AsyncTube {
/// upstream, but for now is implemented to work using simple blocking futures
/// (avoiding the unimplemented wait_readable).
pub async fn next<T: 'static + DeserializeOwned + Send>(&self) -> TubeResult<T> {
self.read_notifier
.next_val()
.await
.map_err(|e| TubeError::Recv(io::Error::new(io::ErrorKind::Other, e)))?;
let tube = Arc::clone(&self.inner);
let handles =
HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_descriptor())]);