cros_async: Implement "into_source" for HandleSource and remove the

multi-handle feature

Without the "into_source" implementation, the blk device will crash
for Windows.

BUG=b:319154589
TEST=tested downstream to verify this works on Windows

Change-Id: Ida383104e7a05f0f891f4ee99b3ce322e7eda64d
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5201258
Commit-Queue: Richard Zhang <rizhang@google.com>
Reviewed-by: Noah Gold <nkgold@google.com>
This commit is contained in:
Richard Zhang 2024-01-17 01:14:23 +00:00 committed by crosvm LUCI
parent c29032e8dc
commit ff28517db3
3 changed files with 50 additions and 89 deletions

View file

@ -45,8 +45,7 @@ impl AsyncTube {
.await
.map_err(|e| TubeError::Recv(io::Error::new(io::ErrorKind::Other, e)))?;
let tube = Arc::clone(&self.inner);
let handles =
HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_descriptor())]);
let handles = HandleWrapper::new(Descriptor(tube.lock().unwrap().as_raw_descriptor()));
unblock(
move || tube.lock().unwrap().recv(),
move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)),
@ -56,8 +55,7 @@ impl AsyncTube {
pub async fn send<T: 'static + Serialize + Send + Sync>(&self, msg: T) -> TubeResult<()> {
let tube = Arc::clone(&self.inner);
let handles =
HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_descriptor())]);
let handles = HandleWrapper::new(Descriptor(tube.lock().unwrap().as_raw_descriptor()));
unblock(
move || tube.lock().unwrap().send(&msg),
move || Err(handles.lock().cancel_sync_io(TubeError::OperationCancelled)),

View file

@ -205,9 +205,7 @@ impl common_executor::Reactor for HandleReactor {
_ex: &Arc<RawExecutor<Self>>,
f: F,
) -> AsyncResult<IoSource<F>> {
Ok(IoSource::Handle(super::HandleSource::new(
vec![f].into_boxed_slice(),
)?))
Ok(IoSource::Handle(super::HandleSource::new(f)?))
}
}

View file

