Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add unmarshaling and validation for `CardinalityLimits` and `SpanLimits` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8043)
- Add unmarshaling and validation for `BatchLogRecordProcessor`, `BatchSpanProcessor`, and `PeriodicMetricReader` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8049)
- Add unmarshaling and validation for `TextMapPropagator` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8052)
- Add option `WithAttributesOn` to enable `sampler.type`/`sampler.param` attributes for adaptive sampling support in `go.opentelemetry.io/contrib/samplers/jaegerremote`. (#8073)
- Add unmarshaling and validation for `OTLPHttpExporter`, `OTLPGrpcExporter`, `OTLPGrpcMetricExporter` and `OTLPHttpMetricExporter` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8112)

### Changed
Expand Down
2 changes: 1 addition & 1 deletion samplers/jaegerremote/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger-idl v0.6.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
)
Expand All @@ -18,7 +19,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
Expand Down
80 changes: 68 additions & 12 deletions samplers/jaegerremote/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"

Expand All @@ -34,25 +35,55 @@ const (
defaultMaxOperations = 2000
)

const (
samplerTypeKey = "sampler.type"
samplerTypeValueProbabilistic = "probabilistic"
samplerTypeValueRateLimiting = "ratelimiting"
samplerParamKey = "sampler.param"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to prefix with jaeger., This may require small change in the backend to recognize, but it separates custom attributes into clear namespace.

Suggested change
samplerTypeKey = "sampler.type"
samplerTypeValueProbabilistic = "probabilistic"
samplerTypeValueRateLimiting = "ratelimiting"
samplerParamKey = "sampler.param"
samplerTypeKey = "jaeger.sampler.type"
samplerParamKey = "jaeger.sampler.param"
samplerTypeValueProbabilistic = "probabilistic"
samplerTypeValueRateLimiting = "ratelimiting"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is reasonable but I see some difficulties when using in mixed environments (jaeger and otel services) where will be different sampler attributes in that case. Not sure if backend can support more that one key.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a Jaeger-specific functionality and a Jaeger-specific sampler. Since it's not setting any attributes now then nothing is depending on them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So can we keep canonical keys? I did small research and using new keys will require some overhead on collector to transform these attributes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since those are jaeger specific, prefixing them with jaeger definitely makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added prefix

)

type samplerOptions struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is redundant. The package already has the functional options mechanism

func New(
        serviceName string,
        opts ...Option,
) *Sampler {
        options := newConfig(opts...)

So WithAttributesOn() should be implemented as Option just like other options, and should set a flag on the config struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea behind these internal options is to have different behavior when creating samplers without need to change every test to add new argument in constructor.
I can use flag arg in sampler's constructors if it is ok with you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without need to change every test to add new argument in constructor.

a) why do you need to change every test? do they all compare attributes?
b) even if you need to, it seems like a trivial task for a Copilot

Copy link
Contributor Author

@etilite etilite Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a) cause a lot of tests call these constructors
b) yeah but it is harder to review a lot of small changes
nvm I'll change to flag argument 👍

attributesOn bool
}

type samplerOptionFunc func(*samplerOptions)

