Conversation

@MBkkt
Collaborator

@MBkkt MBkkt commented Oct 22, 2025

fix: #536
fix: #557

  1. Order by is dropped when it's not needed (before union/intersect/join/aggregation/order by/etc.). Instead of reassigning order by as the old code did, we now just skip the logical plan's order by node when it's not needed.
  2. Simplify how non-deterministic filters are handled.
  3. Use logical plan NodeKind instead of PlanNode (the graph type was used only because the old version of the axiom code had no logical plan at all). Also, allowedInDt is now never equal to zero, because allowedInDt is handled in a common place.

Item 3 fixes all the issues related to incorrectly handled joins.

CC: @pashandor789

@MBkkt MBkkt requested a review from mbasmanova October 22, 2025 16:32
@meta-cla meta-cla bot added the CLA Signed label Oct 22, 2025
@MBkkt MBkkt force-pushed the mbkkt/fix-dt branch 3 times, most recently from 99fbab3 to b035d1c Compare October 22, 2025 17:57
@meta-codesync

meta-codesync bot commented Oct 23, 2025

@mbasmanova has imported this pull request. If you are a Meta employee, you can view this in D85387151.

@mbasmanova
Contributor

I'm not able to capture the gist of the new design. Would it be possible to describe how the new logic works to transform a tree of logical plan nodes into a tree of derived tables?

@MBkkt
Collaborator Author

MBkkt commented Oct 23, 2025

@mbasmanova

We have a logical plan tree.
We transform it into a derived table graph.
We have two ways to control this transformation:

  1. After an input has been transformed, we can check some properties (has aggregation, limit, order by), finish the input's derived table, and create a new one that reads from the input's derived table and applies the current node's operation to it.
  2. But in some cases we need to keep the current dt for the current operator (join, and potentially union/etc.). In such cases, instead of using the current dt for the input nodes, we can create a new dt and put this new dt (with the input nodes) into the old one (with the current operator).

It already worked like this.

What did I change?

  • All logical nodes (limit/order by/aggregation/etc.) can now be accounted for in the second method (before my changes limit wasn't accounted for, which is why the bug happened).
  • In the second case, finalizeDt is now called in the caller rather than in the input, so pushdown works better in such cases and we can do things like remove order by from the children of join/union/etc. and before aggregation/write/etc.
  • I flattened union, because I was changing the interface of these methods anyway, which forced me to read, understand, and change this code.
  • Thanks to this better handling of the second case, I was able to simplify some code, such as the non-deterministic filter handling.

@mbasmanova
Contributor

But in some cases we need to keep the current dt for the current operator (join, and potentially union/etc.). In such cases, instead of using the current dt for the input nodes, we can create a new dt and put this new dt (with the input nodes) into the old one (with the current operator).

I'm not following. Would you clarify some more?

Are we traversing the logical plan tree bottom up and fill up a DT? When DT is "full", we start a new one? How do we handle nodes with multiple inputs? Do we create separate DTs for each input, then combine these if possible?

@MBkkt
Collaborator Author

MBkkt commented Oct 23, 2025

@mbasmanova I think the changes can be summarized like this:

  1. contains + allowInDt now work for all logical nodes.
  2. finalizeDt is called by the caller, not inside wrapInDt. An example where this is needed:
    select * from (select * from t group by order by) join t2 ...
    If finalizeDt were called in wrapInDt, it would happen before the order by/other nodes that come before the disallowed node types in the logical plan, so the order by would be attached to the wrong dt (in this case it's not super critical, because we want to drop this order by anyway, but I think the idea is clear). We want the caller that defined allowInDt to call finalizeDt.
  3. Because the code needed some rewriting for this new model, I improved a few things along the way: removed unnecessary order by, flattened union, simplified non-deterministic filter handling.

@MBkkt
Collaborator Author

MBkkt commented Oct 23, 2025

@mbasmanova

I'm not following. Would you clarify some more?

For a join, we want to put the join edge/filter/etc. inside the current dt; the left/right sides can be separate tables with filters, or separate dts.
This differs from aggregation, where we put the aggregation not in the current dt but in a new one if the input already had an aggregation.

Are we traversing the logical plan tree bottom up and fill up a DT?

I didn't change this. I kept it as it was: we traverse from the root to the children, in DFS order.

When DT is "full", we start a new one?

Yes, this is also true for aggregation, limit, order by, etc., i.e. for operators with a single input.

But for operators with multiple inputs (in practice only joins, because for union/etc. we just create a separate dt for the entire operation; that's not desirable for joins because we want to enumerate them in a single dt) we want the current dt to be used for the join, and new dts to be created for the children only if necessary. Alternatively, we could put every join in a separate dt together with its left subtree, but that would be bad for join enumeration.

How do we handle nodes with multiple inputs?
Do we create separate DTs for each input, then combine these if possible?

Partially true: we create a separate dt for each input, but only if it's necessary to put that input into a separate DT, so we don't need to merge them later (because we never create separate DTs when it's unnecessary).

Contributor

@mbasmanova mbasmanova left a comment

PR description mentions a number of improvements. Would it be possible to add tests for these?

@MBkkt
Collaborator Author

MBkkt commented Oct 24, 2025

@mbasmanova I added tests for the main changes.

As for flattening union, I think the existing tests cover this (they were adjusted, which is why I changed PlanMatcher a little).

As for dropped order by, it's covered by my tests, e.g. (... limit order by) join ...

If you have specific tests in mind, please share them and I will try to add them.

@mbasmanova
Contributor

Still trying to wrap my head around the new design.

If we consider only linear trees (no branches / joins) then it makes sense to fill up a DT while traversing the plan bottom up. Before adding a node, check if it "fits" in current DT and if not, finalize current DT and start a new one.

This doesn't quite work for joins. Let's consider an example:

- 4: join
   - 3: agg
      - 2: scan
   - 1: scan

We start populating a DT with scan 1, then we add scan 2, then add agg 3, then add join 4. They all end up in the same DT, which is incorrect. The agg 3 must be in a separate DT. Is this the bug that triggered this PR? Just want to double check my understanding of the issue with existing code.

Now, it looks like Join introduces a restriction of what can be added to the current DT. This restriction is different from rules that decide whether an operation fits into a DT or not (bottom up). The join says that only tables and filters are allowed. Anything else must be wrapped in a separate DT. I believe the existing allow-in-dt mechanism is supposed to enforce that. Is this not working / buggy?

To summarize, currently there are 2 mechanisms that are supposed to work together to cover all cases. On the way down the tree, join node updates allow-in-dt. If a node is not allowed, then it starts a new DT (wrapInDt logic). On the way up, each node checks whether it fits in a DT and, if not, starts a new DT (finalizeDt logic). The difference is wrapInDt creates an empty DT and fills it up using child nodes. FinalizeDT creates a new DT and places current DT into it as a 'table'.
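
For illustration only, the difference between the two mechanisms can be sketched with a toy model. This is a minimal sketch, not the axiom code: `ToyDt`, `finalizeToy`, and `wrapToy` are hypothetical names, and a DT is reduced to a list of operation labels plus nested DTs.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy derived table: the operations it holds directly, plus nested DTs that
// it reads from as if they were tables. Hypothetical, not the real class.
struct ToyDt {
  std::vector<std::string> ops;
  std::vector<std::unique_ptr<ToyDt>> tables;
};

// finalizeDt style (on the way up): the current DT is complete, so a new DT
// is created and the current one becomes a table inside it.
inline std::unique_ptr<ToyDt> finalizeToy(std::unique_ptr<ToyDt> current) {
  std::unique_ptr<ToyDt> outer(new ToyDt());
  outer->tables.push_back(std::move(current));
  return outer;
}

// wrapInDt style (on the way down): an empty DT is created, filled from the
// child nodes, and placed into the DT of the operator that disallowed them.
inline void wrapToy(ToyDt& outer, const std::vector<std::string>& childOps) {
  std::unique_ptr<ToyDt> inner(new ToyDt());
  inner->ops = childOps;
  outer.tables.push_back(std::move(inner));
}

inline void twoMechanismsDemo() {
  // Join example from above: agg 3 over scan 2 goes into its own DT, while
  // scan 1 and join 4 stay in the join's DT.
  ToyDt joinDt;
  wrapToy(joinDt, {"scan 2", "agg 3"});
  joinDt.ops.push_back("scan 1");
  joinDt.ops.push_back("join 4");
  assert(joinDt.tables.size() == 1);
  assert(joinDt.tables[0]->ops.size() == 2);
  assert(joinDt.ops.size() == 2);

  // Linear example: a second aggregation does not fit, so the current DT is
  // finalized and the new aggregation goes into the outer DT.
  std::unique_ptr<ToyDt> dt(new ToyDt());
  dt->ops = {"scan", "agg"};
  dt = finalizeToy(std::move(dt));
  dt->ops.push_back("agg");
  assert(dt->tables.size() == 1);
  assert(dt->ops.size() == 1);
}
```

In both cases the result is a DT nested inside another DT; what differs is which side triggers the split, the parent operator (wrap) or the node that no longer fits (finalize).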

@MBkkt
Collaborator Author

MBkkt commented Oct 24, 2025

@mbasmanova

Now, it looks like Join introduces a restriction of what can be added to the current DT. This restriction is different from rules that decide whether an operation fits into a DT or not (bottom up). The join says that only tables and filters are allowed. Anything else must be wrapped in a separate DT. I believe the existing allow-in-dt mechanism is supposed to enforce that. Is this not working / buggy?

Yes, as I already mentioned, the current mechanism can work for this if we add the missing contains checks. The only issue is that it won't push down filter/etc. to such an aggregation.

Consider a query like this:
select * from
(select * from ... group by ... having ...)
join ...

With the current approach, having will be in the join dt, but with my approach it will be in the left subtree's dt

@mbasmanova
Contributor

With the current approach, having will be in the join dt, but with my approach it will be in the left subtree's dt

Would you help me understand why this is important? I believe there is logic in the optimizer to import the filters into DTs. Hence, it is Ok, no?

@MBkkt
Collaborator Author

MBkkt commented Oct 24, 2025

@mbasmanova

  1. The logic is now correct not only for this case but for others too.
  2. I think some optimizations work only for having and don't work for filter (in that case we can always check: has aggregation => produce a new dt). Example:
    // Push HAVING clause that uses only grouping keys below the aggregation.
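
As a side illustration of why that particular pushdown is safe, here is a minimal standalone sketch (toy types, hypothetical names, not the optimizer code) showing that a HAVING predicate that looks only at the grouping key gives the same groups whether it runs after the aggregation or as a filter below it.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

using Row = std::pair<int, int>;   // (grouping key, value)
using Groups = std::map<int, int>; // grouping key -> sum(value)

// select key, sum(value) from rows group by key
inline Groups sumByKey(const std::vector<Row>& rows) {
  Groups out;
  for (std::size_t i = 0; i < rows.size(); ++i) {
    out[rows[i].first] += rows[i].second;
  }
  return out;
}

// ... having key > minKey, evaluated after the aggregation.
inline Groups havingAfter(const std::vector<Row>& rows, int minKey) {
  const Groups all = sumByKey(rows);
  Groups out;
  for (Groups::const_iterator it = all.begin(); it != all.end(); ++it) {
    if (it->first > minKey) {
      out[it->first] = it->second;
    }
  }
  return out;
}

// The same predicate pushed below the aggregation as a plain filter.
inline Groups filterBefore(const std::vector<Row>& rows, int minKey) {
  std::vector<Row> kept;
  for (std::size_t i = 0; i < rows.size(); ++i) {
    if (rows[i].first > minKey) {
      kept.push_back(rows[i]);
    }
  }
  return sumByKey(kept);
}

inline void havingDemo() {
  const std::vector<Row> rows = {{1, 10}, {2, 20}, {2, 5}, {3, 7}};
  assert(havingAfter(rows, 1) == filterBefore(rows, 1));
  assert(filterBefore(rows, 1).at(2) == 25);
}
```

A predicate on the aggregate itself (for example on sum(value)) cannot be moved this way, because its input does not exist until the aggregation has run.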

@MBkkt
Collaborator Author

MBkkt commented Oct 24, 2025

@mbasmanova I can say it this way:

We have two approaches to producing DTs:

  1. finalize when it's necessary (works for linear operators)
  2. create a new dt for the input (needed for joins)

In the first approach we cannot finalize later.
In the second approach, with my code we also cannot do this later, but with the current main code we can.

So in short, I tried to make the behavior of the two approaches consistent, which gives some benefits:

  1. correct having
  2. removal of unnecessary order by
  3. maybe some others; I'm not sure, I need to think more about what cases we have.

@MBkkt
Collaborator Author

MBkkt commented Oct 24, 2025

@mbasmanova I rebased the PR to fix a conflict with the refactoring

/// Number of fully processed leading elements of 'conjuncts'. Set by
/// distributeConjuncts(). Used to avoid reprocessing conjuncts in repeated
/// calls to distributeConjuncts().
int32_t numCanonicalConjuncts{0};
Contributor

It looks like this variable is not used. Would you submit a separate PR to remove it?

Collaborator Author

I made PR: #585


  PlanMatcherBuilder& localPartition(
-     const std::shared_ptr<PlanMatcher>& matcher);
+     std::initializer_list<std::shared_ptr<PlanMatcher>> matcher);
Contributor

LocalExchange with a single input is a common case. Would it be possible to keep the existing method and add a new one that takes std::initializer_list?

Collaborator Author

done

Contributor

@mbasmanova mbasmanova left a comment

@MBkkt I'll try to understand this PR again. Is PR description up to date? It doesn't seem so. If there is anything you could clarify to help understanding, that would be much appreciated. Otherwise, I'll need to find bandwidth to run and debug the code to try to figure out how it works.

Contributor

@mbasmanova mbasmanova left a comment

If there is any way to break up this PR into smaller pieces, that would help a lot. Thanks.

return;
}
if (hasLimit() || dt->hasLimit()) {
// TODO: Can we combine limits?
Contributor

I think so.

      // Multiple limits are allowed. If already present, then it is combined
      // with the new limit.
      makeQueryGraph(*node.onlyInput(), allowedInDt);
      addLimit(*node.as<lp::LimitNode>());
...
void ToGraph::addLimit(const lp::LimitNode& limit) {
  if (currentDt_->hasLimit()) {
    currentDt_->offset += limit.offset();

    if (currentDt_->limit <= limit.offset()) {
      currentDt_->limit = 0;
    } else {
      currentDt_->limit =
          std::min(limit.count(), currentDt_->limit - limit.offset());
    }
  } else {
    currentDt_->limit = limit.count();
    currentDt_->offset = limit.offset();
  }
}
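
A standalone version of the same arithmetic, with a worked example, may make the combination rule easier to check. This is a sketch under assumptions: `LimitState` and `combineLimit` are hypothetical stand-ins for the DT fields and for `addLimit`, and "no limit yet" is modelled with a flag.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the limit/offset fields of a derived table.
struct LimitState {
  bool hasLimit = false;
  int64_t limit = 0;
  int64_t offset = 0;
};

// Applies an outer OFFSET/LIMIT on top of whatever 's' already holds,
// following the same arithmetic as the addLimit() snippet above.
inline void combineLimit(LimitState& s, int64_t offset, int64_t count) {
  if (!s.hasLimit) {
    s.hasLimit = true;
    s.limit = count;
    s.offset = offset;
    return;
  }
  s.offset += offset;
  if (s.limit <= offset) {
    // The outer offset skips everything the inner limit lets through.
    s.limit = 0;
  } else {
    s.limit = std::min(count, s.limit - offset);
  }
}

inline void combineLimitDemo() {
  // Inner: OFFSET 10 LIMIT 100 keeps rows 10..109.
  // Outer: OFFSET 5 LIMIT 50 on top of that keeps rows 15..64.
  LimitState a;
  combineLimit(a, 10, 100);
  combineLimit(a, 5, 50);
  assert(a.offset == 15);
  assert(a.limit == 50);

  // Inner LIMIT 3 followed by outer OFFSET 5: nothing is left.
  LimitState b;
  combineLimit(b, 0, 3);
  combineLimit(b, 5, 10);
  assert(b.limit == 0);
}
```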

Collaborator Author

@MBkkt MBkkt Nov 1, 2025

Yes, this is why I wrote the TODO.
But we need to account for the fact that one of them may have an aggregation; in that case combining the limits is invalid, because it won't work this way.
The same goes for cardinality estimation.
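
To make the aggregation concern concrete, here is a small toy illustration (plain vectors instead of plan nodes; all names are hypothetical). Two limits separated by a global count cannot be merged into one, because the aggregation changes what the outer limit is counting.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Rows = std::vector<int>;

// OFFSET/LIMIT over an in-memory row set.
inline Rows limitRows(const Rows& in, std::size_t offset, std::size_t count) {
  Rows out;
  for (std::size_t i = offset; i < in.size() && out.size() < count; ++i) {
    out.push_back(in[i]);
  }
  return out;
}

// Global count(*): always produces exactly one row.
inline Rows countAll(const Rows& in) {
  return Rows{static_cast<int>(in.size())};
}

inline void limitOverAggregationDemo() {
  const Rows input(10, 0); // ten input rows

  // limit 3 (count(*) (limit 5 (input))): counts 5 rows, returns one row.
  const Rows stacked = limitRows(countAll(limitRows(input, 0, 5)), 0, 3);

  // Merging the two limits into min(5, 3) below the aggregation is wrong:
  // the count now sees only 3 rows.
  const Rows merged = countAll(limitRows(input, 0, 3));

  assert(stacked.size() == 1 && stacked[0] == 5);
  assert(merged.size() == 1 && merged[0] == 3);
  assert(stacked != merged);
}
```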

Collaborator Author

I removed these changes from this PR; they're not necessary here.
Let's continue this discussion in a separate PR: #579

tables = dt->tables;
tableSet = dt->tableSet;

// TODO: Is setop possible here at all?
Contributor

What is flattenDt for? What does it do? Which code paths does it participate in? How are you thinking about this logic?

Collaborator Author

flattenDt is called when we optimize a DT and this DT has a single DT in its tables.

This is possible in two cases:

  1. ToGraph produced such a DT
  2. the optimizer imported all joins into a single dt

Right now we don't have enough tests for this code + setop, so I'm not sure whether it will be called or not

Collaborator Author

I removed these changes from this PR; they're not necessary here.
Let's continue this discussion in a separate PR: #579

auto exprSet = evaluator_.compile(typedExpr);
const auto& first = *exprSet->exprs().front();
if (first.specialFormKind() != velox::exec::SpecialFormKind::kConstant) {
if (!first.isConstant()) {
Contributor

Extracting trivial refactors like this into a separate PR would help reduce cognitive load on readers.

Collaborator Author

I made PR: #584

uint64_t allowedInDt) {
auto* outerDt = makeQueryGraph(node, allowedInDt);
if (currentDt_->hasOrderBy() && !currentDt_->hasLimit()) {
currentDt_->orderKeys.clear();
Contributor

Is there a way to avoid adding OrderBy in the first place? Removing it works, but it is a bit awkward and we also keep the expressions that may have been added specifically for OrderBy.

Collaborator Author

I'm not sure how. The first idea is to handle allowInDt for order by differently,
but I don't think that always works.
My suggestion is to think about this separately

Collaborator Author

I did this as you suggested: now we never reassign orderKeys and never clear them.
We just skip the Sort node if it's not necessary

@MBkkt MBkkt force-pushed the mbkkt/fix-dt branch 7 times, most recently from 0dd008b to c381de3 Compare November 1, 2025 05:33
@MBkkt
Collaborator Author

MBkkt commented Nov 1, 2025

@mbasmanova I moved the refactoring/other fixes into separate PRs.

This PR is now clean and only does things related to how we produce derived tables.
In theory it could be split further, but that would require writing new code, not just splitting, so I'd prefer to avoid it.

Comment on lines 1976 to +2034
void ToGraph::makeQueryGraph(
    const lp::LogicalPlanNode& node,
    uint64_t allowedInDt) {
  if (!contains(allowedInDt, node.kind())) {
    if (node.kind() == lp::NodeKind::kSort) {
      // Sort not allowed doesn't mean we need to wrap it in DT,
      // instead we should skip it.
      makeQueryGraph(*node.onlyInput(), allowedInDt);
    } else {
      wrapInDt(node, /*unordered=*/false);
    }
    return;
  }

Collaborator Author

These are the core changes @mbasmanova
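
For readers who want to trace the dispatch by hand, a toy model of it can be sketched as follows. Everything here is an assumption made for illustration: `Kind`, `ToyNode`, and `visit` are hypothetical, nodes have at most one input, and a freshly wrapped DT is assumed to allow every node kind again. Only the shape of the `contains` check and the "skip Sort, wrap everything else" branch follows the snippet above.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical node kinds; the real code uses lp::NodeKind.
enum class Kind : uint32_t { kScan, kFilter, kAggregate, kLimit, kSort };

constexpr uint64_t bit(Kind k) { return 1ULL << static_cast<uint32_t>(k); }
constexpr bool contains(uint64_t set, Kind k) { return (set & bit(k)) != 0; }
constexpr uint64_t kAllKinds = ~0ULL;

struct ToyNode {
  Kind kind;
  std::shared_ptr<ToyNode> input; // null for a scan
};

inline const char* name(Kind k) {
  switch (k) {
    case Kind::kScan: return "scan";
    case Kind::kFilter: return "filter";
    case Kind::kAggregate: return "aggregate";
    case Kind::kLimit: return "limit";
    case Kind::kSort: return "sort";
  }
  return "?";
}

// Records what happens to each node for a given allowed-kinds bitmask.
inline void visit(const ToyNode& node, uint64_t allowed,
                  std::vector<std::string>& trace) {
  if (!contains(allowed, node.kind)) {
    if (node.kind == Kind::kSort) {
      // A disallowed sort is skipped, not wrapped.
      trace.push_back("skip sort");
      visit(*node.input, allowed, trace);
    } else {
      // Any other disallowed node starts a new DT (toy assumption: the new
      // DT allows every kind).
      trace.push_back("new dt");
      visit(node, kAllKinds, trace);
    }
    return;
  }
  if (node.input) {
    visit(*node.input, allowed, trace);
  }
  trace.push_back(name(node.kind));
}

inline std::vector<std::string> sortOverAggregateTrace() {
  auto scan = std::make_shared<ToyNode>(ToyNode{Kind::kScan, nullptr});
  auto agg = std::make_shared<ToyNode>(ToyNode{Kind::kAggregate, scan});
  const ToyNode sort{Kind::kSort, agg};
  std::vector<std::string> trace;
  // Only scans and filters are allowed, as under a join input.
  visit(sort, bit(Kind::kScan) | bit(Kind::kFilter), trace);
  return trace;
}
```

With this mask, the sort on top is dropped and the aggregation ends up in its own DT, which matches the intent described in the PR.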

@MBkkt MBkkt mentioned this pull request Nov 3, 2025
@MBkkt MBkkt force-pushed the mbkkt/fix-dt branch 4 times, most recently from c668f87 to d9bc473 Compare November 6, 2025 12:46
@MBkkt
Collaborator Author

MBkkt commented Nov 6, 2025

@mbasmanova I separated this PR into two commits:

  1. fix for joins/filters
  2. fix for order by

For the first commit I created a separate PR that can be merged on its own: #599

I will rebase this PR as soon as #599 is merged

MBkkt added 3 commits November 8, 2025 12:09
Fix translation of Joins to DTs. Join(Filter(A), Filter(B)) used to be translated into a single DT, but now it becomes 3 DTs:
    DT for Filter(A)
    DT for Filter(B)
    DT with the join.

Extra wrapping fixes cases where A or B are DTs themselves (see example in facebookincubator#557). The cases where A and B are BaseTables end up with a single DT like before thanks to pre-existing flattenDt logic. In these cases, we first produce 3 DTs, then flatten these into one.

When translating a join, left side is wrapped in a new DT if it contains any of Unnest, Aggregate, Limit, Filter, Sort. Same applies to the right side. For non-inner joins, right side is additionally wrapped in a new DT if it contains another Join.
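
The wrapping rule in the paragraph above can be written down as a small predicate. This is a minimal sketch, assuming the node kinds found on each side of the join have already been collected into a bitmask; `Kind`, `wrapLeftSide`, and `wrapRightSide` are hypothetical names, not functions from the code base.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical node kinds; the real code uses lp::NodeKind.
enum class Kind : uint32_t {
  kScan, kProject, kFilter, kAggregate, kLimit, kSort, kUnnest, kJoin
};

constexpr uint64_t bit(Kind k) { return 1ULL << static_cast<uint32_t>(k); }

// Kinds that force a join input into its own DT, per the commit message.
constexpr uint64_t kForcesWrap = bit(Kind::kUnnest) | bit(Kind::kAggregate) |
    bit(Kind::kLimit) | bit(Kind::kFilter) | bit(Kind::kSort);

// kindsOnSide: bitmask of node kinds present on that side of the join.
inline bool wrapLeftSide(uint64_t kindsOnSide) {
  return (kindsOnSide & kForcesWrap) != 0;
}

inline bool wrapRightSide(uint64_t kindsOnSide, bool isInnerJoin) {
  // For non-inner joins, a nested join on the right also forces a wrap.
  const uint64_t forces =
      isInnerJoin ? kForcesWrap : (kForcesWrap | bit(Kind::kJoin));
  return (kindsOnSide & forces) != 0;
}

inline void joinWrapDemo() {
  const uint64_t filterOverScan = bit(Kind::kFilter) | bit(Kind::kScan);
  const uint64_t joinOfScans = bit(Kind::kJoin) | bit(Kind::kScan);

  // Join(Filter(A), Filter(B)): both sides are wrapped, giving 3 DTs.
  assert(wrapLeftSide(filterOverScan));
  assert(wrapRightSide(filterOverScan, /*isInnerJoin=*/true));

  // A nested join on the right is wrapped only for non-inner joins.
  assert(!wrapRightSide(joinOfScans, /*isInnerJoin=*/true));
  assert(wrapRightSide(joinOfScans, /*isInnerJoin=*/false));
}
```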

There was an important special case in how joins are handled (when the left part of the join tree contained a non-deterministic filter).
This code was simplified to just a separate DT for the filter input + filter.

Part of facebookincubator#536 (dropping unnecessary order by is still TBD)
Fixes facebookincubator#557