Skip to main content

MQTT Integration

Ingest data directly from MQTT brokers into Arc. Connect to IoT devices, industrial sensors, and message brokers without middleware.

Available since v26.02.1

MQTT integration with API-driven subscription management is available starting Arc v26.02.1 (February 2026).

Overview

Arc provides native MQTT subscription with dynamic, API-driven configuration. Manage multiple MQTT brokers and subscriptions at runtime without server restarts.

Key features:

  • API-driven subscription management - Create, update, delete, start/stop subscriptions via REST API
  • Multiple simultaneous brokers - Connect to different MQTT brokers for different data sources
  • Topic wildcards - Subscribe using + (single level) and # (multi-level) wildcards
  • Auto-detection - Automatically detects JSON and MessagePack message formats
  • High performance - ~6M records/sec with MessagePack columnar format
  • Topic mapping - Extract tags from topic path segments
  • TLS/SSL support - Client certificates and CA verification
  • Encrypted credentials - Passwords encrypted at rest using AES-256-GCM
  • Auto-reconnect - Exponential backoff on connection loss
  • QoS support - QoS 0, 1, and 2

Prerequisites

  • Arc server running (v26.02.1 or higher)
  • Arc API token (if authentication is enabled)
  • MQTT broker accessible from Arc server

Quick Start

1. Enable MQTT in Arc

MQTT is disabled by default. Enable it via either:

Option A — environment variable (recommended for Docker / Kubernetes):

ARC_MQTT_ENABLED=true

In docker-compose.yml:

services:
arc:
environment:
ARC_MQTT_ENABLED: "true"

Or with docker run:

docker run -e ARC_MQTT_ENABLED=true ...

Option B — arc.toml:

[mqtt]
enabled = true

Arc looks for arc.toml in ., /etc/arc/, and $HOME/.arc/. In containers, editing the file inside a running container does not persist across restarts — use the env-var path or mount the file as a volume.

Restart Arc after changing either source. Verify with:

curl http://localhost:8000/api/v1/mqtt/health
ResponseMeaning
200 with {"status":"healthy", ...}MQTT is enabled and running.
200 with {"status":"disabled","healthy":false}MQTT was not enabled at startup. Check ARC_MQTT_ENABLED / [mqtt].enabled and restart.
503 with {"error":"MQTT subsystem disabled"}Same as above, on older builds.
Plaintext Cannot GET /api/v1/mqtt/healthYour Arc build pre-dates the MQTT API. Upgrade to a release that includes PR #416 (v26.05.1 or later).

2. Create a Subscription

curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $ARC_TOKEN" \
-d '{
"name": "factory-sensors",
"broker": "tcp://localhost:1883",
"topics": ["sensors/#"],
"database": "iot",
"auto_start": true
}'

Response:

{
"id": "sub_abc123",
"name": "factory-sensors",
"broker": "tcp://localhost:1883",
"topics": ["sensors/#"],
"database": "iot",
"status": "running",
"created_at": "2026-02-01T10:00:00Z"
}

3. Send Test Data

Publish a message to your MQTT broker:

mosquitto_pub -h localhost -t "sensors/temperature" \
-m '{"time": 1706745600000000, "value": 23.5, "device_id": "sensor-001"}'

4. Query the Data

curl -X POST http://localhost:8000/api/v1/query \
-H "Authorization: Bearer $ARC_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"sql": "SELECT * FROM iot.temperature ORDER BY time DESC LIMIT 10",
"format": "json"
}'

REST API Reference

Subscription Management

MethodEndpointDescription
POST/api/v1/mqtt/subscriptionsCreate a new subscription
GET/api/v1/mqtt/subscriptionsList all subscriptions
GET/api/v1/mqtt/subscriptions/{id}Get subscription details
PUT/api/v1/mqtt/subscriptions/{id}Update subscription
DELETE/api/v1/mqtt/subscriptions/{id}Delete subscription

Lifecycle Control

MethodEndpointDescription
POST/api/v1/mqtt/subscriptions/{id}/startStart subscription
POST/api/v1/mqtt/subscriptions/{id}/stopStop subscription
POST/api/v1/mqtt/subscriptions/{id}/restartRestart subscription

Monitoring

