From d60bf4390310481f7f58f08bae9174d5c4d20132 Mon Sep 17 00:00:00 2001 From: Alexandre Courbot Date: Sun, 17 Apr 2022 11:05:56 +0900 Subject: [PATCH] devices: vhost-user: handler: stop spawning a thread for the socket listener The async methods of the socket VVU handler were calling the non-async accept() on a dedicated thread so the async method does not get blocked awaiting for a connection. This socket-specific behavior forces us to have dedicated code for the socket and VVU handlers. Fix this by adding a poll_descriptor() method to vmm_vhost's Listener trait that returns a descriptor that callers can poll on if a call to accept() can block. This way we can wait for the connection asynchronously and avoid using a thread for that. It also opens the way towards factorizing the socket and VVU specific code. BUG=b:229554679 TEST=vhost-user console device works. Change-Id: I3fbc96d3904f777c6165b04564f577c553ce55d2 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3592632 Reviewed-by: Keiichi Watanabe Tested-by: kokoro Commit-Queue: Alexandre Courbot --- .../vhost/user/device/handler/sys/unix.rs | 44 ++++++++++++------- third_party/vmm_vhost/src/slave.rs | 13 ++++++ 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/devices/src/virtio/vhost/user/device/handler/sys/unix.rs b/devices/src/virtio/vhost/user/device/handler/sys/unix.rs index cb5de2b6fc..3b43f51705 100644 --- a/devices/src/virtio/vhost/user/device/handler/sys/unix.rs +++ b/devices/src/virtio/vhost/user/device/handler/sys/unix.rs @@ -14,9 +14,9 @@ use cros_async::{AsyncWrapper, Executor}; use vm_memory::GuestMemory; use vmm_vhost::{ connection::{ - socket::Listener as SocketListener, + socket::{Endpoint as SocketEndpoint, Listener as SocketListener}, vfio::{Endpoint as VfioEndpoint, Listener as VfioListener}, - Endpoint, Listener, + Endpoint, }, message::{MasterReq, VhostUserMemoryRegion}, Error as VhostError, Protocol, Result as VhostResult, SlaveListener, SlaveReqHandler, @@ -255,22 +255,32 @@ where /// Attaches to an already bound socket via `listener` and handles incoming messages from the /// VMM, which are dispatched to the device backend via the `VhostUserBackend` trait methods. - pub async fn run_with_listener( - self, - mut listener: SocketListener, - ex: &Executor, - ) -> Result<()> { - let socket = ex - .spawn_blocking(move || { - listener - .accept() - .context("failed to accept an incoming connection") - }) - .await? - .ok_or(anyhow!("failed to accept an incoming connection"))?; - let req_handler = SlaveReqHandler::from_stream(socket, std::sync::Mutex::new(self)); + pub async fn run_with_listener(self, listener: SocketListener, ex: &Executor) -> Result<()> { + let mut listener = + SlaveListener::, _>::new(listener, std::sync::Mutex::new(self))?; + listener.set_nonblocking(true)?; - run_handler(req_handler, ex).await + loop { + // If the listener is not ready on the first call to `accept` and returns `None`, we + // temporarily convert it into an async I/O source and yield until it signals there is + // input data awaiting, before trying again. + match listener + .accept() + .context("failed to accept an incoming connection")? + { + Some(req_handler) => return run_handler(req_handler, ex).await, + None => { + // Nobody is on the other end yet, wait until we get a connection. + let async_waiter = ex + .async_from_local(AsyncWrapper::new(listener)) + .context("failed to create async waiter")?; + async_waiter.wait_readable().await?; + + // Retrieve the listener back so we can use it again. + listener = async_waiter.into_source().into_inner(); + } + } + } } /// Starts listening virtio-vhost-user device with VFIO to handle incoming vhost-user messages diff --git a/third_party/vmm_vhost/src/slave.rs b/third_party/vmm_vhost/src/slave.rs index 7e04b507d9..4181516873 100644 --- a/third_party/vmm_vhost/src/slave.rs +++ b/third_party/vmm_vhost/src/slave.rs @@ -5,6 +5,8 @@ //! //! These are used on platforms where the slave has to listen for connections (e.g. POSIX only). +use base::AsRawDescriptor; + use super::connection::{Endpoint, Listener}; use super::message::*; use super::{Result, SlaveReqHandler, VhostUserSlaveReqHandler}; @@ -45,6 +47,17 @@ impl, S: VhostUserSlaveReqHandler> SlaveListener { } } +impl AsRawDescriptor for SlaveListener +where + E: Endpoint, + E::Listener: AsRawDescriptor, + S: VhostUserSlaveReqHandler, +{ + fn as_raw_descriptor(&self) -> base::RawDescriptor { + self.listener.as_raw_descriptor() + } +} + #[cfg(test)] mod tests { use std::sync::Mutex;