diff --git a/execution/aggregate/accumulator.go b/execution/aggregate/accumulator.go index f8e3dbfa..8538dce5 100644 --- a/execution/aggregate/accumulator.go +++ b/execution/aggregate/accumulator.go @@ -4,11 +4,8 @@ package aggregate import ( - "context" "math" - "github.com/thanos-io/promql-engine/execution/warnings" - "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql/parser/posrange" @@ -24,15 +21,17 @@ const ( MixedTypeValue ) +type warning error + type accumulator interface { - Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error + Add(v float64, h *histogram.FloatHistogram) warning Value() (float64, *histogram.FloatHistogram) ValueType() ValueType Reset(float64) } type vectorAccumulator interface { - AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error + AddVector(vs []float64, hs []*histogram.FloatHistogram) warning Value() (float64, *histogram.FloatHistogram) ValueType() ValueType Reset(float64) @@ -49,7 +48,7 @@ func newSumAcc() *sumAcc { return &sumAcc{} } -func (s *sumAcc) AddVector(ctx context.Context, float64s []float64, histograms []*histogram.FloatHistogram) error { +func (s *sumAcc) AddVector(float64s []float64, histograms []*histogram.FloatHistogram) warning { if len(float64s) > 0 { s.value, s.compensation = KahanSumInc(SumCompensated(float64s), s.value, s.compensation) s.hasFloatVal = true @@ -57,12 +56,12 @@ func (s *sumAcc) AddVector(ctx context.Context, float64s []float64, histograms [ var err error if len(histograms) > 0 { - s.histSum, err = histogramSum(ctx, s.histSum, histograms) + s.histSum, err = histogramSum(s.histSum, histograms) } return err } -func (s *sumAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (s *sumAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h == nil { s.hasFloatVal = true s.value, s.compensation = KahanSumInc(v, s.value, s.compensation) @@ -78,12 +77,10 @@ func (s *sumAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram if h.Schema >= s.histSum.Schema { if s.histSum, err = s.histSum.Add(h); err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) - return nil + return annotations.MixedExponentialCustomHistogramsWarning } if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx) - return nil + return annotations.IncompatibleCustomBucketsHistogramsWarning } return err } @@ -91,12 +88,10 @@ func (s *sumAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram t := h.Copy() if s.histSum, err = t.Add(s.histSum); err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) - return nil + return annotations.MixedExponentialCustomHistogramsWarning } if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx) - return nil + return annotations.IncompatibleCustomBucketsHistogramsWarning } return err } @@ -138,26 +133,25 @@ type maxAcc struct { hasValue bool } -func (c *maxAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error { +func (c *maxAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning { + var warn warning if len(hs) > 0 { - warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("max", posrange.PositionRange{}), ctx) + warn = annotations.NewHistogramIgnoredInAggregationInfo("max", posrange.PositionRange{}) } - if len(vs) == 0 { - return nil + return warn } fst, rem := vs[0], vs[1:] - if err := c.Add(ctx, fst, nil); err != nil { - return err - } + warn = coalesceWarn(warn, c.Add(fst, nil)) if len(rem) == 0 { - return nil + return warn } - return c.Add(ctx, floats.Max(rem), nil) + warn = coalesceWarn(warn, c.Add(floats.Max(rem), nil)) + return warn } -func (c *maxAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (c *maxAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h != nil { return nil } @@ -199,26 +193,25 @@ type minAcc struct { hasValue bool } -func (c *minAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error { +func (c *minAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning { + var warn warning if len(hs) > 0 { - warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("min", posrange.PositionRange{}), ctx) + warn = annotations.NewHistogramIgnoredInAggregationInfo("min", posrange.PositionRange{}) } - if len(vs) == 0 { - return nil + return warn } fst, rem := vs[0], vs[1:] - if err := c.Add(ctx, fst, nil); err != nil { - return err - } + warn = coalesceWarn(warn, c.Add(fst, nil)) if len(rem) == 0 { - return nil + return warn } - return c.Add(ctx, floats.Min(rem), nil) + + return coalesceWarn(warn, c.Add(floats.Min(rem), nil)) } -func (c *minAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (c *minAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h != nil { return nil } @@ -260,7 +253,7 @@ type groupAcc struct { hasValue bool } -func (c *groupAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error { +func (c *groupAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning { if len(vs) == 0 && len(hs) == 0 { return nil } @@ -269,7 +262,7 @@ func (c *groupAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram. return nil } -func (c *groupAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (c *groupAcc) Add(v float64, h *histogram.FloatHistogram) warning { c.hasValue = true c.value = 1 return nil @@ -301,8 +294,7 @@ func newCountAcc() *countAcc { return &countAcc{} } -func (c *countAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error { - +func (c *countAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning { if len(vs) > 0 || len(hs) > 0 { c.hasValue = true c.value += float64(len(vs)) + float64(len(hs)) @@ -310,7 +302,7 @@ func (c *countAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram. return nil } -func (c *countAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (c *countAcc) Add(v float64, h *histogram.FloatHistogram) warning { c.hasValue = true c.value += 1 return nil @@ -350,7 +342,7 @@ func newAvgAcc() *avgAcc { return &avgAcc{} } -func (a *avgAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h != nil { a.histCount++ if a.histSum == nil { @@ -427,27 +419,25 @@ func (a *avgAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram return nil } -func (a *avgAcc) AddVector(ctx context.Context, vs []float64, hs []*histogram.FloatHistogram) error { +func (a *avgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) warning { for _, v := range vs { - if err := a.Add(ctx, v, nil); err != nil { + if err := a.Add(v, nil); err != nil { return err } } for _, h := range hs { - if err := a.Add(ctx, 0, h); err != nil { + if err := a.Add(0, h); err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { // to make valueType NoValue a.histSum = nil a.histCount = 0 - warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil + return annotations.MixedExponentialCustomHistogramsWarning } if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { // to make valueType NoValue a.histSum = nil a.histCount = 0 - warnings.AddToContext(annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil + return annotations.IncompatibleCustomBucketsHistogramsWarning } return err } @@ -518,11 +508,10 @@ func newStdDevAcc() *stdDevAcc { return &stdDevAcc{} } -func (s *stdDevAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (s *stdDevAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h != nil { // ignore native histogram for STDDEV. - warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("stddev", posrange.PositionRange{}), ctx) - return nil + return annotations.NewHistogramIgnoredInAggregationInfo("stddev", posrange.PositionRange{}) } s.hasValue = true @@ -557,11 +546,9 @@ func newStdVarAcc() *stdVarAcc { return &stdVarAcc{} } -func (s *stdVarAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (s *stdVarAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h != nil { - // ignore native histogram for STDVAR. - warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("stdvar", posrange.PositionRange{}), ctx) - return nil + return annotations.NewHistogramIgnoredInAggregationInfo("stdvar", posrange.PositionRange{}) } s.hasValue = true @@ -598,10 +585,9 @@ func newQuantileAcc() accumulator { return &quantileAcc{} } -func (q *quantileAcc) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (q *quantileAcc) Add(v float64, h *histogram.FloatHistogram) warning { if h != nil { - warnings.AddToContext(annotations.NewHistogramIgnoredInAggregationInfo("quantile", posrange.PositionRange{}), ctx) - return nil + return annotations.NewHistogramIgnoredInAggregationInfo("quantile", posrange.PositionRange{}) } q.hasValue = true @@ -639,7 +625,7 @@ func newHistogramAvg() *histogramAvg { } } -func (acc *histogramAvg) Add(ctx context.Context, v float64, h *histogram.FloatHistogram) error { +func (acc *histogramAvg) Add(v float64, h *histogram.FloatHistogram) warning { if h == nil { acc.hasFloat = true } @@ -718,3 +704,10 @@ func KahanSumInc(inc, sum, c float64) (newSum, newC float64) { } return t, c } + +func coalesceWarn(a, b warning) warning { + if a != nil { + return a + } + return b +} diff --git a/execution/aggregate/hashaggregate.go b/execution/aggregate/hashaggregate.go index f613270b..71d9f169 100644 --- a/execution/aggregate/hashaggregate.go +++ b/execution/aggregate/hashaggregate.go @@ -135,8 +135,8 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { a.tables[i].reset(p) } if a.lastBatch != nil { - if err := a.aggregate(ctx, a.lastBatch); err != nil { - return nil, err + if warn := a.aggregate(a.lastBatch); warn != nil { + warnings.AddToContext(warn, ctx) } a.lastBatch = nil } @@ -151,8 +151,8 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { // Keep aggregating samples as long as timestamps of batches are equal. currentTs := a.tables[0].timestamp() if currentTs == math.MinInt64 || next[0].T == currentTs { - if err := a.aggregate(ctx, next); err != nil { - return nil, err + if warn := a.aggregate(next); warn != nil { + warnings.AddToContext(warn, ctx) } continue } @@ -174,15 +174,14 @@ func (a *aggregate) Next(ctx context.Context) ([]model.StepVector, error) { return result, nil } -func (a *aggregate) aggregate(ctx context.Context, in []model.StepVector) error { +func (a *aggregate) aggregate(in []model.StepVector) error { + var warn warning for i, vector := range in { - if err := a.tables[i].aggregate(ctx, vector); err != nil { - return err - } + warn = coalesceWarn(warn, a.tables[i].aggregate(vector)) a.next.GetPool().PutStepVector(vector) } a.next.GetPool().PutVectors(in) - return nil + return warn } func (a *aggregate) initializeTables(ctx context.Context) error { diff --git a/execution/aggregate/scalar_table.go b/execution/aggregate/scalar_table.go index c9ceda8d..3857ec8e 100644 --- a/execution/aggregate/scalar_table.go +++ b/execution/aggregate/scalar_table.go @@ -28,7 +28,7 @@ type aggregateTable interface { // If the table is empty, it returns math.MinInt64. timestamp() int64 // aggregate aggregates the given vector into the table. - aggregate(ctx context.Context, vector model.StepVector) error + aggregate(vector model.StepVector) warning // toVector writes out the accumulated result to the given vector and // resets the table. toVector(ctx context.Context, pool *model.VectorPool) model.StepVector @@ -77,34 +77,31 @@ func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, aggregatio }, nil } -func (t *scalarTable) aggregate(ctx context.Context, vector model.StepVector) error { +func (t *scalarTable) aggregate(vector model.StepVector) warning { t.ts = vector.T + var warn warning for i := range vector.Samples { - if err := t.addSample(ctx, vector.SampleIDs[i], vector.Samples[i]); err != nil { - return err - } + warn = coalesceWarn(warn, t.addSample(vector.SampleIDs[i], vector.Samples[i])) } for i := range vector.Histograms { - if err := t.addHistogram(ctx, vector.HistogramIDs[i], vector.Histograms[i]); err != nil { - return err - } + warn = coalesceWarn(warn, t.addHistogram(vector.HistogramIDs[i], vector.Histograms[i])) } - return nil + return warn } -func (t *scalarTable) addSample(ctx context.Context, sampleID uint64, sample float64) error { +func (t *scalarTable) addSample(sampleID uint64, sample float64) warning { outputSampleID := t.inputs[sampleID] output := t.outputs[outputSampleID] - return t.accumulators[output.ID].Add(ctx, sample, nil) + return t.accumulators[output.ID].Add(sample, nil) } -func (t *scalarTable) addHistogram(ctx context.Context, sampleID uint64, h *histogram.FloatHistogram) error { +func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) warning { outputSampleID := t.inputs[sampleID] output := t.outputs[outputSampleID] - return t.accumulators[output.ID].Add(ctx, 0, h) + return t.accumulators[output.ID].Add(0, h) } func (t *scalarTable) reset(arg float64) { diff --git a/execution/aggregate/vector_table.go b/execution/aggregate/vector_table.go index c0ba522e..90bbf8bd 100644 --- a/execution/aggregate/vector_table.go +++ b/execution/aggregate/vector_table.go @@ -48,9 +48,9 @@ func (t *vectorTable) timestamp() int64 { return t.ts } -func (t *vectorTable) aggregate(ctx context.Context, vector model.StepVector) error { +func (t *vectorTable) aggregate(vector model.StepVector) warning { t.ts = vector.T - return t.accumulator.AddVector(ctx, vector.Samples, vector.Histograms) + return t.accumulator.AddVector(vector.Samples, vector.Histograms) } func (t *vectorTable) toVector(ctx context.Context, pool *model.VectorPool) model.StepVector { @@ -96,7 +96,7 @@ func newVectorAccumulator(expr parser.ItemType) (vectorAccumulator, error) { return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg) } -func histogramSum(ctx context.Context, current *histogram.FloatHistogram, histograms []*histogram.FloatHistogram) (*histogram.FloatHistogram, error) { +func histogramSum(current *histogram.FloatHistogram, histograms []*histogram.FloatHistogram) (*histogram.FloatHistogram, warning) { if len(histograms) == 0 { return current, nil } @@ -117,12 +117,10 @@ func histogramSum(ctx context.Context, current *histogram.FloatHistogram, histog histSum, err = histSum.Add(histograms[i]) if err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil, nil + return nil, annotations.MixedExponentialCustomHistogramsWarning } if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - warnings.AddToContext(annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil, nil + return nil, annotations.IncompatibleCustomBucketsHistogramsWarning } return nil, err } @@ -130,12 +128,10 @@ func histogramSum(ctx context.Context, current *histogram.FloatHistogram, histog t := histograms[i].Copy() if histSum, err = t.Add(histSum); err != nil { if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { - warnings.AddToContext(annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil, nil + return nil, annotations.NewMixedExponentialCustomHistogramsWarning("", posrange.PositionRange{}) } if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { - warnings.AddToContext(annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}), ctx) - return nil, nil + return nil, annotations.NewIncompatibleCustomBucketsHistogramsWarning("", posrange.PositionRange{}) } return nil, err }