Skip to content

Commit

Permalink
zmq: tidy server-client interface
Browse files Browse the repository at this point in the history
* remove legacy client uuid interface
* add client metadate for logging
  • Loading branch information
oliver-sanders committed Mar 6, 2019
1 parent 255729e commit a6f7cc0
Show file tree
Hide file tree
Showing 27 changed files with 89 additions and 123 deletions.
3 changes: 1 addition & 2 deletions bin/cylc-broadcast
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

if options.show or options.showtask:
if options.showtask:
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-checkpoint
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
pclient('take_checkpoints', {'items': [name]})


Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-client
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def main():
suite = args[0]
func = args[1]
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
my_uuid=options.set_uuid, print_uuid=options.print_uuid)
suite, options.owner, options.host, options.port)
if options.no_input:
kwargs = {}
else:
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-dump
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
summary = pclient('get_suite_state_summary')

if options.disp_form == "raw":
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-ext-trigger
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ def main():
LOG.info('Send to suite %s: "%s" (%s)', suite, event_msg, event_id)

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
my_uuid=options.set_uuid, print_uuid=options.print_uuid)
suite, options.owner, options.host, options.port)

max_n_tries = int(options.max_n_tries)
retry_intvl_secs = float(options.retry_intvl_secs)
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-get-suite-version
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ def main():
suite = args[0]

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
my_uuid=options.set_uuid, print_uuid=options.print_uuid)
suite, options.owner, options.host, options.port)
print(pclient('get_cylc_version', timeout=options.comms_timeout))


Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-hold
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ def main():
prompt('Hold suite %s' % suite, options.force)

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
my_uuid=options.set_uuid, print_uuid=options.print_uuid)
suite, options.owner, options.host, options.port)

if args:
items = parser.parse_multitask_compat(options, args)
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-insert
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ def main():
prompt('Insert %s in %s' % (items, suite), options.force)

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
my_uuid=options.set_uuid, print_uuid=options.print_uuid)
suite, options.owner, options.host, options.port)

