Skip to content
1 change: 1 addition & 0 deletions .codespellignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ nam
valu
thirdparty
addOpt
observ
105 changes: 18 additions & 87 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,14 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -33,8 +27,6 @@ const (
DefaultMaxExportBatchSize = 512
)

var queueFull = otelconv.ErrorTypeAttr("queue_full")

// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)

Expand Down Expand Up @@ -78,10 +70,7 @@ type batchSpanProcessor struct {
queue chan ReadOnlySpan
dropped uint32

selfObservabilityEnabled bool
callbackRegistration metric.Registration
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
componentNameAttr attribute.KeyValue
inst *observ.BSP

batch []ReadOnlySpan
batchMutex sync.Mutex
Expand Down Expand Up @@ -124,19 +113,14 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
stopCh: make(chan struct{}),
}

if x.SelfObservability.Enabled() {
bsp.selfObservabilityEnabled = true
bsp.componentNameAttr = componentName()

var err error
bsp.spansProcessedCounter, bsp.callbackRegistration, err = newBSPObs(
bsp.componentNameAttr,
func() int64 { return int64(len(bsp.queue)) },
int64(bsp.o.MaxQueueSize),
)
if err != nil {
otel.Handle(err)
}
var err error
bsp.inst, err = observ.NewBSP(
nextProcessorID(),
func() int64 { return int64(len(bsp.queue)) },
int64(bsp.o.MaxQueueSize),
)
if err != nil {
otel.Handle(err)
}

bsp.stopWait.Add(1)
Expand All @@ -157,51 +141,6 @@ func nextProcessorID() int64 {
return processorIDCounter.Add(1) - 1
}

func componentName() attribute.KeyValue {
id := nextProcessorID()
name := fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, id)
return semconv.OTelComponentName(name)
}

// newBSPObs creates and returns a new set of metrics instruments and a
// registration for a BatchSpanProcessor. It is the caller's responsibility
// to unregister the registration when it is no longer needed.
func newBSPObs(
cmpnt attribute.KeyValue,
qLen func() int64,
qMax int64,
) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
meter := otel.GetMeterProvider().Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

qCap, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)

qSize, e := otelconv.NewSDKProcessorSpanQueueSize(meter)
err = errors.Join(err, e)

spansProcessed, e := otelconv.NewSDKProcessorSpanProcessed(meter)
err = errors.Join(err, e)

cmpntT := semconv.OTelComponentTypeBatchingSpanProcessor
attrs := metric.WithAttributes(cmpnt, cmpntT)

reg, e := meter.RegisterCallback(
func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(qSize.Inst(), qLen(), attrs)
o.ObserveInt64(qCap.Inst(), qMax, attrs)
return nil
},
qSize.Inst(),
qCap.Inst(),
)
err = errors.Join(err, e)

return spansProcessed, reg, err
}

// OnStart method does nothing.
func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}

Expand Down Expand Up @@ -242,8 +181,8 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
case <-ctx.Done():
err = ctx.Err()
}
if bsp.selfObservabilityEnabled {
err = errors.Join(err, bsp.callbackRegistration.Unregister())
if bsp.inst != nil {
err = errors.Join(err, bsp.inst.Shutdown())
}
})
return err
Expand Down Expand Up @@ -357,10 +296,8 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {

if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, int64(l),
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
if bsp.inst != nil {
bsp.inst.Processed(ctx, int64(l))
}
err := bsp.e.ExportSpans(ctx, bsp.batch)

Expand Down Expand Up @@ -470,11 +407,8 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
case bsp.queue <- sd:
return true
case <-ctx.Done():
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
if bsp.inst != nil {
bsp.inst.ProcessedQueueFull(ctx, 1)
}
return false
}
Expand All @@ -490,11 +424,8 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
if bsp.selfObservabilityEnabled {
bsp.spansProcessedCounter.Add(ctx, 1,
bsp.componentNameAttr,
bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
bsp.spansProcessedCounter.AttrErrorType(queueFull))
if bsp.inst != nil {
bsp.inst.ProcessedQueueFull(ctx, 1)
}
}
return false
Expand Down
59 changes: 43 additions & 16 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import (
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/trace/internal/observ"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
"go.opentelemetry.io/otel/trace"
)

const componentID = 0

