Skip to content

Commit

Permalink
Fixing Broken Tests (Large number of squashed commits)
Browse files Browse the repository at this point in the history
removed irrelevant test events/49

tests: fix polling tests

replaced global config hosts with job platforms

Fixed most of the reload tests by replacing user_at_host with platform in the database

fix cylc-cat-log

fixed another bit of cat-log

fix cylc get-site-config tests

reset a test which should fail

expansion removed from cylc get-site-config

made tests evaluate results of cylc-get-config

fixed tests/database/04-lock-recover.t to expand /home/h02/tpilling

fixed cylc-poll/02 to work in Cylc8 way

fix _more_ tests relying on un-evaluated /home/h02/tpilling; skip a test which may not be appropriate to fix; improved docstring in test header

refactored unit-tests for forward lookup

fixed test_job_pool

fixed test_job_file.py a la platforms

fix sample database in test/job-submission-01

added sample platforms to all tests

fixed pbs getting job length limit from platform rather than job settings

Oliver's fix for shellcheck failure

fixed issue with pbs batch system unit test

skipped broken tests

fixed unit tests dependent on flow.rc

test_config.py::TestSuiteConfig::test_xfunction_imports

made sure a default localhost is always available

Revert "made sure a default localhost is always available"

This reverts commit 0d9d1cc.
  • Loading branch information
wxtim committed Jun 29, 2020
1 parent e8e4894 commit f12a089
Show file tree
Hide file tree
Showing 109 changed files with 468 additions and 335 deletions.
13 changes: 5 additions & 8 deletions cylc/flow/batch_sys_handlers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ class PBSHandler(object):
# PBS fails a job submit if job "name" in "-N name" is too long.
# For version 12 or below, this is 15 characters.
# You can modify this in the site/user `global.cfg` like this
# [hosts]
# [[the-name-of-my-pbs-host]]
# [[[batch systems]]]
# [[[[pbs]]]]
# # E.g.: PBS 11
# job name length maximum = 15
# [job platforms]
# [[the-name-of-my-pbs-platform]]
# batch system = pbs
# job name length maximum = 15
JOB_NAME_LEN_MAX = 236
KILL_CMD_TMPL = "qdel '%(job_id)s'"
# N.B. The "qstat JOB_ID" command returns 1 if JOB_ID is no longer in the
Expand All @@ -47,8 +45,7 @@ def format_directives(self, job_conf):
directives = job_conf["directives"].__class__() # an ordereddict

