feat: move shuffle writer disk I/O off tokio worker threads #1537

Open

hcrosse wants to merge 9 commits into apache:main from hcrosse:feat/async-io-shuffle-writer

Conversation

@hcrosse commented Apr 2, 2026

Which issue does this PR close?

Closes #1387.

Rationale

The shuffle writer performs synchronous std::fs I/O inside async task contexts. Under concurrent shuffle workloads, this blocks tokio worker threads and degrades runtime responsiveness for co-located async tasks.

What changes are included in this PR?

Both the hash repartition path in shuffle_writer.rs and the no-repartition path in utils::write_stream_to_disk now use a bounded mpsc::channel(2) to bridge the async record-batch stream to a spawn_blocking task that owns all file I/O. This is the same pattern already used in flight_service.rs.

  • shuffle_writer.rs: hash repartition path rewritten with bounded-channel + spawn_blocking, create_dir_all converted to tokio::fs, removed double-counting of write_time in the no-repartition path, two write-failure regression tests added
  • utils.rs: write_stream_to_disk rewritten with bounded-channel + spawn_blocking
  • ballista/core/Cargo.toml: added "fs" feature to tokio dependency
  • shuffle_bench.rs: added --concurrency flag for measuring concurrent partition execution, modeling how the Ballista executor dispatches tasks via Semaphore(task_slots)

Benchmark results

All numbers from shuffle_bench with 3 iterations averaged, targeting local tmpfs. Format is before -> after (delta).

Default configuration (8 input partitions, 100 output partitions):

| Rows       | Concurrency 1      | Concurrency 4      | Concurrency 8      |
|------------|--------------------|--------------------|--------------------|
| 1,000,000  | -                  | 114 -> 97 (-14.4%) | 98 -> 93 (-4.7%)   |
| 5,000,000  | 648 -> 650 (+0.4%) | 284 -> 272 (-4.4%) | 291 -> 293 (+0.9%) |
| 10,000,000 | -                  | 498 -> 487 (-2.4%) | -                  |

Output partition sensitivity (5,000,000 rows, 8 input partitions):

| Output Part. | Concurrency 4      | Concurrency 8      |
|--------------|--------------------|--------------------|
| 50           | 155 -> 164 (+6.3%) | -                  |
| 100          | 284 -> 272 (-4.4%) | 291 -> 293 (+0.9%) |
| 200          | 486 -> 491 (+1.0%) | 475 -> 473 (-0.4%) |

Input partition sensitivity (5,000,000 rows, concurrency 4, 100 output partitions):

| Input Part. | Before -> After    |
|-------------|--------------------|
| 4           | 238 -> 241 (+1.2%) |
| 8           | 284 -> 272 (-4.4%) |

Throughput results are mixed, which is expected when the I/O target is local tmpfs with near-zero latency. In this scenario the channel indirection adds overhead that roughly offsets the benefit of freeing tokio workers.

The benefit shows most clearly at moderate concurrency (c=4) with 8 input partitions, where the tokio threadpool has enough in-flight tasks to take advantage of freed workers. At c=1 there is no contention so no benefit. At c=8, I/O bandwidth saturates and the channel overhead roughly cancels out the worker-freeing benefit. The slight regression at p=50 reflects the smaller per-batch partition cost making the channel overhead proportionally larger.

The primary benefit is runtime health rather than raw throughput: keeping tokio workers unblocked prevents latency spikes in co-located async tasks. A separate runtime-probe microbenchmark confirmed that inline sync I/O causes 3-8 second probe latencies vs 2-8 ms with spawn_blocking, even when aggregate throughput is similar.

Future work

Both partitioning and I/O currently run together in spawn_blocking because BatchPartitioner::partition takes a sync closure. Making the private partition_iter method public upstream would let us partition on the async side and only push I/O to the blocking pool. See apache/datafusion#21311.

Are these changes tested?

All existing shuffle_writer tests pass. Two new regression tests verify that errors propagate correctly through the bounded-channel for both the no-repartition and hash repartition paths. The shuffle_bench binary now accepts a --concurrency flag for validating concurrent execution behavior.

Are there any user-facing changes?

No. This is an internal change to the shuffle writer execution path.

Comment thread: ballista/core/src/execution_plans/shuffle_writer.rs
```rust
    path,
});
let schema = stream.schema();
let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
```
Member

Why 2? Isn't it too low? This may lead to lower throughput due to more context switches.

Author

I originally went with 2 just to be conservative, but after running some benchmarks I bumped the channel capacity from 2 to 8 and made it configurable via ShuffleWriterExec::with_channel_capacity(). I also added --channel-capacity to the benchmark so this can be checked more easily. Happy to remove the configurability if you feel that's overkill.

For benchmarking, I ran channel-capacity sweeps from 1 to 32 and compared against the baseline at a few concurrency levels (without much variance):

| Cap | Median  | Inv CS |
|-----|---------|--------|
| 1   | 217.0ms | 883    |
| 2   | 203.3ms | 845    |
| 4   | 202.6ms | 834    |
| 8   | 199.2ms | 817    |
| 16  | 203.0ms | 787    |
| 32  | 200.2ms | 724    |

| Mode                | Median  | Inv CS |
|---------------------|---------|--------|
| Channel (cap=8)     | 199.2ms | 817    |
| Inline (no channel) | 199.6ms | 183    |

There wasn't a huge difference at any level beyond going from 1 -> 2, except for larger capacity modestly reducing involuntary context switches (from 883 at cap 1 to 724 at cap 32), which is consistent with fewer producer/consumer sync points.

I also ran a load test with 8 concurrent shuffles on 4 tokio workers, measuring tokio::time::sleep jitter as a proxy for worker health. Inline I/O had p99 jitter of 120-160ms vs ~3-11ms with the channel.

@milenkovicm
Contributor

@sqlbenchmark tpch --iterations 3 --scale-factor 1

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1537 - Fix formatting in shuffle_bench and sort imports
PR Commit: bd676818
Base Commit: d0b51dd3 (main)
Scale Factor: SF1
Iterations: 3

Query Comparison

| Query | main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1  | 208.00 | 220.50 | 🔴 +6.0%  |
| Q2  | 102.30 | 87.50  | 🟢 -14.5% |
| Q3  | 137.00 | 116.10 | 🟢 -15.3% |
| Q4  | 75.30  | 64.00  | 🟢 -15.0% |
| Q5  | 275.70 | 226.10 | 🟢 -18.0% |
| Q6  | 60.00  | 61.70  | ⚪ +2.8%  |
| Q7  | 368.40 | 267.30 | 🟢 -27.4% |
| Q8  | 322.80 | 289.40 | 🟢 -10.3% |
| Q9  | 464.60 | 383.10 | 🟢 -17.5% |
| Q10 | 166.70 | 133.40 | 🟢 -20.0% |
| Q11 | 70.70  | 63.60  | 🟢 -10.0% |
| Q12 | 108.90 | 111.10 | ⚪ +2.0%  |
| Q13 | 131.60 | 116.60 | 🟢 -11.4% |
| Q14 | 63.60  | 63.00  | ⚪ -0.9%  |
| Q15 | 61.70  | 62.00  | ⚪ +0.5%  |
| Q16 | 54.50  | 52.30  | ⚪ -4.0%  |
| Q17 | 214.60 | 199.20 | 🟢 -7.2%  |
| Q18 | 310.90 | 326.30 | ⚪ +5.0%  |
| Q19 | 141.60 | 141.50 | ⚪ -0.1%  |
| Q20 | 96.10  | 90.10  | 🟢 -6.2%  |
| Q21 | 450.00 | 365.50 | 🟢 -18.8% |
| Q22 | 48.20  | 47.50  | ⚪ -1.5%  |

Total: Main=3933.20ms, PR=3487.80ms (-11.3%)


Automated benchmark run by dfbench

@milenkovicm
Contributor

@sqlbenchmark tpch --iterations 3 --scale-factor 10

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1537 - configurable channel capacity for shuffle writer, defaults to 8
PR Commit: 7ed27529
Base Commit: d0b51dd3 (main)
Scale Factor: SF10
Iterations: 3

Query Comparison

| Query | main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1  | 1917.40 | 1951.90 | ⚪ +1.8%  |
| Q2  | 762.30  | 688.30  | 🟢 -9.7%  |
| Q3  | 1237.70 | 1047.00 | 🟢 -15.4% |
| Q4  | 583.50  | 468.60  | 🟢 -19.7% |
| Q5  | 2654.70 | 2567.10 | ⚪ -3.3%  |
| Q6  | 522.50  | 520.30  | ⚪ -0.4%  |
| Q7  | 3782.50 | 2749.50 | 🟢 -27.3% |
| Q8  | 3783.80 | 2679.50 | 🟢 -29.2% |
| Q9  | 4817.60 | 4118.90 | 🟢 -14.5% |
| Q10 | 1568.20 | 1361.80 | 🟢 -13.2% |
| Q11 | 621.00  | 548.10  | 🟢 -11.7% |
| Q12 | 1731.80 | 1020.80 | 🟢 -41.1% |
| Q13 | 1307.70 | 1090.90 | 🟢 -16.6% |
| Q14 | 562.30  | 553.20  | ⚪ -1.6%  |
| Q15 | 556.60  | 554.20  | ⚪ -0.4%  |
| Q16 | 341.10  | 266.10  | 🟢 -22.0% |
| Q17 | 3403.20 | 3490.10 | ⚪ +2.6%  |
| Q18 | 3833.70 | 3938.70 | ⚪ +2.7%  |
| Q19 | 1277.70 | 1243.30 | ⚪ -2.7%  |
| Q20 | 970.30  | 860.90  | 🟢 -11.3% |
| Q21 | 4575.40 | 4033.20 | 🟢 -11.9% |
| Q22 | 326.80  | 278.50  | 🟢 -14.8% |

