Graph Neighbors

In this example we cover:

  • Assigning sub-flows to variables
  • Our first multi-input operator, join
  • Indexing multi-input operators by prepending a bracket expression
  • The unique operator for removing duplicates from a stream
  • Visualizing hydroflow code via flow.meta_graph().to_mermaid()
  • A first exposure to the concepts of strata and ticks

So far all the operators we've used have one input and one output and therefore create a linear flow of operators. Let's now take a look at a Hydroflow program containing an operator which has multiple inputs; in the following examples we'll extend this to multiple outputs.

To motivate this, we are going to start out on a little project of building a flow-based algorithm for the problem of graph reachability. Given an abstract graph—represented as data in the form of a streaming list of edges—which vertices can be reached from a vertex passed in as the origin? It turns out this is fairly naturally represented as a dataflow program.

Note on terminology: In each of the next few examples, we're going to write a Hydroflow program (a dataflow graph) to process data that itself represents some other graph! To avoid confusion, in these examples, we'll refer to the Hydroflow program as a "flow" or "program", and the data as a "graph" of "edges" and "vertices".

But First: Graph Neighbors

Graph reachability exercises a bunch of concepts at once, so we'll start here with a simpler flow that finds graph neighbors: vertices that are just one hop away.

Our graph neighbors Hydroflow program will take our initial origin vertex as one input, and join it with another input that streams in all the edges. This join will stream out the vertices that are one hop (edge) away from the starting vertex.

Here is an intuitive diagram of that dataflow program (we'll see complete, autogenerated Hydroflow diagrams below):

graph TD
  subgraph sources
    01[Stream of Edges]
  end
  subgraph neighbors of origin
    00[Origin Vertex]
    20("V ⨝ E")
    40[Output]

    00 --> 20
    
    01 ---> 20
    20 --> 40
    
  end

Let's take a look at some Hydroflow code that implements the program. In your simple project, replace the contents of src/main.rs with the following:

use hydroflow::hydroflow_syntax;

pub fn main() {
    // An edge in the input data = a pair of `usize` vertex IDs.
    let (edges_send, edges_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();

    let mut flow = hydroflow_syntax! {
        // inputs: the origin vertex (vertex 0) and stream of input edges
        origin = source_iter(vec![0]);
        stream_of_edges = source_stream(edges_recv);

        // the join
        my_join = join() -> flat_map(|(src, (_, dst))| [src, dst]);
        origin -> map(|v| (v, ())) -> [0]my_join;
        stream_of_edges -> [1]my_join;

        // the output
        my_join -> unique() -> for_each(|n| println!("Reached: {}", n));
    };

    println!(
        "{}",
        flow.meta_graph()
            .expect("No graph found, maybe failed to parse.")
            .to_mermaid()
    );
    edges_send.send((0, 1)).unwrap();
    edges_send.send((2, 4)).unwrap();
    edges_send.send((3, 4)).unwrap();
    edges_send.send((1, 2)).unwrap();
    edges_send.send((0, 3)).unwrap();
    edges_send.send((0, 3)).unwrap();
    flow.run_available();
}

Run the program and focus on the last three lines of output, which come from flow.run_available():

% cargo run
<build output>
<graph output>
Reached: 0
Reached: 3
Reached: 1

That looks right: the edges we "sent" into the flow that start at 0 are (0, 1) and (0, 3), so the vertices reachable from 0 in 0 or 1 hops are 0, 1, and 3.

Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here is producing a set of vertices, so the order in which they are printed out is not specified. The sort_by operator can be used to sort the output of a flow.

Examining the Hydroflow Code

In the code, we want to start out with the origin vertex, 0, and the stream of edges coming in. Because this flow is a bit more complex than our earlier examples, we break it down into named "subflows", assigning them variable names that we can reuse. Here we specify two subflows, origin and stream_of_edges:

    origin = source_iter(vec![0]);
    stream_of_edges = source_stream(edges_recv);

The Rust syntax vec![0] constructs a vector with a single element, 0, which we iterate over using source_iter.

We then set up a join() that we name my_join, which acts like a SQL inner join.

    // the join
    my_join = join() -> flat_map(|(src, (_, dst))| [src, dst]);
    origin -> map(|v| (v, ())) -> [0]my_join;
    stream_of_edges -> [1]my_join;

First, note that the syntax for passing data into a subflow with multiple inputs requires us to prepend an input index (starting at 0) in square brackets to the multi-input variable name or operator. In this example we have -> [0]my_join and -> [1]my_join.

Hydroflow's join() API requires a little massaging of its inputs to work properly. Each input must be a stream of pairs: (K, V1) on one input and (K, V2) on the other. The operator joins them on equal keys K and produces (K, (V1, V2)) elements. In this case we only want to join on the key v and have no corresponding value, so we feed origin through a map() to generate (v, ()) elements as the first join input.
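To make the (K, V1) and (K, V2) to (K, (V1, V2)) shape concrete, here is a minimal sketch in ordinary Rust, with no Hydroflow involved. The function name inner_join is hypothetical, and the sketch only models the input and output shapes of an inner equijoin. It keeps every matching pair, and makes no claim about how Hydroflow's join() is implemented or how it treats duplicate inputs.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Plain-Rust model of an inner equijoin on pair-shaped inputs
/// (hypothetical helper, not part of Hydroflow): every (k, v1) on the
/// left is matched with every (k, v2) on the right that has an equal
/// key, yielding (k, (v1, v2)).
fn inner_join<K, V1, V2>(left: Vec<(K, V1)>, right: Vec<(K, V2)>) -> Vec<(K, (V1, V2))>
where
    K: Eq + Hash + Clone,
    V1: Clone,
    V2: Clone,
{
    // Index the left input by key.
    let mut index: HashMap<K, Vec<V1>> = HashMap::new();
    for (k, v1) in left {
        index.entry(k).or_default().push(v1);
    }

    // Probe the index with each right-hand element, in arrival order.
    let mut out = Vec::new();
    for (k, v2) in right {
        if let Some(v1s) = index.get(&k) {
            for v1 in v1s {
                out.push((k.clone(), (v1.clone(), v2.clone())));
            }
        }
    }
    out
}
```

Calling inner_join with vec![(0, ())] as the left input and the six example edges as the right input keeps only the edges whose source is 0, each wrapped as (0, ((), dst)).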

The stream_of_edges elements are (src, dst) pairs, so the join's output is (src, ((), dst)), where each dst is a neighbor vertex. The my_join variable therefore feeds the output of the join through a flat_map that turns each result into a 2-item array [src, dst]; these arrays are flattened to give us a stream of all vertices reached. Finally we print the neighbor vertices as follows:

    my_join -> unique() -> for_each(|n| println!("Reached: {}", n));

The unique operator removes duplicates from the stream to make things more readable. Note that unique does not run in a streaming fashion, which we will talk about more below.
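The combined effect of the flat_map and unique steps can be sketched with ordinary Rust iterators. This is an illustration only: the function name neighbors is hypothetical, and the sketch models what values come out, not when Hydroflow emits them.

```rust
use std::collections::HashSet;

/// Flatten each join result (src, ((), dst)) into its two vertices,
/// then keep only the first occurrence of each vertex.
/// (Hypothetical helper for illustration, not Hydroflow code.)
fn neighbors(joined: Vec<(usize, ((), usize))>) -> Vec<usize> {
    let mut seen = HashSet::new();
    joined
        .into_iter()
        // Same closure shape as in the flow: one result becomes two vertices.
        .flat_map(|(src, (_, dst))| [src, dst])
        // HashSet::insert returns false if the value was already present.
        .filter(|v| seen.insert(*v))
        .collect()
}
```

For the three join results produced by the example data, (0, ((), 1)), (0, ((), 3)) and (0, ((), 3)), the flattened sequence is 0, 1, 0, 3, 0, 3, and removing duplicates leaves 0, 1, 3.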

There's also some extra code here, flow.meta_graph().expect(...).to_mermaid(), which tells Hydroflow to generate a Mermaid diagram showing the structure of the flow; the program prints that diagram text to stdout. You can copy the text and paste it into the Mermaid Live Editor to see the diagram, which should look as follows:

%%{init: {'theme': 'base', 'themeVariables': {'clusterBkg':'#ddd'}}}%%
flowchart TD
classDef pullClass fill:#02f,color:#fff,stroke:#000
classDef pushClass fill:#ff0,stroke:#000
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph "sg_1v1 stratum 0"
    1v1[\"(1v1) <tt>source_iter(vec! [0])</tt>"/]:::pullClass
    2v1[\"(2v1) <tt>source_stream(edges_recv)</tt>"/]:::pullClass
    5v1[\"(5v1) <tt>map(| v | (v, ()))</tt>"/]:::pullClass
    3v1[\"(3v1) <tt>join()</tt>"/]:::pullClass
    4v1[\"(4v1) <tt>flat_map(| (src, (_, dst)) | [src, dst])</tt>"/]:::pullClass
    1v1--->5v1
    2v1--1--->3v1
    5v1--0--->3v1
    3v1--->4v1
end
subgraph "sg_2v1 stratum 1"
    6v1[/"(6v1) <tt>unique()</tt>"\]:::pushClass
    7v1[/"(7v1) <tt>for_each(| n | println! (&quot;Reached: {}&quot;, n))</tt>"\]:::pushClass
    6v1--->7v1
end
4v1--->8v1
8v1["(8v1) <tt>handoff</tt>"]:::otherClass
8v1===o6v1

Notice in the Mermaid graph how Hydroflow separates the unique operator and its downstream dependencies into their own stratum (plural: strata). Note also the edge coming into unique is bold and ends in a ball: this is because the input to unique is "blocking", meaning that unique should not run until all of the input on that edge has been received. The stratum boundary before unique ensures that the blocking property is respected.

You may also be wondering why the nodes in the graph have different colors (and shapes, for readers who cannot distinguish colors easily). The answer has nothing to do with the meaning of the program, only with the way that Hydroflow compiles operators into Rust. Simply put, blue (wide-topped) boxes pull data, yellow (wide-bottomed) boxes push data, and the handoff is a special operator that buffers pushed data for subsequent pulling. Hydroflow always places a handoff between a push producer and a pull consumer, for reasons explained in the Architecture chapter.
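For readers unfamiliar with the push/pull distinction, here is a small sketch of the two styles in ordinary Rust, with a Vec standing in for a handoff. All three function names are hypothetical, and the sketch says nothing about the code Hydroflow actually generates; it only illustrates who drives the computation in each style.

```rust
/// Pull style: the consumer drives, asking the upstream iterator for
/// items one at a time. (Hypothetical example operator: doubling.)
fn pull_doubled(source: impl Iterator<Item = usize>) -> impl Iterator<Item = usize> {
    source.map(|x| x * 2)
}

/// Push style: the producer drives, handing each item to a downstream
/// callback as soon as it is produced.
fn push_doubled(items: &[usize], mut downstream: impl FnMut(usize)) {
    for &x in items {
        downstream(x * 2);
    }
}

/// A handoff modeled as a plain buffer: it is filled by pushes and
/// later drained by pulls.
fn through_handoff(items: &[usize]) -> Vec<usize> {
    let mut handoff = Vec::new();
    // The push side writes into the buffer...
    push_doubled(items, |x| handoff.push(x));
    // ...and the pull side reads from it afterwards.
    pull_doubled(handoff.into_iter()).collect()
}
```

In this toy version each item passes through the doubling step twice, once on the push side and once on the pull side, so through_handoff(&[1, 2, 3]) yields 4, 8, 12.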

Strata and Ticks

Hydroflow runs each stratum in order, one at a time, ensuring all values are computed before moving on to the next stratum. Between strata we see a handoff, which logically buffers the output of the first stratum and marks the boundary of execution between the two strata.

After all strata are run, Hydroflow returns to the first stratum; this begins the next tick. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on time.
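The stratum-then-tick execution order can be modeled in a few lines of ordinary Rust. This is a toy model under stated assumptions, not Hydroflow's scheduler: the names stratum_0, stratum_1, and run_ticks are hypothetical, each input batch is assumed to arrive in its own tick, and each tick is treated independently (how Hydroflow retains operator state across ticks is outside this sketch).

```rust
use std::collections::HashSet;

/// Stratum 0 of the toy model: keep edges leaving `origin`, flatten
/// them into vertices, and push every vertex into the handoff buffer.
fn stratum_0(edges: &[(usize, usize)], origin: usize, handoff: &mut Vec<usize>) {
    for &(src, dst) in edges {
        if src == origin {
            handoff.extend([src, dst]);
        }
    }
}

/// Stratum 1 of the toy model: drain the handoff, dropping duplicates.
/// It only runs once stratum 0 has finished filling the buffer.
fn stratum_1(handoff: &mut Vec<usize>) -> Vec<usize> {
    let mut seen = HashSet::new();
    handoff.drain(..).filter(|v| seen.insert(*v)).collect()
}

/// Run one tick per input batch (an assumption of this model); within
/// a tick, the strata run in order. Returns the output of each tick.
fn run_ticks(batches: &[Vec<(usize, usize)>], origin: usize) -> Vec<Vec<usize>> {
    let mut handoff = Vec::new();
    batches
        .iter()
        .map(|batch| {
            stratum_0(batch, origin, &mut handoff); // runs to completion first
            stratum_1(&mut handoff) // then consumes everything buffered
        })
        .collect()
}
```

With two batches, [(0, 1), (2, 4), (0, 3)] followed by [(0, 3), (1, 2)], and origin 0, the model outputs 0, 1, 3 in the first tick and 0, 3 in the second.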

Returning to the code, if you read the edges_send calls carefully, you'll see that the example data has vertices (2 and 4) that are more than one hop away from 0, and these were not output by our simple program. To extend this example to graph reachability, we need to recurse: find neighbors of our neighbors, neighbors of our neighbors' neighbors, and so on. In Hydroflow, this is done by adding a loop to the flow, as we'll see in our next example.