Getting Started with Stateful Dataflows
This guide will get you started with SDF, a utility that helps developers build, troubleshoot, and run full-featured event-driven dataflows.
Overview
This SDF release includes several new features and improvements. The main feature is composition, which allows you to create individual packages and import them to create dataflows.
Example Dataflow
As an example, we'll create a dataflow that splits sentences into words, and counts the number of characters in each word.
The dataflow reads from the sentence topic and writes to the words topic. This example is available for download on GitHub.
Prerequisites
Building a Stateful Dataflow requires the following software:
- Rust 1.72 or later (see Install Rust)
Install Fluvio & Start a Cluster
SDF requires a Fluvio Cluster to consume, produce, and stream records between services.
Download and install the CLI.
$ curl -fsS https://hub.infinyon.cloud/install/install.sh | bash
This command will download the Fluvio Version Manager (fvm), Fluvio CLI (fluvio) and config files into $HOME/.fluvio, with the executables in $HOME/.fluvio/bin. To complete the installation, you must add the executables to your shell $PATH.
Start a local cluster:
$ fluvio cluster start
If you prefer to run your cluster in InfinyOn Cloud, follow the instructions here.
Run the following command to check the CLI and the Cluster platform versions:
$ fluvio version
Your Fluvio cluster is ready for use.
Install and Setup SDF
SDF is in preview and it requires the following image:
$ fvm install sdf-preview13
Your SDF environment is ready to go.
Use Composition to build a Stateful Dataflow
Composition has two main components, packages and dataflows. You can build and test a package independently, then import it into a dataflow. For additional information, check out the Composition section.
1. Building a Package
A package is a collection of services, functions, and states that can be defined, tested, and then imported into a dataflow.
Let's build one:
1.1 Create a Package File
Open the terminal and create a fresh project directory, say split-sentence, where we'll add the dataflow later on. Inside the project directory, create a packages directory and a subdirectory sentence for the package itself:
$ mkdir -p split-sentence/packages/sentence
$ cd split-sentence/packages/sentence
Inside the sentence directory, create a file called sdf-package.yaml and add the following content:
apiVersion: 0.4.0

meta:
  name: sentence-pkg
  version: 0.1.0
  namespace: example

functions:
  sentence-to-words:
    operator: flat-map
    inputs:
      - name: sentence
        type: string
    output:
      type: string

  augment-count:
    operator: map
    inputs:
      - name: word
        type: string
    output:
      type: string

dev:
  converter: raw
The package file instructs the generator to build two functions, sentence-to-words and augment-count, that we intend to implement. In addition, it tells the sdf test runtime that our inputs should be ingested as raw for testing our routines.
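To make the difference between the two operators concrete, here is a minimal sketch that models them with plain Rust iterators. The helpers apply_map and apply_flat_map are hypothetical names invented for this illustration; they are not part of SDF, and the real runtime may work differently.

```rust
// Illustrative model only: plain Rust iterators standing in for the
// `map` and `flat-map` operators. `apply_map` and `apply_flat_map` are
// hypothetical helpers, not SDF APIs.

/// Models `map`: exactly one output record per input record.
fn apply_map<F>(records: Vec<String>, f: F) -> Vec<String>
where
    F: Fn(String) -> String,
{
    records.into_iter().map(f).collect()
}

/// Models `flat-map`: zero or more output records per input record.
fn apply_flat_map<F>(records: Vec<String>, f: F) -> Vec<String>
where
    F: Fn(String) -> Vec<String>,
{
    records.into_iter().flat_map(f).collect()
}

fn main() {
    let input = vec!["a b".to_string(), "c".to_string()];

    // Two input records become three output records.
    let words = apply_flat_map(input.clone(), |s| {
        s.split_whitespace().map(String::from).collect()
    });
    println!("{:?}", words);

    // Two input records stay two output records.
    let upper = apply_map(input, |s| s.to_uppercase());
    println!("{:?}", upper);
}
```

This is why sentence-to-words is declared as flat-map (one sentence can yield many words) while augment-count is declared as map (one word in, one word out).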
1.2 Generate the Package Project
The SDF generate command parses the sdf-package.yaml file and builds the project:
$ sdf generate
The generator created several directories and files that we'll edit next.
1.3 Add the Custom Code
First, let's update the first function, sentence-to-words. Open rust/sentence-to-words/src/lib.rs and update the function body with the following code:
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}
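If you'd like to see how this body treats unusual input before building the package, the following sketch copies the function into a standalone program. It assumes nothing about the generated scaffolding; it only exercises the standard library's split_whitespace.

```rust
// Standalone copy of the function body, so it runs without the
// SDF-generated project around it.
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn main() {
    // Leading, trailing, and repeated whitespace never produce empty words.
    println!("{:?}", sentence_to_words("  Hello   World ".to_string()));

    // A blank sentence yields an empty list.
    println!("{:?}", sentence_to_words("   ".to_string()));

    // Punctuation is not stripped; it stays attached to the word.
    println!("{:?}", sentence_to_words("Hello, World!".to_string()));
}
```

Because split_whitespace only splits on whitespace, punctuation counts toward the character total that augment-count reports later.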
Next, update augment_count. Open rust/augment-count/src/lib.rs and replace the function body:
fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}
Let's add some tests as well:
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_augment_count() {
        let input = "Hello".to_string();
        let output = Component::augment_count(input);
        assert_eq!(output.unwrap(), "Hello(5)");
    }
}
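One detail worth noting in augment_count is that it uses chars().count() rather than len(). The standalone sketch below shows the difference on a non-ASCII word; the sample word is our own choice for illustration, not taken from the example project.

```rust
// Standalone copy of the function body, so it runs without the
// SDF-generated project around it.
fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

fn main() {
    // "café", with the accented letter written as a single Unicode scalar value.
    let word = "caf\u{e9}".to_string();

    // `len()` counts UTF-8 bytes: the accented letter takes two bytes.
    println!("bytes: {}", word.len());

    // `chars().count()` counts Unicode scalar values, which is what the
    // function reports.
    println!("{}", augment_count(word).unwrap());
}
```

For plain ASCII words such as Hello, both counts agree, so the difference only shows up with accented or non-Latin text.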
Now that we've implemented both functions, it's time to compile and test our work.
1.4 Build and Test the Package
To build the package, run:
$ sdf build
SDF has a built-in interactive test shell. Let's bring it up:
$ sdf test
In the test shell, you can view the functions available for testing:
>> show functions
sentence-to-words
augment-count
Let's test sentence-to-words first:
>> test function sentence-to-words --input "Hello World"
Hello
World
Next, test augment-count:
>> test function augment-count --input "Hello"
Hello(5)
You may also test the Rust code via Cargo:
$ cd rust/augment-count
$ cargo test
Tip: If you get an error, you may need to use cargo component build to compile, then cargo test to run the tests.
The tests passed, and the package is now ready to use in the dataflow file.
2. Build the Stateful Dataflow
We are building a dataflow that reads sentences from the sentence topic and publishes the results to the words topic. Let's get started.
2.1 Create a Dataflow File
Navigate to the base project directory.
$ cd ../../
Create a file called dataflow.yaml and copy/paste the following content:
apiVersion: 0.4.0

meta:
  name: split-sentence
  version: 0.1.0
  namespace: example

imports:
  - pkg: example/sentence-pkg@0.1.0
    path: ./packages/sentence
    functions:
      - name: sentence-to-words
      - name: augment-count

topics:
  sentence:
    schema:
      value:
        type: string
        converter: raw
  words:
    schema:
      value:
        type: string
        converter: raw

services:
  sentence-words:
    sources:
      - type: topic
        id: sentence

    transforms:
      - operator: flat-map
        uses: sentence-to-words
      - operator: map
        uses: augment-count

    sinks:
      - type: topic
        id: words

dev:
  imports:
    - pkg: example/sentence-pkg@0.1.0
      path: ./packages/sentence
This example focuses on composition, and the area of interest is the imports section:
- pkg - the name of the package we are importing (a composition of the meta fields of the package).
- path - the relative directory where this package can be found.
- functions - the names of the functions we want to import.
The imported functions are then referenced by name in the transforms section.
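To see what chaining the two transforms amounts to, here is a minimal sketch in plain Rust that applies the functions in the same order as the transforms section. The run_service helper is a hypothetical name used only for this illustration; the actual SDF runtime wires the functions together for you.

```rust
// Standalone copies of the two package functions.
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Hypothetical helper modelling the `sentence-words` service:
/// flat-map with `sentence-to-words`, then map with `augment-count`.
fn run_service(sentence: String) -> Result<Vec<String>, String> {
    sentence_to_words(sentence)?
        .into_iter()
        .map(augment_count)
        .collect()
}

fn main() {
    let records = run_service("Hello from stateful dataflows".to_string())
        .expect("both functions return Ok in this sketch");

    // Each element corresponds to one record written to the sink topic.
    for record in records {
        println!("{record}");
    }
}
```

The order matters: augment-count expects a single word, so it has to run after sentence-to-words has split the sentence.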
2.2 Run the Dataflow
Let's run the project:
$ sdf run --ui
Please visit http://127.0.0.1:8000 to view your workflow visualization
>>
Note:
- The run command performs multiple operations:
  - imports and links all packages
  - compiles inline code (if needed)
  - looks up the topics in the cluster and automatically creates them if they don't exist.
- The --ui flag generates a visual representation of the dataflow at http://127.0.0.1:8000.
- When you close the run interactive editor, the dataflow stops processing records.
2.3 Test the Dataflow
To test the dataflow, we'll produce to one topic and consume from the other.
Produce a sentence to the sentence topic:
$ echo 'Hello from stateful dataflows' | fluvio produce sentence
Consume from the words topic:
$ fluvio consume words -Bd
Consuming records from 'words' starting from the beginning of log
Hello(5)
from(4)
stateful(8)
dataflows(9)
Note that both functions have been applied: the first splits the sentence into words, and the second counts the number of characters in each word.
SDF also exposes execution metrics. To view the metrics, run:
>> show state
Namespace                                  Keys  Type
sentence-words/augment-count/metrics       1     table
sentence-words/sentence/topic.offset       1     offset
sentence-words/sentence-to-words/metrics   1     table
sentence-words/sentence/metrics            1     table
Let's pick a metric:
>> show state sentence-words/augment-count/metrics --table
Key    Window  succeeded  failed
stats  *       4          0
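The succeeded count of 4 lines up with the four words produced from our sentence, since augment-count runs once per word. The sketch below models that bookkeeping in plain Rust. The Stats struct and tally function are hypothetical and only illustrate the idea; they do not describe how SDF computes its metrics internally.

```rust
/// Hypothetical counters mirroring the `succeeded` and `failed` columns.
#[derive(Debug, Default, PartialEq)]
struct Stats {
    succeeded: u64,
    failed: u64,
}

// Standalone copy of the package function.
fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Runs `augment_count` once per word and counts the outcomes.
fn tally(words: &[&str]) -> Stats {
    let mut stats = Stats::default();
    for word in words {
        match augment_count(word.to_string()) {
            Ok(_) => stats.succeeded += 1,
            Err(_) => stats.failed += 1,
        }
    }
    stats
}

fn main() {
    // The four words from 'Hello from stateful dataflows'.
    let stats = tally(&["Hello", "from", "stateful", "dataflows"]);
    println!("{:?}", stats);
}
```

If you produce more sentences, you would expect the succeeded count for augment-count to grow by the number of words in each one.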
🎉 Congratulations! You have successfully used composition to build a Stateful Dataflow.
Other Examples
For additional examples, check out stateful-dataflows-examples on GitHub. The examples cover additional functionality shipped in prior preview releases.
We Love Feedback
Stateful Dataflows is an ambitious project with many possibilities and just as many hazards. Please get in touch; we would love to hear your feedback, which helps us steer the product in the right direction.