Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/telemetry"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/extwarnings"
"github.com/thanos-io/promql-engine/query"

"github.com/efficientgo/core/errors"
Expand Down Expand Up @@ -175,13 +176,13 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
}

func (a *aggregate) aggregate(in []model.StepVector) error {
var warn warning
var err error
for i, vector := range in {
warn = coalesceWarn(warn, a.tables[i].aggregate(vector))
err = extwarnings.Coalesce(err, a.tables[i].aggregate(vector))
a.next.GetPool().PutStepVector(vector)
}
a.next.GetPool().PutVectors(in)
return warn
return err
}

func (a *aggregate) initializeTables(ctx context.Context) error {
Expand Down
75 changes: 26 additions & 49 deletions execution/aggregate/scalar_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"context"
"fmt"
"math"
"sort"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/extmath"
"github.com/thanos-io/promql-engine/extwarnings"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/histogram"
Expand All @@ -28,7 +29,7 @@ type aggregateTable interface {
// If the table is empty, it returns math.MinInt64.
timestamp() int64
// aggregate aggregates the given vector into the table.
aggregate(vector model.StepVector) warning
aggregate(vector model.StepVector) error
// toVector writes out the accumulated result to the given vector and
// resets the table.
toVector(ctx context.Context, pool *model.VectorPool) model.StepVector
Expand All @@ -41,7 +42,7 @@ type scalarTable struct {
ts int64
inputs []uint64
outputs []*model.Series
accumulators []accumulator
accumulators []extmath.Accumulator
}

func newScalarTables(stepsBatch int, inputCache []uint64, outputCache []*model.Series, aggregation parser.ItemType) ([]aggregateTable, error) {
Expand All @@ -61,7 +62,7 @@ func (t *scalarTable) timestamp() int64 {
}

func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregation parser.ItemType) (*scalarTable, error) {
accumulators := make([]accumulator, len(outputs))
accumulators := make([]extmath.Accumulator, len(outputs))
for i := range accumulators {
acc, err := newScalarAccumulator(aggregation)
if err != nil {
Expand All @@ -77,27 +78,27 @@ func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregatio
}, nil
}

func (t *scalarTable) aggregate(vector model.StepVector) warning {
func (t *scalarTable) aggregate(vector model.StepVector) error {
t.ts = vector.T

var warn warning
var err error
for i := range vector.Samples {
warn = coalesceWarn(warn, t.addSample(vector.SampleIDs[i], vector.Samples[i]))
err = extwarnings.Coalesce(err, t.addSample(vector.SampleIDs[i], vector.Samples[i]))
}
for i := range vector.Histograms {
warn = coalesceWarn(warn, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i]))
err = extwarnings.Coalesce(err, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i]))
}
return warn
return err
}

