job messages list field #3373

Merged · 2 commits · Sep 24, 2019
Changes from all commits
84 changes: 50 additions & 34 deletions cylc/flow/job_pool.py
@@ -45,8 +45,8 @@ class JobPool(object):
"""Pool of protobuf job messages."""
# TODO: description, args, and types

ERR_PREFIX_JOBID_MATCH = "No matching jobs found: "
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = "Invalid cycle point for job: "
ERR_PREFIX_JOBID_MATCH = 'No matching jobs found: '
ERR_PREFIX_JOB_NOT_ON_SEQUENCE = 'Invalid cycle point for job: '

def __init__(self, suite, owner):
self.suite = suite
@@ -64,7 +64,7 @@ def insert_job(self, job_conf):
t_id = f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{name}'
j_id = f'{t_id}{ID_DELIM}{sub_num}'
j_buf = PbJob(
stamp=f"{j_id}@{update_time}",
stamp=f'{j_id}@{update_time}',
id=j_id,
submit_num=sub_num,
state=JOB_STATUSES_ALL[0],
@@ -86,27 +86,40 @@ def insert_job(self, job_conf):
cycle_point=point_string,
)
j_buf.batch_sys_conf.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['batch_system_conf'].items()])
j_buf.directives.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['directives'].items()])
j_buf.environment.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['environment'].items()])
j_buf.param_env_tmpl.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['param_env_tmpl'].items()])
j_buf.param_var.extend(
[f"{key}={val}" for key, val in
[f'{key}={val}' for key, val in
job_conf['param_var'].items()])
j_buf.extra_logs.extend(job_conf['logfiles'])
self.pool[j_id] = j_buf
self.task_jobs.setdefault(t_id, []).append(j_id)

def add_job_msg(self, job_d, msg):
"""Add message to job."""
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
self.pool[j_id].messages.append(msg)
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except (KeyError, TypeError):
pass

def remove_job(self, job_d):
"""Remove job from pool."""
point, name, sub_num, _ = self.parse_job_item(job_d)
point, name, sub_num = self.parse_job_item(job_d)
t_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
j_id = f'{t_id}{ID_DELIM}{sub_num}'
try:
@@ -126,63 +139,66 @@ def remove_task_jobs(self, task_id):

def set_job_attr(self, job_d, attr_key, attr_val):
"""Set job attribute."""
point, name, sub_num, _ = self.parse_job_item(job_d)
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], attr_key, attr_val)
except (KeyError, TypeError):
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except (KeyError, TypeError, AttributeError):
pass

def set_job_state(self, job_d, status):
"""Set job state."""
point, name, sub_num, _ = self.parse_job_item(job_d)
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
if status in JOB_STATUSES_ALL:
try:
self.pool[j_id].state = status
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except KeyError:
pass

def set_job_time(self, job_d, event_key, time_str=None):
"""Set an event time in job pool object.

Set values of both event_key + "_time" and event_key + "_time_string".
Set values of both event_key + '_time' and event_key + '_time_string'.
"""
point, name, sub_num, _ = self.parse_job_item(job_d)
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], event_key + '_time', time_str)
except KeyError:
self.pool[j_id].stamp = f'{j_id}@{update_time}'
except (KeyError, TypeError, AttributeError):
pass

@staticmethod
def parse_job_item(item):
"""Parse internal id
point/name/submit_num:state
or name.point.submit_num:state syntax.
point/name/submit_num
or name.point.submit_num syntax (back compat).
"""
if ":" in item:
head, state_str = item.rsplit(":", 1)
else:
head, state_str = (item, None)
if head.count("/") > 1:
point_str, name_str, submit_num = head.split("/", 2)
elif "/" in head:
point_str, name_str = head.split("/", 1)
submit_num = None
elif head.count(".") > 1:
name_str, point_str, submit_num = head.split(".", 2)
elif "." in head:
name_str, point_str = head.split(".", 1)
submit_num = None
submit_num = None
if item.count('/') > 1:
point_str, name_str, submit_num = item.split('/', 2)
elif '/' in item:
point_str, name_str = item.split('/', 1)
elif item.count('.') > 1:
name_str, point_str, submit_num = item.split('.', 2)
elif '.' in item:
name_str, point_str = item.split('.', 1)
else:
name_str, point_str, submit_num = (head, None, None)
if submit_num is not None:
name_str, point_str = (item, None)
try:
sub_num = int(submit_num)
return (point_str, name_str, sub_num, state_str)
except (TypeError, ValueError):
sub_num = None
return (point_str, name_str, sub_num)
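
For reference, a minimal usage sketch of the revised `parse_job_item` (not part of the diff; it is a staticmethod, so no `JobPool` instance is needed, and the cycle points are assumed to contain no internal dots):

```python
# Sketch only: both identifier syntaxes now yield the same 3-tuple,
# with the submit number coerced to int (None if absent or invalid).
from cylc.flow.job_pool import JobPool

assert JobPool.parse_job_item('20190101T00/foo/2') == ('20190101T00', 'foo', 2)
assert JobPool.parse_job_item('foo.20190101T00.2') == ('20190101T00', 'foo', 2)
assert JobPool.parse_job_item('foo.20190101T00') == ('20190101T00', 'foo', None)
```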
1 change: 1 addition & 0 deletions cylc/flow/network/schema.py
@@ -507,6 +507,7 @@ class Meta:
param_env_tmpl = List(String)
param_var = List(String)
extra_logs = List(String)
messages = List(String)


class Task(ObjectType):
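
With this one-line schema addition, job messages become queryable over GraphQL. A hypothetical query sketch follows; only `messages` is added by this PR, and the surrounding `jobs` selection shape is an assumption based on the Job fields visible above:

```python
# Hypothetical GraphQL query string; field names other than
# `messages` are assumptions, not part of this PR.
JOB_MESSAGES_QUERY = '''
query {
  jobs {
    id
    state
    messages
  }
}
'''
```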
10 changes: 3 additions & 7 deletions cylc/flow/scheduler.py
@@ -565,13 +565,9 @@ def process_queued_task_messages(self):
except Empty:
break
self.message_queue.task_done()
if '/' in task_job: # cycle/task-name/submit-num
cycle, task_name, submit_num, _ = (
self.job_pool.parse_job_item(task_job))
task_id = TaskID.get(task_name, cycle)
else: # back compat: task-name.cycle
task_id = task_job
submit_num = None
cycle, task_name, submit_num = (
self.job_pool.parse_job_item(task_job))
task_id = TaskID.get(task_name, cycle)
messages.setdefault(task_id, [])
messages[task_id].append(
(submit_num, event_time, severity, message))
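
A standalone sketch of the simplified queue processing (illustrative values; `TaskID.get` is assumed to join name and cycle as `name.cycle`):

```python
# parse_job_item now handles both identifier syntaxes itself, so the
# back-compat branch on '/' is no longer needed in the scheduler.
cycle, task_name, submit_num = ('20190101T00', 'foo', 2)
task_id = f'{task_name}.{cycle}'  # stand-in for TaskID.get(task_name, cycle)

messages = {}
messages.setdefault(task_id, [])
messages[task_id].append(
    (submit_num, '2019-09-24T12:00:00Z', 'INFO', 'started'))
```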
8 changes: 6 additions & 2 deletions cylc/flow/task_events_mgr.py
@@ -348,9 +348,13 @@ def process_message(

# always update the suite state summary for latest message
if flag == self.FLAG_POLLED:
itask.set_summary_message('%s %s' % (message, self.FLAG_POLLED))
new_msg = f'{message} {self.FLAG_POLLED}'
else:
itask.set_summary_message(message)
new_msg = message
itask.set_summary_message(new_msg)
self.job_pool.add_job_msg(
get_task_job_id(itask.point, itask.tdef.name, submit_num),
new_msg)

# Satisfy my output, if possible, and record the result.
completed_trigger = itask.state.outputs.set_msg_trg_completion(
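
The effect of the change above as a standalone sketch (the flag value is an assumption based on `TaskEventsManager.FLAG_POLLED`):

```python
# Polled messages get the polled flag appended before being recorded
# on both the task summary and the job pool entry.
FLAG_POLLED = '(polled)'  # assumed value of TaskEventsManager.FLAG_POLLED

message, flag = 'succeeded', FLAG_POLLED
if flag == FLAG_POLLED:
    new_msg = f'{message} {flag}'
else:
    new_msg = message
print(new_msg)  # -> 'succeeded (polled)'
```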
21 changes: 17 additions & 4 deletions cylc/flow/task_job_mgr.py
@@ -79,6 +79,7 @@ class TaskJobManager(object):
POLL_FAIL = 'poll failed'
REMOTE_SELECT_MSG = 'waiting for remote host selection'
REMOTE_INIT_MSG = 'remote host initialising'
DRY_RUN_MSG = 'job file written (edit/dry-run)'
KEY_EXECUTE_TIME_LIMIT = TaskEventsManager.KEY_EXECUTE_TIME_LIMIT

def __init__(self, suite, proc_pool, suite_db_mgr, suite_srv_files_mgr,
@@ -218,6 +219,10 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
# Remote is waiting to be initialised
for itask in itasks:
itask.set_summary_message(self.REMOTE_INIT_MSG)
self.job_pool.add_job_msg(
get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num),
self.REMOTE_INIT_MSG)
continue
# Ensure that localhost background/at jobs are recorded as running
# on the host name of the current suite host, rather than just
@@ -472,6 +477,9 @@ def _kill_task_job_callback(self, suite, itask, cmd_ctx, line):
'ignoring job kill result, unexpected task state: %s' %
itask.state.status)
itask.set_summary_message(log_msg)
self.job_pool.add_job_msg(
get_task_job_id(itask.point, itask.tdef.name, itask.submit_num),
log_msg)
LOG.log(log_lvl, "[%s] -job(%02d) %s" % (
itask.identity, itask.submit_num, log_msg))

@@ -547,12 +555,14 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line):
ctx.ret_code = 0

# See cylc.flow.batch_sys_manager.JobPollContext
job_d = get_task_job_id(itask.point, itask.tdef.name, itask.submit_num)
try:
job_log_dir, context = line.split('|')[1:3]
items = json.loads(context)
jp_ctx = JobPollContext(job_log_dir, **items)
except TypeError:
itask.set_summary_message(self.POLL_FAIL)
self.job_pool.add_job_msg(job_d, self.POLL_FAIL)
ctx.cmd = cmd_ctx.cmd # print original command on failure
return
except ValueError:
@@ -565,6 +575,7 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line):
job_log_dir = items.pop('job_log_dir')
except (ValueError, IndexError):
itask.set_summary_message(self.POLL_FAIL)
self.job_pool.add_job_msg(job_d, self.POLL_FAIL)
ctx.cmd = cmd_ctx.cmd # print original command on failure
return
finally:
@@ -797,16 +808,18 @@ def _prep_submit_task_job(self, suite, itask, dry_run, check_syntax=True):
return False
itask.local_job_file_path = local_job_file_path

if dry_run:
itask.set_summary_message('job file written (edit/dry-run)')
LOG.debug('[%s] -%s', itask, itask.summary['latest_message'])

job_config = deepcopy(job_conf)
job_config['logfiles'] = deepcopy(itask.summary['logfiles'])
job_config['job_log_dir'] = get_task_job_log(
suite, itask.point, itask.tdef.name, itask.submit_num)
itask.jobs.append(job_config['job_d'])
self.job_pool.insert_job(job_config)

if dry_run:
itask.set_summary_message(self.DRY_RUN_MSG)
self.job_pool.add_job_msg(job_config['job_d'], self.DRY_RUN_MSG)
LOG.debug(f'[{itask}] -{self.DRY_RUN_MSG}')

# Return value used by "cylc submit" and "cylc jobscript":
return itask

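
The task_job_mgr.py changes repeat a single pattern: every user-facing status message is now recorded against the job in the job pool as well as on the task summary. An illustrative helper (not part of the PR) capturing that pattern:

```python
# Illustrative only; the import path for get_task_job_id is an
# assumption based on its use elsewhere in cylc.flow.
from cylc.flow.task_job_logs import get_task_job_id


def record_job_msg(itask, job_pool, msg):
    """Record msg on both the task summary and the job pool entry."""
    itask.set_summary_message(msg)
    job_pool.add_job_msg(
        get_task_job_id(itask.point, itask.tdef.name, itask.submit_num),
        msg)
```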