4 changes: 4 additions & 0 deletions README.md
@@ -166,9 +166,13 @@ push() pop()
### Queue API
The queue class templates provide the following member functions:
* `try_push` - Appends an element to the end of the queue. Returns `false` when the queue is full.
* `try_push` (batch) - Appends an iterator range to the end of the queue. Returns the iterator one past the last element pushed, which may differ from `last` when the queue becomes full.
* `try_pop` - Removes an element from the front of the queue. Returns `false` when the queue is empty.
* `try_pop` (batch) - Removes elements from the front of the queue and writes them through an output iterator. Returns the number of elements popped, which may be fewer than the number requested when the queue becomes empty.
* `push` (optimist) - Appends an element to the end of the queue. Busy-waits when the queue is full. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order.
* `push` (optimist, batch) - Appends an iterator range to the end of the queue. Busy-waits when the queue is full. Returns the iterator one past the last element pushed. Faster than `try_push` when the queue is not full. Optional FIFO producer queuing and total order.
* `pop` (optimist) - Removes an element from the front of the queue. Busy-waits when the queue is empty. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order.
* `pop` (optimist, batch) - Removes elements from the front of the queue and writes them through an output iterator. Busy-waits when the queue is empty. Returns the iterator one past the last element popped. Faster than `try_pop` when the queue is not empty. Optional FIFO consumer queuing and total order.
* `was_size` - Returns the number of unconsumed elements during the call. The state may have changed by the time the return value is examined.
* `was_empty` - Returns `true` if the container was empty during the call. The state may have changed by the time the return value is examined.
* `was_full` - Returns `true` if the container was full during the call. The state may have changed by the time the return value is examined.
90 changes: 90 additions & 0 deletions include/atomic_queue/atomic_queue.h
@@ -10,6 +10,7 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <utility>

@@ -389,6 +390,33 @@ class AtomicQueueCommon {
return true;
}

template<class InputIt>
ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept {
int n = static_cast<int>(std::distance(first, last));
auto head = head_.load(X);
if(Derived::spsc_) {
int const slots = static_cast<int>(tail_.load(X) + downcast().size_ - head);
n = std::min(n, slots);
if(n <= 0)
return first;
head_.store(head + static_cast<unsigned>(n), X);
}
else {
int const length = n;
do {
int const slots = static_cast<int>(tail_.load(X) + downcast().size_ - head);
n = std::min(length, slots);
if(n <= 0)
return first;
} while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast<unsigned>(n), X, X))); // This loop is not FIFO.
}

do {
downcast().do_push(*first++, head++);
} while(--n);
return first;
}

template<class T>
ATOMIC_QUEUE_INLINE bool try_pop(T& element) noexcept {
auto tail = tail_.load(X);
@@ -408,6 +436,33 @@
return true;
}

template<class OutputIt>
ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept {
auto tail = tail_.load(X);
if(Derived::spsc_) {
int const num_elements = static_cast<int>(head_.load(X) - tail);
n = std::min(n, num_elements);
if(n <= 0)
return 0;
tail_.store(tail + static_cast<unsigned>(n), X);
}
else {
int const desired_pops = n;
do {
int const num_elements = static_cast<int>(head_.load(X) - tail);
n = std::min(desired_pops, num_elements);
if(n <= 0)
return 0;
} while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast<unsigned>(n), X, X))); // This loop is not FIFO.
}

int i = n;
do {
*first++ = downcast().do_pop(tail++);
} while(--i);
return n;
}

template<class T>
ATOMIC_QUEUE_INLINE void push(T&& element) noexcept {
unsigned head;
@@ -422,6 +477,24 @@
downcast().do_push(std::forward<T>(element), head);
}

template<class InputIt>
ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept {
unsigned n = static_cast<unsigned>(std::distance(first, last));
unsigned head;
if(Derived::spsc_) {
head = head_.load(X);
head_.store(head + n, X);
Owner:

n can be greater than the buffer size or the number of free slots in the queue. These conditions must be checked.

Author:

There does seem to be an issue here, but it is not trivial to pinpoint the exact data race with a wrap-around of the buffer that triggers it.

I will need some time to figure this one out.

Author:

There was an issue in the tests: the stopping condition was not correctly implemented when CONSUMERS * BATCH_SIZE > CAPACITY. This is now fixed.

In general, the issue you are describing can already occur when CONSUMERS * BATCH_SIZE > CAPACITY, meaning it can happen with many consumer threads even with single-element optimist pushes and pops.

The good news is that this does not cause deadlocks (when the (batched) optimist queue is handled properly); I shall sketch why this is the case later. Tests have been added to cover this case. The bad news is that when CONSUMERS * BATCH_SIZE > CAPACITY, pushes and pops can happen out of order. For example, two producers can be allocated slots head_1 and head_2 with head_1 < head_2 and head_1 % CAPACITY == head_2 % CAPACITY, and it can happen that the producer allocated slot head_2 pushes to the slot first while the producer allocated slot head_1 has to wait. This is an issue insofar as the queue no longer acts as FIFO. The sketch of why this does not deadlock: one can imagine that the two producers swap roles when this happens, i.e. swap the data that would be pushed to head_1 and head_2 and pretend that it was the producer corresponding to head_1 that did the push and the second producer is the one waiting. (I need to write this down better.)

Author:

The case BATCH_SIZE > CAPACITY is not an issue by itself; it just means the producer has to wait until consumers have popped enough for the producer to continue pushing.

}
else {
constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019.
}
while(n--) {
downcast().do_push(*first++, head++);
}
return first;
}

