diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 42284f1a13..afce1b5abe 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -64,10 +64,8 @@ pub enum Worktree { pub struct LocalWorktree { snapshot: LocalSnapshot, background_snapshot: Arc>, - background_changes: Arc, PathChange>>>, - last_scan_state_rx: watch::Receiver, + is_scanning: (watch::Sender, watch::Receiver), _background_scanner_task: Task<()>, - poll_task: Option>, share: Option, diagnostics: HashMap, Vec>>>, diagnostic_summaries: TreeMap, @@ -123,6 +121,7 @@ impl std::fmt::Debug for GitRepositoryEntry { } } +#[derive(Debug)] pub struct LocalSnapshot { ignores_by_parent_abs_path: HashMap, (Arc, usize)>, git_repositories: Vec, @@ -159,11 +158,12 @@ impl DerefMut for LocalSnapshot { #[derive(Clone, Debug)] enum ScanState { - Idle, /// The worktree is performing its initial scan of the filesystem. - Initializing, + Initializing(LocalSnapshot), + Initialized(LocalSnapshot), /// The worktree is updating in response to filesystem events. Updating, + Updated(LocalSnapshot, HashMap, PathChange>), Err(Arc), } @@ -235,49 +235,37 @@ impl Worktree { } let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); - let (mut last_scan_state_tx, last_scan_state_rx) = - watch::channel_with(ScanState::Initializing); let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); - let background_changes = Arc::new(Mutex::new(HashMap::default())); cx.spawn_weak(|this, mut cx| async move { - while let Some(scan_state) = scan_states_rx.next().await { - if let Some(this) = this.upgrade(&cx) { - last_scan_state_tx.blocking_send(scan_state).ok(); - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().poll_snapshot(false, cx) - }); - } else { - break; - } + while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade(&cx)) { + this.update(&mut cx, |this, cx| { + this.as_local_mut() + .unwrap() + .background_scanner_updated(state, cx); + }); } }) .detach(); + let background_scanner_task = cx.background().spawn({ + let fs = fs.clone(); + let background_snapshot = background_snapshot.clone(); + let background = cx.background().clone(); + async move { + let events = fs.watch(&abs_path, Duration::from_millis(100)).await; + BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background) + .run(events) + .await; + } + }); + Worktree::Local(LocalWorktree { - _background_scanner_task: cx.background().spawn({ - let fs = fs.clone(); - let background_snapshot = background_snapshot.clone(); - let background_changes = background_changes.clone(); - let background = cx.background().clone(); - async move { - let events = fs.watch(&abs_path, Duration::from_millis(100)).await; - let scanner = BackgroundScanner::new( - background_snapshot, - background_changes, - scan_states_tx, - fs, - background, - ); - scanner.run(events).await; - } - }), snapshot, background_snapshot, - background_changes, - last_scan_state_rx, + is_scanning: watch::channel_with(true), share: None, - poll_task: None, + _background_scanner_task: background_scanner_task, diagnostics: Default::default(), diagnostic_summaries: Default::default(), client, @@ -335,7 +323,9 @@ impl Worktree { if let Some(this) = this.upgrade(&cx) { this.update(&mut cx, |this, cx| { let this = this.as_remote_mut().unwrap(); - this.poll_snapshot(cx); + this.snapshot = this.background_snapshot.lock().clone(); + cx.emit(Event::UpdatedEntries(Default::default())); + cx.notify(); while let Some((scan_id, _)) = this.snapshot_subscriptions.front() { if this.observed_snapshot(*scan_id) { let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap(); @@ -539,66 +529,49 @@ impl LocalWorktree { Ok(updated) } - fn poll_snapshot(&mut self, force: bool, cx: &mut ModelContext) { - self.poll_task.take(); - - match self.last_scan_state_rx.borrow().clone() { - ScanState::Idle => { - let new_snapshot = self.background_snapshot.lock().clone(); - let changes = mem::take(&mut *self.background_changes.lock()); - let updated_repos = Self::changed_repos( - &self.snapshot.git_repositories, - &new_snapshot.git_repositories, - ); - self.snapshot = new_snapshot; - - if let Some(share) = self.share.as_mut() { - *share.snapshots_tx.borrow_mut() = self.snapshot.clone(); - } - + fn background_scanner_updated( + &mut self, + scan_state: ScanState, + cx: &mut ModelContext, + ) { + match scan_state { + ScanState::Initializing(new_snapshot) => { + *self.is_scanning.0.borrow_mut() = true; + self.set_snapshot(new_snapshot, cx); + } + ScanState::Initialized(new_snapshot) => { + *self.is_scanning.0.borrow_mut() = false; + self.set_snapshot(new_snapshot, cx); + } + ScanState::Updating => { + *self.is_scanning.0.borrow_mut() = true; + } + ScanState::Updated(new_snapshot, changes) => { + *self.is_scanning.0.borrow_mut() = false; cx.emit(Event::UpdatedEntries(changes)); - - if !updated_repos.is_empty() { - cx.emit(Event::UpdatedGitRepositories(updated_repos)); - } + self.set_snapshot(new_snapshot, cx); } - - ScanState::Initializing => { - let is_fake_fs = self.fs.is_fake(); - - let new_snapshot = self.background_snapshot.lock().clone(); - let updated_repos = Self::changed_repos( - &self.snapshot.git_repositories, - &new_snapshot.git_repositories, - ); - self.snapshot = new_snapshot; - - self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move { - if is_fake_fs { - #[cfg(any(test, feature = "test-support"))] - cx.background().simulate_random_delay().await; - } else { - smol::Timer::after(Duration::from_millis(100)).await; - } - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().poll_snapshot(false, cx) - }); - } - })); - - if !updated_repos.is_empty() { - cx.emit(Event::UpdatedGitRepositories(updated_repos)); - } - } - - _ => { - if force { - self.snapshot = self.background_snapshot.lock().clone(); - } + ScanState::Err(error) => { + *self.is_scanning.0.borrow_mut() = false; + log::error!("error scanning worktree {:?}", error); } } + } + fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext) { + let updated_repos = Self::changed_repos( + &self.snapshot.git_repositories, + &new_snapshot.git_repositories, + ); + self.snapshot = new_snapshot; + + if let Some(share) = self.share.as_mut() { + *share.snapshots_tx.borrow_mut() = self.snapshot.clone(); + } + + if !updated_repos.is_empty() { + cx.emit(Event::UpdatedGitRepositories(updated_repos)); + } cx.notify(); } @@ -631,11 +604,15 @@ impl LocalWorktree { } pub fn scan_complete(&self) -> impl Future { - let mut scan_state_rx = self.last_scan_state_rx.clone(); + let mut is_scanning_rx = self.is_scanning.1.clone(); async move { - let mut scan_state = Some(scan_state_rx.borrow().clone()); - while let Some(ScanState::Initializing | ScanState::Updating) = scan_state { - scan_state = scan_state_rx.recv().await; + let mut is_scanning = is_scanning_rx.borrow().clone(); + while is_scanning { + if let Some(value) = is_scanning_rx.recv().await { + is_scanning = value; + } else { + break; + } } } } @@ -827,11 +804,14 @@ impl LocalWorktree { delete.await?; this.update(&mut cx, |this, cx| { let this = this.as_local_mut().unwrap(); - { - let mut snapshot = this.background_snapshot.lock(); - snapshot.delete_entry(entry_id); + + this.background_snapshot.lock().delete_entry(entry_id); + + if let Some(path) = this.snapshot.delete_entry(entry_id) { + cx.emit(Event::UpdatedEntries( + [(path, PathChange::Removed)].into_iter().collect(), + )); } - this.poll_snapshot(true, cx); }); Ok(()) })) @@ -953,13 +933,8 @@ impl LocalWorktree { cx: &mut ModelContext, ) -> Task> { let fs = self.fs.clone(); - let root_char_bag; - let next_entry_id; - { - let snapshot = self.background_snapshot.lock(); - root_char_bag = snapshot.root_char_bag; - next_entry_id = snapshot.next_entry_id.clone(); - } + let root_char_bag = self.snapshot.root_char_bag; + let next_entry_id = self.snapshot.next_entry_id.clone(); cx.spawn_weak(|this, mut cx| async move { let metadata = fs .metadata(&abs_path) @@ -970,32 +945,43 @@ impl LocalWorktree { .ok_or_else(|| anyhow!("worktree was dropped"))?; this.update(&mut cx, |this, cx| { let this = this.as_local_mut().unwrap(); - let inserted_entry; + let mut entry = Entry::new(path, &metadata, &next_entry_id, root_char_bag); + entry.is_ignored = this + .snapshot + .ignore_stack_for_abs_path(&abs_path, entry.is_dir()) + .is_abs_path_ignored(&abs_path, entry.is_dir()); + { let mut snapshot = this.background_snapshot.lock(); - let mut changes = this.background_changes.lock(); - let mut entry = Entry::new(path, &metadata, &next_entry_id, root_char_bag); - entry.is_ignored = snapshot - .ignore_stack_for_abs_path(&abs_path, entry.is_dir()) - .is_abs_path_ignored(&abs_path, entry.is_dir()); - if let Some(old_path) = old_path { - snapshot.remove_path(&old_path); - changes.insert(old_path.clone(), PathChange::Removed); - } snapshot.scan_started(); - let exists = snapshot.entry_for_path(&entry.path).is_some(); - inserted_entry = snapshot.insert_entry(entry, fs.as_ref()); - changes.insert( - inserted_entry.path.clone(), - if exists { - PathChange::Updated - } else { - PathChange::Added - }, - ); + if let Some(old_path) = &old_path { + snapshot.remove_path(old_path); + } + snapshot.insert_entry(entry.clone(), fs.as_ref()); snapshot.scan_completed(); } - this.poll_snapshot(true, cx); + + let mut changes = HashMap::default(); + + this.snapshot.scan_started(); + if let Some(old_path) = &old_path { + this.snapshot.remove_path(old_path); + changes.insert(old_path.clone(), PathChange::Removed); + } + let exists = this.snapshot.entry_for_path(&entry.path).is_some(); + let inserted_entry = this.snapshot.insert_entry(entry, fs.as_ref()); + changes.insert( + inserted_entry.path.clone(), + if exists { + PathChange::Updated + } else { + PathChange::Added + }, + ); + this.snapshot.scan_completed(); + + eprintln!("refreshed {:?}", changes); + cx.emit(Event::UpdatedEntries(changes)); Ok(inserted_entry) }) }) @@ -1099,12 +1085,6 @@ impl RemoteWorktree { self.snapshot.clone() } - fn poll_snapshot(&mut self, cx: &mut ModelContext) { - self.snapshot = self.background_snapshot.lock().clone(); - cx.emit(Event::UpdatedEntries(Default::default())); - cx.notify(); - } - pub fn disconnected_from_host(&mut self) { self.updates_tx.take(); self.snapshot_subscriptions.clear(); @@ -1264,28 +1244,25 @@ impl Snapshot { Ok(entry) } - fn delete_entry(&mut self, entry_id: ProjectEntryId) -> bool { - if let Some(removed_entry) = self.entries_by_id.remove(&entry_id, &()) { - self.entries_by_path = { - let mut cursor = self.entries_by_path.cursor(); - let mut new_entries_by_path = - cursor.slice(&TraversalTarget::Path(&removed_entry.path), Bias::Left, &()); - while let Some(entry) = cursor.item() { - if entry.path.starts_with(&removed_entry.path) { - self.entries_by_id.remove(&entry.id, &()); - cursor.next(&()); - } else { - break; - } + fn delete_entry(&mut self, entry_id: ProjectEntryId) -> Option> { + let removed_entry = self.entries_by_id.remove(&entry_id, &())?; + self.entries_by_path = { + let mut cursor = self.entries_by_path.cursor(); + let mut new_entries_by_path = + cursor.slice(&TraversalTarget::Path(&removed_entry.path), Bias::Left, &()); + while let Some(entry) = cursor.item() { + if entry.path.starts_with(&removed_entry.path) { + self.entries_by_id.remove(&entry.id, &()); + cursor.next(&()); + } else { + break; } - new_entries_by_path.push_tree(cursor.suffix(&()), &()); - new_entries_by_path - }; + } + new_entries_by_path.push_tree(cursor.suffix(&()), &()); + new_entries_by_path + }; - true - } else { - false - } + Some(removed_entry.path) } pub(crate) fn apply_remote_update(&mut self, update: proto::UpdateWorktree) -> Result<()> { @@ -2204,7 +2181,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { struct BackgroundScanner { fs: Arc, snapshot: Arc>, - changes: Arc, PathChange>>>, + changes: HashMap, PathChange>, notify: UnboundedSender, executor: Arc, } @@ -2212,7 +2189,6 @@ struct BackgroundScanner { impl BackgroundScanner { fn new( snapshot: Arc>, - changes: Arc, PathChange>>>, notify: UnboundedSender, fs: Arc, executor: Arc, @@ -2220,9 +2196,9 @@ impl BackgroundScanner { Self { fs, snapshot, - changes, notify, executor, + changes: Default::default(), } } @@ -2231,10 +2207,34 @@ impl BackgroundScanner { } async fn run(mut self, events_rx: impl Stream>) { - if self.notify.unbounded_send(ScanState::Initializing).is_err() { - return; - } + // While performing the initial scan, send a new snapshot to the main + // thread on a recurring interval. + let initializing_task = self.executor.spawn({ + let executor = self.executor.clone(); + let snapshot = self.snapshot.clone(); + let notify = self.notify.clone(); + let is_fake_fs = self.fs.is_fake(); + async move { + loop { + if is_fake_fs { + #[cfg(any(test, feature = "test-support"))] + executor.simulate_random_delay().await; + } else { + smol::Timer::after(Duration::from_millis(100)).await; + } + executor.timer(Duration::from_millis(100)).await; + if notify + .unbounded_send(ScanState::Initializing(snapshot.lock().clone())) + .is_err() + { + break; + } + } + } + }); + + // Scan the entire directory. if let Err(err) = self.scan_dirs().await { if self .notify @@ -2245,7 +2245,13 @@ impl BackgroundScanner { } } - if self.notify.unbounded_send(ScanState::Idle).is_err() { + drop(initializing_task); + + if self + .notify + .unbounded_send(ScanState::Initialized(self.snapshot.lock().clone())) + .is_err() + { return; } @@ -2264,7 +2270,14 @@ impl BackgroundScanner { if !self.process_events(events, true).await { return; } - if self.notify.unbounded_send(ScanState::Idle).is_err() { + if self + .notify + .unbounded_send(ScanState::Updated( + self.snapshot.lock().clone(), + mem::take(&mut self.changes), + )) + .is_err() + { return; } } @@ -2280,7 +2293,14 @@ impl BackgroundScanner { if !self.process_events(events, false).await { return; } - if self.notify.unbounded_send(ScanState::Idle).is_err() { + if self + .notify + .unbounded_send(ScanState::Updated( + self.snapshot.lock().clone(), + mem::take(&mut self.changes), + )) + .is_err() + { return; } } @@ -2737,7 +2757,7 @@ impl BackgroundScanner { } fn build_change_set( - &self, + &mut self, old_snapshot: Snapshot, event_paths: Vec>, received_before_initialized: bool, @@ -2746,7 +2766,8 @@ impl BackgroundScanner { let mut old_paths = old_snapshot.entries_by_path.cursor::(); let mut new_paths = new_snapshot.entries_by_path.cursor::(); - let mut change_set = self.changes.lock(); + use PathChange::{Added, AddedOrUpdated, Removed, Updated}; + for path in event_paths { let path = PathKey(path); old_paths.seek(&path, Bias::Left, &()); @@ -2765,7 +2786,7 @@ impl BackgroundScanner { match Ord::cmp(&old_entry.path, &new_entry.path) { Ordering::Less => { - change_set.insert(old_entry.path.clone(), PathChange::Removed); + self.changes.insert(old_entry.path.clone(), Removed); old_paths.next(&()); } Ordering::Equal => { @@ -2773,26 +2794,25 @@ impl BackgroundScanner { // If the worktree was not fully initialized when this event was generated, // we can't know whether this entry was added during the scan or whether // it was merely updated. - change_set - .insert(old_entry.path.clone(), PathChange::AddedOrUpdated); + self.changes.insert(old_entry.path.clone(), AddedOrUpdated); } else if old_entry.mtime != new_entry.mtime { - change_set.insert(old_entry.path.clone(), PathChange::Updated); + self.changes.insert(old_entry.path.clone(), Updated); } old_paths.next(&()); new_paths.next(&()); } Ordering::Greater => { - change_set.insert(new_entry.path.clone(), PathChange::Added); + self.changes.insert(new_entry.path.clone(), Added); new_paths.next(&()); } } } (Some(old_entry), None) => { - change_set.insert(old_entry.path.clone(), PathChange::Removed); + self.changes.insert(old_entry.path.clone(), Removed); old_paths.next(&()); } (None, Some(new_entry)) => { - change_set.insert(new_entry.path.clone(), PathChange::Added); + self.changes.insert(new_entry.path.clone(), Added); new_paths.next(&()); } (None, None) => break, @@ -3567,6 +3587,49 @@ mod tests { .update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete()) .await; + // After the initial scan is complete, the `UpdatedEntries` event can + // be used to follow along with all changes to the worktree's snapshot. + worktree.update(cx, |tree, cx| { + let mut paths = tree + .as_local() + .unwrap() + .paths() + .cloned() + .collect::>(); + + cx.subscribe(&worktree, move |tree, _, event, _| { + if let Event::UpdatedEntries(changes) = event { + for (path, change_type) in changes.iter() { + let path = path.clone(); + let ix = match paths.binary_search(&path) { + Ok(ix) | Err(ix) => ix, + }; + match change_type { + PathChange::Added => { + assert_ne!(paths.get(ix), Some(&path)); + paths.insert(ix, path); + } + PathChange::Removed => { + assert_eq!(paths.get(ix), Some(&path)); + paths.remove(ix); + } + PathChange::Updated => { + assert_eq!(paths.get(ix), Some(&path)); + } + PathChange::AddedOrUpdated => { + if paths[ix] != path { + paths.insert(ix, path); + } + } + } + } + let new_paths = tree.paths().cloned().collect::>(); + assert_eq!(paths, new_paths, "incorrect changes: {:?}", changes); + } + }) + .detach(); + }); + let mut snapshots = Vec::new(); let mut mutations_len = operations; while mutations_len > 1 {