diff --git a/lib/src/dag_walk.rs b/lib/src/dag_walk.rs index fc708421b..17cd1db83 100644 --- a/lib/src/dag_walk.rs +++ b/lib/src/dag_walk.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::hash::Hash; -use std::iter; +use std::{iter, mem}; use itertools::Itertools as _; @@ -93,6 +93,151 @@ where result } +/// Like `topo_order_reverse()`, but can iterate linear DAG lazily. +/// +/// The DAG is supposed to be (mostly) topologically ordered by `T: Ord`. +/// For example, topological order of chronological data should respect +/// timestamp (except a few outliers caused by clock skew.) +/// +/// Use `topo_order_reverse()` if the DAG is heavily branched. This can +/// only process linear part lazily. +pub fn topo_order_reverse_lazy( + start: II, + id_fn: impl Fn(&T) -> ID, + mut neighbors_fn: impl FnMut(&T) -> NI, +) -> impl Iterator +where + T: Ord, + ID: Hash + Eq + Clone, + II: IntoIterator, + NI: IntoIterator, +{ + let mut inner = TopoOrderReverseLazyInner::new(start.into_iter().collect()); + iter::from_fn(move || inner.next(&id_fn, &mut neighbors_fn)) +} + +#[derive(Clone, Debug)] +struct TopoOrderReverseLazyInner { + start: Vec, + result: Vec, + emitted: HashSet, +} + +impl TopoOrderReverseLazyInner { + fn new(start: Vec) -> Self { + TopoOrderReverseLazyInner { + start, + result: Vec::new(), + emitted: HashSet::new(), + } + } + + fn next>( + &mut self, + id_fn: impl Fn(&T) -> ID, + mut neighbors_fn: impl FnMut(&T) -> NI, + ) -> Option { + if let Some(node) = self.result.pop() { + return Some(node); + } + + // Fast path for linear DAG + if self.start.len() <= 1 { + let node = self.start.pop()?; + self.start.extend(neighbors_fn(&node)); + assert!(self.emitted.insert(id_fn(&node)), "graph has cycle"); + return Some(node); + } + + // Extract graph nodes based on T's order, and sort them by using ids + // (because we wouldn't want to clone T itself) + let start_ids = self.start.iter().map(&id_fn).collect_vec(); + let (mut node_map, neighbor_ids_map, remainder) = + look_ahead_sub_graph(mem::take(&mut self.start), &id_fn, &mut neighbors_fn); + self.start = remainder; + let sorted_ids = topo_order_forward(&start_ids, |id| *id, |id| &neighbor_ids_map[id]); + self.result.reserve(sorted_ids.len()); + for id in sorted_ids { + let (id, node) = node_map.remove_entry(id).unwrap(); + assert!(self.emitted.insert(id), "graph has cycle"); + self.result.push(node); + } + self.result.pop() + } +} + +/// Splits DAG at single fork point, and extracts branchy part as sub graph. +/// +/// ```text +/// o | C +/// | o B +/// |/ <---- split here (A->B or A->C would create cycle) +/// o A +/// ``` +/// +/// If a branch reached to root (empty neighbors), the graph can't be split +/// anymore because the other branch may be connected to a descendant of +/// the rooted branch. +/// +/// ```text +/// o | C +/// | o B +/// | <---- can't split here (there may be edge A->B) +/// o A +/// ``` +/// +/// We assume the graph is (mostly) topologically ordered by `T: Ord`. +#[allow(clippy::type_complexity)] +fn look_ahead_sub_graph( + start: Vec, + id_fn: impl Fn(&T) -> ID, + mut neighbors_fn: impl FnMut(&T) -> NI, +) -> (HashMap, HashMap>, Vec) +where + T: Ord, + ID: Hash + Eq + Clone, + NI: IntoIterator, +{ + let mut queue: BinaryHeap = start.into(); + // Build separate node/neighbors maps since lifetime is different at caller + let mut node_map: HashMap = HashMap::new(); + let mut neighbor_ids_map: HashMap> = HashMap::new(); + let mut has_reached_root = false; + while queue.len() > 1 || node_map.is_empty() || has_reached_root { + let node = if let Some(node) = queue.pop() { + node + } else { + break; + }; + let node_id = id_fn(&node); + if node_map.contains_key(&node_id) { + continue; + } + + let mut neighbor_ids = Vec::new(); + let mut neighbors_iter = neighbors_fn(&node).into_iter().peekable(); + has_reached_root |= neighbors_iter.peek().is_none(); + for neighbor in neighbors_iter { + neighbor_ids.push(id_fn(&neighbor)); + queue.push(neighbor); + } + node_map.insert(node_id.clone(), node); + neighbor_ids_map.insert(node_id, neighbor_ids); + } + + assert!(queue.len() <= 1, "order of remainder shouldn't matter"); + let remainder = queue.into_vec(); + + // Omit unvisited neighbors + if let Some(unvisited_id) = remainder.first().map(&id_fn) { + for neighbor_ids in neighbor_ids_map.values_mut() { + neighbor_ids.retain(|id| *id != unvisited_id); + } + } + + (node_map, neighbor_ids_map, remainder) +} + pub fn leaves( start: II, id_fn: impl Fn(&T) -> ID, @@ -231,6 +376,13 @@ mod tests { assert_eq!(common, vec!['C', 'B', 'A']); let common = topo_order_reverse(vec!['B', 'C'], id_fn, neighbors_fn); assert_eq!(common, vec!['C', 'B', 'A']); + + let common = topo_order_reverse_lazy(vec!['C'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['C', 'B', 'A']); + let common = topo_order_reverse_lazy(vec!['C', 'B'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['C', 'B', 'A']); + let common = topo_order_reverse_lazy(vec!['B', 'C'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['C', 'B', 'A']); } #[test] @@ -262,6 +414,15 @@ mod tests { assert_eq!(common, vec!['F', 'D', 'E', 'C', 'B', 'A']); let common = topo_order_reverse(vec!['F', 'D', 'E'], id_fn, neighbors_fn); assert_eq!(common, vec!['F', 'D', 'C', 'B', 'E', 'A']); + + let common = topo_order_reverse_lazy(vec!['F'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['F', 'E', 'D', 'C', 'B', 'A']); + let common = + topo_order_reverse_lazy(vec!['F', 'E', 'C'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['F', 'D', 'E', 'C', 'B', 'A']); + let common = + topo_order_reverse_lazy(vec!['F', 'D', 'E'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['F', 'D', 'C', 'B', 'E', 'A']); } #[test] @@ -296,6 +457,217 @@ mod tests { let common = topo_order_reverse(vec!['I'], id_fn, neighbors_fn); assert_eq!(common, vec!['I', 'D', 'B', 'H', 'F', 'G', 'E', 'C', 'A']); + + let common = topo_order_reverse_lazy(vec!['I'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['I', 'D', 'B', 'H', 'F', 'G', 'E', 'C', 'A']); + } + + #[test] + fn test_topo_order_reverse_nested_merges_bad_order() { + // This graph: + // o I + // |\ + // | |\ + // | | |\ + // | | | o h (h > I) + // | | |/| + // | | o | G + // | |/| o f + // | o |/ e (e > I, G) + // |/| o D + // o |/ C + // | o b (b > D) + // |/ + // o A + + let neighbors = hashmap! { + 'A' => vec![], + 'b' => vec!['A'], + 'C' => vec!['A'], + 'D' => vec!['b'], + 'e' => vec!['C', 'b'], + 'f' => vec!['D'], + 'G' => vec!['e', 'D'], + 'h' => vec!['G', 'f'], + 'I' => vec!['C', 'e', 'G', 'h'], + }; + let id_fn = |node: &char| *node; + let neighbors_fn = |node: &char| neighbors[node].clone(); + + let common = topo_order_reverse(vec!['I'], id_fn, neighbors_fn); + assert_eq!(common, vec!['I', 'h', 'G', 'e', 'C', 'f', 'D', 'b', 'A']); + + let common = topo_order_reverse_lazy(vec!['I'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['I', 'h', 'G', 'e', 'C', 'f', 'D', 'b', 'A']); + } + + #[test] + fn test_topo_order_reverse_merge_bad_fork_order_at_root() { + // This graph: + // o E + // |\ + // o | D + // | o C + // | o B + // |/ + // o a (a > D, B) + + let neighbors = hashmap! { + 'a' => vec![], + 'B' => vec!['a'], + 'C' => vec!['B'], + 'D' => vec!['a'], + 'E' => vec!['D', 'C'], + }; + let id_fn = |node: &char| *node; + let neighbors_fn = |node: &char| neighbors[node].clone(); + + let common = topo_order_reverse(vec!['E'], id_fn, neighbors_fn); + assert_eq!(common, vec!['E', 'D', 'C', 'B', 'a']); + + // The root node 'a' is visited before 'C'. If the graph were split there, + // the branch 'C->B->a' would be orphaned. + let common = topo_order_reverse_lazy(vec!['E'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['E', 'D', 'C', 'B', 'a']); + } + + #[test] + fn test_topo_order_reverse_merge_and_linear() { + // This graph: + // o G + // |\ + // | o F + // o | E + // | o D + // |/ + // o C + // o B + // o A + + let neighbors = hashmap! { + 'A' => vec![], + 'B' => vec!['A'], + 'C' => vec!['B'], + 'D' => vec!['C'], + 'E' => vec!['C'], + 'F' => vec!['D'], + 'G' => vec!['E', 'F'], + }; + let id_fn = |node: &char| *node; + let neighbors_fn = |node: &char| neighbors[node].clone(); + + let common = topo_order_reverse(vec!['G'], id_fn, neighbors_fn); + assert_eq!(common, vec!['G', 'E', 'F', 'D', 'C', 'B', 'A']); + + let common = topo_order_reverse_lazy(vec!['G'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['G', 'E', 'F', 'D', 'C', 'B', 'A']); + + // Iterator can be lazy for linear chunks. + let mut inner_iter = TopoOrderReverseLazyInner::new(vec!['G']); + assert_eq!(inner_iter.next(id_fn, neighbors_fn), Some('G')); + assert!(!inner_iter.start.is_empty()); + assert!(inner_iter.result.is_empty()); + assert_eq!( + iter::from_fn(|| inner_iter.next(id_fn, neighbors_fn)) + .take(4) + .collect_vec(), + vec!['E', 'F', 'D', 'C'], + ); + assert!(!inner_iter.start.is_empty()); + assert!(inner_iter.result.is_empty()); + } + + #[test] + fn test_topo_order_reverse_merge_and_linear_bad_fork_order() { + // This graph: + // o G + // |\ + // o | F + // o | E + // | o D + // |/ + // o c (c > E, D) + // o B + // o A + + let neighbors = hashmap! { + 'A' => vec![], + 'B' => vec!['A'], + 'c' => vec!['B'], + 'D' => vec!['c'], + 'E' => vec!['c'], + 'F' => vec!['E'], + 'G' => vec!['F', 'D'], + }; + let id_fn = |node: &char| *node; + let neighbors_fn = |node: &char| neighbors[node].clone(); + + let common = topo_order_reverse(vec!['G'], id_fn, neighbors_fn); + assert_eq!(common, vec!['G', 'F', 'E', 'D', 'c', 'B', 'A']); + + let common = topo_order_reverse_lazy(vec!['G'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['G', 'F', 'E', 'D', 'c', 'B', 'A']); + + // Iterator can be lazy for linear chunks. The node 'c' is visited before 'D', + // but it will be processed lazily. + let mut inner_iter = TopoOrderReverseLazyInner::new(vec!['G']); + assert_eq!(inner_iter.next(id_fn, neighbors_fn), Some('G')); + assert!(!inner_iter.start.is_empty()); + assert!(inner_iter.result.is_empty()); + assert_eq!( + iter::from_fn(|| inner_iter.next(id_fn, neighbors_fn)) + .take(4) + .collect_vec(), + vec!['F', 'E', 'D', 'c'], + ); + assert!(!inner_iter.start.is_empty()); + assert!(inner_iter.result.is_empty()); + } + + #[test] + fn test_topo_order_reverse_merge_and_linear_bad_merge_order() { + // This graph: + // o G + // |\ + // o | f (f > G) + // o | e + // | o d (d > G) + // |/ + // o C + // o B + // o A + + let neighbors = hashmap! { + 'A' => vec![], + 'B' => vec!['A'], + 'C' => vec!['B'], + 'd' => vec!['C'], + 'e' => vec!['C'], + 'f' => vec!['e'], + 'G' => vec!['f', 'd'], + }; + let id_fn = |node: &char| *node; + let neighbors_fn = |node: &char| neighbors[node].clone(); + + let common = topo_order_reverse(vec!['G'], id_fn, neighbors_fn); + assert_eq!(common, vec!['G', 'f', 'e', 'd', 'C', 'B', 'A']); + + let common = topo_order_reverse_lazy(vec!['G'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['G', 'f', 'e', 'd', 'C', 'B', 'A']); + + // Iterator can be lazy for linear chunks. + let mut inner_iter = TopoOrderReverseLazyInner::new(vec!['G']); + assert_eq!(inner_iter.next(id_fn, neighbors_fn), Some('G')); + assert!(!inner_iter.start.is_empty()); + assert!(inner_iter.result.is_empty()); + assert_eq!( + iter::from_fn(|| inner_iter.next(id_fn, neighbors_fn)) + .take(4) + .collect_vec(), + vec!['f', 'e', 'd', 'C'], + ); + assert!(!inner_iter.start.is_empty()); + assert!(inner_iter.result.is_empty()); } #[test] @@ -325,6 +697,9 @@ mod tests { let common = topo_order_reverse(vec!['F', 'C'], id_fn, neighbors_fn); assert_eq!(common, vec!['F', 'E', 'D', 'C', 'B', 'A']); + + let common = topo_order_reverse_lazy(vec!['F', 'C'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['F', 'E', 'D', 'C', 'B', 'A']); } #[test] @@ -347,6 +722,9 @@ mod tests { let common = topo_order_reverse(vec!['D'], id_fn, neighbors_fn); assert_eq!(common, vec!['D', 'C', 'B', 'A']); + + let common = topo_order_reverse_lazy(vec!['D'], id_fn, neighbors_fn).collect_vec(); + assert_eq!(common, vec!['D', 'C', 'B', 'A']); } #[test] @@ -366,6 +744,49 @@ mod tests { let result = panic::catch_unwind(|| topo_order_reverse(vec!['C'], id_fn, neighbors_fn)); assert!(result.is_err()); + + topo_order_reverse_lazy(vec!['C'], id_fn, neighbors_fn) + .take(3) + .collect_vec(); // sanity check + let result = panic::catch_unwind(|| { + topo_order_reverse_lazy(vec!['C'], id_fn, neighbors_fn) + .take(4) + .collect_vec() + }); + assert!(result.is_err()); + } + + #[test] + fn test_topo_order_reverse_cycle_to_branchy_sub_graph() { + // This graph: + // o D + // |\ + // | o C + // |/ + // o B + // o A (to C) + + let neighbors = hashmap! { + 'A' => vec!['C'], + 'B' => vec!['A'], + 'C' => vec!['B'], + 'D' => vec!['B', 'C'], + }; + let id_fn = |node: &char| *node; + let neighbors_fn = |node: &char| neighbors[node].clone(); + + let result = panic::catch_unwind(|| topo_order_reverse(vec!['D'], id_fn, neighbors_fn)); + assert!(result.is_err()); + + topo_order_reverse_lazy(vec!['D'], id_fn, neighbors_fn) + .take(4) + .collect_vec(); // sanity check + let result = panic::catch_unwind(|| { + topo_order_reverse_lazy(vec!['D'], id_fn, neighbors_fn) + .take(5) + .collect_vec() + }); + assert!(result.is_err()); } #[test]