feat: move shuffle writer disk I/O off tokio worker threads #1537
hcrosse wants to merge 9 commits into apache:main
Conversation
```rust
    path,
});
let schema = stream.schema();
let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
```
Why 2? Isn't it too low? This may lead to less throughput due to more context switches.
I originally went with 2 just to be conservative, but ran some benchmarks and I bumped the channel capacity from 2 to 8, and made it configurable via ShuffleWriterExec::with_channel_capacity(). I also added --channel-capacity to the benchmark so this can be checked more easily. Happy to remove the configurability if you feel like that's overkill.
For benchmarking, I ran capacity sweeps from 1 to 32 channels and compared against the baseline at a few concurrency levels (without much variance):
| Capacity | Median | Involuntary CS |
|---|---|---|
| 1 | 217.0ms | 883 |
| 2 | 203.3ms | 845 |
| 4 | 202.6ms | 834 |
| 8 | 199.2ms | 817 |
| 16 | 203.0ms | 787 |
| 32 | 200.2ms | 724 |
| Mode | Median | Involuntary CS |
|---|---|---|
| Channel (cap=8) | 199.2ms | 817 |
| Inline (no channel) | 199.6ms | 183 |
There wasn't a huge difference at any level beyond going from 1 -> 2, except for larger capacity modestly reducing involuntary context switches (from 883 to 724 from cap 1 to 32), which is consistent with fewer producer/consumer sync points.
I also ran a load test with 8 concurrent shuffles on 4 tokio workers, measuring tokio::time::sleep jitter as a proxy for worker health. Inline I/O had p99 jitter of 120-160ms vs ~3-11ms with the channel.
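A stdlib-only sketch of that jitter probe (the PR's actual probe times `tokio::time::sleep` on the shared async runtime; `sleep_jitter` here is a hypothetical stand-in that only demonstrates the measurement technique):

```rust
use std::time::{Duration, Instant};

/// Repeatedly sleep for `interval` and record how far past the requested
/// wake-up time we actually woke. On a healthy scheduler the jitter stays
/// small; when workers are blocked on inline disk I/O, wake-ups arrive late
/// and the tail jitter grows.
fn sleep_jitter(iterations: u32, interval: Duration) -> Vec<Duration> {
    let mut jitters = Vec::with_capacity(iterations as usize);
    for _ in 0..iterations {
        let start = Instant::now();
        std::thread::sleep(interval);
        // Actual elapsed minus requested interval = jitter for this tick.
        jitters.push(start.elapsed().saturating_sub(interval));
    }
    jitters
}

fn main() {
    let jitters = sleep_jitter(20, Duration::from_millis(5));
    let max = jitters.iter().max().unwrap();
    println!("max jitter over 20 ticks: {:?}", max);
}
```

In the PR's setting the probe runs as an async task alongside the shuffles, so a blocked tokio worker shows up directly as p99 jitter.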
@sqlbenchmark tpch --iterations 3 --scale-factor 1
Ballista TPC-H Benchmark Results
PR: #1537 - Fix formatting in shuffle_bench and sort imports
Query Comparison
Total: Main=3933.20ms, PR=3487.80ms (-11.3%)
Automated benchmark run by dfbench
@sqlbenchmark tpch --iterations 3 --scale-factor 10
Ballista TPC-H Benchmark Results
PR: #1537 - configurable channel capacity for shuffle writer, defaults to 8
Query Comparison
Total: Main=41137.80ms, PR=36030.90ms (-12.4%)
Automated benchmark run by dfbench
Looks like I was wrong about whether this would be a measurable improvement. Well done @hcrosse! Will have a look over the weekend.
@sqlbenchmark tpch --iterations 3 --scale-factor 1
Ballista TPC-H Benchmark Results
PR: #1537 - configurable channel capacity for shuffle writer, defaults to 8
Query Comparison
Total: Main=3922.60ms, PR=3452.70ms (-12.0%)
Automated benchmark run by dfbench
milenkovicm left a comment
Just a heads up: #1527 will remove the full path from shuffles.
```rust
shuffle_output_partitioning,
metrics: ExecutionPlanMetricsSet::new(),
properties,
channel_capacity: DEFAULT_SHUFFLE_CHANNEL_CAPACITY,
```
If we want to make this configurable, perhaps we could add it to `BallistaConfig`?
```rust
num_batches += 1;
num_rows += batch.num_rows();
num_bytes += batch_size_bytes;
let mut writer =
```
Instead of doing more I/O, I think this mainly does more CPU work in parallel.
(Not necessarily bad, but it has the effect that each partition's work is no longer bound to a single thread.)
Yeah good point! apache/datafusion#21311 would let us partition on the async side and only move serialization + IO to the blocking pool.
I mean something slightly different.
This adds extra parallelism by adding extra worker threads.
This helps when there is some skew, i.e. one partition finishing faster than another (and this is likely the source of the speedup), but it also increases memory usage and decreases cache-friendliness.
With e.g. morsel-driven parallelism (apache/datafusion#20529) this would be less needed, as each thread is able to pull enough work from the scan.
You could implement a similar approach for ShuffleReader as well, eliminating / reducing the need to do it at the write side.
Cool proposal! I agree, the channel does add extra parallelism beyond just moving I/O off the runtime, and morsel-driven execution would likely eat into the throughput gains we're seeing from that in the TPC-H benchmarks.
The main motivation for the spawn_blocking work is keeping blocking writes off tokio workers so concurrent tasks don't stall, and I think that need stays even with morsel-driven execution upstream.
On the ShuffleReader side, could you say more about what you have in mind? Local reads already use spawn_blocking but process files sequentially. Are you suggesting we pipeline reads through a channel the same way, or something else?
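For reference, the bridge pattern under discussion (async producer feeding a bounded channel drained by a dedicated blocking writer) can be sketched with stdlib stand-ins. `pipe_through_writer` and the in-memory sink are hypothetical; the real code uses `tokio::sync::mpsc` plus `tokio::task::spawn_blocking` over Arrow IPC writers:

```rust
use std::io::Write;
use std::sync::mpsc::sync_channel;
use std::thread;

/// Send "record batches" (here, plain byte buffers) through a bounded
/// channel to a dedicated writer thread that owns all "disk" I/O.
fn pipe_through_writer(batches: Vec<Vec<u8>>) -> std::io::Result<usize> {
    // Capacity 2 mirrors tokio::sync::mpsc::channel(2): the sender blocks
    // once two batches are in flight, giving natural backpressure.
    let (tx, rx) = sync_channel::<Vec<u8>>(2);

    // Stand-in for tokio::task::spawn_blocking; only this thread touches
    // the writer, so the producer never blocks on I/O syscalls.
    let writer = thread::spawn(move || -> std::io::Result<usize> {
        let mut sink: Vec<u8> = Vec::new(); // stand-in for a std::fs::File
        let mut written = 0;
        for batch in rx {
            sink.write_all(&batch)?;
            written += batch.len();
        }
        Ok(written)
    });

    for batch in batches {
        tx.send(batch).expect("writer hung up");
    }
    drop(tx); // closing the channel ends the writer's receive loop

    writer.join().expect("writer panicked")
}

fn main() -> std::io::Result<()> {
    let n = pipe_through_writer(vec![vec![1u8; 1024]; 4])?;
    println!("wrote {} bytes through the channel", n);
    Ok(())
}
```

Dropping the sender is what terminates the writer loop, which is also why error propagation back to the async side needs the explicit handling the PR adds.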
For local disk I/O, stalling the runtime is usually not a big factor, as there is not much else it can do at the same time (in the same worker thread). I bet if you used only spawn_blocking or tokio::fs for the writes you would see smaller gains?
That in itself is interesting to learn.
Maybe I am wrong and mostly projecting my experience with Parquet reads :)
For ShuffleReader, I think it's a bit tricky given that the current format is entirely stream-based.
It probably would need to write the shuffle files in smaller blocks (e.g. ~50MB each) so reads can be parallelized / morselized better.
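The block idea might look roughly like this; `write_in_blocks` and the `(offset, len)` index are hypothetical, and the real shuffle files are Arrow IPC streams rather than raw bytes:

```rust
use std::io::Write;

/// Split a byte stream into fixed-size blocks and record each block's
/// (offset, len) so a reader can fetch blocks independently, and thus in
/// parallel. Only illustrates the chunking idea discussed above.
fn write_in_blocks<W: Write>(
    mut out: W,
    data: &[u8],
    block_size: usize,
) -> std::io::Result<Vec<(u64, u64)>> {
    let mut index = Vec::new();
    let mut offset = 0u64;
    for chunk in data.chunks(block_size) {
        out.write_all(chunk)?;
        index.push((offset, chunk.len() as u64));
        offset += chunk.len() as u64;
    }
    Ok(index)
}

fn main() -> std::io::Result<()> {
    let mut file = Vec::new(); // stand-in for a shuffle file on disk
    let index = write_in_blocks(&mut file, &[0u8; 130], 50)?;
    println!("{} blocks written: {:?}", index.len(), index);
    Ok(())
}
```

A reader holding the index can then schedule each block as its own unit of work instead of consuming one long stream.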
Indeed, the write path (but not the compress path).
@andygrove is experimenting with shuffle chunking in apache/datafusion-comet#3845
I wonder whether we should focus more on improving the sort shuffle implementation, as this one won't scale to a huge number of partitions.
Looked at the Comet PR, and batch coalescing per partition seems like the right fix for large partition counts, so I agree we should do that as a follow-up (happy to take a stab at it myself!)
IMO they're complementary, but open to bundling it here or closing this and starting fresh if that makes more sense.
It's your call @hcrosse and @Dandandan how to follow up.
@Dandandan wdyt?
I am not too opinionated on how to move forward.
This PR shows great perf improvements; the cost (some extra memory usage / a bit of extra tokio overhead) is probably relatively minor for the improvement.
In the future we can probably benefit from queue-based scan / work-stealing for file sources and implement it for shuffle scans, so partition skew is handled better.
At that point we can probably disable this in most cases (only keep I/O off the path instead of compress + write).
- Hash repartition: send batches over a bounded mpsc channel to a spawn_blocking task that owns the BatchPartitioner and per-output writers
- No-repartition: channel + spawn_blocking for write_stream_to_disk
- Surface write errors; stream errors take priority
- Track partition file size from disk metadata instead of in-memory batch size
- Add write-failure propagation tests for both paths
- Add ballista.shuffle.writer.channel_capacity config key (default 8)
- Read from BallistaConfig in serde deserialization via TaskContext
- Wire through execution_engine to ShuffleWriterExec
Move channel_capacity extraction from serde deserialization to the executor's create_query_stage_exec, which now has access to SessionConfig (upstream apache#1542). This is the clean place for executor-side late binding.
183a370 to 004c5f9
@sqlbenchmark tpch --iterations 3 --scale-factor 1
Ballista TPC-H Benchmark Results
PR: #1537 - Read channel capacity from BallistaConfig in create_query_stage_exec
Query Comparison
Total: Main=4214.70ms, PR=3752.30ms (-11.0%)
Automated benchmark run by dfbench
@sqlbenchmark tpch --iterations 3 --scale-factor 10
Ballista TPC-H Benchmark Results
PR: #1537 - Read channel capacity from BallistaConfig in create_query_stage_exec
Query Comparison
Total: Main=40162.70ms, PR=36608.80ms (-8.8%)
Automated benchmark run by dfbench
- Collapse multiline error! macro in utils.rs to single line - Reorder imports alphabetically in execution_engine.rs - Merge import line in serde/mod.rs
Replace .iter() plus key destructuring with .values() in three places:
- aqe/mod.rs: running_tasks()
- execution_graph.rs: running_tasks()
- executor_manager.rs: get_expired_executors()
- Replace if-in-match-arm with match guards for Char('4'/'5'/'6') key bindings
- Replace sort_by with sort_by_key in get_jobs
Looks like there is an issue with some of the tests; they never finish (probably there is some unhandled panic).
The hang on unix CI jobs (test linux ballista/crates, macos) surfaces on an EXPLAIN test and on sort_shuffle tests. EXPLAIN does not execute the physical plan, so the hang is outside the modified shuffle paths. These markers let CI stderr reveal exactly which async step is stuck:
- setup_standalone entry, scheduler-spawned, each retry of the gRPC connect loop, executor-spawned, return
- scheduler init().await boundary, listener bind, gRPC task spawn
- executor listener bind, flight gRPC spawn, poll_loop spawn
- shuffle_writer Hash branch entry, handle.await boundary (negative control; should not fire on the EXPLAIN test)
- write_stream_to_disk entry, drop(tx), handle.await boundary (same negative-control role)

All marker lines are prefixed [dbg-1537] so they are easy to grep and strip. Will be reverted once the hang is diagnosed.
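The marker style can be illustrated with a minimal sketch; `dbg_1537` is a hypothetical helper, not the code actually added in the commit:

```rust
/// Build one grep-able marker line. A real version might be a macro
/// wrapping eprintln! so call sites stay terse.
fn dbg_1537(msg: &str) -> String {
    format!("[dbg-1537] {}", msg)
}

fn main() {
    // Each async boundary under suspicion gets a marker on stderr, which
    // CI captures; `grep '\[dbg-1537\]'` then shows the last step reached.
    eprintln!("{}", dbg_1537("setup_standalone: entry"));
    eprintln!("{}", dbg_1537("scheduler: init().await returned"));
    eprintln!("{}", dbg_1537("shuffle_writer: awaiting spawn_blocking handle"));
}
```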
I wasn't able to reproduce the failure locally, so I'm adding some temporary debug logs to try to see where it fails.
Will try later as well.
I could not reproduce it locally either.

Which issue does this PR close?
Closes #1387.
Rationale
The shuffle writer performs synchronous `std::fs` I/O inside async task contexts. Under concurrent shuffle workloads, this blocks tokio worker threads and degrades runtime responsiveness for co-located async tasks.
What changes are included in this PR?
Both the hash repartition path in `shuffle_writer.rs` and the no-repartition path in `utils::write_stream_to_disk` now use a bounded `mpsc::channel(2)` to bridge the async record-batch stream to a `spawn_blocking` task that owns all file I/O. This is the same pattern already used in `flight_service.rs`.
- `shuffle_writer.rs`: hash repartition path rewritten with bounded channel + `spawn_blocking`, `create_dir_all` converted to `tokio::fs`, removed double-counting of `write_time` in the no-repartition path, two write-failure regression tests added
- `utils.rs`: `write_stream_to_disk` rewritten with bounded channel + `spawn_blocking`
- `ballista/core/Cargo.toml`: added `"fs"` feature to the tokio dependency
- `shuffle_bench.rs`: added `--concurrency` flag for measuring concurrent partition execution, modeling how the Ballista executor dispatches tasks via `Semaphore(task_slots)`

Benchmark results
All numbers from `shuffle_bench` with 3 iterations averaged, targeting local tmpfs. Format is before -> after (delta).
Default configuration (8 input partitions, 100 output partitions):
Output partition sensitivity (5,000,000 rows, 8 input partitions):
Input partition sensitivity (5,000,000 rows, concurrency 4, 100 output partitions):
Throughput results are mixed, which is expected when the I/O target is local tmpfs with near-zero latency. In this scenario the channel indirection adds overhead that roughly offsets the benefit of freeing tokio workers.
The benefit shows most clearly at moderate concurrency (c=4) with 8 input partitions, where the tokio threadpool has enough in-flight tasks to take advantage of freed workers. At c=1 there is no contention so no benefit. At c=8, I/O bandwidth saturates and the channel overhead roughly cancels out the worker-freeing benefit. The slight regression at p=50 reflects the smaller per-batch partition cost making the channel overhead proportionally larger.
The primary benefit is runtime health rather than raw throughput: keeping tokio workers unblocked prevents latency spikes in co-located async tasks. A separate runtime-probe microbenchmark confirmed that inline sync I/O causes 3-8 second probe latencies vs 2-8 ms with `spawn_blocking`, even when aggregate throughput is similar.
Future work
Both partitioning and I/O currently run together in `spawn_blocking` because `BatchPartitioner::partition` takes a sync closure. Making the private `partition_iter` method public upstream would let us partition on the async side and only push I/O to the blocking pool. See apache/datafusion#21311.
All existing `shuffle_writer` tests pass. Two new regression tests verify that errors propagate correctly through the bounded channel for both the no-repartition and hash repartition paths. The `shuffle_bench` binary now accepts a `--concurrency` flag for validating concurrent execution behavior.
Are there any user-facing changes?
No. This is an internal change to the shuffle writer execution path.