diff --git a/.lycheeignore b/.lycheeignore index 5328505888d..f08a6374e3e 100644 --- a/.lycheeignore +++ b/.lycheeignore @@ -1,4 +1,5 @@ http://localhost +https://localhost http://jaeger-collector https://github.com/open-telemetry/opentelemetry-go/milestone/ https://github.com/open-telemetry/opentelemetry-go/projects diff --git a/CHANGELOG.md b/CHANGELOG.md index bebac401022..cfc468f46e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ The next release will require at least [Go 1.24]. - Add experimental self-observability trace exporter metrics in `go.opentelemetry.io/otel/exporters/stdout/stdouttrace`. Check the `go.opentelemetry.io/otel/exporters/stdout/stdouttrace/internal/x` package documentation for more information. (#7133) - Support testing of [Go 1.25]. (#7187) +- Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` (#7120) ### Changed diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/doc.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/doc.go index dcd8de5df4e..62b05626fc2 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/doc.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/doc.go @@ -77,6 +77,9 @@ default aggregation to use for histogram instruments. Supported values: The configuration can be overridden by [WithAggregationSelector] option. +See [go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x] for information about +the experimental features. + [W3C Baggage HTTP Header Content Format]: https://www.w3.org/TR/baggage/#header-content [Explicit Bucket Histogram Aggregation]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.26.0/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation [Base2 Exponential Bucket Histogram Aggregation]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.26.0/specification/metrics/sdk.md#base2-exponential-bucket-histogram-aggregation diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go index 35cdf466127..3da65f980a4 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter.go @@ -12,10 +12,12 @@ import ( metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/transform" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" ) // Exporter is a OpenTelemetry metric Exporter using gRPC. @@ -31,6 +33,9 @@ type Exporter struct { aggregationSelector metric.AggregationSelector shutdownOnce sync.Once + + // Self-observability metrics + metrics *selfobservability.ExporterMetrics } func newExporter(c *client, cfg oconf.Config) (*Exporter, error) { @@ -46,11 +51,20 @@ func newExporter(c *client, cfg oconf.Config) (*Exporter, error) { as = metric.DefaultAggregationSelector } + // Extract server address and port from endpoint for self-observability + serverAddress, serverPort := selfobservability.ParseEndpoint(cfg.Metrics.Endpoint) + return &Exporter{ client: c, temporalitySelector: ts, aggregationSelector: as, + + metrics: selfobservability.NewExporterMetrics( + string(otelconv.ComponentTypeOtlpGRPCMetricExporter), + serverAddress, + serverPort, + ), }, nil } @@ -68,19 +82,24 @@ func (e *Exporter) Aggregation(k metric.InstrumentKind) metric.Aggregation { // // This method returns an error if called after Shutdown. // This method returns an error if the method is canceled by the passed context. -func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { +func (e *Exporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) (finalErr error) { defer global.Debug("OTLP/gRPC exporter export", "Data", rm) + // Track export operation for self-observability + finishTracking := e.metrics.TrackExport(ctx, rm) + defer func() { finishTracking(finalErr) }() + otlpRm, err := transform.ResourceMetrics(rm) // Best effort upload of transformable metrics. e.clientMu.Lock() upErr := e.client.UploadMetrics(ctx, otlpRm) e.clientMu.Unlock() + + // Return the appropriate error if upErr != nil { if err == nil { return fmt.Errorf("failed to upload metrics: %w", upErr) } - // Merge the two errors. return fmt.Errorf("failed to upload incomplete metrics (%w): %w", err, upErr) } return err diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go index 3039fc63b17..581c93f70a1 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/exporter_test.go @@ -62,8 +62,8 @@ func TestExporterClientConcurrentSafe(t *testing.T) { } someWork.Wait() - assert.NoError(t, exp.Shutdown(ctx)) - assert.ErrorIs(t, exp.Shutdown(ctx), errShutdown) + require.NoError(t, exp.Shutdown(ctx)) + require.ErrorIs(t, exp.Shutdown(ctx), errShutdown) close(done) wg.Wait() diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod index 485cc023030..4d7f243f48b 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod @@ -9,6 +9,7 @@ require ( github.com/google/go-cmp v0.7.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/metric v1.37.0 go.opentelemetry.io/otel/sdk v1.37.0 go.opentelemetry.io/otel/sdk/metric v1.37.0 go.opentelemetry.io/proto/otlp v1.7.1 @@ -25,7 +26,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect golang.org/x/net v0.43.0 // indirect golang.org/x/sys v0.35.0 // indirect diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability/selfobservability.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability/selfobservability.go new file mode 100644 index 00000000000..1ee02758b53 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability/selfobservability.go @@ -0,0 +1,188 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package selfobservability provides self-observability metrics for OTLP metric exporters. +// This is an experimental feature controlled by the x.SelfObservability feature flag. +package selfobservability // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability" + +import ( + "context" + "fmt" + "net/url" + "strconv" + "strings" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x" +) + +// exporterIDCounter is used to generate unique component names for exporters. +var exporterIDCounter atomic.Uint64 + +// nextExporterID returns the next unique exporter ID. +func nextExporterID() uint64 { + return exporterIDCounter.Add(1) - 1 +} + +// ExporterMetrics holds the self-observability metric instruments for an OTLP metric exporter. +type ExporterMetrics struct { + exported otelconv.SDKExporterMetricDataPointExported + inflight otelconv.SDKExporterMetricDataPointInflight + duration otelconv.SDKExporterOperationDuration + attrs []attribute.KeyValue + enabled bool +} + +// NewExporterMetrics creates a new ExporterMetrics instance. +// If self-observability is disabled, returns a no-op instance. +func NewExporterMetrics(componentType, serverAddress string, serverPort int) *ExporterMetrics { + em := &ExporterMetrics{ + enabled: x.SelfObservability.Enabled(), + } + + if !em.enabled { + return em + } + + meter := otel.GetMeterProvider().Meter( + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc", + metric.WithInstrumentationVersion(sdk.Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + + var err error + em.exported, err = otelconv.NewSDKExporterMetricDataPointExported(meter) + if err != nil { + em.enabled = false + return em + } + + em.inflight, err = otelconv.NewSDKExporterMetricDataPointInflight(meter) + if err != nil { + em.enabled = false + return em + } + + em.duration, err = otelconv.NewSDKExporterOperationDuration(meter) + if err != nil { + em.enabled = false + return em + } + + // Set up common attributes + componentName := fmt.Sprintf("%s/%d", componentType, nextExporterID()) + em.attrs = []attribute.KeyValue{ + semconv.OTelComponentTypeKey.String(componentType), + semconv.OTelComponentName(componentName), + semconv.ServerAddress(serverAddress), + semconv.ServerPort(serverPort), + } + + return em +} + +// TrackExport tracks an export operation and returns a function to complete the tracking. +// The returned function should be called when the export operation completes. +func (em *ExporterMetrics) TrackExport(ctx context.Context, rm *metricdata.ResourceMetrics) func(error) { + if !em.enabled { + return func(error) {} + } + + dataPointCount := countDataPoints(rm) + startTime := time.Now() + + em.inflight.Add(ctx, dataPointCount, em.attrs...) + + return func(err error) { + em.inflight.Add(ctx, -dataPointCount, em.attrs...) + + duration := time.Since(startTime).Seconds() + + attrs := make([]attribute.KeyValue, len(em.attrs), len(em.attrs)+1) + copy(attrs, em.attrs) + if err != nil { + attrs = append(attrs, semconv.ErrorTypeOther) + } + em.duration.Inst().Record(ctx, duration, metric.WithAttributes(attrs...)) + + if err == nil { + em.exported.Add(ctx, dataPointCount, em.attrs...) + } + } +} + +// countDataPoints counts the total number of data points in a ResourceMetrics. +func countDataPoints(rm *metricdata.ResourceMetrics) int64 { + if rm == nil { + return 0 + } + + var total int64 + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + switch data := m.Data.(type) { + case metricdata.Gauge[int64]: + total += int64(len(data.DataPoints)) + case metricdata.Gauge[float64]: + total += int64(len(data.DataPoints)) + case metricdata.Sum[int64]: + total += int64(len(data.DataPoints)) + case metricdata.Sum[float64]: + total += int64(len(data.DataPoints)) + case metricdata.Histogram[int64]: + total += int64(len(data.DataPoints)) + case metricdata.Histogram[float64]: + total += int64(len(data.DataPoints)) + case metricdata.ExponentialHistogram[int64]: + total += int64(len(data.DataPoints)) + case metricdata.ExponentialHistogram[float64]: + total += int64(len(data.DataPoints)) + case metricdata.Summary: + total += int64(len(data.DataPoints)) + } + } + } + return total +} + +// ParseEndpoint extracts server address and port from an endpoint URL. +// Returns defaults if parsing fails or endpoint is empty. +func ParseEndpoint(endpoint string) (address string, port int) { + address = "localhost" + port = 4317 + + if endpoint == "" { + return + } + + // Handle endpoint without scheme + if !strings.Contains(endpoint, "://") { + endpoint = "http://" + endpoint + } + + u, err := url.Parse(endpoint) + if err != nil { + return + } + + if u.Hostname() != "" { + address = u.Hostname() + } + + if u.Port() != "" { + if p, err := strconv.Atoi(u.Port()); err == nil { + port = p + } + } + + return +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability/selfobservability_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability/selfobservability_test.go new file mode 100644 index 00000000000..f7a06abc303 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/selfobservability/selfobservability_test.go @@ -0,0 +1,517 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package selfobservability + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x" +) + +func TestNewExporterMetrics_Disabled(t *testing.T) { + // Ensure feature is disabled + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "false") + + em := NewExporterMetrics("test_component", "localhost", 4317) + + assert.False(t, em.enabled, "metrics should be disabled when feature flag is false") + + // Tracking should be no-op when disabled + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + finish := em.TrackExport(context.Background(), createTestResourceMetrics()) + finish(nil) + finish(errors.New("test error")) + + // Verify no metrics were recorded when disabled + rm := &metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), rm) + require.NoError(t, err, "failed to collect metrics") + + totalMetrics := 0 + for _, sm := range rm.ScopeMetrics { + totalMetrics += len(sm.Metrics) + } + assert.Zero(t, totalMetrics, "expected no metrics when disabled") +} + +func TestNewExporterMetrics_Enabled(t *testing.T) { + // Enable feature + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + // Set up a test meter provider + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + em := NewExporterMetrics("test_component", "example.com", 4317) + + assert.True(t, em.enabled, "metrics should be enabled when feature flag is true") + + // Verify attributes are set correctly + expectedAttrs := []attribute.KeyValue{ + semconv.OTelComponentTypeKey.String("test_component"), + semconv.OTelComponentName("test_component/0"), + semconv.ServerAddress("example.com"), + semconv.ServerPort(4317), + } + + assert.Len(t, em.attrs, len(expectedAttrs), "attributes length mismatch") + assert.Equal(t, expectedAttrs, em.attrs, "attributes should match expected values") +} + +func TestNewExporterMetrics_MeterFailure(t *testing.T) { + // Enable feature + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + // Use a meter provider that will cause metric creation to work + // but test the error handling paths by using nil meter in the semantic convention + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + // This test primarily covers the enabled path, but the error handling + // is covered by the semantic convention's internal nil checks + em := NewExporterMetrics("test_component", "example.com", 4317) + + // Should be enabled with valid meter provider + assert.True(t, em.enabled, "metrics should be enabled when meter provider is valid") + + // Test that tracking works properly + finish := em.TrackExport(context.Background(), createTestResourceMetrics()) + finish(nil) + finish(errors.New("test error")) + + rm := &metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), rm) + require.NoError(t, err, "failed to collect metrics") + + // Verify metrics were recorded + totalMetrics := 0 + for _, sm := range rm.ScopeMetrics { + if sm.Scope.Name == "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" { + totalMetrics += len(sm.Metrics) + } + } + assert.Positive(t, totalMetrics, "expected self-observability metrics to be recorded when enabled") +} + +func TestTrackExport_Success(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + em := NewExporterMetrics("test_component", "localhost", 4317) + rm := createTestResourceMetrics() + + // Track export operation + finish := em.TrackExport(context.Background(), rm) + time.Sleep(10 * time.Millisecond) // Small delay to measure duration + finish(nil) // Success + + var got metricdata.ResourceMetrics + err := reader.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + + actualComponentName := extractComponentName(got.ScopeMetrics[0]) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterMetricDataPointExported{}.Name(), + Description: otelconv.SDKExporterMetricDataPointExported{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("test_component"), + semconv.ServerAddress("localhost"), + semconv.ServerPort(4317), + ), + Value: 10, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("test_component"), + semconv.ServerAddress("localhost"), + semconv.ServerPort(4317), + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("test_component"), + semconv.ServerAddress("localhost"), + semconv.ServerPort(4317), + ), + Count: 1, + }, + }, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue()) +} + +func TestTrackExport_Error(t *testing.T) { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + em := NewExporterMetrics("test_component", "localhost", 4317) + rm := createTestResourceMetrics() + + // Track export operation that fails + finish := em.TrackExport(context.Background(), rm) + finish(errors.New("export failed")) + + var got metricdata.ResourceMetrics + err := reader.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + + actualComponentName := extractComponentName(got.ScopeMetrics[0]) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("test_component"), + semconv.ServerAddress("localhost"), + semconv.ServerPort(4317), + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.ErrorTypeOther, + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("test_component"), + semconv.ServerAddress("localhost"), + semconv.ServerPort(4317), + ), + Count: 1, + }, + }, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue()) +} + +func TestCountDataPoints(t *testing.T) { + tests := []struct { + name string + rm *metricdata.ResourceMetrics + expected int64 + }{ + { + name: "nil resource metrics", + rm: nil, + expected: 0, + }, + { + name: "empty resource metrics", + rm: &metricdata.ResourceMetrics{}, + expected: 0, + }, + { + name: "test data", + rm: createTestResourceMetrics(), + expected: 10, // 2 gauge + 1 gauge + 1 sum + 1 sum + 1 histogram + 1 histogram + 1 exponential histogram + 1 exponential histogram + 1 summary + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + count := countDataPoints(tt.rm) + assert.Equal(t, tt.expected, count, "data points count mismatch") + }) + } +} + +func TestParseEndpoint(t *testing.T) { + tests := []struct { + name string + endpoint string + wantAddress string + wantPort int + }{ + { + name: "empty endpoint", + endpoint: "", + wantAddress: "localhost", + wantPort: 4317, + }, + { + name: "host only", + endpoint: "example.com", + wantAddress: "example.com", + wantPort: 4317, + }, + { + name: "host with port", + endpoint: "example.com:9090", + wantAddress: "example.com", + wantPort: 9090, + }, + { + name: "full URL", + endpoint: "https://example.com:9090/v1/metrics", + wantAddress: "example.com", + wantPort: 9090, + }, + { + name: "invalid URL", + endpoint: "://invalid", + wantAddress: "localhost", + wantPort: 4317, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + address, port := ParseEndpoint(tt.endpoint) + assert.Equal(t, tt.wantAddress, address, "address mismatch") + assert.Equal(t, tt.wantPort, port, "port mismatch") + }) + } +} + +func TestIsSelfObservabilityEnabled(t *testing.T) { + tests := []struct { + name string + envValue string + want bool + }{ + {"unset", "", false}, + {"false", "false", false}, + {"true lowercase", "true", true}, + {"true uppercase", "TRUE", true}, + {"true mixed case", "True", true}, + {"invalid", "invalid", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envValue != "" { + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", tt.envValue) + } + + got := x.SelfObservability.Enabled() + assert.Equal(t, tt.want, got, "self-observability enabled state mismatch") + }) + } +} + +// extractComponentName extracts the component name from metrics data to handle dynamic counter. +func extractComponentName(scopeMetrics metricdata.ScopeMetrics) string { + for _, m := range scopeMetrics.Metrics { + switch data := m.Data.(type) { + case metricdata.Sum[int64]: + if len(data.DataPoints) > 0 { + attrs := data.DataPoints[0].Attributes.ToSlice() + for _, attr := range attrs { + if attr.Key == semconv.OTelComponentNameKey { + return attr.Value.AsString() + } + } + } + case metricdata.Histogram[float64]: + if len(data.DataPoints) > 0 { + attrs := data.DataPoints[0].Attributes.ToSlice() + for _, attr := range attrs { + if attr.Key == semconv.OTelComponentNameKey { + return attr.Value.AsString() + } + } + } + } + } + return "" +} + +// createTestResourceMetrics creates sample data for testing. +func createTestResourceMetrics() *metricdata.ResourceMetrics { + now := time.Now() + return &metricdata.ResourceMetrics{ + Resource: resource.Default(), + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{Name: "test", Version: "v1"}, + Metrics: []metricdata.Metrics{ + { + Name: "test_gauge_int", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 1, Time: now}, + {Value: 2, Time: now}, + }, + }, + }, + { + Name: "test_gauge_float", + Data: metricdata.Gauge[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + {Value: 1.5, Time: now}, + }, + }, + }, + { + Name: "test_sum_int", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 10, Time: now}, + }, + }, + }, + { + Name: "test_sum_float", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[float64]{ + {Value: 3.5, Time: now}, + }, + }, + }, + { + Name: "test_histogram_int", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + {Count: 5, Sum: 15, Time: now}, + }, + }, + }, + { + Name: "test_histogram_float", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Count: 10, Sum: 5.0, Time: now}, + }, + }, + }, + { + Name: "test_exponential_histogram_int", + Data: metricdata.ExponentialHistogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[int64]{ + {Count: 3, Sum: 9, Time: now, Scale: 1}, + }, + }, + }, + { + Name: "test_exponential_histogram_float", + Data: metricdata.ExponentialHistogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[float64]{ + {Count: 2, Sum: 4.5, Time: now, Scale: 1}, + }, + }, + }, + { + Name: "test_summary", + Data: metricdata.Summary{ + DataPoints: []metricdata.SummaryDataPoint{ + {Count: 7, Sum: 21.0, Time: now}, + }, + }, + }, + }, + }, + }, + } +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/README.md b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/README.md new file mode 100644 index 00000000000..bb43010edbf --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/README.md @@ -0,0 +1,55 @@ +# Experimental Features + +The OTLP gRPC metric exporter contains features that have not yet stabilized in the OpenTelemetry specification. +These features are added to the OpenTelemetry Go OTLP exporters prior to stabilization in the specification so that users can start experimenting with them and provide feedback. + +These features may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Self-Observability](#self-observability) + +### Self-Observability + +The OTLP gRPC metric exporter can emit self-observability metrics to track its own operation. + +This experimental feature can be enabled by setting the `OTEL_GO_X_SELF_OBSERVABILITY` environment variable. +The value must be the case-insensitive string of `"true"` to enable the feature. +All other values are ignored. + +When enabled, the exporter will emit the following metrics using the global MeterProvider: + +- `otel.sdk.exporter.metric_data_point.exported`: Counter tracking successfully exported data points +- `otel.sdk.exporter.metric_data_point.inflight`: UpDownCounter tracking data points currently being exported +- `otel.sdk.exporter.operation.duration`: Histogram tracking export operation duration in seconds + +All metrics include attributes identifying the exporter component and destination server: + +- `otel.component.type`: Type of component (e.g., "otlp_grpc_metric_exporter") +- `otel.component.name`: Unique component instance name (e.g., "otlp_grpc_metric_exporter/0") +- `server.address`: Server hostname or address +- `server.port`: Server port number + +#### Examples + +Enable self-observability metrics. + +```console +export OTEL_GO_X_SELF_OBSERVABILITY=true +``` + +Disable self-observability metrics. + +```console +unset OTEL_GO_X_SELF_OBSERVABILITY +``` + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. +There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version. +If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support. diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/x.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/x.go new file mode 100644 index 00000000000..04d77542308 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/x.go @@ -0,0 +1,66 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x contains support for OTLP gRPC metric exporter experimental features. +// +// This package should only be used for features defined in the specification. +// It should not be used for experiments or new project ideas. +package x // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x" + +import ( + "os" + "strings" +) + +// SelfObservability is an experimental feature flag that defines if OTLP +// gRPC metric exporter should include self-observability metrics. +// +// To enable this feature set the OTEL_GO_X_SELF_OBSERVABILITY environment variable +// to the case-insensitive string value of "true" (i.e. "True" and "TRUE" +// will also enable this). +var SelfObservability = newFeature("SELF_OBSERVABILITY", func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false +}) + +// Feature is an experimental feature control flag. It provides a uniform way +// to interact with these feature flags and parse their values. +type Feature[T any] struct { + key string + parse func(v string) (T, bool) +} + +func newFeature[T any](suffix string, parse func(string) (T, bool)) Feature[T] { + const envKeyRoot = "OTEL_GO_X_" + return Feature[T]{ + key: envKeyRoot + suffix, + parse: parse, + } +} + +// Key returns the environment variable key that needs to be set to enable the +// feature. +func (f Feature[T]) Key() string { return f.key } + +// Lookup returns the user configured value for the feature and true if the +// user has enabled the feature. Otherwise, if the feature is not enabled, a +// zero-value and false are returned. +func (f Feature[T]) Lookup() (v T, ok bool) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value + // + // > The SDK MUST interpret an empty value of an environment variable the + // > same way as when the variable is unset. + vRaw := os.Getenv(f.key) + if vRaw == "" { + return v, ok + } + return f.parse(vRaw) +} + +// Enabled reports whether the feature is enabled. +func (f Feature[T]) Enabled() bool { + _, ok := f.Lookup() + return ok +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/x_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/x_test.go new file mode 100644 index 00000000000..04811887a79 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/x/x_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSelfObservability(t *testing.T) { + const key = "OTEL_GO_X_SELF_OBSERVABILITY" + require.Equal(t, key, SelfObservability.Key()) + + t.Run("true", run(setenv("true"), assertEnabled(SelfObservability, "true"))) + t.Run("True", run(setenv("True"), assertEnabled(SelfObservability, "True"))) + t.Run("TRUE", run(setenv("TRUE"), assertEnabled(SelfObservability, "TRUE"))) + t.Run("false", run(setenv("false"), assertDisabled(SelfObservability))) + t.Run("1", run(setenv("1"), assertDisabled(SelfObservability))) + t.Run("empty", run(assertDisabled(SelfObservability))) +} + +func run(steps ...func(*testing.T)) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + for _, step := range steps { + step(t) + } + } +} + +func setenv(v string) func(t *testing.T) { + return func(t *testing.T) { t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", v) } +} + +func assertEnabled[T any](f Feature[T], want T) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + assert.True(t, f.Enabled(), "not enabled") + + v, ok := f.Lookup() + assert.True(t, ok, "Lookup state") + assert.Equal(t, want, v, "Lookup value") + } +} + +func assertDisabled[T any](f Feature[T]) func(*testing.T) { + var zero T + return func(t *testing.T) { + t.Helper() + + assert.False(t, f.Enabled(), "enabled") + + v, ok := f.Lookup() + assert.False(t, ok, "Lookup state") + assert.Equal(t, zero, v, "Lookup value") + } +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/selfobservability_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/selfobservability_test.go new file mode 100644 index 00000000000..9b76ed993a9 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/selfobservability_test.go @@ -0,0 +1,441 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlpmetricgrpc + +import ( + "context" + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest" + "go.opentelemetry.io/otel/sdk" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.36.0" + "go.opentelemetry.io/otel/semconv/v1.36.0/otelconv" +) + +func TestSelfObservability_Disabled(t *testing.T) { + // Ensure self-observability is disabled + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "false") + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + coll, err := otest.NewGRPCCollector("", nil) + require.NoError(t, err) + defer coll.Shutdown() + + exp, err := New(context.Background(), + WithEndpoint(coll.Addr().String()), + WithInsecure()) + require.NoError(t, err) + + rm := createTestResourceMetrics() + err = exp.Export(context.Background(), rm) + require.NoError(t, err) + + // Verify that no self-observability metrics are reported + selfObsMetrics := &metricdata.ResourceMetrics{} + err = reader.Collect(context.Background(), selfObsMetrics) + require.NoError(t, err) + + // Check that no self-observability metrics exist + selfObsMetricCount := 0 + for _, sm := range selfObsMetrics.ScopeMetrics { + if sm.Scope.Name == "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" { + selfObsMetricCount += len(sm.Metrics) + } + } + assert.Equal(t, 0, selfObsMetricCount, "expected no self-observability metrics when disabled") +} + +func TestSelfObservability_Enabled(t *testing.T) { + // Enable self-observability + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + coll, err := otest.NewGRPCCollector("", nil) + require.NoError(t, err) + defer coll.Shutdown() + + exp, err := New(context.Background(), + WithEndpoint(coll.Addr().String()), + WithInsecure()) + require.NoError(t, err) + + rm := createTestResourceMetrics() + err = exp.Export(context.Background(), rm) + require.NoError(t, err) + + var got metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + + serverAddr, serverPort := parseEndpoint(coll.Addr().String()) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterMetricDataPointExported{}.Name(), + Description: otelconv.SDKExporterMetricDataPointExported{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName("otlp_grpc_metric_exporter/0"), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddressKey.String(serverAddr), + semconv.ServerPortKey.Int(serverPort), + ), + Value: 4, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName("otlp_grpc_metric_exporter/0"), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddressKey.String(serverAddr), + semconv.ServerPortKey.Int(serverPort), + ), + Value: 0, + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName("otlp_grpc_metric_exporter/0"), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddressKey.String(serverAddr), + semconv.ServerPortKey.Int(serverPort), + ), + Count: 1, + }, + }, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue()) +} + +func TestSelfObservability_ExportError(t *testing.T) { + // Enable self-observability + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + // Create exporter with invalid endpoint to force error + exp, err := New(context.Background(), + WithEndpoint("invalid:999999"), + WithInsecure()) + require.NoError(t, err) + + // Export data (should fail) + rm := createTestResourceMetrics() + err = exp.Export(context.Background(), rm) + assert.Error(t, err, "expected error but got none") + + var got metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + + actualComponentName := extractComponentName(got.ScopeMetrics[0]) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddress("invalid"), + semconv.ServerPort(999999), + ), + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.ErrorTypeOther, + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddress("invalid"), + semconv.ServerPort(999999), + ), + }, + }, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue()) +} + +func TestSelfObservability_EndpointParsing(t *testing.T) { + // Enable self-observability + t.Setenv("OTEL_GO_X_SELF_OBSERVABILITY", "true") + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(provider) + + coll, err := otest.NewGRPCCollector("", nil) + require.NoError(t, err) + defer coll.Shutdown() + + exp, err := New(context.Background(), + WithEndpoint(coll.Addr().String()), + WithInsecure()) + require.NoError(t, err) + + rm := createTestResourceMetrics() + err = exp.Export(context.Background(), rm) + require.NoError(t, err) + + var got metricdata.ResourceMetrics + err = reader.Collect(context.Background(), &got) + require.NoError(t, err) + require.Len(t, got.ScopeMetrics, 1) + + serverAddr, serverPort := parseEndpoint(coll.Addr().String()) + + var actualComponentName string + if len(got.ScopeMetrics[0].Metrics) > 0 { + if data, ok := got.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]); ok && len(data.DataPoints) > 0 { + attrs := data.DataPoints[0].Attributes.ToSlice() + for _, attr := range attrs { + if attr.Key == semconv.OTelComponentNameKey { + actualComponentName = attr.Value.AsString() + break + } + } + } + } + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc", + Version: sdk.Version(), + SchemaURL: semconv.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterMetricDataPointExported{}.Name(), + Description: otelconv.SDKExporterMetricDataPointExported{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddressKey.String(serverAddr), + semconv.ServerPortKey.Int(serverPort), + ), + }, + }, + }, + }, + { + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddressKey.String(serverAddr), + semconv.ServerPortKey.Int(serverPort), + ), + }, + }, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.OTelComponentName(actualComponentName), + semconv.OTelComponentTypeKey.String("otlp_grpc_metric_exporter"), + semconv.ServerAddressKey.String(serverAddr), + semconv.ServerPortKey.Int(serverPort), + ), + }, + }, + }, + }, + }, + } + + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreValue()) +} + +// parseEndpoint extracts server address and port from endpoint string. +func parseEndpoint(endpoint string) (string, int) { + host, portStr, err := net.SplitHostPort(endpoint) + if err != nil { + return "localhost", 4317 + } + + port, err := strconv.Atoi(portStr) + if err != nil { + port = 4317 + } + + return host, port +} + +// extractComponentName extracts the component name from metrics data to handle dynamic counter. +func extractComponentName(scopeMetrics metricdata.ScopeMetrics) string { + for _, m := range scopeMetrics.Metrics { + switch data := m.Data.(type) { + case metricdata.Sum[int64]: + if len(data.DataPoints) > 0 { + attrs := data.DataPoints[0].Attributes.ToSlice() + for _, attr := range attrs { + if attr.Key == semconv.OTelComponentNameKey { + return attr.Value.AsString() + } + } + } + case metricdata.Histogram[float64]: + if len(data.DataPoints) > 0 { + attrs := data.DataPoints[0].Attributes.ToSlice() + for _, attr := range attrs { + if attr.Key == semconv.OTelComponentNameKey { + return attr.Value.AsString() + } + } + } + } + } + return "" +} + +// createTestResourceMetrics creates sample metric data for testing. +func createTestResourceMetrics() *metricdata.ResourceMetrics { + now := time.Now() + return &metricdata.ResourceMetrics{ + Resource: resource.Default(), + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{Name: "test", Version: "v1"}, + Metrics: []metricdata.Metrics{ + { + Name: "test_gauge_int", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Value: 1, Time: now}, + {Value: 2, Time: now}, + }, + }, + }, + { + Name: "test_sum_float", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[float64]{ + {Value: 3.5, Time: now}, + }, + }, + }, + { + Name: "test_histogram", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Count: 10, Sum: 5.0, Time: now}, + }, + }, + }, + }, + }, + }, + } +}