diff --git a/src/derived/slot.rs b/src/derived/slot.rs index 5e583118..acd2f00c 100644 --- a/src/derived/slot.rs +++ b/src/derived/slot.rs @@ -378,21 +378,17 @@ where return match self.block_on_in_progress_thread(db, runtime, other_id, state) { Ok(WaitResult::Panicked) => Cancelled::throw(), Ok(WaitResult::Completed) => ProbeState::Retry, - Err(err) => ProbeState::UpToDate( - match runtime.report_unexpected_cycle( - db.ops_database(), - self.database_key_index, - err, - revision_now, - ) { - (CycleRecoveryStrategy::Panic, err) => Err(err), - (CycleRecoveryStrategy::Fallback, err) => Ok(StampedValue { - value: Q::cycle_fallback(db, &err.cycle, &self.key), - changed_at: err.changed_at, - durability: err.durability, - }), - }, - ), + Err(CycleDetected { + recovery_strategy, + cycle_error, + }) => ProbeState::UpToDate(match recovery_strategy { + CycleRecoveryStrategy::Panic => Err(cycle_error), + CycleRecoveryStrategy::Fallback => Ok(StampedValue { + value: Q::cycle_fallback(db, &cycle_error.cycle, &self.key), + changed_at: cycle_error.changed_at, + durability: cycle_error.durability, + }), + }), }; } diff --git a/src/plumbing.rs b/src/plumbing.rs index 8a4acbdd..458f5a70 100644 --- a/src/plumbing.rs +++ b/src/plumbing.rs @@ -6,7 +6,6 @@ use crate::Database; use crate::Query; use crate::QueryTable; use crate::QueryTableMut; -use crate::RuntimeId; use std::borrow::Borrow; use std::fmt::Debug; use std::hash::Hash; @@ -21,8 +20,8 @@ pub use crate::{revision::Revision, DatabaseKeyIndex, QueryDb, Runtime}; #[derive(Clone, Debug)] pub struct CycleDetected { - pub(crate) from: RuntimeId, - pub(crate) to: RuntimeId, + pub(crate) recovery_strategy: CycleRecoveryStrategy, + pub(crate) cycle_error: crate::CycleError, } /// Defines various associated types. An impl of this diff --git a/src/runtime.rs b/src/runtime.rs index 4846e477..72b49774 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -4,7 +4,7 @@ use crate::revision::{AtomicRevision, Revision}; use crate::{Cancelled, Database, DatabaseKeyIndex, Event, EventKind}; use log::debug; use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::{Mutex, MutexGuard, RwLock}; use rustc_hash::FxHasher; use std::hash::{BuildHasherDefault, Hash}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -252,44 +252,69 @@ impl Runtime { .report_synthetic_read(durability, changed_at); } - /// Obviously, this should be user configurable at some point. - pub(crate) fn report_unexpected_cycle( + fn create_cycle_error( &self, db: &dyn Database, + mut dg: MutexGuard<'_, DependencyGraph>, database_key_index: DatabaseKeyIndex, - error: CycleDetected, - changed_at: Revision, - ) -> (CycleRecoveryStrategy, crate::CycleError) { - debug!( - "report_unexpected_cycle(database_key={:?})", - database_key_index - ); + to_id: RuntimeId, + ) -> CycleDetected { + debug!("create_cycle_error(database_key={:?})", database_key_index); - let mut cycle_participants = vec![]; - let mut stack = self.local_state.take_query_stack(); - let mut dg = self.shared_state.dependency_graph.lock(); - dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| { - cycle_participants.push(aq.database_key_index); - }); - let cycle_participants = Arc::new(cycle_participants); - dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| { - aq.cycle = Some(cycle_participants.clone()); - }); - self.local_state.restore_query_stack(stack); - let crs = self.mutual_cycle_recovery_strategy(db, &cycle_participants); - debug!( - "cycle recovery strategy {:?} for participants {:?}", - crs, cycle_participants - ); + let mut from_stack = self.local_state.take_query_stack(); + let from_id = self.id(); - ( - crs, - CycleError { + // Extract the changed_at and durability values from the top of the stack; + // these reflect the queries which were executed thus far and which + // led to the cycle. + let changed_at = from_stack.last().unwrap().changed_at; + let durability = from_stack.last().unwrap().durability; + + // Identify the cycle participants: + let cycle_participants = { + let mut v = vec![]; + dg.for_each_cycle_participant( + from_id, + &mut from_stack, + database_key_index, + to_id, + |aq| v.push(aq.database_key_index), + ); + Arc::new(v) + }; + debug!("cycle participants {:?}", cycle_participants); + + // Identify cycle recovery strategy: + let recovery_strategy = self.mutual_cycle_recovery_strategy(db, &cycle_participants); + debug!("cycle recovery strategy {:?}", recovery_strategy); + + // If using fallback, we have to mark the cycle participants, so they know to recover. + match recovery_strategy { + CycleRecoveryStrategy::Panic => {} + CycleRecoveryStrategy::Fallback => { + // Mark the cycle participants, so they know to recover: + dg.for_each_cycle_participant( + from_id, + &mut from_stack, + database_key_index, + to_id, + |aq| { + aq.cycle = Some(cycle_participants.clone()); + }, + ); + } + } + + self.local_state.restore_query_stack(from_stack); + + CycleDetected { + recovery_strategy, + cycle_error: CycleError { cycle: cycle_participants, changed_at, - durability: Durability::MAX, + durability, }, - ) + } } fn mutual_cycle_recovery_strategy( @@ -325,10 +350,7 @@ impl Runtime { let mut dg = self.shared_state.dependency_graph.lock(); if self.id() == other_id || dg.depends_on(other_id, self.id()) { - Err(CycleDetected { - from: self.id(), - to: other_id, - }) + Err(self.create_cycle_error(db, dg, database_key, other_id)) } else { db.salsa_event(Event { runtime_id: self.id(),