-
Notifications
You must be signed in to change notification settings - Fork 30
Handle orphaned blobs better in blob eviction logic for blob storage #1712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -261,7 +261,8 @@ fn launch_blob_eviction( | |
| // This gives clients time to reference a blob before it gets evicted. | ||
| let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc(); | ||
|
|
||
| let blobs = match database::delete_unreferenced_blobs( | ||
| // Get blob candidates for deletion | ||
| let blob_candidates = match database::get_unreferenced_blobs_for_eviction( | ||
| conn.as_ref(), | ||
| ttl, | ||
| config.blob_eviction.chunk_size, | ||
|
|
@@ -270,20 +271,26 @@ fn launch_blob_eviction( | |
| { | ||
| Ok(b) => b, | ||
| Err(err) => { | ||
| tracing::error!(%err, "Failed to delete blobs for eviction"); | ||
| tracing::error!(%err, "Failed to get blob candidates for eviction"); | ||
| should_sleep = true; | ||
| continue; // Try again on the next tick | ||
| } | ||
| }; | ||
|
|
||
| let deleted = blobs.len(); | ||
| let candidate_count = blob_candidates.len(); | ||
| should_sleep = candidate_count == 0; | ||
|
|
||
| should_sleep = deleted == 0; | ||
| if candidate_count == 0 { | ||
| tracing::info!("No blobs found for eviction"); | ||
| continue; | ||
| } | ||
|
|
||
| tracing::info!(%candidate_count, "Found blob candidates for eviction"); | ||
|
|
||
| tracing::info!(%deleted, "N blobs deleted for eviction"); | ||
| // Delete from blob storage first to ensure no untracked files are left behind | ||
| let mut successfully_deleted_blob_ids = Vec::new(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know Rust. Is the |
||
|
|
||
| // Delete blobs from blob store | ||
| let chunked: Vec<Vec<database::DeletedBlob>> = blobs | ||
| let chunked: Vec<Vec<database::BlobCandidate>> = blob_candidates | ||
| .into_iter() | ||
| .chunks(config.blob_eviction.file_chunk_size) | ||
| .into_iter() | ||
|
|
@@ -292,26 +299,45 @@ fn launch_blob_eviction( | |
|
|
||
| for chunk in chunked { | ||
| let thread_store = blob_stores.clone(); | ||
| let mut chunk_success_ids = Vec::new(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question here. |
||
|
|
||
| tokio::spawn(async move { | ||
| for blob in chunk { | ||
| let store = match thread_store.get(&blob.store_id) { | ||
| Some(s) => s.clone(), | ||
| None => { | ||
| let blob = blob.clone(); | ||
| tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!"); | ||
| tracing::error!(%blob.store_id, "Blob's store id missing from activated stores"); | ||
| continue; | ||
| } | ||
| }; | ||
|
|
||
| store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| { | ||
| let blob = blob.clone(); | ||
| // Process chunk synchronously to collect successful deletions | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How critical is the synchronous nature? Unless there's a contents-comparison or something which depends or order, it seems like we could still allow async deletion. Assuming Rust allows objects to be shared between threads, of course, which now that I say it seems like there's a chance it breaks the ownership module or something. Or were we paying extra complexity for no real time/compute savings? |
||
| for blob in chunk { | ||
| let store = match thread_store.get(&blob.store_id) { | ||
| Some(s) => s.clone(), | ||
| None => { | ||
| tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!"); | ||
| tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info"); | ||
| }); | ||
| tracing::error!(%blob.store_id, "Blob's store id missing from activated stores"); | ||
| continue; | ||
| } | ||
| }; | ||
|
|
||
| match store.delete_key(blob.key.clone()).await { | ||
| Ok(_) => { | ||
| tracing::debug!(%blob.key, "Successfully deleted blob from storage"); | ||
| chunk_success_ids.push(blob.id); | ||
| } | ||
| Err(err) => { | ||
| tracing::error!(%err, %blob.key, %blob.store_id, "Failed to delete blob from storage"); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| successfully_deleted_blob_ids.extend(chunk_success_ids); | ||
| } | ||
|
|
||
| // Delete from database only the successfully deleted blobs | ||
| if !successfully_deleted_blob_ids.is_empty() { | ||
| match database::delete_blobs_by_ids(conn.as_ref(), &successfully_deleted_blob_ids).await { | ||
| Ok(deleted_count) => { | ||
| tracing::info!(%deleted_count, "Successfully deleted blobs from database after storage deletion"); | ||
| } | ||
| Err(err) => { | ||
| tracing::error!(%err, "Failed to delete blob records from database after successful storage deletion - this may cause inconsistency"); | ||
| } | ||
| } | ||
| } else { | ||
| tracing::warn!("No blobs were successfully deleted from storage"); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A) Does
should_sleepget used later on in this scope, and B) whether or not it does, should we use that in theif?