Skip to content

Commit 433c80d

Browse files
committed
feat(demo-kafka): add kafka demo module
1 parent a26f5f6 commit 433c80d

File tree

9 files changed

+348
-96
lines changed

9 files changed

+348
-96
lines changed

demo-kafka/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
<groupId>org.springframework.boot</groupId>
2424
<artifactId>spring-boot-starter</artifactId>
2525
</dependency>
26-
2726
<dependency>
2827
<groupId>org.springframework.kafka</groupId>
2928
<artifactId>spring-kafka</artifactId>
@@ -39,6 +38,11 @@
3938
<artifactId>slf4j-api</artifactId>
4039
</dependency>
4140

41+
<dependency>
42+
<groupId>org.projectlombok</groupId>
43+
<artifactId>lombok</artifactId>
44+
</dependency>
45+
4246
<dependency>
4347
<groupId>org.springframework.boot</groupId>
4448
<artifactId>spring-boot-starter-test</artifactId>
Original file line numberDiff line numberDiff line change
package com.helltractor.demo.message;

/**
 * Minimal demo message payload carrying a single text field.
 * Extends the project's {@code AbstractMessage} base type (declared elsewhere
 * in this module; not visible here).
 */
public class SimpleMessage extends AbstractMessage {

    // NOTE(review): public mutable field — presumably left public for simple
    // (de)serialization in the demo; confirm before tightening visibility.
    public String message;
}

demo-kafka/src/main/java/com/helltractor/demo/messaging/MessagingConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private Map<String, Object> producerConfigs(String bootstrapServers) {
3535
private Map<String, Object> consumerConfigs(String bootstrapServers, int batchSize) {
3636
Map<String, Object> configs = new HashMap<>();
3737
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(batchSize));
38-
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
38+
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
3939
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
4040
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
4141
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

demo-kafka/src/test/java/com/helltractor/demo/ConfluentKafkaContainerCluster.java renamed to demo-kafka/src/test/java/com/helltractor/demo/container/ConfluentKafkaContainerCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.helltractor.demo;
1+
package com.helltractor.demo.container;
22

33
import org.apache.kafka.common.Uuid;
44
import org.awaitility.Awaitility;
package com.helltractor.demo.container;

import lombok.SneakyThrows;
import org.awaitility.Awaitility;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Provides an easy way to launch a Kafka cluster with multiple brokers,
 * coordinated by a single ZooKeeper container, all on a shared Docker network.
 */
public class KafkaContainerCluster implements Startable {

    private final int brokersNum;

    private final Network network;

    private final GenericContainer<?> zookeeper;

    private final Collection<KafkaContainer> brokers;

    /**
     * @param confluentPlatformVersion tag of the confluentinc/cp-zookeeper and cp-kafka images
     * @param brokersNum               number of Kafka brokers to launch; must be at least 1
     * @param internalTopicsRf         replication factor for Kafka's internal topics;
     *                                 must be between 1 and {@code brokersNum}
     * @throws IllegalArgumentException if either argument is out of range
     */
    public KafkaContainerCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
        // Fix: the original checks used "< 0", silently accepting 0 even though the
        // error messages promise "greater than 0". A 0-broker cluster or a
        // replication factor of 0 is never valid in Kafka.
        if (brokersNum <= 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
        if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
            throw new IllegalArgumentException(
                "internalTopicsRf '" + internalTopicsRf + "' must be greater than 0 and not greater than brokersNum"
            );
        }

        this.brokersNum = brokersNum;
        this.network = Network.newNetwork();

        this.zookeeper =
            new GenericContainer<>(DockerImageName.parse("confluentinc/cp-zookeeper").withTag(confluentPlatformVersion))
                .withNetwork(network)
                .withNetworkAliases("zookeeper")
                .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));

        this.brokers =
            IntStream
                .range(0, this.brokersNum)
                .mapToObj(brokerNum -> {
                    return new KafkaContainer(
                        DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)
                    )
                        .withNetwork(this.network)
                        .withNetworkAliases("broker-" + brokerNum)
                        // starting any broker transitively starts the shared ZooKeeper first
                        .dependsOn(this.zookeeper)
                        .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
                        .withEnv("KAFKA_BROKER_ID", brokerNum + "")
                        .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
                        .withStartupTimeout(Duration.ofMinutes(1));
                })
                .collect(Collectors.toList());
    }

    public Collection<KafkaContainer> getBrokers() {
        return this.brokers;
    }

    /** @return comma-separated bootstrap-servers string covering every broker */
    public String getBootstrapServers() {
        return brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
    }

    private Stream<GenericContainer<?>> allContainers() {
        return Stream.concat(this.brokers.stream(), Stream.of(this.zookeeper));
    }

    @Override
    @SneakyThrows
    public void start() {
        // sequential start to avoid resource contention on CI systems with weaker hardware
        brokers.forEach(GenericContainer::start);

        // Only report started once ZooKeeper lists all brokersNum broker ids.
        Awaitility
            .await()
            .atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> {
                Container.ExecResult result =
                    this.zookeeper.execInContainer(
                        "sh",
                        "-c",
                        "zookeeper-shell zookeeper:" +
                        KafkaContainer.ZOOKEEPER_PORT +
                        " ls /brokers/ids | tail -n 1"
                    );
                String brokers = result.getStdout();

                assertThat(brokers.split(",")).hasSize(this.brokersNum);
            });
    }

    @Override
    public void stop() {
        allContainers().parallel().forEach(GenericContainer::stop);
    }
}
package com.helltractor.demo.container;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

