From e50c48852ae4d7569c6ab28f5019ec5addf314b4 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 7 Apr 2023 16:27:48 -0700 Subject: [PATCH] Wait for host to acknowledge buffer updates before sending them to other guests --- crates/collab/src/rpc.rs | 35 +++++++++++++++++++++++++++++------ crates/project/src/project.rs | 6 +++--- crates/rpc/src/rpc.rs | 2 +- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 42a88d7d4c..c9b9efdc4c 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1655,17 +1655,40 @@ async fn update_buffer( ) -> Result<()> { session.executor.record_backtrace(); let project_id = ProjectId::from_proto(request.project_id); - let project_connection_ids = session - .db() - .await - .project_connection_ids(project_id, session.connection_id) - .await?; + let host_connection_id = { + let collaborators = session + .db() + .await + .project_collaborators(project_id, session.connection_id) + .await?; + + let host = collaborators + .iter() + .find(|collaborator| collaborator.is_host) + .ok_or_else(|| anyhow!("host not found"))?; + host.connection_id + }; + + if host_connection_id != session.connection_id { + session + .peer + .forward_request(session.connection_id, host_connection_id, request.clone()) + .await?; + } session.executor.record_backtrace(); + let collaborators = session + .db() + .await + .project_collaborators(project_id, session.connection_id) + .await?; broadcast( Some(session.connection_id), - project_connection_ids.iter().copied(), + collaborators + .iter() + .filter(|collaborator| !collaborator.is_host) + .map(|collaborator| collaborator.connection_id), |connection_id| { session .peer diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index bbaa76ea69..376c84a9d0 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -380,7 +380,7 @@ impl Project { client.add_model_message_handler(Self::handle_unshare_project); client.add_model_message_handler(Self::handle_create_buffer_for_peer); client.add_model_message_handler(Self::handle_update_buffer_file); - client.add_model_message_handler(Self::handle_update_buffer); + client.add_model_request_handler(Self::handle_update_buffer); client.add_model_message_handler(Self::handle_update_diagnostic_summary); client.add_model_message_handler(Self::handle_update_worktree); client.add_model_request_handler(Self::handle_create_project_entry); @@ -5160,7 +5160,7 @@ impl Project { envelope: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, - ) -> Result<()> { + ) -> Result { this.update(&mut cx, |this, cx| { let payload = envelope.payload.clone(); let buffer_id = payload.buffer_id; @@ -5187,7 +5187,7 @@ impl Project { e.insert(OpenBuffer::Operations(ops)); } } - Ok(()) + Ok(proto::Ack {}) }) } diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index bec518b707..898c8c5e98 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 50; +pub const PROTOCOL_VERSION: u32 = 51;