diff --git a/engine/explain_test.go b/engine/explain_test.go index f6604b841..ffac59c13 100644 --- a/engine/explain_test.go +++ b/engine/explain_test.go @@ -251,7 +251,6 @@ func TestQueryAnalyze(t *testing.T) { } } } - func TestAnalyzeOutputNode_Samples(t *testing.T) { t.Parallel() ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}, EnableAnalysis: true, DecodingConcurrency: 2}) @@ -299,9 +298,9 @@ func TestAnalyzeOutputNode_Samples(t *testing.T) { | | |---[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0 | | | |---[coalesce]: max_series: 2 total_samples: 0 peak_samples: 0 | | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0 -| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 20 +| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200 | | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0 -| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 20 +| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200 ` require.EqualValues(t, expected, result) } @@ -332,3 +331,58 @@ func renderAnalysisTree(node *engine.AnalyzeOutputNode, level int) string { return result.String() } + +func TestAnalyzPeak(t *testing.T) { + t.Parallel() + ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}, EnableAnalysis: true, DecodingConcurrency: 2}) + ctx := context.Background() + load := `load 30s + http_requests_total{pod="nginx-1"} 1+1x100 + http_requests_total{pod="nginx-2"} 1+1x100` + + tstorage := promqltest.LoadedStorage(t, load) + defer tstorage.Close() + minT := tstorage.Head().Meta().MinTime + maxT := tstorage.Head().Meta().MaxTime + + query, err := ng.NewInstantQuery(ctx, tstorage, nil, "http_requests_total", time.Unix(0, 0)) + testutil.Ok(t, err) + queryResults := query.Exec(context.Background()) + testutil.Ok(t, queryResults.Err) + explainableQuery := query.(engine.ExplainableQuery) + analyzeOutput := explainableQuery.Analyze() + require.Greater(t, analyzeOutput.PeakSamples(), int64(0)) + require.Greater(t, analyzeOutput.TotalSamples(), int64(0)) + + rangeQry, err := ng.NewRangeQuery( + ctx, + tstorage, + promql.NewPrometheusQueryOpts(false, 0), + "sum(rate(http_requests_total[10m])) by (pod)", + time.Unix(minT, 0), + time.Unix(maxT, 0), + 60*time.Second, + ) + testutil.Ok(t, err) + queryResults = rangeQry.Exec(context.Background()) + testutil.Ok(t, queryResults.Err) + + explainableQuery = rangeQry.(engine.ExplainableQuery) + analyzeOutput = explainableQuery.Analyze() + + t.Logf("value of peak = %v", analyzeOutput.PeakSamples()) + require.Equal(t, int64(200), analyzeOutput.PeakSamples()) + + result := renderAnalysisTree(analyzeOutput, 0) + expected := `[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0 +|---[concurrent(buff=2)]: max_series: 2 total_samples: 0 peak_samples: 0 +| |---[aggregate] sum by ([pod]): max_series: 2 total_samples: 0 peak_samples: 0 +| | |---[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0 +| | | |---[coalesce]: max_series: 2 total_samples: 0 peak_samples: 0 +| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0 +| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200 +| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0 +| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200 +` + require.EqualValues(t, expected, result) +} diff --git a/execution/telemetry/telemetry.go b/execution/telemetry/telemetry.go index 69966989a..ca486eb2e 100644 --- a/execution/telemetry/telemetry.go +++ b/execution/telemetry/telemetry.go @@ -30,6 +30,7 @@ type OperatorTelemetry interface { IncrementSamplesAtTimestamp(samples int, t int64) Samples() *stats.QuerySamples LogicalNode() logicalplan.Node + UpdatePeak(count int) } func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { @@ -91,6 +92,8 @@ func (tm *NoopTelemetry) LogicalNode() logicalplan.Node { return nil } +func (tm *NoopTelemetry) UpdatePeak(_ int) {} + type TrackedTelemetry struct { fmt.Stringer @@ -144,7 +147,6 @@ func (ti *TrackedTelemetry) NextExecutionTime() time.Duration { } func (ti *TrackedTelemetry) IncrementSamplesAtTimestamp(samples int, t int64) { - ti.updatePeak(samples) ti.LoadedSamples.IncrementSamplesAtTimestamp(t, int64(samples)) } @@ -152,16 +154,16 @@ func (ti *TrackedTelemetry) LogicalNode() logicalplan.Node { return ti.logicalNode } -func (ti *TrackedTelemetry) updatePeak(samples int) { - ti.LoadedSamples.UpdatePeak(samples) -} - func (ti *TrackedTelemetry) Samples() *stats.QuerySamples { return ti.LoadedSamples } func (ti *TrackedTelemetry) MaxSeriesCount() int { return ti.Series } func (ti *TrackedTelemetry) SetMaxSeriesCount(count int) { ti.Series = count } +func (ti *TrackedTelemetry) UpdatePeak(count int) { + ti.Samples().UpdatePeak(count) +} + type ObservableVectorOperator interface { model.VectorOperator OperatorTelemetry @@ -203,8 +205,31 @@ func (t *Operator) Series(ctx context.Context) ([]labels.Labels, error) { func (t *Operator) Next(ctx context.Context) ([]model.StepVector, error) { start := time.Now() + var totalSamplesBeforeCount int64 + totalSamplesBefore := t.OperatorTelemetry.Samples() + if totalSamplesBefore != nil { + totalSamplesBeforeCount = totalSamplesBefore.TotalSamples + } else { + totalSamplesBeforeCount = 0 + } + defer func() { t.OperatorTelemetry.AddNextExecutionTime(time.Since(start)) }() - return t.inner.Next(ctx) + out, err := t.inner.Next(ctx) + if err != nil { + return nil, err + } + + var totalSamplesAfter int64 + totalSamplesAfterSamples := t.OperatorTelemetry.Samples() + if totalSamplesAfterSamples != nil { + totalSamplesAfter = totalSamplesAfterSamples.TotalSamples + } else { + totalSamplesAfter = 0 + } + + t.OperatorTelemetry.UpdatePeak(int(totalSamplesAfter) - int(totalSamplesBeforeCount)) + + return out, err } func (t *Operator) GetPool() *model.VectorPool { diff --git a/storage/prometheus/vector_selector.go b/storage/prometheus/vector_selector.go index 19fd0809d..d00bc7c05 100644 --- a/storage/prometheus/vector_selector.go +++ b/storage/prometheus/vector_selector.go @@ -137,6 +137,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { // Reset the current timestamp. ts = o.currentStep fromSeries := o.currentSeries + for ; o.currentSeries-fromSeries < o.seriesBatchSize && o.currentSeries < int64(len(o.scanners)); o.currentSeries++ { var ( series = o.scanners[o.currentSeries] @@ -164,6 +165,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) { seriesTs += o.step } } + if o.currentSeries == int64(len(o.scanners)) { o.currentStep += o.step * int64(o.numSteps) o.currentSeries = 0