base: upstream Tube cross platform support.

Upstreams support for Tubes on Windows, splitting Tube into platform
specific files. This contains several critical enhancements:

* POSIX Tubes support multi producer multi consumer configurations, but
  Windows has remained strictly SPSC for each direction. Windows cannot
  support MPMC, and that configuration is not really something we want
  either. To address that, this CL introduces directional Tubes. A
  SendTube is clonable, and a RecvTube is not, which gives us MPSC.

* This CL also fixes multiple interface conflicts that have developed
  between Linux & Windows:
    + send wasn't async on the Linux AsyncTube.
    + send data wasn't passed as owned on the Linux AsyncTube.
    + Adds the 'static constraint for AsyncTube::send on POSIX. This is an
      requirement on Windows.
    + Event::read_timeout doesn't need to take &mut self, and it wasn't
      downstream. This CL switches to &self.

* Adds the missing notifier.rs file in base.

Note that this CL does not attempt to remove balloon's usage of
Tube::try_clone. That's a somewhat involved issue that should be tackled in
its own CL.

Test: tested downstream on Windows & Linux bots, upstream on Linux bots.

Bug: b:221484449

Change-Id: I288dbc1d1e42f8ce08258cdaaf85100ca93721ef
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3536897
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Noah Gold <nkgold@google.com>
This commit is contained in:
Noah Gold 2022-03-18 16:04:25 -07:00 committed by Chromeos LUCI
parent 8e0d4a99bf
commit c286772db8
21 changed files with 996 additions and 167 deletions

View file

@ -36,7 +36,7 @@ impl Event {
self.0.read()
}
pub fn read_timeout(&mut self, timeout: Duration) -> Result<EventReadResult> {
pub fn read_timeout(&self, timeout: Duration) -> Result<EventReadResult> {
self.0.read_timeout(timeout)
}

View file

@ -23,6 +23,7 @@ pub mod common;
mod event;
mod ioctl;
mod mmap;
mod notifiers;
mod shm;
mod timer;
mod tube;
@ -40,9 +41,10 @@ pub use ioctl::{
pub use mmap::{
MemoryMapping, MemoryMappingBuilder, MemoryMappingBuilderUnix, Unix as MemoryMappingUnix,
};
pub use notifiers::*;
pub use shm::{SharedMemory, Unix as SharedMemoryUnix};
pub use timer::{FakeTimer, Timer};
pub use tube::{Error as TubeError, Result as TubeResult, Tube};
pub use tube::{Error as TubeError, RecvTube, Result as TubeResult, SendTube, Tube};
pub use wait_context::{EventToken, EventType, TriggeredEvent, WaitContext};
/// Wraps an AsRawDescriptor in the simple Descriptor struct, which

33
base/src/notifiers.rs Normal file
View file

@ -0,0 +1,33 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#[cfg(unix)]
use std::os::unix::net::UnixStream;
use crate::AsRawDescriptor;
pub trait ReadNotifier {
/// Gets a descriptor that can be used in EventContext to wait for events to be available (e.g.
/// to avoid receive_events blocking).
fn get_read_notifier(&self) -> &dyn AsRawDescriptor;
}
pub trait CloseNotifier {
/// Gets a descriptor that can be used in EventContext to wait for the closed event.
fn get_close_notifier(&self) -> &dyn AsRawDescriptor;
}
#[cfg(unix)]
impl ReadNotifier for UnixStream {
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
self
}
}
#[cfg(unix)]
impl CloseNotifier for UnixStream {
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
self
}
}

View file

@ -2,39 +2,119 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
io::{
IoSlice, {self},
},
marker::PhantomData,
os::unix::prelude::{AsRawFd, RawFd},
time::Duration,
};
use crate::{FromRawDescriptor, SafeDescriptor, ScmSocket, UnixSeqpacket, UnsyncMarker};
use crate::unix::{
deserialize_with_descriptors, AsRawDescriptor, RawDescriptor, SerializeDescriptors,
};
use remain::sorted;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::io;
use thiserror::Error as ThisError;
#[cfg_attr(windows, path = "win/tube.rs")]
#[cfg_attr(not(windows), path = "unix/tube.rs")]
mod tube;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::time::Duration;
pub use tube::*;
impl Tube {
/// Creates a Send/Recv pair of Tubes.
pub fn directional_pair() -> Result<(SendTube, RecvTube)> {
let (t1, t2) = Self::pair()?;
Ok((SendTube(t1), RecvTube(t2)))
}
}
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
/// A Tube end which can only send messages. Cloneable.
pub struct SendTube(Tube);
#[allow(dead_code)]
impl SendTube {
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
}
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
self.0.send(msg)
}
pub fn try_clone(&self) -> Result<Self> {
Ok(SendTube(
#[allow(deprecated)]
self.0.try_clone()?,
))
}
/// Never call this function, it is for use by cros_async to provide
/// directional wrapper types only. Using it in any other context may
/// violate concurrency assumptions. (Type splitting across crates has put
/// us in a situation where we can't use Rust privacy to enforce this.)
#[deprecated]
pub fn into_tube(self) -> Tube {
self.0
}
}
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
/// A Tube end which can only recv messages.
pub struct RecvTube(Tube);
#[allow(dead_code)]
impl RecvTube {
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
self.0.recv()
}
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
}
/// Never call this function, it is for use by cros_async to provide
/// directional wrapper types only. Using it in any other context may
/// violate concurrency assumptions. (Type splitting across crates has put
/// us in a situation where we can't use Rust privacy to enforce this.)
#[deprecated]
pub fn into_tube(self) -> Tube {
self.0
}
}
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
#[error("failed to clone UnixSeqpacket: {0}")]
#[cfg(windows)]
#[error("attempt to duplicate descriptor via broker failed")]
BrokerDupDescriptor,
#[error("failed to clone transport: {0}")]
Clone(io::Error),
#[error("tube was disconnected")]
Disconnected,
#[error("failed to duplicate descriptor: {0}")]
DupDescriptor(io::Error),
#[cfg(windows)]
#[error("failed to flush named pipe: {0}")]
Flush(io::Error),
#[error("failed to serialize/deserialize json from packet: {0}")]
Json(serde_json::Error),
#[error("cancelled a queued async operation")]
OperationCancelled,
#[error("failed to crate tube pair: {0}")]
Pair(io::Error),
#[error("failed to receive packet: {0}")]
Recv(io::Error),
#[error("Received a message with a zero sized body. This should not happen.")]
RecvUnexpectedEmptyBody,
#[error("failed to send packet: {0}")]
#[cfg(unix)]
Send(crate::unix::Error),
#[cfg(windows)]
Send(crate::windows::Error),
#[error("failed to send packet: {0}")]
SendIo(io::Error),
#[error("failed to write packet to intermediate buffer: {0}")]
SendIoBuf(io::Error),
#[error("failed to set recv timeout: {0}")]
SetRecvTimeout(io::Error),
#[error("failed to set send timeout: {0}")]
@ -43,105 +123,6 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
/// Bidirectional tube that support both send and recv.
#[derive(Serialize, Deserialize)]
pub struct Tube {
socket: UnixSeqpacket,
_unsync_marker: UnsyncMarker,
}
impl Tube {
/// Create a pair of connected tubes. Request is send in one direction while response is in the
/// other direction.
pub fn pair() -> Result<(Tube, Tube)> {
let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?;
let tube1 = Tube::new(socket1);
let tube2 = Tube::new(socket2);
Ok((tube1, tube2))
}
// Create a new `Tube`.
pub fn new(socket: UnixSeqpacket) -> Tube {
Tube {
socket,
_unsync_marker: PhantomData,
}
}
pub fn try_clone(&self) -> Result<Self> {
self.socket.try_clone().map(Tube::new).map_err(Error::Clone)
}
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
let msg_serialize = SerializeDescriptors::new(&msg);
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
let msg_descriptors = msg_serialize.into_descriptors();
self.socket
.send_with_fds(&[IoSlice::new(&msg_json)], &msg_descriptors)
.map_err(Error::Send)?;
Ok(())
}
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
let (msg_json, msg_descriptors) =
self.socket.recv_as_vec_with_fds().map_err(Error::Recv)?;
if msg_json.is_empty() {
return Err(Error::Disconnected);
}
let mut msg_descriptors_safe = msg_descriptors
.into_iter()
.map(|v| {
Some(unsafe {
// Safe because the socket returns new fds that are owned locally by this scope.
SafeDescriptor::from_raw_descriptor(v)
})
})
.collect();
deserialize_with_descriptors(
|| serde_json::from_slice(&msg_json),
&mut msg_descriptors_safe,
)
.map_err(Error::Json)
}
pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
self.socket
.set_write_timeout(timeout)
.map_err(Error::SetSendTimeout)
}
pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
self.socket
.set_read_timeout(timeout)
.map_err(Error::SetRecvTimeout)
}
}
impl FromRawDescriptor for Tube {
unsafe fn from_raw_descriptor(descriptor: RawDescriptor) -> Self {
Tube {
socket: UnixSeqpacket::from_raw_descriptor(descriptor),
_unsync_marker: PhantomData,
}
}
}
impl AsRawDescriptor for Tube {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.socket.as_raw_descriptor()
}
}
impl AsRawFd for Tube {
fn as_raw_fd(&self) -> RawFd {
self.socket.as_raw_fd()
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -150,9 +131,20 @@ mod tests {
use std::{collections::HashMap, time::Duration};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Barrier};
use std::thread;
#[derive(Serialize, Deserialize)]
struct DataStruct {
x: u32,
}
// Magics to identify which producer sent a message (& detect corruption).
const PRODUCER_ID_1: u32 = 801279273;
const PRODUCER_ID_2: u32 = 345234861;
#[track_caller]
fn test_event_pair(send: Event, mut recv: Event) {
fn test_event_pair(send: Event, recv: Event) {
send.write(1).unwrap();
recv.read_timeout(Duration::from_secs(1)).unwrap();
}
@ -190,6 +182,64 @@ mod tests {
test_event_pair(test_msg.b, recv_msg.b);
}
/// Send messages to a Tube with the given identifier (see `consume_messages`; we use this to
/// track different message producers).
#[track_caller]
fn produce_messages(tube: SendTube, data: u32, barrier: Arc<Barrier>) -> SendTube {
let data = DataStruct { x: data };
barrier.wait();
for _ in 0..100 {
tube.send(&data).unwrap();
}
tube
}
/// Consumes the given number of messages from a Tube, returning the number messages read with
/// each producer ID.
#[track_caller]
fn consume_messages(
tube: RecvTube,
count: usize,
barrier: Arc<Barrier>,
) -> (RecvTube, usize, usize) {
barrier.wait();
let mut id1_count = 0usize;
let mut id2_count = 0usize;
for _ in 0..count {
let msg = tube.recv::<DataStruct>().unwrap();
match msg.x {
PRODUCER_ID_1 => id1_count += 1,
PRODUCER_ID_2 => id2_count += 1,
_ => panic!(
"want message with ID {} or {}; got message w/ ID {}.",
PRODUCER_ID_1, PRODUCER_ID_2, msg.x
),
}
}
(tube, id1_count, id2_count)
}
#[test]
fn send_recv_mpsc() {
let (p1, consumer) = Tube::directional_pair().unwrap();
let p2 = p1.try_clone().unwrap();
let start_block_p1 = Arc::new(Barrier::new(3));
let start_block_p2 = start_block_p1.clone();
let start_block_consumer = start_block_p1.clone();
let p1_thread = thread::spawn(move || produce_messages(p1, PRODUCER_ID_1, start_block_p1));
let p2_thread = thread::spawn(move || produce_messages(p2, PRODUCER_ID_2, start_block_p2));
let (_tube, id1_count, id2_count) = consume_messages(consumer, 200, start_block_consumer);
assert_eq!(id1_count, 100);
assert_eq!(id2_count, 100);
p1_thread.join().unwrap();
p2_thread.join().unwrap();
}
#[test]
fn send_recv_hash_map() {
let (s1, s2) = Tube::pair().unwrap();

View file

@ -93,7 +93,7 @@ impl EventFd {
/// a timeout does not occur then the count is returned as a EventReadResult::Count(count),
/// and the count is reset to 0. If a timeout does occur then this function will return
/// EventReadResult::Timeout.
pub fn read_timeout(&mut self, timeout: Duration) -> Result<EventReadResult> {
pub fn read_timeout(&self, timeout: Duration) -> Result<EventReadResult> {
let mut pfd = libc::pollfd {
fd: self.as_raw_descriptor(),
events: POLLIN,
@ -218,7 +218,7 @@ mod tests {
#[test]
fn timeout() {
let mut evt = EventFd::new().expect("failed to create eventfd");
let evt = EventFd::new().expect("failed to create eventfd");
assert_eq!(
evt.read_timeout(Duration::from_millis(1))
.expect("failed to read from eventfd with timeout"),

145
base/src/unix/tube.rs Normal file
View file

@ -0,0 +1,145 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::io::IoSlice;
use std::marker::PhantomData;
use std::os::unix::prelude::{AsRawFd, RawFd};
use std::time::Duration;
use crate::{
tube::{Error, RecvTube, Result, SendTube},
unix::deserialize_with_descriptors,
unix::SerializeDescriptors,
AsRawDescriptor, FromRawDescriptor, RawDescriptor, ReadNotifier, SafeDescriptor, ScmSocket,
UnixSeqpacket, UnsyncMarker,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
/// Bidirectional tube that support both send and recv.
#[derive(Serialize, Deserialize)]
pub struct Tube {
socket: UnixSeqpacket,
// Windows is !Sync. We share that characteristic to prevent writing cross-platform incompatible
// code.
_unsync_marker: UnsyncMarker,
}
impl Tube {
/// Create a pair of connected tubes. Request is sent in one direction while response is in the
/// other direction.
pub fn pair() -> Result<(Tube, Tube)> {
let (socket1, socket2) = UnixSeqpacket::pair().map_err(Error::Pair)?;
let tube1 = Tube::new(socket1);
let tube2 = Tube::new(socket2);
Ok((tube1, tube2))
}
/// Create a new `Tube`.
pub fn new(socket: UnixSeqpacket) -> Tube {
Tube {
socket,
_unsync_marker: PhantomData,
}
}
/// DO NOT USE this method directly as it will become private soon (b/221484449). Use a
/// directional Tube pair instead.
#[deprecated]
pub fn try_clone(&self) -> Result<Self> {
self.socket.try_clone().map(Tube::new).map_err(Error::Clone)
}
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
let msg_serialize = SerializeDescriptors::new(&msg);
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
let msg_descriptors = msg_serialize.into_descriptors();
self.socket
.send_with_fds(&[IoSlice::new(&msg_json)], &msg_descriptors)
.map_err(Error::Send)?;
Ok(())
}
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
let (msg_json, msg_descriptors) =
self.socket.recv_as_vec_with_fds().map_err(Error::Recv)?;
if msg_json.is_empty() {
return Err(Error::Disconnected);
}
let mut msg_descriptors_safe = msg_descriptors
.into_iter()
.map(|v| {
Some(unsafe {
// Safe because the socket returns new fds that are owned locally by this scope.
SafeDescriptor::from_raw_descriptor(v)
})
})
.collect();
deserialize_with_descriptors(
|| serde_json::from_slice(&msg_json),
&mut msg_descriptors_safe,
)
.map_err(Error::Json)
}
pub fn set_send_timeout(&self, timeout: Option<Duration>) -> Result<()> {
self.socket
.set_write_timeout(timeout)
.map_err(Error::SetSendTimeout)
}
pub fn set_recv_timeout(&self, timeout: Option<Duration>) -> Result<()> {
self.socket
.set_read_timeout(timeout)
.map_err(Error::SetRecvTimeout)
}
}
impl AsRawDescriptor for Tube {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.socket.as_raw_descriptor()
}
}
impl AsRawFd for Tube {
fn as_raw_fd(&self) -> RawFd {
self.socket.as_raw_fd()
}
}
impl ReadNotifier for Tube {
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
&self.socket
}
}
impl FromRawDescriptor for Tube {
/// # Safety:
/// Requirements:
/// (1) The caller owns rd.
/// (2) When the call completes, ownership of rd has transferred to the returned value.
unsafe fn from_raw_descriptor(rd: RawDescriptor) -> Self {
Self {
socket: UnixSeqpacket::from_raw_descriptor(rd),
_unsync_marker: PhantomData,
}
}
}
impl AsRawFd for SendTube {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_descriptor()
}
}
impl AsRawFd for RecvTube {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_descriptor()
}
}

446
base/src/win/tube.rs Normal file
View file

@ -0,0 +1,446 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::io::{self, Cursor, Read, Write};
use std::time::Duration;
use crate::tube::{Error, RecvTube, Result, SendTube};
use crate::{
BlockingMode, CloseNotifier, FramingMode, FromRawDescriptor, PollToken, ReadNotifier,
SafeDescriptor, StreamChannel,
};
use cros_async::{unblock, Executor};
use data_model::DataInit;
use lazy_static::lazy_static;
use serde::{de::DeserializeOwned, Deserialize, Serialize, Serializer};
use std::mem;
use std::os::windows::io::{AsRawHandle, RawHandle};
use std::sync::{Arc, Mutex};
use win_sys_util::{
deserialize_with_descriptors, AsRawDescriptor, Descriptor, RawDescriptor, SerializeDescriptors,
};
use winapi::shared::winerror::ERROR_MORE_DATA;
/// Bidirectional tube that support both send and recv.
///
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
/// 1. B's target_pid must be set to the current PID *before* serialization. There is a
/// serialization hook that sets it to the current PID automatically if target_pid is unset.
/// 2. A's target_pid must be set to the PID of the process where B was sent.
///
/// If instead you are sending both A and B to separate processes, then:
/// 1. A's target_pid must be set to B's pid, manually.
/// 2. B's target_pid must be set to A's pid, manually.
///
/// Automating all of this and getting a completely clean interface is tricky. We would need
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
socket: StreamChannel,
// Default target_pid to current PID on serialization (see `Tube` comment header for details).
#[serde(serialize_with = "set_tube_pid_on_serialize")]
target_pid: Option<u32>,
}
/// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
/// default it to the current process, because the other end will be in the current process.
fn set_tube_pid_on_serialize<S>(
existing_pid_value: &Option<u32>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
match existing_pid_value {
Some(pid) => serializer.serialize_u32(*pid),
None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
}
}
#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct MsgHeader {
msg_json_size: usize,
descriptor_json_size: usize,
}
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for MsgHeader {}
lazy_static! {
static ref DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
static ref ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
}
/// Set a tube to delegate duplicate handle calls.
pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
DH_TUBE.lock().replace(dh_tube);
}
/// Set alias pid for use with a DuplicateHandleTube.
pub fn set_alias_pid(alias_pid: u32) {
ALIAS_PID.lock().replace(alias_pid);
}
impl Tube {
/// Create a pair of connected tubes. Request is sent in one direction while response is
/// received in the other direction.
/// The result is in the form (server, client).
pub fn pair() -> Result<(Tube, Tube)> {
let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
Ok((Tube::new(socket1), Tube::new(socket2)))
}
/// Create a pair of connected tubes with the specified buffer size.
/// Request is sent in one direction while response is received in the other direction.
/// The result is in the form (server, client).
pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
BlockingMode::Blocking,
FramingMode::Message,
buffer_size,
)
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
let tube1 = Tube::new(socket1);
let tube2 = Tube::new(socket2);
Ok((tube1, tube2))
}
// Create a new `Tube`.
pub fn new(socket: StreamChannel) -> Tube {
Tube {
socket,
target_pid: None,
}
}
pub(super) fn try_clone(&self) -> Result<Self> {
Ok(Tube {
socket: self.socket.try_clone().map_err(Error::Clone)?,
target_pid: self.target_pid,
})
}
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
}
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
deserialize_and_recv(|buf| (&*&self.socket).read(buf))
}
/// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
/// documentation to ensure you have a server pipe before calling.
#[cfg(windows)]
pub fn flush_blocking(&mut self) -> Result<()> {
self.socket.flush_blocking().map_err(Error::Flush)
}
/// For Tubes that span processes, this method must be used to set the PID of the other end
/// of the Tube, otherwise sending handles to the other end won't work.
pub fn set_target_pid(&mut self, target_pid: u32) {
self.target_pid = Some(target_pid);
}
/// Returns the PID of the process at the other end of the Tube, if any is set.
pub fn target_pid(&self) -> Option<u32> {
self.target_pid
}
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
}
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
}
}
pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
write_fn: F,
msg: &T,
target_pid: Option<u32>,
) -> Result<()> {
let msg_serialize = SerializeDescriptors::new(&msg);
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
let msg_descriptors = msg_serialize.into_descriptors();
let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
for desc in msg_descriptors {
// Safe because these handles are guaranteed to be valid. Details:
// 1. They come from sys_util::descriptor_reflection::with_as_descriptor.
// 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
// SafeDescriptor).
// 3. The owning object is borrowed by msg until sending is complete.
duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
}
let descriptor_json = if duped_descriptors.is_empty() {
None
} else {
Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
};
let header = MsgHeader {
msg_json_size: msg_json.len(),
descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
};
let mut data_packet = Cursor::new(Vec::with_capacity(
header.as_slice().len() + header.msg_json_size + header.descriptor_json_size,
));
data_packet
.write(header.as_slice())
.map_err(Error::SendIoBuf)?;
data_packet
.write(msg_json.as_slice())
.map_err(Error::SendIoBuf)?;
if let Some(descriptor_json) = descriptor_json {
data_packet
.write(descriptor_json.as_slice())
.map_err(Error::SendIoBuf)?;
}
// Multiple writers (producers) are safe because each write is atomic.
let data_bytes = data_packet.into_inner();
write_fn(&data_bytes).map_err(Error::SendIo)?;
Ok(())
}
fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
match target_pid {
Some(pid) => match &*DH_TUBE.lock() {
Some(tube) => tube.request_duplicate_handle(pid, desc),
None => {
win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
}
},
None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
}
}
/// Reads a part of a Tube packet asserting that it was correctly read. This means:
/// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer.
/// We use this to ignore errors when reading the message header, which has the lengths we need
/// to allocate our buffers for the remainder of the message.
/// * We filled the supplied buffer.
fn perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>(
read_fn: &F,
buf: &mut [u8],
) -> io::Result<usize> {
let res = match read_fn(buf) {
Ok(s) => Ok(s),
Err(e)
if e.raw_os_error()
.map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
{
Ok(buf.len())
}
Err(e) => Err(e),
};
let bytes_read = res?;
if bytes_read != buf.len() {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
} else {
Ok(bytes_read)
}
}
/// Deserializes a Tube packet by calling the supplied read function. This function MUST
/// assert that the buffer was filled.
pub fn deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>(
read_fn: F,
) -> Result<T> {
let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()];
perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?;
// Safe because the header is always written by the send function, and only that function
// writes to this channel.
let header =
MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize.");
let mut msg_json = vec![0u8; header.msg_json_size];
perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?;
if msg_json.is_empty() {
// This means we got a message header, but there is no json body (due to a zero size in
// the header). This should never happen because it means the receiver is getting no
// data whatsoever from the sender.
return Err(Error::RecvUnexpectedEmptyBody);
}
let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 {
let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?;
let descriptor_usizes: Vec<usize> =
serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?;
// Safe because the usizes are RawDescriptors that were converted to usize in the send
// method.
descriptor_usizes
.iter()
.map(|item| *item as RawDescriptor)
.collect()
} else {
Vec::new()
};
let mut msg_descriptors_safe = msg_descriptors
.into_iter()
.map(|v| {
Some(unsafe {
// Safe because the socket returns new fds that are owned locally by this scope.
SafeDescriptor::from_raw_descriptor(v)
})
})
.collect();
deserialize_with_descriptors(
|| serde_json::from_slice(&msg_json),
&mut msg_descriptors_safe,
)
.map_err(Error::Json)
}
#[derive(PollToken, Eq, PartialEq, Copy, Clone)]
enum Token {
SocketReady,
}
impl AsRawDescriptor for Tube {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.socket.as_raw_descriptor()
}
}
impl AsRawHandle for Tube {
fn as_raw_handle(&self) -> RawHandle {
self.as_raw_descriptor()
}
}
impl ReadNotifier for Tube {
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
self.socket.get_read_notifier()
}
}
impl CloseNotifier for Tube {
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
self.socket.get_close_notifier()
}
}
impl AsRawHandle for SendTube {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_descriptor()
}
}
impl AsRawHandle for RecvTube {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_descriptor()
}
}
/// A request to duplicate a handle to a target process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
pub target_alias_pid: u32,
pub handle: usize,
}
/// Contains a duplicated handle or None if an error occurred.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
pub handle: Option<usize>,
}
/// Wrapper for tube which is used to delegate DuplicateHandle function calls to
/// the broker process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
impl DuplicateHandleTube {
pub fn new(tube: Tube) -> Self {
Self(tube)
}
pub fn request_duplicate_handle(
&self,
target_alias_pid: u32,
handle: RawHandle,
) -> Result<RawHandle> {
let req = DuplicateHandleRequest {
target_alias_pid,
handle: handle as usize,
};
self.0.send(&req)?;
let res: DuplicateHandleResponse = self.0.recv()?;
res.handle
.map(|h| h as RawHandle)
.ok_or(Error::BrokerDupDescriptor)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{EventContext, EventTrigger, PollToken, ReadNotifier};
use std::time;
const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10);
#[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)]
enum Token {
ReceivedData,
}
#[test]
fn test_serialize_tube() {
let (tube_1, tube_2) = Tube::pair().unwrap();
let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
tube_2.get_read_notifier(),
Token::ReceivedData,
)])
.unwrap();
// Serialize the Tube
let msg_serialize = SerializeDescriptors::new(&tube_1);
let serialized = serde_json::to_vec(&msg_serialize).unwrap();
let msg_descriptors = msg_serialize.into_descriptors();
// Deserialize the Tube
let mut msg_descriptors_safe = msg_descriptors
.into_iter()
.map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) }))
.collect();
let tube_deserialized: Tube = deserialize_with_descriptors(
|| serde_json::from_slice(&serialized),
&mut msg_descriptors_safe,
)
.unwrap();
// Send a message through deserialized Tube
tube_deserialized.send(&"hi".to_string()).unwrap();
assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
assert_eq!(tube_2.recv::<String>().unwrap(), "hi");
}
}

