Replace crossbeam_channel with an async smol::channel

This commit is contained in:
Antonio Scandurra 2021-07-09 15:06:51 +02:00
parent 102026f3c7
commit 6957027341

View file

@ -27,7 +27,7 @@ use postage::{
watch, watch,
}; };
use smol::{ use smol::{
channel::Sender, channel::{self, Sender},
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
lock::RwLock, lock::RwLock,
}; };
@ -2119,13 +2119,14 @@ impl BackgroundScanner {
self.snapshot.lock().insert_entry(entry); self.snapshot.lock().insert_entry(entry);
if is_dir { if is_dir {
let (tx, rx) = crossbeam_channel::unbounded(); let (tx, rx) = channel::unbounded();
tx.send(ScanJob { tx.send(ScanJob {
abs_path: abs_path.to_path_buf(), abs_path: abs_path.to_path_buf(),
path, path,
ignore_stack: IgnoreStack::none(), ignore_stack: IgnoreStack::none(),
scan_queue: tx.clone(), scan_queue: tx.clone(),
}) })
.await
.unwrap(); .unwrap();
drop(tx); drop(tx);
@ -2133,12 +2134,11 @@ impl BackgroundScanner {
.scoped(|scope| { .scoped(|scope| {
for _ in 0..self.executor.threads() { for _ in 0..self.executor.threads() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = rx.recv() { while let Ok(job) = rx.recv().await {
if let Err(err) = smol::block_on(self.scan_dir( if let Err(err) = self
root_char_bag, .scan_dir(root_char_bag, next_entry_id.clone(), &job)
next_entry_id.clone(), .await
&job, {
)) {
log::error!("error scanning {:?}: {}", job.abs_path, err); log::error!("error scanning {:?}: {}", job.abs_path, err);
} }
} }
@ -2244,7 +2244,7 @@ impl BackgroundScanner {
.lock() .lock()
.populate_dir(job.path.clone(), new_entries, new_ignore); .populate_dir(job.path.clone(), new_entries, new_ignore);
for new_job in new_jobs { for new_job in new_jobs {
job.scan_queue.send(new_job).unwrap(); job.scan_queue.send(new_job).await.unwrap();
} }
Ok(()) Ok(())
@ -2279,7 +2279,7 @@ impl BackgroundScanner {
} }
} }
let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded(); let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
for event in events { for event in events {
let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) { let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
Ok(path) => Arc::from(path.to_path_buf()), Ok(path) => Arc::from(path.to_path_buf()),
@ -2316,6 +2316,7 @@ impl BackgroundScanner {
ignore_stack, ignore_stack,
scan_queue: scan_queue_tx.clone(), scan_queue: scan_queue_tx.clone(),
}) })
.await
.unwrap(); .unwrap();
} }
} }
@ -2335,7 +2336,7 @@ impl BackgroundScanner {
.scoped(|scope| { .scoped(|scope| {
for _ in 0..self.executor.threads() { for _ in 0..self.executor.threads() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = scan_queue_rx.recv() { while let Ok(job) = scan_queue_rx.recv().await {
if let Err(err) = self if let Err(err) = self
.scan_dir(root_char_bag, next_entry_id.clone(), &job) .scan_dir(root_char_bag, next_entry_id.clone(), &job)
.await .await
@ -2376,7 +2377,7 @@ impl BackgroundScanner {
self.snapshot.lock().ignores.remove(&parent_path); self.snapshot.lock().ignores.remove(&parent_path);
} }
let (ignore_queue_tx, ignore_queue_rx) = crossbeam_channel::unbounded(); let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
ignores_to_update.sort_unstable(); ignores_to_update.sort_unstable();
let mut ignores_to_update = ignores_to_update.into_iter().peekable(); let mut ignores_to_update = ignores_to_update.into_iter().peekable();
while let Some(parent_path) = ignores_to_update.next() { while let Some(parent_path) = ignores_to_update.next() {
@ -2394,6 +2395,7 @@ impl BackgroundScanner {
ignore_stack, ignore_stack,
ignore_queue: ignore_queue_tx.clone(), ignore_queue: ignore_queue_tx.clone(),
}) })
.await
.unwrap(); .unwrap();
} }
drop(ignore_queue_tx); drop(ignore_queue_tx);
@ -2402,8 +2404,8 @@ impl BackgroundScanner {
.scoped(|scope| { .scoped(|scope| {
for _ in 0..self.executor.threads() { for _ in 0..self.executor.threads() {
scope.spawn(async { scope.spawn(async {
while let Ok(job) = ignore_queue_rx.recv() { while let Ok(job) = ignore_queue_rx.recv().await {
self.update_ignore_status(job, &snapshot); self.update_ignore_status(job, &snapshot).await;
} }
}); });
} }
@ -2411,7 +2413,7 @@ impl BackgroundScanner {
.await; .await;
} }
fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) { async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
let mut ignore_stack = job.ignore_stack; let mut ignore_stack = job.ignore_stack;
if let Some((ignore, _)) = snapshot.ignores.get(&job.path) { if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone()); ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
@ -2433,6 +2435,7 @@ impl BackgroundScanner {
ignore_stack: child_ignore_stack, ignore_stack: child_ignore_stack,
ignore_queue: job.ignore_queue.clone(), ignore_queue: job.ignore_queue.clone(),
}) })
.await
.unwrap(); .unwrap();
} }
@ -2478,13 +2481,13 @@ struct ScanJob {
abs_path: PathBuf, abs_path: PathBuf,
path: Arc<Path>, path: Arc<Path>,
ignore_stack: Arc<IgnoreStack>, ignore_stack: Arc<IgnoreStack>,
scan_queue: crossbeam_channel::Sender<ScanJob>, scan_queue: Sender<ScanJob>,
} }
struct UpdateIgnoreStatusJob { struct UpdateIgnoreStatusJob {
path: Arc<Path>, path: Arc<Path>,
ignore_stack: Arc<IgnoreStack>, ignore_stack: Arc<IgnoreStack>,
ignore_queue: crossbeam_channel::Sender<UpdateIgnoreStatusJob>, ignore_queue: Sender<UpdateIgnoreStatusJob>,
} }
pub trait WorktreeHandle { pub trait WorktreeHandle {