diff --git a/cvs/input/cluster_file/cluster.json b/cvs/input/cluster_file/cluster.json
index 9fa7d18f..5655901f 100644
--- a/cvs/input/cluster_file/cluster.json
+++ b/cvs/input/cluster_file/cluster.json
@@ -12,6 +12,12 @@
     {
         "mgmt_ip": "{xx.xx.xx.xx|hostname}"
     },
+    "_env_vars_comment": "Custom env variables to be exported before executing each command",
+    "_env_vars_example":
+    {
+        "PATH": "/shared/apps/ubuntu/opt/rocm-7.11.0/bin:$PATH"
+    },
+    "env_vars": {},
     "node_dict":
     {
         "{xx.xx.xx.xx|hostname}":
diff --git a/cvs/lib/env_lib.py b/cvs/lib/env_lib.py
new file mode 100644
index 00000000..203a7de6
--- /dev/null
+++ b/cvs/lib/env_lib.py
@@ -0,0 +1,78 @@
+"""
+env_lib.py
+
+Utilities for safely constructing shell-compatible environment variable
+exports, with controlled support for self-referential expansion (e.g. PATH).
+
+Key guarantees:
+- Prevents shell injection by quoting all user-provided values.
+- Rejects variable names that are not valid shell identifiers, because names
+  are emitted unquoted.
+- Allows controlled expansion ONLY for `$KEY` where KEY is the variable being
+  assigned (e.g., PATH=/x:$PATH, LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/y).
+- Expansion is performed on the *remote shell*, not locally.
+"""
+
+import re
+import shlex
+
+# Variable names are interpolated unquoted into the export statement, so they
+# must be plain shell identifiers.
+_VALID_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
+
+
+def build_env_prefix(env_vars):
+    """
+    Build a shell-safe export prefix from environment variables.
+
+    Supported patterns:
+    - <value>:$VAR (prepend)
+    - $VAR:<value> (append)
+
+    Unsupported patterns (treated as literal values):
+    - Cross-variable expansion (e.g., FOO=$BAR)
+    - Shell substitution (e.g., $(...), `...`)
+    - Parameter expansion (e.g., ${VAR:-default})
+
+    Args:
+        env_vars: Dictionary of environment variables to export. Values that
+            are not strings (e.g. numbers loaded from JSON) are converted
+            with str(). May be None or empty.
+
+    Returns:
+        A string suitable for prefixing a shell command, e.g.:
+            "export PATH=/x:$PATH ; export FOO=bar"
+        or an empty string if env_vars is empty.
+
+    Raises:
+        ValueError: If a variable name is not a valid shell identifier.
+    """
+    if not env_vars:
+        return ""
+
+    exports = []
+
+    for key, raw_value in env_vars.items():
+        # Names are emitted unquoted, so anything but an identifier is unsafe.
+        if not isinstance(key, str) or not _VALID_NAME.fullmatch(key):
+            raise ValueError(f"Invalid environment variable name: {key!r}")
+
+        # Coerce first so the pattern checks also work for non-string values.
+        value = str(raw_value)
+        marker = f"${key}"
+
+        # Case 1: Prepend to existing variable (e.g., PATH=/x:$PATH)
+        if value.endswith(":" + marker):
+            prefix = value[: -(len(marker) + 1)]
+            exports.append(f"export {key}={shlex.quote(prefix)}:{marker}")
+
+        # Case 2: Append to existing variable (e.g., PATH=$PATH:/x)
+        elif value.startswith(marker + ":"):
+            suffix = value[len(marker) + 1 :]
+            exports.append(f"export {key}={marker}:{shlex.quote(suffix)}")
+
+        # Case 3: Treat as a literal value (fully quoted)
+        else:
+            exports.append(f"export {key}={shlex.quote(value)}")
+
+    return " ; ".join(exports)
diff --git a/cvs/lib/parallel_ssh_lib.py b/cvs/lib/parallel_ssh_lib.py
index f11e8093..71285c19 100755
--- a/cvs/lib/parallel_ssh_lib.py
+++ b/cvs/lib/parallel_ssh_lib.py
@@ -15,6 +15,7 @@
 import paramiko
 from paramiko import SSHClient
 from scp import SCPClient
+from cvs.lib.env_lib import build_env_prefix
 
 
 class Pssh:
@@ -27,7 +28,15 @@ class Pssh:
     """
 
     def __init__(
-        self, log, host_list, user=None, password=None, pkey='id_rsa', host_key_check=False, stop_on_errors=True
+        self,
+        log,
+        host_list,
+        user=None,
+        password=None,
+        pkey='id_rsa',
+        host_key_check=False,
+        stop_on_errors=True,
+        env_vars=None,
     ):
         self.log = log
         self.host_list = host_list
@@ -38,6 +47,9 @@ def __init__(
         self.host_key_check = host_key_check
         self.stop_on_errors = stop_on_errors
         self.unreachable_hosts = []
+        self.env_prefix = build_env_prefix(env_vars)
+        if self.log:
+            self.log.debug(f"Environ vars: {self.env_prefix}")
 
         if self.password is None:
             print(self.reachable_hosts)
@@ -168,20 +180,27 @@ def exec(self, cmd, timeout=None, print_console=True):
         """
         Returns a dictionary of host as key and command output as values
         """
-        print(f'cmd = {cmd}')
+        if self.env_prefix:
+            full_cmd = f"{self.env_prefix} ; {cmd}"
+        else:
+            full_cmd = cmd
+
+        print(f'cmd = {full_cmd}')
 
         # Log command execution
         if self.log:
             if timeout is not None:
-                self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]: {cmd}")
+                self.log.debug(
+                    f"Executing command on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]: {full_cmd}"
+                )
             else:
-                self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {cmd}")
+                self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {full_cmd}")
 
         if timeout is None:
-            output = self.client.run_command(cmd, stop_on_errors=self.stop_on_errors)
+            output = self.client.run_command(full_cmd, stop_on_errors=self.stop_on_errors)
         else:
-            output = self.client.run_command(cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors)
-        cmd_output = self._process_output(output, cmd=cmd, print_console=print_console)
+            output = self.client.run_command(full_cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors)
+        cmd_output = self._process_output(output, cmd=full_cmd, print_console=print_console)
 
         # Log per-host execution completion
         if self.log:
@@ -196,6 +215,9 @@ def exec_cmd_list(self, cmd_list, timeout=None, print_console=True):
         which runs the same command on all hosts.
         Returns a dictionary of host as key and command output as values
         """
+        if self.env_prefix:
+            cmd_list = [f"{self.env_prefix} ; {cmd}" for cmd in cmd_list]
+
         print(cmd_list)
 
         # Log command list execution
diff --git a/cvs/lib/unittests/test_env_lib.py b/cvs/lib/unittests/test_env_lib.py
new file mode 100644
index 00000000..9fa4de49
--- /dev/null
+++ b/cvs/lib/unittests/test_env_lib.py
@@ -0,0 +1,78 @@
+"""
+test_env_lib.py
+
+Unit tests for env_lib.build_env_prefix using Python's built-in unittest framework.
+"""
+
+import unittest
+from cvs.lib.env_lib import build_env_prefix
+
+
+class TestBuildEnvPrefix(unittest.TestCase):
+    def test_empty_env_vars(self):
+        self.assertEqual(build_env_prefix({}), "")
+
+    def test_none_env_vars(self):
+        self.assertEqual(build_env_prefix(None), "")
+
+    def test_literal_env_var(self):
+        env = {"FOO": "bar"}
+        result = build_env_prefix(env)
+        self.assertEqual(result, "export FOO=bar")
+
+    def test_literal_env_var_with_spaces(self):
+        env = {"FOO": "hello world"}
+        result = build_env_prefix(env)
+        self.assertEqual(result, "export FOO='hello world'")
+
+    def test_path_prepend(self):
+        env = {"PATH": "/usr/bin:/custom/bin:$PATH"}
+        result = build_env_prefix(env)
+        self.assertEqual(result, "export PATH=/usr/bin:/custom/bin:$PATH")
+
+    def test_path_append(self):
+        env = {"PATH": "$PATH:/custom/bin"}
+        result = build_env_prefix(env)
+        self.assertEqual(result, "export PATH=$PATH:/custom/bin")
+
+    def test_ld_library_path_prepend(self):
+        env = {"LD_LIBRARY_PATH": "/opt/lib:$LD_LIBRARY_PATH"}
+        result = build_env_prefix(env)
+        self.assertEqual(result, "export LD_LIBRARY_PATH=/opt/lib:$LD_LIBRARY_PATH")
+
+    def test_multiple_env_vars_mixed(self):
+        env = {
+            "PATH": "/usr/bin:$PATH",
+            "FOO": "bar",
+            "BAZ": "hello world",
+        }
+        result = build_env_prefix(env)
+
+        self.assertEqual(result, "export PATH=/usr/bin:$PATH ; export FOO=bar ; export BAZ='hello world'")
+
+    def test_cross_variable_expansion_is_not_allowed(self):
+        env = {"FOO": "$PATH"}
+        result = build_env_prefix(env)
+
+        # Expansion should be blocked and treated as a literal
+        self.assertEqual(result, "export FOO='$PATH'")
+
+    def test_shell_injection_attempt_is_quoted(self):
+        env = {"FOO": "$(rm -rf /)"}
+        result = build_env_prefix(env)
+
+        # Must be fully quoted to prevent execution
+        self.assertEqual(result, "export FOO='$(rm -rf /)'")
+
+    def test_non_string_value_is_stringified(self):
+        env = {"OMP_NUM_THREADS": 8}
+        result = build_env_prefix(env)
+        self.assertEqual(result, "export OMP_NUM_THREADS=8")
+
+    def test_invalid_variable_name_is_rejected(self):
+        with self.assertRaises(ValueError):
+            build_env_prefix({"FOO; rm -rf /": "bar"})
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/cvs/tests/health/agfhc_cvs.py b/cvs/tests/health/agfhc_cvs.py
index 5bb18ccc..3216895e 100644
--- a/cvs/tests/health/agfhc_cvs.py
+++ b/cvs/tests/health/agfhc_cvs.py
@@ -119,8 +119,16 @@ def phdl(cluster_dict): Pssh: A handle to execute commands across all nodes. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False) + phdl = Pssh( + log, + node_list, + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + stop_on_errors=False, + env_vars=env_vars, + ) return phdl diff --git a/cvs/tests/health/csp_qual_agfhc.py b/cvs/tests/health/csp_qual_agfhc.py index 97f37b3c..46b1173c 100644 --- a/cvs/tests/health/csp_qual_agfhc.py +++ b/cvs/tests/health/csp_qual_agfhc.py @@ -142,8 +142,9 @@ def phdl(cluster_dict): Pssh: A handle to execute commands across all nodes. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_agfhc.py b/cvs/tests/health/install/install_agfhc.py index 112f2734..70f43575 100644 --- a/cvs/tests/health/install/install_agfhc.py +++ b/cvs/tests/health/install/install_agfhc.py @@ -123,8 +123,9 @@ def phdl(cluster_dict): Pssh: A handle to execute commands across all nodes. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_babelstream.py b/cvs/tests/health/install/install_babelstream.py index f028a72c..72b0a872 100644 --- a/cvs/tests/health/install/install_babelstream.py +++ b/cvs/tests/health/install/install_babelstream.py @@ -138,8 +138,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_rocblas.py b/cvs/tests/health/install/install_rocblas.py index 905d0c3a..1a9fd80e 100644 --- a/cvs/tests/health/install/install_rocblas.py +++ b/cvs/tests/health/install/install_rocblas.py @@ -133,8 +133,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_rvs.py b/cvs/tests/health/install/install_rvs.py index 326d0afa..d2db9020 100644 --- a/cvs/tests/health/install/install_rvs.py +++ b/cvs/tests/health/install/install_rvs.py @@ -183,8 +183,9 @@ def shdl(cluster_dict): - Module scope ensures a single connection context for the duration of the module. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl @@ -204,8 +205,9 @@ def phdl(cluster_dict): Pssh: Handle that executes commands on all nodes and returns dict[node] -> output. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/install/install_transferbench.py b/cvs/tests/health/install/install_transferbench.py index 3fb0dec5..2cee070a 100644 --- a/cvs/tests/health/install/install_transferbench.py +++ b/cvs/tests/health/install/install_transferbench.py @@ -187,8 +187,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl @@ -209,8 +210,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/health/rvs_cvs.py b/cvs/tests/health/rvs_cvs.py index 9df7df22..9fdc9d7b 100644 --- a/cvs/tests/health/rvs_cvs.py +++ b/cvs/tests/health/rvs_cvs.py @@ -60,8 +60,16 @@ def config_dict(config_file, cluster_dict): @pytest.fixture(scope="module") def phdl(cluster_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False) + phdl = Pssh( + log, + node_list, + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + stop_on_errors=False, + env_vars=env_vars, + ) return phdl diff --git a/cvs/tests/health/transferbench_cvs.py b/cvs/tests/health/transferbench_cvs.py index 75e098f5..e5cb8a2f 100644 --- a/cvs/tests/health/transferbench_cvs.py +++ b/cvs/tests/health/transferbench_cvs.py @@ -251,8 +251,9 @@ def parse_tb_example_test_results(out_dict, exp_dict): @pytest.fixture(scope="module") def phdl(cluster_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, 
node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/ibperf/ib_perf_bw_test.py b/cvs/tests/ibperf/ib_perf_bw_test.py index 59be244f..9139fae0 100644 --- a/cvs/tests/ibperf/ib_perf_bw_test.py +++ b/cvs/tests/ibperf/ib_perf_bw_test.py @@ -133,10 +133,11 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) if len(node_list) % 2 != 0: node_list.pop() - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -157,8 +158,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/ibperf/install_ibperf_tools.py b/cvs/tests/ibperf/install_ibperf_tools.py index eef40003..4aadea93 100644 --- a/cvs/tests/ibperf/install_ibperf_tools.py +++ b/cvs/tests/ibperf/install_ibperf_tools.py @@ -131,10 +131,11 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) if len(node_list) % 2 != 0: node_list.pop() - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -155,8 +156,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py b/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py index 9658bf19..a2ec8520 100644 --- a/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py +++ b/cvs/tests/inference/inferencemax/inferencemax_gpt_oss_120b_single.py @@ -154,8 +154,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -181,8 +182,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py index 8ca2a92f..a1c94644 100644 --- a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py +++ b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_flux1_dev_single.py @@ -274,6 +274,7 @@ def hf_token(inference_dict): def s_phdl(cluster_dict): """Create and return a command execution handle for all cluster nodes.""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") # Single-node mode: execute locally ONLY when the target actually refers to this machine. 
# @@ -292,6 +293,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get("username"), password=cluster_dict.get("password"), pkey=cluster_dict.get("priv_key_file"), + env_vars=env_vars, ) log.info(f"Using parallel-ssh execution mode for {len(node_list)} node(s)") @@ -301,6 +303,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get('username'), password=cluster_dict.get('password'), pkey=cluster_dict.get('priv_key_file'), + env_vars=env_vars, ) diff --git a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py index 330cb0d3..1f848625 100644 --- a/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py +++ b/cvs/tests/inference/pytorch_xdit/pytorch_xdit_wan22_14b_single.py @@ -251,6 +251,7 @@ def hf_token(inference_dict): def s_phdl(cluster_dict): """Create and return a command execution handle for all cluster nodes.""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") # Single-node mode: execute locally ONLY when the target actually refers to this machine. 
# @@ -269,6 +270,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get("username"), password=cluster_dict.get("password"), pkey=cluster_dict.get("priv_key_file"), + env_vars=env_vars, ) log.info(f"Using parallel-ssh execution mode for {len(node_list)} node(s)") @@ -278,6 +280,7 @@ def s_phdl(cluster_dict): user=cluster_dict.get('username'), password=cluster_dict.get('password'), pkey=cluster_dict.get('priv_key_file'), + env_vars=env_vars, ) diff --git a/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py b/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py index 2b27a03a..978d1b83 100644 --- a/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py +++ b/cvs/tests/inference/sglang/sglang_deepseek_r1_671b_distributed.py @@ -135,16 +135,26 @@ def hf_token(inference_dict): @pytest.fixture(scope="module") def p_phdl(cluster_dict, inference_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") p_phdl = Pssh( - log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, + inference_dict['prefill_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return p_phdl @pytest.fixture(scope="module") def d_phdl(cluster_dict, inference_dict): + env_vars = cluster_dict.get("env_vars") d_phdl = Pssh( - log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, + inference_dict['decode_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return d_phdl @@ -152,16 +162,18 @@ def d_phdl(cluster_dict, inference_dict): @pytest.fixture(scope="module") def r_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['proxy_router_node']) - r_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + r_phdl = Pssh(log, 
node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return r_phdl @pytest.fixture(scope="module") def b_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['benchmark_serv_node']) - b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return b_phdl diff --git a/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py b/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py index 595fd649..6dd7759b 100644 --- a/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py +++ b/cvs/tests/inference/sglang/sglang_llama_70b_distributed.py @@ -135,16 +135,26 @@ def hf_token(inference_dict): @pytest.fixture(scope="module") def p_phdl(cluster_dict, inference_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") p_phdl = Pssh( - log, inference_dict['prefill_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, + inference_dict['prefill_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return p_phdl @pytest.fixture(scope="module") def d_phdl(cluster_dict, inference_dict): + env_vars = cluster_dict.get("env_vars") d_phdl = Pssh( - log, inference_dict['decode_node_list'], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'] + log, + inference_dict['decode_node_list'], + user=cluster_dict['username'], + pkey=cluster_dict['priv_key_file'], + env_vars=env_vars, ) return d_phdl @@ -152,16 +162,18 @@ def d_phdl(cluster_dict, inference_dict): @pytest.fixture(scope="module") def r_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['proxy_router_node']) - r_phdl = Pssh(log, node_list, user=cluster_dict['username'], 
pkey=cluster_dict['priv_key_file']) + r_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return r_phdl @pytest.fixture(scope="module") def b_phdl(cluster_dict, inference_dict): node_list = [] + env_vars = cluster_dict.get("env_vars") node_list.append(inference_dict['benchmark_serv_node']) - b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + b_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return b_phdl diff --git a/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py b/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py index 8f6e06eb..4373751a 100644 --- a/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py +++ b/cvs/tests/inference/vllm/vllm_deepseek31_685b_single.py @@ -244,8 +244,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -271,8 +272,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py b/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py index 37726453..6a0e83c6 100644 --- a/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py +++ b/cvs/tests/inference/vllm/vllm_gpt_oss_120b_single.py @@ -243,8 +243,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -270,8 +271,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py b/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py index c419b617..cb0d6d3a 100644 --- a/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py +++ b/cvs/tests/inference/vllm/vllm_qwen3_235b_single.py @@ -243,8 +243,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -270,8 +271,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py b/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py index 7363fe43..66f1bfa3 100644 --- a/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py +++ b/cvs/tests/inference/vllm/vllm_qwen3_80b_single.py @@ -243,8 +243,9 @@ def s_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + s_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return s_phdl @@ -270,8 +271,9 @@ def c_phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + c_phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return c_phdl diff --git a/cvs/tests/mori/mori_benchmark_test.py b/cvs/tests/mori/mori_benchmark_test.py index 7ad10e3c..83fb2101 100644 --- a/cvs/tests/mori/mori_benchmark_test.py +++ b/cvs/tests/mori/mori_benchmark_test.py @@ -84,8 +84,9 @@ def mori_dict(mori_config_file, cluster_dict): @pytest.fixture(scope="module") def phdl(cluster_dict): print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/platform/host_configs_cvs.py b/cvs/tests/platform/host_configs_cvs.py index 0b93b4a8..67a9ca59 100644 --- a/cvs/tests/platform/host_configs_cvs.py +++ b/cvs/tests/platform/host_configs_cvs.py @@ -114,8 +114,9 @@ def phdl(cluster_dict): - Assumes Pssh is available in scope and accepts (log, node_list, user, pkey) in its constructor. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/rccl/rccl_heatmap_cvs.py b/cvs/tests/rccl/rccl_heatmap_cvs.py index a467b980..65e251c9 100644 --- a/cvs/tests/rccl/rccl_heatmap_cvs.py +++ b/cvs/tests/rccl/rccl_heatmap_cvs.py @@ -141,8 +141,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -163,8 +164,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/rccl/rccl_multinode_cvs.py b/cvs/tests/rccl/rccl_multinode_cvs.py index 571003f3..43b8c017 100644 --- a/cvs/tests/rccl/rccl_multinode_cvs.py +++ b/cvs/tests/rccl/rccl_multinode_cvs.py @@ -136,8 +136,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -158,8 +159,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. """ node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/rccl/rccl_multinode_default_cvs.py b/cvs/tests/rccl/rccl_multinode_default_cvs.py index 3d2b6461..25e987aa 100644 --- a/cvs/tests/rccl/rccl_multinode_default_cvs.py +++ b/cvs/tests/rccl/rccl_multinode_default_cvs.py @@ -138,8 +138,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -160,8 +161,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/rccl/rccl_singlenode_cvs.py b/cvs/tests/rccl/rccl_singlenode_cvs.py index ab785947..e9affdb4 100644 --- a/cvs/tests/rccl/rccl_singlenode_cvs.py +++ b/cvs/tests/rccl/rccl_singlenode_cvs.py @@ -138,8 +138,9 @@ def phdl(cluster_dict): - Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl @@ -160,8 +161,9 @@ def shdl(cluster_dict): - nhdl_dict is currently unused; it can be removed unless used elsewhere. 
""" node_list = list(cluster_dict['node_dict'].keys()) + env_vars = cluster_dict.get("env_vars") head_node = node_list[0] - shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return shdl diff --git a/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py b/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py index 1f73073c..264ea934 100644 --- a/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py +++ b/cvs/tests/training/jax/jax_llama3_1_405b_distributed.py @@ -177,8 +177,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py b/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py index 492f4eb7..5b157d03 100644 --- a/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py +++ b/cvs/tests/training/jax/jax_llama3_1_70b_distributed.py @@ -177,8 +177,9 @@ def phdl(cluster_dict): """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/jax/jax_llama3_1_70b_single.py b/cvs/tests/training/jax/jax_llama3_1_70b_single.py index 9be11272..ec26c53b 100644 --- a/cvs/tests/training/jax/jax_llama3_1_70b_single.py +++ b/cvs/tests/training/jax/jax_llama3_1_70b_single.py @@ -177,8 +177,9 @@ def phdl(cluster_dict): """ 
print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py b/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py index 8fdfb965..542daa59 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_70b_distributed.py @@ -180,8 +180,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py b/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py index a6326df1..a9805c25 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_70b_single.py @@ -178,8 +178,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. 
""" print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py b/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py index ec89049a..40920477 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_8b_distributed.py @@ -180,8 +180,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl diff --git a/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py b/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py index 30943a10..572618aa 100644 --- a/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py +++ b/cvs/tests/training/megatron/megatron_llama3_1_8b_single.py @@ -179,8 +179,9 @@ def phdl(cluster_dict): - This fixture has module scope, so a single connection handle is reused for all tests in the module. """ print(cluster_dict) + env_vars = cluster_dict.get("env_vars") node_list = list(cluster_dict['node_dict'].keys()) - phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file']) + phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars) return phdl