Graph Neighbors
In this example we cover:
- Assigning sub-flows to variables
- Our first multi-input operator,
join
- Indexing multi-input operators by prepending a bracket expression
- The
unique
operator for removing duplicates from a stream- Visualizing hydroflow code via
flow.meta_graph().to_mermaid()
- A first exposure to the concepts of strata and ticks
So far all the operators we've used have one input and one output and therefore create a linear flow of operators. Let's now take a look at a Hydroflow program containing an operator which has multiple inputs; in the following examples we'll extend this to multiple outputs.
To motivate this, we are going to start out on a little project of building a flow-based algorithm
for the problem of graph reachability.
Given an abstract graph—represented as data in the form of a streaming list of edges—which
vertices can be reached from a vertex passed in as the origin
? It turns out this is fairly
naturally represented as a dataflow program.
Note on terminology: In each of the next few examples, we're going to write a Hydroflow program (a dataflow graph) to process data that itself represents some other graph! To avoid confusion, in these examples, we'll refer to the Hydroflow program as a "flow" or "program", and the data as a "graph" of "edges" and "vertices".
But First: Graph Neighbors
Graph reachability exercises a bunch of concepts at once, so we'll start here with a simpler flow that finds graph neighbors: vertices that are just one hop away.
Our graph neighbors Hydroflow program will take
our initial origin
vertex as one input, and join it another input that streams in all the edges—this
join will stream out the vertices that are one hop (edge) away from the starting vertex.
Here is an intuitive diagram of that dataflow program (we'll see complete, autogenerated Hydroflow diagrams below):
graph TD subgraph sources 01[Stream of Edges] end subgraph neighbors of origin 00[Origin Vertex] 20("V ⨝ E") 40[Output] 00 --> 20 01 ---> 20 20 --> 40 end
Lets take a look at some Hydroflow code that implements the program. In your simple
project,
replace the contents of src/main.rs
with the following:
use hydroflow::hydroflow_syntax; pub fn main() { // An edge in the input data = a pair of `usize` vertex IDs. let (edges_send, edges_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>(); let mut flow = hydroflow_syntax! { // inputs: the origin vertex (vertex 0) and stream of input edges origin = source_iter(vec![0]); stream_of_edges = source_stream(edges_recv); // the join my_join = join() -> flat_map(|(src, (_, dst))| [src, dst]); origin -> map(|v| (v, ())) -> [0]my_join; stream_of_edges -> [1]my_join; // the output my_join -> unique() -> for_each(|n| println!("Reached: {}", n)); }; println!( "{}", flow.meta_graph() .expect("No graph found, maybe failed to parse.") .to_mermaid() ); edges_send.send((0, 1)).unwrap(); edges_send.send((2, 4)).unwrap(); edges_send.send((3, 4)).unwrap(); edges_send.send((1, 2)).unwrap(); edges_send.send((0, 3)).unwrap(); edges_send.send((0, 3)).unwrap(); flow.run_available(); }
Run the program and focus on the last three lines of output, which come from flow.run_available()
:
% cargo run
<build output>
<graph output>
Reached: 0
Reached: 3
Reached: 1
That looks right: the edges we "sent" into the flow that start at 0
are
(0, 1)
and (0, 3)
, so the nodes reachable from 0
in 0 or 1 hops are 0, 1, 3
.
Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here is producing a
set
of nodes, so the order in which they are printed out is not specified. Thesort_by
operator can be used to sort the output of a flow.
Examining the Hydroflow Code
In the code, we want to start out with the origin vertex, 0
,
and the stream of edges coming in. Because this flow is a bit more complex
than our earlier examples, we break it down into named "subflows", assigning them variable
names that we can reuse. Here we specify two subflows, origin
and stream_of_edges
:
origin = source_iter(vec![0]);
stream_of_edges = source_stream(edges_recv);
The Rust syntax vec![0]
constructs a vector with a single element, 0
, which we iterate
over using source_iter
.
We then set up a join()
that we
name my_join
, which acts like a SQL inner join.
// the join
my_join = join() -> flat_map(|(src, (_, dst))| [src, dst]);
origin -> map(|v| (v, ())) -> [0]my_join;
stream_of_edges -> [1]my_join;
First, note the syntax for passing data into a subflow with multiple inputs requires us to prepend
an input index (starting at 0
) in square brackets to the multi-input variable name or operator. In this example we have -> [0]my_join
and -> [1]my_join
.
Hydroflow's join()
API requires
a little massaging of its inputs to work properly.
The inputs must be of the form of a pair of elements (K, V1)
and (K, V2)
, and the operator joins them on equal keys K
and produces an
output of (K, (V1, V2))
elements. In this case we only want to join on the key v
and
don't have any corresponding value, so we feed origin
through a map()
to generate (v, ())
elements as the first join input.
The stream_of_edges
are (src, dst)
pairs,
so the join's output is (src, ((), dst))
where dst
are new neighbor
vertices. So the my_join
variable feeds the output of the join through a flat_map
to extract the pairs into 2-item arrays, which are flattened to give us a list of all vertices reached.
Finally we print the neighbor vertices as follows:
my_join -> unique() -> for_each(|n| println!("Reached: {}", n));
The unique operator removes duplicates from the stream to make things more readable. Note that unique
does not run in a streaming fashion, which we will talk about more below.
There's
also some extra code here, flow.meta_graph().expect(...).to_mermaid()
, which tells
Hydroflow to
generate a diagram rendered by Mermaid showing
the structure of the graph, and print it to stdout. You can copy that text and paste it into the Mermaid Live Editor to see the graph, which should look as follows:
%%{init: {'theme': 'base', 'themeVariables': {'clusterBkg':'#ddd'}}}%% flowchart TD classDef pullClass fill:#02f,color:#fff,stroke:#000 classDef pushClass fill:#ff0,stroke:#000 linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; subgraph "sg_1v1 stratum 0" 1v1[\"(1v1) <tt>source_iter(vec! [0])</tt>"/]:::pullClass 2v1[\"(2v1) <tt>source_stream(edges_recv)</tt>"/]:::pullClass 5v1[\"(5v1) <tt>map(| v | (v, ()))</tt>"/]:::pullClass 3v1[\"(3v1) <tt>join()</tt>"/]:::pullClass 4v1[\"(4v1) <tt>flat_map(| (src, (_, dst)) | [src, dst])</tt>"/]:::pullClass 1v1--->5v1 2v1--1--->3v1 5v1--0--->3v1 3v1--->4v1 end subgraph "sg_2v1 stratum 1" 6v1[/"(6v1) <tt>unique()</tt>"\]:::pushClass 7v1[/"(7v1) <tt>for_each(| n | println! ("Reached: {}", n))</tt>"\]:::pushClass 6v1--->7v1 end 4v1--->8v1 8v1["(8v1) <tt>handoff</tt>"]:::otherClass 8v1===o6v1
Notice in the mermaid graph how Hydroflow separates the unique
operator and its downstream dependencies into their own
stratum (plural: strata). Note also the edge coming into unique
is bold and ends in a ball: this is because the input to unique
is
``blocking'', meaning that unique
should not run until all of the input on that edge has been received.
The stratum boundary before unique
ensures that the blocking property is respected.
You may also be wondering why the nodes in the graph have different colors (and shapes, for readers who cannot distinguish
colors easily). The answer has nothing to do with the meaning of the program, only with the way that Hydroflow compiles
operators into Rust. Simply put, blue (wide-topped) boxes pull data, yellow (wide-bottomed) boxes push data, and the handoff
is a special operator that buffers pushed data for subsequent pulling. Hydroflow always places a handoff
between a push producer and a pull consumer, for reasons explained in the Architecture chapter.
Strata and Ticks
Hydroflow runs each stratum in order, one at a time, ensuring all values are computed before moving on to the next stratum. Between strata we see a handoff, which logically buffers the output of the first stratum, and delineates the separation of execution between the 2 strata.
After all strata are run, Hydroflow returns to the first stratum; this begins the next tick. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on time.
Returning to the code, if you read the edges_send
calls carefully, you'll see that the example data
has vertices (2
, 4
) that are more than one hop away from 0
, which were
not output by our simple program. To extend this example to graph reachability,
we need to recurse: find neighbors of our neighbors, neighbors of our neighbors' neighbors, and so on. In Hydroflow,
this is done by adding a loop to the flow, as we'll see in our next example.