Skip to content

Commit 7a87d6b

Browse files
fpetkovskisubhramit
authored andcommitted
Fix distributed queries with explicit timestamp (thanos-io#510)
* Fix distributed queries with explicit timestamp The distributed optimizer does not consider queries with a specific timestamp which causes them to be incorrect for some cases. Signed-off-by: Filip Petkovski <[email protected]> * Fallback when no match Signed-off-by: Filip Petkovski <[email protected]> * Fix tests Signed-off-by: Filip Petkovski <[email protected]> * Remove leftover Signed-off-by: Filip Petkovski <[email protected]> * Cover subqueries Signed-off-by: Filip Petkovski <[email protected]> * Remove field Signed-off-by: Filip Petkovski <[email protected]> * Add logical tests Signed-off-by: Filip Petkovski <[email protected]> --------- Signed-off-by: Filip Petkovski <[email protected]> Signed-off-by: subhramit <[email protected]>
1 parent 7d16969 commit 7a87d6b

File tree

6 files changed

+99
-19
lines changed

6 files changed

+99
-19
lines changed

engine/distributed_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,11 @@ func TestDistributedAggregations(t *testing.T) {
245245
{name: "subquery over distributed binary expression", query: `max_over_time((bar / bar)[30s:15s])`},
246246
{name: "timestamp", query: `timestamp(bar)`},
247247
{name: "timestamp - step invariant", query: `timestamp(bar @ 6000.000)`},
248+
{name: "query with @start() absolute timestamp", query: `sum(bar @ start())`},
249+
{name: "query with @end() timestamp", query: `sum(bar @ end())`},
250+
{name: "query with numeric timestamp", query: `sum(bar @ 140.000)`},
251+
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true},
252+
{name: `subquery with @end() timestamp`, query: `bar @ 100.000 - bar @ 150.000`},
248253
}
249254

250255
lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}

execution/execution.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners s
376376

377377
func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
378378
// Create a new remote query scoped to the calculated start time.
379-
qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, opts.End, opts.Step)
379+
qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, e.QueryRangeEnd, opts.Step)
380380
if err != nil {
381381
return nil, err
382382
}
@@ -386,7 +386,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts
386386
// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
387387
selectorOpts := *opts
388388
selectorOpts.LookbackDelta = 0
389-
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.Engine.LabelSets(), &selectorOpts, hints)
389+
remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.QueryRangeEnd, e.Engine.LabelSets(), &selectorOpts, hints)
390390
return exchange.NewConcurrent(remoteExec, 2, opts), nil
391391
}
392392

execution/remote/operator.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,20 @@ type Execution struct {
2828
query promql.Query
2929
opts *query.Options
3030
queryRangeStart time.Time
31-
vectorSelector model.VectorOperator
31+
queryRangeEnd time.Time
32+
33+
vectorSelector model.VectorOperator
3234
telemetry.OperatorTelemetry
3335
}
3436

35-
func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
37+
func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart, queryRangeEnd time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
3638
storage := newStorageFromQuery(query, opts, engineLabels)
3739
oper := &Execution{
3840
storage: storage,
3941
query: query,
4042
opts: opts,
4143
queryRangeStart: queryRangeStart,
44+
queryRangeEnd: queryRangeEnd,
4245
vectorSelector: promstorage.NewVectorSelector(pool, storage, opts, 0, 0, false, 0, 1),
4346
}
4447

@@ -57,7 +60,7 @@ func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) {
5760
}
5861

