Skip to content

Commit 137df29

Browse files
[fix][client] fix the beforeConsume() method earlier hit with message listener (#23578)
1 parent b5484f6 commit 137df29

File tree

4 files changed

+82
-2
lines changed

4 files changed

+82
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,84 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
476476
consumer.close();
477477
}
478478

479+
@Test(dataProvider = "topicPartition")
480+
public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int partitions) throws Exception {
481+
482+
AtomicInteger beforeConsumeCount = new AtomicInteger(0);
483+
PulsarClient client = PulsarClient.builder()
484+
.serviceUrl(lookupUrl.toString())
485+
.listenerThreads(1)
486+
.build();
487+
488+
ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<>() {
489+
@Override
490+
public void close() {
491+
}
492+
493+
@Override
494+
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
495+
beforeConsumeCount.incrementAndGet();
496+
log.info("beforeConsume messageId: {}", message.getMessageId());
497+
return message;
498+
}
499+
500+
@Override
501+
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
502+
}
503+
504+
@Override
505+
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
506+
}
507+
508+
@Override
509+
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
510+
}
511+
512+
@Override
513+
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
514+
}
515+
};
516+
517+
final String topicName = "persistent://my-property/my-ns/my-topic";
518+
519+
if (partitions > 0) {
520+
admin.topics().createPartitionedTopic(topicName, partitions);
521+
} else {
522+
admin.topics().createNonPartitionedTopic(topicName);
523+
}
524+
525+
Consumer<String> consumer = client.newConsumer(Schema.STRING)
526+
.topic(topicName)
527+
.subscriptionType(SubscriptionType.Shared)
528+
.intercept(interceptor)
529+
.subscriptionName("my-subscription")
530+
.messageListener((c, m) -> {
531+
// Simulate a long processing time
532+
try {
533+
Thread.sleep(60000);
534+
} catch (InterruptedException e) {
535+
throw new RuntimeException(e);
536+
}
537+
})
538+
.subscribe();
539+
540+
Producer<String> producer = client.newProducer(Schema.STRING)
541+
.topic("persistent://my-property/my-ns/my-topic")
542+
.create();
543+
544+
final int messages = 10;
545+
for (int i = 0; i < messages; i++) {
546+
producer.newMessage().value("Hello Pulsar!").send();
547+
}
548+
Awaitility.await().untilAsserted(() -> {
549+
// Ensure that the interceptor is not hit before the message listener
550+
Assert.assertEquals(beforeConsumeCount.get(), 1);
551+
});
552+
producer.close();
553+
consumer.close();
554+
client.close();
555+
}
556+
479557
@Test
480558
public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
481559

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,7 @@ protected void callMessageListener(Message<T> msg) {
11801180
id = msg.getMessageId();
11811181
}
11821182
unAckedMessageTracker.add(id, msg.getRedeliveryCount());
1183+
beforeConsume(msg);
11831184
listener.received(ConsumerBase.this, msg);
11841185
} catch (Throwable t) {
11851186
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,8 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
542542
return null;
543543
}
544544
messageProcessed(message);
545-
return beforeConsume(message);
545+
message = listener == null ? beforeConsume(message) : message;
546+
return message;
546547
} catch (InterruptedException e) {
547548
ExceptionHandler.handleInterruptedException(e);
548549
State state = getState();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
401401
decreaseIncomingMessageSize(message);
402402
checkArgument(message instanceof TopicMessageImpl);
403403
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
404-
message = beforeConsume(message);
404+
message = listener == null ? beforeConsume(message) : message;
405405
}
406406
resumeReceivingFromPausedConsumersIfNeeded();
407407
return message;

0 commit comments

Comments
 (0)