Skip to content

Commit 9e119ce

Browse files
Fix acknowledgeCumulative never returns when accepting an invalid message id for a multi-topics consumer (#492)
1 parent 15e0b00 commit 9e119ce

File tree

2 files changed

+56
-11
lines changed

2 files changed

+56
-11
lines changed

lib/MultiTopicsConsumerImpl.cc

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,14 @@ void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const
649649
callback(result, msg);
650650
}
651651

652+
static void logErrorTopicNameForAcknowledge(const std::string& topic) {
653+
if (topic.empty()) {
654+
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
655+
} else {
656+
LOG_ERROR("Message of topic: " << topic << " not in consumers");
657+
}
658+
}
659+
652660
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, const ResultCallback& callback) {
653661
if (state_ != Ready) {
654662
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId);
@@ -657,19 +665,14 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, const Res
657665
}
658666

659667
const std::string& topicPartitionName = msgId.getTopicName();
660-
if (topicPartitionName.empty()) {
661-
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
662-
callback(ResultOperationNotSupported);
663-
return;
664-
}
665668
auto optConsumer = consumers_.find(topicPartitionName);
666669

667670
if (optConsumer) {
668671
unAckedMessageTrackerPtr_->remove(msgId);
669672
optConsumer.value()->acknowledgeAsync(msgId, callback);
670673
} else {
671-
LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
672-
callback(ResultUnknownError);
674+
logErrorTopicNameForAcknowledge(topicPartitionName);
675+
callback(ResultOperationNotSupported);
673676
}
674677
}
675678

@@ -684,7 +687,7 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
684687
for (const MessageId& messageId : messageIdList) {
685688
const auto& topicName = messageId.getTopicName();
686689
if (topicName.empty()) {
687-
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
690+
logErrorTopicNameForAcknowledge(topicName);
688691
callback(ResultOperationNotSupported);
689692
return;
690693
}
@@ -710,19 +713,22 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
710713
unAckedMessageTrackerPtr_->remove(kv.second);
711714
optConsumer.value()->acknowledgeAsync(kv.second, cb);
712715
} else {
713-
LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
714-
callback(ResultUnknownError);
716+
logErrorTopicNameForAcknowledge(kv.first);
717+
callback(ResultOperationNotSupported);
715718
}
716719
}
717720
}
718721

719722
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
720723
const ResultCallback& callback) {
721-
msgId.getTopicName();
724+
const auto& topic = msgId.getTopicName();
722725
auto optConsumer = consumers_.find(msgId.getTopicName());
723726
if (optConsumer) {
724727
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
725728
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
729+
} else {
730+
logErrorTopicNameForAcknowledge(topic);
731+
callback(ResultOperationNotSupported);
726732
}
727733
}
728734

tests/MultiTopicsConsumerTest.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,42 @@ TEST(MultiTopicsConsumerTest, testSeekToNewerPosition) {
103103

104104
client.close();
105105
}
106+
107+
TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {
108+
const std::string topicPrefix = "multi-topics-consumer-ack-invalid-msg-id";
109+
Client client{lookupUrl};
110+
std::vector<std::string> topics(2);
111+
for (size_t i = 0; i < topics.size(); i++) {
112+
Producer producer;
113+
auto topic = topicPrefix + unique_str();
114+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
115+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build()));
116+
topics[i] = std::move(topic);
117+
}
118+
119+
Consumer consumer;
120+
ConsumerConfiguration conf;
121+
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
122+
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
123+
124+
std::vector<MessageId> msgIds(topics.size());
125+
for (size_t i = 0; i < topics.size(); i++) {
126+
Message msg;
127+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
128+
std::string serialized;
129+
msg.getMessageId().serialize(serialized);
130+
msgIds[i] = MessageId::deserialize(serialized);
131+
}
132+
133+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
134+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
135+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
136+
137+
msgIds[0].setTopicName("invalid-topic");
138+
msgIds[1].setTopicName("invalid-topic");
139+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
140+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
141+
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
142+
143+
client.close();
144+
}

0 commit comments

Comments
 (0)