Telemetry ingestion and threshold evaluation for PlaceBrain — Kafka in, TimescaleDB out, MQTT alerts sideways.
The hot path of the platform. Every device message landing in EMQX is bridged to Kafka; this service consumes it, batches it in memory, evaluates configured thresholds, writes rows to a TimescaleDB hypertable with COPY, and publishes alerts back to MQTT when something misbehaves. A small gRPC surface serves aggregated readings back to the rest of the system.
PlaceBrain is an open-source IoT platform for smart buildings. See the organization profile for the full architecture.
- Consumes `telemetry.readings` (EMQX → Kafka bridge) and `devices.*` (from devices) for threshold cache updates and cascading deletes.
- Writes `readings` (hypertable) and `alerts` into `telemetry_db`.
- Publishes `placebrain/{place_id}/alerts` on MQTT when a threshold is crossed.
- Exposed over gRPC (port 50054) for the gateway to fetch latest/aggregated readings.
- Python 3.14, uv
- gRPC + FastStream on aiokafka, `dishka-faststream` for DI in subscribers
- aiomqtt for alert publishing
- Raw asyncpg pool — no SQLAlchemy, no Repository pattern. The hot path uses `COPY` directly.
- TimescaleDB — `readings` is a hypertable with compression and retention policies
- Redis + in-memory dict for the two-tier threshold cache
All internal (no auth, Docker-network only):
| Method | Purpose |
|---|---|
| `GetLatestReadings(device_id)` | `SELECT DISTINCT ON (key) ... ORDER BY time DESC` — latest value per sensor key |
| `GetReadings(device_id, from, to, interval_seconds, keys)` | Raw rows when `interval_seconds=0` (max 2h), otherwise `time_bucket_gapfill` aggregation |
| `DeleteReadings(device_ids)` | Cascade: delete alerts, then readings |
Proto definitions in `placebrain-contracts` (`collector.proto`).
Consumer group `collector-service`:
| Topic | Event | Action |
|---|---|---|
| `telemetry.readings` | `EmqxTelemetryMessage` | Buffer → `COPY` into `readings`, evaluate thresholds, alert |
| `devices.threshold.created` | `ThresholdCreated` | Update threshold cache |
| `devices.threshold.deleted` | `ThresholdDeleted` | Drop from cache |
| `devices.device.deleted` | `DeviceDeleted` | Cascade delete readings + alerts for one device |
| `devices.device.bulk-deleted` | `DevicesBulkDeleted` | Same, but batched (e.g. when a place is deleted) |
- `TelemetryBuffer` — in-memory, async-safe (`asyncio.Lock`), bounded by `BUFFER__MAX_SIZE` (default 1000).
- `TelemetryWriter` — flushes via `asyncpg.copy_records_to_table` on the `BUFFER__FLUSH_INTERVAL` cadence (default 60s) or whenever the buffer is full.
- `ThresholdCache` — Redis is the persistent source of truth; a local `dict` is checked on every incoming message to avoid Redis round-trips.
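A minimal sketch of such a buffer, mirroring the behaviour described above (an `asyncio.Lock`, a size bound, full drain on flush); method names are illustrative rather than the service's actual API:

```python
import asyncio


class TelemetryBuffer:
    """Async-safe, bounded in-memory buffer (sketch)."""

    def __init__(self, max_size: int = 1000) -> None:
        self._items: list[tuple] = []
        self._max_size = max_size
        self._lock = asyncio.Lock()

    async def add(self, record: tuple) -> bool:
        """Append a record; return True when the buffer should be flushed."""
        async with self._lock:
            self._items.append(record)
            return len(self._items) >= self._max_size

    async def drain(self) -> list[tuple]:
        """Atomically take everything, e.g. to hand to copy_records_to_table."""
        async with self._lock:
            items, self._items = self._items, []
            return items
```

The writer task would `await buffer.drain()` on the flush interval or when `add` returns `True`, then pass the batch to a single `COPY`.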
Full stack (recommended): clone infra and run `make dev`. The compose stack builds the custom Postgres+TimescaleDB image and runs the init script that creates `telemetry_db` and enables the extension.
Service-only mode:
```bash
uv sync
cp .env.example .env  # set DATABASE__URL (TimescaleDB), MQTT__*, KAFKA__URL, REDIS__URL
uv run python -m src
```

No migrations — schema is created on startup from `src/infra/db.py`. You need a TimescaleDB instance with `CREATE EXTENSION timescaledb` available.
See `.env.example`.
```
src/
├── main.py
├── core/config.py           Pydantic Settings
├── dependencies/            Dishka providers (config, db pool, kafka, mqtt, redis, services)
├── handlers/readings.py     gRPC CollectorHandler
├── services/
│   ├── buffer.py            In-memory async-safe telemetry buffer
│   ├── writer.py            asyncpg COPY writer
│   ├── readings.py          Raw + time_bucket_gapfill queries
│   ├── threshold_cache.py   Redis + in-memory cache
│   └── alerts.py            Evaluate thresholds, write to alerts, publish to MQTT
└── infra/
    ├── db.py                Schema / hypertable / compression / retention setup
    └── broker/routes.py     KafkaRouter subscribers
```
Apache License 2.0 — see LICENSE.