Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add unmarshaling and validation for `CardinalityLimits` and `SpanLimits` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8043)
- Add unmarshaling and validation for `BatchLogRecordProcessor`, `BatchSpanProcessor`, and `PeriodicMetricReader` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8049)
- Add unmarshaling and validation for `TextMapPropagator` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8052)
- Add `jaeger.sampler.type`/`jaeger.sampler.param` attributes for adaptive sampling support and option `WithAttributesDisabled` in `go.opentelemetry.io/contrib/samplers/jaegerremote`. (#8073)
- Add unmarshaling and validation for `OTLPHttpExporter`, `OTLPGrpcExporter`, `OTLPGrpcMetricExporter` and `OTLPHttpMetricExporter` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8112)

### Changed
Expand Down
2 changes: 1 addition & 1 deletion samplers/jaegerremote/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger-idl v0.6.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
)
Expand All @@ -18,7 +19,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
Expand Down
67 changes: 52 additions & 15 deletions samplers/jaegerremote/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"

Expand All @@ -34,25 +35,40 @@ const (
defaultMaxOperations = 2000
)

const (
samplerTypeKey = "jaeger.sampler.type"
samplerParamKey = "jaeger.sampler.param"
samplerTypeValueProbabilistic = "probabilistic"
samplerTypeValueRateLimiting = "ratelimiting"
)

// -----------------------

// probabilisticSampler is a sampler that randomly samples a certain percentage
// of traces.
type probabilisticSampler struct {
samplingRate float64
sampler trace.Sampler
samplingRate float64
sampler trace.Sampler
attributes []attribute.KeyValue
attributesDisabled bool
}

// newProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the
// samplingRate, in the range between 0.0 and 1.0. it utilizes the SDK `trace.TraceIDRatioBased` sampler.
func newProbabilisticSampler(samplingRate float64) *probabilisticSampler {
s := new(probabilisticSampler)
func newProbabilisticSampler(samplingRate float64, attributesDisabled bool) *probabilisticSampler {
s := &probabilisticSampler{
attributesDisabled: attributesDisabled,
}
return s.init(samplingRate)
}

func (s *probabilisticSampler) init(samplingRate float64) *probabilisticSampler {
s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
s.sampler = trace.TraceIDRatioBased(s.samplingRate)
if s.attributesDisabled {
return s
}
s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, s.samplingRate)}
return s
}

Expand All @@ -62,7 +78,12 @@ func (s *probabilisticSampler) SamplingRate() float64 {
}

func (s *probabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
return s.sampler.ShouldSample(p)
r := s.sampler.ShouldSample(p)
if r.Decision == trace.Drop {
return r
}
r.Attributes = s.attributes
return r
}

// Equal compares with another sampler.
Expand Down Expand Up @@ -95,11 +116,16 @@ func (s *probabilisticSampler) Description() string {
type rateLimitingSampler struct {
maxTracesPerSecond float64
rateLimiter *ratelimiter.RateLimiter
attributes []attribute.KeyValue
attributesDisabled bool
}

// newRateLimitingSampler creates new rateLimitingSampler.
func newRateLimitingSampler(maxTracesPerSecond float64) *rateLimitingSampler {
s := new(rateLimitingSampler)
func newRateLimitingSampler(maxTracesPerSecond float64, attributesDisabled bool) *rateLimitingSampler {
s := &rateLimitingSampler{
attributesDisabled: attributesDisabled,
}

return s.init(maxTracesPerSecond)
}

Expand All @@ -110,6 +136,10 @@ func (s *rateLimitingSampler) init(maxTracesPerSecond float64) *rateLimitingSamp
s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
}
s.maxTracesPerSecond = maxTracesPerSecond
if s.attributesDisabled {
return s
}
s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, s.maxTracesPerSecond)}
return s
}

