Skip to content

Commit b0575e5

Browse files
committed
Fix distributed queries with explicit timestamp
The distributed optimizer does not consider queries with a specific timestamp which causes them to be incorrect for some cases. Signed-off-by: Filip Petkovski <[email protected]>
1 parent 1630e99 commit b0575e5

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

engine/distributed_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,10 @@ func TestDistributedAggregations(t *testing.T) {
245245
{name: "subquery over distributed binary expression", query: `max_over_time((bar / bar)[30s:15s])`},
246246
{name: "timestamp", query: `timestamp(bar)`},
247247
{name: "timestamp - step invariant", query: `timestamp(bar @ 6000.000)`},
248+
{name: "query with @start() absolute timestamp", query: `sum(bar @ start())`},
249+
{name: "query with @end() timestamp", query: `sum(bar @ end())`},
250+
{name: "query with numeric timestamp", query: `sum(bar @ 140)`},
251+
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`},
248252
}
249253

250254
lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}

logicalplan/distribute.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func (r RemoteExecution) String() string {
9595
if r.QueryRangeStart.UnixMilli() == 0 {
9696
return fmt.Sprintf("remote(%s)", r.Query)
9797
}
98-
return fmt.Sprintf("remote(%s) [%s]", r.Query, r.QueryRangeStart.UTC().String())
98+
return fmt.Sprintf("remote(%s) [%s, %s]", r.Query, r.QueryRangeStart.UTC().String())
9999
}
100100

101101
func (r RemoteExecution) Type() NodeType { return RemoteExecutionNode }
@@ -308,14 +308,16 @@ func newRemoteAggregation(rootAggregation *Aggregation, engines []api.RemoteEngi
308308
// For each engine which matches the time range of the query, it creates a RemoteExecution scoped to the range of the engine.
309309
// All remote executions are wrapped in a Deduplicate logical node to make sure that results from overlapping engines are deduplicated.
310310
func (m DistributedExecutionOptimizer) distributeQuery(expr *Node, engines []api.RemoteEngine, opts *query.Options, allowedStartOffset time.Duration) Node {
311+
if ts := getQueryTimestamp(expr); ts != nil {
312+
return *expr
313+
}
311314
startOffset := calculateStartOffset(expr, opts.LookbackDelta)
312315
if allowedStartOffset < startOffset {
313316
return *expr
314317
}
315318
if IsConstantExpr(*expr) {
316319
return *expr
317320
}
318-
319321
var globalMinT int64 = math.MaxInt64
320322
for _, e := range engines {
321323
if e.MinT() < globalMinT {
@@ -464,8 +466,10 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
464466
return lookbackDelta
465467
}
466468

467-
var selectRange time.Duration
468-
var offset time.Duration
469+
var (
470+
selectRange time.Duration
471+
offset time.Duration
472+
)
469473
Traverse(expr, func(node *Node) {
470474
switch n := (*node).(type) {
471475
case *Subquery:
@@ -479,6 +483,20 @@ func calculateStartOffset(expr *Node, lookbackDelta time.Duration) time.Duration
479483
return maxDuration(offset+selectRange, lookbackDelta)
480484
}
481485

486+
func getQueryTimestamp(expr *Node) *int64 {
487+
var timestamp *int64
488+
Traverse(expr, func(node *Node) {
489+
switch n := (*node).(type) {
490+
case *VectorSelector:
491+
if n.Timestamp != nil {
492+
timestamp = n.Timestamp
493+
return
494+
}
495+
}
496+
})
497+
return timestamp
498+
}
499+
482500
func numSteps(start, end time.Time, step time.Duration) int64 {
483501
return (end.UnixMilli()-start.UnixMilli())/step.Milliseconds() + 1
484502
}

0 commit comments

Comments
 (0)