Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cvs/input/cluster_file/cluster.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
{
"mgmt_ip": "{xx.xx.xx.xx|hostname}"
},
"_env_vars_comment": "Custom env variables to be exported before executing each command",
"_env_vars_example":
{
"PATH": "/shared/apps/ubuntu/opt/rocm-7.11.0/bin:$PATH"
},
"env_vars": {},
"node_dict":
{
"{xx.xx.xx.xx|hostname}":
Expand Down
60 changes: 60 additions & 0 deletions cvs/lib/env_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
env_lib.py

Utilities for safely constructing shell-compatible environment variable
exports, with controlled support for self-referential expansion (e.g. PATH).

Key guarantees:
- Prevents shell injection by quoting all user-provided values.
- Allows controlled expansion ONLY for `$KEY` where KEY is the variable being
assigned (e.g., PATH=/x:$PATH, LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/y).
- Expansion is performed on the *remote shell*, not locally.
"""

import shlex


def build_env_prefix(env_vars):
"""
Build a shell-safe export prefix from environment variables.

Supported patterns:
- <prefix>:$VAR (prepend)
- $VAR:<suffix> (append)

Unsupported patterns (treated as literal values):
- Cross-variable expansion (e.g., FOO=$BAR)
- Shell substitution (e.g., $(...), `...`)
- Parameter expansion (e.g., ${VAR:-default})

Args:
env_vars: Dictionary of environment variables to export.

Returns:
A string suitable for prefixing a shell command, e.g.:
"export PATH=/x:$PATH ; export FOO='bar'"
or an empty string if env_vars is empty.
"""
if not env_vars:
return ""

exports = []

for key, value in env_vars.items():
marker = f"${key}"

# Case 1: Prepend to existing variable (e.g., PATH=/x:$PATH)
if value.endswith(":" + marker):
prefix = value[: -(len(marker) + 1)]
exports.append(f"export {key}={shlex.quote(prefix)}:${key}")

# Case 2: Append to existing variable (e.g., PATH=$PATH:/x)
elif value.startswith(marker + ":"):
suffix = value[len(marker) + 1 :]
exports.append(f"export {key}=${key}:{shlex.quote(suffix)}")

# Case 3: Treat as a literal value (fully quoted)
else:
exports.append(f"export {key}={shlex.quote(str(value))}")

return " ; ".join(exports)
37 changes: 30 additions & 7 deletions cvs/lib/parallel_ssh_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import paramiko
from paramiko import SSHClient
from scp import SCPClient
from cvs.lib.env_lib import build_env_prefix


class Pssh:
Expand All @@ -27,7 +28,15 @@ class Pssh:
"""

def __init__(
self, log, host_list, user=None, password=None, pkey='id_rsa', host_key_check=False, stop_on_errors=True
self,
log,
host_list,
user=None,
password=None,
pkey='id_rsa',
host_key_check=False,
stop_on_errors=True,
env_vars=None,
):
self.log = log
self.host_list = host_list
Expand All @@ -38,6 +47,8 @@ def __init__(
self.host_key_check = host_key_check
self.stop_on_errors = stop_on_errors
self.unreachable_hosts = []
self.env_prefix = build_env_prefix(env_vars)
self.log.debug(f"Environ vars: {self.env_prefix}")

if self.password is None:
print(self.reachable_hosts)
Expand Down Expand Up @@ -168,20 +179,27 @@ def exec(self, cmd, timeout=None, print_console=True):
"""
Returns a dictionary of host as key and command output as values
"""
print(f'cmd = {cmd}')
if self.env_prefix:
full_cmd = f"{self.env_prefix} ; {cmd}"
else:
full_cmd = cmd

print(f'cmd = {full_cmd}')

# Log command execution
if self.log:
if timeout is not None:
self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]: {cmd}")
self.log.debug(
f"Executing command on {len(self.reachable_hosts)} host(s) [timeout={timeout}s]: {full_cmd}"
)
else:
self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {cmd}")
self.log.debug(f"Executing command on {len(self.reachable_hosts)} host(s): {full_cmd}")

if timeout is None:
output = self.client.run_command(cmd, stop_on_errors=self.stop_on_errors)
output = self.client.run_command(full_cmd, stop_on_errors=self.stop_on_errors)
else:
output = self.client.run_command(cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors)
cmd_output = self._process_output(output, cmd=cmd, print_console=print_console)
output = self.client.run_command(full_cmd, read_timeout=timeout, stop_on_errors=self.stop_on_errors)
cmd_output = self._process_output(output, cmd=full_cmd, print_console=print_console)

# Log per-host execution completion
if self.log:
Expand All @@ -196,6 +214,11 @@ def exec_cmd_list(self, cmd_list, timeout=None, print_console=True):
which runs the same command on all hosts.
Returns a dictionary of host as key and command output as values
"""
if self.env_prefix:
cmd_list = [f"{self.env_prefix} ; {cmd}" for cmd in cmd_list]
else:
cmd_list = cmd_list

print(cmd_list)

# Log command list execution
Expand Down
66 changes: 66 additions & 0 deletions cvs/lib/unittests/test_env_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
test_env_lib.py

Unit tests for env_lib.build_env_prefix using Python's built-in unittest framework.
"""

import unittest
from cvs.lib.env_lib import build_env_prefix


class TestBuildEnvPrefix(unittest.TestCase):
def test_empty_env_vars(self):
self.assertEqual(build_env_prefix({}), "")

def test_literal_env_var(self):
env = {"FOO": "bar"}
result = build_env_prefix(env)
self.assertEqual(result, "export FOO=bar")

def test_literal_env_var_with_spaces(self):
env = {"FOO": "hello world"}
result = build_env_prefix(env)
self.assertEqual(result, "export FOO='hello world'")

def test_path_prepend(self):
env = {"PATH": "/usr/bin:/custom/bin:$PATH"}
result = build_env_prefix(env)
self.assertEqual(result, "export PATH=/usr/bin:/custom/bin:$PATH")

def test_path_append(self):
env = {"PATH": "$PATH:/custom/bin"}
result = build_env_prefix(env)
self.assertEqual(result, "export PATH=$PATH:/custom/bin")

def test_ld_library_path_prepend(self):
env = {"LD_LIBRARY_PATH": "/opt/lib:$LD_LIBRARY_PATH"}
result = build_env_prefix(env)
self.assertEqual(result, "export LD_LIBRARY_PATH=/opt/lib:$LD_LIBRARY_PATH")

def test_multiple_env_vars_mixed(self):
env = {
"PATH": "/usr/bin:$PATH",
"FOO": "bar",
"BAZ": "hello world",
}
result = build_env_prefix(env)

self.assertEqual(result, "export PATH=/usr/bin:$PATH ; export FOO=bar ; export BAZ='hello world'")

def test_cross_variable_expansion_is_not_allowed(self):
env = {"FOO": "$PATH"}
result = build_env_prefix(env)

# Expansion should be blocked and treated as a literal
self.assertEqual(result, "export FOO='$PATH'")

def test_shell_injection_attempt_is_quoted(self):
env = {"FOO": "$(rm -rf /)"}
result = build_env_prefix(env)

# Must be fully quoted to prevent execution
self.assertEqual(result, "export FOO='$(rm -rf /)'")


if __name__ == "__main__":
unittest.main()
10 changes: 9 additions & 1 deletion cvs/tests/health/agfhc_cvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,16 @@ def phdl(cluster_dict):
Pssh: A handle to execute commands across all nodes.
"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False)
phdl = Pssh(
log,
node_list,
user=cluster_dict['username'],
pkey=cluster_dict['priv_key_file'],
stop_on_errors=False,
env_vars=env_vars,
)
return phdl


Expand Down
3 changes: 2 additions & 1 deletion cvs/tests/health/csp_qual_agfhc.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ def phdl(cluster_dict):
Pssh: A handle to execute commands across all nodes.
"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
3 changes: 2 additions & 1 deletion cvs/tests/health/install/install_agfhc.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ def phdl(cluster_dict):
Pssh: A handle to execute commands across all nodes.
"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
3 changes: 2 additions & 1 deletion cvs/tests/health/install/install_babelstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ def phdl(cluster_dict):

"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
3 changes: 2 additions & 1 deletion cvs/tests/health/install/install_rocblas.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ def phdl(cluster_dict):

"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
6 changes: 4 additions & 2 deletions cvs/tests/health/install/install_rvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ def shdl(cluster_dict):
- Module scope ensures a single connection context for the duration of the module.
"""
node_list = list(cluster_dict['node_dict'].keys())
env_vars = cluster_dict.get("env_vars")
head_node = node_list[0]
shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return shdl


Expand All @@ -204,8 +205,9 @@ def phdl(cluster_dict):
Pssh: Handle that executes commands on all nodes and returns dict[node] -> output.
"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
6 changes: 4 additions & 2 deletions cvs/tests/health/install/install_transferbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ def shdl(cluster_dict):
- nhdl_dict is currently unused; it can be removed unless used elsewhere.
"""
node_list = list(cluster_dict['node_dict'].keys())
env_vars = cluster_dict.get("env_vars")
head_node = node_list[0]
shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return shdl


Expand All @@ -209,8 +210,9 @@ def phdl(cluster_dict):

"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
10 changes: 9 additions & 1 deletion cvs/tests/health/rvs_cvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,16 @@ def config_dict(config_file, cluster_dict):
@pytest.fixture(scope="module")
def phdl(cluster_dict):
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], stop_on_errors=False)
phdl = Pssh(
log,
node_list,
user=cluster_dict['username'],
pkey=cluster_dict['priv_key_file'],
stop_on_errors=False,
env_vars=env_vars,
)
return phdl


Expand Down
3 changes: 2 additions & 1 deletion cvs/tests/health/transferbench_cvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,9 @@ def parse_tb_example_test_results(out_dict, exp_dict):
@pytest.fixture(scope="module")
def phdl(cluster_dict):
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand Down
6 changes: 4 additions & 2 deletions cvs/tests/ibperf/ib_perf_bw_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,11 @@ def phdl(cluster_dict):
- Assumes Pssh(log, node_list, user=..., pkey=...) is available in scope.
"""
print(cluster_dict)
env_vars = cluster_dict.get("env_vars")
node_list = list(cluster_dict['node_dict'].keys())
if len(node_list) % 2 != 0:
node_list.pop()
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
phdl = Pssh(log, node_list, user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return phdl


Expand All @@ -157,8 +158,9 @@ def shdl(cluster_dict):
- nhdl_dict is currently unused; it can be removed unless used elsewhere.
"""
node_list = list(cluster_dict['node_dict'].keys())
env_vars = cluster_dict.get("env_vars")
head_node = node_list[0]
shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'])
shdl = Pssh(log, [head_node], user=cluster_dict['username'], pkey=cluster_dict['priv_key_file'], env_vars=env_vars)
return shdl


Expand Down
Loading
Loading