Skip to content

Commit

Permalink
job messages added
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Sep 23, 2019
1 parent f9927a5 commit 8643f6b
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 58 deletions.
62 changes: 39 additions & 23 deletions cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,22 @@ def insert_job(self, job_conf):
self.pool[j_id] = j_buf
self.task_jobs.setdefault(t_id, []).append(j_id)

def add_job_msg(self, job_d, msg):
"""Set job attribute."""
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
self.pool[j_id].messages.append(msg)
self.pool[j_id].stamp = f"{j_id}@{update_time}"
except (KeyError, TypeError):
pass

def remove_job(self, job_d):
"""Remove job from pool."""
point, name, sub_num, _ = self.parse_job_item(job_d)
point, name, sub_num = self.parse_job_item(job_d)
t_id = f'{self.workflow_id}{ID_DELIM}{point}{ID_DELIM}{name}'
j_id = f'{t_id}{ID_DELIM}{sub_num}'
try:
Expand All @@ -126,24 +139,28 @@ def remove_task_jobs(self, task_id):

def set_job_attr(self, job_d, attr_key, attr_val):
"""Set job attribute."""
point, name, sub_num, _ = self.parse_job_item(job_d)
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], attr_key, attr_val)
self.pool[j_id].stamp = f"{j_id}@{update_time}"
except (KeyError, TypeError):
pass

def set_job_state(self, job_d, status):
"""Set job state."""
point, name, sub_num, _ = self.parse_job_item(job_d)
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
if status in JOB_STATUSES_ALL:
try:
self.pool[j_id].state = status
self.pool[j_id].stamp = f"{j_id}@{update_time}"
except KeyError:
pass

