Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/appolly/app/request/metric_attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,22 @@ func ErrorType(val string) attribute.KeyValue {
return attribute.Key(attr.ErrorType).String(val)
}

func MessagingOperationName(val string) attribute.KeyValue {
return attribute.Key(attr.MessagingOpName).String(val)
}

func MessagingOperationType(val string) attribute.KeyValue {
return attribute.Key(attr.MessagingOpType).String(val)
}

func MessagingDestinationName(val string) attribute.KeyValue {
return attribute.Key(attr.MessagingDestination).String(val)
}

func MessagingMessageID(val string) attribute.KeyValue {
return attribute.Key(attr.MessagingMessageID).String(val)
}

func RPCSystem(val string) attribute.KeyValue {
return attribute.Key(attr.RPCSystem).String(val)
}
Expand All @@ -154,6 +166,10 @@ func AWSS3Key(val string) attribute.KeyValue {
return attribute.Key(attr.AWSS3Key).String(val)
}

func AWSSQSQueueURL(val string) attribute.KeyValue {
return attribute.Key(attr.AWSSQSQueueURL).String(val)
}

func CloudRegion(val string) attribute.KeyValue {
return attribute.Key(attr.CloudRegion).String(val)
}
Expand Down
56 changes: 46 additions & 10 deletions pkg/appolly/app/request/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
HTTPSubtypeGraphQL = 1 // http + graphql
HTTPSubtypeElasticsearch = 2 // http + elasticsearch
HTTPSubtypeAWSS3 = 3 // http + aws s3
HTTPSubtypeAWSSQS = 4 // http + aws sqs
)

