diff --git a/Cargo.lock b/Cargo.lock index bfd1f31f4..b17a29873 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,11 +1070,17 @@ name = "ampcc" version = "0.1.0" dependencies = [ "admin-client", + "amp-client", "anyhow", + "arrow", + "chrono", "crossterm 0.29.0", + "csv", "datasets-common", "directories", "figment", + "futures", + "hex", "ratatui", "reqwest", "serde", diff --git a/crates/bin/ampcc/CLAUDE.md b/crates/bin/ampcc/CLAUDE.md new file mode 100644 index 000000000..17c040f73 --- /dev/null +++ b/crates/bin/ampcc/CLAUDE.md @@ -0,0 +1,766 @@ +# Ampcc - Amp Command & Control TUI + +## Overview + +**Ampcc** (Amp Command & Control) is a terminal-based user interface (TUI) for managing and monitoring Amp datasets and jobs. It provides an interactive dashboard for exploring datasets, monitoring ETL jobs, and managing worker nodes across both local Amp instances and the public dataset registry. + +## Quick Reference + +| Need to... | Look at... | +|------------|-----------| +| Understand overall architecture | [Architecture](#architecture) section | +| Modify UI rendering | `ui.rs` (~1,285 lines) | +| Add new keyboard shortcuts | `main.rs` event handling (~722 lines) | +| Change application state | `app.rs` (~1,035 lines) | +| Add registry API features | `registry.rs` (~247 lines) | +| Modify configuration | `config.rs` (~50 lines) | + +## Architecture + +### Module Structure + +``` +ampcc/ +├── main.rs (722 lines) # Event loop & input handling +├── app.rs (1,035 lines) # Application state & business logic +├── ui.rs (1,285 lines) # Ratatui rendering +├── config.rs (50 lines) # Configuration management +└── registry.rs (247 lines) # Registry API client +``` + +### Data Flow + +``` +User Input (keyboard/mouse) + ↓ +main.rs (event loop) + ↓ +app.rs (state updates) + ↓ +Async Tasks (background HTTP requests) + ↓ +mpsc channel (results back to main loop) + ↓ +ui.rs (render updated state) + ↓ +Terminal (display) +``` + +### External Integration + +``` +ampcc (TUI) + ├── admin-client → Local Amp Instance + │ └── HTTP API: Jobs, Workers, Datasets, Manifests + │ └── Endpoints: localhost:1610 (default) + │ + └── registry.rs → Public Registry API + └── HTTPS API: Datasets, Versions, Manifests + └── Endpoint: api.registry.amp.staging.thegraph.com (default) +``` + +## Key Components + +### 1. Application State (`app.rs`) + +**Purpose**: Core business logic and state management + +**Key Types**: +- `App` - Main application state struct + - Holds UI state, data, and clients + - Manages dataset lists, job lists, worker lists + - Tracks scroll positions, selection indices + - Contains search state and error messages + +- `ActivePane` - Focus tracking enum + - `Header`, `Datasets`, `Jobs`, `Workers`, `Manifest`, `Schema`, `Detail` + - Determines which section responds to keyboard input + - Used by UI to highlight active section + +- `ContentView` - What's displayed in content pane + - `DatasetManifest` - JSON manifest with syntax highlighting + - `DatasetSchema` - Parsed Arrow type information + - `JobDetail` - Selected job information + - `WorkerDetail` - Selected worker node details + +- `DataSource` - Operating mode + - `Local` - Connected to local Amp admin API + - `Registry` - Connected to public dataset registry + +- `DatasetEntry` - Unified dataset representation + - Works for both Local and Registry sources + - Contains: namespace, name, description, URL + - Tracks expansion state (collapsed/expanded with versions) + - Version list populated on demand + +- `VersionEntry` - Dataset version information + - Version string, status, timestamp + - Optional digest for validation + +- `InspectResult` - Parsed schema from manifest + - Extracts Arrow field definitions + - Formats complex types for display + +**Key Behaviors**: +- `fetch_datasets()` - Load from Local or Registry source +- `fetch_manifest()` - Retrieve dataset manifest JSON +- `inspect_manifest()` - Parse Arrow schema from manifest +- `toggle_dataset()` - Expand/collapse version list +- `get_selected_dataset()` - Map flat index to dataset +- `filter_datasets()` - Search by keyword +- `switch_source()` - Toggle between Local and Registry + +### 2. Event Loop (`main.rs`) + +**Purpose**: Terminal setup, input handling, async task coordination + +**Key Features**: + +**Terminal Management**: +- Raw mode enable/disable for direct keyboard control +- Alternate screen buffer for clean exit +- Mouse capture for click handling +- Panic handler ensures terminal restoration + +**Event Handling**: +- Non-blocking event polling with `crossterm::event::poll()` +- Keyboard shortcut processing +- Mouse click detection for pane switching +- Async channel for background task results + +**Async Task Spawning**: +- `spawn_fetch_manifest(dataset_ref)` - Get dataset manifest +- `spawn_fetch_jobs()` - List jobs from admin API +- `spawn_fetch_workers()` - List worker nodes +- `spawn_fetch_worker_detail(node_id)` - Get detailed worker info +- `spawn_stop_job(job_id)` - Stop a running job +- `spawn_delete_job(job_id)` - Delete a terminal job + +**Auto-Refresh**: +- 10-second interval for jobs/workers in Local mode +- Controlled by `last_refresh` timestamp +- Only refreshes when in Local source + +**Keyboard Shortcuts**: + +| Key | Action | Notes | +|-----|--------|-------| +| `q` / `Ctrl+C` | Quit application | Restores terminal (cancels streaming if active) | +| `1` | Switch to Local source | Loads local datasets | +| `2` | Switch to Registry source | Loads registry datasets | +| `/` | Enter search mode | Search datasets by keyword | +| `Esc` | Exit search mode | Clear search filter | +| `r` | Refresh | Context-sensitive refresh | +| `Tab` | Next pane | Cycles through panes | +| `Shift+Tab` | Previous pane | Reverse cycle | +| `↑` / `k` | Move up | Navigate lists | +| `↓` / `j` | Move down | Navigate lists | +| `Ctrl+U` | Page up | Half-page scroll | +| `Ctrl+D` | Page down | Half-page scroll | +| `Enter` | Select/Expand | Expand dataset or show details | +| `s` | Stop job | Only in Jobs pane, for running jobs | +| `d` | Delete job | Only in Jobs pane, for terminal jobs | +| `Q` | Open SQL query panel | In Datasets pane (Local mode only) | +| `Ctrl+Enter` | Execute query | In Query input mode | +| `Ctrl+R` | History search | Reverse search through query history | +| `Ctrl+F` | Favorites panel | Open/close favorites panel | +| `*` / `F` | Toggle favorite | Add/remove current query from favorites | +| `T` | Template picker | Open query template selection | +| `E` | Export results | Export query results to CSV | +| `s` | Sort by column | In Query Results pane | +| `S` | Clear sort | In Query Results pane | + +**Mouse Support**: +- Click to switch active pane +- Click to select items in lists +- Automatic focus management + +### 3. UI Rendering (`ui.rs`) + +**Purpose**: All terminal UI rendering using Ratatui framework + +**Color Theme** (The Graph's official palette): +- Primary: Purple `#6F4CFF`, Dark `#0C0A1D`, Gray `#494755` +- Secondary: Blue `#4C66FF`, Aqua `#66D8FF`, Green `#4BCA81`, Pink `#FF79C6`, Yellow `#FFA801` +- Status: Green for success, red for errors, yellow for warnings + +**Layout Structure**: +``` +┌─────────────────────────────────────────────┐ +│ Header (source info, status) │ ~3 lines +├─────────────┬───────────────────────────────┤ +│ │ │ +│ Sidebar │ Content Pane │ Main area +│ (35%) │ (65%) │ (remaining height) +│ │ │ +│ - Datasets │ - Dataset Manifest │ +│ - Jobs │ - Dataset Schema │ +│ - Workers │ - Job Details │ +│ │ - Worker Details │ +├─────────────┴───────────────────────────────┤ +│ Footer (help text, status messages) │ 1 line +└─────────────────────────────────────────────┘ +``` + +**Key Rendering Functions**: + +- `draw(frame, app)` - Main entry point + - Lays out header/main/footer sections + - Routes to splash or main view + +- `draw_header(frame, area, app)` - Status bar + - Shows current source (Local/Registry) + - Displays error messages if present + - Source indicators with color coding + +- `draw_splash(frame, area)` - Welcome screen + - The Graph ASCII logo + - Instructions to switch sources + - Displayed until data loads + +- `draw_main(frame, area, app)` - Main layout manager + - Splits into sidebar (35%) and content (65%) + - Routes to Local or Registry sidebar + +- `draw_sidebar_local(frame, area, app)` - Local mode sidebar + - Three sections: Datasets, Jobs, Workers + - Dynamic height allocation + - Scrollable lists with highlighting + +- `draw_sidebar_registry(frame, area, app)` - Registry mode sidebar + - Single section: Datasets only + - Full height for dataset list + +- `draw_datasets_section(frame, area, app, is_active)` - Dataset list + - Expandable tree view (▾ expanded, ▸ collapsed) + - Shows versions when expanded + - Indented version entries + - Search filtering with match count + +- `draw_jobs_section(frame, area, app, is_active)` - Job list + - Status icons: ▶ Running, ◷ Pending, ✓ Success, ✗ Failed + - Color-coded status (green/yellow/red) + - Job ID and dataset reference + +- `draw_workers_section(frame, area, app, is_active)` - Worker list + - Node ID display + - Status indicator + - Selection highlighting + +- `draw_content(frame, area, app)` - Content display + - Routes to manifest, schema, job detail, or worker detail + - Scrollable content with scrollbar + - Syntax highlighting for JSON + - Formatted table for schemas + +- `draw_footer(frame, area, app)` - Status/help text + - Search mode indicator + - Context-sensitive help text + - Status messages + +**UI Patterns**: + +- **Active Pane Highlighting**: Aqua border when focused, gray when not +- **Scrollbar Indicators**: Shows position in long content +- **Responsive Layout**: Adjusts to terminal size +- **Smart Selection**: Highlights selected items with different background +- **Status Icons**: Visual feedback for job states +- **Expandable Lists**: Toggle visibility of child items +- **Search Highlighting**: Shows match count in header + +### 4. Configuration Management (`config.rs`) + +**Purpose**: Load and manage application configuration + +**Configuration Sources** (priority order): +1. Configuration file: `~/.config/ampcc/config.toml` +2. Environment variables: `AMP_CC_*` prefix + +**Configuration Options**: + +```rust +pub struct Config { + /// gRPC endpoint for local query service + /// Default: "grpc://localhost:1602" + /// Env: AMP_CC_LOCAL_QUERY_URL + pub local_query_url: String, + + /// HTTP endpoint for local admin API + /// Default: "http://localhost:1610" + /// Env: AMP_CC_LOCAL_ADMIN_URL + pub local_admin_url: String, + + /// HTTPS endpoint for public registry API + /// Default: "https://api.registry.amp.staging.thegraph.com" + /// Env: AMP_CC_REGISTRY_URL + pub registry_url: String, + + /// Which source to load on startup ("local" or "registry") + /// Default: "registry" + /// Env: AMP_CC_DEFAULT_SOURCE + pub default_source: String, +} +``` + +**Example Configuration File**: + +```toml +# ~/.config/ampcc/config.toml +local_query_url = "grpc://localhost:1602" +local_admin_url = "http://localhost:1610" +registry_url = "https://api.registry.amp.staging.thegraph.com" +default_source = "registry" +``` + +**Loading with Figment**: +- Merges configuration from multiple sources +- Environment variables override file values +- Provides sensible defaults for all options + +### 5. Registry API Client (`registry.rs`) + +**Purpose**: HTTP client for the public Amp dataset registry + +**Key Types**: + +- `RegistryClient` - HTTP client wrapper + - Built on `reqwest::Client` + - Optional auth token support + - Base URL configuration + +- `RegistryDataset` - Dataset from registry API + - Namespace, name, description + - URLs, metadata + - Version count, latest version + +- `RegistryVersion` - Version information + - Version string, digest + - Created timestamp + - Size, file count + +- `RegistryError` - Error handling + - HTTP errors, JSON parsing errors + - Connection failures + +**API Methods**: + +```rust +impl RegistryClient { + /// Create new client with base URL + pub fn new(base_url: impl Into) -> Result + + /// Create client with auth token + pub fn with_auth_token(base_url: impl Into, token: String) -> Result + + /// List datasets (paginated) + pub async fn list_datasets(&self, page: u32) -> Result, RegistryError> + + /// Search datasets by keyword + pub async fn search_datasets(&self, query: &str) -> Result, RegistryError> + + /// Get versions for a dataset + pub async fn get_versions(&self, namespace: &str, name: &str) -> Result, RegistryError> + + /// Get manifest JSON for a dataset version + pub async fn get_manifest(&self, namespace: &str, name: &str, version: &str) -> Result + + /// Test connection to registry + pub async fn test_connection(&self) -> Result<(), RegistryError> + + /// Auto-load auth token from file system + pub fn load_auth_token() -> Option +} +``` + +**Authentication**: +- Reads token from `AMP_AUTH_TOKEN` environment variable +- Falls back to `~/.amp/cache/amp_cli_auth` file +- Sends as Bearer token in Authorization header +- Optional - client works without auth for public datasets + +**API Endpoints**: +- `GET /datasets?page={page}` - List datasets +- `GET /datasets/search?q={query}` - Search datasets +- `GET /datasets/{namespace}/{name}/versions` - List versions +- `GET /datasets/{namespace}/{name}/versions/{version}/manifest` - Get manifest + +## Dependencies + +### External Crates + +| Crate | Purpose | Notes | +|-------|---------|-------| +| `ratatui` (0.30) | Terminal UI framework | Core rendering engine | +| `crossterm` (0.29.0) | Terminal control | Event handling, raw mode | +| `tokio` | Async runtime | Background tasks | +| `reqwest` | HTTP client | Registry API calls | +| `serde` / `serde_json` | Serialization | JSON parsing | +| `figment` | Configuration | Multi-source config loading | +| `thiserror` | Error handling | Error type derivation | +| `anyhow` | Error handling | Error context | +| `directories` (6.0) | Path resolution | Config file location | +| `urlencoding` (2.1) | URL encoding | Search query encoding | + +### Internal Amp Crates + +| Crate | Purpose | Types/Functions Used | +|-------|---------|---------------------| +| `admin-client` | Admin API client | `Client`, `JobInfo`, `WorkerInfo`, `WorkerDetailResponse` | +| `worker` | Worker types | `JobId`, `NodeId` | +| `datasets-common` | Dataset types | `Reference`, `Revision` | + +## Development Guidelines + +### Adding New Features + +**1. Adding a Keyboard Shortcut**: +- Edit `main.rs` in the event handling section +- Add case to `match event` block +- Update `draw_footer()` in `ui.rs` with help text +- Consider context sensitivity (which pane is active) + +**2. Adding a New Pane**: +- Add variant to `ActivePane` enum in `app.rs` +- Add tab navigation logic in `main.rs` +- Create `draw_xxx_section()` function in `ui.rs` +- Update layout in `draw_sidebar_local()` or `draw_main()` + +**3. Adding Registry API Feature**: +- Add method to `RegistryClient` in `registry.rs` +- Define response types with `#[derive(Deserialize)]` +- Handle errors with `RegistryError` +- Add async spawn function in `main.rs` +- Update `App` state in `app.rs` to store results + +**4. Modifying UI Layout**: +- Edit `ui.rs` rendering functions +- Use Ratatui's `Layout::default()` for splitting +- Maintain color theme constants at top of file +- Test with different terminal sizes + +### Testing Locally + +**Prerequisites**: +- Local Amp instance running (ampd, controller, workers) +- Admin API accessible at `http://localhost:1610` (or configured URL) +- Optional: Auth token for registry access + +**Running**: +```bash +# Run with default config +cargo run -p ampcc + +# Run with custom config +AMP_CC_LOCAL_ADMIN_URL=http://localhost:8080 cargo run -p ampcc + +# Run with auth token +AMP_AUTH_TOKEN=your_token_here cargo run -p ampcc +``` + +**Testing Checklist**: +- [ ] Both sources (Local and Registry) load data +- [ ] Dataset expansion shows versions +- [ ] Manifest and schema display correctly +- [ ] Jobs list updates (if Local mode) +- [ ] Workers list updates (if Local mode) +- [ ] Search filtering works +- [ ] Keyboard navigation responsive +- [ ] Mouse clicks work +- [ ] Auto-refresh works (10-second interval) +- [ ] Job stop/delete operations work +- [ ] Terminal restores properly on exit + +### Code Style + +**Follow Amp project conventions**: +- Use `/code-format` skill after editing +- Use `/code-check` skill for compilation and clippy +- Fix all clippy warnings before committing +- Use `thiserror` for error types +- Use `anyhow` for error context in functions +- Prefer `async/await` over callbacks + +**UI-Specific Conventions**: +- Keep color constants at top of `ui.rs` +- Use descriptive function names: `draw_xxx_section()` +- Separate layout logic from rendering logic +- Document complex UI calculations + +### Performance Considerations + +**Efficient Rendering**: +- Only redraw when `needs_redraw` flag is true +- Reset flag after successful render +- Avoid unnecessary allocations in render functions + +**Async Task Management**: +- Spawn tasks for HTTP requests (don't block UI) +- Use channels for result communication +- Handle task failures gracefully + +**State Management**: +- Lazy load version lists (only when dataset expanded) +- Cache manifest and schema results +- Clear stale data when switching sources + +## Common Tasks + +### Debugging Connection Issues + +**Local Admin API not responding**: +1. Check if ampd is running: `ps aux | grep ampd` +2. Verify admin API port: `netstat -an | grep 1610` +3. Test endpoint: `curl http://localhost:1610/api/datasets` +4. Check config: `cat ~/.config/ampcc/config.toml` +5. Override with env var: `AMP_CC_LOCAL_ADMIN_URL=http://localhost:8080` + +**Registry API failing**: +1. Test connection: `curl https://api.registry.amp.staging.thegraph.com/datasets` +2. Check auth token: `cat ~/.amp/cache/amp_cli_auth` +3. Set token manually: `AMP_AUTH_TOKEN=your_token` +4. Review error message in ampcc header + +### Adding Custom Color Theme + +Edit color constants in `ui.rs`: + +```rust +const COLOR_PRIMARY: Color = Color::Rgb(111, 76, 255); // #6F4CFF +const COLOR_SECONDARY: Color = Color::Rgb(76, 102, 255); // #4C66FF +// ... modify as needed +``` + +### Extending Dataset Display + +To show additional dataset metadata: + +1. Update `DatasetEntry` struct in `app.rs` with new fields +2. Modify `fetch_datasets()` to populate new fields from API +3. Update `draw_datasets_section()` in `ui.rs` to render new fields +4. Adjust layout to accommodate additional text + +## Architecture Patterns + +### Async Task Pattern + +All HTTP requests follow this pattern: + +```rust +// 1. Spawn async task +let tx = tx.clone(); +let client = client.clone(); +tokio::spawn(async move { + // 2. Make HTTP request + let result = client.some_request().await; + + // 3. Send result back via channel + let _ = tx.send(AppEvent::SomeResult(result)).await; +}); + +// 4. In main loop, receive result +match rx.recv().await { + Some(AppEvent::SomeResult(result)) => { + // 5. Update app state + app.update_from_result(result); + needs_redraw = true; + } + // ... +} +``` + +**Benefits**: +- UI never blocks +- Multiple requests can run concurrently +- Clean separation of async and UI code + +### State Update Pattern + +All state modifications go through `App` methods: + +```rust +impl App { + // Encapsulate state changes + pub fn set_datasets(&mut self, datasets: Vec) { + self.datasets = datasets; + self.datasets_selected = 0; + self.datasets_scroll_state = ScrollState::default(); + } + + // Validate state consistency + pub fn select_dataset(&mut self, index: usize) { + if index < self.datasets.len() { + self.datasets_selected = index; + } + } +} +``` + +**Benefits**: +- Single source of truth +- State consistency guaranteed +- Easier to debug and test + +### Error Handling Pattern + +Use `Result` types throughout: + +```rust +// Define error types with thiserror +#[derive(Debug, thiserror::Error)] +pub enum RegistryError { + #[error("HTTP request failed: {0}")] + RequestFailed(#[from] reqwest::Error), + + #[error("JSON parse error: {0}")] + JsonError(#[from] serde_json::Error), +} + +// Return errors, don't panic +pub async fn fetch_data(&self) -> Result { + let response = self.client.get(url).send().await?; + let data = response.json().await?; + Ok(data) +} + +// Handle errors in main loop +match fetch_data().await { + Ok(data) => app.set_data(data), + Err(e) => app.set_error(format!("Failed: {}", e)), +} +``` + +**Benefits**: +- Explicit error handling +- Clean error propagation with `?` +- User-friendly error messages + +## SQL Query Panel + +The SQL Query Panel allows users to execute SQL queries against datasets in Local mode. + +### Features + +**Query Input**: +- Multi-line SQL input with syntax highlighting +- Enter for newlines, Ctrl+Enter to execute +- Query history navigation with Up/Down arrows +- Reverse history search with Ctrl+R +- Query templates with `T` key + +**Query Results**: +- Streaming results display with progressive loading +- Results appear as batches arrive from Flight service +- Real-time row count updates during streaming +- Ctrl+C cancels streaming query mid-execution +- Sortable columns with `s` key + column number +- Export to CSV with `E` key +- Auto-sized column widths based on content + +**Favorites**: +- Toggle favorite with `*` or `F` key +- View favorites panel with Ctrl+F +- Persistent storage to `~/.config/ampcc/favorites.json` + +### Streaming Query Execution + +Queries execute via Flight protocol with streaming results: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Query Results (streaming... 1,234 rows) [Ctrl+C to cancel] │ +├─────────────────────────────────────────────────────────────┤ +│ block_number │ hash │ +│ 1000001 │ 0xabc123... │ +│ 1000002 │ 0xdef456... │ +│ ... (streaming) │ +└─────────────────────────────────────────────────────────────┘ +``` + +**State during streaming**: +- `app.query_streaming`: true while results are streaming +- `app.query_cancel_token`: CancellationToken for Ctrl+C cancellation +- `results.streaming`: true while batches are arriving +- `results.stream_complete`: true when all batches received + +**Events**: +- `QueryColumnsReceived`: First batch, initializes columns +- `QueryBatchReceived`: Subsequent batches, appends rows +- `QueryStreamComplete`: All batches received +- `QueryCancelled`: User cancelled with Ctrl+C + +### History Storage + +Query history persisted to `~/.config/ampcc/history.json`: +- Per-dataset history with global fallback +- Automatic save on exit, load on startup +- Search with Ctrl+R + +## Known Limitations + +1. **No pagination UI** - Registry datasets fetched one page at a time, no UI for next/previous page +2. **Search is client-side** - Filters loaded datasets, doesn't use registry search API +3. **No worker detail in Registry mode** - Worker management only works with Local source +4. **No job creation UI** - Can only monitor/stop/delete existing jobs +5. **No configuration UI** - Must edit config file or set env vars manually +6. **Limited schema parsing** - Only handles Arrow types, doesn't show full dataset metadata + +## Future Enhancement Ideas + +- [ ] Add pagination controls for registry datasets +- [ ] Server-side search using registry API +- [ ] Job creation wizard (select dataset, configure params) +- [ ] Configuration editor (interactive TUI for settings) +- [ ] Enhanced schema viewer (show constraints, metadata) +- [ ] Job logs viewer (tail job output in real-time) +- [ ] Worker metrics dashboard (CPU, memory, job count) +- [ ] Export dataset list to CSV/JSON +- [ ] Bookmark favorite datasets +- [ ] Multi-source comparison view + +## Troubleshooting + +### Terminal Display Issues + +**Symptoms**: Garbled text, incorrect colors, layout broken +**Solutions**: +- Ensure terminal supports 256 colors: `echo $TERM` should show `xterm-256color` or similar +- Resize terminal and press `r` to refresh +- Try different terminal emulator (iTerm2, Alacritty, etc.) + +### Compilation Errors + +**Missing dependencies**: +```bash +# Update dependencies +cargo update -p ampcc + +# Rebuild from scratch +cargo clean && cargo build -p ampcc +``` + +**Clippy warnings**: +```bash +# Run clippy +just check + +# Auto-fix some warnings +cargo clippy --fix -p ampcc +``` + +### Runtime Panics + +**Terminal not restored**: +- Force restore: `reset` command in shell +- Or: Close terminal and open new one + +**Panic on startup**: +- Check config file syntax: `cat ~/.config/ampcc/config.toml` +- Verify URLs are well-formed +- Check auth token file exists: `ls ~/.amp/cache/amp_cli_auth` + +## Related Documentation + +- **Project Architecture**: `/docs/architecture.md` +- **Admin API Spec**: `/docs/openapi-specs/admin-api.yaml` +- **Dataset Patterns**: `/.patterns/datasets/` +- **Service Patterns**: `/.patterns/services-pattern.md` diff --git a/crates/bin/ampcc/Cargo.toml b/crates/bin/ampcc/Cargo.toml index afc6a4790..ed4bd1474 100644 --- a/crates/bin/ampcc/Cargo.toml +++ b/crates/bin/ampcc/Cargo.toml @@ -20,3 +20,9 @@ serde_json.workspace = true reqwest.workspace = true thiserror.workspace = true urlencoding = "2.1" +amp-client = { path = "../../clients/flight" } +arrow.workspace = true +futures.workspace = true +hex = "0.4" +chrono = { version = "0.4", features = ["serde"] } +csv = "1.3" diff --git a/crates/bin/ampcc/docs/query-panel.md b/crates/bin/ampcc/docs/query-panel.md new file mode 100644 index 000000000..9618fae96 --- /dev/null +++ b/crates/bin/ampcc/docs/query-panel.md @@ -0,0 +1,232 @@ +# SQL Query Panel + +The SQL Query Panel in ampcc allows you to run SQL queries against local Amp datasets directly from the TUI. + +> **Note**: This feature is only available in **Local mode** (press `1` to switch to Local mode). + +## Getting Started + +1. Switch to Local mode by pressing `1` +2. Select a dataset from the sidebar +3. Press `Q` to enter query mode + +A query input panel will appear with a template query like: +```sql +SELECT * FROM namespace.dataset LIMIT 10 +``` + +## Multi-line Query Input + +The query panel supports multi-line SQL queries, making it easy to write formatted, readable queries. + +### Keyboard Shortcuts + +| Key | Action | +|-----|--------| +| `Enter` | Insert a new line | +| `Ctrl+Enter` | Execute the query | +| `Esc` | Cancel and return to normal mode | +| `↑` / `↓` | Navigate between lines (or browse history at first/last line) | +| `←` / `→` | Move cursor (wraps across line boundaries) | +| `Home` | Jump to start of current line | +| `End` | Jump to end of current line | +| `Backspace` | Delete before cursor (joins lines at line start) | +| `Delete` | Delete at cursor (joins lines at line end) | + +### Example + +```sql +SELECT + block_number, + transaction_hash, + gas_used +FROM ethereum.transactions +WHERE block_number > 1000000 +ORDER BY gas_used DESC +LIMIT 100 +``` + +The input area automatically expands (up to 10 lines) to accommodate your query. + +## Syntax Highlighting + +SQL queries are automatically highlighted with colors to improve readability: + +| Element | Color | Examples | +|---------|-------|----------| +| Keywords | Purple | `SELECT`, `FROM`, `WHERE`, `JOIN`, `ORDER BY` | +| Strings | Green | `'hello'`, `"world"` | +| Numbers | Yellow | `100`, `3.14` | +| Operators | Gray | `=`, `<`, `>`, `+`, `-` | +| Identifiers | White | table names, column names | + +Highlighting updates as you type and works with both single and multi-line queries. + +## Query History + +Your queries are saved automatically and can be recalled: + +- Press `↑` at the first line to browse older queries +- Press `↓` at the last line to browse newer queries +- Press `Ctrl+R` to search history by keyword + - Type to filter matches + - Press `Ctrl+R` again to cycle through matches + - Press `Enter` to accept, `Esc` to cancel + +### Per-Dataset History + +History is organized **per-dataset** to keep your queries contextually relevant: + +| Scenario | History Shown | +|----------|---------------| +| Dataset has queries | Dataset-specific history | +| New dataset (no history) | Empty (start fresh) | +| No dataset selected | Global history | + +**How it works:** +- Each dataset (`namespace.name`) maintains its own isolated history +- When you execute a query, it's saved to the current dataset's history +- Switching datasets shows that dataset's history (or empty if new) +- Each dataset builds up its own relevant query history over time + +**Example workflow:** +1. Select `eth.blocks`, press `Q` to enter query mode +2. Run `SELECT * FROM eth.blocks LIMIT 10` +3. Press `↑` → See the query you just ran +4. Switch to `eth.logs` (new dataset) +5. Press `↑` → Empty history (fresh start for this dataset) +6. Run `SELECT * FROM eth.logs WHERE topic = '...'` +7. Press `↑` → See only `eth.logs` queries +8. Switch back to `eth.blocks`, press `↑` → See only `eth.blocks` queries + +### Persistent History + +History is saved to `~/.config/ampcc/history.json` and persists across sessions: + +```json +{ + "version": 2, + "history": [...], // Global history + "dataset_history": { // Per-dataset history + "eth.blocks": [...], + "eth.logs": [...] + } +} +``` + +- Maximum 100 entries per dataset and 100 in global history +- Consecutive duplicate queries are not saved +- History loads on startup, saves on quit + +## Query Templates + +Press `T` while in query mode to open the template picker - a popup with common SQL patterns that auto-fill based on your selected dataset. + +### Available Templates + +| Template | Pattern | Description | +|----------|---------|-------------| +| Preview data | `SELECT * FROM {table} LIMIT 10` | Quick look at sample rows | +| Row count | `SELECT COUNT(*) FROM {table}` | Total row count | +| Filter by column | `SELECT * FROM {table} WHERE {column} = '?'` | Filter with placeholder | +| Group by | `SELECT {column}, COUNT(*) FROM {table} GROUP BY {column}` | Aggregate by column | +| Unique values | `SELECT DISTINCT {column} FROM {table}` | List distinct values | +| Table schema | `DESCRIBE {table}` | Show table structure | + +### Using the Template Picker + +| Key | Action | +|-----|--------| +| `T` | Open template picker | +| `↑` / `↓` | Navigate templates | +| `Enter` | Select and insert template | +| `Esc` | Close without inserting | + +### Placeholder Resolution + +Templates contain placeholders that are automatically resolved: + +| Placeholder | Replaced With | Example | +|-------------|---------------|---------| +| `{table}` | Selected dataset as `namespace.name` | `ethereum.blocks` | +| `{column}` | First column from the dataset schema | `block_number` | + +**Example workflow:** +1. Select `ethereum.transactions` in the sidebar +2. Press `Q` to enter query mode +3. Press `T` to open template picker +4. Select "Group by" template +5. The query is inserted as: `SELECT gas_used, COUNT(*) FROM ethereum.transactions GROUP BY gas_used` + +The picker shows a **live preview** of the resolved SQL for each template, so you can see exactly what will be inserted before selecting. + +## Favorite Queries + +Save frequently-used queries as favorites: + +- Press `*` or `F` to toggle the current query as a favorite +- Press `Ctrl+F` to open the favorites panel +- Press `d` in the favorites panel to delete a favorite + +Favorites are persisted to `~/.config/ampcc/favorites.json`. + +## Query Results + +Results are displayed in a scrollable table below the query input: + +| Key | Action | +|-----|--------| +| `j` / `↓` | Scroll down | +| `k` / `↑` | Scroll up | +| `Ctrl+D` | Page down | +| `Ctrl+U` | Page up | +| `E` | Export results to CSV | +| `s` + `1-9` | Sort by column number | +| `S` | Clear sort (show original order) | +| `Q` | Edit query | + +### Exporting Results + +Press `E` to export the current results to a CSV file. The file is saved to the current directory with a timestamped filename like `query_results_20260119_143052.csv`. + +### Sorting Results + +Column headers display numbers like `[1] column_name`, `[2] column_name` to identify sortable columns. + +**How to sort:** + +1. Press `s` to enter sort mode (footer shows "Press 1-9 for column") +2. Press a digit `1-9` to sort by that column +3. Press the same column again to toggle between ascending (▲) and descending (▼) +4. Press `S` to clear sorting and restore original order + +**Sort indicators:** + +- Column headers show `▲` for ascending sort, `▼` for descending +- The title bar shows the current sort state: `"sorted by column_name ▲"` + +**Sort behavior:** + +- Numeric columns are sorted numerically (e.g., 2 < 10, not "10" < "2") +- Text columns are sorted alphabetically (case-sensitive) +- NULL and empty values are sorted to the end +- Sorting is performed client-side on the displayed results + +### Column Sizing + +Columns are automatically sized based on their content: + +- Width is calculated from header names and data values +- Minimum column width: 5 characters +- Maximum column width: 50 characters +- When columns exceed available space, they scale proportionally +- Narrow columns (IDs, booleans) get less space +- Wider columns (hashes, descriptions) get more space + +## Tips + +- Use `Tab` to cycle between Query input and Query results panes +- Long cell values are truncated in the display but exported in full +- NULL values sort to the end when sorting columns +- The title bar shows the current line count and history position +- SQL keywords are highlighted regardless of case (`select`, `SELECT`, `Select` all work) diff --git a/crates/bin/ampcc/src/app.rs b/crates/bin/ampcc/src/app.rs index 3cd1a1be9..62043820c 100644 --- a/crates/bin/ampcc/src/app.rs +++ b/crates/bin/ampcc/src/app.rs @@ -1,6 +1,11 @@ //! Application state and business logic. -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{ + collections::HashMap, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, +}; use admin_client::{ Client, @@ -13,11 +18,69 @@ use url::Url; use crate::{config::Config, registry::RegistryClient}; +/// Cursor position in multi-line text. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct TextPosition { + pub line: usize, + pub column: usize, +} + +/// The history file structure for persistence. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct HistoryFile { + pub dataset_history: HashMap>, +} + +/// The favorites file structure for persistence. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct FavoritesFile { + pub version: u32, + pub favorites: Vec, +} + +/// A SQL query template. +#[derive(Debug, Clone)] +pub struct QueryTemplate { + /// The template pattern with placeholders. + pub pattern: &'static str, + /// Short description of the template. + pub description: &'static str, +} + +/// Available query templates. +pub const QUERY_TEMPLATES: &[QueryTemplate] = &[ + QueryTemplate { + pattern: "SELECT * FROM {table} LIMIT 10", + description: "Preview data", + }, + QueryTemplate { + pattern: "SELECT COUNT(*) FROM {table}", + description: "Row count", + }, + QueryTemplate { + pattern: "SELECT * FROM {table} WHERE {column} = '?'", + description: "Filter by column", + }, + QueryTemplate { + pattern: "SELECT {column}, COUNT(*) FROM {table} GROUP BY {column}", + description: "Group by", + }, + QueryTemplate { + pattern: "SELECT DISTINCT {column} FROM {table}", + description: "Unique values", + }, + QueryTemplate { + pattern: "DESCRIBE {table}", + description: "Table schema", + }, +]; + /// Input mode for the application. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum InputMode { Normal, Search, + Query, } /// Active pane for focus tracking. @@ -27,17 +90,20 @@ pub enum InputMode { pub enum ActivePane { Header, Datasets, - Jobs, // Local only - Workers, // Local only - Manifest, // Dataset manifest pane (content area) - Schema, // Dataset schema pane (content area) - Detail, // Job/Worker detail view (Local only) + Jobs, // Local only + Workers, // Local only + Manifest, // Dataset manifest pane (content area) + Schema, // Dataset schema pane (content area) + Detail, // Job/Worker detail view (Local only) + Query, // SQL query input pane (Local only) + QueryResult, // SQL query results pane (Local only) } impl ActivePane { /// Cycle to the next pane. /// Local mode: Header -> Datasets -> Jobs -> Workers -> Detail -> Header /// Registry mode: Header -> Datasets -> Manifest -> Schema -> Header + /// Query panes: Query -> QueryResult -> Datasets (exit query mode) pub fn next(self, is_local: bool) -> Self { match self { ActivePane::Header => ActivePane::Datasets, @@ -53,12 +119,16 @@ impl ActivePane { ActivePane::Manifest => ActivePane::Schema, ActivePane::Schema => ActivePane::Detail, ActivePane::Detail => ActivePane::Header, + // Query panes cycle within query view + ActivePane::Query => ActivePane::QueryResult, + ActivePane::QueryResult => ActivePane::Datasets, } } /// Cycle to the previous pane. /// Local mode: Header -> Detail -> Workers -> Jobs -> Datasets -> Header /// Registry mode: Header -> Schema -> Manifest -> Datasets -> Header + /// Query panes: QueryResult -> Query -> Datasets (exit query mode) pub fn prev(self, is_local: bool) -> Self { match self { ActivePane::Header => { @@ -74,6 +144,9 @@ impl ActivePane { ActivePane::Manifest => ActivePane::Datasets, ActivePane::Schema => ActivePane::Manifest, ActivePane::Detail => ActivePane::Workers, + // Query panes cycle within query view + ActivePane::Query => ActivePane::Datasets, + ActivePane::QueryResult => ActivePane::Query, } } } @@ -87,10 +160,21 @@ pub enum ContentView { Job(JobInfo), /// Worker details Worker(WorkerDetailResponse), + /// SQL query results + QueryResults, /// Nothing selected None, } +/// Query results from SQL execution. +#[derive(Debug, Clone, Default)] +pub struct QueryResults { + pub columns: Vec, + pub rows: Vec>, + pub row_count: usize, + pub error: Option, +} + /// Data source for datasets. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum DataSource { @@ -379,6 +463,10 @@ pub struct App { pub loading: bool, pub error_message: Option, + // Success message (auto-expires after 3 seconds) + pub success_message: Option, + pub message_expires: Option, + // Spinner animation state pub spinner_frame: usize, pub loading_message: Option, @@ -398,6 +486,40 @@ pub struct App { // Redraw flag for CPU optimization pub needs_redraw: bool, + + // SQL Query state (Local mode only) + pub query_input: String, + pub query_cursor: TextPosition, + pub query_input_scroll: u16, // Scroll offset for tall query input + pub query_results: Option, + pub query_scroll: u16, + pub query_scroll_state: ScrollbarState, + pub query_content_length: usize, + + // Query history (per-dataset, persisted) + pub query_history: HashMap>, + pub query_history_index: Option, // None = current input, Some(i) = history[i] + pub query_draft: String, // Preserved current input when navigating history + + // History search state (Ctrl+R) + pub history_search_active: bool, + pub history_search_query: String, + pub history_search_matches: Vec, // indices into query_history + pub history_search_index: usize, // index into matches + + // Template picker state + pub template_picker_open: bool, + pub template_picker_index: usize, + + // Result sorting state + pub result_sort_column: Option, + pub result_sort_ascending: bool, + pub result_sort_pending: bool, // true when waiting for column number input + + // Favorite queries state + pub favorite_queries: Vec, + pub favorites_panel_open: bool, + pub favorites_panel_index: usize, } impl App { @@ -437,6 +559,8 @@ impl App { current_inspect: None, loading: false, error_message: None, + success_message: None, + message_expires: None, spinner_frame: 0, loading_message: None, manifest_scroll: 0, @@ -449,6 +573,28 @@ impl App { schema_content_length: 0, detail_content_length: 0, needs_redraw: true, + query_input: String::new(), + query_cursor: TextPosition::default(), + query_input_scroll: 0, + query_results: None, + query_scroll: 0, + query_scroll_state: ScrollbarState::default(), + query_content_length: 0, + query_history: HashMap::new(), + query_history_index: None, + query_draft: String::new(), + history_search_active: false, + history_search_query: String::new(), + history_search_matches: Vec::new(), + history_search_index: 0, + template_picker_open: false, + template_picker_index: 0, + result_sort_column: None, + result_sort_ascending: true, + result_sort_pending: false, + favorite_queries: Vec::new(), + favorites_panel_open: false, + favorites_panel_index: 0, }) } @@ -485,6 +631,24 @@ impl App { self.loading_message = None; } + /// Set a success message that auto-expires after 3 seconds. + pub fn set_success_message(&mut self, msg: String) { + self.success_message = Some(msg); + self.message_expires = Some(Instant::now() + Duration::from_secs(3)); + // Clear any error message when showing success + self.error_message = None; + } + + /// Tick message expiration - call in main loop. + pub fn tick_messages(&mut self) { + if let Some(expires) = self.message_expires + && Instant::now() > expires + { + self.success_message = None; + self.message_expires = None; + } + } + /// Scroll up in the focused pane. pub fn scroll_up(&mut self) { match self.active_pane { @@ -506,7 +670,16 @@ impl App { .detail_scroll_state .position(self.detail_scroll as usize); } - ActivePane::Header | ActivePane::Datasets | ActivePane::Jobs | ActivePane::Workers => {} + ActivePane::QueryResult => { + self.query_scroll = self.query_scroll.saturating_sub(1); + self.query_scroll_state = + self.query_scroll_state.position(self.query_scroll as usize); + } + ActivePane::Header + | ActivePane::Datasets + | ActivePane::Jobs + | ActivePane::Workers + | ActivePane::Query => {} } } @@ -540,7 +713,19 @@ impl App { .position(self.detail_scroll as usize); } } - ActivePane::Header | ActivePane::Datasets | ActivePane::Jobs | ActivePane::Workers => {} + ActivePane::QueryResult => { + let max_scroll = self.query_content_length.saturating_sub(1); + if (self.query_scroll as usize) < max_scroll { + self.query_scroll = self.query_scroll.saturating_add(1); + self.query_scroll_state = + self.query_scroll_state.position(self.query_scroll as usize); + } + } + ActivePane::Header + | ActivePane::Datasets + | ActivePane::Jobs + | ActivePane::Workers + | ActivePane::Query => {} } } @@ -552,9 +737,12 @@ impl App { self.schema_scroll_state = ScrollbarState::default(); self.detail_scroll = 0; self.detail_scroll_state = ScrollbarState::default(); + self.query_scroll = 0; + self.query_scroll_state = ScrollbarState::default(); self.manifest_content_length = 0; self.schema_content_length = 0; self.detail_content_length = 0; + self.query_content_length = 0; } /// Page up in the focused pane. @@ -578,7 +766,16 @@ impl App { .detail_scroll_state .position(self.detail_scroll as usize); } - ActivePane::Header | ActivePane::Datasets | ActivePane::Jobs | ActivePane::Workers => {} + ActivePane::QueryResult => { + self.query_scroll = self.query_scroll.saturating_sub(page_size); + self.query_scroll_state = + self.query_scroll_state.position(self.query_scroll as usize); + } + ActivePane::Header + | ActivePane::Datasets + | ActivePane::Jobs + | ActivePane::Workers + | ActivePane::Query => {} } } @@ -609,7 +806,17 @@ impl App { .detail_scroll_state .position(self.detail_scroll as usize); } - ActivePane::Header | ActivePane::Datasets | ActivePane::Jobs | ActivePane::Workers => {} + ActivePane::QueryResult => { + let max_scroll = self.query_content_length.saturating_sub(1) as u16; + self.query_scroll = self.query_scroll.saturating_add(page_size).min(max_scroll); + self.query_scroll_state = + self.query_scroll_state.position(self.query_scroll as usize); + } + ActivePane::Header + | ActivePane::Datasets + | ActivePane::Jobs + | ActivePane::Workers + | ActivePane::Query => {} } } @@ -981,9 +1188,43 @@ impl App { async fn fetch_versions(&self, namespace: &str, name: &str) -> Result> { match self.current_source { DataSource::Local => { - // Local doesn't have version listing in admin-client currently - // Return just the latest version if available - Ok(Vec::new()) + use datasets_common::fqn::FullyQualifiedName; + + // Construct FQN for the dataset + let fqn_str = format!("{}/{}", namespace, name); + let fqn: FullyQualifiedName = fqn_str + .parse() + .map_err(|e| anyhow::anyhow!("invalid FQN: {}", e))?; + + // Call admin API to list versions + match self.local_client.datasets().list_versions(&fqn).await { + Ok(versions_response) => { + // Determine which version is the latest + let latest_version = versions_response.special_tags.latest.as_ref(); + + // Map API response to VersionEntry structs + let versions = versions_response + .versions + .into_iter() + .map(|v| { + let is_latest = + latest_version.map(|lv| lv == &v.version).unwrap_or(false); + VersionEntry { + version_tag: v.version.to_string(), + status: "registered".to_string(), + created_at: v.created_at, + is_latest, + } + }) + .collect(); + Ok(versions) + } + Err(e) => { + // Log error but don't crash - graceful degradation + eprintln!("Failed to fetch versions for {}: {}", fqn_str, e); + Ok(Vec::new()) + } + } } DataSource::Registry => { let versions = self.registry_client.get_versions(namespace, name).await?; @@ -1032,4 +1273,493 @@ impl App { } } } + + /// Get the path to the history file. + pub fn history_file_path() -> Result { + let config_dir = directories::ProjectDirs::from("com", "thegraph", "ampcc") + .context("could not determine config directory")? + .config_dir() + .to_path_buf(); + Ok(config_dir.join("history.json")) + } + + /// Load history from disk on startup. + pub fn load_history(&mut self) -> Result<()> { + let path = Self::history_file_path()?; + if !path.exists() { + return Ok(()); // No history yet + } + + let content = std::fs::read_to_string(&path)?; + let file: HistoryFile = serde_json::from_str(&content)?; + self.query_history = file.dataset_history; + + Ok(()) + } + + /// Save history to disk on shutdown. + pub fn save_history(&self) -> Result<()> { + let path = Self::history_file_path()?; + + // Ensure directory exists + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + + // Limit history per dataset + const MAX_HISTORY_PER_DATASET: usize = 100; + let mut limited_history: HashMap> = HashMap::new(); + for (key, queries) in &self.query_history { + let limited: Vec = queries + .iter() + .rev() + .take(MAX_HISTORY_PER_DATASET) + .rev() + .cloned() + .collect(); + if !limited.is_empty() { + limited_history.insert(key.clone(), limited); + } + } + + let file = HistoryFile { + dataset_history: limited_history, + }; + + let content = serde_json::to_string_pretty(&file)?; + std::fs::write(&path, content)?; + + Ok(()) + } + + /// Get the history key for the current dataset. + pub fn current_history_key(&self) -> String { + if let Some((ns, name, _)) = self.get_selected_manifest_params() { + format!("{}.{}", ns, name) + } else { + "global".to_string() + } + } + + /// Get the history for the current dataset. + pub fn current_history(&self) -> Vec { + let key = self.current_history_key(); + self.query_history.get(&key).cloned().unwrap_or_default() + } + + /// Add a query to the current dataset's history. + pub fn add_to_history(&mut self, query: String) { + let key = self.current_history_key(); + let history = self.query_history.entry(key).or_default(); + // Avoid consecutive duplicates + if history.last() != Some(&query) { + history.push(query); + // Limit in-memory history + const MAX_HISTORY: usize = 100; + if history.len() > MAX_HISTORY { + history.remove(0); + } + } + } + + // ======================================================================== + // Multi-line Query Input Helpers + // ======================================================================== + + /// Get the lines of query input. + pub fn query_lines(&self) -> Vec<&str> { + if self.query_input.is_empty() { + vec![""] + } else { + self.query_input.split('\n').collect() + } + } + + /// Get the number of lines in query input. + pub fn query_line_count(&self) -> usize { + if self.query_input.is_empty() { + 1 + } else { + self.query_input.split('\n').count() + } + } + + /// Get the length of a specific line. + pub fn get_line_length(&self, line: usize) -> usize { + self.query_lines().get(line).map(|l| l.len()).unwrap_or(0) + } + + /// Convert TextPosition to byte offset in query_input. + pub fn cursor_to_offset(&self) -> usize { + let mut offset = 0; + for (i, line) in self.query_input.split('\n').enumerate() { + if i == self.query_cursor.line { + return offset + self.query_cursor.column.min(line.len()); + } + offset += line.len() + 1; // +1 for newline + } + self.query_input.len() + } + + /// Convert byte offset to TextPosition. + #[allow(dead_code)] + pub fn offset_to_cursor(input: &str, offset: usize) -> TextPosition { + let mut line = 0; + let mut column = 0; + let mut current = 0; + + for ch in input.chars() { + if current >= offset { + break; + } + if ch == '\n' { + line += 1; + column = 0; + } else { + column += 1; + } + current += ch.len_utf8(); + } + + TextPosition { line, column } + } + + /// Move cursor up one line (returns true if moved). + pub fn cursor_up(&mut self) -> bool { + if self.query_cursor.line > 0 { + self.query_cursor.line -= 1; + let line_len = self.get_line_length(self.query_cursor.line); + self.query_cursor.column = self.query_cursor.column.min(line_len); + true + } else { + false + } + } + + /// Move cursor down one line (returns true if moved). + pub fn cursor_down(&mut self) -> bool { + let line_count = self.query_line_count(); + if self.query_cursor.line < line_count.saturating_sub(1) { + self.query_cursor.line += 1; + let line_len = self.get_line_length(self.query_cursor.line); + self.query_cursor.column = self.query_cursor.column.min(line_len); + true + } else { + false + } + } + + /// Move cursor left, wrapping to previous line if at start. + pub fn cursor_left(&mut self) { + if self.query_cursor.column > 0 { + self.query_cursor.column -= 1; + } else if self.query_cursor.line > 0 { + // Move to end of previous line + self.query_cursor.line -= 1; + self.query_cursor.column = self.get_line_length(self.query_cursor.line); + } + } + + /// Move cursor right, wrapping to next line if at end. + pub fn cursor_right(&mut self) { + let line_len = self.get_line_length(self.query_cursor.line); + if self.query_cursor.column < line_len { + self.query_cursor.column += 1; + } else if self.query_cursor.line < self.query_line_count().saturating_sub(1) { + // Move to start of next line + self.query_cursor.line += 1; + self.query_cursor.column = 0; + } + } + + /// Move cursor to start of current line. + pub fn cursor_home(&mut self) { + self.query_cursor.column = 0; + } + + /// Move cursor to end of current line. + pub fn cursor_end(&mut self) { + self.query_cursor.column = self.get_line_length(self.query_cursor.line); + } + + /// Insert a character at cursor position. + pub fn insert_char(&mut self, c: char) { + let offset = self.cursor_to_offset(); + self.query_input.insert(offset, c); + if c == '\n' { + self.query_cursor.line += 1; + self.query_cursor.column = 0; + } else { + self.query_cursor.column += 1; + } + } + + /// Delete character before cursor (backspace). + /// Returns true if something was deleted. + pub fn backspace(&mut self) -> bool { + if self.query_cursor.column > 0 { + // Delete character in current line + let offset = self.cursor_to_offset(); + self.query_input.remove(offset - 1); + self.query_cursor.column -= 1; + true + } else if self.query_cursor.line > 0 { + // Join with previous line + let offset = self.cursor_to_offset(); + let prev_line_len = self.get_line_length(self.query_cursor.line - 1); + self.query_input.remove(offset - 1); // Remove the newline + self.query_cursor.line -= 1; + self.query_cursor.column = prev_line_len; + true + } else { + false + } + } + + /// Delete character at cursor position (delete key). + /// Returns true if something was deleted. + pub fn delete_char(&mut self) -> bool { + let offset = self.cursor_to_offset(); + if offset < self.query_input.len() { + self.query_input.remove(offset); + true + } else { + false + } + } + + /// Set query input and reset cursor to end. + pub fn set_query_input(&mut self, input: String) { + self.query_input = input; + // Move cursor to end of input + let line_count = self.query_line_count(); + if line_count > 0 { + self.query_cursor.line = line_count - 1; + self.query_cursor.column = self.get_line_length(self.query_cursor.line); + } else { + self.query_cursor = TextPosition::default(); + } + } + + // ======================================================================== + // History Search (Ctrl+R) + // ======================================================================== + + /// Update history search matches based on current search query. + pub fn update_history_search(&mut self) { + let history = self.current_history(); + if self.history_search_query.is_empty() { + // Empty search query matches all history + self.history_search_matches = (0..history.len()).rev().collect(); + } else { + let query_lower = self.history_search_query.to_lowercase(); + self.history_search_matches = history + .iter() + .enumerate() + .filter(|(_, h)| h.to_lowercase().contains(&query_lower)) + .map(|(i, _)| i) + .rev() // Most recent first + .collect(); + } + + // Reset to first match + self.history_search_index = 0; + + // Update query input to show current match + if let Some(&idx) = self.history_search_matches.first() + && let Some(query) = history.get(idx) + { + self.set_query_input(query.clone()); + } + } + + /// Cycle to the next history search match. + pub fn cycle_history_search(&mut self) { + if self.history_search_matches.is_empty() { + return; + } + + self.history_search_index = + (self.history_search_index + 1) % self.history_search_matches.len(); + + let history = self.current_history(); + let idx = self.history_search_matches[self.history_search_index]; + if let Some(query) = history.get(idx) { + self.set_query_input(query.clone()); + } + } + + /// Enter history search mode. + pub fn enter_history_search(&mut self) { + // Save current input as draft before starting search + if !self.history_search_active { + self.query_draft = self.query_input.clone(); + } + self.history_search_active = true; + self.history_search_query.clear(); + self.history_search_matches.clear(); + self.history_search_index = 0; + } + + /// Exit history search mode, accepting current match. + pub fn accept_history_search(&mut self) { + self.history_search_active = false; + self.history_search_query.clear(); + // query_input already has the selected match + } + + /// Cancel history search mode, restoring original input. + pub fn cancel_history_search(&mut self) { + self.history_search_active = false; + self.history_search_query.clear(); + self.set_query_input(self.query_draft.clone()); + } + + /// Resolve template placeholders with current context. + pub fn resolve_template(&self, template: &str) -> String { + let table = self + .current_inspect + .as_ref() + .and_then(|i| i.tables.first()) + .map(|t| t.name.as_str()) + .unwrap_or("table_name"); + + let column = self + .current_inspect + .as_ref() + .and_then(|i| i.tables.first()) + .and_then(|t| t.columns.first()) + .map(|c| c.name.as_str()) + .unwrap_or("column_name"); + + template + .replace("{table}", table) + .replace("{column}", column) + } + + /// Get sorted indices for query results based on current sort state. + /// Returns None if no sort is active, otherwise returns sorted row indices. + pub fn get_sorted_indices(&self) -> Option> { + let col = self.result_sort_column?; + let results = self.query_results.as_ref()?; + + if col >= results.columns.len() { + return None; + } + + let mut indices: Vec = (0..results.rows.len()).collect(); + let ascending = self.result_sort_ascending; + + indices.sort_by(|&a, &b| { + let val_a = results.rows[a].get(col).map(|s| s.as_str()).unwrap_or(""); + let val_b = results.rows[b].get(col).map(|s| s.as_str()).unwrap_or(""); + + // Try numeric comparison first + let cmp = match (val_a.parse::(), val_b.parse::()) { + (Ok(num_a), Ok(num_b)) => num_a + .partial_cmp(&num_b) + .unwrap_or(std::cmp::Ordering::Equal), + _ => val_a.cmp(val_b), + }; + + if ascending { cmp } else { cmp.reverse() } + }); + + Some(indices) + } + + /// Toggle sort on the specified column. + /// If already sorted by this column, reverses direction. + /// If sorted by different column, starts ascending sort on new column. + pub fn toggle_sort(&mut self, col: usize) { + if self.result_sort_column == Some(col) { + // Toggle direction + self.result_sort_ascending = !self.result_sort_ascending; + } else { + // New column, start ascending + self.result_sort_column = Some(col); + self.result_sort_ascending = true; + } + self.result_sort_pending = false; + } + + /// Clear all sorting. + pub fn clear_sort(&mut self) { + self.result_sort_column = None; + self.result_sort_ascending = true; + self.result_sort_pending = false; + } + + /// Get the path to the favorites file. + pub fn favorites_file_path() -> Result { + let config_dir = directories::ProjectDirs::from("com", "thegraph", "ampcc") + .context("could not determine config directory")? + .config_dir() + .to_path_buf(); + Ok(config_dir.join("favorites.json")) + } + + /// Load favorites from disk. + pub fn load_favorites(&mut self) { + if let Ok(path) = Self::favorites_file_path() + && path.exists() + && let Ok(contents) = std::fs::read_to_string(&path) + && let Ok(file) = serde_json::from_str::(&contents) + { + self.favorite_queries = file.favorites; + } + } + + /// Save favorites to disk. + pub fn save_favorites(&self) { + if let Ok(path) = Self::favorites_file_path() { + if let Some(parent) = path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let file = FavoritesFile { + version: 1, + favorites: self.favorite_queries.clone(), + }; + if let Ok(contents) = serde_json::to_string_pretty(&file) { + let _ = std::fs::write(&path, contents); + } + } + } + + /// Check if current query is a favorite. + pub fn is_current_query_favorite(&self) -> bool { + let trimmed = self.query_input.trim(); + !trimmed.is_empty() && self.favorite_queries.iter().any(|f| f.trim() == trimmed) + } + + /// Toggle favorite status for current query. + pub fn toggle_favorite(&mut self) { + let trimmed = self.query_input.trim().to_string(); + if trimmed.is_empty() { + return; + } + + if let Some(idx) = self + .favorite_queries + .iter() + .position(|f| f.trim() == trimmed) + { + self.favorite_queries.remove(idx); + } else { + self.favorite_queries.push(trimmed); + } + } + + /// Remove a favorite by index. + pub fn remove_favorite(&mut self, idx: usize) { + if idx < self.favorite_queries.len() { + self.favorite_queries.remove(idx); + // Adjust panel index if needed + if self.favorites_panel_index >= self.favorite_queries.len() + && !self.favorite_queries.is_empty() + { + self.favorites_panel_index = self.favorite_queries.len() - 1; + } + } + } } diff --git a/crates/bin/ampcc/src/main.rs b/crates/bin/ampcc/src/main.rs index ecafd1dd6..c0291f669 100644 --- a/crates/bin/ampcc/src/main.rs +++ b/crates/bin/ampcc/src/main.rs @@ -10,6 +10,9 @@ use admin_client::{ workers::{WorkerDetailResponse, WorkerInfo}, }; use anyhow::Result; +use arrow::array::{ + Array, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray, UInt64Array, +}; use crossterm::{ event::{ self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode, KeyModifiers, MouseEventKind, @@ -17,6 +20,7 @@ use crossterm::{ execute, terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode}, }; +use futures::StreamExt; use ratatui::{Terminal, backend::CrosstermBackend}; use tokio::sync::mpsc; use worker::{job::JobId, node_id::NodeId}; @@ -26,7 +30,10 @@ mod config; mod registry; mod ui; -use app::{ActivePane, App, ContentView, DataSource, InputMode, InspectResult}; +use app::{ + ActivePane, App, ContentView, DataSource, InputMode, InspectResult, QUERY_TEMPLATES, + QueryResults, +}; use ratatui::layout::{Constraint, Direction, Layout, Rect}; /// Auto-refresh interval for jobs/workers (10 seconds). @@ -40,6 +47,7 @@ enum AppEvent { WorkerDetailLoaded(Option), JobStopped(Result<(), String>), JobDeleted(Result<(), String>), + QueryCompleted(QueryResults), Error(String), } @@ -48,6 +56,15 @@ async fn main() -> Result<()> { let config = config::Config::load()?; let mut app = App::new(config)?; + // Load persisted history + if let Err(e) = app.load_history() { + eprintln!("Warning: could not load history: {}", e); + // Continue anyway - not fatal + } + + // Load favorites + app.load_favorites(); + // Fetch initial datasets if let Err(e) = app.fetch_datasets().await { eprintln!("Failed to fetch datasets: {}", e); @@ -110,6 +127,13 @@ where app.needs_redraw = true; } + // Tick message expiration (for success messages) + let had_message = app.success_message.is_some(); + app.tick_messages(); + if had_message && app.success_message.is_none() { + app.needs_redraw = true; + } + // Auto-refresh jobs/workers in Local mode if app.is_local() && app.last_refresh.elapsed() >= REFRESH_INTERVAL { spawn_fetch_jobs(app, tx.clone()); @@ -172,6 +196,15 @@ where } app.stop_loading(); } + AppEvent::QueryCompleted(results) => { + app.query_results = Some(results); + app.query_scroll = 0; + app.query_scroll_state = ratatui::widgets::ScrollbarState::default(); + app.content_view = ContentView::QueryResults; + app.active_pane = ActivePane::QueryResult; + app.input_mode = InputMode::Normal; + app.stop_loading(); + } AppEvent::Error(msg) => { app.error_message = Some(msg); app.stop_loading(); @@ -184,6 +217,70 @@ where if event::poll(tick_rate)? { match event::read()? { Event::Key(key) => { + // Handle template picker if open (modal popup) + if app.template_picker_open { + match key.code { + KeyCode::Up => { + if app.template_picker_index > 0 { + app.template_picker_index -= 1; + } + } + KeyCode::Down => { + if app.template_picker_index < QUERY_TEMPLATES.len() - 1 { + app.template_picker_index += 1; + } + } + KeyCode::Enter => { + // Apply selected template + let template = &QUERY_TEMPLATES[app.template_picker_index]; + let resolved = app.resolve_template(template.pattern); + app.set_query_input(resolved); + app.template_picker_open = false; + } + KeyCode::Esc => { + app.template_picker_open = false; + } + _ => {} + } + continue; + } + + // Handle favorites panel if open (modal popup) + if app.favorites_panel_open { + match key.code { + KeyCode::Up => { + if app.favorites_panel_index > 0 { + app.favorites_panel_index -= 1; + } + } + KeyCode::Down => { + if app.favorites_panel_index + < app.favorite_queries.len().saturating_sub(1) + { + app.favorites_panel_index += 1; + } + } + KeyCode::Enter => { + // Load selected favorite into query input + if let Some(query) = + app.favorite_queries.get(app.favorites_panel_index).cloned() + { + app.set_query_input(query); + app.favorites_panel_open = false; + } + } + KeyCode::Char('d') => { + // Delete the selected favorite + app.remove_favorite(app.favorites_panel_index); + } + KeyCode::Esc => { + app.favorites_panel_open = false; + } + _ => {} + } + continue; + } + match app.input_mode { InputMode::Normal => { match key.code { @@ -223,6 +320,25 @@ where app.input_mode = InputMode::Search; } + // Query mode (Q key) - only in Local mode + KeyCode::Char('Q') => { + if app.is_local() { + app.input_mode = InputMode::Query; + app.active_pane = ActivePane::Query; + app.content_view = ContentView::QueryResults; + // Pre-populate with template if dataset selected and input empty + if app.query_input.is_empty() + && let Some((ns, name, _)) = + app.get_selected_manifest_params() + { + app.set_query_input(format!( + "SELECT * FROM {}.{} LIMIT 10", + ns, name + )); + } + } + } + // Refresh - context sensitive KeyCode::Char('r') => { let tx = tx.clone(); @@ -263,10 +379,9 @@ where } } - // Stop job (s key) - KeyCode::Char('s') => { - if app.active_pane == ActivePane::Jobs - && let Some(job) = app.get_selected_job() + // Stop job (s key in Jobs pane) + KeyCode::Char('s') if app.active_pane == ActivePane::Jobs => { + if let Some(job) = app.get_selected_job() && App::can_stop_job(&job.status) { let job_id = job.id; @@ -399,6 +514,64 @@ where app.active_pane = app.active_pane.prev(is_local); } + // Export query results to CSV (E key in QueryResult pane) + KeyCode::Char('E') | KeyCode::Char('e') + if app.active_pane == ActivePane::QueryResult => + { + if let Some(results) = &app.query_results { + match export_results_csv(results) { + Ok(filename) => { + app.set_success_message(format!( + "Exported {} rows to {}", + results.row_count, filename + )); + } + Err(e) => { + app.error_message = + Some(format!("Export failed: {}", e)); + } + } + } else { + app.error_message = + Some("No results to export".to_string()); + } + } + + // Sort query results (s key in QueryResult pane) + KeyCode::Char('s') + if app.active_pane == ActivePane::QueryResult + && app.query_results.is_some() => + { + app.result_sort_pending = true; + } + + // Clear sort (S key in QueryResult pane) + KeyCode::Char('S') + if app.active_pane == ActivePane::QueryResult => + { + app.clear_sort(); + } + + // Column selection for sorting (1-9 keys when sort pending) + KeyCode::Char(c @ '1'..='9') + if app.result_sort_pending + && app.active_pane == ActivePane::QueryResult => + { + let col = c.to_digit(10).unwrap() as usize - 1; // 0-indexed + if let Some(results) = &app.query_results { + if col < results.columns.len() { + app.toggle_sort(col); + } else { + app.result_sort_pending = false; + } + } + } + + // Cancel sort mode with Escape + KeyCode::Esc if app.result_sort_pending => { + app.result_sort_pending = false; + } + _ => {} } } @@ -421,6 +594,161 @@ where } _ => {} }, + InputMode::Query => match key.code { + KeyCode::Enter if key.modifiers.contains(KeyModifiers::CONTROL) => { + // Execute query with Ctrl+Enter + let sql = app.query_input.clone(); + if !sql.trim().is_empty() { + // Add to per-dataset history + app.add_to_history(sql.trim().to_string()); + + // Reset history navigation state + app.query_history_index = None; + app.query_draft.clear(); + + app.start_loading("Executing query..."); + spawn_execute_query(app, sql, tx.clone()); + } + } + KeyCode::Enter => { + if app.history_search_active { + // Accept current match + app.accept_history_search(); + } else { + // Insert newline (plain Enter without Ctrl) + app.query_history_index = None; + app.insert_char('\n'); + } + } + KeyCode::Esc => { + if app.history_search_active { + // Cancel search, restore original input + app.cancel_history_search(); + } else { + // Cancel query input, return to normal mode + app.input_mode = InputMode::Normal; + app.active_pane = ActivePane::Datasets; + // Restore previous content view if we have results + if app.query_results.is_none() { + app.content_view = ContentView::Dataset; + } + } + } + KeyCode::Up => { + // First try to move cursor up in multiline input + if !app.cursor_up() { + // At first line, navigate history + let history = app.current_history(); + if !history.is_empty() { + match app.query_history_index { + None => { + // Save current input as draft, load most recent history + app.query_draft = app.query_input.clone(); + let last_idx = history.len() - 1; + app.query_history_index = Some(last_idx); + app.set_query_input(history[last_idx].clone()); + } + Some(idx) if idx > 0 => { + // Move to older entry + let new_idx = idx - 1; + app.query_history_index = Some(new_idx); + app.set_query_input(history[new_idx].clone()); + } + Some(_) => { + // Already at oldest entry, do nothing + } + } + } + } + } + KeyCode::Down => { + // First try to move cursor down in multiline input + if !app.cursor_down() { + // At last line, navigate history + let history = app.current_history(); + if let Some(idx) = app.query_history_index { + if idx < history.len() - 1 { + // Move to newer entry + let new_idx = idx + 1; + app.query_history_index = Some(new_idx); + app.set_query_input(history[new_idx].clone()); + } else { + // At newest entry, restore draft + app.query_history_index = None; + app.set_query_input(app.query_draft.clone()); + } + } + } + } + KeyCode::Char('T') | KeyCode::Char('t') + if key.modifiers.is_empty() + || key.modifiers == KeyModifiers::SHIFT => + { + // Open template picker (only in query mode, only 'T' or 't') + app.template_picker_open = true; + app.template_picker_index = 0; + } + // History search (Ctrl+R) + KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => { + if app.history_search_active { + // Cycle to next match + app.cycle_history_search(); + } else { + // Enter search mode + app.enter_history_search(); + } + } + KeyCode::Char(c) if app.history_search_active => { + app.history_search_query.push(c); + app.update_history_search(); + } + KeyCode::Backspace if app.history_search_active => { + app.history_search_query.pop(); + app.update_history_search(); + } + KeyCode::Char('f') if key.modifiers == KeyModifiers::CONTROL => { + // Open favorites panel with Ctrl+F + if !app.favorite_queries.is_empty() { + app.favorites_panel_open = true; + app.favorites_panel_index = 0; + } + } + KeyCode::Char('*') | KeyCode::Char('F') + if key.modifiers.is_empty() + || key.modifiers == KeyModifiers::SHIFT => + { + // Toggle favorite for current query + app.toggle_favorite(); + } + KeyCode::Char(c) => { + // Reset history navigation on edit + app.query_history_index = None; + app.insert_char(c); + } + KeyCode::Backspace => { + // Reset history navigation on edit + app.query_history_index = None; + app.backspace(); + } + KeyCode::Delete => { + // Reset history navigation on edit + app.query_history_index = None; + app.delete_char(); + } + KeyCode::Left => { + app.cursor_left(); + } + KeyCode::Right => { + app.cursor_right(); + } + KeyCode::Home => { + app.cursor_home(); + } + KeyCode::End => { + app.cursor_end(); + } + _ => {} + }, } } Event::Mouse(mouse) => { @@ -538,6 +866,31 @@ where app.active_pane = ActivePane::Schema; } } + ContentView::QueryResults => { + // Dynamic height for query input + let query_height = + (app.query_line_count() as u16 + 2).clamp(3, 10); + let query_chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(query_height), // Query input + Constraint::Min(0), // Results + ]) + .split(content_area); + + let query_input_area = query_chunks[0]; + let query_results_area = query_chunks[1]; + + if y >= query_input_area.y + && y < query_input_area.y + query_input_area.height + { + app.active_pane = ActivePane::Query; + } else if y >= query_results_area.y + && y < query_results_area.y + query_results_area.height + { + app.active_pane = ActivePane::QueryResult; + } + } ContentView::Job(_) | ContentView::Worker(_) | ContentView::None => { @@ -560,6 +913,11 @@ where } if app.should_quit { + // Save history and favorites before quitting + if let Err(e) = app.save_history() { + eprintln!("Warning: could not save history: {}", e); + } + app.save_favorites(); return Ok(()); } } @@ -719,3 +1077,146 @@ fn spawn_delete_job(app: &App, job_id: JobId, tx: mpsc::Sender) { } }); } + +fn spawn_execute_query(app: &App, sql: String, tx: mpsc::Sender) { + let query_url = app.config.local_query_url.clone(); + + tokio::spawn(async move { + let results = execute_query(&query_url, &sql).await; + let _ = tx.send(AppEvent::QueryCompleted(results)).await; + }); +} + +async fn execute_query(query_url: &str, sql: &str) -> QueryResults { + // Connect to Flight service + let mut client = match amp_client::AmpClient::from_endpoint(query_url).await { + Ok(c) => c, + Err(e) => { + return QueryResults { + columns: vec![], + rows: vec![], + row_count: 0, + error: Some(format!("Connection failed: {}", e)), + }; + } + }; + + // Execute query + let mut stream = match client.query(sql).await { + Ok(s) => s, + Err(e) => { + return QueryResults { + columns: vec![], + rows: vec![], + row_count: 0, + error: Some(format!("Query failed: {}", e)), + }; + } + }; + + // Collect results + let mut columns: Option> = None; + let mut rows: Vec> = Vec::new(); + + while let Some(batch_result) = stream.next().await { + match batch_result { + Ok(batch) => { + // Extract column names from schema (once) + if columns.is_none() { + columns = Some( + batch + .schema() + .fields() + .iter() + .map(|f| f.name().clone()) + .collect(), + ); + } + + // Convert batch to rows + for row_idx in 0..batch.num_rows() { + let row: Vec = (0..batch.num_columns()) + .map(|col_idx| format_array_value(batch.column(col_idx).as_ref(), row_idx)) + .collect(); + rows.push(row); + } + } + Err(e) => { + let row_count = rows.len(); + return QueryResults { + columns: columns.unwrap_or_default(), + rows, + row_count, + error: Some(format!("Fetch error: {}", e)), + }; + } + } + } + + let row_count = rows.len(); + QueryResults { + columns: columns.unwrap_or_default(), + rows, + row_count, + error: None, + } +} + +fn format_array_value(array: &dyn Array, idx: usize) -> String { + if array.is_null(idx) { + return "NULL".to_string(); + } + + // Handle common types + if let Some(arr) = array.as_any().downcast_ref::() { + return arr.value(idx).to_string(); + } + if let Some(arr) = array.as_any().downcast_ref::() { + return arr.value(idx).to_string(); + } + if let Some(arr) = array.as_any().downcast_ref::() { + return arr.value(idx).to_string(); + } + if let Some(arr) = array.as_any().downcast_ref::() { + return format!("{:.6}", arr.value(idx)); + } + if let Some(arr) = array.as_any().downcast_ref::() { + return arr.value(idx).to_string(); + } + // Binary types - show hex + if let Some(arr) = array.as_any().downcast_ref::() { + let bytes = arr.value(idx); + if bytes.len() > 16 { + return format!("0x{}...", hex::encode(&bytes[..16])); + } + return format!("0x{}", hex::encode(bytes)); + } + + // Fallback: use array_value_to_string from Arrow + arrow::util::display::array_value_to_string(array, idx).unwrap_or_else(|_| "?".to_string()) +} + +/// Export query results to a CSV file. +/// +/// Returns the filename on success, or an error message on failure. +fn export_results_csv(results: &QueryResults) -> Result { + let timestamp = chrono::Local::now().format("%Y%m%d_%H%M%S"); + let filename = format!("query_results_{}.csv", timestamp); + + let mut wtr = + csv::Writer::from_path(&filename).map_err(|e| format!("Failed to create file: {}", e))?; + + // Write header + wtr.write_record(&results.columns) + .map_err(|e| format!("Failed to write header: {}", e))?; + + // Write rows + for row in &results.rows { + wtr.write_record(row) + .map_err(|e| format!("Failed to write row: {}", e))?; + } + + wtr.flush().map_err(|e| format!("Failed to flush: {}", e))?; + + Ok(filename) +} diff --git a/crates/bin/ampcc/src/ui.rs b/crates/bin/ampcc/src/ui.rs index 660e2bc11..3cc739eb5 100644 --- a/crates/bin/ampcc/src/ui.rs +++ b/crates/bin/ampcc/src/ui.rs @@ -9,11 +9,15 @@ use ratatui::{ style::{Color, Modifier, Style}, text::{Line, Span}, widgets::{ - Block, Borders, List, ListItem, ListState, Paragraph, Scrollbar, ScrollbarOrientation, + Block, Borders, Cell, List, ListItem, ListState, Paragraph, Row, Scrollbar, + ScrollbarOrientation, Table, Wrap, }, }; -use crate::app::{ActivePane, App, ContentView, DataSource, InputMode, InspectResult}; +use crate::app::{ + ActivePane, App, ContentView, DataSource, InputMode, InspectResult, QUERY_TEMPLATES, + QueryResults, +}; // ============================================================================ // The Graph Color Palette @@ -176,6 +180,276 @@ impl Theme { } } +// ============================================================================ +// SQL Syntax Highlighting +// ============================================================================ + +/// SQL keywords for syntax highlighting. +const SQL_KEYWORDS: &[&str] = &[ + "SELECT", + "FROM", + "WHERE", + "AND", + "OR", + "NOT", + "IN", + "LIKE", + "JOIN", + "LEFT", + "RIGHT", + "INNER", + "OUTER", + "ON", + "ORDER", + "BY", + "ASC", + "DESC", + "GROUP", + "HAVING", + "LIMIT", + "OFFSET", + "INSERT", + "INTO", + "VALUES", + "UPDATE", + "SET", + "DELETE", + "CREATE", + "TABLE", + "INDEX", + "DROP", + "ALTER", + "AS", + "DISTINCT", + "ALL", + "UNION", + "INTERSECT", + "EXCEPT", + "CASE", + "WHEN", + "THEN", + "ELSE", + "END", + "NULL", + "TRUE", + "FALSE", + "COUNT", + "SUM", + "AVG", + "MIN", + "MAX", + "DESCRIBE", + "IS", + "BETWEEN", + "EXISTS", + "CROSS", + "FULL", + "NATURAL", + "USING", + "WITH", + "RECURSIVE", + "OVER", + "PARTITION", + "WINDOW", + "ROWS", + "RANGE", + "UNBOUNDED", + "PRECEDING", + "FOLLOWING", + "CURRENT", + "ROW", +]; + +/// SQL token types for syntax highlighting. +#[derive(Debug, Clone)] +enum SqlToken { + Keyword(String), + String(String), + Number(String), + Operator(String), + Identifier(String), + Whitespace(String), + Punctuation(String), +} + +/// Tokenize SQL input for syntax highlighting. +fn tokenize_sql(input: &str) -> Vec { + let mut tokens = Vec::new(); + let mut chars = input.chars().peekable(); + + while let Some(&ch) = chars.peek() { + match ch { + // Whitespace + ' ' | '\t' | '\n' | '\r' => { + let mut ws = String::new(); + while let Some(&c) = chars.peek() { + if c.is_whitespace() { + ws.push(chars.next().unwrap()); + } else { + break; + } + } + tokens.push(SqlToken::Whitespace(ws)); + } + // String literal (single quote) + '\'' => { + let quote = chars.next().unwrap(); + let mut s = String::from(quote); + while let Some(&c) = chars.peek() { + s.push(chars.next().unwrap()); + if c == '\'' { + break; + } + } + tokens.push(SqlToken::String(s)); + } + // String literal (double quote) + '"' => { + let quote = chars.next().unwrap(); + let mut s = String::from(quote); + while let Some(&c) = chars.peek() { + s.push(chars.next().unwrap()); + if c == '"' { + break; + } + } + tokens.push(SqlToken::String(s)); + } + // Number + '0'..='9' => { + let mut num = String::new(); + while let Some(&c) = chars.peek() { + if c.is_ascii_digit() || c == '.' { + num.push(chars.next().unwrap()); + } else { + break; + } + } + tokens.push(SqlToken::Number(num)); + } + // Operators + '=' | '<' | '>' | '!' | '+' | '-' | '*' | '/' | '%' => { + let mut op = String::new(); + op.push(chars.next().unwrap()); + // Handle two-character operators like !=, <=, >=, <> + if let Some(&next_ch) = chars.peek() + && ((ch == '!' && next_ch == '=') + || (ch == '<' && (next_ch == '=' || next_ch == '>')) + || (ch == '>' && next_ch == '=')) + { + op.push(chars.next().unwrap()); + } + tokens.push(SqlToken::Operator(op)); + } + // Punctuation + '(' | ')' | ',' | ';' | '.' | ':' => { + tokens.push(SqlToken::Punctuation(chars.next().unwrap().to_string())); + } + // Word (keyword or identifier) + _ if ch.is_alphabetic() || ch == '_' => { + let mut word = String::new(); + while let Some(&c) = chars.peek() { + if c.is_alphanumeric() || c == '_' { + word.push(chars.next().unwrap()); + } else { + break; + } + } + if SQL_KEYWORDS.contains(&word.to_uppercase().as_str()) { + tokens.push(SqlToken::Keyword(word)); + } else { + tokens.push(SqlToken::Identifier(word)); + } + } + // Other characters (backticks, brackets, etc.) + _ => { + tokens.push(SqlToken::Identifier(chars.next().unwrap().to_string())); + } + } + } + + tokens +} + +/// Get the style for a SQL token. +fn sql_token_style(token: &SqlToken) -> Style { + match token { + SqlToken::Keyword(_) => Theme::accent(), + SqlToken::String(_) => Theme::status_success(), + SqlToken::Number(_) => Theme::type_annotation(), + SqlToken::Operator(_) => Theme::text_secondary(), + SqlToken::Identifier(_) => Theme::text_primary(), + SqlToken::Whitespace(_) => Style::default(), + SqlToken::Punctuation(_) => Theme::text_secondary(), + } +} + +/// Get the text content of a SQL token. +fn sql_token_text(token: &SqlToken) -> &str { + match token { + SqlToken::Keyword(s) + | SqlToken::String(s) + | SqlToken::Number(s) + | SqlToken::Operator(s) + | SqlToken::Identifier(s) + | SqlToken::Whitespace(s) + | SqlToken::Punctuation(s) => s, + } +} + +/// Highlight SQL and insert cursor at specified position. +/// Returns spans for a single line with cursor indicator. +fn highlight_sql_with_cursor(line: &str, cursor_col: Option) -> Vec> { + let mut result = Vec::new(); + + if let Some(col) = cursor_col { + // Tokenize the entire line + let tokens = tokenize_sql(line); + + let mut current_pos = 0; + let mut cursor_inserted = false; + + for token in &tokens { + let token_text = sql_token_text(token); + let token_len = token_text.len(); + let token_end = current_pos + token_len; + let style = sql_token_style(token); + + if !cursor_inserted && col >= current_pos && col < token_end { + // Cursor is within this token + let offset_in_token = col - current_pos; + let (before, after) = token_text.split_at(offset_in_token); + if !before.is_empty() { + result.push(Span::styled(before.to_string(), style)); + } + result.push(Span::styled("_", Theme::status_warning())); + cursor_inserted = true; + if !after.is_empty() { + result.push(Span::styled(after.to_string(), style)); + } + } else { + result.push(Span::styled(token_text.to_string(), style)); + } + + current_pos = token_end; + } + + // If cursor is at end of line + if !cursor_inserted && col >= current_pos { + result.push(Span::styled("_", Theme::status_warning())); + } + } else { + // No cursor on this line - just highlight without cursor + for token in tokenize_sql(line) { + let style = sql_token_style(&token); + let text = sql_token_text(&token).to_string(); + result.push(Span::styled(text, style)); + } + } + + result +} + /// ASCII art logo for splash screen (displayed when Header pane is focused). const AMP_LOGO: &str = r#" ▒█░ @@ -215,6 +489,16 @@ pub fn draw(f: &mut Frame, app: &mut App) { draw_header(f, app, chunks[0]); draw_main(f, app, chunks[1]); draw_footer(f, app, chunks[2]); + + // Draw template picker popup on top if open + if app.template_picker_open { + draw_template_picker(f, app); + } + + // Draw favorites panel popup on top if open + if app.favorites_panel_open { + draw_favorites_panel(f, app); + } } /// Draw the header with source information. @@ -659,6 +943,20 @@ fn draw_content(f: &mut Frame, app: &mut App, area: Rect) { ContentView::Worker(worker) => { draw_worker_detail(f, app, worker.clone(), area); } + ContentView::QueryResults => { + // Dynamic height for query input: line count + 2 (borders), clamped to 3-10 lines + let query_height = (app.query_line_count() as u16 + 2).clamp(3, 10); + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(query_height), // Query input + Constraint::Min(0), // Results + ]) + .split(area); + + draw_query_input(f, app, chunks[0]); + draw_query_results(f, app, chunks[1]); + } ContentView::None => { draw_empty_content(f, area); } @@ -680,6 +978,285 @@ fn draw_empty_content(f: &mut Frame, area: Rect) { f.render_widget(text, area); } +/// Draw the SQL query input panel. +fn draw_query_input(f: &mut Frame, app: &App, area: Rect) { + let border_style = if app.active_pane == ActivePane::Query { + Theme::border_focused() + } else { + Theme::border_unfocused() + }; + + let line_count = app.query_line_count(); + let star = if app.is_current_query_favorite() { + " ★" + } else { + "" + }; + let history = app.current_history(); + let title = if app.input_mode == InputMode::Query { + if let Some(idx) = app.query_history_index { + format!( + "SQL Query{} (history {}/{}) [Ctrl+Enter execute, Esc cancel]", + star, + idx + 1, + history.len() + ) + } else if !history.is_empty() { + format!( + "SQL Query{} ({} lines) [↑↓ nav, Ctrl+F fav, F/*, Ctrl+Enter]", + star, line_count + ) + } else { + format!( + "SQL Query{} ({} lines) [F/* toggle fav, Ctrl+Enter execute]", + star, line_count + ) + } + } else { + format!("SQL Query{} (press Q to edit)", star) + }; + + let block = Block::default() + .title(title) + .borders(Borders::ALL) + .border_style(border_style); + + // Build lines with SQL syntax highlighting and cursor rendering + let lines: Vec = if app.input_mode == InputMode::Query { + let query_lines = app.query_lines(); + query_lines + .iter() + .enumerate() + .map(|(line_idx, line_text)| { + let cursor_col = if line_idx == app.query_cursor.line { + Some(app.query_cursor.column.min(line_text.len())) + } else { + None + }; + Line::from(highlight_sql_with_cursor(line_text, cursor_col)) + }) + .collect() + } else { + // Not in query mode, show syntax highlighting without cursor + app.query_lines() + .iter() + .map(|line| Line::from(highlight_sql_with_cursor(line, None))) + .collect() + }; + + let paragraph = Paragraph::new(lines) + .block(block) + .scroll((app.query_input_scroll, 0)); + f.render_widget(paragraph, area); +} + +/// Calculate column widths based on content, with min/max constraints. +/// +/// Returns a vector of pixel widths for each column, auto-sized based on content +/// and scaled proportionally if total exceeds available width. +fn calculate_column_widths(results: &QueryResults, available_width: u16) -> Vec { + let col_count = results.columns.len(); + if col_count == 0 { + return vec![]; + } + + // Calculate max width needed for each column (considering headers) + let mut max_widths: Vec = results.columns.iter().map(|c| c.len()).collect(); + + // Check data rows for wider content + for row in &results.rows { + for (i, cell) in row.iter().enumerate() { + if i < max_widths.len() { + max_widths[i] = max_widths[i].max(cell.len()); + } + } + } + + // Apply min (5 chars) and max (50 chars) constraints + const MIN_WIDTH: u16 = 5; + const MAX_WIDTH: u16 = 50; + + let mut widths: Vec = max_widths + .iter() + .map(|&w| (w as u16).clamp(MIN_WIDTH, MAX_WIDTH)) + .collect(); + + // Account for borders and spacing between columns (3 chars per column for " | ") + let total_padding = (col_count as u16).saturating_mul(3); + let usable = available_width.saturating_sub(total_padding); + + // Scale proportionally if total exceeds available width + let total: u16 = widths.iter().sum(); + if total > usable && total > 0 { + let scale = usable as f32 / total as f32; + widths = widths + .iter() + .map(|&w| ((w as f32 * scale) as u16).max(MIN_WIDTH)) + .collect(); + } + + widths +} + +/// Draw the SQL query results table. +fn draw_query_results(f: &mut Frame, app: &mut App, area: Rect) { + let border_style = if app.active_pane == ActivePane::QueryResult { + Theme::border_focused() + } else { + Theme::border_unfocused() + }; + + let Some(results) = &app.query_results else { + let block = Block::default() + .title("Query Results") + .borders(Borders::ALL) + .border_style(border_style); + let text = Paragraph::new("No query executed yet. Press Q to enter a SQL query.") + .block(block) + .style(Theme::text_secondary()); + f.render_widget(text, area); + return; + }; + + // Show error if present + if let Some(error) = &results.error { + let block = Block::default() + .title("Query Error") + .borders(Borders::ALL) + .border_style(Theme::status_error()); + let text = Paragraph::new(error.as_str()) + .block(block) + .style(Theme::status_error()) + .wrap(Wrap { trim: false }); + f.render_widget(text, area); + return; + } + + // Build title with sort info + let sort_info = if app.result_sort_pending { + " - Press 1-9 for column".to_string() + } else if let Some(col) = app.result_sort_column { + let dir = if app.result_sort_ascending { + "▲" + } else { + "▼" + }; + let col_name = results.columns.get(col).map(|s| s.as_str()).unwrap_or("?"); + format!(" - sorted by {} {}", col_name, dir) + } else { + String::new() + }; + + let title = format!( + "Query Results ({} rows){} [E]xport [s]ort [S]clear", + results.row_count, sort_info + ); + let block = Block::default() + .title(title) + .borders(Borders::ALL) + .border_style(border_style); + + // Handle empty results + if results.columns.is_empty() { + let text = Paragraph::new("Query returned no columns.") + .block(block) + .style(Theme::text_secondary()); + f.render_widget(text, area); + return; + } + + // Calculate auto-sized column widths based on content + // Account for 2 chars of border on each side of the table + let available_width = area.width.saturating_sub(2); + let col_widths = calculate_column_widths(results, available_width); + + // Build table header with column numbers and sort indicators + let header_cells: Vec = results + .columns + .iter() + .enumerate() + .map(|(idx, c)| { + let col_num = idx + 1; // 1-indexed for user display + let sort_indicator = if app.result_sort_column == Some(idx) { + if app.result_sort_ascending { + " ▲" + } else { + " ▼" + } + } else { + "" + }; + let header_text = format!("[{}] {}{}", col_num, c, sort_indicator); + Cell::from(header_text).style(Theme::text_primary().add_modifier(Modifier::BOLD)) + }) + .collect(); + let header = Row::new(header_cells).height(1); + + // Build table rows with truncation based on calculated column widths + // Use sorted indices if sorting is active, otherwise use original order + let sorted_indices = app.get_sorted_indices(); + let row_indices: Vec = + sorted_indices.unwrap_or_else(|| (0..results.rows.len()).collect()); + + let rows: Vec = row_indices + .iter() + .map(|&row_idx| { + let row = &results.rows[row_idx]; + let cells: Vec = row + .iter() + .enumerate() + .map(|(col_idx, val)| { + // Get the calculated width for this column (default to 50 if out of bounds) + let max_width = col_widths.get(col_idx).copied().unwrap_or(50) as usize; + let display = if val.len() > max_width && max_width > 3 { + format!("{}...", &val[..max_width.saturating_sub(3)]) + } else { + val.clone() + }; + Cell::from(display) + }) + .collect(); + Row::new(cells) + }) + .collect(); + + // Convert widths to constraints for the table + let widths: Vec = col_widths.iter().map(|&w| Constraint::Length(w)).collect(); + + let table = Table::new(rows, widths) + .header(header) + .block(block) + .row_highlight_style(Theme::selection()); + + // Update content length for scrolling + app.query_content_length = results.rows.len(); + app.query_scroll_state = app.query_scroll_state.content_length(results.rows.len()); + + // Render with scroll state + let mut table_state = ratatui::widgets::TableState::default(); + table_state.select(Some(app.query_scroll as usize)); + + f.render_stateful_widget(table, area, &mut table_state); + + // Draw scrollbar if content exceeds visible area + let visible_rows = area.height.saturating_sub(3) as usize; // -3 for borders and header + if results.rows.len() > visible_rows { + let scrollbar = Scrollbar::new(ScrollbarOrientation::VerticalRight); + let mut scrollbar_state = app + .query_scroll_state + .content_length(results.rows.len()) + .position(app.query_scroll as usize); + f.render_stateful_widget( + scrollbar, + area.inner(ratatui::layout::Margin { + vertical: 1, + horizontal: 0, + }), + &mut scrollbar_state, + ); + } +} + /// Format job descriptor fields as readable lines (generic key-value display). fn format_descriptor_lines(descriptor: &serde_json::Value) -> Vec> { let mut lines = Vec::new(); @@ -1200,6 +1777,8 @@ fn draw_footer(f: &mut Frame, app: &App, area: Rect) { "Search: {}_ (Enter to confirm, Esc to cancel)", app.search_query ) + } else if matches!(app.input_mode, InputMode::Query) { + "Query: Type SQL, Ctrl+Enter to execute, Esc to cancel".to_string() } else { let source_hint = match app.current_source { DataSource::Local => "[1]Local [2]Registry", @@ -1210,9 +1789,29 @@ fn draw_footer(f: &mut Frame, app: &App, area: Rect) { let context_keys = match app.active_pane { ActivePane::Jobs => "[s] Stop [d] Delete [r] Refresh", ActivePane::Workers => "[r] Refresh", - ActivePane::Datasets => "[Enter] Expand [r] Refresh", + ActivePane::Datasets => { + if app.is_local() { + "[Enter] Expand [r] Refresh [Q] Query" + } else { + "[Enter] Expand [r] Refresh" + } + } ActivePane::Manifest | ActivePane::Schema | ActivePane::Detail => "[Ctrl+u/d] Scroll", ActivePane::Header => "[Tab] Navigate", + ActivePane::Query => { + if app.current_history().is_empty() { + "[Ctrl+Enter] Execute [Esc] Cancel" + } else { + "[↑↓] History [Ctrl+Enter] Execute [Esc] Cancel" + } + } + ActivePane::QueryResult => { + if app.result_sort_pending { + "Sort: Press 1-9 for column, Esc to cancel" + } else { + "[s] Sort [S] Clear sort [E] Export [j/k] Scroll [Q] Edit" + } + } }; format!( @@ -1221,7 +1820,7 @@ fn draw_footer(f: &mut Frame, app: &App, area: Rect) { ) }; - // Build the right side (loading indicator or error message) + // Build the right side (loading indicator, success message, or error message) let (right_spans, right_width): (Vec, u16) = if app.loading { // Show loading spinner let mut spans = Vec::new(); @@ -1238,6 +1837,19 @@ fn draw_footer(f: &mut Frame, app: &App, area: Rect) { .map(|m| m.len() + 1) .unwrap_or(0); (spans, (msg_len + 2) as u16) + } else if let Some(success) = &app.success_message { + // Show success message (green) + let display_success = if success.len() > 60 { + format!("{}...", &success[..57]) + } else { + success.clone() + }; + let spans = vec![Span::styled( + format!("✓ {}", display_success), + Theme::status_success(), + )]; + let width = (display_success.len() + 3) as u16; // icon + space + message + (spans, width) } else if let Some(error) = &app.error_message { // Show error message let display_error = if error.len() > 50 { @@ -1283,3 +1895,143 @@ fn truncate_url(url: &str, max_len: usize) -> String { format!("{}...", &url[..max_len - 3]) } } + +/// Draw the template picker popup. +fn draw_template_picker(f: &mut Frame, app: &App) { + // Calculate popup dimensions + let popup_width = 60u16; + let popup_height = (QUERY_TEMPLATES.len() + 4) as u16; // templates + borders + title + footer + + // Center the popup + let area = f.area(); + let x = area.width.saturating_sub(popup_width) / 2; + let y = area.height.saturating_sub(popup_height) / 2; + let popup_area = Rect::new( + x, + y, + popup_width.min(area.width), + popup_height.min(area.height), + ); + + // Clear the popup area with a background + let clear = ratatui::widgets::Clear; + f.render_widget(clear, popup_area); + + // Build template list items + let items: Vec = QUERY_TEMPLATES + .iter() + .enumerate() + .map(|(idx, template)| { + // Resolve the template to show preview + let resolved = app.resolve_template(template.pattern); + let truncated = if resolved.len() > 50 { + format!("{}...", &resolved[..47]) + } else { + resolved + }; + + let style = if idx == app.template_picker_index { + Theme::selection() + } else { + Theme::text_primary() + }; + + let line = Line::from(vec![ + Span::styled( + format!("{:<16}", template.description), + Theme::text_secondary(), + ), + Span::styled(truncated, style), + ]); + ListItem::new(line) + }) + .collect(); + + let list = List::new(items) + .block( + Block::default() + .title("Select Template (↑↓ navigate, Enter select, Esc cancel)") + .borders(Borders::ALL) + .border_style(Theme::border_focused()), + ) + .highlight_style(Theme::selection()) + .highlight_symbol(">> "); + + let mut state = ListState::default(); + state.select(Some(app.template_picker_index)); + + f.render_stateful_widget(list, popup_area, &mut state); +} + +/// Draw the favorites panel popup. +fn draw_favorites_panel(f: &mut Frame, app: &App) { + // Calculate popup dimensions + let popup_width = 70u16; + let popup_height = (app.favorite_queries.len().min(10) + 4) as u16; + + // Center the popup + let area = f.area(); + let x = area.width.saturating_sub(popup_width) / 2; + let y = area.height.saturating_sub(popup_height) / 2; + let popup_area = Rect::new( + x, + y, + popup_width.min(area.width), + popup_height.min(area.height), + ); + + // Clear the popup area + let clear = ratatui::widgets::Clear; + f.render_widget(clear, popup_area); + + if app.favorite_queries.is_empty() { + // Show empty message + let block = Block::default() + .title("Favorites (empty)") + .borders(Borders::ALL) + .border_style(Theme::border_focused()); + let text = Paragraph::new("No favorites yet. Press * or F in query mode to add.") + .block(block) + .style(Theme::text_secondary()); + f.render_widget(text, popup_area); + return; + } + + // Build favorites list items + let items: Vec = app + .favorite_queries + .iter() + .enumerate() + .map(|(idx, query)| { + // Truncate long queries + let truncated = if query.len() > 60 { + format!("{}...", &query[..57]) + } else { + query.clone() + }; + + let style = if idx == app.favorites_panel_index { + Theme::selection() + } else { + Theme::text_primary() + }; + + ListItem::new(Span::styled(truncated, style)) + }) + .collect(); + + let list = List::new(items) + .block( + Block::default() + .title("Favorites (↑↓ nav, Enter load, d delete, Esc close)") + .borders(Borders::ALL) + .border_style(Theme::border_focused()), + ) + .highlight_style(Theme::selection()) + .highlight_symbol("★ "); + + let mut state = ListState::default(); + state.select(Some(app.favorites_panel_index)); + + f.render_stateful_widget(list, popup_area, &mut state); +}