Skip to content

Commit eb66ef7

Browse files
ringbuffer: revert interface changes (#644)
Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 3225af5 commit eb66ef7

File tree

5 files changed

+152
-29
lines changed

5 files changed

+152
-29
lines changed

engine/engine_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2601,6 +2601,128 @@ func TestEdgeCases(t *testing.T) {
26012601
}
26022602
}
26032603

2604+
func TestXFunctionsRangeQuery(t *testing.T) {
2605+
// Negative offset and at modifier are enabled by default
2606+
// since Prometheus v2.33.0, so we also enable them.
2607+
opts := promql.EngineOpts{
2608+
Timeout: 1 * time.Hour,
2609+
MaxSamples: 1e10,
2610+
EnableNegativeOffset: true,
2611+
EnableAtModifier: true,
2612+
}
2613+
2614+
cases := []struct {
2615+
name string
2616+
load string
2617+
query string
2618+
startTime time.Time
2619+
endTime time.Time
2620+
step time.Duration
2621+
2622+
expected promql.Matrix
2623+
}{
2624+
{
2625+
name: "gaps between steps",
2626+
load: `load 10s
2627+
http_requests 1 5 10 20 _ 40`,
2628+
query: "xincrease(http_requests[10s])",
2629+
2630+
startTime: time.Unix(0, 0),
2631+
endTime: time.Unix(60, 0),
2632+
step: 20 * time.Second,
2633+
2634+
expected: promql.Matrix{
2635+
promql.Series{
2636+
Metric: labels.New(),
2637+
Floats: []promql.FPoint{
2638+
{T: 00_000, F: 1},
2639+
{T: 20_000, F: 9}, // TODO: this seems odd, feels like it should be 5
2640+
{T: 40_000, F: 0},
2641+
{T: 60_000, F: 0},
2642+
},
2643+
},
2644+
},
2645+
},
2646+
{
2647+
name: "back to back steps",
2648+
load: `load 10s
2649+
http_requests 1 5 10 20 _ 40`,
2650+
query: "xincrease(http_requests[10s])",
2651+
2652+
startTime: time.Unix(0, 0),
2653+
endTime: time.Unix(60, 0),
2654+
step: 10 * time.Second,
2655+
2656+
expected: promql.Matrix{
2657+
promql.Series{
2658+
Metric: labels.New(),
2659+
Floats: []promql.FPoint{
2660+
{T: 00_000, F: 1},
2661+
{T: 10_000, F: 4},
2662+
{T: 20_000, F: 5},
2663+
{T: 30_000, F: 10},
2664+
{T: 40_000, F: 0},
2665+
{T: 50_000, F: 20},
2666+
{T: 60_000, F: 0},
2667+
},
2668+
},
2669+
},
2670+
},
2671+
{
2672+
name: "overlapping steps",
2673+
load: `load 10s
2674+
http_requests 1 5 10 20 _ 40`,
2675+
query: "xincrease(http_requests[20s])",
2676+
2677+
startTime: time.Unix(0, 0),
2678+
endTime: time.Unix(60, 0),
2679+
step: 10 * time.Second,
2680+
2681+
expected: promql.Matrix{
2682+
promql.Series{
2683+
Metric: labels.New(),
2684+
Floats: []promql.FPoint{
2685+
{T: 00_000, F: 1},
2686+
{T: 10_000, F: 4},
2687+
{T: 20_000, F: 9},
2688+
{T: 30_000, F: 15},
2689+
{T: 40_000, F: 10},
2690+
{T: 50_000, F: 20},
2691+
{T: 60_000, F: 20},
2692+
},
2693+
},
2694+
},
2695+
},
2696+
}
2697+
2698+
for _, tc := range cases {
2699+
t.Run(tc.name, func(t *testing.T) {
2700+
storage := promqltest.LoadedStorage(t, tc.load)
2701+
defer storage.Close()
2702+
2703+
ctx := context.Background()
2704+
newEngine := engine.New(engine.Opts{
2705+
EngineOpts: opts,
2706+
LogicalOptimizers: logicalplan.AllOptimizers,
2707+
EnableXFunctions: true,
2708+
})
2709+
query, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.startTime, tc.endTime, tc.step)
2710+
testutil.Ok(t, err)
2711+
defer query.Close()
2712+
2713+
engineResult := query.Exec(ctx)
2714+
testutil.Ok(t, engineResult.Err)
2715+
2716+
gotMatrix, err := engineResult.Matrix()
2717+
require.NoError(t, err)
2718+
2719+
for i := range tc.expected {
2720+
testutil.WithGoCmp(comparer).Equals(t, tc.expected[i].Floats, gotMatrix[i].Floats, queryExplanation(query))
2721+
}
2722+
})
2723+
}
2724+
}
2725+
26042726
func TestXFunctionsWithNativeHistograms(t *testing.T) {
26052727
defaultQueryTime := time.Unix(50, 0)
26062728

ringbuffer/generic.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ type Buffer interface {
1818
Reset(mint int64, evalt int64)
1919
Eval(ctx context.Context, _, _ float64, _ int64) (float64, *histogram.FloatHistogram, bool, error)
2020
SampleCount() int
21+
22+
// to handle extlookback properly, only used by buffers that implement xincrease or xrate
23+
ReadIntoLast(f func(*Sample))
2124
}
2225

23-
func Empty(b Buffer) bool { return b.MaxT() != math.MinInt64 }
26+
func Empty(b Buffer) bool { return b.MaxT() == math.MinInt64 }
2427

2528
type Value struct {
2629
F float64
@@ -80,6 +83,11 @@ func (r *GenericRingBuffer) MaxT() int64 {
8083
return r.items[len(r.items)-1].T
8184
}
8285

86+
// ReadIntoLast reads a sample into the last slot in the buffer, replacing the existing sample.
87+
func (r *GenericRingBuffer) ReadIntoLast(f func(*Sample)) {
88+
f(&r.items[len(r.items)-1])
89+
}
90+
8391
// Push adds a new sample to the buffer.
8492
func (r *GenericRingBuffer) Push(t int64, v Value) {
8593
n := len(r.items)

ringbuffer/overtime.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ func (r *CountOverTimeBuffer) Reset(mint int64, evalt int64) {
131131
r.firstTimestamps[lastSample] = math.MaxInt64
132132
}
133133

134+
func (r *CountOverTimeBuffer) ReadIntoLast(func(*Sample)) {}
135+
134136
func (r *CountOverTimeBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64, *histogram.FloatHistogram, bool, error) {
135137
if r.firstTimestamps[0] == math.MaxInt64 {
136138
return 0, nil, false, nil

ringbuffer/rate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ func (r *RateBuffer) Eval(ctx context.Context, _, _ float64, _ int64) (float64,
192192
return extrapolatedRate(ctx, r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset)
193193
}
194194

195+
func (r *RateBuffer) ReadIntoLast(func(*Sample)) {}
196+
195197
func querySteps(o query.Options) int64 {
196198
// Instant evaluation is executed as a range evaluation with one step.
197199
if o.Step.Milliseconds() == 0 {

storage/prometheus/matrix_selector.go

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ func (o *matrixSelector) Next(ctx context.Context) ([]model.StepVector, error) {
183183
for currStep := 0; currStep < o.numSteps && seriesTs <= o.maxt; currStep++ {
184184
maxt := seriesTs - o.offset
185185
mint := maxt - o.selectRange
186-
if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.extLookbackDelta, o.isExtFunction); err != nil {
186+
187+
if err := scanner.selectPoints(mint, maxt, seriesTs, o.fhReader, o.isExtFunction); err != nil {
187188
return nil, err
188189
}
189190
// TODO(saswatamcode): Handle multi-arg functions for matrixSelectors.
@@ -308,7 +309,6 @@ func (o *matrixSelector) String() string {
308309
func (m *matrixScanner) selectPoints(
309310
mint, maxt, evalt int64,
310311
fh *histogram.FloatHistogram,
311-
extLookbackDelta int64,
312312
isExtFunction bool,
313313
) error {
314314
m.buffer.Reset(mint, evalt)
@@ -319,19 +319,14 @@ func (m *matrixScanner) selectPoints(
319319
if bufMaxt := m.buffer.MaxT() + 1; bufMaxt > mint {
320320
mint = bufMaxt
321321
}
322-
mint = maxInt64(mint, m.buffer.MaxT()+1)
322+
mint = max(mint, m.buffer.MaxT()+1)
323323
if m.lastSample.T > mint {
324324
m.buffer.Push(m.lastSample.T, m.lastSample.V)
325325
m.lastSample.T = math.MinInt64
326-
mint = maxInt64(mint, m.buffer.MaxT()+1)
326+
mint = max(mint, m.buffer.MaxT()+1)
327327
}
328328

329-
var (
330-
// The sample that we add for x-functions, -1 is a canary value for the situation
331-
// where we have no sample in the extended lookback delta
332-
extSample = ringbuffer.Sample{T: -1}
333-
)
334-
329+
appendedPointBeforeMint := !ringbuffer.Empty(m.buffer)
335330
for valType := m.iterator.Next(); valType != chunkenc.ValNone; valType = m.iterator.Next() {
336331
switch valType {
337332
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
@@ -367,27 +362,21 @@ func (m *matrixScanner) selectPoints(
367362
m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = t, v, nil
368363
return nil
369364
}
370-
if t > mint {
371-
if extSample.T != -1 && isExtFunction {
372-
m.buffer.Push(extSample.T, ringbuffer.Value{F: extSample.V.F})
373-
extSample.T = -1
365+
if isExtFunction {
366+
if t > mint || !appendedPointBeforeMint {
367+
m.buffer.Push(t, ringbuffer.Value{F: v})
368+
appendedPointBeforeMint = true
369+
} else {
370+
m.buffer.ReadIntoLast(func(s *ringbuffer.Sample) {
371+
s.T, s.V.F, s.V.H = t, v, nil
372+
})
373+
}
374+
} else {
375+
if t > mint {
376+
m.buffer.Push(t, ringbuffer.Value{F: v})
374377
}
375-
m.buffer.Push(t, ringbuffer.Value{F: v})
376-
continue
377-
}
378-
if isExtFunction && t > mint-extLookbackDelta {
379-
extSample.T = t
380-
extSample.V.F = v
381378
}
382379
}
383380
}
384381
return m.iterator.Err()
385382
}
386-
387-
func maxInt64(a, b int64) int64 {
388-
if a > b {
389-
return a
390-
}
391-
return b
392-
393-
}

0 commit comments

Comments
 (0)