Skip to content

Commit c410486

Browse files
committed
fix(kafka): Use wait strategy instead of deprecated wait_for_logs
1 parent 429a4da commit c410486

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

modules/kafka/testcontainers/kafka/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
import tarfile
23
import time
34
from dataclasses import dataclass, field
@@ -10,7 +11,7 @@
1011
from testcontainers.core.container import DockerContainer
1112
from testcontainers.core.utils import raise_for_deprecated_parameter
1213
from testcontainers.core.version import ComparableVersion
13-
from testcontainers.core.waiting_utils import wait_for_logs
14+
from testcontainers.core.wait_strategies import LogMessageWaitStrategy
1415
from testcontainers.kafka._redpanda import RedpandaContainer
1516

1617
__all__ = [
@@ -59,7 +60,7 @@ def __init__(self, image: str = "confluentinc/cp-kafka:7.6.0", port: int = 9093,
5960
super().__init__(image, **kwargs)
6061
self.port = port
6162
self.kraft_enabled = False
62-
self.wait_for = r".*\[KafkaServer id=\d+\] started.*"
63+
self.wait_for = re.compile(r".*\[KafkaServer id=\d+\] started.*")
6364
self.boot_command = ""
6465
self.cluster_id = "MkU3OEVBNTcwNTJENDM2Qk"
6566
self.listeners = f"PLAINTEXT://0.0.0.0:{self.port},BROKER://0.0.0.0:9092"
@@ -179,7 +180,8 @@ def start(self, timeout=30) -> "KafkaContainer":
179180
self.with_command(command)
180181
super().start()
181182
self.tc_start()
182-
wait_for_logs(self, self.wait_for, timeout=timeout)
183+
wait_strategy = LogMessageWaitStrategy(self.wait_for)
184+
wait_strategy.wait_until_ready(self)
183185
return self
184186

185187
def create_file(self, content: bytes, path: str) -> None:

0 commit comments

Comments
 (0)