From 47348542efeab4ec4d041da1bb41dae00bc94073 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 21 Dec 2022 14:20:56 +0100 Subject: [PATCH] Synchronize buffers when either the host or a guest reconnects --- crates/collab/src/rpc.rs | 1 + crates/editor/src/multi_buffer.rs | 2 +- crates/language/src/buffer.rs | 8 +- crates/language/src/buffer_tests.rs | 10 +- crates/project/src/project.rs | 176 +++++++++++++++++++++++++--- crates/rpc/proto/zed.proto | 16 +++ crates/rpc/src/proto.rs | 4 + 7 files changed, 194 insertions(+), 23 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d7cff90d61..9014ef7f40 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -216,6 +216,7 @@ impl Server { .add_request_handler(forward_project_request::) .add_request_handler(forward_project_request::) .add_request_handler(forward_project_request::) + .add_request_handler(forward_project_request::) .add_request_handler(forward_project_request::) .add_request_handler(forward_project_request::) .add_request_handler(forward_project_request::) diff --git a/crates/editor/src/multi_buffer.rs b/crates/editor/src/multi_buffer.rs index 0f6e357ddd..0a55fc1f4e 100644 --- a/crates/editor/src/multi_buffer.rs +++ b/crates/editor/src/multi_buffer.rs @@ -3651,7 +3651,7 @@ mod tests { let state = host_buffer.read(cx).to_proto(); let ops = cx .background() - .block(host_buffer.read(cx).serialize_ops(cx)); + .block(host_buffer.read(cx).serialize_ops(None, cx)); let mut buffer = Buffer::from_proto(1, state, None).unwrap(); buffer .apply_ops( diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index a78bb4af79..41bc2a8bab 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -398,7 +398,11 @@ impl Buffer { } } - pub fn serialize_ops(&self, cx: &AppContext) -> Task> { + pub fn serialize_ops( + &self, + since: Option, + cx: &AppContext, + ) -> Task> { let mut operations = Vec::new(); operations.extend(self.deferred_ops.iter().map(proto::serialize_operation)); operations.extend(self.remote_selections.iter().map(|(_, set)| { @@ -422,9 +426,11 @@ impl Buffer { let text_operations = self.text.operations().clone(); cx.background().spawn(async move { + let since = since.unwrap_or_default(); operations.extend( text_operations .iter() + .filter(|(_, op)| !since.observed(op.local_timestamp())) .map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone()))), ); operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation); diff --git a/crates/language/src/buffer_tests.rs b/crates/language/src/buffer_tests.rs index 5f2fdf6e8e..e0b7d080cb 100644 --- a/crates/language/src/buffer_tests.rs +++ b/crates/language/src/buffer_tests.rs @@ -1275,7 +1275,9 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) { assert_eq!(buffer1.read(cx).text(), "abcDF"); let state = buffer1.read(cx).to_proto(); - let ops = cx.background().block(buffer1.read(cx).serialize_ops(cx)); + let ops = cx + .background() + .block(buffer1.read(cx).serialize_ops(None, cx)); let buffer2 = cx.add_model(|cx| { let mut buffer = Buffer::from_proto(1, state, None).unwrap(); buffer @@ -1316,7 +1318,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { let state = base_buffer.read(cx).to_proto(); let ops = cx .background() - .block(base_buffer.read(cx).serialize_ops(cx)); + .block(base_buffer.read(cx).serialize_ops(None, cx)); let mut buffer = Buffer::from_proto(i as ReplicaId, state, None).unwrap(); buffer .apply_ops( @@ -1413,7 +1415,9 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { } 50..=59 if replica_ids.len() < max_peers => { let old_buffer_state = buffer.read(cx).to_proto(); - let old_buffer_ops = cx.background().block(buffer.read(cx).serialize_ops(cx)); + let old_buffer_ops = cx + .background() + .block(buffer.read(cx).serialize_ops(None, cx)); let new_replica_id = (0..=replica_ids.len() as ReplicaId) .filter(|replica_id| *replica_id != buffer.read(cx).replica_id()) .choose(&mut rng) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 73f11f09b0..7a6bce3b9d 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -379,6 +379,7 @@ impl Project { client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion); client.add_model_request_handler(Self::handle_apply_code_action); client.add_model_request_handler(Self::handle_reload_buffers); + client.add_model_request_handler(Self::handle_synchronize_buffers); client.add_model_request_handler(Self::handle_format_buffers); client.add_model_request_handler(Self::handle_get_code_actions); client.add_model_request_handler(Self::handle_get_completions); @@ -1082,7 +1083,6 @@ impl Project { ) -> Result<()> { self.set_worktrees_from_proto(message.worktrees, cx)?; self.set_collaborators_from_proto(message.collaborators, cx)?; - self.language_server_statuses = message .language_servers .into_iter() @@ -1098,6 +1098,7 @@ impl Project { ) }) .collect(); + self.synchronize_remote_buffers(cx).detach_and_log_err(cx); cx.notify(); Ok(()) @@ -4631,12 +4632,17 @@ impl Project { .collaborators .remove(&old_peer_id) .ok_or_else(|| anyhow!("received UpdateProjectCollaborator for unknown peer"))?; + let is_host = collaborator.replica_id == 0; this.collaborators.insert(new_peer_id, collaborator); if let Some(buffers) = this.shared_buffers.remove(&old_peer_id) { this.shared_buffers.insert(new_peer_id, buffers); } + if is_host { + this.synchronize_remote_buffers(cx).detach_and_log_err(cx); + } + cx.emit(Event::CollaboratorUpdated { old_peer_id, new_peer_id, @@ -5131,6 +5137,55 @@ impl Project { }) } + async fn handle_synchronize_buffers( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + cx: AsyncAppContext, + ) -> Result { + let project_id = envelope.payload.project_id; + let mut response = proto::SynchronizeBuffersResponse { + buffers: Default::default(), + }; + + this.read_with(&cx, |this, cx| { + for buffer in envelope.payload.buffers { + let buffer_id = buffer.id; + let remote_version = language::proto::deserialize_version(buffer.version); + if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { + let buffer = buffer.read(cx); + response.buffers.push(proto::BufferVersion { + id: buffer_id, + version: language::proto::serialize_version(&buffer.version), + }); + + let operations = buffer.serialize_ops(Some(remote_version), cx); + let client = this.client.clone(); + cx.background() + .spawn( + async move { + let operations = operations.await; + for chunk in split_operations(operations) { + client + .request(proto::UpdateBuffer { + project_id, + buffer_id, + operations: chunk, + }) + .await?; + } + anyhow::Ok(()) + } + .log_err(), + ) + .detach(); + } + } + }); + + Ok(response) + } + async fn handle_format_buffers( this: ModelHandle, envelope: TypedEnvelope, @@ -5557,12 +5612,12 @@ impl Project { if shared_buffers.insert(buffer_id) { let buffer = buffer.read(cx); let state = buffer.to_proto(); - let operations = buffer.serialize_ops(cx); + let operations = buffer.serialize_ops(None, cx); let client = self.client.clone(); cx.background() .spawn( async move { - let mut operations = operations.await; + let operations = operations.await; client.send(proto::CreateBufferForPeer { project_id, @@ -5570,17 +5625,9 @@ impl Project { variant: Some(proto::create_buffer_for_peer::Variant::State(state)), })?; - loop { - #[cfg(any(test, feature = "test-support"))] - const CHUNK_SIZE: usize = 5; - - #[cfg(not(any(test, feature = "test-support")))] - const CHUNK_SIZE: usize = 100; - - let chunk = operations - .drain(..cmp::min(CHUNK_SIZE, operations.len())) - .collect(); - let is_last = operations.is_empty(); + let mut chunks = split_operations(operations).peekable(); + while let Some(chunk) = chunks.next() { + let is_last = chunks.peek().is_none(); client.send(proto::CreateBufferForPeer { project_id, peer_id: Some(peer_id), @@ -5592,10 +5639,6 @@ impl Project { }, )), })?; - - if is_last { - break; - } } Ok(()) @@ -5638,6 +5681,81 @@ impl Project { }) } + fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext) -> Task> { + let project_id = match self.client_state.as_ref() { + Some(ProjectClientState::Remote { + sharing_has_stopped, + remote_id, + .. + }) => { + if *sharing_has_stopped { + return Task::ready(Err(anyhow!( + "can't synchronize remote buffers on a readonly project" + ))); + } else { + *remote_id + } + } + Some(ProjectClientState::Local { .. }) | None => { + return Task::ready(Err(anyhow!( + "can't synchronize remote buffers on a local project" + ))) + } + }; + + let client = self.client.clone(); + cx.spawn(|this, cx| async move { + let buffers = this.read_with(&cx, |this, cx| { + this.opened_buffers + .iter() + .filter_map(|(id, buffer)| { + let buffer = buffer.upgrade(cx)?; + Some(proto::BufferVersion { + id: *id, + version: language::proto::serialize_version(&buffer.read(cx).version), + }) + }) + .collect() + }); + let response = client + .request(proto::SynchronizeBuffers { + project_id, + buffers, + }) + .await?; + + let send_updates_for_buffers = response.buffers.into_iter().map(|buffer| { + let client = client.clone(); + let buffer_id = buffer.id; + let remote_version = language::proto::deserialize_version(buffer.version); + this.read_with(&cx, |this, cx| { + if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { + let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx); + cx.background().spawn(async move { + let operations = operations.await; + for chunk in split_operations(operations) { + client + .request(proto::UpdateBuffer { + project_id, + buffer_id, + operations: chunk, + }) + .await?; + } + anyhow::Ok(()) + }) + } else { + Task::ready(Ok(())) + } + }) + }); + futures::future::join_all(send_updates_for_buffers) + .await + .into_iter() + .collect() + }) + } + pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec { self.worktrees(cx) .map(|worktree| { @@ -6126,6 +6244,28 @@ impl> From<(WorktreeId, P)> for ProjectPath { } } +fn split_operations( + mut operations: Vec, +) -> impl Iterator> { + #[cfg(any(test, feature = "test-support"))] + const CHUNK_SIZE: usize = 5; + + #[cfg(not(any(test, feature = "test-support")))] + const CHUNK_SIZE: usize = 100; + + std::iter::from_fn(move || { + if operations.is_empty() { + return None; + } + + Some( + operations + .drain(..cmp::min(CHUNK_SIZE, operations.len())) + .collect(), + ) + }) +} + fn serialize_symbol(symbol: &Symbol) -> proto::Symbol { proto::Symbol { language_server_name: symbol.language_server_name.0.to_string(), diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index f3b8a41a1d..740ac1467c 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -79,6 +79,8 @@ message Envelope { BufferReloaded buffer_reloaded = 61; ReloadBuffers reload_buffers = 62; ReloadBuffersResponse reload_buffers_response = 63; + SynchronizeBuffers synchronize_buffers = 200; + SynchronizeBuffersResponse synchronize_buffers_response = 201; FormatBuffers format_buffers = 64; FormatBuffersResponse format_buffers_response = 65; GetCompletions get_completions = 66; @@ -538,6 +540,20 @@ message ReloadBuffersResponse { ProjectTransaction transaction = 1; } +message SynchronizeBuffers { + uint64 project_id = 1; + repeated BufferVersion buffers = 2; +} + +message SynchronizeBuffersResponse { + repeated BufferVersion buffers = 1; +} + +message BufferVersion { + uint64 id = 1; + repeated VectorClockEntry version = 2; +} + enum FormatTrigger { Save = 0; Manual = 1; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index b2017b839a..14541e4b66 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -207,6 +207,8 @@ messages!( (ShareProjectResponse, Foreground), (ShowContacts, Foreground), (StartLanguageServer, Foreground), + (SynchronizeBuffers, Foreground), + (SynchronizeBuffersResponse, Foreground), (Test, Foreground), (Unfollow, Foreground), (UnshareProject, Foreground), @@ -274,6 +276,7 @@ request_messages!( (SearchProject, SearchProjectResponse), (SendChannelMessage, SendChannelMessageResponse), (ShareProject, ShareProjectResponse), + (SynchronizeBuffers, SynchronizeBuffersResponse), (Test, Test), (UpdateBuffer, Ack), (UpdateParticipantLocation, Ack), @@ -315,6 +318,7 @@ entity_messages!( SaveBuffer, SearchProject, StartLanguageServer, + SynchronizeBuffers, Unfollow, UnshareProject, UpdateBuffer,