Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ The next release will require at least [Go 1.24].
- The `go.opentelemetry.io/otel/semconv/v1.36.0` package.
The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions.
See the [migration documentation](./semconv/v1.36.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.34.0.`(#7032)
- Add experimental self-observability span and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393)
- Add experimental self-observability span, simple span processor, and batch span processor metrics in `go.opentelemetry.io/otel/sdk/trace`.
Check the `go.opentelemetry.io/otel/sdk/trace/internal/x` package documentation for more information. (#7027, #6393, #7162)
- Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772)
- Add experimental self-observability log metrics in `go.opentelemetry.io/otel/sdk/log`.
Check the `go.opentelemetry.io/otel/sdk/log/internal/x` package documentation for more information. (#7121)
Expand Down
1 change: 1 addition & 0 deletions sdk/trace/internal/x/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ When enabled, the SDK will create the following metrics using the global `MeterP

- `otel.sdk.span.live`
- `otel.sdk.span.started`
- `otel.sdk.processor.span.processed` (only for simple span processor)

Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.

Expand Down
54 changes: 53 additions & 1 deletion sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"context"
"fmt"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/trace/internal/x"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

// simpleSpanProcessor is a SpanProcessor that synchronously sends all
Expand All @@ -17,6 +25,10 @@ type simpleSpanProcessor struct {
exporterMu sync.Mutex
exporter SpanExporter
stopOnce sync.Once

selfObservabilityEnabled bool
componentNameAttr attribute.KeyValue
spansProcessedCounter otelconv.SDKProcessorSpanProcessed
}

var _ SpanProcessor = (*simpleSpanProcessor)(nil)
Expand All @@ -33,11 +45,42 @@ func NewSimpleSpanProcessor(exporter SpanExporter) SpanProcessor {
ssp := &simpleSpanProcessor{
exporter: exporter,
}
ssp.configureSelfObservability()

global.Warn("SimpleSpanProcessor is not recommended for production use, consider using BatchSpanProcessor instead.")

return ssp
}

var simpleProcessorIDCounter atomic.Int64

// nextSimpleProcessorID returns an identifier for this simple span processor,
// starting with 0 and incrementing by 1 each time it is called.
func nextSimpleProcessorID() int64 {
return simpleProcessorIDCounter.Add(1) - 1
}

// configureSelfObservability configures metrics for the simple span processor.
func (ssp *simpleSpanProcessor) configureSelfObservability() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please flatten this into the span processor creation function or refactor it to not produce side-effects.

Related:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if !x.SelfObservability.Enabled() {
return
}
ssp.selfObservabilityEnabled = true
ssp.componentNameAttr = semconv.OTelComponentName(
fmt.Sprintf("%s/%d", otelconv.ComponentTypeSimpleSpanProcessor, nextSimpleProcessorID()))
meter := otel.GetMeterProvider().Meter(
selfObsScopeName,
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

var err error
ssp.spansProcessedCounter, err = otelconv.NewSDKProcessorSpanProcessed(meter)
if err != nil {
otel.Handle(err)
}
}

// OnStart does nothing.
func (*simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}

Expand All @@ -47,8 +90,17 @@ func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) {
defer ssp.exporterMu.Unlock()

if ssp.exporter != nil && s.SpanContext().TraceFlags().IsSampled() {
if err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s}); err != nil {
attrs := []attribute.KeyValue{
ssp.componentNameAttr,
ssp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeSimpleSpanProcessor),
}
err := ssp.exporter.ExportSpans(context.Background(), []ReadOnlySpan{s})
if err != nil {
otel.Handle(err)
attrs = append(attrs, semconv.ErrorType(err))
}
if ssp.selfObservabilityEnabled {
ssp.spansProcessedCounter.Add(context.Background(), 1, attrs...)
}
}
}
Expand Down
147 changes: 147 additions & 0 deletions sdk/trace/simple_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@ package trace
import (
"context"
"errors"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

type simpleTestExporter struct {
Expand All @@ -34,6 +46,17 @@ func (t *simpleTestExporter) Shutdown(ctx context.Context) error {
}
}

var _ SpanExporter = (*failingTestExporter)(nil)

type failingTestExporter struct {
simpleTestExporter
}

func (f *failingTestExporter) ExportSpans(ctx context.Context, spans []ReadOnlySpan) error {
_ = f.simpleTestExporter.ExportSpans(ctx, spans)
return errors.New("failed to export spans")
}

var _ SpanExporter = (*simpleTestExporter)(nil)

func TestNewSimpleSpanProcessor(t *testing.T) {
Expand Down Expand Up @@ -168,3 +191,127 @@ func TestSimpleSpanProcessorShutdownHonorsContextCancel(t *testing.T) {
t.Errorf("SimpleSpanProcessor.Shutdown did not return %v, got %v", want, got)
}
}

func TestSimpleSpanProcessorSelfObservability(t *testing.T) {
tests := []struct {
name string
enabled bool
exporter SpanExporter
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "Disabled",
enabled: false,
exporter: &simpleTestExporter{},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "Enabled",
enabled: true,
exporter: &simpleTestExporter{},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
semconv.OTelComponentName("simple_span_processor/0"),
semconv.OTelComponentTypeKey.String("simple_span_processor"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(t, want, sm, metricdatatest.IgnoreTimestamp())
},
},
{
name: "Enabled, Exporter error",
enabled: true,
exporter: &failingTestExporter{
simpleTestExporter: simpleTestExporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/trace",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKProcessorSpanProcessed{}.Name(),
Description: otelconv.SDKProcessorSpanProcessed{}.Description(),
Unit: otelconv.SDKProcessorSpanProcessed{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
semconv.OTelComponentName("simple_span_processor/1"),
semconv.OTelComponentTypeKey.String("simple_span_processor"),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(t, want, sm, metricdatatest.IgnoreTimestamp())
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", strconv.FormatBool(test.enabled))

original := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(original) })

r := metric.NewManualReader()
mp := metric.NewMeterProvider(
metric.WithReader(r),
metric.WithView(dropSpanMetricsView),
)
otel.SetMeterProvider(mp)

ssp := NewSimpleSpanProcessor(test.exporter)
tp := basicTracerProvider(t)
tp.RegisterSpanProcessor(ssp)
startSpan(tp, test.name).End()

var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(context.Background(), &rm))
test.assertMetrics(t, rm)
})
}
}
Loading