[WIP] Batch pushes and pops #103
Draft: raphael-s-steiner wants to merge 17 commits into max0x7ba:master from raphael-s-steiner:master (base: master). +214 −0
Changes from all 17 commits:
d863612  batch pushes
1f2211d  Changed to Pointer Signature
9d7c03c  typo
9978158  changed signature and tests for batch push
3d2702b  changed signedness of pointer difference for fewer instructions
000ac7d  changed signature to iterator
6b736ad  added return iterator
5330997  batch pops
a139a59  Batch pop signature change
38db7c1  formatting
77e930d  api documentation
5c050d5  Merge remote-tracking branch 'upstream/master'
05b9c73  added Batch to test
8601f80  separating batch tests from existing tests
accd5a7  safer tests
e619cb3  changed to higher granularity
(all commits by raphael-s-steiner; cac80b9 "using auto fo type deduction" is also in the series)
@@ -10,6 +10,7 @@
 #include <cassert>
 #include <cstddef>
 #include <cstdint>
+#include <iterator>
 #include <memory>
 #include <utility>
|
|
@@ -389,6 +390,33 @@ class AtomicQueueCommon {
         return true;
     }

+    template<class InputIt>
+    ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept {
+        int n = static_cast<int>(std::distance(first, last));
+        auto head = head_.load(X);
+        if(Derived::spsc_) {
+            int const slots = static_cast<int>(tail_.load(X) + downcast().size_ - head);
+            n = std::min(n, slots);
+            if(n <= 0)
+                return first;
+            head_.store(head + static_cast<unsigned>(n), X);
+        }
+        else {
+            int const length = n;
+            do {
+                int const slots = static_cast<int>(tail_.load(X) + downcast().size_ - head);
+                n = std::min(length, slots);
+                if(n <= 0)
+                    return first;
+            } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_weak(head, head + static_cast<unsigned>(n), X, X))); // This loop is not FIFO.
+        }
+
+        do {
+            downcast().do_push(*first++, head++);
+        } while(--n);
+        return first;
+    }
+
     template<class T>
     ATOMIC_QUEUE_INLINE bool try_pop(T& element) noexcept {
         auto tail = tail_.load(X);
|
|
@@ -408,6 +436,33 @@ class AtomicQueueCommon {
         return true;
     }

+    template<class OutputIt>
+    ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept {
+        auto tail = tail_.load(X);
+        if(Derived::spsc_) {
+            int const num_elements = static_cast<int>(head_.load(X) - tail);
+            n = std::min(n, num_elements);
+            if(n <= 0)
+                return 0;
+            tail_.store(tail + static_cast<unsigned>(n), X);
+        }
+        else {
+            int const desired_pops = n;
+            do {
+                int const num_elements = static_cast<int>(head_.load(X) - tail);
+                n = std::min(desired_pops, num_elements);
+                if(n <= 0)
+                    return 0;
+            } while(ATOMIC_QUEUE_UNLIKELY(!tail_.compare_exchange_weak(tail, tail + static_cast<unsigned>(n), X, X))); // This loop is not FIFO.
+        }
+
+        int i = n;
+        do {
+            *first++ = downcast().do_pop(tail++);
+        } while(--i);
+        return n;
+    }
+
     template<class T>
     ATOMIC_QUEUE_INLINE void push(T&& element) noexcept {
         unsigned head;
|
|
@@ -422,6 +477,24 @@ class AtomicQueueCommon {
         downcast().do_push(std::forward<T>(element), head);
     }

+    template<class InputIt>
+    ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept {
+        unsigned n = static_cast<unsigned>(std::distance(first, last));
+        unsigned head;
+        if(Derived::spsc_) {
+            head = head_.load(X);
+            head_.store(head + n, X);
+        }
+        else {
+            constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
+            head = head_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019.
+        }
+        while(n--) {
+            downcast().do_push(*first++, head++);
+        }
+        return first;
+    }
+
     ATOMIC_QUEUE_INLINE auto pop() noexcept {
         unsigned tail;
         if(Derived::spsc_) {
|
|
@@ -435,6 +508,23 @@ class AtomicQueueCommon {
         return downcast().do_pop(tail);
     }

+    template<class OutputIt>
+    ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept {
+        unsigned tail;
+        if(Derived::spsc_) {
+            tail = tail_.load(X);
+            tail_.store(tail + n, X);

(Owner review comment on the line above: "Same issue here.")

+        }
+        else {
+            constexpr auto memory_order = Derived::total_order_ ? std::memory_order_seq_cst : std::memory_order_relaxed;
+            tail = tail_.fetch_add(n, memory_order); // FIFO and total order on Intel regardless, as of 2019.
+        }
+        while(n--) {
+            *first++ = downcast().do_pop(tail++);
+        }
+        return first;
+    }
+
     ATOMIC_QUEUE_INLINE bool was_empty() const noexcept {
         return !was_size();
     }
|
|
Review comment (Owner): `n` can be greater than the buffer size or the number of free slots in the queue. These conditions must be checked.

Reply: There does seem to be an issue here, but it is not trivial to pinpoint the exact data race, involving a wrap-around of the buffer, that triggers it. I will need some time to figure this one out.
Reply: There was an issue in the tests: the stopping condition was not correctly implemented when CONSUMERS * BATCH_SIZE > CAPACITY. This is now fixed.

In general, the issue you are describing can already occur when CONSUMERS * BATCH_SIZE > CAPACITY, meaning it can already happen with many consumer threads even with single optimist pushes and pops. The good news is that this is not a deadlock problem (when the (batched) optimist queue is handled properly); I shall sketch why later. Tests have been added to cover this case. The bad news is that when CONSUMERS * BATCH_SIZE > CAPACITY, pushes and pops can happen out of order. For example, two producers can be allocated slots head_1 and head_2 with head_1 < head_2 and head_1 % CAPACITY == head_2 % CAPACITY, and it can happen that the producer allocated head_2 pushes to the slot first while the producer allocated head_1 has to wait. This is an issue insofar as the queue no longer acts as FIFO. The sketch of why this does not deadlock: imagine that the two producers swap roles when this happens, i.e. swap the data that would be pushed to head_1 and head_2, and pretend that it was the first producer, corresponding to head_1, that did the push and the second producer is the one waiting. (I need to write this down better.)
Reply: The case BATCH_SIZE > CAPACITY is not an issue; it just means the producer has to wait until the consumers have popped enough for the producer to continue pushing.