Remove unnecessary Option from ZalsaLocal::query_stack

This commit is contained in:
Lukas Wirth 2024-12-13 12:13:48 +01:00
parent f65ac4b4c3
commit ac40e4cf44
2 changed files with 91 additions and 111 deletions

View file

@ -1,4 +1,5 @@
use std::{
mem,
panic::panic_any,
sync::{
atomic::{AtomicBool, Ordering},
@ -198,18 +199,18 @@ impl Runtime {
},
});
let stack = local_state.take_query_stack();
let (stack, result) = DependencyGraph::block_on(
dg,
thread_id,
database_key,
other_id,
stack,
query_mutex_guard,
);
local_state.restore_query_stack(stack);
let result = local_state.with_query_stack(|stack| {
let (new_stack, result) = DependencyGraph::block_on(
dg,
thread_id,
database_key,
other_id,
mem::take(stack),
query_mutex_guard,
);
*stack = new_stack;
result
});
match result {
WaitResult::Completed => (),
@ -244,84 +245,85 @@ impl Runtime {
database_key_index
);
let mut from_stack = local_state.take_query_stack();
let from_id = std::thread::current().id();
let (me_recovered, others_recovered, cycle) = local_state.with_query_stack(|from_stack| {
let from_id = std::thread::current().id();
// Make a "dummy stack frame". As we iterate through the cycle, we will collect the
// inputs from each participant. Then, if we are participating in cycle recovery, we
// will propagate those results to all participants.
let mut cycle_query = ActiveQuery::new(database_key_index);
// Make a "dummy stack frame". As we iterate through the cycle, we will collect the
// inputs from each participant. Then, if we are participating in cycle recovery, we
// will propagate those results to all participants.
let mut cycle_query = ActiveQuery::new(database_key_index);
// Identify the cycle participants:
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
&mut from_stack,
database_key_index,
to_id,
|aqs| {
aqs.iter_mut().for_each(|aq| {
cycle_query.add_from(aq);
v.push(aq.database_key_index);
// Identify the cycle participants:
let cycle = {
let mut v = vec![];
dg.for_each_cycle_participant(
from_id,
from_stack,
database_key_index,
to_id,
|aqs| {
aqs.iter_mut().for_each(|aq| {
cycle_query.add_from(aq);
v.push(aq.database_key_index);
});
},
);
// We want to give the participants in a deterministic order
// (at least for this execution, not necessarily across executions),
// no matter where it started on the stack. Find the minimum
// key and rotate it to the front.
if let Some((_, index, _)) = v
.iter()
.enumerate()
.map(|(idx, key)| (key.ingredient_index.debug_name(db), idx, key))
.min()
{
v.rotate_left(index);
}
Cycle::new(Arc::new(v.into_boxed_slice()))
};
tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}");
// We can remove the cycle participants from the list of dependencies;
// they are a strongly connected component (SCC) and we only care about
// dependencies to things outside the SCC that control whether it will
// form again.
cycle_query.remove_cycle_participants(&cycle);
// Mark each cycle participant that has recovery set, along with
// any frames that come after them on the same thread. Those frames
// are going to be unwound so that fallback can occur.
dg.for_each_cycle_participant(from_id, from_stack, database_key_index, to_id, |aqs| {
aqs.iter_mut()
.skip_while(|aq| {
match db
.zalsa()
.lookup_ingredient(aq.database_key_index.ingredient_index)
.cycle_recovery_strategy()
{
CycleRecoveryStrategy::Panic => true,
CycleRecoveryStrategy::Fallback => false,
}
})
.for_each(|aq| {
tracing::debug!("marking {:?} for fallback", aq.database_key_index);
aq.take_inputs_from(&cycle_query);
assert!(aq.cycle.is_none());
aq.cycle = Some(cycle.clone());
});
},
);
});
// We want to give the participants in a deterministic order
// (at least for this execution, not necessarily across executions),
// no matter where it started on the stack. Find the minimum
// key and rotate it to the front.
if let Some((_, index, _)) = v
.iter()
.enumerate()
.map(|(idx, key)| (key.ingredient_index.debug_name(db), idx, key))
.min()
{
v.rotate_left(index);
}
Cycle::new(Arc::new(v.into_boxed_slice()))
};
tracing::debug!("cycle {cycle:?}, cycle_query {cycle_query:#?}");
// We can remove the cycle participants from the list of dependencies;
// they are a strongly connected component (SCC) and we only care about
// dependencies to things outside the SCC that control whether it will
// form again.
cycle_query.remove_cycle_participants(&cycle);
// Mark each cycle participant that has recovery set, along with
// any frames that come after them on the same thread. Those frames
// are going to be unwound so that fallback can occur.
dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
aqs.iter_mut()
.skip_while(|aq| {
match db
.zalsa()
.lookup_ingredient(aq.database_key_index.ingredient_index)
.cycle_recovery_strategy()
{
CycleRecoveryStrategy::Panic => true,
CycleRecoveryStrategy::Fallback => false,
}
})
.for_each(|aq| {
tracing::debug!("marking {:?} for fallback", aq.database_key_index);
aq.take_inputs_from(&cycle_query);
assert!(aq.cycle.is_none());
aq.cycle = Some(cycle.clone());
});
// Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
// They will throw the cycle, which will be caught by the frame that has
// cycle recovery so that it can execute that recovery.
let (me_recovered, others_recovered) =
dg.maybe_unblock_runtimes_in_cycle(from_id, from_stack, database_key_index, to_id);
(me_recovered, others_recovered, cycle)
});
// Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
// They will throw the cycle, which will be caught by the frame that has
// cycle recovery so that it can execute that recovery.
let (me_recovered, others_recovered) =
dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);
local_state.restore_query_stack(from_stack);
if me_recovered {
// If the current thread has recovery, we want to throw
// so that it can begin.

View file

@ -37,7 +37,7 @@ pub struct ZalsaLocal {
///
/// Unwinding note: pushes onto this vector must be popped -- even
/// during unwinding.
query_stack: RefCell<Option<Vec<ActiveQuery>>>,
query_stack: RefCell<Vec<ActiveQuery>>,
/// Stores the most recent page for a given ingredient.
/// This is thread-local to avoid contention.
@ -47,7 +47,7 @@ pub struct ZalsaLocal {
impl ZalsaLocal {
pub(crate) fn new() -> Self {
ZalsaLocal {
query_stack: RefCell::new(Some(vec![])),
query_stack: RefCell::new(vec![]),
most_recent_pages: RefCell::new(FxHashMap::default()),
}
}
@ -88,7 +88,6 @@ impl ZalsaLocal {
#[inline]
pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
let mut query_stack = self.query_stack.borrow_mut();
let query_stack = query_stack.as_mut().expect("local stack taken");
query_stack.push(ActiveQuery::new(database_key_index));
ActiveQueryGuard {
local_state: self,
@ -97,12 +96,9 @@ impl ZalsaLocal {
}
}
fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self
.query_stack
.borrow_mut()
.as_mut()
.expect("query stack taken"))
/// Executes a closure within the context of the current active query stacks.
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self.query_stack.borrow_mut().as_mut())
}
fn query_in_progress(&self) -> bool {
@ -233,24 +229,6 @@ impl ZalsaLocal {
})
}
/// Takes the query stack and returns it. This is used when
/// the current thread is blocking. The stack must be restored
/// with [`Self::restore_query_stack`] when the thread unblocks.
pub(crate) fn take_query_stack(&self) -> Vec<ActiveQuery> {
assert!(
self.query_stack.borrow().is_some(),
"query stack already taken"
);
self.query_stack.take().unwrap()
}
/// Restores a query stack taken with [`Self::take_query_stack`] once
/// the thread unblocks.
pub(crate) fn restore_query_stack(&self, stack: Vec<ActiveQuery>) {
assert!(self.query_stack.borrow().is_none(), "query stack not taken");
self.query_stack.replace(Some(stack));
}
/// Called when the active queries creates an index from the
/// entity table with the index `entity_index`. Has the following effects:
///