diff --git a/Cargo.lock b/Cargo.lock index 58aff982bf..5c3136f082 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -782,6 +782,7 @@ dependencies = [ "parking_lot", "pathfinder_color", "pathfinder_geometry", + "pin-project", "rand 0.8.3", "smallvec", "smol", @@ -1076,6 +1077,26 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pin-project" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "758669ae3558c6f74bd2a18b41f7ac0b5a195aea6639d6a9b5e5d1ad5ba24c0b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.4" diff --git a/gpui/Cargo.toml b/gpui/Cargo.toml index 43a95abcc7..edeea5be5a 100644 --- a/gpui/Cargo.toml +++ b/gpui/Cargo.toml @@ -12,6 +12,7 @@ ordered-float = "2.1.1" parking_lot = "0.11.1" pathfinder_color = "0.5" pathfinder_geometry = "0.5" +pin-project = "1.0.5" rand = "0.8.3" smallvec = "1.6.1" smol = "1.2" diff --git a/gpui/src/app.rs b/gpui/src/app.rs index d377afc2db..cf58a389b6 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -1,6 +1,6 @@ use crate::{ elements::Element, - executor::{self}, + executor::{self, ForegroundTask}, keymap::{self, Keystroke}, platform::{self, App as _}, util::post_inc, @@ -292,7 +292,8 @@ pub struct MutableAppContext { HashMap>, foreground: Rc, background: Arc, - task_callbacks: HashMap, + future_handlers: HashMap, + stream_handlers: HashMap, task_done: (channel::Sender, channel::Receiver), pending_effects: VecDeque, pending_flushes: usize, @@ -321,7 +322,8 @@ impl MutableAppContext { invalidation_callbacks: HashMap::new(), foreground, background: Arc::new(executor::Background::new()), - task_callbacks: HashMap::new(), + future_handlers: HashMap::new(), + stream_handlers: HashMap::new(), task_done: channel::unbounded(), pending_effects: VecDeque::new(), pending_flushes: 0, @@ -869,97 +871,71 @@ impl MutableAppContext { self.flush_effects(); } - fn spawn_local(&mut self, future: F) -> usize + fn spawn(&mut self, future: F) -> (usize, ForegroundTask>) where F: 'static + Future, + T: 'static, { let task_id = post_inc(&mut self.next_task_id); - let app = self.weak_self.as_ref().unwrap().clone(); - self.foreground - .spawn(async move { - let output = future.await; - if let Some(app) = app.upgrade() { - app.borrow_mut() - .relay_task_output(task_id, Box::new(output)); - } - }) - .detach(); - task_id + let app = self.weak_self.as_ref().unwrap().upgrade().unwrap(); + let task = self.foreground.spawn(async move { + let output = future.await; + app.borrow_mut() + .handle_future_output(task_id, Box::new(output)) + .map(|result| *result.downcast::().unwrap()) + }); + (task_id, task) } - fn spawn_stream_local(&mut self, mut stream: F, done_tx: channel::Sender<()>) -> usize + fn spawn_stream(&mut self, mut stream: F) -> (usize, ForegroundTask>) where F: 'static + Stream + Unpin, + T: 'static, { let task_id = post_inc(&mut self.next_task_id); - let app = self.weak_self.as_ref().unwrap().clone(); - self.foreground - .spawn(async move { - loop { - match stream.next().await { - item @ Some(_) => { - if let Some(app) = app.upgrade() { - let mut app = app.borrow_mut(); - if app.relay_task_output(task_id, Box::new(item)) { - app.stream_completed(task_id); - break; - } - } else { - break; - } - } - item @ None => { - if let Some(app) = app.upgrade() { - let mut app = app.borrow_mut(); - app.relay_task_output(task_id, Box::new(item)); - app.stream_completed(task_id); - } - let _ = done_tx.send(()).await; + let app = self.weak_self.as_ref().unwrap().upgrade().unwrap(); + let task = self.foreground.spawn(async move { + loop { + match stream.next().await { + Some(item) => { + let mut app = app.borrow_mut(); + if app.handle_stream_item(task_id, Box::new(item)) { break; } } + None => { + break; + } } - }) - .detach(); - task_id + } + + app.borrow_mut() + .stream_completed(task_id) + .map(|result| *result.downcast::().unwrap()) + }); + + (task_id, task) } - fn relay_task_output(&mut self, task_id: usize, output: Box) -> bool { + fn handle_future_output( + &mut self, + task_id: usize, + output: Box, + ) -> Option> { self.pending_flushes += 1; - let task_callback = self.task_callbacks.remove(&task_id).unwrap(); + let future_callback = self.future_handlers.remove(&task_id).unwrap(); - let halt = match task_callback { - TaskCallback::OnModelFromFuture { model_id, callback } => { + let mut result = None; + + match future_callback { + FutureHandler::Model { model_id, callback } => { if let Some(mut model) = self.ctx.models.remove(&model_id) { - callback( - model.as_any_mut(), - output, - self, - model_id, - self.foreground.clone(), - ); + result = Some(callback(model.as_any_mut(), output, self, model_id)); self.ctx.models.insert(model_id, model); } self.task_done(task_id); - true } - TaskCallback::OnModelFromStream { - model_id, - mut callback, - } => { - if let Some(mut model) = self.ctx.models.remove(&model_id) { - let halt = callback(model.as_any_mut(), output, self, model_id); - self.ctx.models.insert(model_id, model); - self.task_callbacks.insert( - task_id, - TaskCallback::OnModelFromStream { model_id, callback }, - ); - halt - } else { - true - } - } - TaskCallback::OnViewFromFuture { + FutureHandler::View { window_id, view_id, callback, @@ -970,14 +946,7 @@ impl MutableAppContext { .get_mut(&window_id) .and_then(|w| w.views.remove(&view_id)) { - callback( - view.as_mut(), - output, - self, - window_id, - view_id, - self.foreground.clone(), - ); + result = Some(callback(view.as_mut(), output, self, window_id, view_id)); self.ctx .windows .get_mut(&window_id) @@ -986,12 +955,37 @@ impl MutableAppContext { .insert(view_id, view); } self.task_done(task_id); - true } - TaskCallback::OnViewFromStream { + }; + + self.flush_effects(); + result + } + + fn handle_stream_item(&mut self, task_id: usize, output: Box) -> bool { + self.pending_flushes += 1; + + let mut handler = self.stream_handlers.remove(&task_id).unwrap(); + let halt = match &mut handler { + StreamHandler::Model { + model_id, + item_callback, + .. + } => { + if let Some(mut model) = self.ctx.models.remove(&model_id) { + let halt = item_callback(model.as_any_mut(), output, self, *model_id); + self.ctx.models.insert(*model_id, model); + self.stream_handlers.insert(task_id, handler); + halt + } else { + true + } + } + StreamHandler::View { window_id, view_id, - mut callback, + item_callback, + .. } => { if let Some(mut view) = self .ctx @@ -999,34 +993,67 @@ impl MutableAppContext { .get_mut(&window_id) .and_then(|w| w.views.remove(&view_id)) { - let halt = callback(view.as_mut(), output, self, window_id, view_id); + let halt = item_callback(view.as_mut(), output, self, *window_id, *view_id); self.ctx .windows .get_mut(&window_id) .unwrap() .views - .insert(view_id, view); - self.task_callbacks.insert( - task_id, - TaskCallback::OnViewFromStream { - window_id, - view_id, - callback, - }, - ); + .insert(*view_id, view); + self.stream_handlers.insert(task_id, handler); halt } else { true } } }; + self.flush_effects(); halt } - fn stream_completed(&mut self, task_id: usize) { - self.task_callbacks.remove(&task_id); + fn stream_completed(&mut self, task_id: usize) -> Option> { + let result = match self.stream_handlers.remove(&task_id).unwrap() { + StreamHandler::Model { + model_id, + done_callback, + .. + } => { + if let Some(mut model) = self.ctx.models.remove(&model_id) { + let result = done_callback(model.as_any_mut(), self, model_id); + self.ctx.models.insert(model_id, model); + Some(result) + } else { + None + } + } + StreamHandler::View { + window_id, + view_id, + done_callback, + .. + } => { + if let Some(mut view) = self + .ctx + .windows + .get_mut(&window_id) + .and_then(|w| w.views.remove(&view_id)) + { + let result = done_callback(view.as_mut(), self, window_id, view_id); + self.ctx + .windows + .get_mut(&window_id) + .unwrap() + .views + .insert(view_id, view); + Some(result) + } else { + None + } + } + }; self.task_done(task_id); + result } fn task_done(&self, task_id: usize) { @@ -1039,7 +1066,7 @@ impl MutableAppContext { } pub fn finish_pending_tasks(&self) -> impl Future { - let mut pending_tasks = self.task_callbacks.keys().cloned().collect::>(); + let mut pending_tasks = self.future_handlers.keys().cloned().collect::>(); let task_done = self.task_done.1.clone(); async move { @@ -1404,82 +1431,68 @@ impl<'a, T: Entity> ModelContext<'a, T> { }); } - pub fn spawn_local(&mut self, future: S, callback: F) -> impl Future + pub fn spawn(&mut self, future: S, callback: F) -> ForegroundTask> where S: 'static + Future, F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext) -> U, U: 'static, { - let (tx, rx) = channel::bounded(1); + let (task_id, task) = self.app.spawn::(future); - let task_id = self.app.spawn_local(future); - - self.app.task_callbacks.insert( + self.app.future_handlers.insert( task_id, - TaskCallback::OnModelFromFuture { - model_id: self.model_id, - callback: Box::new(move |model, output, app, model_id, executor| { - let model = model.downcast_mut().unwrap(); - let output = *output.downcast().unwrap(); - let result = callback(model, output, &mut ModelContext::new(app, model_id)); - executor - .spawn(async move { tx.send(result).await }) - .detach(); - }), - }, - ); - - async move { rx.recv().await.unwrap() } - } - - pub fn spawn(&mut self, future: S, callback: F) -> impl Future - where - S: 'static + Future + Send, - S::Output: Send, - F: 'static + FnOnce(&mut T, S::Output, &mut ModelContext) -> U, - U: 'static, - { - let (tx, rx) = channel::bounded(1); - - self.app - .background - .spawn(async move { - if let Err(e) = tx.send(future.await).await { - log::error!("error sending background task result to main thread: {}", e); - } - }) - .detach(); - - self.spawn_local(async move { rx.recv().await.unwrap() }, callback) - } - - pub fn spawn_stream_local( - &mut self, - stream: S, - mut callback: F, - ) -> impl Future - where - S: 'static + Stream + Unpin, - F: 'static + FnMut(&mut T, Option, &mut ModelContext), - { - let (tx, rx) = channel::bounded(1); - - let task_id = self.app.spawn_stream_local(stream, tx); - self.app.task_callbacks.insert( - task_id, - TaskCallback::OnModelFromStream { + FutureHandler::Model { model_id: self.model_id, callback: Box::new(move |model, output, app, model_id| { let model = model.downcast_mut().unwrap(); let output = *output.downcast().unwrap(); - let mut ctx = ModelContext::new(app, model_id); - callback(model, output, &mut ctx); - ctx.halt_stream + Box::new(callback( + model, + output, + &mut ModelContext::new(app, model_id), + )) }), }, ); - async move { rx.recv().await.unwrap() } + task + } + + pub fn spawn_stream( + &mut self, + stream: S, + mut item_callback: F, + done_callback: G, + ) -> ForegroundTask> + where + S: 'static + Stream + Unpin, + F: 'static + FnMut(&mut T, S::Item, &mut ModelContext), + G: 'static + FnOnce(&mut T, &mut ModelContext) -> U, + U: 'static + Any, + { + let (task_id, task) = self.app.spawn_stream(stream); + self.app.stream_handlers.insert( + task_id, + StreamHandler::Model { + model_id: self.model_id, + item_callback: Box::new(move |model, output, app, model_id| { + let model = model.downcast_mut().unwrap(); + let output = *output.downcast().unwrap(); + let mut ctx = ModelContext::new(app, model_id); + item_callback(model, output, &mut ctx); + ctx.halt_stream + }), + done_callback: Box::new( + move |model: &mut dyn Any, app: &mut MutableAppContext, model_id| { + let model = model.downcast_mut().unwrap(); + let mut ctx = ModelContext::new(app, model_id); + Box::new(done_callback(model, &mut ctx)) + }, + ), + }, + ); + + task } } @@ -1674,85 +1687,67 @@ impl<'a, T: View> ViewContext<'a, T> { self.halt_stream = true; } - pub fn spawn_local(&mut self, future: S, callback: F) -> impl Future + pub fn spawn(&mut self, future: S, callback: F) -> ForegroundTask> where S: 'static + Future, F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext) -> U, U: 'static, { - let (tx, rx) = channel::bounded(1); + let (task_id, task) = self.app.spawn(future); - let task_id = self.app.spawn_local(future); - - self.app.task_callbacks.insert( + self.app.future_handlers.insert( task_id, - TaskCallback::OnViewFromFuture { - window_id: self.window_id, - view_id: self.view_id, - callback: Box::new(move |view, output, app, window_id, view_id, executor| { - let view = view.as_any_mut().downcast_mut().unwrap(); - let output = *output.downcast().unwrap(); - let result = - callback(view, output, &mut ViewContext::new(app, window_id, view_id)); - executor - .spawn(async move { tx.send(result).await }) - .detach(); - }), - }, - ); - - async move { rx.recv().await.unwrap() } - } - - pub fn spawn(&mut self, future: S, callback: F) -> impl Future - where - S: 'static + Future + Send, - S::Output: Send, - F: 'static + FnOnce(&mut T, S::Output, &mut ViewContext) -> U, - U: 'static, - { - let (tx, rx) = channel::bounded(1); - - self.app - .background - .spawn(async move { - if let Err(_) = tx.send(future.await).await { - log::error!("Error sending background task result to main thread",); - } - }) - .detach(); - - self.spawn_local(async move { rx.recv().await.unwrap() }, callback) - } - - pub fn spawn_stream_local( - &mut self, - stream: S, - mut callback: F, - ) -> impl Future - where - S: 'static + Stream + Unpin, - F: 'static + FnMut(&mut T, Option, &mut ViewContext), - { - let (tx, rx) = channel::bounded(1); - - let task_id = self.app.spawn_stream_local(stream, tx); - self.app.task_callbacks.insert( - task_id, - TaskCallback::OnViewFromStream { + FutureHandler::View { window_id: self.window_id, view_id: self.view_id, callback: Box::new(move |view, output, app, window_id, view_id| { let view = view.as_any_mut().downcast_mut().unwrap(); let output = *output.downcast().unwrap(); - let mut ctx = ViewContext::new(app, window_id, view_id); - callback(view, output, &mut ctx); - ctx.halt_stream + Box::new(callback( + view, + output, + &mut ViewContext::new(app, window_id, view_id), + )) }), }, ); - async move { rx.recv().await.unwrap() } + task + } + + pub fn spawn_stream( + &mut self, + stream: S, + mut item_callback: F, + done_callback: G, + ) -> ForegroundTask> + where + S: 'static + Stream + Unpin, + F: 'static + FnMut(&mut T, S::Item, &mut ViewContext), + G: 'static + FnOnce(&mut T, &mut ViewContext) -> U, + U: 'static + Any, + { + let (task_id, task) = self.app.spawn_stream(stream); + self.app.stream_handlers.insert( + task_id, + StreamHandler::View { + window_id: self.window_id, + view_id: self.view_id, + item_callback: Box::new(move |view, output, app, window_id, view_id| { + let view = view.as_any_mut().downcast_mut().unwrap(); + let output = *output.downcast().unwrap(); + let mut ctx = ViewContext::new(app, window_id, view_id); + item_callback(view, output, &mut ctx); + ctx.halt_stream + }), + done_callback: Box::new(move |view, app, window_id, view_id| { + let view = view.as_any_mut().downcast_mut().unwrap(); + let mut ctx = ViewContext::new(app, window_id, view_id); + Box::new(done_callback(view, &mut ctx)) + }), + }, + ); + task } } @@ -2192,24 +2187,14 @@ enum Observation { }, } -enum TaskCallback { - OnModelFromFuture { +enum FutureHandler { + Model { model_id: usize, callback: Box< - dyn FnOnce( - &mut dyn Any, - Box, - &mut MutableAppContext, - usize, - Rc, - ), + dyn FnOnce(&mut dyn Any, Box, &mut MutableAppContext, usize) -> Box, >, }, - OnModelFromStream { - model_id: usize, - callback: Box, &mut MutableAppContext, usize) -> bool>, - }, - OnViewFromFuture { + View { window_id: usize, view_id: usize, callback: Box< @@ -2219,16 +2204,26 @@ enum TaskCallback { &mut MutableAppContext, usize, usize, - Rc, - ), + ) -> Box, >, }, - OnViewFromStream { +} + +enum StreamHandler { + Model { + model_id: usize, + item_callback: + Box, &mut MutableAppContext, usize) -> bool>, + done_callback: Box Box>, + }, + View { window_id: usize, view_id: usize, - callback: Box< + item_callback: Box< dyn FnMut(&mut dyn AnyView, Box, &mut MutableAppContext, usize, usize) -> bool, >, + done_callback: + Box Box>, }, } @@ -2395,7 +2390,7 @@ mod tests { let handle = app.add_model(|_| Model::default()); handle .update(&mut app, |_, c| { - c.spawn_local(async { 7 }, |model, output, _| { + c.spawn(async { 7 }, |model, output, _| { model.count = output; }) }) @@ -2428,9 +2423,15 @@ mod tests { let handle = app.add_model(|_| Model::default()); handle .update(&mut app, |_, c| { - c.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |model, output, _| { - model.events.push(output); - }) + c.spawn_stream( + smol::stream::iter(vec![1, 2, 3]), + |model, output, _| { + model.events.push(Some(output)); + }, + |model, _| { + model.events.push(None); + }, + ) }) .await; @@ -2802,7 +2803,7 @@ mod tests { let (_, handle) = app.add_window(|_| View::default()); handle .update(&mut app, |_, c| { - c.spawn_local(async { 7 }, |me, output, _| { + c.spawn(async { 7 }, |me, output, _| { me.count = output; }) }) @@ -2844,9 +2845,15 @@ mod tests { let (_, handle) = app.add_window(|_| View::default()); handle .update(&mut app, |_, c| { - c.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |me, output, _| { - me.events.push(output); - }) + c.spawn_stream( + smol::stream::iter(vec![1_usize, 2, 3]), + |me, output, _| { + me.events.push(Some(output)); + }, + |me, _| { + me.events.push(None); + }, + ) }) .await; @@ -3159,19 +3166,21 @@ mod tests { model.update(&mut app, |_, ctx| { let _ = ctx.spawn(async {}, |_, _, _| {}); - let _ = ctx.spawn_local(async {}, |_, _, _| {}); - let _ = ctx.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}); + let _ = ctx.spawn(async {}, |_, _, _| {}); + let _ = + ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}); }); view.update(&mut app, |_, ctx| { let _ = ctx.spawn(async {}, |_, _, _| {}); - let _ = ctx.spawn_local(async {}, |_, _, _| {}); - let _ = ctx.spawn_stream_local(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}); + let _ = ctx.spawn(async {}, |_, _, _| {}); + let _ = + ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}); }); - assert!(!app.0.borrow().task_callbacks.is_empty()); + assert!(!app.0.borrow().future_handlers.is_empty()); app.finish_pending_tasks().await; - assert!(app.0.borrow().task_callbacks.is_empty()); + assert!(app.0.borrow().future_handlers.is_empty()); app.finish_pending_tasks().await; // Don't block if there are no tasks }); } diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index 1261ec9e87..e9900ead11 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -1,6 +1,6 @@ -// #[cfg(not(test))] use anyhow::{anyhow, Result}; use async_task::Runnable; +use pin_project::pin_project; use smol::prelude::*; use smol::{channel, Executor}; use std::rc::Rc; @@ -17,9 +17,24 @@ pub enum Foreground { Test(smol::LocalExecutor<'static>), } +#[pin_project(project = ForegroundTaskProject)] pub enum ForegroundTask { - Platform(async_task::Task), - Test(smol::Task), + Platform(#[pin] async_task::Task), + Test(#[pin] smol::Task), +} + +impl Future for ForegroundTask { + type Output = T; + + fn poll( + self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + match self.project() { + ForegroundTaskProject::Platform(task) => task.poll(ctx), + ForegroundTaskProject::Test(task) => task.poll(ctx), + } + } } pub struct Background { @@ -27,6 +42,7 @@ pub struct Background { _stop: channel::Sender<()>, } +#[must_use] pub type BackgroundTask = smol::Task; impl Foreground { @@ -69,6 +85,7 @@ impl Foreground { } } +#[must_use] impl ForegroundTask { pub fn detach(self) { match self { diff --git a/zed/src/editor/buffer_view.rs b/zed/src/editor/buffer_view.rs index 28513f70d7..d6fd9672f4 100644 --- a/zed/src/editor/buffer_view.rs +++ b/zed/src/editor/buffer_view.rs @@ -1066,7 +1066,7 @@ impl BufferView { ctx.notify(); let epoch = self.next_blink_epoch(); - let _ = ctx.spawn_local( + let _ = ctx.spawn( async move { Timer::after(CURSOR_BLINK_INTERVAL).await; epoch @@ -1088,7 +1088,7 @@ impl BufferView { ctx.notify(); let epoch = self.next_blink_epoch(); - let _ = ctx.spawn_local( + let _ = ctx.spawn( async move { Timer::after(CURSOR_BLINK_INTERVAL).await; epoch diff --git a/zed/src/watch.rs b/zed/src/watch.rs index 7cc7f59fbe..9405338c98 100644 --- a/zed/src/watch.rs +++ b/zed/src/watch.rs @@ -38,14 +38,14 @@ impl Receiver { impl Receiver { pub fn notify_model_on_change(&self, ctx: &mut ModelContext) { let watch = self.clone(); - let _ = ctx.spawn_local(async move { watch.updated().await }, |_, _, ctx| { + let _ = ctx.spawn(async move { watch.updated().await }, |_, _, ctx| { ctx.notify() }); } pub fn notify_view_on_change(&self, ctx: &mut ViewContext) { let watch = self.clone(); - let _ = ctx.spawn_local(async move { watch.updated().await }, |_, _, ctx| { + let _ = ctx.spawn(async move { watch.updated().await }, |_, _, ctx| { ctx.notify() }); } diff --git a/zed/src/worktree/worktree.rs b/zed/src/worktree/worktree.rs index f22603da87..ab8d41c46b 100644 --- a/zed/src/worktree/worktree.rs +++ b/zed/src/worktree/worktree.rs @@ -62,18 +62,19 @@ impl Worktree { let tree = tree.clone(); let (tx, rx) = smol::channel::bounded(1); - ctx.background_executor() - .spawn(async move { - tx.send(tree.scan_dirs()).await.unwrap(); - }) - .detach(); + let task = ctx.background_executor().spawn(async move { + let _ = tx.send(tree.scan_dirs()?).await; + Ok(()) + }); - let _ = ctx.spawn_local(async move { rx.recv().await.unwrap() }, Self::done_scanning); + ctx.spawn(task, Self::done_scanning).detach(); - let _ = ctx.spawn_stream_local( + ctx.spawn_stream( timer::repeat(Duration::from_millis(100)).map(|_| ()), Self::scanning, - ); + |_, _| {}, + ) + .detach(); } tree @@ -347,7 +348,7 @@ impl Worktree { } } - fn scanning(&mut self, _: Option<()>, ctx: &mut ModelContext) { + fn scanning(&mut self, _: (), ctx: &mut ModelContext) { if self.0.read().scanning { ctx.notify(); } else { @@ -356,6 +357,7 @@ impl Worktree { } fn done_scanning(&mut self, result: io::Result<()>, ctx: &mut ModelContext) { + log::info!("done scanning"); self.0.write().scanning = false; if let Err(error) = result { log::error!("error populating worktree: {}", error);