@@ -793,6 +793,9 @@ constexpr uint64_t deny(uint64_t mask, T... op) {
793793 return (mask & ... & ~allow (op));
794794}
795795
796+ constexpr uint64_t kUnorderedAllowedInDt =
797+ deny (kAllAllowedInDt , lp::NodeKind::kSort );
798+
796799} // namespace
797800
798801std::optional<ExprCP> ToGraph::translateSubfieldFunction (
@@ -1156,6 +1159,9 @@ AggregationPlanCP ToGraph::translateAggregation(const lp::AggregateNode& agg) {
11561159}
11571160
11581161void ToGraph::addOrderBy (const lp::SortNode& order) {
1162+ VELOX_DCHECK (currentDt_->orderKeys .empty ());
1163+ VELOX_DCHECK (currentDt_->orderTypes .empty ());
1164+
11591165 auto [deduppedOrderKeys, deduppedOrderTypes] =
11601166 dedupOrdering (order.ordering ());
11611167
@@ -1308,9 +1314,9 @@ DerivedTableP ToGraph::newDt() {
13081314 return dt;
13091315}
13101316
1311- void ToGraph::wrapInDt (const lp::LogicalPlanNode& node) {
1317+ void ToGraph::wrapInDt (const lp::LogicalPlanNode& node, bool unordered ) {
13121318 auto * outerDt = std::exchange (currentDt_, newDt ());
1313- makeQueryGraph (node, kAllAllowedInDt );
1319+ makeQueryGraph (node, unordered ? kUnorderedAllowedInDt : kAllAllowedInDt );
13141320 finalizeDt (node, outerDt);
13151321}
13161322
@@ -1564,7 +1570,7 @@ DerivedTableP ToGraph::translateSubquery(
15641570 VELOX_CHECK (correlatedConjuncts_.empty ());
15651571
15661572 auto * outerDt = std::exchange (currentDt_, newDt ());
1567- makeQueryGraph (node, kAllAllowedInDt );
1573+ makeQueryGraph (node, kUnorderedAllowedInDt );
15681574 auto * subqueryDt = currentDt_;
15691575 finalizeSubqueryDt (node, outerDt);
15701576
@@ -1851,7 +1857,7 @@ bool hasNondeterministic(const lp::ExprPtr& expr) {
18511857void ToGraph::translateSetJoin (const lp::SetNode& set) {
18521858 auto * setDt = currentDt_;
18531859 for (auto & input : set.inputs ()) {
1854- wrapInDt (*input);
1860+ wrapInDt (*input, /* unordered= */ true );
18551861 }
18561862
18571863 const bool exists = set.operation () == lp::SetOperation::kIntersect ;
@@ -1960,7 +1966,7 @@ void ToGraph::translateUnion(const lp::SetNode& set) {
19601966 auto translateUnionInput = [&](const lp::LogicalPlanNode& input) {
19611967 renames_ = renames;
19621968 currentDt_ = newDt ();
1963- makeQueryGraph (input, kAllAllowedInDt );
1969+ makeQueryGraph (input, kUnorderedAllowedInDt );
19641970 auto * newDt = std::exchange (currentDt_, setDt);
19651971
19661972 const auto & type = input.outputType ();
@@ -2016,7 +2022,13 @@ void ToGraph::makeQueryGraph(
20162022 const lp::LogicalPlanNode& node,
20172023 uint64_t allowedInDt) {
20182024 if (!contains (allowedInDt, node.kind ())) {
2019- wrapInDt (node);
2025+ if (node.kind () == lp::NodeKind::kSort ) {
2026+ // Sort not allowed doesn't mean we need to wrap it in DT,
2027+ // instead we should skip it.
2028+ makeQueryGraph (*node.onlyInput (), allowedInDt);
2029+ } else {
2030+ wrapInDt (node, /* unordered=*/ false );
2031+ }
20202032 return ;
20212033 }
20222034
@@ -2034,7 +2046,10 @@ void ToGraph::makeQueryGraph(
20342046 const auto & filter = *node.as <lp::FilterNode>();
20352047 if (hasNondeterministic (filter.predicate ())) {
20362048 auto * outerDt = std::exchange (currentDt_, newDt ());
2037- makeQueryGraph (input, kAllAllowedInDt );
2049+ allowedInDt = contains (allowedInDt, lp::NodeKind::kSort )
2050+ ? kAllAllowedInDt
2051+ : kUnorderedAllowedInDt ;
2052+ makeQueryGraph (input, allowedInDt);
20382053 addFilter (filter);
20392054 finalizeDt (node, outerDt);
20402055 break ;
@@ -2048,13 +2063,12 @@ void ToGraph::makeQueryGraph(
20482063 } break ;
20492064 case lp::NodeKind::kAggregate : {
20502065 const auto & input = *node.onlyInput ();
2051- makeQueryGraph (input, allowedInDt);
2066+ makeQueryGraph (input, deny ( allowedInDt, lp::NodeKind:: kSort ) );
20522067 if (currentDt_->hasAggregation () || currentDt_->hasLimit ()) {
20532068 finalizeDt (input);
2054- } else if (currentDt_->hasOrderBy ()) {
2055- currentDt_->orderKeys .clear ();
2056- currentDt_->orderTypes .clear ();
20572069 }
2070+ VELOX_DCHECK (currentDt_->orderKeys .empty ());
2071+ VELOX_DCHECK (currentDt_->orderTypes .empty ());
20582072
20592073 auto * agg = translateAggregation (*node.as <lp::AggregateNode>());
20602074
@@ -2134,13 +2148,14 @@ void ToGraph::makeQueryGraph(
21342148 } break ;
21352149 case lp::NodeKind::kSort : {
21362150 const auto & input = *node.onlyInput ();
2137- makeQueryGraph (input, allowedInDt);
2151+ makeQueryGraph (input, deny ( allowedInDt, lp::NodeKind:: kSort ) );
21382152 if (currentDt_->hasLimit ()) {
21392153 finalizeDt (input);
21402154 }
21412155 addOrderBy (*node.as <lp::SortNode>());
21422156 } break ;
21432157 case lp::NodeKind::kLimit : {
2158+ allowedInDt |= allow (lp::NodeKind::kSort );
21442159 makeQueryGraph (*node.onlyInput (), allowedInDt);
21452160 addLimit (*node.as <lp::LimitNode>());
21462161 } break ;
@@ -2162,7 +2177,7 @@ void ToGraph::makeQueryGraph(
21622177 } break ;
21632178 case lp::NodeKind::kTableWrite : {
21642179 VELOX_DCHECK_EQ (allowedInDt, kAllAllowedInDt );
2165- wrapInDt (*node.onlyInput ());
2180+ wrapInDt (*node.onlyInput (), /* unordered= */ true );
21662181 addWrite (*node.as <lp::TableWriteNode>());
21672182 } break ;
21682183 default :
0 commit comments