diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md index a3d2617784..2412a11b81 100644 --- a/doc/manuals/taskvine/index.md +++ b/doc/manuals/taskvine/index.md @@ -2750,6 +2750,7 @@ change. |-----------|-------------|---------------| | attempt-schedule-depth | The amount of tasks to attempt scheduling on each pass of send_one_task in the main loop. | 100 | | category-steady-n-tasks | Minimum number of successful tasks to use a sample for automatic resource allocation modes after encountering a new resource maximum. | 25 | +| clean-redundant-replicas | Remove redundant temporary file replicas to save worker's local disk space. | 0 | | default-transfer-rate | The assumed network bandwidth used until sufficient data has been collected. (1MB/s) | disconnect-slow-workers-factor | Set the multiplier of the average task time at which point to disconnect a worker; disabled if less than 1. (default=0) | hungry-minimum | Smallest number of waiting tasks in the manager before declaring it hungry | 10 | @@ -2767,6 +2768,7 @@ change. | ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 | | resource-submit-multiplier | Assume that workers have `resource x resources-submit-multiplier` available.
This overcommits resources at the worker, causing tasks that cannot be executed immediately to be sent to workers.
The extra tasks wait at the worker until resources become available. | 1 | | sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured valued times this factor. Minimum is 1.1. | 2 | +| shift-disk-load | Proactively shift temporary files away from the most disk-heavy worker to those with more available disk. | 0 | | short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 | | temp-replica-count | Number of temp file replicas created across workers | 0 | | transfer-outlier-factor | Transfer that are this many times slower than the average will be terminated. | 10 | diff --git a/taskvine/src/manager/Makefile b/taskvine/src/manager/Makefile index a036e9bd64..db09403667 100644 --- a/taskvine/src/manager/Makefile +++ b/taskvine/src/manager/Makefile @@ -28,7 +28,8 @@ SOURCES = \ vine_file_replica_table.c \ vine_fair.c \ vine_runtime_dir.c \ - vine_task_groups.c + vine_task_groups.c \ + vine_temp.c PUBLIC_HEADERS = taskvine.h diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index f9135ad47b..3030bfe5ed 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -29,6 +29,7 @@ See the file COPYING for details. 
#include "vine_taskgraph_log.h" #include "vine_txn_log.h" #include "vine_worker_info.h" +#include "vine_temp.h" #include "address.h" #include "buffer.h" @@ -165,7 +166,6 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi static void vine_manager_consider_recovery_task(struct vine_manager *q, struct vine_file *lost_file, struct vine_task *rt); static void delete_uncacheable_files(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t); -static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level); static int release_worker(struct vine_manager *q, struct vine_worker_info *w); struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name); @@ -424,9 +424,17 @@ static vine_msg_code_t handle_cache_update(struct vine_manager *q, struct vine_w replica->type = f->type; } - /* And if the file is a newly created temporary, replicate as needed. */ - if (f->type == VINE_TEMP && *id == 'X' && q->temp_replica_count > 1) { - hash_table_insert(q->temp_files_to_replicate, f->cached_name, NULL); + /* If a TEMP file, perform various actions as needed. */ + if (f->type == VINE_TEMP) { + if (q->temp_replica_count > 1) { + vine_temp_queue_for_replication(q, f); + } + if (q->shift_disk_load) { + vine_temp_shift_disk_load(q, w, f); + } + if (q->clean_redundant_replicas) { + vine_temp_clean_redundant_replicas(q, f); + } } } } @@ -482,6 +490,12 @@ static vine_msg_code_t handle_cache_invalid(struct vine_manager *q, struct vine_ w->last_failure_time = timestamp_get(); } + /* Respond to a missing replica notification by re-queuing the corresponding file + * for replication. If the replica does not have any ready source, it will be silently + * discarded in the replication phase. 
*/ + struct vine_file *f = hash_table_lookup(q->file_table, cachename); + vine_temp_queue_for_replication(q, f); + /* Successfully processed this message. */ return VINE_MSG_PROCESSED; } else { @@ -656,6 +670,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v itable_remove(q->running_table, t->task_id); vine_task_set_result(t, task_status); + /* A task scheduling may result in a redundant replica of its input due to peer transfers, which can be safely removed when completed. + * However, the general function of taskvine is to replicate files on demand, and to only clean them up when prune is called. + * So, we only clean up redundant replicas for the task-inputs when the manager is configured to do so. */ + if (q->clean_redundant_replicas) { + struct vine_mount *input_mount; + LIST_ITERATE(t->input_mounts, input_mount) + { + if (input_mount->file && input_mount->file->type == VINE_TEMP) { + vine_temp_clean_redundant_replicas(q, input_mount->file); + } + } + } + return VINE_SUCCESS; } @@ -1047,85 +1074,6 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w) cleanup_worker_files(q, w); } -/* Start replicating files that may need replication */ -static int consider_tempfile_replications(struct vine_manager *q) -{ - if (hash_table_size(q->temp_files_to_replicate) <= 0) { - return 0; - } - - char *cached_name = NULL; - void *empty_val = NULL; - int total_replication_request_sent = 0; - - static char key_start[PATH_MAX] = "random init"; - int iter_control; - int iter_count_var; - - struct list *to_remove = list_create(); - - HASH_TABLE_ITERATE_FROM_KEY(q->temp_files_to_replicate, iter_control, iter_count_var, key_start, cached_name, empty_val) - { - struct vine_file *f = hash_table_lookup(q->file_table, cached_name); - - if (!f) { - continue; - } - - /* are there any available source workers? 
*/ - struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name); - if (!source_workers) { - /* If no source workers found, it indicates that the file doesn't exist, either pruned or lost. - Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost. */ - if (q->transfer_temps_recovery && file_needs_recovery(q, f)) { - vine_manager_consider_recovery_task(q, f, f->recovery_task); - } - list_push_tail(to_remove, xxstrdup(f->cached_name)); - continue; - } - - /* at least one source is able to transfer? */ - int has_valid_source = 0; - struct vine_worker_info *s; - SET_ITERATE(source_workers, s) - { - if (s->transfer_port_active && s->outgoing_xfer_counter < q->worker_source_max_transfers && !s->draining) { - has_valid_source = 1; - break; - } - } - if (!has_valid_source) { - continue; - } - - /* has this file been fully replicated? */ - int nsource_workers = set_size(source_workers); - int to_find = MIN(q->temp_replica_count - nsource_workers, q->transfer_replica_per_cycle); - if (to_find <= 0) { - list_push_tail(to_remove, xxstrdup(f->cached_name)); - continue; - } - - // debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsource_workers, f->cached_name, to_find); - - int round_replication_request_sent = vine_file_replica_table_replicate(q, f, source_workers, to_find); - total_replication_request_sent += round_replication_request_sent; - - if (total_replication_request_sent >= q->attempt_schedule_depth) { - break; - } - } - - while ((cached_name = list_pop_head(to_remove))) { - hash_table_remove(q->temp_files_to_replicate, cached_name); - free(cached_name); - } - - list_delete(to_remove); - - return total_replication_request_sent; -} - /* Insert into hashtable temp files that may need replication. 
*/ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_worker_info *w) @@ -1138,11 +1086,11 @@ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_wo // Iterate over files we want might want to recover HASH_TABLE_ITERATE(w->current_files, cached_name, info) { + /* Respond to a data loss due to worker removal by re-queuing the corresponding file + * for replication. If the replica does not have any ready source, it will be silently + * discarded in the replication phase. */ struct vine_file *f = hash_table_lookup(q->file_table, cached_name); - - if (f && f->type == VINE_TEMP) { - hash_table_insert(q->temp_files_to_replicate, cached_name, NULL); - } + vine_temp_queue_for_replication(q, f); } } @@ -1254,7 +1202,7 @@ static void add_worker(struct vine_manager *q) /* Delete a single file on a remote worker except those with greater delete_upto_level cache level */ -static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level) +int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level) { if (cache_level <= delete_upto_level) { process_replica_on_event(q, w, filename, VINE_FILE_REPLICA_STATE_TRANSITION_EVENT_UNLINK); @@ -4146,7 +4094,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->worker_table = hash_table_create(0, 0); q->file_worker_table = hash_table_create(0, 0); - q->temp_files_to_replicate = hash_table_create(0, 0); + q->temp_files_to_replicate = priority_queue_create(0); q->worker_blocklist = hash_table_create(0, 0); q->file_table = hash_table_create(0, 0); @@ -4230,6 +4178,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->perf_log_interval = VINE_PERF_LOG_INTERVAL; q->temp_replica_count = 1; + 
q->clean_redundant_replicas = 0; + q->shift_disk_load = 0; q->transfer_temps_recovery = 0; q->transfer_replica_per_cycle = 10; @@ -4498,8 +4448,7 @@ void vine_delete(struct vine_manager *q) hash_table_clear(q->file_worker_table, (void *)set_delete); hash_table_delete(q->file_worker_table); - hash_table_clear(q->temp_files_to_replicate, 0); - hash_table_delete(q->temp_files_to_replicate); + priority_queue_delete(q->temp_files_to_replicate); hash_table_clear(q->factory_table, (void *)vine_factory_info_delete); hash_table_delete(q->factory_table); @@ -5460,7 +5409,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, // Check if any temp files need replication and start replicating BEGIN_ACCUM_TIME(q, time_internal); - result = consider_tempfile_replications(q); + result = vine_temp_start_replication(q); END_ACCUM_TIME(q, time_internal); if (result) { // recovered at least one temp file @@ -5853,7 +5802,6 @@ void vine_set_manager_preferred_connection(struct vine_manager *q, const char *p int vine_tune(struct vine_manager *q, const char *name, double value) { - if (!strcmp(name, "attempt-schedule-depth")) { q->attempt_schedule_depth = MAX(1, (int)value); @@ -5992,6 +5940,12 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if (!strcmp(name, "enforce-worker-eviction-interval")) { q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND); + } else if (!strcmp(name, "clean-redundant-replicas")) { + q->clean_redundant_replicas = !!((int)value); + + } else if (!strcmp(name, "shift-disk-load")) { + q->shift_disk_load = !!((int)value); + } else { debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name); return -1; @@ -6476,9 +6430,6 @@ int vine_prune_file(struct vine_manager *m, struct vine_file *f) } } - /* also remove from the replication table. 
*/ - hash_table_remove(m->temp_files_to_replicate, f->cached_name); - return pruned_replica_count; } diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index e53847b855..d6c393e6bd 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -123,7 +123,7 @@ struct vine_manager { struct hash_table *file_table; /* Maps fileid -> struct vine_file.* */ struct hash_table *file_worker_table; /* Maps cachename -> struct set of workers with a replica of the file.* */ - struct hash_table *temp_files_to_replicate; /* Maps cachename -> NULL. Used as a set of temp files to be replicated */ + struct priority_queue *temp_files_to_replicate; /* Priority queue of temp files to be replicated, those with less replicas are at the top. */ /* Primary scheduling controls. */ @@ -217,6 +217,8 @@ struct vine_manager { int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */ int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */ int temp_replica_count; /* Number of replicas per temp file */ + int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */ + int shift_disk_load; /* If true, shift storage burden to more available workers to minimize disk usage peaks. */ double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */ double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */ @@ -291,6 +293,9 @@ void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info /* Check if the worker is able to transfer the necessary files for this task. */ int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t); +/* Delete a file from a worker. 
*/
+int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);
+
 /** Release a random worker to simulate a failure. */
 int release_random_worker(struct vine_manager *q);
 
