🎨 BackgroundScanner::run

This commit is contained in:
Max Brunsfeld 2023-03-23 16:22:07 -07:00
parent 89e99d2902
commit a0e98ccc35

View file

@ -2225,39 +2225,42 @@ impl BackgroundScanner {
.unwrap(); .unwrap();
drop(tx); drop(tx);
// Spawn a worker thread per logical CPU.
self.executor self.executor
.scoped(|scope| { .scoped(|scope| {
// One the first worker thread, listen for change requests from the worktree. // While the scan is running, listen for path update requests from the worktree,
// For each change request, after refreshing the given paths, report // and report updates to the worktree based on a timer.
// a progress update for the snapshot.
scope.spawn(async { scope.spawn(async {
let reporting_timer = self.delay().fuse(); let reporting_timer = self.pause_between_initializing_updates().fuse();
futures::pin_mut!(reporting_timer); futures::pin_mut!(reporting_timer);
loop { loop {
select_biased! { select_biased! {
job = changed_paths.next().fuse() => { job = changed_paths.next().fuse() => {
let Some((abs_paths, barrier)) = job else { break }; let Some((abs_paths, barrier)) = job else { break };
self.update_entries_for_paths(abs_paths, None).await; self.update_entries_for_paths(abs_paths, None).await;
if self.notify.unbounded_send(ScanState::Initializing { if self
snapshot: self.snapshot.lock().clone(), .notify
barrier: Some(barrier), .unbounded_send(ScanState::Initializing {
}).is_err() { snapshot: self.snapshot.lock().clone(),
barrier: Some(barrier),
})
.is_err()
{
break; break;
} }
} }
_ = reporting_timer => { _ = reporting_timer => {
reporting_timer.set(self.delay().fuse()); if self
if self.notify.unbounded_send(ScanState::Initializing { .notify
snapshot: self.snapshot.lock().clone(), .unbounded_send(ScanState::Initializing {
barrier: None, snapshot: self.snapshot.lock().clone(),
}).is_err() { barrier: None,
})
.is_err()
{
break; break;
} }
reporting_timer.set(self.pause_between_initializing_updates().fuse());
} }
job = rx.recv().fuse() => { job = rx.recv().fuse() => {
let Ok(job) = job else { break }; let Ok(job) = job else { break };
if let Err(err) = self if let Err(err) = self
@ -2271,7 +2274,7 @@ impl BackgroundScanner {
} }
}); });
// On all of the remaining worker threads, just scan directories. // Spawn worker threads to scan the directory recursively.
for _ in 1..self.executor.num_cpus() { for _ in 1..self.executor.num_cpus() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = rx.recv().await { while let Ok(job) = rx.recv().await {
@ -2367,7 +2370,7 @@ impl BackgroundScanner {
} }
} }
async fn delay(&self) { async fn pause_between_initializing_updates(&self) {
#[cfg(any(test, feature = "test-support"))] #[cfg(any(test, feature = "test-support"))]
if self.fs.is_fake() { if self.fs.is_fake() {
return self.executor.simulate_random_delay().await; return self.executor.simulate_random_delay().await;