Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add unmarshaling and validation for `CardinalityLimits` and `SpanLimits` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8043)
- Add unmarshaling and validation for `BatchLogRecordProcessor`, `BatchSpanProcessor`, and `PeriodicMetricReader` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8049)
- Add unmarshaling and validation for `TextMapPropagator` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8052)
- Add `jaeger.sampler.type`/`jaeger.sampler.param` attributes for adaptive sampling support and option `WithAttributesDisabled` in `go.opentelemetry.io/contrib/samplers/jaegerremote`. (#8073)
- Add unmarshaling and validation for `OTLPHttpExporter`, `OTLPGrpcExporter`, `OTLPGrpcMetricExporter` and `OTLPHttpMetricExporter` to v1.0.0 model in `go.opentelemetry.io/contrib/otelconf`. (#8112)

### Changed
Expand Down
2 changes: 1 addition & 1 deletion samplers/jaegerremote/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/jaegertracing/jaeger-idl v0.6.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
)
Expand All @@ -18,7 +19,6 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sys v0.37.0 // indirect
Expand Down
67 changes: 52 additions & 15 deletions samplers/jaegerremote/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"

jaeger_api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
oteltrace "go.opentelemetry.io/otel/trace"

Expand All @@ -34,25 +35,40 @@ const (
defaultMaxOperations = 2000
)

const (
samplerTypeKey = "jaeger.sampler.type"
samplerParamKey = "jaeger.sampler.param"
samplerTypeValueProbabilistic = "probabilistic"
samplerTypeValueRateLimiting = "ratelimiting"
)

// -----------------------

// probabilisticSampler is a sampler that randomly samples a certain percentage
// of traces.
type probabilisticSampler struct {
samplingRate float64
sampler trace.Sampler
samplingRate float64
sampler trace.Sampler
attributes []attribute.KeyValue
attributesDisabled bool
}

// newProbabilisticSampler creates a sampler that randomly samples a certain percentage of traces specified by the
// samplingRate, in the range between 0.0 and 1.0. it utilizes the SDK `trace.TraceIDRatioBased` sampler.
func newProbabilisticSampler(samplingRate float64) *probabilisticSampler {
s := new(probabilisticSampler)
func newProbabilisticSampler(samplingRate float64, attributesDisabled bool) *probabilisticSampler {
s := &probabilisticSampler{
attributesDisabled: attributesDisabled,
}
return s.init(samplingRate)
}

func (s *probabilisticSampler) init(samplingRate float64) *probabilisticSampler {
s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
s.sampler = trace.TraceIDRatioBased(s.samplingRate)
if s.attributesDisabled {
return s
}
s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueProbabilistic), attribute.Float64(samplerParamKey, s.samplingRate)}
return s
}

Expand All @@ -62,7 +78,12 @@ func (s *probabilisticSampler) SamplingRate() float64 {
}

func (s *probabilisticSampler) ShouldSample(p trace.SamplingParameters) trace.SamplingResult {
return s.sampler.ShouldSample(p)
r := s.sampler.ShouldSample(p)
if r.Decision == trace.Drop {
return r
}
r.Attributes = s.attributes
return r
}

// Equal compares with another sampler.
Expand Down Expand Up @@ -95,11 +116,16 @@ func (s *probabilisticSampler) Description() string {
type rateLimitingSampler struct {
maxTracesPerSecond float64
rateLimiter *ratelimiter.RateLimiter
attributes []attribute.KeyValue
attributesDisabled bool
}

// newRateLimitingSampler creates new rateLimitingSampler.
func newRateLimitingSampler(maxTracesPerSecond float64) *rateLimitingSampler {
s := new(rateLimitingSampler)
func newRateLimitingSampler(maxTracesPerSecond float64, attributesDisabled bool) *rateLimitingSampler {
s := &rateLimitingSampler{
attributesDisabled: attributesDisabled,
}

return s.init(maxTracesPerSecond)
}

Expand All @@ -110,6 +136,10 @@ func (s *rateLimitingSampler) init(maxTracesPerSecond float64) *rateLimitingSamp
s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
}
s.maxTracesPerSecond = maxTracesPerSecond
if s.attributesDisabled {
return s
}
s.attributes = []attribute.KeyValue{attribute.String(samplerTypeKey, samplerTypeValueRateLimiting), attribute.Float64(samplerParamKey, s.maxTracesPerSecond)}
return s
}

Expand All @@ -119,6 +149,7 @@ func (s *rateLimitingSampler) ShouldSample(p trace.SamplingParameters) trace.Sam
return trace.SamplingResult{
Decision: trace.RecordAndSample,
Tracestate: psc.TraceState(),
Attributes: s.attributes,
}
}
return trace.SamplingResult{
Expand Down Expand Up @@ -161,20 +192,22 @@ type guaranteedThroughputProbabilisticSampler struct {
lowerBoundSampler *rateLimitingSampler
samplingRate float64
lowerBound float64
attributesDisabled bool
}

func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64) *guaranteedThroughputProbabilisticSampler {
func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float64, attributesDisabled bool) *guaranteedThroughputProbabilisticSampler {
s := &guaranteedThroughputProbabilisticSampler{
lowerBoundSampler: newRateLimitingSampler(lowerBound),
lowerBound: lowerBound,
lowerBoundSampler: newRateLimitingSampler(lowerBound, attributesDisabled),
lowerBound: lowerBound,
attributesDisabled: attributesDisabled,
}
s.setProbabilisticSampler(samplingRate)
return s
}

