diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 8ab21c1a..90b9e23b 100644 --- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -10,8 +10,8 @@ use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; use datafusion_distributed::{ - ChannelResolver, DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, - Worker, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, + ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker, + WorkerResolver, display_plan_ascii, get_distributed_channel_resolver, get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics, }; use futures::{StreamExt, TryFutureExt}; @@ -94,7 +94,7 @@ async fn main() -> Result<(), Box> { .with_default_features() .with_runtime_env(Arc::clone(&runtime_env)) .with_distributed_worker_resolver(Ec2WorkerResolver::new()) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_broadcast_joins(cmd.broadcast_joins)? 
.build(); let ctx = SessionContext::from(state); diff --git a/benchmarks/src/run.rs b/benchmarks/src/run.rs index d6ff4ec4..5147fc97 100644 --- a/benchmarks/src/run.rs +++ b/benchmarks/src/run.rs @@ -28,9 +28,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{collect, displayable}; use datafusion::prelude::*; use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver; -use datafusion_distributed::{ - DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt, Worker, -}; +use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker}; use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch}; use std::error::Error; use std::fs; @@ -178,7 +176,7 @@ impl RunOpt { .with_default_features() .with_config(self.config()?) .with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone())) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_files_per_task( self.files_per_task.unwrap_or(get_available_parallelism()), )? 
diff --git a/cli/src/main.rs b/cli/src/main.rs index fee4fc45..f3a10491 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -36,7 +36,7 @@ use datafusion_cli::{ use datafusion_distributed::test_utils::in_memory_channel_resolver::{ InMemoryChannelResolver, InMemoryWorkerResolver, }; -use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule}; +use datafusion_distributed::{DistributedExt, SessionStateBuilderExt}; use std::env; use std::path::Path; use std::process::ExitCode; @@ -148,7 +148,7 @@ async fn main_inner() -> Result<()> { .with_default_features() .with_config(session_config) .with_runtime_env(runtime_env) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_worker_resolver(InMemoryWorkerResolver::new(16)) .with_distributed_channel_resolver(InMemoryChannelResolver::default()) .build(); diff --git a/console/examples/console_run.rs b/console/examples/console_run.rs index 5da8823f..e0bfb5d7 100644 --- a/console/examples/console_run.rs +++ b/console/examples/console_run.rs @@ -4,11 +4,10 @@ use datafusion::common::DataFusionError; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_distributed::{ - DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii, + DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii, }; use futures::TryStreamExt; use std::error::Error; -use std::sync::Arc; use structopt::StructOpt; use url::Url; @@ -38,7 +37,7 @@ async fn main() -> Result<(), Box> { let state = SessionStateBuilder::new() .with_default_features() .with_distributed_worker_resolver(localhost_resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_files_per_task(1)? 
.build(); diff --git a/console/examples/tpcds_runner.rs b/console/examples/tpcds_runner.rs index 4f82cb8b..672dd4e5 100644 --- a/console/examples/tpcds_runner.rs +++ b/console/examples/tpcds_runner.rs @@ -4,7 +4,7 @@ use datafusion::execution::SessionStateBuilder; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; use datafusion_distributed::{ - DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, WorkerResolver, + DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, WorkerResolver, display_plan_ascii, }; use datafusion_distributed_benchmarks::datasets::{register_tables, tpcds}; @@ -97,7 +97,7 @@ async fn run_queries( let state = SessionStateBuilder::new() .with_default_features() .with_distributed_worker_resolver(localhost_resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .build(); let ctx = SessionContext::from(state); diff --git a/docs/source/user-guide/channel-resolver.md b/docs/source/user-guide/channel-resolver.md index 9fcdac18..07aa81d1 100644 --- a/docs/source/user-guide/channel-resolver.md +++ b/docs/source/user-guide/channel-resolver.md @@ -43,7 +43,7 @@ async fn main() { let state = SessionStateBuilder::new() // these two are mandatory. .with_distributed_worker_resolver(todo!()) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() // the CustomChannelResolver needs to be passed here once... 
.with_distributed_channel_resolver(channel_resolver.clone()) .build(); diff --git a/docs/source/user-guide/concepts.md b/docs/source/user-guide/concepts.md index d66235ad..3c34535a 100644 --- a/docs/source/user-guide/concepts.md +++ b/docs/source/user-guide/concepts.md @@ -24,10 +24,10 @@ You'll see these concepts mentioned extensively across the documentation and the Some other more tangible concepts are the structs and traits exposed publicly, the most important are: -## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs) +## [SessionStateBuilderExt](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/session_state_builder_ext.rs) -A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads -a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion. +An extension trait for `SessionStateBuilder` that provides `with_distributed_planner()`. This registers a custom +query planner that transforms single-node DataFusion query plans into distributed query plans after physical planning. It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on the nodes present in the original plan. 
diff --git a/docs/source/user-guide/getting-started.md b/docs/source/user-guide/getting-started.md index 9b7dd089..a336ec57 100644 --- a/docs/source/user-guide/getting-started.md +++ b/docs/source/user-guide/getting-started.md @@ -49,7 +49,7 @@ impl WorkerResolver for LocalhostWorkerResolver { } ``` -Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's +Register the `WorkerResolver` implementation and the distributed planner in DataFusion's `SessionStateBuilder` to enable distributed query planning: ```rs @@ -59,7 +59,7 @@ let localhost_worker_resolver = LocalhostWorkerResolver { let state = SessionStateBuilder::new() .with_distributed_worker_resolver(localhost_worker_resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .build(); let ctx = SessionContext::from(state); diff --git a/docs/source/user-guide/worker-resolver.md b/docs/source/user-guide/worker-resolver.md index a17e5915..f6b8da6f 100644 --- a/docs/source/user-guide/worker-resolver.md +++ b/docs/source/user-guide/worker-resolver.md @@ -24,7 +24,7 @@ impl WorkerResolver for CustomWorkerResolver { async fn main() { let state = SessionStateBuilder::new() .with_distributed_worker_resolver(CustomWorkerResolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .build(); } ``` diff --git a/docs/source/user-guide/worker.md b/docs/source/user-guide/worker.md index 2b1b6380..3e1bbf1e 100644 --- a/docs/source/user-guide/worker.md +++ b/docs/source/user-guide/worker.md @@ -248,7 +248,7 @@ With the resolver in place, wire it into the session and tag each worker with a ```rust use datafusion::execution::SessionStateBuilder; -use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule, Worker}; +use datafusion_distributed::{DistributedExt, Worker}; let worker_version = std::env::var("COMMIT_HASH").unwrap_or_default(); @@ -262,7 
+262,7 @@ let resolver = VersionAwareWorkerResolver::start_version_filtering( let state = SessionStateBuilder::new() .with_default_features() .with_distributed_worker_resolver(resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .build(); let ctx = SessionContext::from(state); diff --git a/examples/custom_execution_plan.rs b/examples/custom_execution_plan.rs index 2fe269dd..f9a551ba 100644 --- a/examples/custom_execution_plan.rs +++ b/examples/custom_execution_plan.rs @@ -39,8 +39,8 @@ use datafusion_distributed::test_utils::in_memory_channel_resolver::{ InMemoryChannelResolver, InMemoryWorkerResolver, }; use datafusion_distributed::{ - DistributedExt, DistributedPhysicalOptimizerRule, DistributedTaskContext, TaskEstimation, - TaskEstimator, WorkerQueryContext, display_plan_ascii, + DistributedExt, DistributedTaskContext, SessionStateBuilderExt, TaskEstimation, TaskEstimator, + WorkerQueryContext, display_plan_ascii, }; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_proto::protobuf; @@ -382,7 +382,7 @@ async fn main() -> Result<(), Box> { .with_config(config) .with_distributed_worker_resolver(worker_resolver) .with_distributed_channel_resolver(channel_resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_user_codec(NumbersExecCodec) .with_distributed_task_estimator(NumbersTaskEstimator) .build(); diff --git a/examples/in_memory_cluster.rs b/examples/in_memory_cluster.rs index 6ae97e28..fd92a48d 100644 --- a/examples/in_memory_cluster.rs +++ b/examples/in_memory_cluster.rs @@ -4,14 +4,13 @@ use datafusion::common::DataFusionError; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_distributed::{ - BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, Worker, + BoxCloneSyncChannel, 
ChannelResolver, DistributedExt, SessionStateBuilderExt, Worker, WorkerQueryContext, WorkerResolver, WorkerServiceClient, create_worker_client, display_plan_ascii, }; use futures::TryStreamExt; use hyper_util::rt::TokioIo; use std::error::Error; -use std::sync::Arc; use structopt::StructOpt; use tonic::transport::{Endpoint, Server}; @@ -38,7 +37,7 @@ async fn main() -> Result<(), Box> { .with_default_features() .with_distributed_worker_resolver(InMemoryWorkerResolver) .with_distributed_channel_resolver(InMemoryChannelResolver::new()) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_files_per_task(1)? .build(); diff --git a/examples/localhost_run.rs b/examples/localhost_run.rs index eae9b6cd..73d37d52 100644 --- a/examples/localhost_run.rs +++ b/examples/localhost_run.rs @@ -4,11 +4,10 @@ use datafusion::common::DataFusionError; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_distributed::{ - DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii, + DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii, }; use futures::TryStreamExt; use std::error::Error; -use std::sync::Arc; use structopt::StructOpt; use url::Url; @@ -39,7 +38,7 @@ async fn main() -> Result<(), Box> { let state = SessionStateBuilder::new() .with_default_features() .with_distributed_worker_resolver(localhost_resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_files_per_task(1)? 
.build(); diff --git a/examples/localhost_versioned_run.rs b/examples/localhost_versioned_run.rs index a41fa8d0..cf60cd6a 100644 --- a/examples/localhost_versioned_run.rs +++ b/examples/localhost_versioned_run.rs @@ -4,12 +4,11 @@ use datafusion::common::DataFusionError; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_distributed::{ - DefaultChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, GetWorkerInfoRequest, + DefaultChannelResolver, DistributedExt, GetWorkerInfoRequest, SessionStateBuilderExt, WorkerResolver, create_worker_client, display_plan_ascii, }; use futures::TryStreamExt; use std::error::Error; -use std::sync::Arc; use structopt::StructOpt; use url::Url; @@ -93,7 +92,7 @@ async fn main() -> Result<(), Box> { let state = SessionStateBuilder::new() .with_default_features() .with_distributed_worker_resolver(localhost_resolver) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_files_per_task(1)? .build(); diff --git a/src/distributed_ext.rs b/src/distributed_ext.rs index 7c957926..4d357fc7 100644 --- a/src/distributed_ext.rs +++ b/src/distributed_ext.rs @@ -197,7 +197,7 @@ pub trait DistributedExt: Sized { /// # use datafusion::prelude::SessionConfig; /// # use url::Url; /// # use std::sync::Arc; - /// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext}; + /// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext}; /// /// struct CustomWorkerResolver; /// @@ -211,9 +211,7 @@ pub trait DistributedExt: Sized { /// // This tweaks the SessionState so that it can plan for distributed queries and execute them. 
/// let state = SessionStateBuilder::new() /// .with_distributed_worker_resolver(CustomWorkerResolver) - /// // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans - /// // get distributed. - /// .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + /// .with_distributed_planner() /// .build(); /// ``` fn with_distributed_worker_resolver( @@ -241,7 +239,7 @@ pub trait DistributedExt: Sized { /// # use datafusion::prelude::SessionConfig; /// # use url::Url; /// # use std::sync::Arc; - /// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext, WorkerServiceClient}; + /// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext, WorkerServiceClient}; /// /// struct CustomChannelResolver; /// @@ -256,9 +254,7 @@ pub trait DistributedExt: Sized { /// // This tweaks the SessionState so that it can plan for distributed queries and execute them. /// let state = SessionStateBuilder::new() /// .with_distributed_channel_resolver(CustomChannelResolver) - /// // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans - /// // get distributed. 
- /// .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + /// .with_distributed_planner() /// .build(); /// /// // This function can be provided to a Worker so that, upon receiving a distributed diff --git a/src/distributed_planner/batch_coalescing_below_network_boundaries.rs b/src/distributed_planner/batch_coalescing_below_network_boundaries.rs index f84443fa..bf927460 100644 --- a/src/distributed_planner/batch_coalescing_below_network_boundaries.rs +++ b/src/distributed_planner/batch_coalescing_below_network_boundaries.rs @@ -61,12 +61,10 @@ pub(crate) fn batch_coalescing_below_network_boundaries( #[cfg(test)] mod tests { - use super::*; + use crate::distributed_planner::session_state_builder_ext::SessionStateBuilderExt; use crate::test_utils::in_memory_channel_resolver::InMemoryWorkerResolver; use crate::test_utils::parquet::register_parquet_tables; - use crate::{ - DistributedExt, DistributedPhysicalOptimizerRule, assert_snapshot, display_plan_ascii, - }; + use crate::{DistributedExt, assert_snapshot, display_plan_ascii}; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{SessionConfig, SessionContext}; use itertools::Itertools; @@ -161,7 +159,7 @@ mod tests { let state = SessionStateBuilder::new() .with_default_features() .with_config(SessionConfig::new().with_target_partitions(4)) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) .build(); diff --git a/src/distributed_planner/distributed_physical_optimizer_rule.rs b/src/distributed_planner/distribute_plan.rs similarity index 95% rename from src/distributed_planner/distributed_physical_optimizer_rule.rs rename to src/distributed_planner/distribute_plan.rs index 59543c60..02f1adf4 100644 --- a/src/distributed_planner/distributed_physical_optimizer_rule.rs +++ b/src/distributed_planner/distribute_plan.rs @@ -1,80 +1,82 @@ use 
crate::common::require_one_child; -use crate::distributed_planner::batch_coalescing_below_network_boundaries; +use crate::distributed_planner::batch_coalescing_below_network_boundaries::batch_coalescing_below_network_boundaries; +use crate::distributed_planner::insert_broadcast::insert_broadcast_execs; use crate::distributed_planner::plan_annotator::{ AnnotatedPlan, PlanOrNetworkBoundary, annotate_plan, }; use crate::{ - DistributedConfig, DistributedExec, NetworkBroadcastExec, NetworkCoalesceExec, + DistributedConfig, NetworkBoundaryExt, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, TaskEstimator, }; +use datafusion::common::DataFusionError; +use datafusion::common::tree_node::TreeNode; use datafusion::config::ConfigOptions; -use datafusion::error::DataFusionError; -use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties}; -use std::fmt::Debug; use std::ops::AddAssign; use std::sync::Arc; use uuid::Uuid; -use super::insert_broadcast::insert_broadcast_execs; - -/// Physical optimizer rule that inspects the plan, places the appropriate network -/// boundaries, and breaks it down into stages that can be executed in a distributed manner. +/// Inspects the plan, places the appropriate network boundaries, and breaks it down into stages +/// that can be executed in a distributed manner. +/// +/// It performs the following operations: /// -/// The rule has three steps: +/// 1. It prepares the plan for distribution, adding some extra single-node nodes like +/// [BroadcastExec] or [CoalescePartitionsExec] that will signal the following steps to +/// introduce network boundaries in the appropriate places. /// -/// 1. Annotate the plan with [annotate_plan]: adds some annotations to each node about how +/// 2. 
Annotate the plan with [annotate_plan]: adds some annotations to each node about how /// many distributed tasks should be used in the stage containing them, and whether they /// need a network boundary below or not. /// For more information about this step, read [annotate_plan] docs. /// -/// 2. Based on the [AnnotatedPlan] returned by [annotate_plan], place all the appropriate +/// 3. Based on the [AnnotatedPlan] returned by [annotate_plan], place all the appropriate /// network boundaries ([NetworkShuffleExec] and [NetworkCoalesceExec]) with the task count /// assignation that the annotations required. After this, the plan is already a distributed /// executable plan. /// -/// 3. Place the [CoalesceBatchesExec] in the appropriate places (just below network boundaries), +/// 4. Place the [CoalesceBatchesExec] in the appropriate places (just below network boundaries), /// so that we send fewer and bigger record batches over the wire instead of a lot of small ones. -#[derive(Debug, Default)] -pub struct DistributedPhysicalOptimizerRule; - -impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule { - fn optimize( - &self, - original: Arc, - cfg: &ConfigOptions, - ) -> datafusion::common::Result> { - if original.as_any().is::() { - return Ok(original); - } +/// +/// This function returns None if the plan was left undistributed. +pub(super) async fn distribute_plan( + original: Arc, + cfg: &ConfigOptions, +) -> datafusion::common::Result>> { + // Keep this function idempotent. + if original.exists(|plan| Ok(plan.is_network_boundary()))? { + return Ok(None); + } - let mut plan = Arc::clone(&original); - if original.output_partitioning().partition_count() > 1 { - plan = Arc::new(CoalescePartitionsExec::new(plan)) - } + let mut plan = Arc::clone(&original); - plan = insert_broadcast_execs(plan, cfg)?; + // Add a CoalescePartitionsExec on top of the plan if necessary. The plan annotator will see + // this and will place a NetworkCoalesceExec below it. 
+ if plan.output_partitioning().partition_count() > 1 { + plan = Arc::new(CoalescePartitionsExec::new(plan)); + } - let annotated = annotate_plan(plan, cfg)?; + // Insert BroadcastExec nodes in collect left joins so that the plan annotator can inject + // broadcast network boundaries above. + plan = insert_broadcast_execs(plan, cfg)?; - let mut stage_id = 1; - let distributed = distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?; - if stage_id == 1 { - return Ok(original); - } - let distributed = batch_coalescing_below_network_boundaries(distributed, cfg)?; + // Annotate the plan with network boundary and task count information. + let annotated = annotate_plan(plan, cfg).await?; - Ok(Arc::new(DistributedExec::new(distributed))) + // Based on the annotations, place the actual network boundaries with the appropriate dimensions. + let mut stage_id = 1; + let plan = _distribute_plan(annotated, cfg, Uuid::new_v4(), &mut stage_id)?; + if stage_id == 1 { + return Ok(None); } - fn name(&self) -> &str { - "DistributedPhysicalOptimizer" - } + // Place some batch coalescing nodes before network boundaries in order to send big batches + // over the wire. + // TODO: This should be removed after DataFusion 53 upgrade, as CoalesceBatchesExec is deprecated. + let plan = batch_coalescing_below_network_boundaries(plan, cfg)?; - fn schema_check(&self) -> bool { - true - } + Ok(Some(plan)) } /// Takes an [AnnotatedPlan] and returns a modified [ExecutionPlan] with all the network boundaries @@ -84,7 +86,7 @@ impl PhysicalOptimizerRule for DistributedPhysicalOptimizerRule { /// which they are going to run. This is configurable by the user via the [TaskEstimator] trait. /// - The appropriate network boundaries are placed in the plan depending on how it was annotated, /// so new nodes like [NetworkBroadcastExec], [NetworkCoalesceExec] and [NetworkShuffleExec] will be present. 
-fn distribute_plan( +fn _distribute_plan( annotated_plan: AnnotatedPlan, cfg: &ConfigOptions, query_id: Uuid, @@ -96,7 +98,7 @@ fn distribute_plan( let max_child_task_count = children.iter().map(|v| v.task_count.as_usize()).max(); let new_children = children .into_iter() - .map(|child| distribute_plan(child, cfg, query_id, stage_id)) + .map(|child| _distribute_plan(child, cfg, query_id, stage_id)) .collect::, _>>()?; match annotated_plan.plan_or_nb { // This is a leaf node. It needs to be scaled up in order to account for it running in @@ -174,12 +176,9 @@ mod tests { BuildSideOneTaskEstimator, TestPlanOptions, base_session_builder, context_with_query, sql_to_physical_plan, }; - use crate::{ - DistributedExt, DistributedPhysicalOptimizerRule, assert_snapshot, display_plan_ascii, - }; + use crate::{DistributedExt, SessionStateBuilderExt, assert_snapshot, display_plan_ascii}; use datafusion::execution::SessionStateBuilder; use datafusion::physical_plan::displayable; - use std::sync::Arc; /* schema for the "weather" table MinTemp [type=DOUBLE] [repetitiontype=OPTIONAL] @@ -227,7 +226,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] @@ -257,7 +256,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(2)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] @@ -310,7 +309,7 @@ mod tests { .unwrap() }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] │ 
SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] @@ -360,7 +359,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ ProjectionExec: expr=[count(*)@0 as count(*), RainToday@1 as RainToday] │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] @@ -427,7 +426,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ HashJoinExec: mode=CollectLeft, join_type=Left, on=[(RainTomorrow@1, RainTomorrow@1)], projection=[MinTemp@0, MaxTemp@2] @@ -470,7 +469,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ SortPreservingMergeExec: [MinTemp@0 DESC] │ [Stage 1] => NetworkCoalesceExec: output_partitions=3, input_tasks=3 @@ -492,7 +491,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 2] => NetworkCoalesceExec: output_partitions=8, input_tasks=2 @@ -589,7 +588,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(6)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 1] => NetworkCoalesceExec: output_partitions=24, input_tasks=6 @@ -621,7 +620,7 @@ mod tests { b.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3)) }) .await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 1] => NetworkCoalesceExec: output_partitions=12, input_tasks=3 @@ 
-732,7 +731,7 @@ mod tests { ON a."RainToday" = b."RainToday" "#; let annotated = sql_to_explain_with_broadcast(query, 3, true).await; - assert_snapshot!(annotated, @" + assert_snapshot!(annotated, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3 @@ -761,7 +760,7 @@ mod tests { INNER JOIN weather c ON b."RainToday" = c."RainToday" "#; let plan = sql_to_explain_with_broadcast(query, 3, true).await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 3] => NetworkCoalesceExec: output_partitions=3, input_tasks=3 @@ -806,7 +805,7 @@ mod tests { "); let plan = sql_to_explain_with_broadcast(query, 3, true).await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3 @@ -877,7 +876,7 @@ mod tests { ON a."RainToday" = b."RainToday" "#; let plan = sql_to_explain_with_broadcast_one_to_many(query, 3).await; - assert_snapshot!(plan, @" + assert_snapshot!(plan, @r" ┌───── DistributedExec ── Tasks: t0:[p0] │ CoalescePartitionsExec │ [Stage 2] => NetworkCoalesceExec: output_partitions=3, input_tasks=3 @@ -949,8 +948,7 @@ mod tests { options.broadcast_enabled, ); if use_optimizer { - builder = - builder.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)); + builder = builder.with_distributed_planner() } let builder = configure(builder); let (ctx, query) = context_with_query(builder, query).await; diff --git a/src/distributed_planner/distributed_config.rs b/src/distributed_planner/distributed_config.rs index 8aeaa435..bbe81ee8 100644 --- a/src/distributed_planner/distributed_config.rs +++ b/src/distributed_planner/distributed_config.rs @@ -118,8 +118,8 @@ impl ConfigExtension for DistributedConfig { // FIXME: Ideally, both ChannelResolverExtension and 
TaskEstimators would be passed as // extensions in SessionConfig's AnyMap instead of the ConfigOptions. However, we need // to pass this as ConfigOptions as we need these two fields to be present during -// planning in the DistributedPhysicalOptimizerRule, and the signature of the optimize() -// method there accepts a ConfigOptions instead of a SessionConfig. +// planning in the DistributedQueryPlanner, and the signature of the create_physical_plan() +// method there accepts a SessionState which only provides ConfigOptions. // The following PR addresses this: https://github.com/apache/datafusion/pull/18168 // but it still has not been accepted or merged. // Because of this, all the boilerplate trait implementations below are needed. diff --git a/src/distributed_planner/mod.rs b/src/distributed_planner/mod.rs index 2f9b2895..15a3dc30 100644 --- a/src/distributed_planner/mod.rs +++ b/src/distributed_planner/mod.rs @@ -1,14 +1,14 @@ mod batch_coalescing_below_network_boundaries; +mod distribute_plan; mod distributed_config; -mod distributed_physical_optimizer_rule; mod insert_broadcast; mod network_boundary; mod plan_annotator; +mod session_state_builder_ext; mod task_estimator; -pub(crate) use batch_coalescing_below_network_boundaries::batch_coalescing_below_network_boundaries; pub use distributed_config::DistributedConfig; -pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule; pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt}; +pub use session_state_builder_ext::SessionStateBuilderExt; pub(crate) use task_estimator::set_distributed_task_estimator; pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator}; diff --git a/src/distributed_planner/plan_annotator.rs b/src/distributed_planner/plan_annotator.rs index 9aeeb461..f865cdb6 100644 --- a/src/distributed_planner/plan_annotator.rs +++ b/src/distributed_planner/plan_annotator.rs @@ -1,7 +1,7 @@ use crate::TaskCountAnnotation::{Desired, Maximum}; use 
crate::execution_plans::ChildrenIsolatorUnionExec; use crate::{BroadcastExec, DistributedConfig, TaskCountAnnotation, TaskEstimator}; -use datafusion::common::{DataFusionError, plan_datafusion_err}; +use datafusion::common::{DataFusionError, Result, plan_datafusion_err}; use datafusion::config::ConfigOptions; use datafusion::physical_expr::Partitioning; use datafusion::physical_plan::ExecutionPlan; @@ -158,19 +158,19 @@ impl Debug for AnnotatedPlan { /// ``` /// /// ``` -pub(super) fn annotate_plan( +pub(super) async fn annotate_plan( plan: Arc, cfg: &ConfigOptions, ) -> Result { - _annotate_plan(plan, None, cfg, true) + _annotate_plan(plan, None, cfg, true).await } -fn _annotate_plan( +async fn _annotate_plan( plan: Arc, parent: Option<&Arc>, cfg: &ConfigOptions, root: bool, -) -> Result { +) -> Result { let d_cfg = DistributedConfig::from_config_options(cfg)?; let broadcast_joins = d_cfg.broadcast_joins; let estimator = &d_cfg.__private_task_estimator; @@ -179,11 +179,13 @@ fn _annotate_plan( v => v, }; - let annotated_children = plan - .children() - .iter() - .map(|child| _annotate_plan(Arc::clone(child), Some(&plan), cfg, false)) - .collect::, _>>()?; + let children = plan.children(); + let mut futures = Vec::with_capacity(children.len()); + for child in children { + let child = Arc::clone(child); + futures.push(Box::pin(_annotate_plan(child, Some(&plan), cfg, false))); + } + let annotated_children = futures::future::try_join_all(futures).await?; if plan.children().is_empty() { // This is a leaf node, maybe a DataSourceExec, or maybe something else custom from the @@ -265,7 +267,7 @@ fn _annotate_plan( // If the parent is trying to coalesce all partitions into one, we need to introduce // a network coalesce right below it (or in other words, above the current node) && (parent.as_any().is::() - || parent.as_any().is::()) + || parent.as_any().is::()) { // A BroadcastExec underneath a coalesce parent means the build side will cross stages. 
if plan.as_any().is::() { @@ -980,10 +982,12 @@ mod tests { let df = ctx.sql(&query).await.unwrap(); let mut plan = df.create_physical_plan().await.unwrap(); - plan = insert_broadcast_execs(plan, ctx.state_ref().read().config_options().as_ref()) + let session_config = ctx.copied_config(); + plan = insert_broadcast_execs(plan, session_config.options()) .expect("failed to insert broadcasts"); - let annotated = annotate_plan(plan, ctx.state_ref().read().config_options().as_ref()) + let annotated = annotate_plan(plan, session_config.options()) + .await .expect("failed to annotate plan"); format!("{annotated:?}") } diff --git a/src/distributed_planner/session_state_builder_ext.rs b/src/distributed_planner/session_state_builder_ext.rs new file mode 100644 index 00000000..5b1757f0 --- /dev/null +++ b/src/distributed_planner/session_state_builder_ext.rs @@ -0,0 +1,60 @@ +use crate::DistributedExec; +use crate::distributed_planner::distribute_plan::distribute_plan; +use async_trait::async_trait; +use datafusion::common::Result; +use datafusion::execution::context::QueryPlanner; +use datafusion::execution::{SessionState, SessionStateBuilder}; +use datafusion::logical_expr::LogicalPlan; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use std::sync::Arc; + +/// Extension trait for [SessionStateBuilder]. +pub trait SessionStateBuilderExt { + /// Injects a [QueryPlanner] implementation that attempts to distribute the plan after the + /// normal planning passes are performed. + /// + /// It will wrap the existing query planner if one is set, so while setting up DataFusion's + /// [SessionStateBuilder], it's important to inject any custom user query planner implementation + /// with [SessionStateBuilder::with_query_planner] strictly *before* calling + /// [SessionStateBuilderExt::with_distributed_planner], so that the distributed planner wraps it + /// instead of being replaced by it.
+ fn with_distributed_planner(self) -> Self; +} + +impl SessionStateBuilderExt for SessionStateBuilder { + fn with_distributed_planner(mut self) -> Self { + let prev = std::mem::take(self.query_planner()); + self.with_query_planner(Arc::new(DistributedQueryPlanner { prev })) + } +} +#[derive(Debug)] +struct DistributedQueryPlanner { + prev: Option<Arc<dyn QueryPlanner>>, +} + +#[async_trait] +impl QueryPlanner for DistributedQueryPlanner { + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &SessionState, + ) -> Result<Arc<dyn ExecutionPlan>> { + let s_plan = match &self.prev { + None => { + // Use the default physical planner. + let planner = DefaultPhysicalPlanner::default(); + planner + .create_physical_plan(logical_plan, session_state) + .await? + } + Some(prev) => { + prev.create_physical_plan(logical_plan, session_state) + .await? + } + }; + match distribute_plan(Arc::clone(&s_plan), session_state.config_options()).await? { + Some(d_plan) => Ok(Arc::new(DistributedExec::new(d_plan))), + None => Ok(s_plan), + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 988903ac..49c6d81a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub mod test_utils; pub use arrow_ipc::CompressionType; pub use distributed_ext::DistributedExt; pub use distributed_planner::{ - DistributedConfig, DistributedPhysicalOptimizerRule, NetworkBoundary, NetworkBoundaryExt, + DistributedConfig, NetworkBoundary, NetworkBoundaryExt, SessionStateBuilderExt, TaskCountAnnotation, TaskEstimation, TaskEstimator, }; pub use execution_plans::{ diff --git a/src/metrics/task_metrics_collector.rs b/src/metrics/task_metrics_collector.rs index d524d158..ac110e82 100644 --- a/src/metrics/task_metrics_collector.rs +++ b/src/metrics/task_metrics_collector.rs @@ -130,7 +130,7 @@ mod tests { count_plan_nodes_up_to_network_boundary, get_stages_and_task_keys, }; use crate::test_utils::session_context::register_temp_parquet_table; - use crate::{DistributedExt, DistributedPhysicalOptimizerRule}; + use
crate::{DistributedExt, SessionStateBuilderExt}; use datafusion::execution::{SessionStateBuilder, context::SessionContext}; use datafusion::prelude::SessionConfig; use datafusion::{ @@ -151,7 +151,7 @@ mod tests { .with_config(config) .with_distributed_worker_resolver(InMemoryWorkerResolver::new(10)) .with_distributed_channel_resolver(InMemoryChannelResolver::default()) - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_task_estimator(2) .with_distributed_metrics_collection(true) .unwrap() diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs index 5e71a9fe..081de9d5 100644 --- a/src/metrics/task_metrics_rewriter.rs +++ b/src/metrics/task_metrics_rewriter.rs @@ -284,7 +284,7 @@ mod tests { use crate::test_utils::plans::count_plan_nodes_up_to_network_boundary; use crate::test_utils::session_context::register_temp_parquet_table; use crate::worker::generated::worker as pb; - use crate::{DistributedExec, DistributedPhysicalOptimizerRule}; + use crate::{DistributedExec, SessionStateBuilderExt}; use datafusion::arrow::array::{Int32Array, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; @@ -328,7 +328,7 @@ mod tests { .with_distributed_channel_resolver(InMemoryChannelResolver::default()) .with_distributed_metrics_collection(true) .unwrap() - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_task_estimator(2) } diff --git a/src/test_utils/localhost.rs b/src/test_utils/localhost.rs index 2db20968..10632022 100644 --- a/src/test_utils/localhost.rs +++ b/src/test_utils/localhost.rs @@ -1,13 +1,10 @@ -use crate::{ - DistributedExt, DistributedPhysicalOptimizerRule, Worker, WorkerResolver, WorkerSessionBuilder, -}; +use crate::{DistributedExt, SessionStateBuilderExt, Worker, WorkerResolver, WorkerSessionBuilder}; use 
async_trait::async_trait; use datafusion::common::DataFusionError; use datafusion::common::runtime::JoinSet; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionContext; use std::error::Error; -use std::sync::Arc; use std::time::Duration; use tokio::net::TcpListener; use tonic::transport::Server; @@ -68,7 +65,7 @@ where let worker_resolver = LocalHostWorkerResolver::new(ports); let mut state = SessionStateBuilder::new() .with_default_features() - .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule)) + .with_distributed_planner() .with_distributed_worker_resolver(worker_resolver) .build(); state.config_mut().options_mut().execution.target_partitions = 3; diff --git a/tests/udfs.rs b/tests/udfs.rs index 3df06e75..c95ebc46 100644 --- a/tests/udfs.rs +++ b/tests/udfs.rs @@ -1,6 +1,5 @@ #[cfg(all(feature = "integration", test))] mod tests { - use arrow::datatypes::{Field, Schema}; use arrow::util::pretty::pretty_format_batches; use datafusion::arrow::datatypes::DataType; use datafusion::error::DataFusionError; @@ -8,17 +7,10 @@ mod tests { use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; - use datafusion::physical_expr::expressions::lit; - use datafusion::physical_expr::{Partitioning, ScalarFunctionExpr}; - use datafusion::physical_optimizer::PhysicalOptimizerRule; - use datafusion::physical_plan::empty::EmptyExec; - use datafusion::physical_plan::repartition::RepartitionExec; - use datafusion::physical_plan::{ExecutionPlan, execute_stream}; + use datafusion::physical_plan::execute_stream; use datafusion_distributed::test_utils::localhost::start_localhost_context; - use datafusion_distributed::{ - DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext, assert_snapshot, - display_plan_ascii, - }; + use datafusion_distributed::test_utils::parquet::register_parquet_tables; + use datafusion_distributed::{WorkerQueryContext, assert_snapshot, 
display_plan_ascii}; use futures::TryStreamExt; use std::any::Any; use std::error::Error; @@ -30,48 +22,39 @@ mod tests { Ok(ctx.builder.with_scalar_functions(vec![udf()]).build()) } - let (mut ctx, _guard, _) = start_localhost_context(3, build_state).await; - ctx = SessionStateBuilder::from(ctx.state()) - .with_distributed_task_estimator(2) + let (ctx, _guard, _) = start_localhost_context(3, build_state).await; + let ctx = SessionStateBuilder::from(ctx.state()) .with_scalar_functions(vec![udf()]) .build() .into(); - let wrap = |input: Arc| -> Arc { - Arc::new( - RepartitionExec::try_new( - input, - Partitioning::Hash( - vec![Arc::new(ScalarFunctionExpr::new( - "test_udf", - udf(), - vec![lit(1)], - Arc::new(Field::new("return", DataType::Int32, false)), - Default::default(), - ))], - 1, - ), - ) - .unwrap(), - ) - }; - - let node = wrap(wrap(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))); - - let physical_distributed = - DistributedPhysicalOptimizerRule.optimize(node, ctx.copied_config().options())?; + register_parquet_tables(&ctx).await?; + let df = ctx + .sql(r#"SELECT test_udf("RainToday"), count(*) FROM weather GROUP BY test_udf("RainToday") ORDER BY count(*)"#) + .await?; + let physical_distributed = df.create_physical_plan().await?; let physical_distributed_str = display_plan_ascii(physical_distributed.as_ref(), false); assert_snapshot!(physical_distributed_str, @r" ┌───── DistributedExec ── Tasks: t0:[p0] - │ [Stage 2] => NetworkShuffleExec: output_partitions=1, input_tasks=2 + │ ProjectionExec: expr=[test_udf(weather.RainToday)@0 as test_udf(weather.RainToday), count(*)@1 as count(*)] + │ SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] + │ [Stage 2] => NetworkCoalesceExec: output_partitions=6, input_tasks=2 └────────────────────────────────────────────────── - ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p0..p1] - │ RepartitionExec: partitioning=Hash([test_udf(1)], 2), input_partitions=1 - │ EmptyExec + ┌───── Stage 2 ── Tasks: t0:[p0..p2] 
t1:[p0..p2] + │ SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] + │ ProjectionExec: expr=[test_udf(weather.RainToday)@0 as test_udf(weather.RainToday), count(Int64(1))@1 as count(*), count(Int64(1))@1 as count(Int64(1))] + │ AggregateExec: mode=FinalPartitioned, gby=[test_udf(weather.RainToday)@0 as test_udf(weather.RainToday)], aggr=[count(Int64(1))] + │ [Stage 1] => NetworkShuffleExec: output_partitions=3, input_tasks=3 └────────────────────────────────────────────────── + ┌───── Stage 1 ── Tasks: t0:[p0..p5] t1:[p0..p5] t2:[p0..p5] + │ RepartitionExec: partitioning=Hash([test_udf(weather.RainToday)@0], 6), input_partitions=1 + │ AggregateExec: mode=Partial, gby=[test_udf(RainToday@0) as test_udf(weather.RainToday)], aggr=[count(Int64(1))] + │ PartitionIsolatorExec: tasks=3 partitions=3 + │ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet + └────────────────────────────────────────────────── ", ); @@ -82,8 +65,12 @@ mod tests { )?; assert_snapshot!(batches, @r" - ++ - ++ + +-----------------------------+----------+ + | test_udf(weather.RainToday) | count(*) | + +-----------------------------+----------+ + | Yes | 66 | + | No | 300 | + +-----------------------------+----------+ "); Ok(()) }