Skip to content

Commit d2e9e4e

Browse files
committed
sdk/log: add self-observability metric for SimpleProcessor (#7016)
Implements `otel.sdk.processor.log.processed` as defined in the semantic conventions.
1 parent 68841fa commit d2e9e4e

File tree

3 files changed

+280
-2
lines changed

3 files changed

+280
-2
lines changed

sdk/log/internal/x/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ To opt-in, set the environment variable `OTEL_GO_X_SELF_OBSERVABILITY` to `true`
1919
When enabled, the SDK will create the following metrics using the global `MeterProvider`:
2020

2121
- `otel.sdk.log.created`
22+
- `otel.sdk.processor.log.processed`
23+
2224

2325
Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.
2426

sdk/log/simple.go

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,35 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
55

66
import (
77
"context"
8+
"fmt"
89
"sync"
10+
"sync/atomic"
11+
12+
"go.opentelemetry.io/otel"
13+
"go.opentelemetry.io/otel/metric"
14+
"go.opentelemetry.io/otel/sdk"
15+
"go.opentelemetry.io/otel/sdk/log/internal/x"
16+
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
17+
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
918
)
1019

1120
// Compile-time check SimpleProcessor implements Processor.
1221
var _ Processor = (*SimpleProcessor)(nil)
1322

23+
// simpleProcessorInstanceCounter is used to generate unique component names.
24+
var simpleProcessorInstanceCounter atomic.Uint64
25+
1426
// SimpleProcessor is an processor that synchronously exports log records.
1527
//
1628
// Use [NewSimpleProcessor] to create a SimpleProcessor.
1729
type SimpleProcessor struct {
1830
mu sync.Mutex
1931
exporter Exporter
2032

33+
selfObservabilityEnabled bool
34+
processedMetric otelconv.SDKProcessorLogProcessed
35+
componentName string
36+
2137
noCmp [0]func() //nolint: unused // This is indeed used.
2238
}
2339

@@ -30,7 +46,30 @@ type SimpleProcessor struct {
3046
// [NewBatchProcessor] instead. However, there may be exceptions where certain
3147
// [Exporter] implementations perform better with this Processor.
3248
func NewSimpleProcessor(exporter Exporter, _ ...SimpleProcessorOption) *SimpleProcessor {
33-
return &SimpleProcessor{exporter: exporter}
49+
instanceID := simpleProcessorInstanceCounter.Add(1) - 1
50+
s := &SimpleProcessor{
51+
exporter: exporter,
52+
componentName: fmt.Sprintf("%s/%d", string(otelconv.ComponentTypeSimpleLogProcessor), instanceID),
53+
}
54+
s.initSelfObservability()
55+
return s
56+
}
57+
58+
func (s *SimpleProcessor) initSelfObservability() {
59+
if !x.SelfObservability.Enabled() {
60+
return
61+
}
62+
63+
s.selfObservabilityEnabled = true
64+
mp := otel.GetMeterProvider()
65+
m := mp.Meter("go.opentelemetry.io/otel/sdk/log",
66+
metric.WithInstrumentationVersion(sdk.Version()),
67+
metric.WithSchemaURL(semconv.SchemaURL))
68+
69+
var err error
70+
if s.processedMetric, err = otelconv.NewSDKProcessorLogProcessed(m); err != nil {
71+
otel.Handle(err)
72+
}
3473
}
3574

3675
var simpleProcRecordsPool = sync.Pool{
@@ -55,7 +94,22 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
5594
simpleProcRecordsPool.Put(records)
5695
}()
5796

58-
return s.exporter.Export(ctx, *records)
97+
err := s.exporter.Export(ctx, *records)
98+
99+
if s.selfObservabilityEnabled && err != nil {
100+
s.processedMetric.Add(context.Background(), 1,
101+
s.processedMetric.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor),
102+
s.processedMetric.AttrComponentName(s.componentName),
103+
s.processedMetric.AttrErrorType(otelconv.ErrorTypeOther))
104+
}
105+
106+
if s.selfObservabilityEnabled && err == nil {
107+
s.processedMetric.Add(context.Background(), 1,
108+
s.processedMetric.AttrComponentType(otelconv.ComponentTypeSimpleLogProcessor),
109+
s.processedMetric.AttrComponentName(s.componentName))
110+
}
111+
112+
return err
59113
}
60114

61115
// Shutdown shuts down the exporter.

