diff --git a/faststream/confluent/parser.py b/faststream/confluent/parser.py index fed9ae72fe..fe88fc88cd 100644 --- a/faststream/confluent/parser.py +++ b/faststream/confluent/parser.py @@ -1,4 +1,3 @@ -from collections.abc import Sequence from typing import TYPE_CHECKING, Any from faststream.message import StreamMessage, decode_message @@ -28,7 +27,15 @@ async def parse_message( message: "Message", ) -> KafkaMessage: """Parses a Kafka message.""" - headers = _parse_msg_headers(message.headers() or ()) + headers = dict(message.headers() or ()) + decoded_headers = { + header: value.decode(errors="replace") + for header, value in ( + headers.get("reply_to"), + headers.get("content-type"), + headers.get("correlation_id"), + ) + } body = message.value() or b"" offset = message.offset() @@ -37,10 +44,10 @@ async def parse_message( return KafkaMessage( body=body, headers=headers, - reply_to=headers.get("reply_to", ""), - content_type=headers.get("content-type"), + reply_to=decoded_headers.get("reply_to") or None, + content_type=decoded_headers.get("content-type") or None, message_id=f"{offset}-{timestamp}", - correlation_id=headers.get("correlation_id"), + correlation_id=decoded_headers.get("correlation_id") or None, raw_message=message, consumer=self._consumer, is_manual=self.is_manual, @@ -52,14 +59,14 @@ async def parse_message_batch( ) -> KafkaMessage: """Parses a batch of messages from a Kafka consumer.""" body: list[Any] = [] - batch_headers: list[dict[str, str]] = [] + batch_headers: list[dict[str, bytes]] = [] first = message[0] last = message[-1] for m in message: body.append(m.value() or b"") - batch_headers.append(_parse_msg_headers(m.headers() or ())) + batch_headers.append(dict(m.headers() or ())) headers = next(iter(batch_headers), {}) @@ -69,10 +76,16 @@ async def parse_message_batch( body=body, headers=headers, batch_headers=batch_headers, - reply_to=headers.get("reply_to", ""), - content_type=headers.get("content-type"), + reply_to=headers.get("reply_to").decode() + if "content-type" in headers + else None, + content_type=headers.get("content-type").decode() + if "content-type" in headers + else None, message_id=f"{first.offset()}-{last.offset()}-{first_timestamp}", - correlation_id=headers.get("correlation_id"), + correlation_id=headers.get("correlation_id").decode() + if "correlation_id" in headers + else None, raw_message=message, consumer=self._consumer, is_manual=self.is_manual, @@ -91,9 +104,3 @@ async def decode_message_batch( ) -> "DecodedMessage": """Decode a batch of messages.""" return [decode_message(await self.parse_message(m)) for m in msg.raw_message] - - -def _parse_msg_headers( - headers: Sequence[tuple[str, bytes | str]], -) -> dict[str, str]: - return {i: j if isinstance(j, str) else j.decode() for i, j in headers} diff --git a/faststream/kafka/parser.py b/faststream/kafka/parser.py index cb3e956bb1..c2e31f17ef 100644 --- a/faststream/kafka/parser.py +++ b/faststream/kafka/parser.py @@ -38,7 +38,7 @@ async def parse_message( message: Union["ConsumerRecord", "KafkaRawMessage"], ) -> "StreamMessage[ConsumerRecord]": """Parses a Kafka message.""" - headers = {i: j.decode() for i, j in message.headers} + headers = {i: j.decode(errors="replace") for i, j in message.headers} return self.msg_class( body=message.value or b"", diff --git a/faststream/message/message.py b/faststream/message/message.py index 8a73da3aa4..e90a1f570b 100644 --- a/faststream/message/message.py +++ b/faststream/message/message.py @@ -31,9 +31,9 @@ def __init__( raw_message: "MsgType", body: bytes | Any, *, - headers: dict[str, Any] | None = None, + headers: dict[str, bytes] | None = None, reply_to: str = "", - batch_headers: list[dict[str, Any]] | None = None, + batch_headers: list[dict[str, bytes]] | None = None, path: dict[str, Any] | None = None, content_type: str | None = None, correlation_id: str | None = None,