diff --git a/Cargo.lock b/Cargo.lock index 1a20fcab..222e846c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,6 +2158,7 @@ dependencies = [ "reqwest", "sketches-ddsketch", "structopt", + "sysinfo", "test-case", "tokio", "tokio-stream", @@ -2238,10 +2239,10 @@ dependencies = [ "datafusion", "datafusion-distributed", "futures", - "hex", "ratatui", "structopt", "tokio", + "tokio-stream", "tonic", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 343eee9c..e300c59b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ tokio-stream = { version = "0.1.17", features = ["sync"] } tokio-util = "0.7" moka = { version = "0.12", features = ["sync", "future"] } crossbeam-queue = "0.3" +sysinfo = { version = "0.30", optional = true } sketches-ddsketch = { version = "0.3", features = ["use_serde"] } bincode = "1" @@ -71,9 +72,12 @@ integration = [ "zip", ] +system-metrics = ["sysinfo"] + tpch = ["integration"] tpcds = ["integration"] clickbench = ["integration"] +sysinfo = ["dep:sysinfo"] [dev-dependencies] structopt = "0.3" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 87644b04..90921f6c 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,7 +7,10 @@ default-run = "dfbench" [dependencies] datafusion = { workspace = true } datafusion-proto = { workspace = true } -datafusion-distributed = { path = "..", features = ["integration"] } +datafusion-distributed = { path = "..", features = [ + "integration", + "system-metrics", +] } tokio = { version = "1.48", features = ["full"] } parquet = { version = "57.1.0" } structopt = { version = "0.3.26" } @@ -36,7 +39,7 @@ criterion = "0.5" sysinfo = "0.30" [build-dependencies] -built = { version = "0.8" , features = ["git2", "chrono"]} +built = { version = "0.8", features = ["git2", "chrono"] } [[bin]] name = "dfbench" @@ -53,4 +56,3 @@ harness = false [[bench]] name = "shuffle" harness = false - diff --git a/benchmarks/cdk/bin/worker.rs b/benchmarks/cdk/bin/worker.rs index 5d019c5e..978522de 100644 
--- a/benchmarks/cdk/bin/worker.rs +++ b/benchmarks/cdk/bin/worker.rs @@ -205,7 +205,9 @@ async fn main() -> Result<(), Box> { }), ), ); + let ec2_worker_resolver = Arc::new(Ec2WorkerResolver::new()); let grpc_server = Server::builder() + .add_service(worker.with_observability_service(ec2_worker_resolver)) .add_service(worker.into_flight_server()) .serve(WORKER_ADDR.parse()?); diff --git a/console/Cargo.toml b/console/Cargo.toml index e7a009c1..f4cea5d3 100644 --- a/console/Cargo.toml +++ b/console/Cargo.toml @@ -5,17 +5,17 @@ edition = "2024" [dependencies] datafusion = { workspace = true } +structopt = "0.3" color-eyre = "0.6.5" crossterm = "0.29.0" futures = "0.3.31" -hex = "0.4" ratatui = "0.30.0" tokio = { version = "1.49.0", features = ["full"] } tonic = "0.14.2" -datafusion-distributed = { path = "..", features = ["integration"] } +datafusion-distributed = { path = "..", features = ["integration", "system-metrics"] } arrow-flight = "57.1.0" -structopt = "0.3.26" url = "2.5.7" +tokio-stream = "0.1.18" [dev-dependencies] arrow = "57.1.0" diff --git a/console/README.md b/console/README.md new file mode 100644 index 00000000..81058f22 --- /dev/null +++ b/console/README.md @@ -0,0 +1,85 @@ +# datafusion-distributed-console + +A terminal UI (TUI) for monitoring [DataFusion Distributed](../README.md) +clusters in real time. Built with [ratatui](https://ratatui.rs). + +## Quick-start + +```bash +# Start a local cluster (16 workers on ports 9001-9016) +cargo run -p datafusion-distributed-console --example cluster + +# In another terminal, open the console (connect to any worker port) +cargo run -p datafusion-distributed-console -- 9001 +``` + +The console requires a port argument and auto-discovers all workers in the +cluster via the `GetClusterWorkers` RPC. 
+ +## Usage + +``` +datafusion-distributed-console [OPTIONS] <PORT> +``` + +| Argument / Flag | Required | Description | +|--------------------|----------|------------------------------------------------------| +| `PORT` | Yes | Port of a seed worker for auto-discovery | +| `--poll-interval` | No | Polling interval in milliseconds (default: 100) | + +## Views + +### Cluster Overview (`1`) + +A table of all workers showing connection status, active tasks, queries in +flight, CPU usage, memory, and throughput. Columns are sortable. + +### Worker Detail (`2`) + +Drill into a single worker to see per-task progress (active and completed), +CPU/memory sparklines, and task durations. + +## Worker Discovery + +The console uses a single seed port to discover the full cluster. +On startup and every 5 seconds, it calls `GetClusterWorkers` on the seed worker, +which returns URLs for all known workers via its `WorkerResolver`. New workers +are added automatically; removed workers are cleaned up. + +## Monitoring an EC2 Benchmark Cluster + +The benchmark workers in [`benchmarks/cdk/`](../benchmarks/cdk/README.md) run on +EC2 instances with the observability service enabled. Each worker listens on port +9001 (gRPC/Flight + Observability) and port 9000 (HTTP benchmarks). The +`Ec2WorkerResolver` discovers peers via `DescribeInstances`, so connecting the +console to any single worker exposes the full cluster. 
+ +To run the console, SSH into any instance in the cluster and install it there +(the console runs inside the VPC so it can reach all workers on their private IPs): + +```bash +cd benchmarks/cdk/ +npm run deploy + +# Connect to an instance via SSM +aws ssm start-session --target "$INSTANCE_ID" --region "$AWS_REGION" + +# Install the Rust toolchain +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +source "$HOME/.cargo/env" + +# Install the console binary from the repo +cargo install --locked --git https://github.com/datafusion-contrib/datafusion-distributed.git \ + datafusion-distributed-console + +# Run — connect to the local worker on port 9001 +datafusion-distributed-console 9001 +``` + +## Examples + +| Example | Description | +|--------------------------------------------------|------------------------------------------------| +| [`cluster`](examples/cluster.md) | Start a local multi-worker cluster | +| [`console_worker`](examples/console.md) | Start individual workers with observability | +| [`tpcds_runner`](examples/tpcds_runner.md) | Run TPC-DS queries with live monitoring | diff --git a/console/examples/cluster.md b/console/examples/cluster.md new file mode 100644 index 00000000..312efdc0 --- /dev/null +++ b/console/examples/cluster.md @@ -0,0 +1,62 @@ +# Cluster Example + +Starts an in-memory cluster of DataFusion distributed workers with observability +enabled. This is the fastest way to get a local cluster running for use with the +console TUI or the TPC-DS runner. + +## Quick-start + +```bash +# Terminal 1 — start 16 workers +cargo run -p datafusion-distributed-console --example cluster + +# Terminal 2 — open the console (connect to any worker port) +cargo run -p datafusion-distributed-console -- 9001 +``` + +## Usage + +```bash +cargo run -p datafusion-distributed-console --example cluster +``` + +This starts 16 workers on ports 9001 through 9016 and prints the commands to connect. 
+ +### Customize the cluster + +```bash +cargo run -p datafusion-distributed-console --example cluster -- \ + --workers 8 \ + --base-port 5000 +``` + +| Flag | Default | Description | +|-----------------|---------|----------------------------------------------------------| +| `--workers` | 16 | Number of workers to start | +| `--base-port` | 9001 | Starting port; workers bind to consecutive ports | + +Workers bind to consecutive ports starting from `--base-port` +(e.g. `--base-port 5000` gives 5000, 5001, ..., 5015 for 16 workers). + +If you change the base port, tell the console which worker to connect to: + +```bash +cargo run -p datafusion-distributed-console -- 5000 +``` + +## Connecting to the cluster + +After starting, the example prints ready-to-use commands. For example: + +``` +Started 16 workers on ports: 9001,9002,...,9016 + +Console (connect to any worker for auto-discovery): + cargo run -p datafusion-distributed-console -- 9001 +TPC-DS runner: + cargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports 9001,9002,...,9016 +Single query: + cargo run -p datafusion-distributed-console --example console_run -- --cluster-ports 9001,9002,...,9016 "SELECT 1" +``` + +Press `Ctrl+C` to stop all workers. 
diff --git a/console/examples/cluster.rs b/console/examples/cluster.rs new file mode 100644 index 00000000..cb541240 --- /dev/null +++ b/console/examples/cluster.rs @@ -0,0 +1,109 @@ +use async_trait::async_trait; +use datafusion::error::DataFusionError; +use datafusion_distributed::{Worker, WorkerResolver}; +use std::error::Error; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use structopt::StructOpt; +use tokio::net::TcpListener; +use tonic::transport::Server; +use url::Url; + +#[derive(StructOpt)] +#[structopt( + name = "cluster", + about = "Start an in-memory cluster of workers with observability" +)] +struct Args { + /// Number of workers to start + #[structopt(long, default_value = "16")] + workers: usize, + + /// Starting port. Workers bind to consecutive ports from this value. + #[structopt(long, default_value = "9001")] + base_port: u16, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::from_args(); + + let mut ports = Vec::new(); + let mut listeners = Vec::new(); + + // Bind all listeners first so we know all ports before starting workers + for i in 0..args.workers { + let addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::LOCALHOST), + args.base_port + .checked_add(i as u16) + .expect("port overflow: base_port + workers exceeds u16::MAX"), + ); + let listener = TcpListener::bind(addr).await?; + let port = listener.local_addr()?.port(); + ports.push(port); + listeners.push(listener); + } + + let localhost_resolver = Arc::new(LocalhostWorkerResolver { + ports: ports.clone(), + }); + + for listener in listeners { + let resolver = localhost_resolver.clone(); + tokio::spawn(async move { + let worker = Worker::default(); + + Server::builder() + .add_service(worker.with_observability_service(resolver)) + .add_service(worker.into_flight_server()) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .expect("worker server failed"); + }); + } + + let ports_csv = ports + .iter() + 
.map(|p| p.to_string()) + .collect::>() + .join(","); + + println!("Started {} workers on ports: {ports_csv}\n", args.workers); + println!("Console (connect to any worker for auto-discovery):"); + println!( + "\tcargo run -p datafusion-distributed-console -- {}", + ports[0] + ); + println!("TPC-DS runner:"); + println!( + "\tcargo run -p datafusion-distributed-console --example tpcds_runner -- --cluster-ports {ports_csv}" + ); + println!("Single query:"); + println!( + "\tcargo run -p datafusion-distributed-console --example console_run -- --cluster-ports {ports_csv} \"SELECT 1\"" + ); + println!("Press Ctrl+C to stop all workers."); + + tokio::signal::ctrl_c().await?; + + Ok(()) +} + +#[derive(Clone)] +struct LocalhostWorkerResolver { + ports: Vec, +} + +#[async_trait] +impl WorkerResolver for LocalhostWorkerResolver { + fn get_urls(&self) -> Result, DataFusionError> { + self.ports + .iter() + .map(|port| { + let url_string = format!("http://localhost:{port}"); + Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect::, _>>() + } +} diff --git a/console/examples/console.md b/console/examples/console.md index 03710e86..871db718 100644 --- a/console/examples/console.md +++ b/console/examples/console.md @@ -5,26 +5,41 @@ while running distributed queries. 
## Terminal 1: Start workers with observability +The easiest way is to use the cluster example, which starts 16 workers on ports +9001-9016 (see [cluster.md](cluster.md)): + +```bash + cargo run -p datafusion-distributed-console --example cluster +``` + +Or start individual workers manually: + ```bash - cargo run -p datafusion-distributed-console --example console_worker -- 8080 - cargo run -p datafusion-distributed-console --example console_worker -- 8081 + cargo run -p datafusion-distributed-console --example console_worker -- 9001 + cargo run -p datafusion-distributed-console --example console_worker -- 9002 ``` ## Terminal 2: Start console ```bash - cargo run -p datafusion-distributed-console -- --cluster-ports 8080,8081 + cargo run -p datafusion-distributed-console -- 9001 ``` -The TUI console will start and connect to the workers. It will show "Waiting for tasks..." -until queries are executed. +The console auto-discovers all workers in the cluster via `GetClusterWorkers`. +It will show "Waiting for tasks..." until queries are executed. + +To connect to a worker on a different port: + +```bash + cargo run -p datafusion-distributed-console -- 9002 +``` ## Terminal 3: Run a query ```bash cargo run -p datafusion-distributed-console --example console_run -- \ "SELECT * FROM weather LIMIT 100" \ - --cluster-ports 8080,8081 + --cluster-ports 9001,9002 ``` The console will display real-time task progress across all workers. diff --git a/console/examples/console_run.rs b/console/examples/console_run.rs index 289cee14..5da8823f 100644 --- a/console/examples/console_run.rs +++ b/console/examples/console_run.rs @@ -16,7 +16,6 @@ use url::Url; #[structopt(name = "console_run", about = "Run queries with console integration")] struct Args { /// The SQL query to run. - #[structopt()] query: String, /// The ports holding Distributed DataFusion workers. 
diff --git a/console/examples/console_worker.rs b/console/examples/console_worker.rs index 7dc827bf..80178e4d 100644 --- a/console/examples/console_worker.rs +++ b/console/examples/console_worker.rs @@ -1,8 +1,12 @@ -use datafusion_distributed::{ObservabilityServiceServer, Worker}; +use async_trait::async_trait; +use datafusion::error::DataFusionError; +use datafusion_distributed::{Worker, WorkerResolver}; use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; use structopt::StructOpt; use tonic::transport::Server; +use url::Url; #[derive(StructOpt)] #[structopt( @@ -10,22 +14,46 @@ use tonic::transport::Server; about = "A localhost DataFusion worker with observability" )] struct Args { - #[structopt(default_value = "8080")] + /// Port to listen on. port: u16, + + /// The ports holding Distributed DataFusion workers. + #[structopt(long = "cluster-ports", use_delimiter = true)] + cluster_ports: Vec, } #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::from_args(); + let localhost_resolver = Arc::new(LocalhostWorkerResolver { + ports: args.cluster_ports.clone(), + }); let worker = Worker::default(); - let observability_service = ObservabilityServiceServer::new(worker.observability_service()); Server::builder() + .add_service(worker.with_observability_service(localhost_resolver)) .add_service(worker.into_flight_server()) - .add_service(observability_service) .serve(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.port)) .await?; Ok(()) } + +#[derive(Clone)] +struct LocalhostWorkerResolver { + ports: Vec, +} + +#[async_trait] +impl WorkerResolver for LocalhostWorkerResolver { + fn get_urls(&self) -> Result, DataFusionError> { + self.ports + .iter() + .map(|port| { + let url_string = format!("http://localhost:{port}"); + Url::parse(&url_string).map_err(|e| DataFusionError::External(Box::new(e))) + }) + .collect::, _>>() + } +} diff --git a/console/examples/tpcds_runner.md b/console/examples/tpcds_runner.md index 
630d9396..5b1c11f9 100644 --- a/console/examples/tpcds_runner.md +++ b/console/examples/tpcds_runner.md @@ -17,21 +17,27 @@ may take a few minutes). ## Usage -### Step 1: Start Workers with Observability (Terminals 1-4) +### Step 1: Start Workers with Observability -Start 4 workers on different ports in different terminals: +The quickest way is to start a cluster (defaults to 16 workers on ports 9001-9016): ```bash -cargo run -p datafusion-distributed-console --example -- console_worker -- 8080 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8081 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8082 -cargo run -p datafusion-distributed-console --example -- console_worker -- 8083 +cargo run -p datafusion-distributed-console --example cluster +``` + +Or start workers individually in separate terminals: + +```bash +cargo run -p datafusion-distributed-console --example console_worker -- 9001 +cargo run -p datafusion-distributed-console --example console_worker -- 9002 +cargo run -p datafusion-distributed-console --example console_worker -- 9003 +cargo run -p datafusion-distributed-console --example console_worker -- 9004 ``` ### Step 2: Start the Console (Terminal 5) ```bash -cargo run -p datafusion-distributed-console +cargo run -p datafusion-distributed-console -- 9001 ``` ### Step 3: Run TPC-DS Queries (Terminal 6) @@ -39,14 +45,15 @@ cargo run -p datafusion-distributed-console #### Run a single query ```bash -cargo run -p datafusion-distributed-console --example tpcds_runner \ - --cluster-ports 8080,8081,8082,8083 \ +cargo run -p datafusion-distributed-console --example tpcds_runner -- \ + --cluster-ports 9001,9002,9003,9004 \ --query q99 ``` #### Run all TPC-DS queries sequentially ```bash -cargo run -p datafusion-distributed-console --example tpcds_runner \ - --cluster-ports 8080,8081,8082,8083 \ +cargo run -p datafusion-distributed-console --example tpcds_runner -- \ + --cluster-ports 9001,9002,9003,9004 \ + --concurrency 1 ``` 
diff --git a/console/examples/tpcds_runner.rs b/console/examples/tpcds_runner.rs index 65cfe8e3..cda72ee9 100644 --- a/console/examples/tpcds_runner.rs +++ b/console/examples/tpcds_runner.rs @@ -1,19 +1,26 @@ use async_trait::async_trait; use datafusion::common::DataFusionError; use datafusion::execution::SessionStateBuilder; +use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; -use datafusion_distributed::{DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver}; +use datafusion_distributed::{ + DistributedExt, DistributedMetricsFormat, DistributedPhysicalOptimizerRule, WorkerResolver, + display_plan_ascii, +}; +use futures::TryStreamExt; use std::error::Error; +use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; use structopt::StructOpt; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; use url::Url; #[derive(StructOpt)] #[structopt(name = "tpcds_runner", about = "Run TPC-DS with observability")] struct Args { - /// Start a worker with observability service - /// Run TPC-DS queries against workers + /// Comma-delimited list of worker ports (e.g. 8080,8081) #[structopt(long, use_delimiter = true)] cluster_ports: Vec, @@ -26,6 +33,19 @@ struct Args { /// Specific query to run (e.g., "q1"), or "all" to run all queries #[structopt(long, default_value = "all")] query: String, + + /// Maximum number of concurrent queries. Defaults to all queries at once. + /// Use --concurrency 1 for sequential execution. + #[structopt(long)] + concurrency: Option, + + /// Run the query and display the plan with execution metrics (EXPLAIN ANALYZE) + #[structopt(long)] + explain_analyze: bool, + + /// Whether the distributed plan should be rendered instead of executing the query. 
+ #[structopt(long)] + show_distributed_plan: bool, } #[tokio::main] @@ -37,6 +57,9 @@ async fn main() -> Result<(), Box> { args.scale_factor, args.parquet_partitions, &args.query, + args.concurrency, + args.explain_analyze, + args.show_distributed_plan, ) .await } @@ -46,11 +69,12 @@ async fn run_queries( scale_factor: f64, parquet_partitions: usize, query_id: &str, + concurrency: Option, + explain_analyze: bool, + show_distributed_plan: bool, ) -> Result<(), Box> { use datafusion_distributed::test_utils::{benchmarks_common, tpcds}; use std::fs; - use std::time::Instant; - use tokio::time::sleep; println!( "Running TPC-DS queries (SF={scale_factor}, partitions={parquet_partitions}) against workers: {cluster_ports:?}" @@ -87,62 +111,88 @@ async fn run_queries( // Determine which queries to run let queries: Vec = if query_id == "all" { - // Run all TPC-DS queries (q1 through q99) (1..=99).map(|i| format!("q{i}")).collect() } else { vec![query_id.to_string()] }; - println!("\nRunning {} queries...\n", queries.len()); - - for query in queries { - let query_sql = match tpcds::get_query(&query) { - Ok(sql) => sql, - Err(e) => { - println!("Skipping {query}: {e}\n"); - continue; - } - }; + let max_concurrent = concurrency.map(NonZeroUsize::get).unwrap_or(queries.len()); - // Add sleep to observe "completed" state in console - sleep(tokio::time::Duration::from_millis(1000)).await; - - println!("Running {query}"); - - let start = Instant::now(); - - match run_single_query(&ctx, &query_sql).await { - Ok(batches) => { - let duration = start.elapsed(); - let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + println!( + "\nRunning {} queries (concurrency={})...\n", + queries.len(), + max_concurrent + ); - println!("{query} completed in {duration:?}"); - println!("\tRows returned: {row_count}"); - } + // Resolve query SQL upfront + let query_sqls: Vec<(String, String)> = queries + .into_iter() + .filter_map(|query| match tpcds::get_query(&query) { + Ok(sql) => 
Some((query, sql)), Err(e) => { - println!("{query} failed: {e}"); + println!("Skipping {query}: {e}"); + None } - } + }) + .collect(); + + let semaphore = Arc::new(Semaphore::new(max_concurrent)); + let mut tasks = JoinSet::new(); + + for (query_id, query_sql) in query_sqls { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let ctx = ctx.clone(); + + tasks.spawn(async move { + use std::time::Instant; + + let start = Instant::now(); + let result = + run_single_query(&ctx, &query_sql, explain_analyze, show_distributed_plan).await; + drop(permit); + + let duration = start.elapsed(); + match result { + Ok(batches) => { + let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); + format!("{query_id} completed in {duration:?} ({row_count} rows)") + } + Err(e) => format!("{query_id} failed: {e}"), + } + }); + } - println!(); + while let Some(result) = tasks.join_next().await { + match result { + Ok(msg) => println!("{msg}"), + Err(e) => eprintln!("Task panicked: {e}"), + } } + println!("\nAll queries finished."); + Ok(()) } async fn run_single_query( ctx: &SessionContext, query_sql: &str, -) -> Result, Box> { - use futures::StreamExt; - + explain_analyze: bool, + show_distributed_plan: bool, +) -> Result, Box> { let df = ctx.sql(query_sql).await?; - let stream = df.execute_stream().await?; - let batches = stream - .collect::>() - .await - .into_iter() - .collect::, _>>()?; + let plan = df.create_physical_plan().await?; + if show_distributed_plan { + println!("{}", display_plan_ascii(plan.as_ref(), false)); + return Ok(vec![]); + } + let stream = execute_stream(plan.clone(), ctx.task_ctx())?; + let batches = stream.try_collect::>().await?; + if explain_analyze { + let output = + datafusion_distributed::explain_analyze(plan, DistributedMetricsFormat::Aggregated)?; + println!("{output}"); + } Ok(batches) } diff --git a/console/src/app.rs b/console/src/app.rs index b9b0b7e7..04ec2b19 100644 --- a/console/src/app.rs +++ b/console/src/app.rs @@ 
-1,81 +1,78 @@ -use datafusion_distributed::{ - GetTaskProgressRequest, ObservabilityServiceClient, ObservabilityStageKey, PingRequest, - TaskStatus, -}; +use crate::state::{ClusterViewState, SortColumn, SortDirection, View, WorkerViewState}; +use crate::worker::{ConnectionStatus, WorkerConn, discover_cluster_workers}; use std::collections::HashSet; use std::time::{Duration, Instant}; -use tonic::transport::Channel; use url::Url; /// App holds the main application state. -pub struct App { - pub workers: Vec, - pub should_quit: bool, - pub console_state: ConsoleState, +pub(crate) struct App { + pub(crate) workers: Vec, + active_query_count: usize, + pub(crate) current_view: View, + pub(crate) cluster_state: ClusterViewState, + pub(crate) worker_state: WorkerViewState, + pub(crate) paused: bool, + pub(crate) show_help: bool, + pub(crate) should_quit: bool, + pub(crate) start_time: Instant, + poll_count: u64, + /// Previous tick's cluster-wide output rows total (for throughput delta). + prev_output_rows_total: u64, + prev_output_rows_time: Option, + /// Smoothed cluster-wide throughput in rows/s. + pub(crate) current_throughput: f64, + /// Seed URL for worker discovery via `GetClusterWorkers`. + seed_url: Url, + /// Last time we ran worker discovery. + last_discovery: Option, } -/// Represents overall state of the console application. -#[derive(Clone, PartialEq)] -pub enum ConsoleState { - Idle, - Active, - Completed, +/// Cluster-wide statistics for the header. +pub(crate) struct ClusterStats { + pub(crate) total: usize, + pub(crate) active_count: usize, + pub(crate) idle_count: usize, + pub(crate) connecting_count: usize, + pub(crate) disconnected_count: usize, + pub(crate) total_tasks: usize, + pub(crate) total_completed: usize, + pub(crate) active_queries: usize, } -/// Tracks individual worker conneciton states. 
-#[derive(Clone)] -enum ConnectionStatus { - Connecting, - Idle, - Active, - Disconnected { reason: String }, -} +/// Interval between worker discovery polls. +const DISCOVERY_INTERVAL: Duration = Duration::from_secs(5); impl App { - /// Create a new App with the given worker URLs. - pub fn new(worker_urls: Vec) -> Self { - let workers = worker_urls - .into_iter() - .map(|url| WorkerState { - url, - client: None, - connection_status: ConnectionStatus::Connecting, - tasks: Vec::new(), - completed_tasks: Vec::new(), - last_poll: None, - last_reconnect_attempt: None, - last_seen_query_ids: HashSet::new(), - }) - .collect(); - + /// Create a new App that discovers workers via `GetClusterWorkers` on the seed URL. + pub(crate) fn new(seed_url: Url) -> Self { App { - workers, + workers: Vec::new(), + active_query_count: 0, + current_view: View::ClusterOverview, + cluster_state: ClusterViewState::default(), + worker_state: WorkerViewState::default(), + paused: false, + show_help: false, should_quit: false, - console_state: ConsoleState::Idle, + start_time: Instant::now(), + poll_count: 0, + prev_output_rows_total: 0, + prev_output_rows_time: None, + current_throughput: 0.0, + seed_url, + last_discovery: None, } } - /// Handle keyboard events - pub fn handle_key_event(&mut self, key: crossterm::event::KeyEvent) { - use crossterm::event::{KeyCode, KeyEventKind}; - - if key.kind == KeyEventKind::Press { - match key.code { - KeyCode::Char('q') | KeyCode::Esc => self.should_quit = true, - KeyCode::Char('r') => { - // Clear completed tasks but keep connections - for worker in &mut self.workers { - worker.completed_tasks.clear(); - } - self.console_state = ConsoleState::Idle; - } - _ => {} - } + /// Poll all workers for task progress. Called on the gRPC tick interval. + pub(crate) async fn tick(&mut self) { + if self.paused { + return; } - } - /// Poll all workers for task progress. 
- pub async fn tick(&mut self) { + // Run worker discovery if in auto mode + self.maybe_discover_workers().await; + // Attempt connection for workers in Connecting or Disconnected state for worker in &mut self.workers { if worker.should_retry_connection() { @@ -97,303 +94,197 @@ impl App { }) .await; - self.update_console_state(); + self.poll_count += 1; + self.rebuild_queries(); + self.update_throughput(); } - /// Update overall console state based on worker states. - fn update_console_state(&mut self) { - if self.workers.is_empty() { - self.console_state = ConsoleState::Idle; + /// Periodically discovers workers via `GetClusterWorkers` on the seed URL. + async fn maybe_discover_workers(&mut self) { + let should_discover = match self.last_discovery { + None => true, + Some(last) => last.elapsed() >= DISCOVERY_INTERVAL, + }; + + if !should_discover { return; } - let has_active = self - .workers - .iter() - .any(|w| matches!(w.connection_status, ConnectionStatus::Active)); + self.last_discovery = Some(Instant::now()); - let has_running_tasks = self.workers.iter().any(|w| !w.tasks.is_empty()); - let has_completed_tasks = self.workers.iter().any(|w| w.has_completed_tasks()); + let discovered_urls = match discover_cluster_workers(&self.seed_url).await { + Ok(urls) => urls, + Err(_) => return, + }; - if has_active || has_running_tasks { - self.console_state = ConsoleState::Active; - } else if has_completed_tasks { - // All tasks completed, no running tasks - self.console_state = ConsoleState::Completed; - } else { - self.console_state = ConsoleState::Idle; + // Build set of currently known URLs (owned to avoid borrow conflict) + let known_urls: HashSet = self.workers.iter().map(|w| w.url.clone()).collect(); + + // Add new workers + for url in &discovered_urls { + if !known_urls.contains(url) { + self.workers.push(WorkerConn::new(url.clone())); + } } + + // Remove workers that are no longer in the discovered set + let discovered_set: HashSet<&Url> = 
discovered_urls.iter().collect(); + self.workers.retain(|w| discovered_set.contains(&w.url)); + } + + /// Update cluster-wide throughput from output rows delta. + fn update_throughput(&mut self) { + let current_total: u64 = self.workers.iter().map(|w| w.output_rows_total).sum(); + let now = Instant::now(); + + if let Some(prev_time) = self.prev_output_rows_time { + let elapsed = prev_time.elapsed().as_secs_f64(); + if elapsed > 0.0 { + let delta = current_total.saturating_sub(self.prev_output_rows_total); + let instantaneous = delta as f64 / elapsed; + // Exponential smoothing + self.current_throughput = 0.7 * instantaneous + 0.3 * self.current_throughput; + } + } + + self.prev_output_rows_total = current_total; + self.prev_output_rows_time = Some(now); + } + + /// Count distinct active queries across all workers. + fn rebuild_queries(&mut self) { + let mut query_ids = HashSet::new(); + for worker in &self.workers { + for task in &worker.tasks { + if let Some(sk) = &task.stage_key { + query_ids.insert(&sk.query_id); + } + } + } + self.active_query_count = query_ids.len(); } /// Get cluster-wide statistics. - pub fn cluster_stats(&self) -> ClusterStats { + pub(crate) fn cluster_stats(&self) -> ClusterStats { let mut stats = ClusterStats { + total: self.workers.len(), active_count: 0, idle_count: 0, - completed: 0, + connecting_count: 0, disconnected_count: 0, total_tasks: 0, + total_completed: 0, + active_queries: self.active_query_count, }; for worker in &self.workers { match worker.connection_status { ConnectionStatus::Active => stats.active_count += 1, ConnectionStatus::Idle => stats.idle_count += 1, + ConnectionStatus::Connecting => stats.connecting_count += 1, ConnectionStatus::Disconnected { .. } => stats.disconnected_count += 1, - ConnectionStatus::Connecting => {} } stats.total_tasks += worker.tasks.len(); + stats.total_completed += worker.completed_tasks.len(); } stats } -} -/// Cluster-wide statistics for display in the UI. 
-pub struct ClusterStats { - pub active_count: usize, - pub idle_count: usize, - pub completed: usize, - pub disconnected_count: usize, - pub total_tasks: usize, -} + /// Get the sorted worker indices for the cluster view. + pub(crate) fn sorted_worker_indices(&self) -> Vec { + let mut indices: Vec = (0..self.workers.len()).collect(); + let direction = self.cluster_state.sort_direction; -/// Tracks state for a single worker. -pub struct WorkerState { - pub url: Url, - client: Option>, - connection_status: ConnectionStatus, - pub tasks: Vec, - pub completed_tasks: Vec, - last_poll: Option, - last_reconnect_attempt: Option, - last_seen_query_ids: HashSet>, -} - -/// Stores information about completed tasks for progress display after they are removed from the -/// moka TTL cache. -#[derive(Clone, Debug)] -pub struct CompletedTask { - _stage_key: ObservabilityStageKey, - pub total_partitions: u64, - query_id: Vec, -} + if direction == SortDirection::Unsorted { + return indices; + } -impl WorkerState { - /// Attempts to establish a gRPC connection to a worker. 
- async fn try_connect(&mut self) { - self.last_reconnect_attempt = Some(Instant::now()); + let ascending = direction == SortDirection::Ascending; - match ObservabilityServiceClient::connect(self.url.to_string()).await { - Ok(mut client) => match client.ping(PingRequest {}).await { - Ok(_) => { - self.client = Some(client); - self.connection_status = ConnectionStatus::Idle; - self.tasks.clear(); - } - Err(e) => { - self.client = None; - self.connection_status = ConnectionStatus::Disconnected { - reason: format!("Ping failed: {e}"), - }; - } - }, - Err(e) => { - self.client = None; - self.connection_status = ConnectionStatus::Disconnected { - reason: format!("Connection failed: {e}"), - }; + match self.cluster_state.selected_column { + SortColumn::Worker => { + indices.sort_by(|&a, &b| { + let cmp = self.workers[a].url.cmp(&self.workers[b].url); + if ascending { cmp } else { cmp.reverse() } + }); } - } - } - - /// Returns true if the worker should attempt a connection. This covers the initial - /// `Connecting` state (first tick) and `Disconnected` state with a 5-second backoff. - fn should_retry_connection(&self) -> bool { - match &self.connection_status { - ConnectionStatus::Connecting => { - // First tick: no attempt made yet - self.last_reconnect_attempt.is_none() + SortColumn::Status => { + indices.sort_by(|&a, &b| { + let cmp = self.workers[a] + .status_sort_key() + .cmp(&self.workers[b].status_sort_key()); + if ascending { cmp } else { cmp.reverse() } + }); } - ConnectionStatus::Disconnected { .. } => { - if let Some(last_attempt) = self.last_reconnect_attempt { - last_attempt.elapsed() >= Duration::from_secs(5) - } else { - true - } + SortColumn::Tasks => { + indices.sort_by(|&a, &b| { + let cmp = self.workers[a] + .tasks + .len() + .cmp(&self.workers[b].tasks.len()); + if ascending { cmp } else { cmp.reverse() } + }); } - _ => false, - } - } - - /// Queries a worker for task progress. 
- async fn poll(&mut self) { - if let Some(client) = &mut self.client { - match client.get_task_progress(GetTaskProgressRequest {}).await { - Ok(response) => { - let new_tasks = response.into_inner().tasks; - self.last_poll = Some(Instant::now()); - - // Detect completed tasks: tasks that were running but now have completed and - // been removed from the TTL cache. - for old_task in &self.tasks { - if old_task.status == TaskStatus::Running as i32 { - let still_exists = new_tasks.iter().any(|t| { - if let (Some(old_key), Some(new_key)) = - (&old_task.stage_key, &t.stage_key) - { - old_key.query_id == new_key.query_id - && old_key.stage_id == new_key.stage_id - && old_key.task_number == new_key.task_number - } else { - false - } - }); - - // Assume completion - if !still_exists { - if let Some(stage_key) = &old_task.stage_key { - self.completed_tasks.push(CompletedTask { - _stage_key: stage_key.clone(), - total_partitions: old_task.total_partitions, - query_id: stage_key.query_id.clone(), - }); - } - } - } - } - - // Update current tasks - self.tasks = new_tasks; - - // Collect query IDs from current tasks - let mut current_query_ids = HashSet::new(); - let mut has_running = false; - - for task in &self.tasks { - if let Some(stage_key) = &task.stage_key { - current_query_ids.insert(stage_key.query_id.clone()); - - if task.status == TaskStatus::Running as i32 { - has_running = true; - } - } - } - - // If new work starts, clear old completed tasks - if has_running && !self.completed_tasks.is_empty() { - // Check if this is a new query - let completed_query_ids: HashSet<_> = self - .completed_tasks - .iter() - .map(|t| t.query_id.clone()) - .collect(); - - if !current_query_ids - .iter() - .any(|id| completed_query_ids.contains(id)) - { - // New query started, clear old completed tasks - self.completed_tasks.clear(); - } - } - - // Update connection status based on task activity - match &self.connection_status { - ConnectionStatus::Active => { - if !has_running { - // 
All tasks disappeared, go to Idle - self.connection_status = ConnectionStatus::Idle; - } - // Otherwise stay Active - } - ConnectionStatus::Idle => { - if has_running { - // New tasks started - self.connection_status = ConnectionStatus::Active; - } - } - _ => { - // For Connecting or Disconnected states - if has_running { - self.connection_status = ConnectionStatus::Active; - } else { - self.connection_status = ConnectionStatus::Idle; - } - } - } - - // Update tracked query IDs - self.last_seen_query_ids = current_query_ids; - } - Err(e) => { - // Connection lost - self.client = None; - self.tasks.clear(); - self.connection_status = ConnectionStatus::Disconnected { - reason: format!("Poll failed: {e}"), - }; - self.last_seen_query_ids.clear(); - } + SortColumn::Queries => { + indices.sort_by(|&a, &b| { + let cmp = self.workers[a] + .distinct_query_count() + .cmp(&self.workers[b].distinct_query_count()); + if ascending { cmp } else { cmp.reverse() } + }); + } + SortColumn::Cpu => { + indices.sort_by(|&a, &b| { + let cmp = self.workers[a] + .cpu_usage_percent + .partial_cmp(&self.workers[b].cpu_usage_percent) + .unwrap_or(std::cmp::Ordering::Equal); + if ascending { cmp } else { cmp.reverse() } + }); + } + SortColumn::Memory => { + indices.sort_by(|&a, &b| { + let cmp = self.workers[a].rss_bytes.cmp(&self.workers[b].rss_bytes); + if ascending { cmp } else { cmp.reverse() } + }); } } + indices } - /// Returns string representation of connection status. - pub fn status_text(&self) -> String { - match &self.connection_status { - ConnectionStatus::Connecting => "CONNECTING".to_string(), - ConnectionStatus::Idle => "IDLE".to_string(), - ConnectionStatus::Active => "ACTIVE".to_string(), - ConnectionStatus::Disconnected { .. } => "DISCONNECTED".to_string(), - } - } - - /// Returns color for UI display based on status. 
- pub fn status_color(&self) -> ratatui::style::Color { - use ratatui::style::Color; - match self.connection_status { - ConnectionStatus::Connecting => Color::Blue, - ConnectionStatus::Idle => Color::Yellow, - ConnectionStatus::Active => Color::Green, - ConnectionStatus::Disconnected { .. } => Color::Red, + /// Average observed_duration across all workers' completed tasks. + pub(crate) fn cluster_avg_task_duration(&self) -> Option { + let mut total_secs = 0.0f64; + let mut count = 0usize; + for worker in &self.workers { + for ct in &worker.completed_tasks { + total_secs += ct.observed_duration.as_secs_f64(); + count += 1; + } } - } - - /// Extracts disconnection reason if applicable. - pub fn disconnect_reason(&self) -> Option<&str> { - if let ConnectionStatus::Disconnected { reason } = &self.connection_status { - Some(reason) + if count > 0 { + Some(Duration::from_secs_f64(total_secs / count as f64)) } else { None } } - /// Get aggregated progress across all tasks on this worker. - pub fn aggregate_progress(&self) -> (u64, u64) { - let running_completed: u64 = self.tasks.iter().map(|t| t.completed_partitions).sum(); - let running_total: u64 = self.tasks.iter().map(|t| t.total_partitions).sum(); - - // Add completed task partitions (all completed, so total = completed) - let completed_total: u64 = self - .completed_tasks + /// Longest currently-running task across all workers. + pub(crate) fn cluster_longest_active_task(&self) -> Duration { + self.workers .iter() - .map(|t| t.total_partitions) - .sum(); - - ( - running_completed + completed_total, - running_total + completed_total, - ) + .map(|w| w.longest_task_duration()) + .max() + .unwrap_or_default() } - /// Check if worker has any completed tasks. - pub fn has_completed_tasks(&self) -> bool { - !self.completed_tasks.is_empty() - } - - /// Check if all tasks are completed. 
- pub fn all_tasks_completed(&self) -> bool { - self.tasks.is_empty() && !self.completed_tasks.is_empty() + /// Get average task count across all workers (for hot spot detection). + pub(crate) fn avg_tasks_per_worker(&self) -> f64 { + if self.workers.is_empty() { + return 0.0; + } + let total: usize = self.workers.iter().map(|w| w.tasks.len()).sum(); + total as f64 / self.workers.len() as f64 } } diff --git a/console/src/input.rs b/console/src/input.rs new file mode 100644 index 00000000..d9fea894 --- /dev/null +++ b/console/src/input.rs @@ -0,0 +1,219 @@ +use crate::app::App; +use crate::state::{View, WorkerPanel}; +use crossterm::event::{KeyCode, KeyEvent, KeyEventKind, KeyModifiers}; + +/// Handle a key event, dispatching to the appropriate view handler. +pub(crate) fn handle_key_event(app: &mut App, key: KeyEvent) { + if key.kind != KeyEventKind::Press { + return; + } + + // Help overlay takes priority — any key closes it + if app.show_help { + app.show_help = false; + return; + } + + // Global keys + match (key.code, key.modifiers) { + (KeyCode::Char('q'), _) | (KeyCode::Char('c'), KeyModifiers::CONTROL) => { + app.should_quit = true; + return; + } + (KeyCode::Char('?'), _) => { + app.show_help = true; + return; + } + (KeyCode::Char('p'), _) => { + app.paused = !app.paused; + return; + } + (KeyCode::Char('1'), _) => { + app.current_view = View::ClusterOverview; + return; + } + (KeyCode::Char('2'), _) => { + if !app.workers.is_empty() { + app.current_view = View::WorkerDetail; + } + return; + } + _ => {} + } + + // View-specific keys + match app.current_view { + View::ClusterOverview => handle_cluster_keys(app, key), + View::WorkerDetail => handle_worker_keys(app, key), + } +} + +fn handle_cluster_keys(app: &mut App, key: KeyEvent) { + let worker_count = app.workers.len(); + if worker_count == 0 { + return; + } + + match key.code { + KeyCode::Down | KeyCode::Char('j') => { + let selected = app + .cluster_state + .table + .selected() + .unwrap_or(0) + 
.saturating_add(1) + .min(worker_count.saturating_sub(1)); + app.cluster_state.table.select(Some(selected)); + } + KeyCode::Up | KeyCode::Char('k') => { + let selected = app + .cluster_state + .table + .selected() + .unwrap_or(0) + .saturating_sub(1); + app.cluster_state.table.select(Some(selected)); + } + KeyCode::Home | KeyCode::Char('g') => { + app.cluster_state.table.select(Some(0)); + } + KeyCode::End | KeyCode::Char('G') => { + app.cluster_state + .table + .select(Some(worker_count.saturating_sub(1))); + } + KeyCode::Enter => { + if let Some(selected) = app.cluster_state.table.selected() { + let sorted = app.sorted_worker_indices(); + if let Some(&real_idx) = sorted.get(selected) { + app.worker_state.worker_idx = real_idx; + app.worker_state.active_table = Default::default(); + app.worker_state.completed_table = Default::default(); + app.worker_state.focused_panel = WorkerPanel::ActiveTasks; + app.current_view = View::WorkerDetail; + } + } + } + KeyCode::Left => { + app.cluster_state.selected_column = app.cluster_state.selected_column.prev(true); + } + KeyCode::Right => { + app.cluster_state.selected_column = app.cluster_state.selected_column.next(true); + } + KeyCode::Char(' ') => { + app.cluster_state.sort_direction = app.cluster_state.sort_direction.next(); + } + KeyCode::Char('r') => { + for worker in &mut app.workers { + worker.completed_tasks.clear(); + } + } + KeyCode::Tab => { + if let Some(selected) = app.cluster_state.table.selected() { + let sorted = app.sorted_worker_indices(); + if let Some(&real_idx) = sorted.get(selected) { + app.worker_state.worker_idx = real_idx; + app.current_view = View::WorkerDetail; + } + } + } + _ => {} + } +} + +fn handle_worker_keys(app: &mut App, key: KeyEvent) { + if app.worker_state.worker_idx >= app.workers.len() { + return; + } + + match key.code { + KeyCode::Esc | KeyCode::Tab => { + app.current_view = View::ClusterOverview; + } + KeyCode::Left | KeyCode::Char('h') => { + // Previous worker + if 
!app.workers.is_empty() { + if app.worker_state.worker_idx == 0 { + app.worker_state.worker_idx = app.workers.len() - 1; + } else { + app.worker_state.worker_idx -= 1; + } + app.worker_state.active_table = Default::default(); + app.worker_state.completed_table = Default::default(); + } + } + KeyCode::Right | KeyCode::Char('l') => { + // Next worker + if !app.workers.is_empty() { + app.worker_state.worker_idx = (app.worker_state.worker_idx + 1) % app.workers.len(); + app.worker_state.active_table = Default::default(); + app.worker_state.completed_table = Default::default(); + } + } + KeyCode::Down | KeyCode::Char('j') => { + let worker = &app.workers[app.worker_state.worker_idx]; + match app.worker_state.focused_panel { + WorkerPanel::Metrics => {} // Sparklines are not scrollable + WorkerPanel::ActiveTasks => { + let count = worker.tasks.len(); + if count > 0 { + let selected = app + .worker_state + .active_table + .selected() + .map(|s| (s + 1).min(count - 1)) + .unwrap_or(0); + app.worker_state.active_table.select(Some(selected)); + } + } + WorkerPanel::CompletedTasks => { + let count = worker.completed_tasks.len(); + if count > 0 { + let selected = app + .worker_state + .completed_table + .selected() + .map(|s| (s + 1).min(count - 1)) + .unwrap_or(0); + app.worker_state.completed_table.select(Some(selected)); + } + } + } + } + KeyCode::Up | KeyCode::Char('k') => match app.worker_state.focused_panel { + WorkerPanel::Metrics => {} // Sparklines are not scrollable + WorkerPanel::ActiveTasks => { + let selected = app + .worker_state + .active_table + .selected() + .map(|s| s.saturating_sub(1)) + .unwrap_or(0); + app.worker_state.active_table.select(Some(selected)); + } + WorkerPanel::CompletedTasks => { + let selected = app + .worker_state + .completed_table + .selected() + .map(|s| s.saturating_sub(1)) + .unwrap_or(0); + app.worker_state.completed_table.select(Some(selected)); + } + }, + KeyCode::BackTab => { + // Shift+Tab: cycle focus between panels + 
app.worker_state.focused_panel = match app.worker_state.focused_panel { + WorkerPanel::Metrics => WorkerPanel::ActiveTasks, + WorkerPanel::ActiveTasks => WorkerPanel::CompletedTasks, + WorkerPanel::CompletedTasks => WorkerPanel::Metrics, + }; + } + KeyCode::Char('r') => { + if let Some(worker) = app.workers.get_mut(app.worker_state.worker_idx) { + worker.completed_tasks.clear(); + } + } + _ => {} + } +} diff --git a/console/src/main.rs b/console/src/main.rs index 8c8f4133..731b504b 100644 --- a/console/src/main.rs +++ b/console/src/main.rs @@ -1,10 +1,13 @@ mod app; +mod input; +mod state; mod ui; +mod worker; use app::App; use crossterm::event::{self, Event}; use ratatui::DefaultTerminal; -use std::time::Duration; +use std::time::{Duration, Instant}; use structopt::StructOpt; use url::Url; @@ -14,9 +17,13 @@ use url::Url; about = "Console for monitoring DataFusion distributed workers" )] struct Args { - /// Comma-delimited list of worker ports (e.g. 8080,8081) - #[structopt(long = "cluster-ports", use_delimiter = true)] - cluster_ports: Vec, + /// Port of a worker to connect to for auto-discovery. + /// The console calls GetClusterWorkers on this worker to discover the full cluster. 
+ port: u16, + + /// Polling interval in milliseconds + #[structopt(long = "poll-interval", default_value = "100")] + poll_interval: u64, } #[tokio::main] @@ -25,36 +32,40 @@ async fn main() -> color_eyre::Result<()> { let args = Args::from_args(); - let worker_urls: Vec = args - .cluster_ports - .iter() - .map(|port| Url::parse(&format!("http://localhost:{port}")).expect("valid localhost URL")) - .collect(); + let seed_url = Url::parse(&format!("http://localhost:{}", args.port)).expect("valid URL"); - let mut app = App::new(worker_urls); + let poll_interval = Duration::from_millis(args.poll_interval); + let mut app = App::new(seed_url); - // Initialize terminal let mut terminal = ratatui::init(); terminal.clear()?; - // Run TUI loop - let result = run_app(&mut terminal, &mut app).await; + let result = run_app(&mut terminal, &mut app, poll_interval).await; ratatui::restore(); result } -/// Main application loop -async fn run_app(terminal: &mut DefaultTerminal, app: &mut App) -> color_eyre::Result<()> { +async fn run_app( + terminal: &mut DefaultTerminal, + app: &mut App, + poll_interval: Duration, +) -> color_eyre::Result<()> { + let mut last_poll = Instant::now(); + loop { - app.tick().await; + if last_poll.elapsed() >= poll_interval { + app.tick().await; + last_poll = Instant::now(); + } terminal.draw(|frame| ui::render(frame, app))?; - if event::poll(Duration::from_millis(10))? { + // Check for keyboard input (16ms timeout ~ 60fps responsiveness) + if event::poll(Duration::from_millis(16))? { if let Event::Key(key) = event::read()? { - app.handle_key_event(key); + input::handle_key_event(app, key); } } diff --git a/console/src/state.rs b/console/src/state.rs new file mode 100644 index 00000000..78cb67a0 --- /dev/null +++ b/console/src/state.rs @@ -0,0 +1,131 @@ +use ratatui::widgets::TableState; + +/// Which view is currently active. 
/// Which view is currently active.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum View {
    ClusterOverview,
    WorkerDetail,
}

/// Which column is selected for sorting in the cluster table.
///
/// Variants are listed in left-to-right navigation order; `Worker` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub(crate) enum SortColumn {
    #[default]
    Worker,
    Status,
    Tasks,
    Queries,
    Cpu,
    Memory,
}

impl SortColumn {
    /// Move to the next column, wrapping from `Memory` back to `Worker`.
    ///
    /// When `wide` is false, `Queries` is skipped (`Tasks` goes straight to `Cpu`).
    pub(crate) fn next(self, wide: bool) -> Self {
        match self {
            SortColumn::Worker => SortColumn::Status,
            SortColumn::Status => SortColumn::Tasks,
            SortColumn::Tasks if wide => SortColumn::Queries,
            SortColumn::Tasks | SortColumn::Queries => SortColumn::Cpu,
            SortColumn::Cpu => SortColumn::Memory,
            SortColumn::Memory => SortColumn::Worker,
        }
    }

    /// Move to the previous column, wrapping from `Worker` back to `Memory`.
    ///
    /// When `wide` is false, `Queries` is skipped (`Cpu` goes straight to `Tasks`).
    pub(crate) fn prev(self, wide: bool) -> Self {
        match self {
            SortColumn::Worker => SortColumn::Memory,
            SortColumn::Status => SortColumn::Worker,
            SortColumn::Cpu if wide => SortColumn::Queries,
            SortColumn::Cpu | SortColumn::Queries => SortColumn::Tasks,
            SortColumn::Tasks => SortColumn::Status,
            SortColumn::Memory => SortColumn::Cpu,
        }
    }
}

/// Sort direction for the selected column. `Unsorted` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub(crate) enum SortDirection {
    #[default]
    Unsorted,
    Ascending,
    Descending,
}

impl SortDirection {
    /// Advance through the cycle `Unsorted` → `Ascending` → `Descending` → `Unsorted`.
    pub(crate) fn next(self) -> Self {
        match self {
            SortDirection::Unsorted => SortDirection::Ascending,
            SortDirection::Ascending => SortDirection::Descending,
            SortDirection::Descending => SortDirection::Unsorted,
        }
    }

    /// Suffix appended to a column header to show the direction (empty when unsorted).
    pub(crate) fn indicator(self) -> &'static str {
        match self {
            SortDirection::Unsorted => "",
            SortDirection::Ascending => " ▲",
            SortDirection::Descending => " ▼",
        }
    }
}
+#[derive(Clone, Copy, PartialEq)] +pub(crate) enum WorkerPanel { + Metrics, + ActiveTasks, + CompletedTasks, +} + +/// State for the Cluster Overview view. +pub(crate) struct ClusterViewState { + pub(crate) table: TableState, + pub(crate) selected_column: SortColumn, + pub(crate) sort_direction: SortDirection, +} + +impl Default for ClusterViewState { + fn default() -> Self { + let mut table = TableState::default(); + table.select(Some(0)); + Self { + table, + selected_column: SortColumn::default(), + sort_direction: SortDirection::default(), + } + } +} + +/// State for the Worker Detail view. +pub(crate) struct WorkerViewState { + pub(crate) worker_idx: usize, + pub(crate) active_table: TableState, + pub(crate) completed_table: TableState, + pub(crate) focused_panel: WorkerPanel, +} + +impl Default for WorkerViewState { + fn default() -> Self { + Self { + worker_idx: 0, + active_table: TableState::default(), + completed_table: TableState::default(), + focused_panel: WorkerPanel::ActiveTasks, + } + } +} diff --git a/console/src/ui.rs b/console/src/ui.rs deleted file mode 100644 index 3756c167..00000000 --- a/console/src/ui.rs +++ /dev/null @@ -1,336 +0,0 @@ -use crate::app::{App, ConsoleState, WorkerState}; -use ratatui::{ - Frame, - layout::{Constraint, Direction, Layout, Rect}, - style::{Color, Modifier, Style}, - text::{Line, Span}, - widgets::{Block, Borders, Gauge, Paragraph}, -}; - -/// Main rendering function -pub fn render(frame: &mut Frame, app: &App) { - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints([ - Constraint::Length(3), // Header with cluster summary - Constraint::Min(0), // Workers - Constraint::Length(3), // Footer - ]) - .split(frame.area()); - - render_header(frame, chunks[0], app); - render_workers(frame, chunks[1], app); - render_footer(frame, chunks[2], app); -} - -/// Render header with cluster summary -fn render_header(frame: &mut Frame, area: Rect, app: &App) { - let stats = app.cluster_stats(); - let 
worker_count = app.workers.len(); - - // Show console state prominently - let (state_text, state_color) = match app.console_state { - ConsoleState::Idle => ("IDLE", Color::Yellow), - ConsoleState::Active => ("ACTIVE", Color::Green), - ConsoleState::Completed => ("COMPLETED", Color::Cyan), - }; - - let header_text = vec![Line::from(vec![ - Span::styled("Console: ", Style::default().add_modifier(Modifier::BOLD)), - Span::styled( - state_text, - Style::default() - .fg(state_color) - .add_modifier(Modifier::BOLD), - ), - Span::raw(" | "), - Span::styled( - format!("{worker_count} workers"), - Style::default().add_modifier(Modifier::BOLD), - ), - Span::raw(" | "), - Span::styled("Workers: ", Style::default().add_modifier(Modifier::BOLD)), - Span::styled( - format!("{} ACTIVE", stats.active_count), - Style::default() - .fg(Color::Green) - .add_modifier(Modifier::BOLD), - ), - Span::raw(", "), - Span::styled( - format!("{} IDLE", stats.idle_count), - Style::default() - .fg(Color::Yellow) - .add_modifier(Modifier::BOLD), - ), - Span::raw(", "), - Span::styled( - format!("{} DISCONNECTED", stats.disconnected_count), - Style::default().fg(Color::Red).add_modifier(Modifier::BOLD), - ), - Span::raw(" | "), - Span::styled("Tasks: ", Style::default().add_modifier(Modifier::BOLD)), - Span::styled( - stats.total_tasks.to_string(), - Style::default() - .fg(Color::Cyan) - .add_modifier(Modifier::BOLD), - ), - Span::raw(", "), - Span::styled( - format!("{} COMPLETED", stats.completed), - Style::default() - .fg(Color::Cyan) - .add_modifier(Modifier::BOLD), - ), - ])]; - - let header = Paragraph::new(header_text).block( - Block::default() - .borders(Borders::ALL) - .title("DataFusion Distributed Console"), - ); - - frame.render_widget(header, area); -} - -/// Render all workers in vertical layout -fn render_workers(frame: &mut Frame, area: Rect, app: &App) { - if app.workers.is_empty() { - let empty_msg = Paragraph::new(vec![ - Line::from(""), - Line::from(Span::styled( - "No workers 
configured", - Style::default() - .fg(Color::DarkGray) - .add_modifier(Modifier::ITALIC), - )), - ]) - .block(Block::default().borders(Borders::ALL).title("Workers")) - .centered(); - - frame.render_widget(empty_msg, area); - return; - } - - // Each worker needs only ~5 lines now (border + label + bar + spacing) - let worker_height = 5; - let constraints: Vec<_> = app - .workers - .iter() - .map(|_| Constraint::Length(worker_height)) - .collect(); - - let worker_chunks = Layout::default() - .direction(Direction::Vertical) - .constraints(constraints) - .split(area); - - for (i, worker) in app.workers.iter().enumerate() { - if i < worker_chunks.len() { - render_worker_section(frame, worker_chunks[i], worker); - } - } -} - -/// Render a single worker section -fn render_worker_section(frame: &mut Frame, area: Rect, worker: &WorkerState) { - let status = worker.status_text(); - let status_color = worker.status_color(); - - let title = format!("Worker: :{} [{}]", worker.url.as_str(), status); - - let block = Block::default().borders(Borders::ALL).title(Span::styled( - title, - Style::default() - .fg(status_color) - .add_modifier(Modifier::BOLD), - )); - - let inner_area = block.inner(area); - frame.render_widget(block, area); - - // Render content based on state - if let Some(reason) = worker.disconnect_reason() { - // Show disconnection reason - let disconnect_msg = Paragraph::new(vec![ - Line::from(""), - Line::from(Span::styled( - format!("Connection failed: {reason}"), - Style::default().fg(Color::Red), - )), - ]) - .centered(); - - frame.render_widget(disconnect_msg, inner_area); - } else if worker.all_tasks_completed() { - // Show completed state (100% progress bar) - render_completed_state(frame, inner_area, worker); - } else if worker.tasks.is_empty() && !worker.has_completed_tasks() { - // Show idle message - let idle_msg = Paragraph::new(vec![Line::from(Span::styled( - "No active tasks", - Style::default() - .fg(Color::DarkGray) - 
.add_modifier(Modifier::ITALIC), - ))]) - .centered(); - - frame.render_widget(idle_msg, inner_area); - } else { - // Show aggregated progress bar (running + completed) - render_aggregated_progress(frame, inner_area, worker); - } -} - -/// Render aggregated progress bar for all tasks on a worker -fn render_aggregated_progress(frame: &mut Frame, area: Rect, worker: &WorkerState) { - let (completed, total) = worker.aggregate_progress(); - let task_count = worker.tasks.len(); - - let ratio = if total > 0 { - completed as f64 / total as f64 - } else { - 0.0 - }; - - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints([ - Constraint::Length(1), // Label line - Constraint::Length(2), // Progress bar - ]) - .split(area); - - // Label: "Overall Progress (N tasks)" - let label = Line::from(Span::styled( - format!("Overall Progress ({task_count} tasks)"), - Style::default().fg(Color::Cyan), - )); - frame.render_widget(Paragraph::new(label), chunks[0]); - - // Progress bar - let progress_label = format!("{}/{} partitions ({:.1}%)", completed, total, ratio * 100.0); - - let gauge = Gauge::default() - .gauge_style( - Style::default() - .fg(Color::Green) - .bg(Color::DarkGray) - .add_modifier(Modifier::BOLD), - ) - .label(progress_label) - .use_unicode(true) - .ratio(ratio); - - frame.render_widget(gauge, chunks[1]); -} - -/// Render completed state for a worker -fn render_completed_state(frame: &mut Frame, area: Rect, worker: &WorkerState) { - let total: u64 = worker - .completed_tasks - .iter() - .map(|t| t.total_partitions) - .sum(); - let task_count = worker.completed_tasks.len(); - - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints([ - Constraint::Length(1), // Label line - Constraint::Length(2), // Progress bar - ]) - .split(area); - - // Label: "Tasks Completed ✓ (N tasks)" - let label = Line::from(Span::styled( - format!("Tasks Completed ✓ ({task_count} tasks)"), - Style::default() - .fg(Color::Cyan) - 
.add_modifier(Modifier::BOLD), - )); - frame.render_widget(Paragraph::new(label), chunks[0]); - - // Progress bar showing 100% complete in cyan - let progress_label = format!("{total}/{total} partitions (100.0%)"); - - let gauge = Gauge::default() - .gauge_style( - Style::default() - .fg(Color::Cyan) - .bg(Color::DarkGray) - .add_modifier(Modifier::BOLD), - ) - .label(progress_label) - .use_unicode(true) - .ratio(1.0); - - frame.render_widget(gauge, chunks[1]); -} - -/// Render footer with controls -fn render_footer(frame: &mut Frame, area: Rect, app: &App) { - let footer_text = match app.console_state { - ConsoleState::Idle => Line::from(vec![ - Span::styled( - "Waiting for tasks... ", - Style::default().fg(Color::DarkGray), - ), - Span::styled("Press ", Style::default().fg(Color::DarkGray)), - Span::styled( - "'q'", - Style::default() - .fg(Color::Cyan) - .add_modifier(Modifier::BOLD), - ), - Span::styled(" to quit", Style::default().fg(Color::DarkGray)), - ]), - ConsoleState::Completed => Line::from(vec![ - Span::styled( - "Query completed! 
use super::format::{format_bytes, format_duration, format_rows_throughput};
use crate::app::App;
use crate::state::{SortColumn, SortDirection};
use ratatui::Frame;
use ratatui::layout::{Constraint, Layout, Rect};
use ratatui::style::{Color, Modifier, Style};
use ratatui::text::{Line, Span};
use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, Table};

