Skip to content

Commit 93520d8

Browse files
committed
ringbuffer: more over-time aligners
1 parent 1822755 commit 93520d8

File tree

2 files changed

+112
-30
lines changed

2 files changed

+112
-30
lines changed

ringbuffer/overtime.go

Lines changed: 106 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,33 @@ import (
88
"math"
99

1010
"github.com/thanos-io/promql-engine/execution/telemetry"
11+
"github.com/thanos-io/promql-engine/execution/warnings"
1112
"github.com/thanos-io/promql-engine/query"
1213

1314
"github.com/prometheus/prometheus/model/histogram"
15+
"github.com/prometheus/prometheus/promql/parser/posrange"
16+
"github.com/prometheus/prometheus/util/annotations"
1417
)
1518

16-
// TODO: this is mostly copied over from the rate aligner and not at all special to
17-
// count_over_time - we could of course generalize it to other over_time functions and we
18-
// will do that soon.
19-
// This is really just to get warmed up.
20-
// The idea here is that we tile the range into "stepRanges" and when we push a sample
21-
// and it happens to fall into a step range, we increment the state that we hold in a
22-
// corresponding "resultSamples" slice.
19+
// TODO: we should reuse execution/aggregation/accumulator here; this is a bit janky,
20+
// but it already helps.
21+
type aggregator func(cur *stepState, next Value) bool
2322

