Overview
The latest version at the time of writing (2.2.11) is causing our Kafka unit tests to fail:
09:41:39 test_send_and_receive (core.tests.messaging.test_kafka.KafkaBasicTest.test_send_and_receive)
09:41:40 Send and receive a message. ...
09:41:40 2025-06-09 08:41:39,907 kafka.cluster WARNING Topic messaging-test is not available during auto-create initialization
09:41:43 2025-06-09 08:41:43,134 kafka.coordinator.assignors.range WARNING No partition metadata for topic messaging-test
09:41:50 2025-06-09 08:41:49,023 kafka.coordinator.heartbeat WARNING Heartbeat thread did not fully terminate during close
09:41:50 FAIL
...
09:41:55 test_can_request_retry (core.tests.messaging.test_kafka.KafkaRequestRetryTest.test_can_request_retry)
09:41:58 Request a retry i.e. when TransientProcessingException is observed. ... FAIL
When we pin the version to 2.2.10, the tests pass (a warning is still emitted, but we are not too concerned about it at the moment):
kafka.coordinator.heartbeat WARNING Heartbeat thread did not fully terminate during close
It seems that the key we use to guarantee strict message ordering ("messaging-test") is not being respected because the topic (also "messaging-test") is not available during auto-create initialization, so the messages arrive out of order.
We use bitnami/kafka:3.5.1 as the container image.
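The ordering assumption described above rests on two Kafka properties: order is preserved only within a partition, and a non-null key routes every message to the same partition. The broker-free toy model below illustrates that routing. The `produce` function, the CRC32 hash and the round-robin fallback are illustrative assumptions, not kafka-python's API; kafka-python's default partitioner hashes keys with murmur2 and picks partitions for unkeyed messages differently.

```python
import itertools
import zlib
from typing import Dict, Iterable, List, Optional


def produce(values: Iterable[int], key: Optional[bytes], num_partitions: int = 3) -> Dict[int, List[int]]:
    """Toy model: route each value to a per-partition log and return the logs."""
    round_robin = itertools.count()
    logs: Dict[int, List[int]] = {}
    for value in values:
        if key is not None:
            # Keyed: the same key always hashes to the same partition.
            # CRC32 is used only for illustration; kafka-python uses murmur2.
            partition = zlib.crc32(key) % num_partitions
        else:
            # Unkeyed: spread across partitions (deterministic round-robin here;
            # real clients may choose randomly or stick to one partition per batch).
            partition = next(round_robin) % num_partitions
        logs.setdefault(partition, []).append(value)
    return logs
```

With the key, `produce(range(5), b"messaging-test")` puts all five values into a single partition log in send order; without it, `produce(range(5), None)` spreads them as `{0: [0, 3], 1: [1, 4], 2: [2]}`, so a consumer reading several partitions can see them interleaved.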
Description
Test failure 1
We mock out Kafka for most unit tests and only have a couple which test its basic functionality. test_send_and_receive is relatively simple:
def test_send_and_receive(self):
    """Send and receive a message."""
    m = {"x": "y"}
    # send the message, immediately closing the producer
    self.producer.send(m, key=TEST_KEY)
    self.producer.close()
    # receive the message, immediately acknowledging and closing the consumer
    received = self.consumer.receive()
    self.consumer.acknowledge()
    self.consumer.close()
    # check the message is what we expected
    self.assertEqual(m, received)
With 2.2.11 this results in AssertionError: {'x': 'y'} != None, suggesting that the consumer receives nothing.
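A single receive() returning None cannot distinguish "nothing was sent" from "the consumer has not yet joined its group or been assigned the partition", and a Kafka consumer can come back empty while that start-up work is still in progress. One way to make this test less sensitive to the delay is to poll until a deadline. The helper below is hypothetical; it assumes only that the wrapper's receive() returns None when no message is available, as the setUp drain loop implies.

```python
import time
from typing import Any, Callable, Optional


def receive_with_deadline(
    receive: Callable[[], Optional[Any]],
    timeout_s: float = 10.0,
    interval_s: float = 0.1,
) -> Optional[Any]:
    """Call receive() until it returns a message or timeout_s seconds have passed.

    Returns the first non-None message, or None if the deadline is reached.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        message = receive()
        if message is not None:
            return message
        if time.monotonic() >= deadline:
            return None
        # Back off briefly so repeated empty receives do not spin the CPU.
        time.sleep(interval_s)
```

In the test this would replace the bare call, e.g. `received = receive_with_deadline(self.consumer.receive)`. If it still returns None after a generous timeout, the message most likely never reached the topic, which narrows the search to the producer side.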
Test failure 2
The following test, test_can_request_retry, sends five messages with data {"value": i} where i is 0-4:
def test_can_request_retry(self):
    """Request a retry i.e. when TransientProcessingException is observed."""
    # send some messages, immediately closing the producer
    for i in range(5):
        self.producer.send({"value": i}, TEST_KEY)
    self.producer.close()
    # receive and acknowledge 3 messages
    for i in range(3):
        received = self.consumer.receive()
        self.assertEqual(received, {"value": i})
        self.consumer.acknowledge()
    # receive next message and request retry ten times
    received = self.consumer.receive()
    self.assertEqual(received, {"value": 3})
    for i in range(10):
        self.consumer.request_retry()
        received = self.consumer.receive()
        self.assertEqual(received, {"value": 3})
    # finally acknowledge the message
    self.consumer.acknowledge()
    # receive the next message and tidy up
    received = self.consumer.receive()
    self.assertEqual(received, {"value": 4})
    self.consumer.acknowledge()
    self.consumer.close()
However, the consumer in this test receives the message sent in the previous test and fails before receiving any further messages:
09:44:03 AssertionError: {'x': 'y'} != {'value': 0}
09:44:03 - {'x': 'y'}
09:44:03 + {'value': 0}
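The base class's setUp drain loop stops at the first None. If the wrapper's receive() can return None before the consumer has been assigned its partition (an assumption; the wrapper's internals are not shown here), a stale message could survive the drain and be picked up by the next test, which would be consistent with this failure. A hedged sketch of a stricter drain, `drain_until_quiet` (a hypothetical helper), stops only after several consecutive empty receives, a message cap, or a deadline. Whether drained messages must also be acknowledged depends on the wrapper, so `acknowledge` is optional.

```python
import time
from typing import Any, Callable, List, Optional


def drain_until_quiet(
    receive: Callable[[], Optional[Any]],
    acknowledge: Optional[Callable[[], None]] = None,
    quiet_polls: int = 3,
    max_messages: int = 500,
    timeout_s: float = 30.0,
) -> List[Any]:
    """Discard lingering messages; stop after `quiet_polls` empty receives in a row."""
    drained: List[Any] = []
    empty_in_a_row = 0
    deadline = time.monotonic() + timeout_s
    while empty_in_a_row < quiet_polls and len(drained) < max_messages and time.monotonic() < deadline:
        message = receive()
        if message is None:
            # One empty receive may only mean the consumer is still starting up.
            empty_in_a_row += 1
            continue
        empty_in_a_row = 0
        drained.append(message)
        if acknowledge is not None:
            acknowledge()
    return drained
```

In setUp this would take the place of the fixed loop, e.g. `drain_until_quiet(self.consumer.receive, self.consumer.acknowledge)`. For a receive sequence of None, a stale message, then None three times, it returns the stale message, whereas the original loop would have stopped at the first None.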
Test context
For completeness, these two test cases inherit from the same base class:
class KafkaTestCase(TestCase):
    """
    Typically the producer and consumer objects would be long-lived.
    Here they are closed immediately to minimise the window for test collisions.
    These would ideally be the _only_ tests to actually send and receive messages for real, with
    all other tests checking the interaction with the producer and sender via mocks.
    """

    def setUp(self):
        self.producer = Producer(TEST_TOPIC)
        self.consumer = Consumer(TEST_TOPIC, TEST_GROUP)
        # clean up to 500 lingering messages from the test topic to avoid stale messages causing test failure
        for _ in range(500):
            received = self.consumer.receive()
            if received is None:
                break
The constants referenced in these examples are:
TEST_TOPIC = "messaging-test"
TEST_GROUP = "messaging-test-group"
TEST_KEY = b"messaging-test"  # used to guarantee strict message ordering
Questions
Why is this now an issue for us, as of 2.2.11? Presumably it is a result of one of these changes:
- Add synchronized decorator; add lock to subscription state #2636
- Do not ignore metadata response for single topic with error #2640
However, we are unsure which one or why. Is there something we need to do to resolve this, or does this need to be addressed in the kafka-python repository?
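One possible stopgap while the root cause is investigated, assuming the test environment permits topic creation, would be to create the test topic explicitly before the first test instead of relying on broker auto-creation, which is what the "not available during auto-create initialization" warning refers to. A minimal sketch using kafka-python's KafkaAdminClient and NewTopic follows; the `ensure_topic` name, the bootstrap address, and the single-partition, replication-factor-1 defaults are assumptions suited to a single-broker test container.

```python
from typing import List, Union


def ensure_topic(
    bootstrap_servers: Union[str, List[str]],
    topic: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
) -> bool:
    """Create `topic` if it does not exist; return True if this call created it."""
    # Imported lazily so this module can be imported without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([
            NewTopic(name=topic, num_partitions=num_partitions, replication_factor=replication_factor)
        ])
        return True
    except TopicAlreadyExistsError:
        # Another run (or auto-creation) got there first; that is fine here.
        return False
    finally:
        admin.close()
```

For example, `ensure_topic("localhost:9092", TEST_TOPIC)` could be called from a `setUpClass`, with `localhost:9092` standing in for the container's advertised listener.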
Happy to provide further information as required. Thanks!