/// Render the Cluster Overview into `area`.
///
/// The area is split vertically into a 4-row metrics box, the worker table (at least
/// 3 rows, takes the remaining space) and a 1-row task-distribution summary.
pub(super) fn render(frame: &mut Frame, area: Rect, app: &mut App) {
    let [metrics_area, table_area, summary_area] = Layout::vertical([
        Constraint::Length(4),
        Constraint::Min(3),
        Constraint::Length(1),
    ])
    .areas(area);

    render_cluster_metrics(frame, metrics_area, app);
    render_worker_table(frame, table_area, app);
    render_task_distribution(frame, summary_area, app);
}

/// Render the bordered "Cluster Metrics" box: two text lines of cluster-wide figures.
///
/// Line 1 shows throughput, active/total workers and queries in flight; line 2 shows
/// completed tasks, average task duration and the longest running task. Values that are
/// zero or unavailable are shown as `--` in dark gray.
fn render_cluster_metrics(frame: &mut Frame, area: Rect, app: &App) {
    let block = Block::default().borders(Borders::ALL).title(Span::styled(
        " Cluster Metrics ",
        Style::default().add_modifier(Modifier::BOLD),
    ));

    // Content goes inside the border; the border itself is drawn over the full area.
    let inner = block.inner(area);
    frame.render_widget(block, area);

    let [line1_area, line2_area] =
        Layout::vertical([Constraint::Length(1), Constraint::Length(1)]).areas(inner);

    let stats = app.cluster_stats();

    // Line 1: Throughput and active workers
    let throughput_str = if app.current_throughput > 0.0 {
        format_rows_throughput(app.current_throughput)
    } else {
        "--".to_string()
    };
    let throughput_color = if app.current_throughput > 0.0 {
        Color::Green
    } else {
        Color::DarkGray
    };

    let active_str = format!("{}/{} workers", stats.active_count, stats.total);
    let active_color = if stats.active_count > 0 {
        Color::Green
    } else {
        Color::DarkGray
    };

    let queries_str = format!("{}", stats.active_queries);
    let queries_color = if stats.active_queries > 0 {
        Color::Cyan
    } else {
        Color::DarkGray
    };

    let line1 = Line::from(vec![
        Span::styled(
            " Throughput: ",
            Style::default().add_modifier(Modifier::BOLD),
        ),
        Span::styled(throughput_str, Style::default().fg(throughput_color)),
        Span::raw(" "),
        Span::styled(" Active: ", Style::default().add_modifier(Modifier::BOLD)),
        Span::styled(active_str, Style::default().fg(active_color)),
        Span::raw(" "),
        Span::styled(
            " Queries in flight: ",
            Style::default().add_modifier(Modifier::BOLD),
        ),
        Span::styled(queries_str, Style::default().fg(queries_color)),
    ]);
    frame.render_widget(Paragraph::new(line1), line1_area);

    // Line 2: Completed tasks, avg duration, longest active task
    let completed_str = format!("{} tasks", stats.total_completed);

    let avg_dur_str = app
        .cluster_avg_task_duration()
        .map(format_duration)
        .unwrap_or_else(|| "--".to_string());

    let longest = app.cluster_longest_active_task();
    let longest_str = if longest.is_zero() {
        "--".to_string()
    } else {
        format_duration(longest)
    };
    // Escalate the color as the longest task ages: over 30s yellow, over 60s red.
    let longest_color = if longest.as_secs() > 60 {
        Color::Red
    } else if longest.as_secs() > 30 {
        Color::Yellow
    } else {
        Color::DarkGray
    };

    let line2 = Line::from(vec![
        Span::styled(
            " Completed: ",
            Style::default().add_modifier(Modifier::BOLD),
        ),
        Span::styled(completed_str, Style::default().fg(Color::Cyan)),
        Span::raw(" "),
        Span::styled(
            "Avg duration: ",
            Style::default().add_modifier(Modifier::BOLD),
        ),
        Span::styled(avg_dur_str, Style::default().fg(Color::DarkGray)),
        Span::raw(" "),
        Span::styled("Longest: ", Style::default().add_modifier(Modifier::BOLD)),
        Span::styled(longest_str, Style::default().fg(longest_color)),
    ]);
    frame.render_widget(Paragraph::new(line2), line2_area);
}

