From 87d6de735cb6d77f8b5cff96fef103705eb16464 Mon Sep 17 00:00:00 2001 From: Vikram Auradkar Date: Fri, 10 Mar 2023 17:28:22 -0800 Subject: [PATCH] base: service_ipc: Add and use MultiPartMesgPipe Messages to/from service are multipart - variable length message follows fixed length header. MultiPartMessage ensures that (Header, Message) pair is read/written by one call from one thread. Bug: 269191436 Test: flat run emulator Change-Id: Icaf43ad99049f2b593aa20462ec1410d2db6b8a1 Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4973856 Commit-Queue: Vikram Auradkar Reviewed-by: Vikram Auradkar Auto-Submit: Kaiyi Li --- base/src/sys/windows/named_pipes.rs | 297 +++++++++++++++++++--------- vm_control/src/sys/windows.rs | 16 +- 2 files changed, 212 insertions(+), 101 deletions(-) diff --git a/base/src/sys/windows/named_pipes.rs b/base/src/sys/windows/named_pipes.rs index 5954101e6e..4ca07d3630 100644 --- a/base/src/sys/windows/named_pipes.rs +++ b/base/src/sys/windows/named_pipes.rs @@ -10,12 +10,15 @@ use std::mem; use std::os::windows::fs::OpenOptionsExt; use std::process; use std::ptr; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use std::sync::Arc; use rand::Rng; use serde::Deserialize; use serde::Serialize; +use sync::Mutex; use win_util::fail_if_zero; use win_util::SecurityAttributes; use win_util::SelfRelativeSecurityDescriptor; @@ -655,34 +658,6 @@ impl PipeConnection { Ok(()) } - /// Reads a variable size message and returns the message on success. - /// The size of the message is expected to proceed the message in - /// the form of `header_size` message. - /// - /// `parse_message_size` lets caller parse the header to extract - /// message size. - /// - /// Event on `exit_event` is used to interrupt the blocked read. - pub fn read_overlapped_blocking_message usize>( - &mut self, - header_size: usize, - parse_message_size: F, - overlapped_wrapper: &mut OverlappedWrapper, - exit_event: &Event, - ) -> Result> { - let mut header = vec![0; header_size]; - header.resize_with(header_size, Default::default); - self.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?; - let message_size = parse_message_size(&header); - if message_size == 0 { - return Ok(vec![]); - } - let mut buf = vec![]; - buf.resize_with(message_size, Default::default); - self.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?; - Ok(buf) - } - /// Gets the size in bytes of data in the pipe. /// /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have @@ -713,32 +688,6 @@ impl PipeConnection { unsafe { PipeConnection::write_internal(&self.handle, buf, None) } } - /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered - /// as a failure. - pub fn write_overlapped_blocking_message( - &mut self, - buf: &[T], - overlapped_wrapper: &mut OverlappedWrapper, - ) -> Result<()> { - // SAFETY: buf & overlapped_wrapper live until the overlapped operation is - // complete, so this is safe. - unsafe { self.write_overlapped(buf, overlapped_wrapper)? }; - - let size_written_in_bytes = self.get_overlapped_result(overlapped_wrapper)?; - - if size_written_in_bytes as usize != buf.len() { - return Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - format!( - "Short write expected:{} found:{}", - size_written_in_bytes, - buf.len(), - ), - )); - } - Ok(()) - } - /// Similar to `PipeConnection::write` except it also allows: /// 1. The same end of the named pipe to read and write at the same time in different /// threads. @@ -1087,9 +1036,160 @@ pub struct NamedPipeInfo { pub flags: u32, } +/// This is a wrapper around PipeConnection. This allows a read and a write operations +/// to run in parallel but not multiple reads or writes in parallel. +/// +/// Reason: The message from/to service are two-parts - a fixed size header that +/// contains the size of the actual message. By allowing only one write at a time +/// we ensure that the variable size message is written/read right after writing/reading +/// fixed size header. For example it avoid sending or receiving in messages in order like +/// H1, H2, M1, M2 +/// - where header H1 and its message M1 are sent by one event loop and H2 and its +/// message M2 are sent by another event loop. +/// +/// Do not expose direct access to reader or writer pipes. +/// +/// The struct is clone-able so that different event loops can talk to the other end. +#[derive(Clone)] +pub struct MultiPartMessagePipe { + // Lock protected pipe to receive messages. + reader: Arc>, + // Lock protected pipe to send messages. + writer: Arc>, + // Whether this end is created as server or client. The variable helps to + // decide if something meanigful should be done when `wait_for_connection` is called. + is_server: bool, + // Always true if pipe is created as client. + // Defaults to false on server. Updated to true on calling `wait_for_connection` + // after a client connects. + is_connected: Arc, +} + +impl MultiPartMessagePipe { + fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result { + Ok(Self { + reader: Arc::new(Mutex::new(pipe.try_clone()?)), + writer: Arc::new(Mutex::new(pipe)), + is_server, + is_connected: Arc::new(AtomicBool::new(false)), + }) + } + + /// Create client side of MutiPartMessagePipe. + pub fn create_as_client(pipe_name: &str) -> Result { + let pipe = create_client_pipe( + &format!(r"\\.\pipe\{}", pipe_name), + &FramingMode::Message, + &BlockingMode::Wait, + /* overlapped= */ true, + )?; + Self::create_from_pipe(pipe, false) + } + + /// Create server side of MutiPartMessagePipe. + pub fn create_as_server(pipe_name: &str) -> Result { + let pipe = create_server_pipe( + &format!(r"\\.\pipe\{}", pipe_name,), + &FramingMode::Message, + &BlockingMode::Wait, + 0, + 1024 * 1024, + true, + )?; + Self::create_from_pipe(pipe, true) + } + + /// If the struct is created as a server then waits for client connection to arrive. + /// It only waits on reader as reader and writer are clones. + pub fn wait_for_connection(&self) -> Result<()> { + if self.is_server && !self.is_connected.load(Ordering::Relaxed) { + self.reader.lock().wait_for_client_connection()?; + self.is_connected.store(true, Ordering::Relaxed); + } + Ok(()) + } + + /// # Safety + /// `buf` and `overlapped_wrapper` will be in use for the duration of + /// the overlapped operation. These must not be reused and must live until + /// after `get_overlapped_result()` has been called which is done right + /// after this call. + fn write_overlapped_blocking_message_internal( + pipe: &mut PipeConnection, + buf: &[T], + overlapped_wrapper: &mut OverlappedWrapper, + ) -> Result<()> { + unsafe { + pipe.write_overlapped(buf, overlapped_wrapper)?; + } + + let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?; + + if size_written_in_bytes as usize != buf.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + format!( + "Short write expected:{} found:{}", + size_written_in_bytes, + buf.len(), + ), + )); + } + Ok(()) + } + /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered + pub fn write_overlapped_blocking_message( + &self, + header: &[T], + message: &[T], + overlapped_wrapper: &mut OverlappedWrapper, + ) -> Result<()> { + let mut writer = self.writer.lock(); + Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?; + Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper) + } + + /// Reads a variable size message and returns the message on success. + /// The size of the message is expected to proceed the message in + /// the form of `header_size` message. + /// + /// `parse_message_size` lets caller parse the header to extract + /// message size. + /// + /// Event on `exit_event` is used to interrupt the blocked read. + pub fn read_overlapped_blocking_message usize>( + &self, + header_size: usize, + parse_message_size: F, + overlapped_wrapper: &mut OverlappedWrapper, + exit_event: &Event, + ) -> Result> { + let mut pipe = self.reader.lock(); + let mut header = vec![0; header_size]; + header.resize_with(header_size, Default::default); + pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?; + let message_size = parse_message_size(&header); + if message_size == 0 { + return Ok(vec![]); + } + let mut buf = vec![]; + buf.resize_with(message_size, Default::default); + pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?; + Ok(buf) + } +} + +impl TryFrom for MultiPartMessagePipe { + type Error = std::io::Error; + fn try_from(pipe: PipeConnection) -> Result { + Self::create_from_pipe(pipe, false) + } +} + #[cfg(test)] mod tests { use std::mem::size_of; + use std::thread::JoinHandle; use std::time::Duration; use super::*; @@ -1320,47 +1420,58 @@ mod tests { ) } + fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> { + let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"]; + std::thread::spawn(move || { + let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap(); + let exit_event = Event::new().unwrap(); + for _i in 0..msg_count { + let message = *messages + .get(rand::thread_rng().gen::() % messages.len()) + .unwrap(); + pipe.write_overlapped_blocking_message( + &message.len().to_be_bytes(), + message.as_bytes(), + &mut overlapped_wrapper, + ) + .unwrap(); + } + for _i in 0..msg_count { + let message = pipe + .read_overlapped_blocking_message( + size_of::(), + |bytes: &[u8]| { + assert_eq!(bytes.len(), size_of::()); + usize::from_be_bytes( + bytes.try_into().expect("failed to get array from slice"), + ) + }, + &mut overlapped_wrapper, + &exit_event, + ) + .unwrap(); + assert_eq!( + *messages.get(message.len() - 1).unwrap(), + std::str::from_utf8(&message).unwrap(), + ); + } + }) + } + #[test] - fn read_write_overlapped_message() { + fn multipart_message_smoke_test() { let pipe_name = generate_pipe_name(); - - let mut p1 = create_server_pipe( - &pipe_name, - &FramingMode::Message, - &BlockingMode::Wait, - /* timeout= */ 0, - /* buffer_size= */ 1000, - /* overlapped= */ true, - ) - .unwrap(); - - let mut p2 = create_client_pipe( - &pipe_name, - &FramingMode::Message, - &BlockingMode::Wait, - /* overlapped= */ true, - ) - .unwrap(); - - // Safe because `read_overlapped` can be called since overlapped struct is created. - let mut p1_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap(); - const MSG: [u8; 6] = [75, 77, 54, 82, 76, 65]; - p1.write_overlapped_blocking_message(&MSG.len().to_be_bytes(), &mut p1_overlapped_wrapper) - .unwrap(); - p1.write_overlapped_blocking_message(&MSG, &mut p1_overlapped_wrapper) - .unwrap(); - - let mut p2_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap(); - let exit_event = Event::new().unwrap(); - let recv_buffer = p2 - .read_overlapped_blocking_message( - size_of::(), - |buf| usize::from_be_bytes(buf.try_into().expect("failed to get array from slice")), - &mut p2_overlapped_wrapper, - &exit_event, - ) - .unwrap(); - assert_eq!(recv_buffer, MSG); + let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap(); + let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap(); + let handles = [ + send_receive_msgs(server.clone(), 100), + send_receive_msgs(client.clone(), 100), + send_receive_msgs(server, 100), + send_receive_msgs(client, 100), + ]; + for h in handles { + h.join().unwrap(); + } } #[test] diff --git a/vm_control/src/sys/windows.rs b/vm_control/src/sys/windows.rs index 16ed2d32b2..45696f9226 100644 --- a/vm_control/src/sys/windows.rs +++ b/vm_control/src/sys/windows.rs @@ -12,10 +12,10 @@ use std::path::Path; use base::error; use base::named_pipes::BlockingMode; use base::named_pipes::FramingMode; +use base::named_pipes::MultiPartMessagePipe; use base::named_pipes::OverlappedWrapper; use base::Error; use base::Event; -use base::PipeConnection; use base::PipeTube; use hypervisor::MemSlot; use hypervisor::Vm; @@ -71,16 +71,16 @@ pub fn handle_request + std::fmt::Debug>( /// /// A helper function to keep communication with service consistent across crosvm code. pub fn send_service_message( - connection: &mut PipeConnection, + connection: &MultiPartMessagePipe, message: &[u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()> { let size_in_bytes = message.len() as u32; - - connection - .write_overlapped_blocking_message(&size_in_bytes.to_be_bytes(), overlapped_wrapper)?; - connection.write_overlapped_blocking_message(message, overlapped_wrapper)?; - Ok(()) + connection.write_overlapped_blocking_message( + &size_in_bytes.to_be_bytes(), + message, + overlapped_wrapper, + ) } /// Read and wait for the header to arrive in the named pipe. Once header is available, use the @@ -88,7 +88,7 @@ pub fn send_service_message( /// /// A helper function to keep communication with service consistent across crosvm code. pub fn recv_service_message( - connection: &mut PipeConnection, + connection: &MultiPartMessagePipe, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result> {