From d56dbed44d732b8a7ef98d55f824b417e7c9d61d Mon Sep 17 00:00:00 2001 From: lunpujun Date: Thu, 3 Nov 2022 10:58:37 -0700 Subject: [PATCH] gpu_display: deprecate MessageRelayThread. We used to spawn a separate thread to poll gpu_main_display_tube for messages from the external supervisor, and then wrap those messages in thread messages (WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL) that are posted to the WndProc thread. Now that we have replaced GetMessageW() with MsgWaitForMultipleObjects(), we can poll that tube and the message queue at the same time in the WndProc thread, and then route service messages to WindowMessageDispatcher::process_service_message(). The newly added MsgWaitContext looks similar to EventContext but it is much simpler, since we don't need to worry about removing handles, setting timeout, etc. It may get more complex when we start to add more handles to it (for event devices, etc.), and we might look for ways to unify the logic with EventContext, but this simple struct should suffice for now. Bug: 244489783 Bug: 244491590 Test: flat run emulator Test: flat run battlestar Change-Id: I5722523ad1424a1860f2883d33728a1baf9550cc Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/4719209 Reviewed-by: Pujun Lun Reviewed-by: Noah Gold Commit-Queue: Kaiyi Li --- .../gpu_display_win/message_relay_thread.rs | 147 ----------- gpu_display/src/gpu_display_win/mod.rs | 2 - .../window_message_dispatcher.rs | 23 +- .../window_message_processor.rs | 12 +- .../window_procedure_thread.rs | 229 +++++++++++++----- 5 files changed, 175 insertions(+), 238 deletions(-) delete mode 100644 gpu_display/src/gpu_display_win/message_relay_thread.rs diff --git a/gpu_display/src/gpu_display_win/message_relay_thread.rs b/gpu_display/src/gpu_display_win/message_relay_thread.rs deleted file mode 100644 index 7c36a2f9c3..0000000000 --- a/gpu_display/src/gpu_display_win/message_relay_thread.rs +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2022 The ChromiumOS Authors -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -use std::marker::PhantomData; -use std::sync::Arc; -use std::thread; -use std::thread::JoinHandle; - -use anyhow::anyhow; -use anyhow::Context; -use anyhow::Result; -use base::error; -use base::info; -use base::Event; -use base::EventToken; -use base::ReadNotifier; -use base::Tube; -use base::WaitContext; -use serde::de::DeserializeOwned; -use sync::Mutex; -use winapi::shared::minwindef::UINT; - -use super::thread_message_util; - -/// A trait implemented by all variants of `MessageRelayThread`. -pub(crate) trait MessageRelayThreadTrait {} - -/// When receiving a message from the given tube, this thread posts a thread message to the WndProc -/// thread. A pointer to the message from the tube will be sent as the lParam, and the WndProc -/// thread is responsible for destructing it. -pub(crate) struct MessageRelayThread { - thread: Option>, - exit_event: Event, - _marker: PhantomData, -} - -impl MessageRelayThread { - /// # Safety - /// Since this class posts messages to the WndProc thread, the instance of it must not be - /// created before the message queue is created on the WndProc thread, and must not outlive the - /// WndProc thread. - pub unsafe fn start_thread( - vm_tube: Arc>, - wndproc_thread_id: u32, - thread_message_id: UINT, - ) -> Result> { - let exit_event = Event::new().unwrap(); - let exit_event_clone = exit_event - .try_clone() - .map_err(|e| anyhow!("Failed to clone exit_event: {}", e))?; - let thread = thread::Builder::new() - .name("gpu_display_message_relay".into()) - .spawn(move || { - Self::run_poll_loop( - vm_tube, - wndproc_thread_id, - thread_message_id, - exit_event_clone, - ); - }) - .context("When spawning message relay thread")?; - - Ok(Box::new(Self { - thread: Some(thread), - exit_event, - _marker: PhantomData, - })) - } - - fn run_poll_loop( - vm_tube: Arc>, - wndproc_thread_id: u32, - thread_message_id: UINT, - exit_event: Event, - ) { - #[derive(EventToken)] - enum Token { - Message, - Exit, - } - - let wait_ctx = WaitContext::build_with(&[ - (vm_tube.lock().get_read_notifier(), Token::Message), - (&exit_event, Token::Exit), - ]) - .unwrap(); - - info!("Message relay thread entering poll loop"); - 'poll: loop { - let events = { - match wait_ctx.wait() { - Ok(v) => v, - Err(e) => { - error!("Failed to wait: {:?}", e); - break; - } - } - }; - - for event in events.iter().filter(|e| e.is_readable) { - match event.token { - Token::Message => match vm_tube.lock().recv::() { - Ok(message) => { - Self::relay_message(wndproc_thread_id, thread_message_id, message); - } - Err(e) => error!("Failed to receive message through the tube: {:?}", e), - }, - Token::Exit => { - break 'poll; - } - } - } - } - info!("Message relay thread exiting poll loop"); - } - - fn relay_message(wndproc_thread_id: u32, thread_message_id: UINT, message: T) { - // Safe because the user of this class guarantees that the WndProc thread is alive and has - // created the message queue. - if let Err(e) = unsafe { - thread_message_util::post_message_carrying_object( - wndproc_thread_id, - thread_message_id, - message, - ) - } { - error!("Failed to relay message: {:?}", e); - } - } -} - -impl Drop for MessageRelayThread { - fn drop(&mut self) { - if let Err(e) = self.exit_event.signal() { - error!("Failed to inform message relay thread to exit: {:?}", e); - return; - } - if let Err(e) = self.thread.take().unwrap().join() { - error!("Failed to join with the message relay thread: {:?}", e); - return; - } - info!("Message relay thread exited gracefully"); - } -} - -impl MessageRelayThreadTrait for MessageRelayThread {} diff --git a/gpu_display/src/gpu_display_win/mod.rs b/gpu_display/src/gpu_display_win/mod.rs index db894dafff..620bee0991 100644 --- a/gpu_display/src/gpu_display_win/mod.rs +++ b/gpu_display/src/gpu_display_win/mod.rs @@ -3,8 +3,6 @@ // found in the LICENSE file. mod math_util; -#[allow(dead_code)] -mod message_relay_thread; pub mod surface; mod thread_message_util; mod window; diff --git a/gpu_display/src/gpu_display_win/window_message_dispatcher.rs b/gpu_display/src/gpu_display_win/window_message_dispatcher.rs index f41e2b2d09..429b195d84 100644 --- a/gpu_display/src/gpu_display_win/window_message_dispatcher.rs +++ b/gpu_display/src/gpu_display_win/window_message_dispatcher.rs @@ -103,8 +103,17 @@ impl WindowMessageDispatcher { Ok(dispatcher) } + #[cfg(feature = "kiwi")] + pub fn process_service_message(self: Pin<&mut Self>, message: &ServiceSendToGpu) { + // Safe because we won't move the dispatcher out of the returned mutable reference. + match unsafe { self.get_unchecked_mut().message_processor.as_mut() } { + Some(processor) => processor.handle_service_message(message), + None => error!("Cannot handle service message because there is no message processor!"), + } + } + pub fn process_thread_message(self: Pin<&mut Self>, packet: &MessagePacket) { - // Safe because we won't move the dispatcher out of it. + // Safe because we won't move the dispatcher out of the returned mutable reference. unsafe { self.get_unchecked_mut() .process_thread_message_internal(packet); @@ -158,18 +167,6 @@ impl WindowMessageDispatcher { l_param, } = *packet; match msg { - #[cfg(feature = "kiwi")] - WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL => { - // Safe because the sender gives up the ownership and expects the receiver to - // destruct the message. - let message = unsafe { Box::from_raw(l_param as *mut ServiceSendToGpu) }; - match &mut self.message_processor { - Some(processor) => processor.handle_service_message(&*message), - None => error!( - "Cannot handle service message because there is no message processor!" - ), - } - } WM_USER_HANDLE_DISPLAY_MESSAGE_INTERNAL => { // Safe because the sender gives up the ownership and expects the receiver to // destruct the message. diff --git a/gpu_display/src/gpu_display_win/window_message_processor.rs b/gpu_display/src/gpu_display_win/window_message_processor.rs index 0a9d78611c..d3e568ff51 100644 --- a/gpu_display/src/gpu_display_win/window_message_processor.rs +++ b/gpu_display/src/gpu_display_win/window_message_processor.rs @@ -28,21 +28,15 @@ use crate::EventDevice; /// Relying on destructors might be a safer choice. pub(crate) const WM_USER_WNDPROC_THREAD_DROP_KILL_WINDOW_INTERNAL: UINT = WM_USER; -// Thread message for handling the message sent from service. When the main event loop receives a -// message from service, it converts that to `ServiceSendToGpu` enum and sends it through a tube. -// The message relay thread receives it and sends the pointer to `ServiceSendToGpu` as the lParam. -#[cfg(feature = "kiwi")] -pub(crate) const WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL: UINT = WM_USER + 1; - // Message for handling the change in host viewport. This is sent when the host window size changes // and we need to render to a different part of the window. The new width and height are sent as the // low/high word of lParam. -pub(crate) const WM_USER_HOST_VIEWPORT_CHANGE_INTERNAL: UINT = WM_USER + 2; +pub(crate) const WM_USER_HOST_VIEWPORT_CHANGE_INTERNAL: UINT = WM_USER + 1; /// Thread message for handling the message sent from the GPU worker thread. A pointer to enum /// `DisplaySendToWndProc` is sent as the lParam. Note that the receiver is responsible for /// destructing the message. -pub(crate) const WM_USER_HANDLE_DISPLAY_MESSAGE_INTERNAL: UINT = WM_USER + 3; +pub(crate) const WM_USER_HANDLE_DISPLAY_MESSAGE_INTERNAL: UINT = WM_USER + 2; pub type CreateMessageHandlerFunction = Box Result>; @@ -129,7 +123,7 @@ pub trait HandleWindowMessage { /// Called when processing `WM_DISPLAYCHANGE`. fn on_display_change(&mut self, _window: &Window) {} - /// Called when processing `WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL`. + /// Called when processing requests from the service. #[cfg(feature = "kiwi")] fn on_handle_service_message(&mut self, _window: &Window, _message: &ServiceSendToGpu) {} diff --git a/gpu_display/src/gpu_display_win/window_procedure_thread.rs b/gpu_display/src/gpu_display_win/window_procedure_thread.rs index c35129ff92..909f2bdd92 100644 --- a/gpu_display/src/gpu_display_win/window_procedure_thread.rs +++ b/gpu_display/src/gpu_display_win/window_procedure_thread.rs @@ -2,8 +2,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +use std::collections::HashMap; use std::marker::PhantomData; use std::mem; +use std::os::windows::io::RawHandle; use std::pin::Pin; use std::ptr::null_mut; use std::sync::atomic::AtomicI32; @@ -21,29 +23,29 @@ use anyhow::Result; use base::error; use base::info; use base::warn; +use base::AsRawDescriptor; use base::Event; +use base::ReadNotifier; use base::Tube; use euclid::size2; use sync::Mutex; #[cfg(feature = "kiwi")] use vm_control::ServiceSendToGpu; +use win_util::syscall_bail; use win_util::win32_wide_string; +use winapi::shared::minwindef::DWORD; use winapi::shared::minwindef::FALSE; use winapi::shared::minwindef::LPARAM; use winapi::shared::minwindef::LRESULT; use winapi::shared::minwindef::UINT; use winapi::shared::minwindef::WPARAM; use winapi::shared::windef::HWND; -use winapi::um::errhandlingapi::GetLastError; use winapi::um::processthreadsapi::GetCurrentThreadId; use winapi::um::winbase::INFINITE; -use winapi::um::winbase::WAIT_FAILED; use winapi::um::winbase::WAIT_OBJECT_0; +use winapi::um::winnt::MAXIMUM_WAIT_OBJECTS; use winapi::um::winuser::*; -#[cfg(feature = "kiwi")] -use super::message_relay_thread::MessageRelayThread; -use super::message_relay_thread::MessageRelayThreadTrait; use super::thread_message_util; use super::window::MessagePacket; use super::window::Window; @@ -68,6 +70,81 @@ enum MessageLoopState { ExitedWithError, } +#[derive(Copy, Clone, PartialEq)] +enum Token { + MessagePump, + ServiceMessage, +} + +/// A context that can wait on both the thread-specific message queue and the given handles. +struct MsgWaitContext { + triggers: HashMap, + raw_handles: Vec, +} + +impl MsgWaitContext { + pub fn new() -> Self { + Self { + triggers: HashMap::new(), + raw_handles: Vec::new(), + } + } + + /// Note that since there is no handle associated with `Token::MessagePump`, this token will be + /// used internally by `MsgWaitContext` and the caller should never use it. + pub fn add(&mut self, handle: &dyn AsRawDescriptor, token: Token) -> Result<()> { + if token == Token::MessagePump { + bail!("Token::MessagePump is reserved!"); + } + if self.raw_handles.len() == MAXIMUM_WAIT_OBJECTS as usize { + bail!("Number of raw handles exceeding MAXIMUM_WAIT_OBJECTS!"); + } + + let raw_descriptor = handle.as_raw_descriptor(); + if self.triggers.contains_key(&raw_descriptor) { + bail!("The handle has already been registered in MsgWaitContext!") + } + + self.triggers.insert(raw_descriptor, token); + self.raw_handles.push(raw_descriptor); + Ok(()) + } + + /// Blocks the thread until there is any new message available on the message queue, or if any + /// of the given handles is signaled, and returns the associated token. + /// + /// If multiple handles are signaled, this will return the token associated with the one that + /// was first added to this context. + /// + /// # Safety + /// + /// Caller is responsible for ensuring that the handles are still valid. + pub unsafe fn wait(&self) -> Result { + let num_handles = self.raw_handles.len(); + // Safe because the caller is required to guarantee that the handles are valid, and failures + // are handled below. + let result = MsgWaitForMultipleObjects( + num_handles as DWORD, + self.raw_handles.as_ptr(), + /* fWaitAll= */ FALSE, + INFINITE, + QS_ALLINPUT, + ); + match (result - WAIT_OBJECT_0) as usize { + // At least one of the handles has been signaled. + index if index < num_handles => Ok(self.triggers[&self.raw_handles[index]]), + // At least one message is available at the message queue. + index if index == num_handles => Ok(Token::MessagePump), + // Invalid cases. This is most likely a `WAIT_FAILED`, but anything not matched by the + // above is an error case. + _ => syscall_bail!(format!( + "MsgWaitForMultipleObjects() unexpectedly returned {}", + result + )), + } + } +} + /// This class runs the WndProc thread, and provides helper functions for other threads to /// communicate with it. pub struct WindowProcedureThread { @@ -92,19 +169,10 @@ impl WindowProcedureThread { let thread = match ThreadBuilder::new() .name("gpu_display_wndproc".into()) .spawn(move || { - // Safe because GetCurrentThreadId has no failure mode. - let thread_id = unsafe { GetCurrentThreadId() }; - // Must be called before any other threads post messages to the WndProc thread. thread_message_util::force_create_message_queue(); - // Safe because the message queue has been created, and the returned thread will go - // out of scope and get dropped before the WndProc thread exits. - let _message_relay_thread = unsafe { - vm_tube.and_then(|tube| Self::start_message_relay_thread(tube, thread_id)) - }; - - Self::run_message_loop(thread_id_sender, message_loop_state_clone); + Self::run_message_loop(thread_id_sender, message_loop_state_clone, vm_tube); if let Err(e) = thread_terminated_event_clone.signal() { error!("Failed to signal thread terminated event: {}", e); @@ -151,7 +219,11 @@ impl WindowProcedureThread { } } - fn run_message_loop(thread_id_sender: Sender>, message_loop_state: Arc) { + fn run_message_loop( + thread_id_sender: Sender>, + message_loop_state: Arc, + vm_tube: Option>>, + ) { // Safe because the dispatcher will take care of the lifetime of the `Window` object. let create_window_res = unsafe { Self::create_window() }; match create_window_res.and_then(|window| WindowMessageDispatcher::::create(window)) { @@ -163,7 +235,7 @@ impl WindowProcedureThread { error!("Failed to send WndProc thread ID: {}", e); } - let exit_state = Self::run_message_loop_body(dispatcher); + let exit_state = Self::run_message_loop_body(dispatcher, vm_tube); message_loop_state.store(exit_state as i32, Ordering::SeqCst); } Err(e) => { @@ -181,36 +253,47 @@ impl WindowProcedureThread { fn run_message_loop_body( mut message_dispatcher: Pin>>, + vm_tube: Option>>, ) -> MessageLoopState { + let mut msg_wait_ctx = MsgWaitContext::new(); + if let Some(tube) = &vm_tube { + if let Err(e) = msg_wait_ctx.add(tube.lock().get_read_notifier(), Token::ServiceMessage) + { + error!( + "Failed to add service message read notifier to MsgWaitContext: {:?}", + e + ); + return MessageLoopState::EarlyTerminatedWithError; + } + } + loop { - // Safe because we don't pass in any handle, and failures are handled below. - match unsafe { - MsgWaitForMultipleObjects( - /* nCount= */ 0, - /* pHandles= */ null_mut(), - /* fWaitAll= */ FALSE, - INFINITE, - QS_ALLINPUT, - ) - } { - WAIT_OBJECT_0 => { - if !Self::retrieve_and_dispatch_messages(&mut message_dispatcher) { - info!("WndProc thread exiting message loop normally"); - return MessageLoopState::ExitedNormally; + // Safe because the lifetime of handles are at least as long as the function call. + match unsafe { msg_wait_ctx.wait() } { + Ok(token) => match token { + Token::MessagePump => { + if !Self::retrieve_and_dispatch_messages(&mut message_dispatcher) { + info!("WndProc thread exiting message loop normally"); + return MessageLoopState::ExitedNormally; + } } - } - WAIT_FAILED => { + Token::ServiceMessage => { + #[cfg(feature = "kiwi")] + Self::read_and_dispatch_service_message( + &mut message_dispatcher, + // We never use this token if `vm_tube` is None, so `expect()` should + // always succeed. + vm_tube.as_ref().expect("Service message tube is None"), + ); + } + }, + Err(e) => { error!( - "WndProc thread exiting message loop because MsgWaitForMultipleObjects() \ - failed with error code {}", - unsafe { GetLastError() } + "WndProc thread exiting message loop because of error: {:?}", + e ); return MessageLoopState::ExitedWithError; } - ret => warn!( - "MsgWaitForMultipleObjects() returned unexpected value {}", - ret - ), } } } @@ -270,6 +353,25 @@ impl WindowProcedureThread { } } + #[cfg(feature = "kiwi")] + fn read_and_dispatch_service_message( + message_dispatcher: &mut Pin>>, + vm_tube: &Arc>, + ) { + // We might want to send messages via `vm_tube` while processing service messages, so we + // have to make sure the mutex on this tube has been released before processing the message. + let message = match vm_tube.lock().recv::() { + Ok(message) => message, + Err(e) => { + error!("Failed to receive service message through the tube: {}", e); + return; + } + }; + message_dispatcher + .as_mut() + .process_service_message(&message); + } + fn is_message_loop_running(&self) -> bool { self.message_loop_state.as_ref().map_or(false, |state| { state.load(Ordering::SeqCst) == MessageLoopState::Running as i32 @@ -338,33 +440,6 @@ impl WindowProcedureThread { ) } - /// # Safety - /// The message queue must have been created on the WndProc thread before calling this, and the - /// returned thread must not outlive the WndProc thread. - unsafe fn start_message_relay_thread( - #[allow(unused_variables)] vm_tube: Arc>, - #[allow(unused_variables)] wndproc_thread_id: u32, - ) -> Option> { - #[cfg(feature = "kiwi")] - match MessageRelayThread::::start_thread( - vm_tube, - wndproc_thread_id, - WM_USER_HANDLE_SERVICE_MESSAGE_INTERNAL, - ) { - Ok(thread) => Some(thread), - // We won't get messages from the service if we failed to spawn this thread. It may not - // worth terminating the WndProc thread and crashing the emulator in that case, so we - // just log the error. - Err(e) => { - error!("{:?}", e); - None - } - } - - #[cfg(not(feature = "kiwi"))] - None - } - unsafe extern "system" fn wnd_proc( hwnd: HWND, msg: UINT, @@ -398,3 +473,23 @@ impl Drop for WindowProcedureThread { // Since `WindowProcedureThread` does not hold anything that cannot be transferred between threads, // we can implement `Send` for it. unsafe impl Send for WindowProcedureThread {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn error_on_adding_reserved_token_to_context() { + let mut ctx = MsgWaitContext::new(); + let event = Event::new().unwrap(); + assert!(ctx.add(&event, Token::MessagePump).is_err()); + } + + #[test] + fn error_on_adding_duplicated_handle_to_context() { + let mut ctx = MsgWaitContext::new(); + let event = Event::new().unwrap(); + assert!(ctx.add(&event, Token::ServiceMessage).is_ok()); + assert!(ctx.add(&event, Token::ServiceMessage).is_err()); + } +}