From 89e99d2902d6ff99b4e4ccee52db1f8b90a844a9 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 23 Mar 2023 16:04:21 -0700 Subject: [PATCH] :art: Don't store path changes statefully on the background scanner --- crates/project/src/worktree.rs | 115 ++++++++++++++++----------------- 1 file changed, 54 insertions(+), 61 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 7627368934..807c2b384c 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -1,12 +1,12 @@ -use super::{ignore::IgnoreStack, DiagnosticSummary}; -use crate::{copy_recursive, ProjectEntryId, RemoveOptions}; +use crate::{ + copy_recursive, ignore::IgnoreStack, DiagnosticSummary, ProjectEntryId, RemoveOptions, +}; use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; use client::{proto, Client}; use clock::ReplicaId; use collections::{HashMap, VecDeque}; -use fs::LineEnding; -use fs::{repository::GitRepository, Fs}; +use fs::{repository::GitRepository, Fs, LineEnding}; use futures::{ channel::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -20,17 +20,16 @@ use gpui::{ executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, }; -use language::File as _; use language::{ proto::{ deserialize_fingerprint, deserialize_version, serialize_fingerprint, serialize_line_ending, serialize_version, }, - Buffer, DiagnosticEntry, PointUtf16, Rope, RopeFingerprint, Unclipped, + Buffer, DiagnosticEntry, File as _, PointUtf16, Rope, RopeFingerprint, Unclipped, }; use parking_lot::Mutex; -use postage::barrier; use postage::{ + barrier, prelude::{Sink as _, Stream as _}, watch, }; @@ -50,8 +49,7 @@ use std::{ time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet}; -use util::paths::HOME; -use util::{ResultExt, TryFutureExt}; +use util::{paths::HOME, ResultExt, TryFutureExt}; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)] pub struct WorktreeId(usize); @@ -2141,7 +2139,6 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { struct BackgroundScanner { fs: Arc, snapshot: Mutex, - changes: HashMap, PathChange>, notify: UnboundedSender, executor: Arc, } @@ -2158,7 +2155,6 @@ impl BackgroundScanner { snapshot: Mutex::new(snapshot), notify, executor, - changes: Default::default(), } } @@ -2167,7 +2163,7 @@ impl BackgroundScanner { } async fn run( - mut self, + self, events_rx: impl Stream>, mut changed_paths: UnboundedReceiver<(Vec, barrier::Sender)>, ) { @@ -2312,32 +2308,31 @@ impl BackgroundScanner { while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { events.extend(additional_events); } + let abs_paths = events.into_iter().map(|e| e.path).collect(); if self.notify.unbounded_send(ScanState::Updating).is_err() { return; } - if !self - .process_events(events.into_iter().map(|e| e.path).collect(), true) - .await - { - return; - } - if self - .notify - .unbounded_send(ScanState::Updated { - snapshot: self.snapshot.lock().clone(), - changes: mem::take(&mut self.changes), - barrier: None, - }) - .is_err() - { + if let Some(changes) = self.process_events(abs_paths, true).await { + if self + .notify + .unbounded_send(ScanState::Updated { + snapshot: self.snapshot.lock().clone(), + changes, + barrier: None, + }) + .is_err() + { + return; + } + } else { return; } } // Continue processing events until the worktree is dropped. loop { - let abs_paths; let barrier; + let abs_paths; select_biased! { request = changed_paths.next().fuse() => { let Some((paths, b)) = request else { break }; @@ -2354,18 +2349,19 @@ impl BackgroundScanner { if self.notify.unbounded_send(ScanState::Updating).is_err() { return; } - if !self.process_events(abs_paths, false).await { - return; - } - if self - .notify - .unbounded_send(ScanState::Updated { - snapshot: self.snapshot.lock().clone(), - changes: mem::take(&mut self.changes), - barrier, - }) - .is_err() - { + if let Some(changes) = self.process_events(abs_paths, false).await { + if self + .notify + .unbounded_send(ScanState::Updated { + snapshot: self.snapshot.lock().clone(), + changes, + barrier, + }) + .is_err() + { + return; + } + } else { return; } } @@ -2505,10 +2501,10 @@ impl BackgroundScanner { } async fn process_events( - &mut self, + &self, abs_paths: Vec, received_before_initialized: bool, - ) -> bool { + ) -> Option, PathChange>> { let (scan_queue_tx, scan_queue_rx) = channel::unbounded(); let prev_snapshot = { @@ -2517,14 +2513,9 @@ impl BackgroundScanner { snapshot.clone() }; - let event_paths = if let Some(event_paths) = self + let event_paths = self .update_entries_for_paths(abs_paths, Some(scan_queue_tx)) - .await - { - event_paths - } else { - return false; - }; + .await?; // Scan any directories that were created as part of this event batch. self.executor @@ -2553,13 +2544,13 @@ impl BackgroundScanner { self.update_ignore_statuses().await; self.update_git_repositories(); - self.build_change_set( + let changes = self.build_change_set( prev_snapshot.snapshot, event_paths, received_before_initialized, ); self.snapshot.lock().scan_completed(); - true + Some(changes) } async fn update_entries_for_paths( @@ -2763,17 +2754,18 @@ impl BackgroundScanner { } fn build_change_set( - &mut self, + &self, old_snapshot: Snapshot, event_paths: Vec>, received_before_initialized: bool, - ) { + ) -> HashMap, PathChange> { + use PathChange::{Added, AddedOrUpdated, Removed, Updated}; + let new_snapshot = self.snapshot.lock(); + let mut changes = HashMap::default(); let mut old_paths = old_snapshot.entries_by_path.cursor::(); let mut new_paths = new_snapshot.entries_by_path.cursor::(); - use PathChange::{Added, AddedOrUpdated, Removed, Updated}; - for path in event_paths { let path = PathKey(path); old_paths.seek(&path, Bias::Left, &()); @@ -2792,7 +2784,7 @@ impl BackgroundScanner { match Ord::cmp(&old_entry.path, &new_entry.path) { Ordering::Less => { - self.changes.insert(old_entry.path.clone(), Removed); + changes.insert(old_entry.path.clone(), Removed); old_paths.next(&()); } Ordering::Equal => { @@ -2800,31 +2792,32 @@ impl BackgroundScanner { // If the worktree was not fully initialized when this event was generated, // we can't know whether this entry was added during the scan or whether // it was merely updated. - self.changes.insert(old_entry.path.clone(), AddedOrUpdated); + changes.insert(old_entry.path.clone(), AddedOrUpdated); } else if old_entry.mtime != new_entry.mtime { - self.changes.insert(old_entry.path.clone(), Updated); + changes.insert(old_entry.path.clone(), Updated); } old_paths.next(&()); new_paths.next(&()); } Ordering::Greater => { - self.changes.insert(new_entry.path.clone(), Added); + changes.insert(new_entry.path.clone(), Added); new_paths.next(&()); } } } (Some(old_entry), None) => { - self.changes.insert(old_entry.path.clone(), Removed); + changes.insert(old_entry.path.clone(), Removed); old_paths.next(&()); } (None, Some(new_entry)) => { - self.changes.insert(new_entry.path.clone(), Added); + changes.insert(new_entry.path.clone(), Added); new_paths.next(&()); } (None, None) => break, } } } + changes } }