Skip to content

Commit

Permalink
Job pool unit tests
Browse files Browse the repository at this point in the history
Add more coverage
  • Loading branch information
dwsutherland committed Sep 24, 2019
1 parent 7d2f06c commit 0887517
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 24 deletions.
48 changes: 24 additions & 24 deletions cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class JobPool(object):
"""Pool of protobuf job messages."""
# TODO: description, args, and types

ERR_PREFIX_JOBID_MATCH = "No matching jobs found: "
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = "Invalid cycle point for job: "
ERR_PREFIX_JOBID_MATCH = 'No matching jobs found: '
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: '

def __init__(self, suite, owner):
self.suite = suite
Expand All @@ -64,7 +64,7 @@ def insert_job(self, job_conf):
t_id = f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{name}'
j_id = f'{t_id}{ID_DELIM}{sub_num}'
j_buf = PbJob(
stamp=f"{j_id}@{update_time}",
stamp=f'{j_id}@{update_time}',
id=j_id,
submit_num=sub_num,
state=JOB_STATUSES_ALL[0],
Expand All @@ -86,34 +86,34 @@ def insert_job(self, job_conf):
cycle_point=point_string,
)
j_buf.batch_sys_conf.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['batch_system_conf'].items()])
j_buf.directives.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['directives'].items()])
j_buf.environment.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['environment'].items()])
j_buf.param_env_tmpl.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['param_env_tmpl'].items()])
j_buf.param_var.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['param_var'].items()])
j_buf.extra_logs.extend(job_conf['logfiles'])
self.pool[j_id] = j_buf
self.task_jobs.setdefault(t_id, []).append(j_id)

def add_job_msg(self, job_d, msg):
"""Set job attribute."""
"""Add message to job."""
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
self.pool[j_id].messages.append(msg)
self.pool[j_id].stamp = f"{j_id}@{update_time}"
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except (KeyError, TypeError):
pass

Expand Down Expand Up @@ -146,8 +146,8 @@ def set_job_attr(self, job_d, attr_key, attr_val):
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], attr_key, attr_val)
self.pool[j_id].stamp = f"{j_id}@{update_time}"
except (KeyError, TypeError):
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except (KeyError, TypeError, AttributeError):
pass

def set_job_state(self, job_d, status):
Expand All @@ -160,14 +160,14 @@ def set_job_state(self, job_d, status):
if status in JOB_STATUSES_ALL:
try:
self.pool[j_id].state = status
self.pool[j_id].stamp = f"{j_id}@{update_time}"
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except KeyError:
pass

def set_job_time(self, job_d, event_key, time_str=None):
"""Set an event time in job pool object.
Set values of both event_key + "_time" and event_key + "_time_string".
Set values of both event_key + '_time' and event_key + '_time_string'.
"""
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
Expand All @@ -176,8 +176,8 @@ def set_job_time(self, job_d, event_key, time_str=None):
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], event_key + '_time', time_str)
self.pool[j_id].stamp = f"{j_id}@{update_time}"
except KeyError:
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except (KeyError, TypeError, AttributeError):
pass

