Clear project's shared state upon every disconnection

Co-authored-by: Nathan Sobo <nathan@zed.dev>
Co-authored-by: Antonio Scandurra <as-cii@zed.dev>
This commit is contained in:
Max Brunsfeld 2022-10-14 10:17:59 -07:00
parent ad6f9b2499
commit 2d3d07d4d7
2 changed files with 188 additions and 165 deletions

View file

@ -807,7 +807,7 @@ async fn test_host_disconnect(
// Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
server.disconnect_client(client_a.current_user_id(cx_a));
cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
project_a
.condition(cx_a, |project, _| project.collaborators().is_empty())
.await;
@ -829,6 +829,29 @@ async fn test_host_disconnect(
.await
.unwrap();
assert!(can_close);
let active_call_b = cx_b.read(ActiveCall::global);
active_call_b
.update(cx_b, |call, cx| {
call.invite(client_a.user_id().unwrap(), None, cx)
})
.await
.unwrap();
deterministic.run_until_parked();
active_call_a
.update(cx_a, |call, cx| call.accept_incoming(cx))
.await
.unwrap();
active_call_a
.update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
.await
.unwrap();
// Drop client A's connection again. We should still unshare it successfully.
server.disconnect_client(client_a.current_user_id(cx_a));
deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
}
#[gpui::test(iterations = 10)]

View file

@ -104,7 +104,7 @@ pub struct Project {
user_store: ModelHandle<UserStore>,
project_store: ModelHandle<ProjectStore>,
fs: Arc<dyn Fs>,
client_state: ProjectClientState,
client_state: Option<ProjectClientState>,
collaborators: HashMap<PeerId, Collaborator>,
client_subscriptions: Vec<client::Subscription>,
_subscriptions: Vec<gpui::Subscription>,
@ -151,7 +151,7 @@ enum WorktreeHandle {
enum ProjectClientState {
Local {
remote_id: Option<u64>,
remote_id: u64,
_detect_unshare: Task<Option<()>>,
},
Remote {
@ -418,21 +418,6 @@ impl Project {
cx: &mut MutableAppContext,
) -> ModelHandle<Self> {
cx.add_model(|cx: &mut ModelContext<Self>| {
let mut status = client.status();
let _detect_unshare = cx.spawn_weak(move |this, mut cx| {
async move {
let is_connected = status.next().await.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || status.next().await.is_some() {
if let Some(this) = this.upgrade(&cx) {
let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
}
}
Ok(())
}
.log_err()
});
let handle = cx.weak_handle();
project_store.update(cx, |store, cx| store.add_project(handle, cx));
@ -445,10 +430,7 @@ impl Project {
loading_buffers: Default::default(),
loading_local_worktrees: Default::default(),
buffer_snapshots: Default::default(),
client_state: ProjectClientState::Local {
remote_id: None,
_detect_unshare,
},
client_state: None,
opened_buffer: watch::channel(),
client_subscriptions: Vec::new(),
_subscriptions: vec![cx.observe_global::<Settings, _>(Self::on_settings_changed)],
@ -522,7 +504,7 @@ impl Project {
client_subscriptions: vec![client.add_model_for_remote_entity(remote_id, cx)],
_subscriptions: Default::default(),
client: client.clone(),
client_state: ProjectClientState::Remote {
client_state: Some(ProjectClientState::Remote {
sharing_has_stopped: false,
remote_id,
replica_id,
@ -541,7 +523,7 @@ impl Project {
}
.log_err()
}),
},
}),
language_servers: Default::default(),
language_server_ids: Default::default(),
language_server_settings: Default::default(),
@ -753,21 +735,22 @@ impl Project {
}
pub fn remote_id(&self) -> Option<u64> {
match &self.client_state {
ProjectClientState::Local { remote_id, .. } => *remote_id,
ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
match self.client_state.as_ref()? {
ProjectClientState::Local { remote_id, .. }
| ProjectClientState::Remote { remote_id, .. } => Some(*remote_id),
}
}
pub fn replica_id(&self) -> ReplicaId {
match &self.client_state {
ProjectClientState::Local { .. } => 0,
ProjectClientState::Remote { replica_id, .. } => *replica_id,
Some(ProjectClientState::Remote { replica_id, .. }) => *replica_id,
_ => 0,
}
}
fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
if let ProjectClientState::Local { remote_id, .. } = &self.client_state {
if let Some(ProjectClientState::Local { remote_id, .. }) = &self.client_state {
let project_id = *remote_id;
// Broadcast worktrees only if the project is online.
let worktrees = self
.worktrees
@ -778,40 +761,40 @@ impl Project {
.map(|worktree| worktree.read(cx).as_local().unwrap().metadata_proto())
})
.collect();
if let Some(project_id) = *remote_id {
self.client
.send(proto::UpdateProject {
project_id,
worktrees,
})
.log_err();
let worktrees = self.visible_worktrees(cx).collect::<Vec<_>>();
let scans_complete =
futures::future::join_all(worktrees.iter().filter_map(|worktree| {
Some(worktree.read(cx).as_local()?.scan_complete())
}));
let worktrees = worktrees.into_iter().map(|handle| handle.downgrade());
cx.spawn_weak(move |_, cx| async move {
scans_complete.await;
cx.read(|cx| {
for worktree in worktrees {
if let Some(worktree) = worktree
.upgrade(cx)
.and_then(|worktree| worktree.read(cx).as_local())
{
worktree.send_extension_counts(project_id);
}
}
})
self.client
.send(proto::UpdateProject {
project_id,
worktrees,
})
.detach();
}
.log_err();
self.project_store.update(cx, |_, cx| cx.notify());
cx.notify();
let worktrees = self.visible_worktrees(cx).collect::<Vec<_>>();
let scans_complete = futures::future::join_all(
worktrees
.iter()
.filter_map(|worktree| Some(worktree.read(cx).as_local()?.scan_complete())),
);
let worktrees = worktrees.into_iter().map(|handle| handle.downgrade());
cx.spawn_weak(move |_, cx| async move {
scans_complete.await;
cx.read(|cx| {
for worktree in worktrees {
if let Some(worktree) = worktree
.upgrade(cx)
.and_then(|worktree| worktree.read(cx).as_local())
{
worktree.send_extension_counts(project_id);
}
}
})
})
.detach();
}
self.project_store.update(cx, |_, cx| cx.notify());
cx.notify();
}
pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
@ -1051,113 +1034,129 @@ impl Project {
}
pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
if let ProjectClientState::Local { remote_id, .. } = &mut self.client_state {
if remote_id.is_some() {
return Task::ready(Err(anyhow!("project was already shared")));
}
*remote_id = Some(project_id);
let mut worktree_share_tasks = Vec::new();
for open_buffer in self.opened_buffers.values_mut() {
match open_buffer {
OpenBuffer::Strong(_) => {}
OpenBuffer::Weak(buffer) => {
if let Some(buffer) = buffer.upgrade(cx) {
*open_buffer = OpenBuffer::Strong(buffer);
}
}
OpenBuffer::Operations(_) => unreachable!(),
}
}
for worktree_handle in self.worktrees.iter_mut() {
match worktree_handle {
WorktreeHandle::Strong(_) => {}
WorktreeHandle::Weak(worktree) => {
if let Some(worktree) = worktree.upgrade(cx) {
*worktree_handle = WorktreeHandle::Strong(worktree);
}
}
}
}
for (server_id, status) in &self.language_server_statuses {
self.client
.send(proto::StartLanguageServer {
project_id,
server: Some(proto::LanguageServer {
id: *server_id as u64,
name: status.name.clone(),
}),
})
.log_err();
}
for worktree in self.worktrees(cx).collect::<Vec<_>>() {
worktree.update(cx, |worktree, cx| {
let worktree = worktree.as_local_mut().unwrap();
worktree_share_tasks.push(worktree.share(project_id, cx));
});
}
self.client_subscriptions
.push(self.client.add_model_for_remote_entity(project_id, cx));
self.metadata_changed(cx);
cx.emit(Event::RemoteIdChanged(Some(project_id)));
cx.notify();
cx.foreground().spawn(async move {
futures::future::try_join_all(worktree_share_tasks).await?;
Ok(())
})
} else {
Task::ready(Err(anyhow!("can't share a remote project")))
if self.client_state.is_some() {
return Task::ready(Err(anyhow!("project was already shared")));
}
let mut worktree_share_tasks = Vec::new();
for open_buffer in self.opened_buffers.values_mut() {
match open_buffer {
OpenBuffer::Strong(_) => {}
OpenBuffer::Weak(buffer) => {
if let Some(buffer) = buffer.upgrade(cx) {
*open_buffer = OpenBuffer::Strong(buffer);
}
}
OpenBuffer::Operations(_) => unreachable!(),
}
}
for worktree_handle in self.worktrees.iter_mut() {
match worktree_handle {
WorktreeHandle::Strong(_) => {}
WorktreeHandle::Weak(worktree) => {
if let Some(worktree) = worktree.upgrade(cx) {
*worktree_handle = WorktreeHandle::Strong(worktree);
}
}
}
}
for (server_id, status) in &self.language_server_statuses {
self.client
.send(proto::StartLanguageServer {
project_id,
server: Some(proto::LanguageServer {
id: *server_id as u64,
name: status.name.clone(),
}),
})
.log_err();
}
for worktree in self.worktrees(cx).collect::<Vec<_>>() {
worktree.update(cx, |worktree, cx| {
let worktree = worktree.as_local_mut().unwrap();
worktree_share_tasks.push(worktree.share(project_id, cx));
});
}
self.client_subscriptions
.push(self.client.add_model_for_remote_entity(project_id, cx));
self.metadata_changed(cx);
cx.emit(Event::RemoteIdChanged(Some(project_id)));
cx.notify();
let mut status = self.client.status();
self.client_state = Some(ProjectClientState::Local {
remote_id: project_id,
_detect_unshare: cx.spawn_weak(move |this, mut cx| {
async move {
let is_connected = status.next().await.map_or(false, |s| s.is_connected());
// Even if we're initially connected, any future change of the status means we momentarily disconnected.
if !is_connected || status.next().await.is_some() {
if let Some(this) = this.upgrade(&cx) {
let _ = this.update(&mut cx, |this, cx| this.unshare(cx));
}
}
Ok(())
}
.log_err()
}),
});
cx.foreground().spawn(async move {
futures::future::try_join_all(worktree_share_tasks).await?;
Ok(())
})
}
pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
if let ProjectClientState::Local { remote_id, .. } = &mut self.client_state {
if let Some(project_id) = remote_id.take() {
self.collaborators.clear();
self.shared_buffers.clear();
self.client_subscriptions.clear();
if self.is_remote() {
return Err(anyhow!("attempted to unshare a remote project"));
}
for worktree_handle in self.worktrees.iter_mut() {
if let WorktreeHandle::Strong(worktree) = worktree_handle {
let is_visible = worktree.update(cx, |worktree, _| {
worktree.as_local_mut().unwrap().unshare();
worktree.is_visible()
});
if !is_visible {
*worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
}
if let Some(ProjectClientState::Local { remote_id, .. }) = self.client_state.take() {
self.collaborators.clear();
self.shared_buffers.clear();
self.client_subscriptions.clear();
for worktree_handle in self.worktrees.iter_mut() {
if let WorktreeHandle::Strong(worktree) = worktree_handle {
let is_visible = worktree.update(cx, |worktree, _| {
worktree.as_local_mut().unwrap().unshare();
worktree.is_visible()
});
if !is_visible {
*worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
}
}
for open_buffer in self.opened_buffers.values_mut() {
if let OpenBuffer::Strong(buffer) = open_buffer {
*open_buffer = OpenBuffer::Weak(buffer.downgrade());
}
}
self.metadata_changed(cx);
cx.notify();
self.client.send(proto::UnshareProject { project_id })?;
}
for open_buffer in self.opened_buffers.values_mut() {
if let OpenBuffer::Strong(buffer) = open_buffer {
*open_buffer = OpenBuffer::Weak(buffer.downgrade());
}
}
self.metadata_changed(cx);
cx.notify();
self.client.send(proto::UnshareProject {
project_id: remote_id,
})?;
Ok(())
} else {
Err(anyhow!("attempted to unshare a remote project"))
Err(anyhow!("attempted to unshare an unshared project"))
}
}
fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
if let ProjectClientState::Remote {
if let Some(ProjectClientState::Remote {
sharing_has_stopped,
..
} = &mut self.client_state
}) = &mut self.client_state
{
*sharing_has_stopped = true;
self.collaborators.clear();
@ -1181,18 +1180,18 @@ impl Project {
pub fn is_read_only(&self) -> bool {
match &self.client_state {
ProjectClientState::Local { .. } => false,
ProjectClientState::Remote {
Some(ProjectClientState::Remote {
sharing_has_stopped,
..
} => *sharing_has_stopped,
}) => *sharing_has_stopped,
_ => false,
}
}
pub fn is_local(&self) -> bool {
match &self.client_state {
ProjectClientState::Local { .. } => true,
ProjectClientState::Remote { .. } => false,
Some(ProjectClientState::Remote { .. }) => false,
_ => true,
}
}
@ -4165,8 +4164,8 @@ impl Project {
pub fn is_shared(&self) -> bool {
match &self.client_state {
ProjectClientState::Local { remote_id, .. } => remote_id.is_some(),
ProjectClientState::Remote { .. } => false,
Some(ProjectClientState::Local { .. }) => true,
_ => false,
}
}
@ -5958,20 +5957,21 @@ impl Entity for Project {
self.project_store.update(cx, ProjectStore::prune_projects);
match &self.client_state {
ProjectClientState::Local { remote_id, .. } => {
if let Some(project_id) = *remote_id {
self.client
.send(proto::UnshareProject { project_id })
.log_err();
}
Some(ProjectClientState::Local { remote_id, .. }) => {
self.client
.send(proto::UnshareProject {
project_id: *remote_id,
})
.log_err();
}
ProjectClientState::Remote { remote_id, .. } => {
Some(ProjectClientState::Remote { remote_id, .. }) => {
self.client
.send(proto::LeaveProject {
project_id: *remote_id,
})
.log_err();
}
_ => {}
}
}