/**
 * Smoke tests: each cluster flavor starts, exposes the expected broker count,
 * and can round-trip a single record end to end.
 */
class KafkaContainerClusterTest {

    @Test
    void testKafkaContainerCluster() throws Exception {
        try (KafkaContainerCluster cluster = new KafkaContainerCluster("6.2.1", 3, 2)) {
            cluster.start();
            String bootstrapServers = cluster.getBootstrapServers();

            assertThat(cluster.getBrokers()).hasSize(3);

            testKafkaFunctionality(bootstrapServers, 3, 2);
        }
    }

    @Test
    void testKafkaContainerKraftCluster() throws Exception {
        try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.0.0", 3, 2)) {
            cluster.start();
            String bootstrapServers = cluster.getBootstrapServers();

            assertThat(cluster.getBrokers()).hasSize(3);

            testKafkaFunctionality(bootstrapServers, 3, 2);
        }
    }

    @Test
    void testKafkaContainerKraftClusterAfterConfluentPlatform740() throws Exception {
        try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.4.0", 3, 2)) {
            cluster.start();
            String bootstrapServers = cluster.getBootstrapServers();

            assertThat(cluster.getBrokers()).hasSize(3);

            testKafkaFunctionality(bootstrapServers, 3, 2);
        }
    }

    /**
     * Creates a topic against the given cluster, produces one record, and
     * asserts that a fresh consumer group receives exactly that record.
     */
    protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
        ImmutableMap<String, Object> adminConfig =
            ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        ImmutableMap<String, Object> producerConfig = ImmutableMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()
        );
        ImmutableMap<String, Object> consumerConfig = ImmutableMap.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
        );

        try (
            AdminClient adminClient = AdminClient.create(adminConfig);
            KafkaProducer<String, String> producer =
                new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer());
            KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new StringDeserializer())
        ) {
            String topic = "messages";

            adminClient
                .createTopics(Collections.singletonList(new NewTopic(topic, partitions, (short) rf)))
                .all()
                .get(30, TimeUnit.SECONDS);

            consumer.subscribe(Collections.singletonList(topic));

            producer.send(new ProducerRecord<>(topic, "testcontainers", "rulezzz")).get();

            Awaitility
                .await()
                .atMost(Duration.ofSeconds(10))
                .untilAsserted(() -> {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                    assertThat(records)
                        .hasSize(1)
                        .extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
                        .containsExactly(tuple(topic, "testcontainers", "rulezzz"));
                });

            consumer.unsubscribe();
        }
    }
}
package com.helltractor.demo.container;