ATOMIC_QUEUE_INLINE auto pop() noexcept {
unsigned tail;
if(Derived::spsc_) {
@@ -435,6 +508,23 @@
return downcast().do_pop(tail);
}

template<class OutputIt>
ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept {
unsigned tail;
if(Derived::spsc_) {
tail = tail_.load(X);
tail_.store(tail + n, X);
Owner:

Same issue here.

}
else {
constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
tail = tail_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019.
}
while(n--) {
*first++ = downcast().do_pop(tail++);
}
return first;
}

ATOMIC_QUEUE_INLINE bool was_empty() const noexcept {
return !was_size();
}
16 changes: 16 additions & 0 deletions src/benchmarks.h
@@ -73,12 +73,28 @@ struct RetryDecorator : Queue {
spin_loop_pause();
}

template<class InputIt>
ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept {
while(last != (first = this->try_push(first, last))) {
spin_loop_pause();
}
return first;
}

ATOMIC_QUEUE_INLINE T pop() noexcept {
T element;
while(!this->try_pop(element))
spin_loop_pause();
return element;
}

template<class OutputIt>
ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, int n) noexcept {
while(n -= this->try_pop(first, n)) {
spin_loop_pause();
}
return first;
}
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
104 changes: 104 additions & 0 deletions src/tests.cc
@@ -9,11 +9,16 @@
#include "atomic_queue/barrier.h"
#include "benchmarks.h"

#include <algorithm>
#include <boost/mpl/list.hpp>
#include <bitset>
#include <chrono>
#include <cstdint>
#include <numeric>
#include <random>
#include <thread>
#include <string>
#include <vector>

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

@@ -96,6 +101,105 @@ BOOST_AUTO_TEST_CASE_TEMPLATE(stress, Queue, stress_queues) {
BOOST_CHECK_EQUAL(result_diff, 0);
}

// Check that every (batch) push is eventually (batch) popped exactly once with multiple producers and multiple consumers.
BOOST_AUTO_TEST_CASE_TEMPLATE(stress_batch, Queue, stress_queues) {
enum {
PRODUCERS = Queue::is_spsc() ? 1 : 3,
CONSUMERS = Queue::is_spsc() ? 1 : 3
};
using T = typename Queue::value_type;

Queue q;
Barrier barrier;

std::size_t const seed = std::chrono::high_resolution_clock::now().time_since_epoch().count();
std::mt19937 gen(seed);
std::uniform_int_distribution<> distr(1, 2 * q.capacity());
int const BATCH_SIZE = distr(gen);

std::vector<std::atomic<int>> number_of_pops(CONSUMERS);
for (auto & val : number_of_pops) val.store(0, X);

std::vector<std::atomic<bool>> consumer_finished(CONSUMERS);
for (auto & val : consumer_finished) val.store(false, X);

std::thread producers[PRODUCERS];
for(unsigned i = 0; i < PRODUCERS; ++i)
producers[i] = std::thread([&q, &barrier, BATCH_SIZE]() {
barrier.wait();
for(T n = N_STRESS_MSG; n;) {
// Inserting elements into local buffer before
std::vector<T> buffer(BATCH_SIZE);
typename std::vector<T>::iterator it = buffer.begin();
while (it != buffer.end() && n) {
*it++ = n--;
}
// Pushing them to the queue
q.push(buffer.begin(), it);
}
});

uint64_t results[CONSUMERS];
std::thread consumers[CONSUMERS];
for(unsigned i = 0; i < CONSUMERS; ++i)
consumers[i] = std::thread([&q, &barrier, &r = results[i], &n_pop = number_of_pops[i], &finished = consumer_finished[i], BATCH_SIZE]() {
barrier.wait();
uint64_t result = 0;

// Allocating local buffer
std::vector<T> buffer(BATCH_SIZE);
{
bool continue_pops = true;
while (continue_pops) {
// Popping into local buffer before
n_pop.fetch_add(BATCH_SIZE, A);
typename std::vector<T>::iterator out_it = q.pop(buffer.begin(), BATCH_SIZE);
// Accumulating the output
for (typename std::vector<T>::iterator it = buffer.begin(); it != out_it; ++it) {
if (*it != static_cast<T>(STOP_MSG)) {
result += *it;
}
else {
continue_pops = false;
}
}
}
finished.store(true, R);
}
r = result;
});

barrier.release(PRODUCERS + CONSUMERS);
for(auto& t : producers)
t.join();

int const residue = (N_STRESS_MSG * PRODUCERS) % BATCH_SIZE;
for(int i = BATCH_SIZE - residue; i--;) {
q.push(STOP_MSG);
}
int number_of_pushes = (N_STRESS_MSG * PRODUCERS) / BATCH_SIZE * BATCH_SIZE + BATCH_SIZE;
while(not std::all_of(consumer_finished.cbegin(), consumer_finished.cend(), [](const auto &val) { return val.load(A); })) {
while(std::accumulate(number_of_pops.cbegin(), number_of_pops.cend(), 0, [](int acc, const auto &val) { return acc + val.load(X);} ) > number_of_pushes) {
for(int i = BATCH_SIZE; i--;)
q.push(STOP_MSG);
number_of_pushes += BATCH_SIZE;
}
}

for(auto& t : consumers)
t.join();

constexpr uint64_t expected_result = (N_STRESS_MSG + 1) / 2. * N_STRESS_MSG * PRODUCERS;
constexpr uint64_t consumer_result_min = expected_result / CONSUMERS / 10;
uint64_t result = 0;
for(auto& r : results) {
BOOST_WARN_GT(r, consumer_result_min); // Make sure a consumer didn't starve. False positives are possible here.
result += r;
}
int64_t result_diff = result - expected_result;
BOOST_CHECK_EQUAL(result_diff, 0);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

namespace {