Data Sources and Sinks in Rust
Any useful flow requires us to define sources of data, either generated computationally or received from and outside environment via I/O.
source_iter
A flow can receive data from a Rust collection object via the source_iter
operator, which takes the
iterable collection as an argument and passes the items down the flow.
For example, here we iterate through a vector of usize
items and push them down the flow:
source_iter(vec![0, 1]) -> ...
The Hello, World example above uses this construct.
source_stream
More commonly, a flow should handle external data coming in asynchronously from a Tokio runtime.
One way to do this is with channels that allow Rust code to send data into the Hydroflow inputs.
The code below creates a channel for data of (Rust) type (usize, usize)
:
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<(usize, usize)>();
Under the hood this uses Tokio unbounded channels. Now in Rust we can now push data into the channel. E.g. for testing we can do it explicitly as follows:
input_send.send((0, 1)).unwrap()
And in our Hydroflow syntax we can receive the data from the channel using the source_stream
syntax and
pass it along a flow:
source_stream(input_recv) -> ...
To put this together, let's revisit our Hello, World example from above with data sent in from outside the flow:
#![allow(unused)] fn main() { use hydroflow::hydroflow_syntax; let (input_send, input_recv) = hydroflow::util::unbounded_channel::<&str>(); let mut flow = hydroflow_syntax! { source_stream(input_recv) -> map(|x| x.to_uppercase()) -> for_each(|x| println!("{}", x)); }; input_send.send("Hello").unwrap(); input_send.send("World").unwrap(); flow.run_available(); }
TODO: add source_stream_serde