Skip to content

Commit 91e6e32

Browse files
Fixing Peak Sample Count Per Series (#604)
* fix: Peak Sample Per Series
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* refactor: migrating to new sample update
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* update: changing opentel next
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* test: peak value
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* removing update peak
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* update: new UpdatePeak Interface In OperatorTelemetry
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* chore: removing unused updatePeak
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* update: Handling NoopTelemetry Error
  Signed-off-by: Naman-B-Parlecha <[email protected]>
* fix: making sampleCount zero
  Signed-off-by: Naman-B-Parlecha <[email protected]>
---------
Signed-off-by: Naman-B-Parlecha <[email protected]>
1 parent c8ce33a commit 91e6e32

File tree

3 files changed

+90
-9
lines changed

3 files changed

+90
-9
lines changed

engine/explain_test.go

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ func TestQueryAnalyze(t *testing.T) {
251251
}
252252
}
253253
}
254-
255254
func TestAnalyzeOutputNode_Samples(t *testing.T) {
256255
t.Parallel()
257256
ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}, EnableAnalysis: true, DecodingConcurrency: 2})
@@ -299,9 +298,9 @@ func TestAnalyzeOutputNode_Samples(t *testing.T) {
299298
| | |---[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0
300299
| | | |---[coalesce]: max_series: 2 total_samples: 0 peak_samples: 0
301300
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
302-
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 20
301+
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
303302
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
304-
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 20
303+
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
305304
`
306305
require.EqualValues(t, expected, result)
307306
}
@@ -332,3 +331,58 @@ func renderAnalysisTree(node *engine.AnalyzeOutputNode, level int) string {
332331

333332
return result.String()
334333
}
334+
335+
func TestAnalyzPeak(t *testing.T) {
336+
t.Parallel()
337+
ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}, EnableAnalysis: true, DecodingConcurrency: 2})
338+
ctx := context.Background()
339+
load := `load 30s
340+
http_requests_total{pod="nginx-1"} 1+1x100
341+
http_requests_total{pod="nginx-2"} 1+1x100`
342+
343+
tstorage := promqltest.LoadedStorage(t, load)
344+
defer tstorage.Close()
345+
minT := tstorage.Head().Meta().MinTime
346+
maxT := tstorage.Head().Meta().MaxTime
347+
348+
query, err := ng.NewInstantQuery(ctx, tstorage, nil, "http_requests_total", time.Unix(0, 0))
349+
testutil.Ok(t, err)
350+
queryResults := query.Exec(context.Background())
351+
testutil.Ok(t, queryResults.Err)
352+
explainableQuery := query.(engine.ExplainableQuery)
353+
analyzeOutput := explainableQuery.Analyze()
354+
require.Greater(t, analyzeOutput.PeakSamples(), int64(0))
355+
require.Greater(t, analyzeOutput.TotalSamples(), int64(0))
356+
357+
rangeQry, err := ng.NewRangeQuery(
358+
ctx,
359+
tstorage,
360+
promql.NewPrometheusQueryOpts(false, 0),
361+
"sum(rate(http_requests_total[10m])) by (pod)",
362+
time.Unix(minT, 0),
363+
time.Unix(maxT, 0),
364+
60*time.Second,
365+
)
366+
testutil.Ok(t, err)
367+
queryResults = rangeQry.Exec(context.Background())
368+
testutil.Ok(t, queryResults.Err)
369+
370+
explainableQuery = rangeQry.(engine.ExplainableQuery)
371+
analyzeOutput = explainableQuery.Analyze()
372+
373+
t.Logf("value of peak = %v", analyzeOutput.PeakSamples())
374+
require.Equal(t, int64(200), analyzeOutput.PeakSamples())
375+
376+
result := renderAnalysisTree(analyzeOutput, 0)
377+
expected := `[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0
378+
|---[concurrent(buff=2)]: max_series: 2 total_samples: 0 peak_samples: 0
379+
| |---[aggregate] sum by ([pod]): max_series: 2 total_samples: 0 peak_samples: 0
380+
| | |---[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0
381+
| | | |---[coalesce]: max_series: 2 total_samples: 0 peak_samples: 0
382+
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
383+
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
384+
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
385+
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
386+
`
387+
require.EqualValues(t, expected, result)
388+
}

execution/telemetry/telemetry.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type OperatorTelemetry interface {
3030
IncrementSamplesAtTimestamp(samples int, t int64)
3131
Samples() *stats.QuerySamples
3232
LogicalNode() logicalplan.Node
33+
UpdatePeak(count int)
3334
}
3435

3536
func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry {
@@ -91,6 +92,8 @@ func (tm *NoopTelemetry) LogicalNode() logicalplan.Node {
9192
return nil
9293
}
9394

95+
func (tm *NoopTelemetry) UpdatePeak(_ int) {}
96+
9497
type TrackedTelemetry struct {
9598
fmt.Stringer
9699

@@ -144,24 +147,23 @@ func (ti *TrackedTelemetry) NextExecutionTime() time.Duration {
144147
}
145148

146149
func (ti *TrackedTelemetry) IncrementSamplesAtTimestamp(samples int, t int64) {
147-
ti.updatePeak(samples)
148150
ti.LoadedSamples.IncrementSamplesAtTimestamp(t, int64(samples))
149151
}
150152

151153
func (ti *TrackedTelemetry) LogicalNode() logicalplan.Node {
152154
return ti.logicalNode
153155
}
154156

155-
func (ti *TrackedTelemetry) updatePeak(samples int) {
156-
ti.LoadedSamples.UpdatePeak(samples)
157-
}
158-
159157
func (ti *TrackedTelemetry) Samples() *stats.QuerySamples { return ti.LoadedSamples }
160158

161159
func (ti *TrackedTelemetry) MaxSeriesCount() int { return ti.Series }
162160

163161
func (ti *TrackedTelemetry) SetMaxSeriesCount(count int) { ti.Series = count }
164162

163+
func (ti *TrackedTelemetry) UpdatePeak(count int) {
164+
ti.Samples().UpdatePeak(count)
165+
}
166+
165167
type ObservableVectorOperator interface {
166168
model.VectorOperator
167169
OperatorTelemetry
@@ -203,8 +205,31 @@ func (t *Operator) Series(ctx context.Context) ([]labels.Labels, error) {
203205

204206
func (t *Operator) Next(ctx context.Context) ([]model.StepVector, error) {
205207
start := time.Now()
208+
var totalSamplesBeforeCount int64
209+
totalSamplesBefore := t.OperatorTelemetry.Samples()
210+
if totalSamplesBefore != nil {
211+
totalSamplesBeforeCount = totalSamplesBefore.TotalSamples
212+
} else {
213+
totalSamplesBeforeCount = 0
214+
}
215+
206216
defer func() { t.OperatorTelemetry.AddNextExecutionTime(time.Since(start)) }()
207-
return t.inner.Next(ctx)
217+
out, err := t.inner.Next(ctx)
218+
if err != nil {
219+
return nil, err
220+
}
221+
222+
var totalSamplesAfter int64
223+
totalSamplesAfterSamples := t.OperatorTelemetry.Samples()
224+
if totalSamplesAfterSamples != nil {
225+
totalSamplesAfter = totalSamplesAfterSamples.TotalSamples
226+
} else {
227+
totalSamplesAfter = 0
228+
}
229+
230+
t.OperatorTelemetry.UpdatePeak(int(totalSamplesAfter) - int(totalSamplesBeforeCount))
231+
232+
return out, err
208233
}
209234

210235
func (t *Operator) GetPool() *model.VectorPool {

storage/prometheus/vector_selector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) {
137137
// Reset the current timestamp.
138138
ts = o.currentStep
139139
fromSeries := o.currentSeries
140+
140141
for ; o.currentSeries-fromSeries < o.seriesBatchSize && o.currentSeries < int64(len(o.scanners)); o.currentSeries++ {
141142
var (
142143
series = o.scanners[o.currentSeries]
@@ -164,6 +165,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) {
164165
seriesTs += o.step
165166
}
166167
}
168+
167169
if o.currentSeries == int64(len(o.scanners)) {
168170
o.currentStep += o.step * int64(o.numSteps)
169171
o.currentSeries = 0

0 commit comments

Comments
 (0)