Skip to content

Commit

Permalink
scheduler: experimental re-organisation of startup sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Jun 4, 2020
1 parent b14ae8c commit 671930d
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 136 deletions.
302 changes: 175 additions & 127 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import SuiteConfig
from cylc.flow.cycling.loader import get_point, standardise_point_string
from cylc.flow.daemonize import daemonize
from cylc.flow.exceptions import (
CylcError,
PointParsingError,
Expand Down Expand Up @@ -249,18 +248,23 @@ def __init__(self, reg, options, is_restart=False):

self.main_loop_plugins = None

self.prepare()
# create thread sync barrier for setup
self.barrier = Barrier(3, timeout=10)

make_suite_run_tree(self.suite)
# self.install() # TODO ORDER

if self.is_restart:
self.suite_db_mgr.restart_upgrade()
def install(self):
"""Get the filesystem in the right state to run the flow.
def prepare(self):
* Register.
* Install authentication files.
* Build the directory tree.
* Upgrade the DB if required.
* Copy Python files.
"""
try:
suite_files.get_suite_source_dir(self.suite) #, self.options.owner)
suite_files.get_suite_source_dir(self.suite)
except SuiteServiceFileError:
# Source path is assumed to be the run directory
suite_files.register(self.suite, get_suite_run_dir(self.suite))
Expand All @@ -273,94 +277,62 @@ def prepare(self):
suite_files.get_suite_srv_dir(self.suite),
['etc/job.sh'])

async def start(self):
"""Start the server."""
try:
self.data_store_mgr = DataStoreMgr(self)

# *** Network Related ***
# TODO: this in zmq asyncio context?
# Requires the Cylc main loop in asyncio first
# And use of concurrent.futures.ThreadPoolExecutor?
self.zmq_context = zmq.Context()
# create & configure an authenticator for the ZMQ context
self.curve_auth = ThreadAuthenticator(self.zmq_context, log=LOG)
self.curve_auth.start() # start the authentication thread

# Setting the location means that the CurveZMQ auth will only
# accept public client certificates from the given directory, as
# generated by a user when they initiate a ZMQ socket ready to
# connect to a server.
suite_srv_dir = suite_files.get_suite_srv_dir(self.suite)
client_pub_keyinfo = suite_files.KeyInfo(
suite_files.KeyType.PUBLIC,
suite_files.KeyOwner.CLIENT,
suite_srv_dir=suite_srv_dir)
client_pub_key_dir = client_pub_keyinfo.key_path
self.curve_auth.configure_curve(
domain='*',
location=(client_pub_key_dir)
)

port_range = glbl_cfg().get(['suite servers', 'run ports'])
self.server = SuiteRuntimeServer(
self, context=self.zmq_context, barrier=self.barrier)
self.server.start(port_range[0], port_range[-1])
self.publisher = WorkflowPublisher(
self.suite, context=self.zmq_context, barrier=self.barrier)
self.publisher.start(port_range[0], port_range[-1])
# wait for threads to setup socket ports before continuing
self.barrier.wait()
self.port = self.server.port
self.pub_port = self.publisher.port
make_suite_run_tree(self.suite)

await self.configure()
self.profiler.start()
self.initialise_scheduler()
self.data_store_mgr.initiate_data_model()
await self.publisher.publish(
self.data_store_mgr.get_publish_deltas()
)
await self.main_loop()
if self.is_restart:
self.suite_db_mgr.restart_upgrade()

except SchedulerStop as exc:
# deliberate stop
await self.shutdown(exc)
if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
self.suite_auto_restart()
# run shutdown coros
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.ShutDown,
self
)
)
raise exc from None
# Copy local python modules from source to run directory
for sub_dir in ["python", os.path.join("lib", "python")]:
# TODO - eventually drop the deprecated "python" sub-dir.
suite_py = os.path.join(self.suite_dir, sub_dir)
if (os.path.realpath(self.suite_dir) !=
os.path.realpath(self.suite_run_dir) and
os.path.isdir(suite_py)):
suite_run_py = os.path.join(self.suite_run_dir, sub_dir)
try:
rmtree(suite_run_py)
except OSError:
pass
copytree(suite_py, suite_run_py)