5962
func (e *Execution) String() string {
60-
return fmt.Sprintf("[remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.opts.End.Unix())
63+
return fmt.Sprintf("[remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.queryRangeEnd.Unix())
6164
}
6265

6366
func (e *Execution) Next(ctx context.Context) ([]model.StepVector, error) {

logicalplan/distribute.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type RemoteExecution struct {
8383
Engine api.RemoteEngine
8484
Query Node
8585
QueryRangeStart time.Time
86+
QueryRangeEnd time.Time
8687
}
8788

8889
func (r RemoteExecution) Clone() Node {
@@ -95,7 +96,7 @@ func (r RemoteExecution) String() string {
9596
if r.QueryRangeStart.UnixMilli() == 0 {
9697
return fmt.Sprintf("remote(%s)", r.Query)
9798
}
98-
return fmt.Sprintf("remote(%s) [%s]", r.Query, r.QueryRangeStart.UTC().String())
99+
return fmt.Sprintf("remote(%s) [%s, %s]", r.Query, r.QueryRangeStart.UTC().String(), r.QueryRangeEnd.UTC().String())
99100
}
100101

101102
func (r RemoteExecution) Type() NodeType { return RemoteExecutionNode }
@@ -316,6 +317,21 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
316317
return *expr
317318
}
318319

320+
// Selectors in queries can be scoped to a single timestamp. This case is hard to
321+
// distribute properly and can lead to flaky results.
322+
// We only do it if all engines have sufficient scope for the full range of the query,
323+
// adjusted for the timestamp.
324+
// Otherwise, we fall back to the default mode of not executing the query remotely.
325+
if timestamps := getQueryTimestamps(expr); len(timestamps) > 0 {
326+
for _, e := range engines {
327+
for _, ts := range timestamps {
328+
if e.MinT() > ts-startOffset.Milliseconds() || e.MaxT() < ts {
329+
return *expr
330+
}
331+
}
332+
}
333+
}
334+
319335
var globalMinT int64 = math.MaxInt64
320336
for _, e := range engines {
321337
if e.MinT() < globalMinT {
@@ -344,6 +360,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
344360
Engine: e,
345361
Query: (*expr).Clone(),
346362
QueryRangeStart: start,
363+
QueryRangeEnd: opts.End,
347364
})
348365
}
349366

@@ -369,6 +386,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api
369386
Engine: engines[i],
370387
Query: expr.Clone(),
371388
QueryRangeStart: opts.Start,
389+
QueryRangeEnd: opts.End,
372390
})
373391
}
374392
// We need to make sure that absent is at least evaluated against one engine.
@@ -380,6 +398,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api
380398
Engine: engines[len(engines)-1],
381399
Query: expr,
382400
QueryRangeStart: opts.Start,
401+
QueryRangeEnd: opts.End,
383402
}
384403
}
385404

@@ -464,8 +483,10 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
464483
return lookbackDelta
465484
}
466485