//nolint:cyclop
Expand Down Expand Up @@ -185,15 +186,30 @@ type Elasticsearch struct {
type AWS struct {
// https://opentelemetry.io/docs/specs/semconv/object-stores/s3/
S3 AWSS3 `json:"s3"`
// https://opentelemetry.io/docs/specs/semconv/messaging/sqs/
SQS AWSSQS `json:"sqs"`
}

type AWSS3 struct {
type AWSMeta struct {
RequestID string `json:"requestId"`
ExtendedRequestID string `json:"extendedRequestId"`
Region string `json:"region"`
Method string `json:"method"`
Bucket string `json:"bucket"`
Key string `json:"key"`
}

type AWSS3 struct {
Meta AWSMeta `json:"meta"`
Method string `json:"method"`
Bucket string `json:"bucket"`
Key string `json:"key"`
}

type AWSSQS struct {
Meta AWSMeta `json:"meta"`
OperationName string `json:"operationName"`
OperationType string `json:"operationType"`
Destination string `json:"destination"`
QueueURL string `json:"queueUrl"`
MessageID string `json:"messageId"`
}

// Span contains the information being submitted by the following nodes in the graph.
Expand Down Expand Up @@ -290,12 +306,24 @@ func spanAttributes(s *Span) SpanAttributes {
attrs["dbQueryText"] = s.Elasticsearch.DBQueryText
}
if s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil {
attrs["awsRequestID"] = s.AWS.S3.RequestID
attrs["awsExtendedRequestID"] = s.AWS.S3.ExtendedRequestID
attrs["awsRegion"] = s.AWS.S3.Region
attrs["awsS3Method"] = s.AWS.S3.Method
attrs["awsS3Bucket"] = s.AWS.S3.Bucket
attrs["awsS3Key"] = s.AWS.S3.Key
s3 := s.AWS.S3
attrs["awsRequestID"] = s3.Meta.RequestID
attrs["awsExtendedRequestID"] = s3.Meta.ExtendedRequestID
attrs["awsRegion"] = s3.Meta.Region
attrs["awsS3Method"] = s3.Method
attrs["awsS3Bucket"] = s3.Bucket
attrs["awsS3Key"] = s3.Key
}
if s.SubType == HTTPSubtypeAWSSQS && s.AWS != nil {
sqs := s.AWS.SQS
attrs["awsRequestID"] = sqs.Meta.RequestID
attrs["awsExtendedRequestID"] = sqs.Meta.ExtendedRequestID
attrs["awsRegion"] = sqs.Meta.Region
attrs["awsSQSOperationName"] = sqs.OperationName
attrs["awsSQSOperationType"] = sqs.OperationType
attrs["awsSQSDestination"] = sqs.Destination
attrs["awsSQSQueueURL"] = sqs.QueueURL
attrs["awsSQSMessageID"] = sqs.MessageID
}
return attrs
case EventTypeGRPC:
Expand Down Expand Up @@ -656,6 +684,14 @@ func (s *Span) TraceName() string {
}
}

if s.Type == EventTypeHTTPClient && s.SubType == HTTPSubtypeAWSSQS && s.AWS != nil {
if s.AWS.SQS.OperationName != "" {
return "sqs." + s.AWS.SQS.OperationName
} else {
return "sqs.Operation"
}
}

name := s.Method
if s.Route != "" {
name += " " + s.Route
Expand Down
40 changes: 37 additions & 3 deletions pkg/appolly/app/request/span_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,42 @@ func spanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu
if span.Type == EventTypeKafkaClient || span.Type == EventTypeKafkaServer {
return semconv.MessagingSystem("kafka")
}
if span.Type == EventTypeHTTPClient && span.SubType == HTTPSubtypeAWSSQS && span.AWS != nil {
return semconv.MessagingSystem("aws-sqs")
}
return semconv.MessagingSystem("unknown")
}
case attr.MessagingDestination:
getter = func(span *Span) attribute.KeyValue {
if span.Type == EventTypeKafkaClient || span.Type == EventTypeKafkaServer {
return semconv.MessagingDestinationName(span.Path)
}
if span.Type == EventTypeHTTPClient && span.SubType == HTTPSubtypeAWSSQS && span.AWS != nil {
return semconv.MessagingDestinationName(span.AWS.SQS.Destination)
}
return semconv.MessagingDestinationName("")
}
case attr.MessagingOpName:
getter = func(span *Span) attribute.KeyValue {
if span.Type == EventTypeHTTPClient && span.SubType == HTTPSubtypeAWSSQS && span.AWS != nil {
return MessagingOperationName(span.AWS.SQS.OperationName)
}
return MessagingOperationName("")
}
case attr.MessagingOpType:
getter = func(span *Span) attribute.KeyValue {
if span.Type == EventTypeHTTPClient && span.SubType == HTTPSubtypeAWSSQS && span.AWS != nil {
return MessagingOperationType(span.AWS.SQS.OperationType)
}
return MessagingOperationType("")
}
case attr.MessagingMessageID:
getter = func(span *Span) attribute.KeyValue {
if span.Type == EventTypeHTTPClient && span.SubType == HTTPSubtypeAWSSQS && span.AWS != nil {
return MessagingMessageID(span.AWS.SQS.MessageID)
}
return MessagingMessageID("")
}
case attr.CudaKernelName:
getter = func(span *Span) attribute.KeyValue { return CudaKernel(span.Method) }
case attr.CudaMemcpyKind:
Expand Down Expand Up @@ -186,14 +213,14 @@ func spanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu
case attr.AWSRequestID:
getter = func(s *Span) attribute.KeyValue {
if s.Type == EventTypeHTTPClient && s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil {
return AWSRequestID(s.AWS.S3.RequestID)
return AWSRequestID(s.AWS.S3.Meta.RequestID)
}
return AWSRequestID("")
}
case attr.AWSExtendedRequestID:
getter = func(s *Span) attribute.KeyValue {
if s.Type == EventTypeHTTPClient && s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil {
return AWSExtendedRequestID(s.AWS.S3.ExtendedRequestID)
return AWSExtendedRequestID(s.AWS.S3.Meta.ExtendedRequestID)
}
return AWSExtendedRequestID("")
}
Expand All @@ -211,10 +238,17 @@ func spanOTELGetters(name attr.Name) (attributes.Getter[*Span, attribute.KeyValu
}
return AWSS3Key("")
}
case attr.AWSSQSQueueURL:
getter = func(s *Span) attribute.KeyValue {
if s.Type == EventTypeHTTPClient && s.SubType == HTTPSubtypeAWSSQS && s.AWS != nil {
return AWSSQSQueueURL(s.AWS.SQS.QueueURL)
}
return AWSSQSQueueURL("")
}
case attr.CloudRegion:
getter = func(s *Span) attribute.KeyValue {
if s.Type == EventTypeHTTPClient && s.SubType == HTTPSubtypeAWSS3 && s.AWS != nil {
return CloudRegion(s.AWS.S3.Region)
return CloudRegion(s.AWS.S3.Meta.Region)
}
return CloudRegion("")
}
Expand Down
52 changes: 52 additions & 0 deletions pkg/ebpf/common/http/aws_common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ebpfcommon

import (
"errors"
"net/http"
"regexp"
"strings"

"go.opentelemetry.io/obi/pkg/appolly/app/request"
)

var (
requestIDHeader = "x-amz-requestid"
requestIDHeader2 = "x-amz-request-id"
requestIDHeader3 = "x-amzn-requestid"
requestIDHeader4 = "x-amzn-request-id"
extendedRequestIDHeader = "x-amz-id-2"
)

var awsRegionRgx = regexp.MustCompile(`(?:^|\.)([a-z]{2}-[a-z]+-\d)\.amazonaws\.com$`)

func parseAWSMeta(req *http.Request, resp *http.Response) (request.AWSMeta, error) {
meta := request.AWSMeta{}

for k, v := range resp.Header {
lk := strings.ToLower(k)
if lk == requestIDHeader || lk == requestIDHeader2 || lk == requestIDHeader3 || lk == requestIDHeader4 {
meta.RequestID = v[0]
}
if lk == extendedRequestIDHeader {
meta.ExtendedRequestID = v[0]
}
}
if meta.RequestID == "" {
return meta, errors.New("missing x-amz-request-id header")
}

meta.Region = parseAWSRegion(req)

return meta, nil
}

func parseAWSRegion(req *http.Request) string {
match := awsRegionRgx.FindStringSubmatch(req.URL.Host)
if len(match) >= 2 {
return match[1]
}
return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,12 @@ package ebpfcommon
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"

"go.opentelemetry.io/obi/pkg/appolly/app/request"
)

const (
requestIDHeader = "x-amz-request-id"
requestIDHeader2 = "x-amz-requestid"
extendedRequestIDHeader = "x-amz-id-2"
)

var awsRegionRgx = regexp.MustCompile(`(?:^|\.)([a-z]{2}-[a-z]+-\d)\.amazonaws\.com$`)

func AWSS3Span(baseSpan *request.Span, req *http.Request, resp *http.Response) (request.Span, bool) {
s3, err := parseAWSS3(req, resp)
if err != nil {
Expand All @@ -41,34 +30,19 @@ func AWSS3Span(baseSpan *request.Span, req *http.Request, resp *http.Response) (
func parseAWSS3(req *http.Request, resp *http.Response) (request.AWSS3, error) {
s3 := request.AWSS3{}

reqB, err := io.ReadAll(req.Body)
if err != nil {
return s3, fmt.Errorf("read S3 request body: %w", err)
}
req.Body = io.NopCloser(bytes.NewBuffer(reqB))

respB, err := io.ReadAll(resp.Body)
var err error
s3.Meta, err = parseAWSMeta(req, resp)
if err != nil {
return s3, fmt.Errorf("read S3 response body: %w", err)
}
resp.Body = io.NopCloser(bytes.NewBuffer(respB))

for k, v := range resp.Header {
lk := strings.ToLower(k)
if lk == requestIDHeader || lk == requestIDHeader2 {
s3.RequestID = v[0]
}
if lk == extendedRequestIDHeader {
s3.ExtendedRequestID = v[0]
}
return s3, err
}
if s3.RequestID == "" {
return s3, errors.New("missing x-amz-request-id header")
if s3.Meta.ExtendedRequestID == "" {
return s3, errors.New("missing x-amz-id-2 header")
}

s3.Bucket, s3.Key = parseS3bucketKey(req.URL.Path)
s3.Region = parseAWSRegion(req)
s3.Method = inferS3Method(req)
if s3.Method == "" {
return s3, errors.New("unable to parse s3 operation")
}

return s3, nil
}
Expand All @@ -86,14 +60,6 @@ func parseS3bucketKey(path string) (string, string) {
return bucket, key
}

func parseAWSRegion(req *http.Request) string {
match := awsRegionRgx.FindStringSubmatch(req.URL.Host)
if len(match) >= 2 {
return match[1]
}
return ""
}

// This is a naive inference of S3 operations based on HTTP method and URL path/query
func inferS3Method(req *http.Request) string {
q := req.URL.Query()
Expand Down
Loading