From 0cae44fc7b1e850d9e830d3d7530cd8df337b3eb Mon Sep 17 00:00:00 2001 From: Ilay Falach Date: Mon, 15 Jun 2026 16:45:56 +0300 Subject: [PATCH 1/4] feat: optional centralized Luigi scheduler + dispatch_id for hermes workflows (#918) Move hermes workflow execution off the hardcoded --local-scheduler to an optional centralized scheduler, and propagate a unique dispatch_id to every Luigi node so the central scheduler (which keys on task family + parameters) can tell distinct runs apart. - buildLuigiExecutionCommand helper builds local/central invocations and the --dispatch-id flag, shared by both execution call sites. - executeWorkflowFromDB / LSM addWorkflowToGroup / OF runOFSimulation accept scheduler, schedulerHost, schedulerPort, dispatch_id (auto uuid4, logged). - hera-workflows buildExecute CLI gains --scheduler/--scheduler-host/ --scheduler-port/--dispatch-id, forwarded through workflow_buildExecute. - Add test_workflow_execution.py covering the command builder and template. Requires the matching hermes change (dispatch_id luigi.Parameter on nodes). Co-Authored-By: Claude Opus 4.8 (1M context) --- hera/bin/hera-workflows | 8 ++ hera/simulations/CLI.py | 6 +- hera/simulations/LSM/hermesWorkflowToolkit.py | 16 +++- hera/simulations/hermesWorkflowToolkit.py | 83 +++++++++++++++++-- hera/simulations/openFoam/toolkit.py | 14 +++- hera/tests/test_workflow_execution.py | 81 ++++++++++++++++++ 6 files changed, 197 insertions(+), 11 deletions(-) create mode 100644 hera/tests/test_workflow_execution.py diff --git a/hera/bin/hera-workflows b/hera/bin/hera-workflows index cd8a88621..1ebb2800a 100755 --- a/hera/bin/hera-workflows +++ b/hera/bin/hera-workflows @@ -63,6 +63,14 @@ if __name__ == "__main__": workflow_build_exec_parser = subparsers.add_parser('buildExecute', help='Adds the workflow and executes it.') workflow_build_exec_parser.add_argument('workflowName', type=str, help='The workflow file to load') workflow_build_exec_parser.add_argument('--projectName', type=str, default=None, help=PROJECT_NAME_HELP_DESC) + workflow_build_exec_parser.add_argument('--scheduler', dest="scheduler", choices=["local", "central"], default="local", + help="Luigi scheduler to use. 'local' (default) runs with --local-scheduler; 'central' connects to a running luigid.") + workflow_build_exec_parser.add_argument('--scheduler-host', dest="scheduler_host", type=str, default=None, + help="Host of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (localhost).") + workflow_build_exec_parser.add_argument('--scheduler-port', dest="scheduler_port", type=int, default=None, + help="Port of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (8082).") + workflow_build_exec_parser.add_argument('--dispatch-id', dest="dispatch_id", type=str, default=None, + help="Unique id for this execution, propagated to all nodes. If omitted a uuid4 is generated.") workflow_build_exec_parser.set_defaults(func=CLI.workflow_buildExecute) diff --git a/hera/simulations/CLI.py b/hera/simulations/CLI.py index 19d8bbabd..13c7fade3 100644 --- a/hera/simulations/CLI.py +++ b/hera/simulations/CLI.py @@ -666,4 +666,8 @@ def workflow_buildExecute(arguments): else: raise ValueError(f"The workflowName {arguments.workflowName} is not in the DB and not a file on the disk") - wftk.executeWorkflowFromDB(workflowName) \ No newline at end of file + wftk.executeWorkflowFromDB(workflowName, + scheduler=getattr(arguments, "scheduler", "local"), + schedulerHost=getattr(arguments, "scheduler_host", None), + schedulerPort=getattr(arguments, "scheduler_port", None), + dispatch_id=getattr(arguments, "dispatch_id", None)) \ No newline at end of file diff --git a/hera/simulations/LSM/hermesWorkflowToolkit.py b/hera/simulations/LSM/hermesWorkflowToolkit.py index a02ba721a..c0794b009 100644 --- a/hera/simulations/LSM/hermesWorkflowToolkit.py +++ b/hera/simulations/LSM/hermesWorkflowToolkit.py @@ -10,8 +10,10 @@ from ..datalayer import datatypes import numpy import pydoc +import uuid import warnings from ..utils.logging import with_logger,get_classMethod_logger +from ..hermesWorkflowToolkit import buildLuigiExecutionCommand, SCHEDULER_LOCAL, SCHEDULER_CENTRAL try: from hermes import workflow @@ -407,7 +409,11 @@ def addWorkflowToGroup(self, force: bool = False, assignName: bool = False, execute: bool = False, - parameters: dict = dict()): + parameters: dict = dict(), + scheduler: str = SCHEDULER_LOCAL, + schedulerHost: str = None, + schedulerPort: int = None, + dispatch_id: str = None): """ 1. Adds the workflow to the database in the requested group 2. Builds the template (.json) and python executer @@ -580,7 +586,13 @@ def addWorkflowToGroup(self, shutil.rmtree(executionfileDir, ignore_errors=True) pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}") - executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler" + runDispatchId = dispatch_id or uuid.uuid4().hex + logger.info(f"Executing with scheduler='{scheduler}' dispatch_id='{runDispatchId}'") + executionStr = buildLuigiExecutionCommand(os.path.basename(pythonPath), + runDispatchId, + scheduler=scheduler, + schedulerHost=schedulerHost, + schedulerPort=schedulerPort) self.logger.debug(executionStr) os.system(executionStr) diff --git a/hera/simulations/hermesWorkflowToolkit.py b/hera/simulations/hermesWorkflowToolkit.py index 1fafa1349..9cc8231b0 100644 --- a/hera/simulations/hermesWorkflowToolkit.py +++ b/hera/simulations/hermesWorkflowToolkit.py @@ -12,6 +12,7 @@ from hera.datalayer import datatypes import numpy import pydoc +import uuid import warnings from ..utils.logging import get_classMethod_logger @@ -23,6 +24,52 @@ workflow = None +# Default Luigi scheduler selection values, used by buildLuigiExecutionCommand below. +SCHEDULER_LOCAL = "local" +SCHEDULER_CENTRAL = "central" + + +def buildLuigiExecutionCommand(moduleName, dispatch_id, scheduler=SCHEDULER_LOCAL, + schedulerHost=None, schedulerPort=None, + targetTask="finalnode_xx_0"): + """Build the ``python3 -m luigi`` command line used to execute a workflow. + + A single definition shared by every place that runs a Hermes workflow so the + local/central scheduler logic and the ``dispatch_id`` propagation stay consistent. + + Parameters + ---------- + moduleName : str + The name (without ``.py``) of the generated Luigi module to run with ``--module``. + dispatch_id : str + Unique identifier for this execution, passed to Luigi as ``--dispatch-id``. Luigi + maps the task's ``dispatch_id`` Parameter to the ``--dispatch-id`` CLI flag. + scheduler : str + ``"local"`` (default) adds ``--local-scheduler``; ``"central"`` connects to a + running ``luigid`` central scheduler (optionally at ``schedulerHost``/``schedulerPort``). + schedulerHost, schedulerPort : str, int + Optional central scheduler address. When omitted Luigi uses its defaults + (localhost:8082). Ignored for the local scheduler. + targetTask : str + The terminal task that triggers the whole DAG (default ``finalnode_xx_0``). + + Returns + ------- + str + The full shell command. + """ + cmd = f"python3 -m luigi --module {moduleName} {targetTask}" + if scheduler == SCHEDULER_CENTRAL: + if schedulerHost is not None: + cmd += f" --scheduler-host {schedulerHost}" + if schedulerPort is not None: + cmd += f" --scheduler-port {schedulerPort}" + else: + cmd += " --local-scheduler" + cmd += f" --dispatch-id {dispatch_id}" + return cmd + + @unique class actionModes(Enum): """Workflow action modes controlling which steps are executed. @@ -685,7 +732,9 @@ def addWorkflowToGroup(self, workflowJSON: str, groupName: str, writeWorkflowToF return doc - def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): + def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource, + scheduler=SCHEDULER_LOCAL, schedulerHost=None, + schedulerPort=None, dispatch_id=None): """ Building and Executing the workflow. @@ -705,16 +754,31 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): - Its workflow - workfolow dict. - build : bool [default = True] - If true, also builds the workflow. + scheduler : str [default = "local"] + Which Luigi scheduler to use. ``"local"`` runs with ``--local-scheduler`` + (no separate process). ``"central"`` connects to a running ``luigid``. + + schedulerHost, schedulerPort : str, int [default = None] + Optional central scheduler address (only used when scheduler == "central"). + When omitted Luigi uses its defaults (localhost:8082). + + dispatch_id : str [default = None] + Unique identifier for this execution, propagated to every Luigi node so the + central scheduler can tell distinct runs apart. When None a fresh uuid4 is + generated. Returns ------- - None + str + The dispatch_id used for the execution (generated if not supplied), so + callers can correlate the run with the central scheduler. """ logger = get_classMethod_logger(self, "executeWorkflow") docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource) + dispatch_id = dispatch_id or uuid.uuid4().hex + logger.info(f"Executing with scheduler='{scheduler}' dispatch_id='{dispatch_id}'") + for doc in docList: workflowJSON = doc.desc['workflow'] workflowName = doc.desc['workflowName'] @@ -751,9 +815,14 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): # Step 5: Execute the Luigi pipeline via command line. # 'finalnode_xx_0' is the terminal task that triggers the full DAG. - # --local-scheduler avoids requiring a separate Luigi scheduler process. + # The dispatch_id makes each run unique so the (optional) central scheduler + # does not deduplicate distinct executions of the same workflow. pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}") - executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler" + executionStr = buildLuigiExecutionCommand(os.path.basename(pythonPath), + dispatch_id, + scheduler=scheduler, + schedulerHost=schedulerHost, + schedulerPort=schedulerPort) logger.debug(executionStr) os.system(executionStr) @@ -761,6 +830,8 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): logger.info(f"Cleaning the executer python for {workflowName}") os.remove(pythonFileName) + return dispatch_id + def compareWorkflowObj(self, workflowList, diff --git a/hera/simulations/openFoam/toolkit.py b/hera/simulations/openFoam/toolkit.py index 819719add..e0d148870 100644 --- a/hera/simulations/openFoam/toolkit.py +++ b/hera/simulations/openFoam/toolkit.py @@ -68,7 +68,9 @@ def __init__(self, projectName, filesDirectory=None, connectionName=None): self.buoyantReactingFoam = buoyantReactingFoam_toolkitExtension(self) - def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource): + def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource, + scheduler="local", schedulerHost=None, + schedulerPort=None, dispatch_id=None): """ Build the workflow and then runs the simulation. @@ -76,13 +78,21 @@ def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource): ---------- nameOrWorkflowFileOrJSONOrResource + scheduler, schedulerHost, schedulerPort, dispatch_id + Luigi scheduler selection and execution identity, forwarded to + executeWorkflowFromDB. See its docstring for details. + Returns ------- """ logger = get_classMethod_logger(self,"runOFSimulation") logger.info("Building the case") - self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource) + self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource, + scheduler=scheduler, + schedulerHost=schedulerHost, + schedulerPort=schedulerPort, + dispatch_id=dispatch_id) logger.info("Executing the cases") docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource) diff --git a/hera/tests/test_workflow_execution.py b/hera/tests/test_workflow_execution.py new file mode 100644 index 000000000..d6e9b8b9a --- /dev/null +++ b/hera/tests/test_workflow_execution.py @@ -0,0 +1,81 @@ +""" +Tests for the Luigi execution command construction and the dispatch_id mechanism +introduced for the centralized-scheduler support (issue #918). + +Covers: + - buildLuigiExecutionCommand: local vs central scheduler, host/port, dispatch-id. + - The generated Luigi node template carries a dispatch_id luigi.Parameter and + propagates it through requires()/output() (guarded by hermes availability). +""" + +import pytest + +from hera.simulations.hermesWorkflowToolkit import ( + buildLuigiExecutionCommand, + SCHEDULER_LOCAL, + SCHEDULER_CENTRAL, +) + + +# --------------------------------------------------------------------------- +# buildLuigiExecutionCommand +# --------------------------------------------------------------------------- + +def test_local_scheduler_command_is_backward_compatible(): + cmd = buildLuigiExecutionCommand("Flow", "abc123", scheduler=SCHEDULER_LOCAL) + assert "--module Flow finalnode_xx_0" in cmd + assert "--local-scheduler" in cmd + assert "--dispatch-id abc123" in cmd + # The legacy invocation (module + target + local-scheduler) must be preserved. + assert cmd.startswith("python3 -m luigi --module Flow finalnode_xx_0 --local-scheduler") + + +def test_central_scheduler_drops_local_flag(): + cmd = buildLuigiExecutionCommand("Flow", "abc123", scheduler=SCHEDULER_CENTRAL) + assert "--local-scheduler" not in cmd + assert "--dispatch-id abc123" in cmd + + +def test_central_scheduler_host_and_port(): + cmd = buildLuigiExecutionCommand("Flow", "id1", scheduler=SCHEDULER_CENTRAL, + schedulerHost="myhost", schedulerPort=8082) + assert "--scheduler-host myhost" in cmd + assert "--scheduler-port 8082" in cmd + assert "--local-scheduler" not in cmd + + +def test_central_scheduler_omits_address_when_not_given(): + cmd = buildLuigiExecutionCommand("Flow", "id1", scheduler=SCHEDULER_CENTRAL) + assert "--scheduler-host" not in cmd + assert "--scheduler-port" not in cmd + + +def test_local_scheduler_ignores_host_and_port(): + cmd = buildLuigiExecutionCommand("Flow", "id1", scheduler=SCHEDULER_LOCAL, + schedulerHost="myhost", schedulerPort=8082) + assert "--scheduler-host" not in cmd + assert "--scheduler-port" not in cmd + assert "--local-scheduler" in cmd + + +def test_custom_target_task(): + cmd = buildLuigiExecutionCommand("Flow", "id1", targetTask="otherNode_0") + assert "otherNode_0" in cmd + assert "finalnode_xx_0" not in cmd + + +# --------------------------------------------------------------------------- +# Generated Luigi node template (requires hermes to be importable) +# --------------------------------------------------------------------------- + +def test_generated_template_declares_and_propagates_dispatch_id(): + pytest.importorskip("hermes") + from hermes.engines.luigi.pythonClassBase import transform + + template = transform._basicLuigiTemplate + # The parameter must be declared on every generated node class... + assert "dispatch_id = luigi.Parameter(default=\"\")" in template + # ...propagated to required tasks (Luigi does not thread params automatically)... + assert "dispatch_id=self.dispatch_id" in template + # ...and used to isolate per-dispatch output targets. + assert "self.dispatch_id or" in template From d56bc9adea0ffff0cfc0a7388c815b512c21e989 Mon Sep 17 00:00:00 2001 From: Ilay Falach Date: Mon, 15 Jun 2026 17:01:53 +0300 Subject: [PATCH 2/4] feat: expose --scheduler/--dispatch-id on hera-openFoam buildExecute paths (#918) The OpenFOAM CLI executes workflows via hermes handler_buildExecute, a path separate from executeWorkflowFromDB. Add the scheduler-selection and dispatch_id flags to its executeWorkflow/buildExecute subparsers so the centralized scheduler option reaches those runs too. Co-Authored-By: Claude Opus 4.8 (1M context) --- hera/bin/hera-openFoam | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/hera/bin/hera-openFoam b/hera/bin/hera-openFoam index f0aee5955..324608050 100755 --- a/hera/bin/hera-openFoam +++ b/hera/bin/hera-openFoam @@ -10,6 +10,18 @@ from hera.simulations import CLI as workflowCLI from hera.utils.logging import initialize_logging, with_logger +def _add_scheduler_arguments(subparser): + """Add the Luigi scheduler-selection and dispatch_id options to a subparser.""" + subparser.add_argument('--scheduler', dest="scheduler", choices=["local", "central"], default="local", + help="Luigi scheduler to use. 'local' (default) runs with --local-scheduler; 'central' connects to a running luigid.") + subparser.add_argument('--scheduler-host', dest="scheduler_host", type=str, default=None, + help="Host of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (localhost).") + subparser.add_argument('--scheduler-port', dest="scheduler_port", type=int, default=None, + help="Port of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (8082).") + subparser.add_argument('--dispatch-id', dest="dispatch_id", type=str, default=None, + help="Unique id for this execution, propagated to all nodes. If omitted a uuid4 is generated.") + + def addSolverTemplateOptions(subparser, solverName, incompressible): """ Adds the template options. @@ -58,6 +70,7 @@ def addSolverTemplateOptions(subparser, solverName, incompressible): help='Add the current workflow to the DB even if it exists there under a different name') parser_prepare.add_argument('--noDB', dest="noDB", action="store_true", default=False, help='Run the workflow without adding to the DB') + _add_scheduler_arguments(parser_prepare) parser_prepare.set_defaults(execute=True) parser_prepare.set_defaults(func=CLI.foam_solver_template_buildExecute) @@ -509,6 +522,7 @@ if __name__ == "__main__": execute_parser.add_argument('--workflowGroup', dest="workflowGroup", default=None, type=str, help='The simulation group. [Optional]. If None, try to infer from the file name ') + _add_scheduler_arguments(execute_parser) execute_parser.set_defaults(execute=True) execute_parser.set_defaults(func=CLI.foam_solver_template_buildExecute) From a43fd9223b551f3191e1df4eb84b973d42e9959e Mon Sep 17 00:00:00 2001 From: Ilay Falach Date: Mon, 15 Jun 2026 17:21:44 +0300 Subject: [PATCH 3/4] test: move dispatch_id template assertion to the hermes repo (#918) The template lives in hermes; hera CI vendors hermes' default branch, which does not carry the dispatch_id change until the hermes PR merges, so a hera-side assertion on the template fails in CI. Keep only the hera command-builder tests here; the template is tested in the hermes repo. Co-Authored-By: Claude Opus 4.8 (1M context) --- hera/tests/test_workflow_execution.py | 27 ++++++++------------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/hera/tests/test_workflow_execution.py b/hera/tests/test_workflow_execution.py index d6e9b8b9a..981144ec8 100644 --- a/hera/tests/test_workflow_execution.py +++ b/hera/tests/test_workflow_execution.py @@ -4,11 +4,10 @@ Covers: - buildLuigiExecutionCommand: local vs central scheduler, host/port, dispatch-id. - - The generated Luigi node template carries a dispatch_id luigi.Parameter and - propagates it through requires()/output() (guarded by hermes availability). -""" -import pytest +The generated Luigi node template (dispatch_id parameter + propagation) is tested +in the hermes repository, since that is where the template lives. +""" from hera.simulations.hermesWorkflowToolkit import ( buildLuigiExecutionCommand, @@ -64,18 +63,8 @@ def test_custom_target_task(): assert "finalnode_xx_0" not in cmd -# --------------------------------------------------------------------------- -# Generated Luigi node template (requires hermes to be importable) -# --------------------------------------------------------------------------- - -def test_generated_template_declares_and_propagates_dispatch_id(): - pytest.importorskip("hermes") - from hermes.engines.luigi.pythonClassBase import transform - - template = transform._basicLuigiTemplate - # The parameter must be declared on every generated node class... - assert "dispatch_id = luigi.Parameter(default=\"\")" in template - # ...propagated to required tasks (Luigi does not thread params automatically)... - assert "dispatch_id=self.dispatch_id" in template - # ...and used to isolate per-dispatch output targets. - assert "self.dispatch_id or" in template +# Note: the generated Luigi node template (the dispatch_id luigi.Parameter, its +# propagation through requires() and per-dispatch output isolation) lives in the +# hermes repository, so it is tested there (tests/test_dispatch_id_template.py), +# not here. Hera's CI vendors hermes' default branch, which would not carry the +# change until the hermes PR merges. From 1311b05ec825dcf1dbf47975a53ebb6dd731cbc5 Mon Sep 17 00:00:00 2001 From: Ilay Falach Date: Tue, 16 Jun 2026 09:08:52 +0300 Subject: [PATCH 4/4] test: add end-to-end hermes workflow run with dispatch_id isolation (#918) Builds and executes the Tutorial workflow through hera's buildLuigiExecutionCommand and asserts the run completes with per-dispatch output isolation. Guarded to skip when hermes/luigi are unavailable or the vendored hermes predates the dispatch_id node template change, so it stays green until the hermes PR merges and activates automatically afterwards. Co-Authored-By: Claude Opus 4.8 (1M context) --- hera/tests/test_workflow_execution.py | 89 +++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/hera/tests/test_workflow_execution.py b/hera/tests/test_workflow_execution.py index 981144ec8..97b2e942b 100644 --- a/hera/tests/test_workflow_execution.py +++ b/hera/tests/test_workflow_execution.py @@ -4,11 +4,20 @@ Covers: - buildLuigiExecutionCommand: local vs central scheduler, host/port, dispatch-id. + - An end-to-end run of a real hermes workflow driven through hera's command, + asserting per-dispatch output isolation (skipped when hermes/luigi are + unavailable or the vendored hermes predates the dispatch_id change). The generated Luigi node template (dispatch_id parameter + propagation) is tested in the hermes repository, since that is where the template lives. """ +import os +import subprocess +import sys + +import pytest + from hera.simulations.hermesWorkflowToolkit import ( buildLuigiExecutionCommand, SCHEDULER_LOCAL, @@ -68,3 +77,83 @@ def test_custom_target_task(): # hermes repository, so it is tested there (tests/test_dispatch_id_template.py), # not here. Hera's CI vendors hermes' default branch, which would not carry the # change until the hermes PR merges. + + +# --------------------------------------------------------------------------- +# End-to-end: run a real hermes workflow through hera's command and verify +# per-dispatch output isolation. +# --------------------------------------------------------------------------- + +_WORKFLOW_JSON = { + "workflow": { + "nodes": { + "CopyDirectory": { + "Execution": { + "input_parameters": { + "Source": "source", "Target": "target", "dirs_exist_ok": True, + } + }, + "type": "general.CopyDirectory", + }, + "RunPythonCode": { + "Execution": { + "input_parameters": { + "ModulePath": "tutorial1", + "ClassName": "tutrialPrinter", + "MethodName": "printDirectories", + "Parameters": { + "source": "{CopyDirectory.output.Source}", + "target": "{CopyDirectory.output.Target}", + }, + } + }, + "type": "general.RunPythonCode", + }, + } + } +} + +_TUTORIAL_MODULE = ( + "class tutrialPrinter:\n" + " def printDirectories(self, source, target):\n" + " print(f'Copied {source} to {target}')\n" +) + + +def test_run_hermes_workflow_isolates_outputs_per_dispatch(tmp_path): + """Build and execute a real hermes workflow using hera's buildLuigiExecutionCommand + and confirm the run completes and writes its targets under the dispatch_id subdir.""" + hermes = pytest.importorskip("hermes") + pytest.importorskip("luigi") + from hermes import workflow + from hermes.engines.luigi.pythonClassBase import transform + + # The vendored hermes must carry the dispatch_id change for this to run; otherwise + # passing --dispatch-id to nodes that don't declare it would error. Skip until then. + if "dispatch_id" not in transform._basicLuigiTemplate: + pytest.skip("vendored hermes predates the dispatch_id node template change") + + workdir = str(tmp_path) + os.makedirs(os.path.join(workdir, "source"), exist_ok=True) + with open(os.path.join(workdir, "tutorial1.py"), "w") as fp: + fp.write(_TUTORIAL_MODULE) + + wf = workflow(_WORKFLOW_JSON, workdir, Resource_path=workdir) + with open(os.path.join(workdir, "Workflow1.py"), "w") as fp: + fp.write(wf.build(buildername=workflow.BUILDER_LUIGI)) + + # Drive the run through hera's own command builder (python3 -> current interpreter + # so the test is robust to how python is named in the environment). + cmd = buildLuigiExecutionCommand("Workflow1", "RUN1", scheduler=SCHEDULER_LOCAL) + cmd = cmd.replace("python3", sys.executable, 1) + env = dict(os.environ) + env["PYTHONPATH"] = os.pathsep.join( + [os.path.dirname(hermes.__file__) + "/..", workdir, env.get("PYTHONPATH", "")] + ) + result = subprocess.run(cmd, shell=True, cwd=workdir, env=env, + capture_output=True, text=True) + + assert "this progress looks :)" in result.stderr.lower(), result.stderr + run_dir = os.path.join(workdir, "Workflow1_targetFiles", "RUN1") + assert os.path.isfile(os.path.join(run_dir, "finalnode_xx_0.json")) + assert os.path.isfile(os.path.join(run_dir, "RunPythonCode_0.json"))