diff --git a/bin/cylc-checkpoint b/bin/cylc-checkpoint
index 5cbe9a66bbd..bc8391fb62a 100755
--- a/bin/cylc-checkpoint
+++ b/bin/cylc-checkpoint
@@ -46,7 +46,7 @@ def main():
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
- pclient('take_checkpoints', {'items': [name]})
+ pclient('take_checkpoints', {'name': name})
if __name__ == "__main__":
diff --git a/bin/cylc-hold b/bin/cylc-hold
index f52576199d7..10d74d43d7c 100755
--- a/bin/cylc-hold
+++ b/bin/cylc-hold
@@ -18,7 +18,7 @@
"""cylc [control] hold [OPTIONS] ARGS
-Hold one or more waiting tasks (cylc hold REG TASKID ...), or
+Hold one or more waiting tasks (cylc hold REG TASK_GLOB ...), or
a whole suite (cylc hold REG).
Held tasks do not submit even if they are ready to run.
@@ -45,18 +45,17 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
("REG", "Suite name"),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
parser.add_option(
"--after",
help="Hold whole suite AFTER this cycle point.",
metavar="CYCLE_POINT", action="store", dest="hold_point_string")
- options, args = parser.parse_args()
+ options, (suite, *task_globs) = parser.parse_args()
- suite = args.pop(0)
- if args:
- prompt('Hold task(s) %s in %s' % (args, suite), options.force)
+ if task_globs:
+ prompt('Hold task(s) %s in %s' % (task_globs, suite), options.force)
elif options.hold_point_string:
prompt(
'Hold suite after %s' % options.hold_point_string, options.force)
@@ -66,11 +65,10 @@ def main():
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port)
- if args:
- items = parser.parse_multitask_compat(options, args)
+ if task_globs:
pclient(
'hold_tasks',
- {'items': items},
+ {'task_globs': task_globs},
timeout=options.comms_timeout
)
elif options.hold_point_string:
diff --git a/bin/cylc-insert b/bin/cylc-insert
index 51d85e2232e..acd1de6aaa8 100755
--- a/bin/cylc-insert
+++ b/bin/cylc-insert
@@ -64,28 +64,13 @@ def main():
"--no-check", help="Add task even if the provided cycle point is not "
"valid for the given task.", action="store_true", default=False)
- options, args = parser.parse_args()
- suite = args.pop(0)
-
- # See "cop.parse_multitask_compat" for back compat discussion
- # "cop.parse_multitask_compat" cannot be used here because argument 3
- # (after suite argument) used to be "options.stop_point_string".
- if (options.multitask_compat and len(args) in [2, 3] and
- all("/" not in arg for arg in args) and
- all("." not in arg for arg in args[1:])):
- items = [(args[0] + "." + args[1])]
- if len(args) == 3:
- options.stop_point_string = args[2]
- prompt(
- 'Insert %s in %s' % (items, suite), options.force)
- else:
- items = args
- for i, item in enumerate(items):
- if not TaskID.is_valid_id_2(item):
- raise UserInputError(
- '"%s": invalid task ID (argument %d)' % (
- item, i + 1))
- prompt('Insert %s in %s' % (items, suite), options.force)
+ options, (suite, *items) = parser.parse_args()
+
+ for i, item in enumerate(items):
+ if not TaskID.is_valid_id_2(item):
+ raise UserInputError(
+ '"%s": invalid task ID (argument %d)' % (item, i + 1))
+ prompt('Insert %s in %s' % (items, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port)
diff --git a/bin/cylc-kill b/bin/cylc-kill
index f8755805fe7..10a372ec057 100755
--- a/bin/cylc-kill
+++ b/bin/cylc-kill
@@ -20,7 +20,7 @@
Kill jobs of active tasks and update their statuses accordingly.
-To kill one or more tasks, "cylc kill REG TASKID ..."; to kill all active
+To kill one or more tasks, "cylc kill REG TASK_GLOB ..."; to kill all active
tasks: "cylc kill REG".
"""
@@ -44,20 +44,19 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
('REG', 'Suite name'),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
- options, args = parser.parse_args()
+ options, (suite, *task_globs) = parser.parse_args()
- suite = args.pop(0)
- if args:
- prompt('Kill task %s in %s' % (args, suite), options.force)
+ if task_globs:
+ prompt('Kill task %s in %s' % (task_globs, suite), options.force)
else:
prompt('Kill ALL tasks in %s' % (suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port)
pclient(
'kill_tasks',
- {'items': parser.parse_multitask_compat(options, args)},
+ {'task_globs': task_globs},
timeout=options.comms_timeout
)
diff --git a/bin/cylc-poll b/bin/cylc-poll
index 558e36908f6..76243a5aa86 100755
--- a/bin/cylc-poll
+++ b/bin/cylc-poll
@@ -20,8 +20,8 @@
Poll (query) task jobs to verify and update their statuses.
-Use "cylc poll REG" to poll all active tasks, or "cylc poll REG TASKID" to poll
-individual tasks or families, or groups of them.
+Use "cylc poll REG" to poll all active tasks, or "cylc poll REG TASK_GLOB"
+to poll individual tasks or families, or groups of them.
"""
import sys
@@ -43,29 +43,25 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
('REG', 'Suite name'),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
parser.add_option(
"-s", "--succeeded", help="Allow polling of succeeded tasks.",
action="store_true", default=False, dest="poll_succ")
- options, args = parser.parse_args()
+ options, (suite, *task_globs) = parser.parse_args()
- suite = args.pop(0)
- if args:
- prompt('Poll task %s in %s' % (args, suite), options.force)
+ if task_globs:
+ prompt('Poll task %s in %s' % (task_globs, suite), options.force)
else:
prompt('Poll ALL tasks in %s' % (suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
- items = parser.parse_multitask_compat(options, args)
- # Back compat: back_out introduced >7.5.0
- # So don't call with "poll_succ" if not necessary to avoid breakage.
- if options.poll_succ:
- pclient('poll_tasks', {'items': items, 'poll_succ': options.poll_succ})
- else:
- pclient('poll_tasks', {'items': items})
+ pclient(
+ 'poll_tasks',
+ {'task_globs': task_globs, 'poll_succ': options.poll_succ}
+ )
if __name__ == "__main__":
diff --git a/bin/cylc-release b/bin/cylc-release
index a57ddd50c29..0227d590872 100755
--- a/bin/cylc-release
+++ b/bin/cylc-release
@@ -18,7 +18,7 @@
"""cylc [control] release|unhold [OPTIONS] ARGS
-Release one or more held tasks (cylc release REG TASKID)
+Release one or more held tasks (cylc release REG TASK_GLOB)
or the whole suite (cylc release REG). Held tasks do not
submit even if they are ready to run.
@@ -44,21 +44,22 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
("REG", 'Suite name'),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
- options, args = parser.parse_args()
- suite = args.pop(0)
+ options, (suite, *task_globs) = parser.parse_args()
- if args:
- prompt('Release task(s) %s in %s' % (args, suite), options.force)
+ if task_globs:
+ prompt('Release task(s) %s in %s' % (task_globs, suite), options.force)
else:
prompt('Release suite %s' % suite, options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
- if args:
- items = parser.parse_multitask_compat(options, args)
- pclient('release_tasks', {'items': items})
+ if task_globs:
+ pclient(
+ 'release_tasks',
+ {'task_globs': task_globs}
+ )
else:
pclient('release_suite')
diff --git a/bin/cylc-remove b/bin/cylc-remove
index 3b6fb1147a2..a557d9d113c 100755
--- a/bin/cylc-remove
+++ b/bin/cylc-remove
@@ -18,7 +18,7 @@
"""cylc [control] remove [OPTIONS] ARGS
-Remove one or more tasks (cylc remove REG TASKID), or all tasks with a
+Remove one or more tasks (cylc remove REG TASK_GLOB), or all tasks with a
given cycle point (cylc remove REG *.POINT) from a running suite.
Tasks will spawn successors first if they have not done so already.
@@ -43,24 +43,22 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
("REG", "Suite name"),
- ('TASKID [...]', 'Task identifiers')])
+ ('TASK_GLOB [...]', 'Task matching patterns')])
parser.add_option(
"--no-spawn",
help="Do not spawn successors before removal.",
action="store_true", default=False, dest="no_spawn")
- options, args = parser.parse_args()
+ options, (suite, *task_globs) = parser.parse_args()
- suite = args.pop(0)
- prompt('remove task(s) %s in %s' % (args, suite), options.force)
+ prompt('remove task(s) %s in %s' % (task_globs, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
- items = parser.parse_multitask_compat(options, args)
pclient(
'remove_tasks',
- {'items': items, 'spawn': (not options.no_spawn)}
+ {'task_globs': task_globs, 'spawn': (not options.no_spawn)}
)
diff --git a/bin/cylc-reset b/bin/cylc-reset
index ef7a1a593d2..d3651d58090 100755
--- a/bin/cylc-reset
+++ b/bin/cylc-reset
@@ -53,7 +53,7 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
('REG', 'Suite name'),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
parser.add_option(
"-s", "--state", metavar="STATE",
@@ -71,9 +71,7 @@ def main():
"Can be used more than once to reset multiple task outputs."),
action="append", default=[], dest="outputs")
- options, args = parser.parse_args()
-
- suite = args.pop(0)
+ options, (suite, *task_globs) = parser.parse_args()
if not options.state and not options.outputs:
parser.error("Neither --state=STATE nor --output=OUTPUT is set")
@@ -84,7 +82,7 @@ def main():
"'cylc reset -s spawn' is deprecated; calling 'cylc spawn'\n")
cmd = sys.argv[0].replace('reset', 'spawn')
try:
- os.execvp(cmd, [cmd] + args)
+ os.execvp(cmd, [cmd] + task_globs)
except OSError as exc:
if exc.filename is None:
exc.filename = cmd
@@ -93,14 +91,14 @@ def main():
if not options.state:
options.state = ''
- prompt('Reset task(s) %s in %s' % (args, suite), options.force)
+ prompt('Reset task(s) %s in %s' % (task_globs, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
- items = parser.parse_multitask_compat(options, args)
pclient(
'reset_task_states',
- {'items': items, 'state': options.state, 'outputs': options.outputs}
+ {'task_globs': task_globs, 'state': options.state,
+ 'outputs': options.outputs}
)
diff --git a/bin/cylc-show b/bin/cylc-show
index e5bfb9b5a30..a9dd0187296 100755
--- a/bin/cylc-show
+++ b/bin/cylc-show
@@ -45,7 +45,7 @@ def main():
__doc__, comms=True, noforce=True, multitask=True,
argdoc=[
('REG', 'Suite name'),
- ('[TASKID ...]', 'Task names or identifiers')])
+ ('[TASKS ...]', 'Task names or ids (name.cycle)')])
parser.add_option('--list-prereqs', action="store_true", default=False,
help="Print a task's pre-requisites as a list.")
@@ -53,9 +53,8 @@ def main():
parser.add_option('--json', action="store_true", default=False,
help="Print output in JSON format.")
- options, args = parser.parse_args()
- suite = args[0]
- task_args = args[1:]
+ options, (suite, *task_args) = parser.parse_args()
+
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
@@ -85,8 +84,10 @@ def main():
print("%s: %s" % (key, value or "(not given)"))
if task_ids:
- results, bad_items = pclient('get_task_requisites', {
- 'items': task_ids, 'list_prereqs': options.list_prereqs})
+ results, bad_items = pclient(
+ 'get_task_requisites',
+ {'task_globs': task_ids, 'list_prereqs': options.list_prereqs}
+ )
if options.json:
json_filter.append(results)
else:
diff --git a/bin/cylc-spawn b/bin/cylc-spawn
index 218f4af4721..42272c06870 100755
--- a/bin/cylc-spawn
+++ b/bin/cylc-spawn
@@ -42,20 +42,18 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
('REG', 'Suite name'),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
- options, args = parser.parse_args()
+ options, (suite, *task_globs) = parser.parse_args()
- suite = args.pop(0)
-
- prompt('Spawn task(s) %s in %s' % (args, suite), options.force)
+ prompt('Spawn task(s) %s in %s' % (task_globs, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout)
pclient(
'spawn_tasks',
- {'items': parser.parse_multitask_compat(options, args)}
+ {'task_globs': task_globs}
)
diff --git a/bin/cylc-trigger b/bin/cylc-trigger
index 22d5a3bd01a..65e9684f897 100755
--- a/bin/cylc-trigger
+++ b/bin/cylc-trigger
@@ -64,7 +64,7 @@ def main():
__doc__, comms=True, multitask=True,
argdoc=[
('REG', 'Suite name'),
- ('[TASKID ...]', 'Task identifiers')])
+ ('[TASK_GLOB ...]', 'Task matching patterns')])
parser.add_option(
"-e", "--edit",
@@ -76,10 +76,9 @@ def main():
help="(with --edit) force use of the configured GUI editor.",
action="store_true", default=False, dest="geditor")
- options, args = parser.parse_args()
- suite = args.pop(0)
+ options, (suite, *task_globs) = parser.parse_args()
- msg = 'Trigger task(s) %s in %s' % (args, suite)
+ msg = 'Trigger task(s) %s in %s' % (task_globs, suite)
prompt(msg, options.force)
pclient = SuiteRuntimeClient(
@@ -88,8 +87,7 @@ def main():
aborted = False
if options.edit_run:
- items = parser.parse_multitask_compat(options, args)
- task_id = items[0]
+ task_id = task_globs[0]
# Check that TASK is a unique task.
success, msg = pclient(
'ping_task',
@@ -115,7 +113,8 @@ def main():
old_mtime = None
# Tell the suite server program to generate the job file.
- pclient('dry_run_tasks', {'items': [task_id], 'check_syntax': False})
+ pclient('dry_run_tasks', {
+ 'task_globs': [task_id], 'check_syntax': False})
# Wait for the new job file to be written. Use mtime because the same
# file could potentially exist already, left from a previous run.
@@ -191,13 +190,10 @@ def main():
aborted = True
# Trigger the task proxy(s).
- items = parser.parse_multitask_compat(options, args)
- # Back compat: back_out introduced >7.5.0
- # So don't call with "back_out" if not necessary to avoid breakage.
- if aborted:
- pclient('trigger_tasks', {'items': items, 'back_out': aborted})
- else:
- pclient('trigger_tasks', {'items': items})
+ pclient(
+ 'trigger_tasks',
+ {'task_globs': task_globs, 'back_out': aborted}
+ )
if __name__ == "__main__":
diff --git a/doc/src/api/index.rst b/doc/src/api/index.rst
new file mode 100644
index 00000000000..03c20de4f5e
--- /dev/null
+++ b/doc/src/api/index.rst
@@ -0,0 +1,7 @@
+Cylc API
+========
+
+
+.. toctree::
+
+ zmq
diff --git a/doc/src/api/zmq.rst b/doc/src/api/zmq.rst
new file mode 100644
index 00000000000..a91d09fbe65
--- /dev/null
+++ b/doc/src/api/zmq.rst
@@ -0,0 +1,50 @@
+Suite Runtime Interface
+=======================
+
+
+Cylc suites are TCP servers which use the ZeroMQ protocol to communicate with
+clients and jobs.
+
+Cylc provides a Python client to communicate with this server
+:py:class:`cylc.network.client.SuiteRuntimeClient`
+
+.. code-block:: python
+
+ >>> from cylc.network.client import SuiteRuntimeClient
+ >>> client = SuiteRuntimeClient('my-suite')
+ >>> client('ping_suite')
+ True
+
+Cylc also provides sub-command called ``cylc client`` which is a simple
+wrapper of the Python client.
+
+.. code-block:: console
+
+ $ cylc client generic ping_suite -n
+ true
+
+The available "commands" or ("endpoints") are contained in
+:py:class:`cylc.network.server.SuiteRuntimeServer` class.
+
+
+Privilege Levels
+----------------
+
+Cylc protects its network interface with configurable privilege levels which
+can be used to allocate different levels of control to different users.
+
+.. autoclass:: cylc.network.Priv
+ :members:
+
+
+Client
+------
+
+.. autoclass:: cylc.network.client.SuiteRuntimeClient
+
+
+Server
+------
+
+.. autoclass:: cylc.network.server.SuiteRuntimeServer
+ :members:
diff --git a/doc/src/appendices/site-user-config-ref.rst b/doc/src/appendices/site-user-config-ref.rst
index 6e1401336e6..82ca6ffb26b 100644
--- a/doc/src/appendices/site-user-config-ref.rst
+++ b/doc/src/appendices/site-user-config-ref.rst
@@ -1095,23 +1095,7 @@ password.
This sets the client privilege level for public access - i.e. no
suite passphrase required.
-- *type*: string (must be one of the following options)
-- *options*:
+- *type*: string (must be one of the following options).
+- *options*: A Cylc privilege level: :py:obj:`cylc.network.Priv`.
- none
- Permit no public suite access.
- identity
- Only suite and owner names revealed.
- description
- Identity plus suite title and description.
- state-totals
- Identity, description, and task state totals.
- read
- Full read-only access.
- shutdown
- *Not yet implemented*
- Full read access plus shutdown, but no other control.
- control
- Permit full control (not recommended).
-
-- *default*: state-totals
+- *default*: :py:obj:`cylc.network.Priv.STATE_TOTALS`
diff --git a/doc/src/index.rst b/doc/src/index.rst
index a98bc415426..b36e7131b2b 100644
--- a/doc/src/index.rst
+++ b/doc/src/index.rst
@@ -1,5 +1,3 @@
-.. cylc documentation master file.
-
Cylc documentation
==================
@@ -42,6 +40,8 @@ indefinitely.
running-suites
suite-storage-etc
+ api/index
+
appendices/appendices-master
suite-design-guide/suite-design-guide-master
diff --git a/doc/src/running-suites.rst b/doc/src/running-suites.rst
index 8d33f2fd9a1..febf9fdc1dd 100644
--- a/doc/src/running-suites.rst
+++ b/doc/src/running-suites.rst
@@ -504,23 +504,9 @@ server program is determined by the public access privilege level set in global
site/user config (:ref:`GlobalAuth`) and optionally overridden in suites
(:ref:`SuiteAuth`):
-none
- Permit no public suite access.
-identity
- Only suite and owner names revealed.
-description
- Identity plus suite title and description.
-state-totals
- Identity, description, and task state totals.
-read
- Full read-only access.
-shutdown
- *Not yet implemented*
- Full read access plus shutdown, but no other control.
-control
- Permit full control (not recommended).
-
-The default public access level is *state-totals*.
+See Cylc privilege levels: :py:obj:`cylc.network.Priv`.
+
+The default public access level is :py:obj:`cylc.network.Priv.STATE_TOTALS`.
The ``cylc scan`` command can print
descriptions and task state totals in addition to basic suite identity, if the
diff --git a/lib/cylc/network/__init__.py b/lib/cylc/network/__init__.py
index 072f9791263..f0ae94a1085 100644
--- a/lib/cylc/network/__init__.py
+++ b/lib/cylc/network/__init__.py
@@ -30,18 +30,36 @@
class Priv(IntEnum):
- """Cylc privilege level."""
+ """Cylc privilege levels.
- # TODO - autodocument from this class.
- # TODO - revert name changes?
+ In Cylc configurations use the lower-case form of each privilege level
+ e.g. ``control`` for ``Priv.CONTROL``.
+
+ These levels are ordered (by the integer associated with each) from 0.
+ Each privilege level grants access to the levels below it.
+
+ """
CONTROL = 6
+ """Provides full control of a suite."""
+
SHUTDOWN = 5 # (Not used yet - for the post-passphrase era.)
+ """Allows issuing of the shutdown command."""
+
READ = 4
+ """Permits read access to the suite's state."""
+
STATE_TOTALS = 3
+ """Provides access to the count of tasks in each state."""
+
DESCRIPTION = 2
+ """Permits reading of suite metadata."""
+
IDENTITY = 1
+ """Provides read access to the suite name, owner and Cylc version."""
+
NONE = 0
+ """No access."""
@classmethod
def parse(cls, key):
diff --git a/lib/cylc/network/client.py b/lib/cylc/network/client.py
index 7b046f1ee39..36a1ae57949 100644
--- a/lib/cylc/network/client.py
+++ b/lib/cylc/network/client.py
@@ -108,23 +108,9 @@ def __init__(self, host, port, encode_method, decode_method, secret_method,
self.header = dict(header)
async def async_request(self, command, args=None, timeout=None):
- """Send a request.
-
- For convenience use __call__ to call this method.
+ """Send an asynchronous request using asyncio.
- Args:
- command (str): The name of the endpoint to call.
- args (dict): Arguments to pass to the endpoint function.
- timeout (float): Override the default timeout (seconds).
-
- Raises:
- ClientTimeout: If a response takes longer than timeout to arrive.
- ClientError: Coverall for all other issues including failed
- authentication.
-
- Returns:
- object: The data exactly as returned from the endpoint function,
- nothing more, nothing less.
+ Has the same arguments and return values as ``serial_request``.
"""
if timeout:
@@ -169,6 +155,24 @@ async def async_request(self, command, args=None, timeout=None):
raise ClientError(error['message'], error.get('traceback'))
def serial_request(self, command, args=None, timeout=None):
+ """Send a request.
+
+ For convenience use ``__call__`` to call this method.
+
+ Args:
+ command (str): The name of the endpoint to call.
+ args (dict): Arguments to pass to the endpoint function.
+ timeout (float): Override the default timeout (seconds).
+
+ Raises:
+ ClientTimeout: If a response takes longer than timeout to arrive.
+ ClientError: Coverall for all other issues including failed auth.
+
+ Returns:
+ object: The data exactly as returned from the endpoint function,
+ nothing more, nothing less.
+
+ """
return asyncio.run(
self.async_request(command, args, timeout))
@@ -198,6 +202,16 @@ class SuiteRuntimeClient:
If there is no socket bound to the specified host/port the client will
bail after ``timeout`` seconds.
+ Call server "endpoints" using:
+
+ ``__call__``, ``serial_request``
+
+ .. automethod:: cylc.network.client.ZMQClient.serial_request
+
+ ``async_request``
+
+ .. automethod:: cylc.network.client.ZMQClient.async_request
+
"""
NOT_RUNNING = "Contact info not found for suite \"%s\", suite not running?"
diff --git a/lib/cylc/network/server.py b/lib/cylc/network/server.py
index 25c19ddecac..45161b3a084 100644
--- a/lib/cylc/network/server.py
+++ b/lib/cylc/network/server.py
@@ -17,8 +17,10 @@
# along with this program. If not, see .
"""Server for suite runtime API."""
+from functools import wraps
import getpass
from queue import Queue
+from textwrap import dedent
from time import sleep
from threading import Thread
@@ -81,7 +83,7 @@ def __init__(self, encode_method, decode_method, secret_method):
self.decode = decode_method
self.secret = secret_method
- def start(self, ports):
+ def start(self, min_port, max_port):
"""Start the server running
Args:
@@ -94,17 +96,8 @@ def start(self, ports):
self.socket = self.context.socket(zmq.REP)
self.socket.RCVTIMEO = int(self.RECV_TIMEOUT) * 1000
- # pick port
- for port in ports:
- try:
- self.socket.bind('tcp://*:%d' % port)
- except zmq.error.ZMQError:
- pass
- else:
- self.port = port
- break
- else:
- raise IOError('No room at the inn, all ports occupied.')
+ self.port = self.socket.bind_to_random_port(
+ 'tcp://*', min_port, max_port)
# start accepting requests
self.register_endpoints()
@@ -153,8 +146,9 @@ def _listener(self):
except Exception as exc: # purposefully catch generic exception
# failed to decode message, possibly resulting from failed
# authentication
- response = self.encode(
- {'error': {'message': str(exc)}}, self.secret())
+ import traceback
+ return {'error': {
+ 'message': str(exc), 'traceback': traceback.format_exc()}}
else:
# success case - serve the request
LOG.debug('zmq:recv %s', message)
@@ -222,13 +216,14 @@ def authorise(req_priv_level):
"""
def wrapper(fcn):
+ @wraps(fcn) # preserve args and docstrings
def _authorise(self, *args, user='?', meta=None, **kwargs):
if not meta:
meta = {}
host = meta.get('host', '?')
prog = meta.get('prog', '?')
- usr_priv_level = self.get_priv_level(user)
+ usr_priv_level = self._get_priv_level(user)
if usr_priv_level < req_priv_level:
LOG.warn(
"[client-connect] DENIED (privilege '%s' < '%s') %s@%s:%s",
@@ -237,6 +232,9 @@ def _authorise(self, *args, user='?', meta=None, **kwargs):
LOG.info(
'[client-command] %s %s@%s:%s', fcn.__name__, user, host, prog)
return fcn(self, *args, **kwargs)
+ _authorise.__doc__ += ( # add auth level to docstring
+ 'Authentication:\n%s:py:obj:`cylc.network.%s`\n' % (
+ ' ' * 12, req_priv_level))
return _authorise
return wrapper
@@ -244,7 +242,32 @@ def _authorise(self, *args, user='?', meta=None, **kwargs):
class SuiteRuntimeServer(ZMQServer):
"""Suite runtime service API facade exposed via zmq.
- This class contains the cylc endpoints.
+ This class contains the Cylc endpoints.
+
+ Common Arguments:
+ Arguments which are shared between multiple commands.
+
+ .. _task identifier:
+
+ task identifier (str):
+ A task identifier in the format ``task.cycle-point``
+ e.g. ``foo.1`` or ``bar.20000101T0000Z``.
+
+ .. _task globs:
+
+ task globs (list):
+ A list of strings in the format
+ ``name[.cycle_point][:task_state]`` where ``name`` could be a
+ task or family name.
+
+ Glob-like patterns may be used to match multiple items e.g.
+
+ ``*``
+ Matches everything.
+ ``*.1``
+ Matches everything in cycle ``1``.
+ ``*.*:failed``
+ Matches all failed tasks.
"""
@@ -260,65 +283,140 @@ def __init__(self, schd):
self.schd = schd
self.public_priv = None # update in get_public_priv()
- def get_public_priv(self):
+ def _get_public_priv(self):
"""Return the public privilege level of this suite."""
if self.schd.config.cfg['cylc']['authentication']['public']:
return Priv.parse(
self.schd.config.cfg['cylc']['authentication']['public'])
return Priv.parse(glbl_cfg().get(['authentication', 'public']))
- def get_priv_level(self, user):
+ def _get_priv_level(self, user):
"""Return the privilege level for the given user for this suite."""
if user == getpass.getuser():
return Priv.CONTROL
if self.public_priv is None:
# cannot do this on initialisation as the suite configuration has
# not yet been parsed
- self.public_priv = self.get_public_priv()
+ self.public_priv = self._get_public_priv()
return self.public_priv
+ @authorise(Priv.IDENTITY)
+ @ZMQServer.expose
+ def api(self, endpoint=None):
+ """Return information about this API.
+
+ Returns a list of callable endpoints.
+
+ Args:
+ endpoint (str, optional):
+ If specified the documentation for the endpoint
+ will be returned instead.
+
+ Returns:
+ list/str: List of endpoints or string documentation of the
+ requested endpoint.
+
+ """
+ if not endpoint:
+ return [
+ method for method in dir(self)
+ if getattr(getattr(self, method), 'exposed', False)
+ ]
+
+ try:
+ method = getattr(self, endpoint)
+ except AttributeError:
+ return 'No method by name "%s"' % endpoint
+ if method.exposed:
+ head, tail = method.__doc__.split('\n', 1)
+ tail = dedent(tail)
+ return '%s\n%s' % (head, tail)
+ return 'No method by name "%s"' % endpoint
+
@authorise(Priv.CONTROL)
@ZMQServer.expose
def clear_broadcast(
self, point_strings=None, namespaces=None, cancel_settings=None):
"""Clear settings globally, or for listed namespaces and/or points.
- Return a tuple (modified_settings, bad_options), where:
- * modified_settings is similar to the return value of the "put" method,
- but for removed settings.
- * bad_options is a dict in the form:
- {"point_strings": ["20020202", ..."], ...}
- The dict is only populated if there are options not associated with
- previous broadcasts. The keys can be:
- * point_strings: a list of bad point strings.
- * namespaces: a list of bad namespaces.
- * cancel: a list of tuples. Each tuple contains the keys of a bad
- setting.
+ Args:
+ point_strings (list, optional):
+ List of point strings for this operation to apply to or
+ ``None`` to apply to all cycle points.
+ namespaces (list, optional):
+ List of namespace string (task / family names) for this
+ operation to apply to or ``None`` to apply to all namespaces.
+ cancel_settings (list, optional):
+ List of broadcast keys to cancel.
+
+ Returns:
+ tuple: (modified_settings, bad_options)
+
+ modified_settings
+ similar to the return value of the "put" method, but for
+ removed settings.
+ bad_options
+ A dict in the form:
+ ``{"point_strings": ["20020202", ..."], ...}``.
+ The dict is only populated if there are options not
+ associated with previous broadcasts. The keys can be:
+
+ * point_strings: a list of bad point strings.
+ * namespaces: a list of bad namespaces.
+ * cancel: a list of tuples. Each tuple contains the keys of
+ a bad setting.
+
"""
return self.schd.task_events_mgr.broadcast_mgr.clear_broadcast(
point_strings, namespaces, cancel_settings)
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def dry_run_tasks(self, items, check_syntax=True):
+ def dry_run_tasks(self, task_globs, check_syntax=True):
"""Prepare job file for a task.
- items[0] is an identifier for matching a task proxy.
+ Args:
+ task_globs (list): List of identifiers, see `task globs`_
+ check_syntax (bool, optional): Check shell syntax.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
- self.schd.command_queue.put(('dry_run_tasks', (items,),
+ self.schd.command_queue.put(('dry_run_tasks', (task_globs,),
{'check_syntax': check_syntax}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
def expire_broadcast(self, cutoff=None):
- """Clear all settings targeting cycle points earlier than cutoff."""
+ """Clear all settings targeting cycle points earlier than cutoff.
+
+ Args:
+ cutoff (str, optional):
+ Cycle point, broadcasts earlier than but not inclusive of the
+ cutoff will be canceled.
+
+ """
return self.schd.task_events_mgr.broadcast_mgr.expire_broadcast(cutoff)
@authorise(Priv.READ)
@ZMQServer.expose
def get_broadcast(self, task_id=None):
- """Retrieve all broadcast variables that target a given task ID."""
+ """Retrieve all broadcast variables that target a given task ID.
+
+ Args:
+ task_id (str, optional): A `task identifier`_
+
+ Returns:
+ dict: all broadcast variables that target the given task ID.
+
+ """
return self.schd.task_events_mgr.broadcast_mgr.get_broadcast(task_id)
@authorise(Priv.IDENTITY)
@@ -333,7 +431,55 @@ def get_graph_raw(self, start_point_string, stop_point_string,
group_nodes=None, ungroup_nodes=None,
ungroup_recursive=False, group_all=False,
ungroup_all=False):
- """Return raw suite graph."""
+ """Return a textural representation of the suite graph.
+
+ .. warning::
+
+ The grouping options:
+
+ * ``group_nodes``
+ * ``ungroup_nodes``
+ * ``group_all``
+ * ``ungroup_all``
+
+ Are mutually exclusive.
+
+ Args:
+ start_point_string (str):
+ Cycle point as a string to define the window of view of the
+ suite graph.
+ stop_point_string (str):
+ Cycle point as a string to define the window of view of the
+ suite graph.
+ group_nodes (list, optional):
+ List of (graph nodes) family names to group (collapse according
+ to inheritance) in the output graph.
+ ungroup_nodes (list, optional):
+ List of (graph nodes) family names to ungroup (expand according
+ to inheritance) in the output graph.
+ ungroup_recursive (bool, optional):
+ Recursively ungroup families.
+ group_all (bool, optional):
+ Group all families (collapse according to inheritance).
+ ungroup_all (bool, optional):
+ Ungroup all families (expand according to inheritance).
+
+ Returns:
+ list: [left, right, None, is_suicide, condition]
+
+ left (str):
+ `Task identifier ` for the dependency of
+ an edge.
+ right (str):
+ `Task identifier ` for the dependant task
+ of an edge.
+ is_suicide (bool):
+ True if edge represents a suicide trigger.
+ condition:
+ Conditional expression if edge represents a conditional trigger
+ else ``None``.
+
+ """
# Ensure that a "None" str is converted to the None value.
return self.schd.info_get_graph_raw(
start_point_string, stop_point_string,
@@ -346,38 +492,96 @@ def get_graph_raw(self, start_point_string, stop_point_string,
@authorise(Priv.DESCRIPTION)
@ZMQServer.expose
def get_suite_info(self):
- """Return a dict containing the suite title and description."""
+ """Return a dictionary containing the suite title and description.
+
+ Returns:
+ dict: The `[meta]` section of a suite configuration
+
+ """
return self.schd.info_get_suite_info()
@authorise(Priv.READ)
@ZMQServer.expose
def get_suite_state_summary(self):
- """Return the global, task, and family summary data structures."""
+ """Return the global, task, and family summary data summaries.
+
+ Returns:
+ tuple: (global_summary, task_summary, family_summary)
+
+ global_summary (dict):
+ Contains suite status items e.g. ``last_updated``.
+ task_summary (dict):
+ A dictionary of `task identifiers `_
+ in the format ``{task_id: {...}, ...}``.
+ family_summary (dict):
+ Contains task family information in the format
+ ``{family_id: {...}, ...}``.
+
+ """
return self.schd.info_get_suite_state_summary()
@authorise(Priv.READ)
@ZMQServer.expose
def get_task_info(self, names):
- """Return info of a task."""
+ """Return the configurations for the provided tasks.
+
+ Args:
+ names (list): A list of task names to request information for.
+
+ Returns:
+ dict: Dictionary in the format ``{'task': {...}, ...}``
+
+ """
return self.schd.info_get_task_info(names)
@authorise(Priv.READ)
@ZMQServer.expose
def get_task_jobfile_path(self, task_id):
- """Return task job file path."""
+ """Return task job file path.
+
+ Args:
+ task_id: A `task identifier`_
+
+ Returns:
+ str: The jobfile path.
+
+ """
return self.schd.info_get_task_jobfile_path(task_id)
@authorise(Priv.READ)
@ZMQServer.expose
- def get_task_requisites(self, items=None, list_prereqs=False):
- """Return prerequisites of a task."""
+ def get_task_requisites(self, task_globs=None, list_prereqs=False):
+ """Return prerequisites of a task.
+
+ Args:
+ task_globs (list, optional):
+ List of identifiers, see `task globs`_
+
+ Returns:
+ list: Dictionary of `task identifiers `_
+ in the format ``{task_id: { ... }, ...}``.
+
+ """
return self.schd.info_get_task_requisites(
- items, list_prereqs=list_prereqs)
+ task_globs, list_prereqs=list_prereqs)
@authorise(Priv.CONTROL)
@ZMQServer.expose
def hold_after_point_string(self, point_string):
- """Set hold point of suite."""
+ """Set hold point of suite.
+
+ Args:
+ point_string (str): The cycle point to hold the suite *after.*
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(
("hold_after_point_string", (point_string,), {}))
return (True, 'Command queued')
@@ -385,23 +589,56 @@ def hold_after_point_string(self, point_string):
@authorise(Priv.CONTROL)
@ZMQServer.expose
def hold_suite(self):
- """Hold the suite."""
+ """Hold the suite.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(("hold_suite", (), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def hold_tasks(self, items):
+ def hold_tasks(self, task_globs):
"""Hold tasks.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list): List of identifiers, see `task globs`_
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
- self.schd.command_queue.put(("hold_tasks", (items,), {}))
+ self.schd.command_queue.put(("hold_tasks", (task_globs,), {}))
return (True, 'Command queued')
@authorise(Priv.IDENTITY)
@ZMQServer.expose
def identify(self):
+ """Return basic information about the suite.
+
+ Returns:
+ dict: Dictionary containing the keys
+
+ cylc.suite_status.KEY_NAME
+ The suite name.
+ cylc.suite_status.KEY_OWNER
+ The user account the suite is running under.
+ cylc.suite_status.KEY_VERSION
+ The Cylc version the suite is runnin with.
+
+ """
return {
KEY_NAME: self.schd.suite,
KEY_OWNER: self.schd.owner,
@@ -411,11 +648,37 @@ def identify(self):
@authorise(Priv.DESCRIPTION)
@ZMQServer.expose
def describe(self):
+ """Return the suite metadata.]
+
+ Returns:
+ dict: ``{cylc.suite_status: { ... }}``
+
+ """
return {KEY_META: self.schd.config.cfg[KEY_META]}
@authorise(Priv.STATE_TOTALS)
@ZMQServer.expose
def state_totals(self):
+ """Returns counts of the task states present in the suite.
+
+ Returns:
+ dict: Dictionary with the keys:
+
+ cylc.suite_status.KEY_UPDATE_TIME
+ ISO8601 timestamp of when this data snapshot was made.
+ cylc.suite_status.KEY_STATES
+ Tuple of the form ``(state_count_totals, state_count_cycles)``
+
+ state_count_totals (dict):
+ Dictionary of the form ``{task_state: task_count}``.
+ state_count_cycles (dict):
+ Dictionary of the form ``{cycle_point: task_count}``.
+ cylc.suite_status.KEY_TASKS_BY_STATE
+ Dictionary in the form
+ ``{state: [(most_recent_time_string, task_name, point_string),``
+ ``...]}``.
+
+ """
return {
KEY_UPDATE_TIME: self.schd.state_summary_mgr.update_time,
KEY_STATES: self.schd.state_summary_mgr.get_state_totals(),
@@ -428,7 +691,23 @@ def state_totals(self):
def insert_tasks(self, items, stop_point_string=None, no_check=False):
"""Insert task proxies.
- items is a list of identifiers of (families of) task instances.
+ Args:
+ items (list):
+ A list of `task globs`_ (strings) which *cannot* contain
+ any glob characters (``*``).
+ stop_point_string (str, optional):
+ Optional hold/stop cycle point for inserted task.
+ no_check (bool, optional):
+ Add task even if the provided cycle point is not valid
+ for the given task.
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
self.schd.command_queue.put((
"insert_tasks",
@@ -438,42 +717,104 @@ def insert_tasks(self, items, stop_point_string=None, no_check=False):
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def kill_tasks(self, items):
+ def kill_tasks(self, task_globs):
"""Kill task jobs.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list): List of identifiers, see `task globs`_
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
- self.schd.command_queue.put(("kill_tasks", (items,), {}))
+ self.schd.command_queue.put(("kill_tasks", (task_globs,), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
def nudge(self):
- """Tell suite to try task processing."""
+ """Tell suite to try task processing.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(("nudge", (), {}))
return (True, 'Command queued')
@authorise(Priv.IDENTITY)
@ZMQServer.expose
def ping_suite(self):
- """Return True."""
+ """Return True.
+
+ This serves as a basic network comms tests.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
return True
@authorise(Priv.READ)
@ZMQServer.expose
def ping_task(self, task_id, exists_only=False):
- """Return True if task_id exists (and running)."""
+ """Return True if task_id exists (and is running).
+
+ Args:
+ task_id:
+ A `task identifier`_
+ exists_only (bool, optional):
+ If True only test that the task exists, if False check both
+ that the task exists and that it is running.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool):
+ True if task exists (and is running).
+ message (str):
+ A string describing the outcome / state of the task.
+
+ """
return self.schd.info_ping_task(task_id, exists_only=exists_only)
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def poll_tasks(self, items=None, poll_succ=False):
- """Poll task jobs.
+ def poll_tasks(self, task_globs=None, poll_succ=False):
+ """Request the suite to poll task jobs.
+
+ Args:
+ task_globs (list, optional):
+ List of identifiers, see `task globs`_
+ poll_succ (bool, optional):
+ Allow polling of remote tasks if True.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
- items is a list of identifiers for matching task proxies.
"""
self.schd.command_queue.put(
- ("poll_tasks", (items,), {"poll_succ": poll_succ}))
+ ("poll_tasks", (task_globs,), {"poll_succ": poll_succ}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@@ -482,10 +823,35 @@ def put_broadcast(
self, point_strings=None, namespaces=None, settings=None):
"""Add new broadcast settings (server side interface).
- Return a tuple (modified_settings, bad_options) where:
- modified_settings is list of modified settings in the form:
- [("20200202", "foo", {"command scripting": "true"}, ...]
- bad_options is as described in the docstring for self.clear().
+ Args:
+ point_strings (list, optional):
+ List of point strings for this operation to apply to or
+ ``None`` to apply to all cycle points.
+ namespaces (list, optional):
+ List of namespace string (task / family names) for this
+ operation to apply to or ``None`` to apply to all namespaces.
+ settings (list, optional):
+ List of strings in the format ``key=value`` where ``key`` is a
+ Cylc configuration including section names e.g.
+ ``[section][subsection]item``.
+
+ Returns:
+ tuple: (modified_settings, bad_options)
+
+ modified_settings
+ similar to the return value of the "put" method, but for
+ removed settings.
+ bad_options
+ A dict in the form:
+ ``{"point_strings": ["20020202", ..."], ...}``.
+ The dict is only populated if there are options not
+ associated with previous broadcasts. The keys can be:
+
+ * point_strings: a list of bad point strings.
+ * namespaces: a list of bad namespaces.
+ * cancel: a list of tuples. Each tuple contains the keys of
+ a bad setting.
+
"""
return self.schd.task_events_mgr.broadcast_mgr.put_broadcast(
point_strings, namespaces, settings)
@@ -493,7 +859,21 @@ def put_broadcast(
@authorise(Priv.CONTROL)
@ZMQServer.expose
def put_ext_trigger(self, event_message, event_id):
- """Server-side external event trigger interface."""
+ """Server-side external event trigger interface.
+
+ Args:
+ event_message (str): The external trigger message.
+ event_id (str): The unique trigger ID.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.ext_trigger_queue.put((event_message, event_id))
return (True, 'Event queued')
@@ -503,10 +883,24 @@ def put_messages(self, task_job=None, event_time=None, messages=None):
"""Put task messages in queue for processing later by the main loop.
Arguments:
- task_job (str): Task job in the form "CYCLE/TASK_NAME/SUBMIT_NUM".
- event_time (str): Event time as string.
- messages (list): List in the form [[severity, message], ...].
+ task_job (str, optional):
+ Task job in the format ``CYCLE/TASK_NAME/SUBMIT_NUM``.
+ event_time (str, optional):
+ Event time as an ISO8601 string.
+ messages (list, optional):
+ List in the format ``[[severity, message], ...]``.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
+ # TODO: standardise the task_job interface to one of the other
+ # systems
for severity, message in messages:
self.schd.message_queue.put(
(task_job, event_time, severity, message))
@@ -515,54 +909,131 @@ def put_messages(self, task_job=None, event_time=None, messages=None):
@authorise(Priv.CONTROL)
@ZMQServer.expose
def reload_suite(self):
- """Tell suite to reload the suite definition."""
+ """Tell suite to reload the suite definition.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(("reload_suite", (), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
def release_suite(self):
- """Unhold suite."""
+ """Unhold suite.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(("release_suite", (), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def release_tasks(self, items):
+ def release_tasks(self, task_globs):
"""Unhold tasks.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list): List of identifiers, see `task globs`_
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
- self.schd.command_queue.put(("release_tasks", (items,), {}))
+ self.schd.command_queue.put(("release_tasks", (task_globs,), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def remove_tasks(self, items, spawn=False):
+ def remove_tasks(self, task_globs, spawn=False):
"""Remove tasks from task pool.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list):
+ List of identifiers, see `task globs`_
+ spawn (bool, optional):
+ If True ensure task has spawned before removal.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
self.schd.command_queue.put(
- ("remove_tasks", (items,), {"spawn": spawn}))
+ ("remove_tasks", (task_globs,), {"spawn": spawn}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def reset_task_states(self, items, state=None, outputs=None):
+ def reset_task_states(self, task_globs, state=None, outputs=None):
"""Reset statuses tasks.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list):
+ List of identifiers, see `task globs`_
+ state (str, optional):
+ Task state to reset task to.
+ See ``cylc.task_state.TASK_STATUSES_CAN_RESET_TO``.
+ outputs (list, optional):
+ Find task output by message string or trigger string
+ set complete or incomplete with !OUTPUT
+ ``*`` to set all complete, ``!*`` to set all incomplete.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
self.schd.command_queue.put((
"reset_task_states",
- (items,), {"state": state, "outputs": outputs}))
+ (task_globs,), {"state": state, "outputs": outputs}))
return (True, 'Command queued')
@authorise(Priv.SHUTDOWN)
@ZMQServer.expose
def set_stop_after_clock_time(self, datetime_string):
- """Set suite to stop after wallclock time."""
+ """Set suite to stop after wallclock time.
+
+ Args:
+ datetime_string (str):
+ An ISO8601 formatted date-time of the wallclock
+ (real-world as opposed to simulation) time
+ to stop the suite after.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(
("set_stop_after_clock_time", (datetime_string,), {}))
return (True, 'Command queued')
@@ -570,7 +1041,21 @@ def set_stop_after_clock_time(self, datetime_string):
@authorise(Priv.SHUTDOWN)
@ZMQServer.expose
def set_stop_after_point(self, point_string):
- """Set suite to stop after cycle point."""
+ """Set suite to stop after cycle point.
+
+ Args:
+ point_string (str):
+ The cycle point to stop the suite after.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(
("set_stop_after_point", (point_string,), {}))
return (True, 'Command queued')
@@ -578,7 +1063,20 @@ def set_stop_after_point(self, point_string):
@authorise(Priv.SHUTDOWN)
@ZMQServer.expose
def set_stop_after_task(self, task_id):
- """Set suite to stop after an instance of a task."""
+ """Set suite to stop after an instance of a task.
+
+ Args:
+ task_id (str): A `task identifier`_
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(
("set_stop_after_task", (task_id,), {}))
return (True, 'Command queued')
@@ -586,7 +1084,25 @@ def set_stop_after_task(self, task_id):
@authorise(Priv.SHUTDOWN)
@ZMQServer.expose
def set_stop_cleanly(self, kill_active_tasks=False):
- """Set suite to stop cleanly or after kill active tasks."""
+ """Set suite to stop cleanly or after kill active tasks.
+
+ The suite will wait for all active (running, submitted) tasks
+ to complete before stopping.
+
+ Args:
+ kill_active_tasks (bool, optional):
+ If True the suite will attempt to kill any active
+ (running, submitted) tasks
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(
("set_stop_cleanly", (), {"kill_active_tasks": kill_active_tasks}))
return (True, 'Command queued')
@@ -594,44 +1110,104 @@ def set_stop_cleanly(self, kill_active_tasks=False):
@authorise(Priv.CONTROL)
@ZMQServer.expose
def set_verbosity(self, level):
- """Set suite verbosity to new level."""
+ """Set suite verbosity to new level (for suite logs).
+
+ Args:
+ level (str): A logging level e.g. ``INFO`` or ``ERROR``.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(("set_verbosity", (level,), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def spawn_tasks(self, items):
+ def spawn_tasks(self, task_globs):
"""Spawn tasks.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list): List of identifiers, see `task globs`_
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
- self.schd.command_queue.put(("spawn_tasks", (items,), {}))
+ self.schd.command_queue.put(("spawn_tasks", (task_globs,), {}))
return (True, 'Command queued')
@authorise(Priv.SHUTDOWN)
@ZMQServer.expose
def stop_now(self, terminate=False):
- """Stop suite on event handler completion, or terminate right away."""
+ """Stop suite on event handler completion, or terminate right away.
+
+ Args:
+ terminate (bool, optional):
+ If False Cylc will run event handlers, if True it will not.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
+ """
self.schd.command_queue.put(("stop_now", (), {"terminate": terminate}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def take_checkpoints(self, items):
+ def take_checkpoints(self, name):
"""Checkpoint current task pool.
- items[0] is the name of the checkpoint.
+ Args:
+ name (str): The checkpoint name
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
- self.schd.command_queue.put(("take_checkpoints", (items,), {}))
+ self.schd.command_queue.put(("take_checkpoints", (name,), {}))
return (True, 'Command queued')
@authorise(Priv.CONTROL)
@ZMQServer.expose
- def trigger_tasks(self, items, back_out=False):
+ def trigger_tasks(self, task_globs, back_out=False):
"""Trigger submission of task jobs where possible.
- items is a list of identifiers for matching task proxies.
+ Args:
+ task_globs (list):
+ List of identifiers, see `task globs`_
+ back_out (bool, optional):
+ Abort e.g. in the event of a rejected trigger-edit.
+
+ Returns:
+ tuple: (outcome, message)
+
+ outcome (bool)
+ True if command successfully queued.
+ message (str)
+ Information about outcome.
+
"""
self.schd.command_queue.put(
- ("trigger_tasks", (items,), {"back_out": back_out}))
+ ("trigger_tasks", (task_globs,), {"back_out": back_out}))
return (True, 'Command queued')
diff --git a/lib/cylc/option_parsers.py b/lib/cylc/option_parsers.py
index 663c3b6ca38..cf06b627ccf 100644
--- a/lib/cylc/option_parsers.py
+++ b/lib/cylc/option_parsers.py
@@ -32,7 +32,8 @@ class CylcOptionParser(OptionParser):
"""Common options for all cylc CLI commands."""
MULTITASK_USAGE = """
-TASKID is a pattern to match task proxies or task families, or groups of them:
+TASK_GLOB is a pattern to match task proxies or task families,
+or groups of them:
* [CYCLE-POINT-GLOB/]TASK-NAME-GLOB[:TASK-STATE]
* [CYCLE-POINT-GLOB/]FAMILY-NAME-GLOB[:TASK-STATE]
* TASK-NAME-GLOB[.CYCLE-POINT-GLOB][:TASK-STATE]
@@ -45,11 +46,7 @@ class CylcOptionParser(OptionParser):
'*0000Z/foo*:retrying'
* retrying tasks in 'BAR' family: '*/BAR:retrying' or 'BAR.*:retrying'
* retrying tasks in 'BAR' or 'BAZ' families: '*/BA[RZ]:retrying' or
- 'BA[RZ].*:retrying'
-
-The old 'MATCH POINT' syntax will be automatically detected and supported. To
-avoid this, use the '--no-multitask-compat' option, or use the new syntax
-(with a '/' or a '.') when specifying 2 TASKID arguments."""
+ 'BA[RZ].*:retrying'"""
def __init__(self, usage, argdoc=None, comms=False, noforce=False,
jset=False, multitask=False, prep=False, auto_add=True,
@@ -74,7 +71,6 @@ def __init__(self, usage, argdoc=None, comms=False, noforce=False,
self.comms = comms
self.jset = jset
self.noforce = noforce
- self.multitask = multitask
self.prep = prep
self.icp = icp
self.suite_info = []
@@ -211,22 +207,6 @@ def add_std_options(self):
),
action="store", default=None, dest="templatevars_file")
- if self.multitask:
- self.add_std_option(
- "-m", "--family",
- help=(
- "(Obsolete) This option is now ignored "
- "and is retained for backward compatibility only. "
- "TASKID in the argument list can be used to match "
- "task and family names regardless of this option."),
- action="store_true", default=False, dest="is_family")
-
- self.add_std_option(
- "--no-multitask-compat",
- help="Disallow backward compatible multitask interface.",
- action="store_false", default=True,
- dest="multitask_compat")
-
if self.icp:
self.add_option(
"--icp",
@@ -284,26 +264,3 @@ def parse_args(self, remove_opts=None):
LOG.addHandler(errhandler)
return (options, args)
-
- @classmethod
- def parse_multitask_compat(cls, options, mtask_args):
- """Parse argument items for multitask backward compatibility.
-
- If options.multitask_compat is False, return (mtask_args, None).
-
- If options.multitask_compat is True, it checks if mtask_args is a
- 2-element array and if the 1st and 2nd arguments look like the old
- "MATCH" "POINT" CLI arguments.
- If so, it returns (mtask_args[0], mtask_args[1]).
- Otherwise, it return (mtask_args, None).
-
- """
- if (options.multitask_compat and len(mtask_args) == 2 and
- not any("/" in mtask_arg for mtask_arg in mtask_args) and
- "." not in mtask_args[1]):
- # For backward compat, argument list should have 2 elements.
- # Element 1 may be a regular expression, so it may contain "." but
- # should not contain a "/".
- # All other elements should contain no "." and "/".
- return (mtask_args[0] + "." + mtask_args[1],)
- return mtask_args
diff --git a/lib/cylc/scheduler.py b/lib/cylc/scheduler.py
index 4af561b95b5..b7c443872bc 100644
--- a/lib/cylc/scheduler.py
+++ b/lib/cylc/scheduler.py
@@ -248,7 +248,8 @@ def start(self):
daemonize(self)
self._setup_suite_logger()
self.server = SuiteRuntimeServer(self)
- self.server.start(glbl_cfg().get(['suite servers', 'run ports']))
+ port_range = glbl_cfg().get(['suite servers', 'run ports'])
+ self.server.start(port_range[0], port_range[-1])
self.port = self.server.port
self.configure()
self.profiler.start()
@@ -1838,9 +1839,9 @@ def command_spawn_tasks(self, items):
"""Force spawn task successors."""
return self.pool.spawn_tasks(items)
- def command_take_checkpoints(self, items):
+ def command_take_checkpoints(self, name):
"""Insert current task_pool, etc to checkpoints tables."""
- return self.suite_db_mgr.checkpoint(items[0])
+ return self.suite_db_mgr.checkpoint(name)
def filter_initial_task_list(self, inlist):
"""Return list of initial tasks after applying a filter."""
diff --git a/tests/cylc-insert/05-insert-compat/suite.rc b/tests/cylc-insert/05-insert-compat/suite.rc
index 3c521831386..70c801f532a 100644
--- a/tests/cylc-insert/05-insert-compat/suite.rc
+++ b/tests/cylc-insert/05-insert-compat/suite.rc
@@ -29,6 +29,7 @@ duration of the suite; and the other is set to stop after two cycles."""
script = "sleep 1" # quick
[[prep]]
script = """
-cylc insert $CYLC_SUITE_NAME bar 20140101T00
-cylc insert $CYLC_SUITE_NAME baz 20140101T00 20140102T00
-"""
+ cylc insert $CYLC_SUITE_NAME 'bar.20140101T00'
+ cylc insert $CYLC_SUITE_NAME 'baz.20140101T00' \
+ --stop-point 20140102T00
+ """
diff --git a/tests/cylc-insert/06-insert-bad-cycle-point-compat/suite.rc b/tests/cylc-insert/06-insert-bad-cycle-point-compat/suite.rc
index 4ea1bbc4738..112e11d5302 100644
--- a/tests/cylc-insert/06-insert-bad-cycle-point-compat/suite.rc
+++ b/tests/cylc-insert/06-insert-bad-cycle-point-compat/suite.rc
@@ -24,4 +24,4 @@
script = "sleep 1" # quick
[[prep]]
# Insert the task with a bad cycle point
- script = cylc insert $CYLC_SUITE_NAME foo teatime
+ script = cylc insert "$CYLC_SUITE_NAME" 'foo.teatime'
diff --git a/tests/cylc-insert/07-insert-bad-stop-cycle-point/suite.rc b/tests/cylc-insert/07-insert-bad-stop-cycle-point/suite.rc
index 7298d8ab990..5b01d693f8f 100644
--- a/tests/cylc-insert/07-insert-bad-stop-cycle-point/suite.rc
+++ b/tests/cylc-insert/07-insert-bad-stop-cycle-point/suite.rc
@@ -24,4 +24,6 @@
script = "sleep 1" # quick
[[prep]]
# Insert the task with a bad cycle point
- script = cylc insert $CYLC_SUITE_NAME foo 20140101T00 soon
+ script = """
+ cylc insert $CYLC_SUITE_NAME 'foo.20140101T00' --stop-point=soon
+ """
diff --git a/tests/cylc-insert/08-insert-family-compat/suite.rc b/tests/cylc-insert/08-insert-family-compat/suite.rc
index d5a469447c0..fccbf4674df 100644
--- a/tests/cylc-insert/08-insert-family-compat/suite.rc
+++ b/tests/cylc-insert/08-insert-family-compat/suite.rc
@@ -26,9 +26,7 @@ cylc remove --no-spawn "$CYLC_SUITE_NAME" "FAM-?.$CYLC_TASK_CYCLE_POINT"
"""
[[inserter]]
# Re-insert one family.
- script = """
-cylc insert -m $CYLC_SUITE_NAME FAM-A $CYLC_TASK_CYCLE_POINT
-"""
+ script = cylc insert "$CYLC_SUITE_NAME" "FAM-A.$CYLC_TASK_CYCLE_POINT"
[[FAM-A, FAM-B]]
[[a1, a2]]
inherit = FAM-A
diff --git a/tests/cylc-kill/00-multi-hosts-compat/suite.rc b/tests/cylc-kill/00-multi-hosts-compat/suite.rc
index 03eefaa7226..891f0af9080 100644
--- a/tests/cylc-kill/00-multi-hosts-compat/suite.rc
+++ b/tests/cylc-kill/00-multi-hosts-compat/suite.rc
@@ -21,6 +21,6 @@ KILLABLE:start-all => killer
host={{CYLC_TEST_HOST}}
[[killer]]
script="""
-cylc kill -m "${CYLC_SUITE_NAME}" KILLABLE 1
+cylc kill "${CYLC_SUITE_NAME}" KILLABLE 1
cylc stop "${CYLC_SUITE_NAME}"
"""
diff --git a/tests/cylc-kill/01-multi-hosts/suite.rc b/tests/cylc-kill/01-multi-hosts/suite.rc
index a269d679036..82d9e862c15 100644
--- a/tests/cylc-kill/01-multi-hosts/suite.rc
+++ b/tests/cylc-kill/01-multi-hosts/suite.rc
@@ -21,6 +21,6 @@ KILLABLE:start-all => killer
host={{CYLC_TEST_HOST}}
[[killer]]
script="""
-cylc kill -m "${CYLC_SUITE_NAME}" KILLABLE.1
+cylc kill "${CYLC_SUITE_NAME}" KILLABLE.1
cylc stop "${CYLC_SUITE_NAME}"
"""
diff --git a/tests/cylc-kill/02-submitted/suite.rc b/tests/cylc-kill/02-submitted/suite.rc
index 416a1615acd..f99dae53c2e 100644
--- a/tests/cylc-kill/02-submitted/suite.rc
+++ b/tests/cylc-kill/02-submitted/suite.rc
@@ -24,7 +24,7 @@ sleep 60
script="""
wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>/dev/null || true
sleep 5
-cylc kill -m "${CYLC_SUITE_NAME}" '*:submitted'
+cylc kill "${CYLC_SUITE_NAME}" '*:submitted'
"""
[[sleeper]]
script=sleep 20
diff --git a/tests/cylc-take-checkpoints/00-basic/suite.rc b/tests/cylc-take-checkpoints/00-basic/suite.rc
index 451f7b20912..05a0b038cd9 100644
--- a/tests/cylc-take-checkpoints/00-basic/suite.rc
+++ b/tests/cylc-take-checkpoints/00-basic/suite.rc
@@ -20,7 +20,7 @@ if [[ "${CYLC_TASK_CYCLE_POINT}" == '2017' ]]; then
sleep 2 # state of current task should be recorded after 2 seconds
cylc checkpoint "${CYLC_SUITE_NAME}" 'snappy'
LOG="${CYLC_SUITE_LOG_DIR}/log"
- while ! grep -qF "INFO - Command succeeded: take_checkpoints(['snappy'])" \
+ while ! grep -qF "INFO - Command succeeded: take_checkpoints(snappy)" \
"${LOG}"
do
sleep 1 # make sure take_checkpoints command completes
diff --git a/tests/cylc-trigger/03-edit-run/suite.rc b/tests/cylc-trigger/03-edit-run/suite.rc
index 03ce77115a0..dac25105fa8 100644
--- a/tests/cylc-trigger/03-edit-run/suite.rc
+++ b/tests/cylc-trigger/03-edit-run/suite.rc
@@ -18,13 +18,13 @@ second task fixes and retriggers it with an edit-run."""
script = /bin/false
[[fixer-task]]
script = """
-cylc trigger --edit $CYLC_SUITE_NAME broken-task 1 << __END__
+cylc trigger --edit $CYLC_SUITE_NAME 'broken-task.1' << __END__
y
__END__"""
[[syntax_errored_task]]
script = $(
[[syntax_fixer_task]]
script = """
-cylc trigger --edit $CYLC_SUITE_NAME syntax_errored_task 1 << __END__
+cylc trigger --edit $CYLC_SUITE_NAME 'syntax_errored_task.1' << __END__
y
__END__"""
diff --git a/tests/cylc-trigger/08-edit-run-host-select/suite.rc b/tests/cylc-trigger/08-edit-run-host-select/suite.rc
index 9cd5de12877..6d8e35f645f 100644
--- a/tests/cylc-trigger/08-edit-run-host-select/suite.rc
+++ b/tests/cylc-trigger/08-edit-run-host-select/suite.rc
@@ -17,6 +17,6 @@ second task fixes and retriggers it with an edit-run."""
host = $(echo localhost)
[[fixer-task]]
script = """
-cylc trigger --edit $CYLC_SUITE_NAME broken-task 1 << __END__
+cylc trigger --edit $CYLC_SUITE_NAME 'broken-task.1' << __END__
y
__END__"""
diff --git a/tests/restart/25-hold-suite/suite.rc b/tests/restart/25-hold-suite/suite.rc
index 8866e75c91f..638abe11201 100644
--- a/tests/restart/25-hold-suite/suite.rc
+++ b/tests/restart/25-hold-suite/suite.rc
@@ -12,15 +12,15 @@
[[dependencies]]
[[[P1Y]]]
graph = """
-t1[-P1Y] => t1 => t2
-"""
+ t1[-P1Y] => t1 => t2
+ """
[runtime]
[[t1]]
script = """
-if [[ "${CYLC_TASK_CYCLE_POINT}" == '2016' ]]; then
- cylc hold "${CYLC_SUITE_NAME}"
- cylc stop "${CYLC_SUITE_NAME}"
-fi
-"""
+ if [[ "${CYLC_TASK_CYCLE_POINT}" == '2016' ]]; then
+ cylc hold "${CYLC_SUITE_NAME}"
+ cylc stop "${CYLC_SUITE_NAME}"
+ fi
+ """
[[t2]]
script = true