Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ fn launch_blob_eviction(
// This gives clients time to reference a blob before it gets evicted.
let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();

let blobs = match database::delete_unreferenced_blobs(
// Get blob candidates for deletion
let blob_candidates = match database::get_unreferenced_blobs_for_eviction(
conn.as_ref(),
ttl,
config.blob_eviction.chunk_size,
Expand All @@ -270,20 +271,26 @@ fn launch_blob_eviction(
{
Ok(b) => b,
Err(err) => {
tracing::error!(%err, "Failed to delete blobs for eviction");
tracing::error!(%err, "Failed to get blob candidates for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

let deleted = blobs.len();
let candidate_count = blob_candidates.len();
should_sleep = candidate_count == 0;

should_sleep = deleted == 0;
if candidate_count == 0 {
Comment on lines +281 to +283
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A) Does should_sleep get used later on in this scope, and B) whether or not it does, should we use that in the if?

tracing::info!("No blobs found for eviction");
continue;
}

tracing::info!(%candidate_count, "Found blob candidates for eviction");

tracing::info!(%deleted, "N blobs deleted for eviction");
// Delete from blob storage first to ensure no untracked files are left behind
let mut successfully_deleted_blob_ids = Vec::new();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know Rust. Is the mut saying that the contents of the vector might change, or is it overkill and indicates that the vector container itself might be switched out?


// Delete blobs from blob store
let chunked: Vec<Vec<database::DeletedBlob>> = blobs
let chunked: Vec<Vec<database::BlobCandidate>> = blob_candidates
.into_iter()
.chunks(config.blob_eviction.file_chunk_size)
.into_iter()
Expand All @@ -292,26 +299,45 @@ fn launch_blob_eviction(

for chunk in chunked {
let thread_store = blob_stores.clone();
let mut chunk_success_ids = Vec::new();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here.


tokio::spawn(async move {
for blob in chunk {
let store = match thread_store.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
};

store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
// Process chunk synchronously to collect successful deletions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How critical is the synchronous nature? Unless there's a contents-comparison or something which depends on order, it seems like we could still allow async deletion. Assuming Rust allows objects to be shared between threads, of course, which now that I say it seems like there's a chance it breaks the ownership model or something. Or were we paying extra complexity for no real time/compute savings?

for blob in chunk {
let store = match thread_store.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
};

match store.delete_key(blob.key.clone()).await {
Ok(_) => {
tracing::debug!(%blob.key, "Successfully deleted blob from storage");
chunk_success_ids.push(blob.id);
}
Err(err) => {
tracing::error!(%err, %blob.key, %blob.store_id, "Failed to delete blob from storage");
}
}
});
}

successfully_deleted_blob_ids.extend(chunk_success_ids);
}

// Delete from database only the successfully deleted blobs
if !successfully_deleted_blob_ids.is_empty() {
match database::delete_blobs_by_ids(conn.as_ref(), &successfully_deleted_blob_ids).await {
Ok(deleted_count) => {
tracing::info!(%deleted_count, "Successfully deleted blobs from database after storage deletion");
}
Err(err) => {
tracing::error!(%err, "Failed to delete blob records from database after successful storage deletion - this may cause inconsistency");
}
}
} else {
tracing::warn!("No blobs were successfully deleted from storage");
}
}
});
Expand Down
77 changes: 76 additions & 1 deletion rust/rsc/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ where
let hashes = JobHistoryHash::find_by_statement(Statement::from_sql_and_values(
DbBackend::Postgres,
r#"
DELETE FROM job
DELETE FROM job
WHERE created_at <= $1
RETURNING hash
"#,
Expand Down Expand Up @@ -670,6 +670,13 @@ pub struct DeletedBlob {
pub key: String,
}

/// A blob row that is eligible for eviction: its `updated_at` is past the
/// grace-period TTL and no `output_file` or `job` row references it
/// (see `get_unreferenced_blobs_for_eviction`).
///
/// Unlike `DeletedBlob`, this carries the primary key `id` so the database
/// row can be removed *after* the backing object has been deleted from its
/// blob store (via `delete_blobs_by_ids`), rather than at query time.
#[derive(Clone, Debug, FromQueryResult)]
pub struct BlobCandidate {
    /// Primary key of the `blob` row; used for the follow-up database delete.
    pub id: Uuid,
    /// Id of the blob store that holds the underlying bytes.
    pub store_id: Uuid,
    /// Store-specific key under which the blob's bytes are stored.
    pub key: String,
}

// Deletes blobs from the database that are unreferenced and have surpassed the allocated grace
// period to be referenced.
//
Expand Down Expand Up @@ -705,6 +712,74 @@ pub async fn delete_unreferenced_blobs<T: ConnectionTrait>(
.await
}

/// Returns up to `chunk` blobs that are candidates for eviction: blobs whose
/// `updated_at` is at or before `ttl` and that are not referenced by any
/// `output_file` row or by any job's stdout/stderr.
///
/// Unlike `delete_unreferenced_blobs`, this performs no deletion. Callers are
/// expected to first delete the underlying objects from blob storage and then
/// remove only the successfully-deleted rows with `delete_blobs_by_ids`, so a
/// failed store delete never leaves an untracked file behind.
///
/// # Errors
/// Returns `DbErr` if the query fails.
pub async fn get_unreferenced_blobs_for_eviction<T: ConnectionTrait>(
    db: &T,
    ttl: NaiveDateTime,
    chunk: u32,
) -> Result<Vec<BlobCandidate>, DbErr> {
    BlobCandidate::find_by_statement(Statement::from_sql_and_values(
        DbBackend::Postgres,
        // NOTE(review): EXCEPT already de-duplicates, so the DISTINCT is
        // redundant but harmless. LIMIT applies to the EXCEPT result; with no
        // ORDER BY, which candidates are picked in a given tick is arbitrary —
        // acceptable for eviction since the remainder is picked up next tick.
        r#"
        WITH
        eligible_blob_ids as (
            SELECT DISTINCT id FROM blob
            WHERE updated_at <= $1
            EXCEPT (
                SELECT blob_id FROM output_file
                UNION ALL SELECT stdout_blob_id FROM job
                UNION ALL SELECT stderr_blob_id FROM job
            )
            LIMIT $2
        )
        SELECT b.id, b.store_id, b.key
        FROM blob b
        WHERE b.id IN (SELECT id FROM eligible_blob_ids)
        "#,
        [ttl.into(), chunk.into()],
    ))
    .all(db)
    .await
}

/// Deletes the `blob` rows with the given ids, returning the total number of
/// rows deleted.
///
/// Intended as the second phase of eviction: called with only the ids whose
/// underlying objects were already removed from blob storage, so the database
/// never loses track of a file that still exists in a store.
///
/// Ids are deleted in batches of 1000 to stay well under Postgres' bind
/// parameter limit.
///
/// # Errors
/// Returns `DbErr` if any batch's DELETE fails; batches already executed
/// remain deleted (this is not transactional across batches).
pub async fn delete_blobs_by_ids<T: ConnectionTrait>(
    db: &T,
    blob_ids: &[Uuid],
) -> Result<u64, DbErr> {
    if blob_ids.is_empty() {
        return Ok(0);
    }

    let mut total_deleted = 0;

    // std's slice::chunks yields borrowed sub-slices directly — no itertools,
    // no intermediate Vec<Vec<Uuid>> allocation and no per-id clone.
    for chunk in blob_ids.chunks(1000) {
        // Build "$1,$2,...,$n" for this batch's IN clause.
        let placeholders = (1..=chunk.len())
            .map(|i| format!("${}", i))
            .collect::<Vec<_>>()
            .join(",");

        let sql = format!("DELETE FROM blob WHERE id IN ({})", placeholders);

        let values: Vec<sea_orm::Value> = chunk.iter().map(|id| (*id).into()).collect();

        let result = db
            .execute(Statement::from_sql_and_values(
                DbBackend::Postgres,
                &sql,
                values,
            ))
            .await?;

        total_deleted += result.rows_affected();
    }

    Ok(total_deleted)
}

// --------------------------------------------------
// ---------- JobAudit ----------
// --------------------------------------------------
Expand Down