From debb694d971cfc398f5dd8d87b7c2bb9e67d682b Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 13 Apr 2023 16:51:11 -0700 Subject: [PATCH] Always bump scan_id when refreshing an entry The scan_id needs to be bumped even if a scan is already in progress, so that worktree updates can detect that entries have changed. This means that the worktree's completed_scan_id may increase by more than one at the end of a scan. --- crates/project/src/worktree.rs | 130 +++++++++++++++++++-------------- 1 file changed, 75 insertions(+), 55 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 3459bd7e5d..29aec15610 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -95,7 +95,17 @@ pub struct Snapshot { root_char_bag: CharBag, entries_by_path: SumTree, entries_by_id: SumTree, + + /// A number that increases every time the worktree begins scanning + /// a set of paths from the filesystem. This scanning could be caused + /// by some operation performed on the worktree, such as reading or + /// writing a file, or by an event reported by the filesystem. scan_id: usize, + + /// The latest scan id that has completed, and whose preceding scans + /// have all completed. The current `scan_id` could be more than one + /// greater than the `completed_scan_id` if operations are performed + /// on the worktree while it is processing a file-system event. completed_scan_id: usize, } @@ -2168,6 +2178,7 @@ impl BackgroundScanner { } { let mut snapshot = self.snapshot.lock(); + snapshot.scan_id += 1; ignore_stack = snapshot.ignore_stack_for_abs_path(&root_abs_path, true); if ignore_stack.is_all() { if let Some(mut root_entry) = snapshot.root_entry().cloned() { @@ -2189,6 +2200,10 @@ impl BackgroundScanner { .unwrap(); drop(scan_job_tx); self.scan_dirs(true, scan_job_rx).await; + { + let mut snapshot = self.snapshot.lock(); + snapshot.completed_scan_id = snapshot.scan_id; + } self.send_status_update(false, None); // Process any any FS events that occurred while performing the initial scan. @@ -2200,7 +2215,6 @@ impl BackgroundScanner { paths.extend(more_events.into_iter().map(|e| e.path)); } self.process_events(paths).await; - self.send_status_update(false, None); } self.finished_initial_scan = true; @@ -2212,9 +2226,8 @@ impl BackgroundScanner { // these before handling changes reported by the filesystem. request = self.refresh_requests_rx.recv().fuse() => { let Ok((paths, barrier)) = request else { break }; - self.reload_entries_for_paths(paths, None).await; - if !self.send_status_update(false, Some(barrier)) { - break; + if !self.process_refresh_request(paths, barrier).await { + return; } } @@ -2225,15 +2238,17 @@ impl BackgroundScanner { paths.extend(more_events.into_iter().map(|e| e.path)); } self.process_events(paths).await; - self.send_status_update(false, None); } } } } - async fn process_events(&mut self, paths: Vec) { - use futures::FutureExt as _; + async fn process_refresh_request(&self, paths: Vec, barrier: barrier::Sender) -> bool { + self.reload_entries_for_paths(paths, None).await; + self.send_status_update(false, Some(barrier)) + } + async fn process_events(&mut self, paths: Vec) { let (scan_job_tx, scan_job_rx) = channel::unbounded(); if let Some(mut paths) = self .reload_entries_for_paths(paths, Some(scan_job_tx.clone())) @@ -2245,35 +2260,7 @@ impl BackgroundScanner { drop(scan_job_tx); self.scan_dirs(false, scan_job_rx).await; - let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); - let snapshot = self.update_ignore_statuses(ignore_queue_tx); - self.executor - .scoped(|scope| { - for _ in 0..self.executor.num_cpus() { - scope.spawn(async { - loop { - select_biased! { - // Process any path refresh requests before moving on to process - // the queue of ignore statuses. - request = self.refresh_requests_rx.recv().fuse() => { - let Ok((paths, barrier)) = request else { break }; - self.reload_entries_for_paths(paths, None).await; - if !self.send_status_update(false, Some(barrier)) { - return; - } - } - - // Recursively process directories whose ignores have changed. - job = ignore_queue_rx.recv().fuse() => { - let Ok(job) = job else { break }; - self.update_ignore_status(job, &snapshot).await; - } - } - } - }); - } - }) - .await; + self.update_ignore_statuses().await; let mut snapshot = self.snapshot.lock(); let mut git_repositories = mem::take(&mut snapshot.git_repositories); @@ -2281,6 +2268,9 @@ impl BackgroundScanner { snapshot.git_repositories = git_repositories; snapshot.removed_entry_ids.clear(); snapshot.completed_scan_id = snapshot.scan_id; + drop(snapshot); + + self.send_status_update(false, None); } async fn scan_dirs( @@ -2313,8 +2303,7 @@ impl BackgroundScanner { // the scan queue, so that user operations are prioritized. request = self.refresh_requests_rx.recv().fuse() => { let Ok((paths, barrier)) = request else { break }; - self.reload_entries_for_paths(paths, None).await; - if !self.send_status_update(false, Some(barrier)) { + if !self.process_refresh_request(paths, barrier).await { return; } } @@ -2521,12 +2510,10 @@ impl BackgroundScanner { .await; let mut snapshot = self.snapshot.lock(); - - if snapshot.completed_scan_id == snapshot.scan_id { - snapshot.scan_id += 1; - if !doing_recursive_update { - snapshot.completed_scan_id = snapshot.scan_id; - } + let is_idle = snapshot.completed_scan_id == snapshot.scan_id; + snapshot.scan_id += 1; + if is_idle && !doing_recursive_update { + snapshot.completed_scan_id = snapshot.scan_id; } // Remove any entries for paths that no longer exist or are being recursively @@ -2596,16 +2583,17 @@ impl BackgroundScanner { Some(event_paths) } - fn update_ignore_statuses( - &self, - ignore_queue_tx: Sender, - ) -> LocalSnapshot { + async fn update_ignore_statuses(&self) { + use futures::FutureExt as _; + let mut snapshot = self.snapshot.lock().clone(); let mut ignores_to_update = Vec::new(); let mut ignores_to_delete = Vec::new(); for (parent_abs_path, (_, scan_id)) in &snapshot.ignores_by_parent_abs_path { if let Ok(parent_path) = parent_abs_path.strip_prefix(&snapshot.abs_path) { - if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() { + if *scan_id > snapshot.completed_scan_id + && snapshot.entry_for_path(parent_path).is_some() + { ignores_to_update.push(parent_abs_path.clone()); } @@ -2624,6 +2612,7 @@ impl BackgroundScanner { .remove(&parent_abs_path); } + let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); ignores_to_update.sort_unstable(); let mut ignores_to_update = ignores_to_update.into_iter().peekable(); while let Some(parent_abs_path) = ignores_to_update.next() { @@ -2642,8 +2631,34 @@ impl BackgroundScanner { })) .unwrap(); } + drop(ignore_queue_tx); - snapshot + self.executor + .scoped(|scope| { + for _ in 0..self.executor.num_cpus() { + scope.spawn(async { + loop { + select_biased! { + // Process any path refresh requests before moving on to process + // the queue of ignore statuses. + request = self.refresh_requests_rx.recv().fuse() => { + let Ok((paths, barrier)) = request else { break }; + if !self.process_refresh_request(paths, barrier).await { + return; + } + } + + // Recursively process directories whose ignores have changed. + job = ignore_queue_rx.recv().fuse() => { + let Ok(job) = job else { break }; + self.update_ignore_status(job, &snapshot).await; + } + } + } + }); + } + }) + .await; } async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &LocalSnapshot) { @@ -3054,12 +3069,11 @@ mod tests { use fs::repository::FakeGitRepository; use fs::{FakeFs, RealFs}; use gpui::{executor::Deterministic, TestAppContext}; + use pretty_assertions::assert_eq; use rand::prelude::*; use serde_json::json; use std::{env, fmt::Write}; - use util::http::FakeHttpClient; - - use util::test::temp_tree; + use util::{http::FakeHttpClient, test::temp_tree}; #[gpui::test] async fn test_traversal(cx: &mut TestAppContext) { @@ -3461,7 +3475,7 @@ mod tests { } #[gpui::test(iterations = 30)] - async fn test_create_directory(cx: &mut TestAppContext) { + async fn test_create_directory_during_initial_scan(cx: &mut TestAppContext) { let client = cx.read(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); let fs = FakeFs::new(cx.background()); @@ -3486,6 +3500,8 @@ mod tests { .await .unwrap(); + let mut snapshot1 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); + let entry = tree .update(cx, |tree, cx| { tree.as_local_mut() @@ -3497,10 +3513,14 @@ mod tests { assert!(entry.is_dir()); cx.foreground().run_until_parked(); - tree.read_with(cx, |tree, _| { assert_eq!(tree.entry_for_path("a/e").unwrap().kind, EntryKind::Dir); }); + + let snapshot2 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot()); + let update = snapshot2.build_update(&snapshot1, 0, 0, true); + snapshot1.apply_remote_update(update).unwrap(); + assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),); } #[gpui::test(iterations = 100)]