Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 53 additions & 60 deletions execution/aggregate/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
package aggregate

import (
"context"
"math"

"github.com/thanos-io/promql-engine/execution/warnings"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql/parser/posrange"
Expand All @@ -24,15 +21,17 @@ const (
MixedTypeValue
)

type warning error

type accumulator interface {
Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error
Add(v float64, h *histogram.FloatHistogram) warning
Value() (float64, *histogram.FloatHistogram)
ValueType() ValueType
Reset(float64)
}

type vectorAccumulator interface {
AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error
AddVector(vs []float64, hs []*histogram.FloatHistogram) warning
Value() (float64, *histogram.FloatHistogram)
ValueType() ValueType
Reset(float64)
Expand All @@ -49,20 +48,20 @@ func newSumAcc() *sumAcc {
return &sumAcc{}
}

func (s *sumAcc) AddVector(ctx context.Context, float64s []float64, histograms []*histogram.FloatHistogram) error {
func (s *sumAcc) AddVector(float64s []float64, histograms []*histogram.FloatHistogram) warning {
if len(float64s) > 0 {
s.value, s.compensation = KahanSumInc(SumCompensated(float64s), s.value, s.compensation)
s.hasFloatVal = true
}

var err error
if len(histograms) > 0 {
s.histSum, err = histogramSum(ctx, s.histSum, histograms)
s.histSum, err = histogramSum(s.histSum, histograms)
}
return err
}

func (s *sumAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (s *sumAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h == nil {
s.hasFloatVal = true
s.value, s.compensation = KahanSumInc(v, s.value, s.compensation)
Expand All @@ -78,25 +77,21 @@ func (s *sumAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram
if h.Schema >= s.histSum.Schema {
if s.histSum, err = s.histSum.Add(h); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx)
return nil
return annotations.MixedExponentialCustomHistogramsWarning
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx)
return nil
return annotations.IncompatibleCustomBucketsHistogramsWarning
}
return err
}
} else {
t := h.Copy()
if s.histSum, err = t.Add(s.histSum); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx)
return nil
return annotations.MixedExponentialCustomHistogramsWarning
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx)
return nil
return annotations.IncompatibleCustomBucketsHistogramsWarning
}
return err
}
Expand Down Expand Up @@ -138,26 +133,25 @@ type maxAcc struct {
hasValue bool
}

