Skip to content

Conversation

@ozeranskii
Copy link
Contributor

@ozeranskii ozeranskii commented Oct 15, 2025

Description

This PR adds support for per-message keys in Kafka/Confluent batch publishing operations. Previously, batch publishers didn't support keys at all. Now users can specify keys in three ways:

  1. Per-message keys using KafkaPublishMessage (new alias for KafkaResponse)
  2. Single key for entire batch via publish() parameter
  3. Default key from publisher config (factory-level configuration)

Code Examples

1. Per-message keys using KafkaPublishMessage

from faststream.kafka import KafkaBroker, KafkaPublishMessage

broker = KafkaBroker()

@broker.subscriber("input")
async def handler():
    await broker.publish_batch(
        KafkaPublishMessage("user:1", key=b"user1"),
        KafkaPublishMessage("user:2", key=b"user2"),
        "user:3",  # Uses default key (None)
        topic="output"
    )

2. Single key for entire batch

publisher = broker.publisher("topic", batch=True)

await publisher.publish(
    "message1",
    "message2",
    "message3",
    key=b"partition_key"  # All messages get same key
)

3. Default key from factory config

publisher = broker.publisher(
    "topic",
    batch=True,
    key=b"default_key"  # Applied to all messages by default
)

await publisher.publish("msg1", "msg2")  # Both get default_key

# Override default for specific messages
await publisher.publish(
    KafkaPublishMessage("msg1", key=b"override"),
    "msg2"  # Uses default_key
)

Backward Compatibility

Fully backward compatible - all existing code continues to work:

  • key parameter is optional (defaults to None)
  • Existing batch publishers without keys work unchanged

Fixes #2514

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (just lint shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running just test-coverage
  • I have ensured that static analysis tests are passing by running just static-analysis
  • I have included code examples to illustrate the modifications

@ozeranskii ozeranskii requested a review from Lancetnik as a code owner October 15, 2025 10:55
@github-actions github-actions bot added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module labels Oct 15, 2025
Copy link
Member

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but magic {"message": ..., "key": ... } doesn't look as a good API.
One reason - we may want to add more additional information to each method instead of just key (headers, publish timestamp, etc)

This reason we have a feature already - Response (or KafkaResponse)

@broker.subscriber(...)
@broker.publisher(...)
async def handler():
     return Response("body", key=b"key")

This object allows to setup any information of outgoing message you want

So, I suggest to support smth like this Response object - Message (we can discuss naming)

await broker.publish_batch(KafkaMessage("body", key=b"1"), KafkaMessage("body", key=b"2"), "just a body message")

Probably, we can reuse the KafkaMessage class we have already. Also, please take a look at KafkaPublishCommand class

@ozeranskii ozeranskii requested a review from Lancetnik October 30, 2025 00:01
@ozeranskii
Copy link
Contributor Author

Sorry, but magic {"message": ..., "key": ... } doesn't look as a good API. One reason - we may want to add more additional information to each method instead of just key (headers, publish timestamp, etc)

This reason we have a feature already - Response (or KafkaResponse)

@broker.subscriber(...)
@broker.publisher(...)
async def handler():
     return Response("body", key=b"key")

This object allows to setup any information of outgoing message you want

So, I suggest to support smth like this Response object - Message (we can discuss naming)

await broker.publish_batch(KafkaMessage("body", key=b"1"), KafkaMessage("body", key=b"2"), "just a body message")

Probably, we can reuse the KafkaMessage class we have already. Also, please take a look at KafkaPublishCommand class

I agree, I didn't like it either. I fixed it.

@ozeranskii ozeranskii force-pushed the 2514-set-the-key-for-batch-producing branch 4 times, most recently from 49cb334 to 96a7dde Compare November 2, 2025 19:36
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Nov 2, 2025
@ozeranskii ozeranskii force-pushed the 2514-set-the-key-for-batch-producing branch from 8274577 to 07762e9 Compare November 2, 2025 19:39
@ozeranskii ozeranskii force-pushed the 2514-set-the-key-for-batch-producing branch from 07762e9 to 29a49b6 Compare November 2, 2025 19:41
@Lancetnik Lancetnik added this pull request to the merge queue Nov 11, 2025
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Nov 11, 2025
@Lancetnik Lancetnik enabled auto-merge November 11, 2025 09:05
@Lancetnik Lancetnik added this pull request to the merge queue Nov 11, 2025
Merged via the queue into ag2ai:main with commit cba7f34 Nov 11, 2025
29 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Set the key for batch producing

3 participants