-
Notifications
You must be signed in to change notification settings - Fork 302
feat(broker): Enables message keys for batch publishing #2586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(broker): Enables message keys for batch publishing #2586
Conversation
Lancetnik
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, but magic {"message": ..., "key": ... } doesn't look as a good API.
One reason - we may want to add more additional information to each method instead of just key (headers, publish timestamp, etc)
This reason we have a feature already - Response (or KafkaResponse)
@broker.subscriber(...)
@broker.publisher(...)
async def handler():
return Response("body", key=b"key")This object allows to setup any information of outgoing message you want
So, I suggest to support smth like this Response object - Message (we can discuss naming)
await broker.publish_batch(KafkaMessage("body", key=b"1"), KafkaMessage("body", key=b"2"), "just a body message")Probably, we can reuse the KafkaMessage class we have already. Also, please take a look at KafkaPublishCommand class
I agree, I didn't like it either. I fixed it. |
49cb334 to
96a7dde
Compare
8274577 to
07762e9
Compare
07762e9 to
29a49b6
Compare
Description
This PR adds support for per-message keys in Kafka/Confluent batch publishing operations. Previously, batch publishers didn't support keys at all. Now users can specify keys in three ways:
KafkaPublishMessage(new alias forKafkaResponse)publish()parameterCode Examples
1. Per-message keys using KafkaPublishMessage
2. Single key for entire batch
3. Default key from factory config
Backward Compatibility
Fully backward compatible - all existing code continues to work:
keyparameter is optional (defaults toNone)Fixes #2514
Type of change
Please delete options that are not relevant.
Checklist
just lintshows no errors)just test-coveragejust static-analysis