feat(reliability): idempotent Kafka consumers with processed-events table #13

@MarkCesium

Description

Summary

The consumers in backend/devices/src/infra/broker/routes.py:30-96 process events without deduplication. With at-least-once delivery from the outbox introduced in PlaceBrain/places#8, duplicate deliveries are expected; today on_place_deleted runs a delete cascade that is not safe to repeat.

Changes

  • New table processed_events(event_id UUID PK, consumer TEXT, processed_at TIMESTAMPTZ).
  • Each BaseEvent carries event_id (add to placebrain-contracts if missing).
  • Wrap every consumer handler:
    async with uow.begin():
        if await uow.processed_events.exists(event_id, consumer_name):
            return
        await handle(payload)
        await uow.processed_events.insert(event_id, consumer_name)
  • Applies to consumers on places.member.*, places.place.deleted, telemetry.status, auth.user.deleted.
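The wrapper above can be sketched end to end with an in-memory unit of work. `ProcessedEventsRepo`, `InMemoryUoW`, and the handler below are hypothetical stand-ins for the real repository and UoW, not the project's actual classes; the real `begin()` would open a DB transaction so the handler's writes and the dedup insert commit atomically:

```python
import asyncio
import uuid

class ProcessedEventsRepo:
    """In-memory stand-in for the processed_events table (hypothetical)."""
    def __init__(self):
        self._rows = set()  # {(event_id, consumer), ...}

    async def exists(self, event_id, consumer):
        return (event_id, consumer) in self._rows

    async def insert(self, event_id, consumer):
        self._rows.add((event_id, consumer))

class InMemoryUoW:
    """Hypothetical unit of work; a real one wraps a DB transaction."""
    def __init__(self):
        self.processed_events = ProcessedEventsRepo()

    def begin(self):
        return self  # real impl returns a transaction context manager

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        return False  # real impl commits here (or rolls back on error)

async def idempotent(uow, consumer_name, event_id, handle, payload):
    # Skip already-processed events; otherwise handle and record in one tx.
    async with uow.begin():
        if await uow.processed_events.exists(event_id, consumer_name):
            return False
        await handle(payload)
        await uow.processed_events.insert(event_id, consumer_name)
        return True

async def main():
    uow = InMemoryUoW()
    calls = []

    async def on_place_deleted(payload):
        calls.append(payload)  # stands in for the delete cascade

    eid = uuid.uuid4()
    # Deliver the same event twice, as at-least-once delivery may.
    first = await idempotent(uow, "devices", eid, on_place_deleted, {"place_id": 1})
    second = await idempotent(uow, "devices", eid, on_place_deleted, {"place_id": 1})
    print(first, second, len(calls))  # True False 1

asyncio.run(main())
```

Because the dedup insert rides in the same transaction as the handler's writes, a crash between `handle()` and the insert rolls both back, and the retry reprocesses cleanly.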

Verification

  • Re-deliver the same PlaceDeleted event twice (e.g. by resetting the consumer-group offset) — devices are deleted once, and logs show one cascade and one DevicesBulkDeleted publish.
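Beyond replaying offsets, the table-level guarantee can be checked in isolation: the primary key on event_id makes a second insert a no-op even if two consumer instances race past the `exists()` check. A minimal sketch using sqlite as a stand-in for Postgres (INSERT OR IGNORE playing the role of ON CONFLICT DO NOTHING; the event ids are hypothetical):

```python
import sqlite3

# Schema mirrors the PR's processed_events table; sqlite stands in for
# Postgres, so TIMESTAMPTZ becomes TEXT here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE processed_events ("
    " event_id TEXT PRIMARY KEY,"
    " consumer TEXT,"
    " processed_at TEXT DEFAULT CURRENT_TIMESTAMP)"
)

def mark_processed(event_id, consumer):
    # INSERT OR IGNORE ~ Postgres ON CONFLICT DO NOTHING: rowcount is 1
    # only for the first delivery, so the caller runs the handler once.
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_events (event_id, consumer)"
        " VALUES (?, ?)",
        (event_id, consumer),
    )
    return cur.rowcount == 1

print(mark_processed("evt-1", "devices"))  # True  (first delivery)
print(mark_processed("evt-1", "devices"))  # False (duplicate ignored)
```

Note that with the PK on event_id alone (as in the proposed schema), the constraint dedupes across all consumers, not per consumer; that matches the wrapper's per-consumer `exists()` check only if each event id is consumed by one service.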