Expand All @@ -119,6 +149,7 @@ func (s *rateLimitingSampler) ShouldSample(p trace.SamplingParameters) trace.Sam
return trace.SamplingResult{
Decision: trace.RecordAndSample,
Tracestate: psc.TraceState(),
Attributes: s.attributes,
}
}
return trace.SamplingResult{
Expand Down Expand Up @@ -161,20 +192,22 @@ type guaranteedThroughputProbabilisticSampler struct {
lowerBoundSampler *rateLimitingSampler
samplingRate float64
lowerBound float64
attributesDisabled bool
}

func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *guaranteedThroughputProbabilisticSampler {
func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64, attributesDisabled bool) *guaranteedThroughputProbabilisticSampler {
s := &guaranteedThroughputProbabilisticSampler{
lowerBoundSampler: newRateLimitingSampler(lowerBound),
lowerBound: lowerBound,
lowerBoundSampler: newRateLimitingSampler(lowerBound, attributesDisabled),
lowerBound: lowerBound,
attributesDisabled: attributesDisabled,
}
s.setProbabilisticSampler(samplingRate)
return s
}

func (s *guaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {
if s.probabilisticSampler == nil {
s.probabilisticSampler = newProbabilisticSampler(samplingRate)
s.probabilisticSampler = newProbabilisticSampler(samplingRate, s.attributesDisabled)
} else if s.samplingRate != samplingRate {
s.probabilisticSampler.init(samplingRate)
}
Expand Down Expand Up @@ -218,6 +251,7 @@ type perOperationSampler struct {

// see description in perOperationSamplerParams
operationNameLateBinding bool
attributesDisabled bool
}

// perOperationSamplerParams defines parameters when creating perOperationSampler.
Expand All @@ -238,7 +272,7 @@ type perOperationSamplerParams struct {
}

// newPerOperationSampler returns a new perOperationSampler.
func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampler {
func newPerOperationSampler(params perOperationSamplerParams, attributesDisabled bool) *perOperationSampler {
if params.MaxOperations <= 0 {
params.MaxOperations = defaultMaxOperations
}
Expand All @@ -247,15 +281,17 @@ func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampl
sampler := newGuaranteedThroughputProbabilisticSampler(
params.Strategies.DefaultLowerBoundTracesPerSecond,
strategy.ProbabilisticSampling.SamplingRate,
attributesDisabled,
)
samplers[strategy.Operation] = sampler
}
return &perOperationSampler{
samplers: samplers,
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability, attributesDisabled),
lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,
maxOperations: params.MaxOperations,
operationNameLateBinding: params.OperationNameLateBinding,
attributesDisabled: attributesDisabled,
}
}

Expand Down Expand Up @@ -284,7 +320,7 @@ func (s *perOperationSampler) getSamplerForOperation(operation string) trace.Sam
if len(s.samplers) >= s.maxOperations {
return s.defaultSampler
}
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate(), s.attributesDisabled)
s.samplers[operation] = newSampler
return newSampler
}
Expand All @@ -308,13 +344,14 @@ func (s *perOperationSampler) update(strategies *jaeger_api_v2.PerOperationSampl
sampler := newGuaranteedThroughputProbabilisticSampler(
lowerBound,
samplingRate,
s.attributesDisabled,
)
newSamplers[operation] = sampler
}
}
s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond
if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability {
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability)
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability, s.attributesDisabled)
}
s.samplers = newSamplers
}
16 changes: 8 additions & 8 deletions samplers/jaegerremote/sampler_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type samplingStrategyParser interface {
//
// Sampler invokes the updaters while holding a lock on the main sampler.
type samplerUpdater interface {
Update(sampler trace.Sampler, strategy any) (modified trace.Sampler, err error)
Update(sampler trace.Sampler, strategy any, attributesDisabled bool) (modified trace.Sampler, err error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not necessary to change this interface to pass attributesDisabled. When individual updaters are created the value of this flag is known and can be captured in the updaters themselves. This way we separate configuration information from the execution information (only sampler and strategy are variable at runtime)

Copy link
Contributor Author

@etilite etilite Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Can I remove withUpdaters option then? Cause it makes harder to properly create updaters and pass attributesDisabled to them. Anyway withUpdaters is not needed for tests now (can be safely removed) and seems to be an old arfifact

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withUpdaters is private, safe to remove if not needed. These APIs used to be public for extensibility.

}

// Sampler is a delegating sampler that polls a remote server
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *Sampler) UpdateSampler() {
// NB: this function should only be called while holding a Write lock.
func (s *Sampler) updateSamplerViaUpdaters(strategy any) error {
for _, updater := range s.updaters {
sampler, err := updater.Update(s.sampler, strategy)
sampler, err := updater.Update(s.sampler, strategy, s.attributesDisabled)
if err != nil {
return err
}
Expand All @@ -192,7 +192,7 @@ func (s *Sampler) updateSamplerViaUpdaters(strategy any) error {
type probabilisticSamplerUpdater struct{}

// Update implements Update of samplerUpdater.
func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any, attributesDisabled bool) (trace.Sampler, error) {
type response interface {
GetProbabilisticSampling() *jaeger_api_v2.ProbabilisticSamplingStrategy
}
Expand All @@ -205,7 +205,7 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any)
}
return sampler, nil
}
return newProbabilisticSampler(probabilistic.SamplingRate), nil
return newProbabilisticSampler(probabilistic.SamplingRate, attributesDisabled), nil
}
}
return nil, nil
Expand All @@ -217,7 +217,7 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any)
type rateLimitingSamplerUpdater struct{}

// Update implements Update of samplerUpdater.
func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any, attributesDisabled bool) (trace.Sampler, error) {
type response interface {
GetRateLimitingSampling() *jaeger_api_v2.RateLimitingSamplingStrategy
}
Expand All @@ -229,7 +229,7 @@ func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (
rl.Update(rateLimit)
return rl, nil
}
return newRateLimitingSampler(rateLimit), nil
return newRateLimitingSampler(rateLimit, attributesDisabled), nil
}
}
return nil, nil
Expand All @@ -245,7 +245,7 @@ type perOperationSamplerUpdater struct {
}

// Update implements Update of samplerUpdater.
func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any, attributesDisabled bool) (trace.Sampler, error) {
type response interface {
GetOperationSampling() *jaeger_api_v2.PerOperationSamplingStrategies
}
Expand All @@ -260,7 +260,7 @@ func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any)
MaxOperations: u.MaxOperations,
OperationNameLateBinding: u.OperationNameLateBinding,
Strategies: operations,
}), nil
}, attributesDisabled), nil
}
}
return nil, nil
Expand Down
Loading
Loading