import org.apache.kafka.common.Uuid;
import org.awaitility.Awaitility;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Launches a multi-broker Kafka cluster running in KRaft mode (no ZooKeeper);
 * every broker also acts as a controller in the shared Raft quorum.
 */
public class KafkaContainerKraftCluster implements Startable {

    private final int brokersNum;

    private final Network network;

    private final Collection<KafkaContainer> brokers;

    /**
     * @param confluentPlatformVersion tag of the confluentinc/cp-kafka image (must support KRaft)
     * @param brokersNum               number of Kafka brokers to launch; must be at least 1
     * @param internalTopicsRf         replication factor for Kafka's internal topics;
     *                                 must be between 1 and {@code brokersNum}
     * @throws IllegalArgumentException if either argument is out of range
     */
    public KafkaContainerKraftCluster(String confluentPlatformVersion, int brokersNum, int internalTopicsRf) {
        // Fix: the original checks used "< 0", silently accepting 0 even though the
        // error messages promise "greater than 0". A 0-broker cluster or a
        // replication factor of 0 is never valid in Kafka.
        if (brokersNum <= 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
        if (internalTopicsRf <= 0 || internalTopicsRf > brokersNum) {
            throw new IllegalArgumentException(
                "internalTopicsRf '" + internalTopicsRf + "' must be greater than 0 and not greater than brokersNum"
            );
        }

        this.brokersNum = brokersNum;
        this.network = Network.newNetwork();

        // Every node votes in the controller quorum: "<nodeId>@broker-<nodeId>:9094".
        String controllerQuorumVoters = IntStream
            .range(0, brokersNum)
            .mapToObj(brokerNum -> String.format("%d@broker-%d:9094", brokerNum, brokerNum))
            .collect(Collectors.joining(","));

        // One cluster id shared by all brokers — required for them to form a single cluster.
        String clusterId = Uuid.randomUuid().toString();

        this.brokers =
            IntStream
                .range(0, brokersNum)
                .mapToObj(brokerNum -> {
                    return new KafkaContainer(
                        DockerImageName.parse("confluentinc/cp-kafka").withTag(confluentPlatformVersion)
                    )
                        .withNetwork(this.network)
                        .withNetworkAliases("broker-" + brokerNum)
                        .withKraft()
                        .withClusterId(clusterId)
                        .withEnv("KAFKA_BROKER_ID", brokerNum + "")
                        .withEnv("KAFKA_NODE_ID", brokerNum + "")
                        .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", controllerQuorumVoters)
                        .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", internalTopicsRf + "")
                        .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", internalTopicsRf + "")
                        .withStartupTimeout(Duration.ofMinutes(1));
                })
                .collect(Collectors.toList());
    }

    public Collection<KafkaContainer> getBrokers() {
        return this.brokers;
    }

    /** @return comma-separated bootstrap-servers string covering every broker */
    public String getBootstrapServers() {
        return brokers.stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
    }

    @Override
    public void start() {
        // Needs to start all the brokers at once: the Raft quorum cannot form
        // until a majority of controller nodes are reachable.
        brokers.parallelStream().forEach(GenericContainer::start);

        // Only report started once the metadata log on the first broker lists
        // all brokersNum brokers.
        Awaitility
            .await()
            .atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> {
                Container.ExecResult result =
                    this.brokers.stream()
                        .findFirst()
                        .get()
                        .execInContainer(
                            "sh",
                            "-c",
                            "kafka-metadata-shell --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log ls /brokers | wc -l"
                        );
                String brokers = result.getStdout().replace("\n", "");

                assertThat(brokers).isEqualTo(String.valueOf(this.brokersNum));
            });
    }

    @Override
    public void stop() {
        this.brokers.stream().parallel().forEach(GenericContainer::stop);
    }
}

0 commit comments

Comments
 (0)