From 2d3d07d4d7d217a7afafc19c97b35e8c8b5bd5f7 Mon Sep 17 00:00:00 2001
From: Max Brunsfeld
Date: Fri, 14 Oct 2022 10:17:59 -0700
Subject: [PATCH] Clear project's shared state upon every disconnection

Co-authored-by: Nathan Sobo
Co-authored-by: Antonio Scandurra
---
 crates/collab/src/integration_tests.rs |  25 +-
 crates/project/src/project.rs          | 328 ++++++++++++-------------
 2 files changed, 188 insertions(+), 165 deletions(-)

diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs
index 1f8f1394ce..90d7b6d4b5 100644
--- a/crates/collab/src/integration_tests.rs
+++ b/crates/collab/src/integration_tests.rs
@@ -807,7 +807,7 @@ async fn test_host_disconnect(
     // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
     server.disconnect_client(client_a.current_user_id(cx_a));
-    cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
     project_a
         .condition(cx_a, |project, _| project.collaborators().is_empty())
         .await;
@@ -829,6 +829,29 @@ async fn test_host_disconnect(
         .await
         .unwrap();
     assert!(can_close);
+
+    let active_call_b = cx_b.read(ActiveCall::global);
+    active_call_b
+        .update(cx_b, |call, cx| {
+            call.invite(client_a.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    active_call_a
+        .update(cx_a, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
+
+    active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+        .await
+        .unwrap();
+
+    // Drop client A's connection again. We should still unshare it successfully.
+    server.disconnect_client(client_a.current_user_id(cx_a));
+    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+    project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
 }
 
 #[gpui::test(iterations = 10)]
diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index ea74dda434..f964726c4c 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -104,7 +104,7 @@ pub struct Project {
     user_store: ModelHandle<UserStore>,
     project_store: ModelHandle<ProjectStore>,
     fs: Arc<dyn Fs>,
-    client_state: ProjectClientState,
+    client_state: Option<ProjectClientState>,
    collaborators: HashMap<PeerId, Collaborator>,
     client_subscriptions: Vec<client::Subscription>,
     _subscriptions: Vec<gpui::Subscription>,
@@ -151,7 +151,7 @@ enum WorktreeHandle {
 enum ProjectClientState {
     Local {
-        remote_id: Option<u64>,
+        remote_id: u64,
         _detect_unshare: Task<Option<()>>,
     },
     Remote {
@@ -418,21 +418,6 @@ impl Project {
         cx: &mut MutableAppContext,
     ) -> ModelHandle<Self> {
         cx.add_model(|cx: &mut ModelContext<Self>| {
-            let mut status = client.status();
-            let _detect_unshare = cx.spawn_weak(move |this, mut cx| {
-                async move {
-                    let is_connected = status.next().await.map_or(false, |s| s.is_connected());
-                    // Even if we're initially connected, any future change of the status means we momentarily disconnected.
-                    if !is_connected || status.next().await.is_some() {
-                        if let Some(this) = this.upgrade(&cx) {
-                            let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
-                        }
-                    }
-                    Ok(())
-                }
-                .log_err()
-            });
-
             let handle = cx.weak_handle();
             project_store.update(cx, |store, cx| store.add_project(handle, cx));
 
@@ -445,10 +430,7 @@ impl Project {
                 loading_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
                 buffer_snapshots: Default::default(),
-                client_state: ProjectClientState::Local {
-                    remote_id: None,
-                    _detect_unshare,
-                },
+                client_state: None,
                 opened_buffer: watch::channel(),
                 client_subscriptions: Vec::new(),
                 _subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
@@ -522,7 +504,7 @@ impl Project {
                 client_subscriptions: vec![client.add_model_for_remote_entity(remote_id, cx)],
                 _subscriptions: Default::default(),
                 client: client.clone(),
-                client_state: ProjectClientState::Remote {
+                client_state: Some(ProjectClientState::Remote {
                     sharing_has_stopped: false,
                     remote_id,
                     replica_id,
@@ -541,7 +523,7 @@ impl Project {
                         }
                         .log_err()
                     }),
-                },
+                }),
                 language_servers: Default::default(),
                 language_server_ids: Default::default(),
                 language_server_settings: Default::default(),
@@ -753,7 +735,22 @@ impl Project {
     }
 
     pub fn remote_id(&self) -> Option<u64> {
-        match &self.client_state {
-            ProjectClientState::Local { remote_id, .. } => *remote_id,
-            ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
+        match self.client_state.as_ref()? {
+            ProjectClientState::Local { remote_id, .. }
+            | ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
         }
     }
 
     pub fn replica_id(&self) -> ReplicaId {
         match &self.client_state {
-            ProjectClientState::Local { .. } => 0,
-            ProjectClientState::Remote { replica_id, .. } => *replica_id,
+            Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
+            _ => 0,
         }
     }
 
     fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
-        if let ProjectClientState::Local { remote_id, .. } = &self.client_state {
+        if let Some(ProjectClientState::Local { remote_id, .. }) = &self.client_state {
+            let project_id = *remote_id;
             // Broadcast worktrees only if the project is online.
             let worktrees = self
                 .worktrees
                 .iter()
                 .filter_map(|worktree| {
                     worktree
                         .upgrade(cx)
                         .map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto())
                 })
                 .collect();
-            if let Some(project_id) = *remote_id {
-                self.client
-                    .send(proto::UpdateProject {
-                        project_id,
-                        worktrees,
-                    })
-                    .log_err();
-
-                let worktrees = self.visible_worktrees(cx).collect::<Vec<_>>();
-                let scans_complete =
-                    futures::future::join_all(worktrees.iter().filter_map(|worktree| {
-                        Some(worktree.read(cx).as_local()?.scan_complete())
-                    }));
-
-                let worktrees = worktrees.into_iter().map(|handle| handle.downgrade());
-                cx.spawn_weak(move |_, cx| async move {
-                    scans_complete.await;
-                    cx.read(|cx| {
-                        for worktree in worktrees {
-                            if let Some(worktree) = worktree
-                                .upgrade(cx)
-                                .and_then(|worktree| worktree.read(cx).as_local())
-                            {
-                                worktree.send_extension_counts(project_id);
-                            }
-                        }
-                    })
+            self.client
+                .send(proto::UpdateProject {
+                    project_id,
+                    worktrees,
                 })
-                .detach();
-            }
+                .log_err();
 
-            self.project_store.update(cx, |_, cx| cx.notify());
-            cx.notify();
+            let worktrees = self.visible_worktrees(cx).collect::<Vec<_>>();
+            let scans_complete = futures::future::join_all(
+                worktrees
+                    .iter()
+                    .filter_map(|worktree| Some(worktree.read(cx).as_local()?.scan_complete())),
+            );
+
+            let worktrees = worktrees.into_iter().map(|handle| handle.downgrade());
+
+            cx.spawn_weak(move |_, cx| async move {
+                scans_complete.await;
+                cx.read(|cx| {
+                    for worktree in worktrees {
+                        if let Some(worktree) = worktree
+                            .upgrade(cx)
+                            .and_then(|worktree| worktree.read(cx).as_local())
+                        {
+                            worktree.send_extension_counts(project_id);
+                        }
+                    }
+                })
+            })
+            .detach();
         }
+
+        self.project_store.update(cx, |_, cx| cx.notify());
+        cx.notify();
     }
 
     pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
@@ -1051,113 +1034,129 @@ impl Project {
     }
 
     pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
-        if let ProjectClientState::Local { remote_id, .. } = &mut self.client_state {
-            if remote_id.is_some() {
-                return Task::ready(Err(anyhow!("project was already shared")));
-            }
-
-            *remote_id = Some(project_id);
-
-            let mut worktree_share_tasks = Vec::new();
-
-            for open_buffer in self.opened_buffers.values_mut() {
-                match open_buffer {
-                    OpenBuffer::Strong(_) => {}
-                    OpenBuffer::Weak(buffer) => {
-                        if let Some(buffer) = buffer.upgrade(cx) {
-                            *open_buffer = OpenBuffer::Strong(buffer);
-                        }
-                    }
-                    OpenBuffer::Operations(_) => unreachable!(),
-                }
-            }
-
-            for worktree_handle in self.worktrees.iter_mut() {
-                match worktree_handle {
-                    WorktreeHandle::Strong(_) => {}
-                    WorktreeHandle::Weak(worktree) => {
-                        if let Some(worktree) = worktree.upgrade(cx) {
-                            *worktree_handle = WorktreeHandle::Strong(worktree);
-                        }
-                    }
-                }
-            }
-
-            for (server_id, status) in &self.language_server_statuses {
-                self.client
-                    .send(proto::StartLanguageServer {
-                        project_id,
-                        server: Some(proto::LanguageServer {
-                            id: *server_id as u64,
-                            name: status.name.clone(),
-                        }),
-                    })
-                    .log_err();
-            }
-
-            for worktree in self.worktrees(cx).collect::<Vec<_>>() {
-                worktree.update(cx, |worktree, cx| {
-                    let worktree = worktree.as_local_mut().unwrap();
-                    worktree_share_tasks.push(worktree.share(project_id, cx));
-                });
-            }
-
-            self.client_subscriptions
-                .push(self.client.add_model_for_remote_entity(project_id, cx));
-            self.metadata_changed(cx);
-            cx.emit(Event::RemoteIdChanged(Some(project_id)));
-            cx.notify();
-
-            cx.foreground().spawn(async move {
-                futures::future::try_join_all(worktree_share_tasks).await?;
-                Ok(())
-            })
-        } else {
-            Task::ready(Err(anyhow!("can't share a remote project")))
+        if self.client_state.is_some() {
+            return Task::ready(Err(anyhow!("project was already shared")));
         }
+
+        let mut worktree_share_tasks = Vec::new();
+
+        for open_buffer in self.opened_buffers.values_mut() {
+            match open_buffer {
+                OpenBuffer::Strong(_) => {}
+                OpenBuffer::Weak(buffer) => {
+                    if let Some(buffer) = buffer.upgrade(cx) {
+                        *open_buffer = OpenBuffer::Strong(buffer);
+                    }
+                }
+                OpenBuffer::Operations(_) => unreachable!(),
+            }
+        }
+
+        for worktree_handle in self.worktrees.iter_mut() {
+            match worktree_handle {
+                WorktreeHandle::Strong(_) => {}
+                WorktreeHandle::Weak(worktree) => {
+                    if let Some(worktree) = worktree.upgrade(cx) {
+                        *worktree_handle = WorktreeHandle::Strong(worktree);
+                    }
+                }
+            }
+        }
+
+        for (server_id, status) in &self.language_server_statuses {
+            self.client
+                .send(proto::StartLanguageServer {
+                    project_id,
+                    server: Some(proto::LanguageServer {
+                        id: *server_id as u64,
+                        name: status.name.clone(),
+                    }),
+                })
+                .log_err();
+        }
+
+        for worktree in self.worktrees(cx).collect::<Vec<_>>() {
+            worktree.update(cx, |worktree, cx| {
+                let worktree = worktree.as_local_mut().unwrap();
+                worktree_share_tasks.push(worktree.share(project_id, cx));
+            });
+        }
+
+        self.client_subscriptions
+            .push(self.client.add_model_for_remote_entity(project_id, cx));
+        self.metadata_changed(cx);
+        cx.emit(Event::RemoteIdChanged(Some(project_id)));
+        cx.notify();
+
+        let mut status = self.client.status();
+        self.client_state = Some(ProjectClientState::Local {
+            remote_id: project_id,
+            _detect_unshare: cx.spawn_weak(move |this, mut cx| {
+                async move {
+                    let is_connected = status.next().await.map_or(false, |s| s.is_connected());
+                    // Even if we're initially connected, any future change of the status means we momentarily disconnected.
+                    if !is_connected || status.next().await.is_some() {
+                        if let Some(this) = this.upgrade(&cx) {
+                            let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
+                        }
+                    }
+                    Ok(())
+                }
+                .log_err()
+            }),
+        });
+
+        cx.foreground().spawn(async move {
+            futures::future::try_join_all(worktree_share_tasks).await?;
+            Ok(())
+        })
     }
 
     pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
-        if let ProjectClientState::Local { remote_id, .. } = &mut self.client_state {
-            if let Some(project_id) = remote_id.take() {
-                self.collaborators.clear();
-                self.shared_buffers.clear();
-                self.client_subscriptions.clear();
+        if self.is_remote() {
+            return Err(anyhow!("attempted to unshare a remote project"));
+        }
 
-                for worktree_handle in self.worktrees.iter_mut() {
-                    if let WorktreeHandle::Strong(worktree) = worktree_handle {
-                        let is_visible = worktree.update(cx, |worktree, _| {
-                            worktree.as_local_mut().unwrap().unshare();
-                            worktree.is_visible()
-                        });
-                        if !is_visible {
-                            *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
-                        }
+        if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
+            self.collaborators.clear();
+            self.shared_buffers.clear();
+            self.client_subscriptions.clear();
+
+            for worktree_handle in self.worktrees.iter_mut() {
+                if let WorktreeHandle::Strong(worktree) = worktree_handle {
+                    let is_visible = worktree.update(cx, |worktree, _| {
+                        worktree.as_local_mut().unwrap().unshare();
+                        worktree.is_visible()
+                    });
+                    if !is_visible {
+                        *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
                     }
                 }
-
-                for open_buffer in self.opened_buffers.values_mut() {
-                    if let OpenBuffer::Strong(buffer) = open_buffer {
-                        *open_buffer = OpenBuffer::Weak(buffer.downgrade());
-                    }
-                }
-
-                self.metadata_changed(cx);
-                cx.notify();
-                self.client.send(proto::UnshareProject { project_id })?;
             }
 
+            for open_buffer in self.opened_buffers.values_mut() {
+                if let OpenBuffer::Strong(buffer) = open_buffer {
+                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
+                }
+            }
+
+            self.metadata_changed(cx);
+            cx.notify();
+            self.client.send(proto::UnshareProject {
+                project_id: remote_id,
+            })?;
+            Ok(())
         } else {
-            Err(anyhow!("attempted to unshare a remote project"))
+            Err(anyhow!("attempted to unshare an unshared project"))
         }
     }
 
     fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
-        if let ProjectClientState::Remote {
+        if let Some(ProjectClientState::Remote {
             sharing_has_stopped, ..
-        } = &mut self.client_state
+        }) = &mut self.client_state
         {
             *sharing_has_stopped = true;
             self.collaborators.clear();
@@ -1181,18 +1180,18 @@ impl Project {
 
     pub fn is_read_only(&self) -> bool {
         match &self.client_state {
-            ProjectClientState::Local { .. } => false,
-            ProjectClientState::Remote {
+            Some(ProjectClientState::Remote {
                 sharing_has_stopped, ..
-            } => *sharing_has_stopped,
+            }) => *sharing_has_stopped,
+            _ => false,
         }
     }
 
     pub fn is_local(&self) -> bool {
         match &self.client_state {
-            ProjectClientState::Local { .. } => true,
-            ProjectClientState::Remote { .. } => false,
+            Some(ProjectClientState::Remote { .. }) => false,
+            _ => true,
         }
     }
 
@@ -4165,8 +4164,8 @@ impl Project {
 
     pub fn is_shared(&self) -> bool {
         match &self.client_state {
-            ProjectClientState::Local { remote_id, .. } => remote_id.is_some(),
-            ProjectClientState::Remote { .. } => false,
+            Some(ProjectClientState::Local { .. }) => true,
+            _ => false,
         }
     }
 
@@ -5958,20 +5957,21 @@ impl Entity for Project {
         self.project_store.update(cx, ProjectStore::prune_projects);
 
         match &self.client_state {
-            ProjectClientState::Local { remote_id, .. } => {
-                if let Some(project_id) = *remote_id {
-                    self.client
-                        .send(proto::UnshareProject { project_id })
-                        .log_err();
-                }
+            Some(ProjectClientState::Local { remote_id, .. }) => {
+                self.client
+                    .send(proto::UnshareProject {
+                        project_id: *remote_id,
+                    })
+                    .log_err();
             }
-            ProjectClientState::Remote { remote_id, .. } => {
+            Some(ProjectClientState::Remote { remote_id, .. }) => {
                 self.client
                     .send(proto::LeaveProject {
                         project_id: *remote_id,
                    })
                     .log_err();
             }
+            _ => {}
        }
    }
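
A minimal, self-contained sketch of the state model this patch moves to, assuming simplified stand-in types rather than the real gpui/Zed APIs: `client_state` is `None` while the project is unshared, becomes `Some(Local { .. })` when the host shares it, and is cleared on every disconnection so the same project can be shared again afterwards.

// Illustrative sketch only; field names mirror the patch but the logic is simplified.
#[derive(Debug)]
enum ProjectClientState {
    Local { remote_id: u64 },
    Remote { remote_id: u64, sharing_has_stopped: bool },
}

#[derive(Default, Debug)]
struct Project {
    client_state: Option<ProjectClientState>,
}

impl Project {
    fn is_shared(&self) -> bool {
        matches!(self.client_state, Some(ProjectClientState::Local { .. }))
    }

    fn shared(&mut self, project_id: u64) -> Result<(), &'static str> {
        if self.client_state.is_some() {
            return Err("project was already shared");
        }
        self.client_state = Some(ProjectClientState::Local {
            remote_id: project_id,
        });
        Ok(())
    }

    fn unshare(&mut self) -> Result<(), &'static str> {
        match self.client_state.take() {
            // Clearing the state is what lets a later call to `shared` succeed again.
            Some(ProjectClientState::Local { .. }) => Ok(()),
            Some(remote @ ProjectClientState::Remote { .. }) => {
                // A guest can't unshare the host's project; put the state back.
                self.client_state = Some(remote);
                Err("attempted to unshare a remote project")
            }
            None => Err("attempted to unshare an unshared project"),
        }
    }
}

fn main() {
    let mut project = Project::default();
    project.shared(1).unwrap();
    assert!(project.is_shared());

    // A disconnect triggers the detection task, which calls `unshare`...
    project.unshare().unwrap();
    assert!(!project.is_shared());

    // ...so the project can be shared again after reconnecting, as the new test asserts.
    project.shared(2).unwrap();
    assert!(project.is_shared());
}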