@ -28,7 +28,6 @@ use smallvec::SmallVec;
use sync::Mutex;
use thiserror::Error as ThisError;
use winapi::um::ioapiset::CancelIoEx;
use winapi::um::processthreadsapi::GetCurrentThreadId;
use crate::mem::BackingMemory;
use crate::mem::MemRegion;
@ -88,37 +87,32 @@ pub type Result<T> = std::result::Result<T, Error>;
/// Used to shutdown IO running on a CancellableBlockingPool.
pub struct HandleWrapper {
handles: Vec<Descriptor>,
handle: Descriptor,
}
impl HandleWrapper {
pub fn new(handles: Vec<Descriptor>) -> Arc<Mutex<HandleWrapper>> {
Arc::new(Mutex::new(Self { handles }))
pub fn new(handle: Descriptor) -> Arc<Mutex<HandleWrapper>> {
Arc::new(Mutex::new(Self { handle }))
}
pub fn cancel_sync_io<T>(&mut self, ret: T) -> T {
for handle in &self.handles {
// There isn't much we can do if cancel fails.
// SAFETY: trivially safe
if unsafe { CancelIoEx(handle.as_raw_descriptor(), null_mut()) } == 0 {
warn!(
"Cancel IO for handle:{:?} failed with {}",
handle.as_raw_descriptor(),
SysUtilError::last()
);
}
// There isn't much we can do if cancel fails.
// SAFETY: trivially safe
if unsafe { CancelIoEx(self.handle.as_raw_descriptor(), null_mut()) } == 0 {
warn!(
"Cancel IO for handle:{:?} failed with {}",
self.handle.as_raw_descriptor(),
SysUtilError::last()
);
}
ret
}
}
/// Async IO source for Windows that uses a multi-threaded, multi-handle approach to provide fast IO
/// operations. It demuxes IO requests across a set of handles that refer to the same underlying IO
/// source, such as a file, and executes those requests across multiple threads. Benchmarks show
/// that this is the fastest method to perform IO on Windows, especially for file reads.
/// Async IO source for Windows, such as a file.
pub struct HandleSource<F: AsRawDescriptor> {
sources: Box<[F]>,
source_descriptors: Vec<Descriptor>,
source: Option<F>,
source_descriptor: Descriptor,
blocking_pool: CancellableBlockingPool,
}
@ -129,7 +123,7 @@ impl<F: AsRawDescriptor> HandleSource<F> {
/// threads are generally idle because they're waiting on blocking IO, so the cost is minimal.
/// Long term, we may migrate away from this approach toward IOCP or overlapped IO.
///
/// WARNING: every `source` in `sources` MUST be a unique file object (e.g. separate handles
/// WARNING: `source` MUST be a unique file object (e.g. separate handles
/// each created by CreateFile), and point at the same file on disk. This is because IO
/// operations on the HandleSource are randomly distributed to each source.
///
@ -137,21 +131,15 @@ impl<F: AsRawDescriptor> HandleSource<F> {
/// The caller must guarantee that `F`'s handle is compatible with the underlying functions
/// exposed on `HandleSource`. The behavior when calling unsupported functions is not defined
/// by this struct. Note that most winapis will fail with reasonable errors.
pub fn new(sources: Box<[F]>) -> Result<Self> {
let source_count = sources.len();
let mut source_descriptors = Vec::with_capacity(source_count);
// Safe because consumers of the descriptors are tied to the lifetime of HandleSource.
for source in sources.iter() {
source_descriptors.push(Descriptor(source.as_raw_descriptor()));
}
pub fn new(source: F) -> Result<Self> {
let source_descriptor = Descriptor(source.as_raw_descriptor());
Ok(Self {
sources,
source_descriptors,
source: Some(source),
source_descriptor,
blocking_pool: CancellableBlockingPool::new(
// WARNING: this is a safety requirement! Threads are 1:1 with sources.
source_count,
1,
Duration::from_secs(10),
),
})
@ -170,14 +158,6 @@ impl<F: AsRawDescriptor> HandleSource<F> {
})
.collect::<Result<SmallVec<[VolatileSlice; 16]>>>()
}
// Returns a copy of all the source handles as a vector of descriptors.
fn as_descriptors(&self) -> Vec<Descriptor> {
self.sources
.iter()
.map(|i| Descriptor(i.as_raw_descriptor()))
.collect()
}
}
impl<F: AsRawDescriptor> Drop for HandleSource<F> {
@ -188,15 +168,11 @@ impl<F: AsRawDescriptor> Drop for HandleSource<F> {
}
}
fn get_thread_file(descriptors: Vec<Descriptor>) -> ManuallyDrop<File> {
fn get_thread_file(descriptor: Descriptor) -> ManuallyDrop<File> {
// SAFETY: trivially safe
// Safe because all callers must exit *before* these handles will be closed (guaranteed by
// HandleSource's Drop impl.).
unsafe {
ManuallyDrop::new(File::from_raw_descriptor(
descriptors[GetCurrentThreadId() as usize % descriptors.len()].0,
))
}
unsafe { ManuallyDrop::new(File::from_raw_descriptor(descriptor.0)) }
}
impl<F: AsRawDescriptor> HandleSource<F> {
@ -206,8 +182,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
file_offset: Option<u64>,
mut vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
@ -236,8 +212,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
mem_offsets: impl IntoIterator<Item = MemRegion>,
) -> AsyncResult<usize> {
let mem_offsets = mem_offsets.into_iter().collect();
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
@ -276,8 +252,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
@ -306,8 +282,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
mem_offsets: impl IntoIterator<Item = MemRegion>,
) -> AsyncResult<usize> {
let mem_offsets = mem_offsets.into_iter().collect();
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
@ -332,8 +308,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
/// Deallocates the given range of a file.
pub async fn punch_hole(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
move || {
@ -350,8 +326,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
/// Fills the given range with zeroes.
pub async fn write_zeroes_at(&self, file_offset: u64, len: u64) -> AsyncResult<()> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
move || {
@ -370,8 +346,8 @@ impl<F: AsRawDescriptor> HandleSource<F> {
/// Sync all completed write operations to the backing storage.
pub async fn fsync(&self) -> AsyncResult<()> {
let handles = HandleWrapper::new(self.as_descriptors());
let descriptors = self.source_descriptors.clone();
let handles = HandleWrapper::new(self.source_descriptor);
let descriptors = self.source_descriptor;
self.blocking_pool
.spawn(
@ -392,38 +368,27 @@ impl<F: AsRawDescriptor> HandleSource<F> {
self.fsync().await
}
/// Note that on Windows w/ multiple sources these functions do not make sense.
/// TODO(nkgold): decide on what these should mean.
/// Yields the underlying IO source.
pub fn into_source(self) -> F {
unimplemented!("`into_source` is not supported on Windows.")
pub fn into_source(mut self) -> F {
self.source.take().expect("`source` should not be `None`")
}
/// Provides a mutable ref to the underlying IO source.
pub fn as_source_mut(&mut self) -> &mut F {
if self.sources.len() == 1 {
return &mut self.sources[0];
}
// Unimplemented for multiple-source use-case
unimplemented!(
"`as_source_mut` doesn't support source len of {}",
self.sources.len()
)
self.source.as_mut().expect("`source` should not be `None`")
}
/// Provides a ref to the underlying IO source.
///
/// In the multi-source case, the 0th source will be returned. If sources are not
/// interchangeable, behavior is undefined.
/// If sources are not interchangeable, behavior is undefined.
pub fn as_source(&self) -> &F {
&self.sources[0]
self.source.as_ref().expect("`source` should not be `None`")
}
/// In the multi-source case, the 0th source is waited on. If sources are not interchangeable,
/// behavior is undefined.
/// If sources are not interchangeable, behavior is undefined.
pub async fn wait_for_handle(&self) -> AsyncResult<()> {
let waiter = super::WaitForHandle::new(&self.sources[0]);
let waiter =
super::WaitForHandle::new(self.source.as_ref().expect("`source` should not be `None`"));
waiter.await.map_err(AsyncError::HandleSource)
}
}
@ -458,7 +423,7 @@ mod tests {
.write(true)
.open(temp_file.path())
.unwrap();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
let handle_src = HandleSource::new(f).unwrap();
ex.run_until(punch_hole(&handle_src)).unwrap();
let mut buf = vec![0; 11];
@ -489,7 +454,7 @@ mod tests {
.write(true)
.open(temp_file.path())
.unwrap();
let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
let handle_src = HandleSource::new(f).unwrap();
ex.run_until(punch_hole(&handle_src)).unwrap();
let mut buf = vec![0; 13];
@ -519,7 +484,7 @@ mod tests {
// .write(true)
// .open(temp_file.path())
// .unwrap();
// let handle_src = HandleSource::new(vec![f].into_boxed_slice()).unwrap();
// let handle_src = HandleSource::new(f).unwrap();
// ex.run_until(punch_hole(&handle_src)).unwrap();
// let mut buf = vec![0; 13];