move the stack instead of cloning

Currently, when one thread blocks on another, we clone the
stack from that task. This results in a lot of clones, but it also
means that we can't mark all the frames involved in a cycle atomically.
Instead, when we propagate information between threads, we also
propagate the participants of the cycle and so forth.

This branch *moves* the stack into the runtime while a thread
is blocked, and then moves it back out when the thread resumes.
This permits the runtime to mark all the cycle participants at once.
It also avoids cloning.
This commit is contained in:
Niko Matsakis 2021-10-28 17:06:27 -04:00
parent e83bae717d
commit 35ddd36b49
4 changed files with 173 additions and 247 deletions

View file

@ -349,7 +349,6 @@ where
if result.cycle.is_empty() {
ProbeState::Retry
} else {
runtime.mark_cycle_participants(&result.cycle);
ProbeState::UpToDate(Ok(StampedValue {
value: Q::cycle_fallback(db, &result.cycle, &self.key),
durability: result.value.durability,

View file

@ -305,16 +305,26 @@ impl Runtime {
database_key_index
);
let cycle = self.find_cycle_participants(database_key_index, error);
let crs = self.mutual_cycle_recovery_strategy(db, &cycle);
let mut cycle_participants = vec![];
let mut stack = self.local_state.take_query_stack();
let mut dg = self.shared_state.dependency_graph.lock();
dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| {
cycle_participants.push(aq.database_key_index)
});
dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| {
aq.cycle = cycle_participants.clone()
});
self.local_state.restore_query_stack(stack);
let crs = self.mutual_cycle_recovery_strategy(db, &cycle_participants);
debug!(
"cycle recovery strategy {:?} for participants {:?}",
crs, cycle
crs, cycle_participants
);
(
crs,
CycleError {
cycle,
cycle: cycle_participants,
changed_at,
durability: Durability::MAX,
},
@ -342,85 +352,6 @@ impl Runtime {
}
}
fn find_cycle_participants(
&self,
database_key_index: DatabaseKeyIndex,
error: CycleDetected,
) -> Vec<DatabaseKeyIndex> {
let mut query_stack = self.local_state.borrow_query_stack_mut();
if error.from == error.to {
// All queries in the cycle are local
let start_index = match query_stack
.iter()
.rposition(|active_query| active_query.database_key_index == database_key_index)
{
Some(i) => i,
None => panic!(
"did not find {:?} on the stack: {:?}",
database_key_index,
query_stack
.iter()
.map(|s| s.database_key_index)
.collect::<Vec<_>>()
),
};
let mut cycle = Vec::new();
let cycle_participants = &mut query_stack[start_index..];
for active_query in &mut *cycle_participants {
cycle.push(active_query.database_key_index);
}
assert!(!cycle.is_empty());
for active_query in cycle_participants {
active_query.cycle = cycle.clone();
}
cycle
} else {
// Part of the cycle is on another thread so we need to lock and inspect the shared
// state
let dependency_graph = self.shared_state.dependency_graph.lock();
let mut cycle = Vec::new();
dependency_graph.push_cycle_path(
database_key_index,
error.to,
query_stack.iter().map(|query| query.database_key_index),
&mut cycle,
);
cycle.push(database_key_index);
assert!(!cycle.is_empty());
for active_query in query_stack
.iter_mut()
.filter(|query| cycle.iter().any(|key| *key == query.database_key_index))
{
active_query.cycle = cycle.clone();
}
cycle
}
}
pub(crate) fn mark_cycle_participants(&self, participants: &[DatabaseKeyIndex]) {
for active_query in self
.local_state
.borrow_query_stack_mut()
.iter_mut()
.rev()
.take_while(|active_query| {
participants
.iter()
.any(|e| *e == active_query.database_key_index)
})
{
active_query.cycle = participants.to_owned();
}
}
/// Try to make this runtime blocked on `other_id`. Returns true
/// upon success or false if `other_id` is already blocked on us.
pub(crate) fn try_block_on<QueryMutexGuard>(
@ -446,17 +377,20 @@ impl Runtime {
},
});
Ok(DependencyGraph::block_on(
let stack = self.local_state.take_query_stack();
let (stack, result) = DependencyGraph::block_on(
dg,
self.id(),
database_key,
other_id,
self.local_state
.borrow_query_stack()
.iter()
.map(|query| query.database_key_index),
stack,
query_mutex_guard,
))
);
self.local_state.restore_query_stack(stack);
Ok(result)
}
}
@ -550,6 +484,7 @@ impl std::fmt::Debug for SharedState {
}
}
#[derive(Debug)]
struct ActiveQuery {
/// What query is executing
database_key_index: DatabaseKeyIndex,

View file

@ -1,11 +1,14 @@
use std::sync::Arc;
use crate::runtime::WaitResult;
use crate::{DatabaseKeyIndex, RuntimeId};
use parking_lot::MutexGuard;
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use super::{ActiveQuery, WaitResult};
type QueryStack = Vec<ActiveQuery>;
#[derive(Debug, Default)]
pub(super) struct DependencyGraph {
/// A `(K -> V)` pair in this map indicates that the runtime
@ -21,7 +24,7 @@ pub(super) struct DependencyGraph {
/// When a key K completes which had dependent queries Qs blocked on it,
/// it stores its `WaitResult` here. As they wake up, each query Q in Qs will
/// come here to fetch their results.
wait_results: FxHashMap<RuntimeId, Option<WaitResult>>,
wait_results: FxHashMap<RuntimeId, (QueryStack, Option<WaitResult>)>,
/// Signalled whenever a query with dependents completes.
/// Allows those dependents to check if they are ready to unblock.
@ -30,8 +33,9 @@ pub(super) struct DependencyGraph {
#[derive(Debug)]
struct Edge {
id: RuntimeId,
path: Vec<DatabaseKeyIndex>,
blocked_on_id: RuntimeId,
blocked_on_key: DatabaseKeyIndex,
stack: QueryStack,
}
impl DependencyGraph {
@ -40,14 +44,78 @@ impl DependencyGraph {
/// (i.e., there is a path from `from_id` to `to_id` in the graph.)
pub(super) fn depends_on(&mut self, from_id: RuntimeId, to_id: RuntimeId) -> bool {
let mut p = from_id;
while let Some(q) = self.edges.get(&p).map(|edge| edge.id) {
while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) {
if q == to_id {
return true;
}
p = q;
}
false
p == to_id
}
/// Invokes `closure` with a `&mut ActiveQuery` for each query that participates in the cycle.
/// The cycle runs as follows:
///
/// 1. The runtime `from_id`, which has the stack `from_stack`, would like to invoke `database_key`...
/// 2. ...but `database_key` is already being executed by `to_id`...
/// 3. ...and `to_id` is transitively dependent on something which is present on `from_stack`.
pub(super) fn for_each_cycle_participant(
&mut self,
from_id: RuntimeId,
from_stack: &mut QueryStack,
database_key: DatabaseKeyIndex,
to_id: RuntimeId,
mut closure: impl FnMut(&mut ActiveQuery),
) {
debug_assert!(self.depends_on(to_id, from_id));
// To understand this algorithm, consider this [drawing](https://is.gd/TGLI9v):
//
// database_key = QB2
// from_id = A
// to_id = B
// from_stack = [QA1, QA2, QA3]
//
// self.edges[B] = { C, QC2, [QB1..QB3] }
// self.edges[C] = { A, QA2, [QC1..QC3] }
//
// The cyclic
// edge we have
// failed to add.
// :
// A : B C
// :
// QA1 v QB1 QC1
// ┌► QA2 ┌──► QB2 ┌─► QC2
// │ QA3 ───┘ QB3 ──┘ QC3 ───┐
// │ │
// └───────────────────────────────┘
//
// Final output: [QB2, QB3, QC2, QC3, QA2, QA3]
let mut id = to_id;
let mut key = database_key;
while id != from_id {
// Looking at the diagram above, the idea is to
// take the edge from `to_id` starting at `key`
// (inclusive) and down to the end. We can then
// load up the next thread (i.e., we start at B/QB2,
// and then load up the dependency on C/QC2).
let edge = self.edges.get_mut(&id).unwrap();
edge.stack
.iter_mut()
.skip_while(|p| p.database_key_index != key)
.for_each(&mut closure);
id = edge.blocked_on_id;
key = edge.blocked_on_key;
}
// Finally, we copy in the results from `from_stack`.
from_stack
.iter_mut()
.skip_while(|p| p.database_key_index != key)
.for_each(&mut closure);
}
/// Modifies the graph so that `from_id` is blocked
@ -68,10 +136,10 @@ impl DependencyGraph {
from_id: RuntimeId,
database_key: DatabaseKeyIndex,
to_id: RuntimeId,
path: impl IntoIterator<Item = DatabaseKeyIndex>,
from_stack: QueryStack,
query_mutex_guard: QueryMutexGuard,
) -> Option<WaitResult> {
me.add_edge(from_id, database_key, to_id, path);
) -> (QueryStack, Option<WaitResult>) {
me.add_edge(from_id, database_key, to_id, from_stack);
// Release the mutex that prevents `database_key`
// from completing, now that the edge has been added.
@ -82,9 +150,9 @@ impl DependencyGraph {
let condvar = me.condvar.clone();
loop {
if let Some(wait_result) = me.wait_results.remove(&from_id) {
if let Some(stack_and_result) = me.wait_results.remove(&from_id) {
debug_assert!(!me.edges.contains_key(&from_id));
return wait_result;
return stack_and_result;
}
condvar.wait(&mut me);
}
@ -98,7 +166,7 @@ impl DependencyGraph {
from_id: RuntimeId,
database_key: DatabaseKeyIndex,
to_id: RuntimeId,
path: impl IntoIterator<Item = DatabaseKeyIndex>,
from_stack: QueryStack,
) {
assert_ne!(from_id, to_id);
debug_assert!(!self.edges.contains_key(&from_id));
@ -107,8 +175,9 @@ impl DependencyGraph {
self.edges.insert(
from_id,
Edge {
id: to_id,
path: path.into_iter().chain(Some(database_key.clone())).collect(),
blocked_on_id: to_id,
blocked_on_key: database_key,
stack: from_stack,
},
);
self.query_dependents
@ -125,121 +194,20 @@ impl DependencyGraph {
to_id: RuntimeId,
wait_result: Option<WaitResult>,
) {
let dependents = self.remove_edge(database_key, to_id);
let dependents = self
.query_dependents
.remove(&database_key)
.unwrap_or_default();
for d in dependents {
self.wait_results.insert(d, wait_result.clone());
for from_id in dependents {
let edge = self.edges.remove(&from_id).expect("no edge for dependent");
assert_eq!(to_id, edge.blocked_on_id);
self.wait_results
.insert(from_id, (edge.stack, wait_result.clone()));
}
// Now that we have inserted the `wait_results`,
// notify all potential dependents.
self.condvar.notify_all();
}
/// Remove all dependency edges into `database_key`
/// (being computed by `to_id`) and return the list of
/// dependent runtimes that were waiting for `database_key`
/// to complete.
fn remove_edge(
&mut self,
database_key: DatabaseKeyIndex,
to_id: RuntimeId,
) -> impl IntoIterator<Item = RuntimeId> {
let vec = self
.query_dependents
.remove(&database_key)
.unwrap_or_default();
for from_id in &vec {
let to_id1 = self.edges.remove(from_id).map(|edge| edge.id);
assert_eq!(Some(to_id), to_id1);
}
vec
}
pub(super) fn push_cycle_path(
&self,
database_key: DatabaseKeyIndex,
to: RuntimeId,
local_path: impl IntoIterator<Item = DatabaseKeyIndex>,
output: &mut Vec<DatabaseKeyIndex>,
) {
let mut current = Some((to, std::slice::from_ref(&database_key)));
let mut last = None;
let mut local_path = Some(local_path);
loop {
match current.take() {
Some((id, path)) => {
let link_key = path.last().unwrap();
output.extend(path.iter().cloned());
current = self.edges.get(&id).map(|edge| {
let i = edge.path.iter().rposition(|p| p == link_key).unwrap();
(edge.id, &edge.path[i + 1..])
});
if current.is_none() {
last = local_path.take().map(|local_path| {
local_path
.into_iter()
.skip_while(move |p| *p != *link_key)
.skip(1)
});
}
}
None => break,
}
}
if let Some(iter) = &mut last {
output.extend(iter);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn dki(n: u32) -> DatabaseKeyIndex {
DatabaseKeyIndex {
group_index: 0,
query_index: 0,
key_index: n,
}
}
macro_rules! dkivec {
($($n:expr),*) => {
vec![$(dki($n)),*]
}
}
#[test]
fn dependency_graph_path1() {
let mut graph = DependencyGraph::default();
let a = RuntimeId { counter: 0 };
let b = RuntimeId { counter: 1 };
graph.add_edge(a, dki(2), b, dkivec![1]);
let mut v = vec![];
graph.push_cycle_path(dki(1), a, dkivec![3, 2], &mut v);
assert_eq!(v, vec![dki(1), dki(2)]);
}
#[test]
fn dependency_graph_path2() {
let mut graph = DependencyGraph::default();
let a = RuntimeId { counter: 0 };
let b = RuntimeId { counter: 1 };
let c = RuntimeId { counter: 2 };
graph.add_edge(a, dki(3), b, dkivec![1]);
graph.add_edge(b, dki(4), c, dkivec![2, 3]);
// assert!(graph.add_edge(c, &1, a, vec![5, 6, 4, 7]));
let mut v = vec![];
graph.push_cycle_path(dki(1), a, dkivec![5, 6, 4, 7], &mut v);
assert_eq!(v, dkivec![1, 3, 4, 7]);
}
}

View file

@ -2,7 +2,7 @@ use crate::durability::Durability;
use crate::runtime::ActiveQuery;
use crate::runtime::Revision;
use crate::DatabaseKeyIndex;
use std::cell::{Ref, RefCell, RefMut};
use std::cell::RefCell;
/// State that is specific to a single execution thread.
///
@ -13,15 +13,18 @@ use std::cell::{Ref, RefCell, RefMut};
pub(super) struct LocalState {
/// Vector of active queries.
///
/// This is normally `Some`, but it is set to `None`
/// while the query is blocked waiting for a result.
///
/// Unwinding note: pushes onto this vector must be popped -- even
/// during unwinding.
query_stack: RefCell<Vec<ActiveQuery>>,
query_stack: RefCell<Option<Vec<ActiveQuery>>>,
}
impl Default for LocalState {
fn default() -> Self {
LocalState {
query_stack: Default::default(),
query_stack: RefCell::new(Some(Vec::new())),
}
}
}
@ -33,6 +36,7 @@ impl LocalState {
max_durability: Durability,
) -> ActiveQueryGuard<'_> {
let mut query_stack = self.query_stack.borrow_mut();
let query_stack = query_stack.as_mut().expect("local stack taken");
query_stack.push(ActiveQuery::new(database_key_index, max_durability));
ActiveQueryGuard {
local_state: self,
@ -40,28 +44,24 @@ impl LocalState {
}
}
/// Returns a reference to the active query stack.
///
/// **Warning:** Because this reference holds the ref-cell lock,
/// you should not use any mutating methods of `LocalState` while
/// reading from it.
pub(super) fn borrow_query_stack(&self) -> Ref<'_, Vec<ActiveQuery>> {
self.query_stack.borrow()
}
pub(super) fn borrow_query_stack_mut(&self) -> RefMut<'_, Vec<ActiveQuery>> {
self.query_stack.borrow_mut()
fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self
.query_stack
.borrow_mut()
.as_mut()
.expect("query stack taken"))
}
pub(super) fn query_in_progress(&self) -> bool {
!self.query_stack.borrow().is_empty()
self.with_query_stack(|stack| !stack.is_empty())
}
pub(super) fn active_query(&self) -> Option<DatabaseKeyIndex> {
self.query_stack
.borrow()
self.with_query_stack(|stack| {
stack
.last()
.map(|active_query| active_query.database_key_index)
})
}
pub(super) fn report_query_read(
@ -70,21 +70,45 @@ impl LocalState {
durability: Durability,
changed_at: Revision,
) {
if let Some(top_query) = self.query_stack.borrow_mut().last_mut() {
self.with_query_stack(|stack| {
if let Some(top_query) = stack.last_mut() {
top_query.add_read(input, durability, changed_at);
}
})
}
pub(super) fn report_untracked_read(&self, current_revision: Revision) {
if let Some(top_query) = self.query_stack.borrow_mut().last_mut() {
self.with_query_stack(|stack| {
if let Some(top_query) = stack.last_mut() {
top_query.add_untracked_read(current_revision);
}
})
}
pub(super) fn report_synthetic_read(&self, durability: Durability, current_revision: Revision) {
if let Some(top_query) = self.query_stack.borrow_mut().last_mut() {
self.with_query_stack(|stack| {
if let Some(top_query) = stack.last_mut() {
top_query.add_synthetic_read(durability, current_revision);
}
})
}
/// Takes the query stack and returns it. This is used when
/// the current thread is blocking. The stack must be restored
/// with [`Self::restore_query_stack`] when the thread unblocks.
pub(super) fn take_query_stack(&self) -> Vec<ActiveQuery> {
assert!(
self.query_stack.borrow().is_some(),
"query stack already taken"
);
self.query_stack.replace(None).unwrap()
}
/// Restores a query stack taken with [`Self::take_query_stack`] once
/// the thread unblocks.
pub(super) fn restore_query_stack(&self, stack: Vec<ActiveQuery>) {
assert!(self.query_stack.borrow().is_none(), "query stack not taken");
self.query_stack.replace(Some(stack));
}
}
@ -101,12 +125,12 @@ pub(super) struct ActiveQueryGuard<'me> {
impl ActiveQueryGuard<'_> {
fn pop_helper(&self) -> ActiveQuery {
let mut query_stack = self.local_state.query_stack.borrow_mut();
self.local_state.with_query_stack(|stack| {
// Sanity check: pushes and pops should be balanced.
assert_eq!(query_stack.len(), self.push_len);
assert_eq!(stack.len(), self.push_len);
query_stack.pop().unwrap()
stack.pop().unwrap()
})
}
/// Invoked when the query has successfully completed execution.