1919
2020#include " NegativeAcksTracker.h"
2121
22+ #include < cstdint>
2223#include < functional>
2324#include < set>
25+ #include < utility>
2426
2527#include " ClientImpl.h"
2628#include " ConsumerImpl.h"
2729#include " ExecutorService.h"
2830#include " LogUtils.h"
2931#include " MessageIdUtil.h"
32+ #include " pulsar/MessageBuilder.h"
33+ #include " pulsar/MessageId.h"
34+ #include " pulsar/MessageIdBuilder.h"
3035DECLARE_LOG_OBJECT ()
3136
3237namespace pulsar {
@@ -41,6 +46,7 @@ NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr &client, ConsumerIm
4146 nackDelay_ =
4247 std::chrono::milliseconds (std::max (conf.getNegativeAckRedeliveryDelayMs (), MIN_NACK_DELAY_MILLIS));
4348 timerInterval_ = std::chrono::milliseconds ((long )(nackDelay_.count () / 3 ));
49+ nackPrecisionBit_ = conf.getNegativeAckPrecisionBitCnt ();
4450 LOG_DEBUG (" Created negative ack tracker with delay: " << nackDelay_.count () << " ms - Timer interval: "
4551 << timerInterval_.count ());
4652}
@@ -75,13 +81,22 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
7581
7682 auto now = Clock::now ();
7783
84+ // The map is sorted by time, so we can exit immediately when we traverse to a time that does not match
7885 for (auto it = nackedMessages_.begin (); it != nackedMessages_.end ();) {
79- if (it->second < now) {
80- messagesToRedeliver.insert (it->first );
81- it = nackedMessages_.erase (it);
82- } else {
83- ++it;
86+ if (it->first > now) {
87+ // We are done with all the messages that need to be redelivered
88+ break ;
8489 }
90+
91+ auto ledgerMap = it->second ;
92+ for (auto ledgerIt = ledgerMap.begin (); ledgerIt != ledgerMap.end (); ++ledgerIt) {
93+ auto entrySet = ledgerIt->second ;
94+ for (auto setIt = entrySet.begin (); setIt != entrySet.end (); ++setIt) {
95+ messagesToRedeliver.insert (
96+ MessageIdBuilder ().ledgerId (ledgerIt->first ).entryId (*setIt).build ());
97+ }
98+ }
99+ it = nackedMessages_.erase (it);
85100 }
86101 lock.unlock ();
87102
@@ -92,14 +107,27 @@ void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
92107 scheduleTimer ();
93108}
94109
110+ std::chrono::steady_clock::time_point trimLowerBit (const std::chrono::steady_clock::time_point &tp,
111+ int bits) {
112+ // get origin timestamp in nanoseconds
113+ auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch ()).count ();
114+
115+ // trim lower bits
116+ auto trimmedTimestamp = timestamp & (~((1LL << bits) - 1 ));
117+
118+ return std::chrono::steady_clock::time_point (std::chrono::nanoseconds (trimmedTimestamp));
119+ }
120+
95121void NegativeAcksTracker::add (const MessageId &m) {
96122 auto msgId = discardBatch (m);
97123 auto now = Clock::now ();
98124
99125 {
100126 std::lock_guard<std::mutex> lock{mutex_};
127+ auto trimmedTimestamp = trimLowerBit (now + nackDelay_, nackPrecisionBit_);
128+ // If the timestamp is already in the map, we can just add the message to the existing entry
101129 // Erase batch id to group all nacks from same batch
102- nackedMessages_[msgId] = now + nackDelay_ ;
130+ nackedMessages_[trimmedTimestamp][ msgId. ledgerId ()]. add (( uint64_t )msgId. entryId ()) ;
103131 }
104132
105133 scheduleTimer ();
0 commit comments