Changes from 4 commits
63 changes: 61 additions & 2 deletions datafusion/physical-plan/src/repartition/mod.rs
@@ -41,7 +41,7 @@ use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
 
 use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
-use arrow::compute::take_arrays;
+use arrow::compute::{take_arrays, BatchCoalescer};
 use arrow::datatypes::{SchemaRef, UInt32Type};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
@@ -1194,9 +1194,18 @@ impl RepartitionExec {
         partitioning: Partitioning,
         metrics: RepartitionMetrics,
     ) -> Result<()> {
+        let is_hash_partitioning = matches!(&partitioning, Partitioning::Hash(_, _));
         let mut partitioner =
             BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
 
+        let mut coalesce_batches = vec![];
+
+        if is_hash_partitioning {
+            for _ in 0..partitioner.num_partitions() {
+                coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096));
Review comment (Copilot AI, Nov 9, 2025):
The hardcoded batch size of 4096 should use the configured batch_size from the session config. Other uses of BatchCoalescer::new in the codebase use context.session_config().batch_size() or config.execution.batch_size. Consider passing the batch_size as a parameter to pull_from_input from the caller (consume_input_streams), which has access to the context: Arc<TaskContext>.
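A minimal sketch of that suggestion; threading a batch_size parameter into pull_from_input is an assumption for illustration, not code from this PR:

```rust
// In the caller (consume_input_streams), which holds context: Arc<TaskContext>,
// read the configured batch size once and pass it to pull_from_input
// (the extra parameter is hypothetical):
let batch_size = context.session_config().batch_size();

// Inside pull_from_input, replace the hardcoded 4096 with the passed-in value:
if is_hash_partitioning {
    for _ in 0..partitioner.num_partitions() {
        coalesce_batches.push(BatchCoalescer::new(stream.schema(), batch_size));
    }
}
```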
+            }
+        }
+
         // While there are still outputs to send to, keep pulling inputs
         let mut batches_until_yield = partitioner.num_partitions();
         while !output_channels.is_empty() {
@@ -1217,7 +1226,21 @@
             }
 
             for res in partitioner.partition_iter(batch)? {
-                let (partition, batch) = res?;
+                let (partition, mut batch) = res?;
+                if is_hash_partitioning {
+                    let coalesce_batches_partition = &mut coalesce_batches[partition];
+                    coalesce_batches_partition.push_batch(batch)?;
+
+                    if coalesce_batches_partition.has_completed_batch() {
+                        coalesce_batches_partition.finish_buffered_batch()?;
+                        batch = coalesce_batches_partition
+                            .next_completed_batch()
+                            .expect("has_completed_batch returned true");
+                    } else {
+                        // skip sending this batch
+                        continue;
+                    }
+                }
                 let size = batch.get_array_memory_size();
 
                 let timer = metrics.send_time[partition].timer();
@@ -1274,6 +1297,42 @@
             }
         }
 
+        if is_hash_partitioning {
+            // flush any remaining coalesced batches
+            for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() {
+                coalesce_batch.finish_buffered_batch()?;
+                if let Some(batch) = coalesce_batch.next_completed_batch() {
+                    let size = batch.get_array_memory_size();
+                    // Check if channel still exists (may have been removed if receiver hung up)
+                    if let Some(channel) = output_channels.get_mut(&partition) {
+                        let (batch_to_send, is_memory_batch) =
+                            match channel.reservation.lock().try_grow(size) {
+                                Ok(_) => {
+                                    // Memory available - send in-memory batch
+                                    (RepartitionBatch::Memory(batch), true)
+                                }
+                                Err(_) => {
+                                    // We're memory limited - spill to SpillPool
+                                    // SpillPool handles file handle reuse and rotation
+                                    channel.spill_writer.push_batch(&batch)?;
+                                    // Send marker indicating batch was spilled
+                                    (RepartitionBatch::Spilled, false)
+                                }
+                            };
+
+                        if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
Review comment (Copilot AI, Nov 9, 2025):
The send timer metric (metrics.send_time[partition]) is not being tracked for these final flush batches, unlike the main sending logic at line 1245. This will result in inaccurate metrics for hash partitioning operations, as the time spent sending flushed batches won't be recorded.
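A sketch of the fix, reusing the timer pattern the main loop in this diff already has:

```rust
// Record send time for the flush path too, mirroring the main loop:
let timer = metrics.send_time[partition].timer();
if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() {
    // Receiver hung up (early shutdown, e.g. LIMIT): only shrink
    // the reservation if we were holding memory for this batch
    if is_memory_batch {
        channel.reservation.lock().shrink(size);
    }
    output_channels.remove(&partition);
}
timer.done();
```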
+                            // If the other end has hung up, it was an early shutdown (e.g. LIMIT)
+                            // Only shrink memory if it was a memory batch
+                            if is_memory_batch {
+                                channel.reservation.lock().shrink(size);
+                            }
+                            output_channels.remove(&partition);
+                        }
+                    }
Review comment (Copilot AI, Nov 9, 2025):
The batch sending logic in this flush section duplicates the logic from lines 1244-1272. This code duplication makes maintenance harder and increases the risk of inconsistencies. Consider extracting this logic into a helper function that both the main loop and the flush section can use. The function could be named something like send_batch_to_channel and accept parameters for the batch, partition, channel, and metrics.
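A rough sketch of that extraction. The helper name comes from the comment; the OutputChannel type name and the exact signature are assumptions based on how the fields are used in this diff:

```rust
// Hypothetical shared helper for the main loop and the flush path.
// Returns Ok(true) if the batch was sent, Ok(false) if the receiver hung up.
async fn send_batch_to_channel(
    batch: RecordBatch,
    partition: usize,
    channel: &mut OutputChannel, // assumed name of the per-partition channel struct
    metrics: &RepartitionMetrics,
) -> Result<bool> {
    let size = batch.get_array_memory_size();
    let (batch_to_send, is_memory_batch) = match channel.reservation.lock().try_grow(size) {
        // Memory available: send the batch in memory
        Ok(_) => (RepartitionBatch::Memory(batch), true),
        // Memory limited: spill to the SpillPool and send a marker instead
        Err(_) => {
            channel.spill_writer.push_batch(&batch)?;
            (RepartitionBatch::Spilled, false)
        }
    };

    let timer = metrics.send_time[partition].timer();
    let sent = channel.sender.send(Some(Ok(batch_to_send))).await.is_ok();
    timer.done();

    if !sent && is_memory_batch {
        // Receiver hung up (e.g. LIMIT): release the memory we reserved
        channel.reservation.lock().shrink(size);
    }
    Ok(sent)
}
```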
Review comment (Copilot AI, Nov 9, 2025):
The flush section is missing the send_time timing metrics that are recorded in the main loop (line 1246). For consistency and proper performance monitoring, consider wrapping the sending logic with let timer = metrics.send_time[partition].timer(); ... timer.done(); similar to the main loop.
+                }
+            }
+        }
+
         // Spill writers will auto-finalize when dropped
         // No need for explicit flush
         Ok(())