Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish data store updates #3389

Merged
merged 8 commits into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ Third alpha release of Cylc 8.

### Enhancements

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task job
status message retries (problems that prevent message transmission are almost
never transient, and in practice job polling is the only way to recover).
[#3389](https://github.com/cylc/cylc-flow/pull/3389) - Publisher/Subscriber
network components added (0MQ PUB/SUB pattern). These are used to publish
fine-grained data-store updates for the purposes of UI Server data sync. This
change also includes a CLI utility: `cylc subscribe`.

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task
job status message retries (problems that prevent message transmission are
almost never transient, and in practice job polling is the only way to
recover).

### Fixes

Expand Down
2 changes: 2 additions & 0 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ control_commands['broadcast'] = ['broadcast', 'bcast']
control_commands['ext-trigger'] = ['ext-trigger', 'external-trigger']
control_commands['checkpoint'] = ['checkpoint']
control_commands['client'] = ['client']
control_commands['subscribe'] = ['subscribe']

utility_commands = {}
utility_commands['cycle-point'] = [
Expand Down Expand Up @@ -363,6 +364,7 @@ comsum['broadcast'] = 'Change suite [runtime] settings on the fly'
comsum['ext-trigger'] = 'Report an external trigger event to a suite'
comsum['checkpoint'] = 'Tell suite to checkpoint its current state'
comsum['client'] = '(Internal) Invoke suite runtime client, expect JSON input'
comsum['subscribe'] = '(Internal) Invoke suite subscriber'
# discovery
comsum['ping'] = 'Check that a suite is running'
comsum['scan'] = 'Scan a host for running suites'
Expand Down
31 changes: 23 additions & 8 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ def get_option_parser():
"(total, and by cycle point).",
action="store_true", default=False, dest="state_totals")

parser.add_option(
"--publisher",
help="Append the suite publisher information to output.",
action="store_true", default=False, dest="publisher")

parser.add_option(
"-f", "--full",
help="Print all available information about each suite.",
Expand Down Expand Up @@ -128,7 +133,7 @@ def get_option_parser():
def main(parser, options):
"""Implement "cylc scan"."""
if options.full:
options.describe = options.state_totals = True
options.describe = options.state_totals = options.publisher = True
if options.format in ['raw', 'json']:
options.color = False

Expand Down Expand Up @@ -188,7 +193,7 @@ def main(parser, options):
print(state_legend.rstrip() + "\n")

# work through scan results one by one
for reg, host, port, info in suites:
for reg, host, port, pub_port, api, info in suites:
if isinstance(info, str):
print(ERROR_STYLE + ' '.join([reg, host, port, info]))
elif info is None:
Expand All @@ -198,7 +203,7 @@ def main(parser, options):
print(ERROR_STYLE + 'Warning: suite has changed name %s => %s' % (
reg, info[KEY_NAME]))
else:
formatter(reg, host, port, info, options)
formatter(reg, host, port, pub_port, api, info, options)


def sort_meta(item):
Expand All @@ -209,15 +214,21 @@ def sort_meta(item):
return key


def format_plain(name, host, port, info, options):
def format_plain(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=plain"""
owner = info[KEY_OWNER]

print(Style.BRIGHT + name + Style.NORMAL
+ ' %s@%s:%s' % (owner, host, port))
if options.publisher:
print(Style.BRIGHT + name + Style.NORMAL
+ ' %s@%s:%s' % (owner, host, port)
+ ' %s@%s:%s' % (owner, host, pub_port))
else:
print(Style.BRIGHT + name + Style.NORMAL
+ ' %s@%s:%s' % (owner, host, port))

if options.describe:
meta_items = info.get(KEY_META)
meta_items['API'] = api
if meta_items is None:
print(INDENT + MISSING_STYLE + "(description withheld)")
return
Expand All @@ -244,15 +255,19 @@ def format_plain(name, host, port, info, options):
print(INDENT * 2 + "%s%s" % (point_prefix, state_line))


def format_raw(name, host, port, info, options):
def format_raw(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=raw"""
owner = info[KEY_OWNER]

print("%s|%s|%s|port|%s" % (name, owner, host, port))
if options.publisher:
print("%s|%s|%s|port|%s|publish-port|%s" % (name, owner, host, port, pub_port))
else:
print("%s|%s|%s|port|%s" % (name, owner, host, port))

if options.describe:
# Extracting required data for these options before processing
meta_items = info.get(KEY_META)
meta_items['API'] = api

# clean_meta_items = {}
# for key, value in meta_items.items():
Expand Down
145 changes: 145 additions & 0 deletions bin/cylc-subscribe
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""cylc subscribe [OPTIONS] ARGS

(This command is for internal use.)
Invoke suite subscriber to receive published workflow output.
"""

import json
import sys
import time

from google.protobuf.json_format import MessageToDict

from cylc.flow.exceptions import ClientError
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network import get_location
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.terminal import cli_function
from cylc.flow.data_store_mgr import DELTAS_MAP

# When '--use-ssh' is given, drop the flag from the argument list and hand
# over to remrun(); if remrun() returns a true value, exit successfully here.
# NOTE(review): presumably remrun() re-invokes this command on the suite host
# over SSH - confirm against cylc.flow.remote.
if '--use-ssh' in sys.argv[1:]:
    sys.argv.remove('--use-ssh')
    # Imported lazily: only needed when the flag is present.
    from cylc.flow.remote import remrun
    if remrun():
        sys.exit(0)


def print_message(topic, data, subscriber=None, once=False):
    """Print a received publication and stop the subscriber when appropriate.

    Args:
        topic: Topic name of the received message. The value 'shutdown'
            is handled specially.
        data: For the 'shutdown' topic, an object with a ``decode`` method
            (it is decoded as UTF-8 and printed as-is). For any other topic,
            a message accepted by ``MessageToDict``, which is written to
            stdout as indented JSON.
        subscriber: Optional object exposing ``stop()``. It is stopped after
            a 'shutdown' message, or after the first message if ``once`` is
            true.
        once: If true, stop the subscriber after printing a single message.

    """
    print(f'Received: {topic}')
    if topic == 'shutdown':
        print(data.decode('utf-8'))
        # subscriber defaults to None, so guard before stopping it
        # (consistent with the "once" handling below).
        if subscriber is not None:
            subscriber.stop()
        return
    sys.stdout.write(
        json.dumps(MessageToDict(data), indent=4) + '\n')
    if once and subscriber is not None:
        subscriber.stop()


def get_option_parser():
    """Build the command line option parser for this command.

    Returns:
        The COP instance, configured with the REG and optional USER_AT_HOST
        arguments plus the --topics and --once options.

    """
    parser = COP(
        __doc__,
        argdoc=[
            ('REG', 'Suite name'),
            ('[USER_AT_HOST]',
             'user@host:port, shorthand for --user, --host & --port.'),
        ],
        comms=True,
        noforce=True,
    )

    # Quote every known topic name and join the last with "and", for
    # display in the --topics help text.
    topic_names = list(DELTAS_MAP)
    topics_note = "".join([
        "Directly published data-store topics include: '",
        "', '".join(topic_names[:-1]),
        "' and '",
        topic_names[-1],
        "'.",
    ])

    parser.add_option(
        "-T", "--topics",
        action="store",
        dest="topics",
        default='workflow',
        help=("Specify a comma delimited list of subscription topics. "
              + topics_note))

    parser.add_option(
        "-o", "--once",
        action="store_true",
        dest="once",
        default=False,
        help="Show a single publish then exit.")

    return parser


@cli_function(get_option_parser)
def main(_, options, *args):
    """Subscribe to a suite's publisher and print each received message.

    Args:
        _: Unused first positional argument supplied by ``cli_function``.
        options: Parsed command line options; ``host``, ``port`` and
            ``owner`` may be filled in here.
        *args: ``REG`` (suite name) and, optionally, ``user@host:port``.

    The connection target comes from the USER_AT_HOST argument if given.
    Otherwise, if host or port is unset, ``get_location`` is polled every
    three seconds until it succeeds or the user interrupts.

    """
    suite = args[0]

    if len(args) > 1:
        try:
            user_at_host, options.port = args[1].split(':')
            options.owner, options.host = user_at_host.split('@')
        except ValueError:
            print(('USER_AT_HOST must take the form '
                   '"user@host:port"'), file=sys.stderr)
            sys.exit(1)
    elif options.host is None or options.port is None:
        try:
            while True:
                try:
                    # The middle element of the returned tuple is not used.
                    options.host, _, options.port = get_location(
                        suite, options.owner, options.host)
                except (ClientError, IOError, TypeError, ValueError):
                    # Location not available yet: wait, then try again.
                    time.sleep(3)
                    continue
                break
        except KeyboardInterrupt:
            # sys.exit, not the site-provided exit() helper, which is
            # intended for the interactive shell and may be absent.
            sys.exit()

    print(f'Connecting to tcp://{options.host}:{options.port}')
    # Always listen for the shutdown topic so print_message can stop
    # the subscriber.
    topic_set = set()
    topic_set.add(b'shutdown')
    for topic in options.topics.split(','):
        topic_set.add(topic.encode('utf-8'))

    subscriber = WorkflowSubscriber(
        suite,
        host=options.host,
        port=options.port,
        topics=topic_set)

    subscriber.loop.create_task(
        subscriber.subscribe(
            process_delta_msg,
            func=print_message,
            subscriber=subscriber,
            once=options.once
        )
    )

    # Run the subscriber's event loop until interrupted or stopped.
    try:
        subscriber.loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        print('\nDisconnecting')
        subscriber.stop()
        sys.exit()


# Script entry point: run the command only when executed directly.
if __name__ == '__main__':
    main()
13 changes: 11 additions & 2 deletions cylc/flow/daemonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
"""

_INFO_TMPL = r"""
*** listening on %(url)s ***""" + SUITE_SCAN_INFO_TMPL
*** listening on %(url)s ***
*** publishing on %(pub_url)s ***""" + SUITE_SCAN_INFO_TMPL


_TIMEOUT = 300.0 # 5 minutes

Expand All @@ -64,9 +66,12 @@ def daemonize(server):
# Poll for suite log to be populated
suite_pid = None
suite_url = None
pub_url = None
timeout = time() + _TIMEOUT
while time() <= timeout and (
suite_pid is None or suite_url is None):
suite_pid is None or
suite_url is None or
pub_url is None):
sleep(0.1)
try:
# First INFO line of suite log should contain
Expand All @@ -83,6 +88,9 @@ def daemonize(server):
suite_url, suite_pid = (
item.rsplit("=", 1)[-1]
for item in line.rsplit()[-2:])
if server.START_PUB_MESSAGE_PREFIX in line:
pub_url = line.rsplit("=", 1)[-1].rstrip()
if suite_url and pub_url:
break
elif ' ERROR -' in line or ' CRITICAL -' in line:
# ERROR and CRITICAL before suite starts
Expand All @@ -100,6 +108,7 @@ def daemonize(server):
"suite": server.suite,
"host": server.host,
"url": suite_url,
"pub_url": pub_url,
"ps_opts": PS_OPTS,
"pid": suite_pid,
})
Expand Down
55 changes: 53 additions & 2 deletions cylc/flow/ws_messages.proto → cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ syntax = "proto3";
* message modules.
*
* Command:
* $ protoc -I=./ --python_out=./ ws_messages.proto
* $ protoc -I=./ --python_out=./ data_messages.proto
*
* Pre-compiled protoc binary may be downloaded from:
* https://github.com/protocolbuffers/protobuf/releases
*
* If merge/rebase conflicts arise, then regenerate the module.
* (DO NOT manually resolve conflicts)
*
* */


Expand Down Expand Up @@ -67,7 +70,7 @@ message PbWorkflow {
string newest_runahead_cycle_point = 15;
string newest_cycle_point = 16;
string oldest_cycle_point = 17;
bool reloading = 18;
bool reloaded = 18;
string run_mode = 19;
string cycling_mode = 20;
map<string, int32> state_totals = 21;
Expand Down Expand Up @@ -231,3 +234,51 @@ message PbEntireWorkflow {
repeated PbFamilyProxy family_proxies = 6;
repeated PbEdge edges = 7;
}

// Delta message whose elements are PbEdge messages.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that populates this message.
message EDeltas {
    // Presumably when the delta was generated; units not visible here.
    double time = 1;
    // Presumably a checksum for verifying the applied result; TODO confirm.
    int64 checksum = 2;
    // Presumably identifiers of elements removed from the store.
    repeated string pruned = 3;
    // Edge elements carried by this delta.
    repeated PbEdge deltas = 4;
    // Presumably set when the workflow definition was reloaded; TODO confirm.
    bool reloaded = 5;
}

// Delta message whose elements are PbFamily messages.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that populates this message.
message FDeltas {
    // Presumably when the delta was generated; units not visible here.
    double time = 1;
    // Presumably a checksum for verifying the applied result; TODO confirm.
    int64 checksum = 2;
    // Presumably identifiers of elements removed from the store.
    repeated string pruned = 3;
    // Family elements carried by this delta.
    repeated PbFamily deltas = 4;
    // Presumably set when the workflow definition was reloaded; TODO confirm.
    bool reloaded = 5;
}

// Delta message whose elements are PbFamilyProxy messages.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that populates this message.
message FPDeltas {
    // Presumably when the delta was generated; units not visible here.
    double time = 1;
    // Presumably a checksum for verifying the applied result; TODO confirm.
    int64 checksum = 2;
    // Presumably identifiers of elements removed from the store.
    repeated string pruned = 3;
    // Family proxy elements carried by this delta.
    repeated PbFamilyProxy deltas = 4;
    // Presumably set when the workflow definition was reloaded; TODO confirm.
    bool reloaded = 5;
}

// Delta message whose elements are PbJob messages.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that populates this message.
message JDeltas {
    // Presumably when the delta was generated; units not visible here.
    double time = 1;
    // Presumably a checksum for verifying the applied result; TODO confirm.
    int64 checksum = 2;
    // Presumably identifiers of elements removed from the store.
    repeated string pruned = 3;
    // Job elements carried by this delta.
    repeated PbJob deltas = 4;
    // Presumably set when the workflow definition was reloaded; TODO confirm.
    bool reloaded = 5;
}

// Delta message whose elements are PbTask messages.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that populates this message.
message TDeltas {
    // Presumably when the delta was generated; units not visible here.
    double time = 1;
    // Presumably a checksum for verifying the applied result; TODO confirm.
    int64 checksum = 2;
    // Presumably identifiers of elements removed from the store.
    repeated string pruned = 3;
    // Task elements carried by this delta.
    repeated PbTask deltas = 4;
    // Presumably set when the workflow definition was reloaded; TODO confirm.
    bool reloaded = 5;
}

// Delta message whose elements are PbTaskProxy messages.
// NOTE(review): field meanings below are inferred from names only; confirm
// against the code that populates this message.
message TPDeltas {
    // Presumably when the delta was generated; units not visible here.
    double time = 1;
    // Presumably a checksum for verifying the applied result; TODO confirm.
    int64 checksum = 2;
    // Presumably identifiers of elements removed from the store.
    repeated string pruned = 3;
    // Task proxy elements carried by this delta.
    repeated PbTaskProxy deltas = 4;
    // Presumably set when the workflow definition was reloaded; TODO confirm.
    bool reloaded = 5;
}
Loading