Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,11 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
*/
void ConsumerImpl::clearReceiveQueue() {
if (duringSeek()) {
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
if (hasSoughtByTimestamp()) {
// Invalidate startMessageId_ so that isPriorBatchIndex and isPriorEntryIndex checks will be
// skipped, and hasMessageAvailableAsync won't use startMessageId_ in compare.
startMessageId_ = boost::none;
} else {
startMessageId_ = seekMessageId_.get();
}
SeekStatus expected = SeekStatus::COMPLETED;
Expand Down Expand Up @@ -1578,10 +1582,16 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
{
std::lock_guard<std::mutex> lock{mutexForMessageId_};
compareMarkDeletePosition =
(lastDequedMessageId_ == MessageId::earliest()) &&
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
}
if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
// there is no message received by consumer, so we cannot compare the last position with the last
// received position
lastDequedMessageId_ == MessageId::earliest() &&
// If the start message id is latest, we should seek to the actual last message first.
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest() ||
// If there is a previous seek operation by timestamp, the start message id will be incorrect, so
// we cannot compare the start position with the last position.
hasSoughtByTimestamp());
}
if (compareMarkDeletePosition) {
auto self = get_shared_this_ptr();
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result != ResultOk) {
Expand All @@ -1600,8 +1610,7 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
callback(ResultOk, false);
}
};
if (self->config_.isStartMessageIdInclusive() &&
!self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
if (self->config_.isStartMessageIdInclusive() && !self->hasSoughtByTimestamp()) {
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
if (result != ResultOk) {
callback(result, {});
Expand Down Expand Up @@ -1766,7 +1775,7 @@ void ConsumerImpl::seekAsyncInternal(long requestId, const SharedBuffer& seek, c
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {
if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
if (!hasSoughtByTimestamp()) {
startMessageId_ = seekMessageId_.get();
}
seekCallback_.release()(result);
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class ConsumerImpl : public ConsumerImplBase {
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
std::atomic<bool> hasSoughtByTimestamp_{false};

bool hasSoughtByTimestamp() const { return hasSoughtByTimestamp_.load(std::memory_order_acquire); }
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }

class ChunkedMessageCtx {
Expand Down
10 changes: 10 additions & 0 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,16 @@ TEST_F(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
createReader(reader, msgId);
ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);

bool hasMessageAvailable;
while (true) {
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
if (!hasMessageAvailable) {
break;
}
Message msg;
ASSERT_EQ(ResultOk, reader.readNext(msg, 3000));
}
}

// Test `hasMessageAvailableAsync` will complete immediately if the incoming message queue is non-empty
Expand Down