Skip to content

Commit 23b7ebc

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent 9dfaa63 commit 23b7ebc

File tree

5 files changed

+94
-70
lines changed

5 files changed

+94
-70
lines changed

demo-kafka/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@
4444
<artifactId>spring-boot-starter-test</artifactId>
4545
<scope>test</scope>
4646
</dependency>
47+
<dependency>
48+
<groupId>org.springframework.boot</groupId>
49+
<artifactId>spring-boot-testcontainers</artifactId>
50+
<scope>test</scope>
51+
</dependency>
4752
<dependency>
4853
<groupId>org.springframework.kafka</groupId>
4954
<artifactId>spring-kafka-test</artifactId>
@@ -54,6 +59,12 @@
5459
<artifactId>junit-jupiter-api</artifactId>
5560
<scope>test</scope>
5661
</dependency>
62+
<dependency>
63+
<groupId>org.testcontainers</groupId>
64+
<artifactId>junit-jupiter</artifactId>
65+
<version>${testcontainers.version}</version>
66+
<scope>test</scope>
67+
</dependency>
5768
<dependency>
5869
<groupId>org.testcontainers</groupId>
5970
<artifactId>kafka</artifactId>

demo-kafka/src/test/java/com/helltractor/demo/ConfluentKafkaContainerStandalone.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

demo-kafka/src/test/java/com/helltractor/demo/ConfluentKafkaContainerCluster.java renamed to demo-kafka/src/test/java/com/helltractor/demo/container/ConfluentKafkaContainerCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.helltractor.demo;
1+
package com.helltractor.demo.container;
22

