From d74fb97158e5e886c1aab3c2b8aa45df35641661 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 6 Dec 2022 16:45:09 +0100 Subject: [PATCH] Remove `Executor` trait from `collab` and use an enum instead This will let us save off the executor and avoid using generics. --- crates/collab/src/executor.rs | 36 ++++++++++++++++++++++ crates/collab/src/integration_tests.rs | 23 ++++----------- crates/collab/src/lib.rs | 1 + crates/collab/src/rpc.rs | 41 ++++---------------------- 4 files changed, 48 insertions(+), 53 deletions(-) create mode 100644 crates/collab/src/executor.rs diff --git a/crates/collab/src/executor.rs b/crates/collab/src/executor.rs new file mode 100644 index 0000000000..d2253f8ccb --- /dev/null +++ b/crates/collab/src/executor.rs @@ -0,0 +1,36 @@ +use std::{future::Future, time::Duration}; + +#[derive(Clone)] +pub enum Executor { + Production, + #[cfg(test)] + Deterministic(std::sync::Arc), +} + +impl Executor { + pub fn spawn_detached(&self, future: F) + where + F: 'static + Send + Future, + { + match self { + Executor::Production => { + tokio::spawn(future); + } + #[cfg(test)] + Executor::Deterministic(background) => { + background.spawn(future).detach(); + } + } + } + + pub fn sleep(&self, duration: Duration) -> impl Future { + let this = self.clone(); + async move { + match this { + Executor::Production => tokio::time::sleep(duration).await, + #[cfg(test)] + Executor::Deterministic(background) => background.timer(duration).await, + } + } + } +} diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index a77ae4925d..96fed5887b 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,9 +1,9 @@ use crate::{ db::{self, NewUserParams, TestDb, UserId}, - rpc::{Executor, Server}, + executor::Executor, + rpc::Server, AppState, }; - use ::rpc::Peer; use anyhow::anyhow; use call::{room, ActiveCall, ParticipantLocation, Room}; @@ -17,7 +17,7 @@ use editor::{ ToggleCodeActions, Undo, }; use fs::{FakeFs, Fs as _, HomeDir, LineEnding}; -use futures::{channel::oneshot, Future, StreamExt as _}; +use futures::{channel::oneshot, StreamExt as _}; use gpui::{ executor::{self, Deterministic}, geometry::vector::vec2f, @@ -45,7 +45,6 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, Arc, }, - time::Duration, }; use theme::ThemeRegistry; use unindent::Unindent as _; @@ -417,7 +416,7 @@ async fn test_leaving_room_on_disconnection( // When user A disconnects, both client A and B clear their room on the active call. server.disconnect_client(client_a.peer_id().unwrap()); - cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); + deterministic.advance_clock(rpc::RECEIVE_TIMEOUT); active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none())); active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none())); assert_eq!( @@ -6000,7 +5999,7 @@ impl TestServer { client_name, user, Some(connection_id_tx), - cx.background(), + Executor::Deterministic(cx.background()), )) .detach(); let connection_id = connection_id_rx.await.unwrap(); @@ -6829,18 +6828,6 @@ impl Drop for TestClient { } } -impl Executor for Arc { - type Sleep = gpui::executor::Timer; - - fn spawn_detached>(&self, future: F) { - self.spawn(future).detach(); - } - - fn sleep(&self, duration: Duration) -> Self::Sleep { - self.as_ref().timer(duration) - } -} - #[derive(Debug, Eq, PartialEq)] struct RoomParticipants { remote: Vec, diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 24a9fc6117..b9d43cd2ee 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod auth; pub mod db; pub mod env; +mod executor; #[cfg(test)] mod integration_tests; pub mod rpc; diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 736f5eb31b..c1f9eb039b 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -3,6 +3,7 @@ mod connection_pool; use crate::{ auth, db::{self, Database, ProjectId, RoomId, User, UserId}, + executor::Executor, AppState, Result, }; use anyhow::anyhow; @@ -50,12 +51,8 @@ use std::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, }, - time::Duration, -}; -use tokio::{ - sync::{Mutex, MutexGuard}, - time::Sleep, }; +use tokio::sync::{Mutex, MutexGuard}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -145,15 +142,6 @@ pub struct Server { handlers: HashMap, } -pub trait Executor: Send + Clone { - type Sleep: Send + Future; - fn spawn_detached>(&self, future: F); - fn sleep(&self, duration: Duration) -> Self::Sleep; -} - -#[derive(Clone)] -pub struct RealExecutor; - pub(crate) struct ConnectionPoolGuard<'a> { guard: MutexGuard<'a, ConnectionPool>, _not_send: PhantomData>, @@ -330,13 +318,13 @@ impl Server { }) } - pub fn handle_connection( + pub fn handle_connection( self: &Arc, connection: Connection, address: String, user: User, mut send_connection_id: Option>, - executor: E, + executor: Executor, ) -> impl Future> { let this = self.clone(); let user_id = user.id; @@ -347,12 +335,7 @@ impl Server { .peer .add_connection(connection, { let executor = executor.clone(); - move |duration| { - let timer = executor.sleep(duration); - async move { - timer.await; - } - } + move |duration| executor.sleep(duration) }); tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); @@ -543,18 +526,6 @@ impl<'a> Drop for ConnectionPoolGuard<'a> { } } -impl Executor for RealExecutor { - type Sleep = Sleep; - - fn spawn_detached>(&self, future: F) { - tokio::task::spawn(future); - } - - fn sleep(&self, duration: Duration) -> Self::Sleep { - tokio::time::sleep(duration) - } -} - fn broadcast( sender_id: ConnectionId, receiver_ids: impl IntoIterator, @@ -636,7 +607,7 @@ pub async fn handle_websocket_request( let connection = Connection::new(Box::pin(socket)); async move { server - .handle_connection(connection, socket_address, user, None, RealExecutor) + .handle_connection(connection, socket_address, user, None, Executor::Production) .await .log_err(); }