vmm_vhost: rename Endpoint to Connection

Change-Id: If7e8a845a50d09c5ee2bdfe1e807121d835b1431
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5068400
Commit-Queue: Frederick Mayle <fmayle@google.com>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
This commit is contained in:
Frederick Mayle 2023-11-28 15:37:09 -08:00 committed by crosvm LUCI
parent 78a0f3d042
commit 862d4709c9
19 changed files with 135 additions and 135 deletions

View file

@ -86,7 +86,7 @@ use vmm_vhost::message::VhostUserShmemUnmapMsg;
use vmm_vhost::message::VhostUserSingleMemoryRegion;
use vmm_vhost::message::VhostUserVringAddrFlags;
use vmm_vhost::message::VhostUserVringState;
use vmm_vhost::Endpoint;
use vmm_vhost::Connection;
use vmm_vhost::Error as VhostError;
use vmm_vhost::Result as VhostResult;
use vmm_vhost::Slave;
@ -666,7 +666,7 @@ impl VhostUserSlaveReqHandlerMut for DeviceRequestHandler {
Ok(())
}
fn set_slave_req_fd(&mut self, ep: Endpoint<SlaveReq>) {
fn set_slave_req_fd(&mut self, ep: Connection<SlaveReq>) {
let conn = VhostBackendReqConnection::new(
Slave::new(ep),
self.backend.get_shared_memory_region().map(|r| r.id),

View file

@ -84,7 +84,7 @@ pub mod test_helpers {
mut listener: SocketListener,
handler: S,
) -> SlaveReqHandler<S> {
let endpoint = listener.accept().unwrap().unwrap();
SlaveReqHandler::new(endpoint, handler)
let connection = listener.accept().unwrap().unwrap();
SlaveReqHandler::new(connection, handler)
}
}

View file

@ -59,8 +59,8 @@ async fn run_with_handler(
.accept()
.context("failed to accept an incoming connection")?
{
Some(endpoint) => {
let req_handler = SlaveReqHandler::new(endpoint, handler);
Some(connection) => {
let req_handler = SlaveReqHandler::new(connection, handler);
return run_handler(req_handler, ex).await;
}
None => {

View file

@ -23,7 +23,7 @@ use data_model::Le64;
use vhost::Vhost;
use vhost::Vsock;
use vm_memory::GuestMemory;
use vmm_vhost::connection::Endpoint;
use vmm_vhost::connection::Connection;
use vmm_vhost::message::SlaveReq;
use vmm_vhost::message::VhostSharedMemoryRegion;
use vmm_vhost::message::VhostUserConfigFlags;
@ -398,7 +398,7 @@ impl VhostUserSlaveReqHandlerMut for VsockBackend {
Err(Error::InvalidOperation)
}
fn set_slave_req_fd(&mut self, _vu_req: Endpoint<SlaveReq>) {
fn set_slave_req_fd(&mut self, _vu_req: Connection<SlaveReq>) {
// We didn't set VhostUserProtocolFeatures::SLAVE_REQ
unreachable!("unexpected set_slave_req_fd");
}

View file

@ -16,7 +16,7 @@ use cros_async::Executor;
use futures::pin_mut;
use futures::select;
use futures::FutureExt;
use vmm_vhost::connection::TubeEndpoint;
use vmm_vhost::connection::TubePlatformConnection;
use vmm_vhost::message::MasterReq;
use vmm_vhost::message::VhostUserProtocolFeatures;
use vmm_vhost::Master;

View file

@ -70,7 +70,7 @@ pub struct DevicesCommand {
)]
/// start a serial device.
/// Possible key values:
/// vhost=PATH - Path to a vhost-user endpoint to listen to.
/// vhost=PATH - Path to a vhost-user socket to listen to.
/// This parameter must be given in first position.
/// type=(stdout,syslog,sink,file) - Where to route the
/// serial device
@ -95,7 +95,7 @@ pub struct DevicesCommand {
#[argh(option, arg_name = "vhost=PATH[, block options]")]
/// start a block device.
/// Possible key values:
/// vhost=PATH - Path to a vhost-user endpoint to listen to.
/// vhost=PATH - Path to a vhost-user socket to listen to.
/// This parameter must be given in first position.
/// block options:
/// See help from `crosvm run` command.
@ -104,7 +104,7 @@ pub struct DevicesCommand {
#[argh(option, arg_name = "vhost=PATH,cid=CID[,device=VHOST_DEVICE]")]
/// start a vsock device.
/// Possible key values:
/// vhost=PATH - Path to a vhost-user endpoint to listen to.
/// vhost=PATH - Path to a vhost-user socket to listen to.
/// This parameter must be given in first position.
/// cid=CID - CID to use for the device.
/// device=VHOST_DEVICE - path to the vhost-vsock device to
@ -115,7 +115,7 @@ pub struct DevicesCommand {
#[argh(option, arg_name = "net options")]
/// start a network device.
/// Possible key values:
/// vhost=PATH - Path to a vhost-user endpoint to listen to.
/// vhost=PATH - Path to a vhost-user socket to listen to.
/// This parameter must be given in first position.
/// network options:
/// See help from the `crosvm run` command.

View file

@ -1,7 +1,7 @@
// Copyright (C) 2019 Alibaba Cloud Computing. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Common data structures for listener and endpoint.
//! Common data structures for listener and connection.
cfg_if::cfg_if! {
if #[cfg(unix)] {
@ -9,7 +9,7 @@ cfg_if::cfg_if! {
mod unix;
} else if #[cfg(windows)] {
mod tube;
pub use tube::TubeEndpoint;
pub use tube::TubePlatformConnection;
mod windows;
}
}
@ -29,7 +29,7 @@ use crate::connection::Req;
use crate::message::MasterReq;
use crate::message::SlaveReq;
use crate::message::*;
use crate::sys::PlatformEndpoint;
use crate::sys::PlatformConnection;
use crate::Error;
use crate::Result;
use crate::SystemStream;
@ -37,7 +37,7 @@ use crate::SystemStream;
/// Listener for accepting connections.
pub trait Listener: Sized {
/// Accept an incoming connection.
fn accept(&mut self) -> Result<Option<Endpoint<MasterReq>>>;
fn accept(&mut self) -> Result<Option<Connection<MasterReq>>>;
/// Change blocking status on the listener.
fn set_nonblocking(&self, block: bool) -> Result<()>;
@ -86,39 +86,39 @@ fn advance_slices_mut(bufs: &mut &mut [&mut [u8]], mut count: usize) {
/// A vhost-user connection at a low abstraction level. Provides methods for sending and receiving
/// vhost-user message headers and bodies.
///
/// Builds on top of `PlatformEndpoint`, which provides methods for sending and receiving raw bytes
/// and file descriptors (a thin cross-platform abstraction for unix domain sockets).
pub struct Endpoint<R: Req>(
pub(crate) PlatformEndpoint<R>,
// Mark `Endpoint` as `!Sync` because message sends and recvs cannot safely be done
/// Builds on top of `PlatformConnection`, which provides methods for sending and receiving raw
/// bytes and file descriptors (a thin cross-platform abstraction for unix domain sockets).
pub struct Connection<R: Req>(
pub(crate) PlatformConnection<R>,
// Mark `Connection` as `!Sync` because message sends and recvs cannot safely be done
// concurrently.
std::marker::PhantomData<std::cell::Cell<()>>,
);
impl<R: Req> From<SystemStream> for Endpoint<R> {
impl<R: Req> From<SystemStream> for Connection<R> {
fn from(sock: SystemStream) -> Self {
Self(PlatformEndpoint::from(sock), std::marker::PhantomData)
Self(PlatformConnection::from(sock), std::marker::PhantomData)
}
}
impl<R: Req> Endpoint<R> {
impl<R: Req> Connection<R> {
/// Create a new stream by connecting to server at `path`.
pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
Ok(Self(
PlatformEndpoint::connect(path)?,
PlatformConnection::connect(path)?,
std::marker::PhantomData,
))
}
/// Constructs the slave request endpoint for self.
/// Constructs the slave request connection for self.
///
/// # Arguments
/// * `files` - Files from which to create the endpoint
pub fn create_slave_request_endpoint(
/// * `files` - Files from which to create the connection
pub fn create_slave_request_connection(
&mut self,
files: Option<Vec<File>>,
) -> Result<super::Endpoint<SlaveReq>> {
self.0.create_slave_request_endpoint(files)
) -> Result<super::Connection<SlaveReq>> {
self.0.create_slave_request_connection(files)
}
/// Sends all bytes from scatter-gather vectors with optional attached file descriptors. Will
@ -336,7 +336,7 @@ impl<R: Req> Endpoint<R> {
}
}
impl<R: Req> AsRawDescriptor for Endpoint<R> {
impl<R: Req> AsRawDescriptor for Connection<R> {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.0.as_raw_descriptor()
}

View file

@ -1,7 +1,7 @@
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Structs for Unix Domain Socket listener and endpoint.
//! Structs for Unix Domain Socket listener and connection.
use std::any::Any;
use std::fs::File;
@ -23,7 +23,7 @@ use crate::connection::Req;
use crate::message::*;
use crate::take_single_file;
use crate::unix::SystemListener;
use crate::Endpoint;
use crate::Connection;
use crate::Error;
use crate::Result;
use crate::SystemStream;
@ -78,11 +78,11 @@ impl Listener for SocketListener {
/// * - Some(SystemListener): new SystemListener object if new incoming connection is available.
/// * - None: no incoming connection available.
/// * - SocketError: errors from accept().
fn accept(&mut self) -> Result<Option<Endpoint<MasterReq>>> {
fn accept(&mut self) -> Result<Option<Connection<MasterReq>>> {
loop {
match self.fd.accept() {
Ok((stream, _addr)) => {
return Ok(Some(Endpoint::from(stream)));
return Ok(Some(Connection::from(stream)));
}
Err(e) => {
match e.kind() {
@ -115,14 +115,14 @@ impl AsRawDescriptor for SocketListener {
}
}
/// Unix domain socket endpoint for vhost-user connection.
pub struct SocketEndpoint<R: Req> {
/// Unix domain socket based vhost-user connection.
pub struct SocketPlatformConnection<R: Req> {
sock: ScmSocket<SystemStream>,
_r: PhantomData<R>,
}
// TODO: Switch to TryFrom to avoid the unwrap.
impl<R: Req> From<SystemStream> for SocketEndpoint<R> {
impl<R: Req> From<SystemStream> for SocketPlatformConnection<R> {
fn from(sock: SystemStream) -> Self {
Self {
sock: sock.try_into().unwrap(),
@ -131,11 +131,11 @@ impl<R: Req> From<SystemStream> for SocketEndpoint<R> {
}
}
impl<R: Req> SocketEndpoint<R> {
impl<R: Req> SocketPlatformConnection<R> {
/// Create a new stream by connecting to server at `str`.
///
/// # Return:
/// * - the new SocketEndpoint object on success.
/// * - the new SocketPlatformConnection object on success.
/// * - SocketConnect: failed to connect to peer.
pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
let sock = SystemStream::connect(path).map_err(Error::SocketConnect)?;
@ -212,24 +212,24 @@ impl<R: Req> SocketEndpoint<R> {
Ok((bytes, files))
}
pub fn create_slave_request_endpoint(
pub fn create_slave_request_connection(
&mut self,
files: Option<Vec<File>>,
) -> Result<Endpoint<SlaveReq>> {
) -> Result<Connection<SlaveReq>> {
let file = take_single_file(files).ok_or(Error::InvalidMessage)?;
// Safe because we own the file
let tube = unsafe { SystemStream::from_raw_descriptor(file.into_raw_descriptor()) };
Ok(Endpoint::<SlaveReq>::from(tube))
Ok(Connection::<SlaveReq>::from(tube))
}
}
impl<T: Req> AsRawDescriptor for SocketEndpoint<T> {
impl<T: Req> AsRawDescriptor for SocketPlatformConnection<T> {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.sock.as_raw_descriptor()
}
}
impl<T: Req> AsMut<SystemStream> for SocketEndpoint<T> {
impl<T: Req> AsMut<SystemStream> for SocketPlatformConnection<T> {
fn as_mut(&mut self) -> &mut SystemStream {
self.sock.inner_mut()
}
@ -284,7 +284,7 @@ mod tests {
path.push("sock");
let mut listener = SocketListener::new(&path, true).unwrap();
listener.set_nonblocking(true).unwrap();
let master = Endpoint::<MasterReq>::connect(&path).unwrap();
let master = Connection::<MasterReq>::connect(&path).unwrap();
let slave = listener.accept().unwrap().unwrap();
let buf1 = [0x1, 0x2, 0x3, 0x4];
@ -311,7 +311,7 @@ mod tests {
path.push("sock");
let mut listener = SocketListener::new(&path, true).unwrap();
listener.set_nonblocking(true).unwrap();
let master = Endpoint::<MasterReq>::connect(&path).unwrap();
let master = Connection::<MasterReq>::connect(&path).unwrap();
let slave = listener.accept().unwrap().unwrap();
let mut fd = tempfile().unwrap();
@ -484,7 +484,7 @@ mod tests {
path.push("sock");
let mut listener = SocketListener::new(&path, true).unwrap();
listener.set_nonblocking(true).unwrap();
let master = Endpoint::<MasterReq>::connect(&path).unwrap();
let master = Connection::<MasterReq>::connect(&path).unwrap();
let slave = listener.accept().unwrap().unwrap();
let mut hdr1 =

View file

@ -1,8 +1,8 @@
// Copyright 2022 The Chromium OS Authors. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Structs for Tube based endpoint. Listeners are not used with Tubes, since they are essentially
//! fancy socket pairs.
//! Structs for Tube based connection. Listeners are not used with Tubes, since they are
//! essentially fancy socket pairs.
use std::cmp::min;
use std::fs::File;
@ -23,7 +23,7 @@ use tube_transporter::packed_tube;
use crate::connection::Req;
use crate::message::SlaveReq;
use crate::take_single_file;
use crate::Endpoint;
use crate::Connection;
use crate::Error;
use crate::Result;
@ -34,24 +34,24 @@ struct RawDescriptorContainer {
}
#[derive(Serialize, Deserialize)]
struct EndpointMessage {
struct Message {
rds: Vec<RawDescriptorContainer>,
data: Vec<u8>,
}
/// Tube endpoint for vhost-user connection.
pub struct TubeEndpoint<R: Req> {
/// Tube based vhost-user connection.
pub struct TubePlatformConnection<R: Req> {
tube: Tube,
_r: PhantomData<R>,
}
impl<R: Req> TubeEndpoint<R> {
impl<R: Req> TubePlatformConnection<R> {
pub(crate) fn get_tube(&self) -> &Tube {
&self.tube
}
}
impl<R: Req> From<Tube> for TubeEndpoint<R> {
impl<R: Req> From<Tube> for TubePlatformConnection<R> {
fn from(tube: Tube) -> Self {
Self {
tube,
@ -60,7 +60,7 @@ impl<R: Req> From<Tube> for TubeEndpoint<R> {
}
}
impl<R: Req> TubeEndpoint<R> {
impl<R: Req> TubePlatformConnection<R> {
pub fn connect<P: AsRef<Path>>(_path: P) -> Result<Self> {
unimplemented!("connections not supported on Tubes")
}
@ -78,7 +78,7 @@ impl<R: Req> TubeEndpoint<R> {
data.extend(iov.iter());
}
let mut msg = EndpointMessage {
let mut msg = Message {
data,
rds: Vec::with_capacity(rds.map_or(0, |rds| rds.len())),
};
@ -109,7 +109,7 @@ impl<R: Req> TubeEndpoint<R> {
) -> Result<(usize, Option<Vec<File>>)> {
// TODO(b/221882601): implement "allow_rds"
let msg: EndpointMessage = self.tube.recv()?;
let msg: Message = self.tube.recv()?;
let files = match msg.rds.len() {
0 => None,
@ -156,18 +156,18 @@ impl<R: Req> TubeEndpoint<R> {
Ok((bytes_read, files))
}
pub fn create_slave_request_endpoint(
pub fn create_slave_request_connection(
&mut self,
files: Option<Vec<File>>,
) -> Result<Endpoint<SlaveReq>> {
) -> Result<Connection<SlaveReq>> {
let file = take_single_file(files).ok_or(Error::InvalidMessage)?;
// Safe because the file represents a packed tube.
let tube = unsafe { packed_tube::unpack(file.into()).expect("unpacked Tube") };
Ok(Endpoint::from(tube))
Ok(Connection::from(tube))
}
}
impl<R: Req> AsRawDescriptor for TubeEndpoint<R> {
impl<R: Req> AsRawDescriptor for TubePlatformConnection<R> {
/// WARNING: this function does not return a waitable descriptor! Use base::ReadNotifier
/// instead.
fn as_raw_descriptor(&self) -> RawDescriptor {
@ -192,13 +192,13 @@ mod tests {
use crate::message::MasterReq;
use crate::message::VhostUserMsgHeader;
use crate::message::VhostUserMsgValidator;
use crate::Endpoint;
use crate::Connection;
fn create_pair() -> (Endpoint<MasterReq>, Endpoint<MasterReq>) {
fn create_pair() -> (Connection<MasterReq>, Connection<MasterReq>) {
let (master_tube, slave_tube) = Tube::pair().unwrap();
(
Endpoint::<MasterReq>::from(master_tube),
Endpoint::<MasterReq>::from(slave_tube),
Connection::<MasterReq>::from(master_tube),
Connection::<MasterReq>::from(slave_tube),
)
}

View file

@ -14,13 +14,13 @@ pub(crate) mod tests {
use crate::message::MasterReq;
use crate::slave_req_handler::SlaveReqHandler;
use crate::slave_req_handler::VhostUserSlaveReqHandler;
use crate::Endpoint;
use crate::Connection;
pub(crate) fn temp_dir() -> TempDir {
Builder::new().prefix("/tmp/vhost_test").tempdir().unwrap()
}
pub(crate) fn create_pair() -> (Master, Endpoint<MasterReq>) {
pub(crate) fn create_pair() -> (Master, Connection<MasterReq>) {
let dir = temp_dir();
let mut path = dir.path().to_owned();
path.push("sock");
@ -40,8 +40,8 @@ pub(crate) mod tests {
path.push("sock");
let mut listener = SocketListener::new(&path, true).unwrap();
let master = Master::connect(&path).unwrap();
let endpoint = listener.accept().unwrap().unwrap();
let req_handler = SlaveReqHandler::new(endpoint, backend);
let connection = listener.accept().unwrap().unwrap();
let req_handler = SlaveReqHandler::new(connection, backend);
(master, req_handler)
}

View file

@ -9,13 +9,13 @@ pub(crate) mod tests {
use crate::message::MasterReq;
use crate::slave_req_handler::SlaveReqHandler;
use crate::slave_req_handler::VhostUserSlaveReqHandler;
use crate::Endpoint;
use crate::Connection;
use crate::SystemStream;
pub(crate) fn create_pair() -> (Master, Endpoint<MasterReq>) {
pub(crate) fn create_pair() -> (Master, Connection<MasterReq>) {
let (master_tube, slave_tube) = SystemStream::pair().unwrap();
let master = Master::from_stream(master_tube);
(master, Endpoint::from(slave_tube))
(master, Connection::from(slave_tube))
}
pub(crate) fn create_master_slave_pair<S>(backend: S) -> (Master, SlaveReqHandler<S>)

View file

@ -46,7 +46,7 @@ pub use message::VHOST_USER_F_PROTOCOL_FEATURES;
pub mod connection;
mod sys;
pub use connection::Endpoint;
pub use connection::Connection;
pub use message::MasterReq;
pub use message::SlaveReq;
pub use sys::SystemStream;

View file

@ -20,7 +20,7 @@ use crate::backend::VhostUserMemoryRegionInfo;
use crate::backend::VringConfigData;
use crate::message::*;
use crate::take_single_file;
use crate::Endpoint;
use crate::Connection;
use crate::Error as VhostUserError;
use crate::MasterReq;
use crate::Result as VhostUserResult;
@ -30,7 +30,7 @@ use crate::SystemStream;
/// Client for a vhost-user device. The API is a thin abstraction over the vhost-user protocol.
pub struct Master {
// Used to send requests to the slave.
main_sock: Endpoint<MasterReq>,
main_sock: Connection<MasterReq>,
// Cached virtio features from the slave.
virtio_features: u64,
// Cached acked virtio features from the driver.
@ -42,11 +42,11 @@ pub struct Master {
impl Master {
/// Create a new instance from a Unix stream socket.
pub fn from_stream(sock: SystemStream) -> Self {
Self::new(Endpoint::from(sock))
Self::new(Connection::from(sock))
}
/// Create a new instance.
fn new(ep: Endpoint<MasterReq>) -> Self {
fn new(ep: Connection<MasterReq>) -> Self {
Master {
main_sock: ep,
virtio_features: 0,
@ -55,7 +55,7 @@ impl Master {
}
}
/// Create a new vhost-user master endpoint.
/// Create a new vhost-user master connection.
///
/// Will retry as the backend may not be ready to accept the connection.
///
@ -63,9 +63,9 @@ impl Master {
/// * `path` - path of Unix domain socket listener to connect to
pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
let mut retry_count = 5;
let endpoint = loop {
match Endpoint::connect(&path) {
Ok(endpoint) => break Ok(endpoint),
let connection = loop {
match Connection::connect(&path) {
Ok(connection) => break Ok(connection),
Err(e) => match &e {
VhostUserError::SocketConnect(why) => {
if why.kind() == std::io::ErrorKind::ConnectionRefused && retry_count > 0 {
@ -81,7 +81,7 @@ impl Master {
}
}?;
Ok(Self::new(endpoint))
Ok(Self::new(connection))
}
/// Get a bitmask of supported virtio/vhost features.
@ -858,7 +858,7 @@ mod tests {
.unwrap_err();
}
fn create_pair2() -> (Master, Endpoint<MasterReq>) {
fn create_pair2() -> (Master, Connection<MasterReq>) {
let (mut master, peer) = create_pair();
master.virtio_features = 0xffff_ffff;

View file

@ -18,7 +18,7 @@ use base::AsRawDescriptor;
use base::SafeDescriptor;
use crate::message::*;
use crate::Endpoint;
use crate::Connection;
use crate::Error;
use crate::HandlerResult;
use crate::Result;
@ -226,7 +226,7 @@ impl<S: VhostUserMasterReqHandlerMut> VhostUserMasterReqHandler for Mutex<S> {
/// Server to handle service requests from slaves from the slave communication channel.
pub struct MasterReqHandler<S: VhostUserMasterReqHandler> {
// underlying Unix domain socket for communication
sub_sock: Endpoint<SlaveReq>,
sub_sock: Connection<SlaveReq>,
tx_sock: Option<SystemStream>,
// Serializes tx_sock for passing to the backend.
serialize_tx: Box<dyn Fn(SystemStream) -> SafeDescriptor + Send>,
@ -253,7 +253,7 @@ impl<S: VhostUserMasterReqHandler> MasterReqHandler<S> {
let (tx, rx) = SystemStream::pair()?;
Ok(MasterReqHandler {
sub_sock: Endpoint::from(rx),
sub_sock: Connection::from(rx),
tx_sock: Some(tx),
serialize_tx,
reply_ack_negotiated: false,

View file

@ -12,7 +12,7 @@ use base::RawDescriptor;
use zerocopy::AsBytes;
use crate::message::*;
use crate::Endpoint;
use crate::Connection;
use crate::Error;
use crate::HandlerResult;
use crate::Result;
@ -21,12 +21,12 @@ use crate::SystemStream;
use crate::VhostUserMasterReqHandler;
struct SlaveInternal {
sock: Endpoint<SlaveReq>,
sock: Connection<SlaveReq>,
// Protocol feature VHOST_USER_PROTOCOL_F_REPLY_ACK has been negotiated.
reply_ack_negotiated: bool,
// whether the endpoint has encountered any failure
// whether the connection has encountered any failure
error: Option<i32>,
}
@ -87,8 +87,8 @@ pub struct Slave {
}
impl Slave {
/// Constructs a new slave proxy from the given endpoint.
pub fn new(ep: Endpoint<SlaveReq>) -> Self {
/// Constructs a new slave proxy from the given connection.
pub fn new(ep: Connection<SlaveReq>) -> Self {
Slave {
node: Arc::new(Mutex::new(SlaveInternal {
sock: ep,
@ -118,7 +118,7 @@ impl Slave {
/// Create a new instance from a `SystemStream` object.
pub fn from_stream(sock: SystemStream) -> Self {
Self::new(Endpoint::from(sock))
Self::new(Connection::from(sock))
}
/// Set the negotiation state of the `VHOST_USER_PROTOCOL_F_REPLY_ACK` protocol feature.
@ -130,7 +130,7 @@ impl Slave {
self.node().reply_ack_negotiated = enable;
}
/// Mark endpoint as failed with specified error code.
/// Mark connection as failed with specified error code.
pub fn set_failed(&self, error: i32) {
self.node().error = Some(error);
}
@ -204,7 +204,7 @@ mod tests {
fn test_slave_recv_negative() {
let (p1, p2) = SystemStream::pair().unwrap();
let fs_cache = Slave::from_stream(p1);
let master = Endpoint::from(p2);
let master = Connection::from(p2);
let len = mem::size_of::<VhostUserFSSlaveMsg>();
let mut hdr = VhostUserMsgHeader::new(

View file

@ -15,7 +15,7 @@ use zerocopy::Ref;
use crate::message::*;
use crate::take_single_file;
use crate::Endpoint;
use crate::Connection;
use crate::Error;
use crate::MasterReq;
use crate::Result;
@ -71,7 +71,7 @@ pub trait VhostUserSlaveReqHandler {
fn set_vring_enable(&self, index: u32, enable: bool) -> Result<()>;
fn get_config(&self, offset: u32, size: u32, flags: VhostUserConfigFlags) -> Result<Vec<u8>>;
fn set_config(&self, offset: u32, buf: &[u8], flags: VhostUserConfigFlags) -> Result<()>;
fn set_slave_req_fd(&self, _vu_req: Endpoint<SlaveReq>) {}
fn set_slave_req_fd(&self, _vu_req: Connection<SlaveReq>) {}
fn get_inflight_fd(&self, inflight: &VhostUserInflight) -> Result<(VhostUserInflight, File)>;
fn set_inflight_fd(&self, inflight: &VhostUserInflight, file: File) -> Result<()>;
fn get_max_mem_slots(&self) -> Result<u64>;
@ -121,7 +121,7 @@ pub trait VhostUserSlaveReqHandlerMut {
flags: VhostUserConfigFlags,
) -> Result<Vec<u8>>;
fn set_config(&mut self, offset: u32, buf: &[u8], flags: VhostUserConfigFlags) -> Result<()>;
fn set_slave_req_fd(&mut self, _vu_req: Endpoint<SlaveReq>) {}
fn set_slave_req_fd(&mut self, _vu_req: Connection<SlaveReq>) {}
fn get_inflight_fd(
&mut self,
inflight: &VhostUserInflight,
@ -224,7 +224,7 @@ impl<T: VhostUserSlaveReqHandlerMut> VhostUserSlaveReqHandler for Mutex<T> {
self.lock().unwrap().set_config(offset, buf, flags)
}
fn set_slave_req_fd(&self, vu_req: Endpoint<SlaveReq>) {
fn set_slave_req_fd(&self, vu_req: Connection<SlaveReq>) {
self.lock().unwrap().set_slave_req_fd(vu_req)
}
@ -354,7 +354,7 @@ where
self.as_ref().set_config(offset, buf, flags)
}
fn set_slave_req_fd(&self, vu_req: Endpoint<SlaveReq>) {
fn set_slave_req_fd(&self, vu_req: Connection<SlaveReq>) {
self.as_ref().set_slave_req_fd(vu_req)
}
@ -399,20 +399,20 @@ where
}
}
/// Abstracts |Endpoint| related operations for vhost-user slave implementations.
/// Abstracts |Connection| related operations for vhost-user slave implementations.
pub struct SlaveReqHelper {
/// Underlying endpoint for communication.
endpoint: Endpoint<MasterReq>,
/// Underlying connection for communication.
connection: Connection<MasterReq>,
/// Sending ack for messages without payload.
reply_ack_enabled: bool,
}
impl SlaveReqHelper {
/// Creates a new |SlaveReqHelper| instance with an |Endpoint| underneath it.
pub fn new(endpoint: Endpoint<MasterReq>) -> Self {
/// Creates a new |SlaveReqHelper| instance with an |Connection| underneath it.
pub fn new(connection: Connection<MasterReq>) -> Self {
SlaveReqHelper {
endpoint,
connection,
reply_ack_enabled: false,
}
}
@ -444,7 +444,7 @@ impl SlaveReqHelper {
self.new_reply_header::<VhostUserU64>(req, 0)?;
let val = if success { 0 } else { 1 };
let msg = VhostUserU64::new(val);
self.endpoint.send_message(&hdr, &msg, None)?;
self.connection.send_message(&hdr, &msg, None)?;
}
Ok(())
}
@ -455,7 +455,7 @@ impl SlaveReqHelper {
msg: &T,
) -> Result<()> {
let hdr = self.new_reply_header::<T>(req, 0)?;
self.endpoint.send_message(&hdr, msg, None)?;
self.connection.send_message(&hdr, msg, None)?;
Ok(())
}
@ -466,7 +466,7 @@ impl SlaveReqHelper {
payload: &[u8],
) -> Result<()> {
let hdr = self.new_reply_header::<T>(req, payload.len())?;
self.endpoint
self.connection
.send_message_with_payload(&hdr, msg, payload, None)?;
Ok(())
}
@ -501,21 +501,21 @@ impl SlaveReqHelper {
}
}
impl AsRef<Endpoint<MasterReq>> for SlaveReqHelper {
fn as_ref(&self) -> &Endpoint<MasterReq> {
&self.endpoint
impl AsRef<Connection<MasterReq>> for SlaveReqHelper {
fn as_ref(&self) -> &Connection<MasterReq> {
&self.connection
}
}
impl AsMut<Endpoint<MasterReq>> for SlaveReqHelper {
fn as_mut(&mut self) -> &mut Endpoint<MasterReq> {
&mut self.endpoint
impl AsMut<Connection<MasterReq>> for SlaveReqHelper {
fn as_mut(&mut self) -> &mut Connection<MasterReq> {
&mut self.connection
}
}
impl AsRawDescriptor for SlaveReqHelper {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.endpoint.as_raw_descriptor()
self.connection.as_raw_descriptor()
}
}
@ -542,9 +542,9 @@ pub struct SlaveReqHandler<S: VhostUserSlaveReqHandler> {
}
impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
/// Create a vhost-user slave endpoint from a connected socket.
/// Create a vhost-user slave connection from a connected socket.
pub fn from_stream(socket: SystemStream, backend: S) -> Self {
Self::new(Endpoint::from(socket), backend)
Self::new(Connection::from(socket), backend)
}
}
@ -555,10 +555,10 @@ impl<S: VhostUserSlaveReqHandler> AsRef<S> for SlaveReqHandler<S> {
}
impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
/// Create a vhost-user slave endpoint.
pub fn new(endpoint: Endpoint<MasterReq>, backend: S) -> Self {
/// Create a vhost-user slave connection.
pub fn new(connection: Connection<MasterReq>, backend: S) -> Self {
SlaveReqHandler {
slave_req_helper: SlaveReqHelper::new(endpoint),
slave_req_helper: SlaveReqHelper::new(connection),
backend,
virtio_features: 0,
acked_virtio_features: 0,
@ -624,7 +624,7 @@ impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
// . recv optional message body and payload according size field in
// message header
// . validate message body and optional payload
let (hdr, files) = match self.slave_req_helper.endpoint.recv_header() {
let (hdr, files) = match self.slave_req_helper.connection.recv_header() {
Ok((hdr, files)) => (hdr, files),
Err(Error::Disconnect) => {
// If the client closed the connection before sending a header, this should be
@ -667,7 +667,7 @@ impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
hdr: VhostUserMsgHeader<MasterReq>,
files: Option<Vec<File>>,
) -> Result<()> {
let buf = self.slave_req_helper.endpoint.recv_body_bytes(&hdr)?;
let buf = self.slave_req_helper.connection.recv_body_bytes(&hdr)?;
let size = buf.len();
match hdr.get_code() {
@ -836,7 +836,7 @@ impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
let reply_hdr = self
.slave_req_helper
.new_reply_header::<VhostUserInflight>(&hdr, 0)?;
self.slave_req_helper.endpoint.send_message(
self.slave_req_helper.connection.send_message(
&reply_hdr,
&inflight,
Some(&[file.as_raw_descriptor()]),
@ -1037,8 +1037,8 @@ impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
fn set_slave_req_fd(&mut self, files: Option<Vec<File>>) -> Result<()> {
let ep = self
.slave_req_helper
.endpoint
.create_slave_request_endpoint(files)?;
.connection
.create_slave_request_connection(files)?;
self.backend.set_slave_req_fd(ep);
Ok(())
}
@ -1113,7 +1113,7 @@ impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
impl<S: VhostUserSlaveReqHandler> AsRawDescriptor for SlaveReqHandler<S> {
fn as_raw_descriptor(&self) -> RawDescriptor {
// TODO(b/221882601): figure out if this used for polling.
self.slave_req_helper.endpoint.as_raw_descriptor()
self.slave_req_helper.connection.as_raw_descriptor()
}
}
@ -1123,15 +1123,15 @@ mod tests {
use super::*;
use crate::dummy_slave::DummySlaveReqHandler;
use crate::Endpoint;
use crate::Connection;
use crate::SystemStream;
#[test]
fn test_slave_req_handler_new() {
let (p1, _p2) = SystemStream::pair().unwrap();
let endpoint = Endpoint::from(p1);
let connection = Connection::from(p1);
let backend = Mutex::new(DummySlaveReqHandler::new());
let handler = SlaveReqHandler::new(endpoint, backend);
let handler = SlaveReqHandler::new(connection, backend);
assert!(handler.as_raw_descriptor() != INVALID_DESCRIPTOR);
}

View file

@ -15,6 +15,6 @@ cfg_if::cfg_if! {
}
}
pub(crate) use platform::PlatformEndpoint;
pub(crate) use platform::PlatformConnection;
pub use platform::SystemStream;

View file

@ -12,4 +12,4 @@ pub type SystemListener = UnixListener;
/// Alias to enable platform independent code.
pub type SystemStream = UnixStream;
pub(crate) use crate::connection::socket::SocketEndpoint as PlatformEndpoint;
pub(crate) use crate::connection::socket::SocketPlatformConnection as PlatformConnection;

View file

@ -8,4 +8,4 @@ use base::Tube;
/// Alias to enable platform independent code.
pub type SystemStream = Tube;
pub(crate) use crate::connection::TubeEndpoint as PlatformEndpoint;
pub(crate) use crate::connection::TubePlatformConnection as PlatformConnection;