mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-23 05:07:27 +00:00
Merge pull request #57 from nikomatsakis/issue-53-frozen-revision
Issue 53 frozen revision
This commit is contained in:
commit
bfc56d2591
7 changed files with 179 additions and 46 deletions
|
@ -12,6 +12,7 @@ readme = "README.md"
|
|||
derive-new = "0.5.5"
|
||||
rustc-hash = "1.0"
|
||||
parking_lot = "0.6.4"
|
||||
lock_api = "0.1.4"
|
||||
indexmap = "1.0.1"
|
||||
log = "0.4.5"
|
||||
smallvec = "0.6.5"
|
||||
|
|
|
@ -209,7 +209,7 @@ where
|
|||
) -> Result<StampedValue<Q::Value>, CycleDetected> {
|
||||
let runtime = db.salsa_runtime();
|
||||
|
||||
let _read_lock = runtime.freeze_revision();
|
||||
let _read_lock = runtime.start_query();
|
||||
|
||||
let revision_now = runtime.current_revision();
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use crate::Database;
|
||||
use lock_api::RawRwLock;
|
||||
use log::debug;
|
||||
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard};
|
||||
use rustc_hash::{FxHashMap, FxHasher};
|
||||
|
@ -100,18 +101,42 @@ where
|
|||
/// (However, if other threads invoke `increment_revision`, then
|
||||
/// the current revision may be considered cancelled, which can be
|
||||
/// observed through `is_current_revision_canceled`.)
|
||||
pub(crate) fn freeze_revision(&self) -> Option<RevisionGuard<'_, DB>> {
|
||||
pub(crate) fn start_query(&self) -> Option<QueryGuard<'_, DB>> {
|
||||
let mut local_state = self.local_state.borrow_mut();
|
||||
if !local_state.query_in_progress {
|
||||
local_state.query_in_progress = true;
|
||||
let guard = self.shared_state.query_lock.read();
|
||||
|
||||
Some(RevisionGuard::new(self, guard))
|
||||
Some(QueryGuard::new(self, guard))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Locks the current revision and returns a guard object that --
|
||||
/// when dropped -- will unlock it. While a revision is locked,
|
||||
/// queries can execute as normal but calls to `set` will block
|
||||
/// (note that calls to `set` *do* set the cancellation flag,
|
||||
/// which you can can check with
|
||||
/// `is_current_revision_canceled`). The intention is that you can
|
||||
/// lock the revision and then do multiple queries, thus
|
||||
/// guaranteeing that all of those queries execute against a
|
||||
/// consistent "view" of the database.
|
||||
///
|
||||
/// Note that, unlike most RAII guards, the guard returned by this
|
||||
/// method does not borrow the database or the runtime
|
||||
/// (internally, it uses an `Arc` handle). This means it can be
|
||||
/// sent to other threads without a problem -- the lock persists
|
||||
/// as long as the guard has not yet been dropped.
|
||||
///
|
||||
/// ### Deadlock warning
|
||||
///
|
||||
/// If you invoke `lock_revision` and then, from the same thread,
|
||||
/// call `set` on some input, you will get a deadlock.
|
||||
pub fn lock_revision(&self) -> RevisionGuard<DB> {
|
||||
RevisionGuard::new(&self.shared_state)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn id(&self) -> RuntimeId {
|
||||
self.id
|
||||
|
@ -379,18 +404,18 @@ impl<DB: Database> Default for LocalState<DB> {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) struct RevisionGuard<'db, DB: Database + 'db> {
|
||||
pub(crate) struct QueryGuard<'db, DB: Database + 'db> {
|
||||
db: &'db Runtime<DB>,
|
||||
lock: RwLockReadGuard<'db, ()>,
|
||||
}
|
||||
|
||||
impl<'db, DB: Database> RevisionGuard<'db, DB> {
|
||||
impl<'db, DB: Database> QueryGuard<'db, DB> {
|
||||
fn new(db: &'db Runtime<DB>, lock: RwLockReadGuard<'db, ()>) -> Self {
|
||||
Self { db, lock }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'db, DB: Database> Drop for RevisionGuard<'db, DB> {
|
||||
impl<'db, DB: Database> Drop for QueryGuard<'db, DB> {
|
||||
fn drop(&mut self) {
|
||||
let mut local_state = self.db.local_state.borrow_mut();
|
||||
assert!(local_state.query_in_progress);
|
||||
|
@ -398,6 +423,38 @@ impl<'db, DB: Database> Drop for RevisionGuard<'db, DB> {
|
|||
}
|
||||
}
|
||||
|
||||
/// The guard returned by `lock_revision`. Once this guard is dropped,
|
||||
/// the revision will be unlocked, and calls to `set` can proceed.
|
||||
pub struct RevisionGuard<DB: Database> {
|
||||
shared_state: Arc<SharedState<DB>>,
|
||||
}
|
||||
|
||||
impl<DB: Database> RevisionGuard<DB> {
|
||||
/// Creates a new revision guard, acquiring the query read-lock in the process.
|
||||
fn new(shared_state: &Arc<SharedState<DB>>) -> Self {
|
||||
// Acquire the read-lock without using RAII. This requires the
|
||||
// unsafe keyword because, if we were to unlock the lock this way,
|
||||
// we would screw up other people using the safe APIs.
|
||||
unsafe {
|
||||
shared_state.query_lock.raw().lock_shared();
|
||||
}
|
||||
|
||||
Self {
|
||||
shared_state: shared_state.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB: Database> Drop for RevisionGuard<DB> {
|
||||
fn drop(&mut self) {
|
||||
// Release our read-lock without using RAII. As in `new`
|
||||
// above, this requires the unsafe keyword.
|
||||
unsafe {
|
||||
self.shared_state.query_lock.raw().unlock_shared();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ActiveQuery<DB: Database> {
|
||||
/// What query is executing
|
||||
descriptor: DB::QueryDescriptor,
|
||||
|
|
|
@ -3,4 +3,6 @@ mod setup;
|
|||
mod cancellation;
|
||||
mod independent;
|
||||
mod race;
|
||||
mod revision_lock;
|
||||
mod signal;
|
||||
mod true_parallel;
|
||||
|
|
72
tests/parallel/revision_lock.rs
Normal file
72
tests/parallel/revision_lock.rs
Normal file
|
@ -0,0 +1,72 @@
|
|||
use crate::setup::{Input, ParDatabase, ParDatabaseImpl};
|
||||
use crate::signal::Signal;
|
||||
use salsa::{Database, ParallelDatabase};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Add test where a call to `sum` is cancelled by a simultaneous
|
||||
/// write. Check that we recompute the result in next revision, even
|
||||
/// though none of the inputs have changed.
|
||||
#[test]
|
||||
fn in_par_get_set_cancellation() {
|
||||
let db = ParDatabaseImpl::default();
|
||||
|
||||
db.query(Input).set('a', 1);
|
||||
|
||||
let signal = Arc::new(Signal::default());
|
||||
|
||||
let lock = db.salsa_runtime().lock_revision();
|
||||
let thread1 = std::thread::spawn({
|
||||
let db = db.fork();
|
||||
let signal = signal.clone();
|
||||
move || {
|
||||
// Check that cancellation flag is not yet set, because
|
||||
// `set` cannot have been called yet.
|
||||
assert!(!db.salsa_runtime().is_current_revision_canceled());
|
||||
|
||||
// Signal other thread to proceed.
|
||||
signal.signal(1);
|
||||
|
||||
// Wait for other thread to signal cancellation
|
||||
while !db.salsa_runtime().is_current_revision_canceled() {
|
||||
std::thread::yield_now();
|
||||
}
|
||||
|
||||
// Since we have not yet released revision lock, we should
|
||||
// see 1 here.
|
||||
let v = db.input('a');
|
||||
|
||||
// Release the lock.
|
||||
std::mem::drop(lock);
|
||||
|
||||
// This could come before or after the `set` in the other
|
||||
// thread.
|
||||
let w = db.input('a');
|
||||
|
||||
(v, w)
|
||||
}
|
||||
});
|
||||
|
||||
let thread2 = std::thread::spawn({
|
||||
let db = db.fork();
|
||||
let signal = signal.clone();
|
||||
move || {
|
||||
// Wait until thread 1 has asserted that they are not cancelled
|
||||
// before we invoke `set.`
|
||||
signal.wait_for(1);
|
||||
|
||||
// This will block until thread1 drops the revision lock.
|
||||
db.query(Input).set('a', 2);
|
||||
|
||||
db.input('a')
|
||||
}
|
||||
});
|
||||
|
||||
// The first read is done with the revision lock, so it *must* see
|
||||
// `1`; the second read could see either `1` or `2`.
|
||||
let (a, b) = thread1.join().unwrap();
|
||||
assert_eq!(a, 1);
|
||||
assert!(b == 1 || b == 2, "saw unexpected value for b: {}", b);
|
||||
|
||||
let c = thread2.join().unwrap();
|
||||
assert_eq!(c, 2);
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
use parking_lot::{Condvar, Mutex};
|
||||
use crate::signal::Signal;
|
||||
use salsa::Database;
|
||||
use salsa::ParallelDatabase;
|
||||
use std::cell::Cell;
|
||||
|
@ -70,45 +70,6 @@ pub(crate) struct KnobsStruct {
|
|||
pub(crate) sum_signal_on_exit: Cell<usize>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct Signal {
|
||||
value: Mutex<usize>,
|
||||
cond_var: Condvar,
|
||||
}
|
||||
|
||||
impl Signal {
|
||||
pub(crate) fn signal(&self, stage: usize) {
|
||||
log::debug!("signal({})", stage);
|
||||
|
||||
// This check avoids acquiring the lock for things that will
|
||||
// clearly be a no-op. Not *necessary* but helps to ensure we
|
||||
// are more likely to encounter weird race conditions;
|
||||
// otherwise calls to `sum` will tend to be unnecessarily
|
||||
// synchronous.
|
||||
if stage > 0 {
|
||||
let mut v = self.value.lock();
|
||||
if stage > *v {
|
||||
*v = stage;
|
||||
self.cond_var.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits until the given condition is true; the fn is invoked
|
||||
/// with the current stage.
|
||||
pub(crate) fn wait_for(&self, stage: usize) {
|
||||
log::debug!("wait_for({})", stage);
|
||||
|
||||
// As above, avoid lock if clearly a no-op.
|
||||
if stage > 0 {
|
||||
let mut v = self.value.lock();
|
||||
while *v < stage {
|
||||
self.cond_var.wait(&mut v);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sum(db: &impl ParDatabase, key: &'static str) -> usize {
|
||||
let mut sum = 0;
|
||||
|
||||
|
|
40
tests/parallel/signal.rs
Normal file
40
tests/parallel/signal.rs
Normal file
|
@ -0,0 +1,40 @@
|
|||
use parking_lot::{Condvar, Mutex};
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct Signal {
|
||||
value: Mutex<usize>,
|
||||
cond_var: Condvar,
|
||||
}
|
||||
|
||||
impl Signal {
|
||||
pub(crate) fn signal(&self, stage: usize) {
|
||||
log::debug!("signal({})", stage);
|
||||
|
||||
// This check avoids acquiring the lock for things that will
|
||||
// clearly be a no-op. Not *necessary* but helps to ensure we
|
||||
// are more likely to encounter weird race conditions;
|
||||
// otherwise calls to `sum` will tend to be unnecessarily
|
||||
// synchronous.
|
||||
if stage > 0 {
|
||||
let mut v = self.value.lock();
|
||||
if stage > *v {
|
||||
*v = stage;
|
||||
self.cond_var.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits until the given condition is true; the fn is invoked
|
||||
/// with the current stage.
|
||||
pub(crate) fn wait_for(&self, stage: usize) {
|
||||
log::debug!("wait_for({})", stage);
|
||||
|
||||
// As above, avoid lock if clearly a no-op.
|
||||
if stage > 0 {
|
||||
let mut v = self.value.lock();
|
||||
while *v < stage {
|
||||
self.cond_var.wait(&mut v);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue