diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java index e96d5be5324fb..d433ba5ed9df0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.utils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.catalog.Index; import org.apache.flink.table.catalog.ResolvedSchema; @@ -335,12 +336,14 @@ private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) { return isFilterOnOneSetOfUpsertKeys(nonEquiCond.get(), upsertKeys); } - private static boolean isFilterOnOneSetOfUpsertKeys( + @VisibleForTesting + protected static boolean isFilterOnOneSetOfUpsertKeys( RexNode filter, @Nullable Set upsertKeys) { ImmutableBitSet fieldRefIndices = ImmutableBitSet.of( RexNodeExtractor.extractRefInputFields(Collections.singletonList(filter))); - return upsertKeys.stream().anyMatch(uk -> uk.contains(fieldRefIndices)); + return upsertKeys != null + && upsertKeys.stream().anyMatch(uk -> uk.contains(fieldRefIndices)); } private static boolean areAllJoinTableScansSupported(StreamPhysicalJoin join) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtilTest.java new file mode 100644 index 0000000000000..9a5d8fde20227 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtilTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.calcite.FlinkTypeSystem; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.util.ImmutableBitSet; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.isFilterOnOneSetOfUpsertKeys; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link DeltaJoinUtil}. */ +class DeltaJoinUtilTest { + + @Test + void testIsFilterOnOneSetOfUpsertKeys() { + FlinkTypeFactory typeFactory = + new FlinkTypeFactory( + Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE); + // input schema: + // a string, + // b bigint, + // c bigint + List allFieldTypes = + Stream.of(DataTypes.VARCHAR(100), DataTypes.BIGINT(), DataTypes.BIGINT()) + .map(TypeConversions::fromDataToLogicalType) + .map(typeFactory::createFieldTypeFromLogicalType) + .collect(Collectors.toList()); + + RexBuilder rexBuilder = new RexBuilder(typeFactory); + + // a = 'jim' + RexNode filter = + rexBuilder.makeCall( + SqlStdOperatorTable.EQUALS, + rexBuilder.makeInputRef(allFieldTypes.get(0), 0), + rexBuilder.makeLiteral("jim", allFieldTypes.get(0))); + + assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(0)))).isTrue(); + assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(2)))).isFalse(); + assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(0, 1)))).isTrue(); + assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(1, 2)))) + .isFalse(); + assertThat( + isFilterOnOneSetOfUpsertKeys( + filter, Set.of(ImmutableBitSet.of(1), ImmutableBitSet.of(2)))) + .isFalse(); + assertThat( + isFilterOnOneSetOfUpsertKeys( + filter, Set.of(ImmutableBitSet.of(1), ImmutableBitSet.of(0)))) + .isTrue(); + assertThat(isFilterOnOneSetOfUpsertKeys(filter, null)).isFalse(); + } +}