func withAttributesOn() samplerOptionFunc {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make then on by default, and have this option to turn off. The function needs to be public in order to be used from a different package.

Suggested change
func withAttributesOn() samplerOptionFunc {
func WithAttributesDisabled() samplerOptionFunc {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure that we want to change current default behavior - jaegerremote does not set any attributes.

This is supposed to be an internal option for creating and updating samplers which constructors also aren't exported.

For external usage I added WithAttributesOn option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do want to change the current behavior. The current behavior is useless, the sampler does not actually work with Jaeger's adaptive backend. And the change itself is additive, it's backwards compatible (especially since the new attributes are prefixed with jaeger.).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need an option to disable these attributes in that case? If not implementation will be simple and no need for internal sampler options too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would argue you don't even need the opt-out option, OTEL SDK is already extremely verbose in terms of how many attributes it captures, having two extra ones on root span only seems irrelevant. But for 100% backwards compat you would provide an option to opt-out.

return func(o *samplerOptions) {
o.attributesOn = true
}
}

// -----------------------

// probabilisticSampler is a sampler that randomly samples a certain percentage
// of traces.
type probabilisticSampler struct {
samplingRate float64
sampler trace.Sampler
attributes []attribute.KeyValue
attributesOn bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flag is only used at initialization, you don't really need to store it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When probabilisticSampler or ratelimitngSampler are being updated its init method is also called, so I not sure if we can omit this flag.

}

// newProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the
// samplingRate, in the range between 0.0 and 1.0. it utilizes the SDK `trace.TraceIDRatioBased` sampler.
func newProbabilisticSampler(samplingRate float64) *probabilisticSampler {
s := new(probabilisticSampler)
func newProbabilisticSampler(samplingRate float64, opts ...samplerOptionFunc) *probabilisticSampler {
var options samplerOptions
for _, fn := range opts {
fn(&options)
}
s := &probabilisticSampler{
attributesOn: options.attributesOn,
}
return s.init(samplingRate)
}

func (s *probabilisticSampler) init(samplingRate float64) *probabilisticSampler {
s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
s.sampler = trace.TraceIDRatioBased(s.samplingRate)
if s.attributesOn {
s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, s.samplingRate)}
}
return s
}

Expand All @@ -62,7 +93,12 @@ func (s *probabilisticSampler) SamplingRate() float64 {
}

func (s *probabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
return s.sampler.ShouldSample(p)
r := s.sampler.ShouldSample(p)
if r.Decision == trace.Drop {
return r
}
r.Attributes = s.attributes
return r
}

// Equal compares with another sampler.
Expand Down Expand Up @@ -95,11 +131,20 @@ func (s *probabilisticSampler) Description() string {
type rateLimitingSampler struct {
maxTracesPerSecond float64
rateLimiter *ratelimiter.RateLimiter
attributes []attribute.KeyValue
attributesOn bool
}

// newRateLimitingSampler creates new rateLimitingSampler.
func newRateLimitingSampler(maxTracesPerSecond float64) *rateLimitingSampler {
s := new(rateLimitingSampler)
func newRateLimitingSampler(maxTracesPerSecond float64, opts ...samplerOptionFunc) *rateLimitingSampler {
var options samplerOptions
for _, fn := range opts {
fn(&options)
}
s := &rateLimitingSampler{
attributesOn: options.attributesOn,
}

return s.init(maxTracesPerSecond)
}

Expand All @@ -110,6 +155,9 @@ func (s *rateLimitingSampler) init(maxTracesPerSecond float64) *rateLimitingSamp
s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
}
s.maxTracesPerSecond = maxTracesPerSecond
if s.attributesOn {
s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, s.maxTracesPerSecond)}
}
return s
}

