44package engine
55
66import (
7+ "sync"
8+
79 "github.com/prometheus/prometheus/promql"
810
911 "github.com/thanos-io/promql-engine/execution/model"
@@ -18,7 +20,12 @@ type ExplainableQuery interface {
1820
1921type AnalyzeOutputNode struct {
2022 OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"`
21- Children []AnalyzeOutputNode `json:"children,omitempty"`
23+ Children []* AnalyzeOutputNode `json:"children,omitempty"`
24+
25+ once sync.Once
26+ totalSamples int64
27+ peakSamples int64
28+ totalSamplesPerStep []int64
2229}
2330
2431type ExplainOutputNode struct {
@@ -29,60 +36,48 @@ type ExplainOutputNode struct {
2936var _ ExplainableQuery = & compatibilityQuery {}
3037
3138func (a * AnalyzeOutputNode ) TotalSamples () int64 {
32- var total int64
33- if a .OperatorTelemetry .Samples () != nil {
34- total += a .OperatorTelemetry .Samples ().TotalSamples
35- }
36- if a .OperatorTelemetry .SubQuery () {
37- // Returning here to avoid double counting samples from children of subquery.
38- return total
39- }
40-
41- for _ , child := range a .Children {
42- c := child .TotalSamples ()
43- if c > 0 {
44- total += child .TotalSamples ()
45- }
46- }
47-
48- return total
39+ a .aggregateSamples ()
40+ return a .totalSamples
4941}
5042
5143func (a * AnalyzeOutputNode ) TotalSamplesPerStep () []int64 {
52- if a .OperatorTelemetry .Samples () == nil {
53- return []int64 {}
54- }
55-
56- total := a .OperatorTelemetry .Samples ().TotalSamplesPerStep
57- for _ , child := range a .Children {
58- for i , s := range child .TotalSamplesPerStep () {
59- total [i ] += s
60- }
61- }
62-
63- return total
44+ a .aggregateSamples ()
45+ return a .totalSamplesPerStep
6446}
6547
6648func (a * AnalyzeOutputNode ) PeakSamples () int64 {
67- var peak int64
68- if a .OperatorTelemetry .Samples () != nil {
69- peak = int64 (a .OperatorTelemetry .Samples ().PeakSamples )
70- }
71- for _ , child := range a .Children {
72- childPeak := child .PeakSamples ()
73- if childPeak > peak {
74- peak = childPeak
49+ a .aggregateSamples ()
50+ return a .peakSamples
51+ }
52+
53+ func (a * AnalyzeOutputNode ) aggregateSamples () {
54+ a .once .Do (func () {
55+ if nodeSamples := a .OperatorTelemetry .Samples (); nodeSamples != nil {
56+ a .totalSamples += nodeSamples .TotalSamples
57+ a .peakSamples += int64 (nodeSamples .PeakSamples )
58+ a .totalSamplesPerStep = nodeSamples .TotalSamplesPerStep
7559 }
76- }
77- return peak
60+
61+ for _ , child := range a .Children {
62+ childPeak := child .PeakSamples ()
63+ a .peakSamples = max (a .peakSamples , childPeak )
64+ for i , s := range child .TotalSamplesPerStep () {
65+ a .totalSamplesPerStep [i ] += s
66+ }
67+ // Aggregate only if the node is not a subquery to avoid double counting samples from children.
68+ if ! a .OperatorTelemetry .SubQuery () {
69+ a .totalSamples += child .TotalSamples ()
70+ }
71+ }
72+ })
7873}
7974
8075func analyzeQuery (obsv model.ObservableVectorOperator ) * AnalyzeOutputNode {
8176 children := obsv .Explain ()
82- var childTelemetry []AnalyzeOutputNode
77+ var childTelemetry []* AnalyzeOutputNode
8378 for _ , child := range children {
8479 if obsChild , ok := child .(model.ObservableVectorOperator ); ok {
85- childTelemetry = append (childTelemetry , * analyzeQuery (obsChild ))
80+ childTelemetry = append (childTelemetry , analyzeQuery (obsChild ))
8681 }
8782 }
8883
0 commit comments