Skip to content

Commit b408850

Browse files
authored
[FLINK-38640][table-planner] Fix NPE in DeltaJoinUtil#isFilterOnOneSetOfUpsertKeys (#27204) (#27215)
1 parent 81f86ae commit b408850

File tree

2 files changed

+88
-2
lines changed

2 files changed

+88
-2
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.table.planner.plan.utils;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.java.tuple.Tuple2;
2223
import org.apache.flink.table.catalog.Index;
2324
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -335,12 +336,14 @@ private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) {
335336
return isFilterOnOneSetOfUpsertKeys(nonEquiCond.get(), upsertKeys);
336337
}
337338

338-
private static boolean isFilterOnOneSetOfUpsertKeys(
339+
@VisibleForTesting
340+
protected static boolean isFilterOnOneSetOfUpsertKeys(
339341
RexNode filter, @Nullable Set<ImmutableBitSet> upsertKeys) {
340342
ImmutableBitSet fieldRefIndices =
341343
ImmutableBitSet.of(
342344
RexNodeExtractor.extractRefInputFields(Collections.singletonList(filter)));
343-
return upsertKeys.stream().anyMatch(uk -> uk.contains(fieldRefIndices));
345+
return upsertKeys != null
346+
&& upsertKeys.stream().anyMatch(uk -> uk.contains(fieldRefIndices));
344347
}
345348

346349
private static boolean areAllJoinTableScansSupported(StreamPhysicalJoin join) {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.utils;
20+
21+
import org.apache.flink.table.api.DataTypes;
22+
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
23+
import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
24+
import org.apache.flink.table.types.utils.TypeConversions;
25+
26+
import org.apache.calcite.rel.type.RelDataType;
27+
import org.apache.calcite.rex.RexBuilder;
28+
import org.apache.calcite.rex.RexNode;
29+
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
30+
import org.apache.calcite.util.ImmutableBitSet;
31+
import org.junit.jupiter.api.Test;
32+
33+
import java.util.List;
34+
import java.util.Set;
35+
import java.util.stream.Collectors;
36+
import java.util.stream.Stream;
37+
38+
import static org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.isFilterOnOneSetOfUpsertKeys;
39+
import static org.assertj.core.api.Assertions.assertThat;
40+
41+
/** Test for {@link DeltaJoinUtil}. */
42+
class DeltaJoinUtilTest {
43+
44+
@Test
45+
void testIsFilterOnOneSetOfUpsertKeys() {
46+
FlinkTypeFactory typeFactory =
47+
new FlinkTypeFactory(
48+
Thread.currentThread().getContextClassLoader(), FlinkTypeSystem.INSTANCE);
49+
// input schema:
50+
// a string,
51+
// b bigint,
52+
// c bigint
53+
List<RelDataType> allFieldTypes =
54+
Stream.of(DataTypes.VARCHAR(100), DataTypes.BIGINT(), DataTypes.BIGINT())
55+
.map(TypeConversions::fromDataToLogicalType)
56+
.map(typeFactory::createFieldTypeFromLogicalType)
57+
.collect(Collectors.toList());
58+
59+
RexBuilder rexBuilder = new RexBuilder(typeFactory);
60+
61+
// a = 'jim'
62+
RexNode filter =
63+
rexBuilder.makeCall(
64+
SqlStdOperatorTable.EQUALS,
65+
rexBuilder.makeInputRef(allFieldTypes.get(0), 0),
66+
rexBuilder.makeLiteral("jim", allFieldTypes.get(0)));
67+
68+
assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(0)))).isTrue();
69+
assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(2)))).isFalse();
70+
assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(0, 1)))).isTrue();
71+
assertThat(isFilterOnOneSetOfUpsertKeys(filter, Set.of(ImmutableBitSet.of(1, 2))))
72+
.isFalse();
73+
assertThat(
74+
isFilterOnOneSetOfUpsertKeys(
75+
filter, Set.of(ImmutableBitSet.of(1), ImmutableBitSet.of(2))))
76+
.isFalse();
77+
assertThat(
78+
isFilterOnOneSetOfUpsertKeys(
79+
filter, Set.of(ImmutableBitSet.of(1), ImmutableBitSet.of(0))))
80+
.isTrue();
81+
assertThat(isFilterOnOneSetOfUpsertKeys(filter, null)).isFalse();
82+
}
83+
}

0 commit comments

Comments
 (0)