diff --git a/cros_async/src/sys/windows/async_types.rs b/cros_async/src/sys/windows/async_types.rs index cce18ba25e..464bd16f28 100644 --- a/cros_async/src/sys/windows/async_types.rs +++ b/cros_async/src/sys/windows/async_types.rs @@ -8,6 +8,7 @@ use std::sync::Mutex; use base::AsRawDescriptor; use base::Descriptor; +use base::ReadNotifier; use base::Tube; use base::TubeError; use base::TubeResult; @@ -16,16 +17,22 @@ use serde::Serialize; use super::HandleWrapper; use crate::unblock; +use crate::EventAsync; use crate::Executor; pub struct AsyncTube { inner: Arc>, + read_notifier: EventAsync, } impl AsyncTube { - pub fn new(_ex: &Executor, tube: Tube) -> io::Result { + pub fn new(ex: &Executor, tube: Tube) -> io::Result { + let read_notifier = EventAsync::clone_raw_without_reset(tube.get_read_notifier(), ex) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let inner = Arc::new(Mutex::new(tube)); Ok(AsyncTube { - inner: Arc::new(Mutex::new(tube)), + inner, + read_notifier, }) } @@ -33,6 +40,10 @@ impl AsyncTube { /// upstream, but for now is implemented to work using simple blocking futures /// (avoiding the unimplemented wait_readable). pub async fn next(&self) -> TubeResult { + self.read_notifier + .next_val() + .await + .map_err(|e| TubeError::Recv(io::Error::new(io::ErrorKind::Other, e)))?; let tube = Arc::clone(&self.inner); let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_descriptor())]);