except SchedulerError as exc:
await self.shutdown(exc)
async def initialise(self):
"""Initialise the components and sub-systems required to run the flow.
"""
self.data_store_mgr = DataStoreMgr(self)

# *** Network Related ***
# TODO: this in zmq asyncio context?
# Requires the Cylc main loop in asyncio first
# And use of concurrent.futures.ThreadPoolExecutor?
self.zmq_context = zmq.Context()
# create & configure an authenticator for the ZMQ context
self.curve_auth = ThreadAuthenticator(self.zmq_context, log=LOG)
self.curve_auth.start() # start the authentication thread

# Setting the location means that the CurveZMQ auth will only
# accept public client certificates from the given directory, as
# generated by a user when they initiate a ZMQ socket ready to
# connect to a server.
suite_srv_dir = suite_files.get_suite_srv_dir(self.suite)
client_pub_keyinfo = suite_files.KeyInfo(
suite_files.KeyType.PUBLIC,
suite_files.KeyOwner.CLIENT,
suite_srv_dir=suite_srv_dir)
client_pub_key_dir = client_pub_keyinfo.key_path
self.curve_auth.configure_curve(
domain='*',
location=(client_pub_key_dir)
)

except Exception as exc:
try:
await self.shutdown(exc)
except Exception as exc2:
# In case of exceptions in the shutdown method itself
LOG.exception(exc2)
raise exc from None

else:
# main loop ends (not used?)
await self.shutdown(SchedulerStop(StopMode.AUTO.value))
self.server = SuiteRuntimeServer(
self, context=self.zmq_context, barrier=self.barrier)
self.publisher = WorkflowPublisher(
self.suite, context=self.zmq_context, barrier=self.barrier)

finally:
self.profiler.stop()

async def configure(self):
"""Configure suite server program."""
self.profiler.log_memory("scheduler.py: start configure")

# Start up essential services
self.proc_pool = SubProcPool()
self.state_summary_mgr = StateSummaryMgr()
self.command_queue = Queue()
Expand All @@ -385,21 +357,27 @@ async def configure(self):
suite_share_dir=self.suite_share_dir,
suite_source_dir=self.suite_dir)

async def configure(self):
"""Configure the scheduler.
* Load the flow configuration.
* Load/write suite parameters from the DB.
* Get the data store rolling.
"""
self.profiler.log_memory("scheduler.py: start configure")
if self.is_restart:
# This logic handles the lack of initial cycle point in "suite.rc".
# Things that can't change on suite reload.
pri_dao = self.suite_db_mgr.get_pri_dao()
pri_dao.select_suite_params(self._load_suite_params)
# Configure contact data only after loading UUID string
self.configure_contact()
pri_dao.select_suite_template_vars(self._load_template_vars)
# Take checkpoint and commit immediately so that checkpoint can be
# copied to the public database.
pri_dao.take_checkpoints("restart")
pri_dao.execute_queued_items()
n_restart = pri_dao.select_checkpoint_id_restart_count()
else:
self.configure_contact()
n_restart = 0

# Copy local python modules from source to run directory
Expand Down Expand Up @@ -509,16 +487,115 @@ async def configure(self):
self.options.main_loop
)

await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.StartUp,
self
)
)
# Determine whether suite is held or should be held
# Determine whether suite can be auto shutdown
holdcp = None
if self.options.holdcp:
holdcp = self.options.holdcp
elif self.config.cfg['scheduling']['hold after point']:
holdcp = self.config.cfg['scheduling']['hold after point']
if holdcp is not None:
self.hold_suite(get_point(holdcp))
if self.options.hold_start:
LOG.info("Held on start-up (no tasks will be submitted)")
self.hold_suite()
self.run_event_handlers(self.EVENT_STARTUP, 'suite starting')
self.profiler.log_memory("scheduler.py: begin run while loop")
self.is_updated = True
if self.options.profile_mode:
self.previous_profile_point = 0
self.count = 0
if self.options.no_auto_shutdown is not None:
self.can_auto_stop = not self.options.no_auto_shutdown
elif self.config.cfg['cylc']['disable automatic shutdown'] is not None:
self.can_auto_stop = (
not self.config.cfg['cylc']['disable automatic shutdown'])