/// Render the "Workers" table, one row per worker in `sorted_worker_indices()` order.
///
/// At a width of 100 or more the table has six columns; below that the Queries column
/// is dropped. The header of the selected sort column is underlined and carries the
/// direction indicator. Disconnected workers show `-` for tasks, queries, CPU and memory.
/// Takes `app` mutably because the table's selection state is updated while rendering.
fn render_worker_table(frame: &mut Frame, area: Rect, app: &mut App) {
    let sorted_indices = app.sorted_worker_indices();
    let avg_tasks = app.avg_tasks_per_worker();
    let wide = area.width >= 100;

    let columns: Vec<(&str, SortColumn)> = if wide {
        vec![
            ("Worker", SortColumn::Worker),
            ("Status", SortColumn::Status),
            ("Tasks", SortColumn::Tasks),
            ("Queries", SortColumn::Queries),
            ("CPU", SortColumn::Cpu),
            ("Memory", SortColumn::Memory),
        ]
    } else {
        vec![
            ("Worker", SortColumn::Worker),
            ("Status", SortColumn::Status),
            ("Tasks", SortColumn::Tasks),
            ("CPU", SortColumn::Cpu),
            ("Memory", SortColumn::Memory),
        ]
    };

    // NOTE(review): input.rs always navigates columns with `wide = true`, so in the
    // narrow layout `selected_column` can be `Queries`, which has no header here; the
    // rows are then sorted by a column that is not shown. Confirm whether that is intended.
    let selected_col = app.cluster_state.selected_column;
    let sort_dir = app.cluster_state.sort_direction;

    let header = Row::new(columns.iter().map(|(label, col)| {
        let is_selected = *col == selected_col;
        let indicator = if is_selected {
            sort_dir.indicator()
        } else {
            ""
        };
        let text = format!("{label}{indicator}");

        // Selected + sorted: cyan underlined. Selected but unsorted: white underlined.
        // Everything else: plain bold white.
        let style = if is_selected && sort_dir != SortDirection::Unsorted {
            Style::default()
                .fg(Color::Cyan)
                .add_modifier(Modifier::BOLD | Modifier::UNDERLINED)
        } else if is_selected {
            Style::default()
                .fg(Color::White)
                .add_modifier(Modifier::BOLD | Modifier::UNDERLINED)
        } else {
            Style::default()
                .fg(Color::White)
                .add_modifier(Modifier::BOLD)
        };

        Cell::from(text).style(style)
    }))
    .height(1);

    let rows: Vec<Row> = sorted_indices
        .iter()
        .map(|&idx| {
            let worker = &app.workers[idx];
            let task_count = worker.tasks.len();
            let status_color = worker.status_color();
            let is_disconnected = matches!(
                worker.connection_status,
                crate::worker::ConnectionStatus::Disconnected { .. }
            );

            // URL display
            // In the narrow layout, URLs longer than 30 bytes are cut to ".." plus their
            // last 28 bytes.
            // NOTE(review): this slices by byte offset — assumes the URL string is ASCII.
            let url_str = worker.url.as_str();
            let url_display = if wide || url_str.len() <= 30 {
                url_str.to_string()
            } else {
                format!("..{}", &url_str[url_str.len().saturating_sub(28)..])
            };

            // Task count with hot spot highlighting
            // A worker is a hot spot when it holds more than twice the cluster average.
            let task_style =
                if task_count > 0 && avg_tasks > 0.0 && task_count as f64 > avg_tasks * 2.0 {
                    Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)
                } else if task_count > 0 {
                    Style::default().fg(Color::Green)
                } else {
                    Style::default().fg(Color::DarkGray)
                };

            let task_str = if is_disconnected {
                "-".to_string()
            } else {
                task_count.to_string()
            };

            // CPU usage
            // Above 95% bold red, above 80% yellow, otherwise green; a value of 0 or
            // less is shown as "--".
            let (cpu_str, cpu_style) = if is_disconnected {
                ("-".to_string(), Style::default().fg(Color::DarkGray))
            } else if worker.cpu_usage_percent > 0.0 {
                let style = if worker.cpu_usage_percent > 95.0 {
                    Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)
                } else if worker.cpu_usage_percent > 80.0 {
                    Style::default().fg(Color::Yellow)
                } else {
                    Style::default().fg(Color::Green)
                };
                (format!("{:.1}%", worker.cpu_usage_percent), style)
            } else {
                ("--".to_string(), Style::default().fg(Color::DarkGray))
            };

            // RSS memory
            let (rss_str, rss_style) = if is_disconnected {
                ("-".to_string(), Style::default().fg(Color::DarkGray))
            } else if worker.rss_bytes > 0 {
                (
                    format_bytes(worker.rss_bytes),
                    Style::default().fg(Color::White),
                )
            } else {
                ("--".to_string(), Style::default().fg(Color::DarkGray))
            };

            // The cell list must line up with `columns` / `widths` for the same layout.
            if wide {
                let query_count = worker.distinct_query_count();
                let query_str = if is_disconnected {
                    "-".to_string()
                } else {
                    query_count.to_string()
                };

                Row::new(vec![
                    Cell::from(url_display),
                    Cell::from(worker.status_text()).style(Style::default().fg(status_color)),
                    Cell::from(task_str).style(task_style),
                    Cell::from(query_str).style(Style::default().fg(Color::DarkGray)),
                    Cell::from(cpu_str).style(cpu_style),
                    Cell::from(rss_str).style(rss_style),
                ])
            } else {
                Row::new(vec![
                    Cell::from(url_display),
                    Cell::from(worker.status_text()).style(Style::default().fg(status_color)),
                    Cell::from(task_str).style(task_style),
                    Cell::from(cpu_str).style(cpu_style),
                    Cell::from(rss_str).style(rss_style),
                ])
            }
        })
        .collect();

    let widths = if wide {
        vec![
            Constraint::Percentage(30),
            Constraint::Percentage(12),
            Constraint::Percentage(10),
            Constraint::Percentage(12),
            Constraint::Percentage(13),
            Constraint::Percentage(13),
        ]
    } else {
        vec![
            Constraint::Percentage(35),
            Constraint::Percentage(15),
            Constraint::Percentage(15),
            Constraint::Percentage(15),
            Constraint::Percentage(20),
        ]
    };

    let table = Table::new(rows, widths)
        .header(header)
        .block(Block::default().borders(Borders::ALL).title(" Workers "))
        .row_highlight_style(
            Style::default()
                .bg(Color::DarkGray)
                .add_modifier(Modifier::BOLD),
        )
        .highlight_symbol("▸ ");

    // Stateful render: the selection lives in `app.cluster_state.table`.
    frame.render_stateful_widget(table, area, &mut app.cluster_state.table);
}
task_counts[task_counts.len() / 2]; + + let line = Line::from(vec![ + Span::styled(" Tasks/worker: ", Style::default().fg(Color::DarkGray)), + Span::styled(format!("min={min}"), Style::default().fg(Color::White)), + Span::raw(" "), + Span::styled( + format!("max={max}"), + if max > 0 && max as f64 > avg * 2.0 { + Style::default().fg(Color::Red).add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::White) + }, + ), + Span::raw(" "), + Span::styled(format!("avg={avg:.1}"), Style::default().fg(Color::White)), + Span::raw(" "), + Span::styled( + format!("median={median}"), + Style::default().fg(Color::White), + ), + Span::styled( + format!(" ({} workers)", app.workers.len()), + Style::default().fg(Color::DarkGray), + ), + ]); + + frame.render_widget(Paragraph::new(line), area); +} diff --git a/console/src/ui/footer.rs b/console/src/ui/footer.rs new file mode 100644 index 00000000..94e0363f --- /dev/null +++ b/console/src/ui/footer.rs @@ -0,0 +1,86 @@ +use crate::app::App; +use crate::state::View; +use ratatui::Frame; +use ratatui::layout::Rect; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::text::{Line, Span}; +use ratatui::widgets::Paragraph; + +pub(super) fn render(frame: &mut Frame, area: Rect, app: &App) { + let tab_spans = |view: View| -> Vec { + let cluster_style = if view == View::ClusterOverview { + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + let worker_style = if view == View::WorkerDetail { + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + + vec![ + Span::styled("[1]Cluster", cluster_style), + Span::raw(" "), + Span::styled("[2]Worker", worker_style), + ] + }; + + let context_hints = match app.current_view { + View::ClusterOverview => vec![ + Span::styled(" j/k", Style::default().fg(Color::Yellow)), + Span::styled(":select", Style::default().fg(Color::DarkGray)), + 
Span::raw(" "), + Span::styled("Enter", Style::default().fg(Color::Yellow)), + Span::styled(":detail", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("\u{2190}\u{2192}", Style::default().fg(Color::Yellow)), + Span::styled(":column", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("Space", Style::default().fg(Color::Yellow)), + Span::styled(":sort", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("p", Style::default().fg(Color::Yellow)), + Span::styled(":pause", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("?", Style::default().fg(Color::Yellow)), + Span::styled(":help", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("q", Style::default().fg(Color::Yellow)), + Span::styled(":quit", Style::default().fg(Color::DarkGray)), + ], + View::WorkerDetail => vec![ + Span::styled(" h/l", Style::default().fg(Color::Yellow)), + Span::styled(":prev/next", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("j/k", Style::default().fg(Color::Yellow)), + Span::styled(":select", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("S-Tab", Style::default().fg(Color::Yellow)), + Span::styled(":panel", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("Esc", Style::default().fg(Color::Yellow)), + Span::styled(":back", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("p", Style::default().fg(Color::Yellow)), + Span::styled(":pause", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("?", Style::default().fg(Color::Yellow)), + Span::styled(":help", Style::default().fg(Color::DarkGray)), + Span::raw(" "), + Span::styled("q", Style::default().fg(Color::Yellow)), + Span::styled(":quit", Style::default().fg(Color::DarkGray)), + ], + }; + + let mut spans = tab_spans(app.current_view); + spans.push(Span::styled(" │", Style::default().fg(Color::DarkGray))); + 
spans.extend(context_hints); + + let footer = Paragraph::new(Line::from(spans)); + frame.render_widget(footer, area); +} diff --git a/console/src/ui/format.rs b/console/src/ui/format.rs new file mode 100644 index 00000000..8328616e --- /dev/null +++ b/console/src/ui/format.rs @@ -0,0 +1,69 @@ +use ratatui::style::Color; +use std::time::Duration; + +/// Format a byte count as a human-readable string. +/// Returns `"--"` for zero bytes. +pub(super) fn format_bytes(bytes: u64) -> String { + if bytes == 0 { + "--".to_string() + } else if bytes >= 1_073_741_824 { + format!("{:.1} GB", bytes as f64 / 1_073_741_824.0) + } else if bytes >= 1_048_576 { + format!("{:.1} MB", bytes as f64 / 1_048_576.0) + } else if bytes >= 1_024 { + format!("{:.1} KB", bytes as f64 / 1_024.0) + } else { + format!("{bytes} B") + } +} + +/// Format a duration as a compact human-readable string. +pub(super) fn format_duration(d: Duration) -> String { + let secs = d.as_secs(); + let millis = d.subsec_millis(); + if secs == 0 { + format!("{millis}ms") + } else if secs < 60 { + format!("{secs}.{:01}s", millis / 100) + } else if secs < 3600 { + format!("{}m {}s", secs / 60, secs % 60) + } else { + format!("{}h {}m", secs / 3600, (secs % 3600) / 60) + } +} + +/// Format a row count with K/M suffixes. +/// Returns `"--"` for zero rows. +pub(super) fn format_row_count(rows: u64) -> String { + if rows == 0 { + "--".to_string() + } else if rows >= 1_000_000 { + format!("{:.1}M", rows as f64 / 1_000_000.0) + } else if rows >= 1_000 { + format!("{:.1}K", rows as f64 / 1_000.0) + } else { + rows.to_string() + } +} + +/// Format a rows-per-second throughput value. 
+pub(super) fn format_rows_throughput(rows_per_sec: f64) -> String { + if rows_per_sec >= 1_000_000.0 { + format!("{:.1}M rows out/s", rows_per_sec / 1_000_000.0) + } else if rows_per_sec >= 1_000.0 { + format!("{:.1}K rows out/s", rows_per_sec / 1_000.0) + } else { + format!("{rows_per_sec:.0} rows out/s") + } +} + +/// Return a color for CPU usage percentage. +pub(super) fn cpu_color(pct: f64) -> Color { + if pct > 95.0 { + Color::Red + } else if pct > 80.0 { + Color::Yellow + } else { + Color::Green + } +} diff --git a/console/src/ui/header.rs b/console/src/ui/header.rs new file mode 100644 index 00000000..2d142a12 --- /dev/null +++ b/console/src/ui/header.rs @@ -0,0 +1,84 @@ +use super::format::format_duration; +use crate::app::App; +use crate::state::View; +use ratatui::Frame; +use ratatui::layout::Rect; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::text::{Line, Span}; +use ratatui::widgets::{Block, Borders, Paragraph}; + +pub(super) fn render(frame: &mut Frame, area: Rect, app: &App) { + let stats = app.cluster_stats(); + + let view_name = match app.current_view { + View::ClusterOverview => "Cluster Overview", + View::WorkerDetail => "Worker Detail", + }; + + let live_badge = if app.paused { + Span::styled( + " PAUSED ", + Style::default().fg(Color::Red).add_modifier(Modifier::BOLD), + ) + } else { + Span::styled( + " LIVE ", + Style::default() + .fg(Color::Green) + .add_modifier(Modifier::BOLD), + ) + }; + + let uptime = app.start_time.elapsed(); + let uptime_str = format_duration(uptime); + + let line = Line::from(vec![ + Span::styled( + format!("Workers: {} total ", stats.total), + Style::default().add_modifier(Modifier::BOLD), + ), + Span::styled( + format!("{} active", stats.active_count), + Style::default() + .fg(Color::Green) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled( + format!("{} idle", stats.idle_count), + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + 
Span::styled( + format!("{} disconnected", stats.disconnected_count), + Style::default().fg(Color::Red).add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled( + format!("Queries: {}", stats.active_queries), + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled( + format!("Tasks: {}", stats.total_tasks), + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled(uptime_str, Style::default().fg(Color::DarkGray)), + Span::raw(" "), + live_badge, + ]); + + let title = format!(" datafusion-distributed-console — {view_name} "); + let header = Paragraph::new(line).block(Block::default().borders(Borders::BOTTOM).title( + Span::styled(title, Style::default().add_modifier(Modifier::BOLD)), + )); + + frame.render_widget(header, area); +} diff --git a/console/src/ui/help.rs b/console/src/ui/help.rs new file mode 100644 index 00000000..961ed7a1 --- /dev/null +++ b/console/src/ui/help.rs @@ -0,0 +1,133 @@ +use ratatui::Frame; +use ratatui::layout::{Constraint, Layout, Rect}; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::text::{Line, Span}; +use ratatui::widgets::{Block, Borders, Clear, Paragraph, Wrap}; + +pub(super) fn render_overlay(frame: &mut Frame) { + let area = centered_rect(50, 60, frame.area()); + + // Clear the background + frame.render_widget(Clear, area); + + let lines = vec![ + Line::from(""), + Line::from(Span::styled( + " Global", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + )), + Line::from(vec![ + Span::styled(" 1 / 2 ", Style::default().fg(Color::Yellow)), + Span::raw("Switch views"), + ]), + Line::from(vec![ + Span::styled(" Tab ", Style::default().fg(Color::Yellow)), + Span::raw("Switch views"), + ]), + Line::from(vec![ + Span::styled(" p ", Style::default().fg(Color::Yellow)), + Span::raw("Pause/resume polling"), + ]), + Line::from(vec![ + Span::styled(" q / Ctrl+C ", 
Style::default().fg(Color::Yellow)), + Span::raw("Quit"), + ]), + Line::from(vec![ + Span::styled(" ? ", Style::default().fg(Color::Yellow)), + Span::raw("Toggle this help"), + ]), + Line::from(""), + Line::from(Span::styled( + " Navigation", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + )), + Line::from(vec![ + Span::styled(" j / k ", Style::default().fg(Color::Yellow)), + Span::raw("Move selection up/down"), + ]), + Line::from(vec![ + Span::styled(" Enter ", Style::default().fg(Color::Yellow)), + Span::raw("Drill into worker"), + ]), + Line::from(vec![ + Span::styled(" Esc ", Style::default().fg(Color::Yellow)), + Span::raw("Go back"), + ]), + Line::from(vec![ + Span::styled(" h / l ", Style::default().fg(Color::Yellow)), + Span::raw("Prev/next worker (detail)"), + ]), + Line::from(vec![ + Span::styled(" Shift+Tab ", Style::default().fg(Color::Yellow)), + Span::raw("Cycle panels (Metrics/Tasks/Completed)"), + ]), + Line::from(""), + Line::from(Span::styled( + " Cluster View", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + )), + Line::from(vec![ + Span::styled( + " \u{2190} / \u{2192} ", + Style::default().fg(Color::Yellow), + ), + Span::raw("Select sort column"), + ]), + Line::from(vec![ + Span::styled(" Space ", Style::default().fg(Color::Yellow)), + Span::raw("Cycle sort direction (\u{25b2}/\u{25bc})"), + ]), + Line::from(vec![ + Span::styled(" r ", Style::default().fg(Color::Yellow)), + Span::raw("Reset completed tasks"), + ]), + Line::from(vec![ + Span::styled(" g / G ", Style::default().fg(Color::Yellow)), + Span::raw("Jump to top/bottom"), + ]), + Line::from(""), + Line::from(Span::styled( + " Press any key to close", + Style::default().fg(Color::DarkGray), + )), + ]; + + let help = Paragraph::new(lines).wrap(Wrap { trim: false }).block( + Block::default() + .borders(Borders::ALL) + .title(Span::styled( + " Help ", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + )) + 
.style(Style::default().bg(Color::Black)), + ); + + frame.render_widget(help, area); +} + +/// Create a centered rect using percentages of the parent area. +fn centered_rect(percent_x: u16, percent_y: u16, area: Rect) -> Rect { + let [_, center_v, _] = Layout::vertical([ + Constraint::Percentage((100 - percent_y) / 2), + Constraint::Percentage(percent_y), + Constraint::Percentage((100 - percent_y) / 2), + ]) + .areas(area); + + let [_, center, _] = Layout::horizontal([ + Constraint::Percentage((100 - percent_x) / 2), + Constraint::Percentage(percent_x), + Constraint::Percentage((100 - percent_x) / 2), + ]) + .areas(center_v); + + center +} diff --git a/console/src/ui/mod.rs b/console/src/ui/mod.rs new file mode 100644 index 00000000..5601e64f --- /dev/null +++ b/console/src/ui/mod.rs @@ -0,0 +1,34 @@ +mod cluster; +mod footer; +mod format; +mod header; +mod help; +mod worker; + +use crate::app::App; +use crate::state::View; +use ratatui::Frame; +use ratatui::layout::{Constraint, Layout}; + +/// Top-level render dispatch. 
+pub(crate) fn render(frame: &mut Frame, app: &mut App) { + let [header_area, content_area, footer_area] = Layout::vertical([ + Constraint::Length(2), + Constraint::Min(5), + Constraint::Length(1), + ]) + .areas(frame.area()); + + header::render(frame, header_area, app); + + match app.current_view { + View::ClusterOverview => cluster::render(frame, content_area, app), + View::WorkerDetail => worker::render(frame, content_area, app), + } + + footer::render(frame, footer_area, app); + + if app.show_help { + help::render_overlay(frame); + } +} diff --git a/console/src/ui/worker.rs b/console/src/ui/worker.rs new file mode 100644 index 00000000..9bee0d9e --- /dev/null +++ b/console/src/ui/worker.rs @@ -0,0 +1,386 @@ +use super::format::{cpu_color, format_bytes, format_duration, format_row_count}; +use crate::app::App; +use crate::state::WorkerPanel; +use ratatui::Frame; +use ratatui::layout::{Constraint, Layout, Rect}; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::text::{Line, Span}; +use ratatui::widgets::{Block, Borders, Cell, Paragraph, Row, Sparkline, SparklineBar, Table}; +use std::collections::VecDeque; + +pub(super) fn render(frame: &mut Frame, area: Rect, app: &mut App) { + let idx = app.worker_state.worker_idx; + if idx >= app.workers.len() { + let msg = Paragraph::new("No worker selected") + .style(Style::default().fg(Color::DarkGray)) + .centered(); + frame.render_widget(msg, area); + return; + } + + let [ + summary_area, + metrics_area, + active_area, + completed_area, + conn_area, + ] = Layout::vertical([ + Constraint::Length(1), + Constraint::Length(9), + Constraint::Percentage(45), + Constraint::Min(4), + Constraint::Length(1), + ]) + .areas(area); + + render_summary(frame, summary_area, app, idx); + render_metrics(frame, metrics_area, app, idx); + render_active_tasks(frame, active_area, app, idx); + render_completed_tasks(frame, completed_area, app, idx); + render_connection_info(frame, conn_area, app, idx); +} + +fn render_summary(frame: 
&mut Frame, area: Rect, app: &App, idx: usize) { + let worker = &app.workers[idx]; + let status_color = worker.status_color(); + + let line = Line::from(vec![ + Span::styled(" Worker: ", Style::default().add_modifier(Modifier::BOLD)), + Span::styled( + worker.url.as_str(), + Style::default().add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled("Status: ", Style::default().fg(Color::DarkGray)), + Span::styled( + worker.status_text(), + Style::default() + .fg(status_color) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled("Tasks: ", Style::default().fg(Color::DarkGray)), + Span::styled( + worker.tasks.len().to_string(), + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(" "), + Span::styled("Queries: ", Style::default().fg(Color::DarkGray)), + Span::styled( + worker.distinct_query_count().to_string(), + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + ), + Span::raw(format!(" [{}/{}]", idx + 1, app.workers.len())), + ]); + + frame.render_widget(Paragraph::new(line), area); +} + +fn render_active_tasks(frame: &mut Frame, area: Rect, app: &mut App, idx: usize) { + let worker = &app.workers[idx]; + let focused = app.worker_state.focused_panel == WorkerPanel::ActiveTasks; + + let header = Row::new(vec!["Query", "Stage", "Task#", "Duration", "Output Rows"]).style( + Style::default() + .fg(Color::White) + .add_modifier(Modifier::BOLD), + ); + + // Sort tasks by duration descending (longest first) + let mut task_indices: Vec = (0..worker.tasks.len()).collect(); + task_indices.sort_by(|&a, &b| { + let dur_a = worker.tasks[a] + .stage_key + .as_ref() + .map(|sk| worker.task_duration(&sk.query_id, sk.stage_id, sk.task_number)) + .unwrap_or_default(); + let dur_b = worker.tasks[b] + .stage_key + .as_ref() + .map(|sk| worker.task_duration(&sk.query_id, sk.stage_id, sk.task_number)) + .unwrap_or_default(); + dur_b.cmp(&dur_a) + }); + + let rows: Vec = task_indices + .iter() + .map(|&i| 
{ + let task = &worker.tasks[i]; + if let Some(sk) = &task.stage_key { + let query_hex = hex_prefix(&sk.query_id, 8); + let duration = worker.task_duration(&sk.query_id, sk.stage_id, sk.task_number); + let dur_str = format_duration(duration); + let dur_style = if duration.as_secs() > 60 { + Style::default().fg(Color::Red) + } else if duration.as_secs() > 30 { + Style::default().fg(Color::Yellow) + } else { + Style::default() + }; + + let output_rows_str = format_row_count(task.output_rows); + + Row::new(vec![ + Cell::from(query_hex).style(Style::default().fg(Color::Cyan)), + Cell::from(format!("S{}", sk.stage_id)), + Cell::from(format!("T{}", sk.task_number)), + Cell::from(dur_str).style(dur_style), + Cell::from(output_rows_str).style(Style::default().fg(Color::DarkGray)), + ]) + } else { + Row::new(vec![ + Cell::from("?"), + Cell::from("?"), + Cell::from("?"), + Cell::from("-"), + Cell::from("-"), + ]) + } + }) + .collect(); + + let title_style = if focused { + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + + let table = Table::new( + rows, + [ + Constraint::Percentage(25), + Constraint::Percentage(15), + Constraint::Percentage(15), + Constraint::Percentage(20), + Constraint::Percentage(25), + ], + ) + .header(header) + .block(Block::default().borders(Borders::ALL).title(Span::styled( + format!(" Active Tasks ({}) ", worker.tasks.len()), + title_style, + ))) + .row_highlight_style( + Style::default() + .bg(Color::DarkGray) + .add_modifier(Modifier::BOLD), + ) + .highlight_symbol("▸ "); + + frame.render_stateful_widget(table, area, &mut app.worker_state.active_table); +} + +fn render_completed_tasks(frame: &mut Frame, area: Rect, app: &mut App, idx: usize) { + let worker = &app.workers[idx]; + let focused = app.worker_state.focused_panel == WorkerPanel::CompletedTasks; + + let header = Row::new(vec!["Query", "Stage", "Task#", "Duration"]).style( + Style::default() + .fg(Color::White) + 
.add_modifier(Modifier::BOLD), + ); + + let rows: Vec = worker + .completed_tasks + .iter() + .map(|ct| { + let query_hex = hex_prefix(&ct.query_id, 8); + let dur_str = format!("~{}", format_duration(ct.observed_duration)); + + Row::new(vec![ + Cell::from(query_hex).style(Style::default().fg(Color::DarkGray)), + Cell::from(format!("S{}", ct.stage_id)), + Cell::from(format!("T{}", ct.task_number)), + Cell::from(dur_str).style(Style::default().fg(Color::DarkGray)), + ]) + }) + .collect(); + + let title_style = if focused { + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + + let table = Table::new( + rows, + [ + Constraint::Percentage(30), + Constraint::Percentage(20), + Constraint::Percentage(20), + Constraint::Percentage(30), + ], + ) + .header(header) + .block(Block::default().borders(Borders::ALL).title(Span::styled( + format!(" Recently Completed ({}) ", worker.completed_tasks.len()), + title_style, + ))) + .row_highlight_style( + Style::default() + .bg(Color::DarkGray) + .add_modifier(Modifier::BOLD), + ); + + frame.render_stateful_widget(table, area, &mut app.worker_state.completed_table); +} + +fn render_connection_info(frame: &mut Frame, area: Rect, app: &App, idx: usize) { + let worker = &app.workers[idx]; + + let connected_str = worker + .connected_since + .map(|since| format_duration(since.elapsed())) + .unwrap_or_else(|| "-".to_string()); + + let line = if let Some(reason) = worker.disconnect_reason() { + Line::from(vec![ + Span::styled(" Disconnected: ", Style::default().fg(Color::Red)), + Span::styled(reason, Style::default().fg(Color::Red)), + ]) + } else { + Line::from(vec![ + Span::styled(" Connected: ", Style::default().fg(Color::DarkGray)), + Span::raw(connected_str), + Span::styled(" Polls: ", Style::default().fg(Color::DarkGray)), + Span::raw(format!("{}", worker.poll_count)), + ]) + }; + + frame.render_widget(Paragraph::new(line), area); +} + +fn render_metrics(frame: 
&mut Frame, area: Rect, app: &App, idx: usize) { + let worker = &app.workers[idx]; + let focused = app.worker_state.focused_panel == WorkerPanel::Metrics; + + let title_style = if focused { + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default().fg(Color::DarkGray) + }; + + let outer_block = Block::default() + .borders(Borders::ALL) + .title(Span::styled(" Metrics ", title_style)); + + let inner = outer_block.inner(area); + frame.render_widget(outer_block, area); + + let [cpu_area, rss_area, rows_area] = Layout::horizontal([ + Constraint::Percentage(33), + Constraint::Percentage(34), + Constraint::Percentage(33), + ]) + .areas(inner); + + render_sparkline_panel( + frame, + cpu_area, + "CPU", + &worker.cpu_history, + 10_000, // 100.00% scaled to 10000 + format!("{:.1}%", worker.cpu_usage_percent), + cpu_color(worker.cpu_usage_percent), + ); + + render_sparkline_panel( + frame, + rss_area, + "Memory", + &worker.rss_history, + 0, // auto-scale + format_bytes(worker.rss_bytes), + Color::Cyan, + ); + + render_sparkline_panel( + frame, + rows_area, + "Rows/poll", + &worker.rows_history, + 0, // auto-scale + format_row_count(worker.rows_history.back().copied().unwrap_or(0)), + Color::Green, + ); +} + +fn render_sparkline_panel( + frame: &mut Frame, + area: Rect, + title: &str, + history: &VecDeque, + max_value: u64, + current_label: String, + bar_color: Color, +) { + let block = Block::default().borders(Borders::ALL).title(Span::styled( + format!(" {title} "), + Style::default().fg(Color::DarkGray), + )); + + let inner = block.inner(area); + frame.render_widget(block, area); + + if inner.height < 2 { + return; + } + + let [spark_area, label_area] = + Layout::vertical([Constraint::Min(1), Constraint::Length(1)]).areas(inner); + + // Right-align data: take only the most recent `width` samples, + // pad with None (absent) so empty region is distinct from real zero readings. 
/// Format the first `n` bytes of a byte slice as lowercase hex.
///
/// Each byte becomes exactly two hex digits. If the slice holds fewer than `n`
/// bytes, all of it is formatted; an empty slice or `n == 0` yields an empty string.
fn hex_prefix(bytes: &[u8], n: usize) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let shown = &bytes[..bytes.len().min(n)];
    // Two output characters per byte, written into one pre-sized buffer instead of
    // allocating a temporary string for every byte.
    let mut out = String::with_capacity(shown.len() * 2);
    for &byte in shown {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
+pub(crate) struct WorkerConn { + pub(crate) url: Url, + client: Option>, + pub(crate) connection_status: ConnectionStatus, + pub(crate) tasks: Vec, + pub(crate) completed_tasks: VecDeque, + task_first_seen: HashMap, + pub(crate) connected_since: Option, + pub(crate) poll_count: u64, + last_reconnect_attempt: Option, + last_seen_query_ids: HashSet>, + /// Worker RSS memory in bytes (from WorkerMetrics). + pub(crate) rss_bytes: u64, + /// Worker CPU usage percentage (from WorkerMetrics). + pub(crate) cpu_usage_percent: f64, + /// Sum of output_rows across all running tasks on this worker. + pub(crate) output_rows_total: u64, + /// Time-series history for sparkline graphs. + pub(crate) cpu_history: VecDeque, + pub(crate) rss_history: VecDeque, + pub(crate) rows_history: VecDeque, + /// Previous output_rows_total for computing per-poll delta. + prev_output_rows: u64, +} + +/// Unique key for a task: (query_id, stage_id, task_number). +type TaskKey = (Vec, u64, u64); + +/// Record of a completed task with observed duration. +#[derive(Clone, Debug)] +pub(crate) struct CompletedTaskRecord { + pub(crate) query_id: Vec, + pub(crate) stage_id: u64, + pub(crate) task_number: u64, + pub(crate) observed_duration: Duration, +} + +/// Connection status for a worker. +#[derive(Clone)] +pub(crate) enum ConnectionStatus { + Connecting, + Idle, + Active, + Disconnected { reason: String }, +} + +impl WorkerConn { + /// Create a new WorkerConn in the initial Connecting state. 
+ pub(crate) fn new(url: Url) -> Self { + Self { + url, + client: None, + connection_status: ConnectionStatus::Connecting, + tasks: Vec::new(), + completed_tasks: VecDeque::new(), + task_first_seen: HashMap::new(), + connected_since: None, + poll_count: 0, + last_reconnect_attempt: None, + last_seen_query_ids: HashSet::new(), + rss_bytes: 0, + cpu_usage_percent: 0.0, + output_rows_total: 0, + cpu_history: VecDeque::with_capacity(METRIC_HISTORY_LEN), + rss_history: VecDeque::with_capacity(METRIC_HISTORY_LEN), + rows_history: VecDeque::with_capacity(METRIC_HISTORY_LEN), + prev_output_rows: 0, + } + } + + /// Attempts to establish a gRPC connection to a worker. + pub(crate) async fn try_connect(&mut self) { + self.last_reconnect_attempt = Some(Instant::now()); + + match ObservabilityServiceClient::connect(self.url.to_string()).await { + Ok(mut client) => match client.ping(PingRequest {}).await { + Ok(_) => { + self.client = Some(client); + self.connection_status = ConnectionStatus::Idle; + self.connected_since = Some(Instant::now()); + self.tasks.clear(); + self.task_first_seen.clear(); + } + Err(e) => { + self.client = None; + self.connected_since = None; + self.connection_status = ConnectionStatus::Disconnected { + reason: format!("Ping failed: {e}"), + }; + } + }, + Err(e) => { + self.client = None; + self.connected_since = None; + self.connection_status = ConnectionStatus::Disconnected { + reason: format!("Connection failed: {e}"), + }; + } + } + } + + /// Returns true if the worker should attempt a (re)connection. + pub(crate) fn should_retry_connection(&self) -> bool { + match &self.connection_status { + ConnectionStatus::Connecting => self.last_reconnect_attempt.is_none(), + ConnectionStatus::Disconnected { .. } => { + if let Some(last_attempt) = self.last_reconnect_attempt { + last_attempt.elapsed() >= Duration::from_secs(5) + } else { + true + } + } + _ => false, + } + } + + /// Queries a worker for task progress. 
+ pub(crate) async fn poll(&mut self) { + let Some(client) = &mut self.client else { + return; + }; + + match client.get_task_progress(GetTaskProgressRequest {}).await { + Ok(response) => { + let response = response.into_inner(); + let new_tasks = response.tasks; + + // Store worker-level metrics + if let Some(wm) = &response.worker_metrics { + self.rss_bytes = wm.rss_bytes; + self.cpu_usage_percent = wm.cpu_usage_percent; + } + + // Compute output rows total across running tasks + self.output_rows_total = new_tasks.iter().map(|t| t.output_rows).sum(); + + // Record metric history samples for sparkline graphs + // Scale CPU% (0.0–100.0) by 100 → 0–10000 range for sparkline precision. + push_history( + &mut self.cpu_history, + (self.cpu_usage_percent * 100.0) as u64, + ); + push_history(&mut self.rss_history, self.rss_bytes); + let rows_delta = self.output_rows_total.saturating_sub(self.prev_output_rows); + push_history(&mut self.rows_history, rows_delta); + self.prev_output_rows = self.output_rows_total; + + self.poll_count += 1; + + // Build set of new task keys for quick lookup + let new_task_keys: HashSet = new_tasks + .iter() + .filter_map(|t| { + t.stage_key + .as_ref() + .map(|sk| (sk.query_id.clone(), sk.stage_id, sk.task_number)) + }) + .collect(); + + // Detect completed tasks: tasks that were running but disappeared + for old_task in &self.tasks { + if old_task.status == TaskStatus::Running as i32 { + if let Some(sk) = &old_task.stage_key { + let key = (sk.query_id.clone(), sk.stage_id, sk.task_number); + if !new_task_keys.contains(&key) { + // Task disappeared — assume completed + let observed_duration = self + .task_first_seen + .get(&key) + .map(|first| first.elapsed()) + .unwrap_or_default(); + + self.completed_tasks.push_front(CompletedTaskRecord { + query_id: sk.query_id.clone(), + stage_id: sk.stage_id, + task_number: sk.task_number, + observed_duration, + }); + + // Maintain bounded size + while self.completed_tasks.len() > MAX_COMPLETED_TASKS { + 
self.completed_tasks.pop_back(); + } + + // Remove from first_seen tracking + self.task_first_seen.remove(&key); + } + } + } + } + + // Track first_seen for new tasks + let now = Instant::now(); + for task in &new_tasks { + if let Some(sk) = &task.stage_key { + let key = (sk.query_id.clone(), sk.stage_id, sk.task_number); + self.task_first_seen.entry(key).or_insert(now); + } + } + + // Clean up first_seen for tasks no longer present + self.task_first_seen + .retain(|k, _| new_task_keys.contains(k)); + + // Update current tasks + self.tasks = new_tasks; + + // Collect current query IDs + let mut current_query_ids = HashSet::new(); + let mut has_running = false; + + for task in &self.tasks { + if let Some(sk) = &task.stage_key { + current_query_ids.insert(sk.query_id.clone()); + if task.status == TaskStatus::Running as i32 { + has_running = true; + } + } + } + + // If a new query starts, clear old completed tasks from previous queries + if has_running && !self.completed_tasks.is_empty() { + let completed_query_ids: HashSet<_> = self + .completed_tasks + .iter() + .map(|t| t.query_id.clone()) + .collect(); + + if !current_query_ids + .iter() + .any(|id| completed_query_ids.contains(id)) + { + self.completed_tasks.clear(); + } + } + + // Update connection status + if has_running { + self.connection_status = ConnectionStatus::Active; + } else { + match &self.connection_status { + ConnectionStatus::Active | ConnectionStatus::Connecting => { + self.connection_status = ConnectionStatus::Idle; + } + ConnectionStatus::Idle => {} + ConnectionStatus::Disconnected { .. 
} => { + self.connection_status = ConnectionStatus::Idle; + } + } + } + + self.last_seen_query_ids = current_query_ids; + } + Err(e) => { + self.client = None; + self.connected_since = None; + self.tasks.clear(); + self.task_first_seen.clear(); + self.connection_status = ConnectionStatus::Disconnected { + reason: format!("Poll failed: {e}"), + }; + self.last_seen_query_ids.clear(); + // Push zeros so sparkline shows the gap + push_history(&mut self.cpu_history, 0); + push_history(&mut self.rss_history, 0); + push_history(&mut self.rows_history, 0); + } + } + } + + /// Status text for display. + pub(crate) fn status_text(&self) -> &'static str { + match &self.connection_status { + ConnectionStatus::Connecting => "CONNECTING", + ConnectionStatus::Idle => "IDLE", + ConnectionStatus::Active => "ACTIVE", + ConnectionStatus::Disconnected { .. } => "DISCONNECTED", + } + } + + /// Status color for display. + pub(crate) fn status_color(&self) -> ratatui::style::Color { + use ratatui::style::Color; + match self.connection_status { + ConnectionStatus::Connecting => Color::Blue, + ConnectionStatus::Idle => Color::Yellow, + ConnectionStatus::Active => Color::Green, + ConnectionStatus::Disconnected { .. } => Color::Red, + } + } + + /// Sort key for status ordering (disconnected first, then active, idle, connecting). + pub(crate) fn status_sort_key(&self) -> u8 { + match self.connection_status { + ConnectionStatus::Disconnected { .. } => 0, + ConnectionStatus::Active => 1, + ConnectionStatus::Idle => 2, + ConnectionStatus::Connecting => 3, + } + } + + /// Disconnect reason if applicable. + pub(crate) fn disconnect_reason(&self) -> Option<&str> { + if let ConnectionStatus::Disconnected { reason } = &self.connection_status { + Some(reason) + } else { + None + } + } + + /// Duration of the longest-running task on this worker. 
+ pub(crate) fn longest_task_duration(&self) -> Duration { + self.task_first_seen + .values() + .map(|first| first.elapsed()) + .max() + .unwrap_or_default() + } + + /// Number of distinct queries this worker has tasks for. + pub(crate) fn distinct_query_count(&self) -> usize { + let ids: HashSet<_> = self + .tasks + .iter() + .filter_map(|t| t.stage_key.as_ref().map(|sk| &sk.query_id)) + .collect(); + ids.len() + } + + /// Get task duration for a specific task. + pub(crate) fn task_duration( + &self, + query_id: &[u8], + stage_id: u64, + task_number: u64, + ) -> Duration { + let key = (query_id.to_vec(), stage_id, task_number); + self.task_first_seen + .get(&key) + .map(|first| first.elapsed()) + .unwrap_or_default() + } +} + +/// Push a value into a ring buffer, evicting the oldest if at capacity. +fn push_history(buf: &mut VecDeque, value: u64) { + if buf.len() >= METRIC_HISTORY_LEN { + buf.pop_front(); + } + buf.push_back(value); +} + +/// Connects to a seed worker and calls `GetClusterWorkers` to discover all worker URLs. 
///
/// The seed is pinged before the worker list is requested. Entries of the
/// reply that do not parse as URLs are skipped.
///
/// # Errors
///
/// Returns a human-readable message when connecting to the seed, the ping, or
/// the `GetClusterWorkers` call fails.
pub(crate) async fn discover_cluster_workers(seed_url: &Url) -> Result<Vec<Url>, String> {
    let mut client = match ObservabilityServiceClient::connect(seed_url.to_string()).await {
        Ok(client) => client,
        Err(e) => return Err(format!("Failed to connect to seed worker {seed_url}: {e}")),
    };

    if let Err(e) = client.ping(PingRequest {}).await {
        return Err(format!("Seed worker {seed_url} ping failed: {e}"));
    }

    let reply = match client.get_cluster_workers(GetClusterWorkersRequest {}).await {
        Ok(reply) => reply.into_inner(),
        Err(e) => return Err(format!("GetClusterWorkers failed on {seed_url}: {e}")),
    };

    // Best effort: keep every entry that parses, drop the rest.
    let urls = reply
        .worker_urls
        .iter()
        .filter_map(|raw| Url::parse(raw).ok())
        .collect();

    Ok(urls)
}
diff --git a/src/flight_service/do_action.rs b/src/flight_service/do_action.rs index c2defb57..14a8cc2a 100644 --- a/src/flight_service/do_action.rs +++ b/src/flight_service/do_action.rs @@ -38,7 +38,7 @@ pub struct TaskData { /// Task context suitable for execute different partitions from the same task. pub(super) task_ctx: Arc, /// Plan to be executed. - pub(super) plan: Arc, + pub(crate) plan: Arc, /// `num_partitions_remaining` is initialized to the total number of partitions in the task (not /// only tasks in the partition group). This is decremented for each request to the endpoint /// for this task. Once this count is zero, the task is likely complete.
The task may not be diff --git a/src/flight_service/worker.rs b/src/flight_service/worker.rs index c8b8fc8b..d98cb497 100644 --- a/src/flight_service/worker.rs +++ b/src/flight_service/worker.rs @@ -2,7 +2,9 @@ use crate::flight_service::WorkerSessionBuilder; use crate::flight_service::do_action::{INIT_ACTION_TYPE, TaskData}; use crate::flight_service::single_write_multi_read::SingleWriteMultiRead; use crate::protobuf::StageKey; -use crate::{DefaultSessionBuilder, ObservabilityServiceImpl}; +use crate::{ + DefaultSessionBuilder, ObservabilityServiceImpl, ObservabilityServiceServer, WorkerResolver, +}; use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, @@ -133,8 +135,19 @@ impl Worker { .max_encoding_message_size(usize::MAX) } - pub fn observability_service(&self) -> ObservabilityServiceImpl { - ObservabilityServiceImpl::new(self.task_data_entries.clone()) + /// Creates an [`ObservabilityServiceServer`] that exposes task progress and cluster + /// worker discovery via the provided [`WorkerResolver`]. + /// + /// The returned server is meant to be added to the same [`tonic::transport::Server`] as the + /// Flight service — gRPC multiplexes both services on a single port. + pub fn with_observability_service( + &self, + worker_resolver: Arc, + ) -> ObservabilityServiceServer { + ObservabilityServiceServer::new(ObservabilityServiceImpl::new( + self.task_data_entries.clone(), + worker_resolver, + )) } /// Returns the number of cached task entries currently held by this worker. 
diff --git a/src/lib.rs b/src/lib.rs index eb931440..f14a32e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,9 +46,10 @@ pub use stage::{ }; pub use observability::{ - GetTaskProgressRequest, GetTaskProgressResponse, ObservabilityService, - ObservabilityServiceClient, ObservabilityServiceImpl, ObservabilityServiceServer, PingRequest, - PingResponse, StageKey as ObservabilityStageKey, TaskProgress, TaskStatus, + GetClusterWorkersRequest, GetClusterWorkersResponse, GetTaskProgressRequest, + GetTaskProgressResponse, ObservabilityService, ObservabilityServiceClient, + ObservabilityServiceImpl, ObservabilityServiceServer, PingRequest, PingResponse, + StageKey as ObservabilityStageKey, TaskProgress, TaskStatus, WorkerMetrics, }; pub use protobuf::StageKey; diff --git a/src/observability/generated/observability.rs b/src/observability/generated/observability.rs index b8e56a86..cc7fe513 100644 --- a/src/observability/generated/observability.rs +++ b/src/observability/generated/observability.rs @@ -28,11 +28,30 @@ pub struct TaskProgress { pub completed_partitions: u64, #[prost(enumeration = "TaskStatus", tag = "4")] pub status: i32, + #[prost(uint64, tag = "5")] + pub output_rows: u64, +} +/// Worker-level system metrics +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct WorkerMetrics { + #[prost(uint64, tag = "1")] + pub rss_bytes: u64, + #[prost(double, tag = "2")] + pub cpu_usage_percent: f64, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetTaskProgressResponse { #[prost(message, repeated, tag = "1")] pub tasks: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub worker_metrics: ::core::option::Option, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetClusterWorkersRequest {} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetClusterWorkersResponse { + #[prost(string, repeated, tag = "1")] + pub worker_urls: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } 
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] @@ -186,6 +205,25 @@ pub mod observability_service_client { )); self.inner.unary(req, path, codec).await } + pub async fn get_cluster_workers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::unknown(format!("Service was not ready: {}", e.into())) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/observability.ObservabilityService/GetClusterWorkers", + ); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new( + "observability.ObservabilityService", + "GetClusterWorkers", + )); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -209,6 +247,10 @@ pub mod observability_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn get_cluster_workers( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; } #[derive(Debug)] pub struct ObservabilityServiceServer { @@ -365,6 +407,49 @@ pub mod observability_service_server { }; Box::pin(fut) } + "/observability.ObservabilityService/GetClusterWorkers" => { + #[allow(non_camel_case_types)] + struct GetClusterWorkersSvc(pub Arc); + impl + tonic::server::UnaryService + for GetClusterWorkersSvc + { + type Response = super::GetClusterWorkersResponse; + type Future = BoxFuture, tonic::Status>; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_cluster_workers(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let 
max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetClusterWorkersSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => Box::pin(async move { let mut response = http::Response::new(tonic::body::Body::default()); let headers = response.headers_mut(); diff --git a/src/observability/mod.rs b/src/observability/mod.rs index d7da760a..c48906ae 100644 --- a/src/observability/mod.rs +++ b/src/observability/mod.rs @@ -7,7 +7,8 @@ pub use generated::observability::observability_service_server::{ }; pub use generated::observability::{ - GetTaskProgressRequest, GetTaskProgressResponse, PingRequest, PingResponse, StageKey, - TaskProgress, TaskStatus, + GetClusterWorkersRequest, GetClusterWorkersResponse, GetTaskProgressRequest, + GetTaskProgressResponse, PingRequest, PingResponse, StageKey, TaskProgress, TaskStatus, + WorkerMetrics, }; pub use service::ObservabilityServiceImpl; diff --git a/src/observability/proto/observability.proto b/src/observability/proto/observability.proto index 2b38f637..6c0c403d 100644 --- a/src/observability/proto/observability.proto +++ b/src/observability/proto/observability.proto @@ -4,6 +4,7 @@ package observability; service ObservabilityService { rpc Ping (PingRequest) returns (PingResponse); rpc GetTaskProgress (GetTaskProgressRequest) returns (GetTaskProgressResponse); + rpc GetClusterWorkers (GetClusterWorkersRequest) returns (GetClusterWorkersResponse); } message PingRequest {} @@ -26,6 +27,7 @@ message TaskProgress { uint64 total_partitions = 2; uint64 completed_partitions = 3; TaskStatus status = 4; + uint64 output_rows = 5; } enum 
TaskStatus { @@ -33,6 +35,19 @@ enum TaskStatus { TASK_STATUS_RUNNING = 1; } +// Worker-level system metrics +message WorkerMetrics { + uint64 rss_bytes = 1; + double cpu_usage_percent = 2; +} + message GetTaskProgressResponse { repeated TaskProgress tasks = 1; + WorkerMetrics worker_metrics = 2; +} + +message GetClusterWorkersRequest {} + +message GetClusterWorkersResponse { + repeated string worker_urls = 1; } diff --git a/src/observability/service.rs b/src/observability/service.rs index ad43d991..00672677 100644 --- a/src/observability/service.rs +++ b/src/observability/service.rs @@ -1,26 +1,81 @@ use crate::flight_service::{SingleWriteMultiRead, TaskData}; +use crate::networking::WorkerResolver; use crate::protobuf::StageKey; use datafusion::error::DataFusionError; +use datafusion::physical_plan::ExecutionPlan; use moka::future::Cache; use std::sync::Arc; +#[cfg(feature = "system-metrics")] +use std::time::Duration; +#[cfg(feature = "system-metrics")] +use sysinfo::{Pid, ProcessRefreshKind}; +#[cfg(feature = "system-metrics")] +use tokio::sync::watch; use tonic::{Request, Response, Status}; use super::{ - GetTaskProgressResponse, ObservabilityService, TaskProgress, TaskStatus, - generated::observability::{GetTaskProgressRequest, PingRequest, PingResponse}, + GetClusterWorkersResponse, GetTaskProgressResponse, ObservabilityService, TaskProgress, + TaskStatus, WorkerMetrics, + generated::observability::{ + GetClusterWorkersRequest, GetTaskProgressRequest, PingRequest, PingResponse, + }, }; type ResultTaskData = Result>; pub struct ObservabilityServiceImpl { task_data_entries: Arc>>>, + worker_resolver: Arc, + #[cfg(feature = "system-metrics")] + system: watch::Receiver, } impl ObservabilityServiceImpl { pub fn new( task_data_entries: Arc>>>, + worker_resolver: Arc, ) -> Self { - Self { task_data_entries } + #[cfg(feature = "system-metrics")] + let (tx, rx) = tokio::sync::watch::channel(WorkerMetrics::default()); + + #[cfg(feature = "system-metrics")] + { + let 
pid = Pid::from_u32(std::process::id()); + let mut sys = sysinfo::System::new_all(); + + // Spawn background task to periodically collect and send system metrics. + tokio::task::spawn(async move { + loop { + sys.refresh_process_specifics( + pid, + ProcessRefreshKind::new().with_cpu().with_memory(), + ); + + if let Some(process) = sys.process(pid) { + let num_cpus = std::thread::available_parallelism() + .map(|n| n.get() as f64) + .unwrap_or(1.0); + let metrics = WorkerMetrics { + rss_bytes: process.memory(), + cpu_usage_percent: process.cpu_usage() as f64 / num_cpus, + }; + if tx.send(metrics).is_err() { + break; + } + } else if tx.send(WorkerMetrics::default()).is_err() { + break; + }; + + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + } + Self { + task_data_entries, + worker_resolver, + #[cfg(feature = "system-metrics")] + system: rx, + } } } @@ -44,17 +99,50 @@ impl ObservabilityService for ObservabilityServiceImpl { let total_partitions = task_data.total_partitions() as u64; let remaining = task_data.num_partitions_remaining() as u64; let completed_partitions = total_partitions.saturating_sub(remaining); + let output_rows = output_rows_from_plan(&task_data.plan); tasks.push(TaskProgress { stage_key: Some(convert_stage_key(&internal_key)), total_partitions, completed_partitions, status: TaskStatus::Running as i32, + output_rows, }); } } - Ok(Response::new(GetTaskProgressResponse { tasks })) + let worker_metrics = Some(self.collect_worker_metrics()); + + Ok(Response::new(GetTaskProgressResponse { + tasks, + worker_metrics, + })) + } + + async fn get_cluster_workers( + &self, + _request: Request, + ) -> Result, Status> { + let urls = self + .worker_resolver + .get_urls() + .map_err(|e| Status::internal(format!("Failed to resolve workers: {e}")))?; + + let worker_urls = urls.into_iter().map(|url| url.to_string()).collect(); + + Ok(Response::new(GetClusterWorkersResponse { worker_urls })) + } +} + +impl ObservabilityServiceImpl { + fn 
collect_worker_metrics(&self) -> WorkerMetrics { + #[cfg(not(feature = "system-metrics"))] + { + WorkerMetrics::default() + } + + #[cfg(feature = "system-metrics")] + return *self.system.borrow(); } } @@ -66,3 +154,8 @@ fn convert_stage_key(key: &StageKey) -> super::StageKey { task_number: key.task_number, } } + +/// Extracts output rows from the root plan node's metrics. +fn output_rows_from_plan(plan: &Arc) -> u64 { + plan.metrics().and_then(|m| m.output_rows()).unwrap_or(0) as u64 +}