diff --git a/taskvine/src/manager/vine_temp.c b/taskvine/src/manager/vine_temp.c
new file mode 100644
index 0000000000..cae4b183ee
--- /dev/null
+++ b/taskvine/src/manager/vine_temp.c
@@ -0,0 +1,416 @@
+#include "vine_temp.h"
+#include "vine_file.h"
+#include "vine_worker_info.h"
+#include "vine_file_replica_table.h"
+#include "vine_manager.h"
+#include "vine_manager_put.h"
+#include "vine_file_replica.h"
+#include "vine_task.h"
+#include "vine_mount.h"
+
+#include "priority_queue.h"
+#include "macros.h"
+#include "stringtools.h"
+#include "debug.h"
+#include "random.h"
+#include "xxmalloc.h"
+
+/*************************************************************/
+/* Private Functions */
+/*************************************************************/
+
+/**
+Decide whether a worker may act as an endpoint of a peer transfer.
+Returns 1 only for a connection of type VINE_WORKER_TYPE_WORKER that has an
+active transfer port, is not draining, and has a resources record whose tag is
+non-negative. Returns 0 in every other case, including a NULL worker.
+*/
+static int worker_can_peer_transfer(struct vine_worker_info *w)
+{
+	/* A missing worker or a connection of another type never takes part. */
+	if (!w || w->type != VINE_WORKER_TYPE_WORKER) {
+		return 0;
+	}
+
+	/* The transfer port must be active and the worker must not be draining. */
+	if (!w->transfer_port_active || w->draining) {
+		return 0;
+	}
+
+	/* A resources record must exist and carry a non-negative tag. */
+	return (w->resources && w->resources->tag >= 0) ? 1 : 0;
+}
+
+/**
+Return available disk space (in bytes) on a worker.
+Note that w->resources->disk.total is in megabytes.
+Use int64_t in case the actual usage is larger than the total space.
+Returns -1 if worker or resources is invalid, or if the available disk space is negative.
+*/
+static int64_t worker_get_available_disk(const struct vine_worker_info *w)
+{
+	if (w && w->resources) {
+		/* disk.total is converted to bytes before the cache usage is subtracted. */
+		int64_t remaining = (int64_t)MEGABYTES_TO_BYTES(w->resources->disk.total) - w->inuse_cache;
+		if (remaining >= 0) {
+			return remaining;
+		}
+	}
+
+	/* Missing worker, missing resources, or usage above the total. */
+	return -1;
+}
+
+/**
+Pick the worker that should serve a temp file to a peer.
+Candidates are the workers listed for the file in q->file_worker_table. A
+candidate qualifies when it can take part in peer transfers, its outgoing
+transfer counter is below q->worker_source_max_transfers, and its replica of
+the file is in the READY state. Among qualifying workers the one with the
+fewest outgoing transfers wins; on a tie the one seen first is kept.
+Returns NULL if the arguments are invalid, the file is not a temp file, or no
+worker qualifies.
+*/
+static struct vine_worker_info *get_best_source_worker(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return NULL;
+	}
+
+	struct set *holders = hash_table_lookup(q->file_worker_table, f->cached_name);
+	if (!holders) {
+		return NULL;
+	}
+
+	struct vine_worker_info *chosen = NULL;
+	struct vine_worker_info *candidate = NULL;
+
+	SET_ITERATE(holders, candidate)
+	{
+		/* Only peer-capable workers below the outgoing transfer cap qualify. */
+		if (!worker_can_peer_transfer(candidate) || candidate->outgoing_xfer_counter >= q->worker_source_max_transfers) {
+			continue;
+		}
+
+		/* The worker must have a replica record for the file, and it must be READY. */
+		struct vine_file_replica *replica = vine_file_replica_table_lookup(candidate, f->cached_name);
+		if (!replica || replica->state != VINE_FILE_REPLICA_STATE_READY) {
+			continue;
+		}
+
+		/* Keep the current choice unless this candidate has strictly fewer outgoing transfers. */
+		if (chosen && candidate->outgoing_xfer_counter >= chosen->outgoing_xfer_counter) {
+			continue;
+		}
+		chosen = candidate;
+	}
+
+	return chosen;
+}
+
+/**
+Select a destination worker that can accept a new replica. Workers must be
+active, not currently hosting the file, and have sufficient free cache space.
+Those with more available disk space are prioritized to reduce pressure on
+heavily utilized workers.
+*/
+static struct vine_worker_info *get_best_dest_worker(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return NULL;
+	}
+
+	struct vine_worker_info *chosen = NULL;
+	int64_t chosen_free = 0;
+
+	char *worker_key;
+	struct vine_worker_info *candidate;
+	HASH_TABLE_ITERATE(q->worker_table, worker_key, candidate)
+	{
+		/* Must be peer-capable and below the incoming transfer cap
+		 * (the cap compared against is q->worker_source_max_transfers). */
+		if (!worker_can_peer_transfer(candidate) || candidate->incoming_xfer_counter >= q->worker_source_max_transfers) {
+			continue;
+		}
+
+		/* A worker that already has a replica record for the file is not a destination. */
+		if (vine_file_replica_table_lookup(candidate, f->cached_name)) {
+			continue;
+		}
+
+		/* The file must fit into the space the worker has left. */
+		int64_t candidate_free = worker_get_available_disk(candidate);
+		if (candidate_free < (int64_t)f->size) {
+			continue;
+		}
+
+		/* Prefer the worker with the most available disk; on a tie keep the earlier one. */
+		if (!chosen || candidate_free > chosen_free) {
+			chosen = candidate;
+			chosen_free = candidate_free;
+		}
+	}
+
+	return chosen;
+}
+
+/**
+Initiate a peer-to-peer transfer between two workers for the specified file.
+The source worker provides a direct URL so the destination worker can pull the
+replica immediately via `vine_manager_put_url_now`.
+*/
+static void start_peer_transfer(struct vine_manager *q, struct vine_file *f, struct vine_worker_info *dest_worker, struct vine_worker_info *source_worker)
+{
+	int args_present = q && f && dest_worker && source_worker;
+	if (!args_present || f->type != VINE_TEMP) {
+		return;
+	}
+
+	/* The file is addressed on the source as <transfer_url>/<cached_name>. */
+	char *peer_url = string_format("%s/%s", source_worker->transfer_url, f->cached_name);
+	vine_manager_put_url_now(q, dest_worker, source_worker, peer_url, f);
+	free(peer_url);
+}
+
+/**
+Try to launch one replication of a temp file right now.
+A source worker is selected first; only if one exists is a destination worker
+looked for. When both are found the peer transfer is started.
+Returns 1 if a transfer was started, 0 if the arguments are invalid, the file
+is not a temp file, or no source or destination worker is available.
+*/
+static int attempt_replication(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return 0;
+	}
+
+	/* Without a source the destination search is skipped entirely. */
+	struct vine_worker_info *from = get_best_source_worker(q, f);
+	struct vine_worker_info *to = from ? get_best_dest_worker(q, f) : NULL;
+	if (!from || !to) {
+		return 0;
+	}
+
+	start_peer_transfer(q, f, to, from);
+	return 1;
+}
+
+/*************************************************************/
+/* Public Functions */
+/*************************************************************/
+
+/**
+Queue a temporary file for replication when it still lacks the target number of
+replicas. Files without any replica and those already satisfying the quota are
+ignored. The file is pushed with its negated replica count as the priority value.
+*/
+int vine_temp_queue_for_replication(struct vine_manager *q, struct vine_file *f)
+{
+	/* Only temp files in the CREATED state are candidates. */
+	if (!q || !f || f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED) {
+		return 0;
+	}
+
+	/* With a target of one replica or fewer there is nothing to add. */
+	if (q->temp_replica_count <= 1) {
+		return 0;
+	}
+
+	int have = vine_file_replica_count(q, f);
+	int wants_more = (have != 0) && (have < q->temp_replica_count);
+	if (!wants_more) {
+		return 0;
+	}
+
+	/* The priority is the negated replica count. */
+	priority_queue_push(q->temp_files_to_replicate, f, -have);
+	return 1;
+}
+
+/**
+Work through the queue of temp files that still need replicas and start a
+peer-to-peer transfer for each one that has both a source and a destination
+worker available. At most attempt_schedule_depth entries (or the queue size on
+entry, if smaller) are popped per call so the caller is not held up.
+Popped entries are dropped when the file is no longer a created temp file, when
+it has no replica or already enough replicas, or when it has no READY replica.
+A file whose transfer was started is offered back to the queue right away; a
+file for which no worker pair was found is set aside and offered back to the
+queue at the end of the call.
+Returns the number of transfers started.
+*/
+int vine_temp_start_replication(struct vine_manager *q)
+{
+	if (!q) {
+		return 0;
+	}
+
+	/* Number of transfers started during this call. */
+	int launched = 0;
+
+	/* Upper bound on the number of queue entries popped in this call. */
+	int budget = MIN(q->attempt_schedule_depth, priority_queue_size(q->temp_files_to_replicate));
+
+	/* Files that cannot be replicated now are temporarily stored and re-queued at the end. */
+	struct list *deferred = list_create();
+
+	struct vine_file *f;
+	while (budget-- > 0) {
+		f = priority_queue_pop(q->temp_files_to_replicate);
+		if (!f) {
+			break;
+		}
+
+		/* Drop entries that are no longer created temp files. */
+		if (f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED) {
+			continue;
+		}
+
+		/* Drop files that have no replica at all or already meet the target. */
+		int have = vine_file_replica_count(q, f);
+		if (have == 0 || have >= q->temp_replica_count) {
+			continue;
+		}
+
+		/* Drop files with no READY replica; they will be re-enqueued upon their next cache-update. */
+		if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) == 0) {
+			continue;
+		}
+
+		if (attempt_replication(q, f)) {
+			launched++;
+			/* Offer the file back so it can accumulate replicas gradually. */
+			vine_temp_queue_for_replication(q, f);
+		} else {
+			/* No usable source/destination pair right now; reconsider after this pass. */
+			list_push_tail(deferred, f);
+		}
+	}
+
+	while ((f = list_pop_head(deferred))) {
+		vine_temp_queue_for_replication(q, f);
+	}
+	list_delete(deferred);
+
+	return launched;
+}
+
+/**
+Clean redundant replicas of a temporary file.
+For example, a file may be transferred to another worker because a task that declares it
+as input is scheduled there, resulting in an extra replica that consumes storage space.
+This function evaluates whether the file has excessive replicas and removes those on
+workers that do not execute their dependent tasks.
+*/
+void vine_temp_clean_redundant_replicas(struct vine_manager *q, struct vine_file *f)
+{
+	/* q is dereferenced below, so a missing manager is rejected like a missing or non-temp file. */
+	if (!q || !f || f->type != VINE_TEMP) {
+		return;
+	}
+
+	struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
+	if (!source_workers) {
+		return;
+	}
+
+	/* Nothing to do unless more workers hold the file than the configured replica target. */
+	int holder_count = set_size(source_workers);
+	int excess_replicas = holder_count - q->temp_replica_count;
+	if (excess_replicas <= 0) {
+		return;
+	}
+
+	/* Note that a replica may serve as a source for a peer transfer. If it is unlinked prematurely,
+	 * the corresponding transfer could fail and leave a task without its required data.
+	 * Therefore, we must wait until all replicas are confirmed ready before proceeding. */
+	if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) != holder_count) {
+		return;
+	}
+
+	/* Removal candidates, pushed with the worker's inuse_cache as the priority.
+	 * NOTE(review): presumably the worker with the largest inuse_cache is popped first -- confirm against priority_queue. */
+	struct priority_queue *clean_replicas_from_workers = priority_queue_create(0);
+
+	struct vine_worker_info *source_worker = NULL;
+	SET_ITERATE(source_workers, source_worker)
+	{
+		/* If the file is an input of any task currently on this worker, its replica there is kept. */
+		int file_inuse = 0;
+
+		uint64_t task_id;
+		struct vine_task *task;
+		ITABLE_ITERATE(source_worker->current_tasks, task_id, task)
+		{
+			struct vine_mount *input_mount;
+			LIST_ITERATE(task->input_mounts, input_mount)
+			{
+				if (f == input_mount->file) {
+					file_inuse = 1;
+					break;
+				}
+			}
+			if (file_inuse) {
+				break;
+			}
+		}
+
+		if (file_inuse) {
+			continue;
+		}
+
+		priority_queue_push(clean_replicas_from_workers, source_worker, source_worker->inuse_cache);
+	}
+
+	/* Delete at most excess_replicas replicas; stop early if the candidates run out. */
+	while (excess_replicas > 0) {
+		source_worker = priority_queue_pop(clean_replicas_from_workers);
+		if (!source_worker) {
+			break;
+		}
+		delete_worker_file(q, source_worker, f->cached_name, 0, 0);
+		excess_replicas--;
+	}
+
+	priority_queue_delete(clean_replicas_from_workers);
+}
+
+/**
+Shift a temp file replica away from the worker using the most cache space.
+This function looks for an alternative worker that can accept the file immediately
+so that the original replica can be cleaned up later by @vine_temp_clean_redundant_replicas().
+A candidate is accepted only if it can take part in peer transfers, does not
+already have the file, has room for it, and would not end up holding more cache
+than the source worker holds once the file is gone from the source. Among the
+accepted candidates the one with the most available disk is used.
+*/
+void vine_temp_shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f)
+{
+	if (!q || !source_worker || !f || f->type != VINE_TEMP) {
+		return;
+	}
+
+	/* All size arithmetic is done in signed 64-bit, matching the (int64_t)f->size cast used by the
+	 * disk-space checks. Mixing inuse_cache with an unsigned size would turn the subtraction below
+	 * into unsigned arithmetic, which wraps to a huge value when the source's accounted cache is
+	 * smaller than the file and would let every candidate pass the load check. */
+	const int64_t file_size = (int64_t)f->size;
+	const int64_t source_load_after = (int64_t)source_worker->inuse_cache - file_size;
+
+	struct vine_worker_info *target_worker = NULL;
+	int64_t target_available = 0;
+
+	char *key;
+	struct vine_worker_info *w = NULL;
+	HASH_TABLE_ITERATE(q->worker_table, key, w)
+	{
+		/* skip if the worker cannot participate in peer transfers */
+		if (!worker_can_peer_transfer(w)) {
+			continue;
+		}
+		/* skip if the worker already has this file (this also excludes the source itself
+		 * whenever the source has a replica record for the file) */
+		if (vine_file_replica_table_lookup(w, f->cached_name)) {
+			continue;
+		}
+		/* skip if the worker does not have enough disk space */
+		int64_t available = worker_get_available_disk(w);
+		if (available < file_size) {
+			continue;
+		}
+		/* skip if the worker becomes heavier than the source after the transfer */
+		if ((int64_t)w->inuse_cache + file_size > source_load_after) {
+			continue;
+		}
+		/* workers with more available disk space are preferred; ties keep the earlier one */
+		if (!target_worker || available > target_available) {
+			target_worker = w;
+			target_available = available;
+		}
+	}
+	if (target_worker) {
+		start_peer_transfer(q, f, target_worker, source_worker);
+	}
+
+	/* We can clean up the original one safely when the replica arrives at the destination worker.
+	 * The cleanup routine itself returns without deleting anything until every replica is READY. */
+	vine_temp_clean_redundant_replicas(q, f);
+}
\ No newline at end of file
diff --git a/taskvine/src/manager/vine_temp.h b/taskvine/src/manager/vine_temp.h
new file mode 100644
index 0000000000..de25f26a0f
--- /dev/null
+++ b/taskvine/src/manager/vine_temp.h
@@ -0,0 +1,14 @@
+#ifndef vine_temp_H
+#define vine_temp_H
+
+#include "vine_manager.h"
+
+/** Replication related functions */
+int vine_temp_queue_for_replication(struct vine_manager *q, struct vine_file *f);
+int vine_temp_start_replication(struct vine_manager *q);
+
+/** Storage management functions */
+void vine_temp_clean_redundant_replicas(struct vine_manager *q, struct vine_file *f);
+void vine_temp_shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f);
+
+#endif
\ No newline at end of file