Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task state code refactor. #1775

Merged
merged 5 commits into from
May 12, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ control_commands['kill'] = ['kill']
control_commands['hold'] = ['hold']
control_commands['release'] = ['release', 'unhold']
control_commands['reset'] = ['reset']
control_commands['spawn'] = ['spawn']
control_commands['nudge'] = ['nudge']
control_commands['reload'] = ['reload']
control_commands['set-runahead'] = ['set-runahead']
Expand Down Expand Up @@ -439,6 +440,7 @@ comsum['kill'] = 'Kill submitted or running tasks'
comsum['hold'] = 'Hold (pause) suites or individual tasks'
comsum['release'] = 'Release (unpause) suites or individual tasks'
comsum['reset'] = 'Force one or more tasks to change state.'
comsum['spawn'] = 'Force one or more tasks to spawn their successors.'
comsum['nudge'] = 'Cause the cylc task processing loop to be invoked'
comsum['reload'] = 'Reload the suite definition at run time'
comsum['set-runahead'] = 'Change the runahead limit in a running suite.'
Expand Down
6 changes: 3 additions & 3 deletions bin/cylc-email-task
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ if [[ $# < 4 ]]; then
exit 1
fi

EVENT=$1 # e.g. "failed"
SUITE=$2 # suite name
TASKID=$3 # task ID
EVENT=$1
SUITE=$2
TASKID=$3
MESSAGE=$4

MAIL_SUBJECT="!!cylc alert!! suite $SUITE task $TASKID $EVENT"
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-kill
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

"""cylc [control] kill [OPTIONS] ARGS

Kill jobs of active tasks (those in the 'submitted' or 'running' states) and
update their statuses accordingly.
Kill jobs of active tasks and update their statuses accordingly.

To kill one or more tasks, "cylc kill REG TASKID ..."; to kill all active
tasks: "cylc kill REG".
Expand Down
50 changes: 24 additions & 26 deletions bin/cylc-monitor
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ from time import sleep

from parsec.OrderedDict import OrderedDict
from cylc.CylcOptionParsers import cop
from cylc.task_state import TaskState
from cylc.network.suite_state import (
SUITE_STATUS_SPLIT_REC, get_suite_status_string, StateSummaryClient,
SuiteStillInitialisingError)
from cylc.wallclock import get_time_string_from_unix_time
from cylc.cfgspec.globalcfg import GLOBAL_CFG
from cylc.task_state import (
TaskState, TASK_STATUS_RUNAHEAD, TASK_STATUSES_ORDERED,
TASK_STATUSES_RESTRICTED)


class SuiteMonitor(object):
Expand All @@ -57,10 +59,9 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",

self.parser.add_option(
"-r", "--restricted",
help="Restrict display to 'active' task states: submitted, "
"submit-failed, submit-retrying, running, failed, retrying. "
"This may be needed for very large suites. The state summary "
"line still represents all task proxies.",
help="Restrict display to active task states. "
"This may be useful for monitoring very large suites. "
"The state summary line still reflects all task proxies.",
action="store_true", default=False, dest="restricted")

def_sort_order = GLOBAL_CFG.get(["monitor", "sort order"])
Expand Down Expand Up @@ -99,11 +100,11 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
client_name += " -r"

legend = ''
for state in TaskState.legal:
legend += "%s%s%s" % (
TaskState.ctrl[state], state, TaskState.ctrl_end)
for state in TASK_STATUSES_ORDERED:
legend += TaskState.get_status_prop(state, 'ascii_ctrl')
legend = legend.rstrip()
len_header = sum(len(s) for s in TaskState.legal)

len_header = sum(len(s) for s in TASK_STATUSES_ORDERED)

self.pclient = StateSummaryClient(
suite, options.owner, options.host, options.pyro_timeout,
Expand All @@ -116,8 +117,7 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
except SuiteStillInitialisingError as exc:
print str(exc)
except Exception as exc:
print >> sys.stderr, "\033[1;37;41mERROR%s" % (
TaskState.ctrl_end), str(exc)
print >> sys.stderr, "\033[1;37;41mERROR\033[0m", str(exc)
self.pclient.reset()
else:
states = [t["state"] for t in task_summaries.values() if (
Expand All @@ -126,12 +126,11 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
if options.restricted:
task_summaries = dict(
(i, j) for i, j in task_summaries.items() if (
j['state'] in
TaskState.legal_for_restricted_monitoring))
j['state'] in TASK_STATUSES_RESTRICTED))
if not options.display_runahead:
task_summaries = dict(
(i, j) for i, j in task_summaries.items() if (
j['state'] != 'runahead'))
j['state'] != TASK_STATUS_RUNAHEAD))
try:
updated_at = get_time_string_from_unix_time(
glbl['last_updated'])
Expand Down Expand Up @@ -159,8 +158,8 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
name_list.add(name)
if point_string not in task_info:
task_info[point_string] = {}
task_info[point_string][name] = "%s%s%s" % (
TaskState.ctrl[state], name, TaskState.ctrl_end)
task_info[point_string][name] = TaskState.get_status_prop(
state, 'ascii_ctrl', subst=name)

# Sort the tasks in each cycle point.
if options.sort_order == "alphanumeric":
Expand All @@ -186,13 +185,12 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
suffix = "%s %s" % (client_name, self.pclient.my_uuid)
title_str = ' ' * len_header
title_str = prefix + title_str[len(prefix):]
title_str = '\033[1;37;44m%s%s%s' % (
title_str[:-len(suffix)], suffix, TaskState.ctrl_end)
title_str = '\033[1;37;44m%s%s\033[0m' % (
title_str[:-len(suffix)], suffix)
blit.append(title_str)
blit.append(legend)

updated_str = "updated: %s%s%s" % (
'\033[1;38m', updated_at, TaskState.ctrl_end)
updated_str = "updated: \033[1;38m%s\033[0m" % updated_at
blit.append(updated_str)

summary = 'state summary:'
Expand All @@ -205,8 +203,9 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
state_totals.setdefault(state, 0)
state_totals[state] += 1
for state, tot in state_totals.items():
summary += '%s %d %s' % (
TaskState.ctrl[state], tot, TaskState.ctrl_end)
subst = " %d " % tot
summary += TaskState.get_status_prop(state,
'ascii_ctrl', subst)
blit.append(summary)

# Print a divider line containing the suite status string.
Expand All @@ -218,15 +217,14 @@ A terminal-based live suite monitor. Exit with 'Ctrl-C'.""",
status2 = ''
suffix = '_'.join(list(status1.replace(' ', '_'))) + status2
divider_str = '_' * len_header
divider_str = "\033[1;31m%s%s%s" % (
divider_str[:-len(suffix)], suffix, TaskState.ctrl_end)
divider_str = "\033[1;31m%s%s\033[0m" % (
divider_str[:-len(suffix)], suffix)
blit.append(divider_str)

blitlines = {}
for point_str, val in sorted_task_info.items():
indx = point_str
line = "%s%s%s" % (
'\033[1;34m', point_str, TaskState.ctrl_end)
line = "\033[1;34m%s\033[0m" % point_str
for name, info in val.items():
if info is not None:
line += " %s" % info
Expand Down
4 changes: 2 additions & 2 deletions bin/cylc-ping
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

"""cylc [discovery] ping [OPTIONS] ARGS

If suite REG is running or TASK in suite REG is currently in the 'running'
state exit with success status, else exit with error status."""
If suite REG is running or TASK in suite REG is currently running,
exit with success status, else exit with error status."""

import sys
if '--use-ssh' in sys.argv[1:]:
Expand Down
4 changes: 1 addition & 3 deletions bin/cylc-poll
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

"""cylc [control] poll [OPTIONS] ARGS

Poll jobs of active tasks (those in the 'submitted' or 'running' states) to
verify or update their statuses - even if they have suffered an external hard
kill.
Poll jobs of active tasks to verify or update their statuses.

To poll one or more tasks, "cylc poll REG TASKID"; to poll all active
tasks: "cylc poll REG".
Expand Down
31 changes: 23 additions & 8 deletions bin/cylc-reset
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
"""cylc [control] reset [OPTIONS] ARGS

Force one or more task proxies in a running suite to change state and modify
their prerequisites and outputs accordingly. For example, the 'waiting' state
means "prerequisites not satisfied, outputs not completed"; 'ready' means
"prerequisites satisfied, outputs not completed". Setting a task to 'ready'
generally has the same effect as using the "cylc trigger" command.
their prerequisites and outputs accordingly. For example, --state=waiting
means "prerequisites not satisfied, outputs not completed"; --state=ready means
"prerequisites satisfied, outputs not completed" (this generally has the same
effect as using the "cylc trigger" command).

"cylc reset --state=spawn" is deprecated: use "cylc spawn" instead.

See the documentation for the -s/--state option for legal reset states."""

import os
import sys
if '--use-ssh' in sys.argv[1:]:
sys.argv.remove('--use-ssh')
Expand All @@ -37,7 +40,7 @@ import cylc.flags
from cylc.prompt import prompt
from cylc.network.suite_command import SuiteCommandClient
from cylc.CylcOptionParsers import cop
from cylc.task_state import TaskState
from cylc.task_state import TASK_STATUSES_CAN_RESET_TO


def main():
Expand All @@ -49,15 +52,27 @@ def main():

parser.add_option(
"-s", "--state", metavar="STATE",
help="Reset task state to STATE to on of %s" % (
', '.join(TaskState.legal_for_reset)),
help="Reset task state to STATE, can be %s" % (
', '.join(TASK_STATUSES_CAN_RESET_TO)),
action="store", default=None, dest="state")

options, args = parser.parse_args()

suite = args.pop(0)

if options.state not in TaskState.legal_for_reset:
if options.state == "spawn":
# Back compat.
sys.stderr.write(
"'cylc reset -s spawn' is deprecated; calling 'cylc spawn'\n")
cmd = sys.argv[0].replace('reset', 'spawn')
try:
os.execvp(cmd, [cmd] + args)
except OSError, exc:
if exc.filename is None:
exc.filename = cmd
raise SystemExit(exc)

if options.state not in TASK_STATUSES_CAN_RESET_TO:
parser.error("Illegal STATE value: " + options.state)

prompt('Reset task(s) %s in %s' % (args, suite), options.force)
Expand Down
Loading