33
import org.apache.kafka.common.Uuid;
44
import org.awaitility.Awaitility;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.helltractor.demo.container;
2+
3+
import com.github.dockerjava.api.command.InspectContainerResponse;
4+
import org.testcontainers.containers.GenericContainer;
5+
import org.testcontainers.containers.wait.strategy.Wait;
6+
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
7+
import org.testcontainers.images.builder.Transferable;
8+
import org.testcontainers.utility.DockerImageName;
9+
10+
public class KafkaLocalContainer extends GenericContainer<KafkaLocalContainer> {
11+
12+
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
13+
14+
public KafkaLocalContainer(String image) {
15+
super(DockerImageName.parse(image));
16+
withExposedPorts(9092, 8082);
17+
var waitStrategy = new WaitAllStrategy().withStrategy(Wait.forLogMessage(".*started.*\\n", 1))
18+
.withStrategy(Wait.forHttp("/").forPort(8082).forStatusCode(200));
19+
waitingFor(waitStrategy);
20+
withCreateContainerCmdModifier(cmd -> {
21+
cmd.withEntrypoint("sh");
22+
});
23+
withCommand("-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
24+
}
25+
26+
@Override
27+
protected void containerIsStarting(InspectContainerResponse containerInfo) {
28+
var defaultListeners = "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092";
29+
var defaultAdvertisedListeners = "PLAINTEXT://localhost:29092,PLAINTEXT_HOST://%s:%d".formatted(getHost(),
30+
getMappedPort(9092));
31+
var defaultSecurityProtocolMap = "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT";
32+
33+
var script = """
34+
#!/bin/bash
35+
export KAFKA_LISTENERS="%s"
36+
export KAFKA_ADVERTISED_LISTENERS="%s"
37+
export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="%s"
38+
39+
/etc/confluent/docker/run
40+
""".formatted(defaultListeners, defaultAdvertisedListeners, defaultSecurityProtocolMap);
41+
copyFileToContainer(Transferable.of(script, 0777), STARTER_SCRIPT);
42+
}
43+
44+
public String getBootstrapServer() {
45+
return "%s:%d".formatted(getHost(), getMappedPort(9092));
46+
}
47+
48+
public String getRestProxyUrl() {
49+
return "http://%s:%d".formatted(getHost(), getMappedPort(8082));
50+
}
51+
}
Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,70 @@
11
package com.helltractor.demo.messaging;
22

3-
import com.helltractor.demo.ConfluentKafkaContainerStandalone;
43
import com.helltractor.demo.KafkaApplication;
4+
import com.helltractor.demo.container.ConfluentKafkaContainerCluster;
55
import com.helltractor.demo.message.TestMessage;
66
import com.helltractor.demo.util.IpUtil;
7+
import org.awaitility.Awaitility;
78
import org.junit.jupiter.api.AfterEach;
89
import org.junit.jupiter.api.BeforeEach;
910
import org.junit.jupiter.api.Test;
1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
1213
import org.springframework.beans.factory.annotation.Autowired;
1314
import org.springframework.boot.test.context.SpringBootTest;
14-
import org.springframework.context.annotation.Lazy;
15-
import org.springframework.test.annotation.DirtiesContext;
15+
import org.springframework.test.context.DynamicPropertyRegistry;
16+
import org.springframework.test.context.DynamicPropertySource;
1617

18+
import java.time.Duration;
1719
import java.util.List;
1820
import java.util.concurrent.atomic.AtomicInteger;
1921

2022
import static org.junit.jupiter.api.Assertions.assertEquals;
2123

2224
@SpringBootTest(classes = KafkaApplication.class)
23-
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
2425
public class MessagingTest {
2526

2627
final static Logger logger = LoggerFactory.getLogger(MessagingTest.class);
2728

28-
@Lazy
29+
static class TestConsumer {
30+
31+
final AtomicInteger messageCount = new AtomicInteger();
32+
33+
final long startTime = System.currentTimeMillis();
34+
35+
void processMessages(List<TestMessage> messages) {
36+
messageCount.addAndGet(messages.size());
37+
long currentTime = System.currentTimeMillis();
38+
logger.info("Received {} messages, Total: {}, Elapsed time: {} ms",
39+
messages.size(), messageCount.get(), currentTime - startTime);
40+
}
41+
42+
int getTotalMessages() {
43+
return messageCount.get();
44+
}
45+
}
46+
2947
@Autowired
3048
MessagingFactory messagingFactory;
3149

32-
ConfluentKafkaContainerStandalone standalone;
50+
ConfluentKafkaContainerCluster cluster;
3351

3452
MessageProducer<TestMessage> processorOne;
3553
MessageProducer<TestMessage> processorTwo;
3654
MessageProducer<TestMessage> processorThree;
3755

3856
@BeforeEach
3957
void init() {
40-
standalone = new ConfluentKafkaContainerStandalone("7.4.0");
58+
cluster = new ConfluentKafkaContainerCluster("7.4.0", 1, 1);
59+
cluster.start();
4160
processorOne = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_ONE, TestMessage.class);
4261
processorTwo = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_TWO, TestMessage.class);
4362
processorThree = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_THREE, TestMessage.class);
4463
}
4564

4665
@AfterEach
4766
void destroy() {
48-
standalone.stop();
49-
}
50-
51-
static class TestConsumer {
52-
53-
final AtomicInteger messageCount = new AtomicInteger();
54-
55-
final long startTime = System.currentTimeMillis();
56-
57-
void processMessages(List<TestMessage> messages) {
58-
messageCount.addAndGet(messages.size());
59-
long currentTime = System.currentTimeMillis();
60-
logger.info("Received {} messages, Total: {}, Elapsed time: {} ms",
61-
messages.size(), messageCount.get(), currentTime - startTime);
62-
}
63-
64-
int getTotalMessages() {
65-
return messageCount.get();
66-
}
67+
cluster.stop();
6768
}
6869

6970
@Test
@@ -82,7 +83,10 @@ void test() throws InterruptedException {
8283
messagingFactory.createBatchMessageListener(topic, groupId, testConsumer::processMessages);
8384
}
8485

85-
Thread.sleep(10000);
86+
Awaitility.await()
87+
.atMost(Duration.ofSeconds(10))
88+
.until(() -> testConsumer.getTotalMessages() == 300);
89+
8690
assertEquals(300, testConsumer.getTotalMessages());
8791
}
8892
}

0 commit comments

Comments
 (0)