func (s *guaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {
if s.probabilisticSampler == nil {
s.probabilisticSampler = newProbabilisticSampler(samplingRate)
s.probabilisticSampler = newProbabilisticSampler(samplingRate, s.attributesDisabled)
} else if s.samplingRate != samplingRate {
s.probabilisticSampler.init(samplingRate)
}
Expand Down Expand Up @@ -218,6 +251,7 @@ type perOperationSampler struct {

// see description in perOperationSamplerParams
operationNameLateBinding bool
attributesDisabled bool
}

// perOperationSamplerParams defines parameters when creating perOperationSampler.
Expand All @@ -238,7 +272,7 @@ type perOperationSamplerParams struct {
}

// newPerOperationSampler returns a new perOperationSampler.
func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampler {
func newPerOperationSampler(params perOperationSamplerParams, attributesDisabled bool) *perOperationSampler {
if params.MaxOperations <= 0 {
params.MaxOperations = defaultMaxOperations
}
Expand All @@ -247,15 +281,17 @@ func newPerOperationSampler(params perOperationSamplerParams) *perOperationSampl
sampler := newGuaranteedThroughputProbabilisticSampler(
params.Strategies.DefaultLowerBoundTracesPerSecond,
strategy.ProbabilisticSampling.SamplingRate,
attributesDisabled,
)
samplers[strategy.Operation] = sampler
}
return &perOperationSampler{
samplers: samplers,
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),
defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability, attributesDisabled),
lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,
maxOperations: params.MaxOperations,
operationNameLateBinding: params.OperationNameLateBinding,
attributesDisabled: attributesDisabled,
}
}

Expand Down Expand Up @@ -284,7 +320,7 @@ func (s *perOperationSampler) getSamplerForOperation(operation string) trace.Sam
if len(s.samplers) >= s.maxOperations {
return s.defaultSampler
}
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate(), s.attributesDisabled)
s.samplers[operation] = newSampler
return newSampler
}
Expand All @@ -308,13 +344,14 @@ func (s *perOperationSampler) update(strategies *jaeger_api_v2.PerOperationSampl
sampler := newGuaranteedThroughputProbabilisticSampler(
lowerBound,
samplingRate,
s.attributesDisabled,
)
newSamplers[operation] = sampler
}
}
s.lowerBound = strategies.DefaultLowerBoundTracesPerSecond
if s.defaultSampler.SamplingRate() != strategies.DefaultSamplingProbability {
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability)
s.defaultSampler = newProbabilisticSampler(strategies.DefaultSamplingProbability, s.attributesDisabled)
}
s.samplers = newSamplers
}
19 changes: 12 additions & 7 deletions samplers/jaegerremote/sampler_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,12 @@ func (s *Sampler) updateSamplerViaUpdaters(strategy any) error {
// -----------------------

// probabilisticSamplerUpdater is used by Sampler to parse sampling configuration.
type probabilisticSamplerUpdater struct{}
type probabilisticSamplerUpdater struct {
attributesDisabled bool
}

// Update implements Update of samplerUpdater.
func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (u *probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
type response interface {
GetProbabilisticSampling() *jaeger_api_v2.ProbabilisticSamplingStrategy
}
Expand All @@ -205,7 +207,7 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any)
}
return sampler, nil
}
return newProbabilisticSampler(probabilistic.SamplingRate), nil
return newProbabilisticSampler(probabilistic.SamplingRate, u.attributesDisabled), nil
}
}
return nil, nil
Expand All @@ -214,10 +216,12 @@ func (*probabilisticSamplerUpdater) Update(sampler trace.Sampler, strategy any)
// -----------------------

// rateLimitingSamplerUpdater is used by Sampler to parse sampling configuration.
type rateLimitingSamplerUpdater struct{}
type rateLimitingSamplerUpdater struct {
attributesDisabled bool
}

// Update implements Update of samplerUpdater.
func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
func (u *rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (trace.Sampler, error) {
type response interface {
GetRateLimitingSampling() *jaeger_api_v2.RateLimitingSamplingStrategy
}
Expand All @@ -229,7 +233,7 @@ func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (
rl.Update(rateLimit)
return rl, nil
}
return newRateLimitingSampler(rateLimit), nil
return newRateLimitingSampler(rateLimit, u.attributesDisabled), nil
}
}
return nil, nil
Expand All @@ -242,6 +246,7 @@ func (*rateLimitingSamplerUpdater) Update(sampler trace.Sampler, strategy any) (
type perOperationSamplerUpdater struct {
MaxOperations int
OperationNameLateBinding bool
attributesDisabled bool
}

// Update implements Update of samplerUpdater.
Expand All @@ -260,7 +265,7 @@ func (u *perOperationSamplerUpdater) Update(sampler trace.Sampler, strategy any)
MaxOperations: u.MaxOperations,
OperationNameLateBinding: u.OperationNameLateBinding,
Strategies: operations,
}), nil
}, u.attributesDisabled), nil
}
}
return nil, nil
Expand Down
Loading
Loading