Dataflow - Composition
Composable Dataflows let you create individual packages and import them to build dataflows. They are a great way to organize and reuse code in large dataflows. For simple dataflows, we recommend using Inline Dataflows.
Directory Structure
Dataflows and packages each require a definition file for code generation and composition:
- sdf-package.yaml - code generation definitions
- dataflow.yaml - composition definitions
Consequently, each dataflow and each package must have its own dedicated directory.
For example, a dataflow named split-sentence and a package named sentence:
split-sentence
├── dataflow.yaml
└── packages
    └── sentence
        └── sdf-package.yaml
While placing the packages inside a dataflow is not strictly necessary, it often makes sense.
Packages
Packages are dataflow building blocks of one or more types, states, and functions. You can structure all the components in one package or divide them by responsibility across multiple packages. The packages also allow hierarchical imports, where a package can use other packages' types, states, or functions.
sdf-package.yaml
The SDF command line tool uses the stateful dataflows package definition file sdf-package.yaml to generate the code and the state and type definitions for the package. The package file has the following hierarchy:
apiVersion: <version>
meta:
  name: <package-name>
  version: <package-version>
  namespace: <package-namespace>
imports:
  - pkg: <package-namespace>/<package-name>@<package-version>
    types:
      - name: <type-name>
    functions:
      - name: <function-name>
    states:
      - name: <state-name>
types:
  <type-name>: <type-props>
states:
  <state-name>: <state-props>
functions:
  <function-name>:
    operator: <operator-type>
    states:
      - name: <state-name>
    inputs:
      - name: <input-name>
        type: <input-type>
    output:
      type: <output-type>
dev:
  converter: <converter-props>
  imports:
    - pkg: <package-namespace>/<package-name>@<package-version>
      path: <relative-path>
The sections are as follows:
- meta - metadata about the package
- imports - imports other packages' types, states, and functions
- types - defines types used in the package
- states - defines states used in functions
- functions - defines functions in the package
- dev - defines substitutions used during development and test
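Each imports entry identifies a package with a reference of the form <package-namespace>/<package-name>@<package-version>. As a rough illustration of that format only, here is a minimal Rust sketch that splits such a reference into its three parts. The PackageRef type and parse_package_ref function are hypothetical names invented for this example; they are not part of SDF, and SDF's own validation rules may differ.

```rust
/// A parsed `imports` reference such as `example/sentence-pkg@0.1.0`.
/// Hypothetical helper type for illustration; not part of SDF.
#[derive(Debug, PartialEq)]
pub struct PackageRef {
    pub namespace: String,
    pub name: String,
    pub version: String,
}

/// Splits `<namespace>/<name>@<version>` into its three parts.
/// Returns an error message if a separator or a part is missing.
pub fn parse_package_ref(reference: &str) -> Result<PackageRef, String> {
    // Everything before the first '/' is the namespace.
    let (namespace, rest) = reference
        .split_once('/')
        .ok_or_else(|| format!("missing '/' in `{}`", reference))?;
    // The remainder is `<name>@<version>`.
    let (name, version) = rest
        .split_once('@')
        .ok_or_else(|| format!("missing '@' in `{}`", reference))?;
    if namespace.is_empty() || name.is_empty() || version.is_empty() {
        return Err(format!("empty component in `{}`", reference));
    }
    Ok(PackageRef {
        namespace: namespace.to_string(),
        name: name.to_string(),
        version: version.to_string(),
    })
}
```

Calling parse_package_ref("example/sentence-pkg@0.1.0") yields the namespace example, the name sentence-pkg, and the version 0.1.0, which correspond to the meta fields of the package being imported.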
Build and Test a Package
We'll use the SDF command line tool with the package definition file to generate the WebAssembly glue code and the placeholder for the custom logic.
$ sdf -h
Stateful Dataflow Command Line Interface
Usage: sdf <COMMAND>
Commands:
  clean     Clean generated artifacts such as state and internal objects
  build     Build package (requires: sdf-package.yaml)
  generate  Generate package (requires: sdf-package.yaml)
  update    Update package (requires: sdf-package.yaml)
  test      Test command shell (requires sdf-package.yaml)
  run       Run dataflow (requires: dataflow.yaml)
  setup     Setup pre-requisites for Stateful Dataflows
  version   Prints binary version
  log       Print dataflow logs
Let's start with an example. We'll create a package that splits sentences into words and counts the number of characters in each word.
1. Create the Package file
Create a fresh project directory split-sentence with two nested subdirectories, packages and sentence:
$ mkdir -p split-sentence/packages/sentence
$ cd split-sentence/packages/sentence
Inside the sentence directory, create the sdf-package.yaml file and add the following content:
# sdf-package.yaml
apiVersion: 0.4.0
meta:
  name: sentence-pkg
  version: 0.1.0
  namespace: example
functions:
  sentence-to-words:
    operator: flat-map
    inputs:
      - name: sentence
        type: string
    output:
      type: string
  augment-count:
    operator: map
    inputs:
      - name: word
        type: string
    output:
      type: string
dev:
  converter: raw
2. Generate the Package Project
Use the sdf generate command to generate the project:
$ sdf generate
The generator created several directories and files that we'll edit next.
3. Add the Custom Code
First, let's update the first function, sentence-to-words. Open rust/sentence-to-words/src/lib.rs and update the function body with the following code:
pub(crate) fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}
Next, update augment_count. Open rust/augment-count/src/lib.rs and replace the function body:
pub(crate) fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}
Let's add some tests as well:
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_augment_count() {
        let input = "Hello".to_string();
        let output = augment_count(input);
        assert_eq!(output.unwrap(), "Hello(5)");
    }
}
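The test above covers only augment_count with ASCII input. As a hedged sketch, the block below adds a few more cases that may be worth checking: sentence_to_words with repeated or trailing whitespace and with empty input, and augment_count with a non-ASCII word, where chars().count() counts Unicode scalar values, not bytes. Both functions are copied here so the block compiles on its own; in the generated project each test would presumably live in its own crate's lib.rs. The module name extra_tests is arbitrary.

```rust
// Copies of the two functions from above, so this block compiles standalone.
pub(crate) fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

pub(crate) fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

#[cfg(test)]
mod extra_tests {
    use super::*;

    #[test]
    fn splits_on_any_whitespace() {
        // split_whitespace skips runs of spaces, tabs, and newlines.
        let words = sentence_to_words("Hello   World\n".to_string()).unwrap();
        assert_eq!(words, vec!["Hello", "World"]);
    }

    #[test]
    fn empty_sentence_yields_no_words() {
        assert!(sentence_to_words(String::new()).unwrap().is_empty());
    }

    #[test]
    fn counts_characters_not_bytes() {
        // "h\u{e9}llo" is 6 bytes in UTF-8 but 5 Unicode scalar values.
        let word = "h\u{e9}llo".to_string();
        assert_eq!(word.len(), 6);
        assert_eq!(augment_count(word).unwrap(), "h\u{e9}llo(5)");
    }
}
```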
We've implemented both functions; it's time to compile and test our work.
4. Build and Test the Package
To build the package, run:
$ sdf build
Use the sdf test interactive shell to test the code:
$ sdf test
In the test shell, you can view the functions available for testing:
>> show functions
sentence-to-words
augment-count
Let's test sentence-to-words first:
>> test function sentence-to-words --input "Hello World"
Hello
World
Next, test augment-count:
>> test function augment-count --input "Hello"
Hello(5)
You may also test the Rust code via Cargo:
$ cd rust
$ cargo test
The tests passed, and the package is now ready to use in the dataflow file.
The Dataflow
The dataflow imports functions, types, and states from one or more packages. Packages may also import components from other packages; however, the dataflow maintains the final composition.
dataflow.yaml
The SDF command line tool uses the dataflow definition file dataflow.yaml to assemble the data application. The file has the following hierarchy:
apiVersion: <version>
meta:
  name: <dataflow-name>
  version: <dataflow-version>
  namespace: <dataflow-namespace>
imports:
  - pkg: <package-namespace>/<package-name>@<package-version>
    types:
      - name: <type-name>
    functions:
      - name: <function-name>
    states:
      - name: <state-name>
config:
  converter: <converter-props>
  consumer: <consumer-props>
types:
  <type-name>: <type-props>
topics:
  <topic-name>: <topic-props>
services:
  <service-name>:
    sources:
      - type: <source-props>
    states:
      <state-name>: <state-props>
    transforms:
      - operator: <operator-type>
        uses: <imported-function-name>
    sinks:
      - type: <sink-props>
dev:
  converter: <converter>
  imports:
    - pkg: <package-namespace>/<package-name>@<package-version>
      path: <relative-path>
The sections are as follows:
- meta - metadata about the dataflow
- imports - imports other packages' types, states, and functions
- config - short for configurations; holds the default configurations
- types - defines types used in the dataflow
- topics - defines topics used in the dataflow
- states - defines states used in functions
- services - defines the service properties
  - sources - defines the source topics used in the service
  - states - defines the state properties used in the service
  - transforms - defines the list of operators and the imported functions used in the service
  - sinks - defines the sink topics used in the service
- dev - defines substitutions used during development and test
The dataflow file has other variants, such as window and partitions, which are omitted for simplicity. For additional details, check the Dataflow file section.
Create a Dataflow
We'll import the package built in the previous section to create the split-sentence dataflow.
1. Create the Dataflow file
The dataflow file belongs in the top-level split-sentence directory. Change to that directory:
$ cd split-sentence
Create the dataflow.yaml file and add the following content:
apiVersion: 0.4.0
meta:
  name: split-sentence
  version: 0.1.0
  namespace: example
imports:
  - pkg: example/sentence-pkg@0.1.0
    functions:
      - name: sentence-to-words
      - name: augment-count
config:
  converter: raw
topics:
  sentence:
    schema:
      value:
        type: string
        converter: raw
  words:
    schema:
      value:
        type: string
        converter: raw
services:
  sentence-words:
    sources:
      - type: topic
        id: sentence
    transforms:
      - operator: flat-map
        uses: sentence-to-words
      - operator: map
        uses: augment-count
    sinks:
      - type: topic
        id: words
dev:
  imports:
    - pkg: example/sentence-pkg@0.1.0
      path: ./packages/sentence
2. Run the Dataflow
Use the sdf command line tool to run the dataflow:
$ sdf run --dev
The --dev flag asks the engine to load the package from the local path listed under dev.imports. Without this flag, the engine looks for the package in InfinyOn Hub.
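As a mental model only, the choice between the local path and InfinyOn Hub can be pictured as the small Rust sketch below. It is not SDF's implementation: the PackageSource type and resolve_import function are hypothetical, and the fallback to Hub when --dev is set but no matching dev import exists is an assumption made for this example.

```rust
use std::path::PathBuf;

/// Where a package import is loaded from.
/// Hypothetical type for illustration; not part of SDF.
#[derive(Debug, PartialEq)]
pub enum PackageSource {
    /// Local directory taken from the `path` of a `dev.imports` entry.
    Local(PathBuf),
    /// Package looked up by its reference in InfinyOn Hub.
    Hub(String),
}

/// Picks the source for `pkg`.
/// `dev_imports` holds `(pkg, path)` pairs as listed under `dev.imports`.
pub fn resolve_import(pkg: &str, dev: bool, dev_imports: &[(&str, &str)]) -> PackageSource {
    if dev {
        // With --dev, prefer the local path substituted for this package.
        if let Some((_, path)) = dev_imports.iter().find(|(name, _)| *name == pkg) {
            return PackageSource::Local(PathBuf::from(*path));
        }
    }
    // Assumption: anything not substituted locally is resolved from the Hub.
    PackageSource::Hub(pkg.to_string())
}
```

With the dataflow above, resolve_import("example/sentence-pkg@0.1.0", true, ...) would select ./packages/sentence, while passing false would select the Hub reference.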
3. Test the Dataflow
Produce sentences to the sentence topic:
$ fluvio produce sentence
Hello world
Hi there
Consume from the words topic to retrieve the result:
$ fluvio consume words -Bd
Hello(5)
world(5)
Hi(2)
there(5)
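To see why two input records produce four output records, the sentence-words service can be approximated in plain Rust: the flat-map step fans each sentence out into words, and the map step transforms each word one-to-one. This is a local sketch of the transform chain only, with no topics, converters, or engine involved; run_pipeline is a hypothetical helper written for this example.

```rust
// The two package functions, copied so this block compiles standalone.
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Approximates the `sentence-words` transforms on in-memory records.
/// Hypothetical helper for illustration; not how the SDF engine runs them.
pub fn run_pipeline(sentences: &[&str]) -> Result<Vec<String>, String> {
    let mut output = Vec::new();
    for sentence in sentences {
        // flat-map: one input record fans out into zero or more records.
        for word in sentence_to_words(sentence.to_string())? {
            // map: exactly one output record per input record.
            output.push(augment_count(word)?);
        }
    }
    Ok(output)
}
```

Passing the two sentences produced above, &["Hello world", "Hi there"], returns the same four records that were consumed from the words topic.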
4. Show State
The dataflow collects runtime metrics that you can inspect in the runtime terminal.
Check the sentence-to-words counters:
>> show state sentence-words/sentence-to-words/metrics --table
Key    Window  succeeded  failed
stats  *       2          0
Check the augment-count counters:
>> show state sentence-words/augment-count/metrics --table
Key    Window  succeeded  failed
stats  *       4          0
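The values 2 and 4 are consistent with one counter entry per function call: sentence-to-words ran once for each of the two sentences, and augment-count ran once for each of the four resulting words. The sketch below reproduces that arithmetic under this assumption. The Metrics type and count_calls function are hypothetical and only mirror the succeeded and failed columns; they say nothing about how the engine stores its metrics.

```rust
/// Per-function counters, mirroring the `succeeded` and `failed` columns.
/// Hypothetical type for illustration; not part of SDF.
#[derive(Debug, Default, PartialEq)]
pub struct Metrics {
    pub succeeded: u64,
    pub failed: u64,
}

impl Metrics {
    /// Bumps one counter depending on whether the call returned Ok or Err.
    fn record<T, E>(&mut self, result: &Result<T, E>) {
        match result {
            Ok(_) => self.succeeded += 1,
            Err(_) => self.failed += 1,
        }
    }
}

// The two package functions, copied so this block compiles standalone.
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Returns (sentence-to-words metrics, augment-count metrics),
/// assuming one counter update per function call.
pub fn count_calls(sentences: &[&str]) -> (Metrics, Metrics) {
    let mut flat_map = Metrics::default();
    let mut map = Metrics::default();
    for sentence in sentences {
        let words = sentence_to_words(sentence.to_string());
        flat_map.record(&words);
        // A failed flat-map call contributes no words downstream.
        for word in words.unwrap_or_default() {
            map.record(&augment_count(word));
        }
    }
    (flat_map, map)
}
```

For the input &["Hello world", "Hi there"], the first Metrics value has succeeded equal to 2 and the second has succeeded equal to 4, matching the tables above.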
Congratulations! You've successfully built and run a composable dataflow! The project is available for download on GitHub.
5. Clean-up
Exit the sdf terminal and remove the topics:
$ fluvio topic delete sentence
$ fluvio topic delete words