Expand All @@ -119,6 +167,7 @@ func (s *rateLimitingSampler) ShouldSample(p trace.SamplingParameters) trace.Sam
return trace.SamplingResult{
Decision: trace.RecordAndSample,
Tracestate: psc.TraceState(),
Attributes: s.attributes,
}
}
return trace.SamplingResult{
Expand Down Expand Up @@ -161,20 +210,22 @@ type guaranteedThroughputProbabilisticSampler struct {
lowerBoundSampler *rateLimitingSampler
samplingRate float64
lowerBound float64
options []samplerOptionFunc
}

func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *guaranteedThroughputProbabilisticSampler {
func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64, opts ...samplerOptionFunc) *guaranteedThroughputProbabilisticSampler {
s := &guaranteedThroughputProbabilisticSampler{
lowerBoundSampler: newRateLimitingSampler(lowerBound),
lowerBoundSampler: newRateLimitingSampler(lowerBound, opts...),
lowerBound: lowerBound,
options: opts,
}
s.setProbabilisticSampler(samplingRate)
return s
}

func (s *guaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {
if s.probabilisticSampler == nil {
s.probabilisticSampler = newProbabilisticSampler(samplingRate)
s.probabilisticSampler = newProbabilisticSampler(samplingRate, s.options...)
} else if s.samplingRate != samplingRate {
s.probabilisticSampler.init(samplingRate)
}
Expand Down Expand Up @@ -218,6 +269,8 @@ type perOperationSampler struct {

// see description in perOperationSamplerParams
operationNameLateBinding bool

options []samplerOptionFunc
}

// perOperationSamplerParams defines parameters when creating perOperationSampler.
Expand All @@ -238,7 +291,7 @@ type perOperationSamplerParams struct {
}

// newPerOperationSampler returns a new perOperationSampler.
func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampler {
func newPerOperationSampler(params perOperationSamplerParams, opts ...samplerOptionFunc) *perOperationSampler {
if params.MaxOperations <= 0 {
params.MaxOperations = defaultMaxOperations
}
Expand All @@ -247,15 +300,17 @@ func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampl
sampler := newGuaranteedThroughputProbabilisticSampler(
params.Strategies.DefaultLowerBoundTracesPerSecond,
strategy.ProbabilisticSampling.SamplingRate,
opts...,
)
samplers[strategy.Operation] = sampler
}
return &perOperationSampler{
samplers: samplers,
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability, opts...),
lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,
maxOperations: params.MaxOperations,
operationNameLateBinding: params.OperationNameLateBinding,
options: opts,
}
}

Expand Down Expand Up @@ -284,7 +339,7 @@ func (s *perOperationSampler) getSamplerForOperation(operation string) trace.Sam
if len(s.samplers) >= s.maxOperations {
return s.defaultSampler
}
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate(), s.options...)
s.samplers[operation] = newSampler
return newSampler
}
Expand All @@ -308,13 +363,14 @@ func (s *perOperationSampler) update(strategies *jaeger_api_v2.PerOperationSampl
sampler := newGuaranteedThroughputProbabilisticSampler(
lowerBound,
samplingRate,
s.options...,
)
newSamplers[operation] = sampler
}
}
s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond
if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability {
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability)
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability, s.options...)
}
s.samplers = newSamplers
}
16 changes: 8 additions & 8 deletions samplers/jaegerremote/sampler_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type samplingStrategyParser interface {
//
// Sampler invokes the updaters while holding a lock on the main sampler.
type samplerUpdater interface {
Update(sampler trace.Sampler, strategy any) (modified trace.Sampler, err error)
Update(sampler trace.Sampler, strategy any, samplerOptions ...samplerOptionFunc) (modified trace.Sampler, err error)
}

// Sampler is a delegating sampler that polls a remote server
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *Sampler) UpdateSampler() {
// NB: this function should only be called while holding a Write lock.
func (s *Sampler) updateSamplerViaUpdaters(strategy any) error {
for _, updater := range s.updaters {
sampler, err := updater.Update(s.sampler, strategy)
sampler, err := updater.Update(s.sampler, strategy, s.samplerOptions...)
if err != nil {
return err
}
Expand All @@ -192,7 +192,7 @@ func (s *Sampler) updateSamplerViaUpdaters(strategy any) error {
type probabilisticSamplerUpdater struct{}

// Update implements Update of samplerUpdater.
func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any, samplerOptions ...samplerOptionFunc) (trace.Sampler, error) {
type response interface {
GetProbabilisticSampling() *jaeger_api_v2.ProbabilisticSamplingStrategy
}
Expand All @@ -205,7 +205,7 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any)
}
return sampler, nil
}
return newProbabilisticSampler(probabilistic.SamplingRate), nil
return newProbabilisticSampler(probabilistic.SamplingRate, samplerOptions...), nil
}
}
return nil, nil
Expand All @@ -217,7 +217,7 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any)
type rateLimitingSamplerUpdater struct{}

// Update implements Update of samplerUpdater.
func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any, samplerOptions ...samplerOptionFunc) (trace.Sampler, error) {
type response interface {
GetRateLimitingSampling() *jaeger_api_v2.RateLimitingSamplingStrategy
}
Expand All @@ -229,7 +229,7 @@ func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (
rl.Update(rateLimit)
return rl, nil
}
return newRateLimitingSampler(rateLimit), nil
return newRateLimitingSampler(rateLimit, samplerOptions...), nil
}
}
return nil, nil
Expand All @@ -245,7 +245,7 @@ type perOperationSamplerUpdater struct {
}

