Make BatchPartitioner::partition_iter public for downstream async consumers #21311

@hcrosse

Is your feature request related to a problem or challenge?

BatchPartitioner::partition takes a sync FnMut closure, which means consumers that need to do I/O with the partitioned batches have to do it inline. In Ballista's shuffle writer, this blocks tokio worker threads because file I/O happens inside the closure.

The workaround is to move the entire partition call into spawn_blocking, but that also moves the CPU-bound partitioning work off the tokio workers, which is wasteful.

partition_iter already exists as a private method and returns an iterator over (partition_index, RecordBatch) pairs. DataFusion's own RepartitionExec uses it directly to iterate results and send them through async channels. The doc comment on the method says this separation was intentional:

"we need to have a variant of partition that works w/ sync functions, and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve this"

But since partition_iter is private, downstream crates can't use this pattern.
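To make the "iterator as an intermediate representation" idea concrete, here is a minimal, std-only sketch. `ToyPartitioner`, the `Batch` alias, and the modulo-based partitioning are all hypothetical stand-ins, not DataFusion's types or its hashing logic. The sketch only shows how a sync closure variant can be layered on top of an iterator variant, which is the layering the doc comment describes.

```rust
// Toy stand-ins: a "batch" is a Vec<i64>, and rows are assigned to a partition
// by `value mod num_partitions`. None of this is DataFusion code.
type Batch = Vec<i64>;
type Result<T> = std::result::Result<T, String>;

struct ToyPartitioner {
    num_partitions: usize,
}

impl ToyPartitioner {
    /// Iterator form: yields one (partition_index, batch) pair per non-empty partition.
    fn partition_iter(
        &mut self,
        batch: Batch,
    ) -> Result<impl Iterator<Item = Result<(usize, Batch)>> + Send + '_> {
        if self.num_partitions == 0 {
            return Err("num_partitions must be non-zero".to_string());
        }
        let mut buckets: Vec<Batch> = vec![Vec::new(); self.num_partitions];
        for value in batch {
            let idx = value.rem_euclid(self.num_partitions as i64) as usize;
            buckets[idx].push(value);
        }
        Ok(buckets
            .into_iter()
            .enumerate()
            .filter(|(_, part)| !part.is_empty())
            .map(Ok))
    }

    /// Sync closure form, implemented purely in terms of the iterator form.
    fn partition<F>(&mut self, batch: Batch, mut f: F) -> Result<()>
    where
        F: FnMut(usize, Batch) -> Result<()>,
    {
        for item in self.partition_iter(batch)? {
            let (idx, part) = item?;
            f(idx, part)?;
        }
        Ok(())
    }
}
```

With only `partition` exposed, a caller has to do all of its work inside `f`. With `partition_iter` exposed as well, the caller decides where each pair goes, including across an `.await` point.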

Describe the solution you'd like

Make partition_iter public (or add a public equivalent). The signature is already suitable:

pub fn partition_iter(
    &mut self,
    batch: RecordBatch,
) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_>

This lets consumers partition on the async side and only push the I/O into spawn_blocking:

// async side
for result in partitioner.partition_iter(input_batch)? {
    // each item is a Result, matching the signature above
    let (partition, batch) = result?;
    tx.send((partition, batch)).await?;
}

// blocking side
while let Some((partition, batch)) = rx.blocking_recv() {
    writers[partition].write(&batch)?;
}
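The same producer/consumer split can be tried without DataFusion or tokio. The sketch below is an assumption-laden stand-in: a free function `partition_iter` that splits by `value mod n` replaces the real partitioner, a `std::thread` replaces the `spawn_blocking` task, a bounded `std::sync::mpsc::sync_channel` replaces the tokio channel, and appending to a `Vec` replaces the file write. The names `shuffle` and `Batch` are hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

type Batch = Vec<i64>;

// Hypothetical stand-in for partition_iter: splits values by `value mod num_partitions`.
fn partition_iter(
    batch: Batch,
    num_partitions: usize,
) -> impl Iterator<Item = Result<(usize, Batch), String>> {
    let mut buckets: Vec<Batch> = vec![Vec::new(); num_partitions];
    for value in batch {
        buckets[value.rem_euclid(num_partitions as i64) as usize].push(value);
    }
    buckets
        .into_iter()
        .enumerate()
        .filter(|(_, part)| !part.is_empty())
        .map(Ok)
}

// Partitions on the calling thread and hands each piece to a dedicated writer
// thread through a bounded channel. Returns what each "writer" received.
fn shuffle(batches: Vec<Batch>, num_partitions: usize) -> Result<Vec<Batch>, String> {
    if num_partitions == 0 {
        return Err("num_partitions must be non-zero".to_string());
    }
    let (tx, rx) = sync_channel::<(usize, Batch)>(8);

    // Consumer: stands in for the blocking task that owns the file writers.
    let writer = thread::spawn(move || {
        let mut writers: Vec<Batch> = vec![Vec::new(); num_partitions];
        while let Ok((partition, batch)) = rx.recv() {
            writers[partition].extend(batch); // stands in for the file write
        }
        writers
    });

    // Producer: the CPU-bound partitioning stays on this thread.
    for batch in batches {
        for item in partition_iter(batch, num_partitions) {
            let (partition, part) = item?;
            tx.send((partition, part)).map_err(|e| e.to_string())?;
        }
    }
    drop(tx); // closing the channel lets the writer loop finish

    writer
        .join()
        .map_err(|_| "writer thread panicked".to_string())
}
```

The bounded channel gives backpressure: if the writer falls behind, the producer waits on `send` instead of buffering partitioned batches without limit.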

Describe alternatives you've considered

In apache/datafusion-ballista#1537 we moved both partitioning and I/O into spawn_blocking together. It works, but it's leaving performance on the table by running CPU work on the blocking pool.

Additional context

Ballista PR: apache/datafusion-ballista#1537
