-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Coalesce batches inside hash-repartition #18572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
de2def6
59741aa
46b0c5a
09d8382
63ceda4
747428a
ca28944
eeafd74
fc90f79
0292b44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,7 @@ use crate::stream::RecordBatchStreamAdapter; | |
| use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; | ||
|
|
||
| use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; | ||
| use arrow::compute::take_arrays; | ||
| use arrow::compute::{take_arrays, BatchCoalescer}; | ||
| use arrow::datatypes::{SchemaRef, UInt32Type}; | ||
| use datafusion_common::config::ConfigOptions; | ||
| use datafusion_common::stats::Precision; | ||
|
|
@@ -1194,9 +1194,18 @@ impl RepartitionExec { | |
| partitioning: Partitioning, | ||
| metrics: RepartitionMetrics, | ||
| ) -> Result<()> { | ||
| let is_hash_partitioning = matches!(&partitioning, Partitioning::Hash(_, _)); | ||
| let mut partitioner = | ||
| BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?; | ||
|
|
||
| let mut coalesce_batches = vec![]; | ||
|
|
||
| if is_hash_partitioning { | ||
| for _ in 0..partitioner.num_partitions() { | ||
| coalesce_batches.push(BatchCoalescer::new(stream.schema(), 4096)); | ||
| } | ||
| } | ||
|
|
||
| // While there are still outputs to send to, keep pulling inputs | ||
| let mut batches_until_yield = partitioner.num_partitions(); | ||
| while !output_channels.is_empty() { | ||
|
|
@@ -1217,7 +1226,21 @@ impl RepartitionExec { | |
| } | ||
|
|
||
| for res in partitioner.partition_iter(batch)? { | ||
| let (partition, batch) = res?; | ||
| let (partition, mut batch) = res?; | ||
| if is_hash_partitioning { | ||
| let coalesce_batches_partition = &mut coalesce_batches[partition]; | ||
| coalesce_batches_partition.push_batch(batch)?; | ||
|
|
||
| if coalesce_batches_partition.has_completed_batch() { | ||
| coalesce_batches_partition.finish_buffered_batch()?; | ||
| batch = coalesce_batches_partition | ||
| .next_completed_batch() | ||
| .expect("has_completed_batch returned true"); | ||
| } else { | ||
| // skip sending this batch | ||
| continue; | ||
| } | ||
| } | ||
| let size = batch.get_array_memory_size(); | ||
|
|
||
| let timer = metrics.send_time[partition].timer(); | ||
|
|
@@ -1274,6 +1297,42 @@ impl RepartitionExec { | |
| } | ||
| } | ||
|
|
||
| if is_hash_partitioning { | ||
| // flush any remaining coalesced batches | ||
| for (partition, coalesce_batch) in coalesce_batches.iter_mut().enumerate() { | ||
Dandandan marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| coalesce_batch.finish_buffered_batch()?; | ||
| if let Some(batch) = coalesce_batch.next_completed_batch() { | ||
| let size = batch.get_array_memory_size(); | ||
| // Check if channel still exists (may have been removed if receiver hung up) | ||
| if let Some(channel) = output_channels.get_mut(&partition) { | ||
| let (batch_to_send, is_memory_batch) = | ||
| match channel.reservation.lock().try_grow(size) { | ||
| Ok(_) => { | ||
| // Memory available - send in-memory batch | ||
| (RepartitionBatch::Memory(batch), true) | ||
| } | ||
| Err(_) => { | ||
| // We're memory limited - spill to SpillPool | ||
| // SpillPool handles file handle reuse and rotation | ||
| channel.spill_writer.push_batch(&batch)?; | ||
| // Send marker indicating batch was spilled | ||
| (RepartitionBatch::Spilled, false) | ||
| } | ||
| }; | ||
|
|
||
| if channel.sender.send(Some(Ok(batch_to_send))).await.is_err() { | ||
|
||
| // If the other end has hung up, it was an early shutdown (e.g. LIMIT) | ||
| // Only shrink memory if it was a memory batch | ||
| if is_memory_batch { | ||
| channel.reservation.lock().shrink(size); | ||
| } | ||
| output_channels.remove(&partition); | ||
| } | ||
| } | ||
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| // Spill writers will auto-finalize when dropped | ||
| // No need for explicit flush | ||
| Ok(()) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hardcoded batch size of 4096 should use the configured batch_size from the session config. Other uses of `BatchCoalescer::new` in the codebase use `context.session_config().batch_size()` or `config.execution.batch_size`. Consider passing the batch_size as a parameter to `pull_from_input` from the caller (`consume_input_streams`), which has access to the `context: Arc<TaskContext>`.