Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ struct work_queue {

int wait_for_workers; /* wait for these many workers before dispatching tasks at start of execution. */
int attempt_schedule_depth; /* number of submitted tasks to attempt scheduling before we continue to retrievals */
int ht_dispatch; /* internal flag determining whether to iterate the whole ready list or rotate through attempt_schedule_depth */

work_queue_category_mode_t allocation_default_mode;

Expand Down Expand Up @@ -2000,20 +2001,27 @@ static int expire_waiting_tasks(struct work_queue *q)

int tasks_considered = 0;
double current_time = timestamp_get() / ONE_SECOND;
while( (t = list_rotate(q->ready_list)) ) {
if(tasks_considered > q->attempt_schedule_depth) {

void *(*list_iter_fn)(struct list*) = q->ht_dispatch ? list_rotate : list_next_item;

if(!q->ht_dispatch){
list_first_item(q->ready_list);
}

while( (t = list_iter_fn(q->ready_list)) ) {
if((tasks_considered > q->attempt_schedule_depth) && q->ht_dispatch) {
return expired;
}
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
list_pop_tail(q->ready_list);
list_remove_item(q->ready_list);
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
list_pop_tail(q->ready_list);
list_remove_item(q->ready_list);
}
tasks_considered++;
}
Expand Down Expand Up @@ -4589,8 +4597,14 @@ static int send_one_task( struct work_queue *q )
int tasks_considered = 0;
timestamp_t now = timestamp_get();

while( (t = list_rotate(q->ready_list)) ) {
if(tasks_considered++ > q->attempt_schedule_depth) {
void *(*list_iter_fn)(struct list*) = q->ht_dispatch ? list_rotate : list_next_item;

if(!q->ht_dispatch){
list_first_item(q->ready_list);
}

while( (t = list_iter_fn(q->ready_list)) ) {
if((tasks_considered++ > q->attempt_schedule_depth) && q->ht_dispatch) {
return 0;
}

Expand All @@ -4612,7 +4626,8 @@ static int send_one_task( struct work_queue *q )
}

// Otherwise, remove it from the ready list and start it:
list_pop_tail(q->ready_list);
list_remove_item(q->ready_list);

commit_task_to_worker(q,w,t);
return 1;
}
Expand Down Expand Up @@ -5928,6 +5943,7 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *

q->wait_for_workers = 0;
q->attempt_schedule_depth = 100;
q->ht_dispatch = 1;

q->proportional_resources = 1;
q->proportional_whole_tasks = 1;
Expand Down Expand Up @@ -6411,6 +6427,9 @@ void push_task_to_ready_list( struct work_queue *q, struct work_queue_task *t )

if(by_priority) {
list_push_priority(q->ready_list, work_queue_task_priority, t);
if(t->priority){
q->ht_dispatch = 0;
}
} else {
list_push_head(q->ready_list,t);
}
Expand Down
Loading