Skip to content

Commit e7ad926

Browse files
committed
Fallback when no match
Signed-off-by: Filip Petkovski <[email protected]>
1 parent 8b6ccd4 commit e7ad926

File tree

3 files changed

+30
-25
lines changed

3 files changed

+30
-25
lines changed

engine/distributed_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,9 @@ func TestDistributedAggregations(t *testing.T) {
247247
{name: "timestamp - step invariant", query: `timestamp(bar @ 6000.000)`},
248248
{name: "query with @start() absolute timestamp", query: `sum(bar @ start())`},
249249
{name: "query with @end() timestamp", query: `sum(bar @ end())`},
250-
{name: "query with numeric timestamp", query: `sum(bar @ 140)`},
250+
{name: "query with numeric timestamp", query: `sum(bar @ 140.000)`},
251251
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true},
252+
{name: `subquery with @end() timestamp`, query: `bar @ 100.000 - bar @ 150.000`},
252253
}
253254

254255
lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}

logicalplan/distribute.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -317,32 +317,32 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
317317
return *expr
318318
}
319319

320-
// If a query is scoped to a single timestamp, we distribute it to all engines which have sufficient scope
321-
// for the timestamp and range of the query.
322-
// One example of such a query is `sum(metric @ end())`.
323-
if ts := getQueryTimestamp(expr); ts != nil && !opts.IsInstantQuery() {
324-
remoteQueries := make(RemoteExecutions, 0, len(engines))
320+
// Selectors in queries can be scoped to a single timestamp. This case is hard to
321+
// distribute properly and can lead to flaky results.
322+
// We only do it if all engines have sufficient scope for the full range of the query,
323+
// adjusted for the timestamp.
324+
// Otherwise, we fall back to the default mode of not executing the query remotely.
325+
if timestamps := getQueryTimestamps(expr); len(timestamps) > 0 {
326+
var numMatches int
325327
for _, e := range engines {
326-
if !matchesExternalLabelSet(*expr, e.LabelSets()) {
327-
continue
328-
}
329-
if e.MinT() > *ts-startOffset.Milliseconds() {
330-
continue
328+
containsTimestamps := true
329+
for _, ts := range timestamps {
330+
if e.MinT() > ts-startOffset.Milliseconds() {
331+
containsTimestamps = false
332+
break
333+
}
334+
if e.MaxT() < ts {
335+
containsTimestamps = false
336+
break
337+
}
331338
}
332-
if e.MaxT() < *ts {
333-
continue
339+
if containsTimestamps {
340+
numMatches++
334341
}
335-
remoteQueries = append(remoteQueries, RemoteExecution{
336-
Engine: e,
337-
Query: (*expr).Clone(),
338-
QueryRangeStart: opts.Start,
339-
QueryRangeEnd: opts.End,
340-
})
341342
}
342-
if len(remoteQueries) == 0 {
343+
if numMatches != len(engines) {
343344
return *expr
344345
}
345-
return Deduplicate{Expressions: remoteQueries}
346346
}
347347

348348
var globalMinT int64 = math.MaxInt64
@@ -513,18 +513,18 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
513513
return maxDuration(offset+selectRange, lookbackDelta)
514514
}
515515

516-
func getQueryTimestamp(expr *Node) *int64 {
517-
var timestamp *int64
516+
func getQueryTimestamps(expr *Node) []int64 {
517+
var timestamps []int64
518518
Traverse(expr, func(node *Node) {
519519
switch n := (*node).(type) {
520520
case *VectorSelector:
521521
if n.Timestamp != nil {
522-
timestamp = n.Timestamp
522+
timestamps = append(timestamps, *n.Timestamp)
523523
return
524524
}
525525
}
526526
})
527-
return timestamp
527+
return timestamps
528528
}
529529

530530
func numSteps(start, end time.Time, step time.Duration) int64 {

logicalplan/plan.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type plan struct {
4545

4646
type PlanOptions struct {
4747
DisableDuplicateLabelCheck bool
48+
StripStepInvariant bool
4849
}
4950

5051
// New creates a new logical plan from logical node.
@@ -244,6 +245,9 @@ func replacePrometheusNodes(plan parser.Expr) Node {
244245
case *parser.NumberLiteral:
245246
return &NumberLiteral{Val: t.Val}
246247
case *parser.StepInvariantExpr:
248+
//if opts.StripStepInvariant {
249+
// return replacePrometheusNodes(t.Expr, opts)
250+
//}
247251
return &StepInvariantExpr{Expr: replacePrometheusNodes(t.Expr)}
248252
case *parser.MatrixSelector:
249253
return &MatrixSelector{

0 commit comments

Comments
 (0)