mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-23 13:10:19 +00:00
cleanup: move cycle recovery into try_block_on
This is just a refactoring.
This commit is contained in:
parent
f659e1d2dd
commit
4a1dffe7bc
3 changed files with 70 additions and 53 deletions
|
@ -378,21 +378,17 @@ where
|
||||||
return match self.block_on_in_progress_thread(db, runtime, other_id, state) {
|
return match self.block_on_in_progress_thread(db, runtime, other_id, state) {
|
||||||
Ok(WaitResult::Panicked) => Cancelled::throw(),
|
Ok(WaitResult::Panicked) => Cancelled::throw(),
|
||||||
Ok(WaitResult::Completed) => ProbeState::Retry,
|
Ok(WaitResult::Completed) => ProbeState::Retry,
|
||||||
Err(err) => ProbeState::UpToDate(
|
Err(CycleDetected {
|
||||||
match runtime.report_unexpected_cycle(
|
recovery_strategy,
|
||||||
db.ops_database(),
|
cycle_error,
|
||||||
self.database_key_index,
|
}) => ProbeState::UpToDate(match recovery_strategy {
|
||||||
err,
|
CycleRecoveryStrategy::Panic => Err(cycle_error),
|
||||||
revision_now,
|
CycleRecoveryStrategy::Fallback => Ok(StampedValue {
|
||||||
) {
|
value: Q::cycle_fallback(db, &cycle_error.cycle, &self.key),
|
||||||
(CycleRecoveryStrategy::Panic, err) => Err(err),
|
changed_at: cycle_error.changed_at,
|
||||||
(CycleRecoveryStrategy::Fallback, err) => Ok(StampedValue {
|
durability: cycle_error.durability,
|
||||||
value: Q::cycle_fallback(db, &err.cycle, &self.key),
|
}),
|
||||||
changed_at: err.changed_at,
|
}),
|
||||||
durability: err.durability,
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ use crate::Database;
|
||||||
use crate::Query;
|
use crate::Query;
|
||||||
use crate::QueryTable;
|
use crate::QueryTable;
|
||||||
use crate::QueryTableMut;
|
use crate::QueryTableMut;
|
||||||
use crate::RuntimeId;
|
|
||||||
use std::borrow::Borrow;
|
use std::borrow::Borrow;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
|
@ -21,8 +20,8 @@ pub use crate::{revision::Revision, DatabaseKeyIndex, QueryDb, Runtime};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct CycleDetected {
|
pub struct CycleDetected {
|
||||||
pub(crate) from: RuntimeId,
|
pub(crate) recovery_strategy: CycleRecoveryStrategy,
|
||||||
pub(crate) to: RuntimeId,
|
pub(crate) cycle_error: crate::CycleError,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Defines various associated types. An impl of this
|
/// Defines various associated types. An impl of this
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::revision::{AtomicRevision, Revision};
|
||||||
use crate::{Cancelled, Database, DatabaseKeyIndex, Event, EventKind};
|
use crate::{Cancelled, Database, DatabaseKeyIndex, Event, EventKind};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
|
use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, MutexGuard, RwLock};
|
||||||
use rustc_hash::FxHasher;
|
use rustc_hash::FxHasher;
|
||||||
use std::hash::{BuildHasherDefault, Hash};
|
use std::hash::{BuildHasherDefault, Hash};
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
@ -252,44 +252,69 @@ impl Runtime {
|
||||||
.report_synthetic_read(durability, changed_at);
|
.report_synthetic_read(durability, changed_at);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Obviously, this should be user configurable at some point.
|
fn create_cycle_error(
|
||||||
pub(crate) fn report_unexpected_cycle(
|
|
||||||
&self,
|
&self,
|
||||||
db: &dyn Database,
|
db: &dyn Database,
|
||||||
|
mut dg: MutexGuard<'_, DependencyGraph>,
|
||||||
database_key_index: DatabaseKeyIndex,
|
database_key_index: DatabaseKeyIndex,
|
||||||
error: CycleDetected,
|
to_id: RuntimeId,
|
||||||
changed_at: Revision,
|
) -> CycleDetected {
|
||||||
) -> (CycleRecoveryStrategy, crate::CycleError) {
|
debug!("create_cycle_error(database_key={:?})", database_key_index);
|
||||||
debug!(
|
|
||||||
"report_unexpected_cycle(database_key={:?})",
|
|
||||||
database_key_index
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut cycle_participants = vec![];
|
let mut from_stack = self.local_state.take_query_stack();
|
||||||
let mut stack = self.local_state.take_query_stack();
|
let from_id = self.id();
|
||||||
let mut dg = self.shared_state.dependency_graph.lock();
|
|
||||||
dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| {
|
|
||||||
cycle_participants.push(aq.database_key_index);
|
|
||||||
});
|
|
||||||
let cycle_participants = Arc::new(cycle_participants);
|
|
||||||
dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| {
|
|
||||||
aq.cycle = Some(cycle_participants.clone());
|
|
||||||
});
|
|
||||||
self.local_state.restore_query_stack(stack);
|
|
||||||
let crs = self.mutual_cycle_recovery_strategy(db, &cycle_participants);
|
|
||||||
debug!(
|
|
||||||
"cycle recovery strategy {:?} for participants {:?}",
|
|
||||||
crs, cycle_participants
|
|
||||||
);
|
|
||||||
|
|
||||||
(
|
// Extract the changed_at and durability values from the top of the stack;
|
||||||
crs,
|
// these reflect the queries which were executed thus far and which
|
||||||
CycleError {
|
// led to the cycle.
|
||||||
|
let changed_at = from_stack.last().unwrap().changed_at;
|
||||||
|
let durability = from_stack.last().unwrap().durability;
|
||||||
|
|
||||||
|
// Identify the cycle participants:
|
||||||
|
let cycle_participants = {
|
||||||
|
let mut v = vec![];
|
||||||
|
dg.for_each_cycle_participant(
|
||||||
|
from_id,
|
||||||
|
&mut from_stack,
|
||||||
|
database_key_index,
|
||||||
|
to_id,
|
||||||
|
|aq| v.push(aq.database_key_index),
|
||||||
|
);
|
||||||
|
Arc::new(v)
|
||||||
|
};
|
||||||
|
debug!("cycle participants {:?}", cycle_participants);
|
||||||
|
|
||||||
|
// Identify cycle recovery strategy:
|
||||||
|
let recovery_strategy = self.mutual_cycle_recovery_strategy(db, &cycle_participants);
|
||||||
|
debug!("cycle recovery strategy {:?}", recovery_strategy);
|
||||||
|
|
||||||
|
// If using fallback, we have to mark the cycle participants, so they know to recover.
|
||||||
|
match recovery_strategy {
|
||||||
|
CycleRecoveryStrategy::Panic => {}
|
||||||
|
CycleRecoveryStrategy::Fallback => {
|
||||||
|
// Mark the cycle participants, so they know to recover:
|
||||||
|
dg.for_each_cycle_participant(
|
||||||
|
from_id,
|
||||||
|
&mut from_stack,
|
||||||
|
database_key_index,
|
||||||
|
to_id,
|
||||||
|
|aq| {
|
||||||
|
aq.cycle = Some(cycle_participants.clone());
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.local_state.restore_query_stack(from_stack);
|
||||||
|
|
||||||
|
CycleDetected {
|
||||||
|
recovery_strategy,
|
||||||
|
cycle_error: CycleError {
|
||||||
cycle: cycle_participants,
|
cycle: cycle_participants,
|
||||||
changed_at,
|
changed_at,
|
||||||
durability: Durability::MAX,
|
durability,
|
||||||
},
|
},
|
||||||
)
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mutual_cycle_recovery_strategy(
|
fn mutual_cycle_recovery_strategy(
|
||||||
|
@ -325,10 +350,7 @@ impl Runtime {
|
||||||
let mut dg = self.shared_state.dependency_graph.lock();
|
let mut dg = self.shared_state.dependency_graph.lock();
|
||||||
|
|
||||||
if self.id() == other_id || dg.depends_on(other_id, self.id()) {
|
if self.id() == other_id || dg.depends_on(other_id, self.id()) {
|
||||||
Err(CycleDetected {
|
Err(self.create_cycle_error(db, dg, database_key, other_id))
|
||||||
from: self.id(),
|
|
||||||
to: other_id,
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
db.salsa_event(Event {
|
db.salsa_event(Event {
|
||||||
runtime_id: self.id(),
|
runtime_id: self.id(),
|
||||||
|
|
Loading…
Reference in a new issue