From de2def6f07f3e88f02062b55b0efae981141bf7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 21:24:12 +0100 Subject: [PATCH 1/9] Coalesce batches inside hash-repartition --- .../physical-plan/src/repartition/mod.rs | 60 ++++++++++++++++++- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 74cf79889599..41bd1cacfc41 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -41,7 +41,7 @@ use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; -use arrow::compute::take_arrays; +use arrow::compute::{take_arrays, BatchCoalescer}; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; @@ -1194,9 +1194,18 @@ impl RepartitionExec { partitioning: Partitioning, metrics: RepartitionMetrics, ) -> Result<()> { + let is_hash_partitioning = matches!(&partitioning, Partitioning::Hash(_, _)); let mut partitioner = BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; + let mut coalesce_batches = vec![]; + + if is_hash_partitioning { + for _ in 0..output_channels.len() { + coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096)); + } + } + // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions(); while !output_channels.is_empty() { @@ -1217,7 +1226,20 @@ impl RepartitionExec { } for res in partitioner.partition_iter(batch)? { - let (partition, batch) = res?; + let (partition, mut batch) = res?; + if is_hash_partitioning { + let coalesce_batches_partition = &mut coalesce_batches[partition]; + coalesce_batches_partition.push_batch(batch)?; + + if coalesce_batches_partition.has_completed_batch() { + batch = coalesce_batches_partition + .next_completed_batch() + .expect("has_completed_batch returned true"); + } else { + // skip sending this batch + continue; + } + } let size = batch.get_array_memory_size(); let timer = metrics.send_time[partition].timer(); @@ -1274,6 +1296,40 @@ impl RepartitionExec { } } + if is_hash_partitioning { + // flush any remaining coalesced batches + for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() { + while let Some(batch) = coalesce_batch.next_completed_batch() { + let size = batch.get_array_memory_size(); + let channel = output_channels.get_mut(&partition).unwrap(); + + let (batch_to_send, is_memory_batch) = + match channel.reservation.lock().try_grow(size) { + Ok(_) => { + // Memory available - send in-memory batch + (RepartitionBatch::Memory(batch), true) + } + Err(_) => { + // We're memory limited - spill to SpillPool + // SpillPool handles file handle reuse and rotation + channel.spill_writer.push_batch(&batch)?; + // Send marker indicating batch was spilled + (RepartitionBatch::Spilled, false) + } + }; + + if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { + // If the other end has hung up, it was an early shutdown (e.g. LIMIT) + // Only shrink memory if it was a memory batch + if is_memory_batch { + channel.reservation.lock().shrink(size); + } + output_channels.remove(&partition); + } + } + } + } + // Spill writers will auto-finalize when dropped // No need for explicit flush Ok(()) From 59741aac9ada966b04736cef50cf6f951ce48f7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 21:47:58 +0100 Subject: [PATCH 2/9] Coalesce batches inside hash-repartition --- .../physical-plan/src/repartition/mod.rs | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 41bd1cacfc41..ad8482d5340b 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1201,7 +1201,7 @@ impl RepartitionExec { let mut coalesce_batches = vec![]; if is_hash_partitioning { - for _ in 0..output_channels.len() { + for _ in 0..partitioner.num_partitions() { coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096)); } } @@ -1301,30 +1301,31 @@ impl RepartitionExec { for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() { while let Some(batch) = coalesce_batch.next_completed_batch() { let size = batch.get_array_memory_size(); - let channel = output_channels.get_mut(&partition).unwrap(); - - let (batch_to_send, is_memory_batch) = - match channel.reservation.lock().try_grow(size) { - Ok(_) => { - // Memory available - send in-memory batch - (RepartitionBatch::Memory(batch), true) + // Check if channel still exists (may have been removed if receiver hung up) + if let Some(channel) = output_channels.get_mut(&partition) { + let (batch_to_send, is_memory_batch) = + match channel.reservation.lock().try_grow(size) { + Ok(_) => { + // Memory available - send in-memory batch + (RepartitionBatch::Memory(batch), true) + } + Err(_) => { + // We're memory limited - spill to SpillPool + // SpillPool handles file handle reuse and rotation + channel.spill_writer.push_batch(&batch)?; + // Send marker indicating batch was spilled + (RepartitionBatch::Spilled, false) + } + }; + + if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { + // If the other end has hung up, it was an early shutdown (e.g. LIMIT) + // Only shrink memory if it was a memory batch + if is_memory_batch { + channel.reservation.lock().shrink(size); } - Err(_) => { - // We're memory limited - spill to SpillPool - // SpillPool handles file handle reuse and rotation - channel.spill_writer.push_batch(&batch)?; - // Send marker indicating batch was spilled - (RepartitionBatch::Spilled, false) - } - }; - - if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { - // If the other end has hung up, it was an early shutdown (e.g. LIMIT) - // Only shrink memory if it was a memory batch - if is_memory_batch { - channel.reservation.lock().shrink(size); + output_channels.remove(&partition); } - output_channels.remove(&partition); } } } From 46b0c5abc7ce3fccc5382d6bc1b7cfcb10c7e648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 21:53:45 +0100 Subject: [PATCH 3/9] fix --- datafusion/physical-plan/src/repartition/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index ad8482d5340b..4f392b6a44bd 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1299,7 +1299,8 @@ impl RepartitionExec { if is_hash_partitioning { // flush any remaining coalesced batches for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() { - while let Some(batch) = coalesce_batch.next_completed_batch() { + coalesce_batch.finish_buffered_batch()?; + if let Some(batch) = coalesce_batch.next_completed_batch() { let size = batch.get_array_memory_size(); // Check if channel still exists (may have been removed if receiver hung up) if let Some(channel) = output_channels.get_mut(&partition) { From 09d83826322492b3e6d5a314108357c228b395c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 22:06:58 +0100 Subject: [PATCH 4/9] fix --- datafusion/physical-plan/src/repartition/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 4f392b6a44bd..57a1f2516326 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1232,6 +1232,7 @@ impl RepartitionExec { coalesce_batches_partition.push_batch(batch)?; if coalesce_batches_partition.has_completed_batch() { + coalesce_batches_partition.finish_buffered_batch()?; batch = coalesce_batches_partition .next_completed_batch() .expect("has_completed_batch returned true"); From 63ceda4ee8d6f2f75167bb77b7eca69a7b6d6e82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 22:20:00 +0100 Subject: [PATCH 5/9] hardcode to current default --- datafusion/physical-plan/src/repartition/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 57a1f2516326..84f153cc6533 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1202,7 +1202,7 @@ impl RepartitionExec { if is_hash_partitioning { for _ in 0..partitioner.num_partitions() { - coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096)); + coalesce_batches.push(BatchCoalescer::new(stream.schema(), 8192)); } } From 747428a34b97b5b0f5ef07d5ff0fb6732c04a2d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 23:14:19 +0100 Subject: [PATCH 6/9] fix --- datafusion/physical-plan/src/repartition/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 84f153cc6533..9a8d9813e3da 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1232,7 +1232,6 @@ impl RepartitionExec { coalesce_batches_partition.push_batch(batch)?; if coalesce_batches_partition.has_completed_batch() { - coalesce_batches_partition.finish_buffered_batch()?; batch = coalesce_batches_partition .next_completed_batch() .expect("has_completed_batch returned true"); From ca289440f57a6000f70dd2ccaf7656e79076b81d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 9 Nov 2025 23:28:42 +0100 Subject: [PATCH 7/9] fix --- datafusion/physical-plan/src/repartition/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 9a8d9813e3da..54103d70f5b6 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1202,7 +1202,9 @@ impl RepartitionExec { if is_hash_partitioning { for _ in 0..partitioner.num_partitions() { - coalesce_batches.push(BatchCoalescer::new(stream.schema(), 8192)); + let coalescer = BatchCoalescer::new(stream.schema(), 8192) + .with_biggest_coalesce_batch_size(Some(4096)); + coalesce_batches.push(coalescer); } } From eeafd74a9337ba277731676c177a9a1531dc88b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 10 Nov 2025 09:59:55 +0100 Subject: [PATCH 8/9] Only change heuristic --- .../physical-plan/src/repartition/mod.rs | 66 +++---------------- 1 file changed, 8 insertions(+), 58 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 54103d70f5b6..321a52516403 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -41,7 +41,7 @@ use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; -use arrow::compute::{take_arrays, BatchCoalescer}; +use arrow::compute::take_arrays; use arrow::datatypes::{SchemaRef, UInt32Type}; use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; @@ -1198,15 +1198,7 @@ impl RepartitionExec { let mut partitioner = BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; - let mut coalesce_batches = vec![]; - - if is_hash_partitioning { - for _ in 0..partitioner.num_partitions() { - let coalescer = BatchCoalescer::new(stream.schema(), 8192) - .with_biggest_coalesce_batch_size(Some(4096)); - coalesce_batches.push(coalescer); - } - } + let mut row_counts = vec![0usize; partitioner.num_partitions()]; // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions(); @@ -1228,18 +1220,12 @@ impl RepartitionExec { } for res in partitioner.partition_iter(batch)? { - let (partition, mut batch) = res?; + let (partition, batch) = res?; if is_hash_partitioning { - let coalesce_batches_partition = &mut coalesce_batches[partition]; - coalesce_batches_partition.push_batch(batch)?; - - if coalesce_batches_partition.has_completed_batch() { - batch = coalesce_batches_partition - .next_completed_batch() - .expect("has_completed_batch returned true"); - } else { - // skip sending this batch - continue; + row_counts[partition] += batch.num_rows(); + if row_counts[partition] >= 8192 { + row_counts[partition] = 0; + batches_until_yield = 0; // force yield } } let size = batch.get_array_memory_size(); @@ -1293,47 +1279,11 @@ impl RepartitionExec { if batches_until_yield == 0 { tokio::task::yield_now().await; batches_until_yield = partitioner.num_partitions(); - } else { + } else if !is_hash_partitioning { batches_until_yield -= 1; } } - if is_hash_partitioning { - // flush any remaining coalesced batches - for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() { - coalesce_batch.finish_buffered_batch()?; - if let Some(batch) = coalesce_batch.next_completed_batch() { - let size = batch.get_array_memory_size(); - // Check if channel still exists (may have been removed if receiver hung up) - if let Some(channel) = output_channels.get_mut(&partition) { - let (batch_to_send, is_memory_batch) = - match channel.reservation.lock().try_grow(size) { - Ok(_) => { - // Memory available - send in-memory batch - (RepartitionBatch::Memory(batch), true) - } - Err(_) => { - // We're memory limited - spill to SpillPool - // SpillPool handles file handle reuse and rotation - channel.spill_writer.push_batch(&batch)?; - // Send marker indicating batch was spilled - (RepartitionBatch::Spilled, false) - } - }; - - if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { - // If the other end has hung up, it was an early shutdown (e.g. LIMIT) - // Only shrink memory if it was a memory batch - if is_memory_batch { - channel.reservation.lock().shrink(size); - } - output_channels.remove(&partition); - } - } - } - } - } - // Spill writers will auto-finalize when dropped // No need for explicit flush Ok(()) From fc90f79ce49e7fa86ae8026c1a9869e1f20a5b09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Mon, 10 Nov 2025 10:15:40 +0100 Subject: [PATCH 9/9] Only change heuristic --- datafusion/physical-plan/src/repartition/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 321a52516403..573fb75f7855 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -1225,7 +1225,7 @@ impl RepartitionExec { row_counts[partition] += batch.num_rows(); if row_counts[partition] >= 8192 { row_counts[partition] = 0; - batches_until_yield = 0; // force yield + batches_until_yield -= 1; } } let size = batch.get_array_memory_size();