Skip to content

Commit

Permalink
topics/multi-part messages added
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 13, 2019
1 parent d8b6cfb commit ac2fcbf
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 35 deletions.
24 changes: 11 additions & 13 deletions bin/cylc-subscribe
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ if '--use-ssh' in sys.argv[1:]:
sys.exit(0)
import time

import zmq
import zmq.asyncio
from google.protobuf.json_format import MessageToDict

from cylc.flow.option_parsers import CylcOptionParser as COP
Expand All @@ -43,28 +41,24 @@ from cylc.flow.terminal import cli_function
from cylc.flow.ws_messages_pb2 import PbWorkflow


def print_workflow(msg):
def print_workflow(topic, msg):
data = PbWorkflow()
data.ParseFromString(msg)
print('Received: ', topic.decode())
sys.stdout.write(
json.dumps(MessageToDict(data), indent=4) + '\n')


def get_option_parser():
parser = COP(__doc__, comms=True, argdoc=[
('REG', 'Suite name'),
('[SUBTYPE]', 'Subscription Feed Name')])

parser.add_option(
'-n', '--no-input',
help='Do not read from STDIN, assume null input',
action='store_true', dest='no_input')
('[TOPIC]', 'Subscription topic to receive')])

return parser


@cli_function(get_option_parser)
def main(_, options, suite, subtype=None):
def main(_, options, suite, topic=None):
host = None
port = None
cre_owner, cre_name = re_compile_filters(None, ['.*'])
Expand All @@ -80,15 +74,19 @@ def main(_, options, suite, subtype=None):
time.sleep(5)

print(f'Connecting to tcp://{host}:{port}')
subscriber = WorkflowSubscriber(host, port)
if topic is None:
topic = b'workflow'
else:
topic = topic.encode()
subscriber = WorkflowSubscriber(host, port, [topic])

asyncio.ensure_future(subscriber.subscribe(print_workflow))
asyncio.ensure_future(subscriber.subscribe([topic], print_workflow))

# run Python run
try:
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
print('Disconnecting')
print('\nDisconnecting')
subscriber.stop()
exit()

Expand Down
52 changes: 39 additions & 13 deletions cylc/flow/network/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,34 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Publisher for suite runtime API."""

import asyncio
from threading import Thread

import zmq

from cylc.flow import LOG
from cylc.flow.exceptions import CylcError
from cylc.flow.ws_messages_pb2 import PbEntireWorkflow
from cylc.flow import __version__ as CYLC_VERSION

# maps server methods to the protobuf message (for client/UIS import)
PB_METHOD_MAP = {
'pb_entire_workflow': PbEntireWorkflow
}

async def gather_coros(coro_func, items):
"""Gather multi-part send coroutines"""
try:
gathers = ()
for item in items:
gathers += (coro_func(*item),)
await asyncio.gather(*gathers)
except Exception as exc:
LOG.error('publisher: gather_sends: %s' % str(exc))


def serialize_data(data, serializer):
"""Serialize by specified method."""
if callable(serializer):
return serializer(data)
elif isinstance(serializer, str):
return getattr(data, serializer)()
return data


class WorkflowPublisher:
Expand All @@ -54,6 +69,8 @@ def __init__(self, context=None):
self.socket = None
self.endpoints = None
self.thread = None
self.loop = None
self.topics = set()

def start(self, min_port, max_port):
"""Start the ZeroMQ publisher.
Expand Down Expand Up @@ -90,6 +107,11 @@ def _create_socket(self, min_port, max_port):
self.socket.close()
raise CylcError(
'could not start Cylc ZMQ publisher: %s' % str(exc))
try:
asyncio.get_running_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def stop(self):
"""Stop the publisher socket."""
Expand All @@ -98,14 +120,18 @@ def stop(self):
self.socket.close()
LOG.debug('...stopped')

def publish(self, data, serializer=None):
"""Publish data."""
async def send_multi(self, topic, data, serializer=None):
"""Send multi part message."""
try:
if callable(serializer):
self.socket.send(serializer(data))
elif isinstance(serializer, str):
self.socket.send(getattr(data, serializer)())
else:
self.socket.send(data)
self.socket.send_multipart(
[topic, serialize_data(data, serializer)]
)
except Exception as exc:
LOG.error('publisher: send_multi: %s' % str(exc))

def publish(self, items):
"""Publish topics"""
try:
self.loop.run_until_complete(gather_coros(self.send_multi, items))
except Exception as exc:
LOG.error('publisher: %s' % str(exc))
14 changes: 8 additions & 6 deletions cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ class WorkflowSubscriber:

DEFAULT_TIMEOUT = 300. # 5 min

def __init__(self, host, port, timeout=None):
def __init__(self, host, port, topics=None, timeout=None):
if topics is None:
topics = [b'']
if timeout is None:
timeout = self.DEFAULT_TIMEOUT
else:
Expand All @@ -60,15 +62,15 @@ def __init__(self, host, port, timeout=None):
self.socket.connect(f'tcp://{host}:{port}')
# if there is no server don't keep the subscriber hanging around
self.socket.setsockopt(zmq.LINGER, int(timeout))
for topic in set(topics):
self.socket.setsockopt(zmq.SUBSCRIBE, topic)

self.socket.setsockopt(zmq.SUBSCRIBE, b'')

async def subscribe(self, msg_handler=None):
async def subscribe(self, topics, msg_handler=None):
"""Subscribe to updates from the provided socket."""
while True:
msg = await self.socket.recv()
[topic, msg] = await self.socket.recv_multipart()
if callable(msg_handler):
msg_handler(msg)
msg_handler(topic, msg)
else:
data = json.loads(msg)
sys.stdout.write(
Expand Down
6 changes: 4 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def __init__(self, is_restart, options, args):
self.server = None
self.port = None
self.publisher = None
self.sub_port = None
self.pub_port = None
self.command_queue = None
self.message_queue = None
self.ext_trigger_queue = None
Expand Down Expand Up @@ -1598,7 +1598,9 @@ def update_data_structure(self):
self.ws_data_mgr.update_dynamic_elements(updated_nodes)
# Publish updates:
flow_data = self.ws_data_mgr.data[f'{self.owner}|{self.suite}']
self.publisher.publish(flow_data['workflow'], 'SerializeToString')
self.publisher.publish(
[(b'workflow', flow_data['workflow'], 'SerializeToString')]
)
# TODO: deprecate after CLI GraphQL migration
self.state_summary_mgr.update(self)
# Database update
Expand Down
4 changes: 3 additions & 1 deletion cylc/flow/suite_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from enum import Enum

from cylc.flow.wallclock import get_time_string_from_unix_time as time2str

# Keys for identify API call
KEY_GROUP = "group"
KEY_META = "meta"
Expand Down Expand Up @@ -128,7 +130,7 @@ def get_suite_status(schd):
elif schd.stop_clock_time is not None:
status_msg = (
SUITE_STATUS_RUNNING_TO_STOP %
schd.stop_clock_time_string)
time2str(schd.stop_clock_time))
elif schd.stop_task:
status_msg = (
SUITE_STATUS_RUNNING_TO_STOP %
Expand Down

0 comments on commit ac2fcbf

Please sign in to comment.