diff --git a/gpui/src/app.rs b/gpui/src/app.rs index 5536b78bf5..1156ec40d4 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -1,6 +1,6 @@ use crate::{ elements::ElementBox, - executor, + executor::{self, Task}, keymap::{self, Keystroke}, platform::{self, CursorStyle, Platform, PromptLevel, WindowOptions}, presenter::Presenter, @@ -8,7 +8,6 @@ use crate::{ AssetCache, AssetSource, ClipboardItem, FontCache, PathPromptOptions, TextLayoutCache, }; use anyhow::{anyhow, Result}; -use async_task::Task; use keymap::MatchResult; use parking_lot::Mutex; use platform::Event; diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index 0eba748f33..ba5a88e68e 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -1,12 +1,12 @@ use anyhow::{anyhow, Result}; use async_task::Runnable; -pub use async_task::Task; use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString}; use parking_lot::Mutex; use postage::{barrier, prelude::Stream as _}; use rand::prelude::*; use smol::{channel, prelude::*, Executor, Timer}; use std::{ + any::Any, fmt::{self, Debug}, marker::PhantomData, mem, @@ -42,6 +42,24 @@ pub enum Background { }, } +type AnyLocalFuture = Pin>>>; +type AnyFuture = Pin>>>; +type AnyTask = async_task::Task>; +type AnyLocalTask = async_task::Task>; + +pub enum Task { + Local { + any_task: AnyLocalTask, + result_type: PhantomData, + }, + Send { + any_task: AnyTask, + result_type: PhantomData, + }, +} + +unsafe impl Send for Task {} + struct DeterministicState { rng: StdRng, seed: u64, @@ -77,10 +95,7 @@ impl Deterministic { } } - fn spawn_from_foreground(&self, future: Pin>>) -> Task - where - T: 'static, - { + fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask { let backtrace = Backtrace::new_unresolved(); let scheduled_once = AtomicBool::new(false); let state = self.state.clone(); @@ -99,10 +114,7 @@ impl Deterministic { task } - fn spawn(&self, future: Pin>>) -> Task - where - T: 'static + Send, - { + fn spawn(&self, future: AnyFuture) -> AnyTask { let backtrace = Backtrace::new_unresolved(); let state = self.state.clone(); let unparker = self.parker.lock().unparker(); @@ -117,10 +129,7 @@ impl Deterministic { task } - fn run(&self, mut future: Pin>>) -> T - where - T: 'static, - { + fn run(&self, mut future: AnyLocalFuture) -> Box { let woken = Arc::new(AtomicBool::new(false)); loop { if let Some(result) = self.run_internal(woken.clone(), &mut future) { @@ -138,18 +147,15 @@ impl Deterministic { fn run_until_parked(&self) { let woken = Arc::new(AtomicBool::new(false)); - let mut future = std::future::pending::<()>().boxed_local(); + let mut future = any_local_future(std::future::pending::<()>()); self.run_internal(woken, &mut future); } - fn run_internal( + fn run_internal( &self, woken: Arc, - future: &mut Pin>>, - ) -> Option - where - T: 'static, - { + future: &mut AnyLocalFuture, + ) -> Option> { let unparker = self.parker.lock().unparker(); let waker = waker_fn(move || { woken.store(true, SeqCst); @@ -203,13 +209,7 @@ impl Deterministic { } } - pub fn block_on(&self, future: F) -> Option - where - T: 'static, - F: Future, - { - smol::pin!(future); - + fn block_on(&self, future: &mut AnyLocalFuture) -> Option> { let unparker = self.parker.lock().unparker(); let waker = waker_fn(move || { unparker.unpark(); @@ -394,8 +394,9 @@ impl Foreground { } pub fn spawn(&self, future: impl Future + 'static) -> Task { - let future = future.boxed_local(); - match self { + let future = any_local_future(future); + let any_task = match self { + Self::Deterministic(executor) => executor.spawn_from_foreground(future), Self::Platform { dispatcher, .. } => { let dispatcher = dispatcher.clone(); let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable); @@ -404,17 +405,18 @@ impl Foreground { task } Self::Test(executor) => executor.spawn(future), - Self::Deterministic(executor) => executor.spawn_from_foreground(future), - } + }; + Task::local(any_task) } pub fn run(&self, future: impl 'static + Future) -> T { - let future = future.boxed_local(); - match self { + let future = any_local_future(future); + let any_value = match self { + Self::Deterministic(executor) => executor.run(future), Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"), Self::Test(executor) => smol::block_on(executor.run(future)), - Self::Deterministic(executor) => executor.run(future), - } + }; + *any_value.downcast().unwrap() } pub fn forbid_parking(&self) { @@ -500,33 +502,34 @@ impl Background { T: 'static + Send, F: Send + Future + 'static, { - let future = future.boxed(); - match self { + let future = any_future(future); + let any_task = match self { Self::Production { executor, .. } => executor.spawn(future), Self::Deterministic(executor) => executor.spawn(future), - } + }; + Task::send(any_task) } pub fn block_with_timeout( &self, timeout: Duration, future: F, - ) -> Result>>> + ) -> Result> where T: 'static, F: 'static + Unpin + Future, { - let mut future = future.boxed_local(); + let mut future = any_local_future(future); if !timeout.is_zero() { let output = match self { Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(), Self::Deterministic(executor) => executor.block_on(&mut future), }; if let Some(output) = output { - return Ok(output); + return Ok(*output.downcast().unwrap()); } } - Err(future) + Err(async { *future.await.downcast().unwrap() }) } pub async fn scoped<'scope, F>(&self, scheduler: F) @@ -576,3 +579,68 @@ pub fn deterministic(seed: u64) -> (Rc, Arc) { Arc::new(Background::Deterministic(executor)), ) } + +impl Task { + fn local(any_task: AnyLocalTask) -> Self { + Self::Local { + any_task, + result_type: PhantomData, + } + } + + pub fn detach(self) { + match self { + Task::Local { any_task, .. } => any_task.detach(), + Task::Send { any_task, .. } => any_task.detach(), + } + } +} + +impl Task { + fn send(any_task: AnyTask) -> Self { + Self::Send { + any_task, + result_type: PhantomData, + } + } +} + +impl fmt::Debug for Task { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Task::Local { any_task, .. } => any_task.fmt(f), + Task::Send { any_task, .. } => any_task.fmt(f), + } + } +} + +impl Future for Task { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match unsafe { self.get_unchecked_mut() } { + Task::Local { any_task, .. } => { + any_task.poll(cx).map(|value| *value.downcast().unwrap()) + } + Task::Send { any_task, .. } => { + any_task.poll(cx).map(|value| *value.downcast().unwrap()) + } + } + } +} + +fn any_future(future: F) -> AnyFuture +where + T: 'static + Send, + F: Future + Send + 'static, +{ + async { Box::new(future.await) as Box }.boxed() +} + +fn any_local_future(future: F) -> AnyLocalFuture +where + T: 'static, + F: Future + 'static, +{ + async { Box::new(future.await) as Box }.boxed_local() +}