diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 439511a3462..24d11b67a87 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -61,6 +61,7 @@ requests_). - (Martin Dix) - (Ivor Blockley) - Alexander Paulsell + - David Sutherland (All contributors are identifiable with email addresses in the git version control logs or otherwise.) diff --git a/doc/src/suite-config.rst b/doc/src/suite-config.rst index 1e05f2e77cb..2416dadcd62 100644 --- a/doc/src/suite-config.rst +++ b/doc/src/suite-config.rst @@ -3241,10 +3241,34 @@ automatically imports the user environment to template's global namespace [[[environment]]] SUITE_OWNER_HOME_DIR_ON_SUITE_HOST = {{environ['HOME']}} -This example is emphasizes that *the environment is read on the suite -host at the time the suite configuration is parsed* - it is not, for -instance, read at task run time on the task host. +In addition, the following variables are exported to this environment +prior to configuration parsing to provide suite context: +.. code-block:: bash + + CYLC_DEBUG # Debug mode, true or not defined + CYLC_DIR # Location of cylc installation used + CYLC_VERBOSE # Verbose mode, True or False + CYLC_VERSION # Version of cylc installation used + + CYLC_SUITE_NAME # Suite name + + CYLC_SUITE_DEF_PATH # Location of the suite configuration + # source path on suite host, + # e.g. ~/cylc-run/foo + CYLC_SUITE_LOG_DIR # Suite log directory. + CYLC_SUITE_RUN_DIR # Location of the suite run directory in + # suite host, e.g. ~/cylc-run/foo + CYLC_SUITE_SHARE_DIR # Suite (or task post parsing!) + # shared directory. + CYLC_SUITE_WORK_DIR # Suite work directory. + + +.. note:: + + The example above emphasizes that *the environment - including the suite + context variables - is read on the suite host when the suite configuration + is parsed*, not at task run time on job hosts. .. _CustomJinja2Filters: diff --git a/lib/cylc/config.py b/lib/cylc/config.py index 0d13bda19a4..4606ed76139 100644 --- a/lib/cylc/config.py +++ b/lib/cylc/config.py @@ -17,6 +17,13 @@ # along with this program. If not, see . """Parse and validate the suite definition file +Set local values of variables to give suite context before parsing +config, i.e for template filters (Jinja2, python ...etc) and possibly +needed locally by event handlers. This is needed for both running and +non-running suite parsing (obtaining config/graph info). Potentially +task-specific due to different directory paths on different task hosts, +however, they are overridden by tasks prior to job submission. + Do some consistency checking, then construct task proxy objects and graph structures. """ @@ -56,7 +63,7 @@ from cylc.task_id import TaskID from cylc.task_outputs import TASK_OUTPUT_SUCCEEDED from cylc.task_trigger import TaskTrigger, Dependency -from cylc.wallclock import get_current_time_string, set_utc_mode +from cylc.wallclock import get_current_time_string, set_utc_mode, get_utc_mode from cylc.xtrigger_mgr import XtriggerManager @@ -112,7 +119,9 @@ def __init__(self, suite, fpath, template_vars=None, cli_start_point_string=None, cli_final_point_string=None, is_reload=False, output_fname=None, vis_start_string=None, vis_stop_string=None, - xtrigger_mgr=None, mem_log_func=None): + xtrigger_mgr=None, mem_log_func=None, + run_dir=None, log_dir=None, + work_dir=None, share_dir=None): self.mem_log = mem_log_func if mem_log_func is None: @@ -121,6 +130,14 @@ def __init__(self, suite, fpath, template_vars=None, self.suite = suite # suite name self.fpath = fpath # suite definition self.fdir = os.path.dirname(fpath) + self.run_dir = run_dir or glbl_cfg().get_derived_host_item( + self.suite, 'suite run directory') + self.log_dir = log_dir or glbl_cfg().get_derived_host_item( + self.suite, 'suite log directory') + self.work_dir = work_dir or glbl_cfg().get_derived_host_item( + self.suite, 'suite work directory') + self.share_dir = share_dir or glbl_cfg().get_derived_host_item( + self.suite, 'suite share directory') self.owner = owner self.run_mode = run_mode self.strict = strict @@ -129,6 +146,7 @@ def __init__(self, suite, fpath, template_vars=None, self.taskdefs = {} self.initial_point = None self.start_point = None + self.final_point = None self.first_graph = True self.clock_offsets = {} self.expiration_offsets = {} @@ -175,6 +193,9 @@ def __init__(self, suite, fpath, template_vars=None, # one up from root self.feet = [] + # Export local environmental suite context before config parsing. + self.process_suite_env() + # parse, upgrade, validate the suite, but don't expand with default # items self.mem_log("config.py: before RawSuiteConfig init") @@ -381,10 +402,7 @@ def __init__(self, suite, fpath, template_vars=None, self.cfg['scheduling']['initial cycle point constraints']) raise SuiteConfigError( ("Initial cycle point %s does not meet the constraints " + - "%s") % ( - str(self.initial_point), - constraints_str - ) + "%s") % (str(self.initial_point), constraints_str) ) if (self.cfg['scheduling']['final cycle point'] is not None and @@ -392,44 +410,44 @@ def __init__(self, suite, fpath, template_vars=None, self.cfg['scheduling']['final cycle point'] = None final_point_string = (cli_final_point_string or self.cfg['scheduling']['final cycle point']) - final_point = None if final_point_string is not None: # Is the final "point"(/interval) relative to initial? if get_interval_cls().get_null().TYPE == INTEGER_CYCLING_TYPE: if "P" in final_point_string: # Relative, integer cycling. - final_point = get_point_relative( + self.final_point = get_point_relative( self.cfg['scheduling']['final cycle point'], self.initial_point ).standardise() else: try: # Relative, ISO8601 cycling. - final_point = get_point_relative( + self.final_point = get_point_relative( final_point_string, self.initial_point).standardise() except ValueError: # (not relative) pass - if final_point is None: + if self.final_point is None: # Must be absolute. - final_point = get_point(final_point_string).standardise() - self.cfg['scheduling']['final cycle point'] = str(final_point) + self.final_point = get_point(final_point_string).standardise() + self.cfg['scheduling']['final cycle point'] = str(self.final_point) - if final_point is not None and self.initial_point > final_point: + if (self.final_point is not None and + self.initial_point > self.final_point): raise SuiteConfigError( "The initial cycle point:" + str(self.initial_point) + " is after the final cycle point:" + - str(final_point) + ".") + str(self.final_point) + ".") # Validate final cycle point against any constraints - if (final_point is not None and + if (self.final_point is not None and self.cfg['scheduling']['final cycle point constraints']): valid_fcp = False for entry in ( self.cfg['scheduling']['final cycle point constraints']): possible_pt = get_point_relative( - entry, final_point).standardise() - if final_point == possible_pt: + entry, self.final_point).standardise() + if self.final_point == possible_pt: valid_fcp = True break if not valid_fcp: @@ -437,7 +455,7 @@ def __init__(self, suite, fpath, template_vars=None, self.cfg['scheduling']['final cycle point constraints']) raise SuiteConfigError( "Final cycle point %s does not meet the constraints %s" % ( - str(final_point), constraints_str)) + str(self.final_point), constraints_str)) # Parse special task cycle point offsets, and replace family names. LOG.debug("Parsing [special tasks]") @@ -536,7 +554,7 @@ def __init__(self, suite, fpath, template_vars=None, if self.cfg['cylc']['force run mode']: self.run_mode = self.cfg['cylc']['force run mode'] - self.process_directories() + self.process_config_env() self.mem_log("config.py: before load_graph()") self.load_graph() @@ -702,10 +720,10 @@ def __init__(self, suite, fpath, template_vars=None, vfcp = None # A viz final point can't be beyond the suite final point. - if vfcp is not None and final_point is not None: - if vfcp > final_point: + if vfcp is not None and self.final_point is not None: + if vfcp > self.final_point: self.cfg['visualization']['final cycle point'] = str( - final_point) + self.final_point) # Replace suite and task name in suite and task URLs. self.cfg['meta']['URL'] = self.cfg['meta']['URL'] % { @@ -1261,8 +1279,8 @@ def configure_sim_modes(self): ).get_seconds() rtc['job']['execution time limit'] = ( sleep_sec + DurationParser().parse(str( - rtc['simulation']['time limit buffer']) - ).get_seconds()) + rtc['simulation']['time limit buffer'])).get_seconds() + ) rtc['job']['simulated run length'] = sleep_sec # Generate dummy scripting. @@ -1410,9 +1428,36 @@ def print_first_parent_tree(self, pretty=False, titles=False): print_tree(tree, padding=padding, use_unicode=pretty) - def process_directories(self): - os.environ['CYLC_SUITE_NAME'] = self.suite - os.environ['CYLC_SUITE_DEF_PATH'] = self.fdir + def process_suite_env(self): + """Suite context is exported to the local environment.""" + for var, val in [ + ('CYLC_SUITE_NAME', self.suite), + ('CYLC_DEBUG', str(cylc.flags.debug).lower()), + ('CYLC_VERBOSE', str(cylc.flags.verbose).lower()), + ('CYLC_SUITE_DEF_PATH', self.fdir), + ('CYLC_SUITE_RUN_DIR', self.run_dir), + ('CYLC_SUITE_LOG_DIR', self.log_dir), + ('CYLC_SUITE_WORK_DIR', self.work_dir), + ('CYLC_SUITE_SHARE_DIR', self.share_dir)]: + os.environ[var] = val + + def process_config_env(self): + """Set local config derived environment.""" + os.environ['CYLC_UTC'] = str(get_utc_mode()) + os.environ['CYLC_SUITE_INITIAL_CYCLE_POINT'] = str(self.initial_point) + os.environ['CYLC_SUITE_FINAL_CYCLE_POINT'] = str(self.final_point) + os.environ['CYLC_CYCLING_MODE'] = self.cfg['scheduling'][ + 'cycling mode'] + # (global config auto expands environment variables in local paths) + cenv = self.cfg['cylc']['environment'].copy() + for var, val in cenv.items(): + cenv[var] = os.path.expandvars(val) + # path to suite bin directory for suite and event handlers + cenv['PATH'] = os.pathsep.join([ + os.path.join(self.fdir, 'bin'), os.environ['PATH']]) + # and to suite event handlers in this process. + for var, val in cenv.items(): + os.environ[var] = val def check_tasks(self): """Call after all tasks are defined. diff --git a/lib/cylc/scheduler.py b/lib/cylc/scheduler.py index 9d8d19bc3d0..d0f36ff52bd 100644 --- a/lib/cylc/scheduler.py +++ b/lib/cylc/scheduler.py @@ -450,7 +450,6 @@ def configure(self): self.suite_db_mgr.put_suite_params(self) self.suite_db_mgr.put_suite_template_vars(self.template_vars) self.suite_db_mgr.put_runtime_inheritance(self.config) - self.configure_suite_environment() # Copy local python modules from source to run directory for sub_dir in ["python", os.path.join("lib", "python")]: @@ -891,7 +890,6 @@ def command_reload_suite(self): for task in add: LOG.warning("Added task: '%s'" % (task,)) - self.configure_suite_environment() if self.options.genref or self.options.reftest: self.configure_reftest(recon=True) self.suite_db_mgr.put_suite_params(self) @@ -972,7 +970,8 @@ def configure_contact(self): self.contact_data = contact_data def load_suiterc(self, is_reload=False): - """Load and log the suite definition.""" + """Load, and log the suite definition.""" + # Local suite environment set therein. self.config = SuiteConfig( self.suite, self.suiterc, self.template_vars, run_mode=self.run_mode, @@ -985,6 +984,10 @@ def load_suiterc(self, is_reload=False): output_fname=os.path.join( self.suite_run_dir, self.suite_srv_files_mgr.FILE_BASE_SUITE_RC + '.processed'), + run_dir=self.suite_run_dir, + log_dir=self.suite_log_dir, + work_dir=self.suite_work_dir, + share_dir=self.suite_share_dir, ) self.suiterc_update_time = time() # Dump the loaded suiterc for future reference. @@ -1013,12 +1016,7 @@ def load_suiterc(self, is_reload=False): # self.config already alters the 'initial cycle point' for CLI. self.initial_point = self.config.initial_point self.start_point = self.config.start_point - self.final_point = get_point( - self.options.final_point_string or - self.config.cfg['scheduling']['final cycle point'] - ) - if self.final_point is not None: - self.final_point.standardise() + self.final_point = self.config.final_point if not self.initial_point and not self.is_restart: LOG.warning('No initial cycle point provided - no cycling tasks ' @@ -1027,11 +1025,24 @@ def load_suiterc(self, is_reload=False): if self.run_mode != self.config.run_mode: self.run_mode = self.config.run_mode + # Pass static cylc and suite variables to job script generation code + self.task_job_mgr.job_file_writer.set_suite_env({ + 'CYLC_UTC': str(get_utc_mode()), + 'CYLC_DEBUG': str(cylc.flags.debug).lower(), + 'CYLC_VERBOSE': str(cylc.flags.verbose).lower(), + 'CYLC_SUITE_NAME': self.suite, + 'CYLC_CYCLING_MODE': str( + self.config.cfg['scheduling']['cycling mode']), + 'CYLC_SUITE_INITIAL_CYCLE_POINT': str(self.initial_point), + 'CYLC_SUITE_FINAL_CYCLE_POINT': str(self.final_point), + }) + def _load_suite_params_1(self, _, row): """Load previous initial cycle point or (warm) start cycle point. For restart, these may be missing from "suite.rc", but was specified as a command line argument on cold/warm start. + """ key, value = row if key == 'initial_point': @@ -1051,47 +1062,6 @@ def _load_template_vars(self, _, row): if key not in self.template_vars: self.template_vars[key] = value - def configure_suite_environment(self): - """Configure suite environment.""" - # Pass static cylc and suite variables to job script generation code - self.task_job_mgr.job_file_writer.set_suite_env({ - 'CYLC_UTC': str(get_utc_mode()), - 'CYLC_DEBUG': str(cylc.flags.debug).lower(), - 'CYLC_VERBOSE': str(cylc.flags.verbose).lower(), - 'CYLC_SUITE_NAME': self.suite, - 'CYLC_CYCLING_MODE': str( - self.config.cfg['scheduling']['cycling mode']), - 'CYLC_SUITE_INITIAL_CYCLE_POINT': str(self.initial_point), - 'CYLC_SUITE_FINAL_CYCLE_POINT': str(self.final_point), - }) - - # Make suite vars available to [cylc][environment]: - for var, val in self.task_job_mgr.job_file_writer.suite_env.items(): - os.environ[var] = val - # Set local values of variables that are potentially task-specific - # due to different directory paths on different task hosts. These - # are overridden by tasks prior to job submission, but in - # principle they could be needed locally by event handlers: - for var, val in [ - ('CYLC_SUITE_RUN_DIR', self.suite_run_dir), - ('CYLC_SUITE_LOG_DIR', self.suite_log_dir), - ('CYLC_SUITE_WORK_DIR', self.suite_work_dir), - ('CYLC_SUITE_SHARE_DIR', self.suite_share_dir), - ('CYLC_SUITE_DEF_PATH', self.suite_dir)]: - os.environ[var] = val - - # (global config auto expands environment variables in local paths) - cenv = self.config.cfg['cylc']['environment'].copy() - for var, val in cenv.items(): - cenv[var] = os.path.expandvars(val) - # path to suite bin directory for suite and event handlers - cenv['PATH'] = os.pathsep.join([ - os.path.join(self.suite_dir, 'bin'), os.environ['PATH']]) - - # and to suite event handlers in this process. - for var, val in cenv.items(): - os.environ[var] = val - def configure_reftest(self, recon=False): """Configure the reference test.""" if self.options.genref: @@ -1137,8 +1107,8 @@ def run_event_handlers(self, event, reason): """ try: if (self.run_mode in ['simulation', 'dummy'] and - self.config.cfg['cylc']['simulation'][ - 'disable suite event handlers']): + self.config.cfg['cylc']['simulation'][ + 'disable suite event handlers']): return except KeyError: pass @@ -1299,7 +1269,7 @@ def suite_shutdown(self): else: raise SchedulerStop(self.stop_mode) elif (self.time_next_kill is not None and - time() > self.time_next_kill): + time() > self.time_next_kill): self.command_poll_tasks() self.command_kill_tasks() self.time_next_kill = time() + self.INTERVAL_STOP_KILL @@ -1314,12 +1284,10 @@ def suite_shutdown(self): # * Ensure the host can be safely taken down once the # suite has stopped running. for itask in self.pool.get_tasks(): - if ( - itask.state.status in TASK_STATUSES_ACTIVE - and itask.summary['batch_sys_name'] - and self.task_job_mgr.batch_sys_mgr.is_job_local_to_host( - itask.summary['batch_sys_name']) - ): + if (itask.state.status in TASK_STATUSES_ACTIVE and + itask.summary['batch_sys_name'] and + self.task_job_mgr.batch_sys_mgr.is_job_local_to_host( + itask.summary['batch_sys_name'])): LOG.info('Waiting for jobs running on localhost to ' 'complete before attempting restart') break @@ -1504,10 +1472,8 @@ def suite_health_check(self, has_changes): ' $ cylc restart %s', self.suite) if self.set_auto_restart(mode=mode): return # skip remaining health checks - elif ( - self.set_auto_restart(current_glbl_cfg.get( - ['suite servers', 'auto restart delay'])) - ): + elif (self.set_auto_restart(current_glbl_cfg.get( + ['suite servers', 'auto restart delay']))): # server is condemned -> configure the suite to # auto stop-restart if possible, else, report the # issue preventing this @@ -1913,8 +1879,7 @@ def stop_clock_done(self): self.stop_clock_time)) self.stop_clock_time = None return True - else: - return False + return False def _update_profile_info(self, category, amount, amount_format="%s"): """Update the 1, 5, 15 minute dt averages for a given category."""