Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
producers []Producer
context context.Context
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
// options.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
c := periodicReaderConfig{
context: context.Background(),
interval: envDuration(envInterval, defaultInterval),
timeout: envDuration(envTimeout, defaultTimeout),
}
Expand Down Expand Up @@ -94,6 +96,17 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
})
}

// WithContext allows setting a context to be used when calling the collector.
// If no context is set, the PeriodicReader will use a background context.
// This can be used to pass context to the called collectors, such as logging
// or tracing information.
func WithContext(ctx context.Context) PeriodicReaderOption {
return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
conf.context = ctx
return conf
})
}

// NewPeriodicReader returns a Reader that collects and exports metric data to
// the exporter at a defined interval. By default, the returned Reader will
// collect and export data every 60 seconds, and will cancel any attempts that
Expand All @@ -105,7 +118,7 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
// exporter. That is left to the user to accomplish.
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(conf.context)
r := &PeriodicReader{
interval: conf.interval,
timeout: conf.timeout,
Expand Down
22 changes: 11 additions & 11 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@
},
}

r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithContext(t.Context()))

Check failure on line 283 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
r.register(testSDKProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)
Expand Down Expand Up @@ -308,7 +308,7 @@

t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithContext(t.Context()))

Check failure on line 311 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
r.register(testSDKProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
Expand All @@ -320,7 +320,7 @@
t.Run("ForceFlush timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}))
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}), WithContext(t.Context()))

Check failure on line 323 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (golines)
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
Expand All @@ -343,7 +343,7 @@
t.Run("ForceFlush timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{
r := NewPeriodicReader(exp, WithTimeout(timeout), WithContext(t.Context()), WithProducer(testExternalProducer{

Check failure on line 346 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Second):
Expand All @@ -364,7 +364,7 @@

t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithContext(t.Context()))

Check failure on line 367 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
r.register(testSDKProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
Expand All @@ -373,7 +373,7 @@
t.Run("Shutdown timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}))
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}), WithContext(t.Context()))
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
Expand All @@ -393,7 +393,7 @@
t.Run("Shutdown timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{
r := NewPeriodicReader(exp, WithTimeout(timeout), WithContext(t.Context()), WithProducer(testExternalProducer{

Check failure on line 396 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Second):
Expand All @@ -412,15 +412,15 @@

func TestPeriodicReaderMultipleForceFlush(t *testing.T) {
ctx := context.Background()
r := NewPeriodicReader(new(fnExporter), WithProducer(testExternalProducer{}))
r := NewPeriodicReader(new(fnExporter), WithContext(t.Context()), WithProducer(testExternalProducer{}))

Check failure on line 415 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
r.register(testSDKProducer{})
require.NoError(t, r.ForceFlush(ctx))
require.NoError(t, r.ForceFlush(ctx))
require.NoError(t, r.Shutdown(ctx))
}

func BenchmarkPeriodicReader(b *testing.B) {
r := NewPeriodicReader(new(fnExporter))
r := NewPeriodicReader(new(fnExporter), WithContext(b.Context()))

Check failure on line 423 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
b.Run("Collect", benchReaderCollectFunc(r))
require.NoError(b, r.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -453,7 +453,7 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var undefinedInstrument InstrumentKind
rdr := NewPeriodicReader(tt.exporter)
rdr := NewPeriodicReader(tt.exporter, WithContext(t.Context()))

Check failure on line 456 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
assert.Equal(t, tt.wantTemporality.String(), rdr.temporality(undefinedInstrument).String())
})
}
Expand Down Expand Up @@ -482,7 +482,7 @@

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rdr := NewPeriodicReader(new(fnExporter))
rdr := NewPeriodicReader(new(fnExporter), WithContext(t.Context()))

Check failure on line 485 in sdk/metric/periodic_reader_test.go

View workflow job for this annotation

GitHub Actions / lint

stdversion: testing.Context requires go1.24 or later (file is go1.23) (govet)
mp := NewMeterProvider(WithReader(rdr))
meter := mp.Meter("test")

Expand Down
Loading