Skip to content

Commit 8250415

Browse files
committed
fix: Fix distribution, order, layouts and plan enumeration
1 parent fb1ab08 commit 8250415

File tree

12 files changed

+304
-347
lines changed

12 files changed

+304
-347
lines changed

axiom/optimizer/JoinSample.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ std::pair<float, float> sampleJoin(
219219
const ExprVector& leftKeys,
220220
SchemaTableCP right,
221221
const ExprVector& rightKeys) {
222-
const uint64_t leftRows = left->numRows();
223-
const uint64_t rightRows = right->numRows();
222+
const auto leftRows = left->cardinality;
223+
const auto rightRows = right->cardinality;
224224

225225
const auto leftCard = keyCardinality(leftKeys);
226226
const auto rightCard = keyCardinality(rightKeys);

axiom/optimizer/Optimization.cpp

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -427,10 +427,9 @@ RelationOpPtr repartitionForAgg(const RelationOpPtr& plan, PlanState& state) {
427427

428428
const auto* agg = state.dt->aggregation;
429429

430-
// If no grouping and not yet gathered on a single node, add a gather before
431-
// final agg.
432-
if (agg->groupingKeys().empty() &&
433-
!plan->distribution().distributionType.isGather) {
430+
// If no grouping and not yet gathered on a single node,
431+
// add a gather before final agg.
432+
if (agg->groupingKeys().empty()) {
434433
auto* gather =
435434
make<Repartition>(plan, Distribution::gather(), plan->columns());
436435
state.addCost(*gather);
@@ -478,33 +477,20 @@ bool isIndexColocated(
478477
const IndexInfo& info,
479478
const ExprVector& lookupValues,
480479
const RelationOpPtr& input) {
481-
const auto& distribution = info.index->distribution;
482-
if (distribution.isBroadcast) {
483-
return true;
484-
}
485-
480+
const auto& desired = info.index->distribution;
481+
if (const auto needsShuffle =
482+
input->distribution().maybeNeedsShuffle(desired)) {
483+
return !*needsShuffle;
484+
}
485+
// TODO: Future partition type should be accounted.
486+
// TODO: This code actually doesn't feel right.
487+
// We should check it when we will add indexes.
486488
// True if 'input' is partitioned so that each partitioning key is joined to
487489
// the corresponding partition key in 'info'.
488-
if (input->distribution().distributionType != distribution.distributionType) {
489-
return false;
490-
}
491-
492-
if (input->distribution().partition.empty()) {
493-
return false;
494-
}
495-
496-
if (input->distribution().partition.size() != distribution.partition.size()) {
497-
return false;
498-
}
499-
500-
for (auto i = 0; i < input->distribution().partition.size(); ++i) {
490+
for (size_t i = 0; i < input->distribution().partition.size(); ++i) {
501491
auto nthKey = position(lookupValues, *input->distribution().partition[i]);
502-
if (nthKey != kNotFound) {
503-
if (info.schemaColumn(info.lookupKeys.at(nthKey)) !=
504-
distribution.partition.at(i)) {
505-
return false;
506-
}
507-
} else {
492+
if (nthKey == kNotFound ||
493+
info.schemaColumn(info.lookupKeys[nthKey]) != desired.partition[i]) {
508494
return false;
509495
}
510496
}
@@ -613,7 +599,7 @@ PlanObjectSet availableColumns(BaseTableCP baseTable, ColumnGroupCP index) {
613599
}
614600

615601
bool isBroadcastableSize(PlanP build, PlanState& /*state*/) {
616-
return build->cost.fanout < 100'000;
602+
return build->cost.resultCardinality() < 100'000;
617603
}
618604

619605
// The 'other' side gets shuffled to align with 'input'. If 'input' is not
@@ -1024,9 +1010,7 @@ void Optimization::joinByHash(
10241010
candidate.join->isBroadcastableType() &&
10251011
isBroadcastableSize(buildPlan, state)) {
10261012
auto* broadcast = make<Repartition>(
1027-
buildInput,
1028-
Distribution::broadcast(plan->distribution().distributionType),
1029-
buildInput->columns());
1013+
buildInput, Distribution::broadcast(), buildInput->columns());
10301014
buildState.addCost(*broadcast);
10311015
buildInput = broadcast;
10321016
} else {
@@ -1316,10 +1300,12 @@ void Optimization::addJoin(
13161300

13171301
// If one is much better do not try the other.
13181302
if (toTry.size() == 2 && candidate.tables.size() == 1) {
1303+
VELOX_DCHECK(!options_.syntacticJoinOrder);
13191304
if (toTry[0].isWorse(toTry[1])) {
1320-
toTry.erase(toTry.begin());
1305+
toTry[0] = std::move(toTry[1]);
1306+
toTry.pop_back();
13211307
} else if (toTry[1].isWorse(toTry[0])) {
1322-
toTry.erase(toTry.begin() + 1);
1308+
toTry.pop_back();
13231309
}
13241310
}
13251311
result.insert(result.end(), toTry.begin(), toTry.end());
@@ -1348,7 +1334,7 @@ RelationOpPtr Optimization::placeSingleRowDt(
13481334
memoKey.tables.add(subquery);
13491335
memoKey.columns.unionObjects(subquery->columns);
13501336

1351-
const auto broadcast = Distribution::broadcast({});
1337+
const auto broadcast = Distribution::broadcast();
13521338

13531339
PlanObjectSet empty;
13541340
bool needsShuffle = false;
@@ -1545,11 +1531,7 @@ Distribution somePartition(const RelationOpPtrVector& inputs) {
15451531
}
15461532
}
15471533

1548-
DistributionType distributionType;
1549-
distributionType.numPartitions =
1550-
queryCtx()->optimization()->runnerOptions().numWorkers;
1551-
1552-
return {distributionType, std::move(columns)};
1534+
return {DistributionType{}, std::move(columns)};
15531535
}
15541536

15551537
// Adds the costs in the input states to the first state and if 'distinct' is
@@ -1565,9 +1547,12 @@ PlanP unionPlan(
15651547
for (auto i = 1; i < states.size(); ++i) {
15661548
const auto& otherCost = states[i].cost;
15671549
fullyImported.intersect(inputPlans[i]->fullyImported);
1568-
firstState.cost.add(otherCost);
1569-
// The input cardinality is not additive, the fanout and other metrics are.
1570-
firstState.cost.inputCardinality -= otherCost.inputCardinality;
1550+
// The input cardinality is not additive.
1551+
firstState.cost.setupCost += otherCost.setupCost;
1552+
firstState.cost.unitCost += otherCost.unitCost;
1553+
firstState.cost.fanout += otherCost.fanout;
1554+
firstState.cost.totalBytes += otherCost.totalBytes;
1555+
firstState.cost.transferBytes += otherCost.transferBytes;
15711556
}
15721557
if (distinct) {
15731558
firstState.addCost(*distinct);

axiom/optimizer/Plan.cpp

Lines changed: 75 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,6 @@ Plan::Plan(RelationOpPtr op, const PlanState& state)
102102
columns(exprColumns(state.targetExprs)),
103103
fullyImported(state.dt->fullyImported) {}
104104

105-
bool Plan::isStateBetter(const PlanState& state, float perRowMargin) const {
106-
return cost.unitCost * cost.inputCardinality + cost.setupCost >
107-
state.cost.unitCost * state.cost.inputCardinality + state.cost.setupCost +
108-
perRowMargin * state.cost.fanout;
109-
}
110-
111105
std::string Plan::printCost() const {
112106
return cost.toString(true, false);
113107
}
@@ -120,9 +114,10 @@ std::string Plan::toString(bool detail) const {
120114
}
121115

122116
void PlanState::addCost(RelationOp& op) {
123-
cost.unitCost += cost.inputCardinality * cost.fanout * op.cost().unitCost;
117+
VELOX_DCHECK_EQ(cost.inputCardinality, 1);
124118
cost.setupCost += op.cost().setupCost;
125-
cost.fanout *= op.cost().fanout;
119+
cost.unitCost += op.cost().unitCost * op.cost().inputCardinality;
120+
cost.fanout = op.cost().resultCardinality();
126121
cost.totalBytes += op.cost().totalBytes;
127122
cost.transferBytes += op.cost().transferBytes;
128123
}
@@ -276,74 +271,76 @@ std::string PlanState::printPlan(RelationOpPtr op, bool detail) const {
276271
}
277272

278273
PlanP PlanSet::addPlan(RelationOpPtr plan, PlanState& state) {
279-
int32_t replaceIndex = -1;
280-
const float shuffleCostPerRow =
281-
shuffleCost(plan->columns()) * state.cost.fanout;
282-
283-
if (!plans.empty()) {
284-
// Compare with existing. If there is one with same distribution and new is
285-
// better, replace. If there is one with a different distribution and the
286-
// new one can produce the same distribution by repartition, for cheaper,
287-
// add the new one and delete the old one.
288-
for (auto i = 0; i < plans.size(); ++i) {
289-
auto old = plans[i].get();
290-
if (state.input != old->input) {
291-
continue;
292-
}
274+
const float shuffleCostPerRow = shuffleCost(plan->columns());
293275

294-
const bool newIsBetter = old->isStateBetter(state);
295-
const bool newIsBetterWithShuffle =
296-
old->isStateBetter(state, shuffleCostPerRow);
297-
const bool sameDist =
298-
old->op->distribution().isSamePartition(plan->distribution());
299-
const bool sameOrder =
300-
old->op->distribution().isSameOrder(plan->distribution());
301-
if (sameDist && sameOrder) {
302-
if (newIsBetter) {
303-
replaceIndex = i;
304-
continue;
305-
}
306-
// There's a better one with same dist and partition.
307-
return nullptr;
308-
}
276+
// Determine is old plan worse the new one in all aspects.
277+
auto isWorse = [&](const Plan& old) {
278+
if (plan->distribution().needsSort(old.op->distribution())) {
279+
// New plan needs a sort to match the old one, so cannot compare.
280+
return false;
281+
}
282+
const bool needsShuffle =
283+
plan->distribution().needsShuffle(old.op->distribution());
284+
return old.cost.cost() >
285+
state.cost.cost(needsShuffle ? shuffleCostPerRow : 0);
286+
};
309287

310-
if (newIsBetterWithShuffle && old->op->distribution().orderKeys.empty()) {
311-
// Old plan has no order and is worse than new plus shuffle. Can't win.
312-
// Erase.
313-
queryCtx()->optimization()->trace(
314-
OptimizerOptions::kExceededBest,
315-
state.dt->id(),
316-
old->cost,
317-
*old->op);
318-
plans.erase(plans.begin() + i);
319-
--i;
320-
continue;
321-
}
288+
// Determine is old plan better than the new one in all aspects.
289+
auto isBetter = [&](const Plan& old) {
290+
if (old.op->distribution().needsSort(plan->distribution())) {
291+
// Old plan needs a sort to match the new one, so cannot compare.
292+
return false;
293+
}
294+
const bool needsShuffle =
295+
old.op->distribution().needsShuffle(plan->distribution());
296+
return state.cost.cost() >
297+
old.cost.cost(needsShuffle ? shuffleCost(old.op->columns()) : 0);
298+
};
322299

323-
if (plan->distribution().orderKeys.empty() &&
324-
!old->isStateBetter(state, -shuffleCostPerRow)) {
325-
// New has no order and old would beat it even after adding shuffle.
326-
return nullptr;
327-
}
300+
// Compare with existing plans.
301+
const auto plansSize = plans.size();
302+
enum {
303+
kFoundWorse = -1,
304+
kNone = 0,
305+
kFoundBetter = 1,
306+
};
307+
auto found = kNone;
308+
for (size_t i = 0; i < plans.size(); ++i) {
309+
const auto& old = *plans[i];
310+
if (old.input != state.input) {
311+
// Different plans, cannot compare.
312+
continue;
328313
}
314+
if (isWorse(old)) {
315+
// Remove old plan, it is worse than the new one in all aspects.
316+
queryCtx()->optimization()->trace(
317+
OptimizerOptions::kExceededBest, state.dt->id(), old.cost, *old.op);
318+
std::swap(plans[i], plans.back());
319+
plans.pop_back();
320+
--i;
321+
found = kFoundWorse;
322+
} else if (found == kNone && isBetter(old)) {
323+
// Old plan is better than the new one in all aspects.
324+
found = kFoundBetter;
325+
}
326+
}
327+
if (found == kFoundBetter) {
328+
// No existing plan was worse than the new one in all aspects,
329+
// and at least one existing plan is better than the new one in all aspects.
330+
// So don't add the new plan.
331+
return nullptr;
329332
}
330333

331334
auto newPlan = std::make_unique<Plan>(std::move(plan), state);
332335
auto* result = newPlan.get();
333-
auto newPlanCost =
334-
result->cost.unitCost + result->cost.setupCost + shuffleCostPerRow;
335-
if (bestCostWithShuffle == 0 || newPlanCost < bestCostWithShuffle) {
336-
bestCostWithShuffle = newPlanCost;
337-
}
338-
if (replaceIndex >= 0) {
339-
plans[replaceIndex] = std::move(newPlan);
340-
} else {
341-
plans.push_back(std::move(newPlan));
342-
}
336+
bestCostWithShuffle =
337+
std::min(bestCostWithShuffle, result->cost.cost(shuffleCostPerRow));
338+
plans.push_back(std::move(newPlan));
343339
return result;
344340
}
345341

346-
PlanP PlanSet::best(const Distribution& distribution, bool& needsShuffle) {
342+
PlanP PlanSet::best(const Distribution& desired, bool& needsShuffle) {
343+
// TODO: Consider desired order here too.
347344
PlanP best = nullptr;
348345
PlanP match = nullptr;
349346
float bestCost = -1;
@@ -352,8 +349,7 @@ PlanP PlanSet::best(const Distribution& distribution, bool& needsShuffle) {
352349
const bool single = isSingleWorker();
353350

354351
for (const auto& plan : plans) {
355-
const float cost =
356-
plan->cost.fanout * plan->cost.unitCost + plan->cost.setupCost;
352+
const float cost = plan->cost.cost();
357353

358354
auto update = [&](PlanP& current, float& currentCost) {
359355
if (!current || cost < currentCost) {
@@ -363,7 +359,7 @@ PlanP PlanSet::best(const Distribution& distribution, bool& needsShuffle) {
363359
};
364360

365361
update(best, bestCost);
366-
if (!single && plan->op->distribution().isSamePartition(distribution)) {
362+
if (!single && !plan->op->distribution().needsShuffle(desired)) {
367363
update(match, matchCost);
368364
}
369365
}
@@ -375,8 +371,9 @@ PlanP PlanSet::best(const Distribution& distribution, bool& needsShuffle) {
375371
}
376372

377373
if (match) {
378-
const float shuffle = shuffleCost(best->op->columns()) * best->cost.fanout;
379-
if (matchCost <= bestCost + shuffle) {
374+
const float bestCostWithShuffle =
375+
best->cost.cost(shuffleCost(best->op->columns()));
376+
if (matchCost <= bestCostWithShuffle) {
380377
return match;
381378
}
382379
}
@@ -489,12 +486,14 @@ std::string JoinCandidate::toString() const {
489486
}
490487

491488
bool NextJoin::isWorse(const NextJoin& other) const {
492-
float shuffle =
493-
plan->distribution().isSamePartition(other.plan->distribution())
494-
? 0
495-
: plan->cost().fanout * shuffleCost(plan->columns());
496-
return cost.unitCost + cost.setupCost + shuffle >
497-
other.cost.unitCost + other.cost.setupCost;
489+
if (other.plan->distribution().needsSort(plan->distribution())) {
490+
// 'other' needs a sort to match 'plan', so cannot compare.
491+
return false;
492+
}
493+
const auto needsShuffle =
494+
other.plan->distribution().needsShuffle(plan->distribution());
495+
return cost.cost() >
496+
other.cost.cost(needsShuffle ? shuffleCost(other.plan->columns()) : 0);
498497
}
499498

500499
size_t MemoKey::hash() const {

axiom/optimizer/Plan.h

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ using HashBuildVector = std::vector<HashBuildCP>;
3636
struct Plan {
3737
Plan(RelationOpPtr op, const PlanState& state);
3838

39-
/// True if 'state' has a lower cost than 'this'. If 'perRowMargin' is given,
40-
/// then 'other' must win by margin per row.
41-
bool isStateBetter(const PlanState& state, float perRowMargin = 0) const;
42-
4339
/// Root of the plan tree.
4440
const RelationOpPtr op;
4541

@@ -81,13 +77,13 @@ struct PlanSet {
8177
/// Interesting equivalent plans.
8278
std::vector<std::unique_ptr<Plan>> plans;
8379

84-
/// Cost of lowest-cost plan plus shuffle. If a cutoff is applicable, nothing
85-
/// more expensive than this should be tried.
86-
float bestCostWithShuffle{0};
80+
/// Cost of lowest-cost plan plus shuffle. If a cutoff is applicable,
81+
/// nothing more expensive than this should be tried.
82+
float bestCostWithShuffle{std::numeric_limits<float>::infinity()};
8783

88-
/// Returns the best plan that produces 'distribution'. If the best plan has
89-
/// some other distribution, sets 'needsShuffle ' to true.
90-
PlanP best(const Distribution& distribution, bool& needsShuffle);
84+
/// Returns the best plan that produces 'desired' distribution.
85+
/// If the best plan has some other distribution, sets 'needsShuffle' to true.
86+
PlanP best(const Distribution& desired, bool& needsShuffle);
9187

9288
/// Retruns the best plan when we're ok with any distribution.
9389
PlanP best() {
@@ -275,8 +271,11 @@ struct PlanState {
275271
/// True if the costs accumulated so far are so high that this should not be
276272
/// explored further.
277273
bool isOverBest() const {
278-
return hasCutoff && plans.bestCostWithShuffle != 0 &&
279-
cost.unitCost + cost.setupCost > plans.bestCostWithShuffle;
274+
/// This isn't conservative. Because it's possible that we explore some
275+
/// completely new plan with non-compatible input/distribution to any old
276+
/// plan. This plan if not this condition will be added to plans and later
277+
/// can become part of the best plan.
278+
return hasCutoff && cost.cost() > plans.bestCostWithShuffle;
280279
}
281280

282281
void debugSetFirstTable(int32_t id);

0 commit comments

Comments
 (0)