diff --git a/README.md b/README.md index b3b614f..91e8848 100644 --- a/README.md +++ b/README.md @@ -166,9 +166,13 @@ push() pop() ### Queue API The queue class templates provide the following member functions: * `try_push` - Appends an element to the end of the queue. Returns `false` when the queue is full. +* `try_push` (batch) - Appends an iterator range to the end of the queue. Returns the iterator one past the last element that got pushed, which could be different from the last iterator when the queue becomes full. * `try_pop` - Removes an element from the front of the queue. Returns `false` when the queue is empty. +* `try_pop` (batch) - Removes elements from the front of the queue and places them into an iterator. Returns the number of elements popped, which may be different from the number of requested elements if the queue becomes empty. * `push` (optimist) - Appends an element to the end of the queue. Busy waits when the queue is full. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order. +* `push` (optimist, batch) - Appends an iterator range to the end of the queue. Busy waits when the queue is full. Returns the iterator past the last pushed element. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order. * `pop` (optimist) - Removes an element from the front of the queue. Busy waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order. +* `pop` (optimist, batch) - Removes elements from the front of the queue and places them into an iterator. Returns the iterator past the last popped element. Busy waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order. * `was_size` - Returns the number of unconsumed elements during the call. The state may have changed by the time the return value is examined. * `was_empty` - Returns `true` if the container was empty during the call. The state may have changed by the time the return value is examined. * `was_full` - Returns `true` if the container was full during the call. The state may have changed by the time the return value is examined. diff --git a/include/atomic_queue/atomic_queue.h b/include/atomic_queue/atomic_queue.h index 7ea221f..5983efb 100644 --- a/include/atomic_queue/atomic_queue.h +++ b/include/atomic_queue/atomic_queue.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -389,6 +390,33 @@ class AtomicQueueCommon { return true; } + template + ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept { + int n = static_cast(std::distance(first, last)); + auto head = head_.load(X); + if(Derived::spsc_) { + int const slots = static_cast(tail_.load(X) + downcast().size_ - head); + n = std::min(n, slots); + if(n <= 0) + return first; + head_.store(head + static_cast(n), X); + } + else { + int const length = n; + do { + int const slots = static_cast(tail_.load(X) + downcast().size_ - head); + n = std::min(length, slots); + if(n <= 0) + return first; + } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast(n), X, X))); // This loop is not FIFO. + } + + do { + downcast().do_push(*first++, head++); + } while(--n); + return first; + } + template ATOMIC_QUEUE_INLINE bool try_pop(T& element) noexcept { auto tail = tail_.load(X); @@ -408,6 +436,33 @@ class AtomicQueueCommon { return true; } + template + ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept { + auto tail = tail_.load(X); + if(Derived::spsc_) { + int const num_elements = static_cast(head_.load(X) - tail); + n = std::min(n, num_elements); + if(n <= 0) + return 0; + tail_.store(tail + static_cast(n), X); + } + else { + int const desired_pops = n; + do { + int const num_elements = static_cast(head_.load(X) - tail); + n = std::min(desired_pops, num_elements); + if(n <= 0) + return 0; + } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast(n), X, X))); // This loop is not FIFO. + } + + int i = n; + do { + *first++ = downcast().do_pop(tail++); + } while(--i); + return n; + } + template ATOMIC_QUEUE_INLINE void push(T&& element) noexcept { unsigned head; @@ -422,6 +477,24 @@ class AtomicQueueCommon { downcast().do_push(std::forward(element), head); } + template + ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept { + unsigned n = static_cast(std::distance(first, last)); + unsigned head; + if(Derived::spsc_) { + head = head_.load(X); + head_.store(head + n, X); + } + else { + constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; + head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. + } + while(n--) { + downcast().do_push(*first++, head++); + } + return first; + } + ATOMIC_QUEUE_INLINE auto pop() noexcept { unsigned tail; if(Derived::spsc_) { @@ -435,6 +508,23 @@ class AtomicQueueCommon { return downcast().do_pop(tail); } + template + ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept { + unsigned tail; + if(Derived::spsc_) { + tail = tail_.load(X); + tail_.store(tail + n, X); + } + else { + constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed; + tail = tail_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019. + } + while(n--) { + *first++ = downcast().do_pop(tail++); + } + return first; + } + ATOMIC_QUEUE_INLINE bool was_empty() const noexcept { return !was_size(); } diff --git a/src/benchmarks.h b/src/benchmarks.h index fb80adc..c215966 100644 --- a/src/benchmarks.h +++ b/src/benchmarks.h @@ -73,12 +73,29 @@ struct RetryDecorator : Queue { spin_loop_pause(); } + template + ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept { + while(last != (first = this->try_push(first, last))) { + spin_loop_pause(); + } + return first; + } + ATOMIC_QUEUE_INLINE T pop() noexcept { T element; while(!this->try_pop(element)) spin_loop_pause(); return element; } + + template + ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept { + if (n <= 0) return first; + while(n -= this->try_pop(first, n)) { + spin_loop_pause(); + } + return first; + } }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/tests.cc b/src/tests.cc index ac45281..2c57031 100644 --- a/src/tests.cc +++ b/src/tests.cc @@ -9,11 +9,16 @@ #include "atomic_queue/barrier.h" #include "benchmarks.h" +#include #include #include +#include #include +#include +#include #include #include +#include //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -96,6 +101,111 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) { BOOST_CHECK_EQUAL(result_diff, 0); } +// Check that all (batch) push'es are ever (batch) pop'ed once with multiple producer and multiple consumers. +BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) { + enum { + PRODUCERS = Queue::is_spsc() ? 1 : 3, + CONSUMERS = Queue::is_spsc() ? 1 : 3 + }; + using T = typename Queue::value_type; + + Queue q; + Barrier barrier; + + std::vector> number_of_pops(CONSUMERS); + for (auto & val : number_of_pops) val.store(0, X); + + std::vector> consumer_finished(CONSUMERS); + for (auto & val : consumer_finished) val.store(false, X); + + std::thread producers[PRODUCERS]; + for(unsigned i = 0; i < PRODUCERS; ++i) + producers[i] = std::thread([&q, &barrier]() { + std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + std::mt19937 gen(seed); + std::uniform_int_distribution<> distr(1, 2 * q.capacity());; + + barrier.wait(); + for(T n = N_STRESS_MSG; n;) { + // Inserting elements into local buffer before + int const BATCH_SIZE = distr(gen); + std::vector buffer(BATCH_SIZE); + typename std::vector::iterator it = buffer.begin(); + while (it != buffer.end() && n) { + *it++ = n--; + } + // Pushing them to the queue + q.push(buffer.begin(), it); + } + }); + + uint64_t results[CONSUMERS]; + std::thread consumers[CONSUMERS]; + for(unsigned i = 0; i < CONSUMERS; ++i) + consumers[i] = std::thread([&q, &barrier, &r = results[i], &n_pop = number_of_pops[i], &finished = consumer_finished[i]]() { + std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + std::mt19937 gen(seed); + std::uniform_int_distribution<> distr(1, 2 * q.capacity());; + + barrier.wait(); + uint64_t result = 0; + + // Allocating local buffer + { + bool continue_pops = true; + while (continue_pops) { + int const BATCH_SIZE = distr(gen); + // Popping into local buffer before + n_pop.fetch_add(BATCH_SIZE, A); + std::vector buffer(BATCH_SIZE); + typename std::vector::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE); + // Accumulating the output + for (typename std::vector::iterator it = buffer.begin(); it != out_it; ++it) { + if (*it != static_cast(STOP_MSG)) { + result += *it; + } + else { + continue_pops = false; + } + } + } + finished.store(true, R); + } + r = result; + }); + + barrier.release(PRODUCERS + CONSUMERS); + for(auto& t : producers) + t.join(); + + int number_of_pushes = N_STRESS_MSG * PRODUCERS; + for (auto it = consumer_finished.cbegin(); it != consumer_finished.cend();) { + if (it->load(A)) { + ++it; + continue; + } + for(int n; (n = std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);}) - number_of_pushes) > 0;) { + number_of_pushes += n; + do { + q.push(STOP_MSG); + } while(--n); + } + } + + for(auto& t : consumers) + t.join(); + + constexpr uint64_t expected_result = (N_STRESS_MSG + 1) / 2. * N_STRESS_MSG * PRODUCERS; + constexpr uint64_t consumer_result_min = expected_result / CONSUMERS / 10; + uint64_t result = 0; + for(auto& r : results) { + BOOST_WARN_GT(r, consumer_result_min); // Make sure a consumer didn't starve. False positives are possible here. + result += r; + } + int64_t result_diff = result - expected_result; + BOOST_CHECK_EQUAL(result_diff, 0); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// namespace {