Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions hera/bin/hera-openFoam
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ from hera.simulations import CLI as workflowCLI
from hera.utils.logging import initialize_logging, with_logger


def _add_scheduler_arguments(subparser):
"""Add the Luigi scheduler-selection and dispatch_id options to a subparser."""
subparser.add_argument('--scheduler', dest="scheduler", choices=["local", "central"], default="local",
help="Luigi scheduler to use. 'local' (default) runs with --local-scheduler; 'central' connects to a running luigid.")
subparser.add_argument('--scheduler-host', dest="scheduler_host", type=str, default=None,
help="Host of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (localhost).")
subparser.add_argument('--scheduler-port', dest="scheduler_port", type=int, default=None,
help="Port of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (8082).")
subparser.add_argument('--dispatch-id', dest="dispatch_id", type=str, default=None,
help="Unique id for this execution, propagated to all nodes. If omitted a uuid4 is generated.")


def addSolverTemplateOptions(subparser, solverName, incompressible):
"""
Adds the template options.
Expand Down Expand Up @@ -58,6 +70,7 @@ def addSolverTemplateOptions(subparser, solverName, incompressible):
help='Add the current workflow to the DB even if it exists there under a different name')
parser_prepare.add_argument('--noDB', dest="noDB", action="store_true", default=False,
help='Run the workflow without adding to the DB')
_add_scheduler_arguments(parser_prepare)
parser_prepare.set_defaults(execute=True)
parser_prepare.set_defaults(func=CLI.foam_solver_template_buildExecute)

Expand Down Expand Up @@ -509,6 +522,7 @@ if __name__ == "__main__":

execute_parser.add_argument('--workflowGroup', dest="workflowGroup", default=None, type=str,
help='The simulation group. [Optional]. If None, try to infer from the file name ')
_add_scheduler_arguments(execute_parser)

execute_parser.set_defaults(execute=True)
execute_parser.set_defaults(func=CLI.foam_solver_template_buildExecute)
Expand Down
8 changes: 8 additions & 0 deletions hera/bin/hera-workflows
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ if __name__ == "__main__":
workflow_build_exec_parser = subparsers.add_parser('buildExecute', help='Adds the workflow and executes it.')
workflow_build_exec_parser.add_argument('workflowName', type=str, help='The workflow file to load')
workflow_build_exec_parser.add_argument('--projectName', type=str, default=None, help=PROJECT_NAME_HELP_DESC)
workflow_build_exec_parser.add_argument('--scheduler', dest="scheduler", choices=["local", "central"], default="local",
help="Luigi scheduler to use. 'local' (default) runs with --local-scheduler; 'central' connects to a running luigid.")
workflow_build_exec_parser.add_argument('--scheduler-host', dest="scheduler_host", type=str, default=None,
help="Host of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (localhost).")
workflow_build_exec_parser.add_argument('--scheduler-port', dest="scheduler_port", type=int, default=None,
help="Port of the central Luigi scheduler (only used with --scheduler central). Defaults to Luigi's default (8082).")
workflow_build_exec_parser.add_argument('--dispatch-id', dest="dispatch_id", type=str, default=None,
help="Unique id for this execution, propagated to all nodes. If omitted a uuid4 is generated.")
workflow_build_exec_parser.set_defaults(func=CLI.workflow_buildExecute)


Expand Down
6 changes: 5 additions & 1 deletion hera/simulations/CLI.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,4 +666,8 @@ def workflow_buildExecute(arguments):
else:
raise ValueError(f"The workflowName {arguments.workflowName} is not in the DB and not a file on the disk")

wftk.executeWorkflowFromDB(workflowName)
wftk.executeWorkflowFromDB(workflowName,
scheduler=getattr(arguments, "scheduler", "local"),
schedulerHost=getattr(arguments, "scheduler_host", None),
schedulerPort=getattr(arguments, "scheduler_port", None),
dispatch_id=getattr(arguments, "dispatch_id", None))
16 changes: 14 additions & 2 deletions hera/simulations/LSM/hermesWorkflowToolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from ..datalayer import datatypes
import numpy
import pydoc
import uuid
import warnings
from ..utils.logging import with_logger,get_classMethod_logger
from ..hermesWorkflowToolkit import buildLuigiExecutionCommand, SCHEDULER_LOCAL, SCHEDULER_CENTRAL

try:
from hermes import workflow
Expand Down Expand Up @@ -407,7 +409,11 @@ def addWorkflowToGroup(self,
force: bool = False,
assignName: bool = False,
execute: bool = False,
parameters: dict = dict()):
parameters: dict = dict(),
scheduler: str = SCHEDULER_LOCAL,
schedulerHost: str = None,
schedulerPort: int = None,
dispatch_id: str = None):
"""
1. Adds the workflow to the database in the requested group
2. Builds the template (.json) and python executer
Expand Down Expand Up @@ -580,7 +586,13 @@ def addWorkflowToGroup(self,
shutil.rmtree(executionfileDir, ignore_errors=True)

pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}")
executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler"
runDispatchId = dispatch_id or uuid.uuid4().hex
logger.info(f"Executing with scheduler='{scheduler}' dispatch_id='{runDispatchId}'")
executionStr = buildLuigiExecutionCommand(os.path.basename(pythonPath),
runDispatchId,
scheduler=scheduler,
schedulerHost=schedulerHost,
schedulerPort=schedulerPort)
self.logger.debug(executionStr)
os.system(executionStr)

