diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index aa46d64fcb..60af7e0d0b 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -851,6 +851,7 @@ impl Client { }) .detach(); + let t0 = Instant::now(); let this = self.clone(); let cx = cx.clone(); cx.foreground() @@ -867,7 +868,12 @@ impl Client { } } Err(err) => { - log::error!("connection error: {:?}", err); + // TODO - remove. Make the test's non-determinism more apparent by + // only sometimes formatting this stack trace. + if Instant::now().duration_since(t0).as_nanos() % 2 == 0 { + log::error!("connection error: {:?}", err); + } + this.set_status(Status::ConnectionLost, &cx); } } diff --git a/crates/collab/src/executor.rs b/crates/collab/src/executor.rs index d2253f8ccb..cd3cc60d4a 100644 --- a/crates/collab/src/executor.rs +++ b/crates/collab/src/executor.rs @@ -33,4 +33,12 @@ impl Executor { } } } + + pub fn record_backtrace(&self) { + match self { + Executor::Production => {} + #[cfg(test)] + Executor::Deterministic(background) => background.record_backtrace(), + } + } } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 9014ef7f40..bba07d34ef 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -95,6 +95,7 @@ struct Session { peer: Arc, connection_pool: Arc>, live_kit_client: Option>, + executor: Executor, } impl Session { @@ -521,7 +522,8 @@ impl Server { db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))), peer: this.peer.clone(), connection_pool: this.connection_pool.clone(), - live_kit_client: this.app_state.live_kit_client.clone() + live_kit_client: this.app_state.live_kit_client.clone(), + executor: executor.clone(), }; update_user_contacts(user_id, &session).await?; @@ -1515,6 +1517,7 @@ async fn update_language_server( request: proto::UpdateLanguageServer, session: Session, ) -> Result<()> { + session.executor.record_backtrace(); let project_id = ProjectId::from_proto(request.project_id); let project_connection_ids = session .db() @@ -1541,6 +1544,7 @@ async fn forward_project_request( where T: EntityMessage + RequestMessage, { + session.executor.record_backtrace(); let project_id = ProjectId::from_proto(request.remote_entity_id()); let host_connection_id = { let collaborators = session @@ -1609,6 +1613,7 @@ async fn create_buffer_for_peer( request: proto::CreateBufferForPeer, session: Session, ) -> Result<()> { + session.executor.record_backtrace(); let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?; session .peer @@ -1621,6 +1626,7 @@ async fn update_buffer( response: Response, session: Session, ) -> Result<()> { + session.executor.record_backtrace(); let project_id = ProjectId::from_proto(request.project_id); let project_connection_ids = session .db() @@ -1628,6 +1634,8 @@ async fn update_buffer( .project_connection_ids(project_id, session.connection_id) .await?; + session.executor.record_backtrace(); + broadcast( session.connection_id, project_connection_ids.iter().copied(), diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs index 1deaafcba2..6b6166ec08 100644 --- a/crates/collab/src/tests/randomized_integration_tests.rs +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -17,7 +17,7 @@ use project::{search::SearchQuery, Project}; use rand::prelude::*; use std::{env, path::PathBuf, sync::Arc}; -#[gpui::test(iterations = 100)] +#[gpui::test(iterations = 100, detect_nondeterminism = true)] async fn test_random_collaboration( cx: &mut TestAppContext, deterministic: Arc, diff --git a/crates/gpui/src/executor.rs b/crates/gpui/src/executor.rs index 876e48351d..a78a8c4b2e 100644 --- a/crates/gpui/src/executor.rs +++ b/crates/gpui/src/executor.rs @@ -4,7 +4,7 @@ use futures::channel::mpsc; use smol::{channel, prelude::*, Executor}; use std::{ any::Any, - fmt::{self, Display}, + fmt::{self, Display, Write as _}, marker::PhantomData, mem, pin::Pin, @@ -17,7 +17,8 @@ use std::{ use crate::{ platform::{self, Dispatcher}, - util, MutableAppContext, + util::{self, CwdBacktrace}, + MutableAppContext, }; pub enum Foreground { @@ -74,11 +75,18 @@ struct DeterministicState { pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>, waiting_backtrace: Option, next_runnable_id: usize, - poll_history: Vec, + poll_history: Vec, + previous_poll_history: Option>, enable_runnable_backtraces: bool, runnable_backtraces: collections::HashMap, } +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum ExecutorEvent { + PollRunnable { id: usize }, + EnqueuRunnable { id: usize }, +} + #[cfg(any(test, feature = "test-support"))] struct ForegroundRunnable { id: usize, @@ -130,6 +138,7 @@ impl Deterministic { waiting_backtrace: None, next_runnable_id: 0, poll_history: Default::default(), + previous_poll_history: Default::default(), enable_runnable_backtraces: false, runnable_backtraces: Default::default(), })), @@ -137,10 +146,14 @@ impl Deterministic { }) } - pub fn runnable_history(&self) -> Vec { + pub fn execution_history(&self) -> Vec { self.state.lock().poll_history.clone() } + pub fn set_previous_execution_history(&self, history: Option>) { + self.state.lock().previous_poll_history = history; + } + pub fn enable_runnable_backtrace(&self) { self.state.lock().enable_runnable_backtraces = true; } @@ -185,6 +198,9 @@ impl Deterministic { let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn_local(future, move |runnable| { let mut state = state.lock(); + state + .poll_history + .push(ExecutorEvent::EnqueuRunnable { id }); state .scheduled_from_foreground .entry(cx_id) @@ -212,6 +228,9 @@ impl Deterministic { let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn(future, move |runnable| { let mut state = state.lock(); + state + .poll_history + .push(ExecutorEvent::EnqueuRunnable { id }); state .scheduled_from_background .push(BackgroundRunnable { id, runnable }); @@ -314,7 +333,9 @@ impl Deterministic { let background_len = state.scheduled_from_background.len(); let ix = state.rng.gen_range(0..background_len); let background_runnable = state.scheduled_from_background.remove(ix); - state.poll_history.push(background_runnable.id); + state.push_to_history(ExecutorEvent::PollRunnable { + id: background_runnable.id, + }); drop(state); background_runnable.runnable.run(); } else if !state.scheduled_from_foreground.is_empty() { @@ -332,7 +353,9 @@ impl Deterministic { if scheduled_from_cx.is_empty() { state.scheduled_from_foreground.remove(&cx_id_to_run); } - state.poll_history.push(foreground_runnable.id); + state.push_to_history(ExecutorEvent::PollRunnable { + id: foreground_runnable.id, + }); drop(state); @@ -366,7 +389,9 @@ impl Deterministic { let ix = state.rng.gen_range(0..=runnable_count); if ix < state.scheduled_from_background.len() { let background_runnable = state.scheduled_from_background.remove(ix); - state.poll_history.push(background_runnable.id); + state.push_to_history(ExecutorEvent::PollRunnable { + id: background_runnable.id, + }); drop(state); background_runnable.runnable.run(); } else { @@ -465,6 +490,25 @@ impl Deterministic { } } } + + pub fn record_backtrace(&self) { + let mut state = self.state.lock(); + if state.enable_runnable_backtraces { + let current_id = state + .poll_history + .iter() + .rev() + .find_map(|event| match event { + ExecutorEvent::PollRunnable { id } => Some(*id), + _ => None, + }); + if let Some(id) = current_id { + state + .runnable_backtraces + .insert(id, backtrace::Backtrace::new_unresolved()); + } + } + } } impl Drop for Timer { @@ -506,6 +550,38 @@ impl Future for Timer { #[cfg(any(test, feature = "test-support"))] impl DeterministicState { + fn push_to_history(&mut self, event: ExecutorEvent) { + self.poll_history.push(event); + if let Some(prev_history) = &self.previous_poll_history { + let ix = self.poll_history.len() - 1; + let prev_event = prev_history[ix]; + if event != prev_event { + let mut message = String::new(); + writeln!( + &mut message, + "current runnable backtrace:\n{:?}", + self.runnable_backtraces.get_mut(&event.id()).map(|trace| { + trace.resolve(); + CwdBacktrace(trace) + }) + ) + .unwrap(); + writeln!( + &mut message, + "previous runnable backtrace:\n{:?}", + self.runnable_backtraces + .get_mut(&prev_event.id()) + .map(|trace| { + trace.resolve(); + CwdBacktrace(trace) + }) + ) + .unwrap(); + panic!("detected non-determinism after {ix}. {message}"); + } + } + } + fn will_park(&mut self) { if self.forbid_parking { let mut backtrace_message = String::new(); @@ -526,6 +602,16 @@ impl DeterministicState { } } +#[cfg(any(test, feature = "test-support"))] +impl ExecutorEvent { + pub fn id(&self) -> usize { + match self { + ExecutorEvent::PollRunnable { id } => *id, + ExecutorEvent::EnqueuRunnable { id } => *id, + } + } +} + impl Foreground { pub fn platform(dispatcher: Arc) -> Result { if dispatcher.is_main_thread() { @@ -755,6 +841,16 @@ impl Background { } } } + + #[cfg(any(test, feature = "test-support"))] + pub fn record_backtrace(&self) { + match self { + Self::Deterministic { executor, .. } => executor.record_backtrace(), + _ => { + panic!("this method can only be called on a deterministic executor") + } + } + } } impl Default for Background { diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index aade1054a8..eb992b638a 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -1,7 +1,10 @@ use crate::{ - elements::Empty, executor, platform, util::CwdBacktrace, Element, ElementBox, Entity, - FontCache, Handle, LeakDetector, MutableAppContext, Platform, RenderContext, Subscription, - TestAppContext, View, + elements::Empty, + executor::{self, ExecutorEvent}, + platform, + util::CwdBacktrace, + Element, ElementBox, Entity, FontCache, Handle, LeakDetector, MutableAppContext, Platform, + RenderContext, Subscription, TestAppContext, View, }; use futures::StreamExt; use parking_lot::Mutex; @@ -62,7 +65,7 @@ pub fn run_test( let platform = Arc::new(platform::test::platform()); let font_system = platform.fonts(); let font_cache = Arc::new(FontCache::new(font_system)); - let mut prev_runnable_history: Option> = None; + let mut prev_runnable_history: Option> = None; for _ in 0..num_iterations { let seed = atomic_seed.load(SeqCst); @@ -73,6 +76,7 @@ pub fn run_test( let deterministic = executor::Deterministic::new(seed); if detect_nondeterminism { + deterministic.set_previous_execution_history(prev_runnable_history.clone()); deterministic.enable_runnable_backtrace(); } @@ -98,7 +102,7 @@ pub fn run_test( leak_detector.lock().detect(); if detect_nondeterminism { - let curr_runnable_history = deterministic.runnable_history(); + let curr_runnable_history = deterministic.execution_history(); if let Some(prev_runnable_history) = prev_runnable_history { let mut prev_entries = prev_runnable_history.iter().fuse(); let mut curr_entries = curr_runnable_history.iter().fuse(); @@ -138,7 +142,7 @@ pub fn run_test( let last_common_backtrace = common_history_prefix .last() - .map(|runnable_id| deterministic.runnable_backtrace(*runnable_id)); + .map(|event| deterministic.runnable_backtrace(event.id())); writeln!( &mut error,