Skip to main content
Version: 0.11.9

Rust SDK Examples

  • The Rust client is the core client for all language clients.
    • New features arrive in the Rust client before any of the other clients.
  • Full support for the Admin API.
  • This client uses async Rust for all blocking calls.

Refer to the fluvio docs.rs page for full detail.

Example Workflow

Follow the installation instructions to run this example.

# Cargo manifest for the example crate used by the workflow below.
[package]
edition = "2021"
name = "fluvio-rust-example"
# Example code only: keep it from being published to a registry.
publish = false
version = "0.0.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# The `attributes` feature enables the `#[async_std::main]` macro used on the example's entry point.
async-std = {version = "1", features = ["attributes"]}
# Used by the example to timestamp the record it produces (`Local::now()`).
chrono = "0.4"
# The Fluvio Rust client.
fluvio = "0.22"
use async_std::stream::StreamExt;
use chrono::Local;
use fluvio::metadata::topic::TopicSpec;
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, RecordKey};

/// Name of the topic the example creates, produces to, and consumes from.
const TOPIC_NAME: &str = "hello-rust";
/// Partition the consumer is configured to read from.
const PARTITION_NUM: u32 = 0;
/// First argument passed to `TopicSpec::new_computed` when the topic is created.
const PARTITIONS: u32 = 1;
/// Second argument passed to `TopicSpec::new_computed` when the topic is created.
const REPLICAS: u32 = 1;

/// This is an example of a basic Fluvio workflow in Rust
///
/// 1. Establish a connection to the Fluvio cluster
/// 2. Create a topic to store data in
/// 3. Create a producer and send some bytes
/// 4. Create a consumer, and stream the data back
///
/// # Panics
///
/// Panics if connecting to the cluster, creating the producer, sending,
/// flushing, or building/starting the consumer fails. A failed topic creation
/// or a failed read is reported on stderr and is not fatal.
#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.unwrap();

    // Create a topic
    let admin = fluvio.admin().await;
    let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
    // Topic creation stays best-effort (presumably so the example can be
    // re-run against an existing topic), but the failure is reported instead
    // of being silently discarded.
    if let Err(err) = admin
        .create(TOPIC_NAME.to_string(), false, topic_spec)
        .await
    {
        eprintln!("Could not create topic {}: {:?}", TOPIC_NAME, err);
    }

    // Create a record
    let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

    // Produce to a topic
    let producer = fluvio::producer(TOPIC_NAME).await.unwrap();
    producer.send(RecordKey::NULL, record).await.unwrap();
    // Fluvio batches outgoing records by default, so flush producer to ensure all records are sent
    producer.flush().await.unwrap();

    // Consume last record from topic
    let fluvio = Fluvio::connect().await.unwrap();
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(fluvio::Offset::from_end(1))
        .build()
        .unwrap();

    let mut stream = fluvio.consumer_with_config(config).await.unwrap();
    // Only the first item of the stream is inspected; every outcome is
    // reported so a failed read is distinguishable from a successful one.
    match stream.next().await {
        Some(Ok(record)) => {
            let string = String::from_utf8_lossy(record.value());
            println!("{}", string);
        }
        Some(Err(err)) => eprintln!("Failed to read record: {:?}", err),
        None => eprintln!("Consumer stream ended before a record was received"),
    }
}

Run

$ cargo run

Additional Producer options

Alternatively, we can create a producer with custom configuration:

Example:

This is how to configure a Producer with a batch_size of 500 bytes, a linger of 500 ms, and Gzip compression.

// Producer settings: 500-byte batches, a 500 ms linger, and Gzip compression.
let linger = std::time::Duration::from_millis(500);
let config = TopicProducerConfigBuilder::default()
    .batch_size(500)
    .linger(linger)
    .compression(Compression::Gzip)
    .build()
    .expect("Failed to create topic producer config");

// Create the producer for the topic using the custom configuration.
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", config)
    .await
    .expect("Failed to create a producer");

Using a SmartModule with the Rust Consumer

Below is an example of how to use a SmartModule filter with the Rust consumer.

use std::io::Read;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio::{Fluvio, Offset, PartitionConsumer};
use fluvio::consumer::{
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
ConsumerConfig
};
use async_std::stream::StreamExt;

// Load the compiled SmartModule and gzip it into `buffer`, which is handed to
// the consumer config as an ad-hoc SmartModule below.
let raw_buffer = std::fs::read("/my_projects/example_filter/target/wasm32-unknown-unknown/release/example_filter.wasm").expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
// `read_to_end` returns an `io::Result`; fail loudly here rather than
// continuing with a possibly incomplete compressed payload.
encoder
    .read_to_end(&mut buffer)
    .expect("failed to compress wasm file");

let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
    wasm: SmartModuleInvocationWasm::AdHoc(buffer),
    kind: SmartModuleKind::Filter,
    params: Default::default()
}));
let filter_config = builder.build().expect("Failed to create config");

// create partition consumer
let consumer = fluvio.partition_consumer("my-topic", 0).await.expect("failed to create consumer");
// stream from beginning
let mut stream = consumer.stream_with_config(Offset::beginning(),filter_config).await.expect("Failed to create stream");

while let Some(next) = stream.next().await {
    // Stop at the first stream error, as before, but report it instead of
    // ending the loop silently.
    let record = match next {
        Ok(record) => record,
        Err(err) => {
            eprintln!("Stopped consuming after stream error: {:?}", err);
            break;
        }
    };
    let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
    let value = String::from_utf8_lossy(record.value()).to_string();
    println!("Got filter event: key={:?}, value={}", key, value);
}

Refer to the fluvio docs.rs page for full detail.