Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/manuals/taskvine/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2750,6 +2750,7 @@ change.
|-----------|-------------|---------------|
| attempt-schedule-depth | The number of tasks to attempt scheduling on each pass of send_one_task in the main loop. | 100 |
| category-steady-n-tasks | Minimum number of successful tasks to use a sample for automatic resource allocation modes after encountering a new resource maximum. | 25 |
| clean-redundant-replicas | Remove redundant temporary file replicas to save workers' local disk space. | 0 |
| default-transfer-rate | The assumed network bandwidth used until sufficient data has been collected. | 1MB/s |
| disconnect-slow-workers-factor | Set the multiplier of the average task time at which point to disconnect a worker; disabled if less than 1. | 0 |
| hungry-minimum | Smallest number of waiting tasks in the manager before declaring it hungry | 10 |
Expand All @@ -2767,6 +2768,7 @@ change.
| ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 |
| resource-submit-multiplier | Assume that workers have `resource x resource-submit-multiplier` available.<br> This overcommits resources at the worker, causing tasks that cannot be immediately executed to be sent to workers.<br>The extra tasks wait at the worker until resources become available. | 1 |
| sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured value times this factor. Minimum is 1.1. | 2 |
| shift-disk-load | Proactively shift temporary files away from the most disk-heavy worker to those with more available disk. | 0 |
| short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 |
| temp-replica-count | Number of temp file replicas created across workers | 0 |
| transfer-outlier-factor | Transfers that are this many times slower than the average will be terminated. | 10 |
Expand Down
3 changes: 2 additions & 1 deletion taskvine/src/manager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ SOURCES = \
vine_file_replica_table.c \
vine_fair.c \
vine_runtime_dir.c \
vine_task_groups.c
vine_task_groups.c \
vine_temp.c

PUBLIC_HEADERS = taskvine.h

Expand Down
143 changes: 47 additions & 96 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ See the file COPYING for details.
#include "vine_taskgraph_log.h"
#include "vine_txn_log.h"
#include "vine_worker_info.h"
#include "vine_temp.h"

#include "address.h"
#include "buffer.h"
Expand Down Expand Up @@ -165,7 +166,6 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi
static void vine_manager_consider_recovery_task(struct vine_manager *q, struct vine_file *lost_file, struct vine_task *rt);

static void delete_uncacheable_files(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);
static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);
static int release_worker(struct vine_manager *q, struct vine_worker_info *w);

struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);
Expand Down Expand Up @@ -424,9 +424,17 @@ static vine_msg_code_t handle_cache_update(struct vine_manager *q, struct vine_w
replica->type = f->type;
}

/* And if the file is a newly created temporary, replicate as needed. */
if (f->type == VINE_TEMP && *id == 'X' && q->temp_replica_count > 1) {
hash_table_insert(q->temp_files_to_replicate, f->cached_name, NULL);
/* If a TEMP file, perform various actions as needed. */
if (f->type == VINE_TEMP) {
if (q->temp_replica_count > 1) {
vine_temp_queue_for_replication(q, f);
}
if (q->shift_disk_load) {
vine_temp_shift_disk_load(q, w, f);
}
if (q->clean_redundant_replicas) {
vine_temp_clean_redundant_replicas(q, f);
}
}
}
}
Expand Down Expand Up @@ -482,6 +490,12 @@ static vine_msg_code_t handle_cache_invalid(struct vine_manager *q, struct vine_
w->last_failure_time = timestamp_get();
}

/* Respond to a missing replica notification by re-queuing the corresponding file
* for replication. If the replica does not have any ready source, it will be silently
* discarded in the replication phase. */
struct vine_file *f = hash_table_lookup(q->file_table, cachename);
vine_temp_queue_for_replication(q, f);

/* Successfully processed this message. */
return VINE_MSG_PROCESSED;
} else {
Expand Down Expand Up @@ -656,6 +670,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
itable_remove(q->running_table, t->task_id);
vine_task_set_result(t, task_status);

/* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed.
* However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called.
* So, we only clean up redundant replicas for the task-inputs when the manager is configured to do so. */
if (q->clean_redundant_replicas) {
struct vine_mount *input_mount;
LIST_ITERATE(t->input_mounts, input_mount)
{
if (input_mount->file && input_mount->file->type == VINE_TEMP) {
vine_temp_clean_redundant_replicas(q, input_mount->file);
}
}
}

return VINE_SUCCESS;
}