MethodEndpointDescription
GET/api/v1/mqtt/subscriptions/{id}/statsGet subscription stats
GET/api/v1/mqtt/statsAggregate stats (all subscriptions)
GET/api/v1/mqtt/healthHealth check

Subscription Options

Create Subscription Request

{
"name": "factory-sensors",
"broker": "tcp://localhost:1883",
"topics": ["sensors/#", "factory/+/metrics"],
"database": "iot",
"qos": 1,
"client_id": "arc-factory",
"username": "mqtt_user",
"password": "mqtt_pass",
"tls_enabled": false,
"tls_cert_path": "/path/to/client.crt",
"tls_key_path": "/path/to/client.key",
"tls_ca_path": "/path/to/ca.crt",
"topic_mapping": {},
"keep_alive_seconds": 60,
"connect_timeout_seconds": 30,
"reconnect_min_seconds": 1,
"reconnect_max_seconds": 60,
"auto_start": true
}

Field Reference

FieldTypeRequiredDefaultDescription
namestringYes-Unique subscription name
brokerstringYes-MQTT broker URL (tcp://, ssl://, ws://, wss://)
topicsarrayYes-List of topics to subscribe
databasestringYes-Target Arc database
qosintNo1QoS level: 0, 1, or 2
client_idstringNoautoMQTT client ID
usernamestringNo-MQTT username
passwordstringNo-MQTT password (encrypted at rest)
tls_enabledboolNofalseEnable TLS/SSL
tls_cert_pathstringNo-Client certificate path
tls_key_pathstringNo-Client key path
tls_ca_pathstringNo-CA certificate path
topic_mappingobject ({string: string})NoPer-topic target-database override: maps an exact MQTT topic to a database name, overriding database for messages on that topic. See Topic Mapping.
keep_alive_secondsintNo60MQTT keep-alive interval
connect_timeout_secondsintNo30Connection timeout
reconnect_min_secondsintNo1Minimum reconnect delay
reconnect_max_secondsintNo60Maximum reconnect delay
auto_startboolNotrueStart on creation and server restart

Message Formats

Arc automatically detects the message format based on content.

JSON Single Record

{
"time": 1706745600000000,
"temperature": 23.5,
"humidity": 65.2,
"device_id": "sensor-001"
}

JSON Batch

[
{"time": 1706745600000000, "temperature": 23.5},
{"time": 1706745601000000, "temperature": 23.6},
{"time": 1706745602000000, "temperature": 23.4}
]

MessagePack Row-Based

Same structure as JSON, but MessagePack encoded. Detected via magic bytes.

MessagePack Columnar (Fastest)

{
"m": "temperature",
"columns": {
"time": [1706745600000000, 1706745601000000],
"value": [23.5, 23.6],
"device_id": ["sensor-001", "sensor-001"]
}
}

Performance: ~6M records/sec with MessagePack columnar format.

Timestamp Handling

  • If time field is present: used as-is (auto-detects milliseconds/microseconds/nanoseconds)
  • If time field is missing: current UTC time is used

Measurement, Tags, and Fields

Arc derives the measurement, tags, and fields from the message payload, not from the topic structure. The topic itself is not parsed for the measurement name or for tag values.

For each decoded message:

  • Measurement — taken from the payload's m field, or measurement field. If neither is present, it defaults to mqtt.
  • Tags — taken from a tags object in the payload (string values).
  • Fields — taken from a fields object if present; otherwise every remaining top-level key (anything other than m/measurement, t/time/timestamp, tags, fields) is treated as a field.
  • Timestamp — from t, time, or timestamp (auto-detects ms/µs/ns); current UTC time if absent.

So to land in measurement machine_metrics with tags line and machine_id, publish a payload like:

{
"m": "machine_metrics",
"time": 1706745600000000,
"tags": { "line": "A", "machine_id": "42" },
"fields": { "temperature": 71.5, "rpm": 1480 }
}

A flat payload with no m/tags/fields (e.g. {"time": ..., "temperature": 23.5}) is also accepted: it lands in the default mqtt measurement with the remaining keys as fields and no tags.

note

Deriving the measurement or tags from topic path segments (e.g. tags_from_topic / positional extraction) is not currently supported. Set the measurement and tags in the published payload as shown above.

Topic Mapping (Per-Topic Database Override)

topic_mapping maps an exact MQTT topic string to a target database name, overriding the subscription's database for messages received on that topic. It is a flat { "<topic>": "<database>" } object — it does not configure measurements or tags.

{
"name": "factory-sensors",
"broker": "tcp://localhost:1883",
"topics": ["factory/line1/metrics", "factory/line2/metrics"],
"database": "iot",
"topic_mapping": {
"factory/line2/metrics": "iot_line2"
}
}

In this example, messages on factory/line1/metrics are written to the default iot database, while messages on factory/line2/metrics are routed to iot_line2.

note

The mapping key is matched against the message's actual topic by exact string equality — wildcard topic patterns (+, #) are not expanded for matching. A subscription may use wildcards in topics, but topic_mapping keys must be the concrete topics you want to route to a different database.

Authentication

Basic Authentication

curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "authenticated-broker",
"broker": "tcp://broker.example.com:1883",
"topics": ["data/#"],
"database": "production",
"username": "mqtt_user",
"password": "mqtt_password"
}'