directives["-N"] = job_conf["task_id"] + "." + job_conf["suite_name"]
job_name_len_max = job_conf['batch_system_conf'].get(
"job name length maximum", self.JOB_NAME_LEN_MAX)
job_name_len_max = job_conf['platform']["job name length maximum"]
if job_name_len_max:
directives["-N"] = directives["-N"][0:job_name_len_max]

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def __init__(
if output_fname:
output_fname = os.path.expandvars(output_fname)
self.pcfg = RawSuiteConfig(
fpath,
output_fname,
fpath,
output_fname,
template_vars
)
self.mem_log("config.py: after RawSuiteConfig init")
Expand Down
15 changes: 9 additions & 6 deletions cylc/flow/hostuserutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ def is_remote_user(self, name):
return self.remote_users[name]

def _is_remote_platform(self, platform):
"""Return True if any job host in platform have different IP address
"""Return True if any job host in platform have different IP address
to the current host.
Return False if name is None.
Return True if host is unknown.
Expand Down Expand Up @@ -266,10 +266,12 @@ def is_remote(host, owner):
"""Shorthand for HostUtil.get_inst().is_remote(host, owner)."""
return HostUtil.get_inst().is_remote(host, owner)


def is_remote_platform(platform):
"""Shorthand for HostUtil.get_inst()._is_remote_platform(host, owner)."""
return HostUtil.get_inst()._is_remote_platform(platform)


def is_remote_host(name):
"""Shorthand for HostUtil.get_inst().is_remote_host(name)."""
return HostUtil.get_inst().is_remote_host(name)
Expand All @@ -279,6 +281,7 @@ def is_remote_user(name):
"""Return True if name is not a name of the current user."""
return HostUtil.get_inst().is_remote_user(name)


def get_host_from_platform(platform, method=None):
"""Placeholder for a more sophisticated function which returns a host
given an itask with a platform attribute
Expand All @@ -290,19 +293,19 @@ def get_host_from_platform(platform, method=None):
Name a function to use when selecting hosts from list provided
by platform.
If unset then `[platform][remote hosts][0]` will be returned
Returns:
hostname (str):
TODO:
Make methods other than None work:
- Random Selection
- Random Selection with check for host availability
"""
if method is None:
return istask.platform['remote hosts'][0]
else:
raise NotImplementedError(
f'method {method} is not a valid input for get_host_from_platform'
)
)
7 changes: 4 additions & 3 deletions cylc/flow/job_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ def write(self, local_job_file_path, job_conf, check_syntax=True):
# variables: NEXT_CYCLE=$( cylc cycle-point --offset-hours=6 )
platform = job_conf['platform']
tmp_name = os.path.expandvars(local_job_file_path + '.tmp')
run_d = os.path.expandvars(get_remote_suite_run_dir(platform, job_conf['suite_name']))
run_d = os.path.expandvars(
get_remote_suite_run_dir(platform, job_conf['suite_name'])
)
try:
with open(tmp_name, 'w') as handle:
self._write_header(handle, job_conf)
Expand Down Expand Up @@ -118,7 +120,6 @@ def _check_script_value(value):
return True
return False


@staticmethod
def _write_header(handle, job_conf):
"""Write job script header."""
Expand Down Expand Up @@ -186,7 +187,7 @@ def _write_suite_environment(self, handle, job_conf, run_d):
work_d = get_remote_suite_work_dir(
job_conf["platform"], job_conf['suite_name'])
handle.write('\n export CYLC_SUITE_RUN_DIR="%s"' % run_d)
if work_d != run_d:
if os.path.expandvars(work_d) != run_d:
# Note: not an environment variable, but used by job.sh
handle.write('\n CYLC_SUITE_WORK_DIR_ROOT="%s"' % work_d)
if job_conf['remote_suite_d']:
Expand Down
13 changes: 7 additions & 6 deletions cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from copy import deepcopy
import os
from time import time
from random import choice

from cylc.flow import LOG, ID_DELIM
from cylc.flow.exceptions import SuiteConfigError
Expand All @@ -33,6 +34,7 @@
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED)
from cylc.flow.data_messages_pb2 import PbJob, JDeltas
from cylc.flow.platform_lookup import forward_lookup

