Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
481 changes: 227 additions & 254 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
members = ["benchmarks", "cli", "console"]

[workspace.dependencies]
datafusion = { version = "52.0.0", default-features = false }
datafusion-proto = { version = "52.0.0" }
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-53", default-features = false }
datafusion-proto = { git = "https://github.com/apache/datafusion", branch = "branch-53" }

[package]
name = "datafusion-distributed"
Expand All @@ -19,9 +19,9 @@ datafusion = { workspace = true, features = [
"datetime_expressions",
] }
datafusion-proto = { workspace = true }
arrow-flight = "57.1.0"
arrow-select = "57.1.0"
arrow-ipc = { version = "57.1.0", features = ["zstd"] }
arrow-flight = "58"
arrow-select = "58"
arrow-ipc = { version = "58", features = ["zstd"] }
async-trait = "0.1.89"
tokio = { version = "1.48", features = ["full"] }
tonic = { version = "0.14.1", features = ["transport"] }
Expand All @@ -35,7 +35,7 @@ delegate = "0.13.4"
dashmap = "6.0.1"
prost = "0.14.1"
rand = "0.9"
object_store = "0.12.4"
object_store = "0.13"
bytes = "1.11"
pin-project = "1.1.10"
tokio-stream = { version = "0.1.17", features = ["sync"] }
Expand All @@ -46,10 +46,10 @@ sketches-ddsketch = "0.3.0"

# integration_tests deps
insta = { version = "1.46.0", features = ["filters"], optional = true }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf", optional = true }
parquet = { version = "57.1.0", optional = true }
arrow = { version = "57.1.0", optional = true, features = ["test_utils"] }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f", optional = true }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f", optional = true }
parquet = { version = "58", optional = true }
arrow = { version = "58", optional = true, features = ["test_utils"] }
hyper-util = { version = "0.1.16", optional = true }
pretty_assertions = { version = "1.4", optional = true }
reqwest = { version = "0.12", optional = true }
Expand Down Expand Up @@ -77,10 +77,10 @@ clickbench = ["integration"]
[dev-dependencies]
structopt = "0.3"
insta = { version = "1.46.0", features = ["filters"] }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" }
parquet = "57.1.0"
arrow = { version = "57.1.0", features = ["test_utils"] }
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" }
parquet = "58"
arrow = { version = "58", features = ["test_utils"] }
tokio-stream = { version = "0.1.17", features = ["sync"] }
hyper-util = "0.1.16"
pretty_assertions = "1.4"
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
tokio = { version = "1.48", features = ["full"] }
parquet = { version = "57.1.0" }
parquet = { version = "58" }
structopt = { version = "0.3.26" }
log = "0.4.27"
serde = "1.0.219"
serde_json = "1.0.141"
env_logger = "0.11.8"
async-trait = "0.1.89"
chrono = "0.4.42"
chrono = "0.4"
futures = "0.3.31"
dashmap = "6.0.1"
prost = "0.14.1"
url = "2.5.7"
arrow-flight = "57.1.0"
arrow-flight = "58"
tonic = { version = "0.14.1", features = ["transport"] }
axum = "0.7"
object_store = { version = "0.12.4", features = ["aws"] }
object_store = { version = "0.13", features = ["aws"] }
aws-config = "1"
aws-sdk-ec2 = "1"
openssl = { version = "0.10", features = ["vendored"] }
Expand Down
12 changes: 4 additions & 8 deletions benchmarks/benches/broadcast_cache_scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,17 @@ struct SyntheticExec {
schema: SchemaRef,
partitions: usize,
batches: Arc<Vec<Arc<RecordBatch>>>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}

impl SyntheticExec {
fn new(schema: SchemaRef, partitions: usize, batches: Arc<Vec<Arc<RecordBatch>>>) -> Self {
let properties = PlanProperties::new(
let properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(Arc::clone(&schema)),
Partitioning::UnknownPartitioning(partitions),
EmissionType::Incremental,
Boundedness::Bounded,
);
));
Self {
schema,
partitions,
Expand Down Expand Up @@ -130,7 +130,7 @@ impl ExecutionPlan for SyntheticExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down Expand Up @@ -163,10 +163,6 @@ impl ExecutionPlan for SyntheticExec {
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema))
}

fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema))
}
Expand Down
9 changes: 1 addition & 8 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@ edition = "2024"
[dependencies]
datafusion = { workspace = true }
datafusion-distributed = { path = "..", features = ["avro", "integration"] }
datafusion-cli = { version = "52", default-features = false }
datafusion-cli = { git = "https://github.com/apache/datafusion", branch = "branch-53", default-features = false }
tokio = { version = "1.48", features = ["full"] }
clap = { version = "4", features = ["derive"] }
env_logger = "0.11"
dirs = "6"
arrow-flight = "57.1.0"
tonic = { version = "0.14.1", features = ["transport"] }
tower = "0.5.2"
hyper-util = "0.1.16"
tokio-stream = "0.1.17"
async-trait = "0.1.89"
url = "2.5.7"

[[bin]]
name = "datafusion-distributed-cli"
Expand Down
4 changes: 2 additions & 2 deletions console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ ratatui = "0.30.0"
tokio = { version = "1.49.0", features = ["full"] }
tonic = "0.14.2"
datafusion-distributed = { path = "..", features = ["integration"] }
arrow-flight = "57.1.0"
arrow-flight = "58"
structopt = "0.3.26"
url = "2.5.7"

[dev-dependencies]
arrow = "57.1.0"
arrow = "58"
async-trait = "0.1.89"
futures = "0.3.31"
url = "2.5.7"
8 changes: 4 additions & 4 deletions examples/custom_execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,17 @@ impl TableProvider for NumbersTableProvider {
#[derive(Debug, Clone)]
struct NumbersExec {
ranges_per_task: Vec<Range<i64>>,
plan_properties: PlanProperties,
plan_properties: Arc<PlanProperties>,
}

impl NumbersExec {
fn new(ranges_per_task: impl IntoIterator<Item = Range<i64>>, schema: SchemaRef) -> Self {
let plan_properties = PlanProperties::new(
let plan_properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
datafusion::physical_expr::Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
));
Self {
ranges_per_task: ranges_per_task.into_iter().collect(),
plan_properties,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl ExecutionPlan for NumbersExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}

Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.88.0"
channel = "1.91"
profile = "default"
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use datafusion::common::DataFusionError;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::config::ConfigOptions;
use datafusion::physical_plan::ExecutionPlan;
#[expect(deprecated)]
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use std::sync::Arc;

Expand All @@ -30,6 +31,7 @@ pub(crate) fn batch_coalescing_below_network_boundaries(
}

let input = require_one_child(plan.children())?;
#[expect(deprecated)]
if let Some(existing_coalesce) = input.as_any().downcast_ref::<CoalesceBatchesExec>() {
// There was already a CoalesceBatchesExec below...
if existing_coalesce.target_batch_size() == d_cfg.shuffle_batch_size {
Expand All @@ -38,6 +40,7 @@ pub(crate) fn batch_coalescing_below_network_boundaries(
} else {
// ... or replace it with one with the correct batch size.
let coalesce_input = existing_coalesce.input();
#[expect(deprecated)]
let new_coalesce =
CoalesceBatchesExec::new(Arc::clone(coalesce_input), d_cfg.shuffle_batch_size)
.with_fetch(existing_coalesce.fetch());
Expand All @@ -47,6 +50,7 @@ pub(crate) fn batch_coalescing_below_network_boundaries(
} else {
// No CoalesceBatchesExec below, so we need to insert one.
let coalesce_input = input;
#[expect(deprecated)]
let new_coalesce = CoalesceBatchesExec::new(coalesce_input, d_cfg.shuffle_batch_size);
let new_plan = plan.with_new_children(vec![Arc::new(new_coalesce)])?;
Ok(Transformed::yes(new_plan))
Expand Down
4 changes: 2 additions & 2 deletions src/execution_plans/benchmarks/shuffle_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ impl ShuffleBench {
let mut join_set = JoinSet::default();
for i in 0..self.consumer_tasks {
let shuffle = NetworkShuffleExec {
properties: PlanProperties::new(
properties: Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(self.partitions),
EmissionType::Incremental,
Boundedness::Bounded,
),
)),
input_stage: input_stage.clone(),
worker_connections: WorkerConnectionPool::new(self.producer_tasks),
metrics_collection: Arc::new(Default::default()),
Expand Down
14 changes: 7 additions & 7 deletions src/execution_plans/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use tokio_stream::wrappers::WatchStream;
pub struct BroadcastExec {
input: Arc<dyn ExecutionPlan>,
consumer_task_count: usize,
properties: PlanProperties,
properties: Arc<PlanProperties>,
queues: Vec<OnceLock<Result<StreamAndTask, Arc<DataFusionError>>>>,
}

Expand All @@ -83,10 +83,10 @@ impl BroadcastExec {
let input_partition_count = input.properties().partitioning.partition_count();
let output_partition_count = input_partition_count * consumer_task_count;

let properties = input
.properties()
.clone()
.with_partitioning(Partitioning::UnknownPartitioning(output_partition_count));
let properties = Arc::new(
PlanProperties::clone(input.properties().as_ref())
.with_partitioning(Partitioning::UnknownPartitioning(output_partition_count)),
);

let queues = (0..input_partition_count)
.map(|_| OnceLock::new())
Expand Down Expand Up @@ -131,7 +131,7 @@ impl ExecutionPlan for BroadcastExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down Expand Up @@ -179,7 +179,7 @@ impl ExecutionPlan for BroadcastExec {
while let Some(msg) = stream.next().await {
match msg {
Ok(record_batch) => {
let mut reservation = mem_consumer.clone_with_new_id().register(&pool);
let reservation = mem_consumer.clone_with_new_id().register(&pool);
reservation.grow(record_batch.get_array_memory_size());
queue.push(Ok((record_batch, Arc::new(reservation))));
}
Expand Down
11 changes: 7 additions & 4 deletions src/execution_plans/children_isolator_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use std::vec;
/// ```
#[derive(Debug, Clone)]
pub struct ChildrenIsolatorUnionExec {
pub(crate) properties: PlanProperties,
pub(crate) properties: Arc<PlanProperties>,
pub(crate) metrics: ExecutionPlanMetricsSet,
pub(crate) children: Vec<Arc<dyn ExecutionPlan>>,
pub(crate) task_idx_map: Vec<
Expand Down Expand Up @@ -136,10 +136,13 @@ impl ChildrenIsolatorUnionExec {
// It's not super efficient to build a UnionExec just to get the properties out, but the
// other solution is to copy-paste a bunch of code from upstream for computing the properties
// of a union, so we prefer to just reuse it like this.
let mut properties = UnionExec::try_new(children.clone())?.properties().clone();
let mut properties = UnionExec::try_new(children.clone())?
.properties()
.as_ref()
.clone();
properties.partitioning = Partitioning::UnknownPartitioning(*partition_count);
Ok(Self {
properties,
properties: Arc::new(properties),
metrics: ExecutionPlanMetricsSet::default(),
children,
task_idx_map,
Expand Down Expand Up @@ -187,7 +190,7 @@ impl ExecutionPlan for ChildrenIsolatorUnionExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
7 changes: 4 additions & 3 deletions src/execution_plans/common.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::PlanProperties;
use std::sync::Arc;

pub(super) fn scale_partitioning_props(
props: &PlanProperties,
f: impl FnOnce(usize) -> usize,
) -> PlanProperties {
PlanProperties::new(
) -> Arc<PlanProperties> {
Arc::new(PlanProperties::new(
props.eq_properties.clone(),
scale_partitioning(&props.partitioning, f),
props.emission_type,
props.boundedness,
)
))
}

pub(super) fn scale_partitioning(
Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl ExecutionPlan for DistributedExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
self.plan.properties()
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ExecutionPlan for MetricsWrapperExec {
delegate! {
to self.inner {
fn name(&self) -> &str;
fn properties(&self) -> &PlanProperties;
fn properties(&self) -> &Arc<PlanProperties>;
fn as_any(&self) -> &dyn Any;
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/execution_plans/network_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ use uuid::Uuid;
/// job to merge these partial partitions to then broadcast complete data to the consumers.
#[derive(Debug, Clone)]
pub struct NetworkBroadcastExec {
pub(crate) properties: PlanProperties,
pub(crate) properties: Arc<PlanProperties>,
pub(crate) input_stage: Stage,
pub(crate) worker_connections: WorkerConnectionPool,
pub(crate) metrics_collection: Arc<DashMap<StageKey, Vec<MetricsSetProto>>>,
Expand Down Expand Up @@ -150,10 +150,13 @@ impl NetworkBroadcastExec {
let broadcast_exec: Arc<dyn ExecutionPlan> =
Arc::new(super::BroadcastExec::new(child, consumer_task_count));

let properties = broadcast_exec
.properties()
.clone()
.with_partitioning(Partitioning::UnknownPartitioning(input_partition_count));
let properties = Arc::new(
broadcast_exec
.properties()
.as_ref()
.clone()
.with_partitioning(Partitioning::UnknownPartitioning(input_partition_count)),
);

let input_stage = Stage::new(query_id, stage_num, broadcast_exec, input_task_count);

Expand Down Expand Up @@ -208,7 +211,7 @@ impl ExecutionPlan for NetworkBroadcastExec {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

Expand Down
Loading
Loading