From f5c4a2a0ddf9afdb8c759490cd39274b9b10e0de Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 13 Mar 2023 11:15:22 -0700 Subject: [PATCH] Fix failure to see screenshare tracks that were started prior to joining a call Co-authored-by: Antonio Scandurra --- crates/call/src/room.rs | 2 +- crates/client/src/user.rs | 5 ++ crates/collab/src/tests/integration_tests.rs | 93 ++++++++++++++++++++ crates/live_kit_client/src/test.rs | 49 +++++++++-- 4 files changed, 143 insertions(+), 6 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 257ef52c19..3a662458f2 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -626,7 +626,7 @@ impl Room { if let Some(live_kit) = this.live_kit.as_ref() { let tracks = - live_kit.room.remote_video_tracks(&peer_id.to_string()); + live_kit.room.remote_video_tracks(&user.id.to_string()); for track in tracks { this.remote_video_track_updated( RemoteVideoTrackUpdate::Subscribed(track), diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 01fd1773c4..a0a730871d 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -183,6 +183,11 @@ impl UserStore { } } + #[cfg(feature = "test-support")] + pub fn clear_cache(&mut self) { + self.users.clear(); + } + async fn handle_update_invite_info( this: ModelHandle, message: TypedEnvelope, diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index c48f6b1604..c04deca9e5 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -6292,6 +6292,99 @@ async fn test_basic_following( ); } +#[gpui::test(iterations = 10)] +async fn test_join_call_after_screen_was_shared( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + server + .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + .await; + + let active_call_a = cx_a.read(ActiveCall::global); + let active_call_b = cx_b.read(ActiveCall::global); + + // Call users B and C from client A. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: Default::default(), + pending: vec!["user_b".to_string()] + } + ); + + // User B receives the call. + let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); + let call_b = incoming_call_b.next().await.unwrap().unwrap(); + assert_eq!(call_b.calling_user.github_login, "user_a"); + + // User A shares their screen + let display = MacOSDisplay::new(); + active_call_a + .update(cx_a, |call, cx| { + call.room().unwrap().update(cx, |room, cx| { + room.set_display_sources(vec![display.clone()]); + room.share_screen(cx) + }) + }) + .await + .unwrap(); + + client_b.user_store.update(cx_b, |user_store, _| { + user_store.clear_cache(); + }); + + // User B joins the room + active_call_b + .update(cx_b, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + assert!(incoming_call_b.next().await.unwrap().is_none()); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec![], + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string()], + pending: vec![], + } + ); + + // Ensure User B sees User A's screenshare. + room_b.read_with(cx_b, |room, _| { + assert_eq!( + room.remote_participants() + .get(&client_a.user_id().unwrap()) + .unwrap() + .tracks + .len(), + 1 + ); + }); +} + #[gpui::test] async fn test_following_tab_order( deterministic: Arc, diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 329e4e1176..8d1e4fa16a 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -104,6 +104,15 @@ impl TestServer { room_name )) } else { + for track in &room.tracks { + client_room + .0 + .lock() + .video_track_updates + .0 + .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .unwrap(); + } room.client_rooms.insert(identity, client_room); Ok(()) } @@ -167,11 +176,13 @@ impl TestServer { .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack { + let track = Arc::new(RemoteVideoTrack { sid: nanoid::nanoid!(17), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), - })); + }); + + room.tracks.push(track.clone()); for (id, client_room) in &room.client_rooms { if *id != identity { @@ -180,18 +191,30 @@ impl TestServer { .lock() .video_track_updates .0 - .try_broadcast(update.clone()) + .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) .unwrap(); } } Ok(()) } + + fn video_tracks(&self, token: String) -> Result>> { + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + Ok(room.tracks.clone()) + } } #[derive(Default)] struct TestServerRoom { client_rooms: HashMap>, + tracks: Vec>, } impl TestServerRoom {} @@ -307,8 +330,17 @@ impl Room { pub fn unpublish_track(&self, _: LocalTrackPublication) {} - pub fn remote_video_tracks(&self, _: &str) -> Vec> { - Default::default() + pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .video_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .collect() } pub fn remote_video_track_updates(&self) -> impl Stream { @@ -332,6 +364,13 @@ impl Room { ConnectionState::Connected { token, .. } => token, } } + + fn is_connected(&self) -> bool { + match *self.0.lock().connection.1.borrow() { + ConnectionState::Disconnected => false, + ConnectionState::Connected { .. } => true, + } + } } impl Drop for Room {