Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d3b3422
feat(cluster-mon): Add BaseCollector ABC and complete Pydantic config…
nileshnegi Mar 21, 2026
4ae8db0
fix(cluster-mon/base): fix code quality issues from review
nileshnegi Mar 21, 2026
395aa5f
feat(cluster-mon): Add SSH port-forwarding support
nileshnegi Mar 21, 2026
23d7705
feat(cluster-mon): Update collectors to BaseCollector + refactor AppS…
nileshnegi Mar 21, 2026
41ecf3d
fix(cluster-mon/ssh): Fix critical code quality issues from review
nileshnegi Mar 21, 2026
74de84e
feat(cluster-mon): Add Redis init and collectors status API
nileshnegi Mar 21, 2026
22f82ff
feat(cluster-mon/rccl): Add RCCL data layer (Phase 1)
nileshnegi Mar 21, 2026
b6a7853
feat(cluster-mon/rccl): RCCL Collector + API + AppState integration (…
nileshnegi Mar 21, 2026
b7c9993
fix(cluster-mon): Fix post-review issues
nileshnegi Mar 21, 2026
010b11b
fix(cluster-mon): Fix blockers — RLock deadlock, GPU sync exec, WebSo…
nileshnegi Mar 21, 2026
4d96c5b
fix(cluster-mon): Address remaining review findings
nileshnegi Mar 21, 2026
726dd54
feat(cluster-mon/frontend): Add RCCL Health, Topology, and Timeline p…
nileshnegi Mar 21, 2026
28bcff2
refactor(cluster-mon): Rename ncclras → rcclras throughout codebase
nileshnegi Mar 21, 2026
7e2fda4
feat(cluster-mon/rccl): Add rcclras text parser from real fixture data
nileshnegi Mar 21, 2026
f6bdb50
fix(cluster-mon): Address staged review recommendations
nileshnegi Mar 21, 2026
1ae11fd
Technical Report
nileshnegi Mar 21, 2026
471e847
Ignore .venv (from cluster-mon/backend)
nileshnegi Mar 30, 2026
7805dc4
Fix v1
nileshnegi Mar 30, 2026
f3b7e29
Fix v2
nileshnegi Mar 30, 2026
e4a5e53
Fix v3
nileshnegi Mar 30, 2026
fab09a0
Fix CVS exit cleanup
nileshnegi Mar 30, 2026
2f76193
Updated Technical Report
nileshnegi Mar 30, 2026
0866e3a
Implement RCCL Inspector Plugin support
nileshnegi Apr 5, 2026
fd54411
Updated Technical Report with RCCL Inspector
nileshnegi Apr 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cvs/monitors/cluster-mon/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ __pycache__/
*.so
.Python
venv/
.venv/
env/
ENV/
*.egg-info/
Expand Down
4 changes: 3 additions & 1 deletion cvs/monitors/cluster-mon/backend/app/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from fastapi import APIRouter
from app.api import cluster, nodes, metrics, config, software, restart, packages, logs, ssh_keys
from app.api import cluster, nodes, metrics, config, software, restart, packages, logs, ssh_keys, collectors, rccl_endpoints

router = APIRouter()

Expand All @@ -17,3 +17,5 @@
router.include_router(packages.router, prefix="/packages", tags=["packages"])
router.include_router(logs.router, prefix="/logs", tags=["logs"])
router.include_router(ssh_keys.router, prefix="/ssh-keys", tags=["ssh-keys"])
router.include_router(collectors.router, prefix="/collectors", tags=["collectors"])
router.include_router(rccl_endpoints.router, prefix="/rccl", tags=["rccl"])
76 changes: 76 additions & 0 deletions cvs/monitors/cluster-mon/backend/app/api/collectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
Collectors status API endpoint.
Returns per-collector state and aggregate overall_status.
"""

from fastapi import APIRouter
from typing import Any

# Router that carries the collectors status endpoint defined in this module.
router = APIRouter()


def _compute_overall_status(collector_results: dict, collectors_meta: dict) -> str:
"""
Compute aggregate status from per-collector results.

- "healthy" : all collectors in OK or NO_SERVICE state
- "degraded" : some collectors erroring, but no critical ones
- "critical" : any collector marked critical=True is in ERROR or UNREACHABLE
"""
if not collector_results:
return "healthy"

for name, result in collector_results.items():
state = result.state if hasattr(result, 'state') else result.get('state', 'ok')
state_str = state.value if hasattr(state, 'value') else str(state)
is_error = state_str in ("error", "unreachable")
is_critical = collectors_meta.get(name, {}).get('critical', False)
if is_error and is_critical:
return "critical"

any_error = any(
(r.state.value if hasattr(r.state, 'value') else str(r.state)) in ("error", "unreachable")
for r in collector_results.values()
if hasattr(r, 'state')
)
return "degraded" if any_error else "healthy"


@router.get("/status")
async def get_collectors_status() -> dict[str, Any]:
    """
    Report the latest state of every collector plus an aggregate status.

    Each key of ``app_state.collector_results`` becomes an entry of the form
    ``{"state": ..., "timestamp": ..., "error": ...}``; an extra
    ``"overall_status"`` key holds the aggregate computed by
    ``_compute_overall_status``.

    Example response:
        {
            "gpu": {"state": "ok", "timestamp": "...", "error": null},
            "nic": {"state": "ok", "timestamp": "...", "error": null},
            "rccl": {"state": "no_service", "timestamp": "...", "error": "..."},
            "overall_status": "healthy"
        }
    """
    # Function-scope import; presumably avoids a circular import with app.main.
    from app.main import app_state, REGISTERED_COLLECTORS

    # Per-collector "critical" flag, read from the registered collector classes.
    critical_by_name = {}
    for collector_cls in REGISTERED_COLLECTORS:
        critical_by_name[collector_cls.name] = {
            "critical": getattr(collector_cls, "critical", False)
        }

    payload: dict[str, Any] = {}
    for collector_name, outcome in app_state.collector_results.items():
        # Enum-like states are flattened to their value; anything else via str().
        if hasattr(outcome.state, 'value'):
            flat_state = outcome.state.value
        else:
            flat_state = str(outcome.state)
        payload[collector_name] = {
            "state": flat_state,
            "timestamp": outcome.timestamp,
            "error": outcome.error,
        }

    payload["overall_status"] = _compute_overall_status(
        app_state.collector_results, critical_by_name
    )
    return payload
7 changes: 3 additions & 4 deletions cvs/monitors/cluster-mon/backend/app/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ async def update_configuration(config: ClusterConfigUpdate) -> Dict[str, Any]:
if config.jump_host.auth_method == "password" and config.jump_host.password:
# Store in memory
app_state.jump_host_password = config.jump_host.password
# Also save to YAML for development/testing (WARNING: Not secure for production)
cluster_config["cluster"]["ssh"]["jump_host"]["password"] = config.jump_host.password
logger.warning("⚠️ Jump host password saved to cluster.yaml - NOT RECOMMENDED FOR PRODUCTION")
# SECURITY: Password is stored in memory only (app_state), never persisted to disk
# cluster_config["cluster"]["ssh"]["jump_host"]["password"] is intentionally omitted
else:
app_state.jump_host_password = None
# Remove password from YAML if using key-based auth
Expand Down Expand Up @@ -251,7 +250,7 @@ async def get_current_configuration() -> Dict[str, Any]:
"""
Get current configuration including all SSH and jump host settings.
"""
from app.core.simple_config import config as settings
from app.core.config import settings

nodes = settings.load_nodes_from_file()

Expand Down
141 changes: 141 additions & 0 deletions cvs/monitors/cluster-mon/backend/app/api/rccl_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
RCCL monitoring REST API endpoints.
Phase 1: status, communicators, events, markers.
"""

import logging
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Query
from app.models.rccl_models import RCCLMarker

# Router that carries the RCCL endpoints defined in this module.
router = APIRouter()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


@router.get("/status")
async def get_rccl_status() -> dict[str, Any]:
    """
    Return the most recent RCCL snapshot held on the application state.

    Reads ``app_state.latest_rccl_snapshot``. When that attribute is missing
    or None, a placeholder ``{"state": "no_job", ...}`` payload is returned
    instead.
    """
    from app.main import app_state

    latest = getattr(app_state, 'latest_rccl_snapshot', None)
    if latest is not None:
        return latest
    return {"state": "no_job", "message": "No RCCL snapshot collected yet"}


@router.get("/communicators")
async def get_rccl_communicators() -> list[dict]:
    """
    List the communicators recorded in the latest RCCL snapshot.

    Returns an empty list when no snapshot is available or when the snapshot
    has no "communicators" entry.
    """
    from app.main import app_state

    latest = getattr(app_state, 'latest_rccl_snapshot', None)
    if latest is not None:
        return latest.get("communicators", [])
    return []


@router.get("/communicators/{comm_hash}")
async def get_rccl_communicator(comm_hash: str) -> dict[str, Any]:
    """
    Look up one communicator in the latest snapshot by its ``comm_hash``.

    Raises:
        HTTPException: 404 when there is no snapshot, or when no communicator
            in the snapshot carries the requested hash.
    """
    from app.main import app_state

    latest = getattr(app_state, 'latest_rccl_snapshot', None)
    if latest is None:
        raise HTTPException(status_code=404, detail="No snapshot available")

    # First communicator whose hash matches, or None when nothing matches.
    found = next(
        (
            entry
            for entry in latest.get("communicators", [])
            if entry.get("comm_hash") == comm_hash
        ),
        None,
    )
    if found is None:
        raise HTTPException(status_code=404, detail=f"Communicator {comm_hash!r} not found")
    return found


@router.get("/events")
async def get_rccl_events(
    since: Optional[float] = Query(None, description="Start timestamp (unix)"),
    until: Optional[float] = Query(None, description="End timestamp (unix)"),
    event_type: Optional[str] = Query(None, alias="type"),
) -> dict:
    """
    Return events from the RCCL data store within a time window.

    Args:
        since: Window start as a unix timestamp. Defaults to one hour before
            "now" when omitted.
        until: Window end as a unix timestamp. Defaults to "now" when omitted.
        event_type: Optional exact-match filter on each event's "event_type"
            (exposed as the ``type`` query parameter). An empty value disables
            the filter.

    Returns:
        ``{"events": [...], "truncated": <data_store.is_memory_capped>}``, or
        an empty, non-truncated result when no data store is attached to the
        application state.
    """
    from app.main import app_state
    import time

    data_store = getattr(app_state, 'rccl_data_store', None)
    if data_store is None:
        return {"events": [], "truncated": False}

    # Sample the clock once so the default window is exactly one hour wide.
    now = time.time()
    # Compare against None explicitly: 0 / 0.0 is a legitimate timestamp and
    # must not be replaced by the defaults.
    start = (now - 3600) if since is None else since
    end = now if until is None else until
    events = await data_store.get_events_in_range(start, end)

    if event_type:
        events = [e for e in events if e.get("event_type") == event_type]

    return {
        "events": events,
        "truncated": data_store.is_memory_capped,
    }


@router.get("/performance")
async def get_rccl_performance() -> dict:
    """
    Return the current Inspector performance snapshot from the data store.

    Returns:
        Whatever ``data_store.get_inspector_current()`` yields.

    Raises:
        HTTPException: 503 when no data store is attached to the application
            state, or when the store has no Inspector snapshot yet.
    """
    from app.main import app_state

    data_store = getattr(app_state, 'rccl_data_store', None)
    if data_store is None:
        raise HTTPException(status_code=503, detail="Data store not initialized")

    snapshot = await data_store.get_inspector_current()
    if snapshot is None:
        raise HTTPException(
            status_code=503,
            detail="No Inspector snapshot available. Check that rccl.inspector.enabled=true and a job is running.",
        )
    return snapshot


@router.get("/performance/history")
async def get_rccl_performance_history(
    count: int = Query(50, ge=1, le=500, description="Number of snapshots to return"),
) -> dict:
    """
    Return recent Inspector performance snapshots for time-series charting.

    Args:
        count: Maximum number of snapshots requested from the data store
            (validated to 1..500, default 50).

    Returns:
        ``{"snapshots": [...], "count": <len(snapshots)>}``. The same shape,
        with an empty list and a count of 0, is returned when no data store is
        attached to the application state.
    """
    from app.main import app_state

    data_store = getattr(app_state, 'rccl_data_store', None)
    if data_store is None:
        # Keep the response shape identical to the normal path so clients can
        # always read "count".
        return {"snapshots": [], "count": 0}

    snapshots = await data_store.get_inspector_snapshots(count=count)
    return {"snapshots": snapshots, "count": len(snapshots)}


@router.post("/markers", status_code=201)
async def post_rccl_marker(marker: RCCLMarker) -> dict[str, str]:
    """
    Accept a training marker and push it into the RCCL event stream.

    The marker is dumped to a dict. "event_type" falls back to
    "training_marker" and "timestamp" to the current time when they are
    missing or None, and the result is passed to ``data_store.push_event``.

    Args:
        marker: Validated marker payload from the request body.

    Returns:
        ``{"status": "accepted"}``. This is returned even when no data store
        is attached to the application state, in which case the marker is
        dropped (best-effort endpoint).
    """
    from app.main import app_state
    import time

    event = marker.model_dump()
    # model_dump() emits unset optional fields as None, which setdefault would
    # leave untouched; treat None the same as a missing key.
    if event.get("event_type") is None:
        event["event_type"] = "training_marker"
    if event.get("timestamp") is None:
        event["timestamp"] = time.time()

    data_store = getattr(app_state, 'rccl_data_store', None)
    # Explicit None check, matching the other endpoints: a store object that
    # happens to be falsy (e.g. empty) must still receive the event.
    if data_store is not None:
        await data_store.push_event(event)
    else:
        logger.debug("RCCL data store not initialized; dropping marker event")

    return {"status": "accepted"}
Loading
Loading