# Sink configuration
A pipeline has one or more sinks. With multiple sinks, Courier broadcasts each envelope to every sink and applies backpressure from the slowest sink. Use the table to jump to the configuration reference for each supported sink.
| Kind | Use when |
|---|---|
| [`api`](#api) | Sending envelopes to an HTTP endpoint. |
| [`file`](#file) | Appending envelopes to a local JSONL or CSV file. |
| [`kafka`](#kafka) | Producing records to a Kafka topic. |
| [`sql`](#sql) | Inserting envelopes into a SQL table. |
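For example, a minimal sketch of a pipeline that fans out to a file sink and an api sink (field details are covered in the sections below):

```toml
# Minimal sketch: one pipeline fanning out to two sinks. Every envelope is
# broadcast to both, and the pipeline advances at the pace of the slower sink.
[[pipelines.sinks]]
type = "file"
path = "./out/users.jsonl"

[[pipelines.sinks]]
type = "api"
url = "https://internal.example.com/webhooks/users"
```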
## api
Sends each envelope to an HTTP endpoint as a JSON request. Useful for webhook integrations, REST forwarding, and posting to internal services.
```toml
[[pipelines.sinks]]
type = "api"
url = "https://internal.example.com/webhooks/users"
method = "POST"    # default
body = "payload"   # default, send only env.payload
headers = { Authorization = "Bearer token" }
timeout_secs = 30  # optional
```
| Field | Required | Default | Description |
|---|---|---|---|
| `url` | yes | — | Endpoint to send the request to. |
| `method` | no | `"POST"` | Any HTTP method understood by `reqwest::Method`, for example `POST`, `PUT`, `PATCH`, or `DELETE`. |
| `headers` | no | `{}` | String map appended to every request. |
| `body` | no | `"payload"` | `"payload"` sends `env.payload` as the JSON body; `"envelope"` sends the full envelope (`{ "meta": ..., "payload": ... }`). |
| `timeout_secs` | no | none | Per-request timeout. Omit to use the underlying `reqwest` default. |
Any non-2xx response, network error, or timeout is reported as a sink failure and flows through the configured `on_error` and `retry` policies. The response body, when present on a failure, is included in the error message so it surfaces in logs and dead-letter entries.
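Retry is attached with the same `[pipelines.sinks.retry]` table shown for kafka below. A sketch, assuming those retry keys apply to any built-in sink as described in Error Handling & Retry:

```toml
# Sketch only: retry transient HTTP failures with exponential backoff,
# then propagate the error once attempts are exhausted.
[[pipelines.sinks]]
type = "api"
url = "https://internal.example.com/webhooks/users"
timeout_secs = 10

[pipelines.sinks.retry]
max_attempts = 5
initial_delay_ms = 100
backoff_multiplier = 2.0
max_delay_ms = 5000

[pipelines.sinks.retry.on_exhausted]
kind = "propagate"
```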
## file
Appends each envelope to a local file as a single line of JSON or a single CSV row. Useful for local development, debugging, audits, and lightweight ETL.
```toml
[[pipelines.sinks]]
type = "file"
path = "./out/users.jsonl"
format = "jsonl"   # default
body = "payload"   # default, only meaningful for jsonl
```

```toml
[[pipelines.sinks]]
type = "file"
path = "./out/users.csv"
format = "csv"
columns = ["payload.id", "payload.name", "meta.source_id"]
```
| Field | Required | Default | Description |
|---|---|---|---|
| `path` | yes | — | Output file path. Parent directories are created automatically. |
| `format` | no | `"jsonl"` | `"jsonl"` or `"csv"`. |
| `body` | no (jsonl only) | `"payload"` | `"payload"` writes only `env.payload`; `"envelope"` writes the full envelope (`{ "meta": ..., "payload": ... }`). Ignored for CSV. |
| `columns` | yes for csv | `[]` | Dotted paths evaluated against the full envelope (`payload.id`, `meta.source_id`, `meta.headers.priority`, ...). Required when `format = "csv"`. |
The file is opened in append mode and flushed after every write, so a restart resumes cleanly without truncating prior output. For CSV the header row is written only when the file is empty at open time; a restart against an existing file emits more rows but no extra header.
CSV cells follow RFC 4180: fields containing `,`, `"`, `\n`, or `\r` are quoted, and embedded `"` characters are doubled. Missing columns render as empty cells; nested objects and arrays are JSON-encoded so they remain machine-readable.
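As an illustration of those rules, here is a sketch of a CSV sink that captures a nested value; the `payload.address` path and the file name are hypothetical and only serve to show the encoding behavior:

```toml
# Hypothetical sketch: payload.address is assumed to be a nested object.
[[pipelines.sinks]]
type = "file"
path = "./out/users_audit.csv"
format = "csv"
columns = ["payload.id", "payload.address", "meta.headers.priority"]
# If payload.address is {"city": "Oslo"}, its cell holds the JSON encoding of
# that object, quoted per RFC 4180 with embedded quotes doubled. If
# meta.headers.priority is absent for an envelope, that cell is left empty.
```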
Streaming writes are not transactional. A retried write after a partial I/O failure may produce a duplicate row, the same trade-off that applies to every WriteOne-based sink.
## kafka
Produces records to a Kafka topic via rdkafka.
```toml
[[pipelines.sinks]]
type = "kafka"
brokers = "localhost:9092"
topic = "topic1"
on_error = "drop"

[pipelines.sinks.retry]
max_attempts = 5
initial_delay_ms = 100
backoff_multiplier = 2.0
max_delay_ms = 5000

[pipelines.sinks.retry.on_exhausted]
kind = "propagate"
```
| Field | Required | Description |
|---|---|---|
| `brokers` | yes | Comma-separated bootstrap broker list. |
| `topic` | yes | Destination topic. |
`meta.key` is used as the record key; the payload is serialized as the record value.

See Error Handling & Retry for `on_error` and `retry` configuration.
## sql
Inserts one row per envelope into a SQL table. Supports postgres and sqlite drivers.
```toml
[[pipelines.sinks]]
type = "sql"
driver = "postgres"
dsn = "postgres://user:pass@localhost/warehouse"
table = "users_snapshot"
mode = "insert"

[pipelines.sinks.columns]
id = "payload.id"
email = "payload.email"
updated_at = "payload.updated_at"
source = "meta.source_id"
```
| Field | Required | Default | Description |
|---|---|---|---|
| `driver` | yes | — | `postgres` or `sqlite`. |
| `dsn` | yes | — | Driver-specific connection string. |
| `table` | yes | — | Destination table. Simple identifiers and dotted schema-qualified names are supported. |
| `mode` | no | `"insert"` | Write mode. Only `insert` is included in this version. |
| `columns` | yes | — | Map of destination column name to dotted path in the full envelope. |
Column mappings are evaluated against the full envelope, so paths can read from `payload`, `meta.source_id`, `meta.key`, `meta.timestamp_ms`, or `meta.headers.*`. Missing paths are inserted as SQL `NULL`. Scalar JSON values are bound as native booleans, numbers, or strings where possible; arrays and objects are bound as JSON for Postgres and JSON strings for SQLite.
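For instance, a sketch of the `columns` table of an sql sink that pulls from envelope metadata as well as the payload; the destination column names and the `payload.profile` path are hypothetical:

```toml
# Hypothetical sketch: maps metadata fields alongside payload fields.
[pipelines.sinks.columns]
id = "payload.id"
event_key = "meta.key"              # envelope key
received_at = "meta.timestamp_ms"   # envelope timestamp
priority = "meta.headers.priority"  # inserted as NULL when the header is absent
profile = "payload.profile"         # nested object: JSON for Postgres, JSON string for SQLite
```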
Upsert is not included in this first version. A future API should define conflict columns and update columns explicitly, because the SQL syntax and behavior differ by driver.
This sink is wrapped in `ManagedSink`, so non-transient database errors, retry, dead-letter, and `on_error` behavior are handled the same way as for the other built-in sinks. See Error Handling & Retry.