Skip to content

Commit 9dfaa63

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent f0574e7 commit 9dfaa63

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.helltractor.demo;
2+
3+
import org.testcontainers.containers.Network;
4+
import org.testcontainers.kafka.ConfluentKafkaContainer;
5+
import org.testcontainers.lifecycle.Startable;
6+
import org.testcontainers.utility.DockerImageName;
7+
8+
public class ConfluentKafkaContainerStandalone implements Startable {
9+
10+
private final Network network;
11+
12+
private final ConfluentKafkaContainer container;
13+
14+
public ConfluentKafkaContainerStandalone(String confluentPlatformVersion) {
15+
this.network = Network.newNetwork();
16+
this.container = new ConfluentKafkaContainer(
17+
DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)
18+
)
19+
.withNetwork(this.network)
20+
.withNetworkAliases("kafka-broker")
21+
.withEnv("KAFKA_BROKER_ID", "1")
22+
.withEnv("KAFKA_NODE_ID", "1")
23+
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092")
24+
.withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:9092")
25+
.withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@kafka-broker:9093")
26+
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
27+
.withEnv("KAFKA_LOG_DIRS", "/var/lib/kafka/data")
28+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
29+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
30+
}
31+
32+
33+
@Override
34+
public void start() {
35+
container.start();
36+
}
37+
38+
@Override
39+
public void stop() {
40+
container.stop();
41+
}
42+
}

demo-kafka/src/test/java/com/helltractor/demo/messaging/MessagingTest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.helltractor.demo.messaging;
22

3-
import com.helltractor.demo.ConfluentKafkaContainerCluster;
3+
import com.helltractor.demo.ConfluentKafkaContainerStandalone;
44
import com.helltractor.demo.KafkaApplication;
55
import com.helltractor.demo.message.TestMessage;
66
import com.helltractor.demo.util.IpUtil;
@@ -11,38 +11,41 @@
1111
import org.slf4j.LoggerFactory;
1212
import org.springframework.beans.factory.annotation.Autowired;
1313
import org.springframework.boot.test.context.SpringBootTest;
14+
import org.springframework.context.annotation.Lazy;
15+
import org.springframework.test.annotation.DirtiesContext;
1416

1517
import java.util.List;
1618
import java.util.concurrent.atomic.AtomicInteger;
1719

1820
import static org.junit.jupiter.api.Assertions.assertEquals;
1921

2022
@SpringBootTest(classes = KafkaApplication.class)
23+
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
2124
public class MessagingTest {
2225

2326
final static Logger logger = LoggerFactory.getLogger(MessagingTest.class);
2427

28+
@Lazy
2529
@Autowired
2630
MessagingFactory messagingFactory;
2731

28-
ConfluentKafkaContainerCluster cluster;
32+
ConfluentKafkaContainerStandalone standalone;
2933

3034
MessageProducer<TestMessage> processorOne;
3135
MessageProducer<TestMessage> processorTwo;
3236
MessageProducer<TestMessage> processorThree;
3337

3438
@BeforeEach
3539
void init() {
36-
cluster = new ConfluentKafkaContainerCluster("7.4.0", 1, 1);
37-
cluster.start();
40+
standalone = new ConfluentKafkaContainerStandalone("7.4.0");
3841
processorOne = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_ONE, TestMessage.class);
3942
processorTwo = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_TWO, TestMessage.class);
4043
processorThree = messagingFactory.createMessageProducer(Messaging.Topic.TOPIC_THREE, TestMessage.class);
4144
}
4245

4346
@AfterEach
4447
void destroy() {
45-
cluster.stop();
48+
standalone.stop();
4649
}
4750

4851
static class TestConsumer {
@@ -65,7 +68,7 @@ int getTotalMessages() {
6568

6669
@Test
6770
void test() throws InterruptedException {
68-
for (int i = 0; i < 1000; i++) {
71+
for (int i = 0; i < 100; i++) {
6972
TestMessage testMessage = new TestMessage();
7073
testMessage.message = "Test-" + i;
7174
processorOne.sendMessages(testMessage);
@@ -80,6 +83,6 @@ void test() throws InterruptedException {
8083
}
8184

8285
Thread.sleep(10000);
83-
assertEquals(3000, testConsumer.getTotalMessages());
86+
assertEquals(300, testConsumer.getTotalMessages());
8487
}
8588
}

0 commit comments

Comments
 (0)