feat(codec): add destination parameter to encode#2902
Conversation
d20c610 to
6a33f03
Compare
|
|
I will add a schema registry example, I think this will make more sense. |
6a33f03 to
3e0d1d1
Compare
9b45655 to
75cc013
Compare
@Lancetnik I have added a Schema Registry example I have previously used. I won't be available next week, if there are new suggestions I will be able to add them after. |
9a14d12 to
1239f71
Compare
|
@Lancetnik I am not comforable with the fact that Rabbit and Redis still need a destination parameter |
| ) | ||
| encoded_batch = await self.codec.encode_batch(cmd, self.serializer) | ||
| else: | ||
| from faststream.response.response import PublishCommand as _BaseCmd |
There was a problem hiding this comment.
I don't see a reason to use function-levels here (or anywhere)
| from faststream.confluent.subscriber.usecase import BatchSubscriber | ||
| from faststream.exceptions import SubscriberNotFound | ||
| from faststream.message import gen_cor_id | ||
| from faststream.response.response import PublishCommand as _BasePublishCommand |
There was a problem hiding this comment.
Previous file it was imported as _BaseCmd - we should use same naming for same cases
| codec: Optional["CodecProto"] = None, | ||
| ) -> MockConfluentMessage: | ||
| """Build a mock confluent_kafka.Message for a sendable message.""" | ||
| from faststream.response.publish_type import PublishType |
| ) -> "ConsumerRecord": | ||
| """Build a Kafka ConsumerRecord for a sendable message.""" | ||
| msg, content_type = await (codec or DefaultCodec()).encode(message, serializer) | ||
| from faststream.response.publish_type import PublishType |
Lancetnik
left a comment
There was a problem hiding this comment.
We should unify behavior and signature for all broker codecs
Description
Add optional destination:
str = ""toCodecProto.encodeandBatchCodecProto.encode_batch, wired through all producers and test fakes. Enables destination-aware codecs (e.g. Schema Registry topic→schema resolution) without breaking existing codecs.WIP #2837
Type of change
Please delete options that are not relevant.
Checklist
just lintshows no errors)just test-coveragejust static-analysis