View file

@ -92,7 +92,7 @@ impl EventFd {
/// a timeout does not occur then the count is returned as a EventReadResult::Count(count),
/// and the count is reset to 0. If a timeout does occur then this function will return
/// EventReadResult::Timeout.
pub fn read_timeout(&mut self, timeout: Duration) -> Result<EventReadResult> {
pub fn read_timeout(&self, timeout: Duration) -> Result<EventReadResult> {
let mut pfd = libc::pollfd {
fd: self.as_raw_descriptor(),
events: POLLIN,
@ -217,7 +217,7 @@ mod tests {
#[test]
fn timeout() {
let mut evt = EventFd::new().expect("failed to create eventfd");
let evt = EventFd::new().expect("failed to create eventfd");
assert_eq!(
evt.read_timeout(Duration::from_millis(1))
.expect("failed to read from eventfd with timeout"),

View file

@ -43,7 +43,7 @@ use winapi::{
};
/// The default buffer size for all named pipes in the system. If this size is too small, writers
/// on named pipes that expect not to block can block until the reading side empties the buffer.
/// on named pipes that expect not to block *can* block until the reading side empties the buffer.
///
/// The general rule is this should be *at least* as big as the largest message, otherwise
/// unexpected blocking behavior can result; for example, if too small, this can interact badly with
@ -714,6 +714,11 @@ impl PipeConnection {
}
}
/// Get the framing mode of the pipe.
pub fn get_framing_mode(&self) -> FramingMode {
self.framing_mode
}
/// Returns metadata about the connected NamedPipe.
pub fn get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo> {
let mut flags: u32 = 0;

View file

@ -31,10 +31,15 @@ impl From<BlockingMode> for named_pipes::BlockingMode {
}
}
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
/// An abstraction over named pipes and unix socketpairs.
///
/// The ReadNotifier will return an event handle that is set when data is in the channel.
///
/// In message mode, single writes larger than
/// `win_sys_util::named_pipes::DEFAULT_BUFFER_SIZE` are not permitted.
///
/// # Notes for maintainers
/// 1. This struct contains extremely subtle thread safety considerations.
/// 2. Serialization is not derived! New fields need to be added manually.
@ -75,6 +80,10 @@ pub struct StreamChannel {
#[serde(skip)]
#[serde(default = "create_true_cell")]
is_channel_closed_on_drop: RefCell<bool>,
// For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
// up to that size.
send_buffer_size: usize,
}
fn create_read_lock() -> Arc<Mutex<()>> {
@ -93,13 +102,14 @@ impl Serialize for StreamChannel {
where
S: Serializer,
{
let mut s = serializer.serialize_struct("StreamChannel", 6)?;
let mut s = serializer.serialize_struct("StreamChannel", 7)?;
s.serialize_field("pipe_conn", &self.pipe_conn)?;
s.serialize_field("write_notify", &self.write_notify)?;
s.serialize_field("read_notify", &self.read_notify)?;
s.serialize_field("pipe_closed", &self.pipe_closed)?;
s.serialize_field("remote_write_lock", &self.remote_write_lock)?;
s.serialize_field("local_write_lock", &self.local_write_lock)?;
s.serialize_field("send_buffer_size", &self.send_buffer_size)?;
let ret = s.end();
// Because this end has been serialized, the serialized copy is now responsible for setting
@ -134,10 +144,8 @@ impl StreamChannel {
}
}
// WARNING: Generally, multiple StreamChannels are not wanted. StreamChannels can only
// have 1 reader. However, this is used for the net process backend to work with Slirp, since
// one StreamChannel clone is put in a reader only loop and another is put in an writer only
// loop.
// WARNING: Generally, multiple StreamChannel ends are not wanted. StreamChannel behavior with
// > 1 reader per end is not defined.
pub fn try_clone(&self) -> io::Result<Self> {
Ok(StreamChannel {
pipe_conn: self.pipe_conn.try_clone()?,
@ -148,6 +156,7 @@ impl StreamChannel {
local_write_lock: self.local_write_lock.try_clone()?,
read_lock: self.read_lock.clone(),
is_channel_closed_on_drop: create_true_cell(),
send_buffer_size: self.send_buffer_size,
})
}
@ -242,6 +251,19 @@ impl StreamChannel {
/// Exists as a workaround for Tube which does not expect its transport to be mutable,
/// even though io::Write requires it.
pub fn write_immutable(&self, buf: &[u8]) -> io::Result<usize> {
if self.pipe_conn.get_framing_mode() == named_pipes::FramingMode::Message
&& buf.len() > self.send_buffer_size
{
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"StreamChannel forbids message mode writes larger than the \
default buffer size of {}.",
self.send_buffer_size,
),
));
}
let _lock = self.local_write_lock.lock();
let res = self.pipe_conn.write(buf);
@ -268,6 +290,7 @@ impl StreamChannel {
pub fn from_pipes(
pipe_a: PipeConnection,
pipe_b: PipeConnection,
send_buffer_size: usize,
) -> Result<(StreamChannel, StreamChannel)> {
let (notify_a_write, notify_b_write) = (Event::new()?, Event::new()?);
let pipe_closed = Event::new()?;
@ -284,6 +307,7 @@ impl StreamChannel {
remote_write_lock: write_lock_b.try_clone()?,
pipe_closed: pipe_closed.try_clone()?,
is_channel_closed_on_drop: create_true_cell(),
send_buffer_size,
};
let sock_b = StreamChannel {
pipe_conn: pipe_b,
@ -294,10 +318,13 @@ impl StreamChannel {
remote_write_lock: write_lock_a,
pipe_closed,
is_channel_closed_on_drop: create_true_cell(),
send_buffer_size,
};
Ok((sock_a, sock_b))
}
/// Create a pair with a specific buffer size. Note that this is the only way to send messages
/// larger than the default named pipe buffer size.
pub fn pair_with_buffer_size(
blocking_mode: BlockingMode,
framing_mode: FramingMode,
@ -310,7 +337,7 @@ impl StreamChannel {
buffer_size,
false,
)?;
Self::from_pipes(pipe_a, pipe_b)
Self::from_pipes(pipe_a, pipe_b, buffer_size)
}
/// Creates a cross platform channel pair.
/// On Windows the result is in the form (server, client).
@ -318,12 +345,14 @@ impl StreamChannel {
blocking_mode: BlockingMode,
framing_mode: FramingMode,
) -> Result<(StreamChannel, StreamChannel)> {
let (pipe_a, pipe_b) = named_pipes::pair(
let (pipe_a, pipe_b) = named_pipes::pair_with_buffer_size(
&named_pipes::FramingMode::from(framing_mode),
&named_pipes::BlockingMode::from(blocking_mode),
0,
DEFAULT_BUFFER_SIZE,
false,
)?;
Self::from_pipes(pipe_a, pipe_b)
Self::from_pipes(pipe_a, pipe_b, DEFAULT_BUFFER_SIZE)
}
/// Blocks until the pipe buffer is empty.

View file

@ -2,12 +2,17 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{Executor, IntoAsync, IoSourceExt};
use base::{AsRawDescriptor, Tube, TubeResult};
use serde::de::DeserializeOwned;
use std::ops::Deref;
use crate::{Executor, IntoAsync};
use base::{AsRawDescriptor, RecvTube, SendTube, Tube, TubeResult};
use serde::{de::DeserializeOwned, Serialize};
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg_attr(windows, path = "win/async_types.rs")]
#[cfg_attr(not(windows), path = "unix/async_types.rs")]
mod async_types;
pub use async_types::*;
/// Like `cros_async::IntoAsync`, except for use with crosvm's AsRawDescriptor
/// trait object family.
pub trait DescriptorIntoAsync: AsRawDescriptor {}
@ -27,34 +32,40 @@ where
impl<T> IntoAsync for DescriptorAdapter<T> where T: DescriptorIntoAsync {}
impl IntoAsync for Tube {}
impl IntoAsync for SendTube {}
impl IntoAsync for RecvTube {}
pub struct AsyncTube {
inner: Box<dyn IoSourceExt<Tube>>,
}
impl AsyncTube {
pub fn new(ex: &Executor, tube: Tube) -> std::io::Result<AsyncTube> {
return Ok(AsyncTube {
inner: ex.async_from(tube)?,
});
pub struct RecvTubeAsync(AsyncTube);
#[allow(dead_code)]
impl RecvTubeAsync {
pub fn new(tube: RecvTube, ex: &Executor) -> io::Result<Self> {
Ok(Self(AsyncTube::new(
ex,
#[allow(deprecated)]
tube.into_tube(),
)?))
}
pub async fn next<T: DeserializeOwned>(&self) -> TubeResult<T> {
self.inner.wait_readable().await.unwrap();
self.inner.as_source().recv()
/// TODO(b/145998747, b/184398671): this async approach needs to be refactored
/// upstream, but for now is implemented to work using simple blocking futures
/// (avoiding the unimplemented wait_readable).
pub async fn next<T: 'static + DeserializeOwned + Send>(&self) -> TubeResult<T> {
self.0.next().await
}
}
impl Deref for AsyncTube {
type Target = Tube;
pub struct SendTubeAsync(AsyncTube);
#[allow(dead_code)]
impl SendTubeAsync {
pub fn new(tube: SendTube, ex: &Executor) -> io::Result<Self> {
Ok(Self(AsyncTube::new(
ex,
#[allow(deprecated)]
tube.into_tube(),
)?))
}
fn deref(&self) -> &Self::Target {
self.inner.as_source()
}
}
impl From<AsyncTube> for Tube {
fn from(at: AsyncTube) -> Tube {
at.inner.into_source()
pub async fn send<T: 'static + Serialize + Send + Sync>(&self, msg: T) -> TubeResult<()> {
self.0.send(msg).await
}
}

View file

@ -0,0 +1,42 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{Executor, IoSourceExt};
use base::{Tube, TubeResult};
use serde::{de::DeserializeOwned, Serialize};
use std::io;
use std::ops::Deref;
pub struct AsyncTube {
inner: Box<dyn IoSourceExt<Tube>>,
}
impl AsyncTube {
pub fn new(ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
return Ok(AsyncTube {
inner: ex.async_from(tube)?,
});
}
pub async fn next<T: DeserializeOwned>(&self) -> TubeResult<T> {
self.inner.wait_readable().await.unwrap();
self.inner.as_source().recv()
}
pub async fn send<T: 'static + Serialize + Send + Sync>(&self, msg: T) -> TubeResult<()> {
self.inner.as_source().send(&msg)
}
}
impl Deref for AsyncTube {
type Target = Tube;
fn deref(&self) -> &Self::Target {
self.inner.as_source()
}
}
impl From<AsyncTube> for Tube {
fn from(at: AsyncTube) -> Tube {
at.inner.into_source()
}
}

View file

@ -0,0 +1,58 @@
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::{Executor, HandleWrapper};
use base::{AsRawDescriptor, Descriptor, RecvTube, SendTube, Tube, TubeResult};
use serde::de::DeserializeOwned;
use std::io;
use std::os::windows::io::AsRawHandle;
use std::sync::{Arc, Mutex};
pub struct AsyncTube {
inner: Arc<Mutex<Tube>>,
}
impl AsyncTube {
pub fn new(ex: &Executor, tube: Tube) -> io::Result<AsyncTube> {
Ok(AsyncTube {
inner: Arc::new(Mutex::new(tube)),
})
}
/// TODO(b/145998747, b/184398671): this async approach needs to be refactored
/// upstream, but for now is implemented to work using simple blocking futures
/// (avoiding the unimplemented wait_readable).
pub async fn next<T: 'static + DeserializeOwned + Send>(&self) -> TubeResult<T> {
let tube = Arc::clone(&self.inner);
let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]);
unblock(
move || tube.lock().unwrap().recv(),
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
}
pub async fn send<T: 'static + Serialize + Send + Sync>(&self, msg: T) -> TubeResult<()> {
let tube = Arc::clone(&self.inner);
let handles = HandleWrapper::new(vec![Descriptor(tube.lock().unwrap().as_raw_handle())]);
unblock(
move || tube.lock().unwrap().send(&msg),
move || Err(handles.lock().cancel_sync_io(Error::OperationCancelled)),
)
.await
}
}
impl From<AsyncTube> for Tube {
fn from(at: AsyncTube) -> Tube {
// We ensure this is safe by waiting to acquire the mutex on
// the tube before unwrapping. This only works because the
// worker thread in "next" holds the mutex for its entire life.
//
// This does however mean that into will block until all async
// operations are complete.
let _ = at.inner.lock().unwrap();
Arc::try_unwrap(at.inner).unwrap().into_inner().unwrap()
}
}

View file

@ -950,7 +950,7 @@ mod tests {
// setup an event and a resample event for irq line 1
let evt = Event::new().expect("failed to create event");
let mut resample_evt = Event::new().expect("failed to create event");
let resample_evt = Event::new().expect("failed to create event");
let evt_index = chip
.register_irq_event(1, &evt, Some(&resample_evt))
@ -1083,7 +1083,7 @@ mod tests {
// setup an event and a resample event for irq line 1
let evt = Event::new().expect("failed to create event");
let mut resample_evt = Event::new().expect("failed to create event");
let resample_evt = Event::new().expect("failed to create event");
chip.register_irq_event(1, &evt, Some(&resample_evt))
.expect("failed to register_irq_event");

View file

@ -707,6 +707,7 @@ impl VirtioDevice for Balloon {
self.kill_evt = Some(self_kill_evt);
let state = self.state.clone();
#[allow(deprecated)]
let command_tube = match self.command_tube.try_clone() {
Ok(tube) => tube,
Err(e) => {

View file

@ -292,8 +292,10 @@ async fn handle_command_tube(
}
};
let resp_clone = resp.clone();
command_tube
.send(&resp)
.send(resp_clone)
.await
.map_err(ExecuteError::SendingResponse)?;
if let DiskControlResult::Ok = resp {
interrupt.borrow().signal_config_changed();

View file

@ -574,7 +574,7 @@ impl Worker {
Self::handle_vfio(mem, vfio_cmd, endpoints, hp_endpoints_ranges)
}
};
if let Err(e) = command_tube.send(&response) {
if let Err(e) = command_tube.send(response).await {
error!("{}", IommuError::VirtioIOMMUResponseError(e));
}
}
@ -682,7 +682,7 @@ async fn handle_translate_request(
.get(&endpoint_id)
.unwrap()
.send(
&mapper
mapper
.lock()
.translate(iova, size)
.map_err(|e| {
@ -691,6 +691,7 @@ async fn handle_translate_request(
})
.ok(),
)
.await
.map_err(IommuError::Tube)?;
} else {
error!("endpoint_id {} not found", endpoint_id)

View file

@ -213,7 +213,7 @@ impl VhostUserBackend for GpuBackend {
}
};
if let Err(e) = tube.send(&response) {
if let Err(e) = tube.send(response).await {
error!("Failed to send `PciBarConfiguration`: {}", e);
}

View file

@ -327,7 +327,7 @@ pub fn run_wl_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> {
let wayland_paths: BTreeMap<_, _> = wayland_sock.into_iter().collect();
let resource_bridge = resource_bridge
.map(|p| {
.map(|p| -> anyhow::Result<Tube> {
let deadline = Instant::now() + Duration::from_secs(5);
loop {
match UnixSeqpacket::connect(&p) {
@ -336,7 +336,7 @@ pub fn run_wl_device(program_name: &str, args: &[&str]) -> anyhow::Result<()> {
if Instant::now() < deadline {
thread::sleep(Duration::from_millis(50));
} else {
return Err(e);
return Err(anyhow::Error::new(e));
}
}
}

View file

@ -1201,7 +1201,11 @@ pub fn setup_virtio_access_platform(
let CreateIpcMapperRet {
mapper: ipc_mapper,
response_tx,
} = create_ipc_mapper(endpoint_id, request_tx.try_clone()?);
} = create_ipc_mapper(
endpoint_id,
#[allow(deprecated)]
request_tx.try_clone()?,
);
translate_response_senders
.get_or_insert_with(BTreeMap::new)
.insert(endpoint_id, response_tx);

View file

@ -154,7 +154,7 @@ impl Display for DiskControlCommand {
}
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum DiskControlResult {
Ok,
Err(SysError),