Skip to content
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .lycheeignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ https://github.com/open-telemetry/opentelemetry-go/projects
https?:\/\/github\.com\/open-telemetry\/semantic-conventions\/archive\/refs\/tags\/[^.]+\.zip\[[^]]+]
file:///home/runner/work/opentelemetry-go/opentelemetry-go/libraries
file:///home/runner/work/opentelemetry-go/opentelemetry-go/manual
http://4.3.2.1:78/user/123
http://4.3.2.1:78/user/123
http://bar:8080/path
https://localhost:4317/
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add native histogram exemplar support in `go.opentelemetry.io/otel/exporters/prometheus`. (#6772)
- Add experimental self-observability log metrics in `go.opentelemetry.io/otel/sdk/log`.
Check the `go.opentelemetry.io/otel/sdk/log/internal/x` package documentation for more information. (#7121)
- Add experimental self-observability trace exporter metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`.
Check the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/x` package documentation for more information. (#7142)

### Changed

Expand Down
151 changes: 150 additions & 1 deletion exporters/otlp/otlptrace/otlptracegrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptra
import (
"context"
"errors"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
Expand All @@ -18,10 +24,16 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/x"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk"
semconv "go.opentelemetry.io/otel/semconv/v1.36.0"
"go.opentelemetry.io/otel/semconv/v1.36.0/otelconv"
)

type client struct {
Expand All @@ -45,6 +57,12 @@ type client struct {
conn *grpc.ClientConn
tscMu sync.RWMutex
tsc coltracepb.TraceServiceClient

selfObservabilityEnabled bool
spanInFlightMetric otelconv.SDKExporterSpanInflight
spanExportedMetric otelconv.SDKExporterSpanExported
operationDurationMetric otelconv.SDKExporterOperationDuration
selfObservabilityAttrs []attribute.KeyValue
}

// Compile time check *client implements otlptrace.Client.
Expand Down Expand Up @@ -74,6 +92,8 @@ func newClient(opts ...Option) *client {
c.metadata = metadata.New(cfg.Traces.Headers)
}

c.initSelfObservability()

Comment on lines +95 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flattening initSelfObservability seems appropriate. This is the only call site and this function is scoped to set up a new client, which includes telemetry.

return c
}

Expand All @@ -98,6 +118,10 @@ func (c *client) Start(context.Context) error {
c.tsc = coltracepb.NewTraceServiceClient(c.conn)
c.tscMu.Unlock()

if c.selfObservabilityEnabled {
c.selfObservabilityAttrs = append(c.selfObservabilityAttrs, getServerAttrs(c.conn.CanonicalTarget())...)
}

return nil
}

Expand Down Expand Up @@ -174,14 +198,55 @@ var errShutdown = errors.New("the client is shutdown")
//
// Retryable errors from the server will be handled according to any
// RetryConfig the client was created with.
func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) (err error) {
// Hold a read lock to ensure a shut down initiated after this starts does
// not abandon the export. This read lock acquire has less priority than a
// write lock acquire (i.e. Stop), meaning if the client is shutting down
// this will come after the shut down.
c.tscMu.RLock()
defer c.tscMu.RUnlock()

var start time.Time
var spanCount int
if c.selfObservabilityEnabled {
start = time.Now()
spanCount = 0
for _, ps := range protoSpans {
for _, ss := range ps.ScopeSpans {
spanCount += len(ss.Spans)
Comment on lines +215 to +216
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle nil values.

Suggested change
for _, ss := range ps.ScopeSpans {
spanCount += len(ss.Spans)
for _, ss := range ps.GetScopeSpans() {
spanCount += len(ss.GetSpans())

}
}
c.spanInFlightMetric.Add(context.Background(), int64(spanCount), c.selfObservabilityAttrs...)

defer func() {
duration := time.Since(start)
durationAttrs := make([]attribute.KeyValue, 0, len(c.selfObservabilityAttrs)+2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is allocated every call. A pool should be used to amortize the slice allocation.

durationAttrs = append(durationAttrs, c.selfObservabilityAttrs...)
durationAttrs = append(durationAttrs,
c.operationDurationMetric.AttrRPCGRPCStatusCode(otelconv.RPCGRPCStatusCodeAttr(status.Code(err))))

exportedAttrs := make([]attribute.KeyValue, 0, len(c.selfObservabilityAttrs)+1)
exportedAttrs = append(exportedAttrs, c.selfObservabilityAttrs...)

if err != nil {
// Try to extract the underlying gRPC status error, if there is one
rootErr := err
if s, ok := status.FromError(err); ok {
rootErr = s.Err()
}
Comment on lines +223 to +236
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exact allocations can be made here for the cost of a few more branches which is worth it.

Suggested change
durationAttrs := make([]attribute.KeyValue, 0, len(c.selfObservabilityAttrs)+2)
durationAttrs = append(durationAttrs, c.selfObservabilityAttrs...)
durationAttrs = append(durationAttrs,
c.operationDurationMetric.AttrRPCGRPCStatusCode(otelconv.RPCGRPCStatusCodeAttr(status.Code(err))))
exportedAttrs := make([]attribute.KeyValue, 0, len(c.selfObservabilityAttrs)+1)
exportedAttrs = append(exportedAttrs, c.selfObservabilityAttrs...)
if err != nil {
// Try to extract the underlying gRPC status error, if there is one
rootErr := err
if s, ok := status.FromError(err); ok {
rootErr = s.Err()
}
rootErr := err
// Extract the underlying gRPC status error, if there is one.
if s, ok := status.FromError(err); ok {
rootErr = s.Err()
}
n := len(c.selfObservabilityAttrs)
var durationAttrs, exportedAttrs []attribute.KeyValue
if rootErr != nil {
durationAttrs = make([]attribute.KeyValue, n, n+2)
exportedAttrs = make([]attribute.KeyValue, n, n+1)
} else {
durationAttrs = make([]attribute.KeyValue, n, n+1)
exportedAttrs = make([]attribute.KeyValue, n, n)
}
_ = copy(durationAttrs, c.selfObservabilityAttrs)
scAttr := c.operationDurationMetric.AttrRPCGRPCStatusCode(otelconv.RPCGRPCStatusCodeAttr(status.Code(err)))
durationAttrs = append(durationAttrs, scAttr)
_ = copy(exportedAttrs, c.selfObservabilityAttrs)
if err != nil {

durationAttrs = append(durationAttrs, semconv.ErrorType(rootErr))
exportedAttrs = append(exportedAttrs, semconv.ErrorType(rootErr))
}

c.operationDurationMetric.Record(context.Background(), duration.Seconds(),
durationAttrs...,
)

c.spanExportedMetric.Add(context.Background(), int64(spanCount), exportedAttrs...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't handle partial successes. If part of the batch was successful, this will add all exported spans as errored.

c.spanInFlightMetric.Add(context.Background(), int64(-spanCount), c.selfObservabilityAttrs...)
}()
}

if c.tsc == nil {
return errShutdown
}
Expand All @@ -201,6 +266,7 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
otel.Handle(err)
}
}

// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
Expand Down Expand Up @@ -298,3 +364,86 @@ func (c *client) MarshalLog() any {
Endpoint: c.endpoint,
}
}

func (c *client) initSelfObservability() {
if !x.SelfObservability.Enabled() {
return
}

c.selfObservabilityEnabled = true

c.selfObservabilityAttrs = []attribute.KeyValue{
semconv.OTelComponentName(fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpGRPCSpanExporter, nextExporterID())),
semconv.OTelComponentTypeOtlpGRPCSpanExporter,
}

mp := otel.GetMeterProvider()
m := mp.Meter("go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL))
Comment on lines +381 to +383
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
m := mp.Meter("go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL))
m := mp.Meter(
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc",
metric.WithInstrumentationVersion(sdk.Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)


var err error
if c.spanInFlightMetric, err = otelconv.NewSDKExporterSpanInflight(m); err != nil {
otel.Handle(err)
}
if c.spanExportedMetric, err = otelconv.NewSDKExporterSpanExported(m); err != nil {
otel.Handle(err)
}
if c.operationDurationMetric, err = otelconv.NewSDKExporterOperationDuration(m); err != nil {
otel.Handle(err)
}
}

func getServerAttrs(target string) []attribute.KeyValue {
if strings.HasPrefix(target, "unix://") {
path := strings.TrimPrefix(target, "unix://")
if path == "" {
return nil
}
return []attribute.KeyValue{semconv.ServerAddress(path)}
}

if strings.Contains(target, "://") {
u, err := url.Parse(target)
if err != nil || u.Scheme == "" {
return nil
}

// For gRPC targets like dns:///example.com:42 or dns://8.8.8.8/example.com:42,
// use u.Path trimmed of leading slash as host:port
target = strings.TrimPrefix(u.Path, "/")

// Fallback if path empty but host present
if target == "" && u.Host != "" {
target = u.Host
}
}

if target == "" {
return nil
}

// Target should now be of form host:port
host, pStr, err := net.SplitHostPort(target)
if err != nil {
return []attribute.KeyValue{semconv.ServerAddress(target)}
Comment on lines +398 to +429
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

target always has the form "<scheme>://[authority]/<endpoint>". Why not just use url.Parse to handle all scheme values?

Suggested change
if strings.HasPrefix(target, "unix://") {
path := strings.TrimPrefix(target, "unix://")
if path == "" {
return nil
}
return []attribute.KeyValue{semconv.ServerAddress(path)}
}
if strings.Contains(target, "://") {
u, err := url.Parse(target)
if err != nil || u.Scheme == "" {
return nil
}
// For gRPC targets like dns:///example.com:42 or dns://8.8.8.8/example.com:42,
// use u.Path trimmed of leading slash as host:port
target = strings.TrimPrefix(u.Path, "/")
// Fallback if path empty but host present
if target == "" && u.Host != "" {
target = u.Host
}
}
if target == "" {
return nil
}
// Target should now be of form host:port
host, pStr, err := net.SplitHostPort(target)
if err != nil {
return []attribute.KeyValue{semconv.ServerAddress(target)}
u, err := url.Parse(target)
if err != nil {
return nil
}
if u.Host == "" {
return nil
}
// Target should now be of form host:port
host, pStr, err := net.SplitHostPort(u.Host)
if err != nil {
return []attribute.KeyValue{semconv.ServerAddress(u.Host)}

}

port, err := strconv.Atoi(pStr)
if err != nil {
return []attribute.KeyValue{semconv.ServerAddress(host)}
}

return []attribute.KeyValue{
semconv.ServerAddress(host),
semconv.ServerPort(port),
}
}

// exporterIDCounter is the atomic counter that nextExporterID increments to
// hand out exporter IDs.
var exporterIDCounter atomic.Int64

// nextExporterID returns a new unique ID for an exporter.
// The starting value is 0, and it increments by 1 for each call.
Comment on lines +445 to +446
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
// nextExporterID returns a new unique ID for an exporter.
// the starting value is 0, and it increments by 1 for each call.
// nextExporterID returns a monotonically increasing int64 starting at 0

// nextExporterID atomically advances exporterIDCounter by one and returns the
// value the counter held before the increment.
func nextExporterID() int64 {
	advanced := exporterIDCounter.Add(1)
	return advanced - 1
}
Loading
Loading