Skip to content

feat sqs broker#2913

Draft
vvlrff wants to merge 14 commits into
ag2ai:mainfrom
vvlrff:feat/sqs-broker
Draft

feat sqs broker#2913
vvlrff wants to merge 14 commits into
ag2ai:mainfrom
vvlrff:feat/sqs-broker

Conversation

@vvlrff

@vvlrff vvlrff commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

No description provided.

vvlrff and others added 13 commits June 5, 2026 19:12
Implement the SQS broker for FastStream, providing support for standard
and FIFO queues, manual acknowledgement, RPC (request/response), and
integration with FastAPI.

- Add `SQSBroker` and `SQSRouter`
- Implement `SQSQueue` and `FifoQueue` schemas for queue declaration
- Support SQS-specific features: FIFO, redrive policies, and visibility
  timeouts
- Add `SQSMessage` with `ack`, `nack`, and `reject` methods
- Implement RPC pattern using a dedicated response queue
- Add OpenTelemetry and Prometheus middleware support
- Include comprehensive test suite with `TestSQSBroker` in-memory double
- Add documentation for SQS usage and configuration
Add LocalStack to docker-compose.yaml and documentation to enable
local AWS service emulation, specifically for testing the new SQS
broker integration. Includes a healthcheck to ensure SQS availability.
Refactor the SQS implementation to use a centralized configuration module,
improve queue declaration logic, and enhance testing coverage.

- Move `SQSBrokerConfig` to `faststream.sqs.configs`
- Update `SQSSubscriber` to support automatic queue declaration via `SQSQueue`
- Implement prefix support in `SQSPublisher`
- Update `docker-compose.yaml` to rename `localstack` service to `sqs`
- Add comprehensive test suite for SQS brokers, including:
    - Consumption and publishing
    - Router and middleware integration
    - Request/Response patterns
    - Test client functionality
- Fix `SQSRegistrator` to raise `SetupError` instead of `TypeError`
Introduce support for batch message processing, improved SQS-specific
exception handling, and stricter validation for producer constraints.

- Add `SQSBatchMessage` and `batch=True` support for subscribers
- Implement producer guards for SQS limits (message size, attribute count,
  and FIFO requirements)
- Add specialized SQS exceptions: `BatchSendError`, `FifoQueueError`,
  `MessageTooLargeError`, and `TooManyMessageAttributesError`
- Enhance `SQSMessage` with system attribute access (e.g., `group_id`,
  `sequence_number`)
- Improve `SQSParser` to handle binary attributes and empty bodies correctly
- Add robust error handling and exponential backoff for response queue
  consumption in `SQSBroker`
- Update `SQSQueue` schema to support tags and KMS configuration options
Add new GitHub Actions jobs to validate SQS functionality:
- `test-sqs-smoke`: runs unit tests with mocked SQS
- `test-sqs-real`: runs integration tests using LocalStack

Includes coverage reporting and artifact uploading for SQS tests.
Add extensive documentation and test coverage for the SQS broker, including:

- New documentation files for SQS features: acknowledgement policies, FIFO queues, publishing, RPC, security, and subscriptions.
- Documentation source files for generating examples.
- Comprehensive AsyncAPI schema tests for SQS (v2.6.0 and v3.0.0).
- Integration tests for SQS with FastAPI, OpenTelemetry, and Prometheus.
- Tests for SQS-specific middleware and telemetry providers.
- Tests for SQS broker functionality including manual acknowledgement, DLQs, and batch publishing.
Add support for the `ReceiveRequestAttemptId` parameter in SQS subscribers.
This allows users to provide a deduplication token during `ReceiveMessage`
calls, which is useful for retrying failed receives in FIFO queues to
ensure the same batch of messages is returned.

- Implement `request_attempt_id` in `SQSRegistrator`, `SQSRouter`, and
  `SQSSubscriber`.
- Add validation to ensure `request_attempt_id` is only used with FIFO
  queues, raising a `SetupError` otherwise.
- Update documentation and add integration tests for the new feature.
- restore non-UTF-8 publishing: bodies are base64-encoded with a reserved
  `base64-body` attribute instead of crashing with UnicodeDecodeError
- make `system_attributes` an instance attribute (was a shared class-level
  dict) and populate it for batch messages (+ per-message
  `batch_system_attributes`); `decode_batch` reuses the parses
- `get_one`: build kwargs via `_receive_kwargs()` (restores
  VisibilityTimeout) and keep WaitTimeSeconds strictly below the client
  deadline so near-deadline messages are not lost into the visibility window
- share one poll-with-backoff helper between the consume loop, `__aiter__`
  (was a silent busy-spin via `suppress`) and the RPC response loop (which
  now also deletes replies with one `delete_message_batch`)
- `ping()` honors its `timeout` via `anyio.move_on_after`
- FastAPI `SQSRouter.subscriber` exposes the `batch` parameter
- batch-aware Prometheus/OTel providers (size in bytes + real message count)
- `publish_batch` honors user codecs implementing `BatchCodecProto`
- default `wait_time_seconds` 5 -> 20 (AWS long-polling guidance)
- test cleanup: FakeProducer reuses the parent constructor, fake connect
  goes through `SQSBrokerConfig.connect`, ElasticMQ settings deduplicated

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Move every concrete TestBroker's EnterType-binding overloads under
`if TYPE_CHECKING:` so no runtime `__init__` frames are added (mypy cannot
infer a subclass type argument from base-class overloads, so per-subclass
copies stay, but as pure type stubs). This lets the AST-based
`connect_only` detection go back to the deterministic
`extract_stack()[-3]` lookup, fixing misdetection when users open
`with TestBroker(...)` inside their own `__init__`. The multi-broker
`__aenter__` now returns a tuple, matching the declared types, and the
try-it-out factory no longer needs its `cast` workaround.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The runtime dependency is `aiobotocore>=2.13,<3`, but the lint group pulled
3.x stubs, so mypy validated faststream/sqs against a different major API
than the one installed.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@github-actions github-actions Bot added documentation Improvements or additions to documentation dependencies Pull requests that update a dependency file github_actions Pull requests that update GitHub Actions code Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features MQTT Issues related to `faststream.mqtt` module labels Jun 10, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module dependencies Pull requests that update a dependency file documentation Improvements or additions to documentation github_actions Pull requests that update GitHub Actions code MQTT Issues related to `faststream.mqtt` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant