Skip to content

Commit

Permalink
Shutdown publish & cylc-subscribe fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Nov 27, 2019
1 parent 8b9aa4e commit 84440b6
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 29 deletions.
39 changes: 21 additions & 18 deletions bin/cylc-subscribe
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
Invoke suite subscriber to receive published workflow output.
"""

import asyncio
import json
import sys
import time

from google.protobuf.json_format import MessageToDict

from cylc.flow.exceptions import ClientError
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network.scan import get_scan_items_from_fs, re_compile_filters
from cylc.flow.network import get_location
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.terminal import cli_function
from cylc.flow.ws_data_mgr import DELTAS_MAP
Expand All @@ -45,6 +45,10 @@ if '--use-ssh' in sys.argv[1:]:
def print_message(topic, data, subscriber=None, once=False):
"""Print protobuf message."""
print(f'Received: {topic}')
if topic == 'shutdown':
print(data.decode('utf-8'))
subscriber.stop()
return
sys.stdout.write(
json.dumps(MessageToDict(data), indent=4) + '\n')
if once and subscriber is not None:
Expand All @@ -65,12 +69,12 @@ def get_option_parser():

delta_keys = list(DELTAS_MAP)
pb_topics = ("Directly published data-store topics include: '" +
("', '").join(delta_keys[:-1]) +
"' and '" + delta_keys[-1] + "'.")
("', '").join(delta_keys[:-1]) +
"' and '" + delta_keys[-1] + "'.")
parser.add_option(
"-T", "--topics",
help="Specify a comma delimited list of subscription topics. "
+ pb_topics,
+ pb_topics,
action="store", dest="topics", default='workflow')

parser.add_option(
Expand All @@ -95,22 +99,21 @@ def main(_, options, *args):
'"user@host:port"'), file=sys.stderr)
sys.exit(1)
elif options.host is None or options.port is None:
cre_owner, cre_name = re_compile_filters(None, ['.*'])
while True:
for reg, host, _, pub_port in get_scan_items_from_fs(
cre_owner, cre_name):
if reg == suite:
if options.host is None:
options.host = host
if options.port is None:
options.port = int(pub_port)
break
if options.host and options.port:
try:
while True:
try:
options.host, _, options.port = get_location(
suite, options.owner, options.host)
except (ClientError, IOError, TypeError, ValueError):
time.sleep(3)
continue
break
time.sleep(5)
except KeyboardInterrupt:
exit()

print(f'Connecting to tcp://{options.host}:{options.port}')
topic_set = set()
topic_set.add(b'shutdown')
for topic in options.topics.split(','):
topic_set.add(topic.encode('utf-8'))

Expand All @@ -132,7 +135,7 @@ def main(_, options, *args):
# run Python run
try:
subscriber.loop.run_forever()
except KeyboardInterrupt:
except (KeyboardInterrupt, SystemExit):
print('\nDisconnecting')
subscriber.stop()
exit()
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import zmq.asyncio

from cylc.flow import LOG
from cylc.flow.exceptions import CylcError, ClientError, SuiteServiceFileError
from cylc.flow.exceptions import ClientError, CylcError, SuiteServiceFileError
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.suite_files import (
ContactFileFields,
Expand Down
17 changes: 9 additions & 8 deletions cylc/flow/network/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Publisher for suite runtime API."""
"""Publisher for workflow runtime API."""

import asyncio

Expand Down Expand Up @@ -75,7 +75,14 @@ def _bespoke_stop(self):
self.stopping = True

async def send_multi(self, topic, data, serializer=None):
"""Send multi part message."""
"""Send multi part message.
Args:
topic (bytes): The topic of the message.
data (object): Data element/message to serialise and send.
serializer (object, optional): string/func for encoding.
"""
self.topics.add(topic)
self.socket.send_multipart(
[topic, serialize_data(data, serializer)]
Expand All @@ -87,12 +94,6 @@ def publish(self, items):
Args:
items (iterable): [(topic, data, serializer)]
topic (bytes): The topic of the message.
data (object): Data element/message to serialise and send.
serializer (object, optional): string or func object.
"""
try:
self.loop.run_until_complete(gather_coros(self.send_multi, items))
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def process_delta_msg(btopic, delta_msg, func, *args, **kwargs):
delta = DELTAS_MAP[topic]()
delta.ParseFromString(delta_msg)
except KeyError:
return (topic, None)
delta = delta_msg
if callable(func):
return func(topic, delta, *args, **kwargs)
return (topic, delta)
Expand Down
2 changes: 2 additions & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,8 @@ def shutdown(self, reason):
if self.server:
self.server.stop()
if self.publisher:
self.publisher.publish(
[(b'shutdown', f'{str(reason)}'.encode('utf-8'))])
self.publisher.stop()
self.curve_auth.stop() # stop the authentication thread

Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/tests/network/test_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_process_delta_msg():
# test non-key
not_topic, not_delta = process_delta_msg(b'foo', b'bar', None)
assert not_topic == 'foo'
assert not_delta is None
assert not_delta == b'bar'


class TestWorkflowSubscriber(CylcWorkflowTestCase):
Expand Down

0 comments on commit 84440b6

Please sign in to comment.