From 79f8acc3aa058c86c3664c6b2ee77420313d4aa0 Mon Sep 17 00:00:00 2001 From: Niko Matsakis Date: Wed, 27 Oct 2021 06:21:09 -0400 Subject: [PATCH] improve parallel cycle tests Before we could not observe the case where: * thread A is blocked on B * cycle detected in thread B * some participants are on thread A and have to be marked (In particular, I commented out some code that seemed necessary and didn't see any tests fail) --- tests/parallel/cycles.rs | 104 +++++++++++++++++++++++++++------------ tests/parallel/setup.rs | 30 ++++++----- 2 files changed, 86 insertions(+), 48 deletions(-) diff --git a/tests/parallel/cycles.rs b/tests/parallel/cycles.rs index f6436f06..8717b2b6 100644 --- a/tests/parallel/cycles.rs +++ b/tests/parallel/cycles.rs @@ -6,78 +6,118 @@ use crate::setup::{Knobs, ParDatabase, ParDatabaseImpl}; use salsa::{Cancelled, ParallelDatabase}; use test_env_log::test; -pub(crate) fn recover_cycle(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { - key * 10 +// Recover cycle test: +// +// The pattern is as follows. +// +// Thread A Thread B +// -------- -------- +// a1 b1 +// | wait for stage 1 (blocks) +// signal stage 1 | +// wait for stage 2 (blocks) (unblocked) +// | signal stage 2 +// (unblocked) wait for stage 3 (blocks) +// a2 | +// b1 (blocks -> stage 3) | +// | (unblocked) +// | b2 +// | a1 (cycle detected, recovers) +// | b2 completes, recovers +// | b1 completes, recovers +// a2 sees cycle, recovers +// a1 completes, recovers + +pub(crate) fn recover_from_cycle_a1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { + log::debug!("recover_from_cycle_a1"); + key * 10 + 1 } -pub(crate) fn recover_cycle_a(db: &dyn ParDatabase, key: i32) -> i32 { - // Wait to create the cycle until both threads have entered - db.signal(0); - db.wait_for(1); - - db.recover_cycle_b(key) +pub(crate) fn recover_from_cycle_a2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { + log::debug!("recover_from_cycle_a2"); + key * 10 + 2 } -pub(crate) fn recover_cycle_b(db: &dyn ParDatabase, key: i32) -> i32 { +pub(crate) fn recover_from_cycle_b1(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { + log::debug!("recover_from_cycle_b1"); + key * 20 + 1 +} + +pub(crate) fn recover_from_cycle_b2(_db: &dyn ParDatabase, _cycle: &[String], key: &i32) -> i32 { + log::debug!("recover_from_cycle_b2"); + key * 20 + 2 +} + +pub(crate) fn recover_cycle_a1(db: &dyn ParDatabase, key: i32) -> i32 { // Wait to create the cycle until both threads have entered - db.wait_for(0); db.signal(1); + db.wait_for(2); - if key < 0 && db.should_cycle() { - db.recover_cycle_a(key) - } else { - key - } + db.recover_cycle_a2(key) +} + +pub(crate) fn recover_cycle_a2(db: &dyn ParDatabase, key: i32) -> i32 { + db.recover_cycle_b1(key) +} + +pub(crate) fn recover_cycle_b1(db: &dyn ParDatabase, key: i32) -> i32 { + // Wait to create the cycle until both threads have entered + db.wait_for(1); + db.signal(2); + + // Wait for thread A to block on this thread + db.wait_for(3); + + db.recover_cycle_b2(key) +} + +pub(crate) fn recover_cycle_b2(db: &dyn ParDatabase, key: i32) -> i32 { + db.recover_cycle_a1(key) } pub(crate) fn panic_cycle_a(db: &dyn ParDatabase, key: i32) -> i32 { // Wait to create the cycle until both threads have entered - db.signal(0); - db.wait_for(1); + db.signal(1); + db.wait_for(2); db.panic_cycle_b(key) } pub(crate) fn panic_cycle_b(db: &dyn ParDatabase, key: i32) -> i32 { // Wait to create the cycle until both threads have entered - db.wait_for(0); - db.signal(1); + db.wait_for(1); + db.signal(2); // Wait for thread A to block on this thread - db.wait_for(2); + db.wait_for(3); // Now try to execute A - if key < 0 && db.should_cycle() { - db.panic_cycle_a(key) - } else { - key - } + db.panic_cycle_a(key) } #[test] fn recover_parallel_cycle() { let mut db = ParDatabaseImpl::default(); - db.set_should_cycle(true); + db.knobs().signal_on_will_block.set(3); let thread_a = std::thread::spawn({ let db = db.snapshot(); - move || db.recover_cycle_a(-1) + move || db.recover_cycle_a1(1) }); let thread_b = std::thread::spawn({ let db = db.snapshot(); - move || db.recover_cycle_b(-1) + move || db.recover_cycle_b1(1) }); - assert_eq!(thread_a.join().unwrap(), -10); - assert_eq!(thread_b.join().unwrap(), -10); + assert_eq!(thread_a.join().unwrap(), 11); + assert_eq!(thread_b.join().unwrap(), 21); } #[test] fn panic_parallel_cycle() { let mut db = ParDatabaseImpl::default(); - db.set_should_cycle(true); - db.knobs().signal_on_will_block.set(2); + db.knobs().signal_on_will_block.set(3); let thread_a = std::thread::spawn({ let db = db.snapshot(); diff --git a/tests/parallel/setup.rs b/tests/parallel/setup.rs index be81fa19..9691c3c9 100644 --- a/tests/parallel/setup.rs +++ b/tests/parallel/setup.rs @@ -27,23 +27,21 @@ pub(crate) trait ParDatabase: Knobs { /// Invokes `sum2_drop_sum` fn sum3_drop_sum(&self, key: &'static str) -> usize; - // Cycle tests: - // - // A ───────► B - // - // ▲ │ - // └──────────┘ - // if key > 0 - // && should_cycle + #[salsa::cycle(crate::cycles::recover_from_cycle_a1)] + #[salsa::invoke(crate::cycles::recover_cycle_a1)] + fn recover_cycle_a1(&self, key: i32) -> i32; - #[salsa::input] - fn should_cycle(&self) -> bool; - #[salsa::cycle(crate::cycles::recover_cycle)] - #[salsa::invoke(crate::cycles::recover_cycle_a)] - fn recover_cycle_a(&self, key: i32) -> i32; - #[salsa::cycle(crate::cycles::recover_cycle)] - #[salsa::invoke(crate::cycles::recover_cycle_b)] - fn recover_cycle_b(&self, key: i32) -> i32; + #[salsa::cycle(crate::cycles::recover_from_cycle_a2)] + #[salsa::invoke(crate::cycles::recover_cycle_a2)] + fn recover_cycle_a2(&self, key: i32) -> i32; + + #[salsa::cycle(crate::cycles::recover_from_cycle_b1)] + #[salsa::invoke(crate::cycles::recover_cycle_b1)] + fn recover_cycle_b1(&self, key: i32) -> i32; + + #[salsa::cycle(crate::cycles::recover_from_cycle_b2)] + #[salsa::invoke(crate::cycles::recover_cycle_b2)] + fn recover_cycle_b2(&self, key: i32) -> i32; #[salsa::invoke(crate::cycles::panic_cycle_a)] fn panic_cycle_a(&self, key: i32) -> i32;