diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java index a832696fbda74..eb8797e212b8a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java @@ -27,6 +27,7 @@ import org.apache.flink.table.planner.utils.TableTestBase; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import scala.Enumeration; @@ -50,7 +51,7 @@ void setup() { util.tableEnv() .executeSql( "CREATE TABLE Users (" - + " user_id_0 STRING PRIMARY KEY NOT ENFORCED," + + " user_id STRING PRIMARY KEY NOT ENFORCED," + " name STRING," + " cash INT" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); @@ -59,7 +60,7 @@ void setup() { .executeSql( "CREATE TABLE Orders (" + " order_id STRING PRIMARY KEY NOT ENFORCED," - + " user_id_1 STRING," + + " user_id STRING," + " product STRING" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,D')"); @@ -68,16 +69,26 @@ void setup() { "CREATE TABLE Payments (" + " payment_id STRING PRIMARY KEY NOT ENFORCED," + " price INT," - + " user_id_2 STRING" + + " user_id STRING" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I')"); util.tableEnv() .executeSql( "CREATE TABLE Shipments (" + " location STRING," - + " user_id_3 STRING" + + " user_id STRING" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,UB,D')"); + util.tableEnv() + .executeSql( + "CREATE TABLE Detail (" + + " detail_id STRING PRIMARY KEY NOT ENFORCED," + + " description STRING," + + " user_id STRING," + + " data STRING," + + " `timestamp` BIGINT" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I')"); + // Tables for testing temporal join exclusion util.tableEnv() .executeSql( @@ -101,8 +112,8 @@ void setup() { "CREATE TABLE EventTable1 (" + " id STRING," + " val INT," - + " rowtime TIMESTAMP(3)," - + " WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND" + + " `$rowtime` TIMESTAMP(3)," + + " WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I')"); util.tableEnv() @@ -110,15 +121,15 @@ void setup() { "CREATE TABLE EventTable2 (" + " id STRING," + " price DOUBLE," - + " rowtime TIMESTAMP(3)," - + " WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND" + + " `$rowtime` TIMESTAMP(3)," + + " WATERMARK FOR `$rowtime` AS `$rowtime` - INTERVAL '5' SECOND" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I')"); // Tables for testing time attribute materialization in multi-join util.tableEnv() .executeSql( "CREATE TABLE UsersWithProctime (" - + " user_id_0 STRING PRIMARY KEY NOT ENFORCED," + + " user_id STRING PRIMARY KEY NOT ENFORCED," + " name STRING," + " proctime AS PROCTIME()" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I')"); @@ -127,9 +138,9 @@ void setup() { .executeSql( "CREATE TABLE OrdersWithRowtime (" + " order_id STRING PRIMARY KEY NOT ENFORCED," - + " user_id_1 STRING," - + " rowtime TIMESTAMP(3)," - + " WATERMARK FOR rowtime AS rowtime" + + " user_id STRING," + + " `$rowtime` TIMESTAMP(3)," + + " WATERMARK FOR `$rowtime` AS `$rowtime`" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I')"); // Tables for testing upsert key preservation util.tableEnv() @@ -170,64 +181,64 @@ void setup() { @Test void testThreeWayInnerJoinRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "INNER JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id"); } @Test void testThreeWayInnerJoinNoCommonJoinKeyRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " + + "INNER JOIN Orders o ON u.user_id = o.user_id " + "INNER JOIN Payments p ON u.cash = p.price"); } @Test void testThreeWayInnerJoinExecPlan() { util.verifyExecPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "INNER JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id"); } @Test void testThreeWayLeftOuterJoinRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id"); } @Test void testThreeWayInnerJoinWithTttlHints() { util.verifyRelPlan( - "SELECT /*+ STATE_TTL(u='1d', o='2d', p='1h') */u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT /*+ STATE_TTL(u='1d', o='2d', p='1h') */u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "INNER JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id"); } @Test void testThreeWayInnerJoinWithSingleTttlHint() { util.verifyRelPlan( - "SELECT /*+ STaTE_tTL(o='2d') */u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT /*+ STaTE_tTL(o='2d') */u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "INNER JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id"); } @Test void testThreeWayLeftOuterJoinExecPlan() { util.verifyExecPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id"); } @Test @@ -236,7 +247,7 @@ void testTwoWayJoinWithUnion() { .executeSql( "CREATE TABLE Orders2 (" + " order_id STRING PRIMARY KEY NOT ENFORCED," - + " user_id_1 STRING," + + " user_id STRING," + " product STRING" + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,D')"); @@ -248,23 +259,19 @@ void testTwoWayJoinWithUnion() { + ") " + "SELECT * FROM OrdersUnion o " + "LEFT JOIN Users u " - + "ON o.user_id_1 = u.user_id_0"); + + "ON o.user_id = u.user_id"); } @Test void testTwoWayJoinWithRank() { - util.getTableEnv() - .getConfig() - .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true); - util.verifyRelPlan( "WITH JoinedEvents as (" - + "SELECT e1.id as id, e1.val, e1.rowtime as `rowtime`, e2.price " + + "SELECT e1.id as id, e1.val, e1.`$rowtime` as `$rowtime`, e2.price " + "FROM EventTable1 e1 " + "JOIN EventTable2 e2 ON e1.id = e2.id) " - + "SELECT id, val, `rowtime` FROM (" + + "SELECT id, val, `$rowtime` FROM (" + "SELECT *, " - + "ROW_NUMBER() OVER (PARTITION BY id ORDER BY `rowtime` DESC) as ts " + + "ROW_NUMBER() OVER (PARTITION BY id ORDER BY `$rowtime` DESC) as ts " + "FROM JoinedEvents) " + "WHERE ts = 1"); } @@ -272,17 +279,17 @@ void testTwoWayJoinWithRank() { @Test void testFourWayComplexJoinRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location " + "SELECT u.user_id, u.name, o.order_id, p.payment_id, s.location " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2 AND (u.cash >= p.price OR p.price < 0) " - + "LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id AND (u.cash >= p.price OR p.price < 0) " + + "LEFT JOIN Shipments s ON p.user_id = s.user_id"); } @Test void testThreeWayJoinNoJoinKeyExecPlan() { util.verifyExecPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " + "LEFT JOIN Orders o ON TRUE " + "INNER JOIN Payments p ON TRUE "); @@ -291,49 +298,50 @@ void testThreeWayJoinNoJoinKeyExecPlan() { @Test void testFourWayJoinNoCommonJoinKeyRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location " + "SELECT u.user_id, u.name, o.order_id, p.payment_id, s.location " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2 " - + "LEFT JOIN Shipments s ON p.payment_id = s.user_id_3"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN LookupTable ON u.name = LookupTable.name " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + + "LEFT JOIN Shipments s ON o.user_id = s.user_id"); } @Test void testFourWayComplexJoinExecPlan() { util.verifyExecPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location " + "SELECT u.user_id, u.name, o.order_id, p.payment_id, s.location " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2 AND (u.cash >= p.price OR p.price < 0) " - + "LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id AND (u.cash >= p.price OR p.price < 0) " + + "LEFT JOIN Shipments s ON p.user_id = s.user_id"); } @Test void testThreeWayInnerJoinExplain() { util.verifyExplain( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "INNER JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id"); } @Test void testThreeWayLeftOuterJoinExplain() { util.verifyExplain( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id"); } @Test void testFourWayComplexJoinExplain() { util.verifyExplain( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location " + "SELECT u.user_id, u.name, o.order_id, p.payment_id, s.location " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2 AND (u.cash >= p.price OR p.price < 0) " - + "LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "INNER JOIN Payments p ON u.user_id = p.user_id AND (u.cash >= p.price OR p.price < 0) " + + "LEFT JOIN Shipments s ON p.user_id = s.user_id"); } @Test @@ -353,37 +361,37 @@ void testIntervalJoinExcludedFromMultiJoin() { "SELECT e1.id, e1.val, e2.price " + "FROM EventTable1 e1 " + "JOIN EventTable2 e2 ON e1.id = e2.id " - + "AND e1.rowtime BETWEEN e2.rowtime - INTERVAL '1' MINUTE " - + "AND e2.rowtime + INTERVAL '1' MINUTE"); + + "AND e1.`$rowtime` BETWEEN e2.`$rowtime` - INTERVAL '1' MINUTE " + + "AND e2.`$rowtime` + INTERVAL '1' MINUTE"); } @Test void testThreeWayLeftOuterJoinWithWhereClauseRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2 " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + "WHERE u.name = 'Gus' AND p.price > 10"); } @Test void testThreeWayLeftOuterJoinWithWhereClauseExecPlan() { util.verifyExecPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2 " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + "WHERE u.name = 'Gus' AND p.price > 10"); } @Test void testThreeWayLeftOuterJoinWithWhereClauseExplain() { util.verifyExplain( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2 " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + "WHERE u.name = 'Gus' AND p.price > 10"); } @@ -391,53 +399,53 @@ void testThreeWayLeftOuterJoinWithWhereClauseExplain() { void testRegularJoinsAreMergedApartFromTemporalJoin() { // Regular joins should still be eligible for MultiJoin but not mixed with temporal joins util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, temporal.age " + "SELECT u.user_id, u.name, o.order_id, temporal.age " + "FROM Users u " - + "INNER JOIN Orders o ON u.user_id_0 = o.user_id_1 " + + "INNER JOIN Orders o ON u.user_id = o.user_id " + "INNER JOIN (" + " SELECT s.user_id, l.age " + " FROM StreamTable s " + " JOIN LookupTable FOR SYSTEM_TIME AS OF s.proctime AS l " + " ON s.user_id = l.id" - + ") temporal ON u.user_id_0 = temporal.user_id"); + + ") temporal ON u.user_id = temporal.user_id"); } @Test void testFourWayJoinTransitiveCommonJoinKeyRelPlan() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id, s.location " + "SELECT u.user_id, u.name, o.order_id, p.payment_id, s.location " + "FROM Users u " - + "LEFT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "LEFT JOIN Payments p ON o.user_id_1 = p.user_id_2 " - + "LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3"); + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON o.user_id = p.user_id " + + "LEFT JOIN Shipments s ON p.user_id = s.user_id"); } /* Update this to supported with FLINK-37973 https://issues.apache.org/jira/browse/FLINK-37973 */ @Test void testRightJoinNotSupported() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "RIGHT JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "RIGHT JOIN Payments p ON o.user_id_1 = p.user_id_2"); + + "RIGHT JOIN Orders o ON u.user_id = o.user_id " + + "RIGHT JOIN Payments p ON o.user_id = p.user_id"); } @Test void testFullOuterNotSupported() { util.verifyRelPlan( - "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + "FROM Users u " - + "FULL OUTER JOIN Orders o ON u.user_id_0 = o.user_id_1 " - + "FULL OUTER JOIN Payments p ON o.user_id_1 = p.user_id_2"); + + "FULL OUTER JOIN Orders o ON u.user_id = o.user_id " + + "FULL OUTER JOIN Payments p ON o.user_id = p.user_id"); } @Test void testThreeWayJoinWithTimeAttributesMaterialization() { util.verifyRelPlan( - "SELECT u.name, u.proctime, o.rowtime, p.price " + "SELECT u.name, u.proctime, o.`$rowtime`, p.price " + "FROM UsersWithProctime u " - + "JOIN OrdersWithRowtime o ON u.user_id_0 = o.user_id_1 " - + "JOIN Payments p ON u.user_id_0 = p.user_id_2"); + + "JOIN OrdersWithRowtime o ON u.user_id = o.user_id " + + "JOIN Payments p ON u.user_id = p.user_id"); } @Test @@ -568,14 +576,14 @@ void testPreservesUpsertKeyFourWayComplex() { util.tableEnv() .executeSql( "CREATE TABLE sink_four_way (" - + " user_id_0 STRING NOT NULL," + + " user_id STRING NOT NULL," + " order_id STRING NOT NULL," - + " user_id_1 STRING NOT NULL," + + " user_id1 STRING NOT NULL," + " payment_id STRING NOT NULL," - + " user_id_2 STRING NOT NULL," + + " user_id2 STRING NOT NULL," + " name STRING," + " location STRING," - + " CONSTRAINT `PRIMARY` PRIMARY KEY (`user_id_0`, `order_id`, `user_id_1`, `payment_id`, `user_id_2`) NOT ENFORCED" + + " CONSTRAINT `PRIMARY` PRIMARY KEY (`user_id`, `order_id`, `user_id1`, `payment_id`, `user_id2`) NOT ENFORCED" + ") WITH (" + " 'connector' = 'values'," + " 'sink-insert-only' = 'false'" @@ -684,4 +692,528 @@ void testMultiSinkOnMultiJoinedView() { false, false); } + + /* The following tests contain mapped bugs while adding validation and are disabled for now + * They will be addressed in FLINK-38576 */ + @Disabled + @Test + void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() { + util.verifyRelPlan( + "SELECT u.user_id, u.name, o.order_id, p.payment_id, s.location " + + "FROM Users u " + + "LEFT JOIN Orders o ON o.user_id = u.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + + " AND UPPER(u.name) = UPPER(p.payment_id) " + + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR p.price < 0) " + + "LEFT JOIN Shipments s ON p.payment_id = s.location "); + } + + @Disabled + @Test + void testComplexCommonJoinKeyMissingProjection() { + util.tableEnv() + .executeSql( + "CREATE TABLE Assignments (" + + " assignment_id STRING PRIMARY KEY NOT ENFORCED," + + " user_id STRING," + + " detail_id STRING," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Documents (" + + " detail_id STRING PRIMARY KEY NOT ENFORCED," + + " creator_nm STRING," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "SELECT *\n" + + " FROM Assignments assignments\n" + + " LEFT JOIN Documents AS documents\n" + + " ON assignments.detail_id = documents.detail_id\n" + + " AND assignments.common_id = documents.common_id\n" + + " LEFT JOIN Documents AS other_documents\n" + + " ON assignments.user_id = documents.common_id\n"); + } + + /* + * In this case we have common_id across conditions but in a more complex way. + * but ultimately every combination should be possible. Any of the keys should be able to be + * used in a following join condition: + * - assignments.common_id + * - customer.common_id + * - organizations.common_id + * - documents.common_id + * - creators.common_id + * */ + @Disabled + @Test + void testComplexCommonJoinKey() { + util.tableEnv() + .executeSql( + "CREATE TABLE Assignments (" + + " assignment_id STRING PRIMARY KEY NOT ENFORCED," + + " user_id STRING," + + " detail_id STRING," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Customers (" + + " user_id STRING PRIMARY KEY NOT ENFORCED," + + " name STRING," + + " depart_num STRING," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Documents (" + + " detail_id STRING PRIMARY KEY NOT ENFORCED," + + " creator_nm STRING," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE PhaseDetails (" + + " phase_id STRING PRIMARY KEY NOT ENFORCED," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Organizations (" + + " org_id STRING PRIMARY KEY NOT ENFORCED," + + " org_name STRING," + + " common_id STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyExecPlan( + "SELECT *\n" + + " FROM Assignments assignments\n" + + " LEFT JOIN Customers AS customer\n" + + " ON assignments.user_id = customer.user_id\n" + + " AND assignments.common_id = customer.common_id\n" + + " LEFT JOIN Documents AS documents\n" + + " ON assignments.detail_id = documents.detail_id\n" + + " AND assignments.common_id = documents.common_id\n" + + " LEFT JOIN PhaseDetails AS phase_details\n" + + " ON documents.common_id = phase_details.common_id\n" + + " LEFT JOIN Organizations AS organizations\n" + + " ON customer.depart_num = organizations.org_id\n" + + " AND customer.common_id = organizations.common_id\n" + + " LEFT JOIN Customers AS creators\n" + + " ON documents.creator_nm = creators.depart_num\n" + + " AND documents.common_id = creators.common_id"); + } + + @Test + void testComplexConditionalLogicWithMultiJoin() { + util.tableEnv() + .executeSql( + "CREATE TABLE ProductCategories (" + + " category_id STRING PRIMARY KEY NOT ENFORCED," + + " category_name STRING," + + " is_premium BOOLEAN," + + " discount_rate DOUBLE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE ProductReviews (" + + " review_id STRING PRIMARY KEY NOT ENFORCED," + + " product_id STRING," + + " rating INT," + + " is_verified BOOLEAN" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "SELECT " + + "u.user_id, " + + "o.order_id, " + + "p.payment_id, " + + "pc.category_name, " + + "CASE " + + " WHEN pc.is_premium = true AND p.price > 1000 THEN 'High-Value Premium' " + + " WHEN pc.is_premium = true THEN 'Premium' " + + " WHEN p.price > 500 THEN 'Standard High-Value' " + + " ELSE 'Standard' " + + "END AS product_tier, " + + "CASE " + + " WHEN pr.rating >= 4 AND pr.is_verified = true THEN 'Highly Recommended' " + + " WHEN pr.rating >= 3 THEN 'Recommended' " + + " WHEN pr.rating >= 2 THEN 'Average' " + + " ELSE 'Not Recommended' " + + "END AS recommendation_status, " + + "CASE " + + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 - pc.discount_rate) " + + " ELSE p.price " + + "END AS final_price " + + "FROM Users u " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + + "LEFT JOIN ProductCategories pc ON o.product = pc.category_id " + + "LEFT JOIN ProductReviews pr ON o.product = pr.product_id"); + } + + @Test + void testComplexCTEWithMultiJoin() { + util.tableEnv() + .executeSql( + "CREATE TABLE OrderStatus (" + + " status_id STRING PRIMARY KEY NOT ENFORCED," + + " status_name STRING," + + " is_final BOOLEAN" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE PaymentMethods (" + + " method_id STRING PRIMARY KEY NOT ENFORCED," + + " method_name STRING," + + " processing_fee DOUBLE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "WITH user_orders AS (" + + " SELECT u.user_id, u.name, o.order_id, o.product, p.payment_id, p.price " + + " FROM Users u " + + " LEFT JOIN Orders o ON u.user_id = o.user_id " + + " LEFT JOIN Payments p ON u.user_id = p.user_id" + + "), " + + "order_details AS (" + + " SELECT uo.*, os.status_name, os.is_final, pm.method_name, pm.processing_fee " + + " FROM user_orders uo " + + " LEFT JOIN OrderStatus os ON uo.order_id = os.status_id " + + " LEFT JOIN PaymentMethods pm ON uo.payment_id = pm.method_id" + + "), " + + "final_summary AS (" + + " SELECT " + + " user_id, " + + " name, " + + " COUNT(order_id) as total_orders, " + + " SUM(price) as total_spent, " + + " AVG(price) as avg_order_value, " + + " COUNT(CASE WHEN is_final = true THEN 1 END) as completed_orders " + + " FROM order_details " + + " GROUP BY user_id, name" + + ") " + + "SELECT * FROM final_summary"); + } + + @Test + void testAggregationAndGroupingWithMultiJoin() { + util.tableEnv() + .executeSql( + "CREATE TABLE OrderItems (" + + " item_id STRING PRIMARY KEY NOT ENFORCED," + + " order_id STRING," + + " product_name STRING," + + " quantity INT," + + " unit_price DOUBLE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE ProductCategories (" + + " category_id STRING PRIMARY KEY NOT ENFORCED," + + " category_name STRING," + + " parent_category STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "SELECT " + + "u.user_id, " + + "u.name, " + + "pc.category_name, " + + "COUNT(DISTINCT o.order_id) as order_count, " + + "SUM(oi.quantity) as total_items, " + + "SUM(oi.quantity * oi.unit_price) as total_value, " + + "AVG(oi.unit_price) as avg_item_price, " + + "MAX(p.price) as max_payment, " + + "MIN(p.price) as min_payment, " + + "COUNT(CASE WHEN oi.quantity > 5 THEN 1 END) as bulk_orders " + + "FROM Users u " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN OrderItems oi ON o.order_id = oi.order_id " + + "LEFT JOIN ProductCategories pc ON oi.product_name = pc.category_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + + "GROUP BY u.user_id, u.name, pc.category_name " + + "HAVING COUNT(DISTINCT o.order_id) > 0"); + } + + @Test + void testFunctionAndExpressionWithMultiJoin() { + util.tableEnv() + .executeSql( + "CREATE TABLE ProductDetails (" + + " product_id STRING PRIMARY KEY NOT ENFORCED," + + " product_name STRING," + + " description STRING," + + " created_date BIGINT," + + " tags STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE UserPreferences (" + + " user_id STRING PRIMARY KEY NOT ENFORCED," + + " preferred_category STRING," + + " notification_level STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "SELECT " + + "u.user_id, " + + "UPPER(u.name) as user_name_upper, " + + "LOWER(o.product) as product_lower, " + + "CONCAT(u.name, ' - ', o.product) as user_product, " + + "SUBSTRING(pd.description, 1, 50) as description_preview, " + + "CHAR_LENGTH(pd.description) as description_length, " + + "FLOOR(p.price / 100.0) * 100 as price_rounded, " + + "CASE " + + " WHEN p.price > 1000 THEN 'High' " + + " WHEN p.price > 500 THEN 'Medium' " + + " ELSE 'Low' " + + "END as price_tier, " + + "REGEXP_REPLACE(pd.tags, ',', ' | ') as formatted_tags, " + + "TO_TIMESTAMP_LTZ(pd.created_date, 3) as product_created, " + + "COALESCE(up.preferred_category, 'None') as user_preference, " + + "CASE " + + " WHEN up.notification_level = 'HIGH' THEN 'Frequent Updates' " + + " WHEN up.notification_level = 'MEDIUM' THEN 'Daily Updates' " + + " ELSE 'Weekly Updates' " + + "END as notification_frequency " + + "FROM Users u " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Payments p ON u.user_id = p.user_id " + + "LEFT JOIN ProductDetails pd ON o.product = pd.product_id " + + "LEFT JOIN UserPreferences up ON u.user_id = up.user_id"); + } + + @Test + void testComplexNestedCTEWithAggregationAndFunctions() { + util.tableEnv() + .executeSql( + "CREATE TABLE OrderMetrics (" + + " metric_id STRING PRIMARY KEY NOT ENFORCED," + + " order_id STRING," + + " metric_type STRING," + + " metric_value DOUBLE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "WITH base_orders AS (" + + " SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price " + + " FROM Users u " + + " INNER JOIN Orders o ON u.user_id = o.user_id " + + " INNER JOIN Payments p ON u.user_id = p.user_id" + + "), " + + "enriched_orders AS (" + + " SELECT " + + " bo.*, " + + " om.metric_type, " + + " om.metric_value, " + + " CASE " + + " WHEN bo.price > 1000 THEN 'Premium' " + + " WHEN bo.price > 500 THEN 'Standard' " + + " ELSE 'Basic' " + + " END as order_tier " + + " FROM base_orders bo " + + " LEFT JOIN OrderMetrics om ON bo.order_id = om.order_id" + + "), " + + "aggregated_metrics AS (" + + " SELECT " + + " user_id, " + + " name, " + + " COUNT(DISTINCT order_id) as total_orders, " + + " SUM(price) as total_spent, " + + " AVG(price) as avg_order_value, " + + " MAX(metric_value) as max_metric, " + + " MIN(metric_value) as min_metric, " + + " COUNT(CASE WHEN order_tier = 'Premium' THEN 1 END) as premium_orders " + + " FROM enriched_orders " + + " GROUP BY user_id, name" + + ") " + + "SELECT " + + " user_id, " + + " UPPER(name) as user_name, " + + " total_orders, " + + " ROUND(total_spent, 2) as total_spent_rounded, " + + " ROUND(avg_order_value, 2) as avg_order_value_rounded, " + + " CONCAT('User: ', name, ' has ', CAST(total_orders AS STRING), ' orders') as summary, " + + " CASE " + + " WHEN total_orders > 10 THEN 'Frequent Customer' " + + " WHEN total_orders > 5 THEN 'Regular Customer' " + + " ELSE 'Occasional Customer' " + + " END as customer_type " + + "FROM aggregated_metrics " + + "WHERE total_spent > 0"); + } + + @Test + void testCTEWithMultiJoinV2() { + util.tableEnv() + .executeSql( + "CREATE TABLE Departments (" + + " dept_id STRING PRIMARY KEY NOT ENFORCED," + + " dept_name STRING," + + " budget DOUBLE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Projects (" + + " project_id STRING PRIMARY KEY NOT ENFORCED," + + " project_name STRING," + + " dept_id STRING," + + " status STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "WITH high_budget_depts AS (" + + " SELECT dept_id, dept_name, budget " + + " FROM Departments " + + " WHERE budget > 600000" + + "), " + + "active_projects AS (" + + " SELECT project_id, project_name, dept_id " + + " FROM Projects " + + " WHERE status = 'ACTIVE'" + + ") " + + "SELECT " + + " u.user_id, " + + " u.name, " + + " o.order_id, " + + " hbd.dept_name, " + + " ap.project_name, " + + " hbd.budget " + + "FROM Users u " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN high_budget_depts hbd ON o.product = hbd.dept_id " + + "LEFT JOIN active_projects ap ON hbd.dept_id = ap.dept_id"); + } + + @Test + void testAggregationAndGroupingWithMultiJoinV2() { + util.tableEnv() + .executeSql( + "CREATE TABLE Categories (" + + " category_id STRING PRIMARY KEY NOT ENFORCED," + + " category_name STRING," + + " parent_category STRING" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Sales (" + + " sale_id STRING PRIMARY KEY NOT ENFORCED," + + " user_id STRING," + + " product_id STRING," + + " amount DOUBLE," + + " sale_date DATE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "SELECT " + + " c.category_name, " + + " COUNT(DISTINCT u.user_id) AS unique_users, " + + " COUNT(s.sale_id) AS total_sales, " + + " SUM(s.amount) AS total_revenue, " + + " AVG(s.amount) AS avg_sale_amount, " + + " MAX(s.amount) AS max_sale_amount " + + "FROM Users u " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN Categories c ON o.product = c.category_id " + + "LEFT JOIN Sales s ON u.user_id = s.user_id " + + "GROUP BY c.category_name " + + "HAVING COUNT(s.sale_id) > 0"); + } + + @Test + void testFunctionAndExpressionWithMultiJoinV2() { + util.tableEnv() + .executeSql( + "CREATE TABLE ProductDetails (" + + " product_id STRING PRIMARY KEY NOT ENFORCED," + + " product_name STRING," + + " price DOUBLE," + + " weight DOUBLE," + + " created_date DATE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.tableEnv() + .executeSql( + "CREATE TABLE Reviews (" + + " review_id STRING PRIMARY KEY NOT ENFORCED," + + " product_id STRING," + + " rating INT," + + " review_text STRING," + + " review_date DATE" + + ") WITH ('connector' = 'values', 'changelog-mode' = 'I,UA,D')"); + + util.verifyRelPlan( + "SELECT " + + " u.user_id, " + + " u.name, " + + " o.order_id, " + + " pd.product_name, " + + " pd.price, " + + " ROUND(pd.price * 1.1, 2) AS price_with_tax, " + + " CONCAT('Product: ', pd.product_name) AS product_description, " + + " CHAR_LENGTH(r.review_text) AS review_length, " + + " UPPER(SUBSTRING(r.review_text, 1, 10)) AS review_preview, " + + " CASE " + + " WHEN r.rating >= 4 THEN 'High Rating' " + + " WHEN r.rating >= 3 THEN 'Medium Rating' " + + " ELSE 'Low Rating' " + + " END AS rating_category, " + + " TIMESTAMPDIFF(DAY, pd.created_date, CURRENT_DATE) AS days_since_created " + + "FROM Users u " + + "LEFT JOIN Orders o ON u.user_id = o.user_id " + + "LEFT JOIN ProductDetails pd ON o.product = pd.product_id " + + "LEFT JOIN Reviews r ON pd.product_id = r.product_id"); + } + + @Test + void testCrossJoinUnnestWithMultiJoinInsert() { + util.tableEnv() + .executeSql( + "CREATE TABLE UnnestSink (" + + " detail_id STRING," + + " element_data STRING," + + " data_value_id INT," + + " user_id STRING," + + " order_id STRING" + + ") WITH ('connector' = 'values', 'sink-insert-only' = 'false')"); + + util.verifyRelPlanInsert( + "INSERT INTO UnnestSink\n" + + "(\n" + + " detail_id,\n" + + " element_data,\n" + + " data_value_id,\n" + + " user_id,\n" + + " order_id\n" + + ")\n" + + "SELECT\n" + + " d.detail_id,\n" + + " TRIM(REGEXP_REPLACE(edata, '[\\[\\]\\\"]', '')) AS element_data,\n" + + " ARRAY_POSITION(split(REGEXP_REPLACE(d.data, '^\\[\"|\"\\]$', '') , '\", \"'), edata) as data_value_id,\n" + + " d.user_id,\n" + + " o.order_id\n" + + "FROM Detail d\n" + + "INNER JOIN Orders o\n" + + " ON o.user_id = d.user_id\n" + + "INNER JOIN Payments p\n" + + " ON p.user_id = d.user_id\n" + + "LEFT JOIN Shipments s\n" + + " ON s.user_id = d.user_id\n" + + "CROSS JOIN UNNEST(split(REGEXP_REPLACE(d.data, '^\\[\"|\"\\]$', '') , '\", \"')) AS T(edata)\n" + + "WHERE NOT (s.location IS NOT NULL)"); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 149ed09e91157..39453a724c3e0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -16,13 +16,398 @@ See the License for the specific language governing permissions and limitations under the License. --> + + + 5 THEN 1 END) as bulk_orders FROM Users u LEFT JOIN Orders o ON u.user_id = o.user_id LEFT JOIN OrderItems oi ON o.order_id = oi.order_id LEFT JOIN ProductCategories pc ON oi.product_name = pc.category_id LEFT JOIN Payments p ON u.user_id = p.user_id GROUP BY u.user_id, u.name, pc.category_name HAVING COUNT(DISTINCT o.order_id) > 0]]> + + + ($3, 0)]) ++- LogicalAggregate(group=[{0, 1, 2}], order_count=[COUNT(DISTINCT $3)], total_items=[SUM($4)], total_value=[SUM($5)], avg_item_price=[AVG($6)], max_payment=[MAX($7)], min_payment=[MIN($7)], bulk_orders=[COUNT($8)]) + +- LogicalProject(user_id=[$0], name=[$1], category_name=[$12], order_id=[$3], quantity=[$9], $f5=[*($9, $10)], unit_price=[$10], price=[$15], $f8=[CASE(>($9, 5), 1, null:INTEGER)]) + +- LogicalJoin(condition=[=($0, $16)], joinType=[left]) + :- LogicalJoin(condition=[=($8, $11)], joinType=[left]) + : :- LogicalJoin(condition=[=($3, $7)], joinType=[left]) + : : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, OrderItems]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, ProductCategories]]) + +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) +]]> + + + (order_count, 0)]) ++- GroupAggregate(groupBy=[user_id, name, category_name], select=[user_id, name, category_name, COUNT_RETRACT(DISTINCT order_id) AS order_count, SUM_RETRACT(quantity) AS total_items, SUM_RETRACT($f5) AS total_value, AVG_RETRACT(unit_price) AS avg_item_price, MAX_RETRACT(price) AS max_payment, MIN_RETRACT(price) AS min_payment, COUNT_RETRACT($f8) AS bulk_orders]) + +- Exchange(distribution=[hash[user_id, name, category_name]]) + +- Calc(select=[user_id, name, category_name, order_id, quantity, *(quantity, unit_price) AS $f5, unit_price, price, CASE(>(quantity, 5), 1, null:INTEGER) AS $f8]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(user_id, user_id1)], select=[user_id,name,order_id,quantity,unit_price,category_name,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, INTEGER quantity, DOUBLE unit_price, VARCHAR(2147483647) category_name, INTEGER price, VARCHAR(2147483647) user_id1)]) + :- Exchange(distribution=[hash[user_id]]) + : +- Calc(select=[user_id, name, order_id, quantity, unit_price, category_name]) + : +- MultiJoin(commonJoinKey=[product_name], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (category_id)], joinConditions=[=(product_name, category_id)], select=[user_id,name,order_id,product_name,quantity,unit_price,category_id,category_name], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) product_name, INTEGER quantity, DOUBLE unit_price, VARCHAR(2147483647) category_id, VARCHAR(2147483647) category_name)]) + : :- Exchange(distribution=[hash[product_name]]) + : : +- Calc(select=[user_id, name, order_id, product_name, quantity, unit_price]) + : : +- MultiJoin(commonJoinKey=[order_id], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(order_id, order_id0)], select=[user_id,name,order_id,order_id0,product_name,quantity,unit_price], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) order_id0, VARCHAR(2147483647) product_name, INTEGER quantity, DOUBLE unit_price)]) + : : :- Exchange(distribution=[hash[order_id]]) + : : : +- Calc(select=[user_id, name, order_id]) + : : : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id, user_id0)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)]) + : : : :- Exchange(distribution=[hash[user_id]]) + : : : : +- ChangelogNormalize(key=[user_id]) + : : : : +- Exchange(distribution=[hash[user_id]]) + : : : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name]) + : : : +- Exchange(distribution=[hash[user_id]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + : : +- Exchange(distribution=[hash[order_id]]) + : : +- Calc(select=[order_id, product_name, quantity, unit_price]) + : : +- ChangelogNormalize(key=[item_id]) + : : +- Exchange(distribution=[hash[item_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, OrderItems]], fields=[item_id, order_id, product_name, quantity, unit_price]) + : +- Exchange(distribution=[hash[category_id]]) + : +- ChangelogNormalize(key=[category_id]) + : +- Exchange(distribution=[hash[category_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, ProductCategories, project=[category_id, category_name], metadata=[]]], fields=[category_id, category_name]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[price, user_id], metadata=[]]], fields=[price, user_id]) +]]> + + + + + 0]]> + + + ($2, 0)]) ++- LogicalAggregate(group=[{0}], unique_users=[COUNT(DISTINCT $1)], total_sales=[COUNT($2)], total_revenue=[SUM($3)], avg_sale_amount=[AVG($3)], max_sale_amount=[MAX($3)]) + +- LogicalProject(category_name=[$7], user_id=[$0], sale_id=[$9], amount=[$12]) + +- LogicalJoin(condition=[=($0, $10)], joinType=[left]) + :- LogicalJoin(condition=[=($5, $6)], joinType=[left]) + : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, Categories]]) + +- LogicalTableScan(table=[[default_catalog, default_database, Sales]]) +]]> + + + (total_sales, 0)]) ++- GroupAggregate(groupBy=[category_name], select=[category_name, COUNT_RETRACT(DISTINCT user_id) AS unique_users, COUNT_RETRACT(sale_id) AS total_sales, SUM_RETRACT(amount) AS total_revenue, AVG_RETRACT(amount) AS avg_sale_amount, MAX_RETRACT(amount) AS max_sale_amount]) + +- Exchange(distribution=[hash[category_name]]) + +- Calc(select=[category_name, user_id, sale_id, amount]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (sale_id)], joinConditions=[=(user_id, user_id1)], select=[user_id,category_name,sale_id,user_id1,amount], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) category_name, VARCHAR(2147483647) sale_id, VARCHAR(2147483647) user_id1, DOUBLE amount)]) + :- Exchange(distribution=[hash[user_id]]) + : +- Calc(select=[user_id, category_name]) + : +- MultiJoin(commonJoinKey=[product], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (category_id)], joinConditions=[=(product, category_id)], select=[user_id,product,category_id,category_name], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) category_id, VARCHAR(2147483647) category_name)]) + : :- Exchange(distribution=[hash[product]]) + : : +- Calc(select=[user_id, product]) + : : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[(user_id), noUniqueKey], joinConditions=[=(user_id, user_id0)], select=[user_id,user_id0,product], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) + : : :- Exchange(distribution=[hash[user_id]]) + : : : +- ChangelogNormalize(key=[user_id]) + : : : +- Exchange(distribution=[hash[user_id]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id], metadata=[]]], fields=[user_id]) + : : +- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[user_id, product], metadata=[]]], fields=[user_id, product]) + : +- Exchange(distribution=[hash[category_id]]) + : +- ChangelogNormalize(key=[category_id]) + : +- Exchange(distribution=[hash[category_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Categories, project=[category_id, category_name], metadata=[]]], fields=[category_id, category_name]) + +- Exchange(distribution=[hash[user_id]]) + +- ChangelogNormalize(key=[sale_id]) + +- Exchange(distribution=[hash[sale_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Sales, project=[sale_id, user_id, amount], metadata=[]]], fields=[sale_id, user_id, amount]) +]]> + + + + + 1000 THEN 'High-Value Premium' WHEN pc.is_premium = true THEN 'Premium' WHEN p.price > 500 THEN 'Standard High-Value' ELSE 'Standard' END AS product_tier, CASE WHEN pr.rating >= 4 AND pr.is_verified = true THEN 'Highly Recommended' WHEN pr.rating >= 3 THEN 'Recommended' WHEN pr.rating >= 2 THEN 'Average' ELSE 'Not Recommended' END AS recommendation_status, CASE WHEN pc.discount_rate > 0.2 THEN p.price * (1 - pc.discount_rate) ELSE p.price END AS final_price FROM Users u LEFT JOIN Orders o ON u.user_id = o.user_id LEFT JOIN Payments p ON u.user_id = p.user_id LEFT JOIN ProductCategories pc ON o.product = pc.category_id LEFT JOIN ProductReviews pr ON o.product = pr.product_id]]> + + + ($7, 1000)), _UTF-16LE'High-Value Premium':VARCHAR(19) CHARACTER SET "UTF-16LE", $11, _UTF-16LE'Premium':VARCHAR(19) CHARACTER SET "UTF-16LE", >($7, 500), _UTF-16LE'Standard High-Value':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'Standard':VARCHAR(19) CHARACTER SET "UTF-16LE")], recommendation_status=[CASE(AND(>=($15, 4), $16), _UTF-16LE'Highly Recommended':VARCHAR(18) CHARACTER SET "UTF-16LE", >=($15, 3), _UTF-16LE'Recommended':VARCHAR(18) CHARACTER SET "UTF-16LE", >=($15, 2), _UTF-16LE'Average':VARCHAR(18) CHARACTER SET "UTF-16LE", _UTF-16LE'Not Recommended':VARCHAR(18) CHARACTER SET "UTF-16LE")], final_price=[CASE(>($12, 0.2:DECIMAL(2, 1)), *($7, -(1, $12)), CAST($7):DOUBLE)]) ++- LogicalJoin(condition=[=($5, $14)], joinType=[left]) + :- LogicalJoin(condition=[=($5, $9)], joinType=[left]) + : :- LogicalJoin(condition=[=($0, $8)], joinType=[left]) + : : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, ProductCategories]]) + +- LogicalTableScan(table=[[default_catalog, default_database, ProductReviews]]) +]]> + + + (price, 1000)), 'High-Value Premium', is_premium, 'Premium', >(price, 500), 'Standard High-Value', 'Standard') AS product_tier, CASE(AND(>=(rating, 4), is_verified), 'Highly Recommended', >=(rating, 3), 'Recommended', >=(rating, 2), 'Average', 'Not Recommended') AS recommendation_status, CASE(>(discount_rate, 0.2), *(price, -(1, discount_rate)), CAST(price AS DOUBLE)) AS final_price]) ++- MultiJoin(commonJoinKey=[product], joinTypes=[LEFT, LEFT], inputUniqueKeys=[noUniqueKey, (category_id), noUniqueKey], joinConditions=[=(product, category_id), =(product, product_id)], select=[user_id,order_id,product,payment_id,price,category_id,category_name,is_premium,discount_rate,product_id,rating,is_verified], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) product, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) category_id, VARCHAR(2147483647) category_name, BOOLEAN is_premium, DOUBLE discount_rate, VARCHAR(2147483647) product_id, INTEGER rating, BOOLEAN is_verified)]) + :- Exchange(distribution=[hash[product]]) + : +- Calc(select=[user_id, order_id, product, payment_id, price]) + : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id)], joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], select=[user_id,order_id,user_id0,product,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)]) + : :- Exchange(distribution=[hash[user_id]]) + : : +- ChangelogNormalize(key=[user_id]) + : : +- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id], metadata=[]]], fields=[user_id]) + : :- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]) + :- Exchange(distribution=[hash[category_id]]) + : +- ChangelogNormalize(key=[category_id]) + : +- Exchange(distribution=[hash[category_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, ProductCategories]], fields=[category_id, category_name, is_premium, discount_rate]) + +- Exchange(distribution=[hash[product_id]]) + +- Calc(select=[product_id, rating, is_verified]) + +- ChangelogNormalize(key=[review_id]) + +- Exchange(distribution=[hash[review_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, ProductReviews]], fields=[review_id, product_id, rating, is_verified]) +]]> + + + + + + + + + + + + + + + + 1000 THEN 'Premium' WHEN bo.price > 500 THEN 'Standard' ELSE 'Basic' END as order_tier FROM base_orders bo LEFT JOIN OrderMetrics om ON bo.order_id = om.order_id), aggregated_metrics AS ( SELECT user_id, name, COUNT(DISTINCT order_id) as total_orders, SUM(price) as total_spent, AVG(price) as avg_order_value, MAX(metric_value) as max_metric, MIN(metric_value) as min_metric, COUNT(CASE WHEN order_tier = 'Premium' THEN 1 END) as premium_orders FROM enriched_orders GROUP BY user_id, name) SELECT user_id, UPPER(name) as user_name, total_orders, ROUND(total_spent, 2) as total_spent_rounded, ROUND(avg_order_value, 2) as avg_order_value_rounded, CONCAT('User: ', name, ' has ', CAST(total_orders AS STRING), ' orders') as summary, CASE WHEN total_orders > 10 THEN 'Frequent Customer' WHEN total_orders > 5 THEN 'Regular Customer' ELSE 'Occasional Customer' END as customer_type FROM aggregated_metrics WHERE total_spent > 0]]> + + + ($2, 10), _UTF-16LE'Frequent Customer':VARCHAR(19) CHARACTER SET "UTF-16LE", >($2, 5), _UTF-16LE'Regular Customer':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'Occasional Customer':VARCHAR(19) CHARACTER SET "UTF-16LE")]) ++- LogicalFilter(condition=[>($3, 0)]) + +- LogicalAggregate(group=[{0, 1}], total_orders=[COUNT(DISTINCT $2)], total_spent=[SUM($3)], avg_order_value=[AVG($3)], max_metric=[MAX($4)], min_metric=[MIN($4)], premium_orders=[COUNT($5)]) + +- LogicalProject(user_id=[$0], name=[$1], order_id=[$2], price=[$4], metric_value=[$6], $f5=[CASE(=($7, _UTF-16LE'Premium'), 1, null:INTEGER)]) + +- LogicalProject(user_id=[$0], name=[$1], order_id=[$2], payment_id=[$3], price=[$4], metric_type=[$7], metric_value=[$8], order_tier=[CASE(>($4, 1000), _UTF-16LE'Premium':VARCHAR(8) CHARACTER SET "UTF-16LE", >($4, 500), _UTF-16LE'Standard':VARCHAR(8) CHARACTER SET "UTF-16LE", _UTF-16LE'Basic':VARCHAR(8) CHARACTER SET "UTF-16LE")]) + +- LogicalJoin(condition=[=($2, $6)], joinType=[left]) + :- LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6], price=[$7]) + : +- LogicalJoin(condition=[=($0, $8)], joinType=[inner]) + : :- LogicalJoin(condition=[=($0, $4)], joinType=[inner]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) + +- LogicalTableScan(table=[[default_catalog, default_database, OrderMetrics]]) +]]> + + + (total_orders, 10), 'Frequent Customer', >(total_orders, 5), 'Regular Customer', 'Occasional Customer') AS customer_type], where=[>(total_spent, 0)]) ++- GroupAggregate(groupBy=[user_id, name], select=[user_id, name, COUNT_RETRACT(DISTINCT order_id) AS total_orders, SUM_RETRACT(price) AS total_spent, AVG_RETRACT(price) AS avg_order_value]) + +- Exchange(distribution=[hash[user_id, name]]) + +- MultiJoin(commonJoinKey=[order_id], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(order_id, order_id0)], select=[user_id,name,order_id,price,order_id0,metric_value], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, INTEGER price, VARCHAR(2147483647) order_id0, DOUBLE metric_value)]) + :- Exchange(distribution=[hash[order_id]]) + : +- Calc(select=[user_id, name, order_id, price]) + : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id), noUniqueKey], joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], select=[user_id,name,order_id,user_id0,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, INTEGER price, VARCHAR(2147483647) user_id1)]) + : :- Exchange(distribution=[hash[user_id]]) + : : +- ChangelogNormalize(key=[user_id]) + : : +- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name]) + : :- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[price, user_id], metadata=[]]], fields=[price, user_id]) + +- Exchange(distribution=[hash[order_id]]) + +- Calc(select=[order_id, metric_value]) + +- ChangelogNormalize(key=[metric_id]) + +- Exchange(distribution=[hash[metric_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, OrderMetrics, project=[order_id, metric_value, metric_id], metadata=[]]], fields=[order_id, metric_value, metric_id]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + 600000), active_projects AS ( SELECT project_id, project_name, dept_id FROM Projects WHERE status = 'ACTIVE') SELECT u.user_id, u.name, o.order_id, hbd.dept_name, ap.project_name, hbd.budget FROM Users u LEFT JOIN Orders o ON u.user_id = o.user_id LEFT JOIN high_budget_depts hbd ON o.product = hbd.dept_id LEFT JOIN active_projects ap ON hbd.dept_id = ap.dept_id]]> + + + ($2, 600000)]) + : +- LogicalTableScan(table=[[default_catalog, default_database, Departments]]) + +- LogicalProject(project_id=[$0], project_name=[$1], dept_id=[$2]) + +- LogicalFilter(condition=[=($3, _UTF-16LE'ACTIVE')]) + +- LogicalTableScan(table=[[default_catalog, default_database, Projects]]) +]]> + + + (budget, 600000)]) + : +- Exchange(distribution=[hash[dept_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Departments, filter=[]]], fields=[dept_id, dept_name, budget]) + +- Exchange(distribution=[hash[dept_id]]) + +- Calc(select=[project_name, dept_id]) + +- ChangelogNormalize(key=[project_id], condition=[=(status, 'ACTIVE')]) + +- Exchange(distribution=[hash[project_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Projects, filter=[]]], fields=[project_id, project_name, dept_id, status]) +]]> + + - = p.price OR p.price < 0) LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3]]> + = p.price OR p.price < 0) LEFT JOIN Shipments s ON p.user_id = s.user_id]]> =($2, $7), <($7, 0)))], joinType=[inner]) : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) @@ -34,25 +419,25 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati = price) OR (price < 0))), (user_id_2 = user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) - :- Exchange(distribution=[hash[user_id_0]]) - : +- ChangelogNormalize(key=[user_id_0]) - : +- Exchange(distribution=[hash[user_id_0]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name, cash]) - :- Exchange(distribution=[hash[user_id_1]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) - :- Exchange(distribution=[hash[user_id_2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id_2]) - +- Exchange(distribution=[hash[user_id_3]]) - +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id_3]) +Calc(select=[user_id, name, order_id, payment_id, location]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey], joinConditions=[(user_id = user_id0), ((user_id = user_id1) AND ((cash >= price) OR (price < 0))), (user_id1 = user_id2)], select=[user_id,name,cash,order_id,user_id0,payment_id,price,user_id1,location,user_id2], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id2)]) + :- Exchange(distribution=[hash[user_id]]) + : +- ChangelogNormalize(key=[user_id]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id]) ]]> =($2, $7), <($7, 0)))], joinType=[inner]) : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) @@ -62,42 +447,42 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati +- LogicalTableScan(table=[[default_catalog, default_database, Shipments]]) == Optimized Physical Plan == -Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[=(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) - :- Exchange(distribution=[hash[user_id_0]]) - : +- ChangelogNormalize(key=[user_id_0]) - : +- Exchange(distribution=[hash[user_id_0]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name, cash]) - :- Exchange(distribution=[hash[user_id_1]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) - :- Exchange(distribution=[hash[user_id_2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id_2]) - +- Exchange(distribution=[hash[user_id_3]]) - +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id_3]) +Calc(select=[user_id, name, order_id, payment_id, location]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey], joinConditions=[=(user_id, user_id0), AND(=(user_id, user_id1), OR(>=(cash, price), <(price, 0))), =(user_id1, user_id2)], select=[user_id,name,cash,order_id,user_id0,payment_id,price,user_id1,location,user_id2], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id2)]) + :- Exchange(distribution=[hash[user_id]]) + : +- ChangelogNormalize(key=[user_id]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id]) == Optimized Execution Plan == -Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[(user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) - :- Exchange(distribution=[hash[user_id_0]]) - : +- ChangelogNormalize(key=[user_id_0]) - : +- Exchange(distribution=[hash[user_id_0]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name, cash]) - :- Exchange(distribution=[hash[user_id_1]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) - :- Exchange(distribution=[hash[user_id_2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id_2]) - +- Exchange(distribution=[hash[user_id_3]]) - +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id_3]) +Calc(select=[user_id, name, order_id, payment_id, location]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey], joinConditions=[(user_id = user_id0), ((user_id = user_id1) AND ((cash >= price) OR (price < 0))), (user_id1 = user_id2)], select=[user_id,name,cash,order_id,user_id0,payment_id,price,user_id1,location,user_id2], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id2)]) + :- Exchange(distribution=[hash[user_id]]) + : +- ChangelogNormalize(key=[user_id]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id]) ]]> - = p.price OR p.price < 0) LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3]]> + = p.price OR p.price < 0) LEFT JOIN Shipments s ON p.user_id = s.user_id]]> =($2, $7), <($7, 0)))], joinType=[inner]) : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) @@ -109,64 +494,73 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati =(cash, price), <(price, 0))), =(user_id_2, user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) - :- Exchange(distribution=[hash[user_id_0]]) - : +- ChangelogNormalize(key=[user_id_0]) - : +- Exchange(distribution=[hash[user_id_0]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name, cash]) - :- Exchange(distribution=[hash[user_id_1]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) - :- Exchange(distribution=[hash[user_id_2]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id_2]) - +- Exchange(distribution=[hash[user_id_3]]) - +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id_3]) +Calc(select=[user_id, name, order_id, payment_id, location]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey], joinConditions=[=(user_id, user_id0), AND(=(user_id, user_id1), OR(>=(cash, price), <(price, 0))), =(user_id1, user_id2)], select=[user_id,name,cash,order_id,user_id0,payment_id,price,user_id1,location,user_id2], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id2)]) + :- Exchange(distribution=[hash[user_id]]) + : +- ChangelogNormalize(key=[user_id]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Shipments]], fields=[location, user_id]) ]]> - + - + - + + + + + + 1000 THEN 'High' WHEN p.price > 500 THEN 'Medium' ELSE 'Low' END as price_tier, REGEXP_REPLACE(pd.tags, ',', ' | ') as formatted_tags, TO_TIMESTAMP_LTZ(pd.created_date, 3) as product_created, COALESCE(up.preferred_category, 'None') as user_preference, CASE WHEN up.notification_level = 'HIGH' THEN 'Frequent Updates' WHEN up.notification_level = 'MEDIUM' THEN 'Daily Updates' ELSE 'Weekly Updates' END as notification_frequency FROM Users u LEFT JOIN Orders o ON u.user_id = o.user_id LEFT JOIN Payments p ON u.user_id = p.user_id LEFT JOIN ProductDetails pd ON o.product = pd.product_id LEFT JOIN UserPreferences up ON u.user_id = up.user_id]]> + + + ($7, 1000), _UTF-16LE'High':VARCHAR(6) CHARACTER SET "UTF-16LE", >($7, 500), _UTF-16LE'Medium':VARCHAR(6) CHARACTER SET "UTF-16LE", _UTF-16LE'Low':VARCHAR(6) CHARACTER SET "UTF-16LE")], formatted_tags=[REGEXP_REPLACE($13, _UTF-16LE',', _UTF-16LE' | ')], product_created=[TO_TIMESTAMP_LTZ($12, 3)], user_preference=[COALESCE($15, _UTF-16LE'None')], notification_frequency=[CASE(=($16, _UTF-16LE'HIGH'), _UTF-16LE'Frequent Updates':VARCHAR(16) CHARACTER SET "UTF-16LE", =($16, _UTF-16LE'MEDIUM'), _UTF-16LE'Daily Updates':VARCHAR(16) CHARACTER SET "UTF-16LE", _UTF-16LE'Weekly Updates':VARCHAR(16) CHARACTER SET "UTF-16LE")]) ++- LogicalJoin(condition=[=($0, $14)], joinType=[left]) + :- LogicalJoin(condition=[=($5, $9)], joinType=[left]) + : :- LogicalJoin(condition=[=($0, $8)], joinType=[left]) + : : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, ProductDetails]]) + +- LogicalTableScan(table=[[default_catalog, default_database, UserPreferences]]) +]]> + + + (price, 1000), 'High', >(price, 500), 'Medium', 'Low') AS price_tier, REGEXP_REPLACE(tags, ',', ' | ') AS formatted_tags, TO_TIMESTAMP_LTZ(created_date, 3) AS product_created, COALESCE(preferred_category, 'None') AS user_preference, CASE(=(notification_level, 'HIGH'), 'Frequent Updates', =(notification_level, 'MEDIUM'), 'Daily Updates', 'Weekly Updates') AS notification_frequency]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (user_id)], joinConditions=[=(user_id, user_id2)], select=[user_id,name,product,price,description,created_date,tags,user_id2,preferred_category,notification_level], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) product, INTEGER price, VARCHAR(2147483647) description, BIGINT created_date, VARCHAR(2147483647) tags, VARCHAR(2147483647) user_id2, VARCHAR(2147483647) preferred_category, VARCHAR(2147483647) notification_level)]) + :- Exchange(distribution=[hash[user_id]]) + : +- Calc(select=[user_id, name, product, price, description, created_date, tags]) + : +- MultiJoin(commonJoinKey=[product], joinTypes=[LEFT], inputUniqueKeys=[noUniqueKey, (product_id)], joinConditions=[=(product, product_id)], select=[user_id,name,product,price,product_id,description,created_date,tags], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) product, INTEGER price, VARCHAR(2147483647) product_id, VARCHAR(2147483647) description, BIGINT created_date, VARCHAR(2147483647) tags)]) + : :- Exchange(distribution=[hash[product]]) + : : +- Calc(select=[user_id, name, product, price]) + : : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT], inputUniqueKeys=[(user_id), noUniqueKey, noUniqueKey], joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], select=[user_id,name,user_id0,product,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product, INTEGER price, VARCHAR(2147483647) user_id1)]) + : : :- Exchange(distribution=[hash[user_id]]) + : : : +- ChangelogNormalize(key=[user_id]) + : : : +- Exchange(distribution=[hash[user_id]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name]) + : : :- Exchange(distribution=[hash[user_id]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[user_id, product], metadata=[]]], fields=[user_id, product]) + : : +- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[price, user_id], metadata=[]]], fields=[price, user_id]) + : +- Exchange(distribution=[hash[product_id]]) + : +- ChangelogNormalize(key=[product_id]) + : +- Exchange(distribution=[hash[product_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, ProductDetails, project=[product_id, description, created_date, tags], metadata=[]]], fields=[product_id, description, created_date, tags]) + +- Exchange(distribution=[hash[user_id]]) + +- ChangelogNormalize(key=[user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, UserPreferences]], fields=[user_id, preferred_category, notification_level]) +]]> + + + + + = 4 THEN 'High Rating' WHEN r.rating >= 3 THEN 'Medium Rating' ELSE 'Low Rating' END AS rating_category, TIMESTAMPDIFF(DAY, pd.created_date, CURRENT_DATE) AS days_since_created FROM Users u LEFT JOIN Orders o ON u.user_id = o.user_id LEFT JOIN ProductDetails pd ON o.product = pd.product_id LEFT JOIN Reviews r ON pd.product_id = r.product_id]]> + + + =($13, 4), _UTF-16LE'High Rating':VARCHAR(13) CHARACTER SET "UTF-16LE", >=($13, 3), _UTF-16LE'Medium Rating':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'Low Rating':VARCHAR(13) CHARACTER SET "UTF-16LE")], days_since_created=[CAST(/INT(Reinterpret(-(CURRENT_DATE, $10)), 86400000)):INTEGER]) ++- LogicalJoin(condition=[=($6, $12)], joinType=[left]) + :- LogicalJoin(condition=[=($5, $6)], joinType=[left]) + : :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, ProductDetails]]) + +- LogicalTableScan(table=[[default_catalog, default_database, Reviews]]) +]]> + + + =(rating, 4), 'High Rating', >=(rating, 3), 'Medium Rating', 'Low Rating') AS rating_category, CAST(/INT(Reinterpret(-(CURRENT_DATE(), created_date)), 86400000) AS INTEGER) AS days_since_created]) ++- MultiJoin(commonJoinKey=[product], joinTypes=[LEFT, LEFT], inputUniqueKeys=[noUniqueKey, (product_id), noUniqueKey], joinConditions=[=(product, product_id), =(product_id, product_id0)], select=[user_id,name,order_id,product,product_id,product_name,price,created_date,product_id0,rating,review_text], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) product, VARCHAR(2147483647) product_id, VARCHAR(2147483647) product_name, DOUBLE price, DATE created_date, VARCHAR(2147483647) product_id0, INTEGER rating, VARCHAR(2147483647) review_text)]) + :- Exchange(distribution=[hash[product]]) + : +- Calc(select=[user_id, name, order_id, product]) + : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id, user_id0)], select=[user_id,name,order_id,user_id0,product], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) + : :- Exchange(distribution=[hash[user_id]]) + : : +- ChangelogNormalize(key=[user_id]) + : : +- Exchange(distribution=[hash[user_id]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product]) + :- Exchange(distribution=[hash[product_id]]) + : +- ChangelogNormalize(key=[product_id]) + : +- Exchange(distribution=[hash[product_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, ProductDetails, project=[product_id, product_name, price, created_date], metadata=[]]], fields=[product_id, product_name, price, created_date]) + +- Exchange(distribution=[hash[product_id]]) + +- Calc(select=[product_id, rating, review_text]) + +- ChangelogNormalize(key=[review_id]) + +- Exchange(distribution=[hash[review_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Reviews, project=[product_id, rating, review_text, review_id], metadata=[]]], fields=[product_id, rating, review_text, review_id]) ]]> - + =($2, -($5, 60000:INTERVAL MINUTE)), <=($2, +($5, 60000:INTERVAL MINUTE)))], joinType=[inner]) - :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 5000:INTERVAL SECOND)]) + :- LogicalWatermarkAssigner(rowtime=[$rowtime], watermark=[-($2, 5000:INTERVAL SECOND)]) : +- LogicalTableScan(table=[[default_catalog, default_database, EventTable1]]) - +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 5000:INTERVAL SECOND)]) + +- LogicalWatermarkAssigner(rowtime=[$rowtime], watermark=[-($2, 5000:INTERVAL SECOND)]) +- LogicalTableScan(table=[[default_catalog, default_database, EventTable2]]) ]]> =(rowtime, -(rowtime0, 60000:INTERVAL MINUTE)), <=(rowtime, +(rowtime0, 60000:INTERVAL MINUTE)))], select=[id, val, rowtime, id0, price, rowtime0]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-60000, leftUpperBound=60000, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(id, id0), >=($rowtime, -($rowtime0, 60000:INTERVAL MINUTE)), <=($rowtime, +($rowtime0, 60000:INTERVAL MINUTE)))], select=[id, val, $rowtime, id0, price, $rowtime0]) :- Exchange(distribution=[hash[id]]) - : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) - : +- TableSourceScan(table=[[default_catalog, default_database, EventTable1]], fields=[id, val, rowtime]) + : +- WatermarkAssigner(rowtime=[$rowtime], watermark=[-($rowtime, 5000:INTERVAL SECOND)]) + : +- TableSourceScan(table=[[default_catalog, default_database, EventTable1]], fields=[id, val, $rowtime]) +- Exchange(distribution=[hash[id]]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) - +- TableSourceScan(table=[[default_catalog, default_database, EventTable2]], fields=[id, price, rowtime]) + +- WatermarkAssigner(rowtime=[$rowtime], watermark=[-($rowtime, 5000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, EventTable2]], fields=[id, price, $rowtime]) ]]> @@ -324,35 +806,6 @@ Sink(table=[default_catalog.default_database.sink_four_way], fields=[user_id, or +- ChangelogNormalize(key=[user_id], condition=[IS NOT NULL(location)]) +- Exchange(distribution=[hash[user_id]]) +- TableSourceScan(table=[[default_catalog, default_database, AddressPK, filter=[]]], fields=[user_id, location]) -]]> - - - - - - - - - - - @@ -437,11 +890,11 @@ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, ord - + - + @@ -526,11 +979,11 @@ Calc(select=[user_id, amount, name, age]) - + - + - + - + - - - - - - + - + - + - + - + - + - 10]]> + ($7, 10))]) - +- LogicalJoin(condition=[=($0, $8)], joinType=[left]) - :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) - : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) - : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) - +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) +LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6]) ++- LogicalJoin(condition=[=($0, $8)], joinType=[left]) + :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) ]]> - + (price, 10)]) - +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id_2]) +Calc(select=[user_id, name, order_id, payment_id]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT], inputUniqueKeys=[(user_id), (order_id), (payment_id)], joinConditions=[(user_id = user_id0), (user_id = user_id1)], select=[user_id,name,order_id,user_id0,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id, name], metadata=[]]], fields=[user_id, name]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id]) ]]> - - - - - - + + - + + + + + + + + + - + - + 10]]> ($7, 10))]) + +- LogicalJoin(condition=[=($0, $8)], joinType=[left]) + :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) + : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) ]]> - + 10)]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id]) ]]> ($7, 10))]) +- LogicalJoin(condition=[=($0, $8)], joinType=[left]) :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) @@ -888,42 +1342,42 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) == Optimized Physical Plan == -Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, order_id, CAST(payment_id AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[=(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) - :- Exchange(distribution=[hash[user_id_0]]) - : +- Calc(select=[user_id_0]) - : +- ChangelogNormalize(key=[user_id_0], condition=[=(name, _UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) - : +- Exchange(distribution=[hash[user_id_0]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Users, filter=[], project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) - :- Exchange(distribution=[hash[user_id_1]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) - +- Exchange(distribution=[hash[user_id_2]]) - +- Calc(select=[payment_id, user_id_2], where=[>(price, 10)]) - +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id_2]) +Calc(select=[user_id, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, order_id, CAST(payment_id AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS payment_id]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER], inputUniqueKeys=[(user_id), (order_id), (payment_id)], joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], select=[user_id,order_id,user_id0,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)]) + :- Exchange(distribution=[hash[user_id]]) + : +- Calc(select=[user_id]) + : +- ChangelogNormalize(key=[user_id], condition=[=(name, _UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users, filter=[], project=[user_id, name], metadata=[]]], fields=[user_id, name]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- Calc(select=[payment_id, user_id], where=[>(price, 10)]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id]) == Optimized Execution Plan == -Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[(user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) - :- Exchange(distribution=[hash[user_id_0]]) - : +- Calc(select=[user_id_0]) - : +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')]) - : +- Exchange(distribution=[hash[user_id_0]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Users, filter=[], project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) - :- Exchange(distribution=[hash[user_id_1]]) - : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) - +- Exchange(distribution=[hash[user_id_2]]) - +- Calc(select=[payment_id, user_id_2], where=[(price > 10)]) - +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id_2]) +Calc(select=[user_id, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER], inputUniqueKeys=[(user_id), (order_id), (payment_id)], joinConditions=[(user_id = user_id0), (user_id = user_id1)], select=[user_id,order_id,user_id0,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)]) + :- Exchange(distribution=[hash[user_id]]) + : +- Calc(select=[user_id]) + : +- ChangelogNormalize(key=[user_id], condition=[(name = 'Gus')]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users, filter=[], project=[user_id, name], metadata=[]]], fields=[user_id, name]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- Calc(select=[payment_id, user_id], where=[(price > 10)]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id]) ]]> - + - 10]]> + 10]]> ($7, 10))]) +- LogicalJoin(condition=[=($0, $8)], joinType=[left]) :- LogicalJoin(condition=[=($0, $4)], joinType=[left]) @@ -932,82 +1386,81 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) ]]> - + 10)]) - +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id_2]) +Calc(select=[user_id, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) ++- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, INNER], inputUniqueKeys=[(user_id), (order_id), (payment_id)], joinConditions=[=(user_id, user_id0), =(user_id, user_id1)], select=[user_id,order_id,user_id0,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)]) + :- Exchange(distribution=[hash[user_id]]) + : +- Calc(select=[user_id]) + : +- ChangelogNormalize(key=[user_id], condition=[=(name, 'Gus')]) + : +- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Users, filter=[], project=[user_id, name], metadata=[]]], fields=[user_id, name]) + :- Exchange(distribution=[hash[user_id]]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) + +- Exchange(distribution=[hash[user_id]]) + +- Calc(select=[payment_id, user_id], where=[>(price, 10)]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments, filter=[]]], fields=[payment_id, price, user_id]) ]]> - + - + - +