24-
// CountOverTimeBuffer is a Buffer which can calculate count_over_time for a series in a
23+
type stepState struct {
24+
sample Value
25+
initialized bool
26+
warning error
27+
}
28+
29+
// OverTimeBuffer is a Buffer which can calculate [agg]_over_time for a series in a
2530
// streaming manner, calculating the value incrementally for each step where the sample is used.
26-
type CountOverTimeBuffer struct {
31+
type OverTimeBuffer struct {
2732
// stepRanges contains the bounds and number of samples for each evaluation step.
2833
stepRanges []stepRange
29-
30-
// resultscounts contains the resulting state for each evaluation step.
31-
resultCounts []float64
34+
// stepStates contains the aggregation state for the corresponding stepRange
35+
stepStates []stepState
36+
// aggregator aggregates the current stepState with the new value
37+
aggregator aggregator
3238

3339
// firstTimestamps contains the timestamp of the first sample for each evaluation step.
3440
firstTimestamps []int64
@@ -39,8 +45,7 @@ type CountOverTimeBuffer struct {
3945
step int64
4046
}
4147

42-
// NewCountOverTime creates a new CountOverTimeBuffer.
43-
func NewCountOverTimeBuffer(opts query.Options, selectRange, offset int64) *CountOverTimeBuffer {
48+
func newOverTimeBuffer(opts query.Options, selectRange, offset int64, agg aggregator) *OverTimeBuffer {
4449
var (
4550
step = max(1, opts.Step.Milliseconds())
4651
numSteps = min(
@@ -49,38 +54,93 @@ func NewCountOverTimeBuffer(opts query.Options, selectRange, offset int64) *Coun
4954
)
5055

5156
current = opts.Start.UnixMilli()
52-
resultCounts = make([]float64, 0, numSteps)
5357
firstTimestamps = make([]int64, 0, numSteps)
5458
stepRanges = make([]stepRange, 0, numSteps)
59+
stepStates = make([]stepState, 0, numSteps)
5560
)
5661
for range int(numSteps) {
5762
var (
5863
maxt = current - offset
5964
mint = maxt - selectRange
6065
)
6166
stepRanges = append(stepRanges, stepRange{mint: mint, maxt: maxt})
62-
resultCounts = append(resultCounts, 0.)
67+
stepStates = append(stepStates, stepState{})
6368
firstTimestamps = append(firstTimestamps, math.MaxInt64)
6469

6570
current += step
6671
}
6772

68-
return &CountOverTimeBuffer{
73+
return &OverTimeBuffer{
74+
aggregator: agg,
6975
step: step,
7076
stepRanges: stepRanges,
71-
resultCounts: resultCounts,
77+
stepStates: stepStates,
7278
firstTimestamps: firstTimestamps,
7379
lastTimestamp: math.MinInt64,
7480
}
7581
}
7682

77-
func (r *CountOverTimeBuffer) SampleCount() int {
83+
// NewCountOverTimeBuffer creates a new OverTimeBuffer for the count_over_time function.
84+
func NewCountOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer {
85+
return newOverTimeBuffer(opts, selectRange, offset, func(cur *stepState, _ Value) bool {
86+
cur.sample.F += 1
87+
return true
88+
})
89+
}
90+
91+
// NewMaxOverTimeBuffer creates a new OverTimeBuffer for the max_over_time function.
92+
func NewMaxOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer {
93+
return newOverTimeBuffer(opts, selectRange, offset, func(cur *stepState, next Value) bool {
94+
if next.H != nil {
95+
if cur.initialized {
96+
cur.warning = annotations.NewHistogramIgnoredInMixedRangeInfo("", posrange.PositionRange{})
97+
}
98+
return false
99+
}
100+
if !cur.initialized {
101+
cur.sample.F = next.F
102+
cur.initialized = true
103+
} else if cur.sample.F < next.F || math.IsNaN(cur.sample.F) {
104+
cur.sample.F = next.F
105+
}
106+
return true
107+
})
108+
}
109+
110+
// NewMinOverTimeBuffer creates a new OverTimeBuffer for the min_over_time function.
111+
func NewMinOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer {
112+
return newOverTimeBuffer(opts, selectRange, offset, func(cur *stepState, next Value) bool {
113+
if next.H != nil {
114+
if cur.initialized {
115+
cur.warning = annotations.NewHistogramIgnoredInMixedRangeInfo("", posrange.PositionRange{})
116+
}
117+
return false
118+
}
119+
if !cur.initialized {
120+
cur.sample.F = next.F
121+
cur.initialized = true
122+
} else if cur.sample.F > next.F || math.IsNaN(cur.sample.F) {
123+
cur.sample.F = next.F
124+
}
125+
return true
126+
})
127+
}
128+
129+
// NewLastOverTimeBuffer creates a new OverTimeBuffer for the last_over_time function.
130+
func NewLastOverTimeBuffer(opts query.Options, selectRange, offset int64) *OverTimeBuffer {
131+
return newOverTimeBuffer(opts, selectRange, offset, func(cur *stepState, next Value) bool {
132+
cur.sample = next
133+
return true
134+
})
135+
}
136+
137+
func (r *OverTimeBuffer) SampleCount() int {
78138
return r.stepRanges[0].sampleCount
79139
}
80140

81-
func (r *CountOverTimeBuffer) MaxT() int64 { return r.lastTimestamp }
141+
func (r *OverTimeBuffer) MaxT() int64 { return r.lastTimestamp }
82142

83-
func (r *CountOverTimeBuffer) Push(t int64, v Value) {
143+
func (r *OverTimeBuffer) Push(t int64, v Value) {
84144
// Record the timestamp of the most recently pushed sample.
85145
r.lastTimestamp = t
86146

@@ -93,8 +153,10 @@ func (r *CountOverTimeBuffer) Push(t int64, v Value) {
93153
r.stepRanges[i].sampleCount++
94154
}
95155

96-
// Count the current sample in its range
97-
r.resultCounts[i] += 1
156+
// Aggregate the sample to the current step
157+
if ok := r.aggregator(&(r.stepStates[i]), v); !ok {
158+
continue
159+
}
98160

99161
if fts := r.firstTimestamps[i]; t >= fts {
100162
continue
@@ -103,7 +165,7 @@ func (r *CountOverTimeBuffer) Push(t int64, v Value) {
103165
}
104166
}
105167

106-
func (r *CountOverTimeBuffer) Reset(mint int64, evalt int64) {
168+
func (r *OverTimeBuffer) Reset(mint int64, evalt int64) {
107169
if r.stepRanges[0].mint == mint {
108170
return
109171
}
@@ -113,22 +175,36 @@ func (r *CountOverTimeBuffer) Reset(mint int64, evalt int64) {
113175
nextMint = r.stepRanges[lastSample].mint + r.step
114176
nextMaxt = r.stepRanges[lastSample].maxt + r.step
115177
)
178+
nextStepRange := r.stepRanges[0]
116179
copy(r.stepRanges, r.stepRanges[1:])
117-
r.stepRanges[lastSample] = stepRange{mint: nextMint, maxt: nextMaxt}
118-
119-
copy(r.resultCounts, r.resultCounts[1:])
120-
r.resultCounts[lastSample] = 0
180+
r.stepRanges[lastSample] = nextStepRange
181+
r.stepRanges[lastSample].mint = nextMint
182+
r.stepRanges[lastSample].maxt = nextMaxt
183+
r.stepRanges[lastSample].sampleCount = 0
184+
r.stepRanges[lastSample].numSamples = 0
185+
186+
nextFirstState := r.stepStates[0]
187+
copy(r.stepStates, r.stepStates[1:])
188+
r.stepStates[lastSample] = nextFirstState
189+
r.stepStates[lastSample].sample.H = nil
190+
r.stepStates[lastSample].sample.F = 0
191+
r.stepStates[lastSample].initialized = false
192+
r.stepStates[lastSample].warning = nil
121193

122194
nextFirstTimestamp := r.firstTimestamps[0]
123195
copy(r.firstTimestamps, r.firstTimestamps[1:])
124196
r.firstTimestamps[lastSample] = nextFirstTimestamp
125197
r.firstTimestamps[lastSample] = math.MaxInt64
126198
}
127199

128-
func (r *CountOverTimeBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64, *histogram.FloatHistogram, bool, error) {
200+
func (r *OverTimeBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64, *histogram.FloatHistogram, bool, error) {
201+
if r.stepStates[0].warning != nil {
202+
warnings.AddToContext(r.stepStates[0].warning, ctx)
203+
}
204+
129205
if r.firstTimestamps[0] == math.MaxInt64 {
130206
return 0, nil, false, nil
131207
}
132208

133-
return r.resultCounts[0], nil, true, nil
209+
return r.stepStates[0].sample.F, r.stepStates[0].sample.H, true, nil
134210
}

storage/prometheus/matrix_selector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,12 @@ func (o *matrixSelector) newBuffer(ctx context.Context) ringbuffer.Buffer {
280280
return ringbuffer.NewRateBuffer(ctx, *o.opts, false, false, o.selectRange, o.offset)
281281
case "count_over_time":
282282
return ringbuffer.NewCountOverTimeBuffer(*o.opts, o.selectRange, o.offset)
283+
case "max_over_time":
284+
return ringbuffer.NewMaxOverTimeBuffer(*o.opts, o.selectRange, o.offset)
285+
case "min_over_time":
286+
return ringbuffer.NewMinOverTimeBuffer(*o.opts, o.selectRange, o.offset)
287+
case "last_over_time":
288+
return ringbuffer.NewLastOverTimeBuffer(*o.opts, o.selectRange, o.offset)
283289
}
284290

285291
if o.isExtFunction {

0 commit comments

Comments
 (0)