467-
var selectRange time.Duration
468-
var offset time.Duration
486+
var (
487+
selectRange time.Duration
488+
offset time.Duration
489+
)
469490
Traverse(expr, func(node *Node) {
470491
switch n := (*node).(type) {
471492
case *Subquery:
@@ -479,6 +500,25 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
479500
return maxDuration(offset+selectRange, lookbackDelta)
480501
}
481502

503+
func getQueryTimestamps(expr *Node) []int64 {
504+
var timestamps []int64
505+
Traverse(expr, func(node *Node) {
506+
switch n := (*node).(type) {
507+
case *Subquery:
508+
if n.Timestamp != nil {
509+
timestamps = append(timestamps, *n.Timestamp)
510+
return
511+
}
512+
case *VectorSelector:
513+
if n.Timestamp != nil {
514+
timestamps = append(timestamps, *n.Timestamp)
515+
return
516+
}
517+
}
518+
})
519+
return timestamps
520+
}
521+
482522
func numSteps(start, end time.Time, step time.Duration) int64 {
483523
return (end.UnixMilli()-start.UnixMilli())/step.Milliseconds() + 1
484524
}

logicalplan/distribute_test.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,9 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
339339
expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
340340
expected: `
341341
sum_over_time(max(dedup(
342-
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC],
343-
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC]
344-
))[5m:1m])`,
342+
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
343+
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC])
344+
)[5m:1m])`,
345345
},
346346
{
347347
name: "label based pruning matches one engine",
@@ -493,7 +493,7 @@ func TestDistributedExecutionWithLongSelectorRanges(t *testing.T) {
493493
expected: `
494494
dedup(
495495
remote(sum_over_time(metric[5m])),
496-
remote(sum_over_time(metric[5m])) [1970-01-01 06:05:00 +0000 UTC]
496+
remote(sum_over_time(metric[5m])) [1970-01-01 06:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
497497
)`,
498498
},
499499
{
@@ -510,7 +510,7 @@ dedup(
510510
expected: `
511511
dedup(
512512
remote(sum_over_time(metric[2h])),
513-
remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC]
513+
remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
514514
)`,
515515
},
516516
{
@@ -527,7 +527,7 @@ dedup(
527527
expected: `
528528
dedup(
529529
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
530-
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC]
530+
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
531531
)`,
532532
},
533533
{
@@ -543,7 +543,7 @@ dedup(
543543
expr: `max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])`,
544544
expected: `dedup(
545545
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])),
546-
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC])`,
546+
remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC])`,
547547
},
548548
{
549549
name: "subquery with a total 4h range is cannot be distributed",
@@ -571,6 +571,36 @@ dedup(
571571
expr: `sum_over_time(metric[3h])`,
572572
expected: `sum_over_time(metric[3h])`,
573573
},
574+
{
575+
name: "distribute queries with timestamp",
576+
firstEngineOpts: engineOpts{
577+
minTime: queryStart,
578+
maxTime: time.Unix(0, 0).Add(eightHours),
579+
},
580+
secondEngineOpts: engineOpts{
581+
minTime: time.Unix(0, 0).Add(sixHours),
582+
maxTime: queryEnd,
583+
},
584+
expr: `sum(metric @ 25200)`,
585+
expected: `
586+
sum(dedup(
587+
remote(sum by (region) (metric @ 25200.000)),
588+
remote(sum by (region) (metric @ 25200.000)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
589+
))`,
590+
},
591+
{
592+
name: "skip distributing queries with timestamps outside of the range of an engine",
593+
firstEngineOpts: engineOpts{
594+
minTime: queryStart,
595+
maxTime: time.Unix(0, 0).Add(eightHours),
596+
},
597+
secondEngineOpts: engineOpts{
598+
minTime: time.Unix(0, 0).Add(sixHours),
599+
maxTime: queryEnd,
600+
},
601+
expr: `sum(metric @ 18000)`,
602+
expected: `sum(sum by (region) (metric @ 18000.000))`,
603+
},
574604
}
575605

576606
for _, tcase := range cases {
@@ -616,14 +646,14 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
616646
expr: `sum(metric)`,
617647
queryStart: time.Unix(0, 0).Add(7 * time.Hour),
618648
queryEnd: time.Unix(0, 0).Add(8 * time.Hour),
619-
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC]))`,
649+
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC, 1970-01-01 08:00:00 +0000 UTC]))`,
620650
},
621651
{
622652
name: "1 hour range query at the start of the range prunes the second engine",
623653
expr: `sum(metric)`,
624654
queryStart: time.Unix(0, 0).Add(1 * time.Hour),
625655
queryEnd: time.Unix(0, 0).Add(2 * time.Hour),
626-
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC]))`,
656+
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC, 1970-01-01 02:00:00 +0000 UTC]))`,
627657
},
628658
{
629659
name: "instant query in the overlapping range queries both engines",
@@ -633,8 +663,8 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
633663
expected: `
634664
sum(
635665
dedup(
636-
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC],
637-
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC]
666+
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
667+
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC]
638668
)
639669
)`,
640670
},

logicalplan/passthrough.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, an
5252
Engine: engines[0],
5353
Query: plan.Clone(),
5454
QueryRangeStart: opts.Start,
55+
QueryRangeEnd: opts.End,
5556
}, nil
5657
}
5758

@@ -78,6 +79,7 @@ func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, an
7879
Engine: matchingLabelsEngines[0],
7980
Query: plan.Clone(),
8081
QueryRangeStart: opts.Start,
82+
QueryRangeEnd: opts.End,
8183
}, nil
8284
}
8385

0 commit comments

Comments
 (0)