From 21fe7cee8f83402845cc8fce9e18ba39a33118fc Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Fri, 3 Apr 2026 12:42:42 -0500 Subject: [PATCH 01/15] Changes related to ROCm module load change --- cvs/input/cluster_file/cluster.json | 4 + cvs/lib/parallel_ssh_lib.py | 97 +++++++++++-------- cvs/tests/health/agfhc_cvs.py | 3 +- cvs/tests/health/csp_qual_agfhc.py | 3 +- cvs/tests/health/install/install_agfhc.py | 3 +- .../health/install/install_babelstream.py | 3 +- cvs/tests/health/install/install_rocblas.py | 3 +- cvs/tests/health/install/install_rvs.py | 6 +- .../health/install/install_transferbench.py | 6 +- cvs/tests/health/rvs_cvs.py | 3 +- cvs/tests/health/transferbench_cvs.py | 3 +- cvs/tests/ibperf/ib_perf_bw_test.py | 6 +- cvs/tests/ibperf/install_ibperf_tools.py | 6 +- .../inferencemax_gpt_oss_120b_single.py | 6 +- .../pytorch_xdit_flux1_dev_single.py | 3 + .../pytorch_xdit_wan22_14b_single.py | 3 + .../sglang_deepseek_r1_671b_distributed.py | 12 ++- .../sglang/sglang_llama_70b_distributed.py | 12 ++- .../vllm/vllm_deepseek31_685b_single.py | 6 +- .../vllm/vllm_gpt_oss_120b_single.py | 6 +- .../inference/vllm/vllm_qwen3_235b_single.py | 6 +- .../inference/vllm/vllm_qwen3_80b_single.py | 6 +- cvs/tests/mori/mori_benchmark_test.py | 3 +- cvs/tests/platform/host_configs_cvs.py | 3 +- cvs/tests/rccl/rccl_heatmap_cvs.py | 6 +- cvs/tests/rccl/rccl_multinode_cvs.py | 6 +- cvs/tests/rccl/rccl_multinode_default_cvs.py | 6 +- cvs/tests/rccl/rccl_singlenode_cvs.py | 6 +- .../jax/jax_llama3_1_405b_distributed.py | 3 +- .../jax/jax_llama3_1_70b_distributed.py | 3 +- .../training/jax/jax_llama3_1_70b_single.py | 3 +- .../megatron_llama3_1_70b_distributed.py | 3 +- .../megatron/megatron_llama3_1_70b_single.py | 3 +- .../megatron_llama3_1_8b_distributed.py | 3 +- .../megatron/megatron_llama3_1_8b_single.py | 3 +- 35 files changed, 165 insertions(+), 92 deletions(-) diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index 
9fa7d18f..c769e7ac 100644 --- a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -12,6 +12,10 @@ { "mgmt_ip": "{xx.xx.xx.xx|hostname}" }, + "env_vars": + { + "ROCM":"/shared/apps/ubuntu/opt/rocm-7.11.0/bin/" + }, "node_dict": { "{xx.xx.xx.xx|hostname}": diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index f11e8093..efca7471 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -4,30 +4,30 @@ The year included in the foregoing notice is the year of creation of the work. All code contained here is Property of Advanced Micro Devices, Inc. ''' - + from __future__ import print_function from pssh.clients import ParallelSSHClient from pssh.exceptions import Timeout, ConnectionError, SessionError - + import time - + # Following used only for scp of file import paramiko from paramiko import SSHClient from scp import SCPClient - - + + class Pssh: """ ParallelSessions - Uses the pssh library that is based of Paramiko, that lets you take multiple parallel ssh sessions to hosts and execute commands. - + Input host_config should be in this format .. 
mandatory args = user, password (or) 'private_key': load_private_key('my_key.pem') """ - + def __init__( - self, log, host_list, user=None, password=None, pkey='id_rsa', host_key_check=False, stop_on_errors=True + self, log, host_list, user=None, password=None, pkey='id_rsa', host_key_check=False, stop_on_errors=True, env_vars=None ): self.log = log self.host_list = host_list @@ -38,7 +38,8 @@ def __init__( self.host_key_check = host_key_check self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] - + self.env_prefix = f"export PATH={':'.join(env_vars.values())}:$PATH" if env_vars else None + if self.password is None: print(self.reachable_hosts) print(self.user) @@ -48,7 +49,7 @@ def __init__( self.client = ParallelSSHClient( self.reachable_hosts, user=self.user, password=self.password, keepalive_seconds=30 ) - + def check_connectivity(self, hosts): """ Check connectivity for a list of hosts using one ParallelSSHClient. @@ -67,11 +68,11 @@ def check_connectivity(self, hosts): output = temp_client.run_command('echo 1', stop_on_errors=False, read_timeout=2) unreachable = [item.host for item in output if item.exception] return unreachable - + def prune_unreachable_hosts(self, output): """ Prune unreachable hosts from self.reachable_hosts if they have ConnectionError or Timeout exceptions and also fail connectivity check. - + Targeted pruning: Only ConnectionError and Timeout exceptions trigger pruning to avoid removing hosts for transient failures like authentication errors or SSH protocol issues, which may succeed on next try. ConnectionErrors and Timeouts are indicative of potential unreachability, so we perform an additional connectivity check before pruning. 
This ensures @@ -98,7 +99,7 @@ def prune_unreachable_hosts(self, output): self.client = ParallelSSHClient( self.reachable_hosts, user=self.user, password=self.password, keepalive_seconds=30 ) - + def inform_unreachability(self, cmd_output): """ Update cmd_output with "Host Unreachable" for all hosts in self.unreachable_hosts. @@ -106,7 +107,7 @@ def inform_unreachability(self, cmd_output): """ for host in self.unreachable_hosts: cmd_output[host] = cmd_output.get(host, "") + "\nABORT: Host Unreachable Error" - + def _process_output(self, output, cmd=None, cmd_list=None, print_console=True): """ Helper method to process output from run_command, collect results, and handle pruning. @@ -147,13 +148,13 @@ def _process_output(self, output, cmd=None, cmd_list=None, print_console=True): if cmd_list: i += 1 cmd_output[item.host] = cmd_out_str - + if not self.stop_on_errors: self.prune_unreachable_hosts(output) self.inform_unreachability(cmd_output) - + return cmd_output - + def _handle_timeout_exception(self, output, e): """ Helper method to handle Timeout exceptions by setting exceptions for all hosts in output. 
@@ -163,48 +164,60 @@ def _handle_timeout_exception(self, output, e): for item in output: if item.exception is None: item.exception = e - + def exec(self, cmd, timeout=None, print_console=True): """ Returns a dictionary of host as key and command output as values """ - print(f'cmd = {cmd}') - + if self.env_prefix: + full_cmd = f"{self.env_prefix} ; {cmd}" + else: + full_cmd = cmd + + print(f'cmd = {full_cmd}') + # Log command execution if self.log: if timeout is not None: - self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]: {cmd}") + self.log.debug( + f"Executing command on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]: {full_cmd}" + ) else: - self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {cmd}") - + self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {full_cmd}") + if timeout is None: - output = self.client.run_command(cmd, stop_on_errors=self.stop_on_errors) + output = self.client.run_command(full_cmd, stop_on_errors=self.stop_on_errors) else: - output = self.client.run_command(cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors) - cmd_output = self._process_output(output, cmd=cmd, print_console=print_console) - + output = self.client.run_command(full_cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors) + cmd_output = self._process_output(output, cmd=full_cmd, print_console=print_console) + # Log per-host execution completion if self.log: for host in cmd_output.keys(): self.log.debug(f"Command completed on {host}: {cmd}") - + return cmd_output - + def exec_cmd_list(self, cmd_list, timeout=None, print_console=True): """ Run different commands on different hosts compared to to exec which runs the same command on all hosts. 
Returns a dictionary of host as key and command output as values """ + if self.env_prefix: + cmd_list = [f"{self.env_prefix} ; {cmd}" for cmd in cmd_list] + else: + cmd_list = cmd_list + print(cmd_list) - + # Log command list execution if self.log: if timeout is not None: self.log.debug(f"Executing command list on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]") else: self.log.debug(f"Executing command list on {len(self.reachable_hosts)} host(s)") - + if timeout is None: output = self.client.run_command('%s', host_args=cmd_list, stop_on_errors=self.stop_on_errors) else: @@ -212,14 +225,14 @@ def exec_cmd_list(self, cmd_list, timeout=None, print_console=True): '%s', host_args=cmd_list, read_timeout=timeout, stop_on_errors=self.stop_on_errors ) cmd_output = self._process_output(output, cmd_list=cmd_list, print_console=print_console) - + # Log per-host command execution if self.log: for host, cmd in zip(self.reachable_hosts, cmd_list): self.log.debug(f"Command on {host}: {cmd}") - + return cmd_output - + def scp_file(self, local_file, remote_file, recurse=False): print('About to copy local file {} to remote {} on all Hosts'.format(local_file, remote_file)) cmds = self.client.copy_file(local_file, remote_file, recurse=recurse) @@ -230,16 +243,16 @@ def scp_file(self, local_file, remote_file, recurse=False): except IOError: raise Exception("Expected IOError exception, got none") return - + def reboot_connections(self): print('Rebooting Connections') self.client.run_command('reboot -f', stop_on_errors=self.stop_on_errors) - + def destroy_clients(self): print('Destroying Current phdl connections ..') del self.client - - + + def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): """ This method gets/puts files from one server to another @@ -253,7 +266,7 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): To copy remote file '/tmp/x' from 1.1.1.1 to remote server 1.1.1.2 '/home/user/x' 
scp('1.1.1.1:/tmp/x','1.1.1.2:/home/user/x','root','docker','root','docker') """ - + ssh = SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.load_system_host_keys() @@ -263,7 +276,7 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): get_put = 1 srcip = None dstip = None - + if len(srclist) == 2: srcip = srclist[0] srcfile = srclist[1] @@ -271,7 +284,7 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): get_put = 0 else: srcfile = srclist[0] - + if len(dstlist) == 2: dstip = dstlist[0] dstfile = dstlist[1] @@ -300,4 +313,4 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): time.sleep(1) ssh.exec_command('ssh-keyscan %s >> ~/.ssh/known_hosts' % (dstip)) time.sleep(1) - ssh.exec_command('sshpass -p %s scp %s %s@%s:%s' % (dstpassword, srcfile, dstusername, dstip, dstfile)) + ssh.exec_command('sshpass -p %s scp %s %s@%s:%s' % (dstpassword, srcfile, dstusername, dstip, dstfile)) \ No newline at end of file diff --git a/cvs/tests/health/agfhc_cvs.py b/cvs/tests/health/agfhc_cvs.py index 5bb18ccc..7f7d3f3f 100644 --- a/cvs/tests/health/agfhc_cvs.py +++ b/cvs/tests/health/agfhc_cvs.py @@ -119,8 +119,9 @@ def phdl(cluster_dict): Pssh: A handle to execute commands across all nodes. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False, env_vars=env_vars) return phdl diff --git a/cvs/tests/health/csp_qual_agfhc.py b/cvs/tests/health/csp_qual_agfhc.py index 97f37b3c..46b1173c 100644 --- a/cvs/tests/health/csp_qual_agfhc.py +++ b/cvs/tests/health/csp_qual_agfhc.py @@ -142,8 +142,9 @@ def phdl(cluster_dict): Pssh: A handle to execute commands across all nodes. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_agfhc.py b/cvs/tests/health/install/install_agfhc.py index 112f2734..70f43575 100644 --- a/cvs/tests/health/install/install_agfhc.py +++ b/cvs/tests/health/install/install_agfhc.py @@ -123,8 +123,9 @@ def phdl(cluster_dict): Pssh: A handle to execute commands across all nodes. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_babelstream.py b/cvs/tests/health/install/install_babelstream.py index f028a72c..72b0a872 100644 --- a/cvs/tests/health/install/install_babelstream.py +++ b/cvs/tests/health/install/install_babelstream.py @@ -138,8 +138,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_rocblas.py b/cvs/tests/health/install/install_rocblas.py index 905d0c3a..1a9fd80e 100644 --- a/cvs/tests/health/install/install_rocblas.py +++ b/cvs/tests/health/install/install_rocblas.py @@ -133,8 +133,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list 
= list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_rvs.py b/cvs/tests/health/install/install_rvs.py index 326d0afa..d2db9020 100644 --- a/cvs/tests/health/install/install_rvs.py +++ b/cvs/tests/health/install/install_rvs.py @@ -183,8 +183,9 @@ def shdl(cluster_dict): - Module scope ensures a single connection context for the duration of the module. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl @@ -204,8 +205,9 @@ def phdl(cluster_dict): Pssh: Handle that executes commands on all nodes and returns dict[node] -> output. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_transferbench.py b/cvs/tests/health/install/install_transferbench.py index 3fb0dec5..2cee070a 100644 --- a/cvs/tests/health/install/install_transferbench.py +++ b/cvs/tests/health/install/install_transferbench.py @@ -187,8 +187,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl @@ -209,8 +210,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/rvs_cvs.py b/cvs/tests/health/rvs_cvs.py index 9df7df22..35bc621c 100644 --- a/cvs/tests/health/rvs_cvs.py +++ b/cvs/tests/health/rvs_cvs.py @@ -60,8 +60,9 @@ def config_dict(config_file, cluster_dict): @pytest.fixture(scope="module") def phdl(cluster_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False, env_vars=env_vars) return phdl diff --git a/cvs/tests/health/transferbench_cvs.py b/cvs/tests/health/transferbench_cvs.py index 75e098f5..e5cb8a2f 100644 --- a/cvs/tests/health/transferbench_cvs.py +++ b/cvs/tests/health/transferbench_cvs.py @@ -251,8 +251,9 @@ def parse_tb_example_test_results(out_dict, exp_dict): @pytest.fixture(scope="module") def phdl(cluster_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, 
user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/ibperf/ib_perf_bw_test.py b/cvs/tests/ibperf/ib_perf_bw_test.py index 59be244f..9139fae0 100644 --- a/cvs/tests/ibperf/ib_perf_bw_test.py +++ b/cvs/tests/ibperf/ib_perf_bw_test.py @@ -133,10 +133,11 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) if len(node_list) % 2 != 0: node_list.pop() - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -157,8 +158,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/ibperf/install_ibperf_tools.py b/cvs/tests/ibperf/install_ibperf_tools.py index eef40003..4aadea93 100644 --- a/cvs/tests/ibperf/install_ibperf_tools.py +++ b/cvs/tests/ibperf/install_ibperf_tools.py @@ -131,10 +131,11 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) if len(node_list) % 2 != 0: node_list.pop() - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -155,8 +156,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py b/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py index 9658bf19..a2ec8520 100644 --- a/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py +++ b/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py @@ -154,8 +154,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -181,8 +182,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py index 8ca2a92f..a1c94644 100644 --- a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py +++ b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py @@ -274,6 +274,7 @@ def hf_token(inference_dict): def s_phdl(cluster_dict): """Create and return a command execution handle for all cluster nodes.""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") # Single-node mode: execute locally ONLY when the target actually refers to this machine. 
# @@ -292,6 +293,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get("username"), password=cluster_dict.get("password"), pkey=cluster_dict.get("priv_key_file"), + env_vars=env_vars, ) log.info(f"Using parallel-ssh execution mode for {len(node_list)} node(s)") @@ -301,6 +303,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get('username'), password=cluster_dict.get('password'), pkey=cluster_dict.get('priv_key_file'), + env_vars=env_vars, ) diff --git a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py index 330cb0d3..1f848625 100644 --- a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py +++ b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py @@ -251,6 +251,7 @@ def hf_token(inference_dict): def s_phdl(cluster_dict): """Create and return a command execution handle for all cluster nodes.""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") # Single-node mode: execute locally ONLY when the target actually refers to this machine. 
# @@ -269,6 +270,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get("username"), password=cluster_dict.get("password"), pkey=cluster_dict.get("priv_key_file"), + env_vars=env_vars, ) log.info(f"Using parallel-ssh execution mode for {len(node_list)} node(s)") @@ -278,6 +280,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get('username'), password=cluster_dict.get('password'), pkey=cluster_dict.get('priv_key_file'), + env_vars=env_vars, ) diff --git a/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py b/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py index 2b27a03a..44e754d5 100644 --- a/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py +++ b/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py @@ -135,16 +135,18 @@ def hf_token(inference_dict): @pytest.fixture(scope="module") def p_phdl(cluster_dict, inference_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") p_phdl = Pssh( - log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars ) return p_phdl @pytest.fixture(scope="module") def d_phdl(cluster_dict, inference_dict): + env_vars = cluster_dict.get("env_vars") d_phdl = Pssh( - log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars ) return d_phdl @@ -152,16 +154,18 @@ def d_phdl(cluster_dict, inference_dict): @pytest.fixture(scope="module") def r_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['proxy_router_node']) - r_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + r_phdl = Pssh(log, node_list, 
user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return r_phdl @pytest.fixture(scope="module") def b_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['benchmark_serv_node']) - b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return b_phdl diff --git a/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py b/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py index 595fd649..e8cbb44d 100644 --- a/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py +++ b/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py @@ -135,16 +135,18 @@ def hf_token(inference_dict): @pytest.fixture(scope="module") def p_phdl(cluster_dict, inference_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") p_phdl = Pssh( - log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars ) return p_phdl @pytest.fixture(scope="module") def d_phdl(cluster_dict, inference_dict): + env_vars = cluster_dict.get("env_vars") d_phdl = Pssh( - log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars ) return d_phdl @@ -152,16 +154,18 @@ def d_phdl(cluster_dict, inference_dict): @pytest.fixture(scope="module") def r_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['proxy_router_node']) - r_phdl = Pssh(log, node_list, user=cluster_dict['username'], 
pkey=cluster_dict['priv_key_file']) + r_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return r_phdl @pytest.fixture(scope="module") def b_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['benchmark_serv_node']) - b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return b_phdl diff --git a/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py b/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py index 8f6e06eb..4373751a 100644 --- a/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py +++ b/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py @@ -244,8 +244,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -271,8 +272,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py b/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py index 37726453..6a0e83c6 100644 --- a/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py +++ b/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py @@ -243,8 +243,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -270,8 +271,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py b/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py index c419b617..cb0d6d3a 100644 --- a/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py +++ b/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py @@ -243,8 +243,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -270,8 +271,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py b/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py index 7363fe43..66f1bfa3 100644 --- a/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py +++ b/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py @@ -243,8 +243,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -270,8 +271,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/mori/mori_benchmark_test.py b/cvs/tests/mori/mori_benchmark_test.py index 7ad10e3c..83fb2101 100644 --- a/cvs/tests/mori/mori_benchmark_test.py +++ b/cvs/tests/mori/mori_benchmark_test.py @@ -84,8 +84,9 @@ def mori_dict(mori_config_file, cluster_dict): @pytest.fixture(scope="module") def phdl(cluster_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/platform/host_configs_cvs.py b/cvs/tests/platform/host_configs_cvs.py index 0b93b4a8..67a9ca59 100644 --- a/cvs/tests/platform/host_configs_cvs.py +++ b/cvs/tests/platform/host_configs_cvs.py @@ -114,8 +114,9 @@ def phdl(cluster_dict): - Assumes Pssh is available in scope and accepts (log, node_list, user, pkey) in its constructor. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/rccl/rccl_heatmap_cvs.py b/cvs/tests/rccl/rccl_heatmap_cvs.py index a467b980..65e251c9 100644 --- a/cvs/tests/rccl/rccl_heatmap_cvs.py +++ b/cvs/tests/rccl/rccl_heatmap_cvs.py @@ -141,8 +141,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -163,8 +164,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/rccl/rccl_multinode_cvs.py b/cvs/tests/rccl/rccl_multinode_cvs.py index 571003f3..43b8c017 100644 --- a/cvs/tests/rccl/rccl_multinode_cvs.py +++ b/cvs/tests/rccl/rccl_multinode_cvs.py @@ -136,8 +136,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -158,8 +159,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/rccl/rccl_multinode_default_cvs.py b/cvs/tests/rccl/rccl_multinode_default_cvs.py index 3d2b6461..25e987aa 100644 --- a/cvs/tests/rccl/rccl_multinode_default_cvs.py +++ b/cvs/tests/rccl/rccl_multinode_default_cvs.py @@ -138,8 +138,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -160,8 +161,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/rccl/rccl_singlenode_cvs.py b/cvs/tests/rccl/rccl_singlenode_cvs.py index ab785947..e9affdb4 100644 --- a/cvs/tests/rccl/rccl_singlenode_cvs.py +++ b/cvs/tests/rccl/rccl_singlenode_cvs.py @@ -138,8 +138,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -160,8 +161,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py b/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py index 1f73073c..264ea934 100644 --- a/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py +++ b/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py @@ -177,8 +177,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py b/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py index 492f4eb7..5b157d03 100644 --- a/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py +++ b/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py @@ -177,8 +177,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/jax/jax_llama3_1_70b_single.py b/cvs/tests/training/jax/jax_llama3_1_70b_single.py index 9be11272..ec26c53b 100644 --- a/cvs/tests/training/jax/jax_llama3_1_70b_single.py +++ b/cvs/tests/training/jax/jax_llama3_1_70b_single.py @@ -177,8 +177,9 @@ def phdl(cluster_dict): """ 
print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py b/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py index 8fdfb965..542daa59 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py @@ -180,8 +180,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py b/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py index a6326df1..a9805c25 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py @@ -178,8 +178,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py b/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py index ec89049a..40920477 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py @@ -180,8 +180,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py b/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py index 30943a10..572618aa 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py @@ -179,8 +179,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl From 325594c35084c3119500375eff35708a8dceb77a Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Fri, 3 Apr 2026 12:47:38 -0500 Subject: [PATCH 02/15] make fmt changes --- cvs/lib/parallel_ssh_lib.py | 84 ++++++++++--------- cvs/tests/health/agfhc_cvs.py | 9 +- cvs/tests/health/rvs_cvs.py | 9 +- .../sglang_deepseek_r1_671b_distributed.py | 12 ++- .../sglang/sglang_llama_70b_distributed.py | 12 ++- 5 files changed, 82 insertions(+), 44 deletions(-) diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index efca7471..3e1c2fc9 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -4,30 +4,38 @@ The year included in the foregoing notice is the year of creation of the work. All code contained here is Property of Advanced Micro Devices, Inc. ''' - + from __future__ import print_function from pssh.clients import ParallelSSHClient from pssh.exceptions import Timeout, ConnectionError, SessionError - + import time - + # Following used only for scp of file import paramiko from paramiko import SSHClient from scp import SCPClient - - + + class Pssh: """ ParallelSessions - Uses the pssh library that is based of Paramiko, that lets you take multiple parallel ssh sessions to hosts and execute commands. - + Input host_config should be in this format .. 
mandatory args = user, password (or) 'private_key': load_private_key('my_key.pem') """ - + def __init__( - self, log, host_list, user=None, password=None, pkey='id_rsa', host_key_check=False, stop_on_errors=True, env_vars=None + self, + log, + host_list, + user=None, + password=None, + pkey='id_rsa', + host_key_check=False, + stop_on_errors=True, + env_vars=None, ): self.log = log self.host_list = host_list @@ -39,7 +47,7 @@ def __init__( self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] self.env_prefix = f"export PATH={':'.join(env_vars.values())}:$PATH" if env_vars else None - + if self.password is None: print(self.reachable_hosts) print(self.user) @@ -49,7 +57,7 @@ def __init__( self.client = ParallelSSHClient( self.reachable_hosts, user=self.user, password=self.password, keepalive_seconds=30 ) - + def check_connectivity(self, hosts): """ Check connectivity for a list of hosts using one ParallelSSHClient. @@ -68,11 +76,11 @@ def check_connectivity(self, hosts): output = temp_client.run_command('echo 1', stop_on_errors=False, read_timeout=2) unreachable = [item.host for item in output if item.exception] return unreachable - + def prune_unreachable_hosts(self, output): """ Prune unreachable hosts from self.reachable_hosts if they have ConnectionError or Timeout exceptions and also fail connectivity check. - + Targeted pruning: Only ConnectionError and Timeout exceptions trigger pruning to avoid removing hosts for transient failures like authentication errors or SSH protocol issues, which may succeed on next try. ConnectionErrors and Timeouts are indicative of potential unreachability, so we perform an additional connectivity check before pruning. 
This ensures @@ -99,7 +107,7 @@ def prune_unreachable_hosts(self, output): self.client = ParallelSSHClient( self.reachable_hosts, user=self.user, password=self.password, keepalive_seconds=30 ) - + def inform_unreachability(self, cmd_output): """ Update cmd_output with "Host Unreachable" for all hosts in self.unreachable_hosts. @@ -107,7 +115,7 @@ def inform_unreachability(self, cmd_output): """ for host in self.unreachable_hosts: cmd_output[host] = cmd_output.get(host, "") + "\nABORT: Host Unreachable Error" - + def _process_output(self, output, cmd=None, cmd_list=None, print_console=True): """ Helper method to process output from run_command, collect results, and handle pruning. @@ -148,13 +156,13 @@ def _process_output(self, output, cmd=None, cmd_list=None, print_console=True): if cmd_list: i += 1 cmd_output[item.host] = cmd_out_str - + if not self.stop_on_errors: self.prune_unreachable_hosts(output) self.inform_unreachability(cmd_output) - + return cmd_output - + def _handle_timeout_exception(self, output, e): """ Helper method to handle Timeout exceptions by setting exceptions for all hosts in output. 
@@ -164,7 +172,7 @@ def _handle_timeout_exception(self, output, e): for item in output: if item.exception is None: item.exception = e - + def exec(self, cmd, timeout=None, print_console=True): """ Returns a dictionary of host as key and command output as values @@ -173,9 +181,9 @@ def exec(self, cmd, timeout=None, print_console=True): full_cmd = f"{self.env_prefix} ; {cmd}" else: full_cmd = cmd - + print(f'cmd = {full_cmd}') - + # Log command execution if self.log: if timeout is not None: @@ -184,20 +192,20 @@ def exec(self, cmd, timeout=None, print_console=True): ) else: self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {full_cmd}") - + if timeout is None: output = self.client.run_command(full_cmd, stop_on_errors=self.stop_on_errors) else: output = self.client.run_command(full_cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors) cmd_output = self._process_output(output, cmd=full_cmd, print_console=print_console) - + # Log per-host execution completion if self.log: for host in cmd_output.keys(): self.log.debug(f"Command completed on {host}: {cmd}") - + return cmd_output - + def exec_cmd_list(self, cmd_list, timeout=None, print_console=True): """ Run different commands on different hosts compared to to exec @@ -208,16 +216,16 @@ def exec_cmd_list(self, cmd_list, timeout=None, print_console=True): cmd_list = [f"{self.env_prefix} ; {cmd}" for cmd in cmd_list] else: cmd_list = cmd_list - + print(cmd_list) - + # Log command list execution if self.log: if timeout is not None: self.log.debug(f"Executing command list on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]") else: self.log.debug(f"Executing command list on {len(self.reachable_hosts)} host(s)") - + if timeout is None: output = self.client.run_command('%s', host_args=cmd_list, stop_on_errors=self.stop_on_errors) else: @@ -225,14 +233,14 @@ def exec_cmd_list(self, cmd_list, timeout=None, print_console=True): '%s', host_args=cmd_list, read_timeout=timeout, 
stop_on_errors=self.stop_on_errors ) cmd_output = self._process_output(output, cmd_list=cmd_list, print_console=print_console) - + # Log per-host command execution if self.log: for host, cmd in zip(self.reachable_hosts, cmd_list): self.log.debug(f"Command on {host}: {cmd}") - + return cmd_output - + def scp_file(self, local_file, remote_file, recurse=False): print('About to copy local file {} to remote {} on all Hosts'.format(local_file, remote_file)) cmds = self.client.copy_file(local_file, remote_file, recurse=recurse) @@ -243,16 +251,16 @@ def scp_file(self, local_file, remote_file, recurse=False): except IOError: raise Exception("Expected IOError exception, got none") return - + def reboot_connections(self): print('Rebooting Connections') self.client.run_command('reboot -f', stop_on_errors=self.stop_on_errors) - + def destroy_clients(self): print('Destroying Current phdl connections ..') del self.client - - + + def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): """ This method gets/puts files from one server to another @@ -266,7 +274,7 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): To copy remote file '/tmp/x' from 1.1.1.1 to remote server 1.1.1.2 '/home/user/x' scp('1.1.1.1:/tmp/x','1.1.1.2:/home/user/x','root','docker','root','docker') """ - + ssh = SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.load_system_host_keys() @@ -276,7 +284,7 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): get_put = 1 srcip = None dstip = None - + if len(srclist) == 2: srcip = srclist[0] srcfile = srclist[1] @@ -284,7 +292,7 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): get_put = 0 else: srcfile = srclist[0] - + if len(dstlist) == 2: dstip = dstlist[0] dstfile = dstlist[1] @@ -313,4 +321,4 @@ def scp(src, dst, srcusername, srcpassword, dstusername=None, dstpassword=None): time.sleep(1) ssh.exec_command('ssh-keyscan %s 
>> ~/.ssh/known_hosts' % (dstip)) time.sleep(1) - ssh.exec_command('sshpass -p %s scp %s %s@%s:%s' % (dstpassword, srcfile, dstusername, dstip, dstfile)) \ No newline at end of file + ssh.exec_command('sshpass -p %s scp %s %s@%s:%s' % (dstpassword, srcfile, dstusername, dstip, dstfile)) diff --git a/cvs/tests/health/agfhc_cvs.py b/cvs/tests/health/agfhc_cvs.py index 7f7d3f3f..3216895e 100644 --- a/cvs/tests/health/agfhc_cvs.py +++ b/cvs/tests/health/agfhc_cvs.py @@ -121,7 +121,14 @@ def phdl(cluster_dict): print(cluster_dict) env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False, env_vars=env_vars) + phdl = Pssh( + log, + node_list, + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + stop_on_errors=False, + env_vars=env_vars, + ) return phdl diff --git a/cvs/tests/health/rvs_cvs.py b/cvs/tests/health/rvs_cvs.py index 35bc621c..9fdc9d7b 100644 --- a/cvs/tests/health/rvs_cvs.py +++ b/cvs/tests/health/rvs_cvs.py @@ -62,7 +62,14 @@ def phdl(cluster_dict): print(cluster_dict) env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False, env_vars=env_vars) + phdl = Pssh( + log, + node_list, + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + stop_on_errors=False, + env_vars=env_vars, + ) return phdl diff --git a/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py b/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py index 44e754d5..978d1b83 100644 --- a/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py +++ b/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py @@ -137,7 +137,11 @@ def p_phdl(cluster_dict, inference_dict): print(cluster_dict) env_vars = 
cluster_dict.get("env_vars") p_phdl = Pssh( - log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars + log, + inference_dict['prefill_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return p_phdl @@ -146,7 +150,11 @@ def p_phdl(cluster_dict, inference_dict): def d_phdl(cluster_dict, inference_dict): env_vars = cluster_dict.get("env_vars") d_phdl = Pssh( - log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars + log, + inference_dict['decode_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return d_phdl diff --git a/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py b/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py index e8cbb44d..6dd7759b 100644 --- a/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py +++ b/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py @@ -137,7 +137,11 @@ def p_phdl(cluster_dict, inference_dict): print(cluster_dict) env_vars = cluster_dict.get("env_vars") p_phdl = Pssh( - log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars + log, + inference_dict['prefill_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return p_phdl @@ -146,7 +150,11 @@ def p_phdl(cluster_dict, inference_dict): def d_phdl(cluster_dict, inference_dict): env_vars = cluster_dict.get("env_vars") d_phdl = Pssh( - log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars + log, + inference_dict['decode_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return d_phdl From 1da2fb60cc5ccf6148a0f5ed3ebf95e9597b918f Mon Sep 17 00:00:00 2001 
From: Rajesh Thummala Date: Fri, 3 Apr 2026 17:13:13 -0500 Subject: [PATCH 03/15] PR comment implementation --- cvs/input/cluster_file/cluster.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index c769e7ac..90d7e863 100644 --- a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -14,7 +14,8 @@ }, "env_vars": { - "ROCM":"/shared/apps/ubuntu/opt/rocm-7.11.0/bin/" + "_env_var_comment": "If your cluster supports multiple rocm env via modules, choose the module and provide the path to use by cvs scripts", + "ROCM":"" }, "node_dict": { From 99463e29f63a7c33fce97ecf0d05d587f7b4fc1c Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Fri, 3 Apr 2026 17:31:14 -0500 Subject: [PATCH 04/15] PR Suggest chnages --- cvs/input/cluster_file/cluster.json | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index 90d7e863..2495e155 100644 --- a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -12,11 +12,8 @@ { "mgmt_ip": "{xx.xx.xx.xx|hostname}" }, - "env_vars": - { - "_env_var_comment": "If your cluster supports multiple rocm env via modules, choose the module and provide the path to use by cvs scripts", - "ROCM":"" - }, + "_env_varS_comment": "Custom env variables to be exported before executing each command, Example in a cluster with multiple rocm installed set ROCM_PATH : /opt/rocm-7.2", + "env_vars": {}, "node_dict": { "{xx.xx.xx.xx|hostname}": From 15b29cedc0ee0e36d971137cc55f413b3eecaffa Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Fri, 3 Apr 2026 17:53:31 -0500 Subject: [PATCH 05/15] PR comments --- cvs/input/cluster_file/cluster.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index 2495e155..a4a03d90 100644 --- 
a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -12,7 +12,7 @@ { "mgmt_ip": "{xx.xx.xx.xx|hostname}" }, - "_env_varS_comment": "Custom env variables to be exported before executing each command, Example in a cluster with multiple rocm installed set ROCM_PATH : /opt/rocm-7.2", + "_env_vars_comment": "Custom env variables to be exported before executing each command, Example in a cluster with multiple rocm installed set "ROCM_PATH" : "/opt/rocm-7.2" ", "env_vars": {}, "node_dict": { From 9c08dff1e88b3a47446e546a61f72ad034dd8414 Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Fri, 3 Apr 2026 18:06:57 -0500 Subject: [PATCH 06/15] Addming additional example variable --- cvs/input/cluster_file/cluster.json | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index a4a03d90..f5d8c59c 100644 --- a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -12,7 +12,11 @@ { "mgmt_ip": "{xx.xx.xx.xx|hostname}" }, - "_env_vars_comment": "Custom env variables to be exported before executing each command, Example in a cluster with multiple rocm installed set "ROCM_PATH" : "/opt/rocm-7.2" ", + "_env_vars_comment": "Custom env variables to be exported before executing each command", + "_env_vars_example": + { + "ROCM_PATH": "/opt/rocm-7.2" + }, "env_vars": {}, "node_dict": { From 693e9dc6288d5bf30fb28fd8c01c7269af3a38c8 Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 08:26:09 -0500 Subject: [PATCH 07/15] Implemented PR comments --- cvs/input/cluster_file/cluster.json | 2 +- cvs/lib/parallel_ssh_lib.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index f5d8c59c..1d5b5be9 100644 --- a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -15,7 +15,7 @@ "_env_vars_comment": "Custom 
env variables to be exported before executing each command", "_env_vars_example": { - "ROCM_PATH": "/opt/rocm-7.2" + "PATH": "/opt/rocm-7.2" }, "env_vars": {}, "node_dict": diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index 3e1c2fc9..94f03e22 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -15,6 +15,7 @@ import paramiko from paramiko import SSHClient from scp import SCPClient +import shlex class Pssh: @@ -46,7 +47,13 @@ def __init__( self.host_key_check = host_key_check self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] - self.env_prefix = f"export PATH={':'.join(env_vars.values())}:$PATH" if env_vars else None + self.env_prefix = ( + " ".join( + f"export {key}={shlex.quote(str(value))}" + for key, value in env_vars.items() + ) + if env_vars else "" + ) if self.password is None: print(self.reachable_hosts) From f92fc861c644a12f7f6eff80ee19e603f812e643 Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 08:31:41 -0500 Subject: [PATCH 08/15] make fmt changes --- cvs/lib/parallel_ssh_lib.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index 94f03e22..9992089f 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -48,11 +48,7 @@ def __init__( self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] self.env_prefix = ( - " ".join( - f"export {key}={shlex.quote(str(value))}" - for key, value in env_vars.items() - ) - if env_vars else "" + " ".join(f"export {key}={shlex.quote(str(value))}" for key, value in env_vars.items()) if env_vars else "" ) if self.password is None: From 12558ea3d4a1fdd52f5c6b7dd0d6774e6fa67c9b Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 16:31:29 -0500 Subject: [PATCH 09/15] Added new file env_lib.py and respective test file --- cvs/input/cluster_file/cluster.json | 2 +- cvs/lib/env_lib.py | 66 +++++++++++++++++++++++++ 
cvs/lib/parallel_ssh_lib.py | 6 +-- cvs/lib/unittests/test_env_lib.py | 75 +++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 cvs/lib/env_lib.py create mode 100644 cvs/lib/unittests/test_env_lib.py diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json index 1d5b5be9..5655901f 100644 --- a/cvs/input/cluster_file/cluster.json +++ b/cvs/input/cluster_file/cluster.json @@ -15,7 +15,7 @@ "_env_vars_comment": "Custom env variables to be exported before executing each command", "_env_vars_example": { - "PATH": "/opt/rocm-7.2" + "PATH": "/shared/apps/ubuntu/opt/rocm-7.11.0/bin:$PATH" }, "env_vars": {}, "node_dict": diff --git a/cvs/lib/env_lib.py b/cvs/lib/env_lib.py new file mode 100644 index 00000000..ac06b32f --- /dev/null +++ b/cvs/lib/env_lib.py @@ -0,0 +1,66 @@ +""" +env_lib.py + +Utilities for safely constructing shell-compatible environment variable +exports, with controlled support for self-referential expansion (e.g. PATH). + +Key guarantees: +- Prevents shell injection by quoting all user-provided values. +- Allows controlled expansion ONLY for `$KEY` where KEY is the variable being + assigned (e.g., PATH=/x:$PATH, LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/y). +- Expansion is performed on the *remote shell*, not locally. +""" + +import shlex + + +def build_env_prefix(env_vars): + """ + Build a shell-safe export prefix from environment variables. + + Supported patterns: + - :$VAR (prepend) + - $VAR: (append) + + Unsupported patterns (treated as literal values): + - Cross-variable expansion (e.g., FOO=$BAR) + - Shell substitution (e.g., $(...), `...`) + - Parameter expansion (e.g., ${VAR:-default}) + + Args: + env_vars: Dictionary of environment variables to export. + + Returns: + A string suitable for prefixing a shell command, e.g.: + "export PATH=/x:$PATH ; export FOO='bar'" + or an empty string if env_vars is empty. 
+ """ + if not env_vars: + return "" + + exports = [] + + for key, value in env_vars.items(): + marker = f"${key}" + + # Case 1: Prepend to existing variable (e.g., PATH=/x:$PATH) + if value.endswith(":" + marker): + prefix = value[: -(len(marker) + 1)] + exports.append( + f"export {key}={shlex.quote(prefix)}:${key}" + ) + + # Case 2: Append to existing variable (e.g., PATH=$PATH:/x) + elif value.startswith(marker + ":"): + suffix = value[len(marker) + 1 :] + exports.append( + f"export {key}=${key}:{shlex.quote(suffix)}" + ) + + # Case 3: Treat as a literal value (fully quoted) + else: + exports.append( + f"export {key}={shlex.quote(str(value))}" + ) + + return " ; ".join(exports) \ No newline at end of file diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index 9992089f..5ec92d7c 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -15,7 +15,7 @@ import paramiko from paramiko import SSHClient from scp import SCPClient -import shlex +from env_lib import build_env_prefix class Pssh: @@ -47,9 +47,7 @@ def __init__( self.host_key_check = host_key_check self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] - self.env_prefix = ( - " ".join(f"export {key}={shlex.quote(str(value))}" for key, value in env_vars.items()) if env_vars else "" - ) + self.env_prefix = build_env_prefix(env_vars) if self.password is None: print(self.reachable_hosts) diff --git a/cvs/lib/unittests/test_env_lib.py b/cvs/lib/unittests/test_env_lib.py new file mode 100644 index 00000000..1f986b4a --- /dev/null +++ b/cvs/lib/unittests/test_env_lib.py @@ -0,0 +1,75 @@ +""" +test_env_lib.py + +Unit tests for env_lib.build_env_prefix using Python's built-in unittest framework. 
+""" + +import unittest +from env_lib import build_env_prefix + + +class TestBuildEnvPrefix(unittest.TestCase): + + def test_empty_env_vars(self): + self.assertEqual(build_env_prefix({}), "") + + def test_literal_env_var(self): + env = {"FOO": "bar"} + result = build_env_prefix(env) + self.assertEqual(result, "export FOO=bar") + + def test_literal_env_var_with_spaces(self): + env = {"FOO": "hello world"} + result = build_env_prefix(env) + self.assertEqual(result, "export FOO='hello world'") + + def test_path_prepend(self): + env = {"PATH": "/usr/bin:/custom/bin:$PATH"} + result = build_env_prefix(env) + self.assertEqual(result, "export PATH=/usr/bin:/custom/bin:$PATH") + + def test_path_append(self): + env = {"PATH": "$PATH:/custom/bin"} + result = build_env_prefix(env) + self.assertEqual(result, "export PATH=$PATH:/custom/bin") + + def test_ld_library_path_prepend(self): + env = {"LD_LIBRARY_PATH": "/opt/lib:$LD_LIBRARY_PATH"} + result = build_env_prefix(env) + self.assertEqual( + result, + "export LD_LIBRARY_PATH=/opt/lib:$LD_LIBRARY_PATH" + ) + + def test_multiple_env_vars_mixed(self): + env = { + "PATH": "/usr/bin:$PATH", + "FOO": "bar", + "BAZ": "hello world", + } + result = build_env_prefix(env) + + self.assertEqual( + result, + "export PATH=/usr/bin:$PATH ; " + "export FOO=bar ; " + "export BAZ='hello world'" + ) + + def test_cross_variable_expansion_is_not_allowed(self): + env = {"FOO": "$PATH"} + result = build_env_prefix(env) + + # Expansion should be blocked and treated as a literal + self.assertEqual(result, "export FOO='$PATH'") + + def test_shell_injection_attempt_is_quoted(self): + env = {"FOO": "$(rm -rf /)"} + result = build_env_prefix(env) + + # Must be fully quoted to prevent execution + self.assertEqual(result, "export FOO='$(rm -rf /)'") + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From eabc42023d87c35c453582f6e8fc4b25a013c18b Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 16:32:58 -0500 
Subject: [PATCH 10/15] make fmt changes --- cvs/lib/env_lib.py | 14 ++++---------- cvs/lib/unittests/test_env_lib.py | 15 +++------------ 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/cvs/lib/env_lib.py b/cvs/lib/env_lib.py index ac06b32f..203a7de6 100644 --- a/cvs/lib/env_lib.py +++ b/cvs/lib/env_lib.py @@ -46,21 +46,15 @@ def build_env_prefix(env_vars): # Case 1: Prepend to existing variable (e.g., PATH=/x:$PATH) if value.endswith(":" + marker): prefix = value[: -(len(marker) + 1)] - exports.append( - f"export {key}={shlex.quote(prefix)}:${key}" - ) + exports.append(f"export {key}={shlex.quote(prefix)}:${key}") # Case 2: Append to existing variable (e.g., PATH=$PATH:/x) elif value.startswith(marker + ":"): suffix = value[len(marker) + 1 :] - exports.append( - f"export {key}=${key}:{shlex.quote(suffix)}" - ) + exports.append(f"export {key}=${key}:{shlex.quote(suffix)}") # Case 3: Treat as a literal value (fully quoted) else: - exports.append( - f"export {key}={shlex.quote(str(value))}" - ) + exports.append(f"export {key}={shlex.quote(str(value))}") - return " ; ".join(exports) \ No newline at end of file + return " ; ".join(exports) diff --git a/cvs/lib/unittests/test_env_lib.py b/cvs/lib/unittests/test_env_lib.py index 1f986b4a..1d4d7012 100644 --- a/cvs/lib/unittests/test_env_lib.py +++ b/cvs/lib/unittests/test_env_lib.py @@ -9,7 +9,6 @@ class TestBuildEnvPrefix(unittest.TestCase): - def test_empty_env_vars(self): self.assertEqual(build_env_prefix({}), "") @@ -36,10 +35,7 @@ def test_path_append(self): def test_ld_library_path_prepend(self): env = {"LD_LIBRARY_PATH": "/opt/lib:$LD_LIBRARY_PATH"} result = build_env_prefix(env) - self.assertEqual( - result, - "export LD_LIBRARY_PATH=/opt/lib:$LD_LIBRARY_PATH" - ) + self.assertEqual(result, "export LD_LIBRARY_PATH=/opt/lib:$LD_LIBRARY_PATH") def test_multiple_env_vars_mixed(self): env = { @@ -49,12 +45,7 @@ def test_multiple_env_vars_mixed(self): } result = build_env_prefix(env) - self.assertEqual( 
- result, - "export PATH=/usr/bin:$PATH ; " - "export FOO=bar ; " - "export BAZ='hello world'" - ) + self.assertEqual(result, "export PATH=/usr/bin:$PATH ; export FOO=bar ; export BAZ='hello world'") def test_cross_variable_expansion_is_not_allowed(self): env = {"FOO": "$PATH"} @@ -72,4 +63,4 @@ def test_shell_injection_attempt_is_quoted(self): if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() From 795aa327480e5de98aa10620d26af9014b6c8500 Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 16:36:45 -0500 Subject: [PATCH 11/15] Changed import statement --- cvs/lib/parallel_ssh_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index 5ec92d7c..abe4df63 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -15,7 +15,7 @@ import paramiko from paramiko import SSHClient from scp import SCPClient -from env_lib import build_env_prefix +from csv.lib.env_lib import build_env_prefix class Pssh: From 8d8f551068cea205f4753ad99b3eb6a722c1b39a Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 16:41:43 -0500 Subject: [PATCH 12/15] corrected the typo --- cvs/lib/parallel_ssh_lib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index abe4df63..1c1e16f6 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -15,7 +15,7 @@ import paramiko from paramiko import SSHClient from scp import SCPClient -from csv.lib.env_lib import build_env_prefix +from cvs.lib.env_lib import build_env_prefix class Pssh: From 7da18a6a3235446ea261403b9ddca29c2ad6decc Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Mon, 6 Apr 2026 18:10:31 -0500 Subject: [PATCH 13/15] added cvs.lib to the import statement --- cvs/lib/parallel_ssh_lib.py | 1 + cvs/lib/unittests/test_env_lib.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) 
diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index 1c1e16f6..6b74c0de 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -48,6 +48,7 @@ def __init__( self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] self.env_prefix = build_env_prefix(env_vars) + self.log.Debug(f"Environ vars: {self.env_prefix}") if self.password is None: print(self.reachable_hosts) diff --git a/cvs/lib/unittests/test_env_lib.py b/cvs/lib/unittests/test_env_lib.py index 1d4d7012..9fa4de49 100644 --- a/cvs/lib/unittests/test_env_lib.py +++ b/cvs/lib/unittests/test_env_lib.py @@ -5,7 +5,7 @@ """ import unittest -from env_lib import build_env_prefix +from cvs.lib.env_lib import build_env_prefix class TestBuildEnvPrefix(unittest.TestCase): From f8c67594494d6c0c11b7165a91d825eda1ed1df6 Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Tue, 7 Apr 2026 16:35:36 -0500 Subject: [PATCH 14/15] changes related to rocm-smi command and MI355 condition --- cvs/lib/parallel_ssh_lib.py | 2 +- cvs/lib/utils_lib.py | 2 +- cvs/tests/training/jax/jax_llama3_1_70b_single.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py index 6b74c0de..71285c19 100755 --- a/cvs/lib/parallel_ssh_lib.py +++ b/cvs/lib/parallel_ssh_lib.py @@ -48,7 +48,7 @@ def __init__( self.stop_on_errors = stop_on_errors self.unreachable_hosts = [] self.env_prefix = build_env_prefix(env_vars) - self.log.Debug(f"Environ vars: {self.env_prefix}") + self.log.debug(f"Environ vars: {self.env_prefix}") if self.password is None: print(self.reachable_hosts) diff --git a/cvs/lib/utils_lib.py b/cvs/lib/utils_lib.py index 45a114c2..41692655 100644 --- a/cvs/lib/utils_lib.py +++ b/cvs/lib/utils_lib.py @@ -193,7 +193,7 @@ def get_model_from_rocm_smi_output(smi_output): model = 'mi325' elif re.search('MI350', smi_output, re.I): model = 'mi350' - elif re.search('MI355', smi_output, re.I): + elif re.search('gfx950', 
smi_output, re.I): model = 'mi355' else: model = 'mi300x' diff --git a/cvs/tests/training/jax/jax_llama3_1_70b_single.py b/cvs/tests/training/jax/jax_llama3_1_70b_single.py index ec26c53b..e96ff650 100644 --- a/cvs/tests/training/jax/jax_llama3_1_70b_single.py +++ b/cvs/tests/training/jax/jax_llama3_1_70b_single.py @@ -282,7 +282,7 @@ def test_llama_3_1_70b_singlenode_training(phdl, training_dict, model_params_dic """ globals.error_list = [] head_node = phdl.host_list[0] - smi_out_dict = phdl.exec('rocm-smi -a | head -30') + smi_out_dict = phdl.exec('rocm-smi --showproductname | grep "GFX Version"') smi_out = smi_out_dict[head_node] gpu_type = get_model_from_rocm_smi_output(smi_out) jx_obj = jax_training_lib.JaxTrainingJob( From b43fd3998b7b0cca0dc8b121719d73379fab2093 Mon Sep 17 00:00:00 2001 From: Rajesh Thummala Date: Fri, 10 Apr 2026 09:21:30 -0500 Subject: [PATCH 15/15] Reverting rocm-smi changes --- cvs/lib/utils_lib.py | 2 +- cvs/tests/training/jax/jax_llama3_1_70b_single.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cvs/lib/utils_lib.py b/cvs/lib/utils_lib.py index 41692655..45a114c2 100644 --- a/cvs/lib/utils_lib.py +++ b/cvs/lib/utils_lib.py @@ -193,7 +193,7 @@ def get_model_from_rocm_smi_output(smi_output): model = 'mi325' elif re.search('MI350', smi_output, re.I): model = 'mi350' - elif re.search('gfx950', smi_output, re.I): + elif re.search('MI355', smi_output, re.I): model = 'mi355' else: model = 'mi300x' diff --git a/cvs/tests/training/jax/jax_llama3_1_70b_single.py b/cvs/tests/training/jax/jax_llama3_1_70b_single.py index e96ff650..ec26c53b 100644 --- a/cvs/tests/training/jax/jax_llama3_1_70b_single.py +++ b/cvs/tests/training/jax/jax_llama3_1_70b_single.py @@ -282,7 +282,7 @@ def test_llama_3_1_70b_singlenode_training(phdl, training_dict, model_params_dic """ globals.error_list = [] head_node = phdl.host_list[0] - smi_out_dict = phdl.exec('rocm-smi --showproductname | grep "GFX Version"') + smi_out_dict = 
phdl.exec('rocm-smi -a | head -30') smi_out = smi_out_dict[head_node] gpu_type = get_model_from_rocm_smi_output(smi_out) jx_obj = jax_training_lib.JaxTrainingJob(