From 9e2949e7baa848ce20717a27170b97d13dd7138a Mon Sep 17 00:00:00 2001 From: Julia Date: Wed, 19 Apr 2023 16:30:24 -0400 Subject: [PATCH] Refactor language server startup Avoid parallel vecs Co-Authored-By: Max Brunsfeld --- crates/language/src/language.rs | 99 +++--- crates/project/src/project.rs | 559 ++++++++++++++++---------------- 2 files changed, 325 insertions(+), 333 deletions(-) diff --git a/crates/language/src/language.rs b/crates/language/src/language.rs index 38548c02db..6c440e116b 100644 --- a/crates/language/src/language.rs +++ b/crates/language/src/language.rs @@ -782,13 +782,14 @@ impl LanguageRegistry { self.state.read().languages.iter().cloned().collect() } - pub fn start_language_servers( + pub fn start_language_server( self: &Arc, language: Arc, + adapter: Arc, root_path: Arc, http_client: Arc, cx: &mut AppContext, - ) -> Vec { + ) -> Option { #[cfg(any(test, feature = "test-support"))] if language.fake_adapter.is_some() { let task = cx.spawn(|cx| async move { @@ -819,70 +820,60 @@ impl LanguageRegistry { }); let server_id = post_inc(&mut self.state.write().next_language_server_id); - return vec![PendingLanguageServer { server_id, task }]; + return Some(PendingLanguageServer { server_id, task }); } let download_dir = self .language_server_download_dir .clone() .ok_or_else(|| anyhow!("language server download directory has not been assigned")) - .log_err(); - let download_dir = match download_dir { - Some(download_dir) => download_dir, - None => return Vec::new(), - }; + .log_err()?; - let mut results = Vec::new(); + let this = self.clone(); + let language = language.clone(); + let http_client = http_client.clone(); + let download_dir = download_dir.clone(); + let root_path = root_path.clone(); + let adapter = adapter.clone(); + let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone(); + let login_shell_env_loaded = self.login_shell_env_loaded.clone(); + let server_id = post_inc(&mut self.state.write().next_language_server_id); - for adapter in &language.adapters { - let this = self.clone(); - let language = language.clone(); - let http_client = http_client.clone(); - let download_dir = download_dir.clone(); - let root_path = root_path.clone(); - let adapter = adapter.clone(); - let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone(); - let login_shell_env_loaded = self.login_shell_env_loaded.clone(); - let server_id = post_inc(&mut self.state.write().next_language_server_id); + let task = cx.spawn(|cx| async move { + login_shell_env_loaded.await; - let task = cx.spawn(|cx| async move { - login_shell_env_loaded.await; + let mut lock = this.lsp_binary_paths.lock(); + let entry = lock + .entry(adapter.name.clone()) + .or_insert_with(|| { + get_binary( + adapter.clone(), + language.clone(), + http_client, + download_dir, + lsp_binary_statuses, + ) + .map_err(Arc::new) + .boxed() + .shared() + }) + .clone(); + drop(lock); + let binary = entry.clone().map_err(|e| anyhow!(e)).await?; - let mut lock = this.lsp_binary_paths.lock(); - let entry = lock - .entry(adapter.name.clone()) - .or_insert_with(|| { - get_binary( - adapter.clone(), - language.clone(), - http_client, - download_dir, - lsp_binary_statuses, - ) - .map_err(Arc::new) - .boxed() - .shared() - }) - .clone(); - drop(lock); - let binary = entry.clone().map_err(|e| anyhow!(e)).await?; + let server = lsp::LanguageServer::new( + server_id, + &binary.path, + &binary.arguments, + &root_path, + adapter.code_action_kinds(), + cx, + )?; - let server = lsp::LanguageServer::new( - server_id, - &binary.path, - &binary.arguments, - &root_path, - adapter.code_action_kinds(), - cx, - )?; + Ok(server) + }); - Ok(server) - }); - - results.push(PendingLanguageServer { server_id, task }); - } - - results + Some(PendingLanguageServer { server_id, task }) } pub fn language_server_binary_statuses( diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 3963c42c46..95049e8aeb 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -2137,17 +2137,23 @@ impl Project { return; } - let adapters = language.lsp_adapters(); - let language_servers = self.languages.start_language_servers( - language.clone(), - worktree_path.clone(), - self.client.http_client(), - cx, - ); - debug_assert_eq!(adapters.len(), language_servers.len()); - - for (adapter, pending_server) in adapters.into_iter().zip(language_servers.into_iter()) { + for adapter in language.lsp_adapters() { let key = (worktree_id, adapter.name.clone()); + if self.language_server_ids.contains_key(&key) { + continue; + } + + let pending_server = match self.languages.start_language_server( + language.clone(), + adapter.clone(), + worktree_path.clone(), + self.client.http_client(), + cx, + ) { + Some(pending_server) => pending_server, + None => continue, + }; + let lsp = &cx.global::().lsp.get(&adapter.name.0); let override_options = lsp.map(|s| s.initialization_options.clone()).flatten(); @@ -2160,17 +2166,17 @@ impl Project { _ => {} } - if !self.language_server_ids.contains_key(&key) { - let adapter = self.setup_pending_language_server( - initialization_options, - pending_server, - adapter.clone(), - language.clone(), - key.clone(), - cx, - ); - self.language_server_ids.insert(key.clone(), adapter); - } + let server_id = pending_server.server_id; + let state = self.setup_pending_language_server( + initialization_options, + pending_server, + adapter.clone(), + language.clone(), + key.clone(), + cx, + ); + self.language_servers.insert(server_id, state); + self.language_server_ids.insert(key.clone(), server_id); } } @@ -2182,286 +2188,281 @@ impl Project { language: Arc, key: (WorktreeId, LanguageServerName), cx: &mut ModelContext, - ) -> usize { + ) -> LanguageServerState { let server_id = pending_server.server_id; let languages = self.languages.clone(); - self.language_servers.insert( - server_id, - LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move { - let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await; - let language_server = pending_server.task.await.log_err()?; - let language_server = language_server - .initialize(initialization_options) - .await - .log_err()?; - let this = this.upgrade(&cx)?; + LanguageServerState::Starting(cx.spawn_weak(|this, mut cx| async move { + let workspace_config = cx.update(|cx| languages.workspace_configuration(cx)).await; + let language_server = pending_server.task.await.log_err()?; + let language_server = language_server + .initialize(initialization_options) + .await + .log_err()?; + let this = this.upgrade(&cx)?; - language_server - .on_notification::({ - let this = this.downgrade(); + language_server + .on_notification::({ + let this = this.downgrade(); + let adapter = adapter.clone(); + move |mut params, cx| { + let this = this; let adapter = adapter.clone(); - move |mut params, cx| { - let this = this; - let adapter = adapter.clone(); - cx.spawn(|mut cx| async move { - adapter.process_diagnostics(&mut params).await; - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.update_diagnostics( - server_id, - params, - &adapter.disk_based_diagnostic_sources, - cx, - ) - .log_err(); - }); - } - }) - .detach(); - } - }) - .detach(); - - language_server - .on_request::({ - let languages = languages.clone(); - move |params, mut cx| { - let languages = languages.clone(); - async move { - dbg!(¶ms.items); - let workspace_config = - cx.update(|cx| languages.workspace_configuration(cx)).await; - Ok(params - .items - .into_iter() - .map(|item| { - if let Some(section) = &item.section { - workspace_config - .get(section) - .cloned() - .unwrap_or(serde_json::Value::Null) - } else { - workspace_config.clone() - } - }) - .collect()) - } - } - }) - .detach(); - - // Even though we don't have handling for these requests, respond to them to - // avoid stalling any language server like `gopls` which waits for a response - // to these requests when initializing. - language_server - .on_request::({ - let this = this.downgrade(); - move |params, mut cx| async move { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, _| { - if let Some(status) = - this.language_server_statuses.get_mut(&server_id) - { - if let lsp::NumberOrString::String(token) = params.token { - status.progress_tokens.insert(token); - } - } - }); - } - Ok(()) - } - }) - .detach(); - language_server - .on_request::({ - let this = this.downgrade(); - move |params, mut cx| async move { - let this = this - .upgrade(&cx) - .ok_or_else(|| anyhow!("project dropped"))?; - for reg in params.registrations { - if reg.method == "workspace/didChangeWatchedFiles" { - if let Some(options) = reg.register_options { - let options = serde_json::from_value(options)?; - this.update(&mut cx, |this, cx| { - this.on_lsp_did_change_watched_files( - server_id, options, cx, - ); - }); - } - } - } - Ok(()) - } - }) - .detach(); - - language_server - .on_request::({ - let this = this.downgrade(); - let adapter = adapter.clone(); - let language_server = language_server.clone(); - move |params, cx| { - Self::on_lsp_workspace_edit( - this, - params, - server_id, - adapter.clone(), - language_server.clone(), - cx, - ) - } - }) - .detach(); - - let disk_based_diagnostics_progress_token = - adapter.disk_based_diagnostics_progress_token.clone(); - - language_server - .on_notification::({ - let this = this.downgrade(); - move |params, mut cx| { + cx.spawn(|mut cx| async move { + adapter.process_diagnostics(&mut params).await; if let Some(this) = this.upgrade(&cx) { this.update(&mut cx, |this, cx| { - this.on_lsp_progress( - params, + this.update_diagnostics( server_id, - disk_based_diagnostics_progress_token.clone(), + params, + &adapter.disk_based_diagnostic_sources, cx, - ); + ) + .log_err(); }); } + }) + .detach(); + } + }) + .detach(); + + language_server + .on_request::({ + let languages = languages.clone(); + move |params, mut cx| { + let languages = languages.clone(); + async move { + let workspace_config = + cx.update(|cx| languages.workspace_configuration(cx)).await; + Ok(params + .items + .into_iter() + .map(|item| { + if let Some(section) = &item.section { + workspace_config + .get(section) + .cloned() + .unwrap_or(serde_json::Value::Null) + } else { + workspace_config.clone() + } + }) + .collect()) + } + } + }) + .detach(); + + // Even though we don't have handling for these requests, respond to them to + // avoid stalling any language server like `gopls` which waits for a response + // to these requests when initializing. + language_server + .on_request::({ + let this = this.downgrade(); + move |params, mut cx| async move { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, _| { + if let Some(status) = + this.language_server_statuses.get_mut(&server_id) + { + if let lsp::NumberOrString::String(token) = params.token { + status.progress_tokens.insert(token); + } + } + }); + } + Ok(()) + } + }) + .detach(); + language_server + .on_request::({ + let this = this.downgrade(); + move |params, mut cx| async move { + let this = this + .upgrade(&cx) + .ok_or_else(|| anyhow!("project dropped"))?; + for reg in params.registrations { + if reg.method == "workspace/didChangeWatchedFiles" { + if let Some(options) = reg.register_options { + let options = serde_json::from_value(options)?; + this.update(&mut cx, |this, cx| { + this.on_lsp_did_change_watched_files( + server_id, options, cx, + ); + }); + } + } } - }) - .detach(); - - language_server - .notify::( - lsp::DidChangeConfigurationParams { - settings: workspace_config, - }, - ) - .ok(); - - this.update(&mut cx, |this, cx| { - // If the language server for this key doesn't match the server id, don't store the - // server. Which will cause it to be dropped, killing the process - if this - .language_server_ids - .get(&key) - .map(|id| id != &server_id) - .unwrap_or(false) - { - return None; + Ok(()) } + }) + .detach(); - // Update language_servers collection with Running variant of LanguageServerState - // indicating that the server is up and running and ready - this.language_servers.insert( - server_id, - LanguageServerState::Running { - adapter: adapter.clone(), - language: language.clone(), - watched_paths: Default::default(), - server: language_server.clone(), - simulate_disk_based_diagnostics_completion: None, - }, - ); - this.language_server_statuses.insert( - server_id, - LanguageServerStatus { - name: language_server.name().to_string(), - pending_work: Default::default(), - has_pending_diagnostic_updates: false, - progress_tokens: Default::default(), - }, - ); - - if let Some(project_id) = this.remote_id() { - this.client - .send(proto::StartLanguageServer { - project_id, - server: Some(proto::LanguageServer { - id: server_id as u64, - name: language_server.name().to_string(), - }), - }) - .log_err(); + language_server + .on_request::({ + let this = this.downgrade(); + let adapter = adapter.clone(); + let language_server = language_server.clone(); + move |params, cx| { + Self::on_lsp_workspace_edit( + this, + params, + server_id, + adapter.clone(), + language_server.clone(), + cx, + ) } + }) + .detach(); - // Tell the language server about every open buffer in the worktree that matches the language. - for buffer in this.opened_buffers.values() { - if let Some(buffer_handle) = buffer.upgrade(cx) { - let buffer = buffer_handle.read(cx); - let file = match File::from_dyn(buffer.file()) { - Some(file) => file, - None => continue, - }; - let language = match buffer.language() { - Some(language) => language, - None => continue, - }; + let disk_based_diagnostics_progress_token = + adapter.disk_based_diagnostics_progress_token.clone(); - if file.worktree.read(cx).id() != key.0 - || !language.lsp_adapters().iter().any(|a| a.name == key.1) - { - continue; - } - - let file = file.as_local()?; - let versions = this - .buffer_snapshots - .entry(buffer.remote_id()) - .or_default() - .entry(server_id) - .or_insert_with(|| { - vec![LspBufferSnapshot { - version: 0, - snapshot: buffer.text_snapshot(), - }] - }); - - let snapshot = versions.last().unwrap(); - let version = snapshot.version; - let initial_snapshot = &snapshot.snapshot; - let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap(); - language_server - .notify::( - lsp::DidOpenTextDocumentParams { - text_document: lsp::TextDocumentItem::new( - uri, - adapter - .language_ids - .get(language.name().as_ref()) - .cloned() - .unwrap_or_default(), - version, - initial_snapshot.text(), - ), - }, - ) - .log_err()?; - buffer_handle.update(cx, |buffer, cx| { - buffer.set_completion_triggers( - language_server - .capabilities() - .completion_provider - .as_ref() - .and_then(|provider| provider.trigger_characters.clone()) - .unwrap_or_default(), + language_server + .on_notification::({ + let this = this.downgrade(); + move |params, mut cx| { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + this.on_lsp_progress( + params, + server_id, + disk_based_diagnostics_progress_token.clone(), cx, - ) + ); }); } } - - cx.notify(); - Some(language_server) }) - })), - ); - server_id + .detach(); + + language_server + .notify::( + lsp::DidChangeConfigurationParams { + settings: workspace_config, + }, + ) + .ok(); + + this.update(&mut cx, |this, cx| { + // If the language server for this key doesn't match the server id, don't store the + // server. Which will cause it to be dropped, killing the process + if this + .language_server_ids + .get(&key) + .map(|id| id != &server_id) + .unwrap_or(false) + { + return None; + } + + // Update language_servers collection with Running variant of LanguageServerState + // indicating that the server is up and running and ready + this.language_servers.insert( + server_id, + LanguageServerState::Running { + adapter: adapter.clone(), + language: language.clone(), + watched_paths: Default::default(), + server: language_server.clone(), + simulate_disk_based_diagnostics_completion: None, + }, + ); + this.language_server_statuses.insert( + server_id, + LanguageServerStatus { + name: language_server.name().to_string(), + pending_work: Default::default(), + has_pending_diagnostic_updates: false, + progress_tokens: Default::default(), + }, + ); + + if let Some(project_id) = this.remote_id() { + this.client + .send(proto::StartLanguageServer { + project_id, + server: Some(proto::LanguageServer { + id: server_id as u64, + name: language_server.name().to_string(), + }), + }) + .log_err(); + } + + // Tell the language server about every open buffer in the worktree that matches the language. + for buffer in this.opened_buffers.values() { + if let Some(buffer_handle) = buffer.upgrade(cx) { + let buffer = buffer_handle.read(cx); + let file = match File::from_dyn(buffer.file()) { + Some(file) => file, + None => continue, + }; + let language = match buffer.language() { + Some(language) => language, + None => continue, + }; + + if file.worktree.read(cx).id() != key.0 + || !language.lsp_adapters().iter().any(|a| a.name == key.1) + { + continue; + } + + let file = file.as_local()?; + let versions = this + .buffer_snapshots + .entry(buffer.remote_id()) + .or_default() + .entry(server_id) + .or_insert_with(|| { + vec![LspBufferSnapshot { + version: 0, + snapshot: buffer.text_snapshot(), + }] + }); + + let snapshot = versions.last().unwrap(); + let version = snapshot.version; + let initial_snapshot = &snapshot.snapshot; + let uri = lsp::Url::from_file_path(file.abs_path(cx)).unwrap(); + language_server + .notify::( + lsp::DidOpenTextDocumentParams { + text_document: lsp::TextDocumentItem::new( + uri, + adapter + .language_ids + .get(language.name().as_ref()) + .cloned() + .unwrap_or_default(), + version, + initial_snapshot.text(), + ), + }, + ) + .log_err()?; + buffer_handle.update(cx, |buffer, cx| { + buffer.set_completion_triggers( + language_server + .capabilities() + .completion_provider + .as_ref() + .and_then(|provider| provider.trigger_characters.clone()) + .unwrap_or_default(), + cx, + ) + }); + } + } + + cx.notify(); + Some(language_server) + }) + })) } // Returns a list of all of the worktrees which no longer have a language server and the root path