fix: Fix how derived table produced #542
99fbab3 to b035d1c
@mbasmanova has imported this pull request. If you are a Meta employee, you can view this in D85387151.
I'm not able to capture the gist of the new design. Would it be possible to describe how the new logic works to transform a tree of logical plan nodes into a tree of derived tables?
We have a logical tree.
It was already like this. What did I change?
I'm not following. Would you clarify some more? Are we traversing the logical plan tree bottom up and filling up a DT? When the DT is "full", do we start a new one? How do we handle nodes with multiple inputs? Do we create separate DTs for each input, then combine these if possible?
@mbasmanova I think the changes can be formulated like this:
For a join, we want to put the join edge, filter, etc. inside the current DT; the left and right inputs can be separate tables with filters or separate DTs.
I didn't change this. I kept it as it was: we traverse from the root to the children in DFS order.
Yes, this is also true for aggregation, limit, order by, etc., that is, for operators with a single input. But for operators with multiple inputs (in practice only joins, because for union etc. we just create a separate DT for the entire operation; that is not desirable for joins because we want to enumerate them in a single DT), we want the current DT to be used for the join and a new DT to be created for the children if necessary. Alternatively, we could put every join in a separate DT together with its left subtree, but that would be bad for join enumeration.
Partially true. We create a separate DT for each input, but only if it's necessary to put them into separate DTs, so we don't need to merge them later (because we don't create separate DTs when it's unnecessary).
mbasmanova
left a comment
PR description mentions a number of improvements. Would it be possible to add tests for these?
@mbasmanova I added tests for the main changes. As for flattening unions, I think the existing tests cover this (they were adjusted, which is why I changed PlanMatcher a little). As for the dropped order by, it's tested in my tests ( If you have specific tests in mind, could you share them? I will try to add them.
Still trying to wrap my head around the new design. If we consider only linear trees (no branches / joins) then it makes sense to fill up a DT while traversing the plan bottom up. Before adding a node, check if it "fits" in current DT and if not, finalize current DT and start a new one.

This doesn't quite work for joins. Let's consider an example: We start populating a DT with scan 1, then we add scan 2, then add agg 3, then add join 4. They all end up in the same DT, which is incorrect. The agg 3 must be in a separate DT. Is this the bug that triggered this PR? Just want to double check my understanding of the issue with existing code.

Now, it looks like Join introduces a restriction of what can be added to the current DT. This restriction is different from rules that decide whether an operation fits into a DT or not (bottom up). The join says that only tables and filters are allowed. Anything else must be wrapped in a separate DT. I believe the existing allow-in-dt mechanism is supposed to enforce that. Is this not working / buggy?

To summarize, currently there are 2 mechanisms that are supposed to work together to cover all cases. On the way down the tree, join node updates allow-in-dt. If a node is not allowed, then it starts a new DT (wrapInDt logic). On the way up, each node checks whether it fits in a DT and, if not, starts a new DT (finalizeDt logic). The difference is wrapInDt creates an empty DT and fills it up using child nodes. FinalizeDT creates a new DT and places current DT into it as a 'table'.
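The difference between the two ways of starting a new DT, as summarized in the comment above, can be sketched with a toy model. This is only an illustration under stated assumptions: `ToyDt`, `ToyBuilder`, `finalizeCurrent`, and `wrapChild` are hypothetical names, not the actual Axiom classes or methods, and operations are represented as plain strings.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a derived table: operations placed directly in
// it, plus nested DTs that it reads from as input 'tables'.
struct ToyDt {
  std::vector<std::string> ops;
  std::vector<std::shared_ptr<ToyDt>> tables;
};

// Hypothetical builder that tracks the DT currently being filled.
struct ToyBuilder {
  std::shared_ptr<ToyDt> current = std::make_shared<ToyDt>();

  // Finalize-style: the DT built so far becomes a 'table' inside a fresh
  // outer DT, and the outer DT becomes the current one.
  void finalizeCurrent() {
    auto outer = std::make_shared<ToyDt>();
    outer->tables.push_back(current);
    current = outer;
  }

  // Wrap-style: start from an empty DT, fill it from the child's operations,
  // and register it as a 'table' of the current DT.
  void wrapChild(const std::vector<std::string>& childOps) {
    auto inner = std::make_shared<ToyDt>();
    inner->ops = childOps;
    current->tables.push_back(inner);
  }
};
```

In this sketch, finalizing moves the existing contents one level down, while wrapping leaves the current DT in place and only adds a new nested input.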
Yes, as I already mentioned, the current mechanism can work for this if we add the missing contains checks. The only issue is that it won't push down filters etc. into such an aggregation. Consider such a query: with the current approach, the HAVING filter will be in the join DT, but with my approach it will be in the left subtree's DT.
Would you help me understand why this is important? I believe there is logic in the optimizer to import the filters into DTs. Hence, it is OK, no?
@mbasmanova I can put it this way: we have two approaches to producing a DT.
In the first approach we cannot finalize later. So, in short, I tried to make the behavior of the two approaches consistent, which gives some benefits:
@mbasmanova I rebased the PR to fix a conflict with the refactoring.
4155652 to 8da320d
axiom/optimizer/DerivedTable.h
Outdated
  /// Number of fully processed leading elements of 'conjuncts'. Set by
  /// distributeConjuncts(). Used to avoid reprocessing conjuncts in repeated
  /// calls to distributeConjuncts().
  int32_t numCanonicalConjuncts{0};
It looks like this variable is not used. Would you submit a separate PR to remove it?
I made PR: #585
  PlanMatcherBuilder& localPartition(
-     const std::shared_ptr<PlanMatcher>& matcher);
+     std::initializer_list<std::shared_ptr<PlanMatcher>> matcher);
LocalExchange with a single input is a common case. Would it be possible to keep the existing method and add a new one that takes std::initializer_list?
done
mbasmanova
left a comment
@MBkkt I'll try to understand this PR again. Is PR description up to date? It doesn't seem so. If there is anything you could clarify to help understanding, that would be much appreciated. Otherwise, I'll need to find bandwidth to run and debug the code to try to figure out how it works.
mbasmanova
left a comment
If there is any way to break up this PR into smaller pieces, that would help a lot. Thanks.
axiom/optimizer/DerivedTable.cpp
Outdated
    return;
  }
  if (hasLimit() || dt->hasLimit()) {
    // TODO: Can we combine limits?
I think so.
// Multiple limits are allowed. If already present, then it is combined
// with the new limit.
makeQueryGraph(*node.onlyInput(), allowedInDt);
addLimit(*node.as<lp::LimitNode>());
...
void ToGraph::addLimit(const lp::LimitNode& limit) {
  if (currentDt_->hasLimit()) {
    currentDt_->offset += limit.offset();
    if (currentDt_->limit <= limit.offset()) {
      currentDt_->limit = 0;
    } else {
      currentDt_->limit =
          std::min(limit.count(), currentDt_->limit - limit.offset());
    }
  } else {
    currentDt_->limit = limit.count();
    currentDt_->offset = limit.offset();
  }
}
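The arithmetic in the quoted `addLimit` can be checked in isolation with a small standalone sketch. `LimitSpec` and `combineLimits` are hypothetical names introduced here for illustration; the sketch assumes the inner limit is applied first and the outer limit is applied to its output, with no aggregation or other operator in between.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical value type: skip 'offset' rows, then return up to 'count' rows.
struct LimitSpec {
  int64_t offset;
  int64_t count;
};

// Combines an inner limit (applied first) with an outer limit (applied to the
// inner limit's output) into one equivalent limit.
LimitSpec combineLimits(const LimitSpec& inner, const LimitSpec& outer) {
  LimitSpec combined;
  // Rows skipped by the outer limit are skipped in addition to the inner ones.
  combined.offset = inner.offset + outer.offset;
  if (inner.count <= outer.offset) {
    // The outer offset consumes everything the inner limit lets through.
    combined.count = 0;
  } else {
    combined.count = std::min(outer.count, inner.count - outer.offset);
  }
  return combined;
}
```

For example, an inner limit of offset 0, count 10 followed by an outer limit of offset 8, count 5 can return at most 2 rows, starting at row 8 of the original input.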
Yes, this is why I wrote the TODO.
But we need to account for the fact that one of them can have an aggregation; in that case combining the limits this way is invalid.
The same applies to cardinality estimation.
I removed these changes from this PR; they're not necessary here.
Let's continue this discussion in a separate PR: #579
axiom/optimizer/DerivedTable.cpp
Outdated
  tables = dt->tables;
  tableSet = dt->tableSet;

  // TODO: Is setop possible here at all?
What is flattenDt for? What does it do? Which code paths does it participate in? How are you thinking about this logic?
flattenDt is called when we optimize a DT and this DT has a single DT in its tables.
This is possible in two cases:
- ToGraph produced such a DT
- the optimizer imported all joins into a single DT
Right now we don't have enough tests for this code combined with setop, so I'm not sure whether it will be called.
I removed these changes from this PR; they're not necessary here.
Let's continue this discussion in a separate PR: #579
  auto exprSet = evaluator_.compile(typedExpr);
  const auto& first = *exprSet->exprs().front();
- if (first.specialFormKind() != velox::exec::SpecialFormKind::kConstant) {
+ if (!first.isConstant()) {
Extracting trivial refactors like this into a separate PR would help reduce cognitive load on readers.
I made PR: #584
axiom/optimizer/ToGraph.cpp
Outdated
    uint64_t allowedInDt) {
  auto* outerDt = makeQueryGraph(node, allowedInDt);
  if (currentDt_->hasOrderBy() && !currentDt_->hasLimit()) {
    currentDt_->orderKeys.clear();
Is there a way to avoid adding OrderBy in the first place? Removing it works, but it is a bit awkward and we also keep the expressions that may have been added specifically for OrderBy.
I'm not sure how. The first idea is to handle allowedInDt for order by differently,
but I don't think that always works.
My suggestion is to think about this separately.
I did this as you suggested. Now we never reassign orderKeys and never clear them.
We just skip the Sort node if it's not necessary.
0dd008b to c381de3
@mbasmanova I moved the refactoring and other fixes into separate PRs. This PR is now focused and only does things related to how we produce derived tables.
void ToGraph::makeQueryGraph(
    const lp::LogicalPlanNode& node,
    uint64_t allowedInDt) {
  if (!contains(allowedInDt, node.kind())) {
    if (node.kind() == lp::NodeKind::kSort) {
      // Sort not allowed doesn't mean we need to wrap it in DT,
      // instead we should skip it.
      makeQueryGraph(*node.onlyInput(), allowedInDt);
    } else {
      wrapInDt(node, /*unordered=*/false);
    }
    return;
  }
This is the core change, @mbasmanova.
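The three-way decision in the snippet above (add to the current DT, skip a disallowed Sort, or wrap anything else that is disallowed) can be sketched as a pure function over a bitmask. This is a minimal illustration under assumptions: `ToyNodeKind`, `bitOf`, `isAllowed`, `Action`, and `decide` are hypothetical names, and encoding `allowedInDt` as one bit per node kind is an assumption based on its `uint64_t` type, not a confirmed detail of the real `contains` helper.

```cpp
#include <cstdint>

// Hypothetical node kinds; the real lp::NodeKind enum may differ.
enum class ToyNodeKind : uint32_t {
  kScan = 0,
  kFilter,
  kProject,
  kAggregate,
  kJoin,
  kSort,
  kLimit
};

// One bit per node kind in a 64-bit mask (assumed encoding).
constexpr uint64_t bitOf(ToyNodeKind kind) {
  return 1ULL << static_cast<uint32_t>(kind);
}

constexpr bool isAllowed(uint64_t allowedInDt, ToyNodeKind kind) {
  return (allowedInDt & bitOf(kind)) != 0;
}

enum class Action { kAddToCurrentDt, kSkipNode, kWrapInNewDt };

// Mirrors the branch structure of the snippet: a disallowed Sort is skipped,
// any other disallowed node is wrapped in a new DT.
constexpr Action decide(uint64_t allowedInDt, ToyNodeKind kind) {
  if (isAllowed(allowedInDt, kind)) {
    return Action::kAddToCurrentDt;
  }
  return kind == ToyNodeKind::kSort ? Action::kSkipNode : Action::kWrapInNewDt;
}
```

Keeping the decision separate from the traversal makes the rule easy to test on its own; the real code interleaves it with the recursive calls.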
c668f87 to d9bc473
@mbasmanova I separated this PR into two commits:
For the first commit I created a separate PR that can be merged on its own: #599. I will rebase this PR as soon as #599 is merged.
Fix translation of Joins to DTs. Join(Filter(A), Filter(B)) used to be translated into a single DT, but now it becomes 3 DTs:
DT for Filter(A)
DT for Filter(B)
DT with the join.
Extra wrapping fixes cases where A or B are DTs themselves (see example in facebookincubator#557). The cases where A and B are BaseTables end up with a single DT like before thanks to pre-existing flattenDt logic. In these cases, we first produce 3 DTs, then flatten these into one.
When translating a join, left side is wrapped in a new DT if it contains any of Unnest, Aggregate, Limit, Filter, Sort. Same applies to the right side. For non-inner joins, right side is additionally wrapped in a new DT if it contains another Join.
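The wrapping rule for join inputs described above can be sketched roughly as follows. This is a toy model, not the Axiom implementation: `Kind`, `Node`, `makeNode`, `containsAny`, and `shouldWrapJoinInput` are hypothetical names, and the sketch assumes that "contains" means "appears anywhere in the input's subtree".

```cpp
#include <algorithm>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical logical plan node kinds.
enum class Kind { kScan, kFilter, kProject, kAggregate, kLimit, kSort, kUnnest, kJoin };

struct Node {
  Kind kind;
  std::vector<std::shared_ptr<Node>> inputs;
};

std::shared_ptr<Node> makeNode(Kind kind, std::vector<std::shared_ptr<Node>> inputs = {}) {
  return std::make_shared<Node>(Node{kind, std::move(inputs)});
}

// Returns true if 'node' or any node below it has one of the given kinds.
bool containsAny(const Node& node, const std::vector<Kind>& kinds) {
  if (std::find(kinds.begin(), kinds.end(), node.kind) != kinds.end()) {
    return true;
  }
  for (const auto& input : node.inputs) {
    if (containsAny(*input, kinds)) {
      return true;
    }
  }
  return false;
}

// Either side is wrapped if it contains Unnest, Aggregate, Limit, Filter or
// Sort. For non-inner joins, the right side is also wrapped if it contains
// another Join.
bool shouldWrapJoinInput(const Node& input, bool isRightSide, bool isInnerJoin) {
  static const std::vector<Kind> kAlwaysWrap = {
      Kind::kUnnest, Kind::kAggregate, Kind::kLimit, Kind::kFilter, Kind::kSort};
  if (containsAny(input, kAlwaysWrap)) {
    return true;
  }
  return isRightSide && !isInnerJoin && containsAny(input, {Kind::kJoin});
}
```

Under this rule, both inputs of Join(Filter(A), Filter(B)) are wrapped, which gives the three DTs listed in the description before any later flattening.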
There was an important special case in how joins are handled (when the left part of the join tree contained a non-deterministic filter).
This code was simplified to just create a separate DT for the filter and its input.
Part of facebookincubator#536 (dropping unnecessary order by is still TBD)
Fixes facebookincubator#557
fix: #536
fix: #557
Because of 3, all issues related to incorrectly handled joins are fixed.
CC: @pashandor789