devices: vhost-user: handler: stop spawning a thread for the socket listener

The async methods of the socket VVU handler were calling the non-async
accept() on a dedicated thread so the async method does not get blocked
awaiting for a connection. This socket-specific behavior forces us to
have dedicated code for the socket and VVU handlers.

Fix this by adding a poll_descriptor() method to vmm_vhost's Listener
trait that returns a descriptor that callers can poll on if a call to
accept() can block. This way we can wait for the connection
asynchronously and avoid using a thread for that. It also opens the way
towards factorizing the socket and VVU specific code.

BUG=b:229554679
TEST=vhost-user console device works.

Change-Id: I3fbc96d3904f777c6165b04564f577c553ce55d2
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/3592632
Reviewed-by: Keiichi Watanabe <keiichiw@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Alexandre Courbot <acourbot@chromium.org>
This commit is contained in:
Alexandre Courbot 2022-04-17 11:05:56 +09:00 committed by Chromeos LUCI
parent a5c1b2ff7f
commit d60bf43903
2 changed files with 40 additions and 17 deletions

View file

@ -14,9 +14,9 @@ use cros_async::{AsyncWrapper, Executor};
use vm_memory::GuestMemory;
use vmm_vhost::{
connection::{
socket::Listener as SocketListener,
socket::{Endpoint as SocketEndpoint, Listener as SocketListener},
vfio::{Endpoint as VfioEndpoint, Listener as VfioListener},
Endpoint, Listener,
Endpoint,
},
message::{MasterReq, VhostUserMemoryRegion},
Error as VhostError, Protocol, Result as VhostResult, SlaveListener, SlaveReqHandler,
@ -255,22 +255,32 @@ where
/// Attaches to an already bound socket via `listener` and handles incoming messages from the
/// VMM, which are dispatched to the device backend via the `VhostUserBackend` trait methods.
pub async fn run_with_listener(
self,
mut listener: SocketListener,
ex: &Executor,
) -> Result<()> {
let socket = ex
.spawn_blocking(move || {
listener
.accept()
.context("failed to accept an incoming connection")
})
.await?
.ok_or(anyhow!("failed to accept an incoming connection"))?;
let req_handler = SlaveReqHandler::from_stream(socket, std::sync::Mutex::new(self));
pub async fn run_with_listener(self, listener: SocketListener, ex: &Executor) -> Result<()> {
let mut listener =
SlaveListener::<SocketEndpoint<_>, _>::new(listener, std::sync::Mutex::new(self))?;
listener.set_nonblocking(true)?;
run_handler(req_handler, ex).await
loop {
// If the listener is not ready on the first call to `accept` and returns `None`, we
// temporarily convert it into an async I/O source and yield until it signals there is
// input data awaiting, before trying again.
match listener
.accept()
.context("failed to accept an incoming connection")?
{
Some(req_handler) => return run_handler(req_handler, ex).await,
None => {
// Nobody is on the other end yet, wait until we get a connection.
let async_waiter = ex
.async_from_local(AsyncWrapper::new(listener))
.context("failed to create async waiter")?;
async_waiter.wait_readable().await?;
// Retrieve the listener back so we can use it again.
listener = async_waiter.into_source().into_inner();
}
}
}
}
/// Starts listening virtio-vhost-user device with VFIO to handle incoming vhost-user messages

View file

@ -5,6 +5,8 @@
//!
//! These are used on platforms where the slave has to listen for connections (e.g. POSIX only).
use base::AsRawDescriptor;
use super::connection::{Endpoint, Listener};
use super::message::*;
use super::{Result, SlaveReqHandler, VhostUserSlaveReqHandler};
@ -45,6 +47,17 @@ impl<E: Endpoint<MasterReq>, S: VhostUserSlaveReqHandler> SlaveListener<E, S> {
}
}
impl<E, S> AsRawDescriptor for SlaveListener<E, S>
where
E: Endpoint<MasterReq>,
E::Listener: AsRawDescriptor,
S: VhostUserSlaveReqHandler,
{
fn as_raw_descriptor(&self) -> base::RawDescriptor {
self.listener.as_raw_descriptor()
}
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;