mirror of
https://github.com/salsa-rs/salsa.git
synced 2025-01-23 13:10:19 +00:00
improve parallel cycle tests
Before we could not observe the case where: * thread A is blocked on B * cycle detected in thread B * some participants are on thread A and have to be marked (In particular, I commented out some code that seemed necessary and didn't see any tests fail)
This commit is contained in:
parent
187bd54fa9
commit
79f8acc3aa
2 changed files with 86 additions and 48 deletions
|
@ -6,78 +6,118 @@ use crate::setup::{Knobs, ParDatabase, ParDatabaseImpl};
|
||||||
use salsa::{Cancelled, ParallelDatabase};
|
use salsa::{Cancelled, ParallelDatabase};
|
||||||
use test_env_log::test;
|
use test_env_log::test;
|
||||||
|
|
||||||
pub(crate) fn recover_cycle(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
|
// Recover cycle test:
|
||||||
key * 10
|
//
|
||||||
|
// The pattern is as follows.
|
||||||
|
//
|
||||||
|
// Thread A Thread B
|
||||||
|
// -------- --------
|
||||||
|
// a1 b1
|
||||||
|
// | wait for stage 1 (blocks)
|
||||||
|
// signal stage 1 |
|
||||||
|
// wait for stage 2 (blocks) (unblocked)
|
||||||
|
// | signal stage 2
|
||||||
|
// (unblocked) wait for stage 3 (blocks)
|
||||||
|
// a2 |
|
||||||
|
// b1 (blocks -> stage 3) |
|
||||||
|
// | (unblocked)
|
||||||
|
// | b2
|
||||||
|
// | a1 (cycle detected, recovers)
|
||||||
|
// | b2 completes, recovers
|
||||||
|
// | b1 completes, recovers
|
||||||
|
// a2 sees cycle, recovers
|
||||||
|
// a1 completes, recovers
|
||||||
|
|
||||||
|
pub(crate) fn recover_from_cycle_a1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
|
||||||
|
log::debug!("recover_from_cycle_a1");
|
||||||
|
key * 10 + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn recover_cycle_a(db: &dyn ParDatabase, key: i32) -> i32 {
|
pub(crate) fn recover_from_cycle_a2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
|
||||||
// Wait to create the cycle until both threads have entered
|
log::debug!("recover_from_cycle_a2");
|
||||||
db.signal(0);
|
key * 10 + 2
|
||||||
db.wait_for(1);
|
|
||||||
|
|
||||||
db.recover_cycle_b(key)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn recover_cycle_b(db: &dyn ParDatabase, key: i32) -> i32 {
|
pub(crate) fn recover_from_cycle_b1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
|
||||||
|
log::debug!("recover_from_cycle_b1");
|
||||||
|
key * 20 + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn recover_from_cycle_b2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 {
|
||||||
|
log::debug!("recover_from_cycle_b2");
|
||||||
|
key * 20 + 2
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn recover_cycle_a1(db: &dyn ParDatabase, key: i32) -> i32 {
|
||||||
// Wait to create the cycle until both threads have entered
|
// Wait to create the cycle until both threads have entered
|
||||||
db.wait_for(0);
|
|
||||||
db.signal(1);
|
db.signal(1);
|
||||||
|
db.wait_for(2);
|
||||||
|
|
||||||
if key < 0 && db.should_cycle() {
|
db.recover_cycle_a2(key)
|
||||||
db.recover_cycle_a(key)
|
}
|
||||||
} else {
|
|
||||||
key
|
pub(crate) fn recover_cycle_a2(db: &dyn ParDatabase, key: i32) -> i32 {
|
||||||
}
|
db.recover_cycle_b1(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn recover_cycle_b1(db: &dyn ParDatabase, key: i32) -> i32 {
|
||||||
|
// Wait to create the cycle until both threads have entered
|
||||||
|
db.wait_for(1);
|
||||||
|
db.signal(2);
|
||||||
|
|
||||||
|
// Wait for thread A to block on this thread
|
||||||
|
db.wait_for(3);
|
||||||
|
|
||||||
|
db.recover_cycle_b2(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn recover_cycle_b2(db: &dyn ParDatabase, key: i32) -> i32 {
|
||||||
|
db.recover_cycle_a1(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn panic_cycle_a(db: &dyn ParDatabase, key: i32) -> i32 {
|
pub(crate) fn panic_cycle_a(db: &dyn ParDatabase, key: i32) -> i32 {
|
||||||
// Wait to create the cycle until both threads have entered
|
// Wait to create the cycle until both threads have entered
|
||||||
db.signal(0);
|
db.signal(1);
|
||||||
db.wait_for(1);
|
db.wait_for(2);
|
||||||
|
|
||||||
db.panic_cycle_b(key)
|
db.panic_cycle_b(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn panic_cycle_b(db: &dyn ParDatabase, key: i32) -> i32 {
|
pub(crate) fn panic_cycle_b(db: &dyn ParDatabase, key: i32) -> i32 {
|
||||||
// Wait to create the cycle until both threads have entered
|
// Wait to create the cycle until both threads have entered
|
||||||
db.wait_for(0);
|
db.wait_for(1);
|
||||||
db.signal(1);
|
db.signal(2);
|
||||||
|
|
||||||
// Wait for thread A to block on this thread
|
// Wait for thread A to block on this thread
|
||||||
db.wait_for(2);
|
db.wait_for(3);
|
||||||
|
|
||||||
// Now try to execute A
|
// Now try to execute A
|
||||||
if key < 0 && db.should_cycle() {
|
db.panic_cycle_a(key)
|
||||||
db.panic_cycle_a(key)
|
|
||||||
} else {
|
|
||||||
key
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn recover_parallel_cycle() {
|
fn recover_parallel_cycle() {
|
||||||
let mut db = ParDatabaseImpl::default();
|
let mut db = ParDatabaseImpl::default();
|
||||||
db.set_should_cycle(true);
|
db.knobs().signal_on_will_block.set(3);
|
||||||
|
|
||||||
let thread_a = std::thread::spawn({
|
let thread_a = std::thread::spawn({
|
||||||
let db = db.snapshot();
|
let db = db.snapshot();
|
||||||
move || db.recover_cycle_a(-1)
|
move || db.recover_cycle_a1(1)
|
||||||
});
|
});
|
||||||
|
|
||||||
let thread_b = std::thread::spawn({
|
let thread_b = std::thread::spawn({
|
||||||
let db = db.snapshot();
|
let db = db.snapshot();
|
||||||
move || db.recover_cycle_b(-1)
|
move || db.recover_cycle_b1(1)
|
||||||
});
|
});
|
||||||
|
|
||||||
assert_eq!(thread_a.join().unwrap(), -10);
|
assert_eq!(thread_a.join().unwrap(), 11);
|
||||||
assert_eq!(thread_b.join().unwrap(), -10);
|
assert_eq!(thread_b.join().unwrap(), 21);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn panic_parallel_cycle() {
|
fn panic_parallel_cycle() {
|
||||||
let mut db = ParDatabaseImpl::default();
|
let mut db = ParDatabaseImpl::default();
|
||||||
db.set_should_cycle(true);
|
db.knobs().signal_on_will_block.set(3);
|
||||||
db.knobs().signal_on_will_block.set(2);
|
|
||||||
|
|
||||||
let thread_a = std::thread::spawn({
|
let thread_a = std::thread::spawn({
|
||||||
let db = db.snapshot();
|
let db = db.snapshot();
|
||||||
|
|
|
@ -27,23 +27,21 @@ pub(crate) trait ParDatabase: Knobs {
|
||||||
/// Invokes `sum2_drop_sum`
|
/// Invokes `sum2_drop_sum`
|
||||||
fn sum3_drop_sum(&self, key: &'static str) -> usize;
|
fn sum3_drop_sum(&self, key: &'static str) -> usize;
|
||||||
|
|
||||||
// Cycle tests:
|
#[salsa::cycle(crate::cycles::recover_from_cycle_a1)]
|
||||||
//
|
#[salsa::invoke(crate::cycles::recover_cycle_a1)]
|
||||||
// A ───────► B
|
fn recover_cycle_a1(&self, key: i32) -> i32;
|
||||||
//
|
|
||||||
// ▲ │
|
|
||||||
// └──────────┘
|
|
||||||
// if key > 0
|
|
||||||
// && should_cycle
|
|
||||||
|
|
||||||
#[salsa::input]
|
#[salsa::cycle(crate::cycles::recover_from_cycle_a2)]
|
||||||
fn should_cycle(&self) -> bool;
|
#[salsa::invoke(crate::cycles::recover_cycle_a2)]
|
||||||
#[salsa::cycle(crate::cycles::recover_cycle)]
|
fn recover_cycle_a2(&self, key: i32) -> i32;
|
||||||
#[salsa::invoke(crate::cycles::recover_cycle_a)]
|
|
||||||
fn recover_cycle_a(&self, key: i32) -> i32;
|
#[salsa::cycle(crate::cycles::recover_from_cycle_b1)]
|
||||||
#[salsa::cycle(crate::cycles::recover_cycle)]
|
#[salsa::invoke(crate::cycles::recover_cycle_b1)]
|
||||||
#[salsa::invoke(crate::cycles::recover_cycle_b)]
|
fn recover_cycle_b1(&self, key: i32) -> i32;
|
||||||
fn recover_cycle_b(&self, key: i32) -> i32;
|
|
||||||
|
#[salsa::cycle(crate::cycles::recover_from_cycle_b2)]
|
||||||
|
#[salsa::invoke(crate::cycles::recover_cycle_b2)]
|
||||||
|
fn recover_cycle_b2(&self, key: i32) -> i32;
|
||||||
|
|
||||||
#[salsa::invoke(crate::cycles::panic_cycle_a)]
|
#[salsa::invoke(crate::cycles::panic_cycle_a)]
|
||||||
fn panic_cycle_a(&self, key: i32) -> i32;
|
fn panic_cycle_a(&self, key: i32) -> i32;
|
||||||
|
|
Loading…
Reference in a new issue