Networked Services 1: EchoServer

In this example we cover:

  • The standard project template for networked Hydroflow services.
  • Rust's clap crate for command-line options
  • Defining message types
  • Destination operators (e.g. for sending data to a network)
  • Network sources and dests with built-in serde (source_stream_serde, dest_sink_serde)
  • The source_stdin source
  • Long-running services via run_async

Our examples up to now have been simple single-node programs, to get us comfortable with Hydroflow's surface syntax. But the whole point of Hydroflow is to help us write distributed programs or services that run on a cluster of machines!

In this example we'll study the "hello, world" of distributed systems -- a simple echo server. It will listen on a UDP port, and send back a copy of any message it receives, with a timestamp. We will also look at a client to accept strings from the command line, send them to the echo server, and print responses.

We will start from a fresh copy of the hydroflow-template project template. Change to the directory where you'd like to put your project, and once again run:

cargo generate hydro-project/hydroflow-template

Then change directory into the resulting project.

The README for the template project is a good place to start. It contains a brief overview of the project structure and explains how to build and run the example. Here we'll spend more time learning from the code.

Hydroflow Project Structure

The Hydroflow template project auto-generates this example for us. If you prefer, you can find the source in the examples/echo_server directory of the Hydroflow repository.

The directory structure encouraged by the template is as follows:

project/README.md           # documentation
project/Cargo.toml          # package and dependency info
project/src/main.rs         # main function
project/src/protocol.rs     # message types exchanged between roles
project/src/helpers.rs      # helper functions used by all roles
project/src/<roleA>.rs      # service definition for role A (e.g. server)
project/src/<roleB>.rs      # service definition for role B (e.g. client)

In the default example, the roles we use are Client and Server, but you can imagine different roles depending on the structure of your service or application.

We start with a main function that parses command-line options, and invokes the appropriate role-specific service. After a prelude of imports, we start by defining a Rust enum for the Roles that the service supports.

use clap::{Parser, ValueEnum};
use client::run_client;
use hydroflow::tokio;
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use server::run_server;
use std::net::SocketAddr;

mod client;
mod helpers;
mod protocol;
mod server;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
    Client,
    Server,
}

Following that, we use Rust's clap (Command Line Argument Parser) crate to parse command-line options:

#[derive(Parser, Debug)]
struct Opts {
    #[clap(value_enum, long)]
    role: Role,
    #[clap(long, value_parser = ipv4_resolve)]
    addr: Option<SocketAddr>,
    #[clap(long, value_parser = ipv4_resolve)]
    server_addr: Option<SocketAddr>,
}

This sets up three command-line flags: role, addr, and server_addr. Clap's derive turns the field names into the flags --role, --addr, and --server-addr. The addr and server_addr flags are made optional by wrapping them in a Rust Option; by contrast, the role flag is required. Clap parses the command-line strings into the associated Rust types and populates the Opts struct with the values. The value_parser attribute tells clap to use Hydroflow's ipv4_resolve helper function to parse a "host:port" string into a SocketAddr.
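To make the value_parser hook concrete, here is a minimal std-only sketch of a resolver with the same shape. It is a function from &str to Result<SocketAddr, _> that keeps the first IPv4 address it finds. The name resolve_ipv4 is hypothetical. This is not Hydroflow's actual ipv4_resolve source, only an assumption about what such a helper needs to do.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Hypothetical stand-in for a helper like ipv4_resolve: resolve a
/// "host:port" string and return the first IPv4 address it yields.
fn resolve_ipv4(addr: &str) -> io::Result<SocketAddr> {
    addr.to_socket_addrs()?
        // A name like "localhost" may resolve to both IPv6 and IPv4; keep IPv4.
        .find(|a| a.is_ipv4())
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no IPv4 address found"))
}
```

On most systems a string such as "localhost:0" resolves this way to 127.0.0.1 with port 0. That is the value main falls back to when it wants the OS to pick a free port.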

This brings us to the main function itself. It is prefaced by a #[tokio::main] attribute, which is a macro that sets up the tokio runtime. This is necessary because Hydroflow uses the tokio runtime for asynchronous execution as a service.

#[tokio::main]
async fn main() {
    // parse command line arguments
    let opts = Opts::parse();
    // if no addr was provided, we ask the OS to assign a local port by passing in "localhost:0"
    let addr = opts
        .addr
        .unwrap_or_else(|| ipv4_resolve("localhost:0").unwrap());

    // allocate `outbound` sink and `inbound` stream
    let (outbound, inbound, addr) = bind_udp_bytes(addr).await;
    println!("Listening on {:?}", addr);

After parsing the command-line arguments we set up some Rust-based networking. Whether we are a client or a server, we need to allocate a UDP socket that is used for both sending and receiving messages. We do this by calling the async bind_udp_bytes function, which is defined in the hydroflow/src/util module. Because it is an async function it returns a Future, so we must append .await. The result is a triple of type (UdpSink, UdpStream, SocketAddr). We use the first two in Hydroflow to send and receive messages, respectively. (Note: your IDE might expand the UdpSink and UdpStream names into their more verbose underlying type definitions. That is fine; you can ignore those.) The SocketAddr matters if you specified port 0 in your addr argument: it tells you which port the OS has assigned.
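The port-0 behavior is easy to see with plain std sockets. The sketch below is an analogue, not bind_udp_bytes itself. It uses a blocking std::net::UdpSocket rather than Hydroflow's async sink and stream, and the helper name bind_ephemeral is hypothetical.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};

/// Hypothetical helper: bind a UDP socket on an OS-assigned port (port 0)
/// and report the concrete address that was actually bound.
fn bind_ephemeral() -> io::Result<(UdpSocket, SocketAddr)> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    // local_addr() reveals which port the OS picked in place of 0.
    let addr = socket.local_addr()?;
    Ok((socket, addr))
}
```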

All that's left is to fire up the code for the appropriate role!

    match opts.role {
        Role::Server => {
            run_server(outbound, inbound, opts).await;
        }
        Role::Client => {
            run_client(outbound, inbound, opts).await;
        }
    }
}
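Clap's ValueEnum derive is what turns the string passed to --role into a Role value. The std-only sketch below illustrates that parsing step and the dispatch above by implementing FromStr by hand. It accepts the lowercase names "client" and "server" used in the example commands. The function entry_point is hypothetical and simply names the function main would call for each role.

```rust
use std::str::FromStr;

/// The two roles of the echo example (clap derives this parsing in the real code).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Role {
    Client,
    Server,
}

impl FromStr for Role {
    type Err = String;

    /// Map the --role string onto a variant, rejecting anything else.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "client" => Ok(Role::Client),
            "server" => Ok(Role::Server),
            other => Err(format!("unknown role: {other}")),
        }
    }
}

/// Hypothetical dispatcher: which entry point main would invoke per role.
fn entry_point(role: Role) -> &'static str {
    match role {
        Role::Server => "run_server",
        Role::Client => "run_client",
    }
}
```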

A natural design pattern for distributed Hydroflow programs is to define all message types in a protocol.rs file, so the structures are shared by the Hydroflow logic of every role. In this simple example we define only one message type, EchoMsg: a simple struct with two fields, payload and ts (timestamp). The payload field is a String, and the ts field is a DateTime<Utc>, a type from the chrono crate. Note the traits derived on EchoMsg. Serialize and Deserialize in particular are required for any struct that we send over the network.

use chrono::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Clone, Serialize, Deserialize, Debug)]
pub struct EchoMsg {
    pub payload: String,
    pub ts: DateTime<Utc>,
}
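Serialize and Deserialize let serde turn an EchoMsg into bytes and back, so it can travel in a UDP datagram. To show what that buys us, here is a hand-written, std-only stand-in that rests on two assumptions. First, ts is stored as Unix milliseconds (u64) instead of chrono's DateTime<Utc>. Second, the byte layout (8 big-endian timestamp bytes, then the UTF-8 payload) is invented for illustration. It is not the wire format Hydroflow actually uses.

```rust
/// Std-only stand-in for EchoMsg: ts is Unix milliseconds (an assumption).
#[derive(PartialEq, Clone, Debug)]
struct EchoMsg {
    payload: String,
    ts: u64,
}

impl EchoMsg {
    /// Encode as 8 big-endian timestamp bytes followed by the UTF-8 payload.
    /// This layout is made up; serde would generate the real encoding.
    fn to_bytes(&self) -> Vec<u8> {
        let mut buf = self.ts.to_be_bytes().to_vec();
        buf.extend_from_slice(self.payload.as_bytes());
        buf
    }

    /// Decode the layout produced by to_bytes; returns None if malformed.
    fn from_bytes(bytes: &[u8]) -> Option<EchoMsg> {
        if bytes.len() < 8 {
            return None;
        }
        let (ts_bytes, rest) = bytes.split_at(8);
        let ts_arr: [u8; 8] = ts_bytes.try_into().ok()?;
        let payload = String::from_utf8(rest.to_vec()).ok()?;
        Some(EchoMsg { payload, ts: u64::from_be_bytes(ts_arr) })
    }
}
```

Writing and maintaining this by hand for every message type is exactly the work the derived serde traits remove.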

Things get interesting when we look at the run_server function. This function is the main entry point for the server. It takes as arguments the outbound and inbound sockets from main, and the Opts (which are ignored).

After printing a cheery message, we get into the Hydroflow code for the server, consisting of three short pipelines.

use crate::protocol::EchoMsg;
use chrono::prelude::*;
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{UdpSink, UdpStream};
use std::net::SocketAddr;

pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, _opts: crate::Opts) {
    println!("Server live!");

    let mut flow: Hydroflow = hydroflow_syntax! {
        // Define a shared inbound channel
        inbound_chan = source_stream_serde(inbound) -> tee();

        // Print all messages for debugging purposes
        inbound_chan[0]
            -> for_each(|(msg, addr): (EchoMsg, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), msg, addr));

        // Echo back the Echo messages with updated timestamp
        inbound_chan[1]
            -> map(|(EchoMsg {payload, ..}, addr)| (EchoMsg { payload, ts: Utc::now() }, addr) ) -> dest_sink_serde(outbound);
    };

    // run the server
    flow.run_async().await;
}

Let's take the Hydroflow code one statement at a time.

The first pipeline, inbound_chan, uses a source operator we have not seen before: source_stream_serde(). This is a streaming source like source_stream, but for network streams. It takes a UdpStream as an argument and has a particular output type: a stream of (T, SocketAddr) pairs. Here T is some type that implements the Serialize and Deserialize traits (together known as "serde"), and SocketAddr is the network address of the item's sender. In this case, T is EchoMsg, which we defined in protocol.rs, and the SocketAddr is the address of the client that sent the message. We pipe the result into a tee() for reuse.
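The tee() semantics are simple: every item that arrives is handed to each downstream branch. The following sketch collects into two Vecs for illustration, whereas Hydroflow's tee pushes items through as they arrive. The function name tee here is a hypothetical helper. The (String, SocketAddr) pairs stand in for the (EchoMsg, SocketAddr) pairs produced by source_stream_serde.

```rust
/// Hypothetical batch version of tee(): clone every item into two outputs,
/// one per downstream branch (e.g. logging and echoing).
fn tee<T: Clone>(items: impl IntoIterator<Item = T>) -> (Vec<T>, Vec<T>) {
    let mut left = Vec::new();
    let mut right = Vec::new();
    for item in items {
        left.push(item.clone()); // branch 0 gets a copy
        right.push(item); // branch 1 gets the original
    }
    (left, right)
}
```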

The second pipeline is a simple for_each to print the messages received at the server.

The third and final pipeline constructs a response EchoMsg with the local timestamp copied in. It then pipes the result into a dest_XXX operator, the first one we've seen! A dest is the opposite of a source_XXX operator: it goes at the end of a pipeline and sends data out of the Hydroflow graph, for example into a channel or onto the network. The specific operator used here is dest_sink_serde(). This is a dest operator like dest_sink, but for network streams. It takes a UdpSink as an argument and requires a particular input type: a stream of (T, SocketAddr) pairs. Here T is some type that implements the Serialize and Deserialize traits, and SocketAddr is the network address of the destination. In this case, T is once again EchoMsg, and the SocketAddr is the address of the client that sent the original message.
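The map() closure in the third pipeline can be read as a plain function from a (request, sender) pair to a (reply, destination) pair. The std-only sketch below makes that explicit. It rests on two assumptions. EchoMsg here uses a u64 timestamp instead of DateTime<Utc>. The current time is passed in as a parameter so the result is deterministic, whereas the real closure calls Utc::now().

```rust
use std::net::SocketAddr;

/// Std-only stand-in for EchoMsg (timestamp as Unix milliseconds).
#[derive(PartialEq, Clone, Debug)]
struct EchoMsg {
    payload: String,
    ts: u64,
}

/// Sketch of the server's map() stage: keep the payload, stamp a fresh
/// timestamp, and address the reply to whoever sent the request.
fn echo_reply((msg, sender): (EchoMsg, SocketAddr), now: u64) -> (EchoMsg, SocketAddr) {
    let EchoMsg { payload, .. } = msg; // drop the client's original timestamp
    (EchoMsg { payload, ts: now }, sender)
}
```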

The remaining line of code runs the server. The run_async() function is a method on the Hydroflow type. It is an async function, so we append .await to the call. The program will block on this call until the server is terminated.
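For comparison, here is roughly the same server written as an ordinary blocking loop over a std UdpSocket, with no Hydroflow at all. It makes two simplifying assumptions. The reply is the raw payload prefixed with a Unix-milliseconds timestamp rather than a serialized EchoMsg. The hypothetical serve_echo also stops after max_msgs datagrams, whereas run_async keeps serving until the process is terminated.

```rust
use std::io;
use std::net::UdpSocket;
use std::time::{SystemTime, UNIX_EPOCH};

/// Blocking sketch of the echo loop: receive a datagram, then send
/// "<unix-millis> <payload>" back to the sender's address.
fn serve_echo(socket: &UdpSocket, max_msgs: usize) -> io::Result<()> {
    let mut buf = [0u8; 1500];
    for _ in 0..max_msgs {
        // recv_from yields the payload length and the sender's address,
        // much like the (T, SocketAddr) pairs from source_stream_serde.
        let (len, sender) = socket.recv_from(&mut buf)?;
        let millis = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis())
            .unwrap_or(0);
        let mut reply = format!("{millis} ").into_bytes();
        reply.extend_from_slice(&buf[..len]);
        // Reply to the sender, as dest_sink_serde does with the pair's address.
        socket.send_to(&reply, sender)?;
    }
    Ok(())
}
```

The Hydroflow version expresses the same receive-then-reply logic as a dataflow. That is what makes it easy to attach the extra logging branch through tee().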

The client begins by making sure the user specified a server address at the command line. After printing a message to the terminal, it constructs a Hydroflow graph.

Again, we start the Hydroflow code by defining shared inbound and outbound channels. The code here is simpler than the server's because inbound_chan and outbound_chan are each referenced only once. They therefore need no tee() or merge() operators, respectively, and those operators are left commented out.

The inbound_chan drives a pipeline that prints messages to the screen.

Only the last pipeline is novel for us by now. It uses another new source operator, source_stdin(), which does what you might expect: it streams lines of text as they arrive from stdin (i.e., as they are typed into a terminal). It then uses a map to pair each line of text with the server's address, wrapping the line in an EchoMsg stamped with the current time. The result is piped into outbound_chan, the dest_sink_serde() operator, which sends the message to the server.

The client logic ends by launching the flow graph with flow.run_async().await.unwrap().

use crate::protocol::EchoMsg;
use crate::Opts;
use chrono::prelude::*;
use hydroflow::hydroflow_syntax;
use hydroflow::util::{UdpSink, UdpStream};
use std::net::SocketAddr;

pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
    // server_addr is required for client
    let server_addr = opts.server_addr.expect("Client requires a server address");
    println!("Client live!");

    let mut flow = hydroflow_syntax! {
        // Define shared inbound and outbound channels
        inbound_chan = source_stream_serde(inbound)
            // -> tee() // commented out since we only use this once in the client template
        ;
        outbound_chan = // merge() ->  // commented out since we only use this once in the client template
            dest_sink_serde(outbound);

        // Print all messages for debugging purposes
        inbound_chan
            -> for_each(|(m, a): (EchoMsg, SocketAddr)| println!("{}: Got {:?} from {:?}", Utc::now(), m, a));

        // take stdin and send to server as an EchoMsg
        source_stdin() -> map(|l| (EchoMsg{ payload: l.unwrap(), ts: Utc::now(), }, server_addr) )
            -> outbound_chan;
    };

    flow.run_async().await.unwrap();
}
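The source_stdin() -> map(...) pipeline turns each typed line into a message addressed to the server. Below is a std-only sketch of that transformation. It omits the timestamp and uses a plain String payload, and lines_to_requests is a hypothetical name. Making it generic over BufRead lets the same function read from io::stdin().lock() or from an in-memory buffer.

```rust
use std::io::{self, BufRead};
use std::net::SocketAddr;

/// Sketch of source_stdin() -> map(...): pair every input line with the
/// server's address, yielding (payload, destination) tuples.
fn lines_to_requests<R: BufRead>(
    input: R,
    server: SocketAddr,
) -> io::Result<Vec<(String, SocketAddr)>> {
    input
        .lines() // each line arrives as io::Result<String>, newline stripped
        .map(|line| line.map(|payload| (payload, server)))
        .collect() // stops at the first read error, if any
}
```

Where the client's map closure calls l.unwrap() on each line, this sketch propagates the first read error instead.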


Running the example

As described in the README, we can run the server in one terminal and the client in another. The server will print the messages it receives, and the client will print the messages it receives back from the server. The client's --server-addr argument needs to match the server's --addr argument or this won't work!

Fire up the server in terminal 1:

% cargo run -p hydroflow --example echoserver -- --role server --addr localhost:12347

Then start the client in terminal 2 and type some messages!

% cargo run -p hydroflow --example echoserver -- --role client --server-addr localhost:12347
Listening on
Connecting to server at
Client live!
This is a test
2022-12-15 05:40:11.258052 UTC: Got Echo { payload: "This is a test", ts: 2022-12-15T05:40:11.257145Z } from
This is the rest
2022-12-15 05:40:14.025216 UTC: Got Echo { payload: "This is the rest", ts: 2022-12-15T05:40:14.023577Z } from

And have a look back at the server console!

Listening on
Server live!
2022-12-15 05:40:11.256640 UTC: Got Echo { payload: "This is a test", ts: 2022-12-15T05:40:11.254207Z } from
2022-12-15 05:40:14.023363 UTC: Got Echo { payload: "This is the rest", ts: 2022-12-15T05:40:14.020897Z } from