// Update implements Update of samplerUpdater.
func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any, samplerOptions ...samplerOptionFunc) (trace.Sampler, error) {
type response interface {
GetOperationSampling() *jaeger_api_v2.PerOperationSamplingStrategies
}
Expand All @@ -260,7 +260,7 @@ func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any)
MaxOperations: u.MaxOperations,
OperationNameLateBinding: u.OperationNameLateBinding,
Strategies: operations,
}), nil
}, samplerOptions...), nil
}
}
return nil, nil
Expand Down
85 changes: 85 additions & 0 deletions samplers/jaegerremote/sampler_remote_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package jaegerremote_test

import (
"encoding/binary"
"testing"
"time"

jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/contrib/samplers/jaegerremote"
"go.opentelemetry.io/contrib/samplers/jaegerremote/internal/testutils"
)

const (
testDefaultSamplingProbability = 0.5
testMaxID = uint64(1) << 63
)

func TestRemotelyControlledSampler_WithAttributesOn(t *testing.T) {
agent, err := testutils.StartMockAgent()
require.NoError(t, err)

remoteSampler := jaegerremote.New(
"client app",
jaegerremote.WithSamplingServerURL("http://"+agent.SamplingServerAddr()),
jaegerremote.WithSamplingRefreshInterval(time.Minute),
jaegerremote.WithAttributesOn(),
)
remoteSampler.Close() // stop timer-based updates, we want to call them manually
defer agent.Close()

var traceID oteltrace.TraceID
binary.BigEndian.PutUint64(traceID[8:], testMaxID-20)

t.Run("probabilistic", func(t *testing.T) {
agent.AddSamplingStrategy("client app",
&jaeger_api_v2.SamplingStrategyResponse{
StrategyType: jaeger_api_v2.SamplingStrategyType_PROBABILISTIC,
ProbabilisticSampling: &jaeger_api_v2.ProbabilisticSamplingStrategy{
SamplingRate: testDefaultSamplingProbability,
},
})
remoteSampler.UpdateSampler()

result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID})
assert.Equal(t, trace.RecordAndSample, result.Decision)
assert.Equal(t, []attribute.KeyValue{attribute.String("sampler.type", "probabilistic"), attribute.Float64("sampler.param", 0.5)}, result.Attributes)
})

t.Run("ratelimitng", func(t *testing.T) {
agent.AddSamplingStrategy("client app",
&jaeger_api_v2.SamplingStrategyResponse{
StrategyType: jaeger_api_v2.SamplingStrategyType_RATE_LIMITING,
RateLimitingSampling: &jaeger_api_v2.RateLimitingSamplingStrategy{
MaxTracesPerSecond: 1,
},
})
remoteSampler.UpdateSampler()

result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID})
assert.Equal(t, trace.RecordAndSample, result.Decision)
assert.Equal(t, []attribute.KeyValue{attribute.String("sampler.type", "ratelimiting"), attribute.Float64("sampler.param", 1)}, result.Attributes)
})

t.Run("per operation", func(t *testing.T) {
agent.AddSamplingStrategy("client app",
&jaeger_api_v2.SamplingStrategyResponse{OperationSampling: &jaeger_api_v2.PerOperationSamplingStrategies{
DefaultSamplingProbability: testDefaultSamplingProbability,
DefaultLowerBoundTracesPerSecond: 1.0,
}})
remoteSampler.UpdateSampler()

result := remoteSampler.ShouldSample(trace.SamplingParameters{TraceID: traceID})
assert.Equal(t, trace.RecordAndSample, result.Decision)
assert.Equal(t, []attribute.KeyValue{attribute.String("sampler.type", "probabilistic"), attribute.Float64("sampler.param", 0.5)}, result.Attributes)
})
}
Loading
Loading