-
Notifications
You must be signed in to change notification settings - Fork 2.7k
[feat][storage] Add SpanKind support for badger #6376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Manik2708
wants to merge
7
commits into
jaegertracing:main
Choose a base branch
from
Manik2708:kind
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
b2842fd
conflicts
Manik2708 72c23ac
Merge branch 'main' into kind
Manik2708 69098e8
conflicts
Manik2708 146b659
Merge branch 'main' into kind
Manik2708 e1deae4
Update internal/storage/v1/badger/spanstore/reader.go
Manik2708 a820441
Merge branch 'main' into kind
Manik2708 e6cadda
typo fix
Manik2708 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ import ( | |
|
|
||
| "github.com/dgraph-io/badger/v4" | ||
| "github.com/spf13/viper" | ||
| "go.opentelemetry.io/collector/featuregate" | ||
| "go.uber.org/zap" | ||
|
|
||
| "github.com/jaegertracing/jaeger/internal/storage/v1" | ||
|
|
@@ -37,11 +38,19 @@ const ( | |
| ) | ||
|
|
||
| var ( // interface comformance checks | ||
| _ storage.Factory = (*Factory)(nil) | ||
| _ io.Closer = (*Factory)(nil) | ||
| _ storage.Configurable = (*Factory)(nil) | ||
| _ storage.Purger = (*Factory)(nil) | ||
| _ storage.SamplingStoreFactory = (*Factory)(nil) | ||
| _ storage.Factory = (*Factory)(nil) | ||
| _ io.Closer = (*Factory)(nil) | ||
| _ storage.Configurable = (*Factory)(nil) | ||
| _ storage.Purger = (*Factory)(nil) | ||
| _ storage.SamplingStoreFactory = (*Factory)(nil) | ||
| includeDualLookUp = featuregate.GlobalRegistry().MustRegister( | ||
| "jaeger.badger.dualLookUp", | ||
| featuregate.StageBeta, // enabed by default | ||
| featuregate.WithRegisterFromVersion("v2.2.0"), | ||
| featuregate.WithRegisterToVersion("v2.5.0"), | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am confused about the two options:
|
||
| featuregate.WithRegisterDescription("Allows reader to look up for traces from old index key"), | ||
| featuregate.WithRegisterReferenceURL("https://github.com/jaegertracing/jaeger/pull/6376"), | ||
| ) | ||
| ) | ||
|
|
||
| // Factory implements storage.Factory for Badger backend. | ||
|
|
@@ -145,7 +154,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) | |
| } | ||
| f.store = store | ||
|
|
||
| f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans) | ||
| f.cache = badgerStore.NewCacheStore(f.Config.TTL.Spans) | ||
|
|
||
| f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName}) | ||
| f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName}) | ||
|
|
@@ -171,7 +180,7 @@ func initializeDir(path string) { | |
|
|
||
| // CreateSpanReader implements storage.Factory | ||
| func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { | ||
| tr := badgerStore.NewTraceReader(f.store, f.cache, true) | ||
| tr := badgerStore.NewTraceReader(f.store, f.cache, true, includeDualLookUp.IsEnabled()) | ||
| return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil | ||
| } | ||
|
|
||
|
|
||
56 changes: 56 additions & 0 deletions
56
internal/storage/v1/badger/spanstore/backward_compatibility_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| // Copyright (c) 2025 The Jaeger Authors. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| package spanstore | ||
|
|
||
| import ( | ||
| "context" | ||
| "math/rand" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/dgraph-io/badger/v4" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/jaegertracing/jaeger-idl/model/v1" | ||
| "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" | ||
| ) | ||
|
|
||
| // This test is for checking the backward compatibility after changing the index. | ||
| // Once dual lookup is completely removed, this test can be removed | ||
| func TestBackwardCompatibility(t *testing.T) { | ||
| runWithBadger(t, func(store *badger.DB, t *testing.T) { | ||
| startT := time.Now() | ||
| tid := startT | ||
| cache := NewCacheStore(1 * time.Hour) | ||
| reader := NewTraceReader(store, cache, true, true) | ||
| writer := NewSpanWriter(store, cache, 1*time.Hour) | ||
| oldSpan := model.Span{ | ||
| TraceID: model.TraceID{ | ||
| Low: 0, | ||
| High: 1, | ||
| }, | ||
| SpanID: model.SpanID(rand.Uint64()), | ||
| OperationName: "operation-1", | ||
| Process: &model.Process{ | ||
| ServiceName: "service", | ||
| }, | ||
| StartTime: tid, | ||
| Duration: time.Duration(time.Duration(1) * time.Millisecond), | ||
| } | ||
| err := writer.writeSpan(&oldSpan, true) | ||
| require.NoError(t, err) | ||
| traces, err := reader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ | ||
| ServiceName: "service", | ||
| OperationName: "operation-1", | ||
| StartTimeMin: startT, | ||
| StartTimeMax: startT.Add(time.Duration(time.Millisecond * 10)), | ||
| }) | ||
| require.NoError(t, err) | ||
| assert.Len(t, traces, 1) | ||
| assert.Len(t, traces[0].Spans, 1) | ||
| assert.Equal(t, oldSpan.TraceID, traces[0].Spans[0].TraceID) | ||
| assert.Equal(t, oldSpan.SpanID, traces[0].Spans[0].SpanID) | ||
| }) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,29 +8,37 @@ import ( | |
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/dgraph-io/badger/v4" | ||
|
|
||
| "github.com/jaegertracing/jaeger-idl/model/v1" | ||
| "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" | ||
| ) | ||
|
|
||
| // CacheStore saves expensive calculations from the K/V store | ||
| type CacheStore struct { | ||
| // Given the small amount of data these will store, we use the same structure as the memory store | ||
| cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes | ||
| services map[string]uint64 | ||
| operations map[string]map[string]uint64 | ||
| cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes | ||
| services map[string]uint64 | ||
| // This map is for the hierarchy: service name, kind and operation name. | ||
| // Each service contains the span kinds, and then operation names belonging to that kind. | ||
| // This structure will look like: | ||
| /* | ||
| "service1":{ | ||
| SpanKind.unspecified: { | ||
| "operation1": uint64 | ||
| } | ||
| } | ||
| */ | ||
| // The uint64 value is the expiry time of operation | ||
| operations map[string]map[model.SpanKind]map[string]uint64 | ||
|
|
||
| store *badger.DB | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't find any use of store in Cache when the responsibility to fill cache is given to reader |
||
| ttl time.Duration | ||
| ttl time.Duration | ||
| } | ||
|
|
||
| // NewCacheStore returns initialized CacheStore for badger use | ||
| func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore { | ||
| func NewCacheStore(ttl time.Duration) *CacheStore { | ||
| cs := &CacheStore{ | ||
| services: make(map[string]uint64), | ||
| operations: make(map[string]map[string]uint64), | ||
| operations: make(map[string]map[model.SpanKind]map[string]uint64), | ||
| ttl: ttl, | ||
| store: db, | ||
| } | ||
| return cs | ||
| } | ||
|
|
@@ -48,67 +56,73 @@ func (c *CacheStore) AddService(service string, keyTTL uint64) { | |
| } | ||
|
|
||
| // AddOperation adds the cache with operation names with most updated expiration time | ||
| func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) { | ||
| func (c *CacheStore) AddOperation(service, operation string, kind model.SpanKind, keyTTL uint64) { | ||
| c.cacheLock.Lock() | ||
| defer c.cacheLock.Unlock() | ||
| if _, found := c.operations[service]; !found { | ||
| c.operations[service] = make(map[string]uint64) | ||
| c.operations[service] = make(map[model.SpanKind]map[string]uint64) | ||
| } | ||
| if v, found := c.operations[service][operation]; found { | ||
| if _, found := c.operations[service][kind]; !found { | ||
| c.operations[service][kind] = make(map[string]uint64) | ||
| } | ||
| if v, found := c.operations[service][kind][operation]; found { | ||
| if v > keyTTL { | ||
| return | ||
| } | ||
| } | ||
| c.operations[service][operation] = keyTTL | ||
| c.operations[service][kind][operation] = keyTTL | ||
| } | ||
|
|
||
| // Update caches the results of service and service + operation indexes and maintains their TTL | ||
| func (c *CacheStore) Update(service, operation string, expireTime uint64) { | ||
| func (c *CacheStore) Update(service, operation string, kind model.SpanKind, expireTime uint64) { | ||
| c.cacheLock.Lock() | ||
|
|
||
| c.services[service] = expireTime | ||
| if _, ok := c.operations[service]; !ok { | ||
| c.operations[service] = make(map[string]uint64) | ||
| if _, found := c.operations[service]; !found { | ||
| c.operations[service] = make(map[model.SpanKind]map[string]uint64) | ||
| } | ||
| if _, found := c.operations[service][kind]; !found { | ||
| c.operations[service][kind] = make(map[string]uint64) | ||
| } | ||
| c.operations[service][operation] = expireTime | ||
| c.operations[service][kind][operation] = expireTime | ||
| c.cacheLock.Unlock() | ||
| } | ||
|
|
||
| // GetOperations returns all operations for a specific service & spanKind traced by Jaeger | ||
| func (c *CacheStore) GetOperations(service string) ([]spanstore.Operation, error) { | ||
| operations := make([]string, 0, len(c.services)) | ||
| func (c *CacheStore) GetOperations(service string, kind string) ([]spanstore.Operation, error) { | ||
| operations := make([]spanstore.Operation, 0, len(c.services)) | ||
| //nolint: gosec // G115 | ||
| t := uint64(time.Now().Unix()) | ||
| currentTime := uint64(time.Now().Unix()) | ||
| c.cacheLock.Lock() | ||
| defer c.cacheLock.Unlock() | ||
|
|
||
| if v, ok := c.services[service]; ok { | ||
| if v < t { | ||
| if expiryTimeOfService, ok := c.services[service]; ok { | ||
| if expiryTimeOfService < currentTime { | ||
| // Expired, remove | ||
| delete(c.services, service) | ||
| delete(c.operations, service) | ||
| return []spanstore.Operation{}, nil // empty slice rather than nil | ||
| } | ||
| for o, e := range c.operations[service] { | ||
| if e > t { | ||
| operations = append(operations, o) | ||
| } else { | ||
| delete(c.operations[service], o) | ||
| for sKind := range c.operations[service] { | ||
| if kind != "" && kind != string(sKind) { | ||
| continue | ||
| } | ||
| for o, expiryTimeOfOperation := range c.operations[service][sKind] { | ||
| if expiryTimeOfOperation > currentTime { | ||
| op := spanstore.Operation{Name: o, SpanKind: string(sKind)} | ||
| operations = append(operations, op) | ||
| } else { | ||
| delete(c.operations[service][sKind], o) | ||
| } | ||
| sort.Slice(operations, func(i, j int) bool { | ||
| if operations[i].SpanKind == operations[j].SpanKind { | ||
| return operations[i].Name < operations[j].Name | ||
| } | ||
| return operations[i].SpanKind < operations[j].SpanKind | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| sort.Strings(operations) | ||
|
|
||
| // TODO: https://github.com/jaegertracing/jaeger/issues/1922 | ||
| // - return the operations with actual spanKind | ||
| result := make([]spanstore.Operation, 0, len(operations)) | ||
| for _, op := range operations { | ||
| result = append(result, spanstore.Operation{ | ||
| Name: op, | ||
| }) | ||
| } | ||
| return result, nil | ||
| return operations, nil | ||
| } | ||
|
|
||
| // GetServices returns all services traced by Jaeger | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a typo in the feature gate stage description:
enabedshould beenabled.Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.