Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ The next release will require at least [Go 1.24].
- The `go.opentelemetry.io/otel/semconv/v1.37.0` package.
The package contains semantic conventions from the `v1.37.0` version of the OpenTelemetry Semantic Conventions.
See the [migration documentation](./semconv/v1.37.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.36.0.`(#7254)
- Add experimental self-observability simple span processor metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7162)

### Changed

Expand Down
1 change: 1 addition & 0 deletions sdk/trace/internal/x/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ When enabled, the SDK will create the following metrics using the global `MeterP

- `otel.sdk.span.live`
- `otel.sdk.span.started`
- `otel.sdk.processor.span.processed` (only for simple span processor)

Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.

Expand Down
78 changes: 76 additions & 2 deletions sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,42 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"context"
"fmt"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
"go.opentelemetry.io/otel/trace"
)

var measureAttrsPool = sync.Pool{
New: func() any {
// "component.name" + "component.type" + "error.type"
const n = 1 + 1 + 1
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}

// simpleSpanProcessor is a SpanProcessor that synchronously sends all
// completed Spans to a trace.Exporter immediately.
type simpleSpanProcessor struct {
exporterMu sync.Mutex
exporter SpanExporter
stopOnce sync.Once

selfObservabilityEnabled bool
componentNameAttr attribute.KeyValue
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
}

var _ SpanProcessor = (*simpleSpanProcessor)(nil)
Expand All @@ -31,13 +55,46 @@ var _ SpanProcessor = (*simpleSpanProcessor)(nil)
// use instead.
func NewSimpleSpanProcessor(exporter SpanExporter) SpanProcessor {
ssp := &simpleSpanProcessor{
exporter: exporter,
exporter: exporter,
selfObservabilityEnabled: x.SelfObservability.Enabled(),
}

if ssp.selfObservabilityEnabled {
ssp.componentNameAttr = semconv.OTelComponentName(
fmt.Sprintf("%s/%d", otelconv.ComponentTypeSimpleSpanProcessor, nextSimpleProcessorID()))

var err error
ssp.spansProcessedCounter, err = newSpanProcessedInst()
if err != nil {
msg := "failed to create self-observability metrics for simple span processor: %w"
err := fmt.Errorf(msg, err)
otel.Handle(err)
}
}

global.Warn("SimpleSpanProcessor is not recommended for production use, consider using BatchSpanProcessor instead.")

return ssp
}

var simpleProcessorIDCounter atomic.Int64

// nextSimpleProcessorID returns an identifier for this simple span processor,
// starting with 0 and incrementing by 1 each time it is called.
func nextSimpleProcessorID() int64 {
return simpleProcessorIDCounter.Add(1) - 1
}

func newSpanProcessedInst() (otelconv.SDKProcessorSpanProcessed, error) {
meter := otel.GetMeterProvider().Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
spansProcessedCounter, err := otelconv.NewSDKProcessorSpanProcessed(meter)
return spansProcessedCounter, err
}

// OnStart does nothing.
func (*simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}

Expand All @@ -47,8 +104,25 @@ func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) {
defer ssp.exporterMu.Unlock()

if ssp.exporter != nil && s.SpanContext().TraceFlags().IsSampled() {
if err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}); err != nil {
attrs := measureAttrsPool.Get().(*[]attribute.KeyValue)
defer func() {
*attrs = (*attrs)[:0] // reset the slice for reuse
measureAttrsPool.Put(attrs)
}()
*attrs = append(*attrs,
ssp.componentNameAttr,
ssp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeSimpleSpanProcessor))

err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s})
if err != nil {
otel.Handle(err)
*attrs = append(*attrs, semconv.ErrorType(err))
}
if ssp.selfObservabilityEnabled {
// Add the span to the context to ensure the metric is recorded
// with the correct span context.
ctx := trace.ContextWithSpanContext(context.Background(), s.SpanContext())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the correct way to add span to context given we have ReadOnlySpan in this method instead of Span?

ssp.spansProcessedCounter.Add(ctx, 1, *attrs...)
}
}
}
Expand Down
160 changes: 160 additions & 0 deletions sdk/trace/simple_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@ package trace
import (
"context"
"errors"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

type simpleTestExporter struct {
Expand All @@ -34,6 +46,17 @@ func (t *simpleTestExporter) Shutdown(ctx context.Context) error {
}
}

var _ SpanExporter = (*failingTestExporter)(nil)

type failingTestExporter struct {
simpleTestExporter
}

func (f *failingTestExporter) ExportSpans(ctx context.Context, spans []ReadOnlySpan) error {
_ = f.simpleTestExporter.ExportSpans(ctx, spans)
return errors.New("failed to export spans")
}

var _ SpanExporter = (*simpleTestExporter)(nil)

func TestNewSimpleSpanProcessor(t *testing.T) {
Expand Down Expand Up @@ -168,3 +191,140 @@ func TestSimpleSpanProcessorShutdownHonorsContextCancel(t *testing.T) {
t.Errorf("SimpleSpanProcessor.Shutdown did not return %v, got %v", want, got)
}
}

func TestSimpleSpanProcessorSelfObservability(t *testing.T) {
tests := []struct {
name string
enabled bool
exporter SpanExporter
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "Disabled",
enabled: false,
exporter: &simpleTestExporter{},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "Enabled",
enabled: true,
exporter: &simpleTestExporter{},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
semconv.OTelComponentName("simple_span_processor/0"),
semconv.OTelComponentTypeKey.String("simple_span_processor"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
{
name: "Enabled, Exporter error",
enabled: true,
exporter: &failingTestExporter{
simpleTestExporter: simpleTestExporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
semconv.OTelComponentName("simple_span_processor/0"),
semconv.OTelComponentTypeKey.String("simple_span_processor"),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", strconv.FormatBool(test.enabled))

original := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(original) })

r := metric.NewManualReader()
mp := metric.NewMeterProvider(
metric.WithReader(r),
metric.WithView(dropSpanMetricsView),
)
otel.SetMeterProvider(mp)

ssp := NewSimpleSpanProcessor(test.exporter)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(ssp)
startSpan(tp, test.name).End()

var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(context.Background(), &rm))
test.assertMetrics(t, rm)
simpleProcessorIDCounter.Store(0) // reset simpleProcessorIDCounter
})
}
}
Loading