Skip to content

Commit

Permalink
Subscriber added
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 13, 2019
1 parent c421b60 commit d8b6cfb
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 1 deletion.
2 changes: 2 additions & 0 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ control_commands['broadcast'] = ['broadcast', 'bcast']
control_commands['ext-trigger'] = ['ext-trigger', 'external-trigger']
control_commands['checkpoint'] = ['checkpoint']
control_commands['client'] = ['client']
control_commands['subscribe'] = ['subscribe']

utility_commands = {}
utility_commands['cycle-point'] = [
Expand Down Expand Up @@ -363,6 +364,7 @@ comsum['broadcast'] = 'Change suite [runtime] settings on the fly'
comsum['ext-trigger'] = 'Report an external trigger event to a suite'
comsum['checkpoint'] = 'Tell suite to checkpoint its current state'
comsum['client'] = '(Internal) Invoke suite runtime client, expect JSON input'
comsum['subscribe'] = '(Internal) Invoke suite subscriber'
# discovery
comsum['ping'] = 'Check that a suite is running'
comsum['scan'] = 'Scan a host for running suites'
Expand Down
97 changes: 97 additions & 0 deletions bin/cylc-subscribe
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""cylc subscribe [OPTIONS] ARGS
(This command is for internal use.)
Invoke suite subscriber to receive published workflow output.
"""

import asyncio
import json
import sys
if '--use-ssh' in sys.argv[1:]:
sys.argv.remove('--use-ssh')
from cylc.flow.remote import remrun
if remrun():
sys.exit(0)
import time

import zmq
import zmq.asyncio
from google.protobuf.json_format import MessageToDict

from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network.scan import get_scan_items_from_fs, re_compile_filters
from cylc.flow.network.subscriber import WorkflowSubscriber
from cylc.flow.terminal import cli_function
from cylc.flow.ws_messages_pb2 import PbWorkflow


def print_workflow(msg):
data = PbWorkflow()
data.ParseFromString(msg)
sys.stdout.write(
json.dumps(MessageToDict(data), indent=4) + '\n')


def get_option_parser():
parser = COP(__doc__, comms=True, argdoc=[
('REG', 'Suite name'),
('[SUBTYPE]', 'Subscription Feed Name')])

parser.add_option(
'-n', '--no-input',
help='Do not read from STDIN, assume null input',
action='store_true', dest='no_input')

return parser


@cli_function(get_option_parser)
def main(_, options, suite, subtype=None):
host = None
port = None
cre_owner, cre_name = re_compile_filters(None, ['.*'])
while True:
for s_reg, s_host, s_port, s_pub_port in get_scan_items_from_fs(
cre_owner, cre_name):
if s_reg == suite:
host = s_host
port = int(s_pub_port)
break
if host and port:
break
time.sleep(5)

print(f'Connecting to tcp://{host}:{port}')
subscriber = WorkflowSubscriber(host, port)

asyncio.ensure_future(subscriber.subscribe(print_workflow))

# run Python run
try:
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
print('Disconnecting')
subscriber.stop()
exit()


if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def get_scan_items_from_fs(
active, or all (active plus registered but dormant), suites.
Yields:
tuple - (reg, host, port)
tuple - (reg, host, port, pub_port)
"""
srv_files_mgr = SuiteSrvFilesManager()
Expand Down
79 changes: 79 additions & 0 deletions cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Subscriber for published suite output."""

import sys
import json

import zmq
import zmq.asyncio


# we should only have one ZMQ context per-process
CONTEXT = zmq.asyncio.Context()


class WorkflowSubscriber:
"""Initiate the SUB part of a ZMQ PUB-SUB pair.
This class contains the logic for the ZMQ message Subscriber.
NOTE: Security to be provided by zmq.auth
Args:
host (str):
The host to connect to.
port (int):
The port on the aforementioned host to connect to.
Usage:
* Subscribe to Publisher socket using ``WorkflowSubscriber.__call__``.
"""

DEFAULT_TIMEOUT = 300. # 5 min

def __init__(self, host, port, timeout=None):
if timeout is None:
timeout = self.DEFAULT_TIMEOUT
else:
timeout = float(timeout)
self.timeout = timeout * 1000

# open the ZMQ socket
self.socket = CONTEXT.socket(zmq.SUB)
self.socket.connect(f'tcp://{host}:{port}')
# if there is no server don't keep the subscriber hanging around
self.socket.setsockopt(zmq.LINGER, int(timeout))

self.socket.setsockopt(zmq.SUBSCRIBE, b'')

async def subscribe(self, msg_handler=None):
"""Subscribe to updates from the provided socket."""
while True:
msg = await self.socket.recv()
if callable(msg_handler):
msg_handler(msg)
else:
data = json.loads(msg)
sys.stdout.write(
json.dumps(data, indent=4) + '\n')

def stop(self):
"""Close subscriber socket."""
self.socket.close()

0 comments on commit d8b6cfb

Please sign in to comment.