base: service_ipc: Add and use MultiPartMesgPipe

Messages to/from service are multipart - variable length message
follows fixed length header. MultiPartMessage ensures that
(Header, Message) pair is read/written by one call from one thread.

Bug: 269191436
Test: flat run emulator
Change-Id: Icaf43ad99049f2b593aa20462ec1410d2db6b8a1
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4973856
Commit-Queue: Vikram Auradkar <auradkar@google.com>
Reviewed-by: Vikram Auradkar <auradkar@google.com>
Auto-Submit: Kaiyi Li <kaiyili@google.com>
This commit is contained in:
Vikram Auradkar 2023-03-10 17:28:22 -08:00 committed by crosvm LUCI
parent 26e17ab549
commit 87d6de735c
2 changed files with 212 additions and 101 deletions

View file

@ -10,12 +10,15 @@ use std::mem;
use std::os::windows::fs::OpenOptionsExt;
use std::process;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use rand::Rng;
use serde::Deserialize;
use serde::Serialize;
use sync::Mutex;
use win_util::fail_if_zero;
use win_util::SecurityAttributes;
use win_util::SelfRelativeSecurityDescriptor;
@ -655,34 +658,6 @@ impl PipeConnection {
Ok(())
}
/// Reads a variable size message and returns the message on success.
/// The size of the message is expected to proceed the message in
/// the form of `header_size` message.
///
/// `parse_message_size` lets caller parse the header to extract
/// message size.
///
/// Event on `exit_event` is used to interrupt the blocked read.
pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
&mut self,
header_size: usize,
parse_message_size: F,
overlapped_wrapper: &mut OverlappedWrapper,
exit_event: &Event,
) -> Result<Vec<u8>> {
let mut header = vec![0; header_size];
header.resize_with(header_size, Default::default);
self.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
let message_size = parse_message_size(&header);
if message_size == 0 {
return Ok(vec![]);
}
let mut buf = vec![];
buf.resize_with(message_size, Default::default);
self.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
Ok(buf)
}
/// Gets the size in bytes of data in the pipe.
///
/// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
@ -713,32 +688,6 @@ impl PipeConnection {
unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
}
/// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
/// as a failure.
pub fn write_overlapped_blocking_message<T: PipeSendable>(
&mut self,
buf: &[T],
overlapped_wrapper: &mut OverlappedWrapper,
) -> Result<()> {
// SAFETY: buf & overlapped_wrapper live until the overlapped operation is
// complete, so this is safe.
unsafe { self.write_overlapped(buf, overlapped_wrapper)? };
let size_written_in_bytes = self.get_overlapped_result(overlapped_wrapper)?;
if size_written_in_bytes as usize != buf.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"Short write expected:{} found:{}",
size_written_in_bytes,
buf.len(),
),
));
}
Ok(())
}
/// Similar to `PipeConnection::write` except it also allows:
/// 1. The same end of the named pipe to read and write at the same time in different
/// threads.
@ -1087,9 +1036,160 @@ pub struct NamedPipeInfo {
pub flags: u32,
}
/// This is a wrapper around PipeConnection. This allows a read and a write operations
/// to run in parallel but not multiple reads or writes in parallel.
///
/// Reason: The message from/to service are two-parts - a fixed size header that
/// contains the size of the actual message. By allowing only one write at a time
/// we ensure that the variable size message is written/read right after writing/reading
/// fixed size header. For example it avoid sending or receiving in messages in order like
/// H1, H2, M1, M2
/// - where header H1 and its message M1 are sent by one event loop and H2 and its
/// message M2 are sent by another event loop.
///
/// Do not expose direct access to reader or writer pipes.
///
/// The struct is clone-able so that different event loops can talk to the other end.
#[derive(Clone)]
pub struct MultiPartMessagePipe {
// Lock protected pipe to receive messages.
reader: Arc<Mutex<PipeConnection>>,
// Lock protected pipe to send messages.
writer: Arc<Mutex<PipeConnection>>,
// Whether this end is created as server or client. The variable helps to
// decide if something meanigful should be done when `wait_for_connection` is called.
is_server: bool,
// Always true if pipe is created as client.
// Defaults to false on server. Updated to true on calling `wait_for_connection`
// after a client connects.
is_connected: Arc<AtomicBool>,
}
impl MultiPartMessagePipe {
fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
Ok(Self {
reader: Arc::new(Mutex::new(pipe.try_clone()?)),
writer: Arc::new(Mutex::new(pipe)),
is_server,
is_connected: Arc::new(AtomicBool::new(false)),
})
}
/// Create client side of MutiPartMessagePipe.
pub fn create_as_client(pipe_name: &str) -> Result<Self> {
let pipe = create_client_pipe(
&format!(r"\\.\pipe\{}", pipe_name),
&FramingMode::Message,
&BlockingMode::Wait,
/* overlapped= */ true,
)?;
Self::create_from_pipe(pipe, false)
}
/// Create server side of MutiPartMessagePipe.
pub fn create_as_server(pipe_name: &str) -> Result<Self> {
let pipe = create_server_pipe(
&format!(r"\\.\pipe\{}", pipe_name,),
&FramingMode::Message,
&BlockingMode::Wait,
0,
1024 * 1024,
true,
)?;
Self::create_from_pipe(pipe, true)
}
/// If the struct is created as a server then waits for client connection to arrive.
/// It only waits on reader as reader and writer are clones.
pub fn wait_for_connection(&self) -> Result<()> {
if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
self.reader.lock().wait_for_client_connection()?;
self.is_connected.store(true, Ordering::Relaxed);
}
Ok(())
}
/// # Safety
/// `buf` and `overlapped_wrapper` will be in use for the duration of
/// the overlapped operation. These must not be reused and must live until
/// after `get_overlapped_result()` has been called which is done right
/// after this call.
fn write_overlapped_blocking_message_internal<T: PipeSendable>(
pipe: &mut PipeConnection,
buf: &[T],
overlapped_wrapper: &mut OverlappedWrapper,
) -> Result<()> {
unsafe {
pipe.write_overlapped(buf, overlapped_wrapper)?;
}
let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
if size_written_in_bytes as usize != buf.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"Short write expected:{} found:{}",
size_written_in_bytes,
buf.len(),
),
));
}
Ok(())
}
/// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
pub fn write_overlapped_blocking_message<T: PipeSendable>(
&self,
header: &[T],
message: &[T],
overlapped_wrapper: &mut OverlappedWrapper,
) -> Result<()> {
let mut writer = self.writer.lock();
Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
}
/// Reads a variable size message and returns the message on success.
/// The size of the message is expected to proceed the message in
/// the form of `header_size` message.
///
/// `parse_message_size` lets caller parse the header to extract
/// message size.
///
/// Event on `exit_event` is used to interrupt the blocked read.
pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
&self,
header_size: usize,
parse_message_size: F,
overlapped_wrapper: &mut OverlappedWrapper,
exit_event: &Event,
) -> Result<Vec<u8>> {
let mut pipe = self.reader.lock();
let mut header = vec![0; header_size];
header.resize_with(header_size, Default::default);
pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
let message_size = parse_message_size(&header);
if message_size == 0 {
return Ok(vec![]);
}
let mut buf = vec![];
buf.resize_with(message_size, Default::default);
pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
Ok(buf)
}
}
impl TryFrom<PipeConnection> for MultiPartMessagePipe {
type Error = std::io::Error;
fn try_from(pipe: PipeConnection) -> Result<Self> {
Self::create_from_pipe(pipe, false)
}
}
#[cfg(test)]
mod tests {
use std::mem::size_of;
use std::thread::JoinHandle;
use std::time::Duration;
use super::*;
@ -1320,47 +1420,58 @@ mod tests {
)
}
fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
std::thread::spawn(move || {
let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
let exit_event = Event::new().unwrap();
for _i in 0..msg_count {
let message = *messages
.get(rand::thread_rng().gen::<usize>() % messages.len())
.unwrap();
pipe.write_overlapped_blocking_message(
&message.len().to_be_bytes(),
message.as_bytes(),
&mut overlapped_wrapper,
)
.unwrap();
}
for _i in 0..msg_count {
let message = pipe
.read_overlapped_blocking_message(
size_of::<usize>(),
|bytes: &[u8]| {
assert_eq!(bytes.len(), size_of::<usize>());
usize::from_be_bytes(
bytes.try_into().expect("failed to get array from slice"),
)
},
&mut overlapped_wrapper,
&exit_event,
)
.unwrap();
assert_eq!(
*messages.get(message.len() - 1).unwrap(),
std::str::from_utf8(&message).unwrap(),
);
}
})
}
#[test]
fn read_write_overlapped_message() {
fn multipart_message_smoke_test() {
let pipe_name = generate_pipe_name();
let mut p1 = create_server_pipe(
&pipe_name,
&FramingMode::Message,
&BlockingMode::Wait,
/* timeout= */ 0,
/* buffer_size= */ 1000,
/* overlapped= */ true,
)
.unwrap();
let mut p2 = create_client_pipe(
&pipe_name,
&FramingMode::Message,
&BlockingMode::Wait,
/* overlapped= */ true,
)
.unwrap();
// Safe because `read_overlapped` can be called since overlapped struct is created.
let mut p1_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
const MSG: [u8; 6] = [75, 77, 54, 82, 76, 65];
p1.write_overlapped_blocking_message(&MSG.len().to_be_bytes(), &mut p1_overlapped_wrapper)
.unwrap();
p1.write_overlapped_blocking_message(&MSG, &mut p1_overlapped_wrapper)
.unwrap();
let mut p2_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
let exit_event = Event::new().unwrap();
let recv_buffer = p2
.read_overlapped_blocking_message(
size_of::<usize>(),
|buf| usize::from_be_bytes(buf.try_into().expect("failed to get array from slice")),
&mut p2_overlapped_wrapper,
&exit_event,
)
.unwrap();
assert_eq!(recv_buffer, MSG);
let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();
let handles = [
send_receive_msgs(server.clone(), 100),
send_receive_msgs(client.clone(), 100),
send_receive_msgs(server, 100),
send_receive_msgs(client, 100),
];
for h in handles {
h.join().unwrap();
}
}
#[test]

View file

@ -12,10 +12,10 @@ use std::path::Path;
use base::error;
use base::named_pipes::BlockingMode;
use base::named_pipes::FramingMode;
use base::named_pipes::MultiPartMessagePipe;
use base::named_pipes::OverlappedWrapper;
use base::Error;
use base::Event;
use base::PipeConnection;
use base::PipeTube;
use hypervisor::MemSlot;
use hypervisor::Vm;
@ -71,16 +71,16 @@ pub fn handle_request<T: AsRef<Path> + std::fmt::Debug>(
///
/// A helper function to keep communication with service consistent across crosvm code.
pub fn send_service_message(
connection: &mut PipeConnection,
connection: &MultiPartMessagePipe,
message: &[u8],
overlapped_wrapper: &mut OverlappedWrapper,
) -> Result<()> {
let size_in_bytes = message.len() as u32;
connection
.write_overlapped_blocking_message(&size_in_bytes.to_be_bytes(), overlapped_wrapper)?;
connection.write_overlapped_blocking_message(message, overlapped_wrapper)?;
Ok(())
connection.write_overlapped_blocking_message(
&size_in_bytes.to_be_bytes(),
message,
overlapped_wrapper,
)
}
/// Read and wait for the header to arrive in the named pipe. Once header is available, use the
@ -88,7 +88,7 @@ pub fn send_service_message(
///
/// A helper function to keep communication with service consistent across crosvm code.
pub fn recv_service_message(
connection: &mut PipeConnection,
connection: &MultiPartMessagePipe,
overlapped_wrapper: &mut OverlappedWrapper,
exit_event: &Event,
) -> Result<Vec<u8>> {