@staticmethod
Expand All @@ -187,14 +187,14 @@ def parse_job_item(item):
or name.point.submit_num syntax (back compat).
"""
submit_num = None
if item.count("/") > 1:
point_str, name_str, submit_num = item.split("/", 2)
elif "/" in item:
point_str, name_str = item.split("/", 1)
elif item.count(".") > 1:
name_str, point_str, submit_num = item.split(".", 2)
elif "." in item:
name_str, point_str = item.split(".", 1)
if item.count('/') > 1:
point_str, name_str, submit_num = item.split('/', 2)
elif '/' in item:
point_str, name_str = item.split('/', 1)
elif item.count('.') > 1:
name_str, point_str, submit_num = item.split('.', 2)
elif '.' in item:
name_str, point_str = item.split('.', 1)
else:
name_str, point_str = (item, None)
try:
Expand Down
162 changes: 162 additions & 0 deletions cylc/flow/tests/test_job_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import unittest
from copy import copy

from cylc.flow.job_pool import JobPool, JOB_STATUSES_ALL
from cylc.flow.ws_data_mgr import ID_DELIM
from cylc.flow.wallclock import get_current_time_string


JOB_CONFIG = {
'owner': 'captain',
'host': 'commet',
'submit_num': 3,
'task_id': 'foo.30010101T01',
'batch_system_name': 'background',
'env-script': None,
'err-script': None,
'exit-script': None,
'execution_time_limit': None,
'job_log_dir': '/home/captain/cylc-run/baz/log/job/30010101T01/foo/03',
'init-script': None,
'post-script': None,
'pre-script': None,
'script': 'sleep 5; echo "I come in peace"',
'work_d': None,
'batch_system_conf': {},
'directives': {},
'environment': {},
'param_env_tmpl': {},
'param_var': {},
'logfiles': [],
}


class TestJobPool(unittest.TestCase):

def setUp(self) -> None:
super(TestJobPool, self).setUp()
self.job_pool = JobPool('baz', 'captain')
self.ext_id = (
f'captain{ID_DELIM}baz{ID_DELIM}'
f'30010101T01{ID_DELIM}foo{ID_DELIM}3'
)
self.int_id = f'30010101T01/foo/03'

def test_insert_job(self):
"""Test method that adds a new job to the pool."""
self.assertEqual(0, len(self.job_pool.pool))
self.job_pool.insert_job(JOB_CONFIG)
self.assertEqual(1, len(self.job_pool.pool))
self.assertTrue(self.ext_id in self.job_pool.pool)

def test_add_job_msg(self):
"""Test method adding messages to job element."""
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
old_stamp = copy(job.stamp)
self.assertEqual(0, len(job.messages))
self.job_pool.add_job_msg('NotJobID', 'The Atomic Age')
self.assertEqual(0, len(job.messages))
self.job_pool.add_job_msg(self.int_id, 'The Atomic Age')
self.assertNotEqual(old_stamp, job.stamp)
self.assertEqual(1, len(job.messages))

def test_remove_job(self):
"""Test method removing a job from the pool via internal job id."""
self.job_pool.insert_job(JOB_CONFIG)
jobs = self.job_pool.pool
self.assertEqual(1, len(jobs))
self.job_pool.remove_job('NotJobID')
self.assertEqual(1, len(jobs))
self.job_pool.remove_job(self.int_id)
self.assertEqual(0, len(jobs))

def test_remove_task_jobs(self):
"""Test method removing jobs from the pool via internal task ID."""
self.job_pool.insert_job(JOB_CONFIG)
jobs = self.job_pool.pool
self.assertEqual(1, len(jobs))
self.job_pool.remove_task_jobs('NotTaskID')
self.assertEqual(1, len(jobs))
task_id = self.job_pool.pool[self.ext_id].task_proxy
self.job_pool.remove_task_jobs(task_id)
self.assertEqual(0, len(jobs))

def test_set_job_attr(self):
"""Test method setting job attribute value."""
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
old_exit_script = copy(job.exit_script)
self.job_pool.set_job_attr(self.int_id, 'leave_scripting', 'rm -v *')
self.assertEqual(old_exit_script, job.exit_script)
self.job_pool.set_job_attr(self.int_id, 'exit_script', 10.0)
self.assertEqual(old_exit_script, job.exit_script)
self.job_pool.set_job_attr(self.int_id, 'exit_script', 'rm -v *')
self.assertNotEqual(old_exit_script, job.exit_script)

def test_set_job_state(self):
"""Test method setting the job state."""
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
old_state = copy(job.state)
self.job_pool.set_job_state(self.int_id, 'waiting')
self.assertEqual(old_state, job.state)
self.job_pool.set_job_state('NotJobID', JOB_STATUSES_ALL[0])
self.assertEqual(old_state, job.state)
self.job_pool.set_job_state(self.int_id, JOB_STATUSES_ALL[-1])
self.assertNotEqual(old_state, job.state)

def test_set_job_time(self):
"""Test method setting event time."""
event_time = get_current_time_string()
self.job_pool.insert_job(JOB_CONFIG)
job = self.job_pool.pool[self.ext_id]
old_time = copy(job.submitted_time)
self.job_pool.set_job_time(self.int_id, 'jumped', event_time)
self.assertEqual(old_time, job.submitted_time)
self.job_pool.set_job_time(self.int_id, 'submitted', event_time)
self.assertNotEqual(old_time, job.submitted_time)

def test_parse_job_item(self):
"""Test internal id parsing method."""
point, name, sub_num = self.job_pool.parse_job_item(self.int_id)
tpoint, tname, tsub_num = self.int_id.split('/', 2)
self.assertEqual(
(point, name, sub_num), (tpoint, tname, int(tsub_num)))
tpoint, tname, tsub_num = self.job_pool.parse_job_item(
f'{point}/{name}')
self.assertEqual((point, name, None), (tpoint, tname, tsub_num))
tpoint, tname, tsub_num = self.job_pool.parse_job_item(
f'{name}.{point}.{sub_num}')
self.assertEqual((point, name, sub_num), (tpoint, tname, tsub_num))
tpoint, tname, tsub_num = self.job_pool.parse_job_item(
f'{name}.{point}.NotNumber')
self.assertEqual((point, name, None), (tpoint, tname, tsub_num))
tpoint, tname, tsub_num = self.job_pool.parse_job_item(
f'{name}.{point}')
self.assertEqual((point, name, None), (tpoint, tname, tsub_num))
tpoint, tname, tsub_num = self.job_pool.parse_job_item(
f'{name}')
self.assertEqual((None, name, None), (tpoint, tname, tsub_num))


if __name__ == '__main__':
unittest.main()

0 comments on commit 0887517

Please sign in to comment.