@@ -6,87 +6,61 @@ package binary
66import (
77 "context"
88 "fmt"
9- "math"
109 "sync"
1110
1211 "github.com/thanos-io/promql-engine/execution/model"
1312 "github.com/thanos-io/promql-engine/execution/telemetry"
13+ "github.com/thanos-io/promql-engine/execution/warnings"
1414 "github.com/thanos-io/promql-engine/extlabels"
1515 "github.com/thanos-io/promql-engine/query"
1616
17+ "github.com/prometheus/prometheus/model/histogram"
1718 "github.com/prometheus/prometheus/model/labels"
1819 "github.com/prometheus/prometheus/promql/parser"
1920)
2021
21- type ScalarSide int
22-
23- const (
24- ScalarSideBoth ScalarSide = iota
25- ScalarSideLeft
26- ScalarSideRight
27- )
28-
2922// scalarOperator evaluates expressions where one operand is a scalarOperator.
3023type scalarOperator struct {
3124 seriesOnce sync.Once
3225 series []labels.Labels
3326
34- pool * model.VectorPool
35- scalar model.VectorOperator
36- next model.VectorOperator
37- opType parser.ItemType
38- getOperands getOperandsFunc
39- operandValIdx int
40- floatOp operation
41- histOp histogramFloatOperation
27+ pool * model.VectorPool
28+ lhs model.VectorOperator
29+ rhs model.VectorOperator
30+ opType parser.ItemType
4231
4332 // If true then return the comparison result as 0/1.
4433 returnBool bool
4534
46- // Keep the result if both sides are scalars.
47- bothScalars bool
35+ lhsType parser. ValueType
36+ rhsType parser. ValueType
4837}
4938
5039func NewScalar (
5140 pool * model.VectorPool ,
52- next model.VectorOperator ,
53- scalar model.VectorOperator ,
54- op parser.ItemType ,
55- scalarSide ScalarSide ,
41+ lhs model.VectorOperator ,
42+ rhs model.VectorOperator ,
43+ lhsType parser.ValueType ,
44+ rhsType parser.ValueType ,
45+ opType parser.ItemType ,
5646 returnBool bool ,
5747 opts * query.Options ,
5848) (model.VectorOperator , error ) {
59- binaryOperation , err := newOperation (op , scalarSide != ScalarSideBoth )
60- if err != nil {
61- return nil , err
62- }
63- // operandValIdx 0 means to get lhs as the return value
64- // while 1 means to get rhs as the return value.
65- operandValIdx := 0
66- getOperands := getOperandsScalarRight
67- if scalarSide == ScalarSideLeft {
68- getOperands = getOperandsScalarLeft
69- operandValIdx = 1
49+ op := & scalarOperator {
50+ pool : pool ,
51+ lhs : lhs ,
52+ rhs : rhs ,
53+ lhsType : lhsType ,
54+ rhsType : rhsType ,
55+ opType : opType ,
56+ returnBool : returnBool ,
7057 }
7158
72- oper := & scalarOperator {
73- pool : pool ,
74- next : next ,
75- scalar : scalar ,
76- floatOp : binaryOperation ,
77- histOp : getHistogramFloatOperation (op , scalarSide ),
78- opType : op ,
79- getOperands : getOperands ,
80- operandValIdx : operandValIdx ,
81- returnBool : returnBool ,
82- bothScalars : scalarSide == ScalarSideBoth ,
83- }
84-
85- return telemetry .NewOperator (telemetry .NewTelemetry (op , opts ), oper ), nil
59+ return telemetry .NewOperator (telemetry .NewTelemetry (op , opts ), op ), nil
8660}
8761
8862func (o * scalarOperator ) Explain () (next []model.VectorOperator ) {
89- return []model.VectorOperator {o .next , o .scalar }
63+ return []model.VectorOperator {o .lhs , o .rhs }
9064}
9165
9266func (o * scalarOperator ) Series (ctx context.Context ) ([]labels.Labels , error ) {
@@ -109,77 +83,69 @@ func (o *scalarOperator) Next(ctx context.Context) ([]model.StepVector, error) {
10983 default :
11084 }
11185
112- in , err := o .next .Next (ctx )
113- if err != nil {
114- return nil , err
115- }
116- if in == nil {
117- return nil , nil
118- }
86+ var err error
11987 o .seriesOnce .Do (func () { err = o .loadSeries (ctx ) })
12088 if err != nil {
12189 return nil , err
12290 }
12391
124- scalarIn , err := o .scalar .Next (ctx )
125- if err != nil {
126- return nil , err
127- }
128-
129- out := o .pool .GetVectorBatch ()
130- for v , vector := range in {
131- step := o .pool .GetStepVector (vector .T )
132- scalarVal := math .NaN ()
133- if len (scalarIn ) > v && len (scalarIn [v ].Samples ) > 0 {
134- scalarVal = scalarIn [v ].Samples [0 ]
92+ var lhs []model.StepVector
93+ var lerrChan = make (chan error , 1 )
94+ go func () {
95+ var err error
96+ lhs , err = o .lhs .Next (ctx )
97+ if err != nil {
98+ lerrChan <- err
13599 }
100+ close (lerrChan )
101+ }()
136102
137- for i := range vector .Samples {
138- operands := o .getOperands (vector , i , scalarVal )
139- val , keep := o .floatOp (operands , o .operandValIdx )
140- if o .returnBool {
141- if ! o .bothScalars {
142- val = 0.0
143- if keep {
144- val = 1.0
145- }
146- }
147- } else if ! keep {
148- continue
149- }
150- step .AppendSample (o .pool , vector .SampleIDs [i ], val )
151- }
152-
153- for i := range vector .HistogramIDs {
154- val := o .histOp (ctx , vector .Histograms [i ], scalarVal )
155- if val != nil {
156- step .AppendHistogram (o .pool , vector .HistogramIDs [i ], val )
157- }
158- }
103+ rhs , rerr := o .rhs .Next (ctx )
104+ lerr := <- lerrChan
105+ if rerr != nil {
106+ return nil , rerr
107+ }
108+ if lerr != nil {
109+ return nil , lerr
110+ }
159111
160- out = append (out , step )
161- o .next .GetPool ().PutStepVector (vector )
112+ // TODO(fpetkovski): When one operator becomes empty,
113+ // we might want to drain or close the other one.
114+ // We don't have a concept of closing an operator yet.
115+ if len (lhs ) == 0 || len (rhs ) == 0 {
116+ return nil , nil
162117 }
163118
164- for i := range scalarIn {
165- o .scalar .GetPool ().PutStepVector (scalarIn [i ])
119+ batch := o .pool .GetVectorBatch ()
120+ for i := range lhs {
121+ if i < len (rhs ) {
122+ step := o .execBinaryOperation (ctx , lhs [i ], rhs [i ])
123+ batch = append (batch , step )
124+ o .rhs .GetPool ().PutStepVector (rhs [i ])
125+ }
126+ o .lhs .GetPool ().PutStepVector (lhs [i ])
166127 }
128+ o .lhs .GetPool ().PutVectors (lhs )
129+ o .rhs .GetPool ().PutVectors (rhs )
167130
168- o .next .GetPool ().PutVectors (in )
169- o .scalar .GetPool ().PutVectors (scalarIn )
131+ return batch , nil
170132
171- return out , nil
172133}
173134
174135func (o * scalarOperator ) GetPool () * model.VectorPool {
175136 return o .pool
176137}
177138
178139func (o * scalarOperator ) loadSeries (ctx context.Context ) error {
179- vectorSeries , err := o .next .Series (ctx )
140+ vectorSide := o .lhs
141+ if o .lhsType == parser .ValueTypeScalar {
142+ vectorSide = o .rhs
143+ }
144+ vectorSeries , err := vectorSide .Series (ctx )
180145 if err != nil {
181146 return err
182147 }
148+
183149 series := make ([]labels.Labels , len (vectorSeries ))
184150 b := labels.ScratchBuilder {}
185151 for i := range vectorSeries {
@@ -198,12 +164,61 @@ func (o *scalarOperator) loadSeries(ctx context.Context) error {
198164 return nil
199165}
200166
201- type getOperandsFunc func (v model.StepVector , i int , scalar float64 ) [2 ]float64
167+ func (o * scalarOperator ) execBinaryOperation (ctx context.Context , lhs , rhs model.StepVector ) model.StepVector {
168+ ts := lhs .T
169+ step := o .pool .GetStepVector (ts )
202170
203- func getOperandsScalarLeft (v model.StepVector , i int , scalar float64 ) [2 ]float64 {
204- return [2 ]float64 {scalar , v .Samples [i ]}
205- }
171+ scalar , other := lhs , rhs
172+ if o .lhsType != parser .ValueTypeScalar {
173+ scalar , other = rhs , lhs
174+ }
175+
176+ var (
177+ v float64
178+ h * histogram.FloatHistogram
179+ keep bool
180+ err error
181+ )
182+ for i , otherVal := range other .Samples {
183+ scalarVal := scalar .Samples [0 ]
184+
185+ if o .lhsType == parser .ValueTypeScalar {
186+ v , _ , keep , err = binOp (o .opType , scalarVal , otherVal , nil , nil )
187+ } else {
188+ v , _ , keep , err = binOp (o .opType , otherVal , scalarVal , nil , nil )
189+ }
190+ if err != nil {
191+ warnings .AddToContext (err , ctx )
192+ continue
193+ }
194+ // in comparison operations between scalars and vectors, the vectors are filtered, regardless if lhs or rhs
195+ if keep && o .opType .IsComparisonOperator () && (o .lhsType == parser .ValueTypeVector || o .rhsType == parser .ValueTypeVector ) {
196+ v = otherVal
197+ }
198+ if o .returnBool {
199+ v = 0.0
200+ if keep {
201+ v = 1.0
202+ }
203+ } else if ! keep {
204+ continue
205+ }
206+ step .AppendSample (o .pool , other .SampleIDs [i ], v )
207+ }
208+ for i , otherVal := range other .Histograms {
209+ scalarVal := scalar .Samples [0 ]
210+
211+ if o .lhsType == parser .ValueTypeScalar {
212+ _ , h , _ , err = binOp (o .opType , scalarVal , 0. , nil , otherVal )
213+ } else {
214+ _ , h , _ , err = binOp (o .opType , 0. , scalarVal , otherVal , nil )
215+ }
216+ if err != nil {
217+ warnings .AddToContext (err , ctx )
218+ continue
219+ }
220+ step .AppendHistogram (o .pool , other .HistogramIDs [i ], h )
221+ }
206222
207- func getOperandsScalarRight (v model.StepVector , i int , scalar float64 ) [2 ]float64 {
208- return [2 ]float64 {v .Samples [i ], scalar }
223+ return step
209224}
0 commit comments