From d86361287cc5503b5697fe5264d74527872f7333 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sat, 11 Apr 2026 23:03:38 +0200 Subject: [PATCH 01/19] batch pushes --- include/atomic_queue/atomic_queue.h | 45 +++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 1caa82e..7f2ed21 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -292,6 +293,33 @@ class AtomicQueueCommon { return true; } + template + ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept { + int pushes = static_cast(std::distance(first, last)); + auto head = head_.load(X); + if(Derived::spsc_) { + int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); + pushes = std::min(slots, pushes); + if (pushes > 0) + head_.store(head + static_cast(pushes), X); + } + else { + int const length = pushes; + do { + int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); + pushes = std::min(slots, length); + if (pushes <= 0) + break; + } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast(pushes), X, X))); // This loop is not FIFO. + } + + int const end = head + pushes; + while (head < end) { + static_cast(*this).do_push(*first++, head++); + } + return first; + } + template ATOMIC_QUEUE_INLINE bool try_pop(T& element) noexcept { auto tail = tail_.load(X); @@ -325,6 +353,23 @@ class AtomicQueueCommon { static_cast(*this).do_push(std::forward(element), head); } + template + ATOMIC_QUEUE_INLINE void push(InputIt first, InputIt const last) noexcept { + unsigned const length = static_cast(std::distance(first, last)); + unsigned head; + if(Derived::spsc_) { + head = head_.load(X); + head_.store(head + length, X); + } + else { + constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; + head = head_.fetch_add(length, memory_order); // FIFO and total order on Intel regardless, as of 2019. + } + while (first != last) { + static_cast(*this).do_push(*first++, head++); + } + } + ATOMIC_QUEUE_INLINE auto pop() noexcept { unsigned tail; if(Derived::spsc_) { From 1f2211d4c033e6f17842540cd8d54101391e5fac Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 12 Apr 2026 09:43:19 +0200 Subject: [PATCH 02/19] Changed to Pointer Signature --- include/atomic_queue/atomic_queue.h | 42 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 7f2ed21..58a49dc 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -293,31 +293,30 @@ class AtomicQueueCommon { return true; } - template - ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept { - int pushes = static_cast(std::distance(first, last)); + template + ATOMIC_QUEUE_INLINE unsigned try_push(T* ATOMIC_QUEUE_RESTRICT p, unsigned n) noexcept { auto head = head_.load(X); if(Derived::spsc_) { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); - pushes = std::min(slots, pushes); - if (pushes > 0) - head_.store(head + static_cast(pushes), X); + if (slots <= 0) + return 0; + n = std::min(n, static_cast(slots)); + head_.store(head + n, X); } else { - int const length = pushes; + unsigned const length = n; do { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); - pushes = std::min(slots, length); - if (pushes <= 0) - break; - } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast(pushes), X, X))); // This loop is not FIFO. + if (slots <= 0) + return 0; + n = std::min(length, static_cast(slots)); + } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + n, X, X))); // This loop is not FIFO. } - int const end = head + pushes; - while (head < end) { - static_cast(*this).do_push(*first++, head++); + for (T* q = p + n; p != q;) + static_cast(*this).do_push(*p++, head++); } - return first; + return n; } template @@ -353,20 +352,19 @@ class AtomicQueueCommon { static_cast(*this).do_push(std::forward(element), head); } - template - ATOMIC_QUEUE_INLINE void push(InputIt first, InputIt const last) noexcept { - unsigned const length = static_cast(std::distance(first, last)); + template + ATOMIC_QUEUE_INLINE void push(T* ATOMIC_QUEUE_RESTRICT p, unsigned n) noexcept { unsigned head; if(Derived::spsc_) { head = head_.load(X); - head_.store(head + length, X); + head_.store(head + n, X); } else { constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; - head = head_.fetch_add(length, memory_order); // FIFO and total order on Intel regardless, as of 2019. + head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. } - while (first != last) { - static_cast(*this).do_push(*first++, head++); + for (T* q = p + n; p != q;) + static_cast(*this).do_push(*p++, head++); } } From 9d7c03c10af937f4a16a8591fac22dd61536f46a Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 12 Apr 2026 10:55:13 +0200 Subject: [PATCH 03/19] typo --- include/atomic_queue/atomic_queue.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 58a49dc..e8e6b09 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -313,7 +313,7 @@ class AtomicQueueCommon { } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + n, X, X))); // This loop is not FIFO. } - for (T* q = p + n; p != q;) + for (T* q = p + n; p != q;) { static_cast(*this).do_push(*p++, head++); } return n; @@ -363,7 +363,7 @@ class AtomicQueueCommon { constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. } - for (T* q = p + n; p != q;) + for (T* q = p + n; p != q;) { static_cast(*this).do_push(*p++, head++); } } From 99781581cce35b43e5ac3a49bea4b774fbb260bc Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 12 Apr 2026 12:10:13 +0200 Subject: [PATCH 04/19] changed signature and tests for batch push --- include/atomic_queue/atomic_queue.h | 26 +++++---- src/tests.cc | 81 +++++++++++++++++++++++++++-- 2 files changed, 95 insertions(+), 12 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index e8e6b09..8ba6247 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -294,12 +294,13 @@ class AtomicQueueCommon { } template - ATOMIC_QUEUE_INLINE unsigned try_push(T* ATOMIC_QUEUE_RESTRICT p, unsigned n) noexcept { + ATOMIC_QUEUE_INLINE T* try_push(T* ATOMIC_QUEUE_RESTRICT first, T* const last) noexcept { + unsigned n = static_cast (last - first); auto head = head_.load(X); if(Derived::spsc_) { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); if (slots <= 0) - return 0; + return first; n = std::min(n, static_cast(slots)); head_.store(head + n, X); } @@ -308,15 +309,15 @@ class AtomicQueueCommon { do { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); if (slots <= 0) - return 0; + return first; n = std::min(length, static_cast(slots)); } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + n, X, X))); // This loop is not FIFO. } - for (T* q = p + n; p != q;) { - static_cast(*this).do_push(*p++, head++); + while (n--) { + static_cast(*this).do_push(*first++, head++); } - return n; + return first; } template @@ -353,7 +354,8 @@ class AtomicQueueCommon { } template - ATOMIC_QUEUE_INLINE void push(T* ATOMIC_QUEUE_RESTRICT p, unsigned n) noexcept { + ATOMIC_QUEUE_INLINE void push(T* ATOMIC_QUEUE_RESTRICT first, T* const last) noexcept { + unsigned n = static_cast(last - first); unsigned head; if(Derived::spsc_) { head = head_.load(X); @@ -363,8 +365,8 @@ class AtomicQueueCommon { constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. } - for (T* q = p + n; p != q;) { - static_cast(*this).do_push(*p++, head++); + while (n--) { + static_cast(*this).do_push(*first++, head++); } } @@ -694,6 +696,12 @@ struct RetryDecorator : Queue { spin_loop_pause(); } + ATOMIC_QUEUE_INLINE void push(T* first, T* const last) noexcept { + while(last != (first = this->try_push(first, last))) { + spin_loop_pause(); + } + } + ATOMIC_QUEUE_INLINE T pop() noexcept { T element; while(!this->try_pop(element)) diff --git a/src/tests.cc b/src/tests.cc index 62175be..54af710 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -21,7 +21,7 @@ namespace { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Check that all push'es are ever pop'ed once with multiple producer and multiple consumers. -template +template void stress() { constexpr unsigned N = 1000000; using T = typename Queue::value_type; @@ -34,8 +34,19 @@ void stress() { for(unsigned i = 0; i < PRODUCERS; ++i) producers[i] = std::thread([&q, &barrier, N=N]() { barrier.wait(); - for(T n = N; n; --n) - q.push(n); + for(T n = N; n;) { + if (BATCH <= 1) { + q.push(n--); + } + else { + T buffer[BATCH]; + unsigned j = 0; + while (j < BATCH && n) { + buffer[j++] = n--; + } + q.push(buffer, buffer + j); + } + } }); uint64_t results[CONSUMERS]; @@ -160,66 +171,130 @@ BOOST_AUTO_TEST_CASE(stress_AtomicQueue) { stress>>(); } +BOOST_AUTO_TEST_CASE(stress_AtomicQueue_batch) { + stress>, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueue) { stress>(); } +BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueue_batch) { + stress, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_AtomicQueue2) { stress>>(); } +BOOST_AUTO_TEST_CASE(stress_AtomicQueue2_batch) { + stress>, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueue2) { stress>(); } +BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueue2_batch) { + stress, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueue) { stress>, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueue_batch) { + stress>, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueue) { stress, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueue_batch) { + stress, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueue2) { stress>, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueue2_batch) { + stress>, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueue2) { stress, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueue2_batch) { + stress, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_AtomicQueueB) { stress, CAPACITY>>>(); } +BOOST_AUTO_TEST_CASE(stress_AtomicQueueB_batch) { + stress, CAPACITY>>, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueueB) { stress, CAPACITY>>(); } +BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueueB_batch) { + stress, CAPACITY>, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_AtomicQueueB2) { stress, CAPACITY>>>(); } +BOOST_AUTO_TEST_CASE(stress_AtomicQueueB2_batch) { + stress, CAPACITY>>, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueueB2) { stress, CAPACITY>>(); } +BOOST_AUTO_TEST_CASE(stress_OptimistAtomicQueueB2_batch) { + stress, CAPACITY>, 3, 3, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueueB) { stress, 0u, true, false, true>, CAPACITY>>, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueueB_batch) { + stress, 0u, true, false, true>, CAPACITY>>, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueueB) { stress, 0u, true, false, true>, CAPACITY>, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueueB_batch) { + stress, 0u, true, false, true>, CAPACITY>, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueueB2) { stress, true, false, true>, CAPACITY>>, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_AtomicQueueB2_batch) { + stress, true, false, true>, CAPACITY>>, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueueB2) { stress, true, false, true>, CAPACITY>, 1, 1>(); } +BOOST_AUTO_TEST_CASE(spsc_stress_OptimistAtomicQueueB2_batch) { + stress, true, false, true>, CAPACITY>, 1, 1, 12>(); +} + BOOST_AUTO_TEST_CASE(move_only_2) { AtomicQueue2, 2> q; test_unique_ptr_int(q); From 3d2702b9dbbb74df03d84429707d5dd35d970888 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 12 Apr 2026 12:45:35 +0200 Subject: [PATCH 05/19] changed signedness of pointer difference for fewer instructions --- include/atomic_queue/atomic_queue.h | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 8ba6247..8e6426e 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -10,7 +10,6 @@ #include #include #include -#include #include #include @@ -295,28 +294,28 @@ class AtomicQueueCommon { template ATOMIC_QUEUE_INLINE T* try_push(T* ATOMIC_QUEUE_RESTRICT first, T* const last) noexcept { - unsigned n = static_cast (last - first); + int n = static_cast(last - first); auto head = head_.load(X); if(Derived::spsc_) { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); - if (slots <= 0) + n = std::min(n, slots); + if (n <= 0) return first; - n = std::min(n, static_cast(slots)); - head_.store(head + n, X); + head_.store(head + static_cast(n), X); } else { - unsigned const length = n; + int const length = n; do { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); - if (slots <= 0) + n = std::min(length, slots); + if (n <= 0) return first; - n = std::min(length, static_cast(slots)); - } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + n, X, X))); // This loop is not FIFO. + } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast(n), X, X))); // This loop is not FIFO. } - while (n--) { + do { static_cast(*this).do_push(*first++, head++); - } + } while (--n); return first; } From 000ac7d9a8d658a0e9a22a3c674b263e1e077e61 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Wed, 15 Apr 2026 18:26:51 +0200 Subject: [PATCH 06/19] changed signature to iterator --- include/atomic_queue/atomic_queue.h | 16 +++++++++------- src/tests.cc | 11 ++++++----- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 8e6426e..86684b2 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -292,9 +293,9 @@ class AtomicQueueCommon { return true; } - template - ATOMIC_QUEUE_INLINE T* try_push(T* ATOMIC_QUEUE_RESTRICT first, T* const last) noexcept { - int n = static_cast(last - first); + template + ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept { + int n = static_cast(std::distance(first, last)); auto head = head_.load(X); if(Derived::spsc_) { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); @@ -352,9 +353,9 @@ class AtomicQueueCommon { static_cast(*this).do_push(std::forward(element), head); } - template - ATOMIC_QUEUE_INLINE void push(T* ATOMIC_QUEUE_RESTRICT first, T* const last) noexcept { - unsigned n = static_cast(last - first); + template + ATOMIC_QUEUE_INLINE void push(InputIt first, InputIt const last) noexcept { + unsigned n = static_cast(std::distance(first, last)); unsigned head; if(Derived::spsc_) { head = head_.load(X); @@ -695,7 +696,8 @@ struct RetryDecorator : Queue { spin_loop_pause(); } - ATOMIC_QUEUE_INLINE void push(T* first, T* const last) noexcept { + template + ATOMIC_QUEUE_INLINE void push(InputIt first, InputIt const last) noexcept { while(last != (first = this->try_push(first, last))) { spin_loop_pause(); } diff --git a/src/tests.cc b/src/tests.cc index 54af710..30ef10a 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -8,6 +8,7 @@ #include "atomic_queue/atomic_queue.h" #include "atomic_queue/barrier.h" +#include #include #include #include @@ -39,12 +40,12 @@ void stress() { q.push(n--); } else { - T buffer[BATCH]; - unsigned j = 0; - while (j < BATCH && n) { - buffer[j++] = n--; + std::array buffer; + typename std::array::iterator it = buffer.begin(); + while (it != buffer.end() && n) { + *it++ = n--; } - q.push(buffer, buffer + j); + q.push(buffer.begin(), it); } } }); From 6b736ad37916c3437f35fd6cac2a86deefd3de6f Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Wed, 15 Apr 2026 22:18:10 +0200 Subject: [PATCH 07/19] added return iterator --- include/atomic_queue/atomic_queue.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 86684b2..4ca6f91 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -354,7 +354,7 @@ class AtomicQueueCommon { } template - ATOMIC_QUEUE_INLINE void push(InputIt first, InputIt const last) noexcept { + ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept { unsigned n = static_cast(std::distance(first, last)); unsigned head; if(Derived::spsc_) { @@ -368,6 +368,7 @@ class AtomicQueueCommon { while (n--) { static_cast(*this).do_push(*first++, head++); } + return first; } ATOMIC_QUEUE_INLINE auto pop() noexcept { @@ -697,10 +698,11 @@ struct RetryDecorator : Queue { } template - ATOMIC_QUEUE_INLINE void push(InputIt first, InputIt const last) noexcept { + ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept { while(last != (first = this->try_push(first, last))) { spin_loop_pause(); } + return first; } ATOMIC_QUEUE_INLINE T pop() noexcept { From 5330997cde8e352ca56d1ed6e7df4bf9a6ed9a4f Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Wed, 15 Apr 2026 22:35:01 +0200 Subject: [PATCH 08/19] batch pops --- include/atomic_queue/atomic_queue.h | 53 +++++++++++++++++++++++++++++ src/tests.cc | 23 +++++++++++-- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 4ca6f91..135efde 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -339,6 +339,33 @@ class AtomicQueueCommon { return true; } + template + ATOMIC_QUEUE_INLINE OutputIt try_pop(OutputIt first, int& remaining_requested_pops) noexcept { + auto tail = tail_.load(X); + int num_pops; + if(Derived::spsc_) { + int const num_elements = static_cast(head_.load(X) - tail); + num_pops = std::min(remaining_requested_pops, num_elements); + if(num_pops <= 0) + return first; + tail_.store(tail + static_cast(num_pops), X); + } + else { + do { + int const num_elements = static_cast(head_.load(X) - tail); + num_pops = std::min(remaining_requested_pops, num_elements); + if(num_pops <= 0) + return first; + } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast(num_pops), X, X))); // This loop is not FIFO. + } + + remaining_requested_pops -= num_pops; + do { + *first++ = static_cast(*this).do_pop(tail++); + } while (--num_pops); + return first; + } + template ATOMIC_QUEUE_INLINE void push(T&& element) noexcept { unsigned head; @@ -384,6 +411,23 @@ class AtomicQueueCommon { return static_cast(*this).do_pop(tail); } + template + ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept { + unsigned tail; + if(Derived::spsc_) { + tail = tail_.load(X); + tail_.store(tail + n, X); + } + else { + constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; + tail = tail_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. + } + while(n--) { + *first++ = static_cast(*this).do_pop(tail++); + } + return first; + } + ATOMIC_QUEUE_INLINE bool was_empty() const noexcept { return !was_size(); } @@ -711,6 +755,15 @@ struct RetryDecorator : Queue { spin_loop_pause(); return element; } + + template + ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept { + while(n > 0) { + first = this->try_pop(first, n); + spin_loop_pause(); + } + return first; + } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/tests.cc b/src/tests.cc index 30ef10a..a29f32c 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -56,15 +56,32 @@ void stress() { consumers[i] = std::thread([&q, &barrier, &r = results[i], STOP = STOP]() { barrier.wait(); uint64_t result = 0; - for(T n; (n = q.pop()) != STOP;) - result += n; + if (BATCH <= 1) { + for(T n; (n = q.pop()) != STOP;) + result += n; + } + else { + bool continue_pops = true; + while (continue_pops) { + std::array buffer; + typename std::array::iterator end_it = q.pop(buffer.begin(), BATCH); + for (typename std::array::iterator it = buffer.begin(); it != end_it; ++it) { + if (*it != STOP) { + result += *it; + } + else { + continue_pops = false; + } + } + } + } r = result; }); barrier.release(PRODUCERS + CONSUMERS); for(auto& t : producers) t.join(); - for(int i = CONSUMERS; i--;) + for(int i = CONSUMERS * std::max(1, BATCH); i--;) q.push(STOP); for(auto& t : consumers) t.join(); From a139a59327c098c654475f6f66acd1b31bd34f39 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Thu, 16 Apr 2026 19:46:38 +0200 Subject: [PATCH 09/19] Batch pop signature change --- include/atomic_queue/atomic_queue.h | 29 ++++++++++++++--------------- src/tests.cc | 4 ++-- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 135efde..982b1be 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -340,30 +340,30 @@ class AtomicQueueCommon { } template - ATOMIC_QUEUE_INLINE OutputIt try_pop(OutputIt first, int& remaining_requested_pops) noexcept { + ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept { auto tail = tail_.load(X); - int num_pops; if(Derived::spsc_) { int const num_elements = static_cast(head_.load(X) - tail); - num_pops = std::min(remaining_requested_pops, num_elements); - if(num_pops <= 0) - return first; - tail_.store(tail + static_cast(num_pops), X); + n = std::min(n, num_elements); + if(n <= 0) + return 0; + tail_.store(tail + static_cast(n), X); } else { + int const desired_pops = n; do { int const num_elements = static_cast(head_.load(X) - tail); - num_pops = std::min(remaining_requested_pops, num_elements); - if(num_pops <= 0) - return first; - } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast(num_pops), X, X))); // This loop is not FIFO. + n = std::min(desired_pops, num_elements); + if(n <= 0) + return 0; + } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast(n), X, X))); // This loop is not FIFO. } - remaining_requested_pops -= num_pops; + int i = n; do { *first++ = static_cast(*this).do_pop(tail++); - } while (--num_pops); - return first; + } while (--i); + return n; } template @@ -758,8 +758,7 @@ struct RetryDecorator : Queue { template ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept { - while(n > 0) { - first = this->try_pop(first, n); + while (n -= this->try_pop(first, n)) { spin_loop_pause(); } return first; diff --git a/src/tests.cc b/src/tests.cc index a29f32c..f93e620 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -64,8 +64,8 @@ void stress() { bool continue_pops = true; while (continue_pops) { std::array buffer; - typename std::array::iterator end_it = q.pop(buffer.begin(), BATCH); - for (typename std::array::iterator it = buffer.begin(); it != end_it; ++it) { + typename std::array::iterator out_it = q.pop(buffer.begin(), BATCH); + for (typename std::array::iterator it = buffer.begin(); it != out_it; ++it) { if (*it != STOP) { result += *it; } From 38db7c146ace31c33819dce69f8c012be9c64ee1 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 19 Apr 2026 19:40:24 +0200 Subject: [PATCH 10/19] formatting --- include/atomic_queue/atomic_queue.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 982b1be..77cc5dd 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -300,7 +300,7 @@ class AtomicQueueCommon { if(Derived::spsc_) { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); n = std::min(n, slots); - if (n <= 0) + if(n <= 0) return first; head_.store(head + static_cast(n), X); } @@ -309,14 +309,14 @@ class AtomicQueueCommon { do { int const slots = static_cast(tail_.load(X) + static_cast(*this).size_ - head); n = std::min(length, slots); - if (n <= 0) + if(n <= 0) return first; } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast(n), X, X))); // This loop is not FIFO. } do { static_cast(*this).do_push(*first++, head++); - } while (--n); + } while(--n); return first; } @@ -362,7 +362,7 @@ class AtomicQueueCommon { int i = n; do { *first++ = static_cast(*this).do_pop(tail++); - } while (--i); + } while(--i); return n; } @@ -392,7 +392,7 @@ class AtomicQueueCommon { constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. } - while (n--) { + while(n--) { static_cast(*this).do_push(*first++, head++); } return first; @@ -758,7 +758,7 @@ struct RetryDecorator : Queue { template ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept { - while (n -= this->try_pop(first, n)) { + while(n -= this->try_pop(first, n)) { spin_loop_pause(); } return first; From 77e930dca47ad943422bcda24dec80e4cf8d9680 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Sun, 19 Apr 2026 19:54:51 +0200 Subject: [PATCH 11/19] api documentation --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 9b9f45f..44a4a09 100644 --- a/README.md +++ b/README.md @@ -156,9 +156,13 @@ push() pop() ### Queue API The queue class templates provide the following member functions: * `try_push` - Appends an element to the end of the queue. Returns `false` when the queue is full. +* `try_push` (batch) - Appends an iterator range to the end of the queue. Returns the iterator one past the last element that got pushed, which could be different from the last iterator when the queue becomes full. * `try_pop` - Removes an element from the front of the queue. Returns `false` when the queue is empty. +* `try_pop` (batch) - Removes elements from the front of the queue and places them into an iterator. Returns the number of elements popped, which may be different from the number of requested elements if the queue becomes empty. * `push` (optimist) - Appends an element to the end of the queue. Busy waits when the queue is full. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order. +* `push` (optimist, batch) - Appends an iterator range to the end of the queue. Busy waits when the queue is full. Returns the iterator past the last pushed element. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order. * `pop` (optimist) - Removes an element from the front of the queue. Busy waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order. +* `pop` (optimist, batch) - Removes elements from the front of the queue and places them into an iterator. Returns the iterator past the last popped element. Busy waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order. * `was_size` - Returns the number of unconsumed elements during the call. The state may have changed by the time the return value is examined. * `was_empty` - Returns `true` if the container was empty during the call. The state may have changed by the time the return value is examined. * `was_full` - Returns `true` if the container was full during the call. The state may have changed by the time the return value is examined. From 05b9c73f7a0cf3ae2698730e64129cc915ce2ad6 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Mon, 20 Apr 2026 18:55:09 +0200 Subject: [PATCH 12/19] added Batch to test --- src/tests.cc | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/tests.cc b/src/tests.cc index 7cc28fc..04ae5d7 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -15,6 +15,7 @@ #include #include #include +#include //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -25,7 +26,7 @@ using namespace ::atomic_queue; enum { N_STRESS_MSG = 1000000 }; enum { STOP_MSG = -1 }; enum { CAPACITY = 4096 }; -constexpr std::array BATCH_SIZES = {1, 4, 12}; +constexpr std::array BATCH_SIZES = {1, 4, 12}; using stress_queues = boost::mpl::list< AtomicQueue, @@ -57,21 +58,21 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { }; using T = typename Queue::value_type; - for (int const BATCH : BATCH_SIZES) { + for (int const BATCH_SIZE : BATCH_SIZES) { Queue q; Barrier barrier; std::thread producers[PRODUCERS]; for(unsigned i = 0; i < PRODUCERS; ++i) - producers[i] = std::thread([&q, &barrier]() { + producers[i] = std::thread([&q, &barrier, BATCH_SIZE]() { barrier.wait(); for(T n = N_STRESS_MSG; n;) { - if (BATCH <= 1) { + if (BATCH_SIZE <= 1) { q.push(n--); } else { - std::array buffer; - typename std::array::iterator it = buffer.begin(); + std::vector buffer(BATCH_SIZE); + typename std::vector::iterator it = buffer.begin(); while (it != buffer.end() && n) { *it++ = n--; } @@ -83,19 +84,19 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { uint64_t results[CONSUMERS]; std::thread consumers[CONSUMERS]; for(unsigned i = 0; i < CONSUMERS; ++i) - consumers[i] = std::thread([&q, &barrier, &r = results[i]]() { + consumers[i] = std::thread([&q, &barrier, &r = results[i], BATCH_SIZE]() { barrier.wait(); uint64_t result = 0; - if (BATCH <= 1) { + if (BATCH_SIZE <= 1) { for(T n; (n = q.pop()) != static_cast(STOP_MSG);) result += n; } else { bool continue_pops = true; while (continue_pops) { - std::array buffer; - typename std::array::iterator out_it = q.pop(buffer.begin(), BATCH); - for (typename std::array::iterator it = buffer.begin(); it != out_it; ++it) { + std::vector buffer(BATCH_SIZE); + typename std::vector::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE); + for (typename std::vector::iterator it = buffer.begin(); it != out_it; ++it) { if (*it != static_cast(STOP_MSG)) { result += *it; } @@ -111,7 +112,7 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { barrier.release(PRODUCERS + CONSUMERS); for(auto& t : producers) t.join(); - for(int i = CONSUMERS * std::max(1, BATCH); i--;) + for(int i = CONSUMERS * std::max(1, BATCH_SIZE); i--;) q.push(STOP_MSG); for(auto& t : consumers) t.join(); From cac80b9152087e4bc5bc589ec3e6d8a2fe06e7c6 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Mon, 20 Apr 2026 19:05:11 +0200 Subject: [PATCH 13/19] using auto fo type deduction --- src/tests.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tests.cc b/src/tests.cc index 04ae5d7..15f7f80 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -9,7 +9,6 @@ #include "atomic_queue/barrier.h" #include "benchmarks.h" -#include #include #include #include @@ -26,7 +25,7 @@ using namespace ::atomic_queue; enum { N_STRESS_MSG = 1000000 }; enum { STOP_MSG = -1 }; enum { CAPACITY = 4096 }; -constexpr std::array BATCH_SIZES = {1, 4, 12}; +constexpr auto BATCH_SIZES = {1, 4, 7, 12}; using stress_queues = boost::mpl::list< AtomicQueue, From 8601f80b1936d8c13a1c5819d585b37581a7760a Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Tue, 21 Apr 2026 19:22:20 +0200 Subject: [PATCH 14/19] separating batch tests from existing tests --- src/tests.cc | 82 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 65 insertions(+), 17 deletions(-) diff --git a/src/tests.cc b/src/tests.cc index 15f7f80..5303c3d 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -25,6 +25,7 @@ using namespace ::atomic_queue; enum { N_STRESS_MSG = 1000000 }; enum { STOP_MSG = -1 }; enum { CAPACITY = 4096 }; + constexpr auto BATCH_SIZES = {1, 4, 7, 12}; using stress_queues = boost::mpl::list< @@ -57,6 +58,55 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { }; using T = typename Queue::value_type; + Queue q; + Barrier barrier; + + std::thread producers[PRODUCERS]; + for(unsigned i = 0; i < PRODUCERS; ++i) + producers[i] = std::thread([&q, &barrier]() { + barrier.wait(); + for(T n = N_STRESS_MSG; n; --n) + q.push(n); + }); + + uint64_t results[CONSUMERS]; + std::thread consumers[CONSUMERS]; + for(unsigned i = 0; i < CONSUMERS; ++i) + consumers[i] = std::thread([&q, &barrier, &r = results[i]]() { + barrier.wait(); + uint64_t result = 0; + for(T n; (n = q.pop()) != static_cast(STOP_MSG);) + result += n; + r = result; + }); + + barrier.release(PRODUCERS + CONSUMERS); + for(auto& t : producers) + t.join(); + for(int i = CONSUMERS; i--;) + q.push(STOP_MSG); + for(auto& t : consumers) + t.join(); + + constexpr uint64_t expected_result = (N_STRESS_MSG + 1) / 2. * N_STRESS_MSG * PRODUCERS; + constexpr uint64_t consumer_result_min = expected_result / CONSUMERS / 10; + uint64_t result = 0; + for(auto& r : results) { + BOOST_WARN_GT(r, consumer_result_min); // Make sure a consumer didn't starve. False positives are possible here. + result += r; + } + int64_t result_diff = result - expected_result; + BOOST_CHECK_EQUAL(result_diff, 0); +} + +// Check that all (batch) push'es are ever (batch) pop'ed once with multiple producer and multiple consumers. +BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { + enum { + PRODUCERS = Queue::is_spsc() ? 1 : 3, + CONSUMERS = Queue::is_spsc() ? 1 : 3 + }; + using T = typename Queue::value_type; + for (int const BATCH_SIZE : BATCH_SIZES) { Queue q; Barrier barrier; @@ -66,17 +116,14 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { producers[i] = std::thread([&q, &barrier, BATCH_SIZE]() { barrier.wait(); for(T n = N_STRESS_MSG; n;) { - if (BATCH_SIZE <= 1) { - q.push(n--); - } - else { - std::vector buffer(BATCH_SIZE); - typename std::vector::iterator it = buffer.begin(); - while (it != buffer.end() && n) { - *it++ = n--; - } - q.push(buffer.begin(), it); + // Inserting elements into local buffer before + std::vector buffer(BATCH_SIZE); + typename std::vector::iterator it = buffer.begin(); + while (it != buffer.end() && n) { + *it++ = n--; } + // Pushing them to the queue + q.push(buffer.begin(), it); } }); @@ -86,15 +133,15 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { consumers[i] = std::thread([&q, &barrier, &r = results[i], BATCH_SIZE]() { barrier.wait(); uint64_t result = 0; - if (BATCH_SIZE <= 1) { - for(T n; (n = q.pop()) != static_cast(STOP_MSG);) - result += n; - } - else { + + // Allocating local buffer + std::vector buffer(BATCH_SIZE); + { bool continue_pops = true; while (continue_pops) { - std::vector buffer(BATCH_SIZE); + // Popping into local buffer before typename std::vector::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE); + // Accumulating the output for (typename std::vector::iterator it = buffer.begin(); it != out_it; ++it) { if (*it != static_cast(STOP_MSG)) { result += *it; @@ -111,7 +158,8 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { barrier.release(PRODUCERS + CONSUMERS); for(auto& t : producers) t.join(); - for(int i = CONSUMERS * std::max(1, BATCH_SIZE); i--;) + // At least (CONSUMERS - 1) * BATCH_SIZE + 1 stop messages are required due to batch pop's + for(int i = CONSUMERS * BATCH_SIZE; i--;) q.push(STOP_MSG); for(auto& t : consumers) t.join(); From accd5a7b9433b20f17b7ee33a51b905dee588339 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Wed, 22 Apr 2026 18:56:38 +0200 Subject: [PATCH 15/19] safer tests --- src/tests.cc | 150 +++++++++++++++++++++++++++++---------------------- 1 file changed, 87 insertions(+), 63 deletions(-) diff --git a/src/tests.cc b/src/tests.cc index 5303c3d..3566846 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -9,9 +9,13 @@ #include "atomic_queue/barrier.h" #include "benchmarks.h" +#include #include #include #include +#include +#include +#include #include #include #include @@ -26,8 +30,6 @@ enum { N_STRESS_MSG = 1000000 }; enum { STOP_MSG = -1 }; enum { CAPACITY = 4096 }; -constexpr auto BATCH_SIZES = {1, 4, 7, 12}; - using stress_queues = boost::mpl::list< AtomicQueue, AtomicQueue2, @@ -107,73 +109,95 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { }; using T = typename Queue::value_type; - for (int const BATCH_SIZE : BATCH_SIZES) { - Queue q; - Barrier barrier; - - std::thread producers[PRODUCERS]; - for(unsigned i = 0; i < PRODUCERS; ++i) - producers[i] = std::thread([&q, &barrier, BATCH_SIZE]() { - barrier.wait(); - for(T n = N_STRESS_MSG; n;) { - // Inserting elements into local buffer before - std::vector buffer(BATCH_SIZE); - typename std::vector::iterator it = buffer.begin(); - while (it != buffer.end() && n) { - *it++ = n--; - } - // Pushing them to the queue - q.push(buffer.begin(), it); - } - }); - - uint64_t results[CONSUMERS]; - std::thread consumers[CONSUMERS]; - for(unsigned i = 0; i < CONSUMERS; ++i) - consumers[i] = std::thread([&q, &barrier, &r = results[i], BATCH_SIZE]() { - barrier.wait(); - uint64_t result = 0; - - // Allocating local buffer + Queue q; + Barrier barrier; + + std::size_t const seed = std::time(nullptr); + std::mt19937 gen(seed); + std::uniform_int_distribution<> distr(1, 2 * q.capacity()); + int const BATCH_SIZE = distr(gen); + + std::vector> number_of_pops(CONSUMERS); + for (auto & val : number_of_pops) val.store(0, X); + + std::vector> consumer_finished(CONSUMERS); + for (auto & val : consumer_finished) val.store(false, X); + + std::thread producers[PRODUCERS]; + for(unsigned i = 0; i < PRODUCERS; ++i) + producers[i] = std::thread([&q, &barrier, BATCH_SIZE]() { + barrier.wait(); + for(T n = N_STRESS_MSG; n;) { + // Inserting elements into local buffer before std::vector buffer(BATCH_SIZE); - { - bool continue_pops = true; - while (continue_pops) { - // Popping into local buffer before - typename std::vector::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE); - // Accumulating the output - for (typename std::vector::iterator it = buffer.begin(); it != out_it; ++it) { - if (*it != static_cast(STOP_MSG)) { - result += *it; - } - else { - continue_pops = false; - } + typename std::vector::iterator it = buffer.begin(); + while (it != buffer.end() && n) { + *it++ = n--; + } + // Pushing them to the queue + q.push(buffer.begin(), it); + } + }); + + uint64_t results[CONSUMERS]; + std::thread consumers[CONSUMERS]; + for(unsigned i = 0; i < CONSUMERS; ++i) + consumers[i] = std::thread([&q, &barrier, &r = results[i], &n_pop = number_of_pops[i], &finished = consumer_finished[i], BATCH_SIZE]() { + barrier.wait(); + uint64_t result = 0; + + // Allocating local buffer + std::vector buffer(BATCH_SIZE); + { + bool continue_pops = true; + while (continue_pops) { + // Popping into local buffer before + n_pop.fetch_add(BATCH_SIZE, A); + typename std::vector::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE); + // Accumulating the output + for (typename std::vector::iterator it = buffer.begin(); it != out_it; ++it) { + if (*it != static_cast(STOP_MSG)) { + result += *it; + } + else { + continue_pops = false; } } } - r = result; - }); - - barrier.release(PRODUCERS + CONSUMERS); - for(auto& t : producers) - t.join(); - // At least (CONSUMERS - 1) * BATCH_SIZE + 1 stop messages are required due to batch pop's - for(int i = CONSUMERS * BATCH_SIZE; i--;) - q.push(STOP_MSG); - for(auto& t : consumers) - t.join(); - - constexpr uint64_t expected_result = (N_STRESS_MSG + 1) / 2. * N_STRESS_MSG * PRODUCERS; - constexpr uint64_t consumer_result_min = expected_result / CONSUMERS / 10; - uint64_t result = 0; - for(auto& r : results) { - BOOST_WARN_GT(r, consumer_result_min); // Make sure a consumer didn't starve. False positives are possible here. - result += r; + finished.store(true, R); + } + r = result; + }); + + barrier.release(PRODUCERS + CONSUMERS); + for(auto& t : producers) + t.join(); + + int const residue = (N_STRESS_MSG * PRODUCERS) % BATCH_SIZE; + for(int i = BATCH_SIZE - residue; i--;) { + q.push(STOP_MSG); + } + int number_of_pushes = (N_STRESS_MSG * PRODUCERS) / BATCH_SIZE * BATCH_SIZE + BATCH_SIZE; + while(not std::all_of(consumer_finished.cbegin(), consumer_finished.cend(), [](const auto &val) { return val.load(A); })) { + while(std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);} ) > number_of_pushes) { + for(int i = BATCH_SIZE; i--;) + q.push(STOP_MSG); + number_of_pushes += BATCH_SIZE; } - int64_t result_diff = result - expected_result; - BOOST_CHECK_EQUAL(result_diff, 0); } + + for(auto& t : consumers) + t.join(); + + constexpr uint64_t expected_result = (N_STRESS_MSG + 1) / 2. * N_STRESS_MSG * PRODUCERS; + constexpr uint64_t consumer_result_min = expected_result / CONSUMERS / 10; + uint64_t result = 0; + for(auto& r : results) { + BOOST_WARN_GT(r, consumer_result_min); // Make sure a consumer didn't starve. False positives are possible here. + result += r; + } + int64_t result_diff = result - expected_result; + BOOST_CHECK_EQUAL(result_diff, 0); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// From e619cb3c3bab63ab864463cb7fa03a1bc27129de Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Thu, 23 Apr 2026 07:06:42 +0200 Subject: [PATCH 16/19] changed to higher granularity --- src/tests.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests.cc b/src/tests.cc index 3566846..fc2d46d 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -12,8 +12,8 @@ #include #include #include +#include #include -#include #include #include #include @@ -112,7 +112,7 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { Queue q; Barrier barrier; - std::size_t const seed = std::time(nullptr); + std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); std::mt19937 gen(seed); std::uniform_int_distribution<> distr(1, 2 * q.capacity()); int const BATCH_SIZE = distr(gen); From be688a868f71e6b3de201ed345cd276122060b9a Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Thu, 23 Apr 2026 19:02:01 +0200 Subject: [PATCH 17/19] minor bugfix --- src/benchmarks.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/benchmarks.h b/src/benchmarks.h index 203050f..c215966 100644 --- a/src/benchmarks.h +++ b/src/benchmarks.h @@ -90,6 +90,7 @@ struct RetryDecorator : Queue { template ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept { + if (n <= 0) return first; while(n -= this->try_pop(first, n)) { spin_loop_pause(); } From 4e6525f52010cd61d9dc021bcba9d4c23d526608 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Thu, 23 Apr 2026 19:44:43 +0200 Subject: [PATCH 18/19] improved test coverage --- src/tests.cc | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/tests.cc b/src/tests.cc index fc2d46d..ec0f332 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -112,11 +112,6 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { Queue q; Barrier barrier; - std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); - std::mt19937 gen(seed); - std::uniform_int_distribution<> distr(1, 2 * q.capacity()); - int const BATCH_SIZE = distr(gen); - std::vector> number_of_pops(CONSUMERS); for (auto & val : number_of_pops) val.store(0, X); @@ -125,10 +120,15 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { std::thread producers[PRODUCERS]; for(unsigned i = 0; i < PRODUCERS; ++i) - producers[i] = std::thread([&q, &barrier, BATCH_SIZE]() { + producers[i] = std::thread([&q, &barrier]() { + std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + std::mt19937 gen(seed); + std::uniform_int_distribution<> distr(1, 2 * q.capacity());; + barrier.wait(); for(T n = N_STRESS_MSG; n;) { // Inserting elements into local buffer before + int const BATCH_SIZE = distr(gen); std::vector buffer(BATCH_SIZE); typename std::vector::iterator it = buffer.begin(); while (it != buffer.end() && n) { @@ -142,17 +142,22 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { uint64_t results[CONSUMERS]; std::thread consumers[CONSUMERS]; for(unsigned i = 0; i < CONSUMERS; ++i) - consumers[i] = std::thread([&q, &barrier, &r = results[i], &n_pop = number_of_pops[i], &finished = consumer_finished[i], BATCH_SIZE]() { + consumers[i] = std::thread([&q, &barrier, &r = results[i], &n_pop = number_of_pops[i], &finished = consumer_finished[i]]() { + std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + std::mt19937 gen(seed); + std::uniform_int_distribution<> distr(1, 2 * q.capacity());; + barrier.wait(); uint64_t result = 0; // Allocating local buffer - std::vector buffer(BATCH_SIZE); { bool continue_pops = true; while (continue_pops) { + int const BATCH_SIZE = distr(gen); // Popping into local buffer before n_pop.fetch_add(BATCH_SIZE, A); + std::vector buffer(BATCH_SIZE); typename std::vector::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE); // Accumulating the output for (typename std::vector::iterator it = buffer.begin(); it != out_it; ++it) { @@ -173,16 +178,13 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { for(auto& t : producers) t.join(); - int const residue = (N_STRESS_MSG * PRODUCERS) % BATCH_SIZE; - for(int i = BATCH_SIZE - residue; i--;) { - q.push(STOP_MSG); - } - int number_of_pushes = (N_STRESS_MSG * PRODUCERS) / BATCH_SIZE * BATCH_SIZE + BATCH_SIZE; + int number_of_pushes = N_STRESS_MSG * PRODUCERS; while(not std::all_of(consumer_finished.cbegin(), consumer_finished.cend(), [](const auto &val) { return val.load(A); })) { - while(std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);} ) > number_of_pushes) { - for(int i = BATCH_SIZE; i--;) + for(int n; (n = std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);}) - number_of_pushes) > 0;) { + number_of_pushes += n; + do { q.push(STOP_MSG); - number_of_pushes += BATCH_SIZE; + } while(--n); } } From 3f07e1b698b139e7579efc1fd135a7747fe9b825 Mon Sep 17 00:00:00 2001 From: Raphael Steiner Date: Thu, 23 Apr 2026 19:50:49 +0200 Subject: [PATCH 19/19] improved test speed --- src/tests.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/tests.cc b/src/tests.cc index ec0f332..2c57031 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -179,7 +179,11 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { t.join(); int number_of_pushes = N_STRESS_MSG * PRODUCERS; - while(not std::all_of(consumer_finished.cbegin(), consumer_finished.cend(), [](const auto &val) { return val.load(A); })) { + for (auto it = consumer_finished.cbegin(); it != consumer_finished.cend();) { + if (it->load(A)) { + ++it; + continue; + } for(int n; (n = std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);}) - number_of_pushes) > 0;) { number_of_pushes += n; do {