base: windows: support Sync usage of StreamChannel

I want the async tube types to be `Sync` and they wrap `StreamChannel`
(via `Tube`). It is important for async types to be `Sync` so that they
can be freely used in spawned futures.

BUG=b:338274203

Change-Id: I76d7ff1f4001a7aa71ace929dc5b4c9f25acad0b
Reviewed-on: https://chromium-review.googlesource.com/c/crosvm/crosvm/+/5507772
Commit-Queue: Frederick Mayle <fmayle@google.com>
Reviewed-by: Noah Gold <nkgold@google.com>
This commit is contained in:
Frederick Mayle 2024-04-25 16:26:38 -07:00 committed by crosvm LUCI
parent c736d23241
commit 44a84f4360

View file

@ -2,7 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::cell::RefCell;
use std::io;
use std::sync::Arc;
@ -101,8 +100,8 @@ pub struct StreamChannel {
// signal the channel has been closed when it was dropped, because a copy of it was sent to
// another process. It is the copy's responsibility to close the pipe.
#[serde(skip)]
#[serde(default = "create_true_cell")]
is_channel_closed_on_drop: RefCell<bool>,
#[serde(default = "create_true_mutex")]
is_channel_closed_on_drop: Mutex<bool>,
// For StreamChannels created via pair_with_buffer_size, allows the channel to accept messages
// up to that size.
@ -113,8 +112,8 @@ fn create_read_lock() -> Arc<Mutex<()>> {
Arc::new(Mutex::new(()))
}
fn create_true_cell() -> RefCell<bool> {
RefCell::new(true)
fn create_true_mutex() -> Mutex<bool> {
Mutex::new(true)
}
/// Serialize is manually implemented because we need to tell the local copy that a remote copy
@ -138,7 +137,7 @@ impl Serialize for StreamChannel {
// Because this end has been serialized, the serialized copy is now responsible for setting
// the close event.
if ret.is_ok() {
*self.is_channel_closed_on_drop.borrow_mut() = false;
*self.is_channel_closed_on_drop.lock() = false;
}
ret
@ -147,7 +146,7 @@ impl Serialize for StreamChannel {
impl Drop for StreamChannel {
fn drop(&mut self) {
if *self.is_channel_closed_on_drop.borrow() {
if *self.is_channel_closed_on_drop.lock() {
if let Err(e) = self.pipe_closed.signal() {
warn!("failed to notify on channel drop: {}", e);
}
@ -178,7 +177,7 @@ impl StreamChannel {
remote_write_lock: self.remote_write_lock.try_clone()?,
local_write_lock: self.local_write_lock.try_clone()?,
read_lock: self.read_lock.clone(),
is_channel_closed_on_drop: create_true_cell(),
is_channel_closed_on_drop: create_true_mutex(),
send_buffer_size: self.send_buffer_size,
})
}
@ -334,7 +333,7 @@ impl StreamChannel {
local_write_lock: write_lock_a.try_clone()?,
remote_write_lock: write_lock_b.try_clone()?,
pipe_closed: pipe_closed.try_clone()?,
is_channel_closed_on_drop: create_true_cell(),
is_channel_closed_on_drop: create_true_mutex(),
send_buffer_size,
};
let sock_b = StreamChannel {
@ -345,7 +344,7 @@ impl StreamChannel {
local_write_lock: write_lock_b,
remote_write_lock: write_lock_a,
pipe_closed,
is_channel_closed_on_drop: create_true_cell(),
is_channel_closed_on_drop: create_true_mutex(),
send_buffer_size,
};
Ok((sock_a, sock_b))