Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions cmd/jaeger/internal/integration/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ func TestBadgerStorage(t *testing.T) {
ConfigFile: "../../config-badger.yaml",
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,

// TODO: remove this once badger supports returning spanKind from GetOperations
// Cf https://github.com/jaegertracing/jaeger/issues/1922
GetOperationsMissingSpanKind: true,
},
}
s.e2eInitialize(t, "badger")
Expand Down
5 changes: 1 addition & 4 deletions internal/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ func TestBadgerStorage(t *testing.T) {
testutils.VerifyGoLeaksOnce(t)
})
s := &BadgerIntegrationStorage{
StorageIntegration: StorageIntegration{
// TODO: remove this badger supports returning spanKind from GetOperations
GetOperationsMissingSpanKind: true,
},
StorageIntegration: StorageIntegration{},
}
s.CleanUp = s.cleanUp
s.initialize(t)
Expand Down
23 changes: 16 additions & 7 deletions internal/storage/v1/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/dgraph-io/badger/v4"
"github.com/spf13/viper"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/storage/v1"
Expand All @@ -37,11 +38,19 @@ const (
)

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ storage.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
_ storage.Factory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
_ storage.Configurable = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
includeDualLookUp = featuregate.GlobalRegistry().MustRegister(
"jaeger.badger.dualLookUp",
featuregate.StageBeta, // enabed by default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a typo in the feature gate stage description: enabed should be enabled.

Suggested change
featuregate.StageBeta, // enabed by default
featuregate.StageBeta, // enabled by default

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.

featuregate.WithRegisterFromVersion("v2.2.0"),
featuregate.WithRegisterToVersion("v2.5.0"),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused about the two options:

  1. Versions used
  2. Whether it should be the issue which should be linked or pull request, as issue is not talking about this change directly!

featuregate.WithRegisterDescription("Allows reader to look up for traces from old index key"),
featuregate.WithRegisterReferenceURL("https://github.com/jaegertracing/jaeger/pull/6376"),
)
)

// Factory implements storage.Factory for Badger backend.
Expand Down Expand Up @@ -145,7 +154,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.store = store

f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans)
f.cache = badgerStore.NewCacheStore(f.Config.TTL.Spans)

f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName})
f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName})
Expand All @@ -171,7 +180,7 @@ func initializeDir(path string) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
tr := badgerStore.NewTraceReader(f.store, f.cache, true)
tr := badgerStore.NewTraceReader(f.store, f.cache, true, includeDualLookUp.IsEnabled())
return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package spanstore

import (
"context"
"math/rand"
"testing"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore"
)

// This test is for checking the backward compatibility after changing the index.
// Once dual lookup is completely removed, this test can be removed
func TestBackwardCompatibility(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
startT := time.Now()
tid := startT
cache := NewCacheStore(1 * time.Hour)
reader := NewTraceReader(store, cache, true, true)
writer := NewSpanWriter(store, cache, 1*time.Hour)
oldSpan := model.Span{
TraceID: model.TraceID{
Low: 0,
High: 1,
},
SpanID: model.SpanID(rand.Uint64()),
OperationName: "operation-1",
Process: &model.Process{
ServiceName: "service",
},
StartTime: tid,
Duration: time.Duration(time.Duration(1) * time.Millisecond),
}
err := writer.writeSpan(&oldSpan, true)
require.NoError(t, err)
traces, err := reader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
ServiceName: "service",
OperationName: "operation-1",
StartTimeMin: startT,
StartTimeMax: startT.Add(time.Duration(time.Millisecond * 10)),
})
require.NoError(t, err)
assert.Len(t, traces, 1)
assert.Len(t, traces[0].Spans, 1)
assert.Equal(t, oldSpan.TraceID, traces[0].Spans[0].TraceID)
assert.Equal(t, oldSpan.SpanID, traces[0].Spans[0].SpanID)
})
}
96 changes: 55 additions & 41 deletions internal/storage/v1/badger/spanstore/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,37 @@ import (
"sync"
"time"

"github.com/dgraph-io/badger/v4"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore"
)

