Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3cd9d34
Upgrade datafusion version to v52.3.0 (#369)
jayshrivastava Mar 16, 2026
e56922a
feat(console): rewrite TUI with worker-centric dashboard and live met…
EdsonPetry Mar 17, 2026
38d0105
Revert "Upgrade datafusion version to v52.3.0" (#371)
gabotechs Mar 17, 2026
46cdf9e
Fix system metrics never collecting due to async closure in `thread::…
EdsonPetry Mar 18, 2026
1dc0b4c
feat: add new rpc for automatic worker discovery
EdsonPetry Mar 6, 2026
d06a63e
feat: change observability service method to accept user defined work…
EdsonPetry Mar 6, 2026
381e9ee
feat(console): add automatic worker discovery via GetCluserWorkers RPC
EdsonPetry Mar 6, 2026
61d254c
feat(benchmarks): add observability service to benchmark worker.rs
EdsonPetry Mar 6, 2026
b4e218d
refactor(console): remove manual --cluster-ports mode, always use aut…
EdsonPetry Mar 6, 2026
629d294
fmt: cargo fmt
EdsonPetry Mar 13, 2026
9b4dd5a
feat: add observability service to benchmark ec2 worker
EdsonPetry Mar 17, 2026
17f5b80
refactor: change worker resolver field in observability_service type …
EdsonPetry Mar 17, 2026
295896c
fix: remove clap `arg` for structopt
EdsonPetry Mar 17, 2026
972c1fd
fix: remove duplicate worker resolvers, wrap worker resolvers in Arc
EdsonPetry Mar 17, 2026
280af17
fix: add ShuffleBench to lib.rs
EdsonPetry Mar 17, 2026
1af8462
docs(console): add README and update examples for auto-discovery work…
EdsonPetry Mar 17, 2026
b7dda94
doc(console): update README
EdsonPetry Mar 17, 2026
f5f7a8e
refactor(console): replace --connect with required positional port ar…
EdsonPetry Mar 18, 2026
8da964e
fix: cargo fmt
EdsonPetry Mar 18, 2026
8c87e4a
feat: add "system-metrics" feature to benchmarks Cargo.toml
EdsonPetry Mar 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tokio-stream = { version = "0.1.17", features = ["sync"] }
tokio-util = "0.7"
moka = { version = "0.12", features = ["sync", "future"] }
crossbeam-queue = "0.3"
sysinfo = { version = "0.30", optional = true }
sketches-ddsketch = { version = "0.3", features = ["use_serde"] }
bincode = "1"

Expand Down Expand Up @@ -71,9 +72,12 @@ integration = [
"zip",
]

system-metrics = ["sysinfo"]

tpch = ["integration"]
tpcds = ["integration"]
clickbench = ["integration"]
sysinfo = ["dep:sysinfo"]

[dev-dependencies]
structopt = "0.3"
Expand Down
8 changes: 5 additions & 3 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ default-run = "dfbench"
[dependencies]
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-distributed = { path = "..", features = ["integration"] }
datafusion-distributed = { path = "..", features = [
"integration",
"system-metrics",
] }
tokio = { version = "1.48", features = ["full"] }
parquet = { version = "57.1.0" }
structopt = { version = "0.3.26" }
Expand Down Expand Up @@ -36,7 +39,7 @@ criterion = "0.5"
sysinfo = "0.30"

[build-dependencies]
built = { version = "0.8" , features = ["git2", "chrono"]}
built = { version = "0.8", features = ["git2", "chrono"] }

[[bin]]
name = "dfbench"
Expand All @@ -53,4 +56,3 @@ harness = false
[[bench]]
name = "shuffle"
harness = false

2 changes: 2 additions & 0 deletions benchmarks/cdk/bin/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}),
),
);
let ec2_worker_resolver = Arc::new(Ec2WorkerResolver::new());
let grpc_server = Server::builder()
.add_service(worker.with_observability_service(ec2_worker_resolver))
.add_service(worker.into_flight_server())
.serve(WORKER_ADDR.parse()?);

Expand Down
6 changes: 3 additions & 3 deletions console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ edition = "2024"

[dependencies]
datafusion = { workspace = true }
structopt = "0.3"
color-eyre = "0.6.5"
crossterm = "0.29.0"
futures = "0.3.31"
hex = "0.4"
ratatui = "0.30.0"
tokio = { version = "1.49.0", features = ["full"] }
tonic = "0.14.2"
datafusion-distributed = { path = "..", features = ["integration"] }
datafusion-distributed = { path = "..", features = ["integration", "system-metrics"] }
arrow-flight = "57.1.0"
structopt = "0.3.26"
url = "2.5.7"
tokio-stream = "0.1.18"

[dev-dependencies]
arrow = "57.1.0"
Expand Down
85 changes: 85 additions & 0 deletions console/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# datafusion-distributed-console

A terminal UI (TUI) for monitoring [DataFusion Distributed](../README.md)
clusters in real time. Built with [ratatui](https://ratatui.rs).

## Quick-start

```bash
# Start a local cluster (16 workers on ports 9001-9016)
cargo run -p datafusion-distributed-console --example cluster

# In another terminal, open the console (connect to any worker port)
cargo run -p datafusion-distributed-console -- 9001
```

The console requires a port argument and auto-discovers all workers in the
cluster via the `GetClusterWorkers` RPC.

## Usage

```
datafusion-distributed-console <PORT> [OPTIONS]
```

| Argument / Flag | Required | Description |
|--------------------|----------|------------------------------------------------------|
| `PORT` | Yes | Port of a seed worker for auto-discovery |
| `--poll-interval` | No | Polling interval in milliseconds (default: 100) |

## Views

### Cluster Overview (`1`)

A table of all workers showing connection status, active tasks, queries in
flight, CPU usage, memory, and throughput. Columns are sortable.

### Worker Detail (`2`)

Drill into a single worker to see per-task progress (active and completed),
CPU/memory sparklines, and task durations.

## Worker Discovery

The console uses a single seed port to discover the full cluster.
On startup and every 5 seconds, it calls `GetClusterWorkers` on the seed worker,
which returns URLs for all known workers via its `WorkerResolver`. New workers
are added automatically; removed workers are cleaned up.

## Monitoring an EC2 Benchmark Cluster

The benchmark workers in [`benchmarks/cdk/`](../benchmarks/cdk/README.md) run on
EC2 instances with the observability service enabled. Each worker listens on port
9001 (gRPC/Flight + Observability) and port 9000 (HTTP benchmarks). The
`Ec2WorkerResolver` discovers peers via `DescribeInstances`, so connecting the
console to any single worker exposes the full cluster.

To run the console, SSH into any instance in the cluster and install it there
(the console runs inside the VPC so it can reach all workers on their private IPs):

```bash
cd benchmarks/cdk/
npm run deploy

# Connect to an instance via SSM
aws ssm start-session --target "$INSTANCE_ID" --region "$AWS_REGION"

# Install the Rust toolchain
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
source "$HOME/.cargo/env"

# Install the console binary from the repo
cargo install --locked --git https://github.com/datafusion-contrib/datafusion-distributed.git \
datafusion-distributed-console

# Run — connect to the local worker on port 9001
datafusion-distributed-console 9001
```

## Examples

| Example | Description |
|--------------------------------------------------|------------------------------------------------|
| [`cluster`](examples/cluster.md) | Start a local multi-worker cluster |
| [`console_worker`](examples/console.md) | Start individual workers with observability |
| [`tpcds_runner`](examples/tpcds_runner.md) | Run TPC-DS queries with live monitoring |
62 changes: 62 additions & 0 deletions console/examples/cluster.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Cluster Example

Starts an in-memory cluster of DataFusion distributed workers with observability
enabled. This is the fastest way to get a local cluster running for use with the
console TUI or the TPC-DS runner.

## Quick-start

```bash
# Terminal 1 — start 16 workers
cargo run -p datafusion-distributed-console --example cluster

# Terminal 2 — open the console (connect to any worker port)
cargo run -p datafusion-distributed-console -- 9001
```

## Usage

```bash
cargo run -p datafusion-distributed-console --example cluster
```

This starts 16 workers on ports 9001 through 9016 and prints the commands to connect.

### Customize the cluster

```bash
cargo run -p datafusion-distributed-console --example cluster -- \
--workers 8 \
--base-port 5000
```

| Flag | Default | Description |
|-----------------|---------|----------------------------------------------------------|
| `--workers` | 16 | Number of workers to start |
| `--base-port` | 9001 | Starting port; workers bind to consecutive ports |

Workers bind to consecutive ports starting from `--base-port`
(e.g. `--base-port 5000` gives 5000, 5001, ..., 5015 for 16 workers).

If you change the base port, tell the console which worker to connect to:

```bash
cargo run -p datafusion-distributed-console -- 5000
```

## Connecting to the cluster

After starting, the example prints ready-to-use commands. For example:

```
Started 16 workers on ports: 9001,9002,...,9016

Console (connect to any worker for auto-discovery):
cargo run -p datafusion-distributed-console -- 9001
TPC-DS runner:
cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9001,9002,...,9016
Single query:
cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9001,9002,...,9016 "SELECT 1"
```

Press `Ctrl+C` to stop all workers.
109 changes: 109 additions & 0 deletions console/examples/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use async_trait::async_trait;
use datafusion::error::DataFusionError;
use datafusion_distributed::{Worker, WorkerResolver};
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use structopt::StructOpt;
use tokio::net::TcpListener;
use tonic::transport::Server;
use url::Url;

#[derive(StructOpt)]
#[structopt(
name = "cluster",
about = "Start an in-memory cluster of workers with observability"
)]
struct Args {
/// Number of workers to start
#[structopt(long, default_value = "16")]
workers: usize,

/// Starting port. Workers bind to consecutive ports from this value.
#[structopt(long, default_value = "9001")]
base_port: u16,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();

let mut ports = Vec::new();
let mut listeners = Vec::new();

// Bind all listeners first so we know all ports before starting workers
for i in 0..args.workers {
let addr = SocketAddr::new(
IpAddr::V4(Ipv4Addr::LOCALHOST),
args.base_port
.checked_add(i as u16)
.expect("port overflow: base_port + workers exceeds u16::MAX"),
);
let listener = TcpListener::bind(addr).await?;
let port = listener.local_addr()?.port();
ports.push(port);
listeners.push(listener);
}

let localhost_resolver = Arc::new(LocalhostWorkerResolver {
ports: ports.clone(),
});

for listener in listeners {
let resolver = localhost_resolver.clone();
tokio::spawn(async move {
let worker = Worker::default();

Server::builder()
.add_service(worker.with_observability_service(resolver))
.add_service(worker.into_flight_server())
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
.await
.expect("worker server failed");
});
}

let ports_csv = ports
.iter()
.map(|p| p.to_string())
.collect::<Vec<_>>()
.join(",");

println!("Started {} workers on ports: {ports_csv}\n", args.workers);
println!("Console (connect to any worker for auto-discovery):");
println!(
"\tcargo run -p datafusion-distributed-console -- {}",
ports[0]
);
println!("TPC-DS runner:");
println!(
"\tcargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports {ports_csv}"
);
println!("Single query:");
println!(
"\tcargo run -p datafusion-distributed-console --example console_run -- --cluster-ports {ports_csv} \"SELECT 1\""
);
println!("Press Ctrl+C to stop all workers.");

tokio::signal::ctrl_c().await?;

Ok(())
}

#[derive(Clone)]
struct LocalhostWorkerResolver {
ports: Vec<u16>,
}

#[async_trait]
impl WorkerResolver for LocalhostWorkerResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
self.ports
.iter()
.map(|port| {
let url_string = format!("http://localhost:{port}");
Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e)))
})
.collect::<Result<Vec<Url>, _>>()
}
}
Loading