Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,081 changes: 864 additions & 217 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
members = ["benchmarks", "cli", "console"]

[workspace.dependencies]
datafusion = { version = "52.0.0", default-features = false }
datafusion-proto = { version = "52.0.0" }
datafusion = { git = "https://github.com/jayshrivastava/datafusion.git", rev = "f3b6568afcc0ed8fc3455458a80a49377f75d4e4", default-features = false }
datafusion-proto = { git = "https://github.com/jayshrivastava/datafusion.git", rev = "f3b6568afcc0ed8fc3455458a80a49377f75d4e4" }

[package]
name = "datafusion-distributed"
Expand Down
11 changes: 10 additions & 1 deletion src/flight_service/do_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,16 @@ impl Worker {
let stage_data = once
.get_or_try_init(|| async {
let proto_node = PhysicalPlanNode::try_decode(doget.plan_proto.as_ref())?;
let mut plan = proto_node.try_into_physical_plan(&task_ctx, &codec)?;

// Use DeduplicatingProtoConverter to restore Arc-sharing relationships
// (especially important for dynamic filters with shared inner state)
use datafusion_proto::physical_plan::DeduplicatingProtoConverter;
let converter = DeduplicatingProtoConverter::default();
let mut plan = proto_node.try_into_physical_plan_with_converter(
&task_ctx,
&codec,
&converter,
)?;
for hook in self.hooks.on_plan.iter() {
plan = hook(plan)
}
Expand Down
8 changes: 7 additions & 1 deletion src/protobuf/distributed_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use datafusion::physical_plan::{ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::SessionConfig;
use datafusion_proto::physical_plan::from_proto::parse_protobuf_partitioning;
use datafusion_proto::physical_plan::to_proto::serialize_partitioning;
use datafusion_proto::physical_plan::{ComposedPhysicalExtensionCodec, PhysicalExtensionCodec};
use datafusion_proto::physical_plan::{ComposedPhysicalExtensionCodec, PhysicalExtensionCodec, DefaultPhysicalProtoConverter};
use datafusion_proto::protobuf;
use datafusion_proto::protobuf::proto_error;
use itertools::Itertools;
Expand Down Expand Up @@ -108,6 +108,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
ctx,
&schema,
&DistributedCodec {},
&DefaultPhysicalProtoConverter,
)?
.ok_or(proto_error("NetworkShuffleExec is missing partitioning"))?;

Expand All @@ -132,6 +133,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
ctx,
&schema,
&DistributedCodec {},
&DefaultPhysicalProtoConverter,
)?
.ok_or(proto_error("NetworkCoalesceExec is missing partitioning"))?;

Expand Down Expand Up @@ -171,6 +173,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
ctx,
&schema,
&DistributedCodec {},
&DefaultPhysicalProtoConverter,
)?
.ok_or(proto_error("NetworkBroadcastExec is missing partitioning"))?;

Expand Down Expand Up @@ -258,6 +261,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
partitioning: Some(serialize_partitioning(
node.properties().output_partitioning(),
&DistributedCodec {},
&DefaultPhysicalProtoConverter,
)?),
input_stage: Some(encode_stage_proto(node.input_stage())?),
};
Expand All @@ -273,6 +277,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
partitioning: Some(serialize_partitioning(
node.properties().output_partitioning(),
&DistributedCodec {},
&DefaultPhysicalProtoConverter,
)?),
input_stage: Some(encode_stage_proto(node.input_stage())?),
};
Expand All @@ -298,6 +303,7 @@ impl PhysicalExtensionCodec for DistributedCodec {
partitioning: Some(serialize_partitioning(
node.properties().output_partitioning(),
&DistributedCodec {},
&DefaultPhysicalProtoConverter,
)?),
input_stage: Some(encode_stage_proto(node.input_stage())?),
};
Expand Down
16 changes: 13 additions & 3 deletions src/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,21 @@ pub(crate) enum MaybeEncodedPlan {
impl MaybeEncodedPlan {
pub(crate) fn to_encoded(&self, codec: &dyn PhysicalExtensionCodec) -> Result<Self> {
Ok(match self {
Self::Decoded(plan) => Self::Encoded(
PhysicalPlanNode::try_from_physical_plan(Arc::clone(plan), codec)?
Self::Decoded(plan) => {
// Use DeduplicatingProtoConverter to preserve Arc-sharing relationships
// (especially important for dynamic filters)
use datafusion_proto::physical_plan::DeduplicatingProtoConverter;
let converter = DeduplicatingProtoConverter::default();
Self::Encoded(
PhysicalPlanNode::try_from_physical_plan_with_converter(
Arc::clone(plan),
codec,
&converter,
)?
.encode_to_vec()
.into(),
),
)
}
Self::Encoded(plan) => Self::Encoded(plan.clone()),
})
}
Expand Down
251 changes: 250 additions & 1 deletion tests/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod tests {
DefaultSessionBuilder, assert_snapshot, display_plan_ascii,
test_utils::localhost::start_localhost_context,
};
use std::sync::Arc;

fn set_configs(ctx: &mut SessionContext) {
// Preserve hive-style file partitions.
Expand Down Expand Up @@ -43,6 +44,13 @@ mod tests {
.options_mut()
.optimizer
.hash_join_single_partition_threshold_rows = 0;
// Enable dynamic filter pushdown
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.optimizer
.enable_dynamic_filter_pushdown = true;
}

async fn register_tables(ctx: &SessionContext) -> Result<()> {
Expand Down Expand Up @@ -70,15 +78,88 @@ mod tests {
) -> Result<(String, Vec<RecordBatch>)> {
let df = ctx.sql(query).await?;
let (state, logical_plan) = df.into_parts();

// Create physical plan WITHOUT distributed optimizer
use datafusion::execution::SessionStateBuilder;
let mut state_builder = SessionStateBuilder::new_from_existing(state.clone());
state_builder = state_builder.with_physical_optimizer_rules(vec![]);
let non_distributed_state = state_builder.build();
let non_distributed_plan = non_distributed_state.create_physical_plan(&logical_plan).await?;

println!("\n——————— BEFORE DISTRIBUTED OPTIMIZER ———————\n");
print_datasource_predicates(&non_distributed_plan, 0);
println!("\n——————— ARC ADDRESSES BEFORE DISTRIBUTED OPTIMIZER ———————\n");
print_dynamic_filter_arc_addresses(&non_distributed_plan, 0);

// Create physical plan WITH distributed optimizer
let physical_plan = state.create_physical_plan(&logical_plan).await?;

println!("\n——————— AFTER DISTRIBUTED OPTIMIZER ———————\n");
print_datasource_predicates(&physical_plan, 0);
println!("\n——————— ARC ADDRESSES AFTER DISTRIBUTED OPTIMIZER ———————\n");
print_dynamic_filter_arc_addresses(&physical_plan, 0);

let distributed_plan = display_plan_ascii(physical_plan.as_ref(), false);
println!("\n——————— DISTRIBUTED PLAN ———————\n\n{distributed_plan}");

let distributed_results = collect(physical_plan, state.task_ctx()).await?;
let distributed_results = collect(physical_plan.clone(), state.task_ctx()).await?;
pretty::print_batches(&distributed_results)?;

// Print plan with metrics after execution
let distributed_plan_with_metrics = display_plan_ascii(physical_plan.as_ref(), true);
println!("\n——————— PLAN WITH METRICS ———————\n\n{distributed_plan_with_metrics}");

// Print detailed metrics from the physical plan
use datafusion::physical_plan::displayable;
let metrics_str = displayable(physical_plan.as_ref()).indent(true).to_string();
println!("\n——————— DETAILED METRICS ———————\n\n{metrics_str}");

Ok((distributed_plan, distributed_results))
}

fn print_datasource_predicates(plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>, depth: usize) {
use datafusion::physical_plan::displayable;
let indent = " ".repeat(depth);
let plan_name = plan.name();

// Use displayable format which shows predicates
if plan_name.starts_with("DataSourceExec") || plan_name.starts_with("ParquetExec") {
let display_str = displayable(plan.as_ref()).indent(false).to_string();
println!("{indent}{}", display_str);
}

for child in plan.children() {
print_datasource_predicates(child, depth + 1);
}
}

fn print_dynamic_filter_arc_addresses(plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>, depth: usize) {
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::datasource::source::DataSourceExec;

let indent = " ".repeat(depth);

// Check if this is a HashJoinExec and print its dynamic filter address
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
if let Some(filter) = hash_join.dynamic_filter_for_test() {
let ptr_addr = Arc::as_ptr(filter) as *const () as u64;
println!("{}HashJoinExec dynamic filter Arc address: 0x{:x}", indent, ptr_addr);
}
}

// Check if this is a DataSourceExec and print its filter address
if let Some(data_source) = plan.as_any().downcast_ref::<DataSourceExec>() {
if let Some(filter) = data_source.filter_for_test() {
let ptr = Arc::as_ptr(&filter) as *const () as u64;
println!("{}DataSourceExec filter Arc address: 0x{:x}", indent, ptr);
}
}

for child in plan.children() {
print_dynamic_filter_arc_addresses(child, depth + 1);
}
}

#[tokio::test]
async fn test_join_hive() -> Result<(), Box<dyn std::error::Error>> {
let query = r#"
Expand Down Expand Up @@ -150,6 +231,174 @@ mod tests {
Ok(())
}

/// Test showing dynamic filter behavior in Partitioned hash join mode.
///
/// This test demonstrates a known correctness bug with dynamic filtering in distributed
/// Partitioned mode: https://github.com/apache/datafusion/pull/20175
///
/// Expected behavior comparison:
///
/// ============================================================
/// DISABLED (no dynamic filter) - WORKS CORRECTLY:
/// ============================================================
///
/// ┌───── DistributedExec ── Tasks: t0:[p0]
/// │ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
/// │ metrics=[output_rows=14]
/// │ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
/// └──────────────────────────────────────────────────
/// ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
/// │ ProjectionExec: expr=[f_dkey@5, timestamp@3, value@4, env@0, service@1, host@2]
/// │ metrics=[output_rows=14]
/// │ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)]
/// │ metrics=[output_rows=14, input_batches=4, input_rows=24, probe_hit_rate=58% (14/24)]
/// │ ^^^^^^^^^^^ ← Reads ALL 24 rows
/// │ FilterExec: service@1 = log
/// │ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
/// │ DataSourceExec: file_groups={4 groups: [d_dkey=A, B, C, D]}
/// │ predicate=service@1 = log
/// │ metrics=[output_rows=2]
/// │ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
/// │ DataSourceExec: file_groups={4 groups: [f_dkey=A, B, C, D]}
/// │ metrics=[output_rows=24, output_batches=4,
/// │ files_ranges_pruned_statistics=4 total → 4 matched, ← ALL 4 files
/// │ bytes_scanned=767]
/// └──────────────────────────────────────────────────
///
/// Results: 1 batches, 14 total rows ✓
///
///
/// ============================================================
/// ENABLED (with dynamic filter) - Dynamic filtering works but results are wrong (known issue):
/// ============================================================
///
/// ┌───── DistributedExec ── Tasks: t0:[p0]
/// │ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST]
/// │ metrics=[output_rows=0] ← NO OUTPUT!
/// │ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
/// └──────────────────────────────────────────────────
/// ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
/// │ ProjectionExec: expr=[f_dkey@5, timestamp@3, value@4, env@0, service@1, host@2]
/// │ metrics=[output_rows=0] ← NO OUTPUT!
/// │ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@3, f_dkey@2)]
/// │ metrics=[output_rows=0, input_batches=0, input_rows=0, probe_hit_rate=N/A (0/0)]
/// │ ^^^^^^^^^^^ ← NO INPUT!
/// │ FilterExec: service@1 = log
/// │ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
/// │ DataSourceExec: file_groups={4 groups: [d_dkey=A, B, C, D]}
/// │ predicate=service@1 = log
/// │ metrics=[output_rows=2] ← Build side is fine
/// │ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
/// │ DataSourceExec: file_groups={4 groups: [f_dkey=A, B, C, D]}
/// │ predicate=DynamicFilter [ empty ]
/// │ ^^^^^^^^^^^^^^^^^^^^^^^^^
/// │ NOTE: Shows "empty" because the plan is displayed on the coordinator,
/// │ which doesn't have the dynamic filter state. On the workers, the filter
/// │ exists but incorrectly prunes ALL data.
/// │ metrics=[output_rows=0, output_batches=0,
/// │ files_ranges_pruned_statistics=4 total → 0 matched, ← PRUNED ALL!
/// │ bytes_scanned=0] ← NO DATA READ!
/// └──────────────────────────────────────────────────
///
/// Results: 0 batches, 0 total rows ✗ (Should be 14 rows!)
///
/// KNOWN ISSUE: Dynamic filtering in Partitioned mode incorrectly prunes all rows,
/// returning 0 results instead of 14. This is a correctness bug tracked at:
/// https://github.com/apache/datafusion/pull/20175
#[tokio::test]
async fn test_join_hive_dynamic_filter_comparison() -> Result<(), Box<dyn std::error::Error>> {
async fn run_with_dynamic_filter(enable: bool, query: &str) -> Result<(), Box<dyn std::error::Error>> {
let (mut ctx, _guard, _) =
start_localhost_context(2, DefaultSessionBuilder).await;

// Use same config as test_join_hive to get Partitioned mode
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.optimizer
.preserve_file_partitions = 1;
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.execution
.target_partitions = 4;
// Force Partitioned mode by setting thresholds to 0
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.optimizer
.hash_join_single_partition_threshold = 0;
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.optimizer
.hash_join_single_partition_threshold_rows = 0;

// Set dynamic filter flag
ctx.state_ref()
.write()
.config_mut()
.options_mut()
.optimizer
.enable_join_dynamic_filter_pushdown = enable;

register_tables(&ctx).await?;

println!("\n{}", "=".repeat(60));
println!("Dynamic Filter Pushdown: {}", if enable { "ENABLED" } else { "DISABLED" });
println!("{}\n", "=".repeat(60));

let df = ctx.sql(query).await?;
let mut physical_plan = df.create_physical_plan().await?;

// Execute using execute_stream to preserve metrics
use datafusion::physical_plan::execute_stream;
use futures::TryStreamExt;
let results = execute_stream(physical_plan.clone(), ctx.task_ctx())?
.try_collect::<Vec<_>>()
.await?;

// Rewrite plan with metrics collected from workers
use datafusion_distributed::{display_plan_ascii, rewrite_distributed_plan_with_metrics, DistributedMetricsFormat};
physical_plan = rewrite_distributed_plan_with_metrics(
physical_plan.clone(),
DistributedMetricsFormat::Aggregated,
)?;

// Display plan with metrics
let plan_with_metrics = display_plan_ascii(physical_plan.as_ref(), true);
println!("\nPlan with metrics:\n{}\n", plan_with_metrics);

println!("Results: {} batches, {} total rows\n", results.len(), results.iter().map(|b| b.num_rows()).sum::<usize>());

Ok(())
}

let query = r#"
SELECT
f.f_dkey,
f.timestamp,
f.value,
d.env,
d.service,
d.host
FROM dim d
INNER JOIN fact f ON d.d_dkey = f.f_dkey
WHERE d.service = 'log'
ORDER BY f_dkey, timestamp
"#;

run_with_dynamic_filter(false, query).await?;
println!("\n\n");
run_with_dynamic_filter(true, query).await?;

Ok(())
}

#[tokio::test]
async fn test_join_agg_hive() -> Result<(), Box<dyn std::error::Error>> {
let query = r#"
Expand Down
Loading