Password Encryption

Passwords are encrypted at rest using AES-256-GCM. Set the encryption key:

# Generate a 32-byte key
openssl rand -base64 32

# Set environment variable before starting Arc
export ARC_ENCRYPTION_KEY="your-base64-encoded-32-byte-key"

Note: The encryption key is only required when subscriptions have passwords. Subscriptions without credentials work without the key.

TLS/SSL Configuration

Server Certificate Verification

curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "secure-broker",
"broker": "ssl://broker.example.com:8883",
"topics": ["secure/#"],
"database": "production",
"tls_enabled": true,
"tls_ca_path": "/etc/arc/certs/ca.crt"
}'

Client Certificate Authentication

curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "mtls-broker",
"broker": "ssl://broker.example.com:8883",
"topics": ["secure/#"],
"database": "production",
"tls_enabled": true,
"tls_cert_path": "/etc/arc/certs/client.crt",
"tls_key_path": "/etc/arc/certs/client.key",
"tls_ca_path": "/etc/arc/certs/ca.crt"
}'

Configuration Examples

Multiple Brokers

Connect to different brokers for different environments:

# Production broker
curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "production",
"broker": "ssl://prod-mqtt.example.com:8883",
"topics": ["prod/#"],
"database": "production",
"tls_enabled": true
}'

# Development broker
curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "development",
"broker": "tcp://dev-mqtt.example.com:1883",
"topics": ["dev/#"],
"database": "development"
}'

IoT Sensor Network

curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "iot-sensors",
"broker": "tcp://mosquitto:1883",
"topics": [
"sensors/+/temperature",
"sensors/+/humidity",
"sensors/+/pressure"
],
"database": "iot",
"qos": 1
}'

Devices set the measurement and tags in the payload — e.g. a temperature sensor publishes:

{ "m": "temperature", "tags": { "sensor_id": "temp-001" }, "fields": { "value": 23.5 } }

Industrial Factory

curl -X POST http://localhost:8000/api/v1/mqtt/subscriptions \
-H "Content-Type: application/json" \
-d '{
"name": "factory-floor",
"broker": "tcp://factory-mqtt:1883",
"topics": ["factory/+/+/metrics"],
"database": "manufacturing",
"qos": 2
}'

Machines publish the measurement and tags in the payload:

{
"m": "machine_metrics",
"tags": { "line": "A", "machine_id": "42" },
"fields": { "temperature": 71.5, "rpm": 1480 }
}

Monitoring

Subscription Stats

# Stats for a specific subscription
curl http://localhost:8000/api/v1/mqtt/subscriptions/{id}/stats

# Aggregate stats for all subscriptions
curl http://localhost:8000/api/v1/mqtt/stats

Response:

{
"status": "success",
"running_count": 2,
"subscriptions_stats": {
"sub_abc123": {
"messages_received": 15420,
"bytes_received": 2458320,
"decode_errors": 0,
"last_message_at": "2026-02-01T10:30:15Z",
"topics": {
"sensors/temperature": 8500,
"sensors/humidity": 6920
}
}
}
}

Health Check

curl http://localhost:8000/api/v1/mqtt/health

Response:

