cros_async: make overlapped IO sources Send

Futures that use `IoSource` methods can't be sent across threads because
they might be backed by an overlapped op, which contains raw pointers.
This makes `Executor::spawn` virtually unusable.

`OVERLAPPED` creation has been moved into `OverlappedOperation::new` so
that it is harder to keep one on the stack of an async function (and
inadvertently make it `!Send`).

A new type, `BoxedOverlapped`, is has been added as a workaround to mark
`OVERLAPPED` as `Send` and `Sync`. The `Pin` wasn't needed and has been
removed.

There is a similar issue on the linux side. I'll add a test after it is
fixed.

BUG=b:271297810

Change-Id: Ibfb5a9a47bdf731e0647d4813c7bb2f4a3db8299
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5086672
Commit-Queue: Frederick Mayle <fmayle@google.com>
Reviewed-by: Vikram Auradkar <auradkar@google.com>
This commit is contained in:
Frederick Mayle 2023-12-04 20:43:11 -08:00 committed by crosvm LUCI
parent 59e5ba50ea
commit e66920249d
3 changed files with 30 additions and 28 deletions

View file

@ -89,12 +89,22 @@ pub struct PipeConnection {
blocking_mode: BlockingMode,
}
/// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
///
/// Defined as a separate type so that we can mark it as `Send` and `Sync`.
pub struct BoxedOverlapped(pub Box<OVERLAPPED>);
// SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
// pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
unsafe impl Send for BoxedOverlapped {}
// SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
unsafe impl Sync for BoxedOverlapped {}
/// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
/// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
pub struct OverlappedWrapper {
// Allocated on the heap so that the OVERLAPPED struct doesn't move when performing I/O
// operations.
overlapped: Box<OVERLAPPED>,
overlapped: BoxedOverlapped,
// This field prevents the event handle from being dropped too early and allows callers to
// be notified when a read or write overlapped operation has completed.
h_event: Option<Event>,
@ -130,17 +140,13 @@ impl OverlappedWrapper {
};
Ok(OverlappedWrapper {
overlapped: Box::new(overlapped),
overlapped: BoxedOverlapped(Box::new(overlapped)),
h_event,
in_use: false,
})
}
}
// SAFETY:
// Safe because all of the contained fields may be safely sent to another thread.
unsafe impl Send for OverlappedWrapper {}
pub trait WriteOverlapped {
/// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
/// If successful, the write operation will complete asynchronously, and
@ -564,7 +570,7 @@ impl PipeConnection {
&self.handle,
self.blocking_mode,
buf,
Some(&mut overlapped_wrapper.overlapped),
Some(&mut overlapped_wrapper.overlapped.0),
)?;
Ok(())
}
@ -727,7 +733,7 @@ impl PipeConnection {
PipeConnection::write_internal(
&self.handle,
buf,
Some(&mut overlapped_wrapper.overlapped),
Some(&mut overlapped_wrapper.overlapped.0),
)?;
Ok(())
}
@ -850,7 +856,7 @@ impl PipeConnection {
self.as_raw_descriptor(),
// Note: The overlapped structure is only used if the pipe was opened in
// OVERLAPPED mode, but is necessary in that case.
&mut *overlapped_wrapper.overlapped,
&mut *overlapped_wrapper.overlapped.0,
);
if success_flag == 0 {
return match GetLastError() {
@ -934,7 +940,7 @@ impl PipeConnection {
fail_if_zero!(unsafe {
GetOverlappedResult(
self.handle.as_raw_descriptor(),
&mut *overlapped_wrapper.overlapped,
&mut *overlapped_wrapper.overlapped.0,
&mut size_transferred,
if wait { TRUE } else { FALSE },
)

View file

@ -12,6 +12,7 @@ use std::sync::Arc;
use std::sync::Weak;
use std::task::Waker;
use base::named_pipes::BoxedOverlapped;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
@ -234,9 +235,9 @@ impl RegisteredOverlappedSource {
/// in the IO call.
pub fn register_overlapped_operation(
&self,
overlapped: OVERLAPPED,
offset: Option<u64>,
) -> Result<OverlappedOperation> {
OverlappedOperation::new(overlapped, self.ex.clone())
OverlappedOperation::new(offset, self.ex.clone())
}
}
@ -256,15 +257,15 @@ impl WeakWake for HandleReactor {
/// ensure the waker has been registered. (If the executor polls the IOCP before the waker
/// is registered, the future will stall.)
pub(crate) struct OverlappedOperation {
overlapped: Pin<Box<OVERLAPPED>>,
overlapped: BoxedOverlapped,
ex: Weak<RawExecutor<HandleReactor>>,
completed: bool,
}
impl OverlappedOperation {
fn new(overlapped: OVERLAPPED, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self> {
fn new(offset: Option<u64>, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self> {
let ret = Self {
overlapped: Box::pin(overlapped),
overlapped: BoxedOverlapped(Box::new(base::create_overlapped(offset, None))),
ex,
completed: false,
};
@ -285,12 +286,12 @@ impl OverlappedOperation {
/// when making the overlapped IO call or the executor will not be able to wake the right
/// future.
pub fn get_overlapped(&mut self) -> &mut OVERLAPPED {
&mut self.overlapped
&mut self.overlapped.0
}
#[inline]
pub fn get_token(&self) -> WakerToken {
WakerToken((&*self.overlapped) as *const _ as usize)
WakerToken((&*self.overlapped.0) as *const _ as usize)
}
}

View file

@ -10,7 +10,6 @@ use std::io::Write;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use base::create_overlapped;
use base::error;
use base::AsRawDescriptor;
use base::Descriptor;
@ -146,8 +145,7 @@ impl<F: AsRawDescriptor> OverlappedSource<F> {
io::Error::new(io::ErrorKind::InvalidInput, "seek on non-seekable handle"),
)));
}
let overlapped = create_overlapped(file_offset, None);
let mut overlapped_op = self.reg_source.register_overlapped_operation(overlapped)?;
let mut overlapped_op = self.reg_source.register_overlapped_operation(file_offset)?;
// SAFETY:
// Safe because we pass a pointer to a valid vec and that same vector's length.
@ -188,8 +186,7 @@ impl<F: AsRawDescriptor> OverlappedSource<F> {
};
for region in mem_offsets.into_iter() {
let overlapped = create_overlapped(offset, None);
let mut overlapped_op = self.reg_source.register_overlapped_operation(overlapped)?;
let mut overlapped_op = self.reg_source.register_overlapped_operation(offset)?;
let slice = mem.get_volatile_slice(region).map_err(|e| {
AsyncError::OverlappedSource(Error::BackingMemoryVolatileSliceFetchFailed(e))
@ -236,8 +233,7 @@ impl<F: AsRawDescriptor> OverlappedSource<F> {
io::Error::new(io::ErrorKind::InvalidInput, "seek on non-seekable handle"),
)));
}
let overlapped = create_overlapped(file_offset, None);
let mut overlapped_op = self.reg_source.register_overlapped_operation(overlapped)?;
let mut overlapped_op = self.reg_source.register_overlapped_operation(file_offset)?;
// SAFETY:
// Safe because we pass a pointer to a valid vec and that same vector's length.
@ -279,8 +275,7 @@ impl<F: AsRawDescriptor> OverlappedSource<F> {
};
for region in mem_offsets.into_iter() {
let overlapped = create_overlapped(offset, None);
let mut overlapped_op = self.reg_source.register_overlapped_operation(overlapped)?;
let mut overlapped_op = self.reg_source.register_overlapped_operation(offset)?;
let slice = mem.get_volatile_slice(region).map_err(|e| {
AsyncError::OverlappedSource(Error::BackingMemoryVolatileSliceFetchFailed(e))