428: salsa-2022: fix hanging cancellations due to cvar not being notified r=nikomatsakis a=manapointer

This PR fixes an issue with the `cvar` condvar field of `Shared<DB>` not being notified when `Arc<Shared<DB>>`s were getting dropped. 

Previously, the condvar was being notified here:

```rust
impl<DB> Drop for Shared<DB>
where
    DB: HasJars,
{
    fn drop(&mut self) {
        self.cvar.notify_all();
    }
}
```

However, because this is implemented on `Shared<DB>`, the `drop` code only ran after all `Arc<Shared<DB>>`s (including that of the actual database and not just its snapshots) had dropped first, even though the intention was for the `drop` code to run when each individual `Arc<Shared<DB>>` was dropped so that the condvar would be notified each time.

To fix this, I've modified the `Shared<DB>` struct to instead hold `Arc`s to both the jars and the condvar, and `Storage` now holds `Shared<DB>` directly rather than `Arc<Shared<DB>>`. When `Shared<DB>` is dropped, it first drops its `Arc` for the jars, and then notifies the condvar.

## Note 
On my local branch I have a test case for this functionality, although it relied on timing using `std:🧵:sleep`. I figured this was less than ideal, so I decided not to include it in this PR - I'd love to get feedback on a better way to test this!

Co-authored-by: manapointer <manapointer@gmail.com>
Co-authored-by: Niko Matsakis <niko@alum.mit.edu>
This commit is contained in:
bors[bot] 2022-12-29 16:37:53 +00:00 committed by GitHub
commit 20c7834ff3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -19,10 +19,7 @@ pub struct Storage<DB: HasJars> {
/// Data shared across all databases. This contains the ingredients needed by each jar. /// Data shared across all databases. This contains the ingredients needed by each jar.
/// See the ["jars and ingredients" chapter](https://salsa-rs.github.io/salsa/plumbing/jars_and_ingredients.html) /// See the ["jars and ingredients" chapter](https://salsa-rs.github.io/salsa/plumbing/jars_and_ingredients.html)
/// for more detailed description. /// for more detailed description.
/// shared: Shared<DB>,
/// Even though this struct is stored in an `Arc`, we sometimes get mutable access to it
/// by using `Arc::get_mut`. This is only possible when all parallel snapshots have been dropped.
shared: Arc<Shared<DB>>,
/// The "ingredients" structure stores the information about how to find each ingredient in the database. /// The "ingredients" structure stores the information about how to find each ingredient in the database.
/// It allows us to take the [`IngredientIndex`] assigned to a particular ingredient /// It allows us to take the [`IngredientIndex`] assigned to a particular ingredient
@ -43,11 +40,14 @@ struct Shared<DB: HasJars> {
/// Contains the data for each jar in the database. /// Contains the data for each jar in the database.
/// Each jar stores its own structs in there that ultimately contain ingredients /// Each jar stores its own structs in there that ultimately contain ingredients
/// (types that implement the [`Ingredient`] trait, like [`crate::function::FunctionIngredient`]). /// (types that implement the [`Ingredient`] trait, like [`crate::function::FunctionIngredient`]).
jars: DB::Jars, ///
/// Even though these jars are stored in an `Arc`, we sometimes get mutable access to them
/// by using `Arc::get_mut`. This is only possible when all parallel snapshots have been dropped.
jars: Option<Arc<DB::Jars>>,
/// Conditional variable that is used to coordinate cancellation. /// Conditional variable that is used to coordinate cancellation.
/// When the main thread writes to the database, it blocks until each of the snapshots can be cancelled. /// When the main thread writes to the database, it blocks until each of the snapshots can be cancelled.
cvar: Condvar, cvar: Arc<Condvar>,
} }
// ANCHOR: default // ANCHOR: default
@ -59,10 +59,10 @@ where
let mut routes = Routes::new(); let mut routes = Routes::new();
let jars = DB::create_jars(&mut routes); let jars = DB::create_jars(&mut routes);
Self { Self {
shared: Arc::new(Shared { shared: Shared {
jars, jars: Some(Arc::new(jars)),
cvar: Default::default(), cvar: Arc::new(Default::default()),
}), },
routes: Arc::new(routes), routes: Arc::new(routes),
runtime: Runtime::default(), runtime: Runtime::default(),
} }
@ -86,7 +86,7 @@ where
} }
pub fn jars(&self) -> (&DB::Jars, &Runtime) { pub fn jars(&self) -> (&DB::Jars, &Runtime) {
(&self.shared.jars, &self.runtime) (self.shared.jars.as_ref().unwrap(), &self.runtime)
} }
pub fn runtime(&self) -> &Runtime { pub fn runtime(&self) -> &Runtime {
@ -111,17 +111,17 @@ where
// Acquire `&mut` access to `self.shared` -- this is only possible because // Acquire `&mut` access to `self.shared` -- this is only possible because
// the snapshots have all been dropped, so we hold the only handle to the `Arc`. // the snapshots have all been dropped, so we hold the only handle to the `Arc`.
let shared = Arc::get_mut(&mut self.shared).unwrap(); let jars = Arc::get_mut(self.shared.jars.as_mut().unwrap()).unwrap();
// Inform other ingredients that a new revision has begun. // Inform other ingredients that a new revision has begun.
// This gives them a chance to free resources that were being held until the next revision. // This gives them a chance to free resources that were being held until the next revision.
let routes = self.routes.clone(); let routes = self.routes.clone();
for route in routes.reset_routes() { for route in routes.reset_routes() {
route(&mut shared.jars).reset_for_new_revision(); route(jars).reset_for_new_revision();
} }
// Return mut ref to jars + runtime. // Return mut ref to jars + runtime.
(&mut shared.jars, &mut self.runtime) (jars, &mut self.runtime)
} }
// ANCHOR_END: jars_mut // ANCHOR_END: jars_mut
@ -136,7 +136,7 @@ where
self.runtime.set_cancellation_flag(); self.runtime.set_cancellation_flag();
// If we have unique access to the jars, we are done. // If we have unique access to the jars, we are done.
if Arc::get_mut(&mut self.shared).is_some() { if Arc::get_mut(self.shared.jars.as_mut().unwrap()).is_some() {
return; return;
} }
@ -155,16 +155,31 @@ where
pub fn ingredient(&self, ingredient_index: IngredientIndex) -> &dyn Ingredient<DB> { pub fn ingredient(&self, ingredient_index: IngredientIndex) -> &dyn Ingredient<DB> {
let route = self.routes.route(ingredient_index); let route = self.routes.route(ingredient_index);
route(&self.shared.jars) route(self.shared.jars.as_ref().unwrap())
} }
} }
impl<DB> Drop for Shared<DB> impl<DB> Clone for Shared<DB>
where
DB: HasJars,
{
fn clone(&self) -> Self {
Self {
jars: self.jars.clone(),
cvar: self.cvar.clone(),
}
}
}
impl<DB> Drop for Storage<DB>
where where
DB: HasJars, DB: HasJars,
{ {
fn drop(&mut self) { fn drop(&mut self) {
self.cvar.notify_all(); // Drop the Arc reference before the cvar is notified,
// since other threads are sleeping, waiting for it to reach 1.
drop(self.shared.jars.take());
self.shared.cvar.notify_all();
} }
} }