use std::sync::Arc; use std::thread::ThreadId; use crate::active_query::ActiveQuery; use crate::key::DatabaseKeyIndex; use crate::runtime::WaitResult; use parking_lot::{Condvar, MutexGuard}; use rustc_hash::FxHashMap; use smallvec::SmallVec; type QueryStack = Vec; #[derive(Debug, Default)] pub(super) struct DependencyGraph { /// A `(K -> V)` pair in this map indicates that the the runtime /// `K` is blocked on some query executing in the runtime `V`. /// This encodes a graph that must be acyclic (or else deadlock /// will result). edges: FxHashMap, /// Encodes the `ThreadId` that are blocked waiting for the result /// of a given query. query_dependents: FxHashMap>, /// When a key K completes which had dependent queries Qs blocked on it, /// it stores its `WaitResult` here. As they wake up, each query Q in Qs will /// come here to fetch their results. wait_results: FxHashMap, } #[derive(Debug)] struct Edge { blocked_on_id: ThreadId, blocked_on_key: DatabaseKeyIndex, stack: QueryStack, /// Signalled whenever a query with dependents completes. /// Allows those dependents to check if they are ready to unblock. condvar: Arc, } impl DependencyGraph { /// True if `from_id` depends on `to_id`. /// /// (i.e., there is a path from `from_id` to `to_id` in the graph.) pub(super) fn depends_on(&mut self, from_id: ThreadId, to_id: ThreadId) -> bool { let mut p = from_id; while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) { if q == to_id { return true; } p = q; } p == to_id } /// Invokes `closure` with a `&mut ActiveQuery` for each query that participates in the cycle. /// The cycle runs as follows: /// /// 1. The runtime `from_id`, which has the stack `from_stack`, would like to invoke `database_key`... /// 2. ...but `database_key` is already being executed by `to_id`... /// 3. ...and `to_id` is transitively dependent on something which is present on `from_stack`. pub(super) fn for_each_cycle_participant( &mut self, from_id: ThreadId, from_stack: &mut QueryStack, database_key: DatabaseKeyIndex, to_id: ThreadId, mut closure: impl FnMut(&mut [ActiveQuery]), ) { debug_assert!(self.depends_on(to_id, from_id)); // To understand this algorithm, consider this [drawing](https://is.gd/TGLI9v): // // database_key = QB2 // from_id = A // to_id = B // from_stack = [QA1, QA2, QA3] // // self.edges[B] = { C, QC2, [QB1..QB3] } // self.edges[C] = { A, QA2, [QC1..QC3] } // // The cyclic // edge we have // failed to add. // : // A : B C // : // QA1 v QB1 QC1 // ┌► QA2 ┌──► QB2 ┌─► QC2 // │ QA3 ───┘ QB3 ──┘ QC3 ───┐ // │ │ // └───────────────────────────────┘ // // Final output: [QB2, QB3, QC2, QC3, QA2, QA3] let mut id = to_id; let mut key = database_key; while id != from_id { // Looking at the diagram above, the idea is to // take the edge from `to_id` starting at `key` // (inclusive) and down to the end. We can then // load up the next thread (i.e., we start at B/QB2, // and then load up the dependency on C/QC2). let edge = self.edges.get_mut(&id).unwrap(); let prefix = edge .stack .iter_mut() .take_while(|p| p.database_key_index != key) .count(); closure(&mut edge.stack[prefix..]); id = edge.blocked_on_id; key = edge.blocked_on_key; } // Finally, we copy in the results from `from_stack`. let prefix = from_stack .iter_mut() .take_while(|p| p.database_key_index != key) .count(); closure(&mut from_stack[prefix..]); } /// Unblock each blocked runtime (excluding the current one) if some /// query executing in that runtime is participating in cycle fallback. /// /// Returns a boolean (Current, Others) where: /// * Current is true if the current runtime has cycle participants /// with fallback; /// * Others is true if other runtimes were unblocked. pub(super) fn maybe_unblock_runtimes_in_cycle( &mut self, from_id: ThreadId, from_stack: &QueryStack, database_key: DatabaseKeyIndex, to_id: ThreadId, ) -> (bool, bool) { // See diagram in `for_each_cycle_participant`. let mut id = to_id; let mut key = database_key; let mut others_unblocked = false; while id != from_id { let edge = self.edges.get(&id).unwrap(); let prefix = edge .stack .iter() .take_while(|p| p.database_key_index != key) .count(); let next_id = edge.blocked_on_id; let next_key = edge.blocked_on_key; if let Some(cycle) = edge.stack[prefix..] .iter() .rev() .find_map(|aq| aq.cycle.clone()) { // Remove `id` from the list of runtimes blocked on `next_key`: self.query_dependents .get_mut(&next_key) .unwrap() .retain(|r| *r != id); // Unblock runtime so that it can resume execution once lock is released: self.unblock_runtime(id, WaitResult::Cycle(cycle)); others_unblocked = true; } id = next_id; key = next_key; } let prefix = from_stack .iter() .take_while(|p| p.database_key_index != key) .count(); let this_unblocked = from_stack[prefix..].iter().any(|aq| aq.cycle.is_some()); (this_unblocked, others_unblocked) } /// Modifies the graph so that `from_id` is blocked /// on `database_key`, which is being computed by /// `to_id`. /// /// For this to be reasonable, the lock on the /// results table for `database_key` must be held. /// This ensures that computing `database_key` doesn't /// complete before `block_on` executes. /// /// Preconditions: /// * No path from `to_id` to `from_id` /// (i.e., `me.depends_on(to_id, from_id)` is false) /// * `held_mutex` is a read lock (or stronger) on `database_key` pub(super) fn block_on( mut me: MutexGuard<'_, Self>, from_id: ThreadId, database_key: DatabaseKeyIndex, to_id: ThreadId, from_stack: QueryStack, query_mutex_guard: QueryMutexGuard, ) -> (QueryStack, WaitResult) { let condvar = me.add_edge(from_id, database_key, to_id, from_stack); // Release the mutex that prevents `database_key` // from completing, now that the edge has been added. drop(query_mutex_guard); loop { if let Some(stack_and_result) = me.wait_results.remove(&from_id) { debug_assert!(!me.edges.contains_key(&from_id)); return stack_and_result; } condvar.wait(&mut me); } } /// Helper for `block_on`: performs actual graph modification /// to add a dependency edge from `from_id` to `to_id`, which is /// computing `database_key`. fn add_edge( &mut self, from_id: ThreadId, database_key: DatabaseKeyIndex, to_id: ThreadId, from_stack: QueryStack, ) -> Arc { assert_ne!(from_id, to_id); debug_assert!(!self.edges.contains_key(&from_id)); debug_assert!(!self.depends_on(to_id, from_id)); let condvar = Arc::new(Condvar::new()); self.edges.insert( from_id, Edge { blocked_on_id: to_id, blocked_on_key: database_key, stack: from_stack, condvar: condvar.clone(), }, ); self.query_dependents .entry(database_key) .or_default() .push(from_id); condvar } /// Invoked when runtime `to_id` completes executing /// `database_key`. pub(super) fn unblock_runtimes_blocked_on( &mut self, database_key: DatabaseKeyIndex, wait_result: WaitResult, ) { let dependents = self .query_dependents .remove(&database_key) .unwrap_or_default(); for from_id in dependents { self.unblock_runtime(from_id, wait_result.clone()); } } /// Unblock the runtime with the given id with the given wait-result. /// This will cause it resume execution (though it will have to grab /// the lock on this data structure first, to recover the wait result). fn unblock_runtime(&mut self, id: ThreadId, wait_result: WaitResult) { let edge = self.edges.remove(&id).expect("not blocked"); self.wait_results.insert(id, (edge.stack, wait_result)); // Now that we have inserted the `wait_results`, // notify the thread. edge.condvar.notify_one(); } }