Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
logger = logging.getLogger(__name__)


def _default_custom_spec() -> dict[str, Any]:
"""Default custom_spec with TTL configuration for automatic pod cleanup."""
return {"ttlSecondsAfterFinished": 3600}


class DGXCloudState(Enum):
CREATING = "Creating"
INITIALIZING = "Initializing"
Expand Down Expand Up @@ -62,7 +67,7 @@ class DGXCloudExecutor(Executor):
pvc_job_dir: str = field(init=False, default="")
pvcs: list[dict[str, Any]] = field(default_factory=list)
distributed_framework: str = "PyTorch"
custom_spec: dict[str, Any] = field(default_factory=dict)
custom_spec: dict[str, Any] = field(default_factory=_default_custom_spec)

def get_auth_token(self) -> Optional[str]:
url = f"{self.base_url}/token"
Expand Down
2 changes: 1 addition & 1 deletion nemo_run/run/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ def run(
If sequential=True, all tasks will be run one after the other.
The order is based on the order in which they were added.

Parallel mode only works if all exectuors in the experiment support it.
Parallel mode only works if all executors in the experiment support it.
Currently, all executors support parallel mode.

In sequential mode, if all executors support dependencies, then all tasks will be scheduled at once
Expand Down
52 changes: 52 additions & 0 deletions test/core/execution/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def test_init(self):
gpus_per_node=8,
pvc_nemo_run_dir="/workspace/nemo_run",
pvcs=[{"path": "/workspace", "claimName": "test-claim"}],
custom_spec={"ttlSecondsAfterFinished": 7200},
)

assert executor.base_url == "https://dgxapi.example.com"
Expand All @@ -48,8 +49,59 @@ def test_init(self):
assert executor.gpus_per_node == 8
assert executor.pvcs == [{"path": "/workspace", "claimName": "test-claim"}]
assert executor.distributed_framework == "PyTorch"
assert executor.custom_spec["ttlSecondsAfterFinished"] == 7200
assert executor.pvc_nemo_run_dir == "/workspace/nemo_run"

def test_init_default_ttl(self):
    """An executor built without custom_spec carries only the default TTL entry."""
    init_kwargs = {
        "base_url": "https://dgxapi.example.com",
        "app_id": "test_app_id",
        "app_secret": "test_app_secret",
        "project_name": "test_project",
        "container_image": "nvcr.io/nvidia/test:latest",
        "pvc_nemo_run_dir": "/workspace/nemo_run",
    }
    executor = DGXCloudExecutor(**init_kwargs)

    # custom_spec was not passed, so it must equal the default: a TTL of
    # 3600 seconds (1 hour) and nothing else.
    expected_spec = {"ttlSecondsAfterFinished": 3600}
    assert executor.custom_spec == expected_spec

def test_init_custom_spec_with_other_fields(self):
    """A custom_spec holding a TTL plus other keys is kept on the executor."""
    requested_spec = {
        "ttlSecondsAfterFinished": 7200,
        "activeDeadlineSeconds": 14400,
        "restartPolicy": "Never",
    }
    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        custom_spec=requested_spec,
    )

    # Expected values are spelled out separately from the dict handed to the
    # constructor so the check does not compare an object with itself.
    expected_entries = (
        ("ttlSecondsAfterFinished", 7200),
        ("activeDeadlineSeconds", 14400),
        ("restartPolicy", "Never"),
    )
    for key, expected_value in expected_entries:
        assert executor.custom_spec[key] == expected_value

def test_init_override_default_ttl(self):
    """Test that an explicit custom_spec replaces the default spec rather than merging with it.

    The custom_spec passed here contains no TTL key; the executor is expected
    to keep exactly that dict, without the default ttlSecondsAfterFinished
    entry being added.
    """
    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        custom_spec={"restartPolicy": "OnFailure"},  # No TTL specified
    )

    # Only the supplied field is present: the default TTL applies solely when
    # custom_spec is omitted altogether.
    assert executor.custom_spec == {"restartPolicy": "OnFailure"}

@patch("requests.post")
def test_get_auth_token_success(self, mock_post):
mock_response = MagicMock()
Expand Down
48 changes: 48 additions & 0 deletions test/run/torchx_backend/schedulers/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def dgx_cloud_executor():
container_image="nvcr.io/nvidia/test:latest",
pvc_nemo_run_dir="/workspace/nemo_run",
job_dir=tempfile.mkdtemp(),
custom_spec={"ttlSecondsAfterFinished": 7200},
)


Expand Down Expand Up @@ -160,3 +161,50 @@ def test_save_and_get_job_dirs():

assert "test_app_id" in job_dirs
assert isinstance(job_dirs["test_app_id"]["executor"], DGXCloudExecutor)


def test_dgx_cloud_executor_ttl_configuration():
    """Check custom_spec TTL handling: explicit TTL, default TTL, and TTL mixed with other keys."""
    # Constructor arguments shared by all three executors below; job_dir is
    # created separately for each one.
    common_kwargs = {
        "base_url": "https://dgx.example.com",
        "app_id": "test_app_id",
        "app_secret": "test_secret",
        "project_name": "test_project",
        "container_image": "nvcr.io/nvidia/test:latest",
        "pvc_nemo_run_dir": "/workspace/nemo_run",
    }

    # Case 1: a TTL given through custom_spec is kept as-is.
    executor_with_ttl = DGXCloudExecutor(
        **common_kwargs,
        job_dir=tempfile.mkdtemp(),
        custom_spec={"ttlSecondsAfterFinished": 7200},
    )
    assert executor_with_ttl.custom_spec["ttlSecondsAfterFinished"] == 7200

    # Case 2: without custom_spec the default of 3600 seconds applies.
    executor_default_ttl = DGXCloudExecutor(
        **common_kwargs,
        job_dir=tempfile.mkdtemp(),
    )
    assert executor_default_ttl.custom_spec == {"ttlSecondsAfterFinished": 3600}

    # Case 3: a TTL can sit alongside other custom_spec fields.
    executor_mixed_spec = DGXCloudExecutor(
        **common_kwargs,
        job_dir=tempfile.mkdtemp(),
        custom_spec={
            "ttlSecondsAfterFinished": 3600,
            "activeDeadlineSeconds": 7200,
            "restartPolicy": "OnFailure",
        },
    )
    mixed_spec = executor_mixed_spec.custom_spec
    assert mixed_spec["ttlSecondsAfterFinished"] == 3600
    assert mixed_spec["activeDeadlineSeconds"] == 7200
    assert mixed_spec["restartPolicy"] == "OnFailure"
Loading