Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7512)
- Add experimental observability metrics for manual reader in `go.opentelemetry.io/otel/sdk/metric`. (#7524)
- Add experimental observability metrics for periodic reader in `go.opentelemetry.io/otel/sdk/metric`. (#7571)
- Add experimental observability metrics for simple log processor in `go.opentelemetry.io/otel/sdk/log`. (#7548)

### Fixed

Expand Down
19 changes: 19 additions & 0 deletions sdk/log/internal/observ/simple_log_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -33,6 +34,24 @@ var measureAttrsPool = sync.Pool{
},
}

// simpleProcessorN is a global 0-based count of the number of simple processor created.
var simpleProcessorN atomic.Int64

// NextSimpleProcessorID returns the next unique ID for an simpleProcessor.
func NextSimpleProcessorID() int64 {
const inc = 1
return simpleProcessorN.Add(inc) - inc
}

// SetSimpleProcessorID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetSimpleProcessorID(v int64) int64 {
return simpleProcessorN.Swap(v)
}

// GetSLPComponentName returns the component name attribute for a
// SimpleLogProcessor with the given ID.
func GetSLPComponentName(id int64) attribute.KeyValue {
Expand Down
47 changes: 47 additions & 0 deletions sdk/log/internal/observ/simple_log_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package observ

import (
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -23,6 +24,52 @@ import (
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

func TestNextExporterID(t *testing.T) {
SetSimpleProcessorID(0)

var expected int64
for range 10 {
id := NextSimpleProcessorID()
assert.Equal(t, expected, id)
expected++
}
}

func TestSetExporterID(t *testing.T) {
SetSimpleProcessorID(0)

prev := SetSimpleProcessorID(42)
assert.Equal(t, int64(0), prev)

id := NextSimpleProcessorID()
assert.Equal(t, int64(42), id)
}

func TestNextExporterIDConcurrentSafe(t *testing.T) {
SetSimpleProcessorID(0)

const goroutines = 100
const increments = 10

var wg sync.WaitGroup
wg.Add(goroutines)

for range goroutines {
go func() {
defer wg.Done()
for range increments {
NextSimpleProcessorID()
}
}()
}

wg.Wait()

expected := int64(goroutines * increments)
id := NextSimpleProcessorID()
assert.Equal(t, expected, id)
}

type errMeterProvider struct {
mapi.MeterProvider
err error
Expand Down
24 changes: 20 additions & 4 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
)

// Compile-time check SimpleProcessor implements Processor.
Expand All @@ -17,8 +20,8 @@ var _ Processor = (*SimpleProcessor)(nil)
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter

noCmp [0]func() //nolint: unused // This is indeed used.
inst *observ.SLP
noCmp [0]func() //nolint: unused // This is indeed used.
}

// NewSimpleProcessor is a simple Processor adapter.
Expand All @@ -30,7 +33,15 @@ type SimpleProcessor struct {
// [NewBatchProcessor] instead. However, there may be exceptions where certain
// [Exporter] implementations perform better with this Processor.
func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor {
return &SimpleProcessor{exporter: exporter}
slp := &SimpleProcessor{
exporter: exporter,
}
var err error
slp.inst, err = observ.NewSLP(observ.NextSimpleProcessorID())
if err != nil {
otel.Handle(err)
}
return slp
}

var simpleProcRecordsPool = sync.Pool{
Expand All @@ -41,7 +52,7 @@ var simpleProcRecordsPool = sync.Pool{
}

// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) (err error) {
if s.exporter == nil {
return nil
}
Expand All @@ -55,6 +66,11 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
simpleProcRecordsPool.Put(records)
}()

if s.inst != nil {
defer func() {
s.inst.LogProcessed(ctx, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking that when this part of the logic is triggered, this section should also be reported.

if s.exporter == nil {
	return nil
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flc1125, I don't quite understand this part.
could you explain it in detail?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I want to express is that we should also report this metric regarding this part of the situation.

However, what's a bit special here is that what is returned here is nil, but I'm wondering, when reporting the metric, should we define a custom error to describe the fact that it wasn't successfully exported (something like: simple processor has no exporter configured).

cc @MrAlias, if you have any good suggestions, please feel free to add them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flc1125 about this issue, maybe we could address this in an separate PR, WDYT?

Copy link
Member

@flc1125 flc1125 Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we at least need to support indicators reporting for this scenario; the point of disagreement lies in the issue of err or nil.

if s.exporter == nil {
	return nil
}

Regarding the points of disagreement, I think it's fine to discuss them in another PR or issue.

}()
}
return s.exporter.Export(ctx, *records)
}

Expand Down
191 changes: 191 additions & 0 deletions sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,25 @@ package log_test
import (
"context"
"io"
"strconv"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

type exporter struct {
Expand All @@ -40,6 +51,17 @@ func (e *exporter) ForceFlush(context.Context) error {
return nil
}

var _ log.Exporter = (*failingTestExporter)(nil)

type failingTestExporter struct {
exporter
}

func (f *failingTestExporter) Export(ctx context.Context, r []log.Record) error {
_ = f.exporter.Export(ctx, r)
return assert.AnError
}

func TestSimpleProcessorOnEmit(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
Expand Down Expand Up @@ -138,3 +160,172 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
_ = out
})
}

func BenchmarkSimpleProcessorObservability(b *testing.B) {
run := func(b *testing.B) {
slp := log.NewSimpleProcessor(&failingTestExporter{exporter: exporter{}})
record := new(log.Record)
record.SetSeverityText("test")

ctx := b.Context()
b.ReportAllocs()
b.ResetTimer()

var err error
for b.Loop() {
err = slp.OnEmit(ctx, record)
}
_ = err
}

b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", run)
}

func TestSimpleLogProcessorObservability(t *testing.T) {
testcases := []struct {
name string
enabled bool
exporter log.Exporter
wantErr error
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "disabled",
enabled: false,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "enabled",
enabled: true,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreTimestamp(),
)
},
},
{
name: "Enable Exporter error",
enabled: true,
wantErr: assert.AnError,
exporter: &failingTestExporter{
exporter: exporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]
p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/log/internal/observ",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(tc.enabled))

original := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(original)
})

r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)

slp := log.NewSimpleProcessor(tc.exporter)
record := new(log.Record)
record.SetSeverityText("test")
err := slp.OnEmit(t.Context(), record)
require.ErrorIs(t, err, tc.wantErr)
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
tc.assertMetrics(t, rm)
observ.SetSimpleProcessorID(0)
})
}
}
Loading