type testBatchExporter struct {
mu sync.Mutex
spans []ReadOnlySpan
Expand Down Expand Up @@ -693,6 +696,9 @@ func TestBatchSpanProcessorMetricsDisabled(t *testing.T) {
}

func TestBatchSpanProcessorMetrics(t *testing.T) {
// Reset for deterministic component ID.
processorIDCounter.Store(componentID)

t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
tp := basicTracerProvider(t)
reader := sdkmetric.NewManualReader()
Expand All @@ -710,7 +716,6 @@ func TestBatchSpanProcessorMetrics(t *testing.T) {
WithMaxQueueSize(2),
WithMaxExportBatchSize(2),
)
internalBsp := bsp.(*batchSpanProcessor)
tp.RegisterSpanProcessor(bsp)

tr := tp.Tracer("TestBatchSpanProcessorMetrics")
Expand All @@ -719,15 +724,25 @@ func TestBatchSpanProcessorMetrics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
assert.NoError(t, me.waitForSpans(ctx, 2))
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
assertSelfObsScopeMetrics(t, reader, expectMetrics{
queueCapacity: 2,
queueSize: 0,
successProcessed: 2,
})
// Generate 3 spans. 2 fill the queue, and 1 is dropped because the queue is full.
generateSpan(t, tr, testOption{genNumSpans: 3})
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
assertSelfObsScopeMetrics(t, reader, expectMetrics{
queueCapacity: 2,
queueSize: 2,
queueFullProcessed: 1,
successProcessed: 2,
})
}

func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
// Reset for deterministic component ID.
processorIDCounter.Store(componentID)

t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true")
tp := basicTracerProvider(t)
reader := sdkmetric.NewManualReader()
Expand All @@ -747,7 +762,6 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
WithMaxQueueSize(2),
WithMaxExportBatchSize(2),
)
internalBsp := bsp.(*batchSpanProcessor)
tp.RegisterSpanProcessor(bsp)

tr := tp.Tracer("TestBatchSpanProcessorBlockingMetrics")
Expand All @@ -756,23 +770,33 @@ func TestBatchSpanProcessorBlockingMetrics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
assert.NoError(t, me.waitForSpans(ctx, 2))
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 0, successProcessed: 2})
assertSelfObsScopeMetrics(t, reader, expectMetrics{
queueCapacity: 2,
queueSize: 0,
successProcessed: 2,
})
// Generate 2 spans to fill the queue.
generateSpan(t, tr, testOption{genNumSpans: 2})
go func() {
// Generate a span which blocks because the queue is full.
generateSpan(t, tr, testOption{genNumSpans: 1})
}()
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 2, successProcessed: 2})
assertSelfObsScopeMetrics(t, reader, expectMetrics{
queueCapacity: 2,
queueSize: 2,
successProcessed: 2,
})

// Use ForceFlush to force the span that is blocking on the full queue to be dropped.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
assert.Error(t, tp.ForceFlush(ctx))
assertSelfObsScopeMetrics(t, internalBsp.componentNameAttr, reader,
expectMetrics{queueCapacity: 2, queueSize: 2, queueFullProcessed: 1, successProcessed: 2})
assertSelfObsScopeMetrics(t, reader, expectMetrics{
queueCapacity: 2,
queueSize: 2,
queueFullProcessed: 1,
successProcessed: 2,
})
}

type expectMetrics struct {
Expand All @@ -782,13 +806,16 @@ type expectMetrics struct {
queueFullProcessed int64
}

func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValue, reader sdkmetric.Reader,
func assertSelfObsScopeMetrics(
t *testing.T,
reader sdkmetric.Reader,
expectation expectMetrics,
) {
t.Helper()
gotResourceMetrics := new(metricdata.ResourceMetrics)
assert.NoError(t, reader.Collect(context.Background(), gotResourceMetrics))

componentNameAttr := observ.BSPComponentName(componentID)
baseAttrs := attribute.NewSet(
semconv.OTelComponentTypeBatchingSpanProcessor,
componentNameAttr,
Expand Down Expand Up @@ -832,7 +859,7 @@ func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValu
Attributes: attribute.NewSet(
semconv.OTelComponentTypeBatchingSpanProcessor,
componentNameAttr,
semconv.ErrorTypeKey.String(string(queueFull)),
observ.ErrQueueFull,
),
})
}
Expand All @@ -854,9 +881,9 @@ func assertSelfObsScopeMetrics(t *testing.T, componentNameAttr attribute.KeyValu

wantScopeMetric := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace",
Name: observ.ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
SchemaURL: observ.SchemaURL,
},
Metrics: wantMetrics,
}
Expand Down
Loading
Loading