Expand Down
83 changes: 77 additions & 6 deletions hera/simulations/hermesWorkflowToolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from hera.datalayer import datatypes
import numpy
import pydoc
import uuid
import warnings
from ..utils.logging import get_classMethod_logger

Expand All @@ -23,6 +24,52 @@
workflow = None


# Default Luigi scheduler selection values, used by buildLuigiExecutionCommand below.
SCHEDULER_LOCAL = "local"
SCHEDULER_CENTRAL = "central"


def buildLuigiExecutionCommand(moduleName, dispatch_id, scheduler=SCHEDULER_LOCAL,
schedulerHost=None, schedulerPort=None,
targetTask="finalnode_xx_0"):
"""Build the ``python3 -m luigi`` command line used to execute a workflow.

A single definition shared by every place that runs a Hermes workflow so the
local/central scheduler logic and the ``dispatch_id`` propagation stay consistent.

Parameters
----------
moduleName : str
The name (without ``.py``) of the generated Luigi module to run with ``--module``.
dispatch_id : str
Unique identifier for this execution, passed to Luigi as ``--dispatch-id``. Luigi
maps the task's ``dispatch_id`` Parameter to the ``--dispatch-id`` CLI flag.
scheduler : str
``"local"`` (default) adds ``--local-scheduler``; ``"central"`` connects to a
running ``luigid`` central scheduler (optionally at ``schedulerHost``/``schedulerPort``).
schedulerHost, schedulerPort : str, int
Optional central scheduler address. When omitted Luigi uses its defaults
(localhost:8082). Ignored for the local scheduler.
targetTask : str
The terminal task that triggers the whole DAG (default ``finalnode_xx_0``).

Returns
-------
str
The full shell command.
"""
cmd = f"python3 -m luigi --module {moduleName} {targetTask}"
if scheduler == SCHEDULER_CENTRAL:
if schedulerHost is not None:
cmd += f" --scheduler-host {schedulerHost}"
if schedulerPort is not None:
cmd += f" --scheduler-port {schedulerPort}"
else:
cmd += " --local-scheduler"
cmd += f" --dispatch-id {dispatch_id}"
return cmd


