diff --git a/hera/bin/hera-openFoam b/hera/bin/hera-openFoam index f0aee5955..324608050 100755 --- a/hera/bin/hera-openFoam +++ b/hera/bin/hera-openFoam @@ -10,6 +10,18 @@ from hera.simulations import CLI as workflowCLI from hera.utils.logging import initialize_logging, with_logger +def _add_scheduler_arguments(subparser): + """Add the Luigi scheduler-selection and dispatch_id options to a subparser.""" + subparser.add_argument('--scheduler', dest="scheduler", choices=["local", "central"], default="local", + help="Luigi scheduler to use. 'local' (default) runs with --local-scheduler; 'central' connects to a running luigid.") + subparser.add_argument('--scheduler-host', dest="scheduler_host", type=str, default=None, + help="Host of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (localhost).") + subparser.add_argument('--scheduler-port', dest="scheduler_port", type=int, default=None, + help="Port of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (8082).") + subparser.add_argument('--dispatch-id', dest="dispatch_id", type=str, default=None, + help="Unique id for this execution, propagated to all nodes. If omitted a uuid4 is generated.") + + def addSolverTemplateOptions(subparser, solverName, incompressible): """ Adds the template options. @@ -58,6 +70,7 @@ def addSolverTemplateOptions(subparser, solverName, incompressible): help='Add the current workflow to the DB even if it exists there under a different name') parser_prepare.add_argument('--noDB', dest="noDB", action="store_true", default=False, help='Run the workflow without adding to the DB') + _add_scheduler_arguments(parser_prepare) parser_prepare.set_defaults(execute=True) parser_prepare.set_defaults(func=CLI.foam_solver_template_buildExecute) @@ -509,6 +522,7 @@ if __name__ == "__main__": execute_parser.add_argument('--workflowGroup', dest="workflowGroup", default=None, type=str, help='The simulation group. [Optional]. If None, try to infer from the file name ') + _add_scheduler_arguments(execute_parser) execute_parser.set_defaults(execute=True) execute_parser.set_defaults(func=CLI.foam_solver_template_buildExecute) diff --git a/hera/bin/hera-workflows b/hera/bin/hera-workflows index cd8a88621..1ebb2800a 100755 --- a/hera/bin/hera-workflows +++ b/hera/bin/hera-workflows @@ -63,6 +63,14 @@ if __name__ == "__main__": workflow_build_exec_parser = subparsers.add_parser('buildExecute', help='Adds the workflow and executes it.') workflow_build_exec_parser.add_argument('workflowName', type=str, help='The workflow file to load') workflow_build_exec_parser.add_argument('--projectName', type=str, default=None, help=PROJECT_NAME_HELP_DESC) + workflow_build_exec_parser.add_argument('--scheduler', dest="scheduler", choices=["local", "central"], default="local", + help="Luigi scheduler to use. 'local' (default) runs with --local-scheduler; 'central' connects to a running luigid.") + workflow_build_exec_parser.add_argument('--scheduler-host', dest="scheduler_host", type=str, default=None, + help="Host of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (localhost).") + workflow_build_exec_parser.add_argument('--scheduler-port', dest="scheduler_port", type=int, default=None, + help="Port of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (8082).") + workflow_build_exec_parser.add_argument('--dispatch-id', dest="dispatch_id", type=str, default=None, + help="Unique id for this execution, propagated to all nodes. If omitted a uuid4 is generated.") workflow_build_exec_parser.set_defaults(func=CLI.workflow_buildExecute) diff --git a/hera/simulations/CLI.py b/hera/simulations/CLI.py index 19d8bbabd..13c7fade3 100644 --- a/hera/simulations/CLI.py +++ b/hera/simulations/CLI.py @@ -666,4 +666,8 @@ def workflow_buildExecute(arguments): else: raise ValueError(f"The workflowName {arguments.workflowName} is not in the DB and not a file on the disk") - wftk.executeWorkflowFromDB(workflowName) \ No newline at end of file + wftk.executeWorkflowFromDB(workflowName, + scheduler=getattr(arguments, "scheduler", "local"), + schedulerHost=getattr(arguments, "scheduler_host", None), + schedulerPort=getattr(arguments, "scheduler_port", None), + dispatch_id=getattr(arguments, "dispatch_id", None)) \ No newline at end of file diff --git a/hera/simulations/LSM/hermesWorkflowToolkit.py b/hera/simulations/LSM/hermesWorkflowToolkit.py index a02ba721a..c0794b009 100644 --- a/hera/simulations/LSM/hermesWorkflowToolkit.py +++ b/hera/simulations/LSM/hermesWorkflowToolkit.py @@ -10,8 +10,10 @@ from ..datalayer import datatypes import numpy import pydoc +import uuid import warnings from ..utils.logging import with_logger,get_classMethod_logger +from ..hermesWorkflowToolkit import buildLuigiExecutionCommand, SCHEDULER_LOCAL, SCHEDULER_CENTRAL try: from hermes import workflow @@ -407,7 +409,11 @@ def addWorkflowToGroup(self, force: bool = False, assignName: bool = False, execute: bool = False, - parameters: dict = dict()): + parameters: dict = dict(), + scheduler: str = SCHEDULER_LOCAL, + schedulerHost: str = None, + schedulerPort: int = None, + dispatch_id: str = None): """ 1. Adds the workflow to the database in the requested group 2. Builds the template (.json) and python executer @@ -580,7 +586,13 @@ def addWorkflowToGroup(self, shutil.rmtree(executionfileDir, ignore_errors=True) pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}") - executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler" + runDispatchId = dispatch_id or uuid.uuid4().hex + logger.info(f"Executing with scheduler='{scheduler}' dispatch_id='{runDispatchId}'") + executionStr = buildLuigiExecutionCommand(os.path.basename(pythonPath), + runDispatchId, + scheduler=scheduler, + schedulerHost=schedulerHost, + schedulerPort=schedulerPort) self.logger.debug(executionStr) os.system(executionStr) diff --git a/hera/simulations/hermesWorkflowToolkit.py b/hera/simulations/hermesWorkflowToolkit.py index 1fafa1349..9cc8231b0 100644 --- a/hera/simulations/hermesWorkflowToolkit.py +++ b/hera/simulations/hermesWorkflowToolkit.py @@ -12,6 +12,7 @@ from hera.datalayer import datatypes import numpy import pydoc +import uuid import warnings from ..utils.logging import get_classMethod_logger @@ -23,6 +24,52 @@ workflow = None +# Default Luigi scheduler selection values, used by buildLuigiExecutionCommand below. +SCHEDULER_LOCAL = "local" +SCHEDULER_CENTRAL = "central" + + +def buildLuigiExecutionCommand(moduleName, dispatch_id, scheduler=SCHEDULER_LOCAL, + schedulerHost=None, schedulerPort=None, + targetTask="finalnode_xx_0"): + """Build the ``python3 -m luigi`` command line used to execute a workflow. + + A single definition shared by every place that runs a Hermes workflow so the + local/central scheduler logic and the ``dispatch_id`` propagation stay consistent. + + Parameters + ---------- + moduleName : str + The name (without ``.py``) of the generated Luigi module to run with ``--module``. + dispatch_id : str + Unique identifier for this execution, passed to Luigi as ``--dispatch-id``. Luigi + maps the task's ``dispatch_id`` Parameter to the ``--dispatch-id`` CLI flag. + scheduler : str + ``"local"`` (default) adds ``--local-scheduler``; ``"central"`` connects to a + running ``luigid`` central scheduler (optionally at ``schedulerHost``/``schedulerPort``). + schedulerHost, schedulerPort : str, int + Optional central scheduler address. When omitted Luigi uses its defaults + (localhost:8082). Ignored for the local scheduler. + targetTask : str + The terminal task that triggers the whole DAG (default ``finalnode_xx_0``). + + Returns + ------- + str + The full shell command. + """ + cmd = f"python3 -m luigi --module {moduleName} {targetTask}" + if scheduler == SCHEDULER_CENTRAL: + if schedulerHost is not None: + cmd += f" --scheduler-host {schedulerHost}" + if schedulerPort is not None: + cmd += f" --scheduler-port {schedulerPort}" + else: + cmd += " --local-scheduler" + cmd += f" --dispatch-id {dispatch_id}" + return cmd + + @unique class actionModes(Enum): """Workflow action modes controlling which steps are executed. @@ -685,7 +732,9 @@ def addWorkflowToGroup(self, workflowJSON: str, groupName: str, writeWorkflowToF return doc - def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): + def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource, + scheduler=SCHEDULER_LOCAL, schedulerHost=None, + schedulerPort=None, dispatch_id=None): """ Building and Executing the workflow. @@ -705,16 +754,31 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): - Its workflow - workfolow dict. - build : bool [default = True] - If true, also builds the workflow. + scheduler : str [default = "local"] + Which Luigi scheduler to use. ``"local"`` runs with ``--local-scheduler`` + (no separate process). ``"central"`` connects to a running ``luigid``. + + schedulerHost, schedulerPort : str, int [default = None] + Optional central scheduler address (only used when scheduler == "central"). + When omitted Luigi uses its defaults (localhost:8082). + + dispatch_id : str [default = None] + Unique identifier for this execution, propagated to every Luigi node so the + central scheduler can tell distinct runs apart. When None a fresh uuid4 is + generated. Returns ------- - None + str + The dispatch_id used for the execution (generated if not supplied), so + callers can correlate the run with the central scheduler. """ logger = get_classMethod_logger(self, "executeWorkflow") docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource) + dispatch_id = dispatch_id or uuid.uuid4().hex + logger.info(f"Executing with scheduler='{scheduler}' dispatch_id='{dispatch_id}'") + for doc in docList: workflowJSON = doc.desc['workflow'] workflowName = doc.desc['workflowName'] @@ -751,9 +815,14 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): # Step 5: Execute the Luigi pipeline via command line. # 'finalnode_xx_0' is the terminal task that triggers the full DAG. - # --local-scheduler avoids requiring a separate Luigi scheduler process. + # The dispatch_id makes each run unique so the (optional) central scheduler + # does not deduplicate distinct executions of the same workflow. pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}") - executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler" + executionStr = buildLuigiExecutionCommand(os.path.basename(pythonPath), + dispatch_id, + scheduler=scheduler, + schedulerHost=schedulerHost, + schedulerPort=schedulerPort) logger.debug(executionStr) os.system(executionStr) @@ -761,6 +830,8 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource): logger.info(f"Cleaning the executer python for {workflowName}") os.remove(pythonFileName) + return dispatch_id + def compareWorkflowObj(self, workflowList, diff --git a/hera/simulations/openFoam/toolkit.py b/hera/simulations/openFoam/toolkit.py index 819719add..e0d148870 100644 --- a/hera/simulations/openFoam/toolkit.py +++ b/hera/simulations/openFoam/toolkit.py @@ -68,7 +68,9 @@ def __init__(self, projectName, filesDirectory=None, connectionName=None): self.buoyantReactingFoam = buoyantReactingFoam_toolkitExtension(self) - def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource): + def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource, + scheduler="local", schedulerHost=None, + schedulerPort=None, dispatch_id=None): """ Build the workflow and then runs the simulation. @@ -76,13 +78,21 @@ def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource): ---------- nameOrWorkflowFileOrJSONOrResource + scheduler, schedulerHost, schedulerPort, dispatch_id + Luigi scheduler selection and execution identity, forwarded to + executeWorkflowFromDB. See its docstring for details. + Returns ------- """ logger = get_classMethod_logger(self,"runOFSimulation") logger.info("Building the case") - self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource) + self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource, + scheduler=scheduler, + schedulerHost=schedulerHost, + schedulerPort=schedulerPort, + dispatch_id=dispatch_id) logger.info("Executing the cases") docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource) diff --git a/hera/tests/test_workflow_execution.py b/hera/tests/test_workflow_execution.py new file mode 100644 index 000000000..97b2e942b --- /dev/null +++ b/hera/tests/test_workflow_execution.py @@ -0,0 +1,159 @@ +""" +Tests for the Luigi execution command construction and the dispatch_id mechanism +introduced for the centralized-scheduler support (issue #918). + +Covers: + - buildLuigiExecutionCommand: local vs central scheduler, host/port, dispatch-id. + - An end-to-end run of a real hermes workflow driven through hera's command, + asserting per-dispatch output isolation (skipped when hermes/luigi are + unavailable or the vendored hermes predates the dispatch_id change). + +The generated Luigi node template (dispatch_id parameter + propagation) is tested +in the hermes repository, since that is where the template lives. +""" + +import os +import subprocess +import sys + +import pytest + +from hera.simulations.hermesWorkflowToolkit import ( + buildLuigiExecutionCommand, + SCHEDULER_LOCAL, + SCHEDULER_CENTRAL, +) + + +# --------------------------------------------------------------------------- +# buildLuigiExecutionCommand +# --------------------------------------------------------------------------- + +def test_local_scheduler_command_is_backward_compatible(): + cmd = buildLuigiExecutionCommand("Flow", "abc123", scheduler=SCHEDULER_LOCAL) + assert "--module Flow finalnode_xx_0" in cmd + assert "--local-scheduler" in cmd + assert "--dispatch-id abc123" in cmd + # The legacy invocation (module + target + local-scheduler) must be preserved. + assert cmd.startswith("python3 -m luigi --module Flow finalnode_xx_0 --local-scheduler") + + +def test_central_scheduler_drops_local_flag(): + cmd = buildLuigiExecutionCommand("Flow", "abc123", scheduler=SCHEDULER_CENTRAL) + assert "--local-scheduler" not in cmd + assert "--dispatch-id abc123" in cmd + + +def test_central_scheduler_host_and_port(): + cmd = buildLuigiExecutionCommand("Flow", "id1", scheduler=SCHEDULER_CENTRAL, + schedulerHost="myhost", schedulerPort=8082) + assert "--scheduler-host myhost" in cmd + assert "--scheduler-port 8082" in cmd + assert "--local-scheduler" not in cmd + + +def test_central_scheduler_omits_address_when_not_given(): + cmd = buildLuigiExecutionCommand("Flow", "id1", scheduler=SCHEDULER_CENTRAL) + assert "--scheduler-host" not in cmd + assert "--scheduler-port" not in cmd + + +def test_local_scheduler_ignores_host_and_port(): + cmd = buildLuigiExecutionCommand("Flow", "id1", scheduler=SCHEDULER_LOCAL, + schedulerHost="myhost", schedulerPort=8082) + assert "--scheduler-host" not in cmd + assert "--scheduler-port" not in cmd + assert "--local-scheduler" in cmd + + +def test_custom_target_task(): + cmd = buildLuigiExecutionCommand("Flow", "id1", targetTask="otherNode_0") + assert "otherNode_0" in cmd + assert "finalnode_xx_0" not in cmd + + +# Note: the generated Luigi node template (the dispatch_id luigi.Parameter, its +# propagation through requires() and per-dispatch output isolation) lives in the +# hermes repository, so it is tested there (tests/test_dispatch_id_template.py), +# not here. Hera's CI vendors hermes' default branch, which would not carry the +# change until the hermes PR merges. + + +# --------------------------------------------------------------------------- +# End-to-end: run a real hermes workflow through hera's command and verify +# per-dispatch output isolation. +# --------------------------------------------------------------------------- + +_WORKFLOW_JSON = { + "workflow": { + "nodes": { + "CopyDirectory": { + "Execution": { + "input_parameters": { + "Source": "source", "Target": "target", "dirs_exist_ok": True, + } + }, + "type": "general.CopyDirectory", + }, + "RunPythonCode": { + "Execution": { + "input_parameters": { + "ModulePath": "tutorial1", + "ClassName": "tutrialPrinter", + "MethodName": "printDirectories", + "Parameters": { + "source": "{CopyDirectory.output.Source}", + "target": "{CopyDirectory.output.Target}", + }, + } + }, + "type": "general.RunPythonCode", + }, + } + } +} + +_TUTORIAL_MODULE = ( + "class tutrialPrinter:\n" + " def printDirectories(self, source, target):\n" + " print(f'Copied {source} to {target}')\n" +) + + +def test_run_hermes_workflow_isolates_outputs_per_dispatch(tmp_path): + """Build and execute a real hermes workflow using hera's buildLuigiExecutionCommand + and confirm the run completes and writes its targets under the dispatch_id subdir.""" + hermes = pytest.importorskip("hermes") + pytest.importorskip("luigi") + from hermes import workflow + from hermes.engines.luigi.pythonClassBase import transform + + # The vendored hermes must carry the dispatch_id change for this to run; otherwise + # passing --dispatch-id to nodes that don't declare it would error. Skip until then. + if "dispatch_id" not in transform._basicLuigiTemplate: + pytest.skip("vendored hermes predates the dispatch_id node template change") + + workdir = str(tmp_path) + os.makedirs(os.path.join(workdir, "source"), exist_ok=True) + with open(os.path.join(workdir, "tutorial1.py"), "w") as fp: + fp.write(_TUTORIAL_MODULE) + + wf = workflow(_WORKFLOW_JSON, workdir, Resource_path=workdir) + with open(os.path.join(workdir, "Workflow1.py"), "w") as fp: + fp.write(wf.build(buildername=workflow.BUILDER_LUIGI)) + + # Drive the run through hera's own command builder (python3 -> current interpreter + # so the test is robust to how python is named in the environment). + cmd = buildLuigiExecutionCommand("Workflow1", "RUN1", scheduler=SCHEDULER_LOCAL) + cmd = cmd.replace("python3", sys.executable, 1) + env = dict(os.environ) + env["PYTHONPATH"] = os.pathsep.join( + [os.path.dirname(hermes.__file__) + "/..", workdir, env.get("PYTHONPATH", "")] + ) + result = subprocess.run(cmd, shell=True, cwd=workdir, env=env, + capture_output=True, text=True) + + assert "this progress looks :)" in result.stderr.lower(), result.stderr + run_dir = os.path.join(workdir, "Workflow1_targetFiles", "RUN1") + assert os.path.isfile(os.path.join(run_dir, "finalnode_xx_0.json")) + assert os.path.isfile(os.path.join(run_dir, "RunPythonCode_0.json"))