Total: Main=41137.80ms, PR=36030.90ms (-12.4%)


Automated benchmark run by dfbench

@milenkovicm
Contributor

Looks like I was wrong about whether this was going to be a measurable improvement. Well done @hcrosse

Will have a look over weekend

@milenkovicm
Contributor

@sqlbenchmark tpch --iterations 3 --scale-factor 1

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1537 - configurable channel capacity for shuffle writer, defaults to 8
PR Commit: 7ed27529
Base Commit: d0b51dd3 (main)
Scale Factor: SF1
Iterations: 3

Query Comparison

| Query | main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1  | 208.10 | 221.90 | 🔴 +6.6%  |
| Q2  | 104.50 | 89.20  | 🟢 -14.6% |
| Q3  | 136.70 | 99.90  | 🟢 -26.9% |
| Q4  | 75.80  | 58.80  | 🟢 -22.4% |
| Q5  | 275.60 | 236.90 | 🟢 -14.0% |
| Q6  | 59.60  | 61.20  | ⚪ +2.7%  |
| Q7  | 369.50 | 258.60 | 🟢 -30.0% |
| Q8  | 321.50 | 273.90 | 🟢 -14.8% |
| Q9  | 459.40 | 370.40 | 🟢 -19.4% |
| Q10 | 158.00 | 136.80 | 🟢 -13.4% |
| Q11 | 71.90  | 63.00  | 🟢 -12.4% |
| Q12 | 109.30 | 111.40 | ⚪ +1.9%  |
| Q13 | 129.20 | 112.00 | 🟢 -13.3% |
| Q14 | 63.90  | 63.20  | ⚪ -1.1%  |
| Q15 | 76.00  | 77.00  | ⚪ +1.3%  |
| Q16 | 55.10  | 66.00  | 🔴 +19.8% |
| Q17 | 210.70 | 198.60 | 🟢 -5.7%  |
| Q18 | 316.40 | 314.60 | ⚪ -0.6%  |
| Q19 | 141.10 | 139.80 | ⚪ -0.9%  |
| Q20 | 100.50 | 89.50  | 🟢 -10.9% |
| Q21 | 431.80 | 362.00 | 🟢 -16.2% |
| Q22 | 48.00  | 48.00  | ⚪ +0.0%  |

Total: Main=3922.60ms, PR=3452.70ms (-12.0%)


Automated benchmark run by dfbench

@milenkovicm (Contributor) left a comment

Just a heads up: #1527 will remove the full path from shuffles.

```rust
shuffle_output_partitioning,
metrics: ExecutionPlanMetricsSet::new(),
properties,
channel_capacity: DEFAULT_SHUFFLE_CHANNEL_CAPACITY,
```
Contributor

if we want to make this configurable, perhaps we could add it to BallistaConfig ?

Author

Agreed! Added in 1f7f67f

```rust
num_batches += 1;
num_rows += batch.num_rows();
num_bytes += batch_size_bytes;
let mut writer =
```
Contributor

