gRPC service for IoT devices, sensors, actuators and thresholds — the device plane of PlaceBrain.
Owns the model of physical devices inside a place: a Device has many Sensors and Actuators; a sensor can have SensorThresholds that trigger alerts. Also responsible for MQTT authentication: every device and every web user is authenticated by this service through EMQX webhooks.
PlaceBrain is an open-source IoT platform for smart buildings. See the organization profile for the full architecture.
- Called over gRPC by gateway for device/sensor/actuator/threshold CRUD and command dispatch.
- Called internally by EMQX (through gateway's `/api/internal/mqtt/*` webhooks) for `auth` and `acl` decisions.
- Maintains a role cache in Redis, fed by Kafka events from places, so it never has to make a synchronous gRPC call to authorize a request.
- Publishes its own events to Kafka — collector consumes them to drop readings/alerts and update threshold caches.
- Python 3.14, uv
- gRPC + FastStream on aiokafka, with `dishka-faststream` for DI in subscribers
- aiomqtt for command publishing
- Redis (`redis-py`) for role cache + short-lived MQTT user credentials
- Dishka DI, SQLAlchemy 2.0 async + asyncpg, Alembic migrations
- Async bcrypt via `loop.run_in_executor()` (MQTT tokens and user passwords)
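
The `loop.run_in_executor()` pattern mentioned above can be sketched as follows. This is a minimal illustration, not the service's code: to stay runnable with the standard library only, `hashlib.pbkdf2_hmac` stands in for bcrypt, and the function names `hash_secret`, `hash_secret_async` and `verify_secret_async` are hypothetical.

```python
import asyncio
import hashlib
import hmac


def hash_secret(secret: str, salt: bytes) -> bytes:
    # Stand-in for bcrypt (assumption): a deliberately slow, CPU-bound hash.
    return hashlib.pbkdf2_hmac("sha256", secret.encode(), salt, 100_000)


async def hash_secret_async(secret: str, salt: bytes) -> bytes:
    # Offload the blocking hash to the default thread pool so the
    # event loop keeps serving other requests in the meantime.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, hash_secret, secret, salt)


async def verify_secret_async(secret: str, salt: bytes, expected: bytes) -> bool:
    candidate = await hash_secret_async(secret, salt)
    # Constant-time comparison of the two digests.
    return hmac.compare_digest(candidate, expected)
```

Passing `None` as the executor uses the loop's default thread pool; a dedicated executor is an option if hashing load needs to be isolated from other blocking work.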
Read: ListDevices, GetDevice, GetSensors, GetActuators, GetSensorThresholds, GetLatestDeviceReadings
Write: CreateDevice, UpdateDevice, DeleteDevice, CreateSensor, UpdateSensor, DeleteSensor, CreateActuator, UpdateActuator, DeleteActuator, SetSensorThreshold, DeleteSensorThreshold, SendCommand, RegenerateMqttToken
Internal (no auth, Docker-network only): GetAllThresholds, UpdateDeviceStatus, InvalidateMqttCredentials, DeleteDevicesByPlace
Proto definitions live in placebrain-contracts (devices.proto).
Subscribers live in src/infra/broker/routes.py using a KafkaRouter and FromDishka[].
Consumes (consumer group devices-service):
| Topic | Event | Action |
|---|---|---|
| `places.member.added` | `MemberAdded` | Update role cache |
| `places.member.removed` | `MemberRemoved` | Remove role + invalidate MQTT credentials |
| `places.member.role-changed` | `MemberRoleChanged` | Update role cache |
| `places.place.deleted` | `PlaceDeleted` | Drop roles, delete devices, emit `DevicesBulkDeleted` via publisher chain |
| `telemetry.status` | `EmqxStatusMessage` | Device online/offline transitions |
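
To illustrate how the membership events above could keep a role cache current, here is a minimal sketch. It covers only the role-cache side of each action (not credential invalidation or device deletion), uses a plain dict in place of Redis, and assumes hypothetical payload fields `place_id`, `user_id` and `role`; the real event schemas are not shown in this README.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class RoleCache:
    # (place_id, user_id) -> role; a dict stands in for Redis (assumption).
    roles: Dict[Tuple[str, str], str] = field(default_factory=dict)

    def apply(self, topic: str, event: dict) -> None:
        place_id = event["place_id"]
        if topic in ("places.member.added", "places.member.role-changed"):
            # Both events carry the member's current role.
            self.roles[(place_id, event["user_id"])] = event["role"]
        elif topic == "places.member.removed":
            self.roles.pop((place_id, event["user_id"]), None)
        elif topic == "places.place.deleted":
            # Drop every role that belongs to the deleted place.
            self.roles = {k: v for k, v in self.roles.items() if k[0] != place_id}

    def role_of(self, place_id: str, user_id: str) -> Optional[str]:
        return self.roles.get((place_id, user_id))
```

Because authorization reads only from this local view, a lookup such as `cache.role_of("p1", "u1")` needs no call to the places service.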
Produces:
| Topic | Event | Trigger |
|---|---|---|
| `devices.device.deleted` | `DeviceDeleted` | `DeleteDevice` |
| `devices.device.bulk-deleted` | `DevicesBulkDeleted` | publisher chain from `on_place_deleted` |
| `devices.threshold.created` | `ThresholdCreated` | `SetSensorThreshold` |
| `devices.threshold.deleted` | `ThresholdDeleted` | `DeleteSensorThreshold` |
EMQX calls the gateway's internal webhooks, which relay to this service.
- Device usernames: `device:{device_id}`; the password is a persistent token (bcrypt-hashed in Postgres).
- User usernames: `user:{user_id}`; short-lived credentials minted on demand and cached in Redis (`mqtt:cred:user:{user_id}`, TTL 24h; invalidated by `MemberRemoved` / `PlaceDeleted`).
- `collector`: internal username used by the collector service.
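
The username scheme above can be sketched as a small parser. This is an illustrative sketch only: `MqttIdentity`, `parse_username` and `user_cred_key` are hypothetical names, and the real service's validation rules may be stricter.

```python
from typing import NamedTuple, Optional


class MqttIdentity(NamedTuple):
    kind: str                  # "device", "user" or "collector"
    subject_id: Optional[str]  # None for the internal collector account


def parse_username(username: str) -> Optional[MqttIdentity]:
    """Classify an MQTT username; return None for anything unrecognized."""
    if username == "collector":
        return MqttIdentity("collector", None)
    kind, sep, subject_id = username.partition(":")
    if sep and kind in ("device", "user") and subject_id:
        return MqttIdentity(kind, subject_id)
    return None


# 24h TTL for cached user credentials, as stated above.
USER_CRED_TTL_SECONDS = 24 * 60 * 60


def user_cred_key(user_id: str) -> str:
    # Redis key format taken from the list above.
    return f"mqtt:cred:user:{user_id}"
```

Rejecting unknown prefixes up front keeps the auth path to a single branch per identity kind: a token check for devices, a cached-credential lookup for users.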
Full stack (recommended): clone infra and run make dev.
Service-only mode:
uv sync
cp .env.example .env # set DATABASE__URL, REDIS__URL, KAFKA__URL, MQTT__URL
uv run alembic upgrade head
uv run python -m src

Requires PostgreSQL (`devices_db`), Redis, Kafka and an MQTT broker reachable at the configured URLs.
See .env.example.
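
As a rough illustration of the four required variables, the sketch below validates them and groups them by section. It assumes the double underscore is a nesting delimiter (as with the `env_nested_delimiter` option of pydantic-settings); the service's actual settings loader is not shown here, and `load_settings` is a hypothetical helper.

```python
from typing import Dict, Mapping

# Variable names taken from the setup instructions above.
REQUIRED = ("DATABASE__URL", "REDIS__URL", "KAFKA__URL", "MQTT__URL")


def load_settings(env: Mapping[str, str]) -> Dict[str, Dict[str, str]]:
    """Fail fast on missing variables, then nest values by section."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    settings: Dict[str, Dict[str, str]] = {}
    for name in REQUIRED:
        # "DATABASE__URL" -> section "database", key "url"
        section, key = name.lower().split("__", 1)
        settings.setdefault(section, {})[key] = env[name]
    return settings
```

Calling `load_settings(os.environ)` at startup (after `import os`) would surface a misconfigured `.env` before any connection is attempted.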
src/
├── main.py
├── core/ Settings, shared authorization helpers, typed exceptions, role constants
├── dependencies/ Dishka providers (config, db/uow, kafka, mqtt, redis, services, mqtt_auth)
├── handlers/devices.py gRPC DevicesHandler (all methods)
├── services/
│ ├── devices.py CRUD devices, status, token
│ ├── sensors.py CRUD sensors, thresholds (publishes to Kafka)
│ ├── actuators.py CRUD actuators
│ ├── commands.py MQTT command dispatch
│ ├── mqtt_auth.py EMQX auth + ACL
│ └── role_cache.py Redis + in-memory role cache
└── infra/
    ├── db/ Models (Device, Sensor, Actuator, SensorThreshold), repositories, UoW
    └── broker/routes.py KafkaRouter subscribers with DI
Apache License 2.0 — see LICENSE.