Skip to content

Commit 0b7a5d6

Browse files
authored
Fix distributing binary operator with engine labels matching (#493)
When distributing a binary operation, we need to check if both RHS and RHS are distributive. The new test case fails without this change because only the LHS gets distributed. Signed-off-by: Filip Petkovski <[email protected]>
1 parent 4a33cf4 commit 0b7a5d6

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

logicalplan/distribute.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,12 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string
494494
case Deduplicate, RemoteExecution:
495495
return false
496496
case *Binary:
497-
return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e, engineLabels))
497+
if isBinaryExpressionWithOneScalarSide(e) || skipBinaryPushdown {
498+
return true
499+
}
500+
return isBinaryExpressionWithDistributableMatching(e, engineLabels) &&
501+
isDistributive(&e.LHS, skipBinaryPushdown, engineLabels, warns) &&
502+
isDistributive(&e.RHS, skipBinaryPushdown, engineLabels, warns)
498503
case *Aggregation:
499504
// Certain aggregations are currently not supported.
500505
if _, ok := distributiveAggregations[e.Op]; !ok {

logicalplan/distribute_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,36 @@ sum_over_time(max(dedup(
382382
expr: `X * on (foo) Y`,
383383
expected: `dedup(remote(X), remote(X)) * on (foo) dedup(remote(Y), remote(Y))`,
384384
},
385+
386+
{
387+
name: "binary matching and label replace with local label",
388+
expr: `
389+
count by (cluster) (
390+
label_replace(up, "ns", "$0", "namespace", ".*")
391+
* on(region) group_left(project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*")
392+
)`,
393+
expected: `
394+
sum by (cluster) (dedup(
395+
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
396+
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))))
397+
)`,
398+
},
399+
{
400+
name: "binary matching and label replace with engine label",
401+
expr: `
402+
count by (cluster) (
403+
label_replace(up, "region", "$0", "k8s_region", ".*")
404+
* on(region) group_left(project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))`,
405+
expected: `
406+
count by (cluster) (
407+
label_replace(dedup(remote(up), remote(up)), "region", "$0", "k8s_region", ".*")
408+
* on (region) group_left (project) dedup(
409+
remote(label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*")),
410+
remote(label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))
411+
)
412+
)`,
413+
expectWarn: true,
414+
},
385415
}
386416

387417
engines := []api.RemoteEngine{

0 commit comments

Comments
 (0)