diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index b4b1a1b332..0a9ce4ea3c 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -22,6 +22,7 @@ use gpui::{ use lazy_static::lazy_static; use parking_lot::Mutex; use postage::{ + broadcast, prelude::{Sink, Stream}, watch, }; @@ -153,7 +154,7 @@ struct InMemoryEntry { struct InMemoryFsState { entries: BTreeMap, next_inode: u64, - events_tx: watch::Sender<()>, + events_tx: broadcast::Sender, } impl InMemoryFsState { @@ -168,16 +169,26 @@ impl InMemoryFsState { Err(anyhow!("invalid ")) } } + + async fn emit_event(&mut self, path: &Path) { + let _ = self + .events_tx + .send(fsevent::Event { + event_id: 0, + flags: fsevent::StreamFlags::empty(), + path: path.to_path_buf(), + }) + .await; + } } pub struct InMemoryFs { state: RwLock, - events_rx: watch::Receiver<()>, } impl InMemoryFs { pub fn new() -> Self { - let (events_tx, events_rx) = watch::channel(); + let (events_tx, _) = broadcast::channel(2048); let mut entries = BTreeMap::new(); entries.insert( Path::new("/").to_path_buf(), @@ -195,7 +206,6 @@ impl InMemoryFs { next_inode: 1, events_tx, }), - events_rx, } } @@ -215,8 +225,33 @@ impl InMemoryFs { content: None, }, ); + state.emit_event(path).await; Ok(()) } + + pub async fn remove(&self, path: &Path) -> Result<()> { + let mut state = self.state.write().await; + state.validate_path(path)?; + + let mut paths = Vec::new(); + state.entries.retain(|path, _| { + if path.starts_with(path) { + paths.push(path.to_path_buf()); + false + } else { + true + } + }); + for path in paths { + state.emit_event(&path).await; + } + + Ok(()) + } + + pub async fn events(&self) -> broadcast::Receiver { + self.state.read().await.events_tx.subscribe() + } } #[async_trait::async_trait] @@ -267,6 +302,7 @@ impl Fs for InMemoryFs { } else { entry.content = Some(text.chunks().collect()); entry.mtime = SystemTime::now(); + state.emit_event(path).await; Ok(()) } } else { @@ -282,6 +318,7 @@ impl Fs for InMemoryFs { content: Some(text.chunks().collect()), }, ); + state.emit_event(path).await; Ok(()) } } @@ -291,7 +328,7 @@ impl Fs for InMemoryFs { enum ScanState { Idle, Scanning, - Err(Arc), + Err(Arc), } pub enum Worktree { @@ -331,7 +368,7 @@ impl Worktree { cx: &mut ModelContext, ) -> Self { let fs = Arc::new(OsFs); - let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs, cx); + let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx); let (event_stream, event_stream_handle) = fsevent::EventStream::new( &[tree.snapshot.abs_path.as_ref()], Duration::from_millis(100), @@ -339,7 +376,7 @@ impl Worktree { let background_snapshot = tree.background_snapshot.clone(); let id = tree.id; std::thread::spawn(move || { - let scanner = BackgroundScanner::new(background_snapshot, scan_states_tx, id); + let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id); scanner.run(event_stream); }); tree._event_stream_handle = Some(event_stream_handle); @@ -356,7 +393,14 @@ impl Worktree { let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx); let background_snapshot = tree.background_snapshot.clone(); let id = tree.id; - cx.background().spawn(async move {}).detach(); + let fs = fs.clone(); + cx.background() + .spawn(async move { + let events_rx = fs.events().await; + let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id); + scanner.run_test(events_rx).await; + }) + .detach(); Worktree::Local(tree) } @@ -1933,14 +1977,21 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount { } struct BackgroundScanner { + fs: Arc, snapshot: Arc>, notify: Sender, thread_pool: scoped_pool::Pool, } impl BackgroundScanner { - fn new(snapshot: Arc>, notify: Sender, worktree_id: usize) -> Self { + fn new( + fs: Arc, + snapshot: Arc>, + notify: Sender, + worktree_id: usize, + ) -> Self { Self { + fs, snapshot, notify, thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)), @@ -1960,7 +2011,7 @@ impl BackgroundScanner { return; } - if let Err(err) = self.scan_dirs() { + if let Err(err) = smol::block_on(self.scan_dirs()) { if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() { return; } @@ -1975,7 +2026,7 @@ impl BackgroundScanner { return false; } - if !self.process_events(events) { + if !smol::block_on(self.process_events(events)) { return false; } @@ -1987,46 +2038,82 @@ impl BackgroundScanner { }); } - fn scan_dirs(&mut self) -> io::Result<()> { - self.snapshot.lock().scan_id += 1; + #[cfg(any(test, feature = "test-support"))] + async fn run_test(mut self, mut events_rx: broadcast::Receiver) { + if self.notify.send(ScanState::Scanning).await.is_err() { + return; + } + + if let Err(err) = self.scan_dirs().await { + if self + .notify + .send(ScanState::Err(Arc::new(err))) + .await + .is_err() + { + return; + } + } + + if self.notify.send(ScanState::Idle).await.is_err() { + return; + } + + while let Some(event) = events_rx.recv().await { + let mut events = vec![event]; + while let Ok(event) = events_rx.try_recv() { + events.push(event); + } + + if self.notify.send(ScanState::Scanning).await.is_err() { + break; + } + + if self.process_events(events).await { + break; + } + + if self.notify.send(ScanState::Idle).await.is_err() { + break; + } + } + } + + async fn scan_dirs(&mut self) -> Result<()> { + let next_entry_id; + { + let mut snapshot = self.snapshot.lock(); + snapshot.scan_id += 1; + next_entry_id = snapshot.next_entry_id.clone(); + } let path: Arc = Arc::from(Path::new("")); let abs_path = self.abs_path(); - let metadata = fs::metadata(&abs_path)?; - let inode = metadata.ino(); - let is_symlink = fs::symlink_metadata(&abs_path)?.file_type().is_symlink(); - let is_dir = metadata.file_type().is_dir(); - let mtime = metadata.modified()?; // After determining whether the root entry is a file or a directory, populate the // snapshot's "root name", which will be used for the purpose of fuzzy matching. let mut root_name = abs_path .file_name() .map_or(String::new(), |f| f.to_string_lossy().to_string()); + let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect(); + let entry = self + .fs + .entry(root_char_bag, &next_entry_id, path.clone(), &abs_path) + .await? + .ok_or_else(|| anyhow!("root entry does not exist"))?; + let is_dir = entry.is_dir(); if is_dir { root_name.push('/'); } - let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect(); - let next_entry_id; { let mut snapshot = self.snapshot.lock(); snapshot.root_name = root_name; snapshot.root_char_bag = root_char_bag; - next_entry_id = snapshot.next_entry_id.clone(); } + self.snapshot.lock().insert_entry(entry); if is_dir { - self.snapshot.lock().insert_entry(Entry { - id: next_entry_id.fetch_add(1, SeqCst), - kind: EntryKind::PendingDir, - path: path.clone(), - inode, - mtime, - is_symlink, - is_ignored: false, - }); - let (tx, rx) = crossbeam_channel::unbounded(); tx.send(ScanJob { abs_path: abs_path.to_path_buf(), @@ -2041,31 +2128,23 @@ impl BackgroundScanner { for _ in 0..self.thread_pool.thread_count() { pool.execute(|| { while let Ok(job) = rx.recv() { - if let Err(err) = - self.scan_dir(root_char_bag, next_entry_id.clone(), &job) - { + if let Err(err) = smol::block_on(self.scan_dir( + root_char_bag, + next_entry_id.clone(), + &job, + )) { log::error!("error scanning {:?}: {}", job.abs_path, err); } } }); } }); - } else { - self.snapshot.lock().insert_entry(Entry { - id: next_entry_id.fetch_add(1, SeqCst), - kind: EntryKind::File(char_bag_for_path(root_char_bag, &path)), - path, - inode, - mtime, - is_symlink, - is_ignored: false, - }); } Ok(()) } - fn scan_dir( + async fn scan_dir( &self, root_char_bag: CharBag, next_entry_id: Arc, @@ -2164,7 +2243,7 @@ impl BackgroundScanner { Ok(()) } - fn process_events(&mut self, mut events: Vec) -> bool { + async fn process_events(&mut self, mut events: Vec) -> bool { let mut snapshot = self.snapshot(); snapshot.scan_id += 1; @@ -2207,12 +2286,16 @@ impl BackgroundScanner { } }; - match smol::block_on(OsFs.entry( - snapshot.root_char_bag, - &next_entry_id, - path.clone(), - &event.path, - )) { + match self + .fs + .entry( + snapshot.root_char_bag, + &next_entry_id, + path.clone(), + &event.path, + ) + .await + { Ok(Some(mut fs_entry)) => { let is_dir = fs_entry.is_dir(); let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir); @@ -2245,8 +2328,11 @@ impl BackgroundScanner { for _ in 0..self.thread_pool.thread_count() { pool.execute(|| { while let Ok(job) = scan_queue_rx.recv() { - if let Err(err) = self.scan_dir(root_char_bag, next_entry_id.clone(), &job) - { + if let Err(err) = smol::block_on(self.scan_dir( + root_char_bag, + next_entry_id.clone(), + &job, + )) { log::error!("error scanning {:?}: {}", job.abs_path, err); } } @@ -3061,6 +3147,7 @@ mod tests { let (notify_tx, _notify_rx) = smol::channel::unbounded(); let mut scanner = BackgroundScanner::new( + Arc::new(OsFs), Arc::new(Mutex::new(Snapshot { id: 0, scan_id: 0, @@ -3076,7 +3163,7 @@ mod tests { notify_tx, 0, ); - scanner.scan_dirs().unwrap(); + smol::block_on(scanner.scan_dirs()).unwrap(); scanner.snapshot().check_invariants(); let mut events = Vec::new(); @@ -3086,7 +3173,7 @@ mod tests { let len = rng.gen_range(0..=events.len()); let to_deliver = events.drain(0..len).collect::>(); log::info!("Delivering events: {:#?}", to_deliver); - scanner.process_events(to_deliver); + smol::block_on(scanner.process_events(to_deliver)); scanner.snapshot().check_invariants(); } else { events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap()); @@ -3094,11 +3181,12 @@ mod tests { } } log::info!("Quiescing: {:#?}", events); - scanner.process_events(events); + smol::block_on(scanner.process_events(events)); scanner.snapshot().check_invariants(); let (notify_tx, _notify_rx) = smol::channel::unbounded(); let mut new_scanner = BackgroundScanner::new( + scanner.fs.clone(), Arc::new(Mutex::new(Snapshot { id: 0, scan_id: 0, @@ -3114,7 +3202,7 @@ mod tests { notify_tx, 1, ); - new_scanner.scan_dirs().unwrap(); + smol::block_on(new_scanner.scan_dirs()).unwrap(); assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec()); } }