From 75ee3edd2ef664476e64edfd396ea0c6c80bc3ff Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Sun, 31 Oct 2021 07:21:07 -0400 Subject: [PATCH] introduce Cycle type and use in recovery, errors The Cycle type gives more structured information and ensures deterministic ordering of participants within any particular execution. --- components/salsa-macros/src/query_group.rs | 4 +- src/derived/slot.rs | 9 +++- src/lib.rs | 44 +++++++++++++------ src/plumbing.rs | 20 ++------- src/runtime.rs | 27 ++++++------ src/runtime/local_state.rs | 9 ++-- tests/cycles.rs | 49 ++++++++++++++++------ tests/parallel/cycles.rs | 24 +++++++++-- 8 files changed, 120 insertions(+), 66 deletions(-) diff --git a/components/salsa-macros/src/query_group.rs b/components/salsa-macros/src/query_group.rs index e66fce15..40f22c92 100644 --- a/components/salsa-macros/src/query_group.rs +++ b/components/salsa-macros/src/query_group.rs @@ -485,11 +485,11 @@ pub(crate) fn query_group(args: TokenStream, input: TokenStream) -> TokenStream quote! { const CYCLE_STRATEGY: salsa::plumbing::CycleRecoveryStrategy = salsa::plumbing::CycleRecoveryStrategy::Fallback; - fn cycle_fallback(db: &>::DynDb, cycle: &[salsa::DatabaseKeyIndex], #key_pattern: &::Key) + fn cycle_fallback(db: &>::DynDb, cycle: &salsa::Cycle, #key_pattern: &::Key) -> ::Value { #cycle_recovery_fn( db, - &cycle.iter().map(|k| format!("{:?}", k.debug(db))).collect::>(), + cycle, #(#key_names),* ) } diff --git a/src/derived/slot.rs b/src/derived/slot.rs index d91fe34d..d14ee135 100644 --- a/src/derived/slot.rs +++ b/src/derived/slot.rs @@ -251,7 +251,12 @@ where // stale, or value is absent. Let's execute! let mut result = active_query.pop_and_execute(db, || Q::execute(db, self.key.clone())); - if let Some(cycle) = &result.cycle { + // Subtle: if we were a participant in a cycle, and we have "fallback" cycle recovery, + // then we need to overwrite the returned value with the fallback value so that our callers + // do not observe the actual value we returned (which is not valid). It's important that + // we ignore the actual value that was returned because otherwise it is easy to have + // "recovery" where the final value is dependent on which node started the cycle. + if let Some(cycle) = &result.cycle_participant { result.value = Q::cycle_fallback(db, cycle, &self.key); } @@ -379,7 +384,7 @@ where cycle_error, }) => ProbeState::UpToDate(match recovery_strategy { CycleRecoveryStrategy::Panic => { - Cancelled::UnexpectedCycle(cycle_error.into_unexpected_cycle()).throw() + Cancelled::UnexpectedCycle(cycle_error.cycle).throw() } CycleRecoveryStrategy::Fallback => StampedValue { value: Q::cycle_fallback(db, &cycle_error.cycle, &self.key), diff --git a/src/lib.rs b/src/lib.rs index 6629ae1c..7742b41b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -615,7 +615,7 @@ pub enum Cancelled { /// The query encountered an "unexpected" cycle, meaning one in which some /// participants lacked cycle recovery annotations. - UnexpectedCycle(UnexpectedCycle), + UnexpectedCycle(Cycle), } impl Cancelled { @@ -657,16 +657,32 @@ impl std::error::Error for Cancelled {} /// Information about an "unexpected" cycle, meaning one where some of the /// participants lacked cycle recovery annotations. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct UnexpectedCycle { +pub struct Cycle { participants: plumbing::CycleParticipants, } -impl UnexpectedCycle { +impl Cycle { + pub(crate) fn new(participants: plumbing::CycleParticipants) -> Self { + Self { participants } + } + + /// Iterate over the [`DatabaseKeyIndex`] for each query participating + /// in the cycle. The start point of this iteration within the cycle + /// is arbitrary but deterministic, but the ordering is otherwise determined + /// by the execution. + pub fn participant_keys(&self) -> impl Iterator + '_ { + let min = self.participants.iter().min().unwrap(); + let index = self.participants.iter().position(|p| p == min).unwrap(); + self.participants[index..] + .iter() + .chain(self.participants[..index].iter()) + .copied() + } + /// Returns a vector with the debug information for /// all the participants in the cycle. - pub fn all_participants(&self, db: &dyn Database) -> Vec { - self.participants - .iter() + pub fn all_participants(&self, db: &DB) -> Vec { + self.participant_keys() .map(|d| format!("{:?}", d.debug(db))) .collect() } @@ -674,18 +690,17 @@ impl UnexpectedCycle { /// Returns a vector with the debug information for /// those participants in the cycle that lacked recovery /// information. - pub fn unexpected_participants(&self, db: &dyn Database) -> Vec { - self.participants - .iter() - .filter(|&&d| db.cycle_recovery_strategy(d) == CycleRecoveryStrategy::Panic) + pub fn unexpected_participants(&self, db: &DB) -> Vec { + self.participant_keys() + .filter(|&d| db.cycle_recovery_strategy(d) == CycleRecoveryStrategy::Panic) .map(|d| format!("{:?}", d.debug(db))) .collect() } /// Returns a "debug" view onto this strict that can be used to print out information. - pub fn debug<'me>(&'me self, db: &'me dyn Database) -> impl std::fmt::Debug + 'me { + pub fn debug<'me, DB: ?Sized + Database>(&'me self, db: &'me DB) -> impl std::fmt::Debug + 'me { struct UnexpectedCycleDebug<'me> { - c: &'me UnexpectedCycle, + c: &'me Cycle, db: &'me dyn Database, } @@ -701,7 +716,10 @@ impl UnexpectedCycle { } } - UnexpectedCycleDebug { c: self, db } + UnexpectedCycleDebug { + c: self, + db: db.ops_database(), + } } } diff --git a/src/plumbing.rs b/src/plumbing.rs index 52324e41..5c3b4895 100644 --- a/src/plumbing.rs +++ b/src/plumbing.rs @@ -2,11 +2,11 @@ use crate::debug::TableEntry; use crate::durability::Durability; +use crate::Cycle; use crate::Database; use crate::Query; use crate::QueryTable; use crate::QueryTableMut; -use crate::UnexpectedCycle; use std::borrow::Borrow; use std::fmt::Debug; use std::hash::Hash; @@ -80,7 +80,7 @@ pub trait QueryFunction: Query { fn cycle_fallback( db: &>::DynDb, - cycle: &[DatabaseKeyIndex], + cycle: &Cycle, key: &Self::Key, ) -> Self::Value { let _ = (db, cycle, key); @@ -240,26 +240,14 @@ pub type CycleParticipants = Arc>; #[derive(Eq, PartialEq, Clone, Debug)] pub(crate) struct CycleError { /// The queries that were part of the cycle - pub(crate) cycle: CycleParticipants, + pub(crate) cycle: Cycle, pub(crate) changed_at: Revision, pub(crate) durability: Durability, // pub(crate) recovery_strategy: CycleRecoveryStrategy, } -impl CycleError { - pub(crate) fn into_unexpected_cycle(self) -> UnexpectedCycle { - UnexpectedCycle { - participants: self.cycle, - } - } -} - impl std::fmt::Display for CycleError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "Internal error, cycle detected:\n")?; - for i in &*self.cycle { - writeln!(f, "{:?}", i)?; - } - Ok(()) + writeln!(f, "Internal error, cycle detected: {:?}", self.cycle) } } diff --git a/src/runtime.rs b/src/runtime.rs index acf9d795..64308456 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,7 +1,7 @@ use crate::durability::Durability; -use crate::plumbing::{CycleDetected, CycleError, CycleParticipants, CycleRecoveryStrategy}; +use crate::plumbing::{CycleDetected, CycleError, CycleRecoveryStrategy}; use crate::revision::{AtomicRevision, Revision}; -use crate::{Cancelled, Database, DatabaseKeyIndex, Event, EventKind}; +use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind}; use log::debug; use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive}; use parking_lot::{Mutex, MutexGuard, RwLock}; @@ -271,7 +271,7 @@ impl Runtime { let durability = from_stack.last().unwrap().durability; // Identify the cycle participants: - let cycle_participants = { + let cycle = { let mut v = vec![]; dg.for_each_cycle_participant( from_id, @@ -280,12 +280,12 @@ impl Runtime { to_id, |aq| v.push(aq.database_key_index), ); - Arc::new(v) + Cycle::new(Arc::new(v)) }; - debug!("cycle participants {:?}", cycle_participants); + debug!("cycle {:?}", cycle.debug(db)); // Identify cycle recovery strategy: - let recovery_strategy = self.mutual_cycle_recovery_strategy(db, &cycle_participants); + let recovery_strategy = self.mutual_cycle_recovery_strategy(db, &cycle); debug!("cycle recovery strategy {:?}", recovery_strategy); // If using fallback, we have to mark the cycle participants, so they know to recover. @@ -299,7 +299,7 @@ impl Runtime { database_key_index, to_id, |aq| { - aq.cycle = Some(cycle_participants.clone()); + aq.cycle = Some(cycle.clone()); }, ); } @@ -310,7 +310,7 @@ impl Runtime { CycleDetected { recovery_strategy, cycle_error: CycleError { - cycle: cycle_participants, + cycle, changed_at, durability, }, @@ -320,16 +320,17 @@ impl Runtime { fn mutual_cycle_recovery_strategy( &self, db: &dyn Database, - cycle: &[DatabaseKeyIndex], + cycle: &Cycle, ) -> CycleRecoveryStrategy { - let crs = db.cycle_recovery_strategy(cycle[0]); - if let Some(key) = cycle[1..] + let participants = &cycle.participants; + let crs = db.cycle_recovery_strategy(participants[0]); + if let Some(key) = participants[1..] .iter() .copied() .find(|&key| db.cycle_recovery_strategy(key) != crs) { debug!("mutual_cycle_recovery_strategy: cycle had multiple strategies ({:?} for {:?} vs {:?} for {:?})", - crs, cycle[0], + crs, participants[0], db.cycle_recovery_strategy(key), key ); CycleRecoveryStrategy::Panic @@ -484,7 +485,7 @@ struct ActiveQuery { dependencies: Option>, /// Stores the entire cycle, if one is found and this query is part of it. - cycle: Option, + cycle: Option, } impl ActiveQuery { diff --git a/src/runtime/local_state.rs b/src/runtime/local_state.rs index 439813b6..6759c641 100644 --- a/src/runtime/local_state.rs +++ b/src/runtime/local_state.rs @@ -1,7 +1,7 @@ use crate::durability::Durability; -use crate::plumbing::CycleParticipants; use crate::runtime::ActiveQuery; use crate::runtime::Revision; +use crate::Cycle; use crate::Database; use crate::DatabaseKeyIndex; use crate::Event; @@ -42,8 +42,9 @@ pub(crate) struct ComputedQueryResult { /// there was an untracked the read. pub(crate) dependencies: Option>, - /// The cycle if one occured while computing this value - pub(crate) cycle: Option, + /// If this node participated in a cycle, then this value is set + /// to the cycle in which it participated. + pub(crate) cycle_participant: Option, } impl Default for LocalState { @@ -211,7 +212,7 @@ impl ActiveQueryGuard<'_> { durability, changed_at, dependencies, - cycle, + cycle_participant: cycle, } } } diff --git a/tests/cycles.rs b/tests/cycles.rs index 31d5c18b..1fa4e64d 100644 --- a/tests/cycles.rs +++ b/tests/cycles.rs @@ -117,15 +117,15 @@ trait Database: salsa::Database { fn cycle_c(&self) -> Result<(), Error>; } -fn recover_a(_db: &dyn Database, cycle: &[String]) -> Result<(), Error> { +fn recover_a(db: &dyn Database, cycle: &salsa::Cycle) -> Result<(), Error> { Err(Error { - cycle: cycle.to_owned(), + cycle: cycle.all_participants(db), }) } -fn recover_b(_db: &dyn Database, cycle: &[String]) -> Result<(), Error> { +fn recover_b(db: &dyn Database, cycle: &salsa::Cycle) -> Result<(), Error> { Err(Error { - cycle: cycle.to_owned(), + cycle: cycle.all_participants(db), }) } @@ -218,14 +218,12 @@ fn inner_cycle() { let err = query.cycle_c(); assert!(err.is_err()); let cycle = err.unwrap_err().cycle; - assert!( - cycle - .iter() - .zip(&["cycle_b", "cycle_a"]) - .all(|(l, r)| l.contains(r)), - "{:#?}", - cycle - ); + insta::assert_debug_snapshot!(cycle, @r###" + [ + "cycle_a(())", + "cycle_b(())", + ] + "###); } #[test] @@ -309,3 +307,30 @@ fn cycle_mixed_2() { v => panic!("unexpected result: {:?}", v), } } + +#[test] +fn cycle_deterministic_order() { + // No matter whether we start from A or B, we get the same set of participants: + let a = DatabaseImpl::default().cycle_a(); + let b = DatabaseImpl::default().cycle_b(); + insta::assert_debug_snapshot!((a, b), @r###" + ( + Err( + Error { + cycle: [ + "cycle_a(())", + "cycle_b(())", + ], + }, + ), + Err( + Error { + cycle: [ + "cycle_a(())", + "cycle_b(())", + ], + }, + ), + ) + "###); +} diff --git a/tests/parallel/cycles.rs b/tests/parallel/cycles.rs index 686d0a10..e706d8bd 100644 --- a/tests/parallel/cycles.rs +++ b/tests/parallel/cycles.rs @@ -28,22 +28,38 @@ use test_env_log::test; // a2 sees cycle, recovers // a1 completes, recovers -pub(crate) fn recover_from_cycle_a1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { +pub(crate) fn recover_from_cycle_a1( + _db: &dyn ParDatabase, + _cycle: &salsa::Cycle, + key: &i32, +) -> i32 { log::debug!("recover_from_cycle_a1"); key * 10 + 1 } -pub(crate) fn recover_from_cycle_a2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { +pub(crate) fn recover_from_cycle_a2( + _db: &dyn ParDatabase, + _cycle: &salsa::Cycle, + key: &i32, +) -> i32 { log::debug!("recover_from_cycle_a2"); key * 10 + 2 } -pub(crate) fn recover_from_cycle_b1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { +pub(crate) fn recover_from_cycle_b1( + _db: &dyn ParDatabase, + _cycle: &salsa::Cycle, + key: &i32, +) -> i32 { log::debug!("recover_from_cycle_b1"); key * 20 + 1 } -pub(crate) fn recover_from_cycle_b2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { +pub(crate) fn recover_from_cycle_b2( + _db: &dyn ParDatabase, + _cycle: &salsa::Cycle, + key: &i32, +) -> i32 { log::debug!("recover_from_cycle_b2"); key * 20 + 2 }