self.profiler.log_memory("scheduler.py: end configure")

def start_servers(self):
"""Start the TCP servers."""
port_range = glbl_cfg().get(['suite servers', 'run ports'])
self.server.start(port_range[0], port_range[-1])
self.publisher.start(port_range[0], port_range[-1])
# wait for threads to setup socket ports before continuing
self.barrier.wait()
self.port = self.server.port
self.pub_port = self.publisher.port

async def start(self):
"""Start the scheduler."""
try:
self.data_store_mgr.initiate_data_model()
self._configure_contact()
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.StartUp,
self
)
)
await self.publisher.publish(
self.data_store_mgr.get_publish_deltas()
)
self.profiler.start()
await self.main_loop()

except SchedulerStop as exc:
# deliberate stop
await self.shutdown(exc)
if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
self.suite_auto_restart()
# run shutdown coros
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.ShutDown,
self
)
)
raise exc from None

except SchedulerError as exc:
await self.shutdown(exc)

except Exception as exc:
try:
await self.shutdown(exc)
except Exception as exc2:
# In case of exceptions in the shutdown method itself
LOG.exception(exc2)
raise exc from None

else:
# main loop ends (not used?)
await self.shutdown(SchedulerStop(StopMode.AUTO.value))

finally:
self.profiler.stop()

async def configure_and_start(self):
"""Run the startup sequence.
* initialise
* configure
* start_servers
* start
Lightweight wrapper for convenience.
"""
try:
await self.initialise()
await self.configure()
self.start_servers()
except Exception as exc:
LOG.exception(exc)
raise
else:
await self.start()


def load_tasks_for_run(self):
"""Load tasks for a new run."""
if self.config.start_point is not None:
Expand Down Expand Up @@ -1035,7 +1112,7 @@ def set_suite_inactivity_timer(self):
self._get_events_conf(self.EVENT_INACTIVITY_TIMEOUT)),
get_current_time_string())

def configure_contact(self):
def _configure_contact(self):
"""Create contact file."""
# Make sure another suite of the same name has not started while this
# one is starting
Expand Down Expand Up @@ -1240,35 +1317,6 @@ def run_event_handlers(self, event, reason):
event, str(reason), self.suite, self.uuid_str, self.owner,
self.host, self.server.port))

def initialise_scheduler(self):
"""Prelude to the main scheduler loop.
Determines whether suite is held or should be held.
Determines whether suite can be auto shutdown.
Begins profile logs if needed.
"""
holdcp = None
if self.options.holdcp:
holdcp = self.options.holdcp
elif self.config.cfg['scheduling']['hold after point']:
holdcp = self.config.cfg['scheduling']['hold after point']
if holdcp is not None:
self.hold_suite(get_point(holdcp))
if self.options.hold_start:
LOG.info("Held on start-up (no tasks will be submitted)")
self.hold_suite()
self.run_event_handlers(self.EVENT_STARTUP, 'suite starting')
self.profiler.log_memory("scheduler.py: begin run while loop")
self.is_updated = True
if self.options.profile_mode:
self.previous_profile_point = 0
self.count = 0
if self.options.no_auto_shutdown is not None:
self.can_auto_stop = not self.options.no_auto_shutdown
elif self.config.cfg['cylc']['disable automatic shutdown'] is not None:
self.can_auto_stop = (
not self.config.cfg['cylc']['disable automatic shutdown'])

def process_task_pool(self):
"""Process ALL TASKS whenever something has changed that might
require renegotiation of dependencies, etc"""
Expand Down
Loading

0 comments on commit 671930d

Please sign in to comment.