Skip to content

Commit

Permalink
API version, module names, record changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Nov 29, 2019
1 parent f2eef6f commit 1f281e3
Show file tree
Hide file tree
Showing 28 changed files with 249 additions and 235 deletions.
12 changes: 9 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ Third alpha release of Cylc 8.

### Enhancements

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task job
status message retries (problems that prevent message transmission are almost
never transient, and in practice job polling is the only way to recover).
[#3389](https://github.com/cylc/cylc-flow/pull/3389) - Publisher/Subscriber
network components added (0MQ PUB/SUB pattern). Used to publish fine-grained
data-store updates for the purposes of UI Server data sync, this change also
includes CLI utility: `cylc subscribe`.

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task
job status message retries (problems that prevent message transmission are
almost never transient, and in practice job polling is the only way to
recover).

### Fixes

Expand Down
10 changes: 6 additions & 4 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def main(parser, options):
print(state_legend.rstrip() + "\n")

# work through scan results one by one
for reg, host, port, pub_port, info in suites:
for reg, host, port, pub_port, api, info in suites:
if isinstance(info, str):
print(ERROR_STYLE + ' '.join([reg, host, port, info]))
elif info is None:
Expand All @@ -203,7 +203,7 @@ def main(parser, options):
print(ERROR_STYLE + 'Warning: suite has changed name %s => %s' % (
reg, info[KEY_NAME]))
else:
formatter(reg, host, port, pub_port, info, options)
formatter(reg, host, port, pub_port, api, info, options)


def sort_meta(item):
Expand All @@ -214,7 +214,7 @@ def sort_meta(item):
return key


def format_plain(name, host, port, pub_port, info, options):
def format_plain(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=plain"""
owner = info[KEY_OWNER]

Expand All @@ -228,6 +228,7 @@ def format_plain(name, host, port, pub_port, info, options):

if options.describe:
meta_items = info.get(KEY_META)
meta_items['API'] = api
if meta_items is None:
print(INDENT + MISSING_STYLE + "(description withheld)")
return
Expand All @@ -254,7 +255,7 @@ def format_plain(name, host, port, pub_port, info, options):
print(INDENT * 2 + "%s%s" % (point_prefix, state_line))


def format_raw(name, host, port, pub_port, info, options):
def format_raw(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=raw"""
owner = info[KEY_OWNER]

Expand All @@ -266,6 +267,7 @@ def format_raw(name, host, port, pub_port, info, options):
if options.describe:
# Extracting required data for these options before processing
meta_items = info.get(KEY_META)
meta_items['API'] = api

# clean_meta_items = {}
# for key, value in meta_items.items():
Expand Down
2 changes: 1 addition & 1 deletion bin/cylc-subscribe
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network import get_location
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.terminal import cli_function
from cylc.flow.ws_data_mgr import DELTAS_MAP
from cylc.flow.data_store_mgr import DELTAS_MAP

if '--use-ssh' in sys.argv[1:]:
sys.argv.remove('--use-ssh')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ syntax = "proto3";
* message modules.
*
* Command:
* $ protoc -I=./ --python_out=./ ws_messages.proto
* $ protoc -I=./ --python_out=./ data_messages.proto
*
* Pre-compiled protoc binary may be download from:
* https://github.com/protocolbuffers/protobuf/releases
Expand Down
294 changes: 146 additions & 148 deletions cylc/flow/ws_messages_pb2.py → cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

39 changes: 20 additions & 19 deletions cylc/flow/ws_data_mgr.py → cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,19 @@
from time import time
import zlib

from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.cycling.loader import get_point
from cylc.flow.task_id import TaskID
from cylc.flow.data_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy,
PbJob, PbTask, PbTaskProxy, PbWorkflow,
EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas)
from cylc.flow.network import API
from cylc.flow.suite_status import get_suite_status
from cylc.flow.task_id import TaskID
from cylc.flow.task_job_logs import JOB_LOG_OPTS
from cylc.flow.task_state_prop import extract_group_state
from cylc.flow.wallclock import (
TIME_ZONE_LOCAL_INFO, TIME_ZONE_UTC_INFO, get_utc_mode)
from cylc.flow.task_job_logs import JOB_LOG_OPTS
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.ws_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy,
PbJob, PbTask, PbTaskProxy, PbWorkflow,
EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas)


ID_DELIM = '|'
Expand Down Expand Up @@ -151,7 +152,7 @@ def apply_delta(key, delta, data):
del data[key][del_id]


