Skip to content

feat(codec): add destination parameter to encode#2902

Open
ce1ebrimbor wants to merge 5 commits into
ag2ai:mainfrom
ce1ebrimbor:feat/codec-destination
Open

feat(codec): add destination parameter to encode#2902
ce1ebrimbor wants to merge 5 commits into
ag2ai:mainfrom
ce1ebrimbor:feat/codec-destination

Conversation

@ce1ebrimbor

@ce1ebrimbor ce1ebrimbor commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

Description

Add optional destination: str = "" to CodecProto.encode and BatchCodecProto.encode_batch, wired through all producers and test fakes. Enables destination-aware codecs (e.g. Schema Registry topic→schema resolution) without breaking existing codecs.

WIP #2837

Type of change

Please delete options that are not relevant.

  • New feature (a non-breaking change that adds functionality)

Checklist

  • My code adheres to the style guidelines of this project (just lint shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running just test-coverage
  • I have ensured that static analysis tests are passing by running just static-analysis
  • I have included code examples to illustrate the modifications

@github-actions github-actions Bot added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module NATS Issues related to `faststream.nats` module and NATS broker features MQTT Issues related to `faststream.mqtt` module labels Jun 3, 2026
@ce1ebrimbor ce1ebrimbor force-pushed the feat/codec-destination branch from d20c610 to 6a33f03 Compare June 3, 2026 10:23
@github-actions github-actions Bot added the documentation Improvements or additions to documentation label Jun 3, 2026
@Lancetnik

Copy link
Copy Markdown
Member

GzipCodec is not prefect example because you can achieve the same behavior using middleware - https://github.com/ulbwa/faststream-compressors

Comment thread faststream/confluent/publisher/producer.py Outdated
@ce1ebrimbor

Copy link
Copy Markdown
Contributor Author

GzipCodec is not prefect example because you can achieve the same behavior using middleware - https://github.com/ulbwa/faststream-compressors

I will add a schema registry example, I think this will make more sense.

@ce1ebrimbor ce1ebrimbor force-pushed the feat/codec-destination branch from 6a33f03 to 3e0d1d1 Compare June 4, 2026 19:14
@github-actions github-actions Bot added the Redis Issues related to `faststream.redis` module and Redis features label Jun 4, 2026
@ce1ebrimbor ce1ebrimbor force-pushed the feat/codec-destination branch 4 times, most recently from 9b45655 to 75cc013 Compare June 4, 2026 19:38
@ce1ebrimbor

Copy link
Copy Markdown
Contributor Author

I will add a schema registry example, I think this will make more sense.

@Lancetnik I have added a Schema Registry example I have previously used.
I will need some help for other brokers.

I won't be available next week, if there are new suggestions I will be able to add them after.

@ce1ebrimbor ce1ebrimbor force-pushed the feat/codec-destination branch 2 times, most recently from 9a14d12 to 1239f71 Compare June 4, 2026 19:45
@ce1ebrimbor

ce1ebrimbor commented Jun 4, 2026

Copy link
Copy Markdown
Contributor Author

@Lancetnik I am not comforable with the fact that Rabbit and Redis still need a destination parameter
If we want to keep the consistency we should reworkvthose, your call.
Otherwise let's keep this PR as wip. 🙏

)
encoded_batch = await self.codec.encode_batch(cmd, self.serializer)
else:
from faststream.response.response import PublishCommand as _BaseCmd

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to use function-levels here (or anywhere)

from faststream.confluent.subscriber.usecase import BatchSubscriber
from faststream.exceptions import SubscriberNotFound
from faststream.message import gen_cor_id
from faststream.response.response import PublishCommand as _BasePublishCommand

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous file it was imported as _BaseCmd - we should use same naming for same cases

codec: Optional["CodecProto"] = None,
) -> MockConfluentMessage:
"""Build a mock confluent_kafka.Message for a sendable message."""
from faststream.response.publish_type import PublishType

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function-level imports again

) -> "ConsumerRecord":
"""Build a Kafka ConsumerRecord for a sendable message."""
msg, content_type = await (codec or DefaultCodec()).encode(message, serializer)
from faststream.response.publish_type import PublishType

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And again

@Lancetnik Lancetnik left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should unify behavior and signature for all broker codecs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module documentation Improvements or additions to documentation MQTT Issues related to `faststream.mqtt` module NATS Issues related to `faststream.nats` module and NATS broker features Redis Issues related to `faststream.redis` module and Redis features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants