diff --git a/work_queue/src/work_queue.c b/work_queue/src/work_queue.c index 7c74896b40..c4e095d207 100644 --- a/work_queue/src/work_queue.c +++ b/work_queue/src/work_queue.c @@ -205,6 +205,7 @@ struct work_queue { int wait_for_workers; /* wait for these many workers before dispatching tasks at start of execution. */ int attempt_schedule_depth; /* number of submitted tasks to attempt scheduling before we continue to retrievals */ + int ht_dispatch; /* internal flag determining whether to iterate the whole ready list or rotate through attempt_schedule_depth */ work_queue_category_mode_t allocation_default_mode; @@ -2000,20 +2001,27 @@ static int expire_waiting_tasks(struct work_queue *q) int tasks_considered = 0; double current_time = timestamp_get() / ONE_SECOND; - while( (t = list_rotate(q->ready_list)) ) { - if(tasks_considered > q->attempt_schedule_depth) { + + void *(*list_iter_fn)(struct list*) = q->ht_dispatch ? list_rotate : list_next_item; + + if(!q->ht_dispatch){ + list_first_item(q->ready_list); + } + + while( (t = list_iter_fn(q->ready_list)) ) { + if((tasks_considered > q->attempt_schedule_depth) && q->ht_dispatch) { return expired; } if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) { update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT); change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED); expired++; - list_pop_tail(q->ready_list); + list_remove_item(q->ready_list); } else if(t->max_retries > 0 && t->try_count > t->max_retries) { update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES); change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED); expired++; - list_pop_tail(q->ready_list); + list_remove_item(q->ready_list); } tasks_considered++; } @@ -4589,8 +4597,14 @@ static int send_one_task( struct work_queue *q ) int tasks_considered = 0; timestamp_t now = timestamp_get(); - while( (t = list_rotate(q->ready_list)) ) { - if(tasks_considered++ > q->attempt_schedule_depth) { + void *(*list_iter_fn)(struct list*) = q->ht_dispatch ? list_rotate : list_next_item; + + if(!q->ht_dispatch){ + list_first_item(q->ready_list); + } + + while( (t = list_iter_fn(q->ready_list)) ) { + if((tasks_considered++ > q->attempt_schedule_depth) && q->ht_dispatch) { return 0; } @@ -4612,7 +4626,8 @@ static int send_one_task( struct work_queue *q ) } // Otherwise, remove it from the ready list and start it: - list_pop_tail(q->ready_list); + list_remove_item(q->ready_list); + commit_task_to_worker(q,w,t); return 1; } @@ -5928,6 +5943,7 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char * q->wait_for_workers = 0; q->attempt_schedule_depth = 100; + q->ht_dispatch = 1; q->proportional_resources = 1; q->proportional_whole_tasks = 1; @@ -6411,6 +6427,9 @@ void push_task_to_ready_list( struct work_queue *q, struct work_queue_task *t ) if(by_priority) { list_push_priority(q->ready_list, work_queue_task_priority, t); + if(t->priority){ + q->ht_dispatch = 0; + } } else { list_push_head(q->ready_list,t); }