class WsDataMgr:
class DataStoreMgr:
"""Manage the workflow data store.
Attributes:
Expand All @@ -162,19 +163,19 @@ class WsDataMgr:
for each cycle point key.
.data (dict):
.edges (dict):
cylc.flow.ws_messages_pb2.PbEdge by internal ID.
cylc.flow.data_messages_pb2.PbEdge by internal ID.
.families (dict):
cylc.flow.ws_messages_pb2.PbFamily by name (internal ID).
cylc.flow.data_messages_pb2.PbFamily by name (internal ID).
.family_proxies (dict):
cylc.flow.ws_messages_pb2.PbFamilyProxy by internal ID.
cylc.flow.data_messages_pb2.PbFamilyProxy by internal ID.
.jobs (dict):
cylc.flow.ws_messages_pb2.PbJob by internal ID, managed by
cylc.flow.data_messages_pb2.PbJob by internal ID, managed by
cylc.flow.job_pool.JobPool
.tasks (dict):
cylc.flow.ws_messages_pb2.PbTask by name (internal ID).
cylc.flow.data_messages_pb2.PbTask by name (internal ID).
.task_proxies (dict):
cylc.flow.ws_messages_pb2.PbTaskProxy by internal ID.
.workflow (cylc.flow.ws_messages_pb2.PbWorkflow)
cylc.flow.data_messages_pb2.PbTaskProxy by internal ID.
.workflow (cylc.flow.data_messages_pb2.PbWorkflow)
Message containing the global information of the workflow.
.descendants (dict):
Local store of config.get_first_parent_descendants()
Expand Down Expand Up @@ -385,7 +386,7 @@ def generate_definition_elements(self):
families[f_id].child_families.append(ch_id)

# Populate static fields of workflow
workflow.api_version = self.schd.server.API
workflow.api_version = API
workflow.cylc_version = CYLC_VERSION
workflow.name = self.schd.suite
workflow.owner = self.schd.owner
Expand Down Expand Up @@ -428,7 +429,7 @@ def generate_ghost_task(self, task_id):
Returns:
object: cylc.flow.ws_messages_pb2.PbTaskProxy
object: cylc.flow.data_messages_pb2.PbTaskProxy
Populated task proxy data element.
"""
Expand Down Expand Up @@ -470,7 +471,7 @@ def generate_ghost_families(self, cycle_points=None):
a set of cycle points.
Returns:
list: [cylc.flow.ws_messages_pb2.PbFamilyProxy]
list: [cylc.flow.data_messages_pb2.PbFamilyProxy]
list of populated family proxy data elements.
"""
Expand Down Expand Up @@ -1016,7 +1017,7 @@ def get_entire_workflow(self):
"""Gather data elements into single Protobuf message.
Returns:
cylc.flow.ws_messages_pb2.PbEntireWorkflow
cylc.flow.data_messages_pb2.PbEntireWorkflow
"""

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
TASK_STATUS_READY, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED)
from cylc.flow.ws_messages_pb2 import PbJob, JDeltas
from cylc.flow.ws_data_mgr import ID_DELIM
from cylc.flow.data_messages_pb2 import PbJob, JDeltas
from cylc.flow.data_store_mgr import ID_DELIM

JOB_STATUSES_ALL = [
TASK_STATUS_READY,
Expand Down
13 changes: 3 additions & 10 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
UserFiles
)

API = 5 # cylc API version


def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
Expand Down Expand Up @@ -259,16 +261,7 @@ def stop(self, stop_loop=True):
"""
self._bespoke_stop()
if stop_loop and self.loop:
if self.loop.is_running():
future = asyncio.run_coroutine_threadsafe(
self.loop.shutdown_asyncgens(),
self.loop
)
try:
future.result(2.0)
except asyncio.TimeoutError:
pass
if stop_loop and self.loop and self.loop.is_running():
self.loop.stop()
if self.thread and self.thread.is_alive():
self.thread.join() # Wait for processes to return
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from fnmatch import fnmatchcase
from graphene.utils.str_converters import to_snake_case

from cylc.flow.ws_data_mgr import (
from cylc.flow.data_store_mgr import (
ID_DELIM, EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW)
from cylc.flow.network.schema import NodesEdges, PROXY_NODES

Expand Down
26 changes: 15 additions & 11 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ def scan_many(items, methods=None, timeout=None, ordered=False):
"""Call "identify" method of suites on many host:port.
Args:
items (list): list of 'host' string or ('host', port) tuple to scan.
items (list):
list of 'host' string or ('host', port, pub_port, api)
tuple to scan.
methods (list): list of 'method' string to be executed when scanning.
timeout (float): connection timeout, default is CONNECT_TIMEOUT.
ordered (bool): whether to scan items in order or not (default).
Expand All @@ -136,8 +138,8 @@ def scan_many(items, methods=None, timeout=None, ordered=False):
list: [(host, port, identify_result), ...]
"""
args = ((reg, host, port, pub_port, timeout, methods)
for reg, host, port, pub_port in items)
args = ((reg, host, port, pub_port, api, timeout, methods)
for reg, host, port, pub_port, api in items)

if ordered:
yield from async_map(scan_one, args)
Expand All @@ -146,19 +148,20 @@ def scan_many(items, methods=None, timeout=None, ordered=False):
result for _, result in async_unordered_map(scan_one, args))


