diff --git a/engine/distributed_test.go b/engine/distributed_test.go
index 938442582..94b1e964d 100644
--- a/engine/distributed_test.go
+++ b/engine/distributed_test.go
@@ -245,6 +245,11 @@ func TestDistributedAggregations(t *testing.T) {
 		{name: "subquery over distributed binary expression", query: `max_over_time((bar / bar)[30s:15s])`},
 		{name: "timestamp", query: `timestamp(bar)`},
 		{name: "timestamp - step invariant", query: `timestamp(bar @ 6000.000)`},
+		{name: "query with @start() timestamp", query: `sum(bar @ start())`},
+		{name: "query with @end() timestamp", query: `sum(bar @ end())`},
+		{name: "query with numeric timestamp", query: `sum(bar @ 140.000)`},
+		{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true},
+		{name: "binary expression with numeric timestamps", query: `bar @ 100.000 - bar @ 150.000`},
 	}

 	lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}
diff --git a/execution/execution.go b/execution/execution.go
index d46541019..7cf72d7df 100644
--- a/execution/execution.go
+++ b/execution/execution.go
@@ -376,7 +376,7 @@ func newDeduplication(ctx context.Context, e logicalplan.Deduplicate, scanners s

 func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
-	// Create a new remote query scoped to the calculated start time.
-	qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, opts.End, opts.Step)
+	// Create a new remote query scoped to the calculated start and end time.
+	qry, err := e.Engine.NewRangeQuery(ctx, promql.NewPrometheusQueryOpts(false, opts.LookbackDelta), e.Query, e.QueryRangeStart, e.QueryRangeEnd, opts.Step)
 	if err != nil {
 		return nil, err
 	}
@@ -386,7 +386,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts
 	// We need to set the lookback for the selector to 0 since the remote query already applies one lookback.
 	selectorOpts := *opts
 	selectorOpts.LookbackDelta = 0
-	remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.Engine.LabelSets(), &selectorOpts, hints)
+	remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.QueryRangeEnd, e.Engine.LabelSets(), &selectorOpts, hints)
 	return exchange.NewConcurrent(remoteExec, 2, opts), nil
 }

diff --git a/execution/remote/operator.go b/execution/remote/operator.go
index 761e69812..e1efb98f2 100644
--- a/execution/remote/operator.go
+++ b/execution/remote/operator.go
@@ -28,17 +28,20 @@ type Execution struct {
 	query           promql.Query
 	opts            *query.Options
 	queryRangeStart time.Time
-	vectorSelector  model.VectorOperator
+	queryRangeEnd   time.Time
+
+	vectorSelector model.VectorOperator
 	telemetry.OperatorTelemetry
 }

-func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
+func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart, queryRangeEnd time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution {
 	storage := newStorageFromQuery(query, opts, engineLabels)
 	oper := &Execution{
 		storage:         storage,
 		query:           query,
 		opts:            opts,
 		queryRangeStart: queryRangeStart,
+		queryRangeEnd:   queryRangeEnd,
 		vectorSelector:  promstorage.NewVectorSelector(pool, storage, opts, 0, 0, false, 0, 1),
 	}
@@ -57,7 +60,7 @@ func (e *Execution) Series(ctx context.Context) ([]labels.Labels, error) {
 }

 func (e *Execution) String() string {
-	return fmt.Sprintf("[remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.opts.End.Unix())
+	return fmt.Sprintf("[remoteExec] %s (%d, %d)", e.query, e.queryRangeStart.Unix(), e.queryRangeEnd.Unix())
 }

 func (e *Execution) Next(ctx context.Context) ([]model.StepVector, error) {
diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go
index f1ae3e0cb..7eb63d4f9 100644
--- a/logicalplan/distribute.go
+++ b/logicalplan/distribute.go
@@ -83,6 +83,7 @@ type RemoteExecution struct {
 	Engine          api.RemoteEngine
 	Query           Node
 	QueryRangeStart time.Time
+	QueryRangeEnd   time.Time
 }

 func (r RemoteExecution) Clone() Node {
@@ -95,7 +96,7 @@ func (r RemoteExecution) String() string {
 	if r.QueryRangeStart.UnixMilli() == 0 {
 		return fmt.Sprintf("remote(%s)", r.Query)
 	}
-	return fmt.Sprintf("remote(%s) [%s]", r.Query, r.QueryRangeStart.UTC().String())
+	return fmt.Sprintf("remote(%s) [%s, %s]", r.Query, r.QueryRangeStart.UTC().String(), r.QueryRangeEnd.UTC().String())
 }

 func (r RemoteExecution) Type() NodeType { return RemoteExecutionNode }
@@ -316,6 +317,21 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
 		return *expr
 	}

+	// Selectors in a query can be pinned to a single timestamp with the @ modifier.
+	// This case is hard to distribute correctly and can lead to flaky results.
+	// We only distribute the query if every engine covers each pinned timestamp,
+	// including the start offset (selector range, offset and lookback) before it.
+	// Otherwise, we fall back to not executing the query remotely.
+	if timestamps := getQueryTimestamps(expr); len(timestamps) > 0 {
+		for _, e := range engines {
+			for _, ts := range timestamps {
+				if e.MinT() > ts-startOffset.Milliseconds() || e.MaxT() < ts {
+					return *expr
+				}
+			}
+		}
+	}
+
 	var globalMinT int64 = math.MaxInt64
 	for _, e := range engines {
 		if e.MinT() < globalMinT {
@@ -344,6 +360,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api
 			Engine:          e,
 			Query:           (*expr).Clone(),
 			QueryRangeStart: start,
+			QueryRangeEnd:   opts.End,
 		})
 	}

@@ -369,6 +386,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api
 			Engine:          engines[i],
 			Query:           expr.Clone(),
 			QueryRangeStart: opts.Start,
+			QueryRangeEnd:   opts.End,
 		})
 	}
 	// We need to make sure that absent is at least evaluated against one engine.
@@ -380,6 +398,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr Node, engines []api
 			Engine:          engines[len(engines)-1],
 			Query:           expr,
 			QueryRangeStart: opts.Start,
+			QueryRangeEnd:   opts.End,
 		}
 	}

@@ -464,8 +483,10 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
 		return lookbackDelta
 	}

-	var selectRange time.Duration
-	var offset time.Duration
+	var (
+		selectRange time.Duration
+		offset      time.Duration
+	)
 	Traverse(expr, func(node *Node) {
 		switch n := (*node).(type) {
 		case *Subquery:
@@ -479,6 +500,25 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
 	return maxDuration(offset+selectRange, lookbackDelta)
 }

+func getQueryTimestamps(expr *Node) []int64 {
+	var timestamps []int64
+	Traverse(expr, func(node *Node) {
+		switch n := (*node).(type) {
+		case *Subquery:
+			if n.Timestamp != nil {
+				timestamps = append(timestamps, *n.Timestamp)
+				return
+			}
+		case *VectorSelector:
+			if n.Timestamp != nil {
+				timestamps = append(timestamps, *n.Timestamp)
+				return
+			}
+		}
+	})
+	return timestamps
+}
+
 func numSteps(start, end time.Time, step time.Duration) int64 {
 	return (end.UnixMilli()-start.UnixMilli())/step.Milliseconds() + 1
 }
diff --git a/logicalplan/distribute_test.go b/logicalplan/distribute_test.go
index 62fd0977a..7354c0752 100644
--- a/logicalplan/distribute_test.go
+++ b/logicalplan/distribute_test.go
@@ -339,9 +339,9 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
 			expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
 			expected: `
 sum_over_time(max(dedup(
-  remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC],
-  remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC]
-))[5m:1m])`,
+  remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
+  remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC])
+)[5m:1m])`,
 		},
 		{
 			name: "label based pruning matches one engine",
@@ -493,7 +493,7 @@ func TestDistributedExecutionWithLongSelectorRanges(t *testing.T) {
 			expected: `
 dedup(
   remote(sum_over_time(metric[5m])),
-  remote(sum_over_time(metric[5m])) [1970-01-01 06:05:00 +0000 UTC]
+  remote(sum_over_time(metric[5m])) [1970-01-01 06:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
 )`,
 		},
 		{
@@ -510,7 +510,7 @@ dedup(
 			expected: `
 dedup(
   remote(sum_over_time(metric[2h])),
-  remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC]
+  remote(sum_over_time(metric[2h])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
 )`,
 		},
 		{
@@ -527,7 +527,7 @@ dedup(
 			expected: `
 dedup(
   remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
-  remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC]
+  remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
 )`,
 		},
 		{
@@ -543,7 +543,7 @@ dedup(
 			expr: `max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])`,
 			expected: `dedup(
   remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])),
-  remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC])`,
+  remote(max_over_time(sum_over_time(sum_over_time(metric[5m])[45m:10m])[15m:15m])) [1970-01-01 07:05:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC])`,
 		},
 		{
 			name: "subquery with a total 4h range is cannot be distributed",
@@ -571,6 +571,36 @@
 			expr:     `sum_over_time(metric[3h])`,
 			expected: `sum_over_time(metric[3h])`,
 		},
+		{
+			name: "distribute queries with timestamp",
+			firstEngineOpts: engineOpts{
+				minTime: queryStart,
+				maxTime: time.Unix(0, 0).Add(eightHours),
+			},
+			secondEngineOpts: engineOpts{
+				minTime: time.Unix(0, 0).Add(sixHours),
+				maxTime: queryEnd,
+			},
+			expr: `sum(metric @ 25200)`,
+			expected: `
+sum(dedup(
+  remote(sum by (region) (metric @ 25200.000)),
+  remote(sum by (region) (metric @ 25200.000)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
+))`,
+		},
+		{
+			name: "skip distributing queries with timestamps outside of the range of an engine",
+			firstEngineOpts: engineOpts{
+				minTime: queryStart,
+				maxTime: time.Unix(0, 0).Add(eightHours),
+			},
+			secondEngineOpts: engineOpts{
+				minTime: time.Unix(0, 0).Add(sixHours),
+				maxTime: queryEnd,
+			},
+			expr: `sum(metric @ 18000)`,
+			expected: `sum(sum by (region) (metric @ 18000.000))`,
+		},
 	}

 	for _, tcase := range cases {
@@ -616,14 +646,14 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
 			expr:       `sum(metric)`,
 			queryStart: time.Unix(0, 0).Add(7 * time.Hour),
 			queryEnd:   time.Unix(0, 0).Add(8 * time.Hour),
-			expected:   `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC]))`,
+			expected:   `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC, 1970-01-01 08:00:00 +0000 UTC]))`,
 		},
 		{
 			name:       "1 hour range query at the start of the range prunes the second engine",
 			expr:       `sum(metric)`,
 			queryStart: time.Unix(0, 0).Add(1 * time.Hour),
 			queryEnd:   time.Unix(0, 0).Add(2 * time.Hour),
-			expected:   `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC]))`,
+			expected:   `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC, 1970-01-01 02:00:00 +0000 UTC]))`,
 		},
 		{
 			name:       "instant query in the overlapping range queries both engines",
 			expr:       `sum(metric)`,
 			queryStart: time.Unix(0, 0).Add(6 * time.Hour),
 			queryEnd:   time.Unix(0, 0).Add(6 * time.Hour),
 			expected: `
 sum(
   dedup(
-    remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC],
-    remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC]
+    remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
+    remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC]
   )
 )`,
 		},
diff --git a/logicalplan/passthrough.go b/logicalplan/passthrough.go
index 489092dde..3f3bbf346 100644
--- a/logicalplan/passthrough.go
+++ b/logicalplan/passthrough.go
@@ -52,6 +52,7 @@ func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, an
 			Engine:          engines[0],
 			Query:           plan.Clone(),
 			QueryRangeStart: opts.Start,
+			QueryRangeEnd:   opts.End,
 		}, nil
 	}

@@ -78,6 +79,7 @@ func (m PassthroughOptimizer) Optimize(plan Node, opts *query.Options) (Node, an
 			Engine:          matchingLabelsEngines[0],
 			Query:           plan.Clone(),
 			QueryRangeStart: opts.Start,
+			QueryRangeEnd:   opts.End,
 		}, nil
 	}
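
The guard added to `distributeQuery` in `logicalplan/distribute.go` is easier to follow in isolation. The sketch below is illustrative only: `engine` and `coveredByAll` are hypothetical stand-ins for `api.RemoteEngine` and the new check, not the engine's real API. It reproduces the same comparison (`e.MinT() > ts-startOffset.Milliseconds() || e.MaxT() < ts`) and evaluates it against the engine layout used by the two new `distribute_test.go` cases, assuming `queryStart` is the Unix epoch and `queryEnd` is 12h, as the expected plan strings suggest.

```go
// Illustrative sketch only: models the pinned-timestamp guard added to
// distributeQuery with minimal, hypothetical types.
package main

import (
	"fmt"
	"time"
)

// engine models the only part of a remote engine the guard inspects: its time range.
type engine struct {
	minT, maxT int64 // milliseconds since the Unix epoch
}

// coveredByAll reports whether every engine can serve every pinned (@) timestamp,
// including the startOffset window (selector range, offset and lookback) before it.
func coveredByAll(engines []engine, pinned []int64, startOffset time.Duration) bool {
	for _, e := range engines {
		for _, ts := range pinned {
			if e.minT > ts-startOffset.Milliseconds() || e.maxT < ts {
				return false
			}
		}
	}
	return true
}

func main() {
	// Engine layout from the new test cases: [0h, 8h] and [6h, 12h].
	engines := []engine{
		{minT: 0, maxT: (8 * time.Hour).Milliseconds()},
		{minT: (6 * time.Hour).Milliseconds(), maxT: (12 * time.Hour).Milliseconds()},
	}
	lookback := 5 * time.Minute

	// `sum(metric @ 25200)` pins the selector to 7h; both engines cover it,
	// so the query stays distributed (dedup of two remote executions).
	fmt.Println(coveredByAll(engines, []int64{(7 * time.Hour).Milliseconds()}, lookback)) // true

	// `sum(metric @ 18000)` pins the selector to 5h, which lies before the second
	// engine's MinT, so the optimizer falls back to local execution.
	fmt.Println(coveredByAll(engines, []int64{(5 * time.Hour).Milliseconds()}, lookback)) // false
}
```

The `false` branch corresponds to the expected plan `sum(sum by (region) (metric @ 18000.000))`, which contains no `dedup`/`remote` nodes.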
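
The `ts-startOffset.Milliseconds()` term matters because a selector pinned with `@` still reads data from before the pinned instant. The new engine test `sum(count_over_time(bar[1h] @ end()))` is marked `expectFallback: true` for exactly this reason. The numbers below are a hedged illustration, not taken from `engine/distributed_test.go` (its engine bounds are not part of this diff): `requiredWindow` is a hypothetical helper, and the 6h query end and 5h30m engine retention are assumed values.

```go
// Illustrative sketch: shows why the guard subtracts startOffset. A range
// selector pinned with @ reads `rng` worth of samples before the pinned instant.
package main

import (
	"fmt"
	"time"
)

// requiredWindow returns the [from, to] data window (in ms) an engine must cover
// to evaluate a range selector of length rng pinned at timestamp ts.
func requiredWindow(ts int64, rng time.Duration) (from, to int64) {
	return ts - rng.Milliseconds(), ts
}

func main() {
	// Assume the query end (and thus @ end()) is 6h after the Unix epoch.
	end := (6 * time.Hour).Milliseconds()
	from, to := requiredWindow(end, time.Hour) // bar[1h] @ end()

	// An engine whose retention starts at 5h30m would return a partial count,
	// so the optimizer must not distribute the query.
	engineMinT := (5*time.Hour + 30*time.Minute).Milliseconds()
	fmt.Printf("window=[%d, %d] engineMinT=%d covered=%v\n", from, to, engineMinT, engineMinT <= from)
}
```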