Merge
Dataflows can merge events from multiple topics. The merge behavior is defined in the sources
section of the service definition. The record type must match the schema of the target topic, otherwise use the map
operator transform as required.
sources:
- type: topic
id: <topic-id>
transforms:
- operator: <operator-name>
...
- type: topic
id: <topic-id>
transforms:
- operator: <operator-name>
The transforms
section defines the transformation to apply to the event before sending it to the target topic. The operator type depends on the desired business logic.
Merge Example
We'll create a dataflow that reads records from the child
and adult
topics and merges into the people
topic.
1. Create a Dataflow file
Create a directory merge-test
:
$ mkdir merge-test; cd merge-test
Add the following dataflow.yaml
file:
# dataflow.yaml
apiVersion: 0.4.0
meta:
name: merge
version: 0.1.0
namespace: examples
config:
converter: json
types:
person:
type: object
properties:
name:
type: string
age:
type: i32
topics:
child:
schema:
value:
type: person
adult:
schema:
value:
type: person
people:
schema:
value:
type: person
services:
merge-service:
sources:
- type: topic
id: child
- type: topic
id: adult
sinks:
- type: topic
id: people
In this example, we don't use any operators as the target schema matches the source. If there is a mismatch, use map
and update the record accordingly.
$ sdf run
Add --ui
if you want to see the graphical representation of the dataflow.
2. Test the Dataflow
Produce to adult
:
$ fluvio produce adult
{"name":"Randy","age":32}
{"name":"Alice","age":28}
Produce to child
:
$ fluvio produce child
{"age":16,"name":"Andrew"}
{"age":17,"name":"Jackson"}
{"age":15,"name":"Linda"}
Consume from people
:
%copy%
$ fluvio consume people -Bd
{"age":32,"name":"Randy"}
{"age":28,"name":"Alice"}
{"age":16,"name":"Andrew"}
{"age":17,"name":"Jackson"}
{"age":15,"name":"Linda"}
In summary, merge-service
utilizes sources
to merge the data from different topics: child
and adult
into a single topic people
.