feat sqs broker#2913
Draft
vvlrff wants to merge 14 commits into
Draft
Conversation
Implement the SQS broker for FastStream, providing support for standard and FIFO queues, manual acknowledgement, RPC (request/response), and integration with FastAPI. - Add `SQSBroker` and `SQSRouter` - Implement `SQSQueue` and `FifoQueue` schemas for queue declaration - Support SQS-specific features: FIFO, redrive policies, and visibility timeouts - Add `SQSMessage` with `ack`, `nack`, and `reject` methods - Implement RPC pattern using a dedicated response queue - Add OpenTelemetry and Prometheus middleware support - Include comprehensive test suite with `TestSQSBroker` in-memory double - Add documentation for SQS usage and configuration
Add LocalStack to docker-compose.yaml and documentation to enable local AWS service emulation, specifically for testing the new SQS broker integration. Includes a healthcheck to ensure SQS availability.
Refactor the SQS implementation to use a centralized configuration module,
improve queue declaration logic, and enhance testing coverage.
- Move `SQSBrokerConfig` to `faststream.sqs.configs`
- Update `SQSSubscriber` to support automatic queue declaration via `SQSQueue`
- Implement prefix support in `SQSPublisher`
- Update `docker-compose.yaml` to rename `localstack` service to `sqs`
- Add comprehensive test suite for SQS brokers, including:
- Consumption and publishing
- Router and middleware integration
- Request/Response patterns
- Test client functionality
- Fix `SQSRegistrator` to raise `SetupError` instead of `TypeError`
Introduce support for batch message processing, improved SQS-specific exception handling, and stricter validation for producer constraints. - Add `SQSBatchMessage` and `batch=True` support for subscribers - Implement producer guards for SQS limits (message size, attribute count, and FIFO requirements) - Add specialized SQS exceptions: `BatchSendError`, `FifoQueueError`, `MessageTooLargeError`, and `TooManyMessageAttributesError` - Enhance `SQSMessage` with system attribute access (e.g., `group_id`, `sequence_number`) - Improve `SQSParser` to handle binary attributes and empty bodies correctly - Add robust error handling and exponential backoff for response queue consumption in `SQSBroker` - Update `SQSQueue` schema to support tags and KMS configuration options
Add new GitHub Actions jobs to validate SQS functionality: - `test-sqs-smoke`: runs unit tests with mocked SQS - `test-sqs-real`: runs integration tests using LocalStack Includes coverage reporting and artifact uploading for SQS tests.
Add extensive documentation and test coverage for the SQS broker, including: - New documentation files for SQS features: acknowledgement policies, FIFO queues, publishing, RPC, security, and subscriptions. - Documentation source files for generating examples. - Comprehensive AsyncAPI schema tests for SQS (v2.6.0 and v3.0.0). - Integration tests for SQS with FastAPI, OpenTelemetry, and Prometheus. - Tests for SQS-specific middleware and telemetry providers. - Tests for SQS broker functionality including manual acknowledgement, DLQs, and batch publishing.
Add support for the `ReceiveRequestAttemptId` parameter in SQS subscribers. This allows users to provide a deduplication token during `ReceiveMessage` calls, which is useful for retrying failed receives in FIFO queues to ensure the same batch of messages is returned. - Implement `request_attempt_id` in `SQSRegistrator`, `SQSRouter`, and `SQSSubscriber`. - Add validation to ensure `request_attempt_id` is only used with FIFO queues, raising a `SetupError` otherwise. - Update documentation and add integration tests for the new feature.
- restore non-UTF-8 publishing: bodies are base64-encoded with a reserved `base64-body` attribute instead of crashing with UnicodeDecodeError - make `system_attributes` an instance attribute (was a shared class-level dict) and populate it for batch messages (+ per-message `batch_system_attributes`); `decode_batch` reuses the parses - `get_one`: build kwargs via `_receive_kwargs()` (restores VisibilityTimeout) and keep WaitTimeSeconds strictly below the client deadline so near-deadline messages are not lost into the visibility window - share one poll-with-backoff helper between the consume loop, `__aiter__` (was a silent busy-spin via `suppress`) and the RPC response loop (which now also deletes replies with one `delete_message_batch`) - `ping()` honors its `timeout` via `anyio.move_on_after` - FastAPI `SQSRouter.subscriber` exposes the `batch` parameter - batch-aware Prometheus/OTel providers (size in bytes + real message count) - `publish_batch` honors user codecs implementing `BatchCodecProto` - default `wait_time_seconds` 5 -> 20 (AWS long-polling guidance) - test cleanup: FakeProducer reuses the parent constructor, fake connect goes through `SQSBrokerConfig.connect`, ElasticMQ settings deduplicated Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Move every concrete TestBroker's EnterType-binding overloads under `if TYPE_CHECKING:` so no runtime `__init__` frames are added (mypy cannot infer a subclass type argument from base-class overloads, so per-subclass copies stay, but as pure type stubs). This lets the AST-based `connect_only` detection go back to the deterministic `extract_stack()[-3]` lookup, fixing misdetection when users open `with TestBroker(...)` inside their own `__init__`. The multi-broker `__aenter__` now returns a tuple, matching the declared types, and the try-it-out factory no longer needs its `cast` workaround. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The runtime dependency is `aiobotocore>=2.13,<3`, but the lint group pulled 3.x stubs, so mypy validated faststream/sqs against a different major API than the one installed. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.