
Asynchronous distributed planning#383

Open
gabotechs wants to merge 6 commits into main from gabrielmusat/async-distributed-planning

Conversation

Collaborator

@gabotechs gabotechs commented Mar 22, 2026

Closes #382

Allows running asynchronous code during distributed planning.

The rationale for this change is explained in #382.


Note

I don't think the tpch_sf1 improvements are real; that run just happened to hit less network noise.

=== Comparing tpch_sf1 results from engine 'datafusion-distributed-custom-worker-service' [prev] with 'datafusion-distributed-async-distributed-planning' [new] ===
      q1: prev= 237 ms, new= 236 ms, diff=1.00 faster ✔
      q2: prev= 336 ms, new= 288 ms, diff=1.17 faster ✔
      q3: prev= 514 ms, new= 446 ms, diff=1.15 faster ✔
      q4: prev= 214 ms, new= 145 ms, diff=1.48 faster ✅
      q5: prev= 298 ms, new= 228 ms, diff=1.31 faster ✅
      q6: prev= 237 ms, new= 168 ms, diff=1.41 faster ✅
      q7: prev= 345 ms, new= 323 ms, diff=1.07 faster ✔
      q8: prev= 360 ms, new= 343 ms, diff=1.05 faster ✔
      q9: prev= 379 ms, new= 311 ms, diff=1.22 faster ✅
     q10: prev= 258 ms, new= 220 ms, diff=1.17 faster ✔
     q11: prev= 350 ms, new= 272 ms, diff=1.29 faster ✅
     q12: prev= 201 ms, new= 199 ms, diff=1.01 faster ✔
     q13: prev= 224 ms, new= 163 ms, diff=1.37 faster ✅
     q14: prev= 234 ms, new= 191 ms, diff=1.23 faster ✅
     q15: prev= 304 ms, new= 201 ms, diff=1.51 faster ✅
     q16: prev= 331 ms, new= 280 ms, diff=1.18 faster ✔
     q17: prev= 374 ms, new= 362 ms, diff=1.03 faster ✔
     q18: prev= 313 ms, new= 264 ms, diff=1.19 faster ✔
     q19: prev= 455 ms, new= 394 ms, diff=1.15 faster ✔
     q20: prev= 300 ms, new= 334 ms, diff=1.11 slower ✖
     q21: prev= 607 ms, new= 580 ms, diff=1.05 faster ✔
     q22: prev= 228 ms, new= 229 ms, diff=1.00 slower ✖
   TOTAL: prev=21325.57 ms, new=18566.21 ms, diff=1.15 faster ✔


=== Comparing tpch_sf10 results from engine 'datafusion-distributed-custom-worker-service' [prev] with 'datafusion-distributed-async-distributed-planning' [new] ===
      q1: prev=1291 ms, new=1241 ms, diff=1.04 faster ✔
      q2: prev= 452 ms, new= 451 ms, diff=1.00 faster ✔
      q3: prev=1057 ms, new= 939 ms, diff=1.13 faster ✔
      q4: prev= 551 ms, new= 602 ms, diff=1.09 slower ✖
      q5: prev=1518 ms, new=1448 ms, diff=1.05 faster ✔
      q6: prev= 813 ms, new= 681 ms, diff=1.19 faster ✔
      q7: prev=1654 ms, new=1604 ms, diff=1.03 faster ✔
      q8: prev=1867 ms, new=1793 ms, diff=1.04 faster ✔
      q9: prev=2208 ms, new=2249 ms, diff=1.02 slower ✖
     q10: prev=1172 ms, new=1090 ms, diff=1.08 faster ✔
     q11: prev= 397 ms, new= 397 ms, diff=1.00 slower ✖
     q12: prev= 829 ms, new= 775 ms, diff=1.07 faster ✔
     q13: prev= 682 ms, new= 686 ms, diff=1.01 slower ✖
     q14: prev= 703 ms, new= 789 ms, diff=1.12 slower ✖
     q15: prev= 884 ms, new= 951 ms, diff=1.08 slower ✖
     q16: prev= 353 ms, new= 342 ms, diff=1.03 faster ✔
     q17: prev=1914 ms, new=2039 ms, diff=1.07 slower ✖
     q18: prev=1972 ms, new=1872 ms, diff=1.05 faster ✔
     q19: prev= 899 ms, new= 830 ms, diff=1.08 faster ✔
     q20: prev= 822 ms, new= 815 ms, diff=1.01 faster ✔
     q21: prev=2452 ms, new=2329 ms, diff=1.05 faster ✔
     q22: prev= 360 ms, new= 336 ms, diff=1.07 faster ✔
   TOTAL: prev=74580.65 ms, new=72810.24 ms, diff=1.02 faster ✔

Base automatically changed from gabrielmusat/custom-worker-service to main March 24, 2026 15:15
Collaborator Author

gabotechs commented Mar 24, 2026

🤔 There's another option that comes to mind:

rather than lazily calling distribute_plan().await at DistributedExec::execute() time, hook into the QueryPlanner trait and provide a custom distributed DataFusion query planner that applies something like an AsyncPhysicalOptimizerRule, an async equivalent of the physical optimizer rules...

On one hand, that looks cleaner for this immediate PR; on the other hand, it will make it difficult if we want to dynamically decide the task count of the stages above at runtime while the stages below execute...

I'm a bit on the fence about this; maybe it's worth going with the QueryPlanner approach anyway.
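The two options above can be contrasted with a minimal, self-contained sketch. Everything here is hypothetical and stands in for the real datafusion-distributed types: `distribute_plan` models the async network step, `LazyDistributedExec` models awaiting it inside `execute()`, and `plan_query` models doing it up front in a `QueryPlanner`-style hook. A tiny hand-rolled `block_on` replaces a real async runtime so the example needs only the standard library.

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A no-op Waker: enough to poll futures that are immediately ready.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Drives a future to completion on the current thread (a stand-in for a
/// real runtime; fine here because our futures never return Pending).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for the async distributed planning step
/// (e.g. assigning stage tasks to workers over the network).
async fn distribute_plan(plan: &str) -> String {
    format!("distributed({plan})")
}

/// Option A: the undistributed plan is stored, and the async distribution
/// step runs lazily the first time execute() is called.
struct LazyDistributedExec {
    plan: String,
}
impl LazyDistributedExec {
    fn execute(&self) -> String {
        block_on(distribute_plan(&self.plan))
    }
}

/// Option B: a QueryPlanner-style hook awaits the distribution step once
/// at planning time, so execute() stays purely synchronous.
struct EagerDistributedExec {
    distributed: String,
}
async fn plan_query(plan: &str) -> EagerDistributedExec {
    EagerDistributedExec {
        distributed: distribute_plan(plan).await,
    }
}
impl EagerDistributedExec {
    fn execute(&self) -> String {
        self.distributed.clone()
    }
}

fn main() {
    let lazy = LazyDistributedExec { plan: "scan".into() };
    let eager = block_on(plan_query("scan"));
    // Both paths produce the same distributed plan; they differ only in
    // *when* the async work runs.
    assert_eq!(lazy.execute(), "distributed(scan)");
    assert_eq!(eager.execute(), "distributed(scan)");
    println!("ok");
}
```

The tradeoff in the comment above falls out of the sketch: option B keeps `execute()` synchronous and fits DataFusion's planner hooks, while option A leaves room to decide things like per-stage task counts at execution time, after downstream stages have started.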

@gabotechs gabotechs force-pushed the gabrielmusat/async-distributed-planning branch from caaf185 to 3d82488 Compare March 24, 2026 15:35
@gabotechs
Collaborator Author

I tried the approach from #383 (comment) in 68f16d3, and I like it much better.

@gabotechs gabotechs force-pushed the gabrielmusat/async-distributed-planning branch from a5a34bc to 6401ae6 Compare April 14, 2026 08:41