Data Ingestion
How to write data to Arc using the Python SDK.
Overview
The SDK provides multiple ways to ingest data, each optimized for different use cases:
| Method | Best For | Performance | Format |
|---|---|---|---|
| write_columnar() | High-throughput ingestion | 9M+ records/sec | MessagePack columnar |
| write_dataframe() | pandas/polars workflows | 5M+ records/sec | MessagePack columnar |
| buffered() | Streaming data | Auto-batched | MessagePack columnar |
| write_line_protocol() | InfluxDB compatibility | 1M+ records/sec | Line protocol text |
All write methods are available on client.write.
Columnar Format (Recommended)
The fastest way to write data. Data is organized by columns (like a DataFrame) rather than rows, which enables efficient compression and fast serialization.
Basic Usage
import os

from arc_client import ArcClient

with ArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
    client.write.write_columnar(
        measurement="cpu",
        columns={
            # Timestamps in microseconds since the Unix epoch
            "time": [1704067200000000, 1704067260000000, 1704067320000000],
            "host": ["server01", "server01", "server01"],
            "region": ["us-east", "us-east", "us-east"],
            "usage_idle": [95.2, 94.8, 93.1],
            "usage_system": [2.1, 2.5, 3.2],
            "usage_user": [2.7, 2.7, 3.7],
        },
    )
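If your data starts out as a list of row dictionaries, it has to be pivoted into the column-oriented shape shown above before it can be passed as columns. The following is a minimal sketch of such a pivot using only the standard library; rows_to_columns is a hypothetical helper written for illustration and is not part of the SDK.

```python
from typing import Any


def rows_to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Pivot a list of row dicts into a column-name -> list-of-values dict."""
    if not rows:
        return {}
    names = list(rows[0])  # column order follows the first row
    columns: dict[str, list[Any]] = {name: [] for name in names}
    for index, row in enumerate(rows):
        # Every row must carry exactly the same keys, or columns would
        # end up with different lengths.
        if set(row) != set(names):
            raise ValueError(
                f"row {index} has keys {sorted(row)}, expected {sorted(names)}"
            )
        for name in names:
            columns[name].append(row[name])
    return columns


rows = [
    {"time": 1704067200000000, "host": "server01", "usage_idle": 95.2},
    {"time": 1704067260000000, "host": "server01", "usage_idle": 94.8},
]
columns = rows_to_columns(rows)
```

The resulting dictionary has one equally sized list per column, which is the shape the columns parameter expects.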
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| measurement | str | Yes | Name of the measurement (like a table) |
| columns | dict | Yes | Column name → list of values |
| database | str | No | Target database (default: client's database) |
Column Types
The SDK automatically handles type conversion:
| Python Type | Arc Type | Example |
|---|---|---|
| int | Integer | [1, 2, 3] |
| float | Float | [1.5, 2.7, 3.9] |
| str | String (tag or field) | ["a", "b", "c"] |
| bool | Boolean | [True, False, True] |
| datetime | Timestamp (microseconds) | [datetime.now()] |
Timestamps
The time column must contain a numeric Unix epoch. Arc auto-detects the
unit by magnitude — seconds, milliseconds, microseconds, or nanoseconds — and
normalizes everything to microseconds internally. Microseconds is the
recommended, unambiguous form:
import time
from datetime import datetime
# From current time
timestamp_us = int(time.time() * 1_000_000)
# From datetime
timestamp_us = int(datetime.now().timestamp() * 1_000_000)
# Multiple timestamps
timestamps = [
    1704067200000000,  # 2024-01-01 00:00:00 UTC
    1704067260000000,  # 2024-01-01 00:01:00 UTC
    1704067320000000,  # 2024-01-01 00:02:00 UTC
]
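To illustrate what detecting the unit by magnitude can look like, here is a minimal sketch. The cut-off values and the names guess_epoch_unit and normalize_to_us are assumptions chosen for illustration; they are not Arc's actual thresholds or API.

```python
def guess_epoch_unit(value: int) -> str:
    """Guess the unit of a Unix epoch value from its magnitude.

    The cut-offs below are illustrative assumptions, not Arc's real ones.
    """
    magnitude = abs(value)
    if magnitude < 10**11:  # seconds stay below 1e11 until roughly the year 5138
        return "s"
    if magnitude < 10**14:
        return "ms"
    if magnitude < 10**17:
        return "us"
    return "ns"


def normalize_to_us(value: int) -> int:
    """Convert an epoch value in a guessed unit to integer microseconds."""
    unit = guess_epoch_unit(value)
    if unit == "s":
        return value * 1_000_000
    if unit == "ms":
        return value * 1_000
    if unit == "us":
        return value
    return value // 1_000  # nanoseconds: sub-microsecond precision is dropped


# The same instant expressed in four units maps to one microsecond value.
same_instant = [
    normalize_to_us(1704067200),
    normalize_to_us(1704067200000),
    normalize_to_us(1704067200000000),
    normalize_to_us(1704067200000000000),
]
```

Any heuristic of this kind is inherently ambiguous for values very close to the epoch, since a small number could plausibly be in any unit.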
time must be numeric, and must never be null. Arc stores time as a
TIMESTAMP WITH TIME ZONE column (UTC). To keep this column type consistent
across every file in a partition — which compaction relies on — the ingest path
rejects a time column sent as a string (e.g. an ISO-8601 datetime
string like "2024-01-01T00:00:00Z") or containing null values. Such a
write fails with a clear error rather than being silently accepted:
- ❌ "time": ["2024-01-01T00:00:00Z"]: string timestamps are rejected. Convert to a numeric epoch first: int(datetime.fromisoformat("2024-01-01T00:00:00+00:00").timestamp() * 1_000_000).
- ❌ "time": [1704067200000000, None]: null timestamps are rejected (a null time would otherwise be silently routed to the 1970-01-01 partition).
- ✅ "time": [1704067200000000, 1704067260000000]: numeric epoch (any unit).
If you previously sent string timestamps and saw them ingest "successfully" on
Arc 26.05.1, note that those writes produced files whose time column type
disagreed with normally-ingested files, which prevented those partitions from
compacting. Sending a numeric epoch avoids this entirely.
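When timestamps arrive in mixed forms, it can help to normalize them in one place before building the time column. The sketch below is one possible approach using only the standard library. to_epoch_us is a hypothetical helper, and treating naive datetimes as UTC is an assumption you may need to change for your data.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_us(value) -> int:
    """Return integer microseconds since the Unix epoch.

    Accepts a datetime, an ISO-8601 string, or a numeric epoch that is
    already in microseconds. Rejects None instead of passing it through.
    """
    if value is None:
        raise ValueError("time must not be null")
    if isinstance(value, bool):  # bool is a subclass of int; reject it explicitly
        raise TypeError("time must be a number, datetime, or ISO-8601 string")
    if isinstance(value, (int, float)):
        return int(value)  # assumed to already be in microseconds
    if isinstance(value, str):
        # datetime.fromisoformat() only accepts a trailing "Z" on Python 3.11+
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        value = datetime.fromisoformat(value)
    if isinstance(value, datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        # Integer arithmetic avoids float rounding on the microsecond digits
        return (value - EPOCH) // timedelta(microseconds=1)
    raise TypeError("time must be a number, datetime, or ISO-8601 string")


raw_times = [
    "2024-01-01T00:00:00Z",
    datetime(2024, 1, 1, 0, 1, tzinfo=timezone.utc),
    1704067320000000,
]
times = [to_epoch_us(t) for t in raw_times]
```

The resulting list contains only integers, so it satisfies the numeric, non-null requirement described above.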
Tags vs Fields
In Arc:
- Tags are indexed string columns used for filtering (e.g., host, region, sensor_id)
- Fields are the actual metric values (e.g., temperature, cpu_usage, count)
For write_columnar(), all columns are sent directly to Arc. Tag/field distinction is handled by the Arc server based on its schema detection.
For write_dataframe(), you can explicitly specify which columns are tags using the tag_columns parameter.
DataFrame Ingestion
Write directly from pandas or polars DataFrames. The SDK converts DataFrames to columnar format automatically.
pandas Example
import os

import pandas as pd
from arc_client import ArcClient

# Create a DataFrame
df = pd.DataFrame({
    "time": pd.date_range("2024-01-01", periods=100, freq="1min"),
    "host": ["server-01"] * 50 + ["server-02"] * 50,
    "region": ["us-east"] * 100,
    "cpu_usage": [50 + i * 0.1 for i in range(100)],
    "memory_mb": [1024 + i for i in range(100)],
})

with ArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
    client.write.write_dataframe(
        df,
        measurement="server_metrics",
        time_column="time",              # Column containing timestamps
        tag_columns=["host", "region"],  # Columns to treat as tags
    )
    print(f"Wrote {len(df)} rows")
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| df | DataFrame | Yes | pandas or polars DataFrame |
| measurement | str | Yes | Target measurement name |
| time_column | str | Yes | Name of the timestamp column |
| tag_columns | list[str] | No | Columns to treat as indexed tags |
| database | str | No | Target database |
Polars Example
import os
from datetime import datetime

import polars as pl
from arc_client import ArcClient

df = pl.DataFrame({
    # One point per minute from 00:00 to 01:00 inclusive (61 rows)
    "time": pl.datetime_range(
        datetime(2024, 1, 1),
        datetime(2024, 1, 1, 1),
        interval="1m",
        eager=True,
    ),
    "sensor_id": ["sensor-001"] * 61,
    "temperature": [20.0 + i * 0.1 for i in range(61)],
})

with ArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
    client.write.write_dataframe(
        df,
        measurement="temperatures",
        time_column="time",
        tag_columns=["sensor_id"],
    )
Buffered Writes
For streaming or high-throughput scenarios, use buffered writes. The buffer automatically batches records and flushes them efficiently.
Basic Usage
import os

from arc_client import ArcClient

with ArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
    with client.write.buffered(batch_size=5000, flush_interval=2.0) as buffer:
        for i in range(50000):
            buffer.write(
                measurement="events",
                tags={"source": "sensor-001", "type": "temperature"},
                fields={"value": 22.5 + i * 0.01},
                timestamp=1704067200000000 + (i * 1000),  # microseconds, 1 ms apart
            )
    # Auto-flushes on exit
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| batch_size | int | 5000 | Flush after N records |
| flush_interval | float | 5.0 | Flush after N seconds (even if batch not full) |
How It Works
- Records are queued in memory
- When batch_size is reached OR flush_interval expires, the buffer flushes
- On context manager exit, any remaining records are flushed
- Uses columnar format internally for best performance
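The flush rules above can be illustrated with a small standalone sketch. This is not the SDK's implementation: SketchBuffer, its flush_fn callback, and the injectable clock are hypothetical names, and this version only checks the interval when a record is written rather than on a background timer.

```python
import time
from typing import Any, Callable


class SketchBuffer:
    """Illustrative size-or-time buffer; not the SDK's implementation."""

    def __init__(
        self,
        flush_fn: Callable[[list], Any],
        batch_size: int = 5000,
        flush_interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._records: list = []
        self._last_flush = clock()

    def write(self, record: Any) -> None:
        """Queue a record, then flush if the batch is full or too old."""
        self._records.append(record)
        full = len(self._records) >= self._batch_size
        stale = self._clock() - self._last_flush >= self._flush_interval
        if full or stale:
            self.flush()

    def flush(self) -> None:
        """Hand queued records to flush_fn and reset the interval timer."""
        if self._records:
            self._flush_fn(self._records)
            self._records = []  # new list, so the flushed batch is not mutated
        self._last_flush = self._clock()

    def __enter__(self) -> "SketchBuffer":
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.flush()  # remaining records are flushed on exit


batches: list = []
with SketchBuffer(batches.append, batch_size=2, flush_interval=60.0) as buffer:
    for i in range(5):
        buffer.write({"value": i})
```

With batch_size=2 and five records, the size rule triggers two flushes and the context manager exit flushes the one remaining record.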
When to Use Buffered Writes
✅ Use buffered writes when:
- Processing streaming data (sensors, logs, events)
- Ingesting data in a loop
- You don't know the batch size ahead of time
❌ Don't use buffered writes when:
- You already have data in columnar format or DataFrame
- You're writing a single batch (use write_columnar() directly)
Async Buffered Writes
import asyncio
import os

from arc_client import AsyncArcClient

async def ingest_stream():
    async with AsyncArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
        async with client.write.buffered(batch_size=5000) as buffer:
            # event_stream() is a placeholder for your own async generator of events
            async for event in event_stream():
                await buffer.write(
                    measurement="events",
                    tags={"source": event.source},
                    fields={"value": event.value},
                    timestamp=event.timestamp,
                )

asyncio.run(ingest_stream())
Line Protocol
For compatibility with InfluxDB tooling (Telegraf, etc.), use line protocol format.
Basic Usage
import os

from arc_client import ArcClient

with ArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
    # Single line
    client.write.write_line_protocol(
        "cpu,host=server01,region=us-east usage_idle=95.2 1704067200000000000"
    )

    # Multiple lines
    lines = [
        "cpu,host=server01 usage_idle=95.2,usage_system=2.1",
        "cpu,host=server02 usage_idle=87.5,usage_system=4.3",
        "mem,host=server01 used_percent=45.2",
    ]
    client.write.write_line_protocol(lines)
Line Protocol Format
<measurement>,<tag_key>=<tag_value>,... <field_key>=<field_value>,... [timestamp]
Example breakdown:
cpu,host=server01,region=us-east usage_idle=95.2,usage_system=2.1 1704067200000000000
│   │                            │                                │
│   │                            │                                └── timestamp (nanoseconds)
│   │                            └── fields (space-separated from tags)
│   └── tags (comma-separated)
└── measurement name
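If you need to produce such lines from Python values, a small formatter can take care of separators and escaping. The sketch below follows the InfluxDB line protocol conventions (backslash-escaped commas, spaces and equals signs, an i suffix for integers, quoted string fields). build_line is a hypothetical helper, and whether Arc accepts every one of these variants is an assumption to verify against your server.

```python
def _escape(text: str, chars: str) -> str:
    """Backslash-escape each character of `chars` that occurs in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value) -> str:
    """Render a field value using InfluxDB line protocol conventions."""
    if isinstance(value, bool):  # check before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    raise TypeError(f"unsupported field type: {type(value).__name__}")


def build_line(measurement, tags, fields, timestamp_ns=None) -> str:
    """Build one line: measurement, sorted tags, fields, optional timestamp."""
    if not fields:
        raise ValueError("line protocol requires at least one field")
    head = _escape(measurement, ", ")
    for key in sorted(tags):
        head += "," + _escape(key, ", =") + "=" + _escape(str(tags[key]), ", =")
    body = ",".join(
        _escape(key, ", =") + "=" + _format_field(value)
        for key, value in fields.items()
    )
    line = f"{head} {body}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


line = build_line(
    "cpu",
    {"host": "server01", "region": "us-east"},
    {"usage_idle": 95.2, "usage_system": 2.1},
    1704067200000000000,
)
```

For the inputs shown, the helper reproduces the example line from the breakdown above.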
When to Use Line Protocol
✅ Use line protocol when:
- Integrating with Telegraf or other InfluxDB tools
- Migrating from InfluxDB
- You already have data in line protocol format
❌ Don't use line protocol when:
- Building new applications (use columnar format)
- Performance is critical (columnar is 8x faster)
Async Ingestion
All write methods have async equivalents:
import asyncio
import os

from arc_client import AsyncArcClient

async def main():
    async with AsyncArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
        # Columnar write
        await client.write.write_columnar(
            measurement="cpu",
            columns={
                "time": [1704067200000000],
                "host": ["server01"],
                "usage": [45.2],
            },
        )

        # DataFrame write (df is assumed to be an existing pandas or polars DataFrame)
        await client.write.write_dataframe(
            df, measurement="metrics", time_column="time"
        )

        # Line protocol
        await client.write.write_line_protocol("cpu,host=server01 usage=45.2")

asyncio.run(main())
Error Handling
import os

from arc_client import ArcClient
from arc_client.exceptions import (
    ArcIngestionError,
    ArcValidationError,
    ArcConnectionError,
)

with ArcClient(host="localhost", token=os.environ["ARC_TOKEN"]) as client:
    try:
        client.write.write_columnar(
            measurement="cpu",
            columns={"time": [1], "value": [1.0]},
        )
    except ArcValidationError as e:
        print(f"Invalid data: {e}")
    except ArcIngestionError as e:
        print(f"Write failed: {e}")
    except ArcConnectionError as e:
        print(f"Connection error: {e}")
Best Practices
1. Batch Your Data
Send multiple rows per request rather than one at a time:
# ✅ Good: Batch write
client.write.write_columnar(
    measurement="cpu",
    columns={
        "time": [t1, t2, t3, ...],  # 1000+ values
        "host": [h1, h2, h3, ...],
        "value": [v1, v2, v3, ...],
    },
)

# ❌ Bad: Individual writes
for record in records:
    client.write.write_columnar(
        measurement="cpu",
        columns={
            "time": [record.time],
            "host": [record.host],
            "value": [record.value],
        },
    )
2. Use Appropriate Batch Sizes
- Small batches (100-1000): Lower latency, more HTTP overhead
- Medium batches (1000-10000): Good balance for most use cases
- Large batches (10000+): Best throughput, higher memory usage
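When a dataset is larger than the batch size you have settled on, it can be split into column-aligned slices before writing. The sketch below shows one way to do that with the standard library; chunk_columns is a hypothetical helper, not an SDK function.

```python
from typing import Any, Iterator


def chunk_columns(
    columns: dict[str, list[Any]], batch_size: int
) -> Iterator[dict[str, list[Any]]]:
    """Yield column dicts of at most batch_size rows each, in order."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same length")
    total = lengths.pop() if lengths else 0
    for start in range(0, total, batch_size):
        # Slice every column with the same bounds so rows stay aligned
        yield {
            name: values[start:start + batch_size]
            for name, values in columns.items()
        }


columns = {
    "time": [1, 2, 3, 4, 5],
    "value": [0.1, 0.2, 0.3, 0.4, 0.5],
}
batches = list(chunk_columns(columns, batch_size=2))
```

Each yielded dictionary can be passed as the columns argument of a separate write_columnar() call. Because this is a generator, the validation errors surface when iteration starts rather than when the function is called.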
3. Handle Backpressure
For high-throughput scenarios, implement backpressure handling:
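import time

from arc_client.exceptions import ArcIngestionError

def write_with_backoff(client, data, max_retries=3):
    """Retry a columnar write, doubling the wait after each failed attempt."""
    for attempt in range(max_retries):
        try:
            client.write.write_columnar(**data)
            return
        except ArcIngestionError:
            if attempt == max_retries - 1:
                raise  # Out of retries: surface the last error
            time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...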
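A common variation on the loop above caps the delay and adds random jitter so that many writers do not retry in lockstep. The following is a generic sketch under those assumptions; backoff_delays and call_with_retries are hypothetical helpers, and the default values are illustrative rather than recommended settings for Arc.

```python
import random
import time


def backoff_delays(retries, base=1.0, cap=30.0, jitter=0.5, rng=random.random):
    """Return the wait before each retry: exponential, capped, with jitter.

    Each delay is min(cap, base * 2**attempt), stretched by a random
    factor between 1.0 and 1.0 + jitter.
    """
    delays = []
    for attempt in range(retries):
        delay = min(cap, base * 2 ** attempt)
        delays.append(delay * (1.0 + jitter * rng()))
    return delays


def call_with_retries(fn, retries=3, retry_on=(Exception,), sleep=time.sleep, **backoff):
    """Call fn(); on a retry_on exception, wait and try again up to `retries` times."""
    for delay in backoff_delays(retries, **backoff):
        try:
            return fn()
        except retry_on:
            sleep(delay)
    return fn()  # Final attempt: any exception propagates to the caller


# Demonstration with a stand-in function that fails twice, then succeeds
attempts = {"count": 0}


def flaky_write():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("server busy")
    return "ok"


waits = []
result = call_with_retries(
    flaky_write, retries=3, retry_on=(RuntimeError,), sleep=waits.append, jitter=0.0
)
```

In real use, fn could be lambda: client.write.write_columnar(**data) with retry_on=(ArcIngestionError,), leaving sleep at its default.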
Next Steps
- Querying - Query data and work with DataFrames
- Data Management - Retention, CQs, and deletion
- API Reference - Raw REST API documentation