diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index b8da95bfb5..613a52a908 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -157,6 +157,7 @@ impl Room { screen_track: LocalTrack::None, microphone_track: LocalTrack::None, next_publish_id: 0, + deafened: false, _maintain_room, _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], }) @@ -223,7 +224,9 @@ impl Room { || &*util::channel::RELEASE_CHANNEL != &ReleaseChannel::Dev { let share_mic = room.update(&mut cx, |room, cx| room.share_mic(cx)); - cx.background().spawn(share_mic).detach(); + cx.update(|cx| { + cx.background().spawn(share_mic).detach_and_log_err(cx); + }); } match room @@ -1039,7 +1042,7 @@ impl Room { let (canceled, muted) = if let LocalTrack::Pending { publish_id: cur_publish_id, - muted + muted, } = &live_kit.microphone_track { (*cur_publish_id != publish_id, *muted) @@ -1053,11 +1056,11 @@ impl Room { live_kit.room.unpublish_track(publication); } else { if muted { - cx.background().spawn(publication.mute()).detach(); + cx.background().spawn(publication.set_mute(muted)).detach(); } live_kit.microphone_track = LocalTrack::Published { track_publication: publication, - muted + muted, }; cx.notify(); } @@ -1139,7 +1142,7 @@ impl Room { live_kit.room.unpublish_track(publication); } else { if muted { - cx.background().spawn(publication.mute()).detach(); + cx.background().spawn(publication.set_mute(muted)).detach(); } live_kit.screen_track = LocalTrack::Published { track_publication: publication, @@ -1177,11 +1180,7 @@ impl Room { } => { *muted = !*muted; - if *muted { - Ok(cx.background().spawn(track_publication.mute())) - } else { - Ok(cx.background().spawn(track_publication.unmute())) - } + Ok(cx.background().spawn(track_publication.set_mute(*muted))) } } } else { @@ -1189,9 +1188,32 @@ impl Room { } } - pub fn toggle_deafen(&mut self, _cx: &mut ModelContext) -> Task> { - // iterate through publications and mute (?????) - todo!(); + pub fn toggle_deafen(&mut self, cx: &mut ModelContext) -> Result>> { + if let Some(live_kit) = &mut self.live_kit { + (*live_kit).deafened = !live_kit.deafened + } + + if let Some(live_kit) = &self.live_kit { + let mut tasks = Vec::with_capacity(self.remote_participants.len()); + + for participant in self.remote_participants.values() { + for track in live_kit + .room + .remote_audio_track_publications(&participant.user.id.to_string()) + { + tasks.push(cx.background().spawn(track.set_enabled(live_kit.deafened))); + } + } + + Ok(cx.background().spawn(async move { + for task in tasks { + task.await?; + } + Ok(()) + })) + } else { + Err(anyhow!("LiveKit not started")) + } } pub fn unshare_screen(&mut self, cx: &mut ModelContext) -> Result<()> { @@ -1233,6 +1255,7 @@ struct LiveKitRoom { room: Arc, screen_track: LocalTrack, microphone_track: LocalTrack, + deafened: bool, next_publish_id: usize, _maintain_room: Task<()>, _maintain_tracks: [Task<()>; 2], diff --git a/crates/collab_ui/src/collab_titlebar_item.rs b/crates/collab_ui/src/collab_titlebar_item.rs index 919e712ad1..87fd7470d6 100644 --- a/crates/collab_ui/src/collab_titlebar_item.rs +++ b/crates/collab_ui/src/collab_titlebar_item.rs @@ -23,7 +23,7 @@ use theme::{AvatarStyle, Theme}; use util::ResultExt; use workspace::{FollowNextCollaborator, Workspace}; -const MAX_TITLE_LENGTH: usize = 75; +// const MAX_TITLE_LENGTH: usize = 75; actions!( collab, diff --git a/crates/collab_ui/src/collab_ui.rs b/crates/collab_ui/src/collab_ui.rs index 9de8cbf9dd..940700a526 100644 --- a/crates/collab_ui/src/collab_ui.rs +++ b/crates/collab_ui/src/collab_ui.rs @@ -47,12 +47,12 @@ pub fn toggle_screen_sharing(_: &ToggleScreenSharing, cx: &mut AppContext) { pub fn toggle_mute(_: &ToggleMute, cx: &mut AppContext) { if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() { - room.update(cx, Room::toggle_mute).map(Task::detach).log_err(); + room.update(cx, Room::toggle_mute).map(|task| task.detach_and_log_err(cx)).log_err(); } } pub fn toggle_deafen(_: &ToggleDeafen, cx: &mut AppContext) { if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() { - room.update(cx, Room::toggle_deafen).detach_and_log_err(cx); + room.update(cx, Room::toggle_deafen).map(|task| task.detach_and_log_err(cx)).log_err(); } } diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 365766fb9d..0923826a09 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -12,7 +12,7 @@ use std::{ sync::Arc, task::{Context, Poll}, thread, - time::Duration, + time::Duration, panic::Location, }; use crate::{ @@ -965,10 +965,12 @@ impl Task { } impl Task> { + #[track_caller] pub fn detach_and_log_err(self, cx: &mut AppContext) { cx.spawn(|_| async move { if let Err(err) = self.await { - log::error!("{:#}", err); + let caller = Location::caller(); + log::error!("{}:{}: {:#}", caller.file(), caller.line(), err); } }) .detach(); diff --git a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift index d28dc828f1..666da3d533 100644 --- a/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift +++ b/crates/live_kit_client/LiveKitBridge/Sources/LiveKitBridge/LiveKitBridge.swift @@ -169,6 +169,18 @@ public func LKRoomAudioTracksForRemoteParticipant(room: UnsafeRawPointer, partic return nil; } +@_cdecl("LKRoomAudioTrackPublicationsForRemoteParticipant") +public func LKRoomAudioTrackPublicationsForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { + let room = Unmanaged.fromOpaque(room).takeUnretainedValue() + + for (_, participant) in room.remoteParticipants { + if participant.identity == participantId as String { + return participant.audioTracks.compactMap { $0 as? RemoteTrackPublication } as CFArray? + } + } + + return nil; +} @_cdecl("LKRoomVideoTracksForRemoteParticipant") public func LKRoomVideoTracksForRemoteParticipant(room: UnsafeRawPointer, participantId: CFString) -> CFArray? { @@ -235,33 +247,45 @@ public func LKDisplaySources(data: UnsafeRawPointer, callback: @escaping @conven } } -@_cdecl("LKLocalTrackPublicationMute") -public func LKLocalTrackPublicationMute( +@_cdecl("LKLocalTrackPublicationSetMute") +public func LKLocalTrackPublicationSetMute( publication: UnsafeRawPointer, + muted: Bool, on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer ) { let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - publication.mute().then { - on_complete(callback_data, nil) - }.catch { error in - on_complete(callback_data, error.localizedDescription as CFString) + if muted { + publication.mute().then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } + } else { + publication.unmute().then { + on_complete(callback_data, nil) + }.catch { error in + on_complete(callback_data, error.localizedDescription as CFString) + } } - } -@_cdecl("LKLocalTrackPublicationUnmute") -public func LKLocalTrackPublicationUnmute( +@_cdecl("LKRemoteTrackPublicationSetEnabled") +public func LKRemoteTrackPublicationSetEnabled( publication: UnsafeRawPointer, + enabled: Bool, on_complete: @escaping @convention(c) (UnsafeRawPointer, CFString?) -> Void, callback_data: UnsafeRawPointer ) { - let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() + let publication = Unmanaged.fromOpaque(publication).takeUnretainedValue() - publication.unmute().then { + publication.set(enabled: enabled).then { on_complete(callback_data, nil) }.catch { error in on_complete(callback_data, error.localizedDescription as CFString) } } + + + diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index 46516d49be..76f06b562f 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -72,6 +72,11 @@ extern "C" { participant_id: CFStringRef, ) -> CFArrayRef; + fn LKRoomAudioTrackPublicationsForRemoteParticipant( + room: *const c_void, + participant_id: CFStringRef, + ) -> CFArrayRef; + fn LKRoomVideoTracksForRemoteParticipant( room: *const c_void, participant_id: CFStringRef, @@ -98,13 +103,16 @@ extern "C" { fn LKCreateScreenShareTrackForDisplay(display: *const c_void) -> *const c_void; fn LKLocalAudioTrackCreateTrack() -> *const c_void; - fn LKLocalTrackPublicationMute( + fn LKLocalTrackPublicationSetMute( publication: *const c_void, + muted: bool, on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), callback_data: *mut c_void, ); - fn LKLocalTrackPublicationUnmute( + + fn LKRemoteTrackPublicationSetEnabled( publication: *const c_void, + enabled: bool, on_complete: extern "C" fn(callback_data: *mut c_void, error: CFStringRef), callback_data: *mut c_void, ); @@ -318,6 +326,29 @@ impl Room { } } + pub fn remote_audio_track_publications(&self, participant_id: &str) -> Vec> { + unsafe { + let tracks = LKRoomAudioTrackPublicationsForRemoteParticipant( + self.native_room, + CFString::new(participant_id).as_concrete_TypeRef(), + ); + + if tracks.is_null() { + Vec::new() + } else { + let tracks = CFArray::wrap_under_get_rule(tracks); + tracks + .into_iter() + .map(|native_track_publication| { + let native_track_publication = *native_track_publication; + Arc::new(RemoteTrackPublication(native_track_publication)) + }) + .collect() + } + } + } + + pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded(); self.remote_audio_track_subscribers.lock().push(tx); @@ -531,7 +562,7 @@ impl Drop for LocalVideoTrack { pub struct LocalTrackPublication(*const c_void); impl LocalTrackPublication { - pub fn mute(&self) -> impl Future> { + pub fn set_mute(&self, muted: bool) -> impl Future> { let (tx, rx) = futures::channel::oneshot::channel(); extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { @@ -545,32 +576,9 @@ impl LocalTrackPublication { } unsafe { - LKLocalTrackPublicationMute( - self.0, - complete_callback, - Box::into_raw(Box::new(tx)) as *mut c_void, - ) - } - - async move { rx.await.unwrap() } - } - - pub fn unmute(&self) -> impl Future> { - let (tx, rx) = futures::channel::oneshot::channel(); - - extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { - let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; - if error.is_null() { - tx.send(Ok(())).ok(); - } else { - let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; - tx.send(Err(anyhow!(error))).ok(); - } - } - - unsafe { - LKLocalTrackPublicationUnmute( + LKLocalTrackPublicationSetMute( self.0, + muted, complete_callback, Box::into_raw(Box::new(tx)) as *mut c_void, ) @@ -586,6 +594,42 @@ impl Drop for LocalTrackPublication { } } +pub struct RemoteTrackPublication(*const c_void); + +impl RemoteTrackPublication { + pub fn set_enabled(&self, enabled: bool) -> impl Future> { + let (tx, rx) = futures::channel::oneshot::channel(); + + extern "C" fn complete_callback(callback_data: *mut c_void, error: CFStringRef) { + let tx = unsafe { Box::from_raw(callback_data as *mut oneshot::Sender>) }; + if error.is_null() { + tx.send(Ok(())).ok(); + } else { + let error = unsafe { CFString::wrap_under_get_rule(error).to_string() }; + tx.send(Err(anyhow!(error))).ok(); + } + } + + unsafe { + LKRemoteTrackPublicationSetEnabled( + self.0, + enabled, + complete_callback, + Box::into_raw(Box::new(tx)) as *mut c_void, + ) + } + + async move { rx.await.unwrap() } + } +} + +impl Drop for RemoteTrackPublication { + fn drop(&mut self) { + unsafe { CFRelease(self.0) } + } +} + + #[derive(Debug)] pub struct RemoteAudioTrack { _native_track: *const c_void, @@ -612,6 +656,18 @@ impl RemoteAudioTrack { pub fn publisher_id(&self) -> &str { &self.publisher_id } + + pub fn enable(&self) -> impl Future> { + async { + Ok(()) + } + } + + pub fn disable(&self) -> impl Future> { + async { + Ok(()) + } + } } #[derive(Debug)] diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 21211ce473..e8c5247f53 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -410,6 +410,25 @@ impl Room { .collect() } + pub fn remote_audio_track_publications( + &self, + publisher_id: &str, + ) -> Vec> { + if !self.is_connected() { + return Vec::new(); + } + + self.test_server() + .audio_tracks(self.token()) + .unwrap() + .into_iter() + .filter(|track| track.publisher_id() == publisher_id) + .map(|_track| { + Arc::new(RemoteTrackPublication {}) + }) + .collect() + } + pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec> { if !self.is_connected() { return Vec::new(); @@ -476,16 +495,16 @@ impl Drop for Room { pub struct LocalTrackPublication; impl LocalTrackPublication { - pub fn mute(&self) -> impl Future> { - async { - Ok(()) - } + pub fn set_mute(&self, _mute: bool) -> impl Future> { + async { Ok(()) } } +} - pub fn unmute(&self) -> impl Future> { - async { - Ok(()) - } +pub struct RemoteTrackPublication; + +impl RemoteTrackPublication { + pub fn set_enabled(&self, _enabled: bool) -> impl Future> { + async { Ok(()) } } } @@ -545,6 +564,14 @@ impl RemoteAudioTrack { pub fn publisher_id(&self) -> &str { &self.publisher_id } + + pub fn enable(&self) -> impl Future> { + async { Ok(()) } + } + + pub fn disable(&self) -> impl Future> { + async { Ok(()) } + } } #[derive(Clone)]