Expand All @@ -152,37 +169,36 @@ def set_job_time(self, job_d, event_key, time_str=None):
Set values of both event_key + "_time" and event_key + "_time_string".
"""
point, name, sub_num, _ = self.parse_job_item(job_d)
update_time = time()
point, name, sub_num = self.parse_job_item(job_d)
j_id = (
f'{self.workflow_id}{ID_DELIM}{point}'
f'{ID_DELIM}{name}{ID_DELIM}{sub_num}')
try:
setattr(self.pool[j_id], event_key + '_time', time_str)
self.pool[j_id].stamp = f"{j_id}@{update_time}"
except KeyError:
pass

@staticmethod
def parse_job_item(item):
"""Parse internal id
point/name/submit_num:state
or name.point.submit_num:state syntax.
point/name/submit_num
or name.point.submit_num syntax (back compat).
"""
if ":" in item:
head, state_str = item.rsplit(":", 1)
else:
head, state_str = (item, None)
if head.count("/") > 1:
point_str, name_str, submit_num = head.split("/", 2)
elif "/" in head:
point_str, name_str = head.split("/", 1)
submit_num = None
elif head.count(".") > 1:
name_str, point_str, submit_num = head.split(".", 2)
elif "." in head:
name_str, point_str = head.split(".", 1)
submit_num = None
submit_num = None
if item.count("/") > 1:
point_str, name_str, submit_num = item.split("/", 2)
elif "/" in item:
point_str, name_str = item.split("/", 1)
elif item.count(".") > 1:
name_str, point_str, submit_num = item.split(".", 2)
elif "." in item:
name_str, point_str = item.split(".", 1)
else:
name_str, point_str, submit_num = (head, None, None)
if submit_num is not None:
name_str, point_str = (item, None)
try:
sub_num = int(submit_num)
return (point_str, name_str, sub_num, state_str)
except (TypeError, ValueError):
sub_num = None
return (point_str, name_str, sub_num)
1 change: 1 addition & 0 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ class Meta:
param_env_tmpl = List(String)
param_var = List(String)
extra_logs = List(String)
messages = List(String)


class Task(ObjectType):
Expand Down
10 changes: 3 additions & 7 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,13 +565,9 @@ def process_queued_task_messages(self):
except Empty:
break
self.message_queue.task_done()
if '/' in task_job: # cycle/task-name/submit-num
cycle, task_name, submit_num, _ = (
self.job_pool.parse_job_item(task_job))
task_id = TaskID.get(task_name, cycle)
else: # back compat: task-name.cycle
task_id = task_job
submit_num = None
cycle, task_name, submit_num = (
self.job_pool.parse_job_item(task_job))
task_id = TaskID.get(task_name, cycle)
messages.setdefault(task_id, [])
messages[task_id].append(
(submit_num, event_time, severity, message))
Expand Down
8 changes: 6 additions & 2 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,13 @@ def process_message(

# always update the suite state summary for latest message
if flag == self.FLAG_POLLED:
itask.set_summary_message('%s %s' % (message, self.FLAG_POLLED))
new_msg = f'{message} {self.FLAG_POLLED}'
else:
itask.set_summary_message(message)
new_msg = message
itask.set_summary_message(new_msg)
self.job_pool.add_job_msg(
get_task_job_id(itask.point, itask.tdef.name, submit_num),
new_msg)

# Satisfy my output, if possible, and record the result.
completed_trigger = itask.state.outputs.set_msg_trg_completion(
Expand Down
21 changes: 17 additions & 4 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class TaskJobManager(object):
POLL_FAIL = 'poll failed'
REMOTE_SELECT_MSG = 'waiting for remote host selection'
REMOTE_INIT_MSG = 'remote host initialising'
DRY_RUN_MSG = 'job file written (edit/dry-run)'
KEY_EXECUTE_TIME_LIMIT = TaskEventsManager.KEY_EXECUTE_TIME_LIMIT

def __init__(self, suite, proc_pool, suite_db_mgr, suite_srv_files_mgr,
Expand Down Expand Up @@ -218,6 +219,10 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
# Remote is waiting to be initialised
for itask in itasks:
itask.set_summary_message(self.REMOTE_INIT_MSG)
self.job_pool.add_job_msg(
get_task_job_id(
itask.point, itask.tdef.name, itask.submit_num),
self.REMOTE_INIT_MSG)
continue
# Ensure that localhost background/at jobs are recorded as running
# on the host name of the current suite host, rather than just
Expand Down Expand Up @@ -472,6 +477,9 @@ def _kill_task_job_callback(self, suite, itask, cmd_ctx, line):
'ignoring job kill result, unexpected task state: %s' %
itask.state.status)
itask.set_summary_message(log_msg)
self.job_pool.add_job_msg(
get_task_job_id(itask.point, itask.tdef.name, itask.submit_num),
log_msg)
LOG.log(log_lvl, "[%s] -job(%02d) %s" % (
itask.identity, itask.submit_num, log_msg))

Expand Down Expand Up @@ -547,12 +555,14 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line):
ctx.ret_code = 0

# See cylc.flow.batch_sys_manager.JobPollContext
job_d = get_task_job_id(itask.point, itask.tdef.name, itask.submit_num)
try:
job_log_dir, context = line.split('|')[1:3]
items = json.loads(context)
jp_ctx = JobPollContext(job_log_dir, **items)
except TypeError:
itask.set_summary_message(self.POLL_FAIL)
self.job_pool.add_job_msg(job_d, self.POLL_FAIL)
ctx.cmd = cmd_ctx.cmd # print original command on failure
return
except ValueError:
Expand All @@ -565,6 +575,7 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line):
job_log_dir = items.pop('job_log_dir')
except (ValueError, IndexError):
itask.set_summary_message(self.POLL_FAIL)
self.job_pool.add_job_msg(job_d, self.POLL_FAIL)
ctx.cmd = cmd_ctx.cmd # print original command on failure
return
finally:
Expand Down Expand Up @@ -797,16 +808,18 @@ def _prep_submit_task_job(self, suite, itask, dry_run, check_syntax=True):
return False
itask.local_job_file_path = local_job_file_path

if dry_run:
itask.set_summary_message('job file written (edit/dry-run)')
LOG.debug('[%s] -%s', itask, itask.summary['latest_message'])

job_config = deepcopy(job_conf)
job_config['logfiles'] = deepcopy(itask.summary['logfiles'])
job_config['job_log_dir'] = get_task_job_log(
suite, itask.point, itask.tdef.name, itask.submit_num)
itask.jobs.append(job_config['job_d'])
self.job_pool.insert_job(job_config)

if dry_run:
itask.set_summary_message(self.DRY_RUN_MSG)
self.job_pool.add_job_msg(job_config['job_d'], self.DRY_RUN_MSG)
LOG.debug(f'[{itask}] -{self.DRY_RUN_MSG}')

# Return value used by "cylc submit" and "cylc jobscript":
return itask

Expand Down
1 change: 1 addition & 0 deletions cylc/flow/ws_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message PbJob {
repeated string extra_logs = 29;
string name = 30; /* filter item */
string cycle_point = 31; /* filter item */
repeated string messages = 32;
}

message PbTask {
Expand Down
Loading

0 comments on commit 8643f6b

Please sign in to comment.