Expand Down Expand Up @@ -1047,85 +1074,6 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
cleanup_worker_files(q, w);
}

/*
 * Start replicating temp files that are queued in q->temp_files_to_replicate.
 *
 * For each queued cached name that is known to q->file_table, this checks
 * whether any worker holds the file, whether at least one of those workers can
 * currently act as a transfer source, and whether the file still has fewer
 * than q->temp_replica_count replicas. If so, it asks
 * vine_file_replica_table_replicate() for up to q->transfer_replica_per_cycle
 * additional replicas.
 *
 * Entries with no source workers, and entries that need no further replicas,
 * are removed from q->temp_files_to_replicate once the iteration has finished.
 *
 * Returns 0 when the queue is empty; otherwise the accumulated return values
 * of vine_file_replica_table_replicate() over this pass.
 */
static int consider_tempfile_replications(struct vine_manager *q)
{
/* Nothing queued: no work to do. */
if (hash_table_size(q->temp_files_to_replicate) <= 0) {
return 0;
}

char *cached_name = NULL;
void *empty_val = NULL;
int total_replication_request_sent = 0;

/* NOTE(review): this static buffer is not written anywhere in this function;
 * presumably HASH_TABLE_ITERATE_FROM_KEY uses it as the key to start iterating
 * from -- confirm against the macro definition. */
static char key_start[PATH_MAX] = "random init";
int iter_control;
int iter_count_var;

/* Names to drop from the queue are collected here and removed after the loop
 * rather than inside it. Each element is a heap copy that is freed below. */
struct list *to_remove = list_create();

HASH_TABLE_ITERATE_FROM_KEY(q->temp_files_to_replicate, iter_control, iter_count_var, key_start, cached_name, empty_val)
{
struct vine_file *f = hash_table_lookup(q->file_table, cached_name);

/* Unknown to the file table: skip it. Note that the entry is not added to
 * to_remove, so it stays in the queue. */
if (!f) {
continue;
}

/* are there any available source workers? */
struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
if (!source_workers) {
/* If no source workers found, it indicates that the file doesn't exist, either pruned or lost.
Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost. */
if (q->transfer_temps_recovery && file_needs_recovery(q, f)) {
vine_manager_consider_recovery_task(q, f, f->recovery_task);
}
/* Either way the file cannot be replicated from here, so dequeue it. */
list_push_tail(to_remove, xxstrdup(f->cached_name));
continue;
}

/* at least one source is able to transfer? */
int has_valid_source = 0;
struct vine_worker_info *s;
SET_ITERATE(source_workers, s)
{
/* A usable source has an active transfer port, is below the per-source
 * outgoing transfer limit, and is not draining. */
if (s->transfer_port_active && s->outgoing_xfer_counter < q->worker_source_max_transfers && !s->draining) {
has_valid_source = 1;
break;
}
}
/* No usable source right now: leave the entry queued for a later pass. */
if (!has_valid_source) {
continue;
}

/* has this file been fully replicated? */
int nsource_workers = set_size(source_workers);
/* Replicas still missing, capped by the per-file, per-pass limit. */
int to_find = MIN(q->temp_replica_count - nsource_workers, q->transfer_replica_per_cycle);
if (to_find <= 0) {
list_push_tail(to_remove, xxstrdup(f->cached_name));
continue;
}

// debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsource_workers, f->cached_name, to_find);

int round_replication_request_sent = vine_file_replica_table_replicate(q, f, source_workers, to_find);
total_replication_request_sent += round_replication_request_sent;

/* Stop this pass once the running total reaches q->attempt_schedule_depth. */
if (total_replication_request_sent >= q->attempt_schedule_depth) {
break;
}
}

/* Apply the deferred removals and release the duplicated names. */
while ((cached_name = list_pop_head(to_remove))) {
hash_table_remove(q->temp_files_to_replicate, cached_name);
free(cached_name);
}

list_delete(to_remove);

return total_replication_request_sent;
}

/* Insert into hashtable temp files that may need replication. */

static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_worker_info *w)
Expand All @@ -1138,11 +1086,11 @@ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_wo
// Iterate over files we might want to recover
HASH_TABLE_ITERATE(w->current_files, cached_name, info)
{
/* Respond to a data loss due to worker removal by re-queuing the corresponding file
* for replication. If the replica does not have any ready source, it will be silently
* discarded in the replication phase. */
struct vine_file *f = hash_table_lookup(q->file_table, cached_name);

if (f && f->type == VINE_TEMP) {
hash_table_insert(q->temp_files_to_replicate, cached_name, NULL);
}
vine_temp_queue_for_replication(q, f);
}
}

