Summary
backend/devices/src/infra/broker/routes.py:30-96 processes events without deduplication. At-least-once delivery from PlaceBrain/places#8 / outbox will cause duplicates; today on_place_deleted runs a cascade that is not safe to repeat.
Changes
- New table
processed_events(event_id UUID PK, consumer TEXT, processed_at TIMESTAMPTZ).
- Each
BaseEvent carries event_id (add to placebrain-contracts if missing).
- Wrap every consumer handler:
async with uow.begin():
if await uow.processed_events.exists(event_id, consumer_name):
return
await handle(payload)
await uow.processed_events.insert(event_id, consumer_name)
- Applies to consumers on
places.member.*, places.place.deleted, telemetry.status, auth.user.deleted.
Verification
- Re-deliver the same
PlaceDeleted event twice (e.g. reset consumer group offset) — devices are deleted once, logs show one cascade, one DevicesBulkDeleted publish.
Summary
backend/devices/src/infra/broker/routes.py:30-96processes events without deduplication. At-least-once delivery from PlaceBrain/places#8 / outbox will cause duplicates; todayon_place_deletedruns a cascade that is not safe to repeat.Changes
processed_events(event_id UUID PK, consumer TEXT, processed_at TIMESTAMPTZ).BaseEventcarriesevent_id(add toplacebrain-contractsif missing).places.member.*,places.place.deleted,telemetry.status,auth.user.deleted.Verification
PlaceDeletedevent twice (e.g. reset consumer group offset) — devices are deleted once, logs show one cascade, oneDevicesBulkDeletedpublish.