introduce Cycle type and use in recovery, errors

The Cycle type gives more structured information and ensures
deterministic ordering of participants within any particular
execution.
This commit is contained in:
Niko Matsakis 2021-10-31 07:21:07 -04:00
parent 3caf965533
commit 75ee3edd2e
8 changed files with 120 additions and 66 deletions

View file

@ -485,11 +485,11 @@ pub(crate) fn query_group(args: TokenStream, input: TokenStream) -> TokenStream
quote! {
const CYCLE_STRATEGY: salsa::plumbing::CycleRecoveryStrategy =
salsa::plumbing::CycleRecoveryStrategy::Fallback;
fn cycle_fallback(db: &<Self as salsa::QueryDb<'_>>::DynDb, cycle: &[salsa::DatabaseKeyIndex], #key_pattern: &<Self as salsa::Query>::Key)
fn cycle_fallback(db: &<Self as salsa::QueryDb<'_>>::DynDb, cycle: &salsa::Cycle, #key_pattern: &<Self as salsa::Query>::Key)
-> <Self as salsa::Query>::Value {
#cycle_recovery_fn(
db,
&cycle.iter().map(|k| format!("{:?}", k.debug(db))).collect::<Vec<String>>(),
cycle,
#(#key_names),*
)
}

View file

@ -251,7 +251,12 @@ where
// stale, or value is absent. Let's execute!
let mut result = active_query.pop_and_execute(db, || Q::execute(db, self.key.clone()));
if let Some(cycle) = &result.cycle {
// Subtle: if we were a participant in a cycle, and we have "fallback" cycle recovery,
// then we need to overwrite the returned value with the fallback value so that our callers
// do not observe the actual value we returned (which is not valid). It's important that
// we ignore the actual value that was returned because otherwise it is easy to have
// "recovery" where the final value is dependent on which node started the cycle.
if let Some(cycle) = &result.cycle_participant {
result.value = Q::cycle_fallback(db, cycle, &self.key);
}
@ -379,7 +384,7 @@ where
cycle_error,
}) => ProbeState::UpToDate(match recovery_strategy {
CycleRecoveryStrategy::Panic => {
Cancelled::UnexpectedCycle(cycle_error.into_unexpected_cycle()).throw()
Cancelled::UnexpectedCycle(cycle_error.cycle).throw()
}
CycleRecoveryStrategy::Fallback => StampedValue {
value: Q::cycle_fallback(db, &cycle_error.cycle, &self.key),

View file

@ -615,7 +615,7 @@ pub enum Cancelled {
/// The query encountered an "unexpected" cycle, meaning one in which some
/// participants lacked cycle recovery annotations.
UnexpectedCycle(UnexpectedCycle),
UnexpectedCycle(Cycle),
}
impl Cancelled {
@ -657,16 +657,32 @@ impl std::error::Error for Cancelled {}
/// Information about an "unexpected" cycle, meaning one where some of the
/// participants lacked cycle recovery annotations.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct UnexpectedCycle {
pub struct Cycle {
participants: plumbing::CycleParticipants,
}
impl UnexpectedCycle {
impl Cycle {
pub(crate) fn new(participants: plumbing::CycleParticipants) -> Self {
Self { participants }
}
/// Iterate over the [`DatabaseKeyIndex`] for each query participating
/// in the cycle. The start point of this iteration within the cycle
/// is arbitrary but deterministic, but the ordering is otherwise determined
/// by the execution.
pub fn participant_keys(&self) -> impl Iterator<Item = DatabaseKeyIndex> + '_ {
let min = self.participants.iter().min().unwrap();
let index = self.participants.iter().position(|p| p == min).unwrap();
self.participants[index..]
.iter()
.chain(self.participants[..index].iter())
.copied()
}
/// Returns a vector with the debug information for
/// all the participants in the cycle.
pub fn all_participants(&self, db: &dyn Database) -> Vec<String> {
self.participants
.iter()
pub fn all_participants<DB: ?Sized + Database>(&self, db: &DB) -> Vec<String> {
self.participant_keys()
.map(|d| format!("{:?}", d.debug(db)))
.collect()
}
@ -674,18 +690,17 @@ impl UnexpectedCycle {
/// Returns a vector with the debug information for
/// those participants in the cycle that lacked recovery
/// information.
pub fn unexpected_participants(&self, db: &dyn Database) -> Vec<String> {
self.participants
.iter()
.filter(|&&d| db.cycle_recovery_strategy(d) == CycleRecoveryStrategy::Panic)
pub fn unexpected_participants<DB: ?Sized + Database>(&self, db: &DB) -> Vec<String> {
self.participant_keys()
.filter(|&d| db.cycle_recovery_strategy(d) == CycleRecoveryStrategy::Panic)
.map(|d| format!("{:?}", d.debug(db)))
.collect()
}
/// Returns a "debug" view onto this strict that can be used to print out information.
pub fn debug<'me>(&'me self, db: &'me dyn Database) -> impl std::fmt::Debug + 'me {
pub fn debug<'me, DB: ?Sized + Database>(&'me self, db: &'me DB) -> impl std::fmt::Debug + 'me {
struct UnexpectedCycleDebug<'me> {
c: &'me UnexpectedCycle,
c: &'me Cycle,
db: &'me dyn Database,
}
@ -701,7 +716,10 @@ impl UnexpectedCycle {
}
}
UnexpectedCycleDebug { c: self, db }
UnexpectedCycleDebug {
c: self,
db: db.ops_database(),
}
}
}

View file

@ -2,11 +2,11 @@
use crate::debug::TableEntry;
use crate::durability::Durability;
use crate::Cycle;
use crate::Database;
use crate::Query;
use crate::QueryTable;
use crate::QueryTableMut;
use crate::UnexpectedCycle;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::hash::Hash;
@ -80,7 +80,7 @@ pub trait QueryFunction: Query {
fn cycle_fallback(
db: &<Self as QueryDb<'_>>::DynDb,
cycle: &[DatabaseKeyIndex],
cycle: &Cycle,
key: &Self::Key,
) -> Self::Value {
let _ = (db, cycle, key);
@ -240,26 +240,14 @@ pub type CycleParticipants = Arc<Vec<DatabaseKeyIndex>>;
#[derive(Eq, PartialEq, Clone, Debug)]
pub(crate) struct CycleError {
/// The queries that were part of the cycle
pub(crate) cycle: CycleParticipants,
pub(crate) cycle: Cycle,
pub(crate) changed_at: Revision,
pub(crate) durability: Durability,
// pub(crate) recovery_strategy: CycleRecoveryStrategy,
}
impl CycleError {
pub(crate) fn into_unexpected_cycle(self) -> UnexpectedCycle {
UnexpectedCycle {
participants: self.cycle,
}
}
}
impl std::fmt::Display for CycleError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "Internal error, cycle detected:\n")?;
for i in &*self.cycle {
writeln!(f, "{:?}", i)?;
}
Ok(())
writeln!(f, "Internal error, cycle detected: {:?}", self.cycle)
}
}

View file

@ -1,7 +1,7 @@
use crate::durability::Durability;
use crate::plumbing::{CycleDetected, CycleError, CycleParticipants, CycleRecoveryStrategy};
use crate::plumbing::{CycleDetected, CycleError, CycleRecoveryStrategy};
use crate::revision::{AtomicRevision, Revision};
use crate::{Cancelled, Database, DatabaseKeyIndex, Event, EventKind};
use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind};
use log::debug;
use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
use parking_lot::{Mutex, MutexGuard, RwLock};
@ -271,7 +271,7 @@ impl Runtime {
let durability = from_stack.last().unwrap().durability;
// Identify the cycle participants:
let cycle_participants = {
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
@ -280,12 +280,12 @@ impl Runtime {
to_id,
|aq| v.push(aq.database_key_index),
);
Arc::new(v)
Cycle::new(Arc::new(v))
};
debug!("cycle participants {:?}", cycle_participants);
debug!("cycle {:?}", cycle.debug(db));
// Identify cycle recovery strategy:
let recovery_strategy = self.mutual_cycle_recovery_strategy(db, &cycle_participants);
let recovery_strategy = self.mutual_cycle_recovery_strategy(db, &cycle);
debug!("cycle recovery strategy {:?}", recovery_strategy);
// If using fallback, we have to mark the cycle participants, so they know to recover.
@ -299,7 +299,7 @@ impl Runtime {
database_key_index,
to_id,
|aq| {
aq.cycle = Some(cycle_participants.clone());
aq.cycle = Some(cycle.clone());
},
);
}
@ -310,7 +310,7 @@ impl Runtime {
CycleDetected {
recovery_strategy,
cycle_error: CycleError {
cycle: cycle_participants,
cycle,
changed_at,
durability,
},
@ -320,16 +320,17 @@ impl Runtime {
fn mutual_cycle_recovery_strategy(
&self,
db: &dyn Database,
cycle: &[DatabaseKeyIndex],
cycle: &Cycle,
) -> CycleRecoveryStrategy {
let crs = db.cycle_recovery_strategy(cycle[0]);
if let Some(key) = cycle[1..]
let participants = &cycle.participants;
let crs = db.cycle_recovery_strategy(participants[0]);
if let Some(key) = participants[1..]
.iter()
.copied()
.find(|&key| db.cycle_recovery_strategy(key) != crs)
{
debug!("mutual_cycle_recovery_strategy: cycle had multiple strategies ({:?} for {:?} vs {:?} for {:?})",
crs, cycle[0],
crs, participants[0],
db.cycle_recovery_strategy(key), key
);
CycleRecoveryStrategy::Panic
@ -484,7 +485,7 @@ struct ActiveQuery {
dependencies: Option<FxIndexSet<DatabaseKeyIndex>>,
/// Stores the entire cycle, if one is found and this query is part of it.
cycle: Option<CycleParticipants>,
cycle: Option<Cycle>,
}
impl ActiveQuery {

View file

@ -1,7 +1,7 @@
use crate::durability::Durability;
use crate::plumbing::CycleParticipants;
use crate::runtime::ActiveQuery;
use crate::runtime::Revision;
use crate::Cycle;
use crate::Database;
use crate::DatabaseKeyIndex;
use crate::Event;
@ -42,8 +42,9 @@ pub(crate) struct ComputedQueryResult<V> {
/// there was an untracked the read.
pub(crate) dependencies: Option<FxIndexSet<DatabaseKeyIndex>>,
/// The cycle if one occured while computing this value
pub(crate) cycle: Option<CycleParticipants>,
/// If this node participated in a cycle, then this value is set
/// to the cycle in which it participated.
pub(crate) cycle_participant: Option<Cycle>,
}
impl Default for LocalState {
@ -211,7 +212,7 @@ impl ActiveQueryGuard<'_> {
durability,
changed_at,
dependencies,
cycle,
cycle_participant: cycle,
}
}
}

View file

@ -117,15 +117,15 @@ trait Database: salsa::Database {
fn cycle_c(&self) -> Result<(), Error>;
}
fn recover_a(_db: &dyn Database, cycle: &[String]) -> Result<(), Error> {
fn recover_a(db: &dyn Database, cycle: &salsa::Cycle) -> Result<(), Error> {
Err(Error {
cycle: cycle.to_owned(),
cycle: cycle.all_participants(db),
})
}
fn recover_b(_db: &dyn Database, cycle: &[String]) -> Result<(), Error> {
fn recover_b(db: &dyn Database, cycle: &salsa::Cycle) -> Result<(), Error> {
Err(Error {
cycle: cycle.to_owned(),
cycle: cycle.all_participants(db),
})
}
@ -218,14 +218,12 @@ fn inner_cycle() {
let err = query.cycle_c();
assert!(err.is_err());
let cycle = err.unwrap_err().cycle;
assert!(
cycle
.iter()
.zip(&["cycle_b", "cycle_a"])
.all(|(l, r)| l.contains(r)),
"{:#?}",
cycle
);
insta::assert_debug_snapshot!(cycle, @r###"
[
"cycle_a(())",
"cycle_b(())",
]
"###);
}
#[test]
@ -309,3 +307,30 @@ fn cycle_mixed_2() {
v => panic!("unexpected result: {:?}", v),
}
}
#[test]
fn cycle_deterministic_order() {
// No matter whether we start from A or B, we get the same set of participants:
let a = DatabaseImpl::default().cycle_a();
let b = DatabaseImpl::default().cycle_b();
insta::assert_debug_snapshot!((a, b), @r###"
(
Err(
Error {
cycle: [
"cycle_a(())",
"cycle_b(())",
],
},
),
Err(
Error {
cycle: [
"cycle_a(())",
"cycle_b(())",
],
},
),
)
"###);
}

View file

@ -28,22 +28,38 @@ use test_env_log::test;
// a2 sees cycle, recovers
// a1 completes, recovers
pub(crate) fn recover_from_cycle_a1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
pub(crate) fn recover_from_cycle_a1(
_db: &dyn ParDatabase,
_cycle: &salsa::Cycle,
key: &i32,
) -> i32 {
log::debug!("recover_from_cycle_a1");
key * 10 + 1
}
pub(crate) fn recover_from_cycle_a2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
pub(crate) fn recover_from_cycle_a2(
_db: &dyn ParDatabase,
_cycle: &salsa::Cycle,
key: &i32,
) -> i32 {
log::debug!("recover_from_cycle_a2");
key * 10 + 2
}
pub(crate) fn recover_from_cycle_b1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
pub(crate) fn recover_from_cycle_b1(
_db: &dyn ParDatabase,
_cycle: &salsa::Cycle,
key: &i32,
) -> i32 {
log::debug!("recover_from_cycle_b1");
key * 20 + 1
}
pub(crate) fn recover_from_cycle_b2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
pub(crate) fn recover_from_cycle_b2(
_db: &dyn ParDatabase,
_cycle: &salsa::Cycle,
key: &i32,
) -> i32 {
log::debug!("recover_from_cycle_b2");
key * 20 + 2
}