Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions axiom/optimizer/DerivedTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,31 @@ void DerivedTable::setStartTables() {

namespace {
// Returns a right exists (semijoin) with 'table' on the left and one of
// 'tables' on the right.
JoinEdgeP makeExists(PlanObjectCP table, const PlanObjectSet& tables) {
for (auto join : joinedBy(table)) {
// 'tables' on the right. Picks a table on the right that is not a right side of
// exists inside 'super'.
JoinEdgeP makeExists(
PlanObjectCP table,
const PlanObjectSet& tables,
const DerivedTable& super) {
// True if 'table' is not to the right of some non-commutative join in
// 'super'.
auto isJoinable = [&](PlanObjectCP table) -> bool {
for (auto& join : super.joins) {
if ((join->rightExists() || join->rightNotExists()) &&
join->rightTable() == table) {
return false;
}
}
return true;
};

for (auto join : optimizer::joinedBy(table)) {
if (!join->isInner()) {
continue;
}
if (join->leftTable() == table) {
if (!tables.contains(join->rightTable())) {
if (!tables.contains(join->rightTable()) ||
!isJoinable(join->rightTable())) {
continue;
}
auto* exists = JoinEdge::makeExists(table, join->rightTable());
Expand All @@ -191,7 +211,8 @@ JoinEdgeP makeExists(PlanObjectCP table, const PlanObjectSet& tables) {
}

if (join->rightTable() == table) {
if (!join->leftTable() || !tables.contains(join->leftTable())) {
if (!join->leftTable() || !tables.contains(join->leftTable()) ||
!isJoinable(join->leftTable())) {
continue;
}

Expand Down Expand Up @@ -323,7 +344,7 @@ void DerivedTable::import(
// with exists to the main table(s) in the 'this'.
importedExistences.unionSet(exists);
auto existsTables = exists.toObjects();
auto existsJoin = makeExists(firstTable, exists);
auto existsJoin = makeExists(firstTable, exists, super);
if (existsTables.size() > 1) {
// There is a join on the right of exists. Needs its own dt.
auto [existsDt, joinWithDt] = makeExistsDtAndJoin(
Expand Down
2 changes: 1 addition & 1 deletion axiom/optimizer/Optimization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ void Optimization::joinByIndex(
}
}


void Optimization::joinByHash(
const RelationOpPtr& plan,
const JoinCandidate& candidate,
Expand Down Expand Up @@ -1164,7 +1165,6 @@ void Optimization::joinByHash(
candidate.join->filter(),
fanout,
std::move(columns));

state.addCost(*join);
state.cost.setupCost += buildState.cost.unitCost + buildState.cost.setupCost;
state.cost.totalBytes += buildState.cost.totalBytes;
Expand Down
67 changes: 60 additions & 7 deletions axiom/optimizer/Plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,28 @@ const PlanObjectSet& PlanState::downstreamColumns() const {

// Joins.
for (auto join : dt->joins) {
if (join->rightExists() || join->rightNotExists()) {
if (placed.contains(join->rightTable())) {
continue;
}
// For an unplaced exists / not exists join downstream, only the left side
// key columns are needed: nothing is projected out from the right side.
addExprs(join->leftKeys());

if (!join->filter().empty()) {
// If the join has a filter, the filter columns that do not come from the
// right side are also needed.
PlanObjectSet filterColumns;
for (auto& conjunct : join->filter()) {
filterColumns.unionColumns(conjunct);
}
PlanObjectSet rightColumns;
for (auto& column : tableColumns(join->rightTable())) {
rightColumns.add(column);
}
filterColumns.except(rightColumns);
result.unionSet(filterColumns);
}
continue;
}
bool addFilter = false;
if (!placed.contains(join->rightTable())) {
addFilter = true;
Expand Down Expand Up @@ -401,6 +423,22 @@ const JoinEdgeVector& joinedBy(PlanObjectCP table) {
return table->as<DerivedTable>()->joinedBy;
}

// Returns the 'columns' member of 'table'. Supported plan object kinds are
// kTableNode (BaseTable), kDerivedTableNode (DerivedTable) and
// kValuesTableNode (ValuesTable); any other kind raises via VELOX_FAIL.
const ColumnVector& tableColumns(PlanObjectCP table) {
  if (table->is(PlanType::kTableNode)) {
    const auto* baseTable = table->as<BaseTable>();
    return baseTable->columns;
  }

  if (table->is(PlanType::kDerivedTableNode)) {
    const auto* derivedTable = table->as<DerivedTable>();
    return derivedTable->columns;
  }

  if (table->is(PlanType::kValuesTableNode)) {
    const auto* valuesTable = table->as<ValuesTable>();
    return valuesTable->columns;
  }

  // No other plan object kind is handled here.
  VELOX_FAIL("tableColumns expects BaseTable, DerivedTable, or ValuesTable");
}

// Returns two JoinSide views of 'join', both computed relative to tables[0]:
// first is join->sideOf(tables[0], false), second is
// join->sideOf(tables[0], true).
// NOTE(review): presumably the bool argument selects the opposite side of the
// edge, making this {side of tables[0], other side} - confirm against
// JoinEdge::sideOf.
std::pair<JoinSide, JoinSide> JoinCandidate::joinSides() const {
return {join->sideOf(tables[0], false), join->sideOf(tables[0], true)};
}
Expand Down Expand Up @@ -430,10 +468,24 @@ void JoinCandidate::addEdge(PlanState& state, JoinEdgeP edge) {
for (auto i = 0; i < newPlacedSide.keys.size(); ++i) {
auto* key = newPlacedSide.keys[i];
if (!hasEqual(key, tableSide.keys)) {
// Make sure to create a hyper edge.
if (!compositeEdge) {
compositeEdge = make<JoinEdge>(*join);
compositeEdge = JoinEdge::makeInner(nullptr, joined);
if (joined == join->rightTable()) {
for (auto i = 0; i < join->numKeys(); ++i) {
compositeEdge->addEquality(
Copy link
Collaborator

@MBkkt MBkkt Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check that we don't create edges with a lot of duplicate keys?
When I ran TPC-H q9, I noticed there were duplicates in the join keys (in the Velox execution plan).

join->leftKeys()[i], join->rightKeys()[i]);
}
} else {
for (auto i = 0; i < join->numKeys(); ++i) {
compositeEdge->addEquality(
join->rightKeys()[i], join->leftKeys()[i]);
}
}

join = compositeEdge;
}

auto [other, preFanout] = join->otherTable(placedSide.table);
// Do not recompute the fanout after adding more equalities. Adding them makes
// the join edge non-binary, so it cannot be sampled.
Expand Down Expand Up @@ -488,12 +540,13 @@ std::string JoinCandidate::toString() const {
}

// Returns true if this candidate costs more than 'other'. The comparison uses
// unit cost plus setup cost on both sides. This candidate is additionally
// charged a shuffle cost (plan fanout times shuffleCost of the plan's columns)
// when its plan's distribution is not the same partition as the distribution
// of other's plan.
bool NextJoin::isWorse(const NextJoin& other) const {
  const bool samePartition =
      plan->distribution().isSamePartition(other.plan->distribution());
  // The shuffle penalty applies only to this side of the comparison.
  const float shuffle =
      samePartition ? 0 : plan->cost().fanout * shuffleCost(plan->columns());
  return cost.unitCost + cost.setupCost + shuffle >
      other.cost.unitCost + other.cost.setupCost;
}

size_t MemoKey::hash() const {
Expand Down
3 changes: 3 additions & 0 deletions axiom/optimizer/Plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ namespace facebook::axiom::optimizer {

const JoinEdgeVector& joinedBy(PlanObjectCP table);

/// Returns the columns of a BaseTable, DerivedTable, or ValuesTable.
const ColumnVector& tableColumns(PlanObjectCP table);

/// Returns the inverse join type, e.g. right outer from left outer.
/// TODO Move this function to Velox.
velox::core::JoinType reverseJoinType(velox::core::JoinType joinType);
Expand Down
10 changes: 9 additions & 1 deletion axiom/optimizer/QueryGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,14 @@ class JoinEdge {
return rightOptional_;
}

/// Returns the 'rightExists_' flag of this edge.
/// NOTE(review): presumably set for semijoin (exists) edges whose right side
/// is only probed for existence - confirm against JoinEdge::makeExists.
bool rightExists() const {
return rightExists_;
}

/// Returns the 'rightNotExists_' flag of this edge.
/// NOTE(review): presumably the anti-join (not exists) counterpart of
/// rightExists() - confirm where the flag is set.
bool rightNotExists() const {
return rightNotExists_;
}

/// Returns the 'directed_' flag of this edge.
bool directed() const {
return directed_;
}
Expand Down Expand Up @@ -580,7 +588,7 @@ class JoinEdge {
return false;
}

return !leftTable_ || rightOptional_ || leftOptional_ || rightExists_ ||
return rightOptional_ || leftOptional_ || rightExists_ ||
rightNotExists_ || markColumn_ || directed_;
}

Expand Down
5 changes: 5 additions & 0 deletions axiom/optimizer/ToVelox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/expression/ExprToSubfieldFilter.h"
#include "velox/expression/ScopedVarSetter.h"
#include "velox/vector/VariantToVector.h"
#include "axiom/optimizer/DerivedTablePrinter.h"

namespace facebook::axiom::optimizer {

Expand Down Expand Up @@ -1526,4 +1527,8 @@ extern std::string planString(const runner::MultiFragmentPlan* plan) {
return plan->toString(true);
}

// Returns the text produced by DerivedTablePrinter::toText for 'dt'. 'dt' is
// dereferenced unconditionally, so it must not be null.
// NOTE(review): presumably a debugger-callable helper like planString -
// confirm.
extern std::string dtString(const DerivedTable* dt) {
return DerivedTablePrinter::toText(*dt);
}

} // namespace facebook::axiom::optimizer
Loading