vmm_vhost: change Connection send API to send_message()

Make the connection API provide a send_message() API instead of
send_iovec(). This moves the responsibility for ensuring all data of a
message is sent into the platform-specific code (SocketConnection and
TubeConnection), which can now use different approaches.

The send_iovec_all() function is moved into the socket code, and the
socket implementation of send_message() now sends all of the data in a
single sendmsg() call. This is acceptable now, since the
Windows-specific requirement for splitting the header and data into
separate sends does not apply to the unix-only socket code.

The tube code now relies on the Tube::send() function to ensure all data
is delivered, removing the send_iovec_all() retry loop from Windows
entirely.

BUG=b:273574299

Change-Id: I9652e4ee3e95bb9ecf700dac93b0d5b806469ab2
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5075018
Reviewed-by: Frederick Mayle <fmayle@google.com>
Reviewed-by: Noah Gold <nkgold@google.com>
Commit-Queue: Daniel Verkamp <dverkamp@chromium.org>
This commit is contained in:
Daniel Verkamp 2023-11-28 16:43:48 -08:00 committed by crosvm LUCI
parent 69a9a0e4a0
commit 2a417a0b1b
4 changed files with 145 additions and 168 deletions

View file

@ -17,7 +17,6 @@ cfg_if::cfg_if! {
}
use std::fs::File;
use std::io::IoSlice;
use std::io::IoSliceMut;
use std::mem;
use std::path::Path;
@ -44,25 +43,6 @@ pub trait Listener: Sized {
fn set_nonblocking(&self, block: bool) -> Result<()>;
}
// Advance the internal cursor of the slices.
// This is same with a nightly API `IoSlice::advance_slices` but for `&[u8]`.
fn advance_slices(bufs: &mut &mut [&[u8]], mut count: usize) {
use std::mem::take;
let mut idx = 0;
for b in bufs.iter() {
if count < b.len() {
break;
}
count -= b.len();
idx += 1;
}
*bufs = &mut take(bufs)[idx..];
if !bufs.is_empty() {
bufs[0] = &bufs[0][count..];
}
}
// Advance the internal cursor of the slices.
// This is same with a nightly API `IoSliceMut::advance_slices` but for `&mut [u8]`.
fn advance_slices_mut(bufs: &mut &mut [&mut [u8]], mut count: usize) {
@ -117,54 +97,19 @@ impl<R: Req> Connection<R> {
))
}
/// Sends all bytes from scatter-gather vectors with optional attached file descriptors. Will
/// loop until all data has been transfered.
///
/// # TODO
/// This function takes a slice of `&[u8]` instead of `IoSlice` because the internal
/// cursor needs to be moved by `advance_slices()`.
/// Once `IoSlice::advance_slices()` becomes stable, this should be updated.
/// <https://github.com/rust-lang/rust/issues/62726>.
fn send_iovec_all(
&self,
mut iovs: &mut [&[u8]],
mut fds: Option<&[RawDescriptor]>,
) -> Result<()> {
// Guarantee that `iovs` becomes empty if it doesn't contain any data.
advance_slices(&mut iovs, 0);
while !iovs.is_empty() {
let iovec: Vec<_> = iovs.iter_mut().map(|i| IoSlice::new(i)).collect();
match self.0.send_iovec(&iovec, fds) {
Ok(n) => {
fds = None;
advance_slices(&mut iovs, n);
}
Err(e) => match e {
Error::SocketRetry(_) => {}
_ => return Err(e),
},
}
}
Ok(())
}
/// Sends bytes from a slice with optional attached file descriptors.
///
/// # Return:
/// * - number of bytes sent on success
#[cfg(test)]
fn send_slice(&self, data: IoSlice, fds: Option<&[RawDescriptor]>) -> Result<usize> {
self.0.send_iovec(&[data], fds)
fn send_slice(&self, data: &[u8], fds: Option<&[RawDescriptor]>) -> Result<()> {
self.0.send_message(data, &[], &[], fds)
}
/// Sends a header-only message with optional attached file descriptors.
pub fn send_header(
pub fn send_header_only_message(
&self,
hdr: &VhostUserMsgHeader<R>,
fds: Option<&[RawDescriptor]>,
) -> Result<()> {
self.send_iovec_all(&mut [hdr.as_bytes()], fds)
self.0.send_message(hdr.as_bytes(), &[], &[], fds)
}
/// Send a message with header and body. Optional file descriptors may be attached to
@ -175,11 +120,8 @@ impl<R: Req> Connection<R> {
body: &T,
fds: Option<&[RawDescriptor]>,
) -> Result<()> {
// We send the header and the body separately here. This is necessary on Windows. Otherwise
// the recv side cannot read the header independently (the transport is message oriented).
self.send_iovec_all(&mut [hdr.as_bytes()], fds)?;
self.send_iovec_all(&mut [body.as_bytes()], None)?;
Ok(())
self.0
.send_message(hdr.as_bytes(), body.as_bytes(), &[], fds)
}
/// Send a message with header and body. `payload` is appended to the end of the body. Optional
@ -191,18 +133,8 @@ impl<R: Req> Connection<R> {
payload: &[u8],
fds: Option<&[RawDescriptor]>,
) -> Result<()> {
if let Some(fd_arr) = fds {
if fd_arr.len() > MAX_ATTACHED_FD_ENTRIES {
return Err(Error::IncorrectFds);
}
}
// We send the header and the body separately here. This is necessary on Windows. Otherwise
// the recv side cannot read the header independently (the transport is message oriented).
self.send_iovec_all(&mut [hdr.as_bytes()], fds)?;
self.send_iovec_all(&mut [body.as_bytes(), payload], None)?;
Ok(())
self.0
.send_message(hdr.as_bytes(), body.as_bytes(), payload, fds)
}
/// Reads all bytes into the given scatter/gather vectors with optional attached files. Will
@ -349,18 +281,6 @@ pub(crate) mod tests {
}
}
#[test]
fn test_advance_slices() {
// Test case from https://doc.rust-lang.org/std/io/struct.IoSlice.html#method.advance_slices
let buf1 = [1; 8];
let buf2 = [2; 16];
let buf3 = [3; 8];
let mut bufs = &mut [&buf1[..], &buf2[..], &buf3[..]][..];
advance_slices(&mut bufs, 10);
assert_eq!(bufs[0], [2; 14].as_ref());
assert_eq!(bufs[1], [3; 8].as_ref());
}
#[test]
fn test_advance_slices_mut() {
// Test case from https://doc.rust-lang.org/std/io/struct.IoSliceMut.html#method.advance_slices

View file

@ -125,6 +125,25 @@ impl From<SystemStream> for SocketPlatformConnection {
}
}
// Advance the internal cursor of the slices.
// This is same with a nightly API `IoSlice::advance_slices` but for `&[u8]`.
fn advance_slices(bufs: &mut &mut [&[u8]], mut count: usize) {
use std::mem::take;
let mut idx = 0;
for b in bufs.iter() {
if count < b.len() {
break;
}
count -= b.len();
idx += 1;
}
*bufs = &mut take(bufs)[idx..];
if !bufs.is_empty() {
bufs[0] = &bufs[0][count..];
}
}
impl SocketPlatformConnection {
/// Create a new stream by connecting to server at `str`.
///
@ -136,22 +155,52 @@ impl SocketPlatformConnection {
Ok(Self::from(sock))
}
/// Sends bytes from scatter-gather vectors over the socket with optional attached file
/// descriptors.
/// Sends all bytes from scatter-gather vectors with optional attached file descriptors. Will
/// loop until all data has been transfered.
///
/// # Return:
/// * - number of bytes sent on success
/// * - SocketRetry: temporary error caused by signals or short of resources.
/// * - SocketBroken: the underline socket is broken.
/// * - SocketError: other socket related errors.
pub fn send_iovec(&self, iovs: &[IoSlice], fds: Option<&[RawDescriptor]>) -> Result<usize> {
let rfds = match fds {
Some(rfds) => rfds,
_ => &[],
};
self.sock
.send_vectored_with_fds(iovs, rfds)
.map_err(Into::into)
/// # TODO
/// This function takes a slice of `&[u8]` instead of `IoSlice` because the internal
/// cursor needs to be moved by `advance_slices()`.
/// Once `IoSlice::advance_slices()` becomes stable, this should be updated.
/// <https://github.com/rust-lang/rust/issues/62726>.
fn send_iovec_all(
&self,
mut iovs: &mut [&[u8]],
mut fds: Option<&[RawDescriptor]>,
) -> Result<()> {
// Guarantee that `iovs` becomes empty if it doesn't contain any data.
advance_slices(&mut iovs, 0);
while !iovs.is_empty() {
let iovec: Vec<_> = iovs.iter_mut().map(|i| IoSlice::new(i)).collect();
match self.sock.send_vectored_with_fds(&iovec, fds.unwrap_or(&[])) {
Ok(n) => {
fds = None;
advance_slices(&mut iovs, n);
}
Err(e) => match e.kind() {
ErrorKind::WouldBlock | ErrorKind::Interrupted => {}
_ => return Err(Error::SocketError(e)),
},
}
}
Ok(())
}
/// Sends a single message over the socket with optional attached file descriptors.
///
/// - `hdr`: vhost message header
/// - `body`: vhost message body (may be empty to send a header-only message)
/// - `payload`: additional bytes to append to `body` (may be empty)
pub fn send_message(
&self,
hdr: &[u8],
body: &[u8],
payload: &[u8],
fds: Option<&[RawDescriptor]>,
) -> Result<()> {
let mut iobufs = [hdr, body, payload];
self.send_iovec_all(&mut iobufs, fds)
}
/// Reads bytes from the socket into the given scatter/gather vectors with optional attached
@ -270,14 +319,12 @@ mod tests {
let slave = listener.accept().unwrap().unwrap();
let buf1 = [0x1, 0x2, 0x3, 0x4];
let mut len = master.send_slice(IoSlice::new(&buf1[..]), None).unwrap();
assert_eq!(len, 4);
master.send_slice(&buf1, None).unwrap();
let (bytes, buf2, _) = slave.recv_into_buf(0x1000).unwrap();
assert_eq!(bytes, 4);
assert_eq!(&buf1[..], &buf2[..bytes]);
len = master.send_slice(IoSlice::new(&buf1[..]), None).unwrap();
assert_eq!(len, 4);
master.send_slice(&buf1, None).unwrap();
let (bytes, buf2, _) = slave.recv_into_buf(0x2).unwrap();
assert_eq!(bytes, 2);
assert_eq!(&buf1[..2], &buf2[..]);
@ -301,10 +348,9 @@ mod tests {
// Normal case for sending/receiving file descriptors
let buf1 = [0x1, 0x2, 0x3, 0x4];
let len = master
.send_slice(IoSlice::new(&buf1[..]), Some(&[fd.as_raw_descriptor()]))
master
.send_slice(&buf1, Some(&[fd.as_raw_descriptor()]))
.unwrap();
assert_eq!(len, 4);
let (bytes, buf2, files) = slave.recv_into_buf(4).unwrap();
assert_eq!(bytes, 4);
@ -323,9 +369,9 @@ mod tests {
// Following communication pattern should work:
// Sending side: data(header, body) with fds
// Receiving side: data(header) with fds, data(body)
let len = master
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
fd.as_raw_descriptor(),
fd.as_raw_descriptor(),
@ -333,7 +379,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let (bytes, buf2, files) = slave.recv_into_buf(0x2).unwrap();
assert_eq!(bytes, 2);
@ -356,9 +401,9 @@ mod tests {
// Following communication pattern should not work:
// Sending side: data(header, body) with fds
// Receiving side: data(header), data(body) with fds
let len = master
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
fd.as_raw_descriptor(),
fd.as_raw_descriptor(),
@ -366,7 +411,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let mut buf4 = vec![0u8; 2];
slave.recv_into_bufs_all(&mut [&mut buf4[..]]).unwrap();
@ -379,11 +423,10 @@ mod tests {
// Following communication pattern should work:
// Sending side: data, data with fds
// Receiving side: data, data with fds
let len = master.send_slice(IoSlice::new(&buf1[..]), None).unwrap();
assert_eq!(len, 4);
let len = master
master.send_slice(&buf1, None).unwrap();
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
fd.as_raw_descriptor(),
fd.as_raw_descriptor(),
@ -391,7 +434,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let (bytes, buf2, files) = slave.recv_into_buf(0x4).unwrap();
assert_eq!(bytes, 4);
@ -419,11 +461,10 @@ mod tests {
// Following communication pattern should not work:
// Sending side: data1, data2 with fds
// Receiving side: data + partial of data2, left of data2 with fds
let len = master.send_slice(IoSlice::new(&buf1[..]), None).unwrap();
assert_eq!(len, 4);
let len = master
master.send_slice(&buf1, None).unwrap();
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
fd.as_raw_descriptor(),
fd.as_raw_descriptor(),
@ -431,7 +472,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let mut v = vec![0u8; 5];
slave.recv_into_bufs_all(&mut [&mut v[..]]).unwrap();
@ -442,9 +482,9 @@ mod tests {
assert!(files.is_none());
// If the target fd array is too small, extra file descriptors will get lost.
let len = master
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
fd.as_raw_descriptor(),
fd.as_raw_descriptor(),
@ -452,7 +492,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let (bytes, _, files) = slave.recv_into_buf(0x4).unwrap();
assert_eq!(bytes, 4);
@ -485,9 +524,21 @@ mod tests {
assert_eq!(features1, features2);
assert!(files.is_none());
master.send_header(&hdr1, None).unwrap();
master.send_header_only_message(&hdr1, None).unwrap();
let (hdr2, files) = slave.recv_header().unwrap();
assert_eq!(hdr1, hdr2);
assert!(files.is_none());
}
#[test]
fn test_advance_slices() {
// Test case from https://doc.rust-lang.org/std/io/struct.IoSlice.html#method.advance_slices
let buf1 = [1; 8];
let buf2 = [2; 16];
let buf3 = [3; 8];
let mut bufs = &mut [&buf1[..], &buf2[..], &buf3[..]][..];
advance_slices(&mut bufs, 10);
assert_eq!(bufs[0], [2; 14].as_ref());
assert_eq!(bufs[1], [3; 8].as_ref());
}
}

View file

@ -6,7 +6,6 @@
use std::cmp::min;
use std::fs::File;
use std::io::IoSlice;
use std::io::IoSliceMut;
use std::path::Path;
use std::ptr::copy_nonoverlapping;
@ -58,30 +57,43 @@ impl TubePlatformConnection {
unimplemented!("connections not supported on Tubes")
}
/// Sends bytes from scatter-gather vectors with optional attached file descriptors.
/// Sends a single message over the socket with optional attached file descriptors.
///
/// # Return:
/// * - number of bytes sent on success
/// * - TubeError: tube related errors.
pub fn send_iovec(&self, iovs: &[IoSlice], rds: Option<&[RawDescriptor]>) -> Result<usize> {
// Gather the iovecs
let total_bytes = iovs.iter().map(|iov| iov.len()).sum();
let mut data = Vec::with_capacity(total_bytes);
for iov in iovs {
data.extend(iov.iter());
/// - `hdr`: vhost message header
/// - `body`: vhost message body (may be empty to send a header-only message)
/// - `payload`: additional bytes to append to `body` (may be empty)
pub fn send_message(
&self,
hdr: &[u8],
body: &[u8],
payload: &[u8],
rds: Option<&[RawDescriptor]>,
) -> Result<()> {
let hdr_msg = Message {
rds: rds
.unwrap_or(&[])
.iter()
.map(|rd| RawDescriptorContainer { rd: *rd })
.collect(),
data: hdr.to_vec(),
};
let mut body_data = Vec::with_capacity(body.len() + payload.len());
body_data.extend_from_slice(body);
body_data.extend_from_slice(payload);
let body_msg = Message {
rds: Vec::new(),
data: body_data,
};
// We send the header and the body separately here. This is necessary on Windows. Otherwise
// the recv side cannot read the header independently (the transport is message oriented).
self.tube.send(&hdr_msg)?;
if !body_msg.data.is_empty() {
self.tube.send(&body_msg)?;
}
let mut msg = Message {
data,
rds: Vec::with_capacity(rds.map_or(0, |rds| rds.len())),
};
if let Some(rds) = rds {
for rd in rds {
msg.rds.push(RawDescriptorContainer { rd: *rd });
}
}
self.tube.send(&msg)?;
Ok(total_bytes)
Ok(())
}
/// Reads bytes from the tube into the given scatter/gather vectors with optional attached
@ -171,7 +183,6 @@ impl AsRawDescriptor for TubePlatformConnection {
#[cfg(test)]
mod tests {
use std::io::IoSlice;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
@ -201,8 +212,7 @@ mod tests {
let (master, slave) = create_pair();
let buf1 = vec![0x1, 0x2, 0x3, 0x4];
let len = master.send_slice(IoSlice::new(&buf1[..]), None).unwrap();
assert_eq!(len, 4);
master.send_slice(&buf1, None).unwrap();
let (bytes, buf2, _) = slave.recv_into_buf(0x1000).unwrap();
assert_eq!(bytes, 4);
assert_eq!(&buf1[..], &buf2[..bytes]);
@ -217,10 +227,9 @@ mod tests {
// Normal case for sending/receiving file descriptors
let buf1 = vec![0x1, 0x2, 0x3, 0x4];
let len = master
.send_slice(IoSlice::new(&buf1[..]), Some(&[file.as_raw_descriptor()]))
master
.send_slice(&buf1, Some(&[file.as_raw_descriptor()]))
.unwrap();
assert_eq!(len, 4);
let (bytes, buf2, files) = slave.recv_into_buf(4).unwrap();
assert_eq!(bytes, 4);
@ -239,11 +248,10 @@ mod tests {
// Following communication pattern should work:
// Sending side: data, data with fds
// Receiving side: data, data with fds
let len = master.send_slice(IoSlice::new(&buf1[..]), None).unwrap();
assert_eq!(len, 4);
let len = master
master.send_slice(&buf1, None).unwrap();
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
file.as_raw_descriptor(),
file.as_raw_descriptor(),
@ -251,7 +259,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let (bytes, buf2, files) = slave.recv_into_buf(0x4).unwrap();
assert_eq!(bytes, 4);
@ -276,9 +283,9 @@ mod tests {
//
// Porting note: no, they won't. The FD array is sized to whatever the header says it
// should be.
let len = master
master
.send_slice(
IoSlice::new(&buf1[..]),
&buf1,
Some(&[
file.as_raw_descriptor(),
file.as_raw_descriptor(),
@ -286,7 +293,6 @@ mod tests {
]),
)
.unwrap();
assert_eq!(len, 4);
let (bytes, _, files) = slave.recv_into_buf(0x4).unwrap();
assert_eq!(bytes, 4);
@ -313,7 +319,7 @@ mod tests {
assert_eq!(features1, features2);
assert!(files.is_none());
master.send_header(&hdr1, None).unwrap();
master.send_header_only_message(&hdr1, None).unwrap();
let (hdr2, files) = slave.recv_header().unwrap();
assert_eq!(hdr1, hdr2);
assert!(files.is_none());

View file

@ -552,7 +552,7 @@ impl Master {
fds: Option<&[RawDescriptor]>,
) -> VhostUserResult<VhostUserMsgHeader<MasterReq>> {
let hdr = self.new_request_header(code, 0);
self.main_sock.send_header(&hdr, fds)?;
self.main_sock.send_header_only_message(&hdr, fds)?;
Ok(hdr)
}