5252 default_pipestat_output_schema ,
5353 result_formatter_markdown ,
5454)
55- from pipestat . helpers import read_yaml_data
55+ from yacman import load_yaml
5656
5757__all__ = ["PipelineManager" ]
5858
5959
6060LOCK_PREFIX = "lock."
61+ LOGFILE_SUFFIX = "_log.md"
6162
6263
6364class Unbuffered (object ):
@@ -112,6 +113,12 @@ class PipelineManager(object):
112113 protect from a case in which a restart begins upstream of a stage
113114 for which a checkpoint file already exists, but that depends on the
114115 upstream stage and thus should be rerun if it's "parent" is rerun.
116+ :param str pipestat_record_identifier: record_identifier to report results via pipestat
117+ :param str pipestat_schema: output schema used by pipestat to report results
118+ :param str pipestat_results_file: path to file backend for reporting results
119+ :param str pipestat_config: path to pipestat configuration file
120+ :param str pipestat_pipeline_type: Sample or Project level pipeline
121+ :param pipestat_result_formatter: function used to style reported results, defaults to result_formatter_markdown
115122 :raise TypeError: if start or stop point(s) are provided both directly and
116123 via args namespace, or if both stopping types (exclusive/prospective
117124 and inclusive/retrospective) are provided.
@@ -136,8 +143,7 @@ def __init__(
136143 output_parent = None ,
137144 overwrite_checkpoints = False ,
138145 logger_kwargs = None ,
139- pipestat_project_name = None ,
140- pipestat_sample_name = None ,
146+ pipestat_record_identifier = None ,
141147 pipestat_schema = None ,
142148 pipestat_results_file = None ,
143149 pipestat_config = None ,
@@ -193,10 +199,7 @@ def __init__(
193199 # If no starting point was specified, assume that the pipeline's
194200 # execution is to begin right away and set the internal flag so that
195201 # run() is let loose to execute instructions given.
196- if not self .start_point :
197- self ._active = True
198- else :
199- self ._active = False
202+ self ._active = not self .start_point
200203
201204 # Pipeline-level variables to track global state and pipeline stats
202205 # Pipeline settings
@@ -210,26 +213,37 @@ def __init__(
210213 self .output_parent = params ["output_parent" ]
211214 self .testmode = params ["testmode" ]
212215
216+ # Establish the log file to check safety with logging keyword arguments.
217+ # Establish the output folder since it's required for the log file.
218+ self .outfolder = os .path .join (outfolder , "" ) # trailing slash
219+ self .pipeline_log_file = pipeline_filepath (self , suffix = LOGFILE_SUFFIX )
220+
213221 # Set up logger
214222 logger_kwargs = logger_kwargs or {}
223+ if logger_kwargs .get ("logfile" ) == self .pipeline_log_file :
224+ raise ValueError (
225+ f"The logfile given for the pipeline manager's logger matches that which will be used by the manager itself: { self .pipeline_log_file } "
226+ )
215227 default_logname = "." .join ([__name__ , self .__class__ .__name__ , self .name ])
216- if not args :
228+ self ._logger = None
229+ if args :
230+ logger_builder_method = "logger_via_cli"
231+ try :
232+ self ._logger = logger_via_cli (args , ** logger_kwargs )
233+ except logmuse.est.AbsentOptionException:
234+ # Defer logger construction to init_logger. (Cannot call
235+ # self.debug here: self._logger is still None at this point.)
236+ if self ._logger is None :
237+ logger_builder_method = "init_logger"
238+ # covers cases of bool(args) being False, or failure of logger_via_cli.
217239 # strict is only for logger_via_cli.
218- kwds = {k : v for k , v in logger_kwargs .items () if k != "strict" }
240+ logger_kwargs = {k : v for k , v in logger_kwargs .items () if k != "strict" }
219241 try :
220- name = kwds .pop ("name" )
242+ name = logger_kwargs .pop ("name" )
221243 except KeyError :
222244 name = default_logname
223- self ._logger = logmuse .init_logger (name , ** kwds )
224- self .debug ("Logger set with logmuse.init_logger" )
225- else :
226- logger_kwargs .setdefault ("name" , default_logname )
227- try :
228- self ._logger = logmuse .logger_via_cli (args )
229- self .debug ("Logger set with logmuse.logger_via_cli" )
230- except logmuse .est .AbsentOptionException :
231- self ._logger = logmuse .init_logger ("pypiper" , level = "DEBUG" )
232- self .debug ("logger_via_cli failed; Logger set with logmuse.init_logger" )
245+ self ._logger = logmuse .init_logger (name , ** logger_kwargs )
246+ self .debug (f"Logger set with { logger_builder_method } " )
233247
234248 # Keep track of an ID for the number of processes attempted
235249 self .proc_count = 0
@@ -276,10 +290,7 @@ def __init__(
276290 # self.output_parent = os.path.join(os.getcwd(), self.output_parent)
277291
278292 # File paths:
279- self .outfolder = os .path .join (outfolder , "" ) # trailing slash
280293 self .make_sure_path_exists (self .outfolder )
281- self .pipeline_log_file = pipeline_filepath (self , suffix = "_log.md" )
282-
283294 self .pipeline_profile_file = pipeline_filepath (self , suffix = "_profile.tsv" )
284295
285296 # Stats and figures are general and so lack the pipeline name.
@@ -330,7 +341,9 @@ def __init__(
330341 signal .signal (signal .SIGTERM , self ._signal_term_handler )
331342
332343 # pipestat setup
333- self .pipestat_record_identifier = pipestat_sample_name or DEFAULT_SAMPLE_NAME
344+ self .pipestat_record_identifier = (
345+ pipestat_record_identifier or DEFAULT_SAMPLE_NAME
346+ )
334347 self .pipestat_pipeline_type = pipestat_pipeline_type or "sample"
335348
336349 # don't force default pipestat_results_file value unless
@@ -631,88 +644,41 @@ def start_pipeline(self, args=None, multi=False):
631644 # Print out a header section in the pipeline log:
632645 # Wrap things in backticks to prevent markdown from interpreting underscores as emphasis.
633646 # print("----------------------------------------")
634- self .info ("### Pipeline run code and environment:\n " )
635- self .info (
636- "* " + "Command" .rjust (20 ) + ": " + "`" + str (" " .join (sys .argv )) + "`"
637- )
638- self .info ("* " + "Compute host" .rjust (20 ) + ": " + platform .node ())
639- self .info ("* " + "Working dir" .rjust (20 ) + ": " + os .getcwd ())
640- self .info ("* " + "Outfolder" .rjust (20 ) + ": " + self .outfolder )
def logfmt(key, value=None, padding=16):
    """
    Format one markdown bullet line ("* key: `value`") for the run header.

    :param str key: label for the line; right-justified to *padding* chars
    :param value: value to wrap in backticks; omitted entirely when None
    :param int padding: field width the key is right-justified within
    :return str: the formatted markdown line
    """
    padded_key = key.rjust(padding)
    # Explicit None check so falsy-but-real values (0, "") still render,
    # rather than being silently dropped by a truthiness test.
    formatted_val = f"`{value}`" if value is not None else ""
    return f"* {padded_key}: {formatted_val}"
641651
642- self .timestamp ("* " + "Pipeline started at" .rjust (20 ) + ": " )
652+ self .info ("### Pipeline run code and environment:\n " )
653+ self .info (logfmt ("Command" , str (" " .join (sys .argv ))))
654+ self .info (logfmt ("Compute host" , platform .node ()))
655+ self .info (logfmt ("Working dir" , os .getcwd ()))
656+ self .info (logfmt ("Outfolder" , self .outfolder ))
657+ self .info (logfmt ("Log file" , self .pipeline_log_file ))
658+ self .timestamp (logfmt ("Start time" ))
643659
644660 self .info ("\n ### Version log:\n " )
645- self .info ("* " + " Python version". rjust ( 20 ) + ": " + platform .python_version ())
661+ self .info (logfmt ( " Python version", platform .python_version () ))
646662 try :
647- self .info (
648- "* "
649- + "Pypiper dir" .rjust (20 )
650- + ": "
651- + "`"
652- + gitvars ["pypiper_dir" ].strip ()
653- + "`"
654- )
655- self .info ("* " + "Pypiper version" .rjust (20 ) + ": " + __version__ )
656- self .info (
657- "* " + "Pypiper hash" .rjust (20 ) + ": " + str (gitvars ["pypiper_hash" ])
658- )
659- self .info (
660- "* "
661- + "Pypiper branch" .rjust (20 )
662- + ": "
663- + str (gitvars ["pypiper_branch" ])
664- )
665- self .info (
666- "* " + "Pypiper date" .rjust (20 ) + ": " + str (gitvars ["pypiper_date" ])
667- )
663+ self .info (logfmt ("Pypiper dir" , gitvars ["pypiper_dir" ].strip ()))
664+ self .info (logfmt ("Pypiper version" , __version__ ))
665+ self .info (logfmt ("Pypiper hash" , gitvars ["pypiper_hash" ]))
666+ self .info (logfmt ("Pypiper branch" , gitvars ["pypiper_branch" ]))
667+ self .info (logfmt ("Pypiper date" , gitvars ["pypiper_date" ]))
668668 if gitvars ["pypiper_diff" ]:
669- self .info (
670- "* "
671- + "Pypiper diff" .rjust (20 )
672- + ": "
673- + str (gitvars ["pypiper_diff" ])
674- )
669+ self .info (logfmt ("Pypiper diff" , gitvars ["pypiper_diff" ]))
675670 except KeyError :
676671 # It is ok if keys aren't set, it means pypiper isn't in a git repo.
677672 pass
678673
679674 try :
680- self .info (
681- "* "
682- + "Pipeline dir" .rjust (20 )
683- + ": "
684- + "`"
685- + gitvars ["pipe_dir" ].strip ()
686- + "`"
687- )
688- self .info (
689- "* " + "Pipeline version" .rjust (20 ) + ": " + str (self .pl_version )
690- )
691- self .info (
692- "* "
693- + "Pipeline hash" .rjust (20 )
694- + ": "
695- + str (gitvars ["pipe_hash" ]).strip ()
696- )
697- self .info (
698- "* "
699- + "Pipeline branch" .rjust (20 )
700- + ": "
701- + str (gitvars ["pipe_branch" ]).strip ()
702- )
703- self .info (
704- "* "
705- + "Pipeline date" .rjust (20 )
706- + ": "
707- + str (gitvars ["pipe_date" ]).strip ()
708- )
675+ self .info (logfmt ("Pipeline dir" , gitvars ["pipe_dir" ].strip ()))
676+ self .info (logfmt ("Pipeline version" , self .pl_version ))
677+ self .info (logfmt ("Pipeline hash" , gitvars ["pipe_hash" ]).strip ())
678+ self .info (logfmt ("Pipeline branch" , gitvars ["pipe_branch" ]).strip ())
679+ self .info (logfmt ("Pipeline date" , gitvars ["pipe_date" ]).strip ())
709680 if gitvars ["pipe_diff" ] != "" :
710- self .info (
711- "* "
712- + "Pipeline diff" .rjust (20 )
713- + ": "
714- + str (gitvars ["pipe_diff" ]).strip ()
715- )
681+ self .info (logfmt ("Pipeline diff" , gitvars ["pipe_diff" ]).strip ())
716682 except KeyError :
717683 # It is ok if keys aren't set, it means the pipeline isn't a git repo.
718684 pass
@@ -1593,7 +1559,7 @@ def _report_profile(
15931559 myfile .write (message_raw + "\n " )
15941560
15951561 def report_result (
1596- self , key , value , nolog = False , result_formatter = None , force_overwrite = False
1562+ self , key , value , nolog = False , result_formatter = None , force_overwrite = True
15971563 ):
15981564 """
15991565 Writes a key:value pair to self.pipeline_stats_file.
@@ -1640,7 +1606,7 @@ def report_object(
16401606 annotation = None ,
16411607 nolog = False ,
16421608 result_formatter = None ,
1643- force_overwrite = False ,
1609+ force_overwrite = True ,
16441610 ):
16451611 """
16461612 Writes a key:value pair to self.pipeline_stats_file. Note: this function
@@ -1862,7 +1828,7 @@ def _refresh_stats(self):
18621828 """
18631829
18641830 if os .path .isfile (self .pipeline_stats_file ):
1865- _ , data = read_yaml_data ( path = self .pipeline_stats_file , what = "stats_file" )
1831+ data = load_yaml ( filepath = self .pipeline_stats_file )
18661832
18671833 for key , value in data [self ._pipestat_manager .pipeline_name ][
18681834 self ._pipestat_manager .pipeline_type
0 commit comments