diff --git a/cros_async/src/sys/windows/async_types.rs b/cros_async/src/sys/windows/async_types.rs index 464bd16f28..46c2b52507 100644 --- a/cros_async/src/sys/windows/async_types.rs +++ b/cros_async/src/sys/windows/async_types.rs @@ -45,8 +45,7 @@ impl AsyncTube { .await .map_err(|e| TubeError::Recv(io::Error::new(io::ErrorKind::Other, e)))?; let tube = Arc::clone(&self.inner); - let handles = - HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_descriptor())]); + let handles = HandleWrapper::new(Descriptor(tube.lock().unwrap().as_raw_descriptor())); unblock( move || tube.lock().unwrap().recv(), move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)), @@ -56,8 +55,7 @@ impl AsyncTube { pub async fn send(&self, msg: T) -> TubeResult<()> { let tube = Arc::clone(&self.inner); - let handles = - HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_descriptor())]); + let handles = HandleWrapper::new(Descriptor(tube.lock().unwrap().as_raw_descriptor())); unblock( move || tube.lock().unwrap().send(&msg), move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)), diff --git a/cros_async/src/sys/windows/handle_executor.rs b/cros_async/src/sys/windows/handle_executor.rs index 17f8611600..70dc951710 100644 --- a/cros_async/src/sys/windows/handle_executor.rs +++ b/cros_async/src/sys/windows/handle_executor.rs @@ -205,9 +205,7 @@ impl common_executor::Reactor for HandleReactor { _ex: &Arc>, f: F, ) -> AsyncResult> { - Ok(IoSource::Handle(super::HandleSource::new( - vec![f].into_boxed_slice(), - )?)) + Ok(IoSource::Handle(super::HandleSource::new(f)?)) } } diff --git a/cros_async/src/sys/windows/handle_source.rs b/cros_async/src/sys/windows/handle_source.rs index ab9a038dfc..7e5c987711 100644 --- a/cros_async/src/sys/windows/handle_source.rs +++ b/cros_async/src/sys/windows/handle_source.rs @@ -28,7 +28,6 @@ use smallvec::SmallVec; use sync::Mutex; use thiserror::Error as ThisError; use winapi::um::ioapiset::CancelIoEx; -use winapi::um::processthreadsapi::GetCurrentThreadId; use crate::mem::BackingMemory; use crate::mem::MemRegion; @@ -88,37 +87,32 @@ pub type Result = std::result::Result; /// Used to shutdown IO running on a CancellableBlockingPool. pub struct HandleWrapper { - handles: Vec, + handle: Descriptor, } impl HandleWrapper { - pub fn new(handles: Vec) -> Arc> { - Arc::new(Mutex::new(Self { handles })) + pub fn new(handle: Descriptor) -> Arc> { + Arc::new(Mutex::new(Self { handle })) } pub fn cancel_sync_io(&mut self, ret: T) -> T { - for handle in &self.handles { - // There isn't much we can do if cancel fails. - // SAFETY: trivially safe - if unsafe { CancelIoEx(handle.as_raw_descriptor(), null_mut()) } == 0 { - warn!( - "Cancel IO for handle:{:?} failed with {}", - handle.as_raw_descriptor(), - SysUtilError::last() - ); - } + // There isn't much we can do if cancel fails. + // SAFETY: trivially safe + if unsafe { CancelIoEx(self.handle.as_raw_descriptor(), null_mut()) } == 0 { + warn!( + "Cancel IO for handle:{:?} failed with {}", + self.handle.as_raw_descriptor(), + SysUtilError::last() + ); } ret } } -/// Async IO source for Windows that uses a multi-threaded, multi-handle approach to provide fast IO -/// operations. It demuxes IO requests across a set of handles that refer to the same underlying IO -/// source, such as a file, and executes those requests across multiple threads. Benchmarks show -/// that this is the fastest method to perform IO on Windows, especially for file reads. +/// Async IO source for Windows, such as a file. pub struct HandleSource { - sources: Box<[F]>, - source_descriptors: Vec, + source: Option, + source_descriptor: Descriptor, blocking_pool: CancellableBlockingPool, } @@ -129,7 +123,7 @@ impl HandleSource { /// threads are generally idle because they're waiting on blocking IO, so the cost is minimal. /// Long term, we may migrate away from this approach toward IOCP or overlapped IO. /// - /// WARNING: every `source` in `sources` MUST be a unique file object (e.g. separate handles + /// WARNING: `source` MUST be a unique file object (e.g. separate handles /// each created by CreateFile), and point at the same file on disk. This is because IO /// operations on the HandleSource are randomly distributed to each source. /// @@ -137,21 +131,15 @@ impl HandleSource { /// The caller must guarantee that `F`'s handle is compatible with the underlying functions /// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined /// by this struct. Note that most winapis will fail with reasonable errors. - pub fn new(sources: Box<[F]>) -> Result { - let source_count = sources.len(); - let mut source_descriptors = Vec::with_capacity(source_count); - - // Safe because consumers of the descriptors are tied to the lifetime of HandleSource. - for source in sources.iter() { - source_descriptors.push(Descriptor(source.as_raw_descriptor())); - } + pub fn new(source: F) -> Result { + let source_descriptor = Descriptor(source.as_raw_descriptor()); Ok(Self { - sources, - source_descriptors, + source: Some(source), + source_descriptor, blocking_pool: CancellableBlockingPool::new( // WARNING: this is a safety requirement! Threads are 1:1 with sources. - source_count, + 1, Duration::from_secs(10), ), }) @@ -170,14 +158,6 @@ impl HandleSource { }) .collect::>>() } - - // Returns a copy of all the source handles as a vector of descriptors. - fn as_descriptors(&self) -> Vec { - self.sources - .iter() - .map(|i| Descriptor(i.as_raw_descriptor())) - .collect() - } } impl Drop for HandleSource { @@ -188,15 +168,11 @@ impl Drop for HandleSource { } } -fn get_thread_file(descriptors: Vec) -> ManuallyDrop { +fn get_thread_file(descriptor: Descriptor) -> ManuallyDrop { // SAFETY: trivially safe // Safe because all callers must exit *before* these handles will be closed (guaranteed by // HandleSource's Drop impl.). - unsafe { - ManuallyDrop::new(File::from_raw_descriptor( - descriptors[GetCurrentThreadId() as usize % descriptors.len()].0, - )) - } + unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor.0)) } } impl HandleSource { @@ -206,8 +182,8 @@ impl HandleSource { file_offset: Option, mut vec: Vec, ) -> AsyncResult<(usize, Vec)> { - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( @@ -236,8 +212,8 @@ impl HandleSource { mem_offsets: impl IntoIterator, ) -> AsyncResult { let mem_offsets = mem_offsets.into_iter().collect(); - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( @@ -276,8 +252,8 @@ impl HandleSource { file_offset: Option, vec: Vec, ) -> AsyncResult<(usize, Vec)> { - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( @@ -306,8 +282,8 @@ impl HandleSource { mem_offsets: impl IntoIterator, ) -> AsyncResult { let mem_offsets = mem_offsets.into_iter().collect(); - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( @@ -332,8 +308,8 @@ impl HandleSource { /// Deallocates the given range of a file. pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> { - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( move || { @@ -350,8 +326,8 @@ impl HandleSource { /// Fills the given range with zeroes. pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> { - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( move || { @@ -370,8 +346,8 @@ impl HandleSource { /// Sync all completed write operations to the backing storage. pub async fn fsync(&self) -> AsyncResult<()> { - let handles = HandleWrapper::new(self.as_descriptors()); - let descriptors = self.source_descriptors.clone(); + let handles = HandleWrapper::new(self.source_descriptor); + let descriptors = self.source_descriptor; self.blocking_pool .spawn( @@ -392,38 +368,27 @@ impl HandleSource { self.fsync().await } - /// Note that on Windows w/ multiple sources these functions do not make sense. - /// TODO(nkgold): decide on what these should mean. - /// Yields the underlying IO source. - pub fn into_source(self) -> F { - unimplemented!("`into_source` is not supported on Windows.") + pub fn into_source(mut self) -> F { + self.source.take().expect("`source` should not be `None`") } /// Provides a mutable ref to the underlying IO source. pub fn as_source_mut(&mut self) -> &mut F { - if self.sources.len() == 1 { - return &mut self.sources[0]; - } - // Unimplemented for multiple-source use-case - unimplemented!( - "`as_source_mut` doesn't support source len of {}", - self.sources.len() - ) + self.source.as_mut().expect("`source` should not be `None`") } /// Provides a ref to the underlying IO source. /// - /// In the multi-source case, the 0th source will be returned. If sources are not - /// interchangeable, behavior is undefined. + /// If sources are not interchangeable, behavior is undefined. pub fn as_source(&self) -> &F { - &self.sources[0] + self.source.as_ref().expect("`source` should not be `None`") } - /// In the multi-source case, the 0th source is waited on. If sources are not interchangeable, - /// behavior is undefined. + /// If sources are not interchangeable, behavior is undefined. pub async fn wait_for_handle(&self) -> AsyncResult<()> { - let waiter = super::WaitForHandle::new(&self.sources[0]); + let waiter = + super::WaitForHandle::new(self.source.as_ref().expect("`source` should not be `None`")); waiter.await.map_err(AsyncError::HandleSource) } } @@ -458,7 +423,7 @@ mod tests { .write(true) .open(temp_file.path()) .unwrap(); - let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + let handle_src = HandleSource::new(f).unwrap(); ex.run_until(punch_hole(&handle_src)).unwrap(); let mut buf = vec![0; 11]; @@ -489,7 +454,7 @@ mod tests { .write(true) .open(temp_file.path()) .unwrap(); - let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + let handle_src = HandleSource::new(f).unwrap(); ex.run_until(punch_hole(&handle_src)).unwrap(); let mut buf = vec![0; 13]; @@ -519,7 +484,7 @@ mod tests { // .write(true) // .open(temp_file.path()) // .unwrap(); - // let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap(); + // let handle_src = HandleSource::new(f).unwrap(); // ex.run_until(punch_hole(&handle_src)).unwrap(); // let mut buf = vec![0; 13];