
Redpanda Connect Integration

Stream data from any of Redpanda Connect's 200+ sources directly into Arc using the native Arc output plugin.

Overview

Redpanda Connect (formerly Benthos) is a stream processor that connects sources to sinks with a single YAML config file. It handles transformations, filtering, batching, retries, and backpressure out of the box. Arc has a native output plugin that speaks Arc's MessagePack ingestion protocol directly, so data flows from your source into Arc's columnar storage with no translation layer.

Benefits:

  • Native MessagePack columnar format with zstd compression
  • 200+ input connectors (Kafka, HTTP, MQTT, S3, GCS, Postgres CDC, etc.)
  • Bloblang transformations for reshaping, filtering, and enriching data in-flight
  • Interpolated measurement names for per-message routing to different Arc tables
  • Single binary, no JVM, no cluster required

Why This Matters

Arc already has native ingestion paths for metrics (Telegraf) and IoT data (MQTT). Redpanda Connect covers a different gap: event-driven data that needs reshaping, filtering, or enrichment before it lands in Arc.

| Tool | Best for |
|---|---|
| Telegraf | Pulling metrics from systems at fixed intervals |
| Native MQTT | Subscribing to IoT brokers directly |
| Redpanda Connect | Event streams, CDC, webhooks, complex transformations, fan-out pipelines |

Some concrete examples where Redpanda Connect fits:

  • Kafka to Arc — consume events, filter out bot traffic, normalize timestamps, write to Arc
  • Webhooks to Arc — receive HTTP webhooks from third-party APIs, reshape the payload, store for analytics
  • CDC to Arc — capture Postgres/MySQL change events and stream them into Arc for historical tracking
  • Multi-destination — send the same data to Arc and Kafka (or S3, or Elasticsearch) with different transformations per sink

Prerequisites

  • Redpanda Connect 4.88 or higher (required for the arc output)
  • Arc server running and accessible
  • Arc API token (if auth is enabled)

Quick Start

1. Install Redpanda Connect

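# Homebrew (macOS/Linux)
brew install redpanda-data/tap/redpanda-connect

# Docker: mount the pipeline file from step 2 and pass ARC_TOKEN through.
# base_url must be reachable from inside the container.
docker run --rm -e ARC_TOKEN -v "$(pwd)/arc-pipeline.yaml:/config.yaml" \
docker.redpanda.com/redpandadata/connect:latest run /config.yaml

# Direct binary download
# https://github.com/redpanda-data/connect/releases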

Verify you have 4.88+:

redpanda-connect --version

2. Create a Pipeline Config

Create arc-pipeline.yaml:

input:
  generate:
    count: 10
    interval: 1s
    mapping: |
      root.vehicle_id = "truck-" + random_int(min: 1, max: 5).string()
      # random_int returns non-negative integers, so shift 0..2000 to -1000..1000
      root.lat = 40.7128 + ((random_int(min: 0, max: 2000) - 1000).number() / 10000)
      root.lon = -74.0060 + ((random_int(min: 0, max: 2000) - 1000).number() / 10000)
      root.speed_kmh = random_int(min: 0, max: 120)

output:
  arc:
    base_url: http://localhost:8000
    token: "${ARC_TOKEN}"
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd
    batching:
      count: 100
      period: 1s

3. Run the Pipeline

export ARC_TOKEN="your-arc-token"
redpanda-connect run arc-pipeline.yaml

Expected output:

INFO Running main config from specified file       path=arc-pipeline.yaml
INFO Input type generate is now active
INFO Output type arc is now active
INFO Pipeline has terminated. Shutting down the service

4. Verify Data in Arc

curl -X POST http://localhost:8000/api/v1/query \
-H "Authorization: Bearer $ARC_TOKEN" \
-H "Content-Type: application/json" \
-d '{"sql": "SELECT vehicle_id, speed_kmh FROM logistics.fleet_tracking ORDER BY time DESC LIMIT 10"}'
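The same check can be scripted. The following is a minimal Python sketch that uses only the standard library. It relies only on what the curl command above shows: the /api/v1/query endpoint, a JSON body with an sql key, and an optional Bearer token. The response format isn't described on this page, so the sketch just returns the decoded JSON.

```python
import json
import urllib.request
from typing import Any, Optional


def build_query_request(base_url: str, sql: str, token: Optional[str] = None) -> urllib.request.Request:
    """Build the same POST that the curl example sends to Arc's query endpoint."""
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        base_url.rstrip("/") + "/api/v1/query",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers=headers,
        method="POST",
    )


def run_query(base_url: str, sql: str, token: Optional[str] = None, timeout: float = 10.0) -> Any:
    """Send the query and return the decoded JSON body (its shape is up to Arc)."""
    request = build_query_request(base_url, sql, token)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example: run_query("http://localhost:8000", "SELECT vehicle_id, speed_kmh FROM logistics.fleet_tracking ORDER BY time DESC LIMIT 10", os.environ.get("ARC_TOKEN")), after importing os.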

Configuration Reference

| Option | Description | Default |
|---|---|---|
| base_url | Base URL of the Arc instance | Required |
| token | Bearer token for authentication | Optional |
| database | Target database in Arc | default |
| measurement | Measurement (table) name; supports interpolation | Required |
| format | Payload format: columnar or row | columnar |
| compression | Compression: zstd, gzip, or none | zstd |
| timestamp_field | Field in the message that holds the timestamp | empty (uses current time) |
| timestamp_unit | Unit of numeric timestamps: us, ms, s, ns, or auto | auto |
| tags_mapping | Bloblang mapping that extracts tags (row format only) | Optional |
| tls | TLS configuration | Optional |
| batching | Batch policy (count, period, byte_size) | None |
| max_in_flight | Maximum number of batches sent in parallel | 64 |
| timeout | HTTP request timeout | 5s |

Payload Formats

Columnar (default)

Transposes batched messages into column arrays. This is Arc's fastest ingestion path because it maps directly to Arc's Arrow buffers and avoids per-row overhead.

output:
  arc:
    base_url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: columnar
    compression: zstd

Requirement: all messages within a single batch must have the same set of fields. Arc validates this server-side and rejects batches with mismatched columns. Schema evolution across separate batches is fully supported.
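The following Python sketch makes the transposition concrete. It turns a batch of row-shaped messages into column arrays and enforces the same-fields rule. It illustrates the idea only. Arc's actual MessagePack wire layout is not described on this page and may differ.

```python
from typing import Any, Dict, List


def to_columns(batch: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Transpose a batch of row dicts into column arrays.

    Mirrors the columnar rule: every message in the batch must have the
    same set of fields, otherwise the whole batch is rejected.
    """
    if not batch:
        return {}
    expected = set(batch[0])
    for i, row in enumerate(batch):
        if set(row) != expected:
            missing = sorted(expected - set(row))
            extra = sorted(set(row) - expected)
            raise ValueError(f"row {i}: missing={missing} extra={extra}")
    # Keep the first row's key order so the column order is stable.
    return {name: [row[name] for row in batch] for name in batch[0]}
```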

Row

Sends each message as an individual record with fields and optional tags. Useful when messages within a batch have varying schemas, or when you need per-message tags.

output:
  arc:
    base_url: http://localhost:8000
    database: logistics
    measurement: fleet_tracking
    format: row
    tags_mapping: |
      root = {"vehicle_id": this.vehicle_id, "fleet": this.fleet, "region": this.region}
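The following Python sketch is a hypothetical model of row format. It builds one record per message, picks the tag keys out the way the tags_mapping above does, and treats everything else as fields. The record shape, and the choice to remove tag values from the fields, are assumptions for illustration. This page doesn't say whether Arc also keeps tag values as regular fields. The point is that, unlike columnar, neighbouring messages can carry different field sets.

```python
from typing import Any, Dict, Iterable, List


def to_row_records(batch: List[Dict[str, Any]], tag_keys: Iterable[str]) -> List[Dict[str, Dict[str, Any]]]:
    """Hypothetical row-format model: one {"tags", "fields"} record per message.

    Keys listed in tag_keys become tags; all other keys become fields.
    Messages in the same batch may have different field sets.
    """
    tag_set = set(tag_keys)
    records = []
    for message in batch:
        tags = {k: v for k, v in message.items() if k in tag_set}
        fields = {k: v for k, v in message.items() if k not in tag_set}
        records.append({"tags": tags, "fields": fields})
    return records
```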

Real-World Examples

Kafka Events to Arc

Consume JSON events from a Kafka topic, drop bot traffic, reshape fields, and normalize the timestamp:

input:
  kafka:
    addresses: ["kafka:9092"]
    topics: ["app-events"]
    consumer_group: "arc-analytics"

pipeline:
  processors:
    - mapping: |
        # Reshape the fields we care about
        root.user_id = this.user_id
        root.page = this.page
        root.duration_ms = this.duration_ms
        root.event_type = this.event
        # Normalize the timestamp to Unix milliseconds to match timestamp_unit: ms
        # (assumes the source sends an RFC 3339 string or Unix seconds)
        root.timestamp = this.timestamp.ts_unix_milli()
        # Drop bot traffic last, so no later assignment rebuilds root
        root = if this.user_id.has_prefix("bot-") { deleted() }

output:
  arc:
    base_url: http://localhost:8000
    token: "${ARC_TOKEN}"
    database: analytics
    measurement: page_views
    format: columnar
    timestamp_field: timestamp
    timestamp_unit: ms
    compression: zstd
    batching:
      count: 5000
      period: 5s
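You can check the mapping's intent offline with a Python analogue. The mapping drops bots, reshapes the fields, and emits the timestamp in milliseconds to match timestamp_unit: ms. This is a sketch, not the plugin's code. It assumes the source timestamp is an RFC 3339 string such as "2024-05-01T12:00:00Z".

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def reshape_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Python analogue of the Kafka example's mapping.

    Returns None for bot traffic (the equivalent of deleted()); otherwise the
    reshaped record with `timestamp` as integer Unix milliseconds.
    Assumes `timestamp` is an RFC 3339 string with a UTC "Z" or an offset.
    """
    if event["user_id"].startswith("bot-"):
        return None
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    return {
        "user_id": event["user_id"],
        "page": event["page"],
        "duration_ms": event["duration_ms"],
        "event_type": event["event"],
        # Integer division of timedeltas avoids float rounding.
        "timestamp": (ts - EPOCH) // timedelta(milliseconds=1),
    }
```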

HTTP Webhooks to Arc

Expose an HTTP endpoint that receives webhooks and writes them to Arc:

input:
  http_server:
    address: "0.0.0.0:8080"
    path: /webhook

pipeline:
  processors:
    - mapping: |
        # http_server exposes each request header as metadata keyed by its header name
        root.source = metadata("X-Webhook-Source")
        root.received_at = now()
        root.payload = this

output:
  arc:
    base_url: http://localhost:8000
    token: "${ARC_TOKEN}"
    database: webhooks
    measurement: '${!metadata("X-Webhook-Source")}'
    format: row
    compression: zstd
    batching:
      count: 100
      period: 2s
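To exercise this pipeline locally, send a test request that carries the header it reads. The following Python sketch builds that request with the standard library. The URL matches the http_server address and path above. The header value "github" and the JSON body in the usage line are made-up examples. In this pipeline the header value also becomes the measurement name, so it has to satisfy Arc's measurement naming rules.

```python
import json
import urllib.request
from typing import Any, Dict


def build_webhook(url: str, source: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Build a test POST for the http_server input.

    The X-Webhook-Source header is what the mapping and the measurement
    interpolation read, so `source` must be a valid Arc measurement name.
    """
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Webhook-Source": source},
        method="POST",
    )
```

For example: urllib.request.urlopen(build_webhook("http://localhost:8080/webhook", "github", {"action": "opened"})).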

MQTT to Arc with Transformations

When you want Redpanda Connect's transformation power on top of MQTT (instead of the native MQTT ingestion):

input:
  mqtt:
    urls: ["tcp://broker.example.com:1883"]
    topics: ["sensors/#"]
    client_id: "arc-connect"

pipeline:
  processors:
    - mapping: |
        # Topic "sensors/<device_id>/..." -> device_id
        root.device_id = meta("mqtt_topic").split("/").index(1)
        root.reading = this.value
        # Assumes devices report Fahrenheit
        root.temperature_c = (this.value - 32) * 5 / 9

output:
  arc:
    base_url: http://localhost:8000
    token: "${ARC_TOKEN}"
    database: sensors
    measurement: readings
    format: columnar
    compression: zstd
    batching:
      count: 1000
      period: 1s
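The mapping can be mirrored in Python to check what it produces for a given topic and payload. This is a sketch that carries the same two assumptions the mapping implies. First, topics look like sensors/<device_id>/... . Second, `value` is a Fahrenheit reading.

```python
from typing import Any, Dict


def transform_reading(topic: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Python analogue of the MQTT example's mapping."""
    parts = topic.split("/")
    return {
        "device_id": parts[1],  # same as .split("/").index(1)
        "reading": payload["value"],
        # Fahrenheit to Celsius, as in the mapping
        "temperature_c": (payload["value"] - 32) * 5 / 9,
    }
```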

Multi-Destination Fan-Out

Send the same events to Arc and Kafka simultaneously:

output:
  broker:
    pattern: fan_out
    outputs:
      - arc:
          base_url: http://localhost:8000
          token: "${ARC_TOKEN}"
          database: events
          measurement: user_actions
          format: columnar
      - kafka:
          addresses: ["kafka:9092"]
          topic: processed-events

Dynamic Measurement Routing

The measurement field supports Redpanda Connect's Bloblang interpolation. Messages with different types can be routed to different Arc tables in a single pipeline:

output:
  arc:
    base_url: http://localhost:8000
    database: telemetry
    # Messages with {"asset_type": "truck", ...} go to the "truck" table
    # Messages with {"asset_type": "drone", ...} go to the "drone" table
    measurement: ${!json("asset_type")}

Or route from message metadata (e.g., from Kafka headers, HTTP headers, or MQTT topics):

output:
  arc:
    base_url: http://localhost:8000
    database: telemetry
    measurement: ${!metadata("measurement")}
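One way to picture interpolated routing is to group messages by the value the expression resolves to, with one group per target table. The following Python sketch shows that grouping for the asset_type example. This page doesn't describe how the plugin splits a mixed batch internally, so treat the sketch as a mental model. Routing by type also tends to keep each table's rows homogeneous, which suits the columnar format.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_by_measurement(messages: List[Dict[str, Any]], field: str = "asset_type") -> Dict[str, List[Dict[str, Any]]]:
    """Group messages by the value ${!json("asset_type")} would resolve to.

    Each key is a target table; each list holds the rows bound for it.
    """
    tables: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for message in messages:
        tables[str(message[field])].append(message)
    return dict(tables)
```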

Bloblang Transformations

Bloblang is Redpanda Connect's built-in mapping language. A few patterns that come up when writing to Arc:

Drop messages conditionally

processors:
  - mapping: |
      root = if this.value == null { deleted() }

Flatten nested structures

processors:
  - mapping: |
      root.device_id = this.device.id
      root.device_model = this.device.model
      root.reading = this.payload.reading
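When a payload has many nested fields, listing each one gets tedious. The following generic Python flattener is an alternative to the explicit mapping above. It joins key paths with underscores, so it yields payload_reading rather than reading. It is a standalone illustration of the transformation, not a Bloblang function.

```python
from typing import Any, Dict


def flatten(obj: Dict[str, Any], prefix: str = "", sep: str = "_") -> Dict[str, Any]:
    """Flatten nested dicts into one level, joining key paths with `sep`."""
    out: Dict[str, Any] = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the path as the prefix.
            out.update(flatten(value, name, sep))
        else:
            out[name] = value
    return out
```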

Parse timestamps from strings

processors:
  - mapping: |
      root.event_time = this.timestamp.ts_parse("2006-01-02T15:04:05Z")
      root.event_name = this.event
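The ts_parse format follows Go's reference-time convention. In that convention you write the layout as the example date 2006-01-02 15:04:05, and the bare trailing "Z" is matched literally. The following Python sketch shows the equivalent strptime directives, which can help when translating formats. Because the "Z" is only a literal, the sketch attaches UTC explicitly.

```python
from datetime import datetime, timezone

# Go layout "2006-01-02T15:04:05Z" expressed as Python strptime directives.
PY_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def parse_event_time(value: str) -> datetime:
    # The trailing "Z" is matched literally, so mark the result as UTC.
    return datetime.strptime(value, PY_FORMAT).replace(tzinfo=timezone.utc)
```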

Enrich with static or derived fields

processors:
- mapping: |
root = this
root.region = env("DEPLOY_REGION")
root.ingested_at = now()

Querying the Data

Once data is in Arc, query it with standard SQL:

-- Most recent records across all vehicles in the last hour
SELECT vehicle_id, lat, lon, speed_kmh, time
FROM logistics.fleet_tracking
WHERE time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC
LIMIT 100;

-- Average speed by vehicle over the last 24h
SELECT
vehicle_id,
AVG(speed_kmh) as avg_speed,
MAX(speed_kmh) as max_speed,
COUNT(*) as reading_count
FROM logistics.fleet_tracking
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY vehicle_id
ORDER BY avg_speed DESC;

-- Hourly throughput of ingested events
SELECT
time_bucket(INTERVAL '1 hour', time) as hour,
COUNT(*) as events
FROM analytics.page_views
WHERE time > NOW() - INTERVAL '7 days'
GROUP BY hour
ORDER BY hour DESC;
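To see what the last query computes, here is the same aggregation in Python over an in-memory list of timestamps. Each timestamp is truncated to the hour, then the events in each hour are counted. This sketch assumes UTC timestamps, where hourly time_bucket boundaries coincide with whole hours.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable, List, Tuple


def hourly_counts(timestamps: Iterable[datetime]) -> List[Tuple[datetime, int]]:
    """Count events per hour, newest hour first.

    Truncating to the hour mirrors time_bucket(INTERVAL '1 hour', time);
    sorting in reverse mirrors ORDER BY hour DESC.
    """
    buckets = Counter(ts.replace(minute=0, second=0, microsecond=0) for ts in timestamps)
    return sorted(buckets.items(), reverse=True)
```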

Performance Tuning

Batch Size

Arc's columnar format is significantly more efficient with larger batches. Tune batching.count and batching.period based on your volume.

| Volume | Recommended batching.count |
|---|---|
| Low (<1K msg/sec) | 100–500 |
| Medium (1K–10K msg/sec) | 1,000–5,000 |
| High (>10K msg/sec) | 5,000–10,000 |
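The table reduces to a simple lookup, sketched below in Python. The second helper shows why batching.period matters as well. A batch flushes when it reaches count or when period elapses. At low rates a large count takes a long time to fill, so period is what bounds latency. The boundary handling at exactly 1K and 10K msg/sec is an assumption, since the table's ranges touch at those points.

```python
from typing import Tuple


def recommended_batch_count(msgs_per_sec: float) -> Tuple[int, int]:
    """Return the (min, max) batching.count range from the table above."""
    if msgs_per_sec < 1_000:
        return (100, 500)
    if msgs_per_sec <= 10_000:
        return (1_000, 5_000)
    return (5_000, 10_000)


def seconds_to_fill(batch_count: int, msgs_per_sec: float) -> float:
    """Time for a batch of batch_count messages to fill at a steady rate."""
    return batch_count / msgs_per_sec
```

For example, at 200 msg/sec a batch of 500 takes 2.5 seconds to fill, so a period of 1s would flush it first.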

Max In Flight

max_in_flight controls how many batches can be sent concurrently. Default is 64. For very high throughput, increase it along with the Arc server's resources:

output:
  arc:
    max_in_flight: 128
    batching:
      count: 5000
      period: 1s
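Raising max_in_flight also raises how much data can be outstanding at once. Each of the concurrent batches holds at most batching.count messages, so the settings above allow up to 128 × 5,000 = 640,000 unacknowledged messages. The following Python sketch does that arithmetic. The optional average message size, such as the 200 bytes in the test, is a hypothetical input. Real memory use also depends on encoding and compression.

```python
from typing import Optional, Tuple


def in_flight_upper_bound(
    max_in_flight: int, batch_count: int, avg_msg_bytes: Optional[int] = None
) -> Tuple[int, Optional[int]]:
    """Upper bound on messages (and, optionally, raw bytes) awaiting acknowledgement."""
    messages = max_in_flight * batch_count
    if avg_msg_bytes is None:
        return messages, None
    return messages, messages * avg_msg_bytes
```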

Compression Choice

  • zstd (default): Best decompression performance on the Arc server. Recommended for most workloads.
  • gzip: Usually costs more CPU than zstd without producing meaningfully smaller payloads. Use it only if something in your setup requires gzip.
  • none: Only useful for debugging, or on localhost with very small payloads.

Format Choice

Prefer columnar whenever batches share a consistent schema. It is significantly faster end-to-end. Use row only when you need per-message tags or flexible per-message fields.

Troubleshooting

401 Unauthorized

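The Arc token is missing or invalid, or the ${ARC_TOKEN} reference in the config is resolving to nothing because the variable isn't set in the environment Redpanda Connect runs in.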

output:
  arc:
    token: "${ARC_TOKEN}" # Make sure ARC_TOKEN is exported in your environment

Test the token directly:

curl -X POST http://localhost:8000/api/v1/query \
-H "Authorization: Bearer $ARC_TOKEN" \
-H "Content-Type: application/json" \
-d '{"sql": "SHOW DATABASES"}'

400 Bad Request with "column length mismatch"

Columnar format requires all messages in a batch to share the same set of fields. If some messages have extra or missing fields, Arc rejects the batch.

Options:

  • Switch to format: row if messages have varying schemas
  • Add a Bloblang step that normalizes fields before the output
  • Reduce batch size so each batch is more homogeneous
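A normalization step gives every message the same set of keys. It drops unknown fields and fills in missing ones with defaults. The following Python sketch shows the logic such a step would implement. The schema and its defaults are hypothetical. This page doesn't say whether Arc accepts null values in a column, so swap None for a sentinel if it doesn't.

```python
from typing import Any, Dict

# Hypothetical target schema: field name -> default used when a message lacks it.
SCHEMA_DEFAULTS: Dict[str, Any] = {"device_id": "unknown", "reading": None, "unit": "unknown"}


def normalize(message: Dict[str, Any], defaults: Dict[str, Any] = SCHEMA_DEFAULTS) -> Dict[str, Any]:
    """Keep only the schema's fields and fill missing ones, so every row has the same keys."""
    return {field: message.get(field, default) for field, default in defaults.items()}
```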

Messages written but nothing queryable

Arc buffers data in memory before flushing to Parquet (every 5 seconds by default). If you query immediately after writing, wait a few seconds and try again. In local development, small batches with a short period also keep the client-side delay low:

batching:
  count: 10
  period: 1s

Measurement name rejected

Arc validates measurement names (alphanumeric, underscores, hyphens, max 64 chars, must start with a letter). If you're using interpolation, make sure the value is clean:

measurement: '${!json("type").string().lowercase().re_replace_all("[^a-z0-9_-]", "_")}' # replaces disallowed characters with _
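The naming rules above translate directly into a regular expression. The following Python sketch checks names against those rules and sanitizes raw values before they reach Arc. It assumes "alphanumeric" means ASCII letters and digits. The sanitizer's "m_" prefix for names that don't start with a letter is a hypothetical choice.

```python
import re

# Stated rules: starts with a letter; letters, digits, underscores, hyphens; at most 64 chars.
MEASUREMENT_RE = re.compile(r"[A-Za-z][A-Za-z0-9_-]{0,63}")


def is_valid_measurement(name: str) -> bool:
    return MEASUREMENT_RE.fullmatch(name) is not None


def sanitize_measurement(raw: str, prefix: str = "m_") -> str:
    """Replace disallowed characters, ensure a leading letter, and cap the length."""
    name = re.sub(r"[^A-Za-z0-9_-]", "_", raw.strip())
    if not name or not name[0].isalpha():
        name = prefix + name
    return name[:64]
```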

Timestamps in the wrong unit

If your source produces timestamps in milliseconds but Arc is interpreting them as something else, set timestamp_unit explicitly:

timestamp_field: ts
timestamp_unit: ms # us | ms | s | ns | auto

The auto default detects the unit from magnitude, which is usually correct but fails for edge cases (e.g. very small timestamps from the 1970s).
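The following Python sketch shows how a magnitude heuristic works and where it breaks. Present-day timestamps are about 1.7e9 in seconds, 1.7e12 in milliseconds, 1.7e15 in microseconds, and 1.7e18 in nanoseconds, so cut-offs between those ranges separate the units. The thresholds here are hypothetical, and Arc's actual auto logic may use different ones. A millisecond value from early 1970 is small enough to be misread as seconds.

```python
def guess_unit(ts: int) -> str:
    """Hypothetical magnitude-based unit detection (Arc's thresholds may differ)."""
    magnitude = abs(ts)
    if magnitude >= 10**17:
        return "ns"
    if magnitude >= 10**14:
        return "us"
    if magnitude >= 10**11:
        return "ms"
    return "s"
```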


Next Steps

  • Pair with Grafana to visualize the data Redpanda Connect ingests
  • Use Arc's native MQTT when you don't need transformations
  • Use Telegraf for system/infrastructure metrics