{
"status": "healthy",
"healthy": true,
"running_count": 2,
"connected_count": 2,
"disconnected_count": 0,
"service": "mqtt_subscriptions"
}

Prometheus Metrics

Arc exposes MQTT metrics for Prometheus:

MetricTypeDescription
arc_mqtt_messages_received_totalCounterTotal messages received
arc_mqtt_bytes_received_totalCounterTotal bytes received
arc_mqtt_decode_errors_totalCounterMessage decode errors
arc_mqtt_connection_statusGaugeConnection status (1=connected)

Querying MQTT Data

List Measurements

SHOW TABLES FROM iot;

Basic Query

SELECT * FROM iot.temperature
ORDER BY time DESC
LIMIT 10;

Time-Based Aggregation

SELECT
time_bucket(INTERVAL '5 minutes', time) as bucket,
AVG(value) as avg_temp,
MIN(value) as min_temp,
MAX(value) as max_temp
FROM iot.temperature
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket
ORDER BY bucket DESC;

Filter by Tag

SELECT * FROM iot.sensor_data
WHERE sensor_id = 'temp-001'
AND time > NOW() - INTERVAL '24 hours'
ORDER BY time DESC;

Cross-Measurement Join

SELECT
t.time,
t.value as temperature,
h.value as humidity
FROM iot.temperature t
JOIN iot.humidity h ON t.time = h.time AND t.sensor_id = h.sensor_id
WHERE t.time > NOW() - INTERVAL '1 hour'
ORDER BY t.time DESC;

Troubleshooting

Connection Failed

# Check subscription status
curl http://localhost:8000/api/v1/mqtt/subscriptions/{id}

If status is error, check:

  • Broker URL is correct (tcp://, ssl://, ws://)
  • Broker is reachable from Arc server
  • Credentials are correct
  • TLS certificates are valid

No Data Appearing

  1. Verify subscription is running:
curl http://localhost:8000/api/v1/mqtt/subscriptions/{id}
# status should be "running"
  1. Check stats for received messages:
curl http://localhost:8000/api/v1/mqtt/subscriptions/{id}/stats
  1. Verify topic pattern matches published topics

  2. Check Arc logs for decode errors

Messages Not Decoding

Ensure messages are valid JSON or MessagePack:

# Test with simple JSON
mosquitto_pub -h localhost -t "test/data" \
-m '{"time": 1706745600000000, "value": 42}'

Check for decode errors in stats:

curl http://localhost:8000/api/v1/mqtt/subscriptions/{id}/stats | jq '.decode_errors'

Subscription Won't Start

Check for errors:

curl http://localhost:8000/api/v1/mqtt/subscriptions/{id} | jq '.error_message'

Common issues:

  • Another client using same client_id
  • Invalid broker URL
  • Network connectivity issues

Best Practices

1. Use Descriptive Names

{
"name": "prod-factory-floor-sensors",
"broker": "ssl://prod-mqtt.example.com:8883"
}

2. Separate Databases by Environment

// Production
{"database": "production", "topics": ["prod/#"]}

// Staging
{"database": "staging", "topics": ["staging/#"]}

3. Use QoS Appropriately

  • QoS 0: Fire-and-forget, no guarantees (highest throughput)
  • QoS 1: At least once delivery (recommended for most cases)
  • QoS 2: Exactly once delivery (highest overhead)

4. Set Reasonable Reconnect Intervals

{
"reconnect_min_seconds": 1,
"reconnect_max_seconds": 60
}

5. Use Topic Wildcards Efficiently

# Good - specific wildcards
sensors/+/temperature
factory/line1/+/metrics

# Avoid - too broad
#
sensors/#/#

6. Monitor Subscription Health

Set up alerts on:

  • arc_mqtt_connection_status == 0 (disconnected)
  • rate(arc_mqtt_decode_errors_total[5m]) > 0 (decode errors)

Docker Compose Example

version: '3.8'
services:
arc:
image: basekick/arc:latest
ports:
- "8000:8000"
volumes:
- arc-data:/data
environment:
- ARC_ENCRYPTION_KEY=${ARC_ENCRYPTION_KEY}
depends_on:
- mosquitto

mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf

volumes:
arc-data:

mosquitto.conf:

listener 1883
allow_anonymous true

Next Steps