From 59ab0bd7a24f80c32757dfc1fb8aa4db29ea3efc Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 30 Oct 2018 20:18:56 -0400 Subject: [PATCH 1/4] introduce simple callbacks that can be used to build better logging --- src/derived.rs | 25 +++++++++++--- src/input.rs | 32 ++++++++++++++--- src/lib.rs | 91 ++++++++++++++++++++++++++++++++++++++++++++++--- src/plumbing.rs | 10 ++++-- src/runtime.rs | 26 ++++++++++++-- 5 files changed, 166 insertions(+), 18 deletions(-) diff --git a/src/derived.rs b/src/derived.rs index 8aa15e4f..472e667a 100644 --- a/src/derived.rs +++ b/src/derived.rs @@ -10,8 +10,7 @@ use crate::runtime::Revision; use crate::runtime::Runtime; use crate::runtime::RuntimeId; use crate::runtime::StampedValue; -use crate::Database; -use crate::SweepStrategy; +use crate::{Database, Event, EventKind, SweepStrategy}; use log::{debug, info}; use parking_lot::Mutex; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; @@ -258,7 +257,7 @@ where ); // First, do a check with a read-lock. - match self.probe(self.map.read(), runtime, revision_now, descriptor, key) { + match self.probe(db, self.map.read(), runtime, revision_now, descriptor, key) { ProbeState::UpToDate(v) => return v, ProbeState::StaleOrAbsent(_guard) => (), } @@ -283,6 +282,7 @@ where // already. (This permits other readers but prevents anyone // else from running `read_upgrade` at the same time.) let mut old_memo = match self.probe( + db, self.map.upgradable_read(), runtime, revision_now, @@ -314,6 +314,13 @@ where key ); + db.salsa_event(|| Event { + id: runtime.id(), + kind: EventKind::DidValidateMemoizedValue { + descriptor: descriptor.clone(), + }, + }); + self.overwrite_placeholder( runtime, descriptor, @@ -322,13 +329,14 @@ where &value, panic_guard, ); + return Ok(value); } } // Query was not previously executed, or value is potentially // stale, or value is absent. Let's execute! - let mut result = runtime.execute_query_implementation(descriptor, || { + let mut result = runtime.execute_query_implementation(db, descriptor, || { info!("{:?}({:?}): executing query", Q::default(), key); if !self.should_track_inputs(key) { @@ -434,6 +442,7 @@ where /// `map` will have been released. fn probe( &self, + db: &DB, map: MapGuard, runtime: &Runtime, revision_now: Revision, @@ -454,6 +463,14 @@ where // can complete. std::mem::drop(map); + db.salsa_event(|| Event { + id: db.salsa_runtime().id(), + kind: EventKind::WillBlockOn { + other_id, + descriptor: descriptor.clone(), + }, + }); + let value = rx.recv().unwrap(); ProbeState::UpToDate(Ok(value)) } diff --git a/src/input.rs b/src/input.rs index 144cbc69..f501a290 100644 --- a/src/input.rs +++ b/src/input.rs @@ -7,6 +7,8 @@ use crate::runtime::ChangedAt; use crate::runtime::Revision; use crate::runtime::StampedValue; use crate::Database; +use crate::Event; +use crate::EventKind; use crate::Query; use crate::SweepStrategy; use log::debug; @@ -60,7 +62,14 @@ where panic!("no value set for {:?}({:?})", Q::default(), key) } - fn set_common(&self, db: &DB, key: &Q::Key, value: Q::Value, is_constant: IsConstant) { + fn set_common( + &self, + db: &DB, + key: &Q::Key, + descriptor: &DB::QueryDescriptor, + value: Q::Value, + is_constant: IsConstant, + ) { let key = key.clone(); // The value is changing, so even if we are setting this to a @@ -74,6 +83,13 @@ where db.salsa_runtime().with_incremented_revision(|next_revision| { let mut map = self.map.write(); + db.salsa_event(|| Event { + id: db.salsa_runtime().id(), + kind: EventKind::WillChangeInputValue { + descriptor: descriptor.clone(), + }, + }); + // Do this *after* we acquire the lock, so that we are not // racing with somebody else to modify this same cell. // (Otherwise, someone else might write a *newer* revision @@ -190,16 +206,22 @@ where Q: Query, DB: Database, { - fn set(&self, db: &DB, key: &Q::Key, value: Q::Value) { + fn set(&self, db: &DB, key: &Q::Key, descriptor: &DB::QueryDescriptor, value: Q::Value) { log::debug!("{:?}({:?}) = {:?}", Q::default(), key, value); - self.set_common(db, key, value, IsConstant(false)) + self.set_common(db, key, descriptor, value, IsConstant(false)) } - fn set_constant(&self, db: &DB, key: &Q::Key, value: Q::Value) { + fn set_constant( + &self, + db: &DB, + key: &Q::Key, + descriptor: &DB::QueryDescriptor, + value: Q::Value, + ) { log::debug!("{:?}({:?}) = {:?}", Q::default(), key, value); - self.set_common(db, key, value, IsConstant(true)) + self.set_common(db, key, descriptor, value, IsConstant(true)) } } diff --git a/src/lib.rs b/src/lib.rs index 3d225ecd..22f1eaaa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,11 +17,12 @@ use crate::plumbing::QueryStorageMassOps; use crate::plumbing::QueryStorageOps; use crate::plumbing::UncheckedMutQueryStorageOps; use derive_new::new; -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::hash::Hash; -pub use crate::runtime::Runtime; pub use crate::runtime::RevisionGuard; +pub use crate::runtime::Runtime; +pub use crate::runtime::RuntimeId; /// The base trait which your "query context" must implement. Gives /// access to the salsa runtime, which you must embed into your query @@ -62,6 +63,86 @@ pub trait Database: plumbing::DatabaseStorageTypes + plumbing::DatabaseOps { { >::get_query_table(self) } + + /// This function is invoked at key points in the salsa + /// runtime. It permits the database to be customized and to + /// inject logging or other custom behavior. + fn salsa_event(&self, event_fn: impl Fn() -> Event) { + #![allow(unused_variables)] + } +} + +pub struct Event { + pub id: RuntimeId, + pub kind: EventKind, +} + +impl fmt::Debug for Event { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Event") + .field("id", &self.id) + .field("kind", &self.kind) + .finish() + } +} + +pub enum EventKind { + /// Occurs when we found that all inputs to a memoized value are + /// up-to-date and hence the value can be re-used without + /// executing the closure. + /// + /// Executes before the "re-used" value is returned. + DidValidateMemoizedValue { descriptor: DB::QueryDescriptor }, + + /// Indicates that another thread (with id `other_id`) is processing the + /// given query (`descriptor`), so we will block until they + /// finish. + /// + /// Executes after we have registered with the other thread but + /// before they have answered us. + /// + /// (NB: you can find the `id` of the current thread via the + /// `salsa_runtime`) + WillBlockOn { + other_id: RuntimeId, + descriptor: DB::QueryDescriptor, + }, + + /// Indicates that the input value will change after this + /// callback, e.g. due to a call to `set`. + WillChangeInputValue { descriptor: DB::QueryDescriptor }, + + /// Indicates that the function for this query will be executed. + /// This is either because it has never executed before or because + /// its inputs may be out of date. + WillExecute { descriptor: DB::QueryDescriptor }, +} + +impl fmt::Debug for EventKind { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + EventKind::DidValidateMemoizedValue { descriptor } => fmt + .debug_struct("DidValidateMemoizedValue") + .field("descriptor", descriptor) + .finish(), + EventKind::WillBlockOn { + other_id, + descriptor, + } => fmt + .debug_struct("WillBlockOn") + .field("other_id", other_id) + .field("descriptor", descriptor) + .finish(), + EventKind::WillChangeInputValue { descriptor } => fmt + .debug_struct("WillChangeInputValue") + .field("descriptor", descriptor) + .finish(), + EventKind::WillExecute { descriptor } => fmt + .debug_struct("WillExecute") + .field("descriptor", descriptor) + .finish(), + } + } } /// The sweep strategy controls what data we will keep/discard when we @@ -149,7 +230,8 @@ where where Q::Storage: plumbing::InputQueryStorageOps, { - self.storage.set(self.db, &key, value); + self.storage + .set(self.db, &key, &self.descriptor(&key), value); } /// Assign a value to an "input query", with the additional @@ -159,7 +241,8 @@ where where Q::Storage: plumbing::InputQueryStorageOps, { - self.storage.set_constant(self.db, &key, value); + self.storage + .set_constant(self.db, &key, &self.descriptor(&key), value); } /// Assigns a value to the query **bypassing the normal diff --git a/src/plumbing.rs b/src/plumbing.rs index 66482ed1..d6810665 100644 --- a/src/plumbing.rs +++ b/src/plumbing.rs @@ -121,9 +121,15 @@ where DB: Database, Q: Query, { - fn set(&self, db: &DB, key: &Q::Key, new_value: Q::Value); + fn set(&self, db: &DB, key: &Q::Key, descriptor: &DB::QueryDescriptor, new_value: Q::Value); - fn set_constant(&self, db: &DB, key: &Q::Key, new_value: Q::Value); + fn set_constant( + &self, + db: &DB, + key: &Q::Key, + descriptor: &DB::QueryDescriptor, + new_value: Q::Value, + ); } /// An optional trait that is implemented for "user mutable" storage: diff --git a/src/runtime.rs b/src/runtime.rs index 85f82ff6..41ae6461 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -1,4 +1,4 @@ -use crate::{Database, SweepStrategy}; +use crate::{Database, Event, EventKind, SweepStrategy}; use lock_api::RawRwLock; use log::debug; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; @@ -147,11 +147,23 @@ where RevisionGuard::new(&self.shared_state) } + /// The unique identifier attached to this `SalsaRuntime`. Each + /// forked runtime has a distinct identifier. #[inline] - pub(crate) fn id(&self) -> RuntimeId { + pub fn id(&self) -> RuntimeId { self.id } + /// Returns the descriptor for the query that this thread is + /// actively executing (if any). + pub fn active_query(&self) -> Option { + self.local_state + .borrow() + .query_stack + .last() + .map(|active_query| active_query.descriptor.clone()) + } + /// Read current value of the revision counter. #[inline] pub(crate) fn current_revision(&self) -> Revision { @@ -239,11 +251,19 @@ where pub(crate) fn execute_query_implementation( &self, + db: &DB, descriptor: &DB::QueryDescriptor, execute: impl FnOnce() -> V, ) -> ComputedQueryResult { debug!("{:?}: execute_query_implementation invoked", descriptor); + db.salsa_event(|| Event { + id: db.salsa_runtime().id(), + kind: EventKind::WillExecute { + descriptor: descriptor.clone(), + }, + }); + // Push the active query onto the stack. let push_len = { let mut local_state = self.local_state.borrow_mut(); @@ -524,7 +544,7 @@ impl ActiveQuery { } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub(crate) struct RuntimeId { +pub struct RuntimeId { counter: usize, } From e35530055483e994da68b34a5e35c601d278d551 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 23 Oct 2018 05:25:09 -0400 Subject: [PATCH 2/4] use callbacks in parallel test --- tests/parallel/setup.rs | 14 ++++++++++++++ tests/parallel/true_parallel.rs | 8 +++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/tests/parallel/setup.rs b/tests/parallel/setup.rs index be89f720..81d9f1be 100644 --- a/tests/parallel/setup.rs +++ b/tests/parallel/setup.rs @@ -56,6 +56,9 @@ pub(crate) struct KnobsStruct { /// threads to ensure we reach various weird states. pub(crate) signal: Arc, + /// When this database is about to block, send a signal. + pub(crate) signal_on_will_block: Cell, + /// Invocations of `sum` will signal this stage on entry. pub(crate) sum_signal_on_entry: Cell, @@ -114,6 +117,17 @@ impl Database for ParDatabaseImpl { fn salsa_runtime(&self) -> &salsa::Runtime { &self.runtime } + + fn salsa_event(&self, event_fn: impl Fn() -> salsa::Event) { + let event = event_fn(); + match event.kind { + salsa::EventKind::WillBlockOn { .. } => { + self.signal(self.knobs().signal_on_will_block.get()); + } + + _ => {} + } + } } impl ParallelDatabase for ParDatabaseImpl { diff --git a/tests/parallel/true_parallel.rs b/tests/parallel/true_parallel.rs index e8073dac..278e2e00 100644 --- a/tests/parallel/true_parallel.rs +++ b/tests/parallel/true_parallel.rs @@ -66,13 +66,15 @@ fn true_parallel_same_keys() { } }); - // Thread 2 will sync barrier *just* before calling `sum`. Doesn't - // guarantee the race we want but makes it highly likely. + // Thread 2 will wait until Thread 1 has entered sum and then -- + // once it has set tself to block -- signal Thread 1 to + // continue. This way, we test out the mechanism of one thread + // blocking on another. let thread2 = std::thread::spawn({ let db = db.fork(); move || { db.knobs().signal.wait_for(1); - db.knobs().signal.signal(2); + db.knobs().signal_on_will_block.set(2); db.sum("abc") } }); From cf9db9cc7f113ccb98957c6cb5a1cc3dac6fdfde Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 30 Oct 2018 20:32:05 -0400 Subject: [PATCH 3/4] fix typo --- tests/parallel/true_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/parallel/true_parallel.rs b/tests/parallel/true_parallel.rs index 278e2e00..834ce514 100644 --- a/tests/parallel/true_parallel.rs +++ b/tests/parallel/true_parallel.rs @@ -67,7 +67,7 @@ fn true_parallel_same_keys() { }); // Thread 2 will wait until Thread 1 has entered sum and then -- - // once it has set tself to block -- signal Thread 1 to + // once it has set itself to block -- signal Thread 1 to // continue. This way, we test out the mechanism of one thread // blocking on another. let thread2 = std::thread::spawn({ From 2e3f8b1a3d6e667731447857427be4fed0a168b5 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Tue, 30 Oct 2018 20:39:56 -0400 Subject: [PATCH 4/4] name the field `runtime_id` --- src/derived.rs | 6 +++--- src/input.rs | 2 +- src/lib.rs | 12 ++++++------ src/runtime.rs | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/derived.rs b/src/derived.rs index 472e667a..8df16108 100644 --- a/src/derived.rs +++ b/src/derived.rs @@ -315,7 +315,7 @@ where ); db.salsa_event(|| Event { - id: runtime.id(), + runtime_id: runtime.id(), kind: EventKind::DidValidateMemoizedValue { descriptor: descriptor.clone(), }, @@ -464,9 +464,9 @@ where std::mem::drop(map); db.salsa_event(|| Event { - id: db.salsa_runtime().id(), + runtime_id: db.salsa_runtime().id(), kind: EventKind::WillBlockOn { - other_id, + other_runtime_id: other_id, descriptor: descriptor.clone(), }, }); diff --git a/src/input.rs b/src/input.rs index f501a290..278f5b66 100644 --- a/src/input.rs +++ b/src/input.rs @@ -84,7 +84,7 @@ where let mut map = self.map.write(); db.salsa_event(|| Event { - id: db.salsa_runtime().id(), + runtime_id: db.salsa_runtime().id(), kind: EventKind::WillChangeInputValue { descriptor: descriptor.clone(), }, diff --git a/src/lib.rs b/src/lib.rs index 22f1eaaa..5039e167 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,14 +73,14 @@ pub trait Database: plumbing::DatabaseStorageTypes + plumbing::DatabaseOps { } pub struct Event { - pub id: RuntimeId, + pub runtime_id: RuntimeId, pub kind: EventKind, } impl fmt::Debug for Event { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Event") - .field("id", &self.id) + .field("runtime_id", &self.runtime_id) .field("kind", &self.kind) .finish() } @@ -94,7 +94,7 @@ pub enum EventKind { /// Executes before the "re-used" value is returned. DidValidateMemoizedValue { descriptor: DB::QueryDescriptor }, - /// Indicates that another thread (with id `other_id`) is processing the + /// Indicates that another thread (with id `other_runtime_id`) is processing the /// given query (`descriptor`), so we will block until they /// finish. /// @@ -104,7 +104,7 @@ pub enum EventKind { /// (NB: you can find the `id` of the current thread via the /// `salsa_runtime`) WillBlockOn { - other_id: RuntimeId, + other_runtime_id: RuntimeId, descriptor: DB::QueryDescriptor, }, @@ -126,11 +126,11 @@ impl fmt::Debug for EventKind { .field("descriptor", descriptor) .finish(), EventKind::WillBlockOn { - other_id, + other_runtime_id, descriptor, } => fmt .debug_struct("WillBlockOn") - .field("other_id", other_id) + .field("other_runtime_id", other_runtime_id) .field("descriptor", descriptor) .finish(), EventKind::WillChangeInputValue { descriptor } => fmt diff --git a/src/runtime.rs b/src/runtime.rs index 41ae6461..1e158cb3 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -258,7 +258,7 @@ where debug!("{:?}: execute_query_implementation invoked", descriptor); db.salsa_event(|| Event { - id: db.salsa_runtime().id(), + runtime_id: db.salsa_runtime().id(), kind: EventKind::WillExecute { descriptor: descriptor.clone(), },