Commit 49cb334

feat(broker): Enables message keys for batch publishing
Allows specifying a message key when publishing a batch of messages, enabling partition control. Introduces the option to set a default key for a batch publisher and override it per message if required.

Closes #2514

refactor(confluent): Removes unused blank line

feat(publisher): enable per-message keys for batch publishing

Add support for supplying per-message publish keys when sending batches from publishers. Introduce a semantic alias for response objects used for publishing, add a publish-key accessor on responses, and implement a singledispatch-based extractor to derive (body, key) tuples from arbitrary batch items. This allows callers to pass message-specific keys (or fall back to a publisher default) without ad-hoc mapping shapes or heavy isinstance checks, and avoids extra allocations when no per-message keys are present.

Also expose the new alias in package exports, adjust batch publisher config handling, add tests covering per-message keys and default-key fallback, and bump package version metadata.

refactor(response): add precise type hints for batch extraction locals

test(tests): verify per-message keys for batch publishing in Kafka and Confluent

test(tests.brokers): wrap long assertion messages for readability
1 parent 7186673 commit 49cb334
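A minimal usage sketch of the behavior this commit enables, based on the diffs below. The broker address and topic are illustrative, and passing key= to broker.publisher(..., batch=True) is an assumption inferred from the removed SetupError guard in the publisher factories:

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaPublishMessage

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# Assumed: a default key may now be set on a batch publisher, since the
# "You can't setup `key` with batch publisher" guard was removed.
batch_publisher = broker.publisher("events", batch=True, key=b"default-key")


@app.after_startup
async def send() -> None:
    # Every message in this batch falls back to the publisher's default key.
    await batch_publisher.publish("a", "b", "c")

    # Per-message keys: wrap bodies in KafkaPublishMessage (an alias of
    # KafkaResponse); plain bodies still fall back to the default key.
    await batch_publisher.publish(
        KafkaPublishMessage("order-created", key=b"order-1"),
        KafkaPublishMessage("order-paid", key=b"order-2"),
        "uses-default-key",
    )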

File tree

15 files changed: +518 -27 lines changed

faststream/confluent/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -3,7 +3,7 @@
 try:
     from .annotations import KafkaMessage
     from .broker import KafkaBroker, KafkaPublisher, KafkaRoute, KafkaRouter
-    from .response import KafkaPublishCommand, KafkaResponse
+    from .response import KafkaPublishCommand, KafkaPublishMessage, KafkaResponse
     from .schemas import TopicPartition
     from .testing import TestKafkaBroker

@@ -19,6 +19,7 @@
     "KafkaBroker",
     "KafkaMessage",
     "KafkaPublishCommand",
+    "KafkaPublishMessage",
     "KafkaPublisher",
     "KafkaResponse",
     "KafkaRoute",

faststream/confluent/publisher/factory.py

Lines changed: 0 additions & 6 deletions
@@ -2,8 +2,6 @@
 from functools import wraps
 from typing import TYPE_CHECKING, Any

-from faststream.exceptions import SetupError
-
 from .config import KafkaPublisherConfig, KafkaPublisherSpecificationConfig
 from .specification import KafkaPublisherSpecification
 from .usecase import BatchPublisher, DefaultPublisher
@@ -54,10 +52,6 @@ def create_publisher(

     publisher: BatchPublisher | DefaultPublisher
     if batch:
-        if key:
-            msg = "You can't setup `key` with batch publisher"
-            raise SetupError(msg)
-
         publisher = BatchPublisher(publisher_config, specification)
         publish_method = "_basic_publish_batch"

faststream/confluent/publisher/producer.py

Lines changed: 2 additions & 2 deletions
@@ -157,7 +157,7 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:

         headers_to_send = cmd.headers_to_publish()

-        for msg in cmd.batch_bodies:
+        for message_position, msg in enumerate(cmd.batch_bodies):
             message, content_type = encode_message(msg, serializer=self.serializer)

             if content_type:
@@ -169,7 +169,7 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
             final_headers = headers_to_send.copy()

             batch.append(
-                key=None,
+                key=cmd.key_for(message_position),
                 value=message,
                 timestamp=cmd.timestamp_ms,
                 headers=[(i, j.encode()) for i, j in final_headers.items()],

faststream/confluent/publisher/usecase.py

Lines changed: 11 additions & 1 deletion
@@ -215,11 +215,20 @@ async def request(


 class BatchPublisher(LogicPublisher):
+    def __init__(
+        self,
+        config: "KafkaPublisherConfig",
+        specification: "PublisherSpecification[Any, Any]",
+    ) -> None:
+        super().__init__(config, specification)
+        self.key = config.key
+
     @override
     async def publish(
         self,
         *messages: "SendableMessage",
         topic: str = "",
+        key: bytes | str | None = None,
         partition: int | None = None,
         timestamp_ms: int | None = None,
         headers: dict[str, str] | None = None,
@@ -229,7 +238,7 @@ async def publish(
     ) -> None:
         cmd = KafkaPublishCommand(
             *messages,
-            key=None,
+            key=key or self.key,
             topic=topic or self.topic,
             partition=partition if partition is not None else self.partition,
             reply_to=reply_to or self.reply_to,
@@ -261,6 +270,7 @@ async def _publish(
         cmd.reply_to = cmd.reply_to or self.reply_to

         cmd.partition = cmd.partition if cmd.partition is not None else self.partition
+        cmd.key = cmd.key or self.key

         await self._basic_publish_batch(
             cmd,

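The fallback chain introduced here is: per-message key first, then the key passed to publish() (key or self.key), then the publisher's default. A small sketch of that precedence with the Confluent classes; the topic, key values, and the key= argument to broker.publisher() are illustrative assumptions:

from faststream.confluent import KafkaBroker, KafkaPublishMessage

broker = KafkaBroker("localhost:9092")
# Assumed signature: a default key configured on the batch publisher.
publisher = broker.publisher("events", batch=True, key=b"default")


async def demo() -> None:
    await publisher.publish(
        KafkaPublishMessage("evt-1", key=b"per-message"),  # wins over everything else
        "evt-2",                                           # gets b"call-level"
        key=b"call-level",                                  # overrides the publisher default
    )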
faststream/confluent/response.py

Lines changed: 37 additions & 3 deletions
@@ -3,21 +3,36 @@
 from typing_extensions import override

 from faststream.response.publish_type import PublishType
-from faststream.response.response import BatchPublishCommand, PublishCommand, Response
+from faststream.response.response import (
+    BatchPublishCommand,
+    PublishCommand,
+    Response,
+    extract_per_message_keys_and_bodies,
+    key_for_index,
+)

 if TYPE_CHECKING:
     from faststream._internal.basic_types import SendableMessage


 class KafkaResponse(Response):
+    """Kafka-specific response object for outgoing messages.
+
+    Can be used in two ways:
+    1. As a return value from handler to send a response message
+    2. Directly in publish_batch() to set per-message attributes (key, headers, etc.)
+
+    For publish operations, consider using the more semantic alias `KafkaPublishMessage`.
+    """
+
     def __init__(
         self,
         body: "SendableMessage",
         *,
         headers: dict[str, Any] | None = None,
         correlation_id: str | None = None,
         timestamp_ms: int | None = None,
-        key: bytes | str | None = None,
+        key: bytes | Any | None = None,
     ) -> None:
         super().__init__(
             body=body,
@@ -28,6 +43,11 @@ def __init__(
         self.timestamp_ms = timestamp_ms
         self.key = key

+    @override
+    def get_publish_key(self) -> bytes | Any | None:
+        """Return the Kafka message key for publishing."""
+        return self.key
+
     @override
     def as_publish_command(self) -> "KafkaPublishCommand":
         return KafkaPublishCommand(
@@ -50,7 +70,7 @@ def __init__(
         *messages: "SendableMessage",
         topic: str,
         _publish_type: PublishType,
-        key: bytes | str | None = None,
+        key: bytes | Any | None = None,
         partition: int | None = None,
         timestamp_ms: int | None = None,
         headers: dict[str, str] | None = None,
@@ -77,6 +97,12 @@ def __init__(
         # request option
         self.timeout = timeout

+        # per-message keys support
+        keys, normalized = extract_per_message_keys_and_bodies(self.batch_bodies)
+        if normalized is not None:
+            self.batch_bodies = normalized
+        self._per_message_keys = keys
+
     @classmethod
     def from_cmd(
         cls,
@@ -100,6 +126,9 @@ def from_cmd(
             _publish_type=cmd.publish_type,
         )

+    def key_for(self, index: int) -> Any | None:
+        return key_for_index(self._per_message_keys, self.key, index)
+
     def headers_to_publish(self) -> dict[str, str]:
         headers = {}

@@ -110,3 +139,8 @@ def headers_to_publish(self) -> dict[str, str]:
             headers["reply_to"] = self.reply_to

         return headers | self.headers
+
+
+# Semantic alias for publish operations
+# More intuitive name when using in publish_batch() rather than as handler return value
+KafkaPublishMessage = KafkaResponse

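The helpers imported above, extract_per_message_keys_and_bodies and key_for_index, live in faststream/response/response.py, which is not shown in this commit excerpt. Below is a plausible sketch of their contract, inferred from the call sites and the commit message (a singledispatch-based extractor that returns (keys, normalized_bodies) and does no extra work when no item carries a key); the real implementation may differ:

from functools import singledispatch
from typing import Any

from faststream.response.response import Response


@singledispatch
def _extract_item(item: Any) -> tuple[Any, Any | None]:
    # Plain bodies carry no key of their own.
    return item, None


@_extract_item.register
def _(item: Response) -> tuple[Any, Any | None]:
    # Response / KafkaPublishMessage items expose their key via get_publish_key().
    return item.body, item.get_publish_key()


def extract_per_message_keys_and_bodies(
    bodies: tuple[Any, ...],
) -> tuple[tuple[Any | None, ...] | None, tuple[Any, ...] | None]:
    """Return (keys, normalized_bodies); both None when no item carries a key."""
    pairs = [_extract_item(body) for body in bodies]
    if all(key is None for _, key in pairs):
        return None, None  # keep the original bodies, no per-message keys present
    return tuple(key for _, key in pairs), tuple(body for body, _ in pairs)


def key_for_index(
    per_message_keys: tuple[Any | None, ...] | None,
    default_key: Any | None,
    index: int,
) -> Any | None:
    """A per-message key wins; otherwise fall back to the command-level key."""
    if per_message_keys is not None and per_message_keys[index] is not None:
        return per_message_keys[index]
    return default_key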
faststream/confluent/testing.py

Lines changed: 2 additions & 1 deletion
@@ -154,12 +154,13 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
                 topic=cmd.destination,
                 partition=cmd.partition,
                 timestamp_ms=cmd.timestamp_ms,
+                key=cmd.key_for(message_position),
                 headers=cmd.headers,
                 correlation_id=cmd.correlation_id,
                 reply_to=cmd.reply_to,
                 serializer=self.broker.config.fd_config._serializer,
             )
-            for message in cmd.batch_bodies
+            for message_position, message in enumerate(cmd.batch_bodies)
         )

         if isinstance(handler, BatchSubscriber):

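The tests mentioned in the commit message are not part of this excerpt. A rough sketch of how per-message keys could be exercised against the in-memory broker; the topic name, the pytest-asyncio marker, and reading the key back via msg.raw_message.key() are assumptions:

import pytest

from faststream.confluent import (
    KafkaBroker,
    KafkaMessage,
    KafkaPublishMessage,
    TestKafkaBroker,
)

broker = KafkaBroker()
seen_keys: list[bytes | None] = []


@broker.subscriber("key-test")
async def handler(body: str, msg: KafkaMessage) -> None:
    # Assumed: the faked raw message exposes the publish key like a real
    # confluent-kafka Message does.
    seen_keys.append(msg.raw_message.key())


@pytest.mark.asyncio
async def test_per_message_keys() -> None:
    async with TestKafkaBroker(broker):
        await broker.publish_batch(
            KafkaPublishMessage("a", key=b"k1"),
            KafkaPublishMessage("b", key=b"k2"),
            topic="key-test",
        )
    assert seen_keys == [b"k1", b"k2"]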
faststream/kafka/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -6,7 +6,7 @@

     from .annotations import KafkaMessage
     from .broker import KafkaBroker, KafkaPublisher, KafkaRoute, KafkaRouter
-    from .response import KafkaPublishCommand, KafkaResponse
+    from .response import KafkaPublishCommand, KafkaPublishMessage, KafkaResponse
     from .testing import TestKafkaBroker

 except ImportError as e:
@@ -22,6 +22,7 @@
     "KafkaBroker",
     "KafkaMessage",
     "KafkaPublishCommand",
+    "KafkaPublishMessage",
     "KafkaPublisher",
     "KafkaResponse",
     "KafkaRoute",

faststream/kafka/publisher/factory.py

Lines changed: 0 additions & 6 deletions
@@ -5,8 +5,6 @@
     Any,
 )

-from faststream.exceptions import SetupError
-
 from .config import KafkaPublisherConfig, KafkaPublisherSpecificationConfig
 from .specification import KafkaPublisherSpecification
 from .usecase import BatchPublisher, DefaultPublisher
@@ -56,10 +54,6 @@ def create_publisher(
     )

     if batch:
-        if key:
-            msg = "You can't setup `key` with batch publisher"
-            raise SetupError(msg)
-
         publisher: BatchPublisher | DefaultPublisher = BatchPublisher(
             publisher_config,
             specification,

faststream/kafka/publisher/producer.py

Lines changed: 1 addition & 1 deletion
@@ -147,7 +147,7 @@ async def publish_batch(
             final_headers = headers_to_send.copy()

             metadata = batch.append(
-                key=None,
+                key=cmd.key_for(message_position),
                 value=message,
                 timestamp=cmd.timestamp_ms,
                 headers=[(i, j.encode()) for i, j in final_headers.items()],

faststream/kafka/publisher/usecase.py

Lines changed: 21 additions & 1 deletion
@@ -298,11 +298,20 @@ async def request(


 class BatchPublisher(LogicPublisher):
+    def __init__(
+        self,
+        config: "KafkaPublisherConfig",
+        specification: "PublisherSpecification[Any, Any]",
+    ) -> None:
+        super().__init__(config, specification)
+        self.key = config.key
+
     @overload
     async def publish(
         self,
         *messages: "SendableMessage",
         topic: str = "",
+        key: bytes | Any | None = None,
         partition: int | None = None,
         timestamp_ms: int | None = None,
         headers: dict[str, str] | None = None,
@@ -316,6 +325,7 @@ async def publish(
         self,
         *messages: "SendableMessage",
         topic: str = "",
+        key: bytes | Any | None = None,
         partition: int | None = None,
         timestamp_ms: int | None = None,
         headers: dict[str, str] | None = None,
@@ -329,6 +339,7 @@ async def publish(
         self,
         *messages: "SendableMessage",
         topic: str = "",
+        key: bytes | Any | None = None,
         partition: int | None = None,
         timestamp_ms: int | None = None,
         headers: dict[str, str] | None = None,
@@ -342,6 +353,7 @@ async def publish(
         self,
         *messages: "SendableMessage",
         topic: str = "",
+        key: bytes | Any | None = None,
         partition: int | None = None,
         timestamp_ms: int | None = None,
         headers: dict[str, str] | None = None,
@@ -356,6 +368,13 @@ async def publish(
                 Messages bodies to send.
             topic:
                 Topic where the message will be published.
+            key:
+                A single key to associate with every message in this batch. If a
+                partition is not specified and the producer uses the default
+                partitioner, messages with the same key will be routed to the
+                same partition. Must be bytes or serializable to bytes via the
+                configured key serializer. If omitted, falls back to the
+                publisher's default key (if configured).
             partition:
                 Specify a partition. If not set, the partition will be
                 selected using the configured `partitioner`
@@ -378,7 +397,7 @@ async def publish(
         """
         cmd = KafkaPublishCommand(
             *messages,
-            key=None,
+            key=key or self.key,
             topic=topic or self.topic,
             partition=partition if partition is not None else self.partition,
             reply_to=reply_to or self.reply_to,
@@ -410,6 +429,7 @@ async def _publish(
         cmd.reply_to = cmd.reply_to or self.reply_to

         cmd.partition = cmd.partition if cmd.partition is not None else self.partition
+        cmd.key = cmd.key or self.key

         await self._basic_publish_batch(
             cmd,

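The docstring added above ties a shared key to partition affinity. A short sketch of the typical pattern this unlocks: keying each message by entity so one entity's events stay ordered within a single partition (topic, entity IDs, and event names are illustrative):

from faststream.kafka import KafkaBroker, KafkaPublishMessage

broker = KafkaBroker("localhost:9092")
publisher = broker.publisher("user-events", batch=True)

events = [
    ("user-1", "signed_up"),
    ("user-2", "signed_up"),
    ("user-1", "upgraded"),
]


async def flush() -> None:
    # Events that share a key are routed to the same partition by the default
    # partitioner, preserving their relative order per user.
    await publisher.publish(
        *(KafkaPublishMessage(event, key=user_id.encode()) for user_id, event in events),
    )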