func (c *maxAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error {
func (c *maxAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning {
var warn warning
if len(hs) > 0 {
warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("max", posrange.PositionRange{}), ctx)
warn = annotations.NewHistogramIgnoredInAggregationInfo("max", posrange.PositionRange{})
}

if len(vs) == 0 {
return nil
return warn
}

fst, rem := vs[0], vs[1:]
if err := c.Add(ctx, fst, nil); err != nil {
return err
}
warn = coalesceWarn(warn, c.Add(fst, nil))
if len(rem) == 0 {
return nil
return warn
}
return c.Add(ctx, floats.Max(rem), nil)
warn = coalesceWarn(warn, c.Add(floats.Max(rem), nil))
return warn
}

func (c *maxAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (c *maxAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h != nil {
return nil
}
Expand Down Expand Up @@ -199,26 +193,25 @@ type minAcc struct {
hasValue bool
}

func (c *minAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error {
func (c *minAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning {
var warn warning
if len(hs) > 0 {
warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("min", posrange.PositionRange{}), ctx)
warn = annotations.NewHistogramIgnoredInAggregationInfo("min", posrange.PositionRange{})
}

if len(vs) == 0 {
return nil
return warn
}

fst, rem := vs[0], vs[1:]
if err := c.Add(ctx, fst, nil); err != nil {
return err
}
warn = coalesceWarn(warn, c.Add(fst, nil))
if len(rem) == 0 {
return nil
return warn
}
return c.Add(ctx, floats.Min(rem), nil)

return coalesceWarn(warn, c.Add(floats.Min(rem), nil))
}

func (c *minAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (c *minAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h != nil {
return nil
}
Expand Down Expand Up @@ -260,7 +253,7 @@ type groupAcc struct {
hasValue bool
}

func (c *groupAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error {
func (c *groupAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning {
if len(vs) == 0 && len(hs) == 0 {
return nil
}
Expand All @@ -269,7 +262,7 @@ func (c *groupAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.
return nil
}

func (c *groupAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (c *groupAcc) Add(v float64, h *histogram.FloatHistogram) warning {
c.hasValue = true
c.value = 1
return nil
Expand Down Expand Up @@ -301,16 +294,15 @@ func newCountAcc() *countAcc {
return &countAcc{}
}

func (c *countAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error {

func (c *countAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning {
if len(vs) > 0 || len(hs) > 0 {
c.hasValue = true
c.value += float64(len(vs)) + float64(len(hs))
}
return nil
}

func (c *countAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (c *countAcc) Add(v float64, h *histogram.FloatHistogram) warning {
c.hasValue = true
c.value += 1
return nil
Expand Down Expand Up @@ -350,7 +342,7 @@ func newAvgAcc() *avgAcc {
return &avgAcc{}
}

func (a *avgAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h != nil {
a.histCount++
if a.histSum == nil {
Expand Down Expand Up @@ -427,27 +419,25 @@ func (a *avgAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram
return nil
}

func (a *avgAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error {
func (a *avgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning {
for _, v := range vs {
if err := a.Add(ctx, v, nil); err != nil {
if err := a.Add(v, nil); err != nil {
return err
}
}
for _, h := range hs {
if err := a.Add(ctx, 0, h); err != nil {
if err := a.Add(0, h); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
// to make valueType NoValue
a.histSum = nil
a.histCount = 0
warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx)
return nil
return annotations.MixedExponentialCustomHistogramsWarning
}
if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
// to make valueType NoValue
a.histSum = nil
a.histCount = 0
warnings.AddToContext(annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}), ctx)
return nil
return annotations.IncompatibleCustomBucketsHistogramsWarning
}
return err
}
Expand Down Expand Up @@ -518,11 +508,10 @@ func newStdDevAcc() *stdDevAcc {
return &stdDevAcc{}
}

func (s *stdDevAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (s *stdDevAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h != nil {
// ignore native histogram for STDDEV.
warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("stddev", posrange.PositionRange{}), ctx)
return nil
return annotations.NewHistogramIgnoredInAggregationInfo("stddev", posrange.PositionRange{})
}

s.hasValue = true
Expand Down Expand Up @@ -557,11 +546,9 @@ func newStdVarAcc() *stdVarAcc {
return &stdVarAcc{}
}

func (s *stdVarAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (s *stdVarAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h != nil {
// ignore native histogram for STDVAR.
warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("stdvar", posrange.PositionRange{}), ctx)
return nil
return annotations.NewHistogramIgnoredInAggregationInfo("stdvar", posrange.PositionRange{})
}

s.hasValue = true
Expand Down Expand Up @@ -598,10 +585,9 @@ func newQuantileAcc() accumulator {
return &quantileAcc{}
}

func (q *quantileAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (q *quantileAcc) Add(v float64, h *histogram.FloatHistogram) warning {
if h != nil {
warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("quantile", posrange.PositionRange{}), ctx)
return nil
return annotations.NewHistogramIgnoredInAggregationInfo("quantile", posrange.PositionRange{})
}

q.hasValue = true
Expand Down Expand Up @@ -639,7 +625,7 @@ func newHistogramAvg() *histogramAvg {
}
}

func (acc *histogramAvg) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error {
func (acc *histogramAvg) Add(v float64, h *histogram.FloatHistogram) warning {
if h == nil {
acc.hasFloat = true
}
Expand Down Expand Up @@ -718,3 +704,10 @@ func KahanSumInc(inc, sum, c float64) (newSum, newC float64) {
}
return t, c
}

func coalesceWarn(a, b warning) warning {
if a != nil {
return a
}
return b
}
17 changes: 8 additions & 9 deletions execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
a.tables[i].reset(p)
}
if a.lastBatch != nil {
if err := a.aggregate(ctx, a.lastBatch); err != nil {
return nil, err
if warn := a.aggregate(a.lastBatch); warn != nil {
warnings.AddToContext(warn, ctx)
}
a.lastBatch = nil
}
Expand All @@ -151,8 +151,8 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
// Keep aggregating samples as long as timestamps of batches are equal.
currentTs := a.tables[0].timestamp()
if currentTs == math.MinInt64 || next[0].T == currentTs {
if err := a.aggregate(ctx, next); err != nil {
return nil, err
if warn := a.aggregate(next); warn != nil {
warnings.AddToContext(warn, ctx)
}
continue
}
Expand All @@ -174,15 +174,14 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) {
return result, nil
}

func (a *aggregate) aggregate(ctx context.Context, in []model.StepVector) error {
func (a *aggregate) aggregate(in []model.StepVector) error {
var warn warning
for i, vector := range in {
if err := a.tables[i].aggregate(ctx, vector); err != nil {
return err
}
warn = coalesceWarn(warn, a.tables[i].aggregate(vector))
a.next.GetPool().PutStepVector(vector)
}
a.next.GetPool().PutVectors(in)
return nil
return warn
}

func (a *aggregate) initializeTables(ctx context.Context) error {
Expand Down
23 changes: 10 additions & 13 deletions execution/aggregate/scalar_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type aggregateTable interface {
// If the table is empty, it returns math.MinInt64.
timestamp() int64
// aggregate aggregates the given vector into the table.
aggregate(ctx context.Context, vector model.StepVector) error
aggregate(vector model.StepVector) warning
// toVector writes out the accumulated result to the given vector and
// resets the table.
toVector(ctx context.Context, pool *model.VectorPool) model.StepVector
Expand Down Expand Up @@ -77,34 +77,31 @@ func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregatio
}, nil
}

func (t *scalarTable) aggregate(ctx context.Context, vector model.StepVector) error {
func (t *scalarTable) aggregate(vector model.StepVector) warning {
t.ts = vector.T

var warn warning
for i := range vector.Samples {
if err := t.addSample(ctx, vector.SampleIDs[i], vector.Samples[i]); err != nil {
return err
}
warn = coalesceWarn(warn, t.addSample(vector.SampleIDs[i], vector.Samples[i]))
}
for i := range vector.Histograms {
if err := t.addHistogram(ctx, vector.HistogramIDs[i], vector.Histograms[i]); err != nil {
return err
}
warn = coalesceWarn(warn, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i]))
}
return nil
return warn
}

func (t *scalarTable) addSample(ctx context.Context, sampleID uint64, sample float64) error {
func (t *scalarTable) addSample(sampleID uint64, sample float64) warning {
outputSampleID := t.inputs[sampleID]
output := t.outputs[outputSampleID]

return t.accumulators[output.ID].Add(ctx, sample, nil)
return t.accumulators[output.ID].Add(sample, nil)
}

func (t *scalarTable) addHistogram(ctx context.Context, sampleID uint64, h *histogram.FloatHistogram) error {
func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) warning {
outputSampleID := t.inputs[sampleID]
output := t.outputs[outputSampleID]

return t.accumulators[output.ID].Add(ctx, 0, h)
return t.accumulators[output.ID].Add(0, h)
}

func (t *scalarTable) reset(arg float64) {
Expand Down
Loading
Loading