diff --git a/base/src/event.rs b/base/src/event.rs index 82ae3f76bc..490f798e82 100644 --- a/base/src/event.rs +++ b/base/src/event.rs @@ -36,7 +36,7 @@ impl Event { self.0.read() } - pub fn read_timeout(&mut self, timeout: Duration) -> Result { + pub fn read_timeout(&self, timeout: Duration) -> Result { self.0.read_timeout(timeout) } diff --git a/base/src/lib.rs b/base/src/lib.rs index 9ce6db618b..d300186bbc 100644 --- a/base/src/lib.rs +++ b/base/src/lib.rs @@ -23,6 +23,7 @@ pub mod common; mod event; mod ioctl; mod mmap; +mod notifiers; mod shm; mod timer; mod tube; @@ -40,9 +41,10 @@ pub use ioctl::{ pub use mmap::{ MemoryMapping, MemoryMappingBuilder, MemoryMappingBuilderUnix, Unix as MemoryMappingUnix, }; +pub use notifiers::*; pub use shm::{SharedMemory, Unix as SharedMemoryUnix}; pub use timer::{FakeTimer, Timer}; -pub use tube::{Error as TubeError, Result as TubeResult, Tube}; +pub use tube::{Error as TubeError, RecvTube, Result as TubeResult, SendTube, Tube}; pub use wait_context::{EventToken, EventType, TriggeredEvent, WaitContext}; /// Wraps an AsRawDescriptor in the simple Descriptor struct, which diff --git a/base/src/notifiers.rs b/base/src/notifiers.rs new file mode 100644 index 0000000000..b01cd2fdd2 --- /dev/null +++ b/base/src/notifiers.rs @@ -0,0 +1,33 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#[cfg(unix)] +use std::os::unix::net::UnixStream; + +use crate::AsRawDescriptor; + +pub trait ReadNotifier { + /// Gets a descriptor that can be used in EventContext to wait for events to be available (e.g. + /// to avoid receive_events blocking). + fn get_read_notifier(&self) -> &dyn AsRawDescriptor; +} + +pub trait CloseNotifier { + /// Gets a descriptor that can be used in EventContext to wait for the closed event. + fn get_close_notifier(&self) -> &dyn AsRawDescriptor; +} + +#[cfg(unix)] +impl ReadNotifier for UnixStream { + fn get_read_notifier(&self) -> &dyn AsRawDescriptor { + self + } +} + +#[cfg(unix)] +impl CloseNotifier for UnixStream { + fn get_close_notifier(&self) -> &dyn AsRawDescriptor { + self + } +} diff --git a/base/src/tube.rs b/base/src/tube.rs index be38122883..f1c8056b8c 100644 --- a/base/src/tube.rs +++ b/base/src/tube.rs @@ -2,39 +2,119 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use std::{ - io::{ - IoSlice, {self}, - }, - marker::PhantomData, - os::unix::prelude::{AsRawFd, RawFd}, - time::Duration, -}; - -use crate::{FromRawDescriptor, SafeDescriptor, ScmSocket, UnixSeqpacket, UnsyncMarker}; - -use crate::unix::{ - deserialize_with_descriptors, AsRawDescriptor, RawDescriptor, SerializeDescriptors, -}; use remain::sorted; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::io; + use thiserror::Error as ThisError; +#[cfg_attr(windows, path = "win/tube.rs")] +#[cfg_attr(not(windows), path = "unix/tube.rs")] +mod tube; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::time::Duration; +pub use tube::*; + +impl Tube { + /// Creates a Send/Recv pair of Tubes. + pub fn directional_pair() -> Result<(SendTube, RecvTube)> { + let (t1, t2) = Self::pair()?; + Ok((SendTube(t1), RecvTube(t2))) + } +} + +#[derive(Serialize, Deserialize)] +#[serde(transparent)] +/// A Tube end which can only send messages. Cloneable. +pub struct SendTube(Tube); + +#[allow(dead_code)] +impl SendTube { + /// TODO(b/145998747, b/184398671): this method should be removed. + pub fn set_send_timeout(&self, _timeout: Option) -> Result<()> { + unimplemented!("To be removed/refactored upstream."); + } + + pub fn send(&self, msg: &T) -> Result<()> { + self.0.send(msg) + } + + pub fn try_clone(&self) -> Result { + Ok(SendTube( + #[allow(deprecated)] + self.0.try_clone()?, + )) + } + + /// Never call this function, it is for use by cros_async to provide + /// directional wrapper types only. Using it in any other context may + /// violate concurrency assumptions. (Type splitting across crates has put + /// us in a situation where we can't use Rust privacy to enforce this.) + #[deprecated] + pub fn into_tube(self) -> Tube { + self.0 + } +} + +#[derive(Serialize, Deserialize)] +#[serde(transparent)] +/// A Tube end which can only recv messages. +pub struct RecvTube(Tube); + +#[allow(dead_code)] +impl RecvTube { + pub fn recv(&self) -> Result { + self.0.recv() + } + + /// TODO(b/145998747, b/184398671): this method should be removed. + pub fn set_recv_timeout(&self, _timeout: Option) -> Result<()> { + unimplemented!("To be removed/refactored upstream."); + } + + /// Never call this function, it is for use by cros_async to provide + /// directional wrapper types only. Using it in any other context may + /// violate concurrency assumptions. (Type splitting across crates has put + /// us in a situation where we can't use Rust privacy to enforce this.) + #[deprecated] + pub fn into_tube(self) -> Tube { + self.0 + } +} + #[sorted] #[derive(ThisError, Debug)] pub enum Error { - #[error("failed to clone UnixSeqpacket: {0}")] + #[cfg(windows)] + #[error("attempt to duplicate descriptor via broker failed")] + BrokerDupDescriptor, + #[error("failed to clone transport: {0}")] Clone(io::Error), #[error("tube was disconnected")] Disconnected, + #[error("failed to duplicate descriptor: {0}")] + DupDescriptor(io::Error), + #[cfg(windows)] + #[error("failed to flush named pipe: {0}")] + Flush(io::Error), #[error("failed to serialize/deserialize json from packet: {0}")] Json(serde_json::Error), + #[error("cancelled a queued async operation")] + OperationCancelled, #[error("failed to crate tube pair: {0}")] Pair(io::Error), #[error("failed to receive packet: {0}")] Recv(io::Error), + #[error("Received a message with a zero sized body. This should not happen.")] + RecvUnexpectedEmptyBody, #[error("failed to send packet: {0}")] + #[cfg(unix)] Send(crate::unix::Error), + #[cfg(windows)] + Send(crate::windows::Error), + #[error("failed to send packet: {0}")] + SendIo(io::Error), + #[error("failed to write packet to intermediate buffer: {0}")] + SendIoBuf(io::Error), #[error("failed to set recv timeout: {0}")] SetRecvTimeout(io::Error), #[error("failed to set send timeout: {0}")] @@ -43,105 +123,6 @@ pub enum Error { pub type Result = std::result::Result; -/// Bidirectional tube that support both send and recv. -#[derive(Serialize, Deserialize)] -pub struct Tube { - socket: UnixSeqpacket, - _unsync_marker: UnsyncMarker, -} - -impl Tube { - /// Create a pair of connected tubes. Request is send in one direction while response is in the - /// other direction. - pub fn pair() -> Result<(Tube, Tube)> { - let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?; - let tube1 = Tube::new(socket1); - let tube2 = Tube::new(socket2); - Ok((tube1, tube2)) - } - - // Create a new `Tube`. - pub fn new(socket: UnixSeqpacket) -> Tube { - Tube { - socket, - _unsync_marker: PhantomData, - } - } - - pub fn try_clone(&self) -> Result { - self.socket.try_clone().map(Tube::new).map_err(Error::Clone) - } - - pub fn send(&self, msg: &T) -> Result<()> { - let msg_serialize = SerializeDescriptors::new(&msg); - let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; - let msg_descriptors = msg_serialize.into_descriptors(); - - self.socket - .send_with_fds(&[IoSlice::new(&msg_json)], &msg_descriptors) - .map_err(Error::Send)?; - Ok(()) - } - - pub fn recv(&self) -> Result { - let (msg_json, msg_descriptors) = - self.socket.recv_as_vec_with_fds().map_err(Error::Recv)?; - - if msg_json.is_empty() { - return Err(Error::Disconnected); - } - - let mut msg_descriptors_safe = msg_descriptors - .into_iter() - .map(|v| { - Some(unsafe { - // Safe because the socket returns new fds that are owned locally by this scope. - SafeDescriptor::from_raw_descriptor(v) - }) - }) - .collect(); - - deserialize_with_descriptors( - || serde_json::from_slice(&msg_json), - &mut msg_descriptors_safe, - ) - .map_err(Error::Json) - } - - pub fn set_send_timeout(&self, timeout: Option) -> Result<()> { - self.socket - .set_write_timeout(timeout) - .map_err(Error::SetSendTimeout) - } - - pub fn set_recv_timeout(&self, timeout: Option) -> Result<()> { - self.socket - .set_read_timeout(timeout) - .map_err(Error::SetRecvTimeout) - } -} - -impl FromRawDescriptor for Tube { - unsafe fn from_raw_descriptor(descriptor: RawDescriptor) -> Self { - Tube { - socket: UnixSeqpacket::from_raw_descriptor(descriptor), - _unsync_marker: PhantomData, - } - } -} - -impl AsRawDescriptor for Tube { - fn as_raw_descriptor(&self) -> RawDescriptor { - self.socket.as_raw_descriptor() - } -} - -impl AsRawFd for Tube { - fn as_raw_fd(&self) -> RawFd { - self.socket.as_raw_fd() - } -} - #[cfg(test)] mod tests { use super::*; @@ -150,9 +131,20 @@ mod tests { use std::{collections::HashMap, time::Duration}; use serde::{Deserialize, Serialize}; + use std::sync::{Arc, Barrier}; + use std::thread; + + #[derive(Serialize, Deserialize)] + struct DataStruct { + x: u32, + } + + // Magics to identify which producer sent a message (& detect corruption). + const PRODUCER_ID_1: u32 = 801279273; + const PRODUCER_ID_2: u32 = 345234861; #[track_caller] - fn test_event_pair(send: Event, mut recv: Event) { + fn test_event_pair(send: Event, recv: Event) { send.write(1).unwrap(); recv.read_timeout(Duration::from_secs(1)).unwrap(); } @@ -190,6 +182,64 @@ mod tests { test_event_pair(test_msg.b, recv_msg.b); } + /// Send messages to a Tube with the given identifier (see `consume_messages`; we use this to + /// track different message producers). + #[track_caller] + fn produce_messages(tube: SendTube, data: u32, barrier: Arc) -> SendTube { + let data = DataStruct { x: data }; + barrier.wait(); + for _ in 0..100 { + tube.send(&data).unwrap(); + } + tube + } + + /// Consumes the given number of messages from a Tube, returning the number messages read with + /// each producer ID. + #[track_caller] + fn consume_messages( + tube: RecvTube, + count: usize, + barrier: Arc, + ) -> (RecvTube, usize, usize) { + barrier.wait(); + + let mut id1_count = 0usize; + let mut id2_count = 0usize; + + for _ in 0..count { + let msg = tube.recv::().unwrap(); + match msg.x { + PRODUCER_ID_1 => id1_count += 1, + PRODUCER_ID_2 => id2_count += 1, + _ => panic!( + "want message with ID {} or {}; got message w/ ID {}.", + PRODUCER_ID_1, PRODUCER_ID_2, msg.x + ), + } + } + (tube, id1_count, id2_count) + } + + #[test] + fn send_recv_mpsc() { + let (p1, consumer) = Tube::directional_pair().unwrap(); + let p2 = p1.try_clone().unwrap(); + let start_block_p1 = Arc::new(Barrier::new(3)); + let start_block_p2 = start_block_p1.clone(); + let start_block_consumer = start_block_p1.clone(); + + let p1_thread = thread::spawn(move || produce_messages(p1, PRODUCER_ID_1, start_block_p1)); + let p2_thread = thread::spawn(move || produce_messages(p2, PRODUCER_ID_2, start_block_p2)); + + let (_tube, id1_count, id2_count) = consume_messages(consumer, 200, start_block_consumer); + assert_eq!(id1_count, 100); + assert_eq!(id2_count, 100); + + p1_thread.join().unwrap(); + p2_thread.join().unwrap(); + } + #[test] fn send_recv_hash_map() { let (s1, s2) = Tube::pair().unwrap(); diff --git a/base/src/unix/eventfd.rs b/base/src/unix/eventfd.rs index c27e570b47..0dc861be3a 100644 --- a/base/src/unix/eventfd.rs +++ b/base/src/unix/eventfd.rs @@ -93,7 +93,7 @@ impl EventFd { /// a timeout does not occur then the count is returned as a EventReadResult::Count(count), /// and the count is reset to 0. If a timeout does occur then this function will return /// EventReadResult::Timeout. - pub fn read_timeout(&mut self, timeout: Duration) -> Result { + pub fn read_timeout(&self, timeout: Duration) -> Result { let mut pfd = libc::pollfd { fd: self.as_raw_descriptor(), events: POLLIN, @@ -218,7 +218,7 @@ mod tests { #[test] fn timeout() { - let mut evt = EventFd::new().expect("failed to create eventfd"); + let evt = EventFd::new().expect("failed to create eventfd"); assert_eq!( evt.read_timeout(Duration::from_millis(1)) .expect("failed to read from eventfd with timeout"), diff --git a/base/src/unix/tube.rs b/base/src/unix/tube.rs new file mode 100644 index 0000000000..c2214c7011 --- /dev/null +++ b/base/src/unix/tube.rs @@ -0,0 +1,145 @@ +// Copyright 2021 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::io::IoSlice; +use std::marker::PhantomData; +use std::os::unix::prelude::{AsRawFd, RawFd}; +use std::time::Duration; + +use crate::{ + tube::{Error, RecvTube, Result, SendTube}, + unix::deserialize_with_descriptors, + unix::SerializeDescriptors, + AsRawDescriptor, FromRawDescriptor, RawDescriptor, ReadNotifier, SafeDescriptor, ScmSocket, + UnixSeqpacket, UnsyncMarker, +}; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +/// Bidirectional tube that support both send and recv. +#[derive(Serialize, Deserialize)] +pub struct Tube { + socket: UnixSeqpacket, + + // Windows is !Sync. We share that characteristic to prevent writing cross-platform incompatible + // code. + _unsync_marker: UnsyncMarker, +} + +impl Tube { + /// Create a pair of connected tubes. Request is sent in one direction while response is in the + /// other direction. + pub fn pair() -> Result<(Tube, Tube)> { + let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?; + let tube1 = Tube::new(socket1); + let tube2 = Tube::new(socket2); + Ok((tube1, tube2)) + } + + /// Create a new `Tube`. + pub fn new(socket: UnixSeqpacket) -> Tube { + Tube { + socket, + _unsync_marker: PhantomData, + } + } + + /// DO NOT USE this method directly as it will become private soon (b/221484449). Use a + /// directional Tube pair instead. + #[deprecated] + pub fn try_clone(&self) -> Result { + self.socket.try_clone().map(Tube::new).map_err(Error::Clone) + } + + pub fn send(&self, msg: &T) -> Result<()> { + let msg_serialize = SerializeDescriptors::new(&msg); + let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; + let msg_descriptors = msg_serialize.into_descriptors(); + + self.socket + .send_with_fds(&[IoSlice::new(&msg_json)], &msg_descriptors) + .map_err(Error::Send)?; + Ok(()) + } + + pub fn recv(&self) -> Result { + let (msg_json, msg_descriptors) = + self.socket.recv_as_vec_with_fds().map_err(Error::Recv)?; + + if msg_json.is_empty() { + return Err(Error::Disconnected); + } + + let mut msg_descriptors_safe = msg_descriptors + .into_iter() + .map(|v| { + Some(unsafe { + // Safe because the socket returns new fds that are owned locally by this scope. + SafeDescriptor::from_raw_descriptor(v) + }) + }) + .collect(); + + deserialize_with_descriptors( + || serde_json::from_slice(&msg_json), + &mut msg_descriptors_safe, + ) + .map_err(Error::Json) + } + + pub fn set_send_timeout(&self, timeout: Option) -> Result<()> { + self.socket + .set_write_timeout(timeout) + .map_err(Error::SetSendTimeout) + } + + pub fn set_recv_timeout(&self, timeout: Option) -> Result<()> { + self.socket + .set_read_timeout(timeout) + .map_err(Error::SetRecvTimeout) + } +} + +impl AsRawDescriptor for Tube { + fn as_raw_descriptor(&self) -> RawDescriptor { + self.socket.as_raw_descriptor() + } +} + +impl AsRawFd for Tube { + fn as_raw_fd(&self) -> RawFd { + self.socket.as_raw_fd() + } +} + +impl ReadNotifier for Tube { + fn get_read_notifier(&self) -> &dyn AsRawDescriptor { + &self.socket + } +} + +impl FromRawDescriptor for Tube { + /// # Safety: + /// Requirements: + /// (1) The caller owns rd. + /// (2) When the call completes, ownership of rd has transferred to the returned value. + unsafe fn from_raw_descriptor(rd: RawDescriptor) -> Self { + Self { + socket: UnixSeqpacket::from_raw_descriptor(rd), + _unsync_marker: PhantomData, + } + } +} + +impl AsRawFd for SendTube { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_descriptor() + } +} + +impl AsRawFd for RecvTube { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_descriptor() + } +} diff --git a/base/src/win/tube.rs b/base/src/win/tube.rs new file mode 100644 index 0000000000..4791effad2 --- /dev/null +++ b/base/src/win/tube.rs @@ -0,0 +1,446 @@ +// Copyright 2021 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::io::{self, Cursor, Read, Write}; +use std::time::Duration; + +use crate::tube::{Error, RecvTube, Result, SendTube}; +use crate::{ + BlockingMode, CloseNotifier, FramingMode, FromRawDescriptor, PollToken, ReadNotifier, + SafeDescriptor, StreamChannel, +}; +use cros_async::{unblock, Executor}; +use data_model::DataInit; +use lazy_static::lazy_static; +use serde::{de::DeserializeOwned, Deserialize, Serialize, Serializer}; +use std::mem; +use std::os::windows::io::{AsRawHandle, RawHandle}; +use std::sync::{Arc, Mutex}; +use win_sys_util::{ + deserialize_with_descriptors, AsRawDescriptor, Descriptor, RawDescriptor, SerializeDescriptors, +}; +use winapi::shared::winerror::ERROR_MORE_DATA; + +/// Bidirectional tube that support both send and recv. +/// +/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair +/// (A, B). We wish to send B to another process, and communicate with it using A from the current +/// process: +/// 1. B's target_pid must be set to the current PID *before* serialization. There is a +/// serialization hook that sets it to the current PID automatically if target_pid is unset. +/// 2. A's target_pid must be set to the PID of the process where B was sent. +/// +/// If instead you are sending both A and B to separate processes, then: +/// 1. A's target_pid must be set to B's pid, manually. +/// 2. B's target_pid must be set to A's pid, manually. +/// +/// Automating all of this and getting a completely clean interface is tricky. We would need +/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync +/// state about PIDs between the ends. There are alternatives like reusing the underlying +/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet +/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome. +#[derive(Serialize, Deserialize, Debug)] +pub struct Tube { + socket: StreamChannel, + + // Default target_pid to current PID on serialization (see `Tube` comment header for details). + #[serde(serialize_with = "set_tube_pid_on_serialize")] + target_pid: Option, +} + +/// For a Tube which has not had its target_pid set, when it is serialized, we should automatically +/// default it to the current process, because the other end will be in the current process. +fn set_tube_pid_on_serialize( + existing_pid_value: &Option, + serializer: S, +) -> std::result::Result +where + S: Serializer, +{ + match existing_pid_value { + Some(pid) => serializer.serialize_u32(*pid), + None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())), + } +} + +#[derive(Copy, Clone, Debug)] +#[repr(C)] +struct MsgHeader { + msg_json_size: usize, + descriptor_json_size: usize, +} + +// Safe because it only has data and has no implicit padding. +unsafe impl DataInit for MsgHeader {} + +lazy_static! { + static ref DH_TUBE: sync::Mutex> = sync::Mutex::new(None); + static ref ALIAS_PID: sync::Mutex> = sync::Mutex::new(None); +} + +/// Set a tube to delegate duplicate handle calls. +pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) { + DH_TUBE.lock().replace(dh_tube); +} + +/// Set alias pid for use with a DuplicateHandleTube. +pub fn set_alias_pid(alias_pid: u32) { + ALIAS_PID.lock().replace(alias_pid); +} + +impl Tube { + /// Create a pair of connected tubes. Request is sent in one direction while response is + /// received in the other direction. + /// The result is in the form (server, client). + pub fn pair() -> Result<(Tube, Tube)> { + let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message) + .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?; + + Ok((Tube::new(socket1), Tube::new(socket2))) + } + + /// Create a pair of connected tubes with the specified buffer size. + /// Request is sent in one direction while response is received in the other direction. + /// The result is in the form (server, client). + pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> { + let (socket1, socket2) = StreamChannel::pair_with_buffer_size( + BlockingMode::Blocking, + FramingMode::Message, + buffer_size, + ) + .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?; + let tube1 = Tube::new(socket1); + let tube2 = Tube::new(socket2); + Ok((tube1, tube2)) + } + + // Create a new `Tube`. + pub fn new(socket: StreamChannel) -> Tube { + Tube { + socket, + target_pid: None, + } + } + + pub(super) fn try_clone(&self) -> Result { + Ok(Tube { + socket: self.socket.try_clone().map_err(Error::Clone)?, + target_pid: self.target_pid, + }) + } + + pub fn send(&self, msg: &T) -> Result<()> { + serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid) + } + + pub fn recv(&self) -> Result { + deserialize_and_recv(|buf| (&*&self.socket).read(buf)) + } + + /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair + /// documentation to ensure you have a server pipe before calling. + #[cfg(windows)] + pub fn flush_blocking(&mut self) -> Result<()> { + self.socket.flush_blocking().map_err(Error::Flush) + } + + /// For Tubes that span processes, this method must be used to set the PID of the other end + /// of the Tube, otherwise sending handles to the other end won't work. + pub fn set_target_pid(&mut self, target_pid: u32) { + self.target_pid = Some(target_pid); + } + + /// Returns the PID of the process at the other end of the Tube, if any is set. + pub fn target_pid(&self) -> Option { + self.target_pid + } + + /// TODO(b/145998747, b/184398671): this method should be removed. + pub fn set_send_timeout(&self, _timeout: Option) -> Result<()> { + unimplemented!("To be removed/refactored upstream."); + } + + /// TODO(b/145998747, b/184398671): this method should be removed. + pub fn set_recv_timeout(&self, _timeout: Option) -> Result<()> { + unimplemented!("To be removed/refactored upstream."); + } +} + +pub fn serialize_and_send io::Result>( + write_fn: F, + msg: &T, + target_pid: Option, +) -> Result<()> { + let msg_serialize = SerializeDescriptors::new(&msg); + let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; + let msg_descriptors = msg_serialize.into_descriptors(); + + let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len()); + for desc in msg_descriptors { + // Safe because these handles are guaranteed to be valid. Details: + // 1. They come from sys_util::descriptor_reflection::with_as_descriptor. + // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File, + // SafeDescriptor). + // 3. The owning object is borrowed by msg until sending is complete. + duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize) + } + + let descriptor_json = if duped_descriptors.is_empty() { + None + } else { + Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?) + }; + + let header = MsgHeader { + msg_json_size: msg_json.len(), + descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()), + }; + + let mut data_packet = Cursor::new(Vec::with_capacity( + header.as_slice().len() + header.msg_json_size + header.descriptor_json_size, + )); + data_packet + .write(header.as_slice()) + .map_err(Error::SendIoBuf)?; + data_packet + .write(msg_json.as_slice()) + .map_err(Error::SendIoBuf)?; + if let Some(descriptor_json) = descriptor_json { + data_packet + .write(descriptor_json.as_slice()) + .map_err(Error::SendIoBuf)?; + } + + // Multiple writers (producers) are safe because each write is atomic. + let data_bytes = data_packet.into_inner(); + + write_fn(&data_bytes).map_err(Error::SendIo)?; + Ok(()) +} + +fn duplicate_handle(desc: RawHandle, target_pid: Option) -> Result { + match target_pid { + Some(pid) => match &*DH_TUBE.lock() { + Some(tube) => tube.request_duplicate_handle(pid, desc), + None => { + win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor) + } + }, + None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor), + } +} + +/// Reads a part of a Tube packet asserting that it was correctly read. This means: +/// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. +/// We use this to ignore errors when reading the message header, which has the lengths we need +/// to allocate our buffers for the remainder of the message. +/// * We filled the supplied buffer. +fn perform_read io::Result>( + read_fn: &F, + buf: &mut [u8], +) -> io::Result { + let res = match read_fn(buf) { + Ok(s) => Ok(s), + Err(e) + if e.raw_os_error() + .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) => + { + Ok(buf.len()) + } + Err(e) => Err(e), + }; + + let bytes_read = res?; + if bytes_read != buf.len() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } else { + Ok(bytes_read) + } +} + +/// Deserializes a Tube packet by calling the supplied read function. This function MUST +/// assert that the buffer was filled. +pub fn deserialize_and_recv io::Result>( + read_fn: F, +) -> Result { + let mut header_bytes = vec![0u8; mem::size_of::()]; + perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?; + + // Safe because the header is always written by the send function, and only that function + // writes to this channel. + let header = + MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize."); + + let mut msg_json = vec![0u8; header.msg_json_size]; + perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?; + + if msg_json.is_empty() { + // This means we got a message header, but there is no json body (due to a zero size in + // the header). This should never happen because it means the receiver is getting no + // data whatsoever from the sender. + return Err(Error::RecvUnexpectedEmptyBody); + } + + let msg_descriptors: Vec = if header.descriptor_json_size > 0 { + let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size]; + perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?; + let descriptor_usizes: Vec = + serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?; + + // Safe because the usizes are RawDescriptors that were converted to usize in the send + // method. + descriptor_usizes + .iter() + .map(|item| *item as RawDescriptor) + .collect() + } else { + Vec::new() + }; + + let mut msg_descriptors_safe = msg_descriptors + .into_iter() + .map(|v| { + Some(unsafe { + // Safe because the socket returns new fds that are owned locally by this scope. + SafeDescriptor::from_raw_descriptor(v) + }) + }) + .collect(); + + deserialize_with_descriptors( + || serde_json::from_slice(&msg_json), + &mut msg_descriptors_safe, + ) + .map_err(Error::Json) +} + +#[derive(PollToken, Eq, PartialEq, Copy, Clone)] +enum Token { + SocketReady, +} + +impl AsRawDescriptor for Tube { + fn as_raw_descriptor(&self) -> RawDescriptor { + self.socket.as_raw_descriptor() + } +} + +impl AsRawHandle for Tube { + fn as_raw_handle(&self) -> RawHandle { + self.as_raw_descriptor() + } +} + +impl ReadNotifier for Tube { + fn get_read_notifier(&self) -> &dyn AsRawDescriptor { + self.socket.get_read_notifier() + } +} + +impl CloseNotifier for Tube { + fn get_close_notifier(&self) -> &dyn AsRawDescriptor { + self.socket.get_close_notifier() + } +} + +impl AsRawHandle for SendTube { + fn as_raw_handle(&self) -> RawHandle { + self.0.as_raw_descriptor() + } +} + +impl AsRawHandle for RecvTube { + fn as_raw_handle(&self) -> RawHandle { + self.0.as_raw_descriptor() + } +} + +/// A request to duplicate a handle to a target process. +#[derive(Serialize, Deserialize, Debug)] +pub struct DuplicateHandleRequest { + pub target_alias_pid: u32, + pub handle: usize, +} + +/// Contains a duplicated handle or None if an error occurred. +#[derive(Serialize, Deserialize, Debug)] +pub struct DuplicateHandleResponse { + pub handle: Option, +} + +/// Wrapper for tube which is used to delegate DuplicateHandle function calls to +/// the broker process. +#[derive(Serialize, Deserialize, Debug)] +pub struct DuplicateHandleTube(Tube); + +impl DuplicateHandleTube { + pub fn new(tube: Tube) -> Self { + Self(tube) + } + + pub fn request_duplicate_handle( + &self, + target_alias_pid: u32, + handle: RawHandle, + ) -> Result { + let req = DuplicateHandleRequest { + target_alias_pid, + handle: handle as usize, + }; + self.0.send(&req)?; + let res: DuplicateHandleResponse = self.0.recv()?; + res.handle + .map(|h| h as RawHandle) + .ok_or(Error::BrokerDupDescriptor) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{EventContext, EventTrigger, PollToken, ReadNotifier}; + use std::time; + + const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10); + + #[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)] + enum Token { + ReceivedData, + } + + #[test] + fn test_serialize_tube() { + let (tube_1, tube_2) = Tube::pair().unwrap(); + let event_ctx: EventContext = EventContext::build_with(&[EventTrigger::from( + tube_2.get_read_notifier(), + Token::ReceivedData, + )]) + .unwrap(); + + // Serialize the Tube + let msg_serialize = SerializeDescriptors::new(&tube_1); + let serialized = serde_json::to_vec(&msg_serialize).unwrap(); + let msg_descriptors = msg_serialize.into_descriptors(); + + // Deserialize the Tube + let mut msg_descriptors_safe = msg_descriptors + .into_iter() + .map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) })) + .collect(); + let tube_deserialized: Tube = deserialize_with_descriptors( + || serde_json::from_slice(&serialized), + &mut msg_descriptors_safe, + ) + .unwrap(); + + // Send a message through deserialized Tube + tube_deserialized.send(&"hi".to_string()).unwrap(); + + assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1); + assert_eq!(tube_2.recv::().unwrap(), "hi"); + } +} diff --git a/common/sys_util/src/eventfd.rs b/common/sys_util/src/eventfd.rs index 0a45f7d86a..f4f6cac0e5 100644 --- a/common/sys_util/src/eventfd.rs +++ b/common/sys_util/src/eventfd.rs @@ -92,7 +92,7 @@ impl EventFd { /// a timeout does not occur then the count is returned as a EventReadResult::Count(count), /// and the count is reset to 0. If a timeout does occur then this function will return /// EventReadResult::Timeout. - pub fn read_timeout(&mut self, timeout: Duration) -> Result { + pub fn read_timeout(&self, timeout: Duration) -> Result { let mut pfd = libc::pollfd { fd: self.as_raw_descriptor(), events: POLLIN, @@ -217,7 +217,7 @@ mod tests { #[test] fn timeout() { - let mut evt = EventFd::new().expect("failed to create eventfd"); + let evt = EventFd::new().expect("failed to create eventfd"); assert_eq!( evt.read_timeout(Duration::from_millis(1)) .expect("failed to read from eventfd with timeout"), diff --git a/common/win_sys_util/src/win/named_pipes.rs b/common/win_sys_util/src/win/named_pipes.rs index ec1044d4cc..7d9e8f6cf5 100644 --- a/common/win_sys_util/src/win/named_pipes.rs +++ b/common/win_sys_util/src/win/named_pipes.rs @@ -43,7 +43,7 @@ use winapi::{ }; /// The default buffer size for all named pipes in the system. If this size is too small, writers -/// on named pipes that expect not to block can block until the reading side empties the buffer. +/// on named pipes that expect not to block *can* block until the reading side empties the buffer. /// /// The general rule is this should be *at least* as big as the largest message, otherwise /// unexpected blocking behavior can result; for example, if too small, this can interact badly with @@ -714,6 +714,11 @@ impl PipeConnection { } } + /// Get the framing mode of the pipe. + pub fn get_framing_mode(&self) -> FramingMode { + self.framing_mode + } + /// Returns metadata about the connected NamedPipe. pub fn get_info(&self, is_server_connection: bool) -> Result { let mut flags: u32 = 0; diff --git a/common/win_sys_util/src/win/stream_channel.rs b/common/win_sys_util/src/win/stream_channel.rs index 3178e7de06..d5afe6424c 100644 --- a/common/win_sys_util/src/win/stream_channel.rs +++ b/common/win_sys_util/src/win/stream_channel.rs @@ -31,10 +31,15 @@ impl From for named_pipes::BlockingMode { } } +pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024; + /// An abstraction over named pipes and unix socketpairs. /// /// The ReadNotifier will return an event handle that is set when data is in the channel. /// +/// In message mode, single writes larger than +/// `win_sys_util::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted. +/// /// # Notes for maintainers /// 1. This struct contains extremely subtle thread safety considerations. /// 2. Serialization is not derived! New fields need to be added manually. @@ -75,6 +80,10 @@ pub struct StreamChannel { #[serde(skip)] #[serde(default = "create_true_cell")] is_channel_closed_on_drop: RefCell, + + // For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages + // up to that size. + send_buffer_size: usize, } fn create_read_lock() -> Arc> { @@ -93,13 +102,14 @@ impl Serialize for StreamChannel { where S: Serializer, { - let mut s = serializer.serialize_struct("StreamChannel", 6)?; + let mut s = serializer.serialize_struct("StreamChannel", 7)?; s.serialize_field("pipe_conn", &self.pipe_conn)?; s.serialize_field("write_notify", &self.write_notify)?; s.serialize_field("read_notify", &self.read_notify)?; s.serialize_field("pipe_closed", &self.pipe_closed)?; s.serialize_field("remote_write_lock", &self.remote_write_lock)?; s.serialize_field("local_write_lock", &self.local_write_lock)?; + s.serialize_field("send_buffer_size", &self.send_buffer_size)?; let ret = s.end(); // Because this end has been serialized, the serialized copy is now responsible for setting @@ -134,10 +144,8 @@ impl StreamChannel { } } - // WARNING: Generally, multiple StreamChannels are not wanted. StreamChannels can only - // have 1 reader. However, this is used for the net process backend to work with Slirp, since - // one StreamChannel clone is put in a reader only loop and another is put in an writer only - // loop. + // WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with + // > 1 reader per end is not defined. pub fn try_clone(&self) -> io::Result { Ok(StreamChannel { pipe_conn: self.pipe_conn.try_clone()?, @@ -148,6 +156,7 @@ impl StreamChannel { local_write_lock: self.local_write_lock.try_clone()?, read_lock: self.read_lock.clone(), is_channel_closed_on_drop: create_true_cell(), + send_buffer_size: self.send_buffer_size, }) } @@ -242,6 +251,19 @@ impl StreamChannel { /// Exists as a workaround for Tube which does not expect its transport to be mutable, /// even though io::Write requires it. pub fn write_immutable(&self, buf: &[u8]) -> io::Result { + if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message + && buf.len() > self.send_buffer_size + { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "StreamChannel forbids message mode writes larger than the \ + default buffer size of {}.", + self.send_buffer_size, + ), + )); + } + let _lock = self.local_write_lock.lock(); let res = self.pipe_conn.write(buf); @@ -268,6 +290,7 @@ impl StreamChannel { pub fn from_pipes( pipe_a: PipeConnection, pipe_b: PipeConnection, + send_buffer_size: usize, ) -> Result<(StreamChannel, StreamChannel)> { let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?); let pipe_closed = Event::new()?; @@ -284,6 +307,7 @@ impl StreamChannel { remote_write_lock: write_lock_b.try_clone()?, pipe_closed: pipe_closed.try_clone()?, is_channel_closed_on_drop: create_true_cell(), + send_buffer_size, }; let sock_b = StreamChannel { pipe_conn: pipe_b, @@ -294,10 +318,13 @@ impl StreamChannel { remote_write_lock: write_lock_a, pipe_closed, is_channel_closed_on_drop: create_true_cell(), + send_buffer_size, }; Ok((sock_a, sock_b)) } + /// Create a pair with a specific buffer size. Note that this is the only way to send messages + /// larger than the default named pipe buffer size. pub fn pair_with_buffer_size( blocking_mode: BlockingMode, framing_mode: FramingMode, @@ -310,7 +337,7 @@ impl StreamChannel { buffer_size, false, )?; - Self::from_pipes(pipe_a, pipe_b) + Self::from_pipes(pipe_a, pipe_b, buffer_size) } /// Creates a cross platform channel pair. /// On Windows the result is in the form (server, client). @@ -318,12 +345,14 @@ impl StreamChannel { blocking_mode: BlockingMode, framing_mode: FramingMode, ) -> Result<(StreamChannel, StreamChannel)> { - let (pipe_a, pipe_b) = named_pipes::pair( + let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size( &named_pipes::FramingMode::from(framing_mode), &named_pipes::BlockingMode::from(blocking_mode), 0, + DEFAULT_BUFFER_SIZE, + false, )?; - Self::from_pipes(pipe_a, pipe_b) + Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE) } /// Blocks until the pipe buffer is empty. diff --git a/cros_async/src/async_types.rs b/cros_async/src/async_types.rs index de49f64119..8054de1216 100644 --- a/cros_async/src/async_types.rs +++ b/cros_async/src/async_types.rs @@ -2,12 +2,17 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use crate::{Executor, IntoAsync, IoSourceExt}; -use base::{AsRawDescriptor, Tube, TubeResult}; -use serde::de::DeserializeOwned; -use std::ops::Deref; +use crate::{Executor, IntoAsync}; +use base::{AsRawDescriptor, RecvTube, SendTube, Tube, TubeResult}; +use serde::{de::DeserializeOwned, Serialize}; +use std::io; use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg_attr(windows, path = "win/async_types.rs")] +#[cfg_attr(not(windows), path = "unix/async_types.rs")] +mod async_types; +pub use async_types::*; + /// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor /// trait object family. pub trait DescriptorIntoAsync: AsRawDescriptor {} @@ -27,34 +32,40 @@ where impl IntoAsync for DescriptorAdapter where T: DescriptorIntoAsync {} impl IntoAsync for Tube {} +impl IntoAsync for SendTube {} +impl IntoAsync for RecvTube {} -pub struct AsyncTube { - inner: Box>, -} - -impl AsyncTube { - pub fn new(ex: &Executor, tube: Tube) -> std::io::Result { - return Ok(AsyncTube { - inner: ex.async_from(tube)?, - }); +pub struct RecvTubeAsync(AsyncTube); +#[allow(dead_code)] +impl RecvTubeAsync { + pub fn new(tube: RecvTube, ex: &Executor) -> io::Result { + Ok(Self(AsyncTube::new( + ex, + #[allow(deprecated)] + tube.into_tube(), + )?)) } - pub async fn next(&self) -> TubeResult { - self.inner.wait_readable().await.unwrap(); - self.inner.as_source().recv() + /// TODO(b/145998747, b/184398671): this async approach needs to be refactored + /// upstream, but for now is implemented to work using simple blocking futures + /// (avoiding the unimplemented wait_readable). + pub async fn next(&self) -> TubeResult { + self.0.next().await } } -impl Deref for AsyncTube { - type Target = Tube; +pub struct SendTubeAsync(AsyncTube); +#[allow(dead_code)] +impl SendTubeAsync { + pub fn new(tube: SendTube, ex: &Executor) -> io::Result { + Ok(Self(AsyncTube::new( + ex, + #[allow(deprecated)] + tube.into_tube(), + )?)) + } - fn deref(&self) -> &Self::Target { - self.inner.as_source() - } -} - -impl From for Tube { - fn from(at: AsyncTube) -> Tube { - at.inner.into_source() + pub async fn send(&self, msg: T) -> TubeResult<()> { + self.0.send(msg).await } } diff --git a/cros_async/src/unix/async_types.rs b/cros_async/src/unix/async_types.rs new file mode 100644 index 0000000000..c1653c2442 --- /dev/null +++ b/cros_async/src/unix/async_types.rs @@ -0,0 +1,42 @@ +// Copyright 2022 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +use crate::{Executor, IoSourceExt}; +use base::{Tube, TubeResult}; +use serde::{de::DeserializeOwned, Serialize}; +use std::io; +use std::ops::Deref; + +pub struct AsyncTube { + inner: Box>, +} + +impl AsyncTube { + pub fn new(ex: &Executor, tube: Tube) -> io::Result { + return Ok(AsyncTube { + inner: ex.async_from(tube)?, + }); + } + pub async fn next(&self) -> TubeResult { + self.inner.wait_readable().await.unwrap(); + self.inner.as_source().recv() + } + + pub async fn send(&self, msg: T) -> TubeResult<()> { + self.inner.as_source().send(&msg) + } +} + +impl Deref for AsyncTube { + type Target = Tube; + + fn deref(&self) -> &Self::Target { + self.inner.as_source() + } +} + +impl From for Tube { + fn from(at: AsyncTube) -> Tube { + at.inner.into_source() + } +} diff --git a/cros_async/src/win/async_types.rs b/cros_async/src/win/async_types.rs new file mode 100644 index 0000000000..bf9c54b4fb --- /dev/null +++ b/cros_async/src/win/async_types.rs @@ -0,0 +1,58 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use crate::{Executor, HandleWrapper}; +use base::{AsRawDescriptor, Descriptor, RecvTube, SendTube, Tube, TubeResult}; +use serde::de::DeserializeOwned; +use std::io; +use std::os::windows::io::AsRawHandle; +use std::sync::{Arc, Mutex}; + +pub struct AsyncTube { + inner: Arc>, +} + +impl AsyncTube { + pub fn new(ex: &Executor, tube: Tube) -> io::Result { + Ok(AsyncTube { + inner: Arc::new(Mutex::new(tube)), + }) + } + + /// TODO(b/145998747, b/184398671): this async approach needs to be refactored + /// upstream, but for now is implemented to work using simple blocking futures + /// (avoiding the unimplemented wait_readable). + pub async fn next(&self) -> TubeResult { + let tube = Arc::clone(&self.inner); + let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]); + unblock( + move || tube.lock().unwrap().recv(), + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + } + + pub async fn send(&self, msg: T) -> TubeResult<()> { + let tube = Arc::clone(&self.inner); + let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]); + unblock( + move || tube.lock().unwrap().send(&msg), + move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)), + ) + .await + } +} + +impl From for Tube { + fn from(at: AsyncTube) -> Tube { + // We ensure this is safe by waiting to acquire the mutex on + // the tube before unwrapping. This only works because the + // worker thread in "next" holds the mutex for its entire life. + // + // This does however mean that into will block until all async + // operations are complete. + let _ = at.inner.lock().unwrap(); + Arc::try_unwrap(at.inner).unwrap().into_inner().unwrap() + } +} diff --git a/devices/src/irqchip/kvm/x86_64.rs b/devices/src/irqchip/kvm/x86_64.rs index e535fe526e..3d13d399c7 100644 --- a/devices/src/irqchip/kvm/x86_64.rs +++ b/devices/src/irqchip/kvm/x86_64.rs @@ -950,7 +950,7 @@ mod tests { // setup an event and a resample event for irq line 1 let evt = Event::new().expect("failed to create event"); - let mut resample_evt = Event::new().expect("failed to create event"); + let resample_evt = Event::new().expect("failed to create event"); let evt_index = chip .register_irq_event(1, &evt, Some(&resample_evt)) @@ -1083,7 +1083,7 @@ mod tests { // setup an event and a resample event for irq line 1 let evt = Event::new().expect("failed to create event"); - let mut resample_evt = Event::new().expect("failed to create event"); + let resample_evt = Event::new().expect("failed to create event"); chip.register_irq_event(1, &evt, Some(&resample_evt)) .expect("failed to register_irq_event"); diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs index a4235fe453..82645b45c1 100644 --- a/devices/src/virtio/balloon.rs +++ b/devices/src/virtio/balloon.rs @@ -707,6 +707,7 @@ impl VirtioDevice for Balloon { self.kill_evt = Some(self_kill_evt); let state = self.state.clone(); + #[allow(deprecated)] let command_tube = match self.command_tube.try_clone() { Ok(tube) => tube, Err(e) => { diff --git a/devices/src/virtio/block/asynchronous.rs b/devices/src/virtio/block/asynchronous.rs index 58a89fe173..64fc80a372 100644 --- a/devices/src/virtio/block/asynchronous.rs +++ b/devices/src/virtio/block/asynchronous.rs @@ -292,8 +292,10 @@ async fn handle_command_tube( } }; + let resp_clone = resp.clone(); command_tube - .send(&resp) + .send(resp_clone) + .await .map_err(ExecuteError::SendingResponse)?; if let DiskControlResult::Ok = resp { interrupt.borrow().signal_config_changed(); diff --git a/devices/src/virtio/iommu.rs b/devices/src/virtio/iommu.rs index c7a7ea2277..16b31821aa 100644 --- a/devices/src/virtio/iommu.rs +++ b/devices/src/virtio/iommu.rs @@ -574,7 +574,7 @@ impl Worker { Self::handle_vfio(mem, vfio_cmd, endpoints, hp_endpoints_ranges) } }; - if let Err(e) = command_tube.send(&response) { + if let Err(e) = command_tube.send(response).await { error!("{}", IommuError::VirtioIOMMUResponseError(e)); } } @@ -682,7 +682,7 @@ async fn handle_translate_request( .get(&endpoint_id) .unwrap() .send( - &mapper + mapper .lock() .translate(iova, size) .map_err(|e| { @@ -691,6 +691,7 @@ async fn handle_translate_request( }) .ok(), ) + .await .map_err(IommuError::Tube)?; } else { error!("endpoint_id {} not found", endpoint_id) diff --git a/devices/src/virtio/vhost/user/device/gpu.rs b/devices/src/virtio/vhost/user/device/gpu.rs index a10c38e4a2..4689954ca8 100644 --- a/devices/src/virtio/vhost/user/device/gpu.rs +++ b/devices/src/virtio/vhost/user/device/gpu.rs @@ -213,7 +213,7 @@ impl VhostUserBackend for GpuBackend { } }; - if let Err(e) = tube.send(&response) { + if let Err(e) = tube.send(response).await { error!("Failed to send `PciBarConfiguration`: {}", e); } diff --git a/devices/src/virtio/vhost/user/device/wl.rs b/devices/src/virtio/vhost/user/device/wl.rs index ac34e85a21..6b7fc32fce 100644 --- a/devices/src/virtio/vhost/user/device/wl.rs +++ b/devices/src/virtio/vhost/user/device/wl.rs @@ -327,7 +327,7 @@ pub fn run_wl_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> { let wayland_paths: BTreeMap<_, _> = wayland_sock.into_iter().collect(); let resource_bridge = resource_bridge - .map(|p| { + .map(|p| -> anyhow::Result { let deadline = Instant::now() + Duration::from_secs(5); loop { match UnixSeqpacket::connect(&p) { @@ -336,7 +336,7 @@ pub fn run_wl_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> { if Instant::now() < deadline { thread::sleep(Duration::from_millis(50)); } else { - return Err(e); + return Err(anyhow::Error::new(e)); } } } diff --git a/src/linux/device_helpers.rs b/src/linux/device_helpers.rs index a131dd1e25..958c506f10 100644 --- a/src/linux/device_helpers.rs +++ b/src/linux/device_helpers.rs @@ -1201,7 +1201,11 @@ pub fn setup_virtio_access_platform( let CreateIpcMapperRet { mapper: ipc_mapper, response_tx, - } = create_ipc_mapper(endpoint_id, request_tx.try_clone()?); + } = create_ipc_mapper( + endpoint_id, + #[allow(deprecated)] + request_tx.try_clone()?, + ); translate_response_senders .get_or_insert_with(BTreeMap::new) .insert(endpoint_id, response_tx); diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs index 4a21a0b787..699c366a37 100644 --- a/vm_control/src/lib.rs +++ b/vm_control/src/lib.rs @@ -154,7 +154,7 @@ impl Display for DiskControlCommand { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub enum DiskControlResult { Ok, Err(SysError),