Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{
ChannelResolver, DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule,
Worker, WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
ChannelResolver, DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, Worker,
WorkerResolver, display_plan_ascii, get_distributed_channel_resolver,
get_distributed_worker_resolver, rewrite_distributed_plan_with_metrics,
};
use futures::{StreamExt, TryFutureExt};
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.with_distributed_worker_resolver(Ec2WorkerResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_broadcast_joins(cmd.broadcast_joins)?
.build();
let ctx = SessionContext::from(state);
Expand Down
6 changes: 2 additions & 4 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt, Worker,
};
use datafusion_distributed::{DistributedExt, NetworkBoundaryExt, SessionStateBuilderExt, Worker};
use datafusion_distributed_benchmarks::datasets::{clickbench, register_tables, tpcds, tpch};
use std::error::Error;
use std::fs;
Expand Down Expand Up @@ -178,7 +176,7 @@ impl RunOpt {
.with_default_features()
.with_config(self.config()?)
.with_distributed_worker_resolver(LocalHostWorkerResolver::new(self.workers.clone()))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_files_per_task(
self.files_per_task.unwrap_or(get_available_parallelism()),
)?
Expand Down
4 changes: 2 additions & 2 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion_cli::{
use datafusion_distributed::test_utils::in_memory_channel_resolver::{
InMemoryChannelResolver, InMemoryWorkerResolver,
};
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule};
use datafusion_distributed::{DistributedExt, SessionStateBuilderExt};
use std::env;
use std::path::Path;
use std::process::ExitCode;
Expand Down Expand Up @@ -148,7 +148,7 @@ async fn main_inner() -> Result<()> {
.with_default_features()
.with_config(session_config)
.with_runtime_env(runtime_env)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_worker_resolver(InMemoryWorkerResolver::new(16))
.with_distributed_channel_resolver(InMemoryChannelResolver::default())
.build();
Expand Down
5 changes: 2 additions & 3 deletions console/examples/console_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii,
DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

Expand Down Expand Up @@ -38,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_files_per_task(1)?
.build();

Expand Down
4 changes: 2 additions & 2 deletions console/examples/tpcds_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::execute_stream;
use datafusion::prelude::SessionContext;
use datafusion_distributed::{
DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, WorkerResolver,
DistributedExt, DistributedMetricsFormat, SessionStateBuilderExt, WorkerResolver,
display_plan_ascii,
};
use datafusion_distributed_benchmarks::datasets::{register_tables, tpcds};
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn run_queries(
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.build();

let ctx = SessionContext::from(state);
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/channel-resolver.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
let state = SessionStateBuilder::new()
// these two are mandatory.
.with_distributed_worker_resolver(todo!())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
// the CustomChannelResolver needs to be passed here once...
.with_distributed_channel_resolver(channel_resolver.clone())
.build();
Expand Down
6 changes: 3 additions & 3 deletions docs/source/user-guide/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ You'll see these concepts mentioned extensively across the documentation and the

Some other more tangible concepts are the structs and traits exposed publicly, the most important are:

## [DistributedPhysicalOptimizerRule](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/distributed_physical_optimizer_rule.rs)
## [SessionStateBuilderExt](https://github.com/datafusion-contrib/datafusion-distributed/blob/main/src/distributed_planner/session_state_builder_ext.rs)

A physical optimizer rule that transforms single-node DataFusion query plans into distributed query plans. It reads
a fully formed physical plan and injects the appropriate nodes to execute the query in a distributed fashion.
An extension trait for `SessionStateBuilder` that provides `with_distributed_planner()`. This registers a custom
query planner that transforms single-node DataFusion query plans into distributed query plans after physical planning.

It builds the distributed plan from bottom to top, injecting network boundaries at appropriate locations based on
the nodes present in the original plan.
Expand Down
4 changes: 2 additions & 2 deletions docs/source/user-guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl WorkerResolver for LocalhostWorkerResolver {
}
```

Register both the `WorkerResolver` implementation and the `DistributedPhysicalOptimizerRule` in DataFusion's
Register the `WorkerResolver` implementation and the distributed planner in DataFusion's
`SessionStateBuilder` to enable distributed query planning:

```rs
Expand All @@ -59,7 +59,7 @@ let localhost_worker_resolver = LocalhostWorkerResolver {

let state = SessionStateBuilder::new()
.with_distributed_worker_resolver(localhost_worker_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.build();

let ctx = SessionContext::from(state);
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/worker-resolver.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl WorkerResolver for CustomWorkerResolver {
async fn main() {
let state = SessionStateBuilder::new()
.with_distributed_worker_resolver(CustomWorkerResolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
Comment thread
gabotechs marked this conversation as resolved.
.build();
}
```
Expand Down
4 changes: 2 additions & 2 deletions docs/source/user-guide/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ With the resolver in place, wire it into the session and tag each worker with a

```rust
use datafusion::execution::SessionStateBuilder;
use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule, Worker};
use datafusion_distributed::{DistributedExt, Worker};

let worker_version = std::env::var("COMMIT_HASH").unwrap_or_default();

Expand All @@ -262,7 +262,7 @@ let resolver = VersionAwareWorkerResolver::start_version_filtering(
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.build();

let ctx = SessionContext::from(state);
Expand Down
6 changes: 3 additions & 3 deletions examples/custom_execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use datafusion_distributed::test_utils::in_memory_channel_resolver::{
InMemoryChannelResolver, InMemoryWorkerResolver,
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedTaskContext, TaskEstimation,
TaskEstimator, WorkerQueryContext, display_plan_ascii,
DistributedExt, DistributedTaskContext, SessionStateBuilderExt, TaskEstimation, TaskEstimator,
WorkerQueryContext, display_plan_ascii,
};
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_proto::protobuf;
Expand Down Expand Up @@ -382,7 +382,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.with_config(config)
.with_distributed_worker_resolver(worker_resolver)
.with_distributed_channel_resolver(channel_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_user_codec(NumbersExecCodec)
.with_distributed_task_estimator(NumbersTaskEstimator)
.build();
Expand Down
5 changes: 2 additions & 3 deletions examples/in_memory_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, Worker,
BoxCloneSyncChannel, ChannelResolver, DistributedExt, SessionStateBuilderExt, Worker,
WorkerQueryContext, WorkerResolver, WorkerServiceClient, create_worker_client,
display_plan_ascii,
};
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use tonic::transport::{Endpoint, Server};

Expand All @@ -38,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_default_features()
.with_distributed_worker_resolver(InMemoryWorkerResolver)
.with_distributed_channel_resolver(InMemoryChannelResolver::new())
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_files_per_task(1)?
.build();

Expand Down
5 changes: 2 additions & 3 deletions examples/localhost_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii,
DistributedExt, SessionStateBuilderExt, WorkerResolver, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

Expand Down Expand Up @@ -39,7 +38,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_files_per_task(1)?
.build();

Expand Down
5 changes: 2 additions & 3 deletions examples/localhost_versioned_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
DefaultChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, GetWorkerInfoRequest,
DefaultChannelResolver, DistributedExt, GetWorkerInfoRequest, SessionStateBuilderExt,
WorkerResolver, create_worker_client, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

Expand Down Expand Up @@ -93,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_files_per_task(1)?
.build();

Expand Down
12 changes: 4 additions & 8 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub trait DistributedExt: Sized {
/// # use datafusion::prelude::SessionConfig;
/// # use url::Url;
/// # use std::sync::Arc;
/// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext};
/// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext};
///
/// struct CustomWorkerResolver;
///
Expand All @@ -211,9 +211,7 @@ pub trait DistributedExt: Sized {
/// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
/// let state = SessionStateBuilder::new()
/// .with_distributed_worker_resolver(CustomWorkerResolver)
/// // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans
/// // get distributed.
/// .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
/// .with_distributed_planner()
/// .build();
/// ```
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
Expand Down Expand Up @@ -241,7 +239,7 @@ pub trait DistributedExt: Sized {
/// # use datafusion::prelude::SessionConfig;
/// # use url::Url;
/// # use std::sync::Arc;
/// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext, WorkerServiceClient};
/// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext, WorkerServiceClient};
///
/// struct CustomChannelResolver;
///
Expand All @@ -256,9 +254,7 @@ pub trait DistributedExt: Sized {
/// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
/// let state = SessionStateBuilder::new()
/// .with_distributed_channel_resolver(CustomChannelResolver)
/// // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans
/// // get distributed.
/// .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
/// .with_distributed_planner()
/// .build();
///
/// // This function can be provided to a Worker so that, upon receiving a distributed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,10 @@ pub(crate) fn batch_coalescing_below_network_boundaries(

#[cfg(test)]
mod tests {
use super::*;
use crate::distributed_planner::session_state_builder_ext::SessionStateBuilderExt;
use crate::test_utils::in_memory_channel_resolver::InMemoryWorkerResolver;
use crate::test_utils::parquet::register_parquet_tables;
use crate::{
DistributedExt, DistributedPhysicalOptimizerRule, assert_snapshot, display_plan_ascii,
};
use crate::{DistributedExt, assert_snapshot, display_plan_ascii};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use itertools::Itertools;
Expand Down Expand Up @@ -161,7 +159,7 @@ mod tests {
let state = SessionStateBuilder::new()
.with_default_features()
.with_config(SessionConfig::new().with_target_partitions(4))
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_planner()
.with_distributed_worker_resolver(InMemoryWorkerResolver::new(3))
.build();

Expand Down
Loading
Loading