22 changes: 21 additions & 1 deletion pkg/block/block.go
@@ -108,7 +108,7 @@ func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket
// It makes sure cleanup is done on error to avoid partial block uploads.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
// NOTE: Upload updates `meta.Thanos.File` section.
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadOption) error {
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadOption) (err error) {
df, err := os.Stat(bdir)
if err != nil {
return err
@@ -141,6 +141,26 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "gather meta file stats")
}

// Track if we started uploading so we can clean up on failure.
// We only clean up if upload started but failed, to avoid partial blocks in object storage.
uploadStarted := false
defer func() {
if err != nil && uploadStarted {
// Upload failed after we started - clean up partial block from object storage.
level.Warn(logger).Log("msg", "upload failed, cleaning up partial block from object storage", "block", id, "err", err)

// Use a new context since the original might be canceled/timed out.
cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

if deleteErr := Delete(cleanupCtx, logger, bkt, id); deleteErr != nil {
level.Error(logger).Log("msg", "failed to clean up partial block after upload failure", "block", id, "err", deleteErr)
}
}
}()

uploadStarted = true

if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil {
return errors.Wrap(err, "upload chunks")
}
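The mechanism behind this diff is the switch from a plain `error` return to a named return value `(err error)`: a deferred closure registered inside `upload` can then observe whatever error the function ultimately returns and delete the partially uploaded block. A minimal, self-contained sketch of that pattern, with hypothetical `doUpload`/`doDelete` callbacks standing in for the real bucket operations:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// uploadWithCleanup sketches the pattern used in the diff above: a named
// return value lets the deferred closure see the final error and trigger
// cleanup. All identifiers here are illustrative, not the Thanos API.
func uploadWithCleanup(ctx context.Context, doUpload, doDelete func(context.Context) error) (err error) {
	started := false
	defer func() {
		if err != nil && started {
			// The caller's ctx may already be canceled or timed out,
			// so cleanup runs on a fresh context with its own deadline.
			cleanupCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
			defer cancel()
			if delErr := doDelete(cleanupCtx); delErr != nil {
				fmt.Println("cleanup failed:", delErr)
			}
		}
	}()

	started = true
	return doUpload(ctx)
}

func main() {
	err := uploadWithCleanup(context.Background(),
		func(context.Context) error { return errors.New("upload failed") },
		func(context.Context) error { fmt.Println("deleting partial block"); return nil },
	)
	fmt.Println("result:", err)
}
```

Note the fresh `context.Background()`-derived context on the cleanup path: if the upload failed precisely because the caller's context was canceled, reusing `ctx` would make the cleanup fail for the same reason.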
45 changes: 20 additions & 25 deletions pkg/block/block_test.go
@@ -7,7 +7,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path"
@@ -466,7 +465,7 @@ func TestHashDownload(t *testing.T) {
err = Download(ctx, log.NewNopLogger(), instrumentedBkt, m.ULID, path.Join(tmpDir, b1.String()))
testutil.Ok(t, err)
testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
# TYPE thanos_objstore_bucket_operations_total counter
thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
@@ -475,7 +474,7 @@ func TestHashDownload(t *testing.T) {
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 2
thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 3
`), `thanos_objstore_bucket_operations_total`))
}

// Ensures that we always download MetaFile.
@@ -484,7 +483,7 @@ func TestHashDownload(t *testing.T) {
err = Download(ctx, log.NewNopLogger(), instrumentedBkt, m.ULID, path.Join(tmpDir, b1.String()))
testutil.Ok(t, err)
testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
# TYPE thanos_objstore_bucket_operations_total counter
thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
@@ -493,7 +492,7 @@ func TestHashDownload(t *testing.T) {
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 4
thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 3
`), `thanos_objstore_bucket_operations_total`))
}

// Remove chunks => gets redownloaded.
@@ -504,16 +503,16 @@ func TestHashDownload(t *testing.T) {
err = Download(ctx, log.NewNopLogger(), instrumentedBkt, m.ULID, path.Join(tmpDir, b1.String()))
testutil.Ok(t, err)
testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
# HELP thanos_objstore_bucket_operations_total Total number of all attempted operations against a bucket.
# TYPE thanos_objstore_bucket_operations_total counter
thanos_objstore_bucket_operations_total{bucket="test",operation="attributes"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="delete"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="exists"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 7
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 6
thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 3
`), `thanos_objstore_bucket_operations_total`))
}
}

@@ -540,10 +539,9 @@ func TestUploadCleanup(t *testing.T) {
uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc)
testutil.Assert(t, errors.Is(uploadErr, errUploadFailed))

// If upload of index fails, the objects remain because the deletion of partial blocks
// is taken care of by the Compactor.
testutil.Equals(t, 2, len(bkt.Objects()))
testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) == 0)
// If upload of index fails, the partial upload is cleaned up from object storage.
// This ensures no partial blocks are left behind.
testutil.Equals(t, 0, len(bkt.Objects()))
}

{
@@ -552,12 +550,9 @@ func TestUploadCleanup(t *testing.T) {
uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc)
testutil.Assert(t, errors.Is(uploadErr, errUploadFailed))

// If upload of meta.json fails, nothing is cleaned up.
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) == 0)
// If upload of meta.json fails, the uploaded chunks and index are cleaned up immediately.
// This means no partial blocks are left for BestEffortCleanAbortedPartialUploads to discover and delete later.
testutil.Equals(t, 0, len(bkt.Objects()))
}
}

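The rewritten assertions above check that a failed upload leaves zero objects behind, where the old test expected the partial objects to remain for the compactor to collect. A toy illustration of that invariant, using a hand-rolled in-memory bucket rather than the real `objstore` test helpers (all identifiers here are hypothetical):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// memBucket is a toy stand-in for an in-memory object store; it fails any
// upload whose name contains failOn, mimicking the test's error-injecting bucket.
type memBucket struct {
	objects map[string][]byte
	failOn  string
}

func (b *memBucket) Upload(name string, data []byte) error {
	if b.failOn != "" && strings.Contains(name, b.failOn) {
		return errors.New("upload failed")
	}
	b.objects[name] = data
	return nil
}

// DeleteAll removes every object under the given prefix, playing the role
// of the block-wide Delete call in the cleanup path.
func (b *memBucket) DeleteAll(prefix string) {
	for name := range b.objects {
		if strings.HasPrefix(name, prefix) {
			delete(b.objects, name)
		}
	}
}

func main() {
	bkt := &memBucket{objects: map[string][]byte{}, failOn: "index"}

	blockID := "01ABC"
	files := []string{blockID + "/chunks/000001", blockID + "/index", blockID + "/meta.json"}

	var err error
	for _, f := range files {
		if err = bkt.Upload(f, []byte("data")); err != nil {
			// On failure, clean up everything uploaded so far for this block.
			bkt.DeleteAll(blockID + "/")
			break
		}
	}

	// After a failed upload plus cleanup, no partial objects should remain.
	fmt.Printf("err=%v, remaining objects=%d\n", err, len(bkt.objects))
}
```

Running it prints `err=upload failed, remaining objects=0`, mirroring the new `testutil.Equals(t, 0, len(bkt.Objects()))` assertions in the updated test.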