From 777bf3f0d0d7f3e80fcf4871b65bd592c48f0ac5 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 5 Nov 2025 14:46:26 -0600 Subject: [PATCH 01/15] Support queue registration in reconverse --- src/CMakeLists.txt | 2 +- src/convcore.cpp | 3 +- src/converse_internal.h | 4 +- src/scheduler.cpp | 332 ++++++++++++++------------------------ src/scheduler.h | 22 +++ src/scheduler_helpers.cpp | 63 ++++++++ 6 files changed, 208 insertions(+), 218 deletions(-) create mode 100644 src/scheduler_helpers.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4566195..293083d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,6 @@ target_include_directories(reconverse PRIVATE .) target_sources(reconverse PRIVATE conv-conds.cpp convcore.cpp random.cpp - scheduler.cpp cpuaffinity.cpp collectives.cpp + scheduler.cpp scheduler_helpers.cpp cpuaffinity.cpp collectives.cpp comm_backend/comm_backend_internal.cpp threads.cpp cldb.rand.cpp cldb.cpp cmirdmautils.cpp conv-rdma.cpp conv-topology.cpp msgmgr.cpp queueing.cpp) target_include_directories( diff --git a/src/convcore.cpp b/src/convcore.cpp index 0f43b21..2dc1abf 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -159,10 +159,11 @@ void converseRunPe(int rank) { } void CmiStartThreads() { - // allocate global arrayss + // allocate global arrays Cmi_queues = new ConverseQueue *[Cmi_mynodesize]; CmiHandlerTable = new std::vector *[Cmi_mynodesize]; CmiNodeQueue = new ConverseNodeQueue(); + CmiQueueRegisterInit(); _smp_mutex = CmiCreateLock(); CmiMemLock_lock = CmiCreateLock(); diff --git a/src/converse_internal.h b/src/converse_internal.h index 1d55c5a..4bc3866 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -67,9 +67,11 @@ CmiState *CmiGetState(void); void CmiInitState(int pe); ConverseQueue *CmiGetQueue(int pe); void CrnInit(void); - void CmiPushPE(int destPE, int messageSize, void *msg); +//queue reg init +void CmiQueueRegisterInit(void); + // node queue ConverseNodeQueue *CmiGetNodeQueue(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 0a7e999..d7cd33a 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -1,145 +1,129 @@ #include "scheduler.h" -#include "converse.h" -#include "converse_internal.h" -#include "queue.h" -#include -/** - * The main scheduler loop for the Charm++ runtime. - */ -void CsdScheduler() { - // get pthread level queue - ConverseQueue *queue = CmiGetQueue(CmiMyRank()); +extern std::vector g_handlers; //list of handlers +extern Groups g_groups; //groups of handlers by index - // get node level queue - ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); - - int loop_counter = 0; - - while (CmiStopFlag() == 0) { - - CcdRaiseCondition(CcdSCHEDLOOP); +static inline void releaseIdle() { + if (CmiGetIdle()) { + CmiSetIdle(false); + CcdRaiseCondition(CcdPROCESSOR_END_IDLE); + } +} - // poll node queue - if (!nodeQueue->empty()) { - auto result = nodeQueue->pop(); - if (result) { - void *msg = result.value(); - // process event - CmiHandleMessage(msg); +static inline void setIdle() { + if (!CmiGetIdle()) { + CmiSetIdle(true); + CmiSetIdleTime(CmiWallTimer()); + CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); + } + // if already idle, call still idle and (maybe) long idle + else { + CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); + if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { + CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); + } + } +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } +//poll converse-level node queue +bool pollConverseNodeQueue() { + ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + if (!nodeQueue->empty()) { + auto result = nodeQueue->pop(); + if (result) { + void *msg = result.value(); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; } + } + return false; +} - // poll thread queue - else if (!queue->empty()) { - // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); +//poll converse-level thread queue +bool pollConverseThreadQueue() { + ConverseQueue *queue = CmiGetQueue(CmiMyRank()); + if (!queue->empty()) { + // get next event (guaranteed to be there because only single consumer) + void *msg = queue->pop().value(); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; + } + return false; +} +//poll node priority queue +bool pollNodePrioQueue() { + // Try to acquire lock without blocking + if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { + if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { + void *msg = QueueTop(CsvAccess(CsdNodeQueue)); + QueuePop(CsvAccess(CsdNodeQueue)); + CmiUnlock(CsvAccess(CsdNodeQueueLock)); // process event CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } + releaseIdle(); + return true; + } else { + CmiUnlock(CsvAccess(CsdNodeQueueLock)); } + } + return false; +} - // poll node prio queue - else { - // Try to acquire lock without blocking - if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { - if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { - void* msg = QueueTop(CsvAccess(CsdNodeQueue)); - QueuePop(CsvAccess(CsdNodeQueue)); - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - // process event - CmiHandleMessage(msg); +//poll thread priority queue +bool pollThreadPrioQueue() { + if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { + void *msg = QueueTop(CpvAccess(CsdSchedQueue)); + QueuePop(CpvAccess(CsdSchedQueue)); + // process event + CmiHandleMessage(msg); + releaseIdle(); + return true; + } + return false; +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - else { - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - //empty queue so check thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); +bool pollProgress() +{ + if(CmiMyRank() % backend_poll_thread == 0) return comm_backend::progress(); + return false; +} - // process event - CmiHandleMessage(msg); +//will add queue polling functions +//called at node level (before threads created) +void CmiQueueRegisterInit() { + add_handler(pollConverseNodeQueue, 8); + add_handler(pollConverseThreadQueue, 1); + add_handler(pollNodePrioQueue, 16); + add_handler(pollThreadPrioQueue, 1); + add_handler(pollProgress, backend_poll_freq); +} - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - // the processor is idle - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle - else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); - } - } - } - } - } - else { - // Could not acquire node queue lock, skip to thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); +/** + * The main scheduler loop for the Charm++ runtime. + */ +void CsdScheduler() { - // process event - CmiHandleMessage(msg); + uint64_t loop_counter = 0; - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - // the processor is idle - // if not already idle, set idle and raise condition - if (!CmiGetIdle()) { - CmiSetIdle(true); - CmiSetIdleTime(CmiWallTimer()); - CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE); - } - // if already idle, call still idle and (maybe) long idle - else { - CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE); - if (CmiWallTimer() - CmiGetIdleTime() > 10.0) { - CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE); - } - } - } - } + while (CmiStopFlag() == 0) { + + CcdRaiseCondition(CcdSCHEDLOOP); + //poll queues + unsigned idx = static_cast(loop_counter & 63ULL); + bool workDone = false; + for (auto fn : g_groups[idx]) { + workDone |= fn(); } - if((CmiMyRank() % backend_poll_thread == 0) && (loop_counter++ == (backend_poll_freq - 1))) - { - loop_counter = 0; - comm_backend::progress(); + if(!workDone) { + setIdle(); } - CcdCallBacks(); + loop_counter++; } } @@ -149,106 +133,24 @@ void CsdScheduler() { * are empty, not when the scheduler is stopped. */ void CsdSchedulePoll() { - // get pthread level queue - ConverseQueue *queue = CmiGetQueue(CmiMyRank()); - - // get node level queue - ConverseNodeQueue *nodeQueue = CmiGetNodeQueue(); + uint64_t loop_counter = 0; while(1){ CcdCallBacks(); - CcdRaiseCondition(CcdSCHEDLOOP); - - // poll node queue - if (!nodeQueue->empty()) { - auto result = nodeQueue->pop(); - if (result) { - void *msg = result.value(); - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } + //poll queues + unsigned idx = static_cast(loop_counter & 63ULL); + bool workDone = false; + for (auto fn : g_groups[idx]) { + workDone |= fn(); } - - // poll thread queue - else if (!queue->empty()) { - // get next event (guaranteed to be there because only single consumer) - void *msg = queue->pop().value(); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - - // poll node prio queue - else { - // Try to acquire lock without blocking - if (CmiTryLock(CsvAccess(CsdNodeQueueLock)) == 0) { - if (!QueueEmpty(CsvAccess(CsdNodeQueue))) { - void *msg = QueueTop(CsvAccess(CsdNodeQueue)); - QueuePop(CsvAccess(CsdNodeQueue)); - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } - else { - CmiUnlock(CsvAccess(CsdNodeQueueLock)); - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - comm_backend::progress(); - break; //break when queues are empty - } - } - } - else { - // Could not acquire node queue lock, skip to thread prio queue - if (!QueueEmpty(CpvAccess(CsdSchedQueue))) { - void *msg = QueueTop(CpvAccess(CsdSchedQueue)); - QueuePop(CpvAccess(CsdSchedQueue)); - - // process event - CmiHandleMessage(msg); - - // release idle if necessary - if (CmiGetIdle()) { - CmiSetIdle(false); - CcdRaiseCondition(CcdPROCESSOR_END_IDLE); - } - } else { - comm_backend::progress(); - break; //break when queues are empty - } - } + if(!workDone) { + setIdle(); + break; } + loop_counter++; + } } diff --git a/src/scheduler.h b/src/scheduler.h index a311734..33b878d 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,2 +1,24 @@ +#ifndef _SCHEDULER_H_ +#define _SCHEDULER_H_ +#include "converse.h" +#include "converse_internal.h" +#include "queue.h" +#include +#include +#include + +using QueuePollHandlerFn = bool(*)(void); //we need a return value to indicate if work was done + +struct QueuePollHandler { + QueuePollHandlerFn fn; + uint64_t mask{0}; // 64-bit mask: bit i == call at loop index i (0..63) + unsigned period{0}; // 1..64, 0 => disabled + unsigned phase{0}; +}; + +using Groups = std::array, 64>; + +void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase = 0); void CsdScheduler(); +#endif diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp new file mode 100644 index 0000000..2dea2b9 --- /dev/null +++ b/src/scheduler_helpers.cpp @@ -0,0 +1,63 @@ +#include "scheduler.h" + +std::vector g_handlers; //list of handlers +Groups g_groups; //groups of handlers by index + +// Build a 64-bit mask for a period n (1..64) with optional phase (0..n-1) +inline uint64_t make_mask_every_n(unsigned n, unsigned phase = 0) { + if (n == 0) return 0ULL; + if (n == 1) return ~0ULL; + if (n > 64) n = 64; // clamp to 64 + uint64_t mask = 0ULL; + for (unsigned pos = 0; pos < 64; ++pos) { + if (((pos + phase) % n) == 0) mask |= (1ULL << pos); + } + return mask; +} + +// Rebuild groups from current handler masks (in-place). +// Single-threaded callers may call this whenever a handler mask changes. +inline void rebuild_groups() { + // Clear all groups + for (auto &v : g_groups) v.clear(); + + // Populate groups from each handler's mask + for (const auto &h : g_handlers) { + uint64_t m = h.mask; + if (m == 0) continue; + for (unsigned bit = 0; bit < 64; ++bit) { + if ((m >> bit) & 1ULL) { + g_groups[bit].push_back(h.fn); + } + } + } +} + +// Set handler period and phase (period: 1..64, 0 disables). +// Rebuilds groups immediately (cheap relative to hot path). +inline void set_frequency(size_t handlerIndex, unsigned period, unsigned phase = 0) { + if (handlerIndex >= g_handlers.size()) return; + QueuePollHandler &h = g_handlers[handlerIndex]; + + if (period == 0) { + h.period = 0; + h.phase = 0; + h.mask = 0ULL; + } else { + if (period > 64) period = 64; + h.period = period; + h.phase = phase % period; + h.mask = make_mask_every_n(h.period, h.phase); + } + rebuild_groups(); +} + +// Add a handler that will poll a queue at given frequency. +void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase) +{ + g_handlers.push_back({fn}); + size_t index = g_handlers.size() - 1; + set_frequency(index, period, phase); +} + + From d5839f337703f148658c4b1f4b885cf653f25a53 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 5 Nov 2025 15:00:24 -0600 Subject: [PATCH 02/15] polling progress doesn't count --- src/scheduler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index d7cd33a..151efa2 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -89,8 +89,8 @@ bool pollThreadPrioQueue() { bool pollProgress() { - if(CmiMyRank() % backend_poll_thread == 0) return comm_backend::progress(); - return false; + if(CmiMyRank() % backend_poll_thread == 0) comm_backend::progress(); + return false; //polling progress doesn't count } //will add queue polling functions From f5aad458e9fdb7a343eb661b68ce3fd4de098d06 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Fri, 5 Dec 2025 08:13:42 -0600 Subject: [PATCH 03/15] new way to add handlers --- src/scheduler.h | 7 +++++++ src/scheduler_helpers.cpp | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/scheduler.h b/src/scheduler.h index 33b878d..52ec57b 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -20,5 +20,12 @@ using Groups = std::array, 64>; void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase = 0); +// Add multiple handlers at once +// pairs of poll handlers and relative frequencies (will be normalized regardless of actual value) +// (frequency/total)*64 +// example: if the frequencies are 8, 1, 16, 1, 4, then they are added up to 30, then normalized to 17, 2, 34, 2, 9 +// then assign to slots based on these normalized values +void add_list_of_handlers(const std::vector>& handlers); + void CsdScheduler(); #endif diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index 2dea2b9..dec258b 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -2,6 +2,8 @@ std::vector g_handlers; //list of handlers Groups g_groups; //groups of handlers by index +QueuePollHandlerFn *poll_handlers; // fixed size array +#define ARRAY_SIZE 64 // Build a 64-bit mask for a period n (1..64) with optional phase (0..n-1) inline uint64_t make_mask_every_n(unsigned n, unsigned phase = 0) { @@ -60,4 +62,35 @@ void add_handler(QueuePollHandlerFn fn, unsigned period, unsigned phase) set_frequency(index, period, phase); } - +void add_list_of_handlers(const std::vector>& handlers){ + // total frequency + unsigned int total = 0; + for(const auto& handler : handlers){ + total += handler.second; + } + if(total == 0) return; // nothing to add + // loop through handlers and add them to the table + // spread out based on normalized frequency + poll_handlers = new QueuePollHandlerFn[ARRAY_SIZE]; + unsigned int current_index = 0; + for(const auto& handler : handlers){ + unsigned int freq = handler.second; + unsigned int normalized = (freq * ARRAY_SIZE) / total; //estimate of how many slots this handler should take + if(normalized == 0) normalized = 1; // at least once + // go through loop and find empty slots + // spread out as evenly as possible + unsigned int remaining = normalized; + unsigned int step = ARRAY_SIZE / normalized; + unsigned int index = current_index; + while(remaining > 0){ + //find next empty slot + while(poll_handlers[index] != nullptr){ + index = (index + 1) % ARRAY_SIZE; + } + poll_handlers[index] = handler.first; + remaining--; + index = (index + step) % ARRAY_SIZE; + } + current_index = (current_index + 1) % ARRAY_SIZE; + } +} From 0cd7e0ac3db1420d9e6b69073b6497db45d03695 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Mon, 8 Dec 2025 12:28:09 -0600 Subject: [PATCH 04/15] properly spread out handlers across table --- src/scheduler.cpp | 7 +++++++ src/scheduler.h | 1 + src/scheduler_helpers.cpp | 13 +++++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index b57d02c..0e30eb1 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -96,6 +96,13 @@ bool pollProgress() //will add queue polling functions //called at node level (before threads created) void CmiQueueRegisterInit() { + std::vector> handlers; + handlers.push_back(std::make_pair(pollConverseNodeQueue, 65)); + handlers.push_back(std::make_pair(pollConverseThreadQueue, 87)); + handlers.push_back(std::make_pair(pollNodePrioQueue, 37)); + handlers.push_back(std::make_pair(pollThreadPrioQueue, 83)); + handlers.push_back(std::make_pair(pollProgress, 67)); + add_list_of_handlers(handlers); add_handler(pollConverseNodeQueue, 8); add_handler(pollConverseThreadQueue, 1); add_handler(pollNodePrioQueue, 16); diff --git a/src/scheduler.h b/src/scheduler.h index 52ec57b..04b4bec 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -6,6 +6,7 @@ #include #include #include +#include using QueuePollHandlerFn = bool(*)(void); //we need a return value to indicate if work was done diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index dec258b..4b74ef5 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -72,10 +72,13 @@ void add_list_of_handlers(const std::vector(total)); //estimate of how many slots this handler should take + //CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); if(normalized == 0) normalized = 1; // at least once // go through loop and find empty slots // spread out as evenly as possible @@ -84,13 +87,19 @@ void add_list_of_handlers(const std::vector 0){ //find next empty slot + if(total_assigned >= ARRAY_SIZE){ + break; // all slots assigned + } while(poll_handlers[index] != nullptr){ index = (index + 1) % ARRAY_SIZE; } poll_handlers[index] = handler.first; + total_assigned++; + //CmiPrintf("Adding handler %d at index %d\n", handler_index, index); remaining--; index = (index + step) % ARRAY_SIZE; } current_index = (current_index + 1) % ARRAY_SIZE; + handler_index++; } } From ff6f1e1c3d57c4c96d9cb7b4b8ea2dba9b542361 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Mon, 8 Dec 2025 15:37:19 -0600 Subject: [PATCH 05/15] print to debug performance --- src/scheduler.cpp | 10 +++++----- src/scheduler_helpers.cpp | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 0e30eb1..be092ac 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -97,11 +97,11 @@ bool pollProgress() //called at node level (before threads created) void CmiQueueRegisterInit() { std::vector> handlers; - handlers.push_back(std::make_pair(pollConverseNodeQueue, 65)); - handlers.push_back(std::make_pair(pollConverseThreadQueue, 87)); - handlers.push_back(std::make_pair(pollNodePrioQueue, 37)); - handlers.push_back(std::make_pair(pollThreadPrioQueue, 83)); - handlers.push_back(std::make_pair(pollProgress, 67)); + handlers.push_back(std::make_pair(pollConverseNodeQueue, 8)); + handlers.push_back(std::make_pair(pollConverseThreadQueue, 1)); + handlers.push_back(std::make_pair(pollNodePrioQueue, 16)); + handlers.push_back(std::make_pair(pollThreadPrioQueue, 1)); + handlers.push_back(std::make_pair(pollProgress, 1)); add_list_of_handlers(handlers); add_handler(pollConverseNodeQueue, 8); add_handler(pollConverseThreadQueue, 1); diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index 4b74ef5..b3d255f 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -78,7 +78,7 @@ void add_list_of_handlers(const std::vector(total)); //estimate of how many slots this handler should take - //CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); + CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); if(normalized == 0) normalized = 1; // at least once // go through loop and find empty slots // spread out as evenly as possible @@ -95,7 +95,7 @@ void add_list_of_handlers(const std::vector Date: Mon, 8 Dec 2025 15:51:32 -0600 Subject: [PATCH 06/15] debug --- src/scheduler_helpers.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index b3d255f..dab96f6 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -78,7 +78,7 @@ void add_list_of_handlers(const std::vector(total)); //estimate of how many slots this handler should take - CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); + CmiPrintf("[process %d] Handler %d frequency %u normalized to %ld\n", CmiMyRank(), handler_index, freq, normalized); if(normalized == 0) normalized = 1; // at least once // go through loop and find empty slots // spread out as evenly as possible @@ -95,7 +95,7 @@ void add_list_of_handlers(const std::vector Date: Mon, 8 Dec 2025 16:18:34 -0600 Subject: [PATCH 07/15] test pe-local registration --- src/convcore.cpp | 2 ++ src/converse_internal.h | 1 + src/scheduler.cpp | 9 ++++++--- src/scheduler_helpers.cpp | 16 ++++++++++------ 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index 5c84ed8..e4988ff 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -126,6 +126,8 @@ void converseRunPe(int rank, int everReturn) { // init comm_backend comm_backend::initThread(rank, CmiMyNodeSize()); + CmiQueueRegisterInitThread(); + // init things like cld module, ccs, etc CldModuleInit(CmiMyArgv); diff --git a/src/converse_internal.h b/src/converse_internal.h index 093783f..afe6c14 100644 --- a/src/converse_internal.h +++ b/src/converse_internal.h @@ -71,6 +71,7 @@ void CmiPushPE(int destPE, int messageSize, void *msg); //queue reg init void CmiQueueRegisterInit(void); +void CmiQueueRegisterInitThread(void); // node queue ConverseNodeQueue *CmiGetNodeQueue(); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index be092ac..b65b4eb 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -93,9 +93,7 @@ bool pollProgress() return false; //polling progress doesn't count } -//will add queue polling functions -//called at node level (before threads created) -void CmiQueueRegisterInit() { +void CmiQueueRegisterInitThread() { std::vector> handlers; handlers.push_back(std::make_pair(pollConverseNodeQueue, 8)); handlers.push_back(std::make_pair(pollConverseThreadQueue, 1)); @@ -103,6 +101,11 @@ void CmiQueueRegisterInit() { handlers.push_back(std::make_pair(pollThreadPrioQueue, 1)); handlers.push_back(std::make_pair(pollProgress, 1)); add_list_of_handlers(handlers); +} + +//will add queue polling functions +//called at node level (before threads created) +void CmiQueueRegisterInit() { add_handler(pollConverseNodeQueue, 8); add_handler(pollConverseThreadQueue, 1); add_handler(pollNodePrioQueue, 16); diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index dab96f6..e1a66e9 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -2,7 +2,8 @@ std::vector g_handlers; //list of handlers Groups g_groups; //groups of handlers by index -QueuePollHandlerFn *poll_handlers; // fixed size array +CpvDeclare(QueuePollHandlerFn *, poll_handlers); +//QueuePollHandlerFn *poll_handlers; // fixed size array #define ARRAY_SIZE 64 // Build a 64-bit mask for a period n (1..64) with optional phase (0..n-1) @@ -71,14 +72,16 @@ void add_list_of_handlers(const std::vector(total)); //estimate of how many slots this handler should take - CmiPrintf("[process %d] Handler %d frequency %u normalized to %ld\n", CmiMyRank(), handler_index, freq, normalized); + CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); if(normalized == 0) normalized = 1; // at least once // go through loop and find empty slots // spread out as evenly as possible @@ -90,12 +93,13 @@ void add_list_of_handlers(const std::vector= ARRAY_SIZE){ break; // all slots assigned } - while(poll_handlers[index] != nullptr){ + while(CpvAccess(poll_handlers)[index] != nullptr){ index = (index + 1) % ARRAY_SIZE; } - poll_handlers[index] = handler.first; + CpvAccess(poll_handlers)[index] = handler.first; + //poll_handlers[index] = handler.first; total_assigned++; - CmiPrintf("[process %d] Adding handler %d at index %d\n", CmiMyRank(), handler_index, index); + CmiPrintf("Adding handler %d at index %d\n", handler_index, index); remaining--; index = (index + step) % ARRAY_SIZE; } From 53865c4d069c675f62860f444b7896d65e331b92 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Mon, 8 Dec 2025 16:22:03 -0600 Subject: [PATCH 08/15] comment out --- src/convcore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index e4988ff..4066a65 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -126,7 +126,7 @@ void converseRunPe(int rank, int everReturn) { // init comm_backend comm_backend::initThread(rank, CmiMyNodeSize()); - CmiQueueRegisterInitThread(); + //CmiQueueRegisterInitThread(); // init things like cld module, ccs, etc CldModuleInit(CmiMyArgv); From e388730141357bac250e418285dfc98d48817ff0 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Mon, 8 Dec 2025 16:25:26 -0600 Subject: [PATCH 09/15] change comment --- src/convcore.cpp | 2 +- src/scheduler.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/convcore.cpp b/src/convcore.cpp index 4066a65..e4988ff 100644 --- a/src/convcore.cpp +++ b/src/convcore.cpp @@ -126,7 +126,7 @@ void converseRunPe(int rank, int everReturn) { // init comm_backend comm_backend::initThread(rank, CmiMyNodeSize()); - //CmiQueueRegisterInitThread(); + CmiQueueRegisterInitThread(); // init things like cld module, ccs, etc CldModuleInit(CmiMyArgv); diff --git a/src/scheduler.cpp b/src/scheduler.cpp index b65b4eb..9901ff1 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -100,7 +100,7 @@ void CmiQueueRegisterInitThread() { handlers.push_back(std::make_pair(pollNodePrioQueue, 16)); handlers.push_back(std::make_pair(pollThreadPrioQueue, 1)); handlers.push_back(std::make_pair(pollProgress, 1)); - add_list_of_handlers(handlers); + //add_list_of_handlers(handlers); } //will add queue polling functions From 0ed4f5352d38f076e287dfdd110dba48d1519278 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Tue, 9 Dec 2025 16:20:04 -0600 Subject: [PATCH 10/15] change comment --- src/scheduler.cpp | 2 +- src/scheduler_helpers.cpp | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 9901ff1..b65b4eb 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -100,7 +100,7 @@ void CmiQueueRegisterInitThread() { handlers.push_back(std::make_pair(pollNodePrioQueue, 16)); handlers.push_back(std::make_pair(pollThreadPrioQueue, 1)); handlers.push_back(std::make_pair(pollProgress, 1)); - //add_list_of_handlers(handlers); + add_list_of_handlers(handlers); } //will add queue polling functions diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index e1a66e9..29f82cb 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -88,6 +88,7 @@ void add_list_of_handlers(const std::vector 0){ //find next empty slot if(total_assigned >= ARRAY_SIZE){ @@ -103,6 +104,7 @@ void add_list_of_handlers(const std::vector Date: Tue, 9 Dec 2025 16:34:02 -0600 Subject: [PATCH 11/15] separate assigned array --- src/scheduler_helpers.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index 29f82cb..737333f 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -3,6 +3,7 @@ std::vector g_handlers; //list of handlers Groups g_groups; //groups of handlers by index CpvDeclare(QueuePollHandlerFn *, poll_handlers); +CpvDeclare(int*, poll_handler_assigned); //QueuePollHandlerFn *poll_handlers; // fixed size array #define ARRAY_SIZE 64 @@ -74,6 +75,11 @@ void add_list_of_handlers(const std::vector 0){ //find next empty slot if(total_assigned >= ARRAY_SIZE){ break; // all slots assigned } - while(CpvAccess(poll_handlers)[index] != nullptr){ + while(CpvAccess(poll_handler_assigned)[index] == 0){ index = (index + 1) % ARRAY_SIZE; } CpvAccess(poll_handlers)[index] = handler.first; + CpvAccess(poll_handler_assigned)[index] = 1; //poll_handlers[index] = handler.first; total_assigned++; CmiPrintf("Adding handler %d at index %d\n", handler_index, index); remaining--; index = (index + step) % ARRAY_SIZE; } - */ current_index = (current_index + 1) % ARRAY_SIZE; handler_index++; } From a7763f0eab255357b10b03b1b4bd630ba52ab478 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 10 Dec 2025 09:35:22 -0600 Subject: [PATCH 12/15] test not assigning --- src/scheduler_helpers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index 737333f..09554ea 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -102,7 +102,7 @@ void add_list_of_handlers(const std::vector Date: Wed, 10 Dec 2025 10:41:13 -0600 Subject: [PATCH 13/15] fix bug --- src/scheduler_helpers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index 09554ea..ec2f6d9 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -99,7 +99,7 @@ void add_list_of_handlers(const std::vector= ARRAY_SIZE){ break; // all slots assigned } - while(CpvAccess(poll_handler_assigned)[index] == 0){ + while(CpvAccess(poll_handler_assigned)[index] != 0){ index = (index + 1) % ARRAY_SIZE; } //CpvAccess(poll_handlers)[index] = handler.first; From e4bded5ba704cb476ad045212ab5bc3397f0cda7 Mon Sep 17 00:00:00 2001 From: Ritvik Rao Date: Wed, 10 Dec 2025 10:49:09 -0600 Subject: [PATCH 14/15] add back assignment to table --- src/scheduler_helpers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index ec2f6d9..bb45502 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -102,7 +102,7 @@ void add_list_of_handlers(const std::vector Date: Wed, 10 Dec 2025 10:53:56 -0600 Subject: [PATCH 15/15] turn off prints --- src/scheduler_helpers.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/scheduler_helpers.cpp b/src/scheduler_helpers.cpp index bb45502..5ccace5 100644 --- a/src/scheduler_helpers.cpp +++ b/src/scheduler_helpers.cpp @@ -87,7 +87,7 @@ void add_list_of_handlers(const std::vector(total)); //estimate of how many slots this handler should take - CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); + //CmiPrintf("Handler %d frequency %u normalized to %ld\n", handler_index, freq, normalized); if(normalized == 0) normalized = 1; // at least once // go through loop and find empty slots // spread out as evenly as possible @@ -106,7 +106,7 @@ void add_list_of_handlers(const std::vector