Skip to content
This repository was archived by the owner on Feb 16, 2025. It is now read-only.

Commit d1c43e2

Browse files
committed
Added string string producer / consumer
1 parent 949718f commit d1c43e2

File tree

1 file changed

+22
-0
lines changed

1 file changed

+22
-0
lines changed

kotest-extensions-testcontainers-kafka/src/main/kotlin/io/kotest/extensions/testcontainers/kafka/KafkaContainerExtension.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import org.apache.kafka.clients.producer.KafkaProducer
1111
import org.apache.kafka.clients.producer.ProducerConfig
1212
import org.apache.kafka.common.serialization.BytesDeserializer
1313
import org.apache.kafka.common.serialization.BytesSerializer
14+
import org.apache.kafka.common.serialization.StringDeserializer
15+
import org.apache.kafka.common.serialization.StringSerializer
1416
import org.apache.kafka.common.utils.Bytes
1517
import org.testcontainers.containers.KafkaContainer
1618
import org.testcontainers.utility.DockerImageName
@@ -43,6 +45,15 @@ fun KafkaContainer.producer(configure: Properties.() -> Unit = {}): KafkaProduce
4345
return KafkaProducer<Bytes, Bytes>(props)
4446
}
4547

48+
fun KafkaContainer.stringStringProducer(configure: Properties.() -> Unit = {}): KafkaProducer<String, String> {
49+
val props = Properties()
50+
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
51+
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
52+
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
53+
props.configure()
54+
return KafkaProducer<String, String>(props)
55+
}
56+
4657
fun KafkaContainer.consumer(configure: Properties.() -> Unit = {}): KafkaConsumer<Bytes, Bytes> {
4758
val props = Properties()
4859
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
@@ -53,3 +64,14 @@ fun KafkaContainer.consumer(configure: Properties.() -> Unit = {}): KafkaConsume
5364
props.configure()
5465
return KafkaConsumer<Bytes, Bytes>(props)
5566
}
67+
68+
fun KafkaContainer.stringStringConsumer(configure: Properties.() -> Unit = {}): KafkaConsumer<String, String> {
69+
val props = Properties()
70+
props[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
71+
props[ConsumerConfig.GROUP_ID_CONFIG] = "kotest_consumer_" + System.currentTimeMillis()
72+
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
73+
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
74+
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
75+
props.configure()
76+
return KafkaConsumer<String, String>(props)
77+
}

0 commit comments

Comments
 (0)