pclient(
'insert_tasks',
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-kill
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def main():
else:
prompt('Kill ALL tasks in %s' % (suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
my_uuid=options.set_uuid, print_uuid=options.print_uuid)
suite, options.owner, options.host, options.port)
pclient(
'kill_tasks',
{'items': parser.parse_multitask_compat(options, args)},
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-monitor
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ The USER_AT_HOST argument allows suite selection by 'cylc scan' output:
if run_mode != "live":
suite_name += " (%s)" % run_mode
prefix = "%s - %d tasks" % (suite_name, int(n_tasks_total))
#suffix = "%s %s" % (client_name, self.pclient.my_uuid)
suffix = "%s" % client_name # TODO
suffix = "%s" % client_name
title_str = ' ' * len_header
title_str = prefix + title_str[len(prefix):]
title_str = '\033[1;37;44m%s%s\033[0m' % (
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-nudge
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

success, msg = pclient('nudge')
if success:
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-ping
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

# cylc ping SUITE
pclient('ping_suite') # (no need to check the result)
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-poll
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ def main():
prompt('Poll ALL tasks in %s' % (suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
items = parser.parse_multitask_compat(options, args)
# Back compat: back_out introduced >7.5.0
# So don't call with "poll_succ" if not necessary to avoid breakage.
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-release
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ def main():
prompt('Release suite %s' % suite, options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
if args:
items = parser.parse_multitask_compat(options, args)
pclient('release_tasks', {'items': items})
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-reload
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ def main():
prompt('Reload %s' % suite, options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
pclient('reload_suite')


Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-remove
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ def main():
prompt('remove task(s) %s in %s' % (args, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
items = parser.parse_multitask_compat(options, args)
pclient(
'remove_tasks',
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-reset
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ def main():
prompt('Reset task(s) %s in %s' % (args, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)
items = parser.parse_multitask_compat(options, args)
pclient(
'reset_task_states',
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-set-verbosity
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ def main():
options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

pclient('set_verbosity', {'level': severity})

Expand Down
4 changes: 1 addition & 3 deletions bin/cylc-show
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ def main():
task_args = args[1:]
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid
)
options.comms_timeout)
json_filter = []

if not task_args:
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-spawn
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def main():
prompt('Spawn task(s) %s in %s' % (args, suite), options.force)
pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

pclient(
'spawn_tasks',
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-stop
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

if int(options.max_polls) > 0:
# (test to avoid the "nothing to do" warning for # --max-polls=0)
Expand Down
3 changes: 1 addition & 2 deletions bin/cylc-trigger
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ def main():

pclient = SuiteRuntimeClient(
suite, options.owner, options.host, options.port,
options.comms_timeout, my_uuid=options.set_uuid,
print_uuid=options.print_uuid)
options.comms_timeout)

aborted = False
if options.edit_run:
Expand Down
62 changes: 37 additions & 25 deletions lib/cylc/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""HTTP(S) client for suite runtime API.
Implementation currently via requests (urllib3) or urllib2.
"""
"""Client for suite runtime API."""

import asyncio
import os
import socket
import sys

import jose.exceptions
Expand Down Expand Up @@ -86,6 +85,7 @@ class ZMQClient(object):
Optional function which runs before ClientTimeout is raised.
This provides an interface for raising more specific exceptions in
the event of a communication timeout.
header (dict): Request "header" data to attach to each request.
Usage:
* Call endpoints using ``ZMQClient.__call__``.
Expand All @@ -100,7 +100,7 @@ class ZMQClient(object):
DEFAULT_TIMEOUT = 5. # 5 seconds

def __init__(self, host, port, encode_method, decode_method, secret_method,
timeout=None, timeout_handler=None):
timeout=None, timeout_handler=None, header=None):
self.encode = encode_method
self.decode = decode_method
self.secret = secret_method
Expand All @@ -122,6 +122,11 @@ def __init__(self, host, port, encode_method, decode_method, secret_method,
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)

if not header:
self.header = {}
else:
self.header = dict(header)

async def async_request(self, command, args=None, timeout=None):
"""Send a request.
Expand Down Expand Up @@ -157,6 +162,7 @@ async def async_request(self, command, args=None, timeout=None):

# send message
msg = {'command': command, 'args': args}
msg.update(self.header)
LOG.debug('zmq:send %s' % msg)
message = encrypt(msg, secret)
self.socket.send_string(message)
Expand Down Expand Up @@ -190,7 +196,7 @@ def serial_request(self, command, args=None, timeout=None):
__call__ = serial_request


class SuiteRuntimeClient(ZMQClient):
class SuiteRuntimeClient:
"""Initiate a client to the suite runtime API.
This class contains the logic specific to communicating with Cylc suites.
Expand All @@ -208,42 +214,48 @@ class SuiteRuntimeClient(ZMQClient):
Message receive timeout in seconds. Also used to set the
"linger" time, see ``ZMQClient``.
Determine host and port from the contact file unless they are both
provided.
Determine host and port from the contact file unless provided.
If there is no socket bound to the specified host/port the client will
bail after ``timeout`` seconds.
TODO: Implement or remove:
* my_uuid
* print_uuid
* auth
"""

NOT_RUNNING = "Contact info not found for suite \"%s\", suite not running?"

def __init__(self, suite, owner=None, host=None, port=None,
timeout=None, my_uuid=None, print_uuid=False, auth=None):
self.suite = suite
def __new__(cls, suite, owner=None, host=None, port=None, timeout=None):
if isinstance(timeout, str):
timeout = float(timeout)

# work out what we are connecting to
if host and port:
if port:
port = int(port)
elif host or port:
raise ValueError('Provide both host and port')
else:
host, port = self.get_location(suite, owner, host)
if not (host and port):
host, port = cls.get_location(suite, owner, host)

# create connection
ZMQClient.__init__(
self, host, port, encrypt, decrypt, lambda: get_secret(suite),
timeout=timeout,
timeout_handler=lambda: self._timeout_handler(suite, host, port)
return ZMQClient(
host, port, encrypt, decrypt, lambda: get_secret(suite),
timeout=timeout, header=cls.get_header(),
timeout_handler=lambda: cls._timeout_handler(suite, host, port)
)

@staticmethod
def get_header():
"""Return "header" data to attach to each request for traceability."""
CYLC_EXE = os.path.join(os.environ['CYLC_DIR'], 'bin', '')
cmd = sys.argv[0]

if cmd.startswith(CYLC_EXE):
cmd = cmd.replace(CYLC_EXE, '')

return {
'meta': {
'prog': cmd,
'host': socket.gethostname()
}
}

@staticmethod
def _timeout_handler(suite, host, port):
"""Handle the eventuality of a communication timeout with the suite."""
Expand Down
Loading

0 comments on commit a6f7cc0

Please sign in to comment.