Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions include/ParallelPriorityQueue/SpapQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ namespace spapq {
* @see QNetwork
* @see WorkerResource
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
class SpapQueue final {
template <typename, BasicQueue, std::size_t>
friend class WorkerTemplate;
Expand Down Expand Up @@ -163,7 +163,7 @@ class SpapQueue final {
* @brief Wait till the whole queue has finished processing all tasks.
*
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
void SpapQueue<T, netw, WorkerTemplate, LocalQType>::waitProcessFinish() {
for (auto &thread : workers_) {
if (thread.joinable()) { thread.join(); }
Expand All @@ -181,7 +181,7 @@ void SpapQueue<T, netw, WorkerTemplate, LocalQType>::waitProcessFinish() {
* @return true If initialisation has succeeded, i.e., not already initialised.
* @return false If the queue is already active.
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename... Args>
bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::initQueue(Args &&...workerArgs) {
if (queueActive_.exchange(true, std::memory_order_acq_rel)) {
Expand All @@ -203,13 +203,13 @@ bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::initQueue(Args &&...workerA
* @brief Signals the workers to begin processing the queue.
*
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
void SpapQueue<T, netw, WorkerTemplate, LocalQType>::processQueue() {
startSignal_.test_and_set(std::memory_order_release);
startSignal_.notify_all();
}

template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <std::size_t tupleSize,
class InputIt,
typename... Args,
Expand Down Expand Up @@ -241,7 +241,7 @@ inline bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushInternalHelper(
* @brief Batch push onto channel, return whether succeeded.
*
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <class InputIt>
inline bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushInternal(InputIt first,
InputIt last,
Expand All @@ -260,7 +260,7 @@ inline bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushInternal(InputIt
* @param stoken Stop token
* @param workerArgs Arguments to be passed to the worker constructor.
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <std::size_t N, typename... Args>
void SpapQueue<T, netw, WorkerTemplate, LocalQType>::threadWork(std::stop_token stoken, Args &&...workerArgs) {
static_assert(N < netw.numWorkers_);
Expand Down Expand Up @@ -348,22 +348,22 @@ void SpapQueue<T, netw, WorkerTemplate, LocalQType>::threadWork(std::stop_token
* @brief Request early stop or termination of the queue.
*
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
void SpapQueue<T, netw, WorkerTemplate, LocalQType>::requestStop() {
if (not queueActive_.load(std::memory_order_acquire)) { return; }

for (auto &workerThread : workers_) { workerThread.request_stop(); }
processQueue(); // In case worker threads are waiting for start signal
}

template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
SpapQueue<T, netw, WorkerTemplate, LocalQType>::~SpapQueue() noexcept {
queueActive_.store(true, std::memory_order_relaxed); // Such that nobody else can start the queue
requestStop(); // Required because worker threads can be stuck awaiting start signal
// Deconstructor of jthread automatically joins the worker threads and thus destroys the worker resources
}

template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <std::size_t tupleSize,
typename... Args,
bool networkHomogeneousInPorts,
Expand All @@ -388,7 +388,7 @@ inline void SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushBeforeProcessing
* @param val Task or queue element.
* @param workerId Worker id whose local queue to push to.
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
inline void SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushBeforeProcessing(
const value_type val, const std::size_t workerId) noexcept {
globalCount_.fetch_add(1U, std::memory_order_relaxed);
Expand All @@ -409,7 +409,7 @@ inline void SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushBeforeProcessing
* @return false If push failed. This is either because the channel buffer is full or the queue has already
* finished.
*/
template <typename T, QNetwork netw, template <class, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <typename T, QNetwork netw, template <typename, BasicQueue, std::size_t> class WorkerTemplate, BasicQueue LocalQType>
template <std::size_t channel>
inline bool SpapQueue<T, netw, WorkerTemplate, LocalQType>::pushDuringProcessing(const value_type val) noexcept {
static_assert(channel < netw.numChannels_, "Must be a valid channel in the QNetwork.");
Expand Down
14 changes: 7 additions & 7 deletions include/ParallelPriorityQueue/SpapQueueWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
class WorkerResource {
template <typename, BasicQueue, std::size_t>
friend class WorkerResource;
template <typename, QNetwork, template <class, BasicQueue, std::size_t> class, BasicQueue>
template <typename, QNetwork, template <typename, BasicQueue, std::size_t> class, BasicQueue>
friend class SpapQueue;

public:
Expand Down Expand Up @@ -120,8 +120,8 @@ class WorkerResource {
* @see SpapQueue
* @see WorkerResource
*/
template <template <class, BasicQueue, std::size_t> class WorkerTemplate,
class GlobalQType,
template <template <typename, BasicQueue, std::size_t> class WorkerTemplate,
typename GlobalQType,
BasicQueue LocalQType,
std::size_t N>
consteval bool isDerivedWorkerResource() {
Expand All @@ -145,9 +145,9 @@ consteval bool isDerivedWorkerResource() {
* @tparam LocalQType Worker local queue type.
* @tparam N Tuple of first N workers.
*/
template <template <class, BasicQueue, std::size_t> class WorkerTemplate,
class GlobalQType,
class LocalQType,
template <template <typename, BasicQueue, std::size_t> class WorkerTemplate,
typename GlobalQType,
BasicQueue LocalQType,
std::size_t N>
struct WorkerCollectiveHelper {
static_assert(N <= GlobalQType::netw_.numWorkers_);
Expand All @@ -156,7 +156,7 @@ struct WorkerCollectiveHelper {
template type<WorkerTemplate<GlobalQType, LocalQType, GlobalQType::netw_.numPorts_[N - 1]> *, Args...>;
};

template <template <class, BasicQueue, std::size_t> class WorkerTemplate, class GlobalQType, class LocalQType>
template <template <typename, BasicQueue, std::size_t> class WorkerTemplate, typename GlobalQType, BasicQueue LocalQType>
struct WorkerCollectiveHelper<WorkerTemplate, GlobalQType, LocalQType, 0> {
template <typename... Args>
using type = std::tuple<Args...>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
class FibonacciWorker final : public WorkerResource<GlobalQType, LocalQType, numPorts> {
template <typename, BasicQueue, std::size_t>
friend class FibonacciWorker;
template <typename, QNetwork, template <class, BasicQueue, std::size_t> class, BasicQueue>
template <typename, QNetwork, template <typename, BasicQueue, std::size_t> class, BasicQueue>
friend class SpapQueue;

using BaseT = WorkerResource<GlobalQType, LocalQType, numPorts>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
class SSSPWorker final : public WorkerResource<GlobalQType, LocalQType, numPorts> {
template <typename, BasicQueue, std::size_t>
friend class SSSPWorker;
template <typename, QNetwork, template <class, BasicQueue, std::size_t> class, BasicQueue>
template <typename, QNetwork, template <typename, BasicQueue, std::size_t> class, BasicQueue>
friend class SpapQueue;

using BaseT = WorkerResource<GlobalQType, LocalQType, numPorts>;
Expand Down
4 changes: 2 additions & 2 deletions tests/SpapQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
class DivisorWorker final : public WorkerResource<GlobalQType, LocalQType, numPorts> {
template <typename, BasicQueue, std::size_t>
friend class DivisorWorker;
template <typename, QNetwork, template <class, BasicQueue, std::size_t> class, BasicQueue>
template <typename, QNetwork, template <typename, BasicQueue, std::size_t> class, BasicQueue>
friend class SpapQueue;

using BaseT = WorkerResource<GlobalQType, LocalQType, numPorts>;
Expand Down Expand Up @@ -92,7 +92,7 @@ template <typename GlobalQType, BasicQueue LocalQType, std::size_t numPorts>
class FibonacciWorker final : public WorkerResource<GlobalQType, LocalQType, numPorts> {
template <typename, BasicQueue, std::size_t>
friend class FibonacciWorker;
template <typename, QNetwork, template <class, BasicQueue, std::size_t> class, BasicQueue>
template <typename, QNetwork, template <typename, BasicQueue, std::size_t> class, BasicQueue>
friend class SpapQueue;

using BaseT = WorkerResource<GlobalQType, LocalQType, numPorts>;
Expand Down