// CacheStore saves expensive calculations from the K/V store
type CacheStore struct {
// Given the small amount of data these will store, we use the same structure as the memory store
cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes
services map[string]uint64
operations map[string]map[string]uint64
cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes
services map[string]uint64
// This map is for the hierarchy: service name, kind and operation name.
// Each service contains the span kinds, and then operation names belonging to that kind.
// This structure will look like:
/*
"service1":{
SpanKind.unspecified: {
"operation1": uint64
}
}
*/
// The uint64 value is the expiry time of operation
operations map[string]map[model.SpanKind]map[string]uint64

store *badger.DB
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't find any use of store in Cache when the responsibility to fill cache is given to reader

ttl time.Duration
ttl time.Duration
}

// NewCacheStore returns initialized CacheStore for badger use
func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore {
func NewCacheStore(ttl time.Duration) *CacheStore {
cs := &CacheStore{
services: make(map[string]uint64),
operations: make(map[string]map[string]uint64),
operations: make(map[string]map[model.SpanKind]map[string]uint64),
ttl: ttl,
store: db,
}
return cs
}
Expand All @@ -48,67 +56,73 @@ func (c *CacheStore) AddService(service string, keyTTL uint64) {
}

// AddOperation adds the cache with operation names with most updated expiration time
func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) {
func (c *CacheStore) AddOperation(service, operation string, kind model.SpanKind, keyTTL uint64) {
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[string]uint64)
c.operations[service] = make(map[model.SpanKind]map[string]uint64)
}
if v, found := c.operations[service][operation]; found {
if _, found := c.operations[service][kind]; !found {
c.operations[service][kind] = make(map[string]uint64)
}
if v, found := c.operations[service][kind][operation]; found {
if v > keyTTL {
return
}
}
c.operations[service][operation] = keyTTL
c.operations[service][kind][operation] = keyTTL
}

// Update caches the results of service and service + operation indexes and maintains their TTL
func (c *CacheStore) Update(service, operation string, expireTime uint64) {
func (c *CacheStore) Update(service, operation string, kind model.SpanKind, expireTime uint64) {
c.cacheLock.Lock()

c.services[service] = expireTime
if _, ok := c.operations[service]; !ok {
c.operations[service] = make(map[string]uint64)
if _, found := c.operations[service]; !found {
c.operations[service] = make(map[model.SpanKind]map[string]uint64)
}
if _, found := c.operations[service][kind]; !found {
c.operations[service][kind] = make(map[string]uint64)
}
c.operations[service][operation] = expireTime
c.operations[service][kind][operation] = expireTime
c.cacheLock.Unlock()
}

// GetOperations returns all operations for a specific service & spanKind traced by Jaeger
func (c *CacheStore) GetOperations(service string) ([]spanstore.Operation, error) {
operations := make([]string, 0, len(c.services))
func (c *CacheStore) GetOperations(service string, kind string) ([]spanstore.Operation, error) {
operations := make([]spanstore.Operation, 0, len(c.services))
//nolint: gosec // G115
t := uint64(time.Now().Unix())
currentTime := uint64(time.Now().Unix())
c.cacheLock.Lock()
defer c.cacheLock.Unlock()

if v, ok := c.services[service]; ok {
if v < t {
if expiryTimeOfService, ok := c.services[service]; ok {
if expiryTimeOfService < currentTime {
// Expired, remove
delete(c.services, service)
delete(c.operations, service)
return []spanstore.Operation{}, nil // empty slice rather than nil
}
for o, e := range c.operations[service] {
if e > t {
operations = append(operations, o)
} else {
delete(c.operations[service], o)
for sKind := range c.operations[service] {
if kind != "" && kind != string(sKind) {
continue
}
for o, expiryTimeOfOperation := range c.operations[service][sKind] {
if expiryTimeOfOperation > currentTime {
op := spanstore.Operation{Name: o, SpanKind: string(sKind)}
operations = append(operations, op)
} else {
delete(c.operations[service][sKind], o)
}
sort.Slice(operations, func(i, j int) bool {
if operations[i].SpanKind == operations[j].SpanKind {
return operations[i].Name < operations[j].Name
}
return operations[i].SpanKind < operations[j].SpanKind
})
}
}
}

sort.Strings(operations)

// TODO: https://github.com/jaegertracing/jaeger/issues/1922
// - return the operations with actual spanKind
result := make([]spanstore.Operation, 0, len(operations))
for _, op := range operations {
result = append(result, spanstore.Operation{
Name: op,
})
}
return result, nil
return operations, nil
}

// GetServices returns all services traced by Jaeger
Expand Down
Loading
Loading