func (t *scalarTable) addSample(sampleID uint64, sample float64) warning {
func (t *scalarTable) addSample(sampleID uint64, sample float64) error {
outputSampleID := t.inputs[sampleID]
output := t.outputs[outputSampleID]

return t.accumulators[output.ID].Add(sample, nil)
}

func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) warning {
func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) error {
outputSampleID := t.inputs[sampleID]
output := t.outputs[outputSampleID]

Expand All @@ -115,16 +116,16 @@ func (t *scalarTable) toVector(ctx context.Context, pool *model.VectorPool) mode
result := pool.GetStepVector(t.ts)
for i, v := range t.outputs {
switch t.accumulators[i].ValueType() {
case NoValue:
case extmath.NoValue:
continue
case SingleTypeValue:
case extmath.SingleTypeValue:
f, h := t.accumulators[i].Value()
if h == nil {
result.AppendSample(pool, v.ID, f)
} else {
result.AppendHistogram(pool, v.ID, h)
}
case MixedTypeValue:
case extmath.MixedTypeValue:
warnings.AddToContext(annotations.NewMixedFloatsHistogramsAggWarning(posrange.PositionRange{}), ctx)
}
}
Expand Down Expand Up @@ -181,55 +182,31 @@ func addRatioSample(ratioLimit float64, series labels.Labels) bool {
(ratioLimit < 0 && sampleOffset >= (1.0+ratioLimit))
}

func newScalarAccumulator(expr parser.ItemType) (accumulator, error) {
func newScalarAccumulator(expr parser.ItemType) (extmath.Accumulator, error) {
t := parser.ItemTypeStr[expr]
switch t {
case "sum":
return newSumAcc(), nil
return extmath.NewSumAcc(), nil
case "max":
return newMaxAcc(), nil
return extmath.NewMaxAcc(), nil
case "min":
return newMinAcc(), nil
return extmath.NewMinAcc(), nil
case "count":
return newCountAcc(), nil
return extmath.NewCountAcc(), nil
case "avg":
return newAvgAcc(), nil
return extmath.NewAvgAcc(), nil
case "group":
return newGroupAcc(), nil
return extmath.NewGroupAcc(), nil
case "stddev":
return newStdDevAcc(), nil
return extmath.NewStdDevAcc(), nil
case "stdvar":
return newStdVarAcc(), nil
return extmath.NewStdVarAcc(), nil
case "quantile":
return newQuantileAcc(), nil
return extmath.NewQuantileAcc(), nil
case "histogram_avg":
return newHistogramAvg(), nil
return extmath.NewHistogramAvgAcc(), nil
}

msg := fmt.Sprintf("unknown aggregation function %s", t)
return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg)
}

func Quantile(q float64, points []float64) float64 {
if len(points) == 0 || math.IsNaN(q) {
return math.NaN()
}
if q < 0 {
return math.Inf(-1)
}
if q > 1 {
return math.Inf(+1)
}
sort.Float64s(points)

n := float64(len(points))
// When the quantile lies between two samples,
// we use a weighted average of the two samples.
rank := q * (n - 1)

lowerIndex := math.Max(0, math.Floor(rank))
upperIndex := math.Min(n-1, lowerIndex+1)

weight := rank - math.Floor(rank)
return points[int(lowerIndex)]*(1-weight) + points[int(upperIndex)]*weight
}
72 changes: 14 additions & 58 deletions execution/aggregate/vector_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ import (
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/extmath"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"
)

type vectorTable struct {
ts int64
accumulator vectorAccumulator
accumulator extmath.VectorAccumulator
}

func newVectorizedTables(stepsBatch int, a parser.ItemType) ([]aggregateTable, error) {
Expand All @@ -37,7 +37,7 @@ func newVectorizedTables(stepsBatch int, a parser.ItemType) ([]aggregateTable, e
return tables, nil
}

func newVectorizedTable(a vectorAccumulator) *vectorTable {
func newVectorizedTable(a extmath.VectorAccumulator) *vectorTable {
return &vectorTable{
ts: math.MinInt64,
accumulator: a,
Expand All @@ -48,24 +48,24 @@ func (t *vectorTable) timestamp() int64 {
return t.ts
}

func (t *vectorTable) aggregate(vector model.StepVector) warning {
func (t *vectorTable) aggregate(vector model.StepVector) error {
t.ts = vector.T
return t.accumulator.AddVector(vector.Samples, vector.Histograms)
}

func (t *vectorTable) toVector(ctx context.Context, pool *model.VectorPool) model.StepVector {
result := pool.GetStepVector(t.ts)
switch t.accumulator.ValueType() {
case NoValue:
case extmath.NoValue:
return result
case SingleTypeValue:
case extmath.SingleTypeValue:
v, h := t.accumulator.Value()
if h == nil {
result.AppendSample(pool, 0, v)
} else {
result.AppendHistogram(pool, 0, h)
}
case MixedTypeValue:
case extmath.MixedTypeValue:
warnings.AddToContext(annotations.NewMixedFloatsHistogramsAggWarning(posrange.PositionRange{}), ctx)
}
return result
Expand All @@ -76,66 +76,22 @@ func (t *vectorTable) reset(p float64) {
t.accumulator.Reset(p)
}

func newVectorAccumulator(expr parser.ItemType) (vectorAccumulator, error) {
func newVectorAccumulator(expr parser.ItemType) (extmath.VectorAccumulator, error) {
t := parser.ItemTypeStr[expr]
switch t {
case "sum":
return newSumAcc(), nil
return extmath.NewSumAcc(), nil
case "max":
return newMaxAcc(), nil
return extmath.NewMaxAcc(), nil
case "min":
return newMinAcc(), nil
return extmath.NewMinAcc(), nil
case "count":
return newCountAcc(), nil
return extmath.NewCountAcc(), nil
case "avg":
return newAvgAcc(), nil
return extmath.NewAvgAcc(), nil
case "group":
return newGroupAcc(), nil
return extmath.NewGroupAcc(), nil
}
msg := fmt.Sprintf("unknown aggregation function %s", t)
return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg)
}

func histogramSum(current *histogram.FloatHistogram, histograms []*histogram.FloatHistogram) (*histogram.FloatHistogram, warning) {
if len(histograms) == 0 {
return current, nil
}
if current == nil && len(histograms) == 1 {
return histograms[0].Copy(), nil
}
var histSum *histogram.FloatHistogram
if current != nil {
histSum = current.Copy()
} else {
histSum = histograms[0].Copy()
histograms = histograms[1:]
}

var err error
for i := range histograms {
if histograms[i].Schema >= histSum.Schema {
histSum, err = histSum.Add(histograms[i])
if err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
return nil, annotations.MixedExponentialCustomHistogramsWarning
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
return nil, annotations.IncompatibleCustomBucketsHistogramsWarning
}
return nil, err
}
} else {
t := histograms[i].Copy()
if histSum, err = t.Add(histSum); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
return nil, annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{})
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
return nil, annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{})
}
return nil, err
}
}
}
return histSum, nil
}
6 changes: 3 additions & 3 deletions execution/function/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"math"
"time"

"github.com/thanos-io/promql-engine/execution/aggregate"
"github.com/thanos-io/promql-engine/extmath"

"github.com/prometheus/prometheus/model/histogram"
)
Expand Down Expand Up @@ -337,7 +337,7 @@ func histogramStdDev(h *histogram.FloatHistogram) float64 {
}
}
delta := val - mean
variance, cVariance = aggregate.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
variance, cVariance = extmath.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
}
variance += cVariance
variance /= h.Count
Expand Down Expand Up @@ -370,7 +370,7 @@ func histogramStdVar(h *histogram.FloatHistogram) float64 {
}
}
delta := val - mean
variance, cVariance = aggregate.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
variance, cVariance = extmath.KahanSumInc(bucket.Count*delta*delta, variance, cVariance)
}
variance += cVariance
variance /= h.Count
Expand Down
Loading
Loading