gpu_display: deprecate MessageRelayThread.

We used to spawn a separate thread to poll gpu_main_display_tube for
messages from the external supervisor, wrap each message in a thread
message (WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL), and post it to the
WndProc thread. Now that we have replaced GetMessageW() with
MsgWaitForMultipleObjects(), we can poll that tube and the message queue
at the same time on the WndProc thread, and route service messages
directly to WindowMessageDispatcher::process_service_message().
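
Condensed, the new WndProc-thread loop body looks like this (a sketch
distilled from run_message_loop_body() in the diff below; the kiwi
feature gate and some error logging are omitted):

  loop {
      // Wait on the Win32 message queue and the registered tube notifier at once.
      match unsafe { msg_wait_ctx.wait() } {
          Ok(Token::MessagePump) => {
              // Drain and dispatch pending window/thread messages.
              if !Self::retrieve_and_dispatch_messages(&mut message_dispatcher) {
                  return MessageLoopState::ExitedNormally;
              }
          }
          Ok(Token::ServiceMessage) => Self::read_and_dispatch_service_message(
              &mut message_dispatcher,
              vm_tube.as_ref().expect("Service message tube is None"),
          ),
          Err(e) => {
              error!("WndProc thread exiting message loop: {:?}", e);
              return MessageLoopState::ExitedWithError;
          }
      }
  }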

The newly added MsgWaitContext looks similar to EventContext but is
much simpler, since we don't need to worry about removing handles,
setting timeouts, etc. It may grow more complex once we add more
handles (for event devices, etc.), at which point we may look for ways
to unify the logic with EventContext, but this simple struct should
suffice for now.
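
For illustration, this is essentially all a caller does with it
(condensed from the new run_message_loop_body() below; unlike
EventContext there is no handle removal, no timeout, and no list of
triggered events to iterate over):

  let mut msg_wait_ctx = MsgWaitContext::new();
  // Register the service tube's read notifier. Token::MessagePump is reserved:
  // wait() returns it whenever the thread's own message queue has input.
  msg_wait_ctx.add(tube.lock().get_read_notifier(), Token::ServiceMessage)?;
  // Blocks in MsgWaitForMultipleObjects(..., QS_ALLINPUT) and returns a single
  // Token for whichever source became ready first.
  let token = unsafe { msg_wait_ctx.wait() }?;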

Bug: 244489783
Bug: 244491590
Test: flat run emulator
Test: flat run battlestar

Change-Id: I5722523ad1424a1860f2883d33728a1baf9550cc
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4719209
Reviewed-by: Pujun Lun <lunpujun@google.com>
Reviewed-by: Noah Gold <nkgold@google.com>
Commit-Queue: Kaiyi Li <kaiyili@google.com>

Author: lunpujun (2022-11-03 10:58:37 -07:00), committed by crosvm LUCI
Parent: 17656db647
Commit: d56dbed44d
5 changed files with 175 additions and 238 deletions

@@ -1,147 +0,0 @@
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::marker::PhantomData;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use base::error;
use base::info;
use base::Event;
use base::EventToken;
use base::ReadNotifier;
use base::Tube;
use base::WaitContext;
use serde::de::DeserializeOwned;
use sync::Mutex;
use winapi::shared::minwindef::UINT;
use super::thread_message_util;
/// A trait implemented by all variants of `MessageRelayThread`.
pub(crate) trait MessageRelayThreadTrait {}
/// When receiving a message from the given tube, this thread posts a thread message to the WndProc
/// thread. A pointer to the message from the tube will be sent as the lParam, and the WndProc
/// thread is responsible for destructing it.
pub(crate) struct MessageRelayThread<T: DeserializeOwned + 'static> {
thread: Option<JoinHandle<()>>,
exit_event: Event,
_marker: PhantomData<T>,
}
impl<T: DeserializeOwned + 'static> MessageRelayThread<T> {
/// # Safety
/// Since this class posts messages to the WndProc thread, the instance of it must not be
/// created before the message queue is created on the WndProc thread, and must not outlive the
/// WndProc thread.
pub unsafe fn start_thread(
vm_tube: Arc<Mutex<Tube>>,
wndproc_thread_id: u32,
thread_message_id: UINT,
) -> Result<Box<dyn MessageRelayThreadTrait>> {
let exit_event = Event::new().unwrap();
let exit_event_clone = exit_event
.try_clone()
.map_err(|e| anyhow!("Failed to clone exit_event: {}", e))?;
let thread = thread::Builder::new()
.name("gpu_display_message_relay".into())
.spawn(move || {
Self::run_poll_loop(
vm_tube,
wndproc_thread_id,
thread_message_id,
exit_event_clone,
);
})
.context("When spawning message relay thread")?;
Ok(Box::new(Self {
thread: Some(thread),
exit_event,
_marker: PhantomData,
}))
}
fn run_poll_loop(
vm_tube: Arc<Mutex<Tube>>,
wndproc_thread_id: u32,
thread_message_id: UINT,
exit_event: Event,
) {
#[derive(EventToken)]
enum Token {
Message,
Exit,
}
let wait_ctx = WaitContext::build_with(&[
(vm_tube.lock().get_read_notifier(), Token::Message),
(&exit_event, Token::Exit),
])
.unwrap();
info!("Message relay thread entering poll loop");
'poll: loop {
let events = {
match wait_ctx.wait() {
Ok(v) => v,
Err(e) => {
error!("Failed to wait: {:?}", e);
break;
}
}
};
for event in events.iter().filter(|e| e.is_readable) {
match event.token {
Token::Message => match vm_tube.lock().recv::<T>() {
Ok(message) => {
Self::relay_message(wndproc_thread_id, thread_message_id, message);
}
Err(e) => error!("Failed to receive message through the tube: {:?}", e),
},
Token::Exit => {
break 'poll;
}
}
}
}
info!("Message relay thread exiting poll loop");
}
fn relay_message(wndproc_thread_id: u32, thread_message_id: UINT, message: T) {
// Safe because the user of this class guarantees that the WndProc thread is alive and has
// created the message queue.
if let Err(e) = unsafe {
thread_message_util::post_message_carrying_object(
wndproc_thread_id,
thread_message_id,
message,
)
} {
error!("Failed to relay message: {:?}", e);
}
}
}
impl<T: DeserializeOwned + 'static> Drop for MessageRelayThread<T> {
fn drop(&mut self) {
if let Err(e) = self.exit_event.signal() {
error!("Failed to inform message relay thread to exit: {:?}", e);
return;
}
if let Err(e) = self.thread.take().unwrap().join() {
error!("Failed to join with the message relay thread: {:?}", e);
return;
}
info!("Message relay thread exited gracefully");
}
}
impl<T: DeserializeOwned + 'static> MessageRelayThreadTrait for MessageRelayThread<T> {}

@@ -3,8 +3,6 @@
// found in the LICENSE file.
mod math_util;
#[allow(dead_code)]
mod message_relay_thread;
pub mod surface;
mod thread_message_util;
mod window;

@@ -103,8 +103,17 @@ impl<T: HandleWindowMessage> WindowMessageDispatcher<T> {
Ok(dispatcher)
}
#[cfg(feature = "kiwi")]
pub fn process_service_message(self: Pin<&mut Self>, message: &ServiceSendToGpu) {
// Safe because we won't move the dispatcher out of the returned mutable reference.
match unsafe { self.get_unchecked_mut().message_processor.as_mut() } {
Some(processor) => processor.handle_service_message(message),
None => error!("Cannot handle service message because there is no message processor!"),
}
}
pub fn process_thread_message(self: Pin<&mut Self>, packet: &MessagePacket) {
// Safe because we won't move the dispatcher out of it.
// Safe because we won't move the dispatcher out of the returned mutable reference.
unsafe {
self.get_unchecked_mut()
.process_thread_message_internal(packet);
@@ -158,18 +167,6 @@ impl<T: HandleWindowMessage> WindowMessageDispatcher<T> {
l_param,
} = *packet;
match msg {
#[cfg(feature = "kiwi")]
WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL => {
// Safe because the sender gives up the ownership and expects the receiver to
// destruct the message.
let message = unsafe { Box::from_raw(l_param as *mut ServiceSendToGpu) };
match &mut self.message_processor {
Some(processor) => processor.handle_service_message(&*message),
None => error!(
"Cannot handle service message because there is no message processor!"
),
}
}
WM_USER_HANDLE_DISPLAY_MESSAGE_INTERNAL => {
// Safe because the sender gives up the ownership and expects the receiver to
// destruct the message.

@@ -28,21 +28,15 @@ use crate::EventDevice;
/// Relying on destructors might be a safer choice.
pub(crate) const WM_USER_WNDPROC_THREAD_DROP_KILL_WINDOW_INTERNAL: UINT = WM_USER;
// Thread message for handling the message sent from service. When the main event loop receives a
// message from service, it converts that to `ServiceSendToGpu` enum and sends it through a tube.
// The message relay thread receives it and sends the pointer to `ServiceSendToGpu` as the lParam.
#[cfg(feature = "kiwi")]
pub(crate) const WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL: UINT = WM_USER + 1;
// Message for handling the change in host viewport. This is sent when the host window size changes
// and we need to render to a different part of the window. The new width and height are sent as the
// low/high word of lParam.
pub(crate) const WM_USER_HOST_VIEWPORT_CHANGE_INTERNAL: UINT = WM_USER + 2;
pub(crate) const WM_USER_HOST_VIEWPORT_CHANGE_INTERNAL: UINT = WM_USER + 1;
/// Thread message for handling the message sent from the GPU worker thread. A pointer to enum
/// `DisplaySendToWndProc` is sent as the lParam. Note that the receiver is responsible for
/// destructing the message.
pub(crate) const WM_USER_HANDLE_DISPLAY_MESSAGE_INTERNAL: UINT = WM_USER + 3;
pub(crate) const WM_USER_HANDLE_DISPLAY_MESSAGE_INTERNAL: UINT = WM_USER + 2;
pub type CreateMessageHandlerFunction<T> =
Box<dyn FnOnce(&Window, DisplayEventDispatcher) -> Result<T>>;
@@ -129,7 +123,7 @@ pub trait HandleWindowMessage {
/// Called when processing `WM_DISPLAYCHANGE`.
fn on_display_change(&mut self, _window: &Window) {}
/// Called when processing `WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL`.
/// Called when processing requests from the service.
#[cfg(feature = "kiwi")]
fn on_handle_service_message(&mut self, _window: &Window, _message: &ServiceSendToGpu) {}

@@ -2,8 +2,10 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::collections::HashMap;
use std::marker::PhantomData;
use std::mem;
use std::os::windows::io::RawHandle;
use std::pin::Pin;
use std::ptr::null_mut;
use std::sync::atomic::AtomicI32;
@@ -21,29 +23,29 @@ use anyhow::Result;
use base::error;
use base::info;
use base::warn;
use base::AsRawDescriptor;
use base::Event;
use base::ReadNotifier;
use base::Tube;
use euclid::size2;
use sync::Mutex;
#[cfg(feature = "kiwi")]
use vm_control::ServiceSendToGpu;
use win_util::syscall_bail;
use win_util::win32_wide_string;
use winapi::shared::minwindef::DWORD;
use winapi::shared::minwindef::FALSE;
use winapi::shared::minwindef::LPARAM;
use winapi::shared::minwindef::LRESULT;
use winapi::shared::minwindef::UINT;
use winapi::shared::minwindef::WPARAM;
use winapi::shared::windef::HWND;
use winapi::um::errhandlingapi::GetLastError;
use winapi::um::processthreadsapi::GetCurrentThreadId;
use winapi::um::winbase::INFINITE;
use winapi::um::winbase::WAIT_FAILED;
use winapi::um::winbase::WAIT_OBJECT_0;
use winapi::um::winnt::MAXIMUM_WAIT_OBJECTS;
use winapi::um::winuser::*;
#[cfg(feature = "kiwi")]
use super::message_relay_thread::MessageRelayThread;
use super::message_relay_thread::MessageRelayThreadTrait;
use super::thread_message_util;
use super::window::MessagePacket;
use super::window::Window;
@@ -68,6 +70,81 @@ enum MessageLoopState {
ExitedWithError,
}
#[derive(Copy, Clone, PartialEq)]
enum Token {
MessagePump,
ServiceMessage,
}
/// A context that can wait on both the thread-specific message queue and the given handles.
struct MsgWaitContext {
triggers: HashMap<RawHandle, Token>,
raw_handles: Vec<RawHandle>,
}
impl MsgWaitContext {
pub fn new() -> Self {
Self {
triggers: HashMap::new(),
raw_handles: Vec::new(),
}
}
/// Note that since there is no handle associated with `Token::MessagePump`, this token will be
/// used internally by `MsgWaitContext` and the caller should never use it.
pub fn add(&mut self, handle: &dyn AsRawDescriptor, token: Token) -> Result<()> {
if token == Token::MessagePump {
bail!("Token::MessagePump is reserved!");
}
if self.raw_handles.len() == MAXIMUM_WAIT_OBJECTS as usize {
bail!("Number of raw handles exceeding MAXIMUM_WAIT_OBJECTS!");
}
let raw_descriptor = handle.as_raw_descriptor();
if self.triggers.contains_key(&raw_descriptor) {
bail!("The handle has already been registered in MsgWaitContext!")
}
self.triggers.insert(raw_descriptor, token);
self.raw_handles.push(raw_descriptor);
Ok(())
}
/// Blocks the thread until there is any new message available on the message queue, or if any
/// of the given handles is signaled, and returns the associated token.
///
/// If multiple handles are signaled, this will return the token associated with the one that
/// was first added to this context.
///
/// # Safety
///
/// Caller is responsible for ensuring that the handles are still valid.
pub unsafe fn wait(&self) -> Result<Token> {
let num_handles = self.raw_handles.len();
// Safe because the caller is required to guarantee that the handles are valid, and failures
// are handled below.
let result = MsgWaitForMultipleObjects(
num_handles as DWORD,
self.raw_handles.as_ptr(),
/* fWaitAll= */ FALSE,
INFINITE,
QS_ALLINPUT,
);
match (result - WAIT_OBJECT_0) as usize {
// At least one of the handles has been signaled.
index if index < num_handles => Ok(self.triggers[&self.raw_handles[index]]),
// At least one message is available at the message queue.
index if index == num_handles => Ok(Token::MessagePump),
// Invalid cases. This is most likely a `WAIT_FAILED`, but anything not matched by the
// above is an error case.
_ => syscall_bail!(format!(
"MsgWaitForMultipleObjects() unexpectedly returned {}",
result
)),
}
}
}
/// This class runs the WndProc thread, and provides helper functions for other threads to
/// communicate with it.
pub struct WindowProcedureThread<T: HandleWindowMessage> {
@@ -92,19 +169,10 @@ impl<T: HandleWindowMessage> WindowProcedureThread<T> {
let thread = match ThreadBuilder::new()
.name("gpu_display_wndproc".into())
.spawn(move || {
// Safe because GetCurrentThreadId has no failure mode.
let thread_id = unsafe { GetCurrentThreadId() };
// Must be called before any other threads post messages to the WndProc thread.
thread_message_util::force_create_message_queue();
// Safe because the message queue has been created, and the returned thread will go
// out of scope and get dropped before the WndProc thread exits.
let _message_relay_thread = unsafe {
vm_tube.and_then(|tube| Self::start_message_relay_thread(tube, thread_id))
};
Self::run_message_loop(thread_id_sender, message_loop_state_clone);
Self::run_message_loop(thread_id_sender, message_loop_state_clone, vm_tube);
if let Err(e) = thread_terminated_event_clone.signal() {
error!("Failed to signal thread terminated event: {}", e);
@@ -151,7 +219,11 @@ impl<T: HandleWindowMessage> WindowProcedureThread<T> {
}
}
fn run_message_loop(thread_id_sender: Sender<Result<u32>>, message_loop_state: Arc<AtomicI32>) {
fn run_message_loop(
thread_id_sender: Sender<Result<u32>>,
message_loop_state: Arc<AtomicI32>,
vm_tube: Option<Arc<Mutex<Tube>>>,
) {
// Safe because the dispatcher will take care of the lifetime of the `Window` object.
let create_window_res = unsafe { Self::create_window() };
match create_window_res.and_then(|window| WindowMessageDispatcher::<T>::create(window)) {
@@ -163,7 +235,7 @@ impl<T: HandleWindowMessage> WindowProcedureThread<T> {
error!("Failed to send WndProc thread ID: {}", e);
}
let exit_state = Self::run_message_loop_body(dispatcher);
let exit_state = Self::run_message_loop_body(dispatcher, vm_tube);
message_loop_state.store(exit_state as i32, Ordering::SeqCst);
}
Err(e) => {
@@ -181,36 +253,47 @@ impl<T: HandleWindowMessage> WindowProcedureThread<T> {
fn run_message_loop_body(
mut message_dispatcher: Pin<Box<WindowMessageDispatcher<T>>>,
vm_tube: Option<Arc<Mutex<Tube>>>,
) -> MessageLoopState {
let mut msg_wait_ctx = MsgWaitContext::new();
if let Some(tube) = &vm_tube {
if let Err(e) = msg_wait_ctx.add(tube.lock().get_read_notifier(), Token::ServiceMessage)
{
error!(
"Failed to add service message read notifier to MsgWaitContext: {:?}",
e
);
return MessageLoopState::EarlyTerminatedWithError;
}
}
loop {
// Safe because we don't pass in any handle, and failures are handled below.
match unsafe {
MsgWaitForMultipleObjects(
/* nCount= */ 0,
/* pHandles= */ null_mut(),
/* fWaitAll= */ FALSE,
INFINITE,
QS_ALLINPUT,
)
} {
WAIT_OBJECT_0 => {
if !Self::retrieve_and_dispatch_messages(&mut message_dispatcher) {
info!("WndProc thread exiting message loop normally");
return MessageLoopState::ExitedNormally;
// Safe because the lifetime of handles are at least as long as the function call.
match unsafe { msg_wait_ctx.wait() } {
Ok(token) => match token {
Token::MessagePump => {
if !Self::retrieve_and_dispatch_messages(&mut message_dispatcher) {
info!("WndProc thread exiting message loop normally");
return MessageLoopState::ExitedNormally;
}
}
}
WAIT_FAILED => {
Token::ServiceMessage => {
#[cfg(feature = "kiwi")]
Self::read_and_dispatch_service_message(
&mut message_dispatcher,
// We never use this token if `vm_tube` is None, so `expect()` should
// always succeed.
vm_tube.as_ref().expect("Service message tube is None"),
);
}
},
Err(e) => {
error!(
"WndProc thread exiting message loop because MsgWaitForMultipleObjects() \
failed with error code {}",
unsafe { GetLastError() }
"WndProc thread exiting message loop because of error: {:?}",
e
);
return MessageLoopState::ExitedWithError;
}
ret => warn!(
"MsgWaitForMultipleObjects() returned unexpected value {}",
ret
),
}
}
}
@@ -270,6 +353,25 @@ impl<T: HandleWindowMessage> WindowProcedureThread<T> {
}
}
#[cfg(feature = "kiwi")]
fn read_and_dispatch_service_message(
message_dispatcher: &mut Pin<Box<WindowMessageDispatcher<T>>>,
vm_tube: &Arc<Mutex<Tube>>,
) {
// We might want to send messages via `vm_tube` while processing service messages, so we
// have to make sure the mutex on this tube has been released before processing the message.
let message = match vm_tube.lock().recv::<ServiceSendToGpu>() {
Ok(message) => message,
Err(e) => {
error!("Failed to receive service message through the tube: {}", e);
return;
}
};
message_dispatcher
.as_mut()
.process_service_message(&message);
}
fn is_message_loop_running(&self) -> bool {
self.message_loop_state.as_ref().map_or(false, |state| {
state.load(Ordering::SeqCst) == MessageLoopState::Running as i32
@@ -338,33 +440,6 @@ impl<T: HandleWindowMessage> WindowProcedureThread<T> {
)
}
/// # Safety
/// The message queue must have been created on the WndProc thread before calling this, and the
/// returned thread must not outlive the WndProc thread.
unsafe fn start_message_relay_thread(
#[allow(unused_variables)] vm_tube: Arc<Mutex<Tube>>,
#[allow(unused_variables)] wndproc_thread_id: u32,
) -> Option<Box<dyn MessageRelayThreadTrait>> {
#[cfg(feature = "kiwi")]
match MessageRelayThread::<ServiceSendToGpu>::start_thread(
vm_tube,
wndproc_thread_id,
WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL,
) {
Ok(thread) => Some(thread),
// We won't get messages from the service if we failed to spawn this thread. It may not
// worth terminating the WndProc thread and crashing the emulator in that case, so we
// just log the error.
Err(e) => {
error!("{:?}", e);
None
}
}
#[cfg(not(feature = "kiwi"))]
None
}
unsafe extern "system" fn wnd_proc(
hwnd: HWND,
msg: UINT,
@@ -398,3 +473,23 @@ impl<T: HandleWindowMessage> Drop for WindowProcedureThread<T> {
// Since `WindowProcedureThread` does not hold anything that cannot be transferred between threads,
// we can implement `Send` for it.
unsafe impl<T: HandleWindowMessage> Send for WindowProcedureThread<T> {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn error_on_adding_reserved_token_to_context() {
let mut ctx = MsgWaitContext::new();
let event = Event::new().unwrap();
assert!(ctx.add(&event, Token::MessagePump).is_err());
}
#[test]
fn error_on_adding_duplicated_handle_to_context() {
let mut ctx = MsgWaitContext::new();
let event = Event::new().unwrap();
assert!(ctx.add(&event, Token::ServiceMessage).is_ok());
assert!(ctx.add(&event, Token::ServiceMessage).is_err());
}
}