Skip to content

Commit dcbd3bd

Browse files
authored
fix(s3): Fix compactor copy metadata via sse (#5774)
1 parent b1b6c01 commit dcbd3bd

File tree

4 files changed

+145
-31
lines changed

4 files changed

+145
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* [ENHANCEMENT] Unsafe search hints for frontend performance tests [#5723](https://github.com/grafana/tempo/pull/5723) (@ruslan-mikhailov)
1414
* [ENHANCEMENT] Add new livestore alert to the tempo-mixin [#5752](https://github.com/grafana/tempo/pull/5752) (@javiermolinar)
1515
* [ENHANCEMENT] Improve shutdown time in the first 30 seconds [#5725](https://github.com/grafana/tempo/pull/5725) (@ldufr)
16+
* [BUGFIX] Fix compactor to properly consider SSE-KMS information during metadata copy [#5774](https://github.com/grafana/tempo/pull/5774) (@steffsas)
1617
* [BUGFIX] Correctly track and reject too large traces in live stores. [#5757](https://github.com/grafana/tempo/pull/5757) (@joe-elliott)
1718
* [BUGFIX] Fix issues related to integer dedicated columns in vParquet5-preview2 [#5716](https://github.com/grafana/tempo/pull/5716) (@stoewer)
1819
* [FEATURE] Added validation mode and tests for tempo-vulture [#5605](https://github.com/grafana/tempo/pull/5605)

tempodb/backend/s3/compactor.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,23 @@ func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) e
2121
}
2222

2323
putObjectOptions := getPutObjectOptions(rw)
24-
2524
metaFileName := backend.MetaFileName(blockID, tenantID, rw.cfg.Prefix)
25+
2626
// copy meta.json to meta.compacted.json
27-
_, err := rw.core.CopyObject(
28-
context.TODO(),
29-
rw.cfg.Bucket,
30-
metaFileName,
31-
rw.cfg.Bucket,
32-
backend.CompactedMetaFileName(blockID, tenantID, rw.cfg.Prefix),
33-
nil,
34-
minio.CopySrcOptions{},
35-
putObjectOptions,
27+
// core.CopyObject does not support SSE on src object
28+
_, err := rw.core.Client.CopyObject(context.TODO(),
29+
minio.CopyDestOptions{
30+
Bucket: rw.cfg.Bucket,
31+
Object: backend.CompactedMetaFileName(blockID, tenantID, rw.cfg.Prefix),
32+
Encryption: putObjectOptions.ServerSideEncryption,
33+
UserTags: putObjectOptions.UserTags,
34+
// to set X-Amz-Tagging header, we need to set this flag if tags are present
35+
ReplaceTags: len(putObjectOptions.UserTags) > 0,
36+
}, minio.CopySrcOptions{
37+
Bucket: rw.cfg.Bucket,
38+
Object: metaFileName,
39+
Encryption: putObjectOptions.ServerSideEncryption,
40+
},
3641
)
3742
if err != nil {
3843
return fmt.Errorf("error copying obj meta to compacted obj meta: %w", err)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package s3
2+
3+
import (
4+
"encoding/base64"
5+
"net/http"
6+
"testing"
7+
8+
"github.com/google/uuid"
9+
"github.com/grafana/dskit/flagext"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestMarkBlockCompacted(t *testing.T) {
15+
sseConfig := SSEConfig{
16+
Type: SSEKMS,
17+
KMSKeyID: "my-kms-key-id",
18+
KMSEncryptionContext: "{}",
19+
}
20+
21+
tags := map[string]string{"env": "prod", "app": "thing"}
22+
23+
testedHeaders := []string{
24+
sseHeader,
25+
sseKMSKeyIDHeader,
26+
sseKMSContextHeader,
27+
tagHeader,
28+
}
29+
30+
tests := []struct {
31+
name string
32+
tags map[string]string
33+
sse SSEConfig
34+
expectedHeaderValues map[string]string
35+
}{
36+
{
37+
"sse and tags",
38+
tags,
39+
sseConfig,
40+
map[string]string{
41+
sseHeader: "aws:kms",
42+
sseKMSKeyIDHeader: sseConfig.KMSKeyID,
43+
sseKMSContextHeader: base64.StdEncoding.EncodeToString([]byte(sseConfig.KMSEncryptionContext)),
44+
tagHeader: "app=thing&env=prod",
45+
},
46+
},
47+
{
48+
"tags without sse",
49+
tags,
50+
SSEConfig{},
51+
map[string]string{
52+
tagHeader: "app=thing&env=prod",
53+
},
54+
},
55+
{
56+
"sse without tags",
57+
nil,
58+
sseConfig,
59+
map[string]string{
60+
sseHeader: "aws:kms",
61+
sseKMSKeyIDHeader: sseConfig.KMSKeyID,
62+
sseKMSContextHeader: base64.StdEncoding.EncodeToString([]byte(sseConfig.KMSEncryptionContext)),
63+
},
64+
},
65+
{
66+
"no sse or tag headers",
67+
nil,
68+
SSEConfig{},
69+
map[string]string{},
70+
},
71+
}
72+
73+
for _, tc := range tests {
74+
t.Run(tc.name, func(t *testing.T) {
75+
// rawObject := raw.Object{}
76+
var httpHeader http.Header
77+
78+
server := fakeServerWithHeader(t, &httpHeader)
79+
_, _, c, err := New(&Config{
80+
Region: "blerg",
81+
AccessKey: "test",
82+
SecretKey: flagext.SecretWithValue("test"),
83+
Bucket: "blerg",
84+
Insecure: true,
85+
Endpoint: server.URL[7:], // [7:] -> strip http://
86+
SSE: tc.sse,
87+
Tags: tc.tags,
88+
})
89+
require.NoError(t, err)
90+
91+
_ = c.MarkBlockCompacted(uuid.New(), "tenant1")
92+
93+
// check expected headers to be set with expected values
94+
for headerKey, expectedHeaderValue := range tc.expectedHeaderValues {
95+
headerValue := httpHeader.Get(headerKey)
96+
assert.Equal(t, expectedHeaderValue, headerValue, "expected header %s to have value %s", headerKey, expectedHeaderValue)
97+
}
98+
99+
// check no unexpected headers are set
100+
for _, testedHeader := range testedHeaders {
101+
_, ok := tc.expectedHeaderValues[testedHeader]
102+
if !ok {
103+
require.Empty(t, httpHeader.Get(testedHeader), "expected header %s to be empty", testedHeader)
104+
}
105+
}
106+
})
107+
}
108+
}

tempodb/backend/s3/s3_test.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@ import (
3131
)
3232

3333
const (
34-
getMethod = "GET"
35-
putMethod = "PUT"
36-
tagHeader = "X-Amz-Tagging"
37-
storageClassHeader = "X-Amz-Storage-Class"
34+
getMethod = "GET"
35+
putMethod = "PUT"
36+
tagHeader = "X-Amz-Tagging"
37+
storageClassHeader = "X-Amz-Storage-Class"
38+
sseHeader = "X-Amz-Server-Side-Encryption"
39+
sseKMSKeyIDHeader = "X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id"
40+
sseKMSContextHeader = "X-Amz-Server-Side-Encryption-Context"
3841

3942
defaultAccessKey = "AKIAIOSFODNN7EXAMPLE"
4043
defaultSecretKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
@@ -334,21 +337,13 @@ func TestReadError(t *testing.T) {
334337
assert.Equal(t, wups, errB)
335338
}
336339

337-
func fakeServerWithHeader(t *testing.T, obj *url.Values, testedHeaderName string) *httptest.Server {
338-
require.NotNil(t, obj)
340+
func fakeServerWithHeader(t *testing.T, httpHeader *http.Header) *httptest.Server {
341+
require.NotNil(t, httpHeader)
339342

340343
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
341344
switch method := r.Method; method {
342345
case putMethod:
343-
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
344-
switch testedHeaderValue := r.Header.Get(testedHeaderName); testedHeaderValue {
345-
case "":
346-
default:
347-
348-
value, err := url.ParseQuery(testedHeaderValue)
349-
require.NoError(t, err)
350-
*obj = value
351-
}
346+
*httpHeader = r.Header
352347
case getMethod:
353348
// return fake list response b/c it's the only call that has to succeed
354349
_, _ = w.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
@@ -375,9 +370,9 @@ func TestObjectBlockTags(t *testing.T) {
375370
for _, tc := range tests {
376371
t.Run(tc.name, func(t *testing.T) {
377372
// rawObject := raw.Object{}
378-
var obj url.Values
373+
var httpHeaders http.Header
379374

380-
server := fakeServerWithHeader(t, &obj, tagHeader)
375+
server := fakeServerWithHeader(t, &httpHeaders)
381376
_, w, _, err := New(&Config{
382377
Region: "blerg",
383378
AccessKey: "test",
@@ -392,8 +387,13 @@ func TestObjectBlockTags(t *testing.T) {
392387
ctx := context.Background()
393388
_ = w.Write(ctx, "object", backend.KeyPath{"test"}, bytes.NewReader([]byte{}), 0, nil)
394389

390+
testedHeaderValue := httpHeaders.Get(tagHeader)
391+
require.NotEmpty(t, testedHeaderValue)
392+
headerValue, err := url.ParseQuery(testedHeaderValue)
393+
require.NoError(t, err)
394+
395395
for k, v := range tc.tags {
396-
vv := obj.Get(k)
396+
vv := headerValue.Get(k)
397397
require.NotEmpty(t, vv)
398398
require.Equal(t, v, vv)
399399
}
@@ -604,9 +604,9 @@ func TestObjectStorageClass(t *testing.T) {
604604
for _, tc := range tests {
605605
t.Run(tc.name, func(t *testing.T) {
606606
// rawObject := raw.Object{}
607-
var obj url.Values
607+
var httpHeader http.Header
608608

609-
server := fakeServerWithHeader(t, &obj, storageClassHeader)
609+
server := fakeServerWithHeader(t, &httpHeader)
610610
_, w, _, err := New(&Config{
611611
Region: "blerg",
612612
AccessKey: "test",
@@ -620,7 +620,7 @@ func TestObjectStorageClass(t *testing.T) {
620620

621621
ctx := context.Background()
622622
_ = w.Write(ctx, "object", backend.KeyPath{"test"}, bytes.NewReader([]byte{}), 0, nil)
623-
require.Equal(t, obj.Has(tc.StorageClass), true)
623+
require.Equal(t, tc.StorageClass, httpHeader.Get(storageClassHeader))
624624
})
625625
}
626626
}

0 commit comments

Comments
 (0)