Expand Down Expand Up @@ -1254,7 +1202,7 @@ static void add_worker(struct vine_manager *q)

/* Delete a single file on a remote worker except those with greater delete_upto_level cache level */

static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level)
int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level)
{
if (cache_level <= delete_upto_level) {
process_replica_on_event(q, w, filename, VINE_FILE_REPLICA_STATE_TRANSITION_EVENT_UNLINK);
Expand Down Expand Up @@ -4146,7 +4094,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert

q->worker_table = hash_table_create(0, 0);
q->file_worker_table = hash_table_create(0, 0);
q->temp_files_to_replicate = hash_table_create(0, 0);
q->temp_files_to_replicate = priority_queue_create(0);
q->worker_blocklist = hash_table_create(0, 0);

q->file_table = hash_table_create(0, 0);
Expand Down Expand Up @@ -4230,6 +4178,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;

q->temp_replica_count = 1;
q->clean_redundant_replicas = 0;
q->shift_disk_load = 0;
q->transfer_temps_recovery = 0;
q->transfer_replica_per_cycle = 10;

Expand Down Expand Up @@ -4498,8 +4448,7 @@ void vine_delete(struct vine_manager *q)
hash_table_clear(q->file_worker_table, (void *)set_delete);
hash_table_delete(q->file_worker_table);

hash_table_clear(q->temp_files_to_replicate, 0);
hash_table_delete(q->temp_files_to_replicate);
priority_queue_delete(q->temp_files_to_replicate);

hash_table_clear(q->factory_table, (void *)vine_factory_info_delete);
hash_table_delete(q->factory_table);
Expand Down Expand Up @@ -5460,7 +5409,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,

// Check if any temp files need replication and start replicating
BEGIN_ACCUM_TIME(q, time_internal);
result = consider_tempfile_replications(q);
result = vine_temp_start_replication(q);
END_ACCUM_TIME(q, time_internal);
if (result) {
// recovered at least one temp file
Expand Down Expand Up @@ -5853,7 +5802,6 @@ void vine_set_manager_preferred_connection(struct vine_manager *q, const char *p

int vine_tune(struct vine_manager *q, const char *name, double value)
{

if (!strcmp(name, "attempt-schedule-depth")) {
q->attempt_schedule_depth = MAX(1, (int)value);

Expand Down Expand Up @@ -5992,6 +5940,12 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "enforce-worker-eviction-interval")) {
q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND);

} else if (!strcmp(name, "clean-redundant-replicas")) {
q->clean_redundant_replicas = !!((int)value);

} else if (!strcmp(name, "shift-disk-load")) {
q->shift_disk_load = !!((int)value);

} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
Expand Down Expand Up @@ -6476,9 +6430,6 @@ int vine_prune_file(struct vine_manager *m, struct vine_file *f)
}
}

/* also remove from the replication table. */
hash_table_remove(m->temp_files_to_replicate, f->cached_name);

return pruned_replica_count;
}

Expand Down
7 changes: 6 additions & 1 deletion taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ struct vine_manager {

struct hash_table *file_table; /* Maps fileid -> struct vine_file.* */
struct hash_table *file_worker_table; /* Maps cachename -> struct set of workers with a replica of the file.* */
struct hash_table *temp_files_to_replicate; /* Maps cachename -> NULL. Used as a set of temp files to be replicated */
struct priority_queue *temp_files_to_replicate; /* Priority queue of temp files to be replicated, those with less replicas are at the top. */


/* Primary scheduling controls. */
Expand Down Expand Up @@ -217,6 +217,8 @@ struct vine_manager {
int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */
int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */
int temp_replica_count; /* Number of replicas per temp file */
int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */
int shift_disk_load; /* If true, shift storage burden to more available workers to minimize disk usage peaks. */

double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */
double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */
Expand Down Expand Up @@ -291,6 +293,9 @@ void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info
/* Check if the worker is able to transfer the necessary files for this task. */
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);

/* Delete a file from a worker. */
int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);

/** Release a random worker to simulate a failure. */
int release_random_worker(struct vine_manager *q);

Expand Down
Loading