Graph Un-Reachability
In this example we cover:
- Extending a program with additional downstream logic.
- Hydroflow's difference operator.
- Further examples of automatic stratification.
Our next example builds on the previous one by finding vertices that are not reachable from the origin. To do this, we need to capture the set all_vertices, and use a difference operator to form the difference between that set of vertices and reached_vertices.
Essentially we want a flow like this:
graph TD
    subgraph sources
        01[Stream of Edges]
    end
    subgraph reachable from origin
        00[Origin Vertex]
        10[Reached Vertices]
        20("V ⨝ E")
        00 --> 10
        10 --> 20
        20 --> 10
        01 --> 20
    end
    subgraph unreachable
        15[All Vertices]
        30(All - Reached)
        01 ---> 15
        15 --> 30
        10 --> 30
        30 --> 40
    end
    40[Output]
This is a simple augmentation of our previous example. Replace the contents of src/main.rs with the following:
use hydroflow::hydroflow_syntax;

pub fn main() {
    // An edge in the input data = a pair of `usize` vertex IDs.
    let (pairs_send, pairs_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

    let mut flow = hydroflow_syntax! {
        origin = source_iter(vec![0]);
        stream_of_edges = source_stream(pairs_recv) -> tee();
        reached_vertices = merge() -> tee();
        origin -> [0]reached_vertices;

        // the join for reachable vertices
        my_join = join() -> flat_map(|(src, ((), dst))| [src, dst]);
        reached_vertices[0] -> map(|v| (v, ())) -> [0]my_join;
        stream_of_edges[1] -> [1]my_join;

        // the loop
        my_join -> [1]reached_vertices;

        // the difference all_vertices - reached_vertices
        all_vertices = stream_of_edges[0]
            -> flat_map(|(src, dst)| [src, dst]) -> tee();
        unreached_vertices = difference();
        all_vertices[0] -> [pos]unreached_vertices;
        reached_vertices[1] -> [neg]unreached_vertices;

        // the output
        all_vertices[1] -> unique() -> for_each(|v| println!("Received vertex: {}", v));
        unreached_vertices -> for_each(|v| println!("unreached_vertices vertex: {}", v));
    };

    println!(
        "{}",
        flow.meta_graph()
            .expect("No graph found, maybe failed to parse.")
            .to_mermaid()
    );

    pairs_send.send((5, 10)).unwrap();
    pairs_send.send((0, 3)).unwrap();
    pairs_send.send((3, 6)).unwrap();
    pairs_send.send((6, 5)).unwrap();
    pairs_send.send((11, 12)).unwrap();

    flow.run_available();
}
Notice that we are now sending in some new pairs to test this code. The output should be:
% cargo run
<build output>
<graph output>
Received vertex: 12
Received vertex: 6
Received vertex: 11
Received vertex: 0
Received vertex: 5
Received vertex: 10
Received vertex: 3
unreached_vertices vertex: 12
unreached_vertices vertex: 11
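As a sanity check on the last two lines of that output, the same answer can be computed in batch with nothing but the Rust standard library. The following is only a sketch for cross-checking, not Hydroflow code: the function name unreached_batch is made up for this illustration, and it uses a breadth-first search in place of the streaming join loop.

```rust
use std::collections::{BTreeSet, HashMap, VecDeque};

/// Hypothetical helper (not part of Hydroflow): returns every vertex that
/// appears in `edges` but cannot be reached from `origin`.
fn unreached_batch(origin: usize, edges: &[(usize, usize)]) -> BTreeSet<usize> {
    // Build an adjacency list and, at the same time, the set of all vertices.
    let mut successors: HashMap<usize, Vec<usize>> = HashMap::new();
    let mut all_vertices = BTreeSet::new();
    for &(src, dst) in edges {
        successors.entry(src).or_default().push(dst);
        all_vertices.insert(src);
        all_vertices.insert(dst);
    }

    // Breadth-first search from the origin to collect the reached vertices.
    let mut reached = BTreeSet::from([origin]);
    let mut frontier = VecDeque::from([origin]);
    while let Some(vertex) = frontier.pop_front() {
        for &next in successors.get(&vertex).into_iter().flatten() {
            // `insert` returns true only the first time a vertex is seen.
            if reached.insert(next) {
                frontier.push_back(next);
            }
        }
    }

    // Set difference: all_vertices - reached.
    all_vertices.difference(&reached).copied().collect()
}

fn main() {
    // The same pairs that the example sends into the flow.
    let edges = [(5, 10), (0, 3), (3, 6), (6, 5), (11, 12)];
    println!("{:?}", unreached_batch(0, &edges)); // prints {11, 12}
}
```

Unlike the Hydroflow program, this version needs the whole edge list up front, so it says nothing about how the streaming flow is scheduled.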
Let's review the changes, all of which come at the end of the program. First, we remove the code that prints reached_vertices. Then we define all_vertices to be the vertices that appear in any edge (using the familiar flat_map code from the previous examples). In the last few lines, we wire up a difference operator to compute the difference between all_vertices and reached_vertices; note how we wire up the pos and neg inputs to the difference operator! Finally we print both all_vertices and unreached_vertices.
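To see why the choice of pos and neg matters, here is a small standard-library sketch of a set-style difference. It is an assumption-laden model, not Hydroflow's implementation: the name difference_sketch is hypothetical, and the sketch makes no claim about how the real operator treats duplicate items.

```rust
use std::collections::HashSet;
use std::hash::Hash;

/// Hypothetical model of a difference: keep each item of `pos`
/// that does not appear anywhere in `neg`.
fn difference_sketch<T, P, N>(pos: P, neg: N) -> Vec<T>
where
    T: Eq + Hash,
    P: IntoIterator<Item = T>,
    N: IntoIterator<Item = T>,
{
    // Collect the items to exclude, then filter the positive side.
    let excluded: HashSet<T> = neg.into_iter().collect();
    pos.into_iter()
        .filter(|item| !excluded.contains(item))
        .collect()
}

fn main() {
    // Vertices as flat_map would emit them from the example's five edges.
    let all_vertices = vec![5, 10, 0, 3, 3, 6, 6, 5, 11, 12];
    // Vertices reachable from the origin 0.
    let reached_vertices = vec![0, 3, 6, 5, 10];

    // Wired as in the example: all_vertices on pos, reached_vertices on neg.
    let unreached = difference_sketch(all_vertices.clone(), reached_vertices.clone());
    println!("{:?}", unreached); // prints [11, 12]

    // Wired the other way round, the result is a different set.
    let swapped = difference_sketch(reached_vertices, all_vertices);
    println!("{:?}", swapped); // prints []
}
```

The operation is not symmetric, which is why the two inputs carry names rather than the numeric indices used for join.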
The auto-generated mermaid looks like so:
%%{init: {'theme': 'base', 'themeVariables': {'clusterBkg':'#ddd'}}}%%
flowchart TD
classDef pullClass fill:#02f,color:#fff,stroke:#000
classDef pushClass fill:#ff0,stroke:#000
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph "sg_1v1 stratum 0"
    1v1[\"(1v1) <tt>source_iter(vec! [0])</tt>"/]:::pullClass
    8v1[\"(8v1) <tt>map(| v | (v, ()))</tt>"/]:::pullClass
    6v1[\"(6v1) <tt>join()</tt>"/]:::pullClass
    7v1[\"(7v1) <tt>flat_map(| (src, ((), dst)) | [src, dst])</tt>"/]:::pullClass
    4v1[\"(4v1) <tt>merge()</tt>"/]:::pullClass
    5v1[/"(5v1) <tt>tee()</tt>"\]:::pushClass
    15v1["(15v1) <tt>handoff</tt>"]:::otherClass
    15v1--->8v1
    1v1--0--->4v1
    8v1--0--->6v1
    6v1--->7v1
    7v1--1--->4v1
    4v1--->5v1
    5v1--0--->15v1
end
subgraph "sg_2v1 stratum 0"
    2v1[\"(2v1) <tt>source_stream(pairs_recv)</tt>"/]:::pullClass
    3v1[/"(3v1) <tt>tee()</tt>"\]:::pushClass
    9v1[/"(9v1) <tt>flat_map(| (src, dst) | [src, dst])</tt>"\]:::pushClass
    10v1[/"(10v1) <tt>tee()</tt>"\]:::pushClass
    2v1--->3v1
    3v1--0--->9v1
    9v1--->10v1
end
subgraph "sg_3v1 stratum 1"
    12v1[/"(12v1) <tt>unique()</tt>"\]:::pushClass
    13v1[/"(13v1) <tt>for_each(| v | println! ("Received vertex: {}", v))</tt>"\]:::pushClass
    12v1--->13v1
end
subgraph "sg_4v1 stratum 1"
    11v1[\"(11v1) <tt>difference()</tt>"/]:::pullClass
    14v1[/"(14v1) <tt>for_each(| v | println! ("unreached_vertices vertex: {}", v))</tt>"\]:::pushClass
    11v1--->14v1
end
3v1--1--->16v1
5v1--1--->18v1
10v1--0--->17v1
10v1--1--->19v1
16v1["(16v1) <tt>handoff</tt>"]:::otherClass
16v1--1--->6v1
17v1["(17v1) <tt>handoff</tt>"]:::otherClass
17v1--pos--->11v1
18v1["(18v1) <tt>handoff</tt>"]:::otherClass
18v1==neg===o11v1
19v1["(19v1) <tt>handoff</tt>"]:::otherClass
19v1===o12v1
If you look carefully, you'll see two subgraphs labeled with stratum 0, and two with stratum 1. The reason the strata were broken into subgraphs has nothing to do with correctness, but rather with the way that Hydroflow graphs are compiled and scheduled, as discussed in the chapter on Architecture.
All the subgraphs labeled stratum 0 are run first to completion, and then all the subgraphs labeled stratum 1 are run. This captures the requirements of the unique and difference operators used in the lower subgraphs: each has to wait for a blocking input to be complete before it can start producing output. Note how the difference operator has two inputs (labeled pos and neg), and only the neg input shows up as blocking (with the bold edge ending in a ball).