I believe the upstream work I am taking on will help or modify how this is approached: apache/datafusion#19973
As in this should be provided as a ColumnStatistic by single-node df
yes, completely agree
```
ProjectionExec: task_count=Some(1) output_rows=184 cost_class=XS accumulated_cost=11592 output_bytes=4784
  SortPreservingMergeExec: task_count=Some(1) output_rows=184 cost_class=L accumulated_cost=8372 output_bytes=6440
    [NetworkBoundary] Coalesce: task_count=Some(1) output_rows=184
      SortExec: task_count=Some(4) output_rows=184 cost_class=XL accumulated_cost=18915 output_bytes=6440
```
haha, cool to see scale-up on compute 😄
Yes! The compute cost calculation is very rough right now; I want to improve it before moving this out of draft, and ideally all the stats used for determining how many bytes will flow through the graph would be provided by DF upstream.
```rust
/// Given a list of children, each with a different compute cost, and a restriction on the maximum
/// number of tasks they are allowed to run in, assigns task counts to them so that the following
/// conditions are met:
```
oh man, I see what you are saying. This is really tricky
```rust
// Adjust total subtasks to match budget (or get as close as possible)
let mut total_subtasks: usize = child_subtask_counts.iter().sum();

// Trim if over budget: reduce from children with the highest subtask count
```
Build a heap, then pop the top, decrement, and push it back? It would reduce time complexity, but I don't know if it's worth the legibility cost.
I do remember some unions being quite large though 🤔
🙈 This does look like a heap-based problem, although I bet the code can get pretty complicated if we go down that path.
I think unless a heap approach clearly demonstrates better benchmarks, we should probably keep this simple.
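For concreteness, a minimal sketch of the heap variant being discussed, for the trim side. The function name and the "keep at least one subtask per child" rule are assumptions, not taken from the PR:

```rust
use std::collections::BinaryHeap;

/// Hypothetical helper: trim `counts` until their sum fits `budget`,
/// always decrementing the child that currently has the most subtasks
/// (the heap top). Assumes each child keeps at least one subtask.
fn trim_to_budget(counts: &mut [usize], budget: usize) {
    // Max-heap of (subtask_count, child_index); ties broken by index.
    let mut heap: BinaryHeap<(usize, usize)> = counts
        .iter()
        .copied()
        .enumerate()
        .map(|(i, c)| (c, i))
        .collect();
    let mut total: usize = counts.iter().sum();
    while total > budget {
        let Some((count, i)) = heap.pop() else { break };
        if count <= 1 {
            break; // every child is already at 1; "as close as possible"
        }
        counts[i] = count - 1;
        total -= 1;
        heap.push((count - 1, i));
    }
}
```

Each pop/push is O(log n), so trimming k subtasks costs O(k log n) instead of the O(k·n) of rescanning for the maximum on every iteration.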
```rust
}

// Expand if under budget: add to children with the highest cost that can expand
while total_subtasks < task_count_budget {
```
Another heap could be used here, I believe: pop the top (highest cost), check if it can expand, increment it, and push it back on.
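Sketching that suggestion too, under the same caveat: the function name, the `max_tasks` per-child cap, and the exact greedy policy are assumptions for illustration, not the PR's code:

```rust
use std::collections::BinaryHeap;

/// Hypothetical helper: grow `counts` toward `budget`, always incrementing
/// the child with the highest compute cost that can still expand
/// (i.e. whose count is below its `max_tasks` cap).
fn expand_to_budget(counts: &mut [usize], costs: &[u64], max_tasks: &[usize], budget: usize) {
    // Max-heap keyed by cost; carries the child index.
    let mut heap: BinaryHeap<(u64, usize)> =
        costs.iter().copied().enumerate().map(|(i, c)| (c, i)).collect();
    let mut total: usize = counts.iter().sum();
    while total < budget {
        let Some((cost, i)) = heap.pop() else { break };
        if counts[i] < max_tasks[i] {
            counts[i] += 1;
            total += 1;
            heap.push((cost, i)); // may receive more budget later
        }
        // children already at their cap are simply dropped from the heap
    }
}
```

Note this greedy policy fills the most expensive child up to its cap before moving on; whether that is preferable to spreading budget more evenly is exactly the kind of thing benchmarks would have to settle.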
I added some tests that check the difference between the estimated number of rows and the actual number of rows here: Conclusion: even if the

nice, let's work to get some of that plumbing in place 😄

No longer working on this
Reworks the distributed planner so that it integrates with DataFusion's upstream statistics system instead of having users provide a custom `TaskEstimator`.
There are some key changes in this PR that rework how this project assigns tasks and stages to a plan:

Remove `TaskEstimator`

Users are no longer free to specify how many tasks should be used for a distributed query: task counts are now derived from the statistics each node reports through the `ExecutionPlan::partition_statistics()` method. Based on these statistics, Distributed DataFusion calculates how many tasks are appropriate.

Rely on upstream statistics system
This PR heavily consumes the `Statistics` provided by the different nodes in order to estimate how much data is going to flow through them. There are still some gaps in upstream statistics that are bridged in this project with some sane defaults.
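As a rough illustration of how a task count could be derived from such statistics, here is a sketch using simplified stand-ins for DataFusion's `Statistics`/`Precision` types (the real ones live in `datafusion::common`); the helper name, the `bytes_per_task` threshold, and the fallback-to-one-task default are all invented:

```rust
/// Simplified stand-in for DataFusion's `Precision<usize>`.
#[derive(Clone, Copy)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Simplified stand-in for DataFusion's `Statistics`.
struct Statistics {
    total_byte_size: Precision,
}

/// Sketch: derive a task count from the estimated bytes flowing through
/// a node, falling back to a conservative default when no estimate exists.
fn task_count_for(stats: &Statistics, bytes_per_task: usize, max_tasks: usize) -> usize {
    let bytes = match stats.total_byte_size {
        Precision::Exact(b) | Precision::Inexact(b) => b,
        Precision::Absent => return 1, // no estimate: stay conservative
    };
    (bytes / bytes_per_task).clamp(1, max_tasks)
}
```

The real planner would feed this from `ExecutionPlan::partition_statistics()` per node and combine it with the cost model described below.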
Compute cost assignment
One of the biggest additions in this PR is the compute complexity estimation for the different nodes. Each node has a cost attached that is estimated based on how compute-heavy it is.
The cost is measured in "bytes processed", which estimates how many bytes are expected to be processed by the node given the node itself, plus the estimated rows and bytes that are going to flow through it as reported by upstream's `ExecutionPlan::partition_statistics`. The computational complexity is taken into account for the different operators with the following enum:
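The enum itself is not captured in this excerpt. What follows is a plausible reconstruction, inferred only from the `cost_class=XS/L/XL` values visible in the example plan above; the exact variant set and the multipliers are guesses:

```rust
/// Hypothetical reconstruction of the cost-class enum; variant names are
/// inferred from `cost_class=XS/L/XL` in the example plan, and the
/// multipliers below are invented for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CostClass {
    XS, // near-free passthrough, e.g. ProjectionExec
    S,
    M,
    L,  // e.g. SortPreservingMergeExec
    XL, // compute-heavy, e.g. SortExec
}

impl CostClass {
    /// Invented multiplier applied to the bytes flowing through a node
    /// to produce its "bytes processed" cost.
    fn multiplier(self) -> u64 {
        match self {
            CostClass::XS => 1,
            CostClass::S => 2,
            CostClass::M => 4,
            CostClass::L => 8,
            CostClass::XL => 16,
        }
    }
}
```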
Results
TODO: choose better defaults in order to improve performance