Skip to content

Commit c168863

Browse files
committed
execution: add extwarning and extmath packages
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 6a2ae45 commit c168863

File tree

7 files changed

+247
-243
lines changed

7 files changed

+247
-243
lines changed

execution/aggregate/hashaggregate.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/thanos-io/promql-engine/execution/parse"
1414
"github.com/thanos-io/promql-engine/execution/telemetry"
1515
"github.com/thanos-io/promql-engine/execution/warnings"
16+
"github.com/thanos-io/promql-engine/extwarnings"
1617
"github.com/thanos-io/promql-engine/query"
1718

1819
"github.com/efficientgo/core/errors"
@@ -175,13 +176,13 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
175176
}
176177

177178
func (a *aggregate) aggregate(in []model.StepVector) error {
178-
var warn warning
179+
var err error
179180
for i, vector := range in {
180-
warn = coalesceWarn(warn, a.tables[i].aggregate(vector))
181+
err = extwarnings.Coalesce(err, a.tables[i].aggregate(vector))
181182
a.next.GetPool().PutStepVector(vector)
182183
}
183184
a.next.GetPool().PutVectors(in)
184-
return warn
185+
return err
185186
}
186187

187188
func (a *aggregate) initializeTables(ctx context.Context) error {

execution/aggregate/scalar_table.go

Lines changed: 26 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import (
77
"context"
88
"fmt"
99
"math"
10-
"sort"
1110

1211
"github.com/thanos-io/promql-engine/execution/model"
1312
"github.com/thanos-io/promql-engine/execution/parse"
1413
"github.com/thanos-io/promql-engine/execution/warnings"
14+
"github.com/thanos-io/promql-engine/extmath"
15+
"github.com/thanos-io/promql-engine/extwarnings"
1516

1617
"github.com/efficientgo/core/errors"
1718
"github.com/prometheus/prometheus/model/histogram"
@@ -28,7 +29,7 @@ type aggregateTable interface {
2829
// If the table is empty, it returns math.MinInt64.
2930
timestamp() int64
3031
// aggregate aggregates the given vector into the table.
31-
aggregate(vector model.StepVector) warning
32+
aggregate(vector model.StepVector) error
3233
// toVector writes out the accumulated result to the given vector and
3334
// resets the table.
3435
toVector(ctx context.Context, pool *model.VectorPool) model.StepVector
@@ -41,7 +42,7 @@ type scalarTable struct {
4142
ts int64
4243
inputs []uint64
4344
outputs []*model.Series
44-
accumulators []accumulator
45+
accumulators []extmath.Accumulator
4546
}
4647

4748
func newScalarTables(stepsBatch int, inputCache []uint64, outputCache []*model.Series, aggregation parser.ItemType) ([]aggregateTable, error) {
@@ -61,7 +62,7 @@ func (t *scalarTable) timestamp() int64 {
6162
}
6263

6364
func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregation parser.ItemType) (*scalarTable, error) {
64-
accumulators := make([]accumulator, len(outputs))
65+
accumulators := make([]extmath.Accumulator, len(outputs))
6566
for i := range accumulators {
6667
acc, err := newScalarAccumulator(aggregation)
6768
if err != nil {
@@ -77,27 +78,27 @@ func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregatio
7778
}, nil
7879
}
7980

80-
func (t *scalarTable) aggregate(vector model.StepVector) warning {
81+
func (t *scalarTable) aggregate(vector model.StepVector) error {
8182
t.ts = vector.T
8283

83-
var warn warning
84+
var err error
8485
for i := range vector.Samples {
85-
warn = coalesceWarn(warn, t.addSample(vector.SampleIDs[i], vector.Samples[i]))
86+
err = extwarnings.Coalesce(err, t.addSample(vector.SampleIDs[i], vector.Samples[i]))
8687
}
8788
for i := range vector.Histograms {
88-
warn = coalesceWarn(warn, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i]))
89+
err = extwarnings.Coalesce(err, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i]))
8990
}
90-
return warn
91+
return err
9192
}
9293

93-
func (t *scalarTable) addSample(sampleID uint64, sample float64) warning {
94+
func (t *scalarTable) addSample(sampleID uint64, sample float64) error {
9495
outputSampleID := t.inputs[sampleID]
9596
output := t.outputs[outputSampleID]
9697

9798
return t.accumulators[output.ID].Add(sample, nil)
9899
}
99100

100-
func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) warning {
101+
func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) error {
101102
outputSampleID := t.inputs[sampleID]
102103
output := t.outputs[outputSampleID]
103104

@@ -115,16 +116,16 @@ func (t *scalarTable) toVector(ctx context.Context, pool *model.VectorPool) mode
115116
result := pool.GetStepVector(t.ts)
116117
for i, v := range t.outputs {
117118
switch t.accumulators[i].ValueType() {
118-
case NoValue:
119+
case extmath.NoValue:
119120
continue
120-
case SingleTypeValue:
121+
case extmath.SingleTypeValue:
121122
f, h := t.accumulators[i].Value()
122123
if h == nil {
123124
result.AppendSample(pool, v.ID, f)
124125
} else {
125126
result.AppendHistogram(pool, v.ID, h)
126127
}
127-
case MixedTypeValue:
128+
case extmath.MixedTypeValue:
128129
warnings.AddToContext(annotations.NewMixedFloatsHistogramsAggWarning(posrange.PositionRange{}), ctx)
129130
}
130131
}
@@ -181,55 +182,31 @@ func addRatioSample(ratioLimit float64, series labels.Labels) bool {
181182
(ratioLimit < 0 && sampleOffset >= (1.0+ratioLimit))
182183
}
183184

184-
func newScalarAccumulator(expr parser.ItemType) (accumulator, error) {
185+
func newScalarAccumulator(expr parser.ItemType) (extmath.Accumulator, error) {
185186
t := parser.ItemTypeStr[expr]
186187
switch t {
187188
case "sum":
188-
return newSumAcc(), nil
189+
return extmath.NewSumAcc(), nil
189190
case "max":
190-
return newMaxAcc(), nil
191+
return extmath.NewMaxAcc(), nil
191192
case "min":
192-
return newMinAcc(), nil
193+
return extmath.NewMinAcc(), nil
193194
case "count":
194-
return newCountAcc(), nil
195+
return extmath.NewCountAcc(), nil
195196
case "avg":
196-
return newAvgAcc(), nil
197+
return extmath.NewAvgAcc(), nil
197198
case "group":
198-
return newGroupAcc(), nil
199+
return extmath.NewGroupAcc(), nil
199200
case "stddev":
200-
return newStdDevAcc(), nil
201+
return extmath.NewStdDevAcc(), nil
201202
case "stdvar":
202-
return newStdVarAcc(), nil
203+
return extmath.NewStdVarAcc(), nil
203204
case "quantile":
204-
return newQuantileAcc(), nil
205+
return extmath.NewQuantileAcc(), nil
205206
case "histogram_avg":
206-
return newHistogramAvg(), nil
207+
return extmath.NewHistogramAvgAcc(), nil
207208
}
208209

209210
msg := fmt.Sprintf("unknown aggregation function %s", t)
210211
return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg)
211212
}
212-
213-
func Quantile(q float64, points []float64) float64 {
214-
if len(points) == 0 || math.IsNaN(q) {
215-
return math.NaN()
216-
}
217-
if q < 0 {
218-
return math.Inf(-1)
219-
}
220-
if q > 1 {
221-
return math.Inf(+1)
222-
}
223-
sort.Float64s(points)
224-
225-
n := float64(len(points))
226-
// When the quantile lies between two samples,
227-
// we use a weighted average of the two samples.
228-
rank := q * (n - 1)
229-
230-
lowerIndex := math.Max(0, math.Floor(rank))
231-
upperIndex := math.Min(n-1, lowerIndex+1)
232-
233-
weight := rank - math.Floor(rank)
234-
return points[int(lowerIndex)]*(1-weight) + points[int(upperIndex)]*weight
235-
}

execution/aggregate/vector_table.go

Lines changed: 14 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@ import (
1111
"github.com/thanos-io/promql-engine/execution/model"
1212
"github.com/thanos-io/promql-engine/execution/parse"
1313
"github.com/thanos-io/promql-engine/execution/warnings"
14+
"github.com/thanos-io/promql-engine/extmath"
1415

1516
"github.com/efficientgo/core/errors"
16-
"github.com/prometheus/prometheus/model/histogram"
1717
"github.com/prometheus/prometheus/promql/parser"
1818
"github.com/prometheus/prometheus/promql/parser/posrange"
1919
"github.com/prometheus/prometheus/util/annotations"
2020
)
2121

2222
type vectorTable struct {
2323
ts int64
24-
accumulator vectorAccumulator
24+
accumulator extmath.VectorAccumulator
2525
}
2626

2727
func newVectorizedTables(stepsBatch int, a parser.ItemType) ([]aggregateTable, error) {
@@ -37,7 +37,7 @@ func newVectorizedTables(stepsBatch int, a parser.ItemType) ([]aggregateTable, e
3737
return tables, nil
3838
}
3939

40-
func newVectorizedTable(a vectorAccumulator) *vectorTable {
40+
func newVectorizedTable(a extmath.VectorAccumulator) *vectorTable {
4141
return &vectorTable{
4242
ts: math.MinInt64,
4343
accumulator: a,
@@ -48,24 +48,24 @@ func (t *vectorTable) timestamp() int64 {
4848
return t.ts
4949
}
5050

51-
func (t *vectorTable) aggregate(vector model.StepVector) warning {
51+
func (t *vectorTable) aggregate(vector model.StepVector) error {
5252
t.ts = vector.T
5353
return t.accumulator.AddVector(vector.Samples, vector.Histograms)
5454
}
5555

5656
func (t *vectorTable) toVector(ctx context.Context, pool *model.VectorPool) model.StepVector {
5757
result := pool.GetStepVector(t.ts)
5858
switch t.accumulator.ValueType() {
59-
case NoValue:
59+
case extmath.NoValue:
6060
return result
61-
case SingleTypeValue:
61+
case extmath.SingleTypeValue:
6262
v, h := t.accumulator.Value()
6363
if h == nil {
6464
result.AppendSample(pool, 0, v)
6565
} else {
6666
result.AppendHistogram(pool, 0, h)
6767
}
68-
case MixedTypeValue:
68+
case extmath.MixedTypeValue:
6969
warnings.AddToContext(annotations.NewMixedFloatsHistogramsAggWarning(posrange.PositionRange{}), ctx)
7070
}
7171
return result
@@ -76,66 +76,22 @@ func (t *vectorTable) reset(p float64) {
7676
t.accumulator.Reset(p)
7777
}
7878

79-
func newVectorAccumulator(expr parser.ItemType) (vectorAccumulator, error) {
79+
func newVectorAccumulator(expr parser.ItemType) (extmath.VectorAccumulator, error) {
8080
t := parser.ItemTypeStr[expr]
8181
switch t {
8282
case "sum":
83-
return newSumAcc(), nil
83+
return extmath.NewSumAcc(), nil
8484
case "max":
85-
return newMaxAcc(), nil
85+
return extmath.NewMaxAcc(), nil
8686
case "min":
87-
return newMinAcc(), nil
87+
return extmath.NewMinAcc(), nil
8888
case "count":
89-
return newCountAcc(), nil
89+
return extmath.NewCountAcc(), nil
9090
case "avg":
91-
return newAvgAcc(), nil
91+
return extmath.NewAvgAcc(), nil
9292
case "group":
93-
return newGroupAcc(), nil
93+
return extmath.NewGroupAcc(), nil
9494
}
9595
msg := fmt.Sprintf("unknown aggregation function %s", t)
9696
return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg)
9797
}
98-
99-
func histogramSum(current *histogram.FloatHistogram, histograms []*histogram.FloatHistogram) (*histogram.FloatHistogram, warning) {
100-
if len(histograms) == 0 {
101-
return current, nil
102-
}
103-
if current == nil && len(histograms) == 1 {
104-
return histograms[0].Copy(), nil
105-
}
106-
var histSum *histogram.FloatHistogram
107-
if current != nil {
108-
histSum = current.Copy()
109-
} else {
110-
histSum = histograms[0].Copy()
111-
histograms = histograms[1:]
112-
}
113-
114-
var err error
115-
for i := range histograms {
116-
if histograms[i].Schema >= histSum.Schema {
117-
histSum, err = histSum.Add(histograms[i])
118-
if err != nil {
119-
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
120-
return nil, annotations.MixedExponentialCustomHistogramsWarning
121-
}
122-
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
123-
return nil, annotations.IncompatibleCustomBucketsHistogramsWarning
124-
}
125-
return nil, err
126-
}
127-
} else {
128-
t := histograms[i].Copy()
129-
if histSum, err = t.Add(histSum); err != nil {
130-
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
131-
return nil, annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{})
132-
}
133-
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
134-
return nil, annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{})
135-
}
136-
return nil, err
137-
}
138-
}
139-
}
140-
return histSum, nil
141-
}

execution/function/functions.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"math"
88
"time"
99

10-
"github.com/thanos-io/promql-engine/execution/aggregate"
10+
"github.com/thanos-io/promql-engine/extmath"
1111

1212
"github.com/prometheus/prometheus/model/histogram"
1313
)
@@ -337,7 +337,7 @@ func histogramStdDev(h *histogram.FloatHistogram) float64 {
337337
}
338338
}
339339
delta := val - mean
340-
variance, cVariance = aggregate.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
340+
variance, cVariance = extmath.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
341341
}
342342
variance += cVariance
343343
variance /= h.Count
@@ -370,7 +370,7 @@ func histogramStdVar(h *histogram.FloatHistogram) float64 {
370370
}
371371
}
372372
delta := val - mean
373-
variance, cVariance = aggregate.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
373+
variance, cVariance = extmath.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
374374
}
375375
variance += cVariance
376376
variance /= h.Count

0 commit comments

Comments
 (0)