Dataflow - Composition

Composable dataflows let you create individual packages and import them to build dataflows. They are a good way to organize and reuse code in large dataflows. For simple dataflows, we recommend using Inline Dataflows.

Composition Diagram

Directory Structure

Dataflows and packages each require a definition file for code generation and composition: dataflow.yaml for a dataflow and sdf-package.yaml for a package.

Consequently, each dataflow and each package must have its own dedicated directory.

For example, a dataflow named split-sentence and a package sentence:

split-sentence
├── dataflow.yaml
└── packages
    └── sentence
        └── sdf-package.yaml

While placing the packages inside a dataflow is not strictly necessary, it often makes sense.

Packages

Packages are the building blocks of a dataflow. Each package contains one or more types, states, and functions. You can keep all the components in one package or divide them by responsibility across multiple packages. Packages also support hierarchical imports, where a package can use other packages' types, states, or functions.

sdf-package.yaml

The SDF command line tool uses the stateful dataflows package definition file sdf-package.yaml to generate the code and state and type definitions for the package. The package file has the following hierarchy:

apiVersion: <version>

meta:
  name: <package-name>
  version: <package-version>
  namespace: <package-namespace>

imports:
  - pkg: <package-namespace>/<package-name>@<package-version>
    types:
      - name: <type-name>
    functions:
      - name: <function-name>
    states:
      - name: <state-name>

types:
  <type-name>: <type-props>

states:
  <state-name>: <state-props>

functions:
  <function-name>:
    operator: <operator-type>
    states:
      - name: <state-name>
    inputs:
      - name: <input-name>
        type: <input-type>
    output:
      type: <output-type>

dev:
  converter: <converter-props>
  imports:
    - pkg: <package-namespace>/<package-name>@<package-version>
      path: <relative-path>

The sections are as follows:

  • meta - metadata about the package
  • imports - imports other packages' types, states, functions
  • types - defines types used in the package
  • states - defines states used in functions
  • functions - defines functions in the package
  • dev - defines substitutions used during development and test

Build and Test a Package

We'll use the SDF command line tool with the package definition file to generate the WebAssembly glue code and the placeholder for the custom logic.

$ sdf -h
Stateful Dataflow Command Line Interface

Usage: sdf <COMMAND>

Commands:
  clean     Clean generated artifacts such as state and internal objects
  build     Build package (requires: sdf-package.yaml)
  generate  Generate package (requires: sdf-package.yaml)
  update    Update package (requires: sdf-package.yaml)
  test      Test command shell (requires sdf-package.yaml)
  run       Run dataflow (requires: dataflow.yaml)
  setup     Setup pre-requisites for Stateful Dataflows
  version   Prints binary version
  log       Print dataflow logs

Let's start with an example. We'll create a package that splits sentences into words and counts the number of characters in each word.

1. Create the Package file

Create a fresh project directory split-sentence with the nested subdirectories packages and sentence:

$ mkdir -p split-sentence/packages/sentence
$ cd split-sentence/packages/sentence

Inside the sentence directory, create the sdf-package.yaml file and add the following content:

# sdf-package.yaml
apiVersion: 0.4.0

meta:
  name: sentence-pkg
  version: 0.1.0
  namespace: example

functions:
  sentence-to-words:
    operator: flat-map
    inputs:
      - name: sentence
        type: string
    output:
      type: string

  augment-count:
    operator: map
    inputs:
      - name: word
        type: string
    output:
      type: string

dev:
  converter: raw

2. Generate the Package Project

Use the sdf generate command to generate the project:

$ sdf generate

The generator created several directories and files that we'll edit next.

3. Add the Custom Code

First, let's update the first function, sentence-to-words. Open rust/sentence-to-words/src/lib.rs and update the function body with the following code:

pub(crate) fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

Next, update augment-count. Open rust/augment-count/src/lib.rs and replace the function body:

pub(crate) fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}
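A note on the design choice: augment_count uses word.chars().count(), which counts Unicode scalar values rather than bytes. The sketch below contrasts it with a byte-based variant. The name augment_byte_len is hypothetical and is not part of the generated project; augment_count is repeated only so that the block compiles on its own.

```rust
// Same logic as augment_count above, repeated so this sketch is self-contained.
fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

// Hypothetical byte-based variant, shown only for comparison.
// String::len returns the length in UTF-8 bytes, not in characters.
fn augment_byte_len(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.len()))
}
```

For ASCII input such as "Hello", both variants return Hello(5). For "caf\u{e9}" (four characters, the last of which takes two bytes in UTF-8), augment_count reports 4 while the byte-based variant reports 5.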

Let's add a test to the same file as well:

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_augment_count() {
        let input = "Hello".to_string();
        let output = augment_count(input);
        assert_eq!(output.unwrap(), "Hello(5)");
    }
}
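The test above covers augment_count only. A comparable test for sentence_to_words might look like the following sketch, which would presumably go in rust/sentence-to-words/src/lib.rs. The function is repeated here so the block compiles on its own, and the test names are illustrative.

```rust
// Copy of sentence_to_words from above so the sketch is self-contained.
pub(crate) fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_sentence_to_words() {
        let output = sentence_to_words("Hello World".to_string());
        assert_eq!(output.unwrap(), vec!["Hello", "World"]);
    }

    #[test]
    fn test_extra_whitespace_and_empty_input() {
        // split_whitespace skips leading, trailing, and repeated whitespace.
        let output = sentence_to_words("  Hi   there ".to_string());
        assert_eq!(output.unwrap(), vec!["Hi", "there"]);

        // An empty sentence yields no words.
        let output = sentence_to_words(String::new());
        assert!(output.unwrap().is_empty());
    }
}
```

The second test documents a property of split_whitespace that matters for a flat-map operator: blank or whitespace-only input produces an empty list, so no records are emitted downstream.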

We've implemented both functions; it's time to compile and test our work.

4. Build and Test the Package

To build the package, run:

$ sdf build

Use the sdf test interactive shell to test the code:

$ sdf test

In the test shell, you can view the functions available for testing:

>> show functions
sentence-to-words
augment-count

Let's test sentence-to-words first:

>> test function sentence-to-words --input "Hello World"
Hello
World

Next, test augment-count:

>> test function augment-count --input "Hello"
Hello(5)

You may also test the Rust code via Cargo:

$ cd rust
$ cargo test

The tests passed, and the package is now ready to use in the dataflow file.

The Dataflow

The dataflow imports functions, types, and states from one or more packages. Packages may also import components from other packages; however, the dataflow maintains the final composition.

dataflow.yaml

The SDF command line tool uses the dataflow definition file dataflow.yaml to assemble the data application, and it has the following hierarchy:

apiVersion: <version>

meta:
  name: <dataflow-name>
  version: <dataflow-version>
  namespace: <dataflow-namespace>

imports:
  - pkg: <package-namespace>/<package-name>@<package-version>
    types:
      - name: <type-name>
    functions:
      - name: <function-name>
    states:
      - name: <state-name>

config:
  converter: <converter-props>
  consumer: <consumer-props>

types:
  <type-name>: <type-props>

topics:
  <topic-name>: <topic-props>

services:
  <service-name>:
    sources:
      - type: <source-props>

    states:
      <state-name>: <state-props>

    transforms:
      - operator: <operator-type>
        uses: <imported-function-name>

    sinks:
      - type: <sink-props>

dev:
  converter: <converter>
  imports:
    - pkg: <package-namespace>/<package-name>@<package-version>
      path: <relative-path>

The sections are as follows:

  • meta - metadata about the dataflow
  • imports - imports other packages' types, states, functions
  • config - short for configurations; holds the default configurations
  • types - defines types used in the dataflow
  • topics - defines topics used in the dataflow
  • states - defines states used in functions
  • services - defines the service properties
    • sources - defines the source topics used in the service
    • states - defines the state properties used in the service
    • transforms - defines the list of operators and the imported functions used in the service
    • sinks - defines the sink topics used in the service
  • dev - defines substitutions used during development and test

The dataflow file supports other variants, such as window and partitions, which are omitted here for simplicity. For additional details, check the Dataflow file section.

Create a Dataflow

We'll import the package built in the previous section to create the split-sentence dataflow.

1. Create the Dataflow file

Go to the split-sentence project directory:

$ cd split-sentence

Create the dataflow.yaml file and add the following content:

apiVersion: 0.4.0

meta:
  name: split-sentence
  version: 0.1.0
  namespace: example

imports:
  - pkg: example/sentence-pkg@0.1.0
    functions:
      - name: sentence-to-words
      - name: augment-count

config:
  converter: raw

topics:
  sentence:
    schema:
      value:
        type: string
        converter: raw
  words:
    schema:
      value:
        type: string
        converter: raw

services:
  sentence-words:
    sources:
      - type: topic
        id: sentence

    transforms:
      - operator: flat-map
        uses: sentence-to-words
      - operator: map
        uses: augment-count

    sinks:
      - type: topic
        id: words

dev:
  imports:
    - pkg: example/sentence-pkg@0.1.0
      path: ./packages/sentence

2. Run the Dataflow

Use the sdf command line tool to run the dataflow:

$ sdf run --dev

The --dev flag asks the engine to load imported packages from the local paths listed in the dev section. Without this flag, the engine looks for the package in InfinyOn Hub.
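To make the import reference format and the effect of --dev concrete, here is a minimal sketch in Rust. It is not the engine's implementation: PkgRef, PkgSource, parse_pkg_ref, and resolve are hypothetical names introduced for illustration, and the resolution rule is only an assumption based on the description above.

```rust
/// Hypothetical parsed form of `<package-namespace>/<package-name>@<package-version>`.
#[derive(Debug, PartialEq)]
struct PkgRef {
    namespace: String,
    name: String,
    version: String,
}

/// Hypothetical description of where a package would be loaded from.
#[derive(Debug, PartialEq)]
enum PkgSource {
    Local(String), // relative path taken from the dev section
    Hub,           // InfinyOn Hub
}

/// Splits a reference such as "example/sentence-pkg@0.1.0" into its parts.
fn parse_pkg_ref(spec: &str) -> Result<PkgRef, String> {
    let (namespace, rest) = spec
        .split_once('/')
        .ok_or_else(|| format!("missing '/' in '{}'", spec))?;
    let (name, version) = rest
        .split_once('@')
        .ok_or_else(|| format!("missing '@' in '{}'", spec))?;
    if namespace.is_empty() || name.is_empty() || version.is_empty() {
        return Err(format!("empty component in '{}'", spec));
    }
    Ok(PkgRef {
        namespace: namespace.to_string(),
        name: name.to_string(),
        version: version.to_string(),
    })
}

/// Assumed rule: in dev mode, a matching entry in the dev imports
/// (pairs of pkg reference and path) wins; otherwise the Hub is used.
fn resolve(spec: &str, dev_mode: bool, dev_imports: &[(&str, &str)]) -> PkgSource {
    if dev_mode {
        if let Some((_, path)) = dev_imports.iter().find(|(pkg, _)| *pkg == spec) {
            return PkgSource::Local(path.to_string());
        }
    }
    PkgSource::Hub
}
```

With the dataflow.yaml above, resolve("example/sentence-pkg@0.1.0", true, ...) would yield the local path ./packages/sentence under this assumed rule, while the same call with dev_mode set to false would yield the Hub.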

3. Test the Dataflow

Produce sentences to the sentence topic:

$ fluvio produce sentence
Hello world
Hi there

Consume from words to retrieve the result:

$ fluvio consume words -Bd
Hello(5)
world(5)
Hi(2)
there(5)

4. Show State

The dataflow collects runtime metrics that you can inspect in the runtime terminal.

Check the sentence-to-words counters:

>> show state sentence-words/sentence-to-words/metrics --table
 Key    Window  succeeded  failed
 stats  *       2          0

Check the augment-count counters:

>> show state sentence-words/augment-count/metrics --table
 Key    Window  succeeded  failed
 stats  *       4          0
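The counters are consistent with one call per input record: sentence-to-words ran once for each of the two sentences, and augment-count ran once for each of the four words that the flat-map emitted. The following sketch simulates the flat-map and map chain in plain Rust to show where those numbers could come from. The Metrics struct and run_pipeline function are hypothetical stand-ins, not the engine's types, and the assumption that the metrics count function calls is ours.

```rust
// Copies of the two package functions so the sketch compiles on its own.
fn sentence_to_words(sentence: String) -> Result<Vec<String>, String> {
    Ok(sentence.split_whitespace().map(String::from).collect())
}

fn augment_count(word: String) -> Result<String, String> {
    Ok(format!("{}({})", word, word.chars().count()))
}

/// Hypothetical stand-in for one operator's metrics row.
#[derive(Debug, Default, PartialEq)]
struct Metrics {
    succeeded: u32,
    failed: u32,
}

impl Metrics {
    /// Counts one function call as succeeded or failed.
    fn record<T>(&mut self, result: &Result<T, String>) {
        match result {
            Ok(_) => self.succeeded += 1,
            Err(_) => self.failed += 1,
        }
    }
}

/// Runs flat-map then map over the input records, counting one call per record.
fn run_pipeline(sentences: &[&str]) -> (Vec<String>, Metrics, Metrics) {
    let mut flat_map_metrics = Metrics::default();
    let mut map_metrics = Metrics::default();
    let mut output = Vec::new();

    for sentence in sentences {
        let words = sentence_to_words(sentence.to_string());
        flat_map_metrics.record(&words);
        // flat-map: every returned word becomes its own downstream record.
        for word in words.unwrap_or_default() {
            let augmented = augment_count(word);
            map_metrics.record(&augmented);
            if let Ok(value) = augmented {
                output.push(value);
            }
        }
    }
    (output, flat_map_metrics, map_metrics)
}
```

Calling run_pipeline(&["Hello world", "Hi there"]) returns the four records shown in the consumer output, with 2 successes for the flat-map step and 4 for the map step.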

Congratulations! You've successfully built and run a composable dataflow. The project is available for download on GitHub.

5. Clean-up

Exit the sdf terminal and remove the topics:

$ fluvio topic delete sentence
$ fluvio topic delete words
