Skip to content

Commit 15e0b00

Browse files
Avoid getLastMessageId RPC when calling hasMessageAvailable after seek by timestamp (#491)
1 parent b0c4412 commit 15e0b00

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

lib/ConsumerImpl.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1570,6 +1570,10 @@ void ConsumerImpl::seekAsync(uint64_t timestamp, const ResultCallback& callback)
15701570
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
15711571

15721572
void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) {
1573+
if (!incomingMessages_.empty()) {
1574+
callback(ResultOk, true);
1575+
return;
1576+
}
15731577
bool compareMarkDeletePosition;
15741578
{
15751579
std::lock_guard<std::mutex> lock{mutexForMessageId_};
@@ -1735,6 +1739,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
17351739
hasSoughtByTimestamp_.store(true, std::memory_order_release);
17361740
} else {
17371741
seekMessageId_ = *boost::get<MessageId>(&seekArg);
1742+
hasSoughtByTimestamp_.store(false, std::memory_order_release);
17381743
}
17391744
seekStatus_ = SeekStatus::IN_PROGRESS;
17401745
seekCallback_ = callback;

tests/ReaderTest.cc

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
#include <pulsar/Reader.h>
2222
#include <time.h>
2323

24+
#include <future>
2425
#include <string>
26+
#include <thread>
2527

2628
#include "HttpHelper.h"
2729
#include "PulsarFriend.h"
@@ -850,7 +852,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
850852
ASSERT_FALSE(hasMessageAvailable);
851853
}
852854

853-
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
855+
TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
854856
using namespace std::chrono;
855857
const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
856858
Producer producer;
@@ -862,12 +864,10 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
862864

863865
auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
864866
ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
865-
if (GetParam()) {
866-
if (msgId == MessageId::earliest()) {
867-
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
868-
} else {
869-
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
870-
}
867+
if (msgId == MessageId::earliest()) {
868+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
869+
} else {
870+
EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
871871
}
872872
};
873873

@@ -886,6 +886,22 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
886886
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
887887
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
888888
}
889+
890+
// Test `hasMessageAvailableAsync` will complete immediately if the incoming message queue is non-empty
891+
Reader reader;
892+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
893+
reader.seek(timestampBeforeSend);
894+
std::promise<std::thread::id> threadIdPromise;
895+
896+
waitUntil(seconds(3),
897+
[&reader] { return PulsarFriend::getConsumer(reader)->getNumOfPrefetchedMessages() > 0; });
898+
reader.hasMessageAvailableAsync([&threadIdPromise](Result result, bool hasMessageAvailable) {
899+
ASSERT_EQ(ResultOk, result);
900+
ASSERT_TRUE(hasMessageAvailable);
901+
threadIdPromise.set_value(std::this_thread::get_id());
902+
});
903+
auto threadId = threadIdPromise.get_future().get();
904+
ASSERT_EQ(threadId, std::this_thread::get_id());
889905
}
890906

891907
TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {

0 commit comments

Comments
 (0)