Skip to content

Commit e898a6e

Browse files
Allow RetryInfo to be disabled in per tenant overrides (#5741)
* Allow RetryInfo to be enabled at cluster and runtime overrides * Update manifest.md file * Add CHANGELOG.md entry * use RetryAfterOnResourceExhausted > 0 as cluster level control
1 parent bcfcc5a commit e898a6e

File tree

12 files changed

+148
-16
lines changed

12 files changed

+148
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
## main / unreleased
22

33
* [CHANGE] Remove busybox from Tempo image to make it more minimal and prevent future vulnerabilities [#5717](https://github.com/grafana/tempo/pull/5717) (@carles-grafana)
4-
* [CHANGE] docs: Add explicit notes about authentication [#5735](https://github.com/grafana/tempo/pull/5735) (@electron0zero)
4+
* [CHANGE] Allow RetryInfo to be disabled in per tenant overrides [#5741](https://github.com/grafana/tempo/pull/5741) (@electron0zero)
55
* [FEATURE] Add `tempo_metrics_generator_registry_active_series_demand_estimate` that estimates metrics-generator active series demand even when the active series limit is reached [#5710](https://github.com/grafana/tempo/pull/5710) (@carles-grafana)
6-
* [FEATURE] Added validation mode and tests for tempo-vulture [#5605](https://github.com/grafana/tempo/pull/5605)
6+
* [FEATURE] Added validation mode and tests for tempo-vulture [#5605](https://github.com/grafana/tempo/pull/5605) (@davidham)
77
* [FEATURE] New block encoding vParquet5-preview3 replacing well-known attributes with dedicated column defaults. This format is in development and breaking changes are expected before final release. [#5696](https://github.com/grafana/tempo/pull/5696) (@stoewer)
8+
* [ENHANCEMENT] docs: Add explicit notes about authentication [#5735](https://github.com/grafana/tempo/pull/5735) (@electron0zero)
89
* [ENHANCEMENT] On startup, first record for live store to consume is not older than two complete block timeouts [#5693](https://github.com/grafana/tempo/pull/5693) (@ruslan-mikhailov)
910
* [ENHANCEMENT] Add secure connection support to tempo-cli [#5692](https://github.com/grafana/tempo/pull/5692) (@TheoBrigitte)
1011
* [ENHANCEMENT] Improve dashboards for livestore [#5755](https://github.com/grafana/tempo/pull/5755) (@javiermolinar)
@@ -19,7 +20,6 @@
1920
* [BUGFIX] Correctly track and reject too large traces in live stores. [#5757](https://github.com/grafana/tempo/pull/5757) (@joe-elliott)
2021
* [BUGFIX] Fix issues related to integer dedicated columns in vParquet5-preview2 [#5716](https://github.com/grafana/tempo/pull/5716) (@stoewer)
2122
* [BUGFIX] Fix: handle collisions with job and instance labels when targetInfo is enabled [#5774](https://github.com/grafana/tempo/pull/5774) (@javiermolinar)
22-
* [FEATURE] Added validation mode and tests for tempo-vulture [#5605](https://github.com/grafana/tempo/pull/5605)
2323

2424
# v2.9.0
2525

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,10 @@ docs-test:
356356
docker pull ${DOCS_IMAGE}
357357
docker run -v ${PWD}/docs/sources/tempo:/hugo/content/docs/tempo/latest:z -p 3002:3002 --rm $(DOCS_IMAGE) /bin/bash -c 'mkdir -p content/docs/grafana/latest/ && touch content/docs/grafana/latest/menu.yaml && make prod'
358358

359+
.PHONY: generate-manifest
360+
generate-manifest: ## Generate manifest.md file
361+
GO111MODULE=on CGO_ENABLED=0 go run -v pkg/docsgen/generate_manifest.go
362+
359363
##@ jsonnet
360364
.PHONY: jsonnet jsonnet-check jsonnet-test
361365
jsonnet: tools-image ## Generate jsonnet
@@ -375,9 +379,5 @@ tempo-mixin: tools-image
375379
tempo-mixin-check: tools-image
376380
$(TOOLS_CMD) make -C operations/tempo-mixin check
377381

378-
.PHONY: generate-manifest
379-
generate-manifest:
380-
GO111MODULE=on CGO_ENABLED=0 go run -v pkg/docsgen/generate_manifest.go
381-
382382
# Import fragments
383383
include build/tools.mk

docs/sources/tempo/configuration/manifest.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,6 +1029,7 @@ overrides:
10291029
rate_limit_bytes: 15000000
10301030
burst_size_bytes: 20000000
10311031
max_traces_per_user: 10000
1032+
retry_info_enabled: true
10321033
read:
10331034
max_bytes_per_tag_values_query: 1000000
10341035
metrics_generator:

modules/distributor/distributor.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,3 +1043,18 @@ func (d *Distributor) getMaxAttributeBytes(userID string) int {
10431043

10441044
return d.cfg.MaxAttributeBytes
10451045
}
1046+
1047+
func (d *Distributor) RetryInfoEnabled(ctx context.Context) (bool, error) {
1048+
userID, err := user.ExtractOrgID(ctx)
1049+
if err != nil {
1050+
return false, err
1051+
}
1052+
1053+
// if disabled at cluster level, just return false
1054+
if d.cfg.RetryAfterOnResourceExhausted <= 0 {
1055+
return false, nil
1056+
}
1057+
1058+
// cluster level is enabled, check per-tenant override and respect that.
1059+
return d.overrides.IngestionRetryInfoEnabled(userID), nil
1060+
}

modules/distributor/distributor_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,3 +2442,87 @@ func TestRequestsByTraceID_SpanIDValidation(t *testing.T) {
24422442
_, _, _, err := requestsByTraceID(batches, "test-tenant", 1, 1000)
24432443
require.NoError(t, err)
24442444
}
2445+
2446+
func TestRetryInfoEnabled(t *testing.T) {
2447+
tests := []struct {
2448+
name string
2449+
retryAfterOnResourceExhausted time.Duration
2450+
overrideRetryInfoEnabled bool
2451+
expectedResult bool
2452+
expectError bool
2453+
ctx context.Context
2454+
}{
2455+
{
2456+
name: "cluster level disabled with 0",
2457+
retryAfterOnResourceExhausted: 0,
2458+
overrideRetryInfoEnabled: false,
2459+
expectedResult: false, // disabled because retryAfterOnResourceExhausted is <= 0
2460+
expectError: false,
2461+
ctx: user.InjectOrgID(context.Background(), "test-tenant"),
2462+
},
2463+
{
2464+
name: "cluster level disabled with -1",
2465+
retryAfterOnResourceExhausted: -1,
2466+
overrideRetryInfoEnabled: false,
2467+
expectedResult: false, // disabled because retryAfterOnResourceExhausted is <= 0
2468+
expectError: false,
2469+
ctx: user.InjectOrgID(context.Background(), "test-tenant"),
2470+
},
2471+
{
2472+
name: "cluster level disabled, override enabled",
2473+
retryAfterOnResourceExhausted: 0,
2474+
overrideRetryInfoEnabled: true,
2475+
expectedResult: false, // cluster level disable, it takes priority over overrides
2476+
expectError: false,
2477+
ctx: user.InjectOrgID(context.Background(), "test-tenant"),
2478+
},
2479+
{
2480+
name: "cluster level enabled, override disabled",
2481+
retryAfterOnResourceExhausted: 10 * time.Second,
2482+
overrideRetryInfoEnabled: false,
2483+
expectedResult: false, // disabled because disabled in overrides
2484+
expectError: false,
2485+
ctx: user.InjectOrgID(context.Background(), "test-tenant"),
2486+
},
2487+
{
2488+
name: "cluster level enabled, override enabled",
2489+
retryAfterOnResourceExhausted: 10 * time.Second,
2490+
overrideRetryInfoEnabled: true,
2491+
expectedResult: true, // should be true because overrides is enabled.
2492+
expectError: false,
2493+
ctx: user.InjectOrgID(context.Background(), "test-tenant"),
2494+
},
2495+
{
2496+
name: "error extracting org ID",
2497+
retryAfterOnResourceExhausted: 10 * time.Second,
2498+
overrideRetryInfoEnabled: false,
2499+
expectedResult: false, // invalid org id, return false
2500+
expectError: true,
2501+
ctx: context.Background(), // no org ID
2502+
},
2503+
}
2504+
2505+
for _, tt := range tests {
2506+
t.Run(tt.name, func(t *testing.T) {
2507+
limits := overrides.Config{
2508+
Defaults: overrides.Overrides{
2509+
Ingestion: overrides.IngestionOverrides{
2510+
RetryInfoEnabled: tt.overrideRetryInfoEnabled,
2511+
},
2512+
},
2513+
}
2514+
2515+
d, _ := prepare(t, limits, nil)
2516+
d.cfg.RetryAfterOnResourceExhausted = tt.retryAfterOnResourceExhausted
2517+
2518+
result, err := d.RetryInfoEnabled(tt.ctx)
2519+
2520+
if tt.expectError {
2521+
require.Error(t, err)
2522+
} else {
2523+
require.NoError(t, err)
2524+
assert.Equal(t, tt.expectedResult, result)
2525+
}
2526+
})
2527+
}
2528+
}

modules/distributor/receiver/shim.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (r *RetryableError) Error() string {
8383
// https://github.com/open-telemetry/opentelemetry-collector/blob/d7b49df5d9e922df6ce56ad4b64ee1c79f9dbdbe/exporter/otlpexporter/otlp.go#L172
8484
// The otel collector considers some errors retryable and other not. "ResourceExhausted" is special in that it requires a
8585
// RetryInfo detail along with the error code
86-
func wrapErrorIfRetryable(err error, dur *durationpb.Duration) error {
86+
func wrapErrorIfRetryable(err error, dur *durationpb.Duration, enabled bool) error {
8787
if dur == nil {
8888
return err
8989
}
@@ -97,6 +97,11 @@ func wrapErrorIfRetryable(err error, dur *durationpb.Duration) error {
9797
return err
9898
}
9999

100+
// If retryInfo is NOT enabled, return error as is.
101+
if !enabled {
102+
return err
103+
}
104+
100105
// ignore error. code only errors if Code() == ok
101106
s, _ = s.WithDetails(&errdetails.RetryInfo{
102107
RetryDelay: dur,
@@ -110,6 +115,7 @@ func wrapErrorIfRetryable(err error, dur *durationpb.Duration) error {
110115

111116
type TracesPusher interface {
112117
PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error)
118+
RetryInfoEnabled(ctx context.Context) (bool, error)
113119
}
114120

115121
var _ services.Service = (*receiversShim)(nil)
@@ -351,7 +357,11 @@ func (r *receiversShim) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
351357
if err != nil {
352358
r.logger.Log("msg", "pusher failed to consume trace data", "err", err)
353359
span.SetStatus(otelcodes.Error, err.Error())
354-
err = wrapErrorIfRetryable(err, r.retryDelay)
360+
retryInfoEnabled, e := r.pusher.RetryInfoEnabled(ctx)
361+
if e != nil {
362+
return e
363+
}
364+
err = wrapErrorIfRetryable(err, r.retryDelay, retryInfoEnabled)
355365
}
356366

357367
return err

modules/distributor/receiver/shim_test.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,9 @@ func (m *mockHost) GetExtensions() map[component.ID]component.Component {
203203
}
204204

205205
type capturingPusher struct {
206-
traces []ptrace.Traces
207-
t *testing.T
206+
traces []ptrace.Traces
207+
retryInfoEnabled bool
208+
t *testing.T
208209
}
209210

210211
func (p *capturingPusher) GetAndClearTraces() []ptrace.Traces {
@@ -222,29 +223,39 @@ func (p *capturingPusher) PushTraces(ctx context.Context, t ptrace.Traces) (*tem
222223
return &tempopb.PushResponse{}, nil
223224
}
224225

226+
func (p *capturingPusher) RetryInfoEnabled(_ context.Context) (bool, error) {
227+
return p.retryInfoEnabled, nil
228+
}
229+
225230
// TestWrapRetryableError confirms that errors are wrapped as expected
226231
func TestWrapRetryableError(t *testing.T) {
227232
// no wrapping b/c not a grpc error
228233
err := errors.New("test error")
229-
wrapped := wrapErrorIfRetryable(err, nil)
234+
wrapped := wrapErrorIfRetryable(err, nil, false)
230235
require.Equal(t, err, wrapped)
231236
require.False(t, isRetryable(wrapped))
232237

233238
// no wrapping b/c not a resource exhausted grpc error
234239
err = status.Error(codes.FailedPrecondition, "failed precondition")
235-
wrapped = wrapErrorIfRetryable(err, nil)
240+
wrapped = wrapErrorIfRetryable(err, nil, false)
236241
require.Equal(t, err, wrapped)
237242
require.False(t, isRetryable(wrapped))
238243

239244
// no wrapping b/c no configured duration
240245
err = status.Error(codes.ResourceExhausted, "res exhausted")
241-
wrapped = wrapErrorIfRetryable(err, nil)
246+
wrapped = wrapErrorIfRetryable(err, nil, false)
247+
require.Equal(t, err, wrapped)
248+
require.False(t, isRetryable(wrapped))
249+
250+
// no wrapping b/c this is a resource exhausted grpc error but retry info is disabled
251+
err = status.Error(codes.ResourceExhausted, "res exhausted")
252+
wrapped = wrapErrorIfRetryable(err, durationpb.New(time.Second), false)
242253
require.Equal(t, err, wrapped)
243254
require.False(t, isRetryable(wrapped))
244255

245-
// wrapping b/c this is a resource exhausted grpc error
256+
// wrapping b/c this is a resource exhausted grpc error with retry info enabled
246257
err = status.Error(codes.ResourceExhausted, "res exhausted")
247-
wrapped = wrapErrorIfRetryable(err, durationpb.New(time.Second))
258+
wrapped = wrapErrorIfRetryable(err, durationpb.New(time.Second), true)
248259
require.NotEqual(t, err, wrapped)
249260
require.True(t, isRetryable(wrapped))
250261
}

modules/overrides/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type IngestionOverrides struct {
7474
TenantShardSize int `yaml:"tenant_shard_size,omitempty" json:"tenant_shard_size,omitempty"`
7575
MaxAttributeBytes int `yaml:"max_attribute_bytes,omitempty" json:"max_attribute_bytes,omitempty"`
7676
ArtificialDelay *time.Duration `yaml:"artificial_delay,omitempty" json:"artificial_delay,omitempty"`
77+
RetryInfoEnabled bool `yaml:"retry_info_enabled,omitempty" json:"retry_info_enabled,omitempty"`
7778
}
7879

7980
type ForwarderOverrides struct {
@@ -267,6 +268,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(f *flag.FlagSet) {
267268
c.Defaults.MetricsGenerator.GenerateNativeHistograms = histograms.HistogramMethodClassic
268269

269270
// Distributor LegacyOverrides
271+
c.Defaults.Ingestion.RetryInfoEnabled = true // enabled in overrides by default but it's disabled with RetryAfterOnResourceExhausted = 0
270272
f.StringVar(&c.Defaults.Ingestion.RateStrategy, "distributor.rate-limit-strategy", "local", "Whether the various ingestion rate limits should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).")
271273
f.IntVar(&c.Defaults.Ingestion.RateLimitBytes, "distributor.ingestion-rate-limit-bytes", 15e6, "Per-user ingestion rate limit in bytes per second.")
272274
f.IntVar(&c.Defaults.Ingestion.BurstSizeBytes, "distributor.ingestion-burst-size-bytes", 20e6, "Per-user ingestion burst size in bytes. Should be set to the expected size (in bytes) of a single push request.")

modules/overrides/config_legacy.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func (c *Overrides) toLegacy() LegacyOverrides {
2323
MaxGlobalTracesPerUser: c.Ingestion.MaxGlobalTracesPerUser,
2424
IngestionMaxAttributeBytes: c.Ingestion.MaxAttributeBytes,
2525
IngestionArtificialDelay: c.Ingestion.ArtificialDelay,
26+
IngestionRetryInfoEnabled: c.Ingestion.RetryInfoEnabled,
2627

2728
Forwarders: c.Forwarders,
2829

@@ -93,6 +94,7 @@ type LegacyOverrides struct {
9394
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
9495
IngestionMaxAttributeBytes int `yaml:"ingestion_max_attribute_bytes" json:"ingestion_max_attribute_bytes"`
9596
IngestionArtificialDelay *time.Duration `yaml:"ingestion_artificial_delay" json:"ingestion_artificial_delay"`
97+
IngestionRetryInfoEnabled bool `yaml:"ingestion_retry_info_enabled" json:"ingestion_retry_info_enabled"`
9698

9799
// Ingester enforced limits.
98100
MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"`
@@ -174,6 +176,7 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
174176
TenantShardSize: l.IngestionTenantShardSize,
175177
MaxAttributeBytes: l.IngestionMaxAttributeBytes,
176178
ArtificialDelay: l.IngestionArtificialDelay,
179+
RetryInfoEnabled: l.IngestionRetryInfoEnabled,
177180
},
178181
Read: ReadOverrides{
179182
MaxBytesPerTagValuesQuery: l.MaxBytesPerTagValuesQuery,

modules/overrides/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ func generateTestLegacyOverrides() LegacyOverrides {
409409
IngestionTenantShardSize: 3,
410410
IngestionMaxAttributeBytes: 1000,
411411
IngestionArtificialDelay: durationPtr(5 * time.Minute),
412+
IngestionRetryInfoEnabled: true,
412413

413414
MaxLocalTracesPerUser: 1000,
414415
MaxGlobalTracesPerUser: 2000,

0 commit comments

Comments
 (0)