Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions engine/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func TestQueryAnalyze(t *testing.T) {
}
}
}

func TestAnalyzeOutputNode_Samples(t *testing.T) {
t.Parallel()
ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}, EnableAnalysis: true, DecodingConcurrency: 2})
Expand Down Expand Up @@ -299,9 +298,9 @@ func TestAnalyzeOutputNode_Samples(t *testing.T) {
| | |---[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0
| | | |---[coalesce]: max_series: 2 total_samples: 0 peak_samples: 0
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 20
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 20
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
`
require.EqualValues(t, expected, result)
}
Expand Down Expand Up @@ -332,3 +331,58 @@ func renderAnalysisTree(node *engine.AnalyzeOutputNode, level int) string {

return result.String()
}

// TestAnalyzPeak asserts that peak-sample accounting is populated by Analyze()
// for both instant and range queries.
//
// For the range query, each matrixSelector shard is expected to peak at 200
// samples: a Next() call covers 10 steps (steps batch), each step touches
// 1 series (2 series sharded across 2 selectors), and a [10m] window over a
// 30s scrape interval holds 20 samples — 10 * 1 * 20 = 200.
//
// NOTE(review): the name is missing an "e" ("Analyz"); kept to preserve the
// existing test identifier.
func TestAnalyzPeak(t *testing.T) {
	t.Parallel()
	ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}, EnableAnalysis: true, DecodingConcurrency: 2})
	ctx := context.Background()
	load := `load 30s
			http_requests_total{pod="nginx-1"} 1+1x100
			http_requests_total{pod="nginx-2"} 1+1x100`

	tstorage := promqltest.LoadedStorage(t, load)
	defer tstorage.Close()
	minT := tstorage.Head().Meta().MinTime
	maxT := tstorage.Head().Meta().MaxTime

	// Instant query: both peak and total sample counters must be populated.
	query, err := ng.NewInstantQuery(ctx, tstorage, nil, "http_requests_total", time.Unix(0, 0))
	testutil.Ok(t, err)
	queryResults := query.Exec(context.Background())
	testutil.Ok(t, queryResults.Err)
	explainableQuery := query.(engine.ExplainableQuery)
	analyzeOutput := explainableQuery.Analyze()
	require.Greater(t, analyzeOutput.PeakSamples(), int64(0))
	require.Greater(t, analyzeOutput.TotalSamples(), int64(0))

	// Range query over the full loaded window; the peak must match the
	// per-shard calculation described in the doc comment above.
	rangeQry, err := ng.NewRangeQuery(
		ctx,
		tstorage,
		promql.NewPrometheusQueryOpts(false, 0),
		"sum(rate(http_requests_total[10m])) by (pod)",
		time.Unix(minT, 0),
		time.Unix(maxT, 0),
		60*time.Second,
	)
	testutil.Ok(t, err)
	queryResults = rangeQry.Exec(context.Background())
	testutil.Ok(t, queryResults.Err)

	explainableQuery = rangeQry.(engine.ExplainableQuery)
	analyzeOutput = explainableQuery.Analyze()

	t.Logf("value of peak = %v", analyzeOutput.PeakSamples())
	require.Equal(t, int64(200), analyzeOutput.PeakSamples())

	result := renderAnalysisTree(analyzeOutput, 0)
	expected := `[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0
|---[concurrent(buff=2)]: max_series: 2 total_samples: 0 peak_samples: 0
| |---[aggregate] sum by ([pod]): max_series: 2 total_samples: 0 peak_samples: 0
| | |---[duplicateLabelCheck]: max_series: 2 total_samples: 0 peak_samples: 0
| | | |---[coalesce]: max_series: 2 total_samples: 0 peak_samples: 0
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
| | | | |---[concurrent(buff=2)]: max_series: 1 total_samples: 0 peak_samples: 0
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): max_series: 1 total_samples: 1010 peak_samples: 200
`
	require.EqualValues(t, expected, result)
}
37 changes: 31 additions & 6 deletions execution/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type OperatorTelemetry interface {
IncrementSamplesAtTimestamp(samples int, t int64)
Samples() *stats.QuerySamples
LogicalNode() logicalplan.Node
UpdatePeak(count int)
}

func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry {
Expand Down Expand Up @@ -91,6 +92,8 @@ func (tm *NoopTelemetry) LogicalNode() logicalplan.Node {
return nil
}

func (tm *NoopTelemetry) UpdatePeak(_ int) {}

type TrackedTelemetry struct {
fmt.Stringer

Expand Down Expand Up @@ -144,24 +147,23 @@ func (ti *TrackedTelemetry) NextExecutionTime() time.Duration {
}

// IncrementSamplesAtTimestamp records that `samples` additional samples were
// loaded at timestamp t, and folds the increment into the peak-sample
// statistic.
//
// Routed through the exported UpdatePeak so every peak update flows through a
// single method (the private updatePeak helper duplicated this path).
func (ti *TrackedTelemetry) IncrementSamplesAtTimestamp(samples int, t int64) {
	ti.UpdatePeak(samples)
	ti.LoadedSamples.IncrementSamplesAtTimestamp(t, int64(samples))
}

// LogicalNode returns the logical plan node this telemetry is attached to.
func (ti *TrackedTelemetry) LogicalNode() logicalplan.Node {
	return ti.logicalNode
}

// updatePeak folds a sample-count delta into the peak-sample statistic.
// NOTE(review): duplicates the exported UpdatePeak (which reaches
// LoadedSamples via Samples()) — consider unifying on one path.
func (ti *TrackedTelemetry) updatePeak(samples int) {
	ti.LoadedSamples.UpdatePeak(samples)
}

// Samples returns the per-operator sample statistics accumulated so far.
func (ti *TrackedTelemetry) Samples() *stats.QuerySamples { return ti.LoadedSamples }

// MaxSeriesCount returns the series count recorded via SetMaxSeriesCount.
func (ti *TrackedTelemetry) MaxSeriesCount() int { return ti.Series }

// SetMaxSeriesCount records the operator's series count.
func (ti *TrackedTelemetry) SetMaxSeriesCount(count int) { ti.Series = count }

// UpdatePeak folds a sample-count delta into the peak-sample statistic.
func (ti *TrackedTelemetry) UpdatePeak(count int) {
	ti.Samples().UpdatePeak(count)
}

type ObservableVectorOperator interface {
model.VectorOperator
OperatorTelemetry
Expand Down Expand Up @@ -203,8 +205,31 @@ func (t *Operator) Series(ctx context.Context) ([]labels.Labels, error) {

// Next delegates to the wrapped operator, records the call's execution time,
// and folds the number of samples loaded during this call into the
// peak-sample statistic.
func (t *Operator) Next(ctx context.Context) ([]model.StepVector, error) {
	start := time.Now()
	defer func() { t.OperatorTelemetry.AddNextExecutionTime(time.Since(start)) }()

	// Snapshot the running total so the delta attributable to this Next()
	// call can be reported as a potential peak.
	before := telemetryTotalSamples(t.OperatorTelemetry)

	out, err := t.inner.Next(ctx)
	if err != nil {
		return nil, err
	}

	after := telemetryTotalSamples(t.OperatorTelemetry)
	t.OperatorTelemetry.UpdatePeak(int(after - before))

	return out, nil
}

// telemetryTotalSamples returns the telemetry's running total-sample count,
// or 0 when no sample stats are tracked (e.g. NoopTelemetry returns nil).
func telemetryTotalSamples(tel OperatorTelemetry) int64 {
	if s := tel.Samples(); s != nil {
		return s.TotalSamples
	}
	return 0
}

func (t *Operator) GetPool() *model.VectorPool {
Expand Down
2 changes: 2 additions & 0 deletions storage/prometheus/vector_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) {
// Reset the current timestamp.
ts = o.currentStep
fromSeries := o.currentSeries

for ; o.currentSeries-fromSeries < o.seriesBatchSize && o.currentSeries < int64(len(o.scanners)); o.currentSeries++ {
var (
series = o.scanners[o.currentSeries]
Expand Down Expand Up @@ -164,6 +165,7 @@ func (o *vectorSelector) Next(ctx context.Context) ([]model.StepVector, error) {
seriesTs += o.step
}
}

if o.currentSeries == int64(len(o.scanners)) {
o.currentStep += o.step * int64(o.numSteps)
o.currentSeries = 0
Expand Down
Loading