diff --git a/go.mod b/go.mod index ae8d256194d..2107499ac27 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,7 @@ require ( go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 + go.opentelemetry.io/otel/trace v1.38.0 go.opentelemetry.io/proto/otlp v1.9.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 @@ -290,7 +291,6 @@ require ( go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect diff --git a/internal/admin/console/api.go b/internal/admin/console/api.go index 22a101dad05..fa2d4edba19 100644 --- a/internal/admin/console/api.go +++ b/internal/admin/console/api.go @@ -260,14 +260,14 @@ func (h *Handler) loadConfigDump() ConfigDumpInfo { if h.providerResources != nil { // Load controller resources directly from the provider resources - controllerResources := h.providerResources.GatewayAPIResources.LoadAll() + controllerResourcesContext := h.providerResources.GatewayAPIResources.LoadAll() - for _, resources := range controllerResources { - if resources == nil { + for _, resourcesContext := range controllerResourcesContext { + if resourcesContext == nil { continue } - for _, res := range *resources { + for _, res := range *resourcesContext.Resources { if res == nil { continue } diff --git a/internal/admin/console/api_test.go b/internal/admin/console/api_test.go index 539dbb7195a..ee353c339dd 100644 --- a/internal/admin/console/api_test.go +++ b/internal/admin/console/api_test.go @@ -151,7 +151,7 @@ func TestLoadConfigDumpWithData(t *testing.T) { // This test focuses on the basic functionality providerRes := &message.ProviderResources{} // Initialize empty watchable map - providerRes.GatewayAPIResources = watchable.Map[string, *resource.ControllerResources]{} + providerRes.GatewayAPIResources = watchable.Map[string, *resource.ControllerResourcesContext]{} // Skip storing to avoid watchable copy issues // providerResources.Store("test", providerRes) diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 5498ddaa3cd..5582dab2666 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -24,6 +24,7 @@ import ( "github.com/envoyproxy/gateway/internal/message" "github.com/envoyproxy/gateway/internal/metrics" providerrunner "github.com/envoyproxy/gateway/internal/provider/runner" + "github.com/envoyproxy/gateway/internal/traces" xdsrunner "github.com/envoyproxy/gateway/internal/xds/runner" ) @@ -156,6 +157,10 @@ func startRunners(ctx context.Context, cfg *config.Server) (err error) { runners := []struct { runner Runner }{ + { + // Start the Traces Server + runner: traces.New(cfg), + }, { // Start the Provider Service // It fetches the resources from the configured provider type diff --git a/internal/gatewayapi/resource/resource.go b/internal/gatewayapi/resource/resource.go index f06fbfe2915..d66db3c5bfd 100644 --- a/internal/gatewayapi/resource/resource.go +++ b/internal/gatewayapi/resource/resource.go @@ -6,6 +6,8 @@ package resource import ( + "context" + "reflect" "sort" certificatesv1b1 "k8s.io/api/certificates/v1beta1" @@ -208,6 +210,47 @@ func (r *Resources) GetEndpointSlicesForBackend(svcNamespace, svcName, backendKi // ControllerResources holds all the GatewayAPI resources per GatewayClass type ControllerResources []*Resources +// ControllerResourcesContext wraps ControllerResources with trace context +// for propagating spans across async message boundaries +type ControllerResourcesContext struct { + Resources *ControllerResources + Context context.Context +} + +// DeepCopy creates a new ControllerResourcesContext. +// The Context field is preserved (not deep copied) since contexts are meant to be passed around. +func (c *ControllerResourcesContext) DeepCopy() *ControllerResourcesContext { + if c == nil { + return nil + } + var resourcesCopy *ControllerResources + if c.Resources != nil { + resourcesCopy = c.Resources.DeepCopy() + } + return &ControllerResourcesContext{ + Resources: resourcesCopy, + Context: c.Context, + } +} + +// Equal compares two Resources objects for equality. +func (c *ControllerResourcesContext) Equal(other *ControllerResourcesContext) bool { + if c == nil && other == nil { + return true + } + if c == nil || other == nil { + return false + } + if c.Resources == nil && other.Resources == nil { + return true + } + if c.Resources == nil || other.Resources == nil { + return false + } + + return reflect.DeepEqual(c.Resources, other.Resources) +} + // DeepCopy creates a new ControllerResources. // It is handwritten since the tooling was unable to copy into a new slice func (c *ControllerResources) DeepCopy() *ControllerResources { diff --git a/internal/gatewayapi/resource/resource_test.go b/internal/gatewayapi/resource/resource_test.go index 1db2e0f6071..823a08b4914 100644 --- a/internal/gatewayapi/resource/resource_test.go +++ b/internal/gatewayapi/resource/resource_test.go @@ -6,6 +6,7 @@ package resource import ( + "context" "testing" "github.com/google/go-cmp/cmp" @@ -130,6 +131,38 @@ func TestEqualXds(t *testing.T) { } } +func TestEqualControllerResourcesContext(t *testing.T) { + c1 := context.Background() + c2 := context.TODO() + r1 := &ControllerResourcesContext{ + Resources: &ControllerResources{ + { + GatewayClass: &gwapiv1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + }, + }, + }, + Context: c1, + } + r2 := &ControllerResourcesContext{ + Resources: &ControllerResources{ + { + GatewayClass: &gwapiv1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + }, + }, + }, + Context: c2, + } + + assert.True(t, r1.Equal(r2)) + assert.True(t, r2.Equal(r1)) +} + func TestGetEndpointSlicesForBackendDualStack(t *testing.T) { // Test data setup dualStackService := &discoveryv1.EndpointSlice{ @@ -205,3 +238,141 @@ func TestGetEndpointSlicesForBackendDualStack(t *testing.T) { } }) } + +func TestControllerResourcesContextDeepCopy(t *testing.T) { + tests := []struct { + name string + ctx *ControllerResourcesContext + }{ + { + name: "nil context", + ctx: nil, + }, + { + name: "empty context", + ctx: &ControllerResourcesContext{ + Resources: &ControllerResources{}, + Context: context.Background(), + }, + }, + { + name: "context with resources", + ctx: &ControllerResourcesContext{ + Resources: &ControllerResources{ + { + GatewayClass: &gwapiv1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gateway-class", + }, + }, + }, + }, + Context: context.Background(), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + copied := tc.ctx.DeepCopy() + + if tc.ctx == nil { + assert.Nil(t, copied) + return + } + + // Verify the copy is not nil + require.NotNil(t, copied) + + // Verify the copy is a different object + assert.NotSame(t, tc.ctx, copied) + + // Verify Resources are deep copied + if tc.ctx.Resources != nil { + require.NotNil(t, copied.Resources) + assert.NotSame(t, tc.ctx.Resources, copied.Resources) + + // Verify the contents are equal + assert.Len(t, *copied.Resources, len(*tc.ctx.Resources)) + } + + // Verify Context is preserved (not deep copied, same reference) + assert.Equal(t, tc.ctx.Context, copied.Context) + }) + } +} + +func TestControllerResourcesDeepCopy(t *testing.T) { + tests := []struct { + name string + resources *ControllerResources + }{ + { + name: "nil resources", + resources: nil, + }, + { + name: "empty resources", + resources: &ControllerResources{}, + }, + { + name: "resources with gateway class", + resources: &ControllerResources{ + { + GatewayClass: &gwapiv1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gateway-class", + }, + }, + }, + }, + }, + { + name: "multiple resources", + resources: &ControllerResources{ + { + GatewayClass: &gwapiv1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gateway-class-1", + }, + }, + }, + { + GatewayClass: &gwapiv1.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: "gateway-class-2", + }, + }, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + copied := tc.resources.DeepCopy() + + if tc.resources == nil { + assert.Nil(t, copied) + return + } + + // Verify the copy is not nil + require.NotNil(t, copied) + + // Verify the copy is a different object + assert.NotSame(t, tc.resources, copied) + + // Verify the length is the same + assert.Len(t, *copied, len(*tc.resources)) + + // Verify each resource is deep copied + for i := range *tc.resources { + if (*tc.resources)[i] != nil { + require.NotNil(t, (*copied)[i]) + assert.NotSame(t, (*tc.resources)[i], (*copied)[i]) + } + } + }) + } +} diff --git a/internal/gatewayapi/runner/runner.go b/internal/gatewayapi/runner/runner.go index f3852f773d6..35a5c8fd33c 100644 --- a/internal/gatewayapi/runner/runner.go +++ b/internal/gatewayapi/runner/runner.go @@ -16,6 +16,9 @@ import ( "github.com/docker/docker/pkg/fileutils" "github.com/telepresenceio/watchable" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -46,6 +49,8 @@ const ( hmacSecretKey = "hmac-secret" ) +var tracer = otel.Tracer("envoy-gateway/gateway-api") + type Config struct { config.Server ProviderResources *message.ProviderResources @@ -122,18 +127,38 @@ func (r *Runner) startWasmCache(ctx context.Context) { r.wasmCache.Start(ctx) } -func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResources]) { +func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *resource.ControllerResourcesContext]) { message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.ProviderResourcesMessageName}, sub, - func(update message.Update[string, *resource.ControllerResources], errChan chan error) { - r.Logger.Info("received an update", "key", update.Key) - val := update.Value + func(update message.Update[string, *resource.ControllerResourcesContext], errChan chan error) { + parentCtx := context.Background() + if update.Value != nil && update.Value.Context != nil { + parentCtx = update.Value.Context + } + + traceCtx, span := tracer.Start(parentCtx, "GatewayApiRunner.subscribeAndTranslate") + defer span.End() + traceLogger := r.Logger.WithTrace(traceCtx) + traceLogger.Info("received an update", "key", update.Key) + + valWrapper := update.Value // There is only 1 key which is the controller name // so when a delete is triggered, delete all keys - if update.Delete || val == nil { + if update.Delete || valWrapper == nil || valWrapper.Resources == nil { r.deleteAllKeys() return } + val := valWrapper.Resources + + // Add span attributes for observability + span.SetAttributes( + attribute.String("controller.key", update.Key), + attribute.Bool("update.delete", update.Delete), + ) + if val != nil { + span.SetAttributes(attribute.Int("resources.count", len(*val))) + } + // Initialize keysToDelete with tracked keys (mark and sweep approach) keysToDelete := r.keyCache.copy() @@ -143,6 +168,7 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re var backendTLSPolicyStatusCount, clientTrafficPolicyStatusCount, backendTrafficPolicyStatusCount int var securityPolicyStatusCount, envoyExtensionPolicyStatusCount, backendStatusCount, extensionServerPolicyStatusCount int + span.AddEvent("translate", trace.WithAttributes(attribute.Int("resources.count", len(*val)))) for _, resources := range *val { // Translate and publish IRs. t := &gatewayapi.Translator{ @@ -156,7 +182,7 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re MergeGateways: gatewayapi.IsMergeGatewaysEnabled(resources), WasmCache: r.wasmCache, RunningOnHost: r.EnvoyGateway.Provider != nil && r.EnvoyGateway.Provider.IsRunningOnHost(), - Logger: r.Logger, + Logger: traceLogger, } // If an extension is loaded, pass its supported groups/kinds to the translator @@ -170,24 +196,26 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re extGKs = append(extGKs, schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}) } t.ExtensionGroupKinds = extGKs - r.Logger.Info("extension resources", "GVKs count", len(extGKs)) + traceLogger.Info("extension resources", "GVKs count", len(extGKs)) } // Translate to IR + _, translateToIRSpan := tracer.Start(traceCtx, "GatewayApiRunner.ResoureTranslationCycle.TranslateToIR") result, err := t.Translate(resources) + translateToIRSpan.End() if err != nil { // Currently all errors that Translate returns should just be logged - r.Logger.Error(err, "errors detected during translation", "gateway-class", resources.GatewayClass.Name) + traceLogger.Error(err, "errors detected during translation", "gateway-class", resources.GatewayClass.Name) } // Publish the IRs. // Also validate the ir before sending it. for key, val := range result.InfraIR { - logger := r.Logger.V(1).WithValues(string(message.InfraIRMessageName), key) - if logger.Enabled() { - logger.Info(val.JSONString()) + logV := traceLogger.V(1).WithValues(string(message.InfraIRMessageName), key) + if logV.Enabled() { + logV.Info(val.JSONString()) } if err := val.Validate(); err != nil { - r.Logger.Error(err, "unable to validate infra ir, skipped sending it") + traceLogger.Error(err, "unable to validate infra ir, skipped sending it") errChan <- err } else { r.InfraIR.Store(key, val) @@ -199,20 +227,25 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re } for key, val := range result.XdsIR { - logger := r.Logger.V(1).WithValues(string(message.XDSIRMessageName), key) - if logger.Enabled() { - logger.Info(val.JSONString()) + logV := traceLogger.V(1).WithValues(string(message.XDSIRMessageName), key) + if logV.Enabled() { + logV.Info(val.JSONString()) } if err := val.Validate(); err != nil { - r.Logger.Error(err, "unable to validate xds ir, skipped sending it") + traceLogger.Error(err, "unable to validate xds ir, skipped sending it") errChan <- err } else { - r.XdsIR.Store(key, val) + m := message.XdsIRWithContext{ + XdsIR: val, + Context: traceCtx, + } + r.XdsIR.Store(key, &m) xdsIRCount++ } } // Update Status + _, statusUpdateSpan := tracer.Start(traceCtx, "GatewayApiRunner.ResoureTranslationCycle.UpdateStatus") if result.GatewayClass != nil { key := utils.NamespacedName(result.GatewayClass) r.ProviderResources.GatewayClassStatuses.Store(key, &result.GatewayClass.Status) @@ -335,6 +368,7 @@ func (r *Runner) subscribeAndTranslate(sub <-chan watchable.Snapshot[string, *re delete(keysToDelete.ExtensionServerPolicyStatus, key) r.keyCache.ExtensionServerPolicyStatus[key] = true } + statusUpdateSpan.End() } // Publish aggregated metrics diff --git a/internal/gatewayapi/runner/runner_test.go b/internal/gatewayapi/runner/runner_test.go index fc1093a431a..50d8f73664d 100644 --- a/internal/gatewayapi/runner/runner_test.go +++ b/internal/gatewayapi/runner/runner_test.go @@ -51,7 +51,7 @@ func TestRunner(t *testing.T) { require.NoError(t, err) // IR is nil at start - require.Equal(t, map[string]*ir.Xds{}, xdsIR.LoadAll()) + require.Equal(t, map[string]*message.XdsIRWithContext{}, xdsIR.LoadAll()) require.Equal(t, map[string]*ir.Infra{}, infraIR.LoadAll()) // TODO: pass valid provider resources @@ -64,7 +64,7 @@ func TestRunner(t *testing.T) { return false } // Ensure ir is empty - return (reflect.DeepEqual(xdsIR.LoadAll(), map[string]*ir.Xds{})) && (reflect.DeepEqual(infraIR.LoadAll(), map[string]*ir.Infra{})) + return (reflect.DeepEqual(xdsIR.LoadAll(), map[string]*message.XdsIRWithContext{})) && (reflect.DeepEqual(infraIR.LoadAll(), map[string]*ir.Infra{})) }, time.Second*1, time.Millisecond*20) } diff --git a/internal/globalratelimit/runner/runner.go b/internal/globalratelimit/runner/runner.go index e91efaca763..27d8119cfe7 100644 --- a/internal/globalratelimit/runner/runner.go +++ b/internal/globalratelimit/runner/runner.go @@ -20,6 +20,8 @@ import ( resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/telepresenceio/watchable" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -47,6 +49,8 @@ const ( rateLimitTLSCACertFilepath = "/certs/ca.crt" ) +var tracer = otel.Tracer("envoy-gateway/global-rate-limit/runner") + type Config struct { config.Server XdsIR *message.XdsIR @@ -132,20 +136,34 @@ func buildXDSResourceFromCache(rateLimitConfigsCache map[string][]cachetype.Reso return xdsResourcesToUpdate } -func (r *Runner) translateFromSubscription(ctx context.Context, c <-chan watchable.Snapshot[string, *ir.Xds]) { +func (r *Runner) translateFromSubscription(ctx context.Context, c <-chan watchable.Snapshot[string, *message.XdsIRWithContext]) { // rateLimitConfigsCache is a cache of the rate limit config, which is keyed by the xdsIR key. rateLimitConfigsCache := map[string][]cachetype.Resource{} message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, c, - func(update message.Update[string, *ir.Xds], errChan chan error) { - r.Logger.Info("received a notification") + func(update message.Update[string, *message.XdsIRWithContext], errChan chan error) { + parentCtx := ctx + if update.Value != nil && update.Value.Context != nil { + parentCtx = update.Value.Context + } + + traceCtx, span := tracer.Start(parentCtx, "GlobalRateLimitRunner.translateFromSubscription") + defer span.End() + + traceLogger := r.Logger.WithTrace(traceCtx) + traceLogger.Info("received a notification") + + span.SetAttributes( + attribute.String("xds-ir.key", update.Key), + attribute.Bool("update.delete", update.Delete), + ) if update.Delete { delete(rateLimitConfigsCache, update.Key) - r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache)) + r.updateSnapshot(traceCtx, buildXDSResourceFromCache(rateLimitConfigsCache)) } else { // Translate to ratelimit xDS Config. - rvt, err := r.translate(update.Value) + rvt, err := r.translate(update.Value.XdsIR) if err != nil { r.Logger.Error(err, "failed to translate an updated xds-ir to ratelimit xDS Config") errChan <- err @@ -155,7 +173,7 @@ func (r *Runner) translateFromSubscription(ctx context.Context, c <-chan watchab if rvt != nil { // Build XdsResources to use for the snapshot update from the cache. rateLimitConfigsCache[update.Key] = rvt.XdsResources[resourcev3.RateLimitConfigType] - r.updateSnapshot(ctx, buildXDSResourceFromCache(rateLimitConfigsCache)) + r.updateSnapshot(traceCtx, buildXDSResourceFromCache(rateLimitConfigsCache)) } } }, @@ -183,6 +201,9 @@ func (r *Runner) translate(xdsIR *ir.Xds) (*types.ResourceVersionTable, error) { } func (r *Runner) updateSnapshot(ctx context.Context, resource types.XdsResources) { + _, span := tracer.Start(ctx, "GlobalRateLimitRunner.updateSnapshot") + defer span.End() + if r.cache == nil { r.Logger.Error(nil, "failed to init the snapshot cache") return diff --git a/internal/globalratelimit/runner/runner_test.go b/internal/globalratelimit/runner/runner_test.go index 932131fc70f..848567143c2 100644 --- a/internal/globalratelimit/runner/runner_test.go +++ b/internal/globalratelimit/runner/runner_test.go @@ -230,7 +230,11 @@ func Test_subscribeAndTranslate(t *testing.T) { xdsIR.Delete(xds.Key) continue } - xdsIR.Store(xds.Key, xds.Value) + m := message.XdsIRWithContext{ + XdsIR: xds.Value, + Context: context.Background(), + } + xdsIR.Store(xds.Key, &m) } diff := "" diff --git a/internal/logging/log.go b/internal/logging/log.go index 48d3b0d5ac2..27b02a01571 100644 --- a/internal/logging/log.go +++ b/internal/logging/log.go @@ -6,11 +6,13 @@ package logging import ( + "context" "io" "os" "github.com/go-logr/logr" "github.com/go-logr/zapr" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -88,6 +90,27 @@ func (l Logger) WithValues(keysAndValues ...interface{}) Logger { return l } +// WithTrace returns a new Logger that includes basic OpenTelemetry metadata +// extracted from the provided context. If the context does not contain a valid +// span, the original Logger is returned unchanged. +func (l Logger) WithTrace(ctx context.Context) Logger { + sc := trace.SpanContextFromContext(ctx) + if !sc.IsValid() { + return l + } + + fields := []interface{}{ + "trace_id", sc.TraceID().String(), + "span_id", sc.SpanID().String(), + } + + if ts := sc.TraceState(); ts.Len() > 0 { + fields = append(fields, "trace_state", ts.String()) + } + + return l.WithValues(fields...) +} + // A Sugar wraps the base Logger functionality in a slower, but less // verbose, API. Any Logger can be converted to a SugaredLogger with its Sugar // method. diff --git a/internal/logging/log_test.go b/internal/logging/log_test.go index a5f75e43816..cb133eb53d3 100644 --- a/internal/logging/log_test.go +++ b/internal/logging/log_test.go @@ -6,12 +6,15 @@ package logging import ( + "bytes" + "context" "errors" "os" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -104,3 +107,24 @@ func TestLoggerSugarName(t *testing.T) { capturedOutput := string(outputBytes) assert.Contains(t, capturedOutput, "debugging message", logName) } + +func TestLoggerWithTrace(t *testing.T) { + buffer := &bytes.Buffer{} + logger := NewLogger(buffer, egv1a1.DefaultEnvoyGatewayLogging()) + + traceID := trace.TraceID{0xde, 0xad, 0xbe, 0xef, 0xca, 0xfe, 0xba, 0xbe, 0xfa, 0xce, 0xb0, 0x0c, 0x12, 0x34, 0x56, 0x78} + spanID := trace.SpanID{0xba, 0xad, 0xf0, 0x0d, 0xfe, 0xed, 0xfa, 0xce} + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: trace.FlagsSampled, + }) + ctx := trace.ContextWithSpanContext(context.Background(), sc) + + logger.WithTrace(ctx).Info("hello tracing") + + output := buffer.String() + assert.Contains(t, output, traceID.String()) + assert.Contains(t, output, spanID.String()) + assert.Contains(t, output, trace.FlagsSampled.String()) +} diff --git a/internal/message/types.go b/internal/message/types.go index f2ad704c807..1053c09183a 100644 --- a/internal/message/types.go +++ b/internal/message/types.go @@ -6,6 +6,9 @@ package message import ( + "context" + "reflect" + "github.com/telepresenceio/watchable" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -20,8 +23,8 @@ import ( // ProviderResources message type ProviderResources struct { // GatewayAPIResources is a map from a GatewayClass name to - // a group of gateway API and other related resources. - GatewayAPIResources watchable.Map[string, *resource.ControllerResources] + // a group of gateway API and other related resources with trace context. + GatewayAPIResources watchable.Map[string, *resource.ControllerResourcesContext] // GatewayAPIStatuses is a group of gateway api // resource statuses maps. @@ -40,7 +43,9 @@ func (p *ProviderResources) GetResources() []*resource.Resources { } for _, v := range p.GatewayAPIResources.LoadAll() { - return *v + if v != nil && v.Resources != nil { + return *v.Resources + } } return nil @@ -129,9 +134,47 @@ func (e *ExtensionStatuses) Close() { e.BackendStatuses.Close() } +type XdsIRWithContext struct { + XdsIR *ir.Xds + Context context.Context +} + +// DeepCopy creates a new ControllerResourcesContext. +// The Context field is preserved (not deep copied) since contexts are meant to be passed around. +func (x *XdsIRWithContext) DeepCopy() *XdsIRWithContext { + if x == nil { + return nil + } + var xdsIRCopy *ir.Xds + if x.XdsIR != nil { + xdsIRCopy = x.XdsIR.DeepCopy() + } + return &XdsIRWithContext{ + XdsIR: xdsIRCopy, + Context: x.Context, + } +} + +func (x *XdsIRWithContext) Equal(other *XdsIRWithContext) bool { + if x == nil && other == nil { + return true + } + if x == nil || other == nil { + return false + } + if x.XdsIR == nil && other.XdsIR == nil { + return true + } + if x.XdsIR == nil || other.XdsIR == nil { + return false + } + + return reflect.DeepEqual(x.XdsIR, other.XdsIR) +} + // XdsIR message type XdsIR struct { - watchable.Map[string, *ir.Xds] + watchable.Map[string, *XdsIRWithContext] } // InfraIR message diff --git a/internal/message/types_test.go b/internal/message/types_test.go new file mode 100644 index 00000000000..5f05ee1f91b --- /dev/null +++ b/internal/message/types_test.go @@ -0,0 +1,83 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package message_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/ir" + "github.com/envoyproxy/gateway/internal/message" +) + +// XdsIRWithContext structs with differing context values should be Equal +func TestXdsWithContextEqual(t *testing.T) { + xdsIR := &ir.Xds{ + HTTP: []*ir.HTTPListener{ + { + CoreListenerDetails: ir.CoreListenerDetails{ + Name: fmt.Sprintf("default/%s/listener-0", "gwName"), + }, + Routes: []*ir.HTTPRoute{ + { + Name: "route-0", + Traffic: &ir.TrafficFeatures{ + RateLimit: &ir.RateLimit{ + Global: &ir.GlobalRateLimit{ + Rules: []*ir.RateLimitRule{ + { + HeaderMatches: []*ir.StringMatch{ + { + Name: "x-user-id", + Distinct: true, + }, + }, + Limit: ir.RateLimitValue{ + Requests: 100, + Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitMinute), + }, + }, + { + HeaderMatches: []*ir.StringMatch{ + { + Name: "x-another-user-id", + Distinct: true, + }, + }, + Limit: ir.RateLimitValue{ + Requests: 10, + Unit: ir.RateLimitUnit(egv1a1.RateLimitUnitSecond), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + c1 := context.Background() + c2 := context.TODO() + + x1 := &message.XdsIRWithContext{ + XdsIR: xdsIR, + Context: c1, + } + x2 := &message.XdsIRWithContext{ + XdsIR: xdsIR, + Context: c2, + } + + assert.True(t, x1.Equal(x2)) + assert.True(t, x2.Equal(x1)) +} diff --git a/internal/message/watchutil_test.go b/internal/message/watchutil_test.go index 21411b3f6f9..ed16c17c409 100644 --- a/internal/message/watchutil_test.go +++ b/internal/message/watchutil_test.go @@ -247,19 +247,28 @@ func TestControllerResourceUpdate(t *testing.T) { snapshotC := m.GatewayAPIResources.Subscribe(ctx) endCtx, end := context.WithCancel(ctx) - m.GatewayAPIResources.Store("start", &resource.ControllerResources{}) + m.GatewayAPIResources.Store("start", &resource.ControllerResourcesContext{ + Resources: &resource.ControllerResources{}, + Context: ctx, + }) go func() { <-endCtx.Done() for _, r := range tc.resources { r.Sort() - m.GatewayAPIResources.Store("test", r) + m.GatewayAPIResources.Store("test", &resource.ControllerResourcesContext{ + Resources: r, + Context: ctx, + }) } - m.GatewayAPIResources.Store("end", &resource.ControllerResources{}) + m.GatewayAPIResources.Store("end", &resource.ControllerResourcesContext{ + Resources: &resource.ControllerResources{}, + Context: ctx, + }) }() updates := 0 - message.HandleSubscription(message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *resource.ControllerResources], errChans chan error) { + message.HandleSubscription(message.Metadata{Runner: "demo", Message: "demo"}, snapshotC, func(u message.Update[string, *resource.ControllerResourcesContext], errChans chan error) { end() if u.Key == "test" { updates += 1 diff --git a/internal/provider/kubernetes/controller.go b/internal/provider/kubernetes/controller.go index c57a6411ead..6243670d292 100644 --- a/internal/provider/kubernetes/controller.go +++ b/internal/provider/kubernetes/controller.go @@ -13,6 +13,7 @@ import ( "time" "github.com/telepresenceio/watchable" + "go.opentelemetry.io/otel" appsv1 "k8s.io/api/apps/v1" certificatesv1b1 "k8s.io/api/certificates/v1beta1" corev1 "k8s.io/api/core/v1" @@ -57,6 +58,8 @@ var skipNameValidation = func() *bool { return ptr.To(false) } +var tracer = otel.Tracer("envoy-gateway/reconciliation") + type gatewayAPIReconciler struct { client client.Client log logging.Logger @@ -296,11 +299,14 @@ func isTransientError(err error) bool { // same reconcile.Request containing the gateway controller name. This allows multiple resource updates to // be handled by a single call to Reconcile. The reconcile.Request DOES NOT map to a specific resource. func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) { + ctx, span := tracer.Start(ctx, "GatewayAPIReconciler.Reconcile") + defer span.End() + logger := r.log.WithTrace(ctx) var ( managedGCs []*gwapiv1.GatewayClass err error ) - r.log.Info("reconciling gateways") + logger.Info("reconciling gateways") // Get the GatewayClasses managed by the Envoy Gateway Controller. managedGCs, err = r.managedGatewayClasses(ctx) @@ -315,7 +321,7 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques defer func() { for _, key := range gcStatusToDelete.UnsortedList() { - r.log.Info("delete from GatewayClass statuses", "key", key) + logger.Info("delete from GatewayClass statuses", "key", key) r.resources.GatewayClassStatuses.Delete(key) } }() @@ -323,7 +329,7 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // The gatewayclass was already deleted/finalized and there are stale queue entries. if managedGCs == nil { r.resources.GatewayAPIResources.Delete(string(r.classController)) - r.log.Info("no accepted gatewayclass") + logger.Info("no accepted gatewayclass") return reconcile.Result{}, nil } @@ -341,18 +347,18 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques gwcResource.GatewayClass = managedGC gwcResourceMapping := newResourceMapping() - logger := r.log.WithValues("GatewayClass", managedGC.Name) + gcLogger := logger.WithValues("GatewayClass", managedGC.Name) // Process the parametersRef of the accepted GatewayClass. // This should run before processGateways and processBackendRefs failToProcessGCParamsRef := false if managedGC.Spec.ParametersRef != nil && managedGC.DeletionTimestamp == nil { if err := r.processGatewayClassParamsRef(ctx, managedGC, gwcResourceMapping, gwcResource); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing parametersRef for GatewayClass") + gcLogger.Error(err, "transient error processing parametersRef for GatewayClass") return reconcile.Result{}, err } - logger.Error(err, "failed to process ParametersRef for GatewayClass") + gcLogger.Error(err, "failed to process ParametersRef for GatewayClass") msg := fmt.Sprintf("%s: %v", status.MsgGatewayClassInvalidParams, err) status.SetGatewayClassAccepted( managedGC, @@ -371,11 +377,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // process envoy gateway secret refs if err := r.processEnvoyProxySecretRef(ctx, gwcResource); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing TLS SecretRef for EnvoyProxy") + gcLogger.Error(err, "transient error processing TLS SecretRef for EnvoyProxy") return reconcile.Result{}, err } - r.log.Error(err, "failed to process TLS SecretRef for EnvoyProxy for GatewayClass") + gcLogger.Error(err, "failed to process TLS SecretRef for EnvoyProxy for GatewayClass") status.SetGatewayClassAccepted( managedGC, false, @@ -391,7 +397,7 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques if !failToProcessGCParamsRef { // GatewayClass is valid so far, mark it as accepted. - logger.V(6).Info("Set GatewayClass Accepted") + gcLogger.V(6).Info("Set GatewayClass Accepted") status.SetGatewayClassAccepted( managedGC, true, @@ -407,38 +413,38 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // add the OIDC HMAC Secret to the resourceTree if err = r.processOIDCHMACSecret(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing OIDC HMAC Secret") + gcLogger.Error(err, "transient error processing OIDC HMAC Secret") return reconcile.Result{}, err } - logger.Error(err, "failed to process OIDC HMAC Secret for GatewayClass") + gcLogger.Error(err, "failed to process OIDC HMAC Secret for GatewayClass") } // add the Envoy TLS Secret to the resourceTree if err = r.processEnvoyTLSSecret(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing Envoy TLS Secret") + gcLogger.Error(err, "transient error processing Envoy TLS Secret") return reconcile.Result{}, err } - logger.Error(err, "failed to process EnvoyTLSSecret") + gcLogger.Error(err, "failed to process EnvoyTLSSecret") } // Add all Gateways, their associated Routes, and referenced resources to the resourceTree if err = r.processGateways(ctx, managedGC, gwcResourceMapping, gwcResource); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing gateways") + gcLogger.Error(err, "transient error processing gateways") return reconcile.Result{}, err } - logger.Error(err, "failed process gateways for GatewayClass") + gcLogger.Error(err, "failed process gateways for GatewayClass") } if r.eppCRDExists { // Add all EnvoyPatchPolicies to the resourceTree if err = r.processEnvoyPatchPolicies(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing EnvoyPatchPolicies") + gcLogger.Error(err, "transient error processing EnvoyPatchPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed to process EnvoyPatchPolicies for GatewayClass") + gcLogger.Error(err, "failed to process EnvoyPatchPolicies for GatewayClass") } } @@ -446,10 +452,10 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // Add all ClientTrafficPolicies and their referenced resources to the resourceTree if err = r.processClientTrafficPolicies(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing ClientTrafficPolicies") + gcLogger.Error(err, "transient error processing ClientTrafficPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed process to ClientTrafficPolicies for GatewayClass") + gcLogger.Error(err, "failed process to ClientTrafficPolicies for GatewayClass") } } @@ -457,10 +463,10 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // Add all BackendTrafficPolicies to the resourceTree if err = r.processBackendTrafficPolicies(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing BackendTrafficPolicies") + gcLogger.Error(err, "transient error processing BackendTrafficPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed to process BackendTrafficPolicies for GatewayClass") + gcLogger.Error(err, "failed to process BackendTrafficPolicies for GatewayClass") } } @@ -468,10 +474,10 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // Add all SecurityPolicies and their referenced resources to the resourceTree if err = r.processSecurityPolicies(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing SecurityPolicies") + gcLogger.Error(err, "transient error processing SecurityPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed to process SecurityPolicies for GatewayClass") + gcLogger.Error(err, "failed to process SecurityPolicies for GatewayClass") } } @@ -479,10 +485,10 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // Add all BackendTLSPolies to the resourceTree if err = r.processBackendTLSPolicies(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing BackendTLSPolicies") + gcLogger.Error(err, "transient error processing BackendTLSPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed to process BackendTLSPolicies for GatewayClass") + gcLogger.Error(err, "failed to process BackendTLSPolicies for GatewayClass") } } @@ -490,19 +496,19 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // Add all EnvoyExtensionPolicies and their referenced resources to the resourceTree if err = r.processEnvoyExtensionPolicies(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing EnvoyExtensionPolicies") + gcLogger.Error(err, "transient error processing EnvoyExtensionPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed to process EnvoyExtensionPolicies for GatewayClass") + gcLogger.Error(err, "failed to process EnvoyExtensionPolicies for GatewayClass") } } if err = r.processExtensionServerPolicies(ctx, gwcResource); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing ExtensionServerPolicies") + gcLogger.Error(err, "transient error processing ExtensionServerPolicies") return reconcile.Result{}, err } - logger.Error(err, "failed to process ExtensionServerPolicies for GatewayClass") + gcLogger.Error(err, "failed to process ExtensionServerPolicies for GatewayClass") } // Add the referenced services, ServiceImports, and EndpointSlices in @@ -510,11 +516,11 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // BackendRefs are referred by various Route objects and the ExtAuth in SecurityPolicies. if err = r.processBackendRefs(ctx, gwcResource, gwcResourceMapping); err != nil { if isTransientError(err) { - logger.Error(err, "transient error processing BackendRefs") + gcLogger.Error(err, "transient error processing BackendRefs") return reconcile.Result{}, err } - logger.Error(err, "failed to process BackendRefs for GatewayClass") + gcLogger.Error(err, "failed to process BackendRefs for GatewayClass") } // For this particular Gateway, and all associated objects, check whether the @@ -523,10 +529,10 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques namespace, err := r.getNamespace(ctx, ns) if err != nil { if isTransientError(err) { - logger.Error(err, "transient error getting namespace", "namespace", ns) + gcLogger.Error(err, "transient error getting namespace", "namespace", ns) return reconcile.Result{}, err } - logger.Error(err, "unable to find the namespace", "namespace", ns) + gcLogger.Error(err, "unable to find the namespace", "namespace", ns) if kerrors.IsNotFound(err) { continue } @@ -541,24 +547,24 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques } if len(gwcResource.Gateways) == 0 { - logger.Info("No gateways found for accepted GatewayClass") + gcLogger.Info("No gateways found for accepted GatewayClass") // If needed, remove the finalizer from the accepted GatewayClass. if err := r.removeFinalizer(ctx, managedGC); err != nil { if isTransientError(err) { - logger.Error(err, "transient error removing finalizer from GatewayClass") + gcLogger.Error(err, "transient error removing finalizer from GatewayClass") return reconcile.Result{}, err } - logger.Error(err, "failed to remove finalizer from GatewayClass") + gcLogger.Error(err, "failed to remove finalizer from GatewayClass") } } else { // finalize the accepted GatewayClass. if err := r.addFinalizer(ctx, managedGC); err != nil { if isTransientError(err) { - logger.Error(err, "transient error adding finalizer to gatewayClass") + gcLogger.Error(err, "transient error adding finalizer to gatewayClass") return reconcile.Result{}, err } - logger.Error(err, "failed adding finalizer to gatewayClass") + gcLogger.Error(err, "failed adding finalizer to gatewayClass") } } } @@ -570,17 +576,21 @@ func (r *gatewayAPIReconciler) Reconcile(ctx context.Context, _ reconcile.Reques // which impacts translation output gwcResources.Sort() - // Store the Gateway Resources for the GatewayClass. + // Store the Gateway Resources for the GatewayClass with trace context. // The Store is triggered even when there are no Gateways associated to the // GatewayClass. This would happen in case the last Gateway is removed and the // Store will be required to trigger a cleanup of envoy infra resources. - r.resources.GatewayAPIResources.Store(string(r.classController), &gwcResources) + resourcesWithContext := &resource.ControllerResourcesContext{ + Resources: &gwcResources, + Context: ctx, + } + r.resources.GatewayAPIResources.Store(string(r.classController), resourcesWithContext) message.PublishMetric(message.Metadata{ Runner: string(egv1a1.LogComponentProviderRunner), Message: message.ProviderResourcesMessageName, }, 1) - r.log.Info("reconciled gateways successfully") + logger.Info("reconciled gateways successfully") return reconcile.Result{}, nil } diff --git a/internal/traces/register.go b/internal/traces/register.go new file mode 100644 index 00000000000..bbce812406d --- /dev/null +++ b/internal/traces/register.go @@ -0,0 +1,63 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package traces + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + + "github.com/envoyproxy/gateway/internal/envoygateway/config" +) + +type Runner struct { + cfg *config.Server + tp *trace.TracerProvider +} + +func New(cfg *config.Server) *Runner { + return &Runner{ + cfg: cfg, + } +} + +func (r *Runner) Start(ctx context.Context) error { + // Create resource + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceNameKey.String("envoy-gateway"), + ), + ) + if err != nil { + return err + } + + tp := trace.NewTracerProvider( + trace.WithResource(res), + ) + + otel.SetTracerProvider(tp) + r.tp = tp + + return nil +} + +func (r *Runner) Name() string { + return "traces" +} + +func (r *Runner) Close() error { + if r.tp != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return r.tp.Shutdown(ctx) + } + return nil +} diff --git a/internal/traces/register_test.go b/internal/traces/register_test.go new file mode 100644 index 00000000000..a1a8a409902 --- /dev/null +++ b/internal/traces/register_test.go @@ -0,0 +1,58 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package traces + +import ( + "testing" + + "github.com/stretchr/testify/require" + + egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/envoyproxy/gateway/internal/envoygateway/config" +) + +func TestTracesRunner_New(t *testing.T) { + cfg := &config.Server{ + EnvoyGateway: &egv1a1.EnvoyGateway{ + EnvoyGatewaySpec: egv1a1.EnvoyGatewaySpec{ + Telemetry: &egv1a1.EnvoyGatewayTelemetry{}, + }, + }, + } + + runner := New(cfg) + require.NotNil(t, runner) + require.Equal(t, cfg, runner.cfg) + require.Nil(t, runner.tp) +} + +func TestTracesRunner_Close(t *testing.T) { + tests := []struct { + name string + runner *Runner + wantErr bool + }{ + { + name: "close with nil tracer provider", + runner: &Runner{ + cfg: &config.Server{}, + tp: nil, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.runner.Close() + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/internal/xds/cache/snapshotcache.go b/internal/xds/cache/snapshotcache.go index 33c4cad18b6..72aaa1ba142 100644 --- a/internal/xds/cache/snapshotcache.go +++ b/internal/xds/cache/snapshotcache.go @@ -25,6 +25,8 @@ import ( discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/envoyproxy/gateway/internal/logging" @@ -32,7 +34,10 @@ import ( "github.com/envoyproxy/gateway/internal/xds/types" ) -var Hash = cachev3.IDHash{} +var ( + Hash = cachev3.IDHash{} + tracer = otel.Tracer("envoy-gateway/xds/snapshotcache") +) // SnapshotCacheWithCallbacks uses the go-control-plane SimpleCache to store snapshots of // Envoy resources, sliced by Node ID so that we can do incremental xDS properly. @@ -46,7 +51,7 @@ var Hash = cachev3.IDHash{} type SnapshotCacheWithCallbacks interface { cachev3.SnapshotCache serverv3.Callbacks - GenerateNewSnapshot(string, types.XdsResources) error + GenerateNewSnapshot(string, types.XdsResources, context.Context) error SnapshotHasIrKey(string) bool GetIrKeys() []string } @@ -73,11 +78,18 @@ type snapshotCache struct { // GenerateNewSnapshot takes a table of resources (the output from the IR->xDS // translator) and updates the snapshot version. -func (s *snapshotCache) GenerateNewSnapshot(irKey string, resources types.XdsResources) error { +func (s *snapshotCache) GenerateNewSnapshot(irKey string, resources types.XdsResources, ctx context.Context) error { s.mu.Lock() defer s.mu.Unlock() - version := s.newSnapshotVersion() + _, span := tracer.Start(ctx, "SnapshotCache.GenerateNewSnapshot") + defer span.End() + + sc := trace.SpanContextFromContext(ctx) + version := sc.TraceID().String() + if !sc.IsValid() { + version = s.newSnapshotVersion() + } // Create a snapshot with all xDS resources. snapshot, err := cachev3.NewSnapshot( diff --git a/internal/xds/runner/runner.go b/internal/xds/runner/runner.go index a875d9d7304..6bbc841fb0b 100644 --- a/internal/xds/runner/runner.go +++ b/internal/xds/runner/runner.go @@ -24,6 +24,8 @@ import ( secretv3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/telepresenceio/watchable" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -35,7 +37,6 @@ import ( extension "github.com/envoyproxy/gateway/internal/extension/types" "github.com/envoyproxy/gateway/internal/infrastructure/host" "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes/ratelimit" - "github.com/envoyproxy/gateway/internal/ir" "github.com/envoyproxy/gateway/internal/message" "github.com/envoyproxy/gateway/internal/xds/bootstrap" "github.com/envoyproxy/gateway/internal/xds/cache" @@ -65,6 +66,8 @@ const ( defaultMaxConnectionAgeGrace = 2 * time.Minute ) +var tracer = otel.Tracer("envoy-gateway/xds") + var maxConnectionAgeValues = []time.Duration{ 10 * time.Hour, 11 * time.Hour, @@ -253,26 +256,41 @@ func registerServer(srv serverv3.Server, g *grpc.Server) { runtimev3.RegisterRuntimeDiscoveryServiceServer(g, srv) } -func (r *Runner) translateFromSubscription(sub <-chan watchable.Snapshot[string, *ir.Xds]) { +func (r *Runner) translateFromSubscription(sub <-chan watchable.Snapshot[string, *message.XdsIRWithContext]) { // Subscribe to resources message.HandleSubscription(message.Metadata{Runner: r.Name(), Message: message.XDSIRMessageName}, sub, - func(update message.Update[string, *ir.Xds], errChan chan error) { - r.Logger.Info("received an update") + func(update message.Update[string, *message.XdsIRWithContext], errChan chan error) { + parentCtx := context.Background() + if update.Value != nil && update.Value.Context != nil { + parentCtx = update.Value.Context + } + + traceCtx, span := tracer.Start(parentCtx, "XdsRunner.subscribeAndTranslate") + defer span.End() + traceLogger := r.Logger.WithTrace(traceCtx) + traceLogger.Info("received an update") + key := update.Key val := update.Value + // Add span attributes for observability + span.SetAttributes( + attribute.String("xds-ir.key", update.Key), + attribute.Bool("update.delete", update.Delete), + ) + if update.Delete { - if err := r.cache.GenerateNewSnapshot(key, nil); err != nil { - r.Logger.Error(err, "failed to delete the snapshot") + if err := r.cache.GenerateNewSnapshot(key, nil, traceCtx); err != nil { + traceLogger.Error(err, "failed to delete the snapshot") errChan <- err } } else { // Translate to xds resources t := &translator.Translator{ ControllerNamespace: r.ControllerNamespace, - FilterOrder: val.FilterOrder, + FilterOrder: val.XdsIR.FilterOrder, RuntimeFlags: r.EnvoyGateway.RuntimeFlags, - Logger: r.Logger, + Logger: traceLogger, } // Set the extension manager if an extension is loaded @@ -289,7 +307,7 @@ func (r *Runner) translateFromSubscription(sub <-chan watchable.Snapshot[string, if r.EnvoyGateway.RateLimit.Timeout != nil { d, err := time.ParseDuration(string(*r.EnvoyGateway.RateLimit.Timeout)) if err != nil { - r.Logger.Error(err, "invalid rateLimit timeout") + traceLogger.Error(err, "invalid rateLimit timeout") errChan <- err } else { t.GlobalRateLimit.Timeout = d @@ -297,16 +315,18 @@ func (r *Runner) translateFromSubscription(sub <-chan watchable.Snapshot[string, } } - result, err := t.Translate(val) + _, translateSpan := tracer.Start(traceCtx, "Translator.Translate") + result, err := t.Translate(val.XdsIR) + translateSpan.End() if err != nil { - r.Logger.Error(err, "failed to translate xds ir") + traceLogger.Error(err, "failed to translate xds ir") errChan <- err } // xDS translation is done in a best-effort manner, so the result // may contain partial resources even if there are errors. if result == nil { - r.Logger.Info("no xds resources to publish") + traceLogger.Info("no xds resources to publish") return } @@ -318,7 +338,7 @@ func (r *Runner) translateFromSubscription(sub <-chan watchable.Snapshot[string, errChan <- err } else { // Update snapshot cache - if err := r.cache.GenerateNewSnapshot(key, result.XdsResources); err != nil { + if err := r.cache.GenerateNewSnapshot(key, result.XdsResources, traceCtx); err != nil { r.Logger.Error(err, "failed to generate a snapshot") errChan <- err } diff --git a/internal/xds/runner/runner_test.go b/internal/xds/runner/runner_test.go index 2100919faed..a3afb438ee7 100644 --- a/internal/xds/runner/runner_test.go +++ b/internal/xds/runner/runner_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tsaarni/certyaml" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -36,6 +37,15 @@ import ( "github.com/envoyproxy/gateway/internal/xds/bootstrap" ) +func newTestTraceContext() context.Context { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0}, + SpanID: trace.SpanID{0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0x0, 0x1}, + TraceFlags: trace.FlagsSampled, + }) + return trace.ContextWithSpanContext(context.Background(), sc) +} + func TestTLSConfig(t *testing.T) { // Create trusted CA, server and client certs. trustedCACert := certyaml.Certificate{ @@ -259,7 +269,7 @@ func TestRunner(t *testing.T) { TLSCaPath: caFile, }) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(newTestTraceContext()) defer cancel() // Start @@ -271,7 +281,7 @@ func TestRunner(t *testing.T) { }() // xDS is nil at start - require.Equal(t, map[string]*ir.Xds{}, xdsIR.LoadAll()) + require.Equal(t, map[string]*message.XdsIRWithContext{}, xdsIR.LoadAll()) // test translation path := "example" @@ -308,7 +318,11 @@ func TestRunner(t *testing.T) { }, }, } - xdsIR.Store("test", &res) + m := message.XdsIRWithContext{ + XdsIR: &res, + Context: newTestTraceContext(), + } + xdsIR.Store("test", &m) require.Eventually(t, func() bool { // Check that the cache has the snapshot for our test key return r.cache.SnapshotHasIrKey("test") @@ -348,7 +362,7 @@ func TestRunner_withExtensionManager_FailOpen(t *testing.T) { TLSCaPath: caFile, }) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(newTestTraceContext()) defer cancel() // Start @@ -360,7 +374,7 @@ func TestRunner_withExtensionManager_FailOpen(t *testing.T) { }() // xDS is nil at start - require.Equal(t, map[string]*ir.Xds{}, xdsIR.LoadAll()) + require.Equal(t, map[string]*message.XdsIRWithContext{}, xdsIR.LoadAll()) // test translation path := "example" @@ -397,7 +411,11 @@ func TestRunner_withExtensionManager_FailOpen(t *testing.T) { }, }, } - xdsIR.Store("test", &res) + m := message.XdsIRWithContext{ + XdsIR: &res, + Context: newTestTraceContext(), + } + xdsIR.Store("test", &m) require.Eventually(t, func() bool { // Since the extension manager is configured to fail open, in an event of an error // from the extension manager hooks, xds update should be published. @@ -430,7 +448,7 @@ func TestRunner_withExtensionManager_FailClosed(t *testing.T) { TLSCaPath: caFile, }) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(newTestTraceContext()) defer cancel() // Start @@ -442,7 +460,7 @@ func TestRunner_withExtensionManager_FailClosed(t *testing.T) { }() // xDS is nil at start - require.Equal(t, map[string]*ir.Xds{}, xdsIR.LoadAll()) + require.Equal(t, map[string]*message.XdsIRWithContext{}, xdsIR.LoadAll()) // test translation path := "example" @@ -479,7 +497,11 @@ func TestRunner_withExtensionManager_FailClosed(t *testing.T) { }, }, } - xdsIR.Store("test", &res) + m := message.XdsIRWithContext{ + XdsIR: &res, + Context: newTestTraceContext(), + } + xdsIR.Store("test", &m) require.Never(t, func() bool { // Since the extension manager is configured to fail closed, in an event of an error // from the extension manager hooks, xds update should not be published.