Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
23536b6
Fix metrics display on leaf nodes
gabotechs Jan 31, 2026
ae61551
Add bytes_per_row.rs
gabotechs Jan 23, 2026
1e3074a
Add compute_per_node.rs
gabotechs Jan 23, 2026
1383ac4
Add rows_per_node.rs
gabotechs Jan 24, 2026
74bb88e
Refactor: move children split logic to children_isolator_union_split.rs
gabotechs Jan 24, 2026
3810d81
Clean up public API of statistics/ module
gabotechs Jan 25, 2026
71076d6
Adapt codebase for new cost-based planner
gabotechs Jan 25, 2026
b1c2874
Adapt tests for cost-based planner
gabotechs Jan 25, 2026
aa2611c
Adapt examples for cost-based planner
gabotechs Jan 25, 2026
573bcc4
Adapt benchmarks for cost-based planner
gabotechs Jan 25, 2026
50c57d4
Fix integration tests
gabotechs Jan 27, 2026
53549bc
Change force_one_task to max_tasks
gabotechs Jan 27, 2026
dbcf555
Fix test
gabotechs Jan 27, 2026
79d5054
Add LIKE selectivity
gabotechs Jan 31, 2026
7c49321
Improve compute_per_node.rs
gabotechs Jan 31, 2026
7ea55e3
Better bytes-processed-per-partition
gabotechs Jan 31, 2026
5f46f89
Add some cost to DataSourceExec
gabotechs Jan 31, 2026
b34b74b
Fix bad estimations on JOINs
gabotechs Feb 1, 2026
bc524c5
Use compute class M for DataSourceExec
gabotechs Feb 1, 2026
ab2b734
Use upstream statistics system
gabotechs Feb 6, 2026
2f2d2e7
Add one more test to children_isolator_union_split.rs
gabotechs Feb 7, 2026
06d4119
Expose ComputeCostClass to the public and make it configurable by the…
gabotechs Feb 7, 2026
2ca7657
Add complexity-based cost attribution
gabotechs Feb 8, 2026
e0c3f2b
Fix network boundary cost
gabotechs Feb 8, 2026
456c193
Adapt estimations about sizes and cost
gabotechs Feb 9, 2026
7ff2066
Complexity based cost analyzer
gabotechs Feb 9, 2026
e211c01
Revert unintended changes
gabotechs Feb 9, 2026
b664e99
cargo fmt
gabotechs Feb 9, 2026
2e8d1d3
Rename file to default_bytes_for_datatype.rs
gabotechs Feb 9, 2026
346a535
Better display of time complexity
gabotechs Feb 9, 2026
45aeb0b
Add integration tests for statistics propagation
gabotechs Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions benchmarks/cdk/bin/datafusion-bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ async function main() {
program
.requiredOption('--dataset <string>', 'Dataset to run queries on')
.option('-i, --iterations <number>', 'Number of iterations', '3')
.option('--files-per-task <number>', 'Files per task', '8')
.option('--cardinality-task-sf <number>', 'Cardinality task scale factor', '1')
.option('--bytes-processed-per-partition <number>', 'How many bytes each partition is expected to process', '8388608') // 8 Mb default
.option('--batch-size <number>', 'Standard Batch coalescing size (number of rows)', '32768')
.option('--shuffle-batch-size <number>', 'Shuffle batch coalescing size (number of rows)', '32768')
.option('--children-isolator-unions <number>', 'Use children isolator unions', 'true')
Expand All @@ -29,8 +28,7 @@ async function main() {

const dataset: string = options.dataset
const iterations = parseInt(options.iterations);
const filesPerTask = parseInt(options.filesPerTask);
const cardinalityTaskSf = parseInt(options.cardinalityTaskSf);
const bytesProcessedPerPartition = parseInt(options.bytesProcessedPerPartition)
const batchSize = parseInt(options.batchSize);
const shuffleBatchSize = parseInt(options.shuffleBatchSize);
const compression = options.compression;
Expand All @@ -42,8 +40,7 @@ async function main() {
const warmup = options.warmup === 'true' || options.debug === 1

const runner = new DataFusionRunner({
filesPerTask,
cardinalityTaskSf,
bytesProcessedPerPartition,
batchSize,
shuffleBatchSize,
collectMetrics,
Expand Down Expand Up @@ -72,8 +69,7 @@ class DataFusionRunner implements BenchmarkRunner {
private url = 'http://localhost:9000';

constructor(private readonly options: {
filesPerTask: number;
cardinalityTaskSf: number;
bytesProcessedPerPartition: number;
batchSize: number;
shuffleBatchSize: number;
collectMetrics: boolean;
Expand Down Expand Up @@ -124,8 +120,7 @@ class DataFusionRunner implements BenchmarkRunner {
}
await this.query(stmt);
await this.query(`
SET distributed.files_per_task=${this.options.filesPerTask};
SET distributed.cardinality_task_count_factor=${this.options.cardinalityTaskSf};
SET distributed.bytes_processed_per_partition=${this.options.bytesProcessedPerPartition};
SET datafusion.execution.batch_size=${this.options.batchSize};
SET distributed.shuffle_batch_size=${this.options.shuffleBatchSize};
SET distributed.collect_metrics=${this.options.collectMetrics};
Expand Down
17 changes: 4 additions & 13 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,9 @@ pub struct RunOpt {
#[structopt(long)]
threads: Option<usize>,

/// Number of files per each distributed task.
#[structopt(long)]
files_per_task: Option<usize>,

/// Task count scale factor for when nodes in stages change the cardinality of the data
#[structopt(long)]
cardinality_task_sf: Option<f64>,
/// Number of bytes each partition is expected to process.
#[structopt(long, default_value = "8388608")] // 8 Mb
bytes_processed_per_partition: usize,

/// Use children isolator UNIONs for distributing UNION operations.
#[structopt(long)]
Expand Down Expand Up @@ -176,12 +172,7 @@ impl RunOpt {
.with_config(self.config()?)
.with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone()))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_files_per_task(
self.files_per_task.unwrap_or(get_available_parallelism()),
)?
.with_distributed_cardinality_effect_task_scale_factor(
self.cardinality_task_sf.unwrap_or(1.0),
)?
.with_distributed_bytes_processed_per_partition(self.bytes_processed_per_partition)?
.with_distributed_compression(match self.compression.as_str() {
"zstd" => Some(CompressionType::ZSTD),
"lz4" => Some(CompressionType::LZ4_FRAME),
Expand Down
77 changes: 36 additions & 41 deletions examples/custom_execution_plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@ Uses `DistributedTaskContext` to determine which range to generate.
**NumbersExecCodec** – Protobuf-based serialization implementing `PhysicalExtensionCodec`.
Must be registered in the `SessionStateBuilder` that initiates the query as well as the one used by `Worker`s.

**NumbersTaskEstimator** – Controls distributed parallelism:

- `task_estimation()` - Returns how many tasks needed based on range size and config
- `scale_up_leaf_node()` - Splits single range of numbers into N per-task ranges

**NumbersConfig** – Custom config extension for controlling distributed parallelism (`numbers_per_task: usize`)
**NumbersDistributedPlannerExtension** – Controls distributed parallelism:
- `scale_up_leaf_node()` - Splits single range of numbers into N per-task ranges

## Usage

Expand Down Expand Up @@ -49,16 +45,20 @@ SortPreservingMergeExec: [number@0 ASC NULLS LAST]

This will print a non-distributed plan, as the range of numbers we are querying (`numbers(0, 10)`) is small.

The config parameter `numbers.numbers_per_task` is the one that controls how many distributed tasks are used in the
query, and its default value is `10`, so querying 10 numbers will not distribute the plan.
Distributed DataFusion has a config parameter that allows controlling the parallelism of a distributed query:
`distributed.bytes_processed_per_partition`.

It determines how many bytes each partition is expected to handle, and if handling the query would require more
partitions than CPUs the machine has, then the query will get distributed across workers.

However, if we try to query 11 numbers:
For example, if we set `distributed.bytes_processed_per_partition` to something very low, like 10 bytes,
the query will get distributed:

```bash
cargo run \
--features integration \
--example custom_execution_plan \
"SELECT DISTINCT number FROM numbers(0, 11) ORDER BY number" \
"SET distributed.bytes_processed_per_partition=10;SELECT DISTINCT number FROM numbers(0, 11) ORDER BY number" \
--show-distributed-plan
```

Expand All @@ -70,79 +70,74 @@ cargo run \
┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15]
│ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
│ AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
│ [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=2
│ [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=3
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p31] t1:[p0..p31]
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([number@0], 32), input_partitions=16
│ AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
│ RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
│ CooperativeExec
│ NumbersExec: t0:[0-6), t1:[6-11)
┌───── Stage 1 ── Tasks: t0:[p0..p31] t1:[p0..p31] t2:[p0..p31]
│ RepartitionExec: partitioning=Hash([number@0], 32), input_partitions=1
│ AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
│ CooperativeExec
│ NumbersExec: t0:[0-4), t1:[4-8), t2:[8-11)
└──────────────────────────────────────────────────
```

The distribution rule kicks in, and the plan gets distributed.

Note that the parallelism in the plan has an upper threshold, so for example, if we query 100 numbers:
Note that the parallelism in the plan has an upper threshold, so for example, if we query 100 numbers so that
more rows flow through the query:

```bash
cargo run \
--features integration \
--example custom_execution_plan \
"SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
"SET distributed.bytes_processed_per_partition=10;SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
--show-distributed-plan
```

```
┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
│ [Stage 2] => NetworkCoalesceExec: output_partitions=48, input_tasks=3
│ [Stage 2] => NetworkCoalesceExec: output_partitions=64, input_tasks=4
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15]
┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15] t3:[p0..p15]
│ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
│ AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
│ [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=4
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p47] t1:[p0..p47] t2:[p0..p47] t3:[p0..p47]
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([number@0], 48), input_partitions=16
│ AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
│ RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
│ CooperativeExec
│ NumbersExec: t0:[0-25), t1:[25-50), t2:[50-75), t3:[75-100)
┌───── Stage 1 ── Tasks: t0:[p0..p63] t1:[p0..p63] t2:[p0..p63] t3:[p0..p63]
│ RepartitionExec: partitioning=Hash([number@0], 64), input_partitions=1
│ AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
│ CooperativeExec
│ NumbersExec: t0:[0-25), t1:[25-50), t2:[50-75), t3:[75-100)
└──────────────────────────────────────────────────
```

We do not get 100/10 = 10 distributed tasks, we just get 4. This is because the example is configured by default to
simulate a 4-worker cluster. If we increase the worker count, we get a highly distributed plan with 10 tasks:
We do not get many more distributed tasks, we just get 4. This is because the example is configured by default to
simulate a 4-worker cluster. If we increase the worker count, we get a highly distributed plan with more parallelism:

```bash
cargo run \
--features integration \
--example custom_execution_plan \
"SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
"SET distributed.bytes_processed_per_partition=10;SELECT DISTINCT number FROM numbers(0, 100) ORDER BY number" \
--workers 10 \
--show-distributed-plan
```

```
┌───── DistributedExec ── Tasks: t0:[p0]
│ SortPreservingMergeExec: [number@0 ASC NULLS LAST]
│ [Stage 2] => NetworkCoalesceExec: output_partitions=112, input_tasks=7
│ [Stage 2] => NetworkCoalesceExec: output_partitions=160, input_tasks=10
└──────────────────────────────────────────────────
┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15] t3:[p0..p15] t4:[p0..p15] t5:[p0..p15] t6:[p0..p15]
┌───── Stage 2 ── Tasks: t0:[p0..p15] t1:[p0..p15] t2:[p0..p15] t3:[p0..p15] t4:[p0..p15] t5:[p0..p15] t6:[p0..p15] t7:[p0..p15] t8:[p0..p15] t9:[p0..p15]
│ SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true]
│ AggregateExec: mode=FinalPartitioned, gby=[number@0 as number], aggr=[]
│ [Stage 1] => NetworkShuffleExec: output_partitions=16, input_tasks=10
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p111] t1:[p0..p111] t2:[p0..p111] t3:[p0..p111] t4:[p0..p111] t5:[p0..p111] t6:[p0..p111] t7:[p0..p111] t8:[p0..p111] t9:[p0..p111]
│ CoalesceBatchesExec: target_batch_size=8192
│ RepartitionExec: partitioning=Hash([number@0], 112), input_partitions=16
│ AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
│ RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
│ CooperativeExec
│ NumbersExec: t0:[0-10), t1:[10-20), t2:[20-30), t3:[30-40), t4:[40-50), t5:[50-60), t6:[60-70), t7:[70-80), t8:[80-90), t9:[90-100)
┌───── Stage 1 ── Tasks: t0:[p0..p159] t1:[p0..p159] t2:[p0..p159] t3:[p0..p159] t4:[p0..p159] t5:[p0..p159] t6:[p0..p159] t7:[p0..p159] t8:[p0..p159] t9:[p0..p159]
│ RepartitionExec: partitioning=Hash([number@0], 160), input_partitions=1
│ AggregateExec: mode=Partial, gby=[number@0 as number], aggr=[]
│ CooperativeExec
│ NumbersExec: t0:[0-10), t1:[10-20), t2:[20-30), t3:[30-40), t4:[40-50), t5:[50-60), t6:[60-70), t7:[70-80), t8:[80-90), t9:[90-100)
└──────────────────────────────────────────────────
```

72 changes: 31 additions & 41 deletions examples/custom_execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! - Custom TableProvider for mapping the table function to an execution plan
//! - Custom ExecutionPlan for returning the requested number range
//! - Custom PhysicalExtensionCodec for serialization across the network
//! - Custom TaskEstimator to control parallelism
//! - Custom DistributedPlannerExtension to control how to scale up a custom node
//!
//! Run this example with:
//! ```bash
Expand All @@ -23,24 +23,26 @@ use arrow::record_batch::RecordBatchOptions;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use datafusion::catalog::{Session, TableFunctionImpl};
use datafusion::common::stats::Precision;
use datafusion::common::{
DataFusionError, Result, ScalarValue, exec_err, extensions_options, internal_err, plan_err,
DataFusionError, Result, ScalarValue, Statistics, exec_err, internal_err, plan_err,
};
use datafusion::config::ConfigExtension;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder, TaskContext};
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::physical_plan::{
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::SessionContext;
use datafusion_distributed::test_utils::in_memory_channel_resolver::{
InMemoryChannelResolver, InMemoryWorkerResolver,
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedTaskContext, TaskEstimation,
TaskEstimator, WorkerQueryContext, display_plan_ascii,
DistributedExt, DistributedPhysicalOptimizerRule, DistributedPlannerExtension,
DistributedTaskContext, WorkerQueryContext, display_plan_ascii,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf;
Expand Down Expand Up @@ -219,6 +221,23 @@ impl ExecutionPlan for NumbersExec {
stream::once(async { Ok(batch) }),
)))
}

/// Implementing [ExecutionPlan::partition_statistics] is essential for Distributed DataFusion
/// to infer how much compute is going to be needed across the plan.
fn partition_statistics(&self, _: Option<usize>) -> Result<Statistics> {
let mut stats = Statistics::default();
let num_rows = self
.ranges_per_task
.iter()
.map(|v| v.end - v.start)
.sum::<i64>();

stats.num_rows = Precision::Exact(num_rows as usize);
stats.column_statistics =
vec![ColumnStatistics::new_unknown().with_distinct_count(stats.num_rows)];

Ok(stats)
}
}

/// Custom codec for serializing/deserializing NumbersExec across the network. As the NumbersExec
Expand Down Expand Up @@ -261,7 +280,7 @@ impl PhysicalExtensionCodec for NumbersExecCodec {
.schema
.as_ref()
.map(|s| s.try_into())
.ok_or(proto_error("NetworkShuffleExec is missing schema"))??;
.ok_or(proto_error("NumbersExec is missing schema"))??;

Ok(Arc::new(NumbersExec::new(
proto.ranges.iter().map(|v| v.start..v.end),
Expand Down Expand Up @@ -292,37 +311,11 @@ impl PhysicalExtensionCodec for NumbersExecCodec {
}
}

extensions_options! {
/// Custom ConfigExtension for configuring NumbersExec distributed task estimation behavior
/// at runtime with SET statements.
struct NumbersConfig {
/// how many numbers each task will produce
numbers_per_task: usize, default = 10
}
}

impl ConfigExtension for NumbersConfig {
const PREFIX: &'static str = "numbers";
}

/// Custom TaskEstimator that tells the planner how to distribute NumbersExec.
/// Custom [DistributedPlannerExtension] that tells the planner how to distribute [NumbersExec].
#[derive(Debug)]
struct NumbersTaskEstimator;

impl TaskEstimator for NumbersTaskEstimator {
fn task_estimation(
&self,
plan: &Arc<dyn ExecutionPlan>,
cfg: &datafusion::config::ConfigOptions,
) -> Option<TaskEstimation> {
let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
let cfg: &NumbersConfig = cfg.extensions.get()?;
let task_count = (plan.ranges_per_task[0].end - plan.ranges_per_task[0].start) as f64
/ cfg.numbers_per_task as f64;

Some(TaskEstimation::desired(task_count.ceil() as usize))
}
struct NumbersDistributedPlannerExtension;

impl DistributedPlannerExtension for NumbersDistributedPlannerExtension {
fn scale_up_leaf_node(
&self,
plan: &Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -375,16 +368,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.build())
});

let config = SessionConfig::new().with_option_extension(NumbersConfig::default());

let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_distributed_worker_resolver(worker_resolver)
.with_distributed_channel_resolver(channel_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_user_codec(NumbersExecCodec)
.with_distributed_task_estimator(NumbersTaskEstimator)
.with_distributed_planner_extension(NumbersDistributedPlannerExtension)
.build();

let ctx = SessionContext::from(state);
Expand Down
3 changes: 2 additions & 1 deletion examples/in_memory_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_default_features()
.with_distributed_worker_resolver(InMemoryWorkerResolver)
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
// Set to something very low so that we see some distribution.
.with_distributed_bytes_processed_per_partition(100)?
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_files_per_task(1)?
.build();

let ctx = SessionContext::from(state);
Expand Down
3 changes: 2 additions & 1 deletion examples/localhost_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
// Set to something very low so that we see some distribution.
.with_distributed_bytes_processed_per_partition(100)?
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_files_per_task(1)?
.build();

let ctx = SessionContext::from(state);
Expand Down
Loading
Loading