diff --git a/cylc/flow/job_pool.py b/cylc/flow/job_pool.py index ca7ec46bdb4..df4a70404c7 100644 --- a/cylc/flow/job_pool.py +++ b/cylc/flow/job_pool.py @@ -45,8 +45,8 @@ class JobPool(object): """Pool of protobuf job messages.""" # TODO: description, args, and types - ERR_PREFIX_JOBID_MATCH = "No matching jobs found: " - ERR_PREFIX_JOB_NOT_ON_SEQUENCE = "Invalid cycle point for job: " + ERR_PREFIX_JOBID_MATCH = 'No matching jobs found: ' + ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: ' def __init__(self, suite, owner): self.suite = suite @@ -64,7 +64,7 @@ def insert_job(self, job_conf): t_id = f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{name}' j_id = f'{t_id}{ID_DELIM}{sub_num}' j_buf = PbJob( - stamp=f"{j_id}@{update_time}", + stamp=f'{j_id}@{update_time}', id=j_id, submit_num=sub_num, state=JOB_STATUSES_ALL[0], @@ -86,26 +86,26 @@ def insert_job(self, job_conf): cycle_point=point_string, ) j_buf.batch_sys_conf.extend( - [f"{key}={val}" for key, val in + [f'{key}={val}' for key, val in job_conf['batch_system_conf'].items()]) j_buf.directives.extend( - [f"{key}={val}" for key, val in + [f'{key}={val}' for key, val in job_conf['directives'].items()]) j_buf.environment.extend( - [f"{key}={val}" for key, val in + [f'{key}={val}' for key, val in job_conf['environment'].items()]) j_buf.param_env_tmpl.extend( - [f"{key}={val}" for key, val in + [f'{key}={val}' for key, val in job_conf['param_env_tmpl'].items()]) j_buf.param_var.extend( - [f"{key}={val}" for key, val in + [f'{key}={val}' for key, val in job_conf['param_var'].items()]) j_buf.extra_logs.extend(job_conf['logfiles']) self.pool[j_id] = j_buf self.task_jobs.setdefault(t_id, []).append(j_id) def add_job_msg(self, job_d, msg): - """Set job attribute.""" + """Add message to job.""" update_time = time() point, name, sub_num = self.parse_job_item(job_d) j_id = ( @@ -113,7 +113,7 @@ def add_job_msg(self, job_d, msg): f'{ID_DELIM}{name}{ID_DELIM}{sub_num}') try: self.pool[j_id].messages.append(msg) - self.pool[j_id].stamp = f"{j_id}@{update_time}" + self.pool[j_id].stamp = f'{j_id}@{update_time}' except (KeyError, TypeError): pass @@ -146,8 +146,8 @@ def set_job_attr(self, job_d, attr_key, attr_val): f'{ID_DELIM}{name}{ID_DELIM}{sub_num}') try: setattr(self.pool[j_id], attr_key, attr_val) - self.pool[j_id].stamp = f"{j_id}@{update_time}" - except (KeyError, TypeError): + self.pool[j_id].stamp = f'{j_id}@{update_time}' + except (KeyError, TypeError, AttributeError): pass def set_job_state(self, job_d, status): @@ -160,14 +160,14 @@ def set_job_state(self, job_d, status): if status in JOB_STATUSES_ALL: try: self.pool[j_id].state = status - self.pool[j_id].stamp = f"{j_id}@{update_time}" + self.pool[j_id].stamp = f'{j_id}@{update_time}' except KeyError: pass def set_job_time(self, job_d, event_key, time_str=None): """Set an event time in job pool object. - Set values of both event_key + "_time" and event_key + "_time_string". + Set values of both event_key + '_time' and event_key + '_time_string'. """ update_time = time() point, name, sub_num = self.parse_job_item(job_d) @@ -176,8 +176,8 @@ def set_job_time(self, job_d, event_key, time_str=None): f'{ID_DELIM}{name}{ID_DELIM}{sub_num}') try: setattr(self.pool[j_id], event_key + '_time', time_str) - self.pool[j_id].stamp = f"{j_id}@{update_time}" - except KeyError: + self.pool[j_id].stamp = f'{j_id}@{update_time}' + except (KeyError, TypeError, AttributeError): pass @staticmethod @@ -187,14 +187,14 @@ def parse_job_item(item): or name.point.submit_num syntax (back compat). """ submit_num = None - if item.count("/") > 1: - point_str, name_str, submit_num = item.split("/", 2) - elif "/" in item: - point_str, name_str = item.split("/", 1) - elif item.count(".") > 1: - name_str, point_str, submit_num = item.split(".", 2) - elif "." in item: - name_str, point_str = item.split(".", 1) + if item.count('/') > 1: + point_str, name_str, submit_num = item.split('/', 2) + elif '/' in item: + point_str, name_str = item.split('/', 1) + elif item.count('.') > 1: + name_str, point_str, submit_num = item.split('.', 2) + elif '.' in item: + name_str, point_str = item.split('.', 1) else: name_str, point_str = (item, None) try: diff --git a/cylc/flow/tests/test_job_pool.py b/cylc/flow/tests/test_job_pool.py new file mode 100644 index 00000000000..44d96b268ab --- /dev/null +++ b/cylc/flow/tests/test_job_pool.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 + +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import unittest +from copy import copy + +from cylc.flow.job_pool import JobPool, JOB_STATUSES_ALL +from cylc.flow.ws_data_mgr import ID_DELIM +from cylc.flow.wallclock import get_current_time_string + + +JOB_CONFIG = { + 'owner': 'captain', + 'host': 'commet', + 'submit_num': 3, + 'task_id': 'foo.30010101T01', + 'batch_system_name': 'background', + 'env-script': None, + 'err-script': None, + 'exit-script': None, + 'execution_time_limit': None, + 'job_log_dir': '/home/captain/cylc-run/baz/log/job/30010101T01/foo/03', + 'init-script': None, + 'post-script': None, + 'pre-script': None, + 'script': 'sleep 5; echo "I come in peace"', + 'work_d': None, + 'batch_system_conf': {}, + 'directives': {}, + 'environment': {}, + 'param_env_tmpl': {}, + 'param_var': {}, + 'logfiles': [], +} + + +class TestJobPool(unittest.TestCase): + + def setUp(self) -> None: + super(TestJobPool, self).setUp() + self.job_pool = JobPool('baz', 'captain') + self.ext_id = ( + f'captain{ID_DELIM}baz{ID_DELIM}' + f'30010101T01{ID_DELIM}foo{ID_DELIM}3' + ) + self.int_id = f'30010101T01/foo/03' + + def test_insert_job(self): + """Test method that adds a new job to the pool.""" + self.assertEqual(0, len(self.job_pool.pool)) + self.job_pool.insert_job(JOB_CONFIG) + self.assertEqual(1, len(self.job_pool.pool)) + self.assertTrue(self.ext_id in self.job_pool.pool) + + def test_add_job_msg(self): + """Test method adding messages to job element.""" + self.job_pool.insert_job(JOB_CONFIG) + job = self.job_pool.pool[self.ext_id] + old_stamp = copy(job.stamp) + self.assertEqual(0, len(job.messages)) + self.job_pool.add_job_msg('NotJobID', 'The Atomic Age') + self.assertEqual(0, len(job.messages)) + self.job_pool.add_job_msg(self.int_id, 'The Atomic Age') + self.assertNotEqual(old_stamp, job.stamp) + self.assertEqual(1, len(job.messages)) + + def test_remove_job(self): + """Test method removing a job from the pool via internal job id.""" + self.job_pool.insert_job(JOB_CONFIG) + jobs = self.job_pool.pool + self.assertEqual(1, len(jobs)) + self.job_pool.remove_job('NotJobID') + self.assertEqual(1, len(jobs)) + self.job_pool.remove_job(self.int_id) + self.assertEqual(0, len(jobs)) + + def test_remove_task_jobs(self): + """Test method removing jobs from the pool via internal task ID.""" + self.job_pool.insert_job(JOB_CONFIG) + jobs = self.job_pool.pool + self.assertEqual(1, len(jobs)) + self.job_pool.remove_task_jobs('NotTaskID') + self.assertEqual(1, len(jobs)) + task_id = self.job_pool.pool[self.ext_id].task_proxy + self.job_pool.remove_task_jobs(task_id) + self.assertEqual(0, len(jobs)) + + def test_set_job_attr(self): + """Test method setting job attribute value.""" + self.job_pool.insert_job(JOB_CONFIG) + job = self.job_pool.pool[self.ext_id] + old_exit_script = copy(job.exit_script) + self.job_pool.set_job_attr(self.int_id, 'leave_scripting', 'rm -v *') + self.assertEqual(old_exit_script, job.exit_script) + self.job_pool.set_job_attr(self.int_id, 'exit_script', 10.0) + self.assertEqual(old_exit_script, job.exit_script) + self.job_pool.set_job_attr(self.int_id, 'exit_script', 'rm -v *') + self.assertNotEqual(old_exit_script, job.exit_script) + + def test_set_job_state(self): + """Test method setting the job state.""" + self.job_pool.insert_job(JOB_CONFIG) + job = self.job_pool.pool[self.ext_id] + old_state = copy(job.state) + self.job_pool.set_job_state(self.int_id, 'waiting') + self.assertEqual(old_state, job.state) + self.job_pool.set_job_state('NotJobID', JOB_STATUSES_ALL[0]) + self.assertEqual(old_state, job.state) + self.job_pool.set_job_state(self.int_id, JOB_STATUSES_ALL[-1]) + self.assertNotEqual(old_state, job.state) + + def test_set_job_time(self): + """Test method setting event time.""" + event_time = get_current_time_string() + self.job_pool.insert_job(JOB_CONFIG) + job = self.job_pool.pool[self.ext_id] + old_time = copy(job.submitted_time) + self.job_pool.set_job_time(self.int_id, 'jumped', event_time) + self.assertEqual(old_time, job.submitted_time) + self.job_pool.set_job_time(self.int_id, 'submitted', event_time) + self.assertNotEqual(old_time, job.submitted_time) + + def test_parse_job_item(self): + """Test internal id parsing method.""" + point, name, sub_num = self.job_pool.parse_job_item(self.int_id) + tpoint, tname, tsub_num = self.int_id.split('/', 2) + self.assertEqual( + (point, name, sub_num), (tpoint, tname, int(tsub_num))) + tpoint, tname, tsub_num = self.job_pool.parse_job_item( + f'{point}/{name}') + self.assertEqual((point, name, None), (tpoint, tname, tsub_num)) + tpoint, tname, tsub_num = self.job_pool.parse_job_item( + f'{name}.{point}.{sub_num}') + self.assertEqual((point, name, sub_num), (tpoint, tname, tsub_num)) + tpoint, tname, tsub_num = self.job_pool.parse_job_item( + f'{name}.{point}.NotNumber') + self.assertEqual((point, name, None), (tpoint, tname, tsub_num)) + tpoint, tname, tsub_num = self.job_pool.parse_job_item( + f'{name}.{point}') + self.assertEqual((point, name, None), (tpoint, tname, tsub_num)) + tpoint, tname, tsub_num = self.job_pool.parse_job_item( + f'{name}') + self.assertEqual((None, name, None), (tpoint, tname, tsub_num)) + + +if __name__ == '__main__': + main()