Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2880,6 +2880,128 @@ func TestXFunctions(t *testing.T) {
}
}

func TestXFunctionsRangeQuery(t *testing.T) {
// Negative offset and at modifier are enabled by default
// since Prometheus v2.33.0, so we also enable them.
opts := promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
}

cases := []struct {
name string
load string
query string
startTime time.Time
endTime time.Time
step time.Duration

expected promql.Matrix
}{
{
name: "gaps between steps",
load: `load 10s
http_requests 1 5 10 20 _ 40`,
query: "xincrease(http_requests[10s])",

startTime: time.Unix(0, 0),
endTime: time.Unix(60, 0),
step: 20 * time.Second,

expected: promql.Matrix{
promql.Series{
Metric: labels.New(),
Floats: []promql.FPoint{
{T: 00_000, F: 1},
{T: 20_000, F: 5},
{T: 40_000, F: 0},
{T: 60_000, F: 0},
},
},
},
},
{
name: "back to back steps",
load: `load 10s
http_requests 1 5 10 20 _ 40`,
query: "xincrease(http_requests[10s])",

startTime: time.Unix(0, 0),
endTime: time.Unix(60, 0),
step: 10 * time.Second,

expected: promql.Matrix{
promql.Series{
Metric: labels.New(),
Floats: []promql.FPoint{
{T: 00_000, F: 1},
{T: 10_000, F: 4},
{T: 20_000, F: 5},
{T: 30_000, F: 10},
{T: 40_000, F: 0},
{T: 50_000, F: 20},
{T: 60_000, F: 0},
},
},
},
},
{
name: "overlapping steps",
load: `load 10s
http_requests 1 5 10 20 _ 40`,
query: "xincrease(http_requests[20s])",

startTime: time.Unix(0, 0),
endTime: time.Unix(60, 0),
step: 10 * time.Second,

expected: promql.Matrix{
promql.Series{
Metric: labels.New(),
Floats: []promql.FPoint{
{T: 00_000, F: 1},
{T: 10_000, F: 4},
{T: 20_000, F: 9},
{T: 30_000, F: 15},
{T: 40_000, F: 10},
{T: 50_000, F: 20},
{T: 60_000, F: 20},
},
},
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
storage := promqltest.LoadedStorage(t, tc.load)
defer storage.Close()

ctx := context.Background()
newEngine := engine.New(engine.Opts{
EngineOpts: opts,
LogicalOptimizers: logicalplan.AllOptimizers,
EnableXFunctions: true,
})
query, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.startTime, tc.endTime, tc.step)
testutil.Ok(t, err)
defer query.Close()

engineResult := query.Exec(ctx)
testutil.Ok(t, engineResult.Err)

gotMatrix, err := engineResult.Matrix()
require.NoError(t, err)

for i := range tc.expected {
testutil.WithGoCmp(comparer).Equals(t, tc.expected[i].Floats, gotMatrix[i].Floats, queryExplanation(query))
}
})
}
}

func TestXFunctionsWhenDisabled(t *testing.T) {
var (
query = "xincrease(http_requests[50s])"
Expand Down
19 changes: 10 additions & 9 deletions storage/prometheus/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/prometheus/prometheus/util/annotations"
)

const notSeenSentinel int64 = math.MinInt64

type matrixScanner struct {
labels labels.Labels
signature uint64
Expand Down Expand Up @@ -241,9 +243,9 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error {
labels: lbls,
signature: s.Signature,
iterator: s.Iterator(nil),
lastSample: ringbuffer.Sample{T: math.MinInt64},
lastSample: ringbuffer.Sample{T: notSeenSentinel},
buffer: o.newBuffer(ctx),
metricAppearedTs: math.MinInt64,
metricAppearedTs: notSeenSentinel,
}
o.series[i] = lbls
}
Expand Down Expand Up @@ -322,16 +324,15 @@ func (m *matrixScanner) selectPoints(
mint = maxInt64(mint, m.buffer.MaxT()+1)
if m.lastSample.T > mint {
m.buffer.Push(m.lastSample.T, m.lastSample.V)
m.lastSample.T = math.MinInt64
m.lastSample.T = notSeenSentinel
mint = maxInt64(mint, m.buffer.MaxT()+1)
}

var (
// The sample that we add for x-functions, -1 is a canary value for the situation
// The sample that we add for x-functions, notSeenSentinel is a canary value for the situation
// where we have no sample in the extended lookback delta
extSample = ringbuffer.Sample{T: -1}
extSample = ringbuffer.Sample{T: notSeenSentinel}
)

for valType := m.iterator.Next(); valType != chunkenc.ValNone; valType = m.iterator.Next() {
switch valType {
case chunkenc.ValHistogram, chunkenc.ValFloatHistogram:
Expand Down Expand Up @@ -360,17 +361,17 @@ func (m *matrixScanner) selectPoints(
if value.IsStaleNaN(v) {
continue
}
if m.metricAppearedTs == math.MinInt64 {
if m.metricAppearedTs == notSeenSentinel {
m.metricAppearedTs = t
}
if t > maxt {
m.lastSample.T, m.lastSample.V.F, m.lastSample.V.H = t, v, nil
return nil
}
if t > mint {
if extSample.T != -1 && isExtFunction {
if extSample.T != notSeenSentinel && isExtFunction {
m.buffer.Push(extSample.T, ringbuffer.Value{F: extSample.V.F})
extSample.T = -1
extSample.T = notSeenSentinel
}
m.buffer.Push(t, ringbuffer.Value{F: v})
continue
Expand Down
Loading