Flow Syntax

Flows consist of named operators that are connected via flow edges denoted by ->. The example below uses the source_iter operator to generate two strings from a Rust vec, the map operator to apply some Rust code to uppercase each string, and the for_each operator to print each string to stdout.

source_iter(vec!["Hello", "world"])
    -> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x));
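
For readers who know Rust iterators, the flow above behaves roughly like the iterator chain sketched below. This is only an analogy in plain Rust, not Hydroflow code: the helper name uppercase_all is hypothetical, and it collects the results into a Vec instead of printing inside the pipeline so that the output can be inspected.

```rust
/// Plain-Rust analogy of `source_iter(..) -> map(..) -> for_each(..)`.
/// `uppercase_all` is a hypothetical helper, not part of Hydroflow.
fn uppercase_all(items: Vec<&str>) -> Vec<String> {
    items
        .into_iter()               // plays the role of source_iter
        .map(|x| x.to_uppercase()) // same closure as the map operator
        .collect()                 // stands in for for_each; gathers instead of printing
}

fn main() {
    // Print each result, as the for_each operator does in the flow.
    for line in uppercase_all(vec!["Hello", "world"]) {
        println!("{}", line);
    }
}
```

Each -> edge in the flow corresponds to one method call in the chain: data moves left to right through the operators in both notations.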

Flows can be assigned to variable names for convenience. For example, the above can be rewritten as follows:

source_iter(vec!["Hello", "world"]) -> upper_print;
upper_print = map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x));

Note that the order of the statements (lines) does not matter. In this example, upper_print is referenced before it is assigned. This is allowed, and here it matches the direction of the data flow, which makes the program easier to read.

Operators with Multiple Ports

Some operators have more than one input port that can be referenced by ->. For example, merge combines the contents of many flows, so it can have an arbitrary number of input ports. Some operators have multiple outputs, notably tee, which has an arbitrary number of outputs.

In the syntax, we optionally distinguish input ports via an indexing prefix number in square brackets before the name (e.g. [0]my_join and [1]my_join). We can distinguish output ports by an indexing suffix (e.g. my_tee[0]).
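
To see why input ports sometimes need to be told apart, consider the plain-Rust sketch below. It assumes that my_join names a two-input join operator (the text above does not show its definition) and models it with a hypothetical function join_by_key, whose left and right parameters play the roles of ports [0] and [1]. It is an illustration only, not the Hydroflow implementation.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a two-input join; not the Hydroflow operator.
/// `left` plays the role of port [0], `right` the role of port [1].
fn join_by_key(left: &[(&str, u32)], right: &[(&str, char)]) -> Vec<(String, (u32, char))> {
    // Index the left input by key.
    let mut by_key: HashMap<&str, Vec<u32>> = HashMap::new();
    for &(k, v) in left {
        by_key.entry(k).or_default().push(v);
    }
    // Probe with the right input, emitting (key, (left, right)) for each match.
    let mut out = Vec::new();
    for &(k, r) in right {
        if let Some(ls) = by_key.get(k) {
            for &l in ls {
                out.push((k.to_string(), (l, r)));
            }
        }
    }
    out
}
```

Because the two inputs carry different value types and end up in different positions of the output tuple, swapping them would change the result (here it would not even type-check). An operator whose inputs are interchangeable has no such constraint.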

Here is an example that tees one flow into two, handles each separately, and then merges them to print out the contents in both lowercase and uppercase:

my_tee = source_iter(vec!["Hello", "world"]) -> tee();
my_tee -> map(|x| x.to_uppercase()) -> my_merge;
my_tee -> map(|x| x.to_lowercase()) -> my_merge;
my_merge = merge() -> for_each(|x| println!("{}", x));

merge() and tee() treat all their inputs and outputs the same, so we omit the indexing.
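
As a rough analogy, the tee/merge flow above can be sketched in plain Rust as follows. The helper name tee_and_merge is hypothetical and this is not Hydroflow code. One assumption to note: the sketch uses Iterator::chain, which emits all uppercase items before any lowercase ones, whereas nothing in this section states the order in which merge emits items from its inputs.

```rust
/// Plain-Rust analogy of the tee/merge flow; `tee_and_merge` is hypothetical.
fn tee_and_merge(items: Vec<&str>) -> Vec<String> {
    // "tee": each branch reads its own copy of the same input.
    let upper = items.clone().into_iter().map(|x| x.to_uppercase());
    let lower = items.into_iter().map(|x| x.to_lowercase());
    // "merge": combine both branches into a single stream.
    // chain() is an assumption of this sketch; it fixes the output order.
    upper.chain(lower).collect()
}

fn main() {
    // Print every item from both branches.
    for line in tee_and_merge(vec!["Hello", "world"]) {
        println!("{}", line);
    }
}
```

Neither branch cares which copy of the input it receives, and the final stage does not care which branch an item came from, which is why the flow needs no port indices.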

Here is a visualization of the flow that was generated:

flowchart TD
classDef pullClass fill:#02f,color:#fff,stroke:#000
classDef pushClass fill:#ff0,stroke:#000
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
    1v1[\"(1v1) <tt>source_iter(vec! [&quot;Hello&quot;, &quot;world&quot;])</tt>"/]:::pullClass
    2v1[/"(2v1) <tt>tee()</tt>"\]:::pushClass
    subgraph sg_1v1_var_my_tee ["var <tt>my_tee</tt>"]
subgraph sg_2v1 ["sg_2v1 stratum 0"]
    3v1[\"(3v1) <tt>map(| x : &amp; str | x.to_uppercase())</tt>"/]:::pullClass
    4v1[\"(4v1) <tt>map(| x : &amp; str | x.to_lowercase())</tt>"/]:::pullClass
    5v1[\"(5v1) <tt>merge()</tt>"/]:::pullClass
    6v1[/"(6v1) <tt>for_each(| x | println! (&quot;{}&quot;, x))</tt>"\]:::pushClass
    subgraph sg_2v1_var_my_merge ["var <tt>my_merge</tt>"]
7v1["(7v1) <tt>handoff</tt>"]:::otherClass
8v1["(8v1) <tt>handoff</tt>"]:::otherClass

Hydroflow compiled this flow into two subgraphs called compiled components, connected by handoffs. You can ignore these details unless you are interested in low-level performance tuning; they are explained in the discussion of in-out trees.

A note on assigning flows with multiple ports

TODO: Need to document the port numbers for variables assigned to tree- or dag-shaped flows

The context object

Closures inside surface syntax operators have access to a special context object which provides access to scheduling, timing, and state APIs. The object is accessible as a shared reference (&Context) via the special name context. Here is the full API documentation for Context.

source_iter([()])
    -> for_each(|()| println!("Current tick: {}, stratum: {}", context.current_tick(), context.current_stratum()));
// Current tick: 0, stratum: 0