Transform examples¶
Transforms sit between the source and sinks. Use them to set routing metadata, filter envelopes, mutate payloads, batch events, or run scripts.
API to Kafka with a key¶
Use set_key to copy a payload field into meta.key, which the Kafka sink uses as the record key.
[[pipelines]]
name = "api->kafka-keyed"
[pipelines.source]
type = "api_poll"
url = "https://jsonplaceholder.typicode.com/posts/1"
interval_secs = 3
[[pipelines.transforms]]
type = "set_key"
from_field = "userId"
[[pipelines.sinks]]
type = "kafka"
brokers = "localhost:9092"
topic = "topic1"
Rhai transform inline¶
Use script with Rhai for small, embedded transforms.
[[pipelines]]
name = "api->kafka-rhai"
[pipelines.source]
type = "api_poll"
url = "https://jsonplaceholder.typicode.com/posts/1"
interval_secs = 3
[[pipelines.transforms]]
type = "script"
runtime = "rhai"
on_error = "drop"
script = """
fn transform(env) {
if env.payload["userId"] == 1 {
env.meta.headers["priority"] = "high";
}
env.payload["processed"] = true;
env
}
"""
[[pipelines.sinks]]
type = "kafka"
brokers = "localhost:9092"
topic = "topic1"
Lua transform from a file¶
Keep longer scripts in version-controlled source files instead of inline TOML.
Python transform with a virtualenv¶
Point python_bin at a virtualenv if your script needs third-party packages.