diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md
index a3d2617784..2412a11b81 100644
--- a/doc/manuals/taskvine/index.md
+++ b/doc/manuals/taskvine/index.md
@@ -2750,6 +2750,7 @@ change.
|-----------|-------------|---------------|
| attempt-schedule-depth | The amount of tasks to attempt scheduling on each pass of send_one_task in the main loop. | 100 |
| category-steady-n-tasks | Minimum number of successful tasks to use a sample for automatic resource allocation modes after encountering a new resource maximum. | 25 |
+| clean-redundant-replicas | If set to 1, remove temporary file replicas in excess of `temp-replica-count` to free disk space on workers. | 0 |
| default-transfer-rate | The assumed network bandwidth used until sufficient data has been collected. (1MB/s)
| disconnect-slow-workers-factor | Set the multiplier of the average task time at which point to disconnect a worker; disabled if less than 1. (default=0)
| hungry-minimum | Smallest number of waiting tasks in the manager before declaring it hungry | 10 |
@@ -2767,6 +2768,7 @@ change.
| ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 |
| resource-submit-multiplier | Assume that workers have `resource x resources-submit-multiplier` available.
This overcommits resources at the worker, causing tasks to be sent to workers that cannot be immediately executed.
The extra tasks wait at the worker until resources become available. | 1 |
| sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured valued times this factor. Minimum is 1.1. | 2 |
+| shift-disk-load | If set to 1, proactively copy temporary files from disk-heavy workers to workers with more available disk, so that the original replica can later be removed. | 0 |
| short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 |
| temp-replica-count | Number of temp file replicas created across workers | 0 |
| transfer-outlier-factor | Transfer that are this many times slower than the average will be terminated. | 10 |
diff --git a/taskvine/src/manager/Makefile b/taskvine/src/manager/Makefile
index a036e9bd64..db09403667 100644
--- a/taskvine/src/manager/Makefile
+++ b/taskvine/src/manager/Makefile
@@ -28,7 +28,8 @@ SOURCES = \
vine_file_replica_table.c \
vine_fair.c \
vine_runtime_dir.c \
- vine_task_groups.c
+ vine_task_groups.c \
+ vine_temp.c
PUBLIC_HEADERS = taskvine.h
diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c
index f9135ad47b..3030bfe5ed 100644
--- a/taskvine/src/manager/vine_manager.c
+++ b/taskvine/src/manager/vine_manager.c
@@ -29,6 +29,7 @@ See the file COPYING for details.
#include "vine_taskgraph_log.h"
#include "vine_txn_log.h"
#include "vine_worker_info.h"
+#include "vine_temp.h"
#include "address.h"
#include "buffer.h"
@@ -165,7 +166,6 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi
static void vine_manager_consider_recovery_task(struct vine_manager *q, struct vine_file *lost_file, struct vine_task *rt);
static void delete_uncacheable_files(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);
-static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);
static int release_worker(struct vine_manager *q, struct vine_worker_info *w);
struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);
@@ -424,9 +424,17 @@ static vine_msg_code_t handle_cache_update(struct vine_manager *q, struct vine_w
replica->type = f->type;
}
- /* And if the file is a newly created temporary, replicate as needed. */
- if (f->type == VINE_TEMP && *id == 'X' && q->temp_replica_count > 1) {
- hash_table_insert(q->temp_files_to_replicate, f->cached_name, NULL);
+ /* If a TEMP file, perform various actions as needed. */
+ if (f->type == VINE_TEMP) {
+ if (q->temp_replica_count > 1) {
+ vine_temp_queue_for_replication(q, f);
+ }
+ if (q->shift_disk_load) {
+ vine_temp_shift_disk_load(q, w, f);
+ }
+ if (q->clean_redundant_replicas) {
+ vine_temp_clean_redundant_replicas(q, f);
+ }
}
}
}
@@ -482,6 +490,12 @@ static vine_msg_code_t handle_cache_invalid(struct vine_manager *q, struct vine_
w->last_failure_time = timestamp_get();
}
+		/* Respond to a missing replica notification by re-queuing the corresponding file
+		 * for replication. If the file has no ready replica to copy from, it is silently
+		 * discarded in the replication phase. */
+ struct vine_file *f = hash_table_lookup(q->file_table, cachename);
+ vine_temp_queue_for_replication(q, f);
+
/* Successfully processed this message. */
return VINE_MSG_PROCESSED;
} else {
@@ -656,6 +670,19 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
itable_remove(q->running_table, t->task_id);
vine_task_set_result(t, task_status);
+	/* Scheduling a task may create an extra replica of one of its inputs through a peer transfer; once the task completes, that replica can be safely removed.
+	 * However, TaskVine by default replicates files on demand and only cleans them up when prune is called.
+	 * So redundant replicas of task inputs are cleaned up only when the manager is configured to do so. */
+ if (q->clean_redundant_replicas) {
+ struct vine_mount *input_mount;
+ LIST_ITERATE(t->input_mounts, input_mount)
+ {
+ if (input_mount->file && input_mount->file->type == VINE_TEMP) {
+ vine_temp_clean_redundant_replicas(q, input_mount->file);
+ }
+ }
+ }
+
return VINE_SUCCESS;
}
@@ -1047,85 +1074,6 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
cleanup_worker_files(q, w);
}
-/* Start replicating files that may need replication */
-static int consider_tempfile_replications(struct vine_manager *q)
-{
- if (hash_table_size(q->temp_files_to_replicate) <= 0) {
- return 0;
- }
-
- char *cached_name = NULL;
- void *empty_val = NULL;
- int total_replication_request_sent = 0;
-
- static char key_start[PATH_MAX] = "random init";
- int iter_control;
- int iter_count_var;
-
- struct list *to_remove = list_create();
-
- HASH_TABLE_ITERATE_FROM_KEY(q->temp_files_to_replicate, iter_control, iter_count_var, key_start, cached_name, empty_val)
- {
- struct vine_file *f = hash_table_lookup(q->file_table, cached_name);
-
- if (!f) {
- continue;
- }
-
- /* are there any available source workers? */
- struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
- if (!source_workers) {
- /* If no source workers found, it indicates that the file doesn't exist, either pruned or lost.
- Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost. */
- if (q->transfer_temps_recovery && file_needs_recovery(q, f)) {
- vine_manager_consider_recovery_task(q, f, f->recovery_task);
- }
- list_push_tail(to_remove, xxstrdup(f->cached_name));
- continue;
- }
-
- /* at least one source is able to transfer? */
- int has_valid_source = 0;
- struct vine_worker_info *s;
- SET_ITERATE(source_workers, s)
- {
- if (s->transfer_port_active && s->outgoing_xfer_counter < q->worker_source_max_transfers && !s->draining) {
- has_valid_source = 1;
- break;
- }
- }
- if (!has_valid_source) {
- continue;
- }
-
- /* has this file been fully replicated? */
- int nsource_workers = set_size(source_workers);
- int to_find = MIN(q->temp_replica_count - nsource_workers, q->transfer_replica_per_cycle);
- if (to_find <= 0) {
- list_push_tail(to_remove, xxstrdup(f->cached_name));
- continue;
- }
-
- // debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsource_workers, f->cached_name, to_find);
-
- int round_replication_request_sent = vine_file_replica_table_replicate(q, f, source_workers, to_find);
- total_replication_request_sent += round_replication_request_sent;
-
- if (total_replication_request_sent >= q->attempt_schedule_depth) {
- break;
- }
- }
-
- while ((cached_name = list_pop_head(to_remove))) {
- hash_table_remove(q->temp_files_to_replicate, cached_name);
- free(cached_name);
- }
-
- list_delete(to_remove);
-
- return total_replication_request_sent;
-}
-
/* Insert into hashtable temp files that may need replication. */
static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_worker_info *w)
@@ -1138,11 +1086,11 @@ static void recall_worker_lost_temp_files(struct vine_manager *q, struct vine_wo
// Iterate over files we want might want to recover
HASH_TABLE_ITERATE(w->current_files, cached_name, info)
{
+		/* Respond to data loss caused by a worker removal by re-queuing the corresponding file
+		 * for replication. If the file has no ready replica to copy from, it is silently
+		 * discarded in the replication phase. */
struct vine_file *f = hash_table_lookup(q->file_table, cached_name);
-
- if (f && f->type == VINE_TEMP) {
- hash_table_insert(q->temp_files_to_replicate, cached_name, NULL);
- }
+ vine_temp_queue_for_replication(q, f);
}
}
@@ -1254,7 +1202,7 @@ static void add_worker(struct vine_manager *q)
/* Delete a single file on a remote worker except those with greater delete_upto_level cache level */
-static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level)
+int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level)
{
if (cache_level <= delete_upto_level) {
process_replica_on_event(q, w, filename, VINE_FILE_REPLICA_STATE_TRANSITION_EVENT_UNLINK);
@@ -4146,7 +4094,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->worker_table = hash_table_create(0, 0);
q->file_worker_table = hash_table_create(0, 0);
- q->temp_files_to_replicate = hash_table_create(0, 0);
+ q->temp_files_to_replicate = priority_queue_create(0);
q->worker_blocklist = hash_table_create(0, 0);
q->file_table = hash_table_create(0, 0);
@@ -4230,6 +4178,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;
q->temp_replica_count = 1;
+ q->clean_redundant_replicas = 0;
+ q->shift_disk_load = 0;
q->transfer_temps_recovery = 0;
q->transfer_replica_per_cycle = 10;
@@ -4498,8 +4448,7 @@ void vine_delete(struct vine_manager *q)
hash_table_clear(q->file_worker_table, (void *)set_delete);
hash_table_delete(q->file_worker_table);
- hash_table_clear(q->temp_files_to_replicate, 0);
- hash_table_delete(q->temp_files_to_replicate);
+ priority_queue_delete(q->temp_files_to_replicate);
hash_table_clear(q->factory_table, (void *)vine_factory_info_delete);
hash_table_delete(q->factory_table);
@@ -5460,7 +5409,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
// Check if any temp files need replication and start replicating
BEGIN_ACCUM_TIME(q, time_internal);
- result = consider_tempfile_replications(q);
+ result = vine_temp_start_replication(q);
END_ACCUM_TIME(q, time_internal);
if (result) {
// recovered at least one temp file
@@ -5853,7 +5802,6 @@ void vine_set_manager_preferred_connection(struct vine_manager *q, const char *p
int vine_tune(struct vine_manager *q, const char *name, double value)
{
-
if (!strcmp(name, "attempt-schedule-depth")) {
q->attempt_schedule_depth = MAX(1, (int)value);
@@ -5992,6 +5940,12 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "enforce-worker-eviction-interval")) {
q->enforce_worker_eviction_interval = (timestamp_t)(MAX(0, (int)value) * ONE_SECOND);
+ } else if (!strcmp(name, "clean-redundant-replicas")) {
+ q->clean_redundant_replicas = !!((int)value);
+
+ } else if (!strcmp(name, "shift-disk-load")) {
+ q->shift_disk_load = !!((int)value);
+
} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
@@ -6476,9 +6430,6 @@ int vine_prune_file(struct vine_manager *m, struct vine_file *f)
}
}
- /* also remove from the replication table. */
- hash_table_remove(m->temp_files_to_replicate, f->cached_name);
-
return pruned_replica_count;
}
diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h
index e53847b855..d6c393e6bd 100644
--- a/taskvine/src/manager/vine_manager.h
+++ b/taskvine/src/manager/vine_manager.h
@@ -123,7 +123,7 @@ struct vine_manager {
struct hash_table *file_table; /* Maps fileid -> struct vine_file.* */
struct hash_table *file_worker_table; /* Maps cachename -> struct set of workers with a replica of the file.* */
- struct hash_table *temp_files_to_replicate; /* Maps cachename -> NULL. Used as a set of temp files to be replicated */
+ struct priority_queue *temp_files_to_replicate; /* Priority queue of temp files to be replicated, those with less replicas are at the top. */
/* Primary scheduling controls. */
@@ -217,6 +217,8 @@ struct vine_manager {
int transfer_temps_recovery; /* If true, attempt to recover temp files from lost worker to reach threshold required */
int transfer_replica_per_cycle; /* Maximum number of replica to request per temp file per iteration */
int temp_replica_count; /* Number of replicas per temp file */
+ int clean_redundant_replicas; /* If true, remove redundant replicas of temp files to save disk space. */
+ int shift_disk_load; /* If true, shift storage burden to more available workers to minimize disk usage peaks. */
double resource_submit_multiplier; /* Factor to permit overcommitment of resources at each worker. */
double bandwidth_limit; /* Artificial limit on bandwidth of manager<->worker transfers. */
@@ -291,6 +293,9 @@ void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info
/* Check if the worker is able to transfer the necessary files for this task. */
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);
+/* Delete a file from a worker. */
+int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);
+
/** Release a random worker to simulate a failure. */
int release_random_worker(struct vine_manager *q);
diff --git a/taskvine/src/manager/vine_temp.c b/taskvine/src/manager/vine_temp.c
new file mode 100644
index 0000000000..cae4b183ee
--- /dev/null
+++ b/taskvine/src/manager/vine_temp.c
@@ -0,0 +1,416 @@
+#include "vine_temp.h"
+#include "vine_file.h"
+#include "vine_worker_info.h"
+#include "vine_file_replica_table.h"
+#include "vine_manager.h"
+#include "vine_manager_put.h"
+#include "vine_file_replica.h"
+#include "vine_task.h"
+#include "vine_mount.h"
+
+#include "priority_queue.h"
+#include "macros.h"
+#include "stringtools.h"
+#include "debug.h"
+#include "random.h"
+#include "xxmalloc.h"
+
+/*************************************************************/
+/* Private Functions */
+/*************************************************************/
+
+/**
+Report whether a worker may take part in a peer transfer, at either end.
+Returns 1 only when all of the following hold: the worker record exists, it
+is of type VINE_WORKER_TYPE_WORKER, its transfer port is active, it is not
+draining, and it carries a resources record whose tag is not negative.
+Returns 0 otherwise.
+*/
+static int worker_can_peer_transfer(struct vine_worker_info *w)
+{
+	/* Without a worker record or its resources there is nothing to inspect. */
+	if (!w || !w->resources) {
+		return 0;
+	}
+
+	/* The peer must be of worker type and expose an active transfer port. */
+	int can_transfer = (w->type == VINE_WORKER_TYPE_WORKER) && w->transfer_port_active;
+
+	/* A draining worker is left out of peer transfers. */
+	int is_staying = !w->draining;
+
+	/* NOTE(review): a negative tag presumably means no resource report has arrived yet -- confirm. */
+	int has_valid_tag = (w->resources->tag >= 0);
+
+	return can_transfer && is_staying && has_valid_tag;
+}
+
+/**
+Compute the free disk space of a worker, in bytes, as its total disk minus
+w->inuse_cache. w->resources->disk.total is expressed in megabytes, hence the
+conversion. The arithmetic is signed so that a cache usage exceeding the
+reported total shows up as a negative value instead of wrapping around.
+Returns -1 when the worker or its resources are missing, or when the result is negative.
+*/
+static int64_t worker_get_available_disk(const struct vine_worker_info *w)
+{
+	if (w && w->resources) {
+		int64_t total_bytes = (int64_t)MEGABYTES_TO_BYTES(w->resources->disk.total);
+		int64_t free_bytes = total_bytes - w->inuse_cache;
+		if (free_bytes >= 0) {
+			return free_bytes;
+		}
+	}
+
+	return -1;
+}
+
+/**
+Pick the worker that should serve a temp file to a peer.
+A candidate must appear in the file's worker set, pass
+worker_can_peer_transfer(), have fewer than worker_source_max_transfers
+outgoing transfers, and hold a replica of the file in the READY state.
+Among the candidates, the one with the smallest outgoing transfer counter
+wins; on a tie the first one encountered is kept.
+Returns NULL if the arguments are invalid, the file is not a temp, or no candidate qualifies.
+*/
+static struct vine_worker_info *get_best_source_worker(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return NULL;
+	}
+
+	/* Workers recorded as holding this file; no entry means there is no possible source. */
+	struct set *holders = hash_table_lookup(q->file_worker_table, f->cached_name);
+	if (!holders) {
+		return NULL;
+	}
+
+	struct vine_worker_info *chosen = NULL;
+	struct vine_worker_info *candidate = NULL;
+
+	SET_ITERATE(holders, candidate)
+	{
+		/* The candidate must be usable for peer transfers and have spare outgoing capacity. */
+		if (!worker_can_peer_transfer(candidate) || candidate->outgoing_xfer_counter >= q->worker_source_max_transfers) {
+			continue;
+		}
+
+		/* A listed worker without a replica record is inconsistent, and only READY replicas can be served. */
+		struct vine_file_replica *replica = vine_file_replica_table_lookup(candidate, f->cached_name);
+		if (!replica || replica->state != VINE_FILE_REPLICA_STATE_READY) {
+			continue;
+		}
+
+		/* Keep the current choice unless this candidate is strictly less busy. */
+		if (chosen && chosen->outgoing_xfer_counter <= candidate->outgoing_xfer_counter) {
+			continue;
+		}
+
+		chosen = candidate;
+	}
+
+	return chosen;
+}
+
+/**
+Pick the worker that should receive a new replica of a temp file.
+A candidate must pass worker_can_peer_transfer(), have spare incoming
+transfer capacity, have no replica record for the file yet, and have at least
+f->size bytes of available disk. Among the candidates, the one with the most
+available disk wins, which steers replicas away from fuller workers.
+Returns NULL if the arguments are invalid, the file is not a temp, or no candidate qualifies.
+*/
+static struct vine_worker_info *get_best_dest_worker(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return NULL;
+	}
+
+	struct vine_worker_info *chosen = NULL;
+	int64_t chosen_free = -1; /* available disk of the current choice; meaningful only once chosen is set */
+
+	char *worker_key;
+	struct vine_worker_info *candidate;
+	HASH_TABLE_ITERATE(q->worker_table, worker_key, candidate)
+	{
+		if (!worker_can_peer_transfer(candidate)) {
+			continue;
+		}
+		/* NOTE(review): the incoming counter is bounded by worker_source_max_transfers; confirm this is the intended limit. */
+		if (candidate->incoming_xfer_counter >= q->worker_source_max_transfers) {
+			continue;
+		}
+		if (vine_file_replica_table_lookup(candidate, f->cached_name)) {
+			continue;
+		}
+		int64_t candidate_free = worker_get_available_disk(candidate);
+		if (candidate_free < (int64_t)f->size) {
+			continue;
+		}
+		if (!chosen || candidate_free > chosen_free) {
+			chosen = candidate;
+			chosen_free = candidate_free;
+		}
+	}
+
+	return chosen;
+}
+
+/**
+Ask dest_worker to fetch a temp file directly from source_worker.
+The source URL is the source worker's transfer_url joined with the file's
+cached name, and is handed to `vine_manager_put_url_now`.
+Does nothing when any argument is missing or the file is not a temp.
+*/
+static void start_peer_transfer(struct vine_manager *q, struct vine_file *f, struct vine_worker_info *dest_worker, struct vine_worker_info *source_worker)
+{
+	int args_ok = q && f && dest_worker && source_worker;
+	if (args_ok && f->type == VINE_TEMP) {
+		char *peer_url = string_format("%s/%s", source_worker->transfer_url, f->cached_name);
+		vine_manager_put_url_now(q, dest_worker, source_worker, peer_url, f);
+		free(peer_url);
+	}
+}
+
+/**
+Try to launch one replication of a temp file right now.
+The source is chosen by get_best_source_worker() and the destination by
+get_best_dest_worker(); a destination is only searched for once a source exists.
+Returns 1 after start_peer_transfer() has been called, or 0 when the
+arguments are invalid or either end of the transfer could not be found.
+*/
+static int attempt_replication(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return 0;
+	}
+
+	struct vine_worker_info *from = get_best_source_worker(q, f);
+	struct vine_worker_info *to = from ? get_best_dest_worker(q, f) : NULL;
+
+	/* Both ends are required; a missing source already skipped the destination search above. */
+	if (!from || !to) {
+		return 0;
+	}
+
+	start_peer_transfer(q, f, to, from);
+
+	return 1;
+}
+
+/*************************************************************/
+/* Public Functions */
+/*************************************************************/
+
+/**
+Put a temp file on the replication queue if it is short of replicas.
+Nothing is queued when replication is off (temp_replica_count <= 1), when the
+file is not a temp in the CREATED state, when it has no replica at all, or when
+it already has temp_replica_count replicas. The priority is the negated replica
+count, so files with fewer replicas carry the numerically larger priority.
+Returns 1 after pushing the file onto the queue, 0 if nothing was queued.
+*/
+int vine_temp_queue_for_replication(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f) {
+		return 0;
+	}
+	if (f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED || q->temp_replica_count <= 1) {
+		return 0;
+	}
+
+	int replicas = vine_file_replica_count(q, f);
+	if (replicas != 0 && replicas < q->temp_replica_count) {
+		priority_queue_push(q->temp_files_to_replicate, f, -replicas);
+		return 1;
+	}
+	return 0;
+}
+
+/**
+Start peer transfers for queued temp files that still need replicas.
+At most attempt_schedule_depth entries are popped per call to bound the time
+spent here. A popped file is dropped if it is no longer a CREATED temp, has
+enough replicas, or has no READY replica. It is held back and re-queued at the
+end of the call if no source/destination pair is available. Otherwise one transfer
+is started and the file is re-queued with a priority recomputed from its replica count.
+Returns the number of transfers started.
+*/
+int vine_temp_start_replication(struct vine_manager *q)
+{
+	if (!q) {
+		return 0;
+	}
+
+	/* Bound the work per call; an empty queue needs no bookkeeping at all. */
+	int iter_depth = MIN(q->attempt_schedule_depth, priority_queue_size(q->temp_files_to_replicate));
+	if (iter_depth <= 0) {
+		return 0;
+	}
+
+	/* Number of peer transfers launched during this call. */
+	int started = 0;
+
+	/* Files with no usable worker pair right now; set aside so that this pass does not pop them again. */
+	struct list *skipped = list_create();
+
+	struct vine_file *f;
+	for (int i = 0; i < iter_depth; i++) {
+		f = priority_queue_pop(q->temp_files_to_replicate);
+		if (!f) {
+			break;
+		}
+		/* Drop stale entries. NOTE(review): the queue holds raw file pointers; confirm a file cannot be freed while queued. */
+		if (f->type != VINE_TEMP || f->state != VINE_FILE_STATE_CREATED) {
+			continue;
+		}
+		/* Drop files that already have enough replicas, or none at all to copy from. */
+		int current_replica_count = vine_file_replica_count(q, f);
+		if (current_replica_count >= q->temp_replica_count || current_replica_count == 0) {
+			continue;
+		}
+		/* Drop files with no READY replica; they are queued again upon their next cache-update. */
+		if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) == 0) {
+			continue;
+		}
+
+		/* The file needs replicas and has a READY one: launch a transfer, or hold the file back if no pair of workers fits. */
+		if (!attempt_replication(q, f)) {
+			list_push_tail(skipped, f);
+			continue;
+		}
+		started++;
+
+		/* Queue the file again right away so that any remaining replicas are requested one at a time. */
+		vine_temp_queue_for_replication(q, f);
+	}
+
+	while ((f = list_pop_head(skipped))) {
+		vine_temp_queue_for_replication(q, f);
+	}
+	list_delete(skipped);
+
+	return started;
+}
+
+/* Return 1 if any task currently assigned to worker w lists file f among its input mounts, 0 otherwise. */
+static int worker_has_task_using_file(struct vine_worker_info *w, struct vine_file *f)
+{
+	uint64_t task_id;
+	struct vine_task *task;
+	ITABLE_ITERATE(w->current_tasks, task_id, task)
+	{
+		struct vine_mount *input_mount;
+		LIST_ITERATE(task->input_mounts, input_mount)
+		{
+			if (input_mount->file == f) {
+				return 1;
+			}
+		}
+	}
+	return 0;
+}
+
+/**
+Remove replicas of a temp file in excess of temp_replica_count.
+Extra replicas appear, for example, when a task that takes the file as input
+is scheduled on a worker that did not hold it, which triggers a transfer.
+Replicas on workers that have a current task using the file as input are kept.
+Does nothing if q or f is NULL, f is not a temp, the file has no recorded
+replica, there is no excess, or some replica is not yet in the READY state.
+*/
+void vine_temp_clean_redundant_replicas(struct vine_manager *q, struct vine_file *f)
+{
+	if (!q || !f || f->type != VINE_TEMP) {
+		return;
+	}
+
+	struct set *source_workers = hash_table_lookup(q->file_worker_table, f->cached_name);
+	if (!source_workers) {
+		return;
+	}
+
+	int replica_count = set_size(source_workers);
+	int excess_replicas = replica_count - q->temp_replica_count;
+	if (excess_replicas <= 0) {
+		return;
+	}
+
+	/* Note that a replica may serve as a source for a peer transfer. If it is unlinked prematurely,
+	 * the corresponding transfer could fail and leave a task without its required data.
+	 * Therefore, we wait until all replicas are confirmed ready before removing any of them. */
+	if (vine_file_replica_table_count_replicas(q, f->cached_name, VINE_FILE_REPLICA_STATE_READY) != replica_count) {
+		return;
+	}
+
+	/* Rank removable replicas by the cache usage of their worker (assuming the queue pops the highest priority first). */
+	struct priority_queue *clean_replicas_from_workers = priority_queue_create(0);
+	struct vine_worker_info *source_worker = NULL;
+	SET_ITERATE(source_workers, source_worker)
+	{
+		/* a replica that is the input of a task on this worker is actively in use and must stay */
+		if (worker_has_task_using_file(source_worker, f)) {
+			continue;
+		}
+		priority_queue_push(clean_replicas_from_workers, source_worker, source_worker->inuse_cache);
+	}
+
+	/* Unlink at most excess_replicas replicas; fewer if not enough of them are removable. */
+	while (excess_replicas > 0) {
+		source_worker = priority_queue_pop(clean_replicas_from_workers);
+		if (!source_worker) {
+			break;
+		}
+		delete_worker_file(q, source_worker, f->cached_name, 0, 0);
+		excess_replicas--;
+	}
+
+	priority_queue_delete(clean_replicas_from_workers);
+}
+
+/**
+Try to copy a temp file from source_worker to a worker with more room, so that
+the replica on source_worker can later be removed as redundant. A destination
+qualifies only if, after the move, its cache usage would not exceed what
+source_worker would be left with; the one with the most available disk is used.
+*/
+void vine_temp_shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f)
+{
+	if (!q || !source_worker || !f || f->type != VINE_TEMP) {
+		return;
+	}
+
+	/* Use signed 64-bit values, as the disk checks in this file do, so that the subtraction cannot wrap around. */
+	int64_t file_size = (int64_t)f->size;
+	int64_t source_after = (int64_t)source_worker->inuse_cache - file_size;
+	struct vine_worker_info *target_worker = NULL;
+	int64_t target_free = -1; /* available disk of target_worker; meaningful only once it is set */
+	char *key;
+	struct vine_worker_info *w = NULL;
+	HASH_TABLE_ITERATE(q->worker_table, key, w)
+	{
+		/* skip if the worker cannot participate in peer transfers or already has this file */
+		if (!worker_can_peer_transfer(w) || vine_file_replica_table_lookup(w, f->cached_name)) {
+			continue;
+		}
+		/* skip if the worker does not have enough disk space */
+		int64_t w_free = worker_get_available_disk(w);
+		if (w_free < file_size) {
+			continue;
+		}
+		/* skip if the worker would end up heavier than the source after the transfer */
+		if ((int64_t)w->inuse_cache + file_size > source_after) {
+			continue;
+		}
+		if (!target_worker || w_free > target_free) {
+			target_worker = w;
+			target_free = w_free;
+		}
+	}
+	if (target_worker) {
+		start_peer_transfer(q, f, target_worker, source_worker);
+	}
+	/* Clean up replicas that are already redundant; the cleanup itself waits until every replica is ready. */
+	vine_temp_clean_redundant_replicas(q, f);
+}
\ No newline at end of file
diff --git a/taskvine/src/manager/vine_temp.h b/taskvine/src/manager/vine_temp.h
new file mode 100644
index 0000000000..de25f26a0f
--- /dev/null
+++ b/taskvine/src/manager/vine_temp.h
@@ -0,0 +1,14 @@
+#ifndef vine_temp_H
+#define vine_temp_H
+
+#include "vine_manager.h"
+
+/** Replication: queue a temp file that is short of replicas (returns 1 if queued, 0 otherwise), and start pending peer transfers (returns the number started). */
+int vine_temp_queue_for_replication(struct vine_manager *q, struct vine_file *f);
+int vine_temp_start_replication(struct vine_manager *q);
+
+/** Storage management: remove replicas of a temp file in excess of temp_replica_count, and try to copy a temp file from source_worker to a worker with more available disk. */
+void vine_temp_clean_redundant_replicas(struct vine_manager *q, struct vine_file *f);
+void vine_temp_shift_disk_load(struct vine_manager *q, struct vine_worker_info *source_worker, struct vine_file *f);
+
+#endif
\ No newline at end of file