Instead of doing more I/O, I think this mainly does more CPU work in parallel.
(Not necessarily bad, but it will have the effect that each partition's work will no longer be bound to a single thread.)

Author

Yeah good point! apache/datafusion#21311 would let us partition on the async side and only move serialization + IO to the blocking pool.

@Dandandan (Contributor) commented Apr 3, 2026

I meant something slightly different.
This adds extra parallelism by adding extra worker threads.
This helps when there is some skew, i.e. one partition finishing faster than another (and this is likely the source of the speedup), but it also increases memory usage and decreases cache-friendliness.

With e.g. morsel-driven parallelism (apache/datafusion#20529) this would be less needed, as each thread is able to pull enough work from the scan.

You could implement a similar approach for ShuffleReader as well, eliminating / reducing the need to do it at the write side.

Author

Cool proposal! I agree, the channel does add extra parallelism beyond just moving I/O off the runtime, and morsel-driven execution would likely eat into the throughput gains we're seeing from that in the TPC-H benchmarks.

The main motivation for the spawn_blocking work is keeping blocking writes off tokio workers so concurrent tasks don't stall, and I think that need stays even with morsel-driven execution upstream.

On the ShuffleReader side, could you say more about what you have in mind? Local reads already use spawn_blocking but process files sequentially. Are you suggesting we pipeline reads through a channel the same way, or something else?

@Dandandan (Contributor) commented Apr 3, 2026

For local disk IO stalling the runtime is usually not a big factor, as there is not so much it can do at the same time (in the same worker thread). I bet if you would only use spawn_blocking or tokio::fs for writes you would see smaller gains?
That in itself is interesting to learn.

Maybe I am wrong and mostly projecting my experience with Parquet reads :)

For ShuffleReader, I think it's a bit tricky given that the current format is entirely stream-based.
It probably would need to write the shuffle files in smaller blocks (e.g. of 50MB or something each) so it can parallelize / morselize reads better.

Contributor

Indeed, the write path (but not the compress path).

Contributor

@andygrove is experimenting with shuffle chunking in apache/datafusion-comet#3845

I wonder whether we should focus more on improving the sort shuffle implementation, as this one won't scale with a huge number of partitions.

Author

Looked at the Comet PR, and batch coalescing per partition seems like the right fix for large partition counts, so I agree we should do that as a follow-up (happy to take a stab at it myself!).

IMO they're complementary, but open to bundling it here or closing this and starting fresh if that makes more sense.

Contributor

It's your call @hcrosse and @Dandandan how to follow up.

@Dandandan wdyt?

Contributor

I am not too opinionated on how to move forward.

This PR shows great perf improvements; the cost (some extra memory usage, a bit of extra tokio overhead) is probably relatively minor for the improvement.

In the future we can probably benefit from queue-based scan / work-stealing for file sources and implement it for shuffle scans, so partition skew is handled better.

At that point we can probably disable it in most cases (only keep I/O off the runtime, instead of compress + write).

hcrosse added 4 commits April 15, 2026 10:31
- Hash repartition: send batches over bounded mpsc channel to
  spawn_blocking task that owns BatchPartitioner and per-output writers
- No-repartition: channel + spawn_blocking for write_stream_to_disk
- Surface write errors when stream errors take priority
- Track partition file size from disk metadata instead of in-memory
  batch size
- Add write failure propagation tests for both paths
- Add ballista.shuffle.writer.channel_capacity config key (default 8)
- Read from BallistaConfig in serde deserialization via TaskContext
- Wire through execution_engine to ShuffleWriterExec
Move channel_capacity extraction from serde deserialization to the
executor's create_query_stage_exec, which now has access to SessionConfig
(upstream apache#1542). This is the clean place for executor-side late binding.
@hcrosse force-pushed the feat/async-io-shuffle-writer branch from 183a370 to 004c5f9 on April 15, 2026 19:59
@milenkovicm
Contributor

@sqlbenchmark tpch --iterations 3 --scale-factor 1

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1537 - Read channel capacity from BallistaConfig in create_query_stage_exec
PR Commit: 004c5f96
Base Commit: aba8c6c0 (main)
Scale Factor: SF1
Iterations: 3

Query Comparison

| Query | main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1  | 215.30 | 205.10 | ⚪ -4.7%  |
| Q2  | 116.50 | 141.70 | 🔴 +21.6% |
| Q3  | 138.30 | 122.90 | 🟢 -11.1% |
| Q4  | 83.40  | 77.30  | 🟢 -7.3%  |
| Q5  | 283.20 | 235.10 | 🟢 -17.0% |
| Q6  | 73.50  | 59.50  | 🟢 -19.0% |
| Q7  | 366.80 | 267.80 | 🟢 -27.0% |
| Q8  | 340.30 | 274.80 | 🟢 -19.2% |
| Q9  | 470.60 | 417.50 | 🟢 -11.3% |
| Q10 | 218.40 | 139.80 | 🟢 -36.0% |
| Q11 | 107.90 | 72.30  | 🟢 -33.0% |
| Q12 | 105.90 | 107.40 | ⚪ +1.4%  |
| Q13 | 128.30 | 126.10 | ⚪ -1.7%  |
| Q14 | 64.40  | 89.90  | 🔴 +39.6% |
| Q15 | 101.40 | 62.80  | 🟢 -38.1% |
| Q16 | 79.70  | 76.40  | ⚪ -4.1%  |
| Q17 | 211.70 | 223.90 | 🔴 +5.8%  |
| Q18 | 338.60 | 334.70 | ⚪ -1.2%  |
| Q19 | 137.90 | 138.50 | ⚪ +0.4%  |
| Q20 | 128.70 | 128.60 | ⚪ -0.1%  |
| Q21 | 445.90 | 372.50 | 🟢 -16.5% |
| Q22 | 58.00  | 77.70  | 🔴 +34.0% |

Total: Main=4214.70ms, PR=3752.30ms (-11.0%)


Automated benchmark run by dfbench

@milenkovicm
Contributor

@sqlbenchmark tpch --iterations 3 --scale-factor 10

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1537 - Read channel capacity from BallistaConfig in create_query_stage_exec
PR Commit: 004c5f96
Base Commit: aba8c6c0 (main)
Scale Factor: SF10
Iterations: 3

Query Comparison

| Query | main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1  | 1880.30 | 1967.00 | ⚪ +4.6%  |
| Q2  | 761.00  | 709.90  | 🟢 -6.7%  |
| Q3  | 1212.00 | 1031.90 | 🟢 -14.9% |
| Q4  | 567.00  | 469.40  | 🟢 -17.2% |
| Q5  | 2622.60 | 2218.40 | 🟢 -15.4% |
| Q6  | 501.00  | 508.10  | ⚪ +1.4%  |
| Q7  | 3797.00 | 2777.10 | 🟢 -26.9% |
| Q8  | 3241.20 | 2714.10 | 🟢 -16.3% |
| Q9  | 4757.30 | 4169.60 | 🟢 -12.4% |
| Q10 | 1536.00 | 1432.90 | 🟢 -6.7%  |
| Q11 | 553.70  | 575.30  | ⚪ +3.9%  |
| Q12 | 972.70  | 1009.50 | ⚪ +3.8%  |
| Q13 | 1430.40 | 1262.50 | 🟢 -11.7% |
| Q14 | 527.90  | 541.90  | ⚪ +2.7%  |
| Q15 | 585.80  | 594.50  | ⚪ +1.5%  |
| Q16 | 341.30  | 277.20  | 🟢 -18.8% |
| Q17 | 3682.00 | 3832.50 | ⚪ +4.1%  |
| Q18 | 3677.10 | 3991.30 | 🔴 +8.5%  |
| Q19 | 1255.90 | 1246.50 | ⚪ -0.7%  |
| Q20 | 974.10  | 886.30  | 🟢 -9.0%  |
| Q21 | 4995.60 | 4118.60 | 🟢 -17.6% |
| Q22 | 290.80  | 274.30  | 🟢 -5.7%  |

Total: Main=40162.70ms, PR=36608.80ms (-8.8%)


Automated benchmark run by dfbench

hcrosse added 3 commits April 16, 2026 13:57
- Collapse multiline error! macro in utils.rs to single line
- Reorder imports alphabetically in execution_engine.rs
- Merge import line in serde/mod.rs
Replace .iter() with key destructuring with .values() in three places:
- aqe/mod.rs: running_tasks()
- execution_graph.rs: running_tasks()
- executor_manager.rs: get_expired_executors()
- Replace if-in-match-arm with match guards for Char('4'/'5'/'6') key bindings
- Replace sort_by with sort_by_key in get_jobs
@milenkovicm
Contributor

Looks like there is an issue with some of the tests; they never finish (probably an unhandled panic).

hcrosse added 2 commits April 17, 2026 13:03
The hang on unix CI jobs (test linux balista/crates, macos) surfaces
on an EXPLAIN test and on sort_shuffle tests. EXPLAIN does not execute
the physical plan, so the hang is outside the modified shuffle paths.
These markers let CI stderr reveal exactly which async step is stuck:

- setup_standalone entry, scheduler-spawned, each retry of the gRPC
  connect loop, executor-spawned, return
- scheduler init().await boundary, listener bind, gRPC task spawn
- executor listener bind, flight gRPC spawn, poll_loop spawn
- shuffle_writer Hash branch entry, handle.await boundary (negative
  control; should not fire on the EXPLAIN test)
- write_stream_to_disk entry, drop(tx), handle.await boundary (same
  negative-control role)

All marker lines are prefixed [dbg-1537] so they are easy to grep and
strip. Will be reverted once the hang is diagnosed.
@hcrosse
Author

hcrosse commented Apr 17, 2026

I wasn't able to reproduce the failure locally, so I'm adding some temporary debug logs to try and see where it fails.

@milenkovicm
Contributor

Will try later as well

@milenkovicm
Contributor

I could not reproduce it locally either.

@milenkovicm
Contributor

(screenshot attached)

Successfully merging this pull request may close these issues:

perf: Consider using async I/O (tokio::fs) in shuffle writer

5 participants