diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 83dd875c29d..82ed11a1271 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -39,7 +39,6 @@ from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.config import SuiteConfig from cylc.flow.cycling.loader import get_point, standardise_point_string -from cylc.flow.daemonize import daemonize from cylc.flow.exceptions import ( CylcError, PointParsingError, @@ -249,18 +248,23 @@ def __init__(self, reg, options, is_restart=False): self.main_loop_plugins = None - self.prepare() # create thread sync barrier for setup self.barrier = Barrier(3, timeout=10) - make_suite_run_tree(self.suite) + # self.install() # TODO ORDER - if self.is_restart: - self.suite_db_mgr.restart_upgrade() + def install(self): + """Get the filesystem in the right state to run the flow. - def prepare(self): + * Register. + * Install authentication files. + * Build the directory tree. + * Upgrade the DB if required. + * Copy Python files. + + """ try: - suite_files.get_suite_source_dir(self.suite) #, self.options.owner) + suite_files.get_suite_source_dir(self.suite) except SuiteServiceFileError: # Source path is assumed to be the run directory suite_files.register(self.suite, get_suite_run_dir(self.suite)) @@ -273,94 +277,62 @@ def prepare(self): suite_files.get_suite_srv_dir(self.suite), ['etc/job.sh']) - async def start(self): - """Start the server.""" - try: - self.data_store_mgr = DataStoreMgr(self) - - # *** Network Related *** - # TODO: this in zmq asyncio context? - # Requires the Cylc main loop in asyncio first - # And use of concurrent.futures.ThreadPoolExecutor? - self.zmq_context = zmq.Context() - # create & configure an authenticator for the ZMQ context - self.curve_auth = ThreadAuthenticator(self.zmq_context, log=LOG) - self.curve_auth.start() # start the authentication thread - - # Setting the location means that the CurveZMQ auth will only - # accept public client certificates from the given directory, as - # generated by a user when they initiate a ZMQ socket ready to - # connect to a server. - suite_srv_dir = suite_files.get_suite_srv_dir(self.suite) - client_pub_keyinfo = suite_files.KeyInfo( - suite_files.KeyType.PUBLIC, - suite_files.KeyOwner.CLIENT, - suite_srv_dir=suite_srv_dir) - client_pub_key_dir = client_pub_keyinfo.key_path - self.curve_auth.configure_curve( - domain='*', - location=(client_pub_key_dir) - ) - - port_range = glbl_cfg().get(['suite servers', 'run ports']) - self.server = SuiteRuntimeServer( - self, context=self.zmq_context, barrier=self.barrier) - self.server.start(port_range[0], port_range[-1]) - self.publisher = WorkflowPublisher( - self.suite, context=self.zmq_context, barrier=self.barrier) - self.publisher.start(port_range[0], port_range[-1]) - # wait for threads to setup socket ports before continuing - self.barrier.wait() - self.port = self.server.port - self.pub_port = self.publisher.port + make_suite_run_tree(self.suite) - await self.configure() - self.profiler.start() - self.initialise_scheduler() - self.data_store_mgr.initiate_data_model() - await self.publisher.publish( - self.data_store_mgr.get_publish_deltas() - ) - await self.main_loop() + if self.is_restart: + self.suite_db_mgr.restart_upgrade() - except SchedulerStop as exc: - # deliberate stop - await self.shutdown(exc) - if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL: - self.suite_auto_restart() - # run shutdown coros - await asyncio.gather( - *main_loop.get_runners( - self.main_loop_plugins, - main_loop.CoroTypes.ShutDown, - self - ) - ) - raise exc from None + # Copy local python modules from source to run directory + for sub_dir in ["python", os.path.join("lib", "python")]: + # TODO - eventually drop the deprecated "python" sub-dir. + suite_py = os.path.join(self.suite_dir, sub_dir) + if (os.path.realpath(self.suite_dir) != + os.path.realpath(self.suite_run_dir) and + os.path.isdir(suite_py)): + suite_run_py = os.path.join(self.suite_run_dir, sub_dir) + try: + rmtree(suite_run_py) + except OSError: + pass + copytree(suite_py, suite_run_py) - except SchedulerError as exc: - await self.shutdown(exc) + async def initialise(self): + """Initialise the components and sub-systems required to run the flow. + """ + self.data_store_mgr = DataStoreMgr(self) + + # *** Network Related *** + # TODO: this in zmq asyncio context? + # Requires the Cylc main loop in asyncio first + # And use of concurrent.futures.ThreadPoolExecutor? + self.zmq_context = zmq.Context() + # create & configure an authenticator for the ZMQ context + self.curve_auth = ThreadAuthenticator(self.zmq_context, log=LOG) + self.curve_auth.start() # start the authentication thread + + # Setting the location means that the CurveZMQ auth will only + # accept public client certificates from the given directory, as + # generated by a user when they initiate a ZMQ socket ready to + # connect to a server. + suite_srv_dir = suite_files.get_suite_srv_dir(self.suite) + client_pub_keyinfo = suite_files.KeyInfo( + suite_files.KeyType.PUBLIC, + suite_files.KeyOwner.CLIENT, + suite_srv_dir=suite_srv_dir) + client_pub_key_dir = client_pub_keyinfo.key_path + self.curve_auth.configure_curve( + domain='*', + location=(client_pub_key_dir) + ) - except Exception as exc: - try: - await self.shutdown(exc) - except Exception as exc2: - # In case of exceptions in the shutdown method itself - LOG.exception(exc2) - raise exc from None - else: - # main loop ends (not used?) - await self.shutdown(SchedulerStop(StopMode.AUTO.value)) + self.server = SuiteRuntimeServer( + self, context=self.zmq_context, barrier=self.barrier) + self.publisher = WorkflowPublisher( + self.suite, context=self.zmq_context, barrier=self.barrier) - finally: - self.profiler.stop() - async def configure(self): - """Configure suite server program.""" - self.profiler.log_memory("scheduler.py: start configure") - # Start up essential services self.proc_pool = SubProcPool() self.state_summary_mgr = StateSummaryMgr() self.command_queue = Queue() @@ -385,13 +357,20 @@ async def configure(self): suite_share_dir=self.suite_share_dir, suite_source_dir=self.suite_dir) + async def configure(self): + """Configure the scheduler. + + * Load the flow configuration. + * Load/write suite parameters from the DB. + * Get the data store rolling. + + """ + self.profiler.log_memory("scheduler.py: start configure") if self.is_restart: # This logic handles the lack of initial cycle point in "suite.rc". # Things that can't change on suite reload. pri_dao = self.suite_db_mgr.get_pri_dao() pri_dao.select_suite_params(self._load_suite_params) - # Configure contact data only after loading UUID string - self.configure_contact() pri_dao.select_suite_template_vars(self._load_template_vars) # Take checkpoint and commit immediately so that checkpoint can be # copied to the public database. @@ -399,7 +378,6 @@ async def configure(self): pri_dao.execute_queued_items() n_restart = pri_dao.select_checkpoint_id_restart_count() else: - self.configure_contact() n_restart = 0 # Copy local python modules from source to run directory @@ -509,16 +487,115 @@ async def configure(self): self.options.main_loop ) - await asyncio.gather( - *main_loop.get_runners( - self.main_loop_plugins, - main_loop.CoroTypes.StartUp, - self - ) - ) + # Determine whether suite is held or should be held + # Determine whether suite can be auto shutdown + holdcp = None + if self.options.holdcp: + holdcp = self.options.holdcp + elif self.config.cfg['scheduling']['hold after point']: + holdcp = self.config.cfg['scheduling']['hold after point'] + if holdcp is not None: + self.hold_suite(get_point(holdcp)) + if self.options.hold_start: + LOG.info("Held on start-up (no tasks will be submitted)") + self.hold_suite() + self.run_event_handlers(self.EVENT_STARTUP, 'suite starting') + self.profiler.log_memory("scheduler.py: begin run while loop") + self.is_updated = True + if self.options.profile_mode: + self.previous_profile_point = 0 + self.count = 0 + if self.options.no_auto_shutdown is not None: + self.can_auto_stop = not self.options.no_auto_shutdown + elif self.config.cfg['cylc']['disable automatic shutdown'] is not None: + self.can_auto_stop = ( + not self.config.cfg['cylc']['disable automatic shutdown']) self.profiler.log_memory("scheduler.py: end configure") + def start_servers(self): + """Start the TCP servers.""" + port_range = glbl_cfg().get(['suite servers', 'run ports']) + self.server.start(port_range[0], port_range[-1]) + self.publisher.start(port_range[0], port_range[-1]) + # wait for threads to setup socket ports before continuing + self.barrier.wait() + self.port = self.server.port + self.pub_port = self.publisher.port + + async def start(self): + """Start the scheduler.""" + try: + self.data_store_mgr.initiate_data_model() + self._configure_contact() + await asyncio.gather( + *main_loop.get_runners( + self.main_loop_plugins, + main_loop.CoroTypes.StartUp, + self + ) + ) + await self.publisher.publish( + self.data_store_mgr.get_publish_deltas() + ) + self.profiler.start() + await self.main_loop() + + except SchedulerStop as exc: + # deliberate stop + await self.shutdown(exc) + if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL: + self.suite_auto_restart() + # run shutdown coros + await asyncio.gather( + *main_loop.get_runners( + self.main_loop_plugins, + main_loop.CoroTypes.ShutDown, + self + ) + ) + raise exc from None + + except SchedulerError as exc: + await self.shutdown(exc) + + except Exception as exc: + try: + await self.shutdown(exc) + except Exception as exc2: + # In case of exceptions in the shutdown method itself + LOG.exception(exc2) + raise exc from None + + else: + # main loop ends (not used?) + await self.shutdown(SchedulerStop(StopMode.AUTO.value)) + + finally: + self.profiler.stop() + + async def configure_and_start(self): + """Run the startup sequence. + + * initialise + * configure + * start_servers + * start + + Lightweight wrapper for convenience. + + """ + try: + await self.initialise() + await self.configure() + self.start_servers() + except Exception as exc: + LOG.exception(exc) + raise + else: + await self.start() + + def load_tasks_for_run(self): """Load tasks for a new run.""" if self.config.start_point is not None: @@ -1035,7 +1112,7 @@ def set_suite_inactivity_timer(self): self._get_events_conf(self.EVENT_INACTIVITY_TIMEOUT)), get_current_time_string()) - def configure_contact(self): + def _configure_contact(self): """Create contact file.""" # Make sure another suite of the same name has not started while this # one is starting @@ -1240,35 +1317,6 @@ def run_event_handlers(self, event, reason): event, str(reason), self.suite, self.uuid_str, self.owner, self.host, self.server.port)) - def initialise_scheduler(self): - """Prelude to the main scheduler loop. - - Determines whether suite is held or should be held. - Determines whether suite can be auto shutdown. - Begins profile logs if needed. - """ - holdcp = None - if self.options.holdcp: - holdcp = self.options.holdcp - elif self.config.cfg['scheduling']['hold after point']: - holdcp = self.config.cfg['scheduling']['hold after point'] - if holdcp is not None: - self.hold_suite(get_point(holdcp)) - if self.options.hold_start: - LOG.info("Held on start-up (no tasks will be submitted)") - self.hold_suite() - self.run_event_handlers(self.EVENT_STARTUP, 'suite starting') - self.profiler.log_memory("scheduler.py: begin run while loop") - self.is_updated = True - if self.options.profile_mode: - self.previous_profile_point = 0 - self.count = 0 - if self.options.no_auto_shutdown is not None: - self.can_auto_stop = not self.options.no_auto_shutdown - elif self.config.cfg['cylc']['disable automatic shutdown'] is not None: - self.can_auto_stop = ( - not self.config.cfg['cylc']['disable automatic shutdown']) - def process_task_pool(self): """Process ALL TASKS whenever something has changed that might require renegotiation of dependencies, etc""" diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index b49ef2e655b..6444804b49e 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -39,7 +39,6 @@ from cylc.flow.terminal import cli_function - RUN_DOC = r"""cylc [control] run|start [OPTIONS] [ARGS] Start a suite run from scratch, ignoring dependence prior to the start point. @@ -275,7 +274,6 @@ def _open_logs(reg, no_detach): def _close_logs(): """Close Cylc log handlers for a flow run.""" - LOG.info("DONE") # main thread exit for handler in LOG.handlers: try: handler.close() @@ -346,31 +344,34 @@ def scheduler_cli(parser, options, args, is_restart=False): if remrun(set_rel_local=True): # State localhost as above. sys.exit() - # initalise the scheduler + # initalise the scheduler + scheduler = Scheduler(reg, options, is_restart=is_restart) try: - scheduler = Scheduler(reg, options, is_restart=is_restart) + scheduler.install() except SuiteServiceFileError as exc: sys.exit(exc) - # print the start message + # print the start message if options.no_detach or options.format == 'plain': _start_print_blurb() - # daemonise if requested + # daemonise if requested if not options.no_detach: daemonize(scheduler) # settup loggers _open_logs(reg, options.no_detach) + # take the scheduler through the startup sequence + # run cylc run loop = asyncio.get_event_loop() try: loop.run_until_complete( - scheduler.start() + scheduler.configure_and_start() ) - # stop cylc stop + # stop cylc stop except KeyboardInterrupt as exc: try: loop.run_until_complete( @@ -381,10 +382,12 @@ def scheduler_cli(parser, options, args, is_restart=False): LOG.exception(exc2) raise exc2 from None except Exception: - # suppress the exception to prevent it appearing in the log + # suppress the exception to prevent it appearing in the log pass finally: + LOG.info("DONE") _close_logs() + loop.close() def main(is_restart=False):