Skip to content

Commit 30e7a9f

Browse files
authored
Respect the SkipBinaryPushdown option (#494)
My previous PR introduced a regression for the SkipBinopPushdown flag. Signed-off-by: Filip Petkovski <[email protected]>
1 parent 0b7a5d6 commit 30e7a9f

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

logicalplan/distribute.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,10 +494,11 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string
494494
case Deduplicate, RemoteExecution:
495495
return false
496496
case *Binary:
497-
if isBinaryExpressionWithOneScalarSide(e) || skipBinaryPushdown {
497+
if isBinaryExpressionWithOneScalarSide(e) {
498498
return true
499499
}
500-
return isBinaryExpressionWithDistributableMatching(e, engineLabels) &&
500+
return !skipBinaryPushdown &&
501+
isBinaryExpressionWithDistributableMatching(e, engineLabels) &&
501502
isDistributive(&e.LHS, skipBinaryPushdown, engineLabels, warns) &&
502503
isDistributive(&e.RHS, skipBinaryPushdown, engineLabels, warns)
503504
case *Aggregation:

logicalplan/distribute_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ var replacements = map[string]*regexp.Regexp{
2626
func TestDistributedExecution(t *testing.T) {
2727
t.Parallel()
2828
cases := []struct {
29-
name string
30-
expr string
31-
expectWarn bool
32-
expected string
29+
name string
30+
expr string
31+
skipBinopPushdown bool
32+
expectWarn bool
33+
expected string
3334
}{
3435
{
3536
name: "selector",
@@ -412,18 +413,27 @@ count by (cluster) (
412413
)`,
413414
expectWarn: true,
414415
},
416+
{
417+
name: "skip binary pushdown when configured",
418+
expr: `metric_a / metric_b`,
419+
expected: `dedup(remote(metric_a), remote(metric_a)) / dedup(remote(metric_b), remote(metric_b))`,
420+
skipBinopPushdown: true,
421+
},
415422
}
416423

417424
engines := []api.RemoteEngine{
418425
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "east"), labels.FromStrings("region", "south")}),
419426
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "west")}),
420427
}
421-
optimizers := []Optimizer{
422-
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
423-
}
424-
425428
for _, tcase := range cases {
426429
t.Run(tcase.name, func(t *testing.T) {
430+
optimizers := []Optimizer{
431+
DistributedExecutionOptimizer{
432+
Endpoints: api.NewStaticEndpoints(engines),
433+
SkipBinaryPushdown: tcase.skipBinopPushdown,
434+
},
435+
}
436+
427437
expr, err := parser.ParseExpr(tcase.expr)
428438
testutil.Ok(t, err)
429439

0 commit comments

Comments
 (0)