From 35ddd36b499fff7ef9fe9a333c072adc52e385e6 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Thu, 28 Oct 2021 17:06:27 -0400 Subject: [PATCH] move the stack instead of cloning Currently, when one thread blocks on another, we clone the stack from that task. This results in a lot of clones, but it also means that we can't mark all the frames involved in a cycle atomically. Instead, when we propagate information between threads, we also propagate the participants of the cycle and so forth. This branch *moves* the stack into the runtime while a thread is blocked, and then moves it back out when the thread resumes. This permits the runtime to mark all the cycle participants at once. It also avoids cloning. --- src/derived/slot.rs | 1 - src/runtime.rs | 113 ++++------------- src/runtime/dependency_graph.rs | 216 ++++++++++++++------------------ src/runtime/local_state.rs | 90 ++++++++----- 4 files changed, 173 insertions(+), 247 deletions(-) diff --git a/src/derived/slot.rs b/src/derived/slot.rs index 30e691cb..1fe333c0 100644 --- a/src/derived/slot.rs +++ b/src/derived/slot.rs @@ -349,7 +349,6 @@ where if result.cycle.is_empty() { ProbeState::Retry } else { - runtime.mark_cycle_participants(&result.cycle); ProbeState::UpToDate(Ok(StampedValue { value: Q::cycle_fallback(db, &result.cycle, &self.key), durability: result.value.durability, diff --git a/src/runtime.rs b/src/runtime.rs index 93ff276d..3cc2bc2c 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -305,16 +305,26 @@ impl Runtime { database_key_index ); - let cycle = self.find_cycle_participants(database_key_index, error); - let crs = self.mutual_cycle_recovery_strategy(db, &cycle); + let mut cycle_participants = vec![]; + let mut stack = self.local_state.take_query_stack(); + let mut dg = self.shared_state.dependency_graph.lock(); + dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| { + cycle_participants.push(aq.database_key_index) + }); + 
dg.for_each_cycle_participant(error.from, &mut stack, database_key_index, error.to, |aq| { + aq.cycle = cycle_participants.clone() + }); + self.local_state.restore_query_stack(stack); + let crs = self.mutual_cycle_recovery_strategy(db, &cycle_participants); debug!( "cycle recovery strategy {:?} for participants {:?}", - crs, cycle + crs, cycle_participants ); + ( crs, CycleError { - cycle, + cycle: cycle_participants, changed_at, durability: Durability::MAX, }, @@ -342,85 +352,6 @@ impl Runtime { } } - fn find_cycle_participants( - &self, - database_key_index: DatabaseKeyIndex, - error: CycleDetected, - ) -> Vec { - let mut query_stack = self.local_state.borrow_query_stack_mut(); - - if error.from == error.to { - // All queries in the cycle is local - let start_index = match query_stack - .iter() - .rposition(|active_query| active_query.database_key_index == database_key_index) - { - Some(i) => i, - None => panic!( - "did not find {:?} on the stack: {:?}", - database_key_index, - query_stack - .iter() - .map(|s| s.database_key_index) - .collect::>() - ), - }; - let mut cycle = Vec::new(); - let cycle_participants = &mut query_stack[start_index..]; - for active_query in &mut *cycle_participants { - cycle.push(active_query.database_key_index); - } - - assert!(!cycle.is_empty()); - - for active_query in cycle_participants { - active_query.cycle = cycle.clone(); - } - - cycle - } else { - // Part of the cycle is on another thread so we need to lock and inspect the shared - // state - let dependency_graph = self.shared_state.dependency_graph.lock(); - - let mut cycle = Vec::new(); - dependency_graph.push_cycle_path( - database_key_index, - error.to, - query_stack.iter().map(|query| query.database_key_index), - &mut cycle, - ); - cycle.push(database_key_index); - - assert!(!cycle.is_empty()); - - for active_query in query_stack - .iter_mut() - .filter(|query| cycle.iter().any(|key| *key == query.database_key_index)) - { - active_query.cycle = cycle.clone(); - } - - cycle 
- } - } - - pub(crate) fn mark_cycle_participants(&self, participants: &[DatabaseKeyIndex]) { - for active_query in self - .local_state - .borrow_query_stack_mut() - .iter_mut() - .rev() - .take_while(|active_query| { - participants - .iter() - .any(|e| *e == active_query.database_key_index) - }) - { - active_query.cycle = participants.to_owned(); - } - } - /// Try to make this runtime blocked on `other_id`. Returns true /// upon success or false if `other_id` is already blocked on us. pub(crate) fn try_block_on( @@ -446,17 +377,20 @@ impl Runtime { }, }); - Ok(DependencyGraph::block_on( + let stack = self.local_state.take_query_stack(); + + let (stack, result) = DependencyGraph::block_on( dg, self.id(), database_key, other_id, - self.local_state - .borrow_query_stack() - .iter() - .map(|query| query.database_key_index), + stack, query_mutex_guard, - )) + ); + + self.local_state.restore_query_stack(stack); + + Ok(result) } } @@ -550,6 +484,7 @@ impl std::fmt::Debug for SharedState { } } +#[derive(Debug)] struct ActiveQuery { /// What query is executing database_key_index: DatabaseKeyIndex, diff --git a/src/runtime/dependency_graph.rs b/src/runtime/dependency_graph.rs index 33a53499..1cf8b979 100644 --- a/src/runtime/dependency_graph.rs +++ b/src/runtime/dependency_graph.rs @@ -1,11 +1,14 @@ use std::sync::Arc; -use crate::runtime::WaitResult; use crate::{DatabaseKeyIndex, RuntimeId}; use parking_lot::MutexGuard; use rustc_hash::FxHashMap; use smallvec::SmallVec; +use super::{ActiveQuery, WaitResult}; + +type QueryStack = Vec; + #[derive(Debug, Default)] pub(super) struct DependencyGraph { /// A `(K -> V)` pair in this map indicates that the the runtime @@ -21,7 +24,7 @@ pub(super) struct DependencyGraph { /// When a key K completes which had dependent queries Qs blocked on it, /// it stores its `WaitResult` here. As they wake up, each query Q in Qs will /// come here to fetch their results. 
- wait_results: FxHashMap>, + wait_results: FxHashMap)>, /// Signalled whenever a query with dependents completes. /// Allows those dependents to check if they are ready to unblock. @@ -30,8 +33,9 @@ pub(super) struct DependencyGraph { #[derive(Debug)] struct Edge { - id: RuntimeId, - path: Vec, + blocked_on_id: RuntimeId, + blocked_on_key: DatabaseKeyIndex, + stack: QueryStack, } impl DependencyGraph { @@ -40,14 +44,78 @@ impl DependencyGraph { /// (i.e., there is a path from `from_id` to `to_id` in the graph.) pub(super) fn depends_on(&mut self, from_id: RuntimeId, to_id: RuntimeId) -> bool { let mut p = from_id; - while let Some(q) = self.edges.get(&p).map(|edge| edge.id) { + while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) { if q == to_id { return true; } p = q; } - false + p == to_id + } + + /// Invokes `closure` with a `&mut ActiveQuery` for each query that participates in the cycle. + /// The cycle runs as follows: + /// + /// 1. The runtime `from_id`, which has the stack `from_stack`, would like to invoke `database_key`... + /// 2. ...but `database_key` is already being executed by `to_id`... + /// 3. ...and `to_id` is transitively dependent on something which is present on `from_stack`. + pub(super) fn for_each_cycle_participant( + &mut self, + from_id: RuntimeId, + from_stack: &mut QueryStack, + database_key: DatabaseKeyIndex, + to_id: RuntimeId, + mut closure: impl FnMut(&mut ActiveQuery), + ) { + debug_assert!(self.depends_on(to_id, from_id)); + + // To understand this algorithm, consider this [drawing](https://is.gd/TGLI9v): + // + // database_key = QB2 + // from_id = A + // to_id = B + // from_stack = [QA1, QA2, QA3] + // + // self.edges[B] = { C, QC2, [QB1..QB3] } + // self.edges[C] = { A, QA2, [QC1..QC3] } + // + // The cyclic + // edge we have + // failed to add. 
+ // : + // A : B C + // : + // QA1 v QB1 QC1 + // ┌► QA2 ┌──► QB2 ┌─► QC2 + // │ QA3 ───┘ QB3 ──┘ QC3 ───┐ + // │ │ + // └───────────────────────────────┘ + // + // Final output: [QB2, QB3, QC2, QC3, QA2, QA3] + + let mut id = to_id; + let mut key = database_key; + while id != from_id { + // Looking at the diagram above, the idea is to + // take the edge from `to_id` starting at `key` + // (inclusive) and down to the end. We can then + // load up the next thread (i.e., we start at B/QB2, + // and then load up the dependency on C/QC2). + let edge = self.edges.get_mut(&id).unwrap(); + edge.stack + .iter_mut() + .skip_while(|p| p.database_key_index != key) + .for_each(&mut closure); + id = edge.blocked_on_id; + key = edge.blocked_on_key; + } + + // Finally, we copy in the results from `from_stack`. + from_stack + .iter_mut() + .skip_while(|p| p.database_key_index != key) + .for_each(&mut closure); } /// Modifies the graph so that `from_id` is blocked @@ -68,10 +136,10 @@ impl DependencyGraph { from_id: RuntimeId, database_key: DatabaseKeyIndex, to_id: RuntimeId, - path: impl IntoIterator, + from_stack: QueryStack, query_mutex_guard: QueryMutexGuard, - ) -> Option { - me.add_edge(from_id, database_key, to_id, path); + ) -> (QueryStack, Option) { + me.add_edge(from_id, database_key, to_id, from_stack); // Release the mutex that prevents `database_key` // from completing, now that the edge has been added. 
@@ -82,9 +150,9 @@ impl DependencyGraph { let condvar = me.condvar.clone(); loop { - if let Some(wait_result) = me.wait_results.remove(&from_id) { + if let Some(stack_and_result) = me.wait_results.remove(&from_id) { debug_assert!(!me.edges.contains_key(&from_id)); - return wait_result; + return stack_and_result; } condvar.wait(&mut me); } @@ -98,7 +166,7 @@ impl DependencyGraph { from_id: RuntimeId, database_key: DatabaseKeyIndex, to_id: RuntimeId, - path: impl IntoIterator, + from_stack: QueryStack, ) { assert_ne!(from_id, to_id); debug_assert!(!self.edges.contains_key(&from_id)); @@ -107,8 +175,9 @@ impl DependencyGraph { self.edges.insert( from_id, Edge { - id: to_id, - path: path.into_iter().chain(Some(database_key.clone())).collect(), + blocked_on_id: to_id, + blocked_on_key: database_key, + stack: from_stack, }, ); self.query_dependents @@ -125,121 +194,20 @@ impl DependencyGraph { to_id: RuntimeId, wait_result: Option, ) { - let dependents = self.remove_edge(database_key, to_id); + let dependents = self + .query_dependents + .remove(&database_key) + .unwrap_or_default(); - for d in dependents { - self.wait_results.insert(d, wait_result.clone()); + for from_id in dependents { + let edge = self.edges.remove(&from_id).expect("no edge for dependent"); + assert_eq!(to_id, edge.blocked_on_id); + self.wait_results + .insert(from_id, (edge.stack, wait_result.clone())); } // Now that we have inserted the `wait_results`, // notify all potential dependents. self.condvar.notify_all(); } - - /// Remove all dependency edges into `database_key` - /// (being computed by `to_id`) and return the list of - /// dependent runtimes that were waiting for `database_key` - /// to complete. 
- fn remove_edge( - &mut self, - database_key: DatabaseKeyIndex, - to_id: RuntimeId, - ) -> impl IntoIterator { - let vec = self - .query_dependents - .remove(&database_key) - .unwrap_or_default(); - - for from_id in &vec { - let to_id1 = self.edges.remove(from_id).map(|edge| edge.id); - assert_eq!(Some(to_id), to_id1); - } - - vec - } - - pub(super) fn push_cycle_path( - &self, - database_key: DatabaseKeyIndex, - to: RuntimeId, - local_path: impl IntoIterator, - output: &mut Vec, - ) { - let mut current = Some((to, std::slice::from_ref(&database_key))); - let mut last = None; - let mut local_path = Some(local_path); - - loop { - match current.take() { - Some((id, path)) => { - let link_key = path.last().unwrap(); - - output.extend(path.iter().cloned()); - - current = self.edges.get(&id).map(|edge| { - let i = edge.path.iter().rposition(|p| p == link_key).unwrap(); - (edge.id, &edge.path[i + 1..]) - }); - - if current.is_none() { - last = local_path.take().map(|local_path| { - local_path - .into_iter() - .skip_while(move |p| *p != *link_key) - .skip(1) - }); - } - } - None => break, - } - } - - if let Some(iter) = &mut last { - output.extend(iter); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn dki(n: u32) -> DatabaseKeyIndex { - DatabaseKeyIndex { - group_index: 0, - query_index: 0, - key_index: n, - } - } - - macro_rules! 
dkivec { - ($($n:expr),*) => { - vec![$(dki($n)),*] - } - } - - #[test] - fn dependency_graph_path1() { - let mut graph = DependencyGraph::default(); - let a = RuntimeId { counter: 0 }; - let b = RuntimeId { counter: 1 }; - graph.add_edge(a, dki(2), b, dkivec![1]); - let mut v = vec![]; - graph.push_cycle_path(dki(1), a, dkivec![3, 2], &mut v); - assert_eq!(v, vec![dki(1), dki(2)]); - } - - #[test] - fn dependency_graph_path2() { - let mut graph = DependencyGraph::default(); - let a = RuntimeId { counter: 0 }; - let b = RuntimeId { counter: 1 }; - let c = RuntimeId { counter: 2 }; - graph.add_edge(a, dki(3), b, dkivec![1]); - graph.add_edge(b, dki(4), c, dkivec![2, 3]); - // assert!(graph.add_edge(c, &1, a, vec![5, 6, 4, 7])); - let mut v = vec![]; - graph.push_cycle_path(dki(1), a, dkivec![5, 6, 4, 7], &mut v); - assert_eq!(v, dkivec![1, 3, 4, 7]); - } } diff --git a/src/runtime/local_state.rs b/src/runtime/local_state.rs index 04fd6f8e..60a4d9e9 100644 --- a/src/runtime/local_state.rs +++ b/src/runtime/local_state.rs @@ -2,7 +2,7 @@ use crate::durability::Durability; use crate::runtime::ActiveQuery; use crate::runtime::Revision; use crate::DatabaseKeyIndex; -use std::cell::{Ref, RefCell, RefMut}; +use std::cell::RefCell; /// State that is specific to a single execution thread. /// @@ -13,15 +13,18 @@ use std::cell::{Ref, RefCell, RefMut}; pub(super) struct LocalState { /// Vector of active queries. /// + /// This is normally `Some`, but it is set to `None` + /// while the query is blocked waiting for a result. + /// /// Unwinding note: pushes onto this vector must be popped -- even /// during unwinding. 
- query_stack: RefCell>, + query_stack: RefCell>>, } impl Default for LocalState { fn default() -> Self { LocalState { - query_stack: Default::default(), + query_stack: RefCell::new(Some(Vec::new())), } } } @@ -33,6 +36,7 @@ impl LocalState { max_durability: Durability, ) -> ActiveQueryGuard<'_> { let mut query_stack = self.query_stack.borrow_mut(); + let query_stack = query_stack.as_mut().expect("local stack taken"); query_stack.push(ActiveQuery::new(database_key_index, max_durability)); ActiveQueryGuard { local_state: self, @@ -40,28 +44,24 @@ impl LocalState { } } - /// Returns a reference to the active query stack. - /// - /// **Warning:** Because this reference holds the ref-cell lock, - /// you should not use any mutating methods of `LocalState` while - /// reading from it. - pub(super) fn borrow_query_stack(&self) -> Ref<'_, Vec> { - self.query_stack.borrow() - } - - pub(super) fn borrow_query_stack_mut(&self) -> RefMut<'_, Vec> { - self.query_stack.borrow_mut() + fn with_query_stack(&self, c: impl FnOnce(&mut Vec) -> R) -> R { + c(self + .query_stack + .borrow_mut() + .as_mut() + .expect("query stack taken")) } pub(super) fn query_in_progress(&self) -> bool { - !self.query_stack.borrow().is_empty() + self.with_query_stack(|stack| !stack.is_empty()) } pub(super) fn active_query(&self) -> Option { - self.query_stack - .borrow() - .last() - .map(|active_query| active_query.database_key_index) + self.with_query_stack(|stack| { + stack + .last() + .map(|active_query| active_query.database_key_index) + }) } pub(super) fn report_query_read( @@ -70,21 +70,45 @@ impl LocalState { durability: Durability, changed_at: Revision, ) { - if let Some(top_query) = self.query_stack.borrow_mut().last_mut() { - top_query.add_read(input, durability, changed_at); - } + self.with_query_stack(|stack| { + if let Some(top_query) = stack.last_mut() { + top_query.add_read(input, durability, changed_at); + } + }) } pub(super) fn report_untracked_read(&self, current_revision: Revision) { 
- if let Some(top_query) = self.query_stack.borrow_mut().last_mut() { - top_query.add_untracked_read(current_revision); - } + self.with_query_stack(|stack| { + if let Some(top_query) = stack.last_mut() { + top_query.add_untracked_read(current_revision); + } + }) } pub(super) fn report_synthetic_read(&self, durability: Durability, current_revision: Revision) { - if let Some(top_query) = self.query_stack.borrow_mut().last_mut() { - top_query.add_synthetic_read(durability, current_revision); - } + self.with_query_stack(|stack| { + if let Some(top_query) = stack.last_mut() { + top_query.add_synthetic_read(durability, current_revision); + } + }) + } + + /// Takes the query stack and returns it. This is used when + /// the current thread is blocking. The stack must be restored + /// with [`Self::restore_query_stack`] when the thread unblocks. + pub(super) fn take_query_stack(&self) -> Vec { + assert!( + self.query_stack.borrow().is_some(), + "query stack already taken" + ); + self.query_stack.replace(None).unwrap() + } + + /// Restores a query stack taken with [`Self::take_query_stack`] once + /// the thread unblocks. + pub(super) fn restore_query_stack(&self, stack: Vec) { + assert!(self.query_stack.borrow().is_none(), "query stack not taken"); + self.query_stack.replace(Some(stack)); } } @@ -101,12 +125,12 @@ pub(super) struct ActiveQueryGuard<'me> { impl ActiveQueryGuard<'_> { fn pop_helper(&self) -> ActiveQuery { - let mut query_stack = self.local_state.query_stack.borrow_mut(); + self.local_state.with_query_stack(|stack| { + // Sanity check: pushes and pops should be balanced. + assert_eq!(stack.len(), self.push_len); - // Sanity check: pushes and pops should be balanced. - assert_eq!(query_stack.len(), self.push_len); - - query_stack.pop().unwrap() + stack.pop().unwrap() + }) } /// Invoked when the query has successfully completed execution.