Skip to main content
Version: 0.11.9

Fluvio MQTT Connector

Official Infinyon MQTT connector

Source Connector

Reads record from MQTT topic and writes to Fluvio topic.

Supports MQTT V3.1.1 and V5 protocols.

See docs here. Tutorial for MQTT to SQL Pipeline.

Configuration

Optiondefaulttypedescription
timeout60sDurationmqtt broker connect timeout in seconds and nanoseconds
url-SecretStringMQTT url which includes schema, domain, port and credentials such as username and password.
topic-Stringmqtt topic to subscribe and source events from
client_idUUID V4Stringmqtt client ID. Using same client id in different connectors may close connection
payload_output_typebinaryStringcontrols how the output of payload field is produced

url option with type SecretString can be set as raw string value:

url: "mqtt://test.mosquitto.org/"

or, as a reference to a secret with the given name:

url:
secret:
name: "URL_SECRET_NAME"

Record Type Output

JSON Serialized string with fields mqtt_topic and payload

Payload Output Type

ValueOutput
binaryArray of bytes
jsonUTF-8 JSON Serialized String

Usage Example

This is an example of connector config file:

# sample-config.yaml
apiVersion: 0.1.0
meta:
version: 0.2.5
name: my-mqtt-connector
type: mqtt-source
topic: mqtt-topic
create-topic: true
mqtt:
url: "mqtt://test.mosquitto.org/"
topic: "mqtt-to-fluvio"
timeout:
secs: 30
nanos: 0
payload_output_type: json

Run connector locally using cdk tool (from root directory or any sub-directory):

cdk deploy start --config sample-config.yaml

cdk deploy list # to see the status
cdk deploy log my-mqtt-connector # to see connector's logs

Install MQTT Client such as

# for mac , this takes while....
brew install mosquitto

Insert records:

mosquitto_pub -h test.mosquitto.org -t mqtt-to-fluvio -m '{"device": {"device_id":1, "name":"device1"}}'

The produced record in Fluvio topic will be:

{
"mqtt_topic": "mqtt-to-fluvio",
"payload": {
"device": {
"device_id": 1,
"name": "device1"
}
}
}

Transformations

Fluvio MQTT Source Connector supports Transformations. Records can be modified before sending to Fluvio topic.

The previous example can be extended to add extra transformations to outgoing records:

# sample-config.yaml
apiVersion: 0.1.0
meta:
version: 0.2.5
name: my-mqtt-connector
type: mqtt-source
topic: mqtt-topic
create-topic: true
mqtt:
url: "mqtt://test.mosquitto.org/"
topic: "mqtt-to-fluvio"
timeout:
secs: 30
nanos: 0
payload_output_type: json
transforms:
- uses: infinyon/jolt@0.1.0
with:
spec:
- operation: shift
spec:
payload:
device: "device"
- operation: default
spec:
source: "mqtt-connector"

The object device in the resulting record will be "unwrapped" and the addition field source with value mqtt-connector will be added.

Read more about JSON to JSON transformations.