Skip to content
This repository has been archived by the owner on Jul 8, 2021. It is now read-only.

Commit

Permalink
remote-init/tidy: command to init/tidy remotes
Browse files Browse the repository at this point in the history
New commands to initialise/tidy the suite directory hierarchy for remote
task runs. Commands now run in the background, and are launched on remote
hosts using the `cylc.remote` remote run mechanism.
(Tick a box in cylc#2302.)

Update log and runtime database only before the real submission command.
  • Loading branch information
dvalters authored and matthewrmshin committed Jan 9, 2018
1 parent 36ce9c3 commit dc7a4a1
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 244 deletions.
4 changes: 4 additions & 0 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ task_commands['message'] = ['message', 'task-message']
task_commands['jobs-kill'] = ['jobs-kill']
task_commands['jobs-poll'] = ['jobs-poll']
task_commands['jobs-submit'] = ['jobs-submit']
task_commands['remote-init'] = ['remote-init']
task_commands['remote-tidy'] = ['remote-tidy']

all_commands = {}
for dct in [
Expand Down Expand Up @@ -422,6 +424,8 @@ comsum['broadcast'] = 'Change suite [runtime] settings on the fly'
comsum['jobs-kill'] = '(Internal) Kill task jobs'
comsum['jobs-poll'] = '(Internal) Retrieve status for task jobs'
comsum['jobs-submit'] = '(Internal) Submit task jobs'
comsum['remote-init'] = '(Internal) Initialise a task remote'
comsum['remote-tidy'] = '(Internal) Tidy a task remote'
# utility
comsum['cycle-point'] = 'Cycle point arithmetic and filename templating'
comsum['jobscript'] = 'Generate a task job script and print it to stdout'
Expand Down
48 changes: 48 additions & 0 deletions bin/cylc-remote-init
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2017 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cylc [task] remote-init UUID RUND
(This command is for internal use.)
Install suite service files on a task remote (i.e. a [owner@]host):
* ".service/contact"
* ".service/passphrase"
* ".service/ssl.cert"
Content of items to install from a tar file read from STDIN.
Return 0 on success or if initialisation not required. Return 1 on failure.
Print SuiteSrvFilesManager.REMOTE_INIT_NOT_REQUIRED if initialisation not
required (e.g. remote has shared file system with suite host).
Print SuiteSrvFilesManager.REMOTE_INIT_DONE on success.
"""


from cylc.remote import remrun


if __name__ == '__main__' and not remrun().execute():
    # Script entry point for "cylc remote-init UUID RUND".
    # NOTE(review): presumably remrun().execute() returns a true value when
    # the command has been handed off to a remote host, so the body below
    # only runs for the local invocation - confirm against cylc.remote.
    from cylc.option_parsers import CylcOptionParser as COP
    from cylc.task_remote_cmd import remote_init

    # Descriptions of the two positional arguments, shown in the help text.
    arg_docs = [
        ('UUID', 'UUID of current suite running process'),
        ('RUND', 'The run directory of the suite'),
    ]
    # Element 1 of the parse result holds the positional arguments.
    positional = COP(__doc__, argdoc=arg_docs).parse_args()[1]
    uuid_str, rund = positional
    remote_init(uuid_str, rund)
34 changes: 34 additions & 0 deletions bin/cylc-remote-tidy
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env python

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2017 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cylc [task] remote-tidy RUND
(This command is for internal use.)
Remove ".service/contact" from a task remote (i.e. a [owner@]host).
Remove ".service" directory on the remote if emptied.
"""


from cylc.remote import remrun


if __name__ == '__main__' and not remrun().execute():
    # Script entry point for "cylc remote-tidy RUND".
    # NOTE(review): presumably remrun().execute() returns a true value when
    # the command has been handed off to a remote host, so the body below
    # only runs for the local invocation - confirm against cylc.remote.
    from cylc.option_parsers import CylcOptionParser as COP
    from cylc.task_remote_cmd import remote_tidy

    arg_docs = [('RUND', 'The run directory of the suite')]
    # Element 1 of the parse result holds the positional arguments.
    positional = COP(__doc__, argdoc=arg_docs).parse_args()[1]
    remote_tidy(positional[0])  # first positional argument: RUND
2 changes: 1 addition & 1 deletion bin/cylc-submit
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def main():
GLOBAL_CFG.create_cylc_run_tree(suite)
task_job_mgr = TaskJobManager(
suite, SuiteProcPool(), SuiteDatabaseManager(), suite_srv_mgr)
task_job_mgr.single_task_mode = True
task_job_mgr.task_remote_mgr.single_task_mode = True
task_job_mgr.job_file_writer.set_suite_env({
'CYLC_UTC': str(config.cfg['cylc']['UTC mode']),
'CYLC_DEBUG': str(cylc.flags.debug),
Expand Down
12 changes: 7 additions & 5 deletions lib/cylc/mp_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ def _run_command(ctx):

try:
if ctx.cmd_kwargs.get('stdin_file_paths'):
stdin_file = TemporaryFile()
for file_path in ctx.cmd_kwargs['stdin_file_paths']:
for line in open(file_path):
stdin_file.write(line)
stdin_file.seek(0)
if len(ctx.cmd_kwargs['stdin_file_paths']) > 1:
stdin_file = TemporaryFile()
for file_path in ctx.cmd_kwargs['stdin_file_paths']:
stdin_file.write(open(file_path, 'rb').read())
stdin_file.seek(0)
else:
stdin_file = open(ctx.cmd_kwargs['stdin_file_paths'][0], 'rb')
elif ctx.cmd_kwargs.get('stdin_str'):
stdin_file = PIPE
else:
Expand Down
33 changes: 21 additions & 12 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
KEY_TASKS_BY_STATE, KEY_TITLE, KEY_UPDATE_TIME)
from cylc.taskdef import TaskDef
from cylc.task_id import TaskID
from cylc.task_job_mgr import TaskJobManager, RemoteJobHostInitError
from cylc.task_job_mgr import TaskJobManager
from cylc.task_pool import TaskPool
from cylc.task_proxy import TaskProxy, TaskProxySequenceBoundsError
from cylc.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
Expand Down Expand Up @@ -452,13 +452,19 @@ def load_tasks_for_restart(self):
# Re-initialise run directory for user@host for each submitted and
# running tasks.
# Note: tasks should all be in the runahead pool at this point.
auths = set()
for itask in self.pool.get_rh_tasks():
if itask.state.status in TASK_STATUSES_ACTIVE:
try:
self.task_job_mgr.init_host(
self.suite, itask.task_host, itask.task_owner)
except RemoteJobHostInitError as exc:
LOG.error(str(exc))
auths.add((itask.task_host, itask.task_owner))
while auths:
for host, owner in auths.copy():
if self.task_job_mgr.task_remote_mgr.remote_init(
host, owner) is not None:
auths.remove((host, owner))
if auths:
sleep(1.0)
# Remote init is done via process pool
self.proc_pool.handle_results_async()
self.command_poll_tasks()

def _load_suite_params(self, row_idx, row):
Expand Down Expand Up @@ -1168,6 +1174,9 @@ def process_task_pool(self):
if cylc.flags.debug:
LOG.debug("BEGIN TASK PROCESSING")
time0 = time()
if (self._get_events_conf(self.EVENT_INACTIVITY_TIMEOUT) and
self._get_events_conf('reset inactivity timer')):
self.set_suite_inactivity_timer()
self.pool.match_dependencies()
if self.stop_mode is None:
itasks = self.pool.get_ready_tasks()
Expand Down Expand Up @@ -1454,6 +1463,11 @@ def process_tasks(self):
process = True
self.task_events_mgr.pflag = False # reset

if self.task_job_mgr.task_remote_mgr.ready:
# This flag is turned on when a host init/select command completes
process = True
self.task_job_mgr.task_remote_mgr.ready = False # reset

self.pool.set_expired_tasks()
if self.pool.waiting_tasks_ready():
process = True
Expand All @@ -1462,11 +1476,6 @@ def process_tasks(self):
self.message_queue):
process = True

if (process and
self._get_events_conf(self.EVENT_INACTIVITY_TIMEOUT) and
self._get_events_conf('reset inactivity timer')):
self.set_suite_inactivity_timer()

return process

def shutdown(self, reason=None):
Expand Down Expand Up @@ -1523,7 +1532,7 @@ def shutdown(self, reason=None):
ERR.warning("failed to remove suite contact file: %s\n%s\n" % (
fname, exc))
if self.task_job_mgr:
self.task_job_mgr.unlink_hosts_contacts(self.suite)
self.task_job_mgr.task_remote_mgr.remote_tidy()

# disconnect from suite-db, stop db queue
try:
Expand Down
Loading

0 comments on commit dc7a4a1

Please sign in to comment.