Skip to content

Commit 1a7b200

Browse files
revert to list_iterate if task priority has been set
1 parent a6ea721 commit 1a7b200

File tree

1 file changed

+85
-36
lines changed

1 file changed

+85
-36
lines changed

work_queue/src/work_queue.c

Lines changed: 85 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ struct work_queue {
205205

206206
int wait_for_workers; /* wait for these many workers before dispatching tasks at start of execution. */
207207
int attempt_schedule_depth; /* number of submitted tasks to attempt scheduling before we continue to retrievals */
208+
int ht_dispatch; /* internal flag determining whether to iterate the whole ready list or rotate through attempt_schedule_depth */
208209

209210
work_queue_category_mode_t allocation_default_mode;
210211

@@ -2000,22 +2001,39 @@ static int expire_waiting_tasks(struct work_queue *q)
20002001

20012002
int tasks_considered = 0;
20022003
double current_time = timestamp_get() / ONE_SECOND;
2003-
while( (t = list_rotate(q->ready_list)) ) {
2004-
if(tasks_considered > q->attempt_schedule_depth) {
2005-
return expired;
2006-
}
2007-
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
2008-
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
2009-
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
2010-
expired++;
2011-
list_pop_tail(q->ready_list);
2012-
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
2013-
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
2014-
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
2015-
expired++;
2016-
list_pop_tail(q->ready_list);
2004+
if(q->ht_dispatch) {
2005+
while( (t = list_rotate(q->ready_list)) ) {
2006+
if(tasks_considered > q->attempt_schedule_depth) {
2007+
return expired;
2008+
}
2009+
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
2010+
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
2011+
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
2012+
expired++;
2013+
list_pop_tail(q->ready_list);
2014+
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
2015+
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
2016+
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
2017+
expired++;
2018+
list_pop_tail(q->ready_list);
2019+
}
2020+
tasks_considered++;
2021+
}
2022+
} else {
2023+
list_first_item(q->ready_list);
2024+
while( (t = list_next_item(q->ready_list))) {
2025+
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
2026+
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
2027+
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
2028+
expired++;
2029+
list_pop_tail(q->ready_list);
2030+
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
2031+
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
2032+
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
2033+
expired++;
2034+
list_pop_tail(q->ready_list);
2035+
}
20172036
}
2018-
tasks_considered++;
20192037
}
20202038
return expired;
20212039
}
@@ -4589,32 +4607,59 @@ static int send_one_task( struct work_queue *q )
45894607
int tasks_considered = 0;
45904608
timestamp_t now = timestamp_get();
45914609

4592-
while( (t = list_rotate(q->ready_list)) ) {
4593-
if(tasks_considered++ > q->attempt_schedule_depth) {
4594-
return 0;
4595-
}
4610+
if(q->ht_dispatch){
4611+
while( (t = list_rotate(q->ready_list)) ) {
4612+
if(tasks_considered++ > q->attempt_schedule_depth) {
4613+
return 0;
4614+
}
45964615

4597-
// Skip task if min requested start time not met.
4598-
if(t->resources_requested->start > now) {
4599-
continue;
4600-
}
4616+
// Skip task if min requested start time not met.
4617+
if(t->resources_requested->start > now) {
4618+
continue;
4619+
}
46014620

4602-
struct category *c = work_queue_category_lookup_or_create(q, t->category);
4603-
if (c->max_concurrent > -1 && c->max_concurrent < c->wq_stats->tasks_running) {
4604-
continue;
4605-
}
4621+
struct category *c = work_queue_category_lookup_or_create(q, t->category);
4622+
if (c->max_concurrent > -1 && c->max_concurrent < c->wq_stats->tasks_running) {
4623+
continue;
4624+
}
46064625

4607-
// Find the best worker for the task at the head of the list
4608-
w = find_best_worker(q,t);
4626+
// Find the best worker for the task at the head of the list
4627+
w = find_best_worker(q,t);
46094628

4610-
if(!w) {
4611-
continue;
4612-
}
4629+
if(!w) {
4630+
continue;
4631+
}
46134632

4614-
// Otherwise, remove it from the ready list and start it:
4615-
list_pop_tail(q->ready_list);
4616-
commit_task_to_worker(q,w,t);
4617-
return 1;
4633+
// Otherwise, remove it from the ready list and start it:
4634+
list_pop_tail(q->ready_list);
4635+
commit_task_to_worker(q,w,t);
4636+
return 1;
4637+
}
4638+
} else {
4639+
list_first_item(q->ready_list);
4640+
while( (t = list_next_item(q->ready_list))) {
4641+
// Skip task if min requested start time not met.
4642+
if(t->resources_requested->start > now) {
4643+
continue;
4644+
}
4645+
4646+
struct category *c = work_queue_category_lookup_or_create(q, t->category);
4647+
if (c->max_concurrent > -1 && c->max_concurrent < c->wq_stats->tasks_running) {
4648+
continue;
4649+
}
4650+
4651+
// Find the best worker for the task at the head of the list
4652+
w = find_best_worker(q,t);
4653+
4654+
if(!w) {
4655+
continue;
4656+
}
4657+
4658+
// Otherwise, remove it from the ready list and start it:
4659+
list_pop_tail(q->ready_list);
4660+
commit_task_to_worker(q,w,t);
4661+
return 1;
4662+
}
46184663
}
46194664

46204665
// if we made it here we reached the end of the list
@@ -5928,6 +5973,7 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *
59285973

59295974
q->wait_for_workers = 0;
59305975
q->attempt_schedule_depth = 100;
5976+
q->ht_dispatch = 1;
59315977

59325978
q->proportional_resources = 1;
59335979
q->proportional_whole_tasks = 1;
@@ -6411,6 +6457,9 @@ void push_task_to_ready_list( struct work_queue *q, struct work_queue_task *t )
64116457

64126458
if(by_priority) {
64136459
list_push_priority(q->ready_list, work_queue_task_priority, t);
6460+
if(t->priority){
6461+
q->ht_dispatch = 0;
6462+
}
64146463
} else {
64156464
list_push_head(q->ready_list,t);
64166465
}

0 commit comments

Comments
 (0)