Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486)
- Add experimental observability metrics for simple span processor in `go.opentelemetry.io/otel/sdk/trace`. (#7374)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7512)
- Add experimental observability metrics for simple log processor in `go.opentelemetry.io/otel/sdk/log`. (#7548)

### Fixed

Expand Down
31 changes: 31 additions & 0 deletions sdk/log/internal/counter/counter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions sdk/log/internal/counter/counter_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdk/log/internal/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
// Package internal provides internal functionality for the sdk/log package.
package internal // import "go.opentelemetry.io/otel/sdk/log/internal"

//go:generate gotmpl --body=../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/sdk/log/internal/counter\" }" --out=counter/counter.go
//go:generate gotmpl --body=../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go

//go:generate gotmpl --body=../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/sdk/log\" }" --out=x/x.go
//go:generate gotmpl --body=../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go
25 changes: 21 additions & 4 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/log/internal/counter"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
)

// Compile-time check SimpleProcessor implements Processor.
Expand All @@ -17,8 +21,8 @@ var _ Processor = (*SimpleProcessor)(nil)
type SimpleProcessor struct {
mu sync.Mutex
exporter Exporter

noCmp [0]func() //nolint: unused // This is indeed used.
inst *observ.SLP
noCmp [0]func() //nolint: unused // This is indeed used.
}

// NewSimpleProcessor is a simple Processor adapter.
Expand All @@ -30,7 +34,15 @@ type SimpleProcessor struct {
// [NewBatchProcessor] instead. However, there may be exceptions where certain
// [Exporter] implementations perform better with this Processor.
func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor {
return &SimpleProcessor{exporter: exporter}
slp := &SimpleProcessor{
exporter: exporter,
}
var err error
slp.inst, err = observ.NewSLP(counter.NextExporterID())
if err != nil {
otel.Handle(err)
}
return slp
}

var simpleProcRecordsPool = sync.Pool{
Expand All @@ -41,7 +53,7 @@ var simpleProcRecordsPool = sync.Pool{
}

// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) (err error) {
if s.exporter == nil {
return nil
}
Expand All @@ -55,6 +67,11 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
simpleProcRecordsPool.Put(records)
}()

if s.inst != nil {
defer func() {
s.inst.LogProcessed(ctx, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking that when this part of the logic is triggered, this section should also be reported.

if s.exporter == nil {
	return nil
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flc1125, I don't quite understand this part.
could you explain it in detail?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I want to express is that we should also report this metric regarding this part of the situation.

However, what's a bit special here is that what is returned here is nil, but I'm wondering, when reporting the metric, should we define a custom error to describe the fact that it wasn't successfully exported (something like: simple processor has no exporter configured).

cc @MrAlias, if you have any good suggestions, please feel free to add them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flc1125 about this issue, maybe we could address this in an separate PR, WDYT?

Copy link
Member

@flc1125 flc1125 Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we at least need to support indicators reporting for this scenario; the point of disagreement lies in the issue of err or nil.

if s.exporter == nil {
	return nil
}

Regarding the points of disagreement, I think it's fine to discuss them in another PR or issue.

}()
}
return s.exporter.Export(ctx, *records)
}

Expand Down
192 changes: 192 additions & 0 deletions sdk/log/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@ package log_test
import (
"context"
"io"
"strconv"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/log/internal/counter"
"go.opentelemetry.io/otel/sdk/log/internal/observ"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

type exporter struct {
Expand All @@ -40,6 +52,17 @@ func (e *exporter) ForceFlush(context.Context) error {
return nil
}

var _ log.Exporter = (*failingTestExporter)(nil)

type failingTestExporter struct {
exporter
}

func (f *failingTestExporter) Export(ctx context.Context, r []log.Record) error {
_ = f.exporter.Export(ctx, r)
return assert.AnError
}

func TestSimpleProcessorOnEmit(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
Expand Down Expand Up @@ -138,3 +161,172 @@ func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
_ = out
})
}

func BenchmarkSimpleProcessorObservability(b *testing.B) {
run := func(b *testing.B) {
slp := log.NewSimpleProcessor(&failingTestExporter{exporter: exporter{}})
record := new(log.Record)
record.SetSeverityText("test")

ctx := b.Context()
b.ReportAllocs()
b.ResetTimer()

var err error
for b.Loop() {
err = slp.OnEmit(ctx, record)
}
_ = err
}

b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", run)
}

func TestSimpleLogProcessorObservability(t *testing.T) {
testcases := []struct {
name string
enabled bool
exporter log.Exporter
wantErr error
assertMetrics func(t *testing.T, rm metricdata.ResourceMetrics)
}{
{
name: "disabled",
enabled: false,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Empty(t, rm.ScopeMetrics)
},
},
{
name: "enabled",
enabled: true,
exporter: new(exporter),
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]

p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreTimestamp(),
)
},
},
{
name: "Enable Exporter error",
enabled: true,
wantErr: assert.AnError,
exporter: &failingTestExporter{
exporter: exporter{},
},
assertMetrics: func(t *testing.T, rm metricdata.ResourceMetrics) {
assert.Len(t, rm.ScopeMetrics, 1)
sm := rm.ScopeMetrics[0]
p := otelconv.SDKProcessorLogProcessed{}

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: "go.opentelemetry.io/otel/sdk/log/internal/observ",
Version: sdk.Version(),
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: p.Name(),
Description: p.Description(),
Unit: p.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(
observ.GetSLPComponentName(0),
semconv.OTelComponentTypeKey.String(
string(otelconv.ComponentTypeSimpleLogProcessor),
),
semconv.ErrorTypeKey.String("*errors.errorString"),
),
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
}

metricdatatest.AssertEqual(
t,
want,
sm,
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
)
},
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", strconv.FormatBool(tc.enabled))

original := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(original)
})

r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)

slp := log.NewSimpleProcessor(tc.exporter)
record := new(log.Record)
record.SetSeverityText("test")
err := slp.OnEmit(t.Context(), record)
require.ErrorIs(t, err, tc.wantErr)
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
tc.assertMetrics(t, rm)
counter.SetExporterID(0)
})
}
}
Loading