sdk/log/simple_test.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package log_test
55

66
import (
77
"context"
8+
"errors"
89
"io"
910
"strings"
1011
"sync"
@@ -13,7 +14,14 @@ import (
1314
"github.com/stretchr/testify/assert"
1415
"github.com/stretchr/testify/require"
1516

17+
"go.opentelemetry.io/otel"
18+
"go.opentelemetry.io/otel/metric"
19+
"go.opentelemetry.io/otel/metric/noop"
1620
"go.opentelemetry.io/otel/sdk/log"
21+
"go.opentelemetry.io/otel/sdk/log/internal/x"
22+
metricSDK "go.opentelemetry.io/otel/sdk/metric"
23+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
24+
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
1725
)
1826

1927
type exporter struct {
@@ -120,6 +128,220 @@ func TestSimpleProcessorConcurrentSafe(*testing.T) {
120128
wg.Wait()
121129
}
122130

131+
type errorExporter struct {
132+
err error
133+
}
134+
135+
func (e *errorExporter) Export(_ context.Context, _ []log.Record) error {
136+
return e.err
137+
}
138+
139+
func (e *errorExporter) Shutdown(context.Context) error {
140+
return nil
141+
}
142+
143+
func (e *errorExporter) ForceFlush(context.Context) error {
144+
return nil
145+
}
146+
147+
type failingMeterProvider struct {
148+
noop.MeterProvider
149+
}
150+
151+
func (mp *failingMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
152+
return &failingMeter{Meter: noop.NewMeterProvider().Meter(name, opts...)}
153+
}
154+
155+
type failingMeter struct {
156+
metric.Meter
157+
}
158+
159+
func (m *failingMeter) Int64Counter(_ string, _ ...metric.Int64CounterOption) (metric.Int64Counter, error) {
160+
return nil, errors.New("failed to create counter")
161+
}
162+
163+
func TestSimpleProcessorSelfObservability(t *testing.T) {
164+
originalMP := otel.GetMeterProvider()
165+
defer otel.SetMeterProvider(originalMP)
166+
167+
t.Run("self observability disabled", func(t *testing.T) {
168+
t.Setenv(x.SelfObservability.Key(), "")
169+
170+
e := new(exporter)
171+
s := log.NewSimpleProcessor(e)
172+
173+
r := new(log.Record)
174+
r.SetSeverityText("test")
175+
_ = s.OnEmit(context.Background(), r)
176+
177+
require.True(t, e.exportCalled)
178+
assert.Equal(t, []log.Record{*r}, e.records)
179+
})
180+
181+
t.Run("self observability enabled without error", func(t *testing.T) {
182+
t.Setenv(x.SelfObservability.Key(), "true")
183+
184+
reader := metricSDK.NewManualReader()
185+
mp := metricSDK.NewMeterProvider(metricSDK.WithReader(reader))
186+
otel.SetMeterProvider(mp)
187+
188+
e := new(exporter)
189+
s := log.NewSimpleProcessor(e)
190+
191+
r := new(log.Record)
192+
r.SetSeverityText("test")
193+
194+
var err error
195+
err = s.OnEmit(context.Background(), r)
196+
require.NoError(t, err)
197+
198+
err = s.OnEmit(context.Background(), r)
199+
require.NoError(t, err)
200+
201+
err = s.OnEmit(context.Background(), r)
202+
require.NoError(t, err)
203+
204+
rm := metricdata.ResourceMetrics{}
205+
err = reader.Collect(context.Background(), &rm)
206+
require.NoError(t, err)
207+
208+
var processedMetric *metricdata.ScopeMetrics
209+
for _, scopeMetrics := range rm.ScopeMetrics {
210+
for _, m := range scopeMetrics.Metrics {
211+
if m.Name == "otel.sdk.processor.log.processed" {
212+
processedMetric = &scopeMetrics
213+
break
214+
}
215+
}
216+
}
217+
218+
require.NotNil(t, processedMetric)
219+
220+
totalCount, _, hasComponentType, hasComponentName := extractProcessedLogMetrics(processedMetric, false)
221+
222+
assert.Equal(t, int64(3), totalCount)
223+
assert.True(t, hasComponentType)
224+
assert.True(t, hasComponentName)
225+
})
226+
227+
t.Run("self observability enabled with error", func(t *testing.T) {
228+
t.Setenv(x.SelfObservability.Key(), "true")
229+
230+
reader := metricSDK.NewManualReader()
231+
mp := metricSDK.NewMeterProvider(metricSDK.WithReader(reader))
232+
otel.SetMeterProvider(mp)
233+
234+
e := &errorExporter{err: errors.New("export failed")}
235+
s := log.NewSimpleProcessor(e)
236+
237+
r := new(log.Record)
238+
r.SetSeverityText("test")
239+
_ = s.OnEmit(context.Background(), r)
240+
_ = s.OnEmit(context.Background(), r)
241+
242+
rm := metricdata.ResourceMetrics{}
243+
err := reader.Collect(context.Background(), &rm)
244+
require.NoError(t, err)
245+
246+
var processedMetric *metricdata.ScopeMetrics
247+
for _, scopeMetrics := range rm.ScopeMetrics {
248+
for _, m := range scopeMetrics.Metrics {
249+
if m.Name == "otel.sdk.processor.log.processed" {
250+
processedMetric = &scopeMetrics
251+
break
252+
}
253+
}
254+
}
255+
256+
require.NotNil(t, processedMetric)
257+
258+
totalCount, hasErrorType, hasComponentType, hasComponentName := extractProcessedLogMetrics(
259+
processedMetric, true,
260+
)
261+
262+
assert.Equal(t, int64(2), totalCount)
263+
assert.True(t, hasErrorType)
264+
assert.True(t, hasComponentType)
265+
assert.True(t, hasComponentName)
266+
})
267+
268+
t.Run("self observability enabled", func(t *testing.T) {
269+
t.Setenv(x.SelfObservability.Key(), "true")
270+
271+
otel.SetMeterProvider(noop.NewMeterProvider())
272+
273+
e := new(exporter)
274+
s := log.NewSimpleProcessor(e)
275+
276+
r := new(log.Record)
277+
r.SetSeverityText("test")
278+
assert.NotPanics(t, func() {
279+
_ = s.OnEmit(context.Background(), r)
280+
})
281+
282+
require.True(t, e.exportCalled)
283+
assert.Equal(t, []log.Record{*r}, e.records)
284+
})
285+
286+
t.Run("self observability metric creation error handled", func(t *testing.T) {
287+
t.Setenv(x.SelfObservability.Key(), "true")
288+
289+
failingMP := &failingMeterProvider{}
290+
otel.SetMeterProvider(failingMP)
291+
292+
assert.NotPanics(t, func() {
293+
e := new(exporter)
294+
s := log.NewSimpleProcessor(e)
295+
296+
r := new(log.Record)
297+
r.SetSeverityText("test")
298+
_ = s.OnEmit(context.Background(), r)
299+
300+
require.True(t, e.exportCalled)
301+
assert.Equal(t, []log.Record{*r}, e.records)
302+
})
303+
})
304+
}
305+
306+
func extractProcessedLogMetrics(
307+
processedMetric *metricdata.ScopeMetrics,
308+
checkError bool,
309+
) (totalCount int64, hasErrorType, hasComponentType, hasComponentName bool) {
310+
for _, m := range processedMetric.Metrics {
311+
if m.Name != "otel.sdk.processor.log.processed" {
312+
continue
313+
}
314+
315+
data, ok := m.Data.(metricdata.Sum[int64])
316+
if !ok {
317+
continue
318+
}
319+
320+
for _, dataPoint := range data.DataPoints {
321+
totalCount += dataPoint.Value
322+
for _, attr := range dataPoint.Attributes.ToSlice() {
323+
switch attr.Key {
324+
case "error.type":
325+
if checkError && attr.Value.AsString() == string(otelconv.ErrorTypeOther) {
326+
hasErrorType = true
327+
}
328+
case "otel.component.type":
329+
if attr.Value.AsString() == string(otelconv.ComponentTypeSimpleLogProcessor) {
330+
hasComponentType = true
331+
}
332+
case "otel.component.name":
333+
if strings.HasPrefix(attr.Value.AsString(), "simple_log_processor/") {
334+
hasComponentName = true
335+
} else if checkError {
336+
hasComponentName = true
337+
}
338+
}
339+
}
340+
}
341+
}
342+
return totalCount, hasErrorType, hasComponentType, hasComponentName
343+
}
344+
123345
func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
124346
r := new(log.Record)
125347
r.SetSeverityText("test")

0 commit comments

Comments
 (0)