Is your feature request related to a problem or challenge?
`BatchPartitioner::partition` takes a synchronous `FnMut` closure. Consumers that need to do I/O with the partitioned batches therefore have to do it inline, inside the closure. In Ballista's shuffle writer, this blocks tokio worker threads because the file I/O runs inside the closure.
The workaround is to move the entire `partition` call into `spawn_blocking`. However, that also moves the CPU-bound partitioning work off the tokio workers and onto the blocking thread pool, which is wasteful.
`partition_iter` already exists as a private method. It returns an iterator of `Result<(usize, RecordBatch)>` items, i.e. `(partition_index, RecordBatch)` pairs. DataFusion's own `RepartitionExec` uses it directly to iterate over the results and send them through async channels. The doc comment on the method says this separation was intentional:
"we need to have a variant of partition that works w/ sync functions, and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve this"
But since `partition_iter` is private, downstream crates can't use this pattern.
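To illustrate the pattern the doc comment describes (one implementation, exposed both as an iterator and as a closure-based sync wrapper), here is a toy sketch. It assumes a hypothetical `ToyPartitioner` where a "batch" is a `Vec<i64>` and rows are routed by `value mod num_partitions` instead of by hash. Only the method shapes are meant to resemble `BatchPartitioner`; this is not DataFusion code.

```rust
// Hypothetical toy model of the iterator-as-intermediate pattern; not DataFusion code.
type Result<T> = std::result::Result<T, String>;

/// Hypothetical stand-in for `BatchPartitioner`: a "batch" is a `Vec<i64>`
/// and rows are routed by `value mod num_partitions` instead of hashing.
struct ToyPartitioner {
    num_partitions: usize,
}

impl ToyPartitioner {
    /// The single implementation: yields one `(partition, batch)` item per
    /// non-empty output partition.
    fn partition_iter(
        &mut self,
        batch: Vec<i64>,
    ) -> Result<impl Iterator<Item = Result<(usize, Vec<i64>)>> + Send + '_> {
        if self.num_partitions == 0 {
            return Err("num_partitions must be > 0".to_string());
        }
        let n = self.num_partitions;
        let mut buckets: Vec<Vec<i64>> = vec![Vec::new(); n];
        for v in batch {
            buckets[v.rem_euclid(n as i64) as usize].push(v);
        }
        Ok(buckets
            .into_iter()
            .enumerate()
            .filter(|(_, rows)| !rows.is_empty())
            .map(|pair| Ok::<_, String>(pair)))
    }

    /// Sync wrapper: drives the iterator and hands each item to a closure,
    /// stopping at the first error from either side.
    fn partition<F>(&mut self, batch: Vec<i64>, mut f: F) -> Result<()>
    where
        F: FnMut(usize, Vec<i64>) -> Result<()>,
    {
        self.partition_iter(batch)?
            .try_for_each(|res| res.and_then(|(p, rows)| f(p, rows)))
    }
}

fn main() -> Result<()> {
    let mut partitioner = ToyPartitioner { num_partitions: 3 };

    // Closure consumer: the partitioner owns the loop, so no `.await` is possible.
    partitioner.partition(vec![1, 2, 3, 4], |p, rows| {
        println!("sync: partition {p} -> {rows:?}");
        Ok(())
    })?;

    // Iterator consumer: the caller owns the loop, so an async caller could
    // `.await` a channel send at this point.
    for res in partitioner.partition_iter(vec![5, 6, 7])? {
        let (p, rows) = res?;
        println!("iter: partition {p} -> {rows:?}");
    }
    Ok(())
}
```

The closure API is a thin layer over the iterator. Exposing the iterator therefore costs nothing extra, while it gives callers control over where each item goes next.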
Describe the solution you'd like
Make `partition_iter` public (or add a public equivalent). The signature is already suitable:
```rust
pub fn partition_iter(
    &mut self,
    batch: RecordBatch,
) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_>
```
This lets consumers partition on the async side and push only the I/O into `spawn_blocking`:
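```rust
// async side: CPU-bound partitioning stays on the tokio worker
for res in partitioner.partition_iter(input_batch)? {
    // each item is a Result<(usize, RecordBatch)>, so unwrap it first
    let (partition, batch) = res?;
    tx.send((partition, batch)).await?;
}

// blocking side (e.g. inside tokio::task::spawn_blocking): only file I/O runs here
while let Some((partition, batch)) = rx.blocking_recv() {
    writers[partition].write(&batch)?;
}
```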
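Here is a std-only model of that split, so it runs without external crates. In it, `std::sync::mpsc::sync_channel` and `thread::spawn` stand in for a bounded `tokio::sync::mpsc` channel and `spawn_blocking` with `blocking_recv`. The `partition_iter` free function and the in-memory "writers" are hypothetical and exist only for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

type Result<T> = std::result::Result<T, String>;

/// Hypothetical partitioning step (a free function, not DataFusion's API):
/// routes each value by `value mod n`, one item per non-empty partition.
fn partition_iter(batch: Vec<i64>, n: usize) -> impl Iterator<Item = Result<(usize, Vec<i64>)>> {
    let mut buckets: Vec<Vec<i64>> = vec![Vec::new(); n];
    for v in batch {
        buckets[v.rem_euclid(n as i64) as usize].push(v);
    }
    buckets
        .into_iter()
        .enumerate()
        .filter(|(_, rows)| !rows.is_empty())
        .map(|pair| Ok::<_, String>(pair))
}

/// Partitions every input batch on the calling thread and hands only the
/// "writes" to a dedicated writer thread. Returns what each writer received.
fn run(input: Vec<Vec<i64>>, n: usize) -> Result<Vec<Vec<i64>>> {
    if n == 0 {
        return Err("n must be > 0".to_string());
    }
    // Bounded channel: send() blocks when the writer falls behind, a rough
    // analogue of awaiting a send on a bounded tokio::sync::mpsc channel.
    let (tx, rx) = sync_channel::<(usize, Vec<i64>)>(4);

    // Writer side: owns the per-partition "writers" (in-memory Vecs standing
    // in for files) and does nothing but write.
    let writer = thread::spawn(move || {
        let mut writers: Vec<Vec<i64>> = vec![Vec::new(); n];
        // recv() returns Err once every sender is dropped, ending the loop.
        while let Ok((partition, rows)) = rx.recv() {
            writers[partition].extend(rows);
        }
        writers
    });

    // Partitioning side: the CPU-bound work stays on this thread.
    for batch in input {
        for res in partition_iter(batch, n) {
            let (partition, rows) = res?;
            tx.send((partition, rows)).map_err(|e| e.to_string())?;
        }
    }
    drop(tx); // close the channel so the writer loop terminates
    writer.join().map_err(|_| "writer thread panicked".to_string())
}

fn main() -> Result<()> {
    let written = run(vec![vec![1, 2, 3], vec![4, 5, 6]], 3)?;
    for (partition, rows) in written.iter().enumerate() {
        println!("partition {partition}: {rows:?}");
    }
    Ok(())
}
```

The bounded channel provides backpressure. If the writer falls behind, the partitioning side waits instead of buffering every partitioned batch in memory.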
Describe alternatives you've considered
In apache/datafusion-ballista#1537 we moved both partitioning and I/O into `spawn_blocking` together. It works, but it leaves performance on the table by running CPU-bound partitioning work on the blocking pool.
Additional context
Ballista PR: apache/datafusion-ballista#1537