diff --git a/src/derived.rs b/src/derived.rs index 72750b25..a35dcf15 100644 --- a/src/derived.rs +++ b/src/derived.rs @@ -11,6 +11,8 @@ use crate::QueryFunction; use crate::QueryStorageOps; use crate::UncheckedMutQueryStorageOps; use log::debug; +use parking_lot::Condvar; +use parking_lot::Mutex; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use rustc_hash::FxHashMap; use std::marker::PhantomData; @@ -41,6 +43,18 @@ where { map: RwLock>>, policy: PhantomData, + + /// This cond var is used when one thread is waiting on another to + /// produce some specific key. In that case, the thread producing + /// the key will signal the cond-var. The threads awaiting the key + /// will check in `map` to see if their key is present and (if + /// not) await the cond-var. + signal_cond_var: Condvar, + + /// Mutex used for `signal_cond_var`. Note that this mutex is + /// never acquired while holding the lock on `map` (but you may + /// acquire the `map` lock while holding this mutex). + signal_mutex: Mutex<()>, } pub trait MemoizationPolicy @@ -151,6 +165,8 @@ where DerivedStorage { map: RwLock::new(FxHashMap::default()), policy: PhantomData, + signal_cond_var: Default::default(), + signal_mutex: Default::default(), } } } @@ -332,6 +348,49 @@ where Err(map) } + /// If some other thread is tasked with producing a memoized + /// result for this value, then wait for them. + /// + /// Pre-conditions: + /// - we have installed ourselves in the dependency graph and set the + /// bool that informs the producer we are waiting + /// - `self.map` must be locked (with `map_guard` as the guard) + fn await_other_thread( + &self, + map_guard: MapGuard, + revision_now: Revision, + key: &Q::Key, + ) -> StampedValue + where + MapGuard: Deref>>, + { + // Intentionally release the lock on map. We cannot be holding + // it while we are sleeping! + std::mem::drop(map_guard); + + let mut signal_lock_guard = self.signal_mutex.lock(); + + loop { + { + let map = self.map.read(); + + if let Some(QueryState::Memoized(m)) = map.get(key) { + assert_eq!(m.verified_at, revision_now); + return if let Some(value) = &m.value { + StampedValue { + value: value.clone(), + changed_at: m.changed_at, + } + } else { + panic!("awaiting production of non-memoized value"); + }; + } + } + + self.signal_cond_var.wait(&mut signal_lock_guard); + } + } + fn overwrite_placeholder( &self, runtime: &Runtime,