diff --git a/CHANGELOG.md b/CHANGELOG.md index 73ed7f6f9c1..d7fbbcb1cf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add unmarshaling and validation for `CardinalityLimits` and `SpanLimits` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8043) - Add unmarshaling and validation for `BatchLogRecordProcessor`, `BatchSpanProcessor`, and `PeriodicMetricReader` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8049) - Add unmarshaling and validation for `TextMapPropagator` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8052) +- Add `jaeger.sampler.type`/`jaeger.sampler.param` attributes for adaptive sampling support and option `WithAttributesDisabled` in `go.opentelemetry.io/contrib/samplers/jaegerremote`. (#8073) - Add unmarshaling and validation for `OTLPHttpExporter`, `OTLPGrpcExporter`, `OTLPGrpcMetricExporter` and `OTLPHttpMetricExporter` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8112) ### Changed diff --git a/samplers/jaegerremote/go.mod b/samplers/jaegerremote/go.mod index 63b5d27294e..7a77050c9d9 100644 --- a/samplers/jaegerremote/go.mod +++ b/samplers/jaegerremote/go.mod @@ -7,6 +7,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/jaegertracing/jaeger-idl v0.6.0 github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 ) @@ -18,7 +19,6 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.38.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect golang.org/x/net v0.46.0 // indirect golang.org/x/sys v0.37.0 // indirect diff --git a/samplers/jaegerremote/sampler.go b/samplers/jaegerremote/sampler.go index 259f7a669a7..b934e4430b6 100644 --- a/samplers/jaegerremote/sampler.go +++ b/samplers/jaegerremote/sampler.go @@ -24,6 +24,7 @@ import ( "sync" jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/trace" oteltrace "go.opentelemetry.io/otel/trace" @@ -34,25 +35,40 @@ const ( defaultMaxOperations = 2000 ) +const ( + samplerTypeKey = "jaeger.sampler.type" + samplerParamKey = "jaeger.sampler.param" + samplerTypeValueProbabilistic = "probabilistic" + samplerTypeValueRateLimiting = "ratelimiting" +) + // ----------------------- // probabilisticSampler is a sampler that randomly samples a certain percentage // of traces. type probabilisticSampler struct { - samplingRate float64 - sampler trace.Sampler + samplingRate float64 + sampler trace.Sampler + attributes []attribute.KeyValue + attributesDisabled bool } // newProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the // samplingRate, in the range between 0.0 and 1.0. it utilizes the SDK `trace.TraceIDRatioBased` sampler. -func newProbabilisticSampler(samplingRate float64) *probabilisticSampler { - s := new(probabilisticSampler) +func newProbabilisticSampler(samplingRate float64, attributesDisabled bool) *probabilisticSampler { + s := &probabilisticSampler{ + attributesDisabled: attributesDisabled, + } return s.init(samplingRate) } func (s *probabilisticSampler) init(samplingRate float64) *probabilisticSampler { s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0)) s.sampler = trace.TraceIDRatioBased(s.samplingRate) + if s.attributesDisabled { + return s + } + s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, s.samplingRate)} return s } @@ -62,7 +78,12 @@ func (s *probabilisticSampler) SamplingRate() float64 { } func (s *probabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult { - return s.sampler.ShouldSample(p) + r := s.sampler.ShouldSample(p) + if r.Decision == trace.Drop { + return r + } + r.Attributes = s.attributes + return r } // Equal compares with another sampler. @@ -95,11 +116,16 @@ func (s *probabilisticSampler) Description() string { type rateLimitingSampler struct { maxTracesPerSecond float64 rateLimiter *ratelimiter.RateLimiter + attributes []attribute.KeyValue + attributesDisabled bool } // newRateLimitingSampler creates new rateLimitingSampler. -func newRateLimitingSampler(maxTracesPerSecond float64) *rateLimitingSampler { - s := new(rateLimitingSampler) +func newRateLimitingSampler(maxTracesPerSecond float64, attributesDisabled bool) *rateLimitingSampler { + s := &rateLimitingSampler{ + attributesDisabled: attributesDisabled, + } + return s.init(maxTracesPerSecond) } @@ -110,6 +136,10 @@ func (s *rateLimitingSampler) init(maxTracesPerSecond float64) *rateLimitingSamp s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0)) } s.maxTracesPerSecond = maxTracesPerSecond + if s.attributesDisabled { + return s + } + s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, s.maxTracesPerSecond)} return s } @@ -119,6 +149,7 @@ func (s *rateLimitingSampler) ShouldSample(p trace.SamplingParameters) trace.Sam return trace.SamplingResult{ Decision: trace.RecordAndSample, Tracestate: psc.TraceState(), + Attributes: s.attributes, } } return trace.SamplingResult{ @@ -161,12 +192,14 @@ type guaranteedThroughputProbabilisticSampler struct { lowerBoundSampler *rateLimitingSampler samplingRate float64 lowerBound float64 + attributesDisabled bool } -func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *guaranteedThroughputProbabilisticSampler { +func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64, attributesDisabled bool) *guaranteedThroughputProbabilisticSampler { s := &guaranteedThroughputProbabilisticSampler{ - lowerBoundSampler: newRateLimitingSampler(lowerBound), - lowerBound: lowerBound, + lowerBoundSampler: newRateLimitingSampler(lowerBound, attributesDisabled), + lowerBound: lowerBound, + attributesDisabled: attributesDisabled, } s.setProbabilisticSampler(samplingRate) return s @@ -174,7 +207,7 @@ func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float6 func (s *guaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) { if s.probabilisticSampler == nil { - s.probabilisticSampler = newProbabilisticSampler(samplingRate) + s.probabilisticSampler = newProbabilisticSampler(samplingRate, s.attributesDisabled) } else if s.samplingRate != samplingRate { s.probabilisticSampler.init(samplingRate) } @@ -218,6 +251,7 @@ type perOperationSampler struct { // see description in perOperationSamplerParams operationNameLateBinding bool + attributesDisabled bool } // perOperationSamplerParams defines parameters when creating perOperationSampler. @@ -238,7 +272,7 @@ type perOperationSamplerParams struct { } // newPerOperationSampler returns a new perOperationSampler. -func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampler { +func newPerOperationSampler(params perOperationSamplerParams, attributesDisabled bool) *perOperationSampler { if params.MaxOperations <= 0 { params.MaxOperations = defaultMaxOperations } @@ -247,15 +281,17 @@ func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampl sampler := newGuaranteedThroughputProbabilisticSampler( params.Strategies.DefaultLowerBoundTracesPerSecond, strategy.ProbabilisticSampling.SamplingRate, + attributesDisabled, ) samplers[strategy.Operation] = sampler } return &perOperationSampler{ samplers: samplers, - defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability), + defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability, attributesDisabled), lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond, maxOperations: params.MaxOperations, operationNameLateBinding: params.OperationNameLateBinding, + attributesDisabled: attributesDisabled, } } @@ -284,7 +320,7 @@ func (s *perOperationSampler) getSamplerForOperation(operation string) trace.Sam if len(s.samplers) >= s.maxOperations { return s.defaultSampler } - newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate()) + newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate(), s.attributesDisabled) s.samplers[operation] = newSampler return newSampler } @@ -308,13 +344,14 @@ func (s *perOperationSampler) update(strategies *jaeger_api_v2.PerOperationSampl sampler := newGuaranteedThroughputProbabilisticSampler( lowerBound, samplingRate, + s.attributesDisabled, ) newSamplers[operation] = sampler } } s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability { - s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability) + s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability, s.attributesDisabled) } s.samplers = newSamplers } diff --git a/samplers/jaegerremote/sampler_remote.go b/samplers/jaegerremote/sampler_remote.go index 4cc1531cf53..dd798edb316 100644 --- a/samplers/jaegerremote/sampler_remote.go +++ b/samplers/jaegerremote/sampler_remote.go @@ -189,10 +189,12 @@ func (s *Sampler) updateSamplerViaUpdaters(strategy any) error { // ----------------------- // probabilisticSamplerUpdater is used by Sampler to parse sampling configuration. -type probabilisticSamplerUpdater struct{} +type probabilisticSamplerUpdater struct { + attributesDisabled bool +} // Update implements Update of samplerUpdater. -func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) { +func (u *probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) { type response interface { GetProbabilisticSampling() *jaeger_api_v2.ProbabilisticSamplingStrategy } @@ -205,7 +207,7 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) } return sampler, nil } - return newProbabilisticSampler(probabilistic.SamplingRate), nil + return newProbabilisticSampler(probabilistic.SamplingRate, u.attributesDisabled), nil } } return nil, nil @@ -214,10 +216,12 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) // ----------------------- // rateLimitingSamplerUpdater is used by Sampler to parse sampling configuration. -type rateLimitingSamplerUpdater struct{} +type rateLimitingSamplerUpdater struct { + attributesDisabled bool +} // Update implements Update of samplerUpdater. -func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) { +func (u *rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) { type response interface { GetRateLimitingSampling() *jaeger_api_v2.RateLimitingSamplingStrategy } @@ -229,7 +233,7 @@ func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) ( rl.Update(rateLimit) return rl, nil } - return newRateLimitingSampler(rateLimit), nil + return newRateLimitingSampler(rateLimit, u.attributesDisabled), nil } } return nil, nil @@ -242,6 +246,7 @@ func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) ( type perOperationSamplerUpdater struct { MaxOperations int OperationNameLateBinding bool + attributesDisabled bool } // Update implements Update of samplerUpdater. @@ -260,7 +265,7 @@ func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any) MaxOperations: u.MaxOperations, OperationNameLateBinding: u.OperationNameLateBinding, Strategies: operations, - }), nil + }, u.attributesDisabled), nil } } return nil, nil diff --git a/samplers/jaegerremote/sampler_remote_integration_test.go b/samplers/jaegerremote/sampler_remote_integration_test.go new file mode 100644 index 00000000000..6af6091e3f4 --- /dev/null +++ b/samplers/jaegerremote/sampler_remote_integration_test.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package jaegerremote_test + +import ( + "encoding/binary" + "testing" + "time" + + jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/contrib/samplers/jaegerremote" + "go.opentelemetry.io/contrib/samplers/jaegerremote/internal/testutils" +) + +const ( + testDefaultSamplingProbability = 0.5 + testMaxID = uint64(1) << 63 +) + +func TestRemotelyControlledSampler_Attributes(t *testing.T) { + agent, err := testutils.StartMockAgent() + require.NoError(t, err) + + remoteSampler := jaegerremote.New( + "client app", + jaegerremote.WithSamplingServerURL("http://"+agent.SamplingServerAddr()), + jaegerremote.WithSamplingRefreshInterval(time.Minute), + ) + remoteSampler.Close() // stop timer-based updates, we want to call them manually + defer agent.Close() + + var traceID oteltrace.TraceID + binary.BigEndian.PutUint64(traceID[8:], testMaxID-20) + + t.Run("probabilistic", func(t *testing.T) { + agent.AddSamplingStrategy("client app", + &jaeger_api_v2.SamplingStrategyResponse{ + StrategyType: jaeger_api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &jaeger_api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: testDefaultSamplingProbability, + }, + }) + remoteSampler.UpdateSampler() + + result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String("jaeger.sampler.type", "probabilistic"), attribute.Float64("jaeger.sampler.param", 0.5)}, result.Attributes) + }) + + t.Run("ratelimitng", func(t *testing.T) { + agent.AddSamplingStrategy("client app", + &jaeger_api_v2.SamplingStrategyResponse{ + StrategyType: jaeger_api_v2.SamplingStrategyType_RATE_LIMITING, + RateLimitingSampling: &jaeger_api_v2.RateLimitingSamplingStrategy{ + MaxTracesPerSecond: 1, + }, + }) + remoteSampler.UpdateSampler() + + result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String("jaeger.sampler.type", "ratelimiting"), attribute.Float64("jaeger.sampler.param", 1)}, result.Attributes) + }) + + t.Run("per operation", func(t *testing.T) { + agent.AddSamplingStrategy("client app", + &jaeger_api_v2.SamplingStrategyResponse{OperationSampling: &jaeger_api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: testDefaultSamplingProbability, + DefaultLowerBoundTracesPerSecond: 1.0, + }}) + remoteSampler.UpdateSampler() + + result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String("jaeger.sampler.type", "probabilistic"), attribute.Float64("jaeger.sampler.param", 0.5)}, result.Attributes) + }) +} + +func TestRemotelyControlledSampler_AttributesDisabled(t *testing.T) { + agent, err := testutils.StartMockAgent() + require.NoError(t, err) + + remoteSampler := jaegerremote.New( + "client app", + jaegerremote.WithSamplingServerURL("http://"+agent.SamplingServerAddr()), + jaegerremote.WithSamplingRefreshInterval(time.Minute), + jaegerremote.WithAttributesDisabled(), + ) + remoteSampler.Close() // stop timer-based updates, we want to call them manually + defer agent.Close() + + var traceID oteltrace.TraceID + binary.BigEndian.PutUint64(traceID[8:], testMaxID-20) + + t.Run("probabilistic", func(t *testing.T) { + agent.AddSamplingStrategy("client app", + &jaeger_api_v2.SamplingStrategyResponse{ + StrategyType: jaeger_api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &jaeger_api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: testDefaultSamplingProbability, + }, + }) + remoteSampler.UpdateSampler() + + result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + }) + + t.Run("ratelimitng", func(t *testing.T) { + agent.AddSamplingStrategy("client app", + &jaeger_api_v2.SamplingStrategyResponse{ + StrategyType: jaeger_api_v2.SamplingStrategyType_RATE_LIMITING, + RateLimitingSampling: &jaeger_api_v2.RateLimitingSamplingStrategy{ + MaxTracesPerSecond: 1, + }, + }) + remoteSampler.UpdateSampler() + + result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + }) + + t.Run("per operation", func(t *testing.T) { + agent.AddSamplingStrategy("client app", + &jaeger_api_v2.SamplingStrategyResponse{OperationSampling: &jaeger_api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: testDefaultSamplingProbability, + DefaultLowerBoundTracesPerSecond: 1.0, + }}) + remoteSampler.UpdateSampler() + + result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + }) +} diff --git a/samplers/jaegerremote/sampler_remote_options.go b/samplers/jaegerremote/sampler_remote_options.go index 3679b148db3..c26a4c55ef3 100644 --- a/samplers/jaegerremote/sampler_remote_options.go +++ b/samplers/jaegerremote/sampler_remote_options.go @@ -38,6 +38,7 @@ type config struct { updaters []samplerUpdater posParams perOperationSamplerParams logger logr.Logger + attributesDisabled bool } func getEnvOptions() ([]Option, []error) { @@ -87,15 +88,10 @@ func getEnvOptions() ([]Option, []error) { // newConfig returns an appropriately configured config. func newConfig(options ...Option) config { c := config{ - sampler: newProbabilisticSampler(0.001), samplingServerURL: defaultSamplingServerURL, samplingRefreshInterval: defaultSamplingRefreshInterval, samplingFetcher: newHTTPSamplingStrategyFetcher(defaultSamplingServerURL), samplingParser: new(samplingStrategyParserImpl), - updaters: []samplerUpdater{ - new(probabilisticSamplerUpdater), - new(rateLimitingSamplerUpdater), - }, posParams: perOperationSamplerParams{ MaxOperations: defaultSamplingMaxOperations, OperationNameLateBinding: defaultSamplingOperationNameLateBinding, @@ -115,11 +111,20 @@ func newConfig(options ...Option) config { for _, err := range errs { c.logger.Error(err, "env variable parsing failure") } + c.updaters = []samplerUpdater{ + &perOperationSamplerUpdater{ + MaxOperations: c.posParams.MaxOperations, + OperationNameLateBinding: c.posParams.OperationNameLateBinding, + attributesDisabled: c.attributesDisabled, + }, + &probabilisticSamplerUpdater{attributesDisabled: c.attributesDisabled}, + &rateLimitingSamplerUpdater{attributesDisabled: c.attributesDisabled}, + } + + if c.sampler == nil { + c.sampler = newProbabilisticSampler(0.001, c.attributesDisabled) + } - c.updaters = append([]samplerUpdater{&perOperationSamplerUpdater{ - MaxOperations: c.posParams.MaxOperations, - OperationNameLateBinding: c.posParams.OperationNameLateBinding, - }}, c.updaters...) return c } @@ -192,16 +197,16 @@ func WithSamplingStrategyFetcher(fetcher SamplingStrategyFetcher) Option { }) } -// samplingStrategyParser creates a Option that initializes sampling strategy parser. -func withSamplingStrategyParser(parser samplingStrategyParser) Option { +// WithAttributesDisabled configures the sampler to disable setting attributes jaeger.sampler.type and jaeger.sampler.param. +func WithAttributesDisabled() Option { return optionFunc(func(c *config) { - c.samplingParser = parser + c.attributesDisabled = true }) } -// withUpdaters creates a Option that initializes sampler updaters. -func withUpdaters(updaters ...samplerUpdater) Option { +// samplingStrategyParser creates a Option that initializes sampling strategy parser. +func withSamplingStrategyParser(parser samplingStrategyParser) Option { return optionFunc(func(c *config) { - c.updaters = updaters + c.samplingParser = parser }) } diff --git a/samplers/jaegerremote/sampler_remote_test.go b/samplers/jaegerremote/sampler_remote_test.go index 6392edf488a..58a935cd93c 100644 --- a/samplers/jaegerremote/sampler_remote_test.go +++ b/samplers/jaegerremote/sampler_remote_test.go @@ -38,10 +38,9 @@ import ( ) func TestRemotelyControlledSampler_updateConcurrentSafe(*testing.T) { - initSampler := newProbabilisticSampler(0.123) + initSampler := newProbabilisticSampler(0.123, false) fetcher := &testSamplingStrategyFetcher{response: []byte("probabilistic")} parser := new(testSamplingStrategyParser) - updaters := []samplerUpdater{new(probabilisticSamplerUpdater)} sampler := New( "test", WithMaxOperations(42), @@ -51,7 +50,6 @@ func TestRemotelyControlledSampler_updateConcurrentSafe(*testing.T) { WithSamplingRefreshInterval(time.Millisecond), WithSamplingStrategyFetcher(fetcher), withSamplingStrategyParser(parser), - withUpdaters(updaters...), ) defer sampler.Close() @@ -107,11 +105,10 @@ func (*testSamplingStrategyParser) Parse(response []byte) (any, error) { } func TestRemoteSamplerOptions(t *testing.T) { - initSampler := newProbabilisticSampler(0.123) + initSampler := newProbabilisticSampler(0.123, false) fetcher := new(fakeSamplingFetcher) parser := new(samplingStrategyParserImpl) logger := testr.New(t) - updaters := []samplerUpdater{new(probabilisticSamplerUpdater)} sampler := New( "test", WithMaxOperations(42), @@ -121,8 +118,8 @@ func TestRemoteSamplerOptions(t *testing.T) { WithSamplingRefreshInterval(42*time.Second), WithSamplingStrategyFetcher(fetcher), withSamplingStrategyParser(parser), - withUpdaters(updaters...), WithLogger(logger), + WithAttributesDisabled(), ) defer sampler.Close() assert.Equal(t, 42, sampler.posParams.MaxOperations) @@ -132,8 +129,9 @@ func TestRemoteSamplerOptions(t *testing.T) { assert.Equal(t, 42*time.Second, sampler.samplingRefreshInterval) assert.Same(t, fetcher, sampler.samplingFetcher) assert.Same(t, parser, sampler.samplingParser) - assert.EqualValues(t, &perOperationSamplerUpdater{MaxOperations: 42, OperationNameLateBinding: true}, sampler.updaters[0]) + assert.EqualValues(t, &perOperationSamplerUpdater{MaxOperations: 42, OperationNameLateBinding: true, attributesDisabled: true}, sampler.updaters[0]) assert.Equal(t, logger, sampler.logger) + assert.True(t, sampler.attributesDisabled) } func TestRemoteSamplerOptionsDefaults(t *testing.T) { @@ -150,7 +148,7 @@ func initAgent(t *testing.T) (*testutils.MockAgent, *Sampler) { agent, err := testutils.StartMockAgent() require.NoError(t, err) - initialSampler := newProbabilisticSampler(0.001) + initialSampler := newProbabilisticSampler(0.001, false) sampler := New( "client app", WithSamplingServerURL("http://"+agent.SamplingServerAddr()), @@ -177,7 +175,7 @@ func TestRemotelyControlledSampler(t *testing.T) { agent, remoteSampler := initAgent(t) defer agent.Close() - defaultSampler := newProbabilisticSampler(0.001) + defaultSampler := newProbabilisticSampler(0.001, false) remoteSampler.setSampler(defaultSampler) agent.AddSamplingStrategy("client app", @@ -294,10 +292,9 @@ func TestRemotelyControlledSampler_updateSampler(t *testing.T) { } func TestRemotelyControlledSampler_ImmediatelyUpdateOnStartup(t *testing.T) { - initSampler := newProbabilisticSampler(0.123) + initSampler := newProbabilisticSampler(0.123, false) fetcher := &testSamplingStrategyFetcher{response: []byte("rateLimiting")} parser := new(testSamplingStrategyParser) - updaters := []samplerUpdater{new(probabilisticSamplerUpdater), new(rateLimitingSamplerUpdater)} sampler := New( "test", WithMaxOperations(42), @@ -307,7 +304,6 @@ func TestRemotelyControlledSampler_ImmediatelyUpdateOnStartup(t *testing.T) { WithSamplingRefreshInterval(10*time.Minute), WithSamplingStrategyFetcher(fetcher), withSamplingStrategyParser(parser), - withUpdaters(updaters...), ) time.Sleep(100 * time.Millisecond) // waiting for s.pollController sampler.Close() // stop pollController, avoid date race @@ -390,7 +386,7 @@ func TestRemotelyControlledSampler_updateSamplerFromAdaptiveSampler(t *testing.T adaptiveSampler := newPerOperationSampler(perOperationSamplerParams{ MaxOperations: testDefaultMaxOperations, Strategies: strategies, - }) + }, false) // Overwrite the sampler with an adaptive sampler remoteSampler.setSampler(adaptiveSampler) @@ -423,12 +419,12 @@ func TestRemotelyControlledSampler_updateSamplerFromAdaptiveSampler(t *testing.T } func TestRemotelyControlledSampler_updateRateLimitingOrProbabilisticSampler(t *testing.T) { - probabilisticSampler := newProbabilisticSampler(0.002) - otherProbabilisticSampler := newProbabilisticSampler(0.003) - maxProbabilisticSampler := newProbabilisticSampler(1.0) + probabilisticSampler := newProbabilisticSampler(0.002, false) + otherProbabilisticSampler := newProbabilisticSampler(0.003, false) + maxProbabilisticSampler := newProbabilisticSampler(1.0, false) - rateLimitingSampler := newRateLimitingSampler(2) - otherRateLimitingSampler := newRateLimitingSampler(3) + rateLimitingSampler := newRateLimitingSampler(2, false) + otherRateLimitingSampler := newRateLimitingSampler(3, false) testCases := []struct { res *jaeger_api_v2.SamplingStrategyResponse @@ -494,10 +490,6 @@ func TestRemotelyControlledSampler_updateRateLimitingOrProbabilisticSampler(t *t remoteSampler := New( "test", WithInitialSampler(testCase.initSampler), - withUpdaters( - new(probabilisticSamplerUpdater), - new(rateLimitingSamplerUpdater), - ), ) defer remoteSampler.Close() err := remoteSampler.updateSamplerViaUpdaters(testCase.res) diff --git a/samplers/jaegerremote/sampler_test.go b/samplers/jaegerremote/sampler_test.go index 2d4c4dc9657..972c3264939 100644 --- a/samplers/jaegerremote/sampler_test.go +++ b/samplers/jaegerremote/sampler_test.go @@ -27,6 +27,7 @@ import ( jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/trace" oteltrace "go.opentelemetry.io/otel/trace" ) @@ -67,7 +68,7 @@ func defaultIDGenerator() *randomIDGenerator { func TestProbabilisticSampler(t *testing.T) { var traceID oteltrace.TraceID - sampler := newProbabilisticSampler(0.5) + sampler := newProbabilisticSampler(0.5, false) binary.BigEndian.PutUint64(traceID[8:], testMaxID+10) result := sampler.ShouldSample(trace.SamplingParameters{TraceID: traceID}) assert.Equal(t, trace.Drop, result.Decision) @@ -88,7 +89,7 @@ func TestProbabilisticSampler(t *testing.T) { t.Run("test_parity", func(t *testing.T) { numTests := 1000 - sampler := newProbabilisticSampler(0.5) + sampler := newProbabilisticSampler(0.5, true) oracle := trace.TraceIDRatioBased(0.5) idGenerator := defaultIDGenerator() @@ -102,16 +103,16 @@ func TestProbabilisticSampler(t *testing.T) { }) t.Run("Equals", func(t *testing.T) { - sampler := newProbabilisticSampler(0.5) - assert.True(t, sampler.Equal(newProbabilisticSampler(0.5))) - assert.False(t, sampler.Equal(newProbabilisticSampler(0.0))) - assert.False(t, sampler.Equal(newProbabilisticSampler(0.75))) - assert.False(t, sampler.Equal(newProbabilisticSampler(1.0))) + sampler := newProbabilisticSampler(0.5, false) + assert.True(t, sampler.Equal(newProbabilisticSampler(0.5, false))) + assert.False(t, sampler.Equal(newProbabilisticSampler(0.0, false))) + assert.False(t, sampler.Equal(newProbabilisticSampler(0.75, false))) + assert.False(t, sampler.Equal(newProbabilisticSampler(1.0, false))) }) } func TestRateLimitingSampler(t *testing.T) { - sampler := newRateLimitingSampler(2) + sampler := newRateLimitingSampler(2, false) result := sampler.ShouldSample(trace.SamplingParameters{Name: testOperationName}) assert.Equal(t, trace.RecordAndSample, result.Decision) result = sampler.ShouldSample(trace.SamplingParameters{Name: testOperationName}) @@ -119,13 +120,13 @@ func TestRateLimitingSampler(t *testing.T) { result = sampler.ShouldSample(trace.SamplingParameters{Name: testOperationName}) assert.Equal(t, trace.Drop, result.Decision) - sampler = newRateLimitingSampler(0.1) + sampler = newRateLimitingSampler(0.1, false) result = sampler.ShouldSample(trace.SamplingParameters{Name: testOperationName}) assert.Equal(t, trace.RecordAndSample, result.Decision) result = sampler.ShouldSample(trace.SamplingParameters{Name: testOperationName}) assert.Equal(t, trace.Drop, result.Decision) - sampler = newRateLimitingSampler(0) + sampler = newRateLimitingSampler(0, false) result = sampler.ShouldSample(trace.SamplingParameters{Name: testOperationName}) assert.Equal(t, trace.Drop, result.Decision) } @@ -133,7 +134,7 @@ func TestRateLimitingSampler(t *testing.T) { func TestGuaranteedThroughputProbabilisticSamplerUpdate(t *testing.T) { samplingRate := 0.5 lowerBound := 2.0 - sampler := newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate) + sampler := newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate, false) assert.Equal(t, lowerBound, sampler.lowerBound) assert.Equal(t, samplingRate, sampler.samplingRate) @@ -164,16 +165,16 @@ func TestAdaptiveSampler(t *testing.T) { sampler := newPerOperationSampler(perOperationSamplerParams{ Strategies: strategies, MaxOperations: 42, - }) + }, false) assert.Equal(t, 42, sampler.maxOperations) - sampler = newPerOperationSampler(perOperationSamplerParams{Strategies: strategies}) + sampler = newPerOperationSampler(perOperationSamplerParams{Strategies: strategies}, false) assert.Equal(t, 2000, sampler.maxOperations, "default MaxOperations applied") sampler = newPerOperationSampler(perOperationSamplerParams{ MaxOperations: testDefaultMaxOperations, Strategies: strategies, - }) + }, false) result := sampler.ShouldSample(makeSamplingParameters(testMaxID+10, testOperationName)) assert.Equal(t, trace.RecordAndSample, result.Decision) @@ -204,14 +205,14 @@ func TestAdaptiveSamplerErrors(t *testing.T) { sampler := newPerOperationSampler(perOperationSamplerParams{ MaxOperations: testDefaultMaxOperations, Strategies: strategies, - }) + }, false) assert.Equal(t, 0.0, sampler.samplers[testOperationName].samplingRate) strategies.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate = 1.1 sampler = newPerOperationSampler(perOperationSamplerParams{ MaxOperations: testDefaultMaxOperations, Strategies: strategies, - }) + }, false) assert.Equal(t, 1.0, sampler.samplers[testOperationName].samplingRate) } @@ -233,7 +234,7 @@ func TestAdaptiveSamplerUpdate(t *testing.T) { sampler := newPerOperationSampler(perOperationSamplerParams{ MaxOperations: testDefaultMaxOperations, Strategies: strategies, - }) + }, false) assert.Equal(t, lowerBound, sampler.lowerBound) assert.Equal(t, testDefaultSamplingProbability, sampler.defaultSampler.SamplingRate()) @@ -281,8 +282,157 @@ func TestMaxOperations(t *testing.T) { sampler := newPerOperationSampler(perOperationSamplerParams{ MaxOperations: 1, Strategies: strategies, - }) + }, false) result := sampler.ShouldSample(makeSamplingParameters(testMaxID-10, testFirstTimeOperationName)) assert.Equal(t, trace.RecordAndSample, result.Decision) } + +func TestAttributes(t *testing.T) { + t.Parallel() + + t.Run("probabilistic", func(t *testing.T) { + t.Parallel() + + var traceID oteltrace.TraceID + s := newProbabilisticSampler(0.5, false) + binary.BigEndian.PutUint64(traceID[:8], math.MaxUint64) + binary.BigEndian.PutUint64(traceID[8:], testMaxID+10) + result := s.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + + binary.BigEndian.PutUint64(traceID[8:], testMaxID-20) + result = s.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, 0.5)}, result.Attributes) + + s = newProbabilisticSampler(1.0, false) + result = s.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, 1.0)}, result.Attributes) + }) + + t.Run("probabilistic attributes disabled", func(t *testing.T) { + t.Parallel() + + var traceID oteltrace.TraceID + s := newProbabilisticSampler(0.5, true) + binary.BigEndian.PutUint64(traceID[:8], math.MaxUint64) + binary.BigEndian.PutUint64(traceID[8:], testMaxID+10) + result := s.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + + binary.BigEndian.PutUint64(traceID[8:], testMaxID-20) + result = s.ShouldSample(trace.SamplingParameters{TraceID: traceID}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + }) + + t.Run("ratelimiting", func(t *testing.T) { + t.Parallel() + + s := newRateLimitingSampler(1, false) + result := s.ShouldSample(trace.SamplingParameters{Name: testOperationName}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, 1)}, result.Attributes) + result = s.ShouldSample(trace.SamplingParameters{Name: testOperationName}) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + + s = newRateLimitingSampler(0.1, false) + result = s.ShouldSample(trace.SamplingParameters{Name: testOperationName}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, 0.1)}, result.Attributes) + result = s.ShouldSample(trace.SamplingParameters{Name: testOperationName}) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + }) + + t.Run("ratelimiting attributes disabled", func(t *testing.T) { + t.Parallel() + + s := newRateLimitingSampler(1, true) + result := s.ShouldSample(trace.SamplingParameters{Name: testOperationName}) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + result = s.ShouldSample(trace.SamplingParameters{Name: testOperationName}) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + }) + + t.Run("per operation", func(t *testing.T) { + t.Parallel() + + samplingRates := []*jaeger_api_v2.OperationSamplingStrategy{ + { + Operation: testOperationName, + ProbabilisticSampling: &jaeger_api_v2.ProbabilisticSamplingStrategy{SamplingRate: testDefaultSamplingProbability}, + }, + } + strategies := &jaeger_api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: testDefaultSamplingProbability, + DefaultLowerBoundTracesPerSecond: 1.0, + PerOperationStrategies: samplingRates, + } + s := newPerOperationSampler(perOperationSamplerParams{ + MaxOperations: testDefaultMaxOperations, + Strategies: strategies, + }, false) + + result := s.ShouldSample(makeSamplingParameters(testMaxID+10, testOperationName)) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, 1)}, result.Attributes) + + result = s.ShouldSample(makeSamplingParameters(testMaxID-20, testOperationName)) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, 0.5)}, result.Attributes) + + result = s.ShouldSample(makeSamplingParameters(testMaxID+10, testOperationName)) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + + // This operation is seen for the first time by the s + result = s.ShouldSample(makeSamplingParameters(testMaxID, testFirstTimeOperationName)) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Equal(t, []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, 1)}, result.Attributes) + }) + + t.Run("per operation attributes disabled", func(t *testing.T) { + t.Parallel() + + samplingRates := []*jaeger_api_v2.OperationSamplingStrategy{ + { + Operation: testOperationName, + ProbabilisticSampling: &jaeger_api_v2.ProbabilisticSamplingStrategy{SamplingRate: testDefaultSamplingProbability}, + }, + } + strategies := &jaeger_api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: testDefaultSamplingProbability, + DefaultLowerBoundTracesPerSecond: 1.0, + PerOperationStrategies: samplingRates, + } + s := newPerOperationSampler(perOperationSamplerParams{ + MaxOperations: testDefaultMaxOperations, + Strategies: strategies, + }, true) + + result := s.ShouldSample(makeSamplingParameters(testMaxID+10, testOperationName)) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + + result = s.ShouldSample(makeSamplingParameters(testMaxID-20, testOperationName)) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + + result = s.ShouldSample(makeSamplingParameters(testMaxID+10, testOperationName)) + assert.Equal(t, trace.Drop, result.Decision) + assert.Nil(t, result.Attributes) + + // This operation is seen for the first time by the s + result = s.ShouldSample(makeSamplingParameters(testMaxID, testFirstTimeOperationName)) + assert.Equal(t, trace.RecordAndSample, result.Decision) + assert.Nil(t, result.Attributes) + }) +}