@unique
class actionModes(Enum):
"""Workflow action modes controlling which steps are executed.
Expand Down Expand Up @@ -685,7 +732,9 @@ def addWorkflowToGroup(self, workflowJSON: str, groupName: str, writeWorkflowToF
return doc


def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource):
def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource,
scheduler=SCHEDULER_LOCAL, schedulerHost=None,
schedulerPort=None, dispatch_id=None):
"""
Building and Executing the workflow.

Expand All @@ -705,16 +754,31 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource):
- Its workflow
- workfolow dict.

build : bool [default = True]
If true, also builds the workflow.
scheduler : str [default = "local"]
Which Luigi scheduler to use. ``"local"`` runs with ``--local-scheduler``
(no separate process). ``"central"`` connects to a running ``luigid``.

schedulerHost, schedulerPort : str, int [default = None]
Optional central scheduler address (only used when scheduler == "central").
When omitted Luigi uses its defaults (localhost:8082).

dispatch_id : str [default = None]
Unique identifier for this execution, propagated to every Luigi node so the
central scheduler can tell distinct runs apart. When None a fresh uuid4 is
generated.

Returns
-------
None
str
The dispatch_id used for the execution (generated if not supplied), so
callers can correlate the run with the central scheduler.
"""
logger = get_classMethod_logger(self, "executeWorkflow")
docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)

dispatch_id = dispatch_id or uuid.uuid4().hex
logger.info(f"Executing with scheduler='{scheduler}' dispatch_id='{dispatch_id}'")

for doc in docList:
workflowJSON = doc.desc['workflow']
workflowName = doc.desc['workflowName']
Expand Down Expand Up @@ -751,16 +815,23 @@ def executeWorkflowFromDB(self, nameOrWorkflowFileOrJSONOrResource):

# Step 5: Execute the Luigi pipeline via command line.
# 'finalnode_xx_0' is the terminal task that triggers the full DAG.
# --local-scheduler avoids requiring a separate Luigi scheduler process.
# The dispatch_id makes each run unique so the (optional) central scheduler
# does not deduplicate distinct executions of the same workflow.
pythonPath = os.path.join(self.FilesDirectory, f"{workflowName}")
executionStr = f"python3 -m luigi --module {os.path.basename(pythonPath)} finalnode_xx_0 --local-scheduler"
executionStr = buildLuigiExecutionCommand(os.path.basename(pythonPath),
dispatch_id,
scheduler=scheduler,
schedulerHost=schedulerHost,
schedulerPort=schedulerPort)
logger.debug(executionStr)
os.system(executionStr)

# Step 6: Clean up the generated Python module (the workflow JSON stays).
logger.info(f"Cleaning the executer python for {workflowName}")
os.remove(pythonFileName)

return dispatch_id


def compareWorkflowObj(self,
workflowList,
Expand Down
14 changes: 12 additions & 2 deletions hera/simulations/openFoam/toolkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,31 @@ def __init__(self, projectName, filesDirectory=None, connectionName=None):
self.buoyantReactingFoam = buoyantReactingFoam_toolkitExtension(self)


def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource):
def runOFSimulation(self,nameOrWorkflowFileOrJSONOrResource,
scheduler="local", schedulerHost=None,
schedulerPort=None, dispatch_id=None):
"""
Build the workflow and then runs the simulation.

Parameters
----------
nameOrWorkflowFileOrJSONOrResource

scheduler, schedulerHost, schedulerPort, dispatch_id
Luigi scheduler selection and execution identity, forwarded to
executeWorkflowFromDB. See its docstring for details.

Returns
-------

"""
logger = get_classMethod_logger(self,"runOFSimulation")
logger.info("Building the case")
self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource)
self.executeWorkflowFromDB(nameOrWorkflowFileOrJSONOrResource,
scheduler=scheduler,
schedulerHost=schedulerHost,
schedulerPort=schedulerPort,
dispatch_id=dispatch_id)

logger.info("Executing the cases")
docList = self.getWorkflowListDocumentFromDB(nameOrWorkflowFileOrJSONOrResource)
Expand Down
Loading
Loading