Skip to content

Commit 8736964

Browse files
committed
chore(demo-kafka): fix org.apache.kafka.common.errors.TimeoutException
1 parent a302f80 commit 8736964

17 files changed

+317
-198
lines changed

demo-kafka/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
44
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
55
<modelVersion>4.0.0</modelVersion>
6+
67
<parent>
78
<groupId>com.helltractor.demo</groupId>
89
<artifactId>parent</artifactId>

demo-kafka/src/main/java/com/helltractor/demo/KafkaApplication.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
@SpringBootApplication
77
public class KafkaApplication {
8-
8+
99
public static void main(String[] args) {
1010
SpringApplication.run(KafkaApplication.class, args);
1111
}

demo-kafka/src/main/java/com/helltractor/demo/message/AbstractMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
* Base message object for extends.
77
*/
88
public class AbstractMessage implements Serializable {
9-
9+
1010
/**
1111
* Reference id, or null if not set.
1212
*/
1313
public String refId = null;
14-
14+
1515
/**
1616
* Message create time
1717
*/
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package com.helltractor.demo.messaging;
22

3-
import com.helltractor.demo.message.AbstractMessage;
4-
53
import java.util.List;
64

5+
import com.helltractor.demo.message.AbstractMessage;
6+
77
@FunctionalInterface
88
public interface BatchMessageHandler<T extends AbstractMessage> {
9-
9+
1010
void processMessages(List<T> messages);
1111
}

demo-kafka/src/main/java/com/helltractor/demo/messaging/MessageConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22

33
@FunctionalInterface
44
public interface MessageConsumer {
5-
5+
66
void stop();
77
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
package com.helltractor.demo.messaging;
22

3-
import com.helltractor.demo.message.AbstractMessage;
4-
53
import java.util.List;
64

5+
import com.helltractor.demo.message.AbstractMessage;
6+
77
@FunctionalInterface
88
public interface MessageProducer<T extends AbstractMessage> {
9-
9+
1010
default void sendMessages(List<T> messages) {
1111
for (T message : messages) {
1212
sendMessages(message);
1313
}
1414
}
15-
15+
1616
void sendMessages(T message);
1717
}

demo-kafka/src/main/java/com/helltractor/demo/messaging/MessageTypes.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@
2424
*/
2525
@Component
2626
public class MessageTypes {
27-
27+
2828
private static final char SEP = '#';
29-
29+
3030
private final Logger logger = LoggerFactory.getLogger(getClass());
31-
31+
3232
private final String messagePackage = AbstractMessage.class.getPackageName();
33-
33+
3434
private final Map<String, Class<? extends AbstractMessage>> messageTypes = new HashMap<>();
35-
35+
3636
/**
3737
* Find all message classes in the package and instantiate them.
3838
*/
@@ -67,13 +67,13 @@ public boolean match(MetadataReader metadataReader, MetadataReaderFactory metada
6767
}
6868
}
6969
}
70-
70+
7171
public String serialize(AbstractMessage message) {
7272
String type = message.getClass().getName();
7373
String json = JsonUtil.writeJson(message);
7474
return type + SEP + json;
7575
}
76-
76+
7777
public AbstractMessage deserialize(String data) {
7878
int pos = data.indexOf(SEP);
7979
if (pos == -1) {
@@ -87,15 +87,15 @@ public AbstractMessage deserialize(String data) {
8787
String json = data.substring(pos + 1);
8888
return JsonUtil.readJson(json, clazz);
8989
}
90-
90+
9191
public List<AbstractMessage> deserialize(List<String> dataList) {
9292
List<AbstractMessage> list = new ArrayList<>(dataList.size());
9393
for (String data : dataList) {
9494
list.add(deserialize(data));
9595
}
9696
return list;
9797
}
98-
98+
9999
public List<AbstractMessage> deserializeConsumerRecords(List<ConsumerRecord<String, String>> dataList) {
100100
List<AbstractMessage> list = new ArrayList<>(dataList.size());
101101
for (ConsumerRecord<String, String> data : dataList) {
Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
11
package com.helltractor.demo.messaging;
22

33
public interface Messaging {
4-
4+
55
enum Topic {
6-
6+
77
TOPIC_ONE(1),
8-
98
TOPIC_TWO(2),
10-
119
TOPIC_THREE(3);
12-
10+
1311
final int concurrency;
14-
12+
1513
Topic(int concurrency) {
1614
this.concurrency = concurrency;
1715
}
18-
16+
1917
public int getConcurrency() {
2018
return this.concurrency;
2119
}
22-
20+
2321
public int getPartitions() {
2422
return this.concurrency;
2523
}
2624
}
27-
}
25+
}
Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package com.helltractor.demo.messaging;
22

3+
import java.util.HashMap;
4+
import java.util.Map;
5+
36
import org.apache.kafka.clients.admin.AdminClientConfig;
47
import org.apache.kafka.clients.consumer.ConsumerConfig;
58
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -13,25 +16,27 @@
1316
import org.springframework.context.annotation.Configuration;
1417
import org.springframework.kafka.annotation.EnableKafka;
1518
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
16-
import org.springframework.kafka.core.*;
17-
18-
import java.util.HashMap;
19-
import java.util.Map;
19+
import org.springframework.kafka.core.ConsumerFactory;
20+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
21+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
22+
import org.springframework.kafka.core.KafkaAdmin;
23+
import org.springframework.kafka.core.KafkaTemplate;
24+
import org.springframework.kafka.core.ProducerFactory;
2025

2126
@EnableKafka
2227
@Configuration
2328
public class MessagingConfiguration {
24-
29+
2530
private final Logger logger = LoggerFactory.getLogger(getClass());
26-
31+
2732
private Map<String, Object> producerConfigs(String bootstrapServers) {
2833
Map<String, Object> configs = new HashMap<>();
2934
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
3035
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
3136
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
3237
return configs;
3338
}
34-
39+
3540
private Map<String, Object> consumerConfigs(String bootstrapServers, int batchSize) {
3641
Map<String, Object> configs = new HashMap<>();
3742
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(batchSize));
@@ -41,27 +46,27 @@ private Map<String, Object> consumerConfigs(String bootstrapServers, int batchSi
4146
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
4247
return configs;
4348
}
44-
49+
4550
@Bean
4651
public KafkaAdmin admin(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
4752
Map<String, Object> configs = new HashMap<>();
4853
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
4954
return new KafkaAdmin(configs);
5055
}
51-
56+
5257
@Bean
5358
public ProducerFactory<String, String> producerFactory(
5459
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers) {
5560
logger.info("init kafka producer from bootstrap servers: {}", bootstrapServers);
5661
return new DefaultKafkaProducerFactory<>(producerConfigs(bootstrapServers));
5762
}
58-
63+
5964
@Bean
6065
public KafkaTemplate<String, String> kafkaTemplate(@Autowired ProducerFactory<String, String> producerFactory) {
6166
logger.info("init kafka template...");
6267
return new KafkaTemplate<>(producerFactory);
6368
}
64-
69+
6570
@Bean
6671
public ConsumerFactory<String, String> consumerFactory(
6772
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers,
@@ -70,7 +75,7 @@ public ConsumerFactory<String, String> consumerFactory(
7075
Integer.valueOf(batchSize));
7176
return new DefaultKafkaConsumerFactory<>(consumerConfigs(bootstrapServers, batchSize));
7277
}
73-
78+
7479
@Bean
7580
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
7681
@Autowired ConsumerFactory<String, String> consumerFactory) {
@@ -81,4 +86,4 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
8186
factory.setBatchListener(Boolean.TRUE);
8287
return factory;
8388
}
84-
}
89+
}

0 commit comments

Comments
 (0)