Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions src/bindings/python/flux/job/Jobspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@
import math
import numbers
import os
import threading

import yaml
from _flux._core import ffi
from flux import hostlist, idset
from flux import Flux, hostlist, idset
from flux.util import Fileref, del_treedict, parse_fsd, set_treedict

# thread local storage to support shared and cached validation data
_tls = threading.local()


def _get_flux():
"""
Obtain a per-thread Flux handle created on-demand
(used for Jobspec validation)
"""
if not hasattr(_tls, "flux"):
_tls.flux = Flux()
return _tls.flux


def _convert_jobspec_arg_to_string(jobspec):
"""
Expand Down Expand Up @@ -72,6 +86,29 @@ def _validate_property_query(name):
raise TypeError(f"invalid character in property '{name}'")


def _validate_hosts(hosts):
"""
Raise an exception if any of a set of hosts are not valid for the current
enclosing Flux instance.
"""
if not hasattr(_tls, "hostlist"):
_tls.hostlist = None
try:
string = _get_flux().attr_get("hostlist")
if string is not None:
_tls.hostlist = hostlist.decode(string)
except (FileNotFoundError, ValueError):
# ignore failures above and leave hostlist set to None,
# effectively causes this check to be ignored.
pass

if _tls.hostlist is not None:
try:
_tls.hostlist.index(hosts)
except FileNotFoundError as exc:
raise ValueError(f"host constraint contains invalid hosts: {exc}")


def _validate_constraint_op(operator, args):
if not isinstance(operator, str):
raise TypeError(f"constraint operation {operator} is not a string")
Expand All @@ -85,7 +122,7 @@ def _validate_constraint_op(operator, args):
_validate_property_query(name)
elif operator in ["hostlist"]:
for hosts in args:
hostlist.decode(hosts)
_validate_hosts(hostlist.decode(hosts))
elif operator in ["ranks"]:
for ranks in args:
idset.decode(ranks)
Expand Down
13 changes: 13 additions & 0 deletions t/t2110-job-ingest-validator.t
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ test_expect_success 'job-ingest: all valid jobspecs accepted' '
test_expect_success 'job-ingest: invalid jobs rejected' '
test_invalid ${JOBSPEC}/invalid/*
'
test_expect_success 'job-ingest: invalid hostlist constraint is caught' '
test_must_fail flux run --requires=host:badhost true \
2>host-inval.err &&
test_debug "cat host-inval.err" &&
grep "invalid host.*badhost.*not found" host-inval.err
'
test_expect_success 'job-ingest: one invalid host among multiple is caught' '
host=$(hostname) &&
test_must_fail flux run --requires=host:$host,badhost true \
2>hosts-inval.err &&
test_debug "cat hosts-inval.err" &&
grep "invalid host.*badhost.*not found" hosts-inval.err
'
test_expect_success 'job-ingest: stop the queue so no more jobs run' '
flux queue stop
'
Expand Down
2 changes: 1 addition & 1 deletion t/t2260-job-list.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ test_description='Test flux job list services'
. $(dirname $0)/sharness.sh

export FLUX_CONF_DIR=$(pwd)
test_under_flux 4 job
test_under_flux 4 job --test-hosts=node[0-3]

RPC=${FLUX_BUILD_DIR}/t/request/rpc
listRPC="flux python ${SHARNESS_TEST_SRCDIR}/job-list/list-rpc.py"
Expand Down
Loading