JOB_STATUSES_ALL = [
TASK_STATUS_READY,
Expand Down Expand Up @@ -128,19 +130,18 @@ def insert_db_job(self, row_idx, row):
if row_idx == 0:
LOG.info("LOADING job data")
(point_string, name, status, submit_num, time_submit, time_run,
time_run_exit, batch_sys_name, batch_sys_job_id, user_at_host) = row
time_run_exit, batch_sys_name, batch_sys_job_id, platform_name) = row
if status not in JOB_STATUS_SET:
return
t_id = f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{name}'
j_id = f'{t_id}{ID_DELIM}{submit_num}'
try:
tdef = self.schd.config.get_taskdef(name)
j_owner = self.schd.owner
if user_at_host:
if '@' in user_at_host:
j_owner, j_host = user_at_host.split('@')
else:
j_host = user_at_host
if platform_name:
j_host = choice(
forward_lookup(platform_name)['remote hosts']
)
else:
j_host = self.schd.host
j_buf = PbJob(
Expand Down
11 changes: 9 additions & 2 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ def get_scan_items_from_fs(
"""
if owner_pattern is None:
# Run directory of current user only
run_dirs = [(os.path.expandvars(forward_lookup()['run directory']), None)]
run_dirs = [
(os.path.expandvars(forward_lookup()['run directory']), None)
]
else:
# Run directory of all users matching "owner_pattern".
# But skip those with /nologin or /false shells
Expand All @@ -248,7 +250,12 @@ def get_scan_items_from_fs(
if any(pwent.pw_shell.endswith(s) for s in skips):
continue
if owner_pattern.match(pwent.pw_name):
run_dirs.append((os.path.expandvars(forward_lookup()['run directory']), None))
run_dirs.append(
(
os.path.expandvars(forward_lookup()['run directory']),
None
)
)
if cylc.flow.flags.debug:
sys.stderr.write('Listing suites:%s%s\n' % (
DEBUG_DELIM, DEBUG_DELIM.join(item[1] for item in run_dirs if
Expand Down
6 changes: 3 additions & 3 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def get_remote_suite_work_dir(platform, suite, *args):

def get_suite_run_dir(suite, *args):
"""Return local suite run directory, join any extra args."""
return os.path.expandvars(
return os.path.expandvars(
os.path.join(
forward_lookup()['run directory'], suite, *args
)
Expand Down Expand Up @@ -91,7 +91,7 @@ def get_suite_run_share_dir(suite, *args):

def get_suite_run_work_dir(suite, *args):
"""Return local suite work/work directory, join any extra args."""
return os.path.expandvars(
return os.path.expandvars(
os.path.join(
forward_lookup()['work directory'], suite, 'work', *args
)
Expand Down Expand Up @@ -133,4 +133,4 @@ def make_suite_run_tree(suite):
if dir_:
dir_ = os.path.expandvars(dir_)
os.makedirs(dir_, exist_ok=True)
LOG.debug('%s: directory created', dir_)
LOG.debug('%s: directory created', dir_)
20 changes: 12 additions & 8 deletions cylc/flow/platform_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg


def forward_lookup(platform_name=None):
def forward_lookup(platform_name=None, platforms=None):
"""
Find out which job platform to use given a list of possible platforms and
a task platform string.
Expand All @@ -33,17 +33,20 @@ def forward_lookup(platform_name=None):
Args:
platform_name (str):
name of platform to be retrieved.
platforms ():
globalrc platforms given as a dict for logic testing purposes
Returns:
platform (dict):
object containing settings for a platform, loaded from Global Config
object containing settings for a platform, loaded from
Global Config.
TODO:
TODO:
- Refactor testing for this method. Ideally including a method
example.
- Work out what to do with cases where localhost not set.
"""
platforms = glbl_cfg().get(['job platforms'])
if platforms is None:
platforms = glbl_cfg().get(['job platforms'])

if platform_name is None:
platform_data = platforms['localhost']
Expand All @@ -55,12 +58,13 @@ def forward_lookup(platform_name=None):
# defined platforms.
for platform_name_re in reversed(list(platforms)):
if re.fullmatch(platform_name_re, platform_name):
platform_data = platforms[platform_name]
platform_data = platforms[platform_name_re]
if not platform_data:
platform_data = forward_lookup('localhost').copy()
platform_data['remote hosts'] = platform_name
platform_data['name'] = platform_name
return platform_data



raise PlatformLookupError(
f"No matching platform \"{platform_name}\" found")

Expand Down
19 changes: 9 additions & 10 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ class CylcSuiteDAO(object):
["time_run_exit"],
["run_signal"],
["run_status", {"datatype": "INTEGER"}],
["user_at_host"],
["platform_name"],
["batch_sys_name"],
["batch_sys_job_id"],
Expand Down Expand Up @@ -606,7 +605,7 @@ def select_job_pool_for_restart(self, callback, id_key=None):
Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, status, submit_num, time_submit, time_run,
time_run_exit, batch_sys_name, batch_sys_job_id, user_at_host]
time_run_exit, batch_sys_name, batch_sys_job_id, platform_name]
If id_key is specified,
select from task_pool table if id_key == CHECKPOINT_LATEST_ID.
Expand All @@ -623,7 +622,7 @@ def select_job_pool_for_restart(self, callback, id_key=None):
%(task_jobs)s.time_run_exit,
%(task_jobs)s.batch_sys_name,
%(task_jobs)s.batch_sys_job_id,
%(task_jobs)s.user_at_host
%(task_jobs)s.platform_name
FROM
%(task_jobs)s
JOIN
Expand Down Expand Up @@ -735,7 +734,7 @@ def select_task_pool_for_restart(self, callback, id_key=None):
Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, spawned, is_late, status, is_held, submit_num,
try_num, user_at_host, platform_name, time_submit, time_run, timeout, outputs]
try_num, platform_name, time_submit, time_run, timeout, outputs]
If id_key is specified,
select from task_pool table if id_key == CHECKPOINT_LATEST_ID.
Expand All @@ -751,7 +750,7 @@ def select_task_pool_for_restart(self, callback, id_key=None):
%(task_pool)s.is_held,
%(task_states)s.submit_num,
%(task_jobs)s.try_num,
%(task_jobs)s.user_at_host,
%(task_jobs)s.platform_name,
%(task_jobs)s.time_submit,
%(task_jobs)s.time_run,
%(task_timeout_timers)s.timeout,
Expand Down Expand Up @@ -808,7 +807,7 @@ def select_task_times(self):
SELECT
name,
cycle,
user_at_host,
platform_name,
batch_sys_name,
time_submit,
time_run,
Expand Down Expand Up @@ -993,8 +992,8 @@ def upgrade_to_platforms(self):
# check if upgrade required
schema = conn.execute(rf'PRAGMA table_info({self.TABLE_TASK_JOBS})')
for _, name, *_ in schema:
if name == 'platform':
LOG.debug('platform column present - skipping db upgrade')
if name == 'platform_name':
LOG.debug('platform_name column present - skipping db upgrade')
return False

# Perform upgrade:
Expand All @@ -1013,7 +1012,7 @@ def upgrade_to_platforms(self):
ALTER TABLE
{table}
ADD COLUMN
platform TEXT
platform_name TEXT
'''
)
job_platforms = glbl_cfg(cached=False).get(['job platforms'])
Expand Down Expand Up @@ -1041,7 +1040,7 @@ def upgrade_to_platforms(self):
{table}
SET
user=?,
platform=?
platform_name=?
WHERE
cycle==?
AND name==?
Expand Down
22 changes: 12 additions & 10 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,19 +621,20 @@ def load_tasks_for_restart(self):
auths = set()
for itask in self.pool.get_rh_tasks():
if itask.state(*TASK_STATUSES_ACTIVE):
auths.add(itask.task_platform)
auths.add(itask.platform['name'])
while auths:
for host, owner in auths.copy():
for platform_name in auths.copy():
if (
self.task_job_mgr.task_remote_mgr.remote_init(
host, owner, self.curve_auth, self.client_pub_key_dir)
) is not None:
auths.remove((host, owner))
for platform in auths.copy():
if self.task_job_mgr.task_remote_mgr.remote_init(
platform) is not None:
platform_nameself.curve_auth,
self.client_pub_key_dir
)
is not None
):
auths.remove(
platform, self.curve_auth, self.client_pub_key_dir
platform_name,
self.curve_auth,
self.client_pub_key_dir
)
if auths:
sleep(1.0)
Expand Down Expand Up @@ -1184,7 +1185,8 @@ def load_suiterc(self, is_reload=False):
load_type = "run"
file_name = os.path.expandvars(
get_suite_run_rc_dir(
self.suite, f"{time_str}-{load_type}.rc")
self.suite, f"{time_str}-{load_type}.rc"
)
)
with open(file_name, "wb") as handle:
handle.write(b"# cylc-version: %s\n" % CYLC_VERSION.encode())
Expand Down
Loading

0 comments on commit f12a089

Please sign in to comment.