From 4006ff89b7125e73123c4eb377cc3791f4a003e9 Mon Sep 17 00:00:00 2001 From: Matt Shin Date: Thu, 8 Nov 2018 11:59:41 +0000 Subject: [PATCH] localhost background/at jobs: record host name Ensure that localhost background/at jobs are recorded as running on the host name of the current suite host, rather than just `localhost`. On suite restart on a different suite host, this allows the restart logic to correctly poll the status of the background/at jobs that may still be running on the previous suite host. --- lib/cylc/batch_sys_manager.py | 9 ++++- lib/cylc/task_job_mgr.py | 38 +++++++++++++------ tests/cylc-review/00-basic.t | 4 +- tests/database/00-simple.t | 7 +++- tests/database/00-simple/select-task-jobs.out | 3 -- tests/database/01-broadcast.t | 9 +++-- tests/database/02-retry.t | 9 +++-- tests/database/03-remote.t | 2 +- 8 files changed, 53 insertions(+), 28 deletions(-) delete mode 100644 tests/database/00-simple/select-task-jobs.out diff --git a/lib/cylc/batch_sys_manager.py b/lib/cylc/batch_sys_manager.py index 66e15a64da7..a92b6af0c28 100644 --- a/lib/cylc/batch_sys_manager.py +++ b/lib/cylc/batch_sys_manager.py @@ -78,7 +78,9 @@ batch_sys.SHOULD_KILL_PROC_GROUP * A boolean to indicate whether it is necessary to kill a job by sending - a signal to its Unix process group. + a signal to its Unix process group. This boolean also indicates that + a job submitted via this batch system will physically run on the same + host it is submitted to. batch_sys.SHOULD_POLL_PROC_GROUP * A boolean to indicate whether it is necessary to poll a job by its PID @@ -247,6 +249,11 @@ def get_vacation_signal(self, job_conf): if hasattr(batch_sys, "get_vacation_signal"): return batch_sys.get_vacation_signal(job_conf) + def is_job_local_to_host(self, batch_sys_name): + """Return True if batch system runs jobs local to the submit host.""" + return getattr( + self._get_sys(batch_sys_name), "SHOULD_KILL_PROC_GROUP", False) + def jobs_kill(self, job_log_root, job_log_dirs): """Kill multiple jobs. diff --git a/lib/cylc/task_job_mgr.py b/lib/cylc/task_job_mgr.py index 73e0399f7a5..ce5077067f5 100644 --- a/lib/cylc/task_job_mgr.py +++ b/lib/cylc/task_job_mgr.py @@ -36,10 +36,10 @@ from parsec.util import pdeepcopy, poverride from cylc import LOG -from cylc.batch_sys_manager import BatchSysManager, JobPollContext +from cylc.batch_sys_manager import JobPollContext from cylc.cfgspec.glbl_cfg import glbl_cfg import cylc.flags -from cylc.hostuserutil import is_remote_host, is_remote_user +from cylc.hostuserutil import get_host, is_remote_host, is_remote_user from cylc.job_file import JobFileWriter from cylc.task_job_logs import ( JOB_LOG_JOB, get_task_job_log, get_task_job_job_log, @@ -88,6 +88,7 @@ def __init__(self, suite, proc_pool, suite_db_mgr, suite_srv_files_mgr, self.suite_db_mgr = suite_db_mgr self.task_events_mgr = task_events_mgr self.job_file_writer = JobFileWriter() + self.batch_sys_mgr = self.job_file_writer.batch_sys_mgr self.suite_srv_files_mgr = suite_srv_files_mgr self.task_remote_mgr = TaskRemoteMgr( suite, proc_pool, suite_srv_files_mgr) @@ -216,11 +217,23 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False): for itask in itasks: itask.summary['latest_message'] = self.REMOTE_INIT_MSG continue - # Persist - if owner: - owner_at_host = owner + '@' + host + # Ensure that localhost background/at jobs are recorded as running + # on the host name of the current suite host, rather than just + # "localhost". On suite restart on a different suite host, this + # allows the restart logic to correctly poll the status of the + # background/at jobs that may still be running on the previous + # suite host. + if ( + self.batch_sys_mgr.is_job_local_to_host( + itask.summary['batch_sys_name']) and + not is_remote_host(host) + ): + owner_at_host = get_host() else: owner_at_host = host + # Persist + if owner: + owner_at_host = owner + '@' + owner_at_host now_str = get_current_time_string() done_tasks.extend(itasks) for itask in itasks: @@ -411,7 +424,8 @@ def _kill_task_jobs_callback(self, ctx, suite, itasks): suite, itasks, self._kill_task_job_callback, - {BatchSysManager.OUT_PREFIX_COMMAND: self._job_cmd_out_callback}) + {self.batch_sys_mgr.OUT_PREFIX_COMMAND: self._job_cmd_out_callback} + ) def _kill_task_job_callback(self, suite, itask, cmd_ctx, line): """Helper for _kill_task_jobs_callback, on one task job.""" @@ -451,9 +465,8 @@ def _kill_task_job_callback(self, suite, itask, cmd_ctx, line): LOG.log(log_lvl, "[%s] -job(%02d) %s" % ( itask.identity, itask.submit_num, log_msg)) - @staticmethod def _manip_task_jobs_callback( - ctx, suite, itasks, summary_callback, more_callbacks=None): + self, ctx, suite, itasks, summary_callback, more_callbacks=None): """Callback when submit/poll/kill tasks command exits.""" if ctx.ret_code: LOG.error(ctx) @@ -468,7 +481,7 @@ def _manip_task_jobs_callback( if itask.point is not None and itask.submit_num: submit_num = "%02d" % (itask.submit_num) tasks[(str(itask.point), itask.tdef.name, submit_num)] = itask - handlers = [(BatchSysManager.OUT_PREFIX_SUMMARY, summary_callback)] + handlers = [(self.batch_sys_mgr.OUT_PREFIX_SUMMARY, summary_callback)] if more_callbacks: for prefix, callback in more_callbacks.items(): handlers.append((prefix, callback)) @@ -481,7 +494,7 @@ def _manip_task_jobs_callback( for job_log_dir in job_log_dirs: point, name, submit_num = job_log_dir.split(os.sep, 2) itask = tasks[(point, name, submit_num)] - out += (BatchSysManager.OUT_PREFIX_SUMMARY + + out += (self.batch_sys_mgr.OUT_PREFIX_SUMMARY + "|".join([ctx.timestamp, job_log_dir, "1"]) + "\n") for line in out.splitlines(True): for prefix, callback in handlers: @@ -505,7 +518,7 @@ def _poll_task_jobs_callback(self, ctx, suite, itasks): suite, itasks, self._poll_task_job_callback, - {BatchSysManager.OUT_PREFIX_MESSAGE: + {self.batch_sys_mgr.OUT_PREFIX_MESSAGE: self._poll_task_job_message_callback}) def _poll_task_job_callback(self, suite, itask, cmd_ctx, line): @@ -671,7 +684,8 @@ def _submit_task_jobs_callback(self, ctx, suite, itasks): suite, itasks, self._submit_task_job_callback, - {BatchSysManager.OUT_PREFIX_COMMAND: self._job_cmd_out_callback}) + {self.batch_sys_mgr.OUT_PREFIX_COMMAND: self._job_cmd_out_callback} + ) def _submit_task_job_callback(self, suite, itask, cmd_ctx, line): """Helper for _submit_task_jobs_callback, on one task job.""" diff --git a/tests/cylc-review/00-basic.t b/tests/cylc-review/00-basic.t index 96a34f4e6d0..49383badf1f 100755 --- a/tests/cylc-review/00-basic.t +++ b/tests/cylc-review/00-basic.t @@ -136,7 +136,7 @@ cylc_ws_json_greps "${TEST_NAME}.stdout" "${TEST_NAME}.stdout" \ "[('states', 'is_failed',), False]" \ "[('of_n_entries',), 2]" \ "[('entries', ${FOO0}, 'task_status',), 'succeeded']" \ - "[('entries', ${FOO0}, 'host',), 'localhost']" \ + "[('entries', ${FOO0}, 'host',), '$(hostname -f)']" \ "[('entries', ${FOO0}, 'submit_method',), 'background']" \ "[('entries', ${FOO0}, 'logs', 'job', 'path'), '${FOO0_JOB}']" \ "[('entries', ${FOO0}, 'logs', 'job.err', 'path'), '${FOO0_JOB}.err']" \ @@ -160,7 +160,7 @@ cylc_ws_json_greps "${TEST_NAME}.stdout" "${TEST_NAME}.stdout" \ "[('entries', ${FOO0}, 'seq_logs_indexes', 'job.trace.*.html', '32'), 'job.trace.32.html']" \ "[('entries', ${FOO0}, 'seq_logs_indexes', 'job.trace.*.html', '256'), 'job.trace.256.html']" \ "[('entries', ${FOO1}, 'task_status',), 'succeeded']" \ - "[('entries', ${FOO1}, 'host',), 'localhost']" \ + "[('entries', ${FOO1}, 'host',), '$(hostname -f)']" \ "[('entries', ${FOO1}, 'submit_method',), 'background']" \ "[('entries', ${FOO1}, 'logs', 'job', 'path'), '${FOO1_JOB}']" \ "[('entries', ${FOO1}, 'logs', 'job.err', 'path'), '${FOO1_JOB}.err']" \ diff --git a/tests/database/00-simple.t b/tests/database/00-simple.t index b5beaac3b29..ab5b7d3cf3f 100644 --- a/tests/database/00-simple.t +++ b/tests/database/00-simple.t @@ -56,7 +56,12 @@ sqlite3 "${DB_FILE}" \ user_at_host, batch_sys_name FROM task_jobs ORDER BY name' \ >"${NAME}" -cmp_ok "${TEST_SOURCE_DIR}/${TEST_NAME_BASE}/${NAME}" "${NAME}" +LOCALHOST="$(hostname -f)" +cmp_ok - "${NAME}" <<__SELECT__ +1|bar|1|1|0|0|${LOCALHOST}|background +1|baz|1|1|0|0|${LOCALHOST}|background +1|foo|1|1|0|0|${LOCALHOST}|background +__SELECT__ NAME='select-task-jobs-times.out' sqlite3 "${DB_FILE}" \ diff --git a/tests/database/00-simple/select-task-jobs.out b/tests/database/00-simple/select-task-jobs.out deleted file mode 100644 index bbbefc5c76d..00000000000 --- a/tests/database/00-simple/select-task-jobs.out +++ /dev/null @@ -1,3 +0,0 @@ -1|bar|1|1|0|0|localhost|background -1|baz|1|1|0|0|localhost|background -1|foo|1|1|0|0|localhost|background diff --git a/tests/database/01-broadcast.t b/tests/database/01-broadcast.t index 2896523ab34..4c3e2ffb3ff 100755 --- a/tests/database/01-broadcast.t +++ b/tests/database/01-broadcast.t @@ -52,10 +52,11 @@ sqlite3 "${DB_FILE}" \ user_at_host, batch_sys_name FROM task_jobs ORDER BY name' \ >"${NAME}" -cmp_ok "${NAME}" <<'__SELECT__' -1|recover-t1|1|0|0|0|localhost|background -1|t1|1|0|0|1|localhost|background -1|t1|2|1|0|0|localhost|background +LOCALHOST="$(hostname -f)" +cmp_ok "${NAME}" <<__SELECT__ +1|recover-t1|1|0|0|0|${LOCALHOST}|background +1|t1|1|0|0|1|${LOCALHOST}|background +1|t1|2|1|0|0|${LOCALHOST}|background __SELECT__ purge_suite "${SUITE_NAME}" diff --git a/tests/database/02-retry.t b/tests/database/02-retry.t index cf34323342e..d5722030c70 100755 --- a/tests/database/02-retry.t +++ b/tests/database/02-retry.t @@ -38,10 +38,11 @@ sqlite3 "${DB_FILE}" \ user_at_host, batch_sys_name FROM task_jobs ORDER BY name' \ >"${NAME}" -cmp_ok "${NAME}" <<'__SELECT__' -20200101T0000Z|t1|1|1|0|1|localhost|background -20200101T0000Z|t1|2|2|0|1|localhost|background -20200101T0000Z|t1|3|3|0|0|localhost|background +LOCALHOST="$(hostname -f)" +cmp_ok "${NAME}" <<__SELECT__ +20200101T0000Z|t1|1|1|0|1|${LOCALHOST}|background +20200101T0000Z|t1|2|2|0|1|${LOCALHOST}|background +20200101T0000Z|t1|3|3|0|0|${LOCALHOST}|background __SELECT__ purge_suite "${SUITE_NAME}" diff --git a/tests/database/03-remote.t b/tests/database/03-remote.t index 519bc42ddfb..e491385041c 100755 --- a/tests/database/03-remote.t +++ b/tests/database/03-remote.t @@ -41,7 +41,7 @@ sqlite3 "${DB_FILE}" \ FROM task_jobs ORDER BY name' \ >"${NAME}" cmp_ok "${NAME}" <<__SELECT__ -20200101T0000Z|t1|1|1|0|0|localhost|background +20200101T0000Z|t1|1|1|0|0|$(hostname -f)|background 20200101T0000Z|t2|1|1|0|0|${CYLC_TEST_HOST}|background __SELECT__