async def scan_one(reg, host, port, pub_port, timeout=None, methods=None):
async def scan_one(reg, host, port, pub_port, api, timeout=None, methods=None):
"""Connect to and identify workflow server if possible.
Args:
reg (str): Registered name of workflow.
host (str): Workflow host.
port (int): Workflow server port.
pub_port (int): Workflow publisher port.
api (str): Workflow API version.
timeout (float, optional): Client socket receiver timeout.
methods (list): List of methods/endpoints to request.
Returns:
tuple: (reg, host, port, result)
tuple: (reg, host, port, pub_port, result)
"""
if not methods:
Expand All @@ -171,7 +174,7 @@ async def scan_one(reg, host, port, pub_port, timeout=None, methods=None):
if cylc.flow.flags.debug:
raise
sys.stderr.write("ERROR: %s: %s\n" % (exc, host))
return (reg, host, port, pub_port, None)
return (reg, host, port, pub_port, api, None)

# NOTE: Connect to the suite by host:port, this was the
# SuiteRuntimeClient will not attempt to check the contact file
Expand All @@ -188,13 +191,13 @@ async def scan_one(reg, host, port, pub_port, timeout=None, methods=None):
except ClientTimeout as exc:
LOG.exception(
"Timeout: name:%s, host:%s, port:%s", reg, host, port)
return (reg, host, port, pub_port, MSG_TIMEOUT)
return (reg, host, port, pub_port, api, MSG_TIMEOUT)
except ClientError as exc:
LOG.exception("ClientError")
return (reg, host, port, pub_port, result or None)
return (reg, host, port, pub_port, api, result or None)
else:
result.update(msg)
return (reg, host, port, pub_port, result)
return (reg, host, port, pub_port, api, result)


def re_compile_filters(patterns_owner=None, patterns_name=None):
Expand Down Expand Up @@ -227,7 +230,7 @@ def get_scan_items_from_fs(
active, or all (active plus registered but dormant), suites.
Yields:
tuple - (reg, host, port, pub_port)
tuple - (reg, host, port, pub_port, api)
"""
if owner_pattern is None:
Expand Down Expand Up @@ -277,7 +280,8 @@ def get_scan_items_from_fs(
reg,
contact_data[ContactFileFields.HOST],
contact_data[ContactFileFields.PORT],
contact_data[ContactFileFields.PUBLISH_PORT]
contact_data[ContactFileFields.PUBLISH_PORT],
contact_data[ContactFileFields.API]
)
else:
try:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Callable, AsyncGenerator, Any

from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.ws_data_mgr import (
from cylc.flow.data_store_mgr import (
ID_DELIM, FAMILIES, FAMILY_PROXIES,
JOBS, TASKS, TASK_PROXIES
)
Expand Down
6 changes: 2 additions & 4 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
KEY_META, KEY_NAME, KEY_OWNER, KEY_STATES,
KEY_TASKS_BY_STATE, KEY_UPDATE_TIME, KEY_VERSION
)
from cylc.flow.ws_data_mgr import DELTAS_MAP
from cylc.flow.ws_messages_pb2 import PbEntireWorkflow
from cylc.flow.data_store_mgr import DELTAS_MAP
from cylc.flow.data_messages_pb2 import PbEntireWorkflow
from cylc.flow import __version__ as CYLC_VERSION

# maps server methods to the protobuf message (for client/UIS import)
Expand Down Expand Up @@ -99,8 +99,6 @@ class SuiteRuntimeServer(ZMQSocketBase):
"""

API = 4 # cylc API version

RECV_TIMEOUT = 1
"""Max time the SuiteRuntimeServer will wait for an incoming
message in seconds.
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import zmq

from cylc.flow.network import ZMQSocketBase, get_location
from cylc.flow.ws_data_mgr import DELTAS_MAP
from cylc.flow.data_store_mgr import DELTAS_MAP


def process_delta_msg(btopic, delta_msg, func, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cylc.flow.conditional_simplifier import ConditionalSimplifier
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import TriggerExpressionError
from cylc.flow.ws_messages_pb2 import PbPrerequisite, PbCondition
from cylc.flow.data_messages_pb2 import PbPrerequisite, PbCondition


class Prerequisite(object):
Expand Down
Loading

0 comments on commit 1f281e3

Please sign in to comment.