move revisions into a vector indexed by durability

This commit is contained in:
Niko Matsakis 2019-06-24 17:59:19 -04:00
parent c756d98fbb
commit 25ce7ab47b

View file

@ -155,17 +155,13 @@ where
/// Read current value of the revision counter. /// Read current value of the revision counter.
#[inline] #[inline]
pub(crate) fn current_revision(&self) -> Revision { pub(crate) fn current_revision(&self) -> Revision {
self.shared_state.revision.load() self.shared_state.revisions[0].load()
} }
/// The revision in which constants last changed. /// The revision in which constants last changed.
#[inline] #[inline]
pub(crate) fn durability_last_changed_revision(&self, d: Durability) -> Revision { pub(crate) fn durability_last_changed_revision(&self, d: Durability) -> Revision {
if d == Durability::CONSTANT { self.shared_state.revisions[d.index()].load()
self.shared_state.constant_revision.load()
} else {
self.current_revision()
}
} }
/// Read current value of the revision counter. /// Read current value of the revision counter.
@ -289,7 +285,7 @@ where
// To modify the revision, we need the lock. // To modify the revision, we need the lock.
let _lock = self.shared_state.query_lock.write(); let _lock = self.shared_state.query_lock.write();
let old_revision = self.shared_state.revision.fetch_then_increment(); let old_revision = self.shared_state.revisions[0].fetch_then_increment();
assert_eq!(current_revision, old_revision); assert_eq!(current_revision, old_revision);
let new_revision = current_revision.next(); let new_revision = current_revision.next();
@ -444,11 +440,8 @@ where
/// dependent on constants (which otherwise might not get /// dependent on constants (which otherwise might not get
/// re-evaluated). /// re-evaluated).
pub(crate) fn mark_durability_as_changed(&self, d: Durability) { pub(crate) fn mark_durability_as_changed(&self, d: Durability) {
if d == Durability::CONSTANT { for rev in &self.runtime.shared_state.revisions[1..=d.index()] {
self.runtime rev.store(self.new_revision);
.shared_state
.constant_revision
.store(self.new_revision);
} }
} }
} }
@ -470,28 +463,40 @@ struct SharedState<DB: Database> {
/// to ensure a higher-level consistency property. /// to ensure a higher-level consistency property.
query_lock: RwLock<()>, query_lock: RwLock<()>,
/// Stores the current revision. This is an `AtomicU64` because
/// it may be *read* at any point without holding the
/// `query_lock`. Updates, however, require the `query_lock` to be
/// acquired. (See `query_lock` for details.)
revision: AtomicRevision,
/// This is typically equal to `revision` -- set to `revision+1` /// This is typically equal to `revision` -- set to `revision+1`
/// when a new revision is pending (which implies that the current /// when a new revision is pending (which implies that the current
/// revision is canceled). /// revision is canceled).
pending_revision: AtomicRevision, pending_revision: AtomicRevision,
/// The last time that a value marked as "constant" changed. Like /// Stores the "last change" revision for values of each duration.
/// `revision` and `pending_revision`, this is readable without /// This vector is always of length at least 1 (for Durability 0)
/// any lock but requires the query-lock to be write-locked for /// but its total length depends on the number of durations. The
/// updates /// element at index 0 is special as it represents the "current
constant_revision: AtomicRevision, /// revision". In general, we have the invariant that revisions
/// in here are *declining* -- that is, `revisions[i] >=
/// revisions[i + 1]`, for all `i`. This is because when you
/// modify a value with durability D, that implies that values
/// with durability less than D may have changed too.
revisions: Vec<AtomicRevision>,
/// The dependency graph tracks which runtimes are blocked on one /// The dependency graph tracks which runtimes are blocked on one
/// another, waiting for queries to terminate. /// another, waiting for queries to terminate.
dependency_graph: Mutex<DependencyGraph<DB>>, dependency_graph: Mutex<DependencyGraph<DB>>,
} }
impl<DB: Database> SharedState<DB> {
fn with_durabilities(durabilities: usize) -> Self {
SharedState {
next_id: AtomicUsize::new(1),
storage: Default::default(),
query_lock: Default::default(),
revisions: (0..durabilities).map(|_| AtomicRevision::start()).collect(),
pending_revision: AtomicRevision::start(),
dependency_graph: Default::default(),
}
}
}
impl<DB> std::panic::RefUnwindSafe for SharedState<DB> impl<DB> std::panic::RefUnwindSafe for SharedState<DB>
where where
DB: Database, DB: Database,
@ -501,15 +506,7 @@ where
impl<DB: Database> Default for SharedState<DB> { impl<DB: Database> Default for SharedState<DB> {
fn default() -> Self { fn default() -> Self {
SharedState { Self::with_durabilities(2)
next_id: AtomicUsize::new(1),
storage: Default::default(),
query_lock: Default::default(),
revision: AtomicRevision::start(),
pending_revision: AtomicRevision::start(),
constant_revision: AtomicRevision::start(),
dependency_graph: Default::default(),
}
} }
} }
@ -527,7 +524,7 @@ where
}; };
fmt.debug_struct("SharedState") fmt.debug_struct("SharedState")
.field("query_lock", &query_lock) .field("query_lock", &query_lock)
.field("revision", &self.revision) .field("revisions", &self.revisions)
.field("pending_revision", &self.pending_revision) .field("pending_revision", &self.pending_revision)
.finish() .finish()
} }
@ -617,6 +614,10 @@ impl Durability {
pub(crate) fn and(self, c: Durability) -> Durability { pub(crate) fn and(self, c: Durability) -> Durability {
Durability(self.0 & c.0) Durability(self.0 & c.0)
} }
fn index(self) -> usize {
self.0 as usize
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]