From 1f3a0b85d4e3a905654246aa2c3f13e39e8aaa2f Mon Sep 17 00:00:00 2001
From: David Sutherland
Date: Sat, 19 Oct 2019 02:00:32 +1300
Subject: [PATCH 1/5] Data sync functional

---
 cylc/uiserver/data_mgr.py                 | 174 +++++++++++++++++-----
 cylc/uiserver/main.py                     |  23 ++-
 cylc/uiserver/resolvers.py                |   6 +-
 cylc/uiserver/tests/test_workflows_mgr.py |  44 +++---
 cylc/uiserver/workflows_mgr.py            |  41 ++---
 5 files changed, 204 insertions(+), 84 deletions(-)

diff --git a/cylc/uiserver/data_mgr.py b/cylc/uiserver/data_mgr.py
index c4672f26..0aec5d49 100644
--- a/cylc/uiserver/data_mgr.py
+++ b/cylc/uiserver/data_mgr.py
@@ -19,39 +19,133 @@
 import asyncio
 import logging

-from cylc.flow.exceptions import ClientError, ClientTimeout
+from concurrent.futures import ThreadPoolExecutor
+from functools import partial
+from time import sleep
+
 from cylc.flow.network.server import PB_METHOD_MAP
 from cylc.flow.network.scan import MSG_TIMEOUT
+from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
 from cylc.flow.ws_data_mgr import (
-    EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW
+    EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
+    DELTAS_MAP, apply_delta, generate_checksum
 )
+from .workflows_mgr import workflow_request

 logger = logging.getLogger(__name__)


-async def get_workflow_data(w_id, client, method):
-    """Call WS endpoint for entire workflow protobuf message."""
-    # Use already established client
-    try:
-        pb_msg = await client.async_request(method)
-    except ClientTimeout as exc:
-        logger.exception(exc)
-        return (w_id, MSG_TIMEOUT)
-    except ClientError as exc:
-        logger.exception(exc)
-        return (w_id, None)
-    else:
-        ws_data = PB_METHOD_MAP[method]()
-        ws_data.ParseFromString(pb_msg)
-        return (w_id, ws_data)
-
-
 class DataManager:
     """Manage the local data-store acquisition/updates from all workflows."""

-    def __init__(self, ws_mgr):
-        self.ws_mgr = ws_mgr
+    def __init__(self, workfloworkflows_mgr):
+        self.workflows_mgr = workfloworkflows_mgr
         self.data = {}
+        self.w_subs = {}
+        self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
+        self.loop = None
+
+    async def sync_workflows(self):
+        """Run data store sync with workflow services.
+
+        Subscriptions and sync management are instantiated and run in
+        a separate thread for each workflow. This is to avoid the sync loop
+        blocking the main loop.
+        """
+        self.loop = asyncio.get_running_loop()
+        # Might be options other than threads to achieve
+        # non-blocking subscriptions, but this works.
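+        # One pass per second: subscribe to any newly scanned workflow
+        # (each subscriber runs its own event loop in a pool thread),
+        # then drop subscriptions and data for vanished workflows.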
+        with ThreadPoolExecutor() as pool:
+            while True:
+                for w_id, info in self.workflows_mgr.workflows.items():
+                    if w_id not in self.w_subs:
+                        pool.submit(
+                            partial(
+                                self._start_subscription,
+                                w_id,
+                                info['host'],
+                                info['pub_port']))
+                        await self.entire_workflow_update(ids=[w_id])
+                for w_id in list(self.w_subs):
+                    if w_id not in self.workflows_mgr.workflows:
+                        self.w_subs[w_id].stop()
+                        del self.w_subs[w_id]
+                        if w_id in self.data:
+                            del self.data[w_id]
+
+                await asyncio.sleep(1.0)
+
+    def _start_subscription(self, w_id, host, port):
+        """Instantiate and run subscriber and data-store management."""
+        self.w_subs[w_id] = WorkflowSubscriber(
+            host, port,
+            context=self.workflows_mgr.context, topics=self.topics)
+        self.w_subs[w_id].loop.run_until_complete(
+            self.w_subs[w_id].subscribe(
+                process_delta_msg,
+                func=self.update_workflow_data,
+                w_id=w_id))
+
+    def update_workflow_data(self, topic, delta, w_id):
+        """Manage and apply incoming data-store deltas."""
+        loop_cnt = 0
+        while loop_cnt < 5:
+            if w_id not in self.data:
+                sleep(0.5)
+                loop_cnt += 1
+                continue
+            break
+        if w_id not in self.data:
+            return
+        delta_time = getattr(
+        delta, 'time', getattr(delta, 'last_updated', 0.0))
+        # If the workflow has reloaded recreate the data
+        if delta.reloaded:
+            self.data[w_id][topic] = {ele.id: ele for ele in delta.deltas}
+            self.data[w_id]['delta_times'][topic] = delta_time
+        elif delta_time >= self.data[w_id]['delta_times'][topic]:
+            apply_delta(topic, delta, self.data[w_id])
+            self.data[w_id]['delta_times'][topic] = delta_time
+            self.reconcile_update(topic, delta, w_id)
+
+    def reconcile_update(self, topic, delta, w_id):
+        """Reconcile local with workflow data-store.
+
+        Verify data-store is in sync by topic/element-type
+        and on failure request entire set of respective data elements.
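+        The local checksum is built from each element's 'stamp' field
+        ('id' for edges) and compared with the delta's checksum.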
+ + """ + if topic == WORKFLOW: + return + if topic == EDGES: + s_att = 'id' + else: + s_att = 'stamp' + local_checksum = generate_checksum( + [getattr(e, s_att) + for e in self.data[w_id][topic].values()]) + if local_checksum != delta.checksum: + future = asyncio.run_coroutine_threadsafe( + workflow_request( + self.workflows_mgr.workflows[w_id]['req_client'], + 'pb_data_elements', + args={'element_type': topic} + ), + self.loop + ) + try: + _, new_delta_msg = future.result(5.0) + except asyncio.TimeoutError: + logger.info(f'The reconcile update coroutine {w_id} {topic}' + f'took too long, cancelling the task...') + future.cancel() + except Exception as exc: + logger.exception(exc) + else: + new_delta = DELTAS_MAP[topic]() + new_delta.ParseFromString(new_delta_msg) + apply_delta(topic, new_delta, self.data[w_id]) + self.data[w_id]['delta_times'][topic] = new_delta.time # Data syncing async def entire_workflow_update(self, ids=None): @@ -61,33 +155,41 @@ async def entire_workflow_update(self, ids=None): # Prune old data for w_id in list(self.data): - if w_id not in self.ws_mgr.workflows: + if w_id not in self.workflows_mgr.workflows: del self.data[w_id] - # Fetch new data - ws_args = ( - (w_id, info['req_client'], 'pb_entire_workflow') - for w_id, info in self.ws_mgr.workflows.items()) + # Request new data + req_method = 'pb_entire_workflow' + req_kwargs = ( + {'client': info['req_client'], + 'command': req_method, + 'req_context': w_id} + for w_id, info in self.workflows_mgr.workflows.items()) gathers = () - for args in ws_args: - if not ids or args[0] in ids: - gathers += (get_workflow_data(*args),) + for kwargs in req_kwargs: + if not ids or kwargs['req_context'] in ids: + gathers += (workflow_request(**kwargs),) items = await asyncio.gather(*gathers) new_data = {} for w_id, result in items: if result is not None and result != MSG_TIMEOUT: + pb_data = PB_METHOD_MAP[req_method]() + pb_data.ParseFromString(result) new_data[w_id] = { - EDGES: {e.id: e for e in getattr(result, EDGES)}, - FAMILIES: {f.id: f for f in getattr(result, FAMILIES)}, + EDGES: {e.id: e for e in getattr(pb_data, EDGES)}, + FAMILIES: {f.id: f for f in getattr(pb_data, FAMILIES)}, FAMILY_PROXIES: { n.id: n - for n in getattr(result, FAMILY_PROXIES)}, - JOBS: {j.id: j for j in getattr(result, JOBS)}, - TASKS: {t.id: t for t in getattr(result, TASKS)}, + for n in getattr(pb_data, FAMILY_PROXIES)}, + JOBS: {j.id: j for j in getattr(pb_data, JOBS)}, + TASKS: {t.id: t for t in getattr(pb_data, TASKS)}, TASK_PROXIES: { n.id: n - for n in getattr(result, TASK_PROXIES)}, - WORKFLOW: getattr(result, WORKFLOW), + for n in getattr(pb_data, TASK_PROXIES)}, + WORKFLOW: getattr(pb_data, WORKFLOW), + 'delta_times': { + topic: getattr(pb_data, WORKFLOW).last_updated + for topic in DELTAS_MAP.keys()} } self.data.update(new_data) diff --git a/cylc/uiserver/main.py b/cylc/uiserver/main.py index 4ff38d7b..8b627215 100644 --- a/cylc/uiserver/main.py +++ b/cylc/uiserver/main.py @@ -21,6 +21,7 @@ import logging import os import signal +from functools import partial from cylc.flow.network.schema import schema from logging.config import dictConfig @@ -44,9 +45,12 @@ def signal_handler(self, signum, frame): logger.info('exiting...') self.is_closing = True - def try_exit(self): + def try_exit(self, uis): if self.is_closing: # clean up here + for sub in uis.data_mgr.w_subs.values(): + sub.stop() + uis.workflows_mgr.context.destroy() ioloop.IOLoop.instance().stop() logger.info('exit success') @@ -61,11 +65,11 @@ def __init__(self, port, static, 
jupyter_hub_service_prefix): script_dir = os.path.dirname(__file__) self._static = os.path.abspath(os.path.join(script_dir, static)) self._jupyter_hub_service_prefix = jupyter_hub_service_prefix - self.ws_mgr = WorkflowsManager() - self.data_mgr = DataManager(self.ws_mgr) + self.workflows_mgr = WorkflowsManager() + self.data_mgr = DataManager(self.workflows_mgr) self.resolvers = Resolvers( self.data_mgr.data, - ws_mgr=self.ws_mgr) + workflows_mgr=self.workflows_mgr) def _make_app(self, debug: bool): """Crete a Tornado web application. @@ -126,14 +130,17 @@ def start(self, debug: bool): app = self._make_app(debug) signal.signal(signal.SIGINT, app.signal_handler) app.listen(self._port) - ioloop.PeriodicCallback(app.try_exit, 100).start() + ioloop.PeriodicCallback( + partial(app.try_exit, uis=self), 100).start() # Discover workflows on initial start up. - ioloop.IOLoop.current().add_callback(self.ws_mgr.gather_workflows) + ioloop.IOLoop.current().add_callback( + self.workflows_mgr.gather_workflows) + # Start the workflow data-store sync + ioloop.IOLoop.current().add_callback(self.data_mgr.sync_workflows) # If the client is already established it's not overridden, # so the following callbacks can happen at the same time. - ioloop.PeriodicCallback(self.ws_mgr.gather_workflows, 10000).start() ioloop.PeriodicCallback( - self.data_mgr.entire_workflow_update, 5000).start() + self.workflows_mgr.gather_workflows, 10000).start() try: ioloop.IOLoop.current().start() except KeyboardInterrupt: diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index 3b9e2fca..438c1bb9 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -23,7 +23,7 @@ class Resolvers(BaseResolvers): """UI Server context GraphQL query and mutation resolvers.""" - ws_mgr = None + workflows_mgr = None def __init__(self, data, **kwargs): super().__init__(data) @@ -46,7 +46,7 @@ async def mutator(self, info, *m_args): 'request_string': req_str, 'variables': variables, } - return self.ws_mgr.multi_request('graphql', w_ids, graphql_args) + return self.workflows_mgr.multi_request('graphql', w_ids, graphql_args) async def nodes_mutator(self, info, *m_args): """Mutate node items of associated workflows.""" @@ -63,5 +63,5 @@ async def nodes_mutator(self, info, *m_args): 'variables': variables, } multi_args = {w_id: graphql_args for w_id in w_ids} - return self.ws_mgr.multi_request( + return self.workflows_mgr.multi_request( 'graphql', w_ids, multi_args=multi_args) diff --git a/cylc/uiserver/tests/test_workflows_mgr.py b/cylc/uiserver/tests/test_workflows_mgr.py index f1e8e885..74f4ec41 100644 --- a/cylc/uiserver/tests/test_workflows_mgr.py +++ b/cylc/uiserver/tests/test_workflows_mgr.py @@ -43,28 +43,30 @@ async def test_workflow_request_client_error(async_client: AsyncClientFixture): @pytest.mark.asyncio -@pytest.mark.parametrize("returns,command,context,expected_ctx,expected_msg", [ - pytest.param( - 42, 'cmd', None, 'cmd', 42 - ), - pytest.param( - 42, '', None, '', 42 - ), - pytest.param( - 42, 'cmd', 'some-context', 'some-context', 42 - ) -]) +@pytest.mark.parametrize( + "returns,command,req_context,expected_ctx,expected_msg", + [ + pytest.param( + 42, 'cmd', None, 'cmd', 42 + ), + pytest.param( + 42, '', None, '', 42 + ), + pytest.param( + 42, 'cmd', 'some-context', 'some-context', 42 + ) + ]) async def test_workflow_request( async_client: AsyncClientFixture, returns, command, - context, + req_context, expected_ctx, expected_msg ): async_client.will_return(returns) ctx, msg = await workflow_request( - 
client=async_client, command=command, context=context) + client=async_client, command=command, req_context=req_context) assert expected_ctx == ctx assert expected_msg == msg @@ -86,19 +88,19 @@ async def test_est_workflow_socket_error( def side_effect(*_, **__): raise socket.error mocked_get_host_ip_by_name.side_effect = side_effect - reg, host, port, client = await est_workflow( + reg, host, port, pub_port, client = await est_workflow( '', '', 0, 0) - assert not any([reg, host, port, client]) + assert not any([reg, host, port, pub_port, client]) assert client is None @pytest.mark.asyncio -@pytest.mark.parametrize('reg,host,port,timeout,expected_host', [ +@pytest.mark.parametrize('reg,host,port,pub_port,timeout,expected_host', [ pytest.param( - 'remote', 'remote', 8000, 1, 'remote_host' + 'remote', 'remote', 8000, 8002, 1, 'remote_host' ), pytest.param( - 'local', 'localhost', 4000, 2, 'localhost' + 'local', 'localhost', 4000, 4001, 2, 'localhost' ) ]) async def test_est_workflow( @@ -107,6 +109,7 @@ async def test_est_workflow( reg, host, port, + pub_port, timeout, expected_host ): @@ -124,11 +127,12 @@ async def test_est_workflow( mocked_get_host_ip_by_name.side_effect = lambda x: f'remote_host' \ if x == 'remote' else x - r_reg, r_host, r_port, r_client, r_result = await est_workflow( - reg, host, port, timeout) + r_reg, r_host, r_port, r_pub_port, r_client, r_result = await est_workflow( + reg, host, port, pub_port, timeout) assert reg == r_reg assert expected_host == r_host assert port == r_port + assert pub_port == r_pub_port assert r_client.__class__ == AsyncClientFixture diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py index d20e8549..06a2580b 100644 --- a/cylc/uiserver/workflows_mgr.py +++ b/cylc/uiserver/workflows_mgr.py @@ -26,6 +26,7 @@ import socket import asyncio import logging +import zmq.asyncio from contextlib import suppress @@ -42,22 +43,22 @@ async def workflow_request(client, command, args=None, - timeout=None, context=None): + timeout=None, req_context=None): """Workflow request command.""" - if context is None: - context = command + if req_context is None: + req_context = command try: result = await client.async_request(command, args, timeout) - return (context, result) + return (req_context, result) except ClientTimeout as exc: logger.exception(exc) - return (context, MSG_TIMEOUT) + return (req_context, MSG_TIMEOUT) except ClientError as exc: logger.exception(exc) - return (context, None) + return (req_context, None) -async def est_workflow(reg, host, port, timeout=None): +async def est_workflow(reg, host, port, pub_port, context=None, timeout=None): """Establish communication with workflow, instantiating REQ client.""" if is_remote_host(host): try: @@ -66,20 +67,25 @@ async def est_workflow(reg, host, port, timeout=None): if flags.debug: raise logger.error("ERROR: %s: %s\n" % (exc, host)) - return (reg, host, port, None) + return (reg, host, port, pub_port, None) # NOTE: Connect to the suite by host:port. This way the # SuiteRuntimeClient will not attempt to check the contact file # which would be unnecessary as we have already done so. # NOTE: This part of the scan *is* IO blocking. 
-    client = SuiteRuntimeClient(reg, host=host, port=port, timeout=timeout)
-    context, result = await workflow_request(client, 'identify')
-    return (reg, host, port, client, result)
+    client = SuiteRuntimeClient(reg, host=host, port=port,
+                                context=context, timeout=timeout)
+    req_context, result = await workflow_request(client, 'identify')
+    return (reg, host, port, pub_port, client, result)
 
 
-class WorkflowsManager(object):
+class WorkflowsManager:
 
-    def __init__(self):
+    def __init__(self, context=None):
+        if context is None:
+            self.context = zmq.asyncio.Context()
+        else:
+            self.context = context
         self.workflows = {}
 
     def spawn_workflow(self):
@@ -90,14 +96,14 @@ async def gather_workflows(self):
         scanflows = {}
         cre_owner, cre_name = re_compile_filters(None, ['.*'])
         scan_args = (
-            (reg, host, port, CLIENT_TIMEOUT)
-            for reg, host, port in
+            (reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
+            for reg, host, port, pub_port in
             get_scan_items_from_fs(cre_owner, cre_name))
         gathers = ()
         for arg in scan_args:
             gathers += (est_workflow(*arg),)
         items = await asyncio.gather(*gathers)
-        for reg, host, port, client, info in items:
+        for reg, host, port, pub_port, client, info in items:
             if info is not None and info != MSG_TIMEOUT:
                 owner = info['owner']
                 scanflows[f"{owner}{ID_DELIM}{reg}"] = {
@@ -105,6 +111,7 @@ async def gather_workflows(self):
                     'owner': owner,
                     'host': host,
                     'port': port,
+                    'pub_port': pub_port,
                     'version': info['version'],
                     'req_client': client,
                 }
@@ -145,7 +152,7 @@ async def multi_request(self, command, workflows, args=None,
         )
         gathers = ()
         for info, request_args in req_args.items():
-            gathers += (workflow_request(context=info, *request_args),)
+            gathers += (workflow_request(req_context=info, *request_args),)
         results = await asyncio.gather(*gathers)
         res = []
         for key, val in results:

From 347189f8f3e42115e357b05b20555fd60a4b1a65 Mon Sep 17 00:00:00 2001
From: David Sutherland
Date: Tue, 12 Nov 2019 21:22:06 +1300
Subject: [PATCH 2/5] fix tests, sync comments & docstrings

---
 cylc/uiserver/data_mgr.py       | 59 ++++++++++++++++++++++++++++-----
 cylc/uiserver/main.py           |  5 ++-
 cylc/uiserver/tests/conftest.py |  9 +++--
 cylc/uiserver/workflows_mgr.py  |  4 +--
 4 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/cylc/uiserver/data_mgr.py b/cylc/uiserver/data_mgr.py
index 0aec5d49..ce094227 100644
--- a/cylc/uiserver/data_mgr.py
+++ b/cylc/uiserver/data_mgr.py
@@ -13,8 +13,24 @@
 #
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Manage a local data-store replica of all workflow service data-stores.
 
-"""Create and update the data structure for all workflow services."""
+A local data-store is created and synced for all workflows established by
+the workflow service manager.
+
+The workflows publish the updated fields of changed data elements (deltas),
+and these elements are grouped by type/topic. Once subscribed to, the
+publisher queues these messages until they are received. If a delta's
+creation time is newer than that of the last update, it is applied (updates
+merged, deleted elements pruned), then a checksum is generated from the
+time-stamped IDs and compared with the published one.
+
+Reconciliation on failed verification is done by requesting all elements of
+a topic and replacing the respective data-store elements with the result.
+
+Subscriptions are currently run in a different thread (via ThreadPoolExecutor).
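+Each subscriber thread runs its own event loop, so delta processing does
+not block the UI Server's main loop.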
+
+"""
 
 import asyncio
 import logging
@@ -36,10 +52,10 @@
 
 
 class DataManager:
-    """Manage the local data-store acquisition/updates from all workflows."""
+    """Manage the local data-store acquisition/updates for all workflows."""
 
-    def __init__(self, workfloworkflows_mgr):
-        self.workflows_mgr = workfloworkflows_mgr
+    def __init__(self, workflows_mgr):
+        self.workflows_mgr = workflows_mgr
         self.data = {}
         self.w_subs = {}
         self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
@@ -76,7 +92,14 @@ async def sync_workflows(self):
                 await asyncio.sleep(1.0)
 
     def _start_subscription(self, w_id, host, port):
-        """Instantiate and run subscriber and data-store management."""
+        """Instantiate and run subscriber and data-store management.
+
+        Args:
+            w_id (str): Workflow external ID.
+            host (str): Hostname of target workflow.
+            port (int): Port of target workflow.
+
+        """
         self.w_subs[w_id] = WorkflowSubscriber(
             host, port,
             context=self.workflows_mgr.context, topics=self.topics)
@@ -87,7 +110,15 @@ def _start_subscription(self, w_id, host, port):
                 w_id=w_id))
 
     def update_workflow_data(self, topic, delta, w_id):
-        """Manage and apply incoming data-store deltas."""
+        """Manage and apply incoming data-store deltas.
+
+        Args:
+            topic (str): topic of published data.
+            delta (object): Published protobuf message data container.
+            w_id (str): Workflow external ID.
+
+        """
+        # wait until data-store is populated for this workflow
         loop_cnt = 0
         while loop_cnt < 5:
             if w_id not in self.data:
@@ -100,6 +131,7 @@ def update_workflow_data(self, topic, delta, w_id):
         delta_time = getattr(
         delta, 'time', getattr(delta, 'last_updated', 0.0))
         # If the workflow has reloaded recreate the data
+        # otherwise apply the delta if it's newer than the previously applied.
         if delta.reloaded:
             self.data[w_id][topic] = {ele.id: ele for ele in delta.deltas}
             self.data[w_id]['delta_times'][topic] = delta_time
@@ -114,6 +146,11 @@ def reconcile_update(self, topic, delta, w_id):
 
         Verify data-store is in sync by topic/element-type
         and on failure request entire set of respective data elements.
 
+        Args:
+            topic (str): topic of published data.
+            delta (object): Published protobuf message data container.
+            w_id (str): Workflow external ID.
+
         """
         if topic == WORKFLOW:
             return
@@ -125,6 +162,7 @@ def reconcile_update(self, topic, delta, w_id):
             [getattr(e, s_att)
              for e in self.data[w_id][topic].values()])
         if local_checksum != delta.checksum:
+            # use threadsafe as client socket is in main loop thread.
             future = asyncio.run_coroutine_threadsafe(
                 workflow_request(
                     self.workflows_mgr.workflows[w_id]['req_client'],
@@ -147,9 +185,14 @@ def reconcile_update(self, topic, delta, w_id):
                 apply_delta(topic, new_delta, self.data[w_id])
                 self.data[w_id]['delta_times'][topic] = new_delta.time
 
-    # Data syncing
     async def entire_workflow_update(self, ids=None):
-        """Update all data of workflow(s) from associated WS."""
+        """Update entire local data-store of workflow(s).
+
+        Args:
+            ids (list): List of workflow external IDs.
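+                An empty or None value means update all workflows.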
+
+
+        """
         if ids is None:
             ids = []
 
diff --git a/cylc/uiserver/main.py b/cylc/uiserver/main.py
index 8b627215..3b524416 100644
--- a/cylc/uiserver/main.py
+++ b/cylc/uiserver/main.py
@@ -46,10 +46,12 @@ def signal_handler(self, signum, frame):
         self.is_closing = True
 
     def try_exit(self, uis):
+        # clean up and stop in here
         if self.is_closing:
-            # clean up here
+            # stop the subscribers running in the thread pool executor
             for sub in uis.data_mgr.w_subs.values():
                 sub.stop()
+            # Destroy ZeroMQ context of all sockets
             uis.workflows_mgr.context.destroy()
             ioloop.IOLoop.instance().stop()
             logger.info('exit success')
@@ -130,6 +132,7 @@ def start(self, debug: bool):
         app = self._make_app(debug)
         signal.signal(signal.SIGINT, app.signal_handler)
         app.listen(self._port)
+        # pass in server object for clean exit
         ioloop.PeriodicCallback(
             partial(app.try_exit, uis=self), 100).start()
         # Discover workflows on initial start up.
diff --git a/cylc/uiserver/tests/conftest.py b/cylc/uiserver/tests/conftest.py
index de253de5..2610afaf 100644
--- a/cylc/uiserver/tests/conftest.py
+++ b/cylc/uiserver/tests/conftest.py
@@ -17,16 +17,15 @@
 
 import inspect
 import pytest
+import zmq
 
-from cylc.flow.network.client import ZMQClient
+from cylc.flow.network import ZMQSocketBase
 
 
-class AsyncClientFixture(ZMQClient):
+class AsyncClientFixture(ZMQSocketBase):
+    pattern = zmq.REQ
     host = ''
     port = 0
-    encode_method = None
-    decode_method = None
-    secret_method = None
 
     def __init__(self):
         self.returns = None
diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py
index 06a2580b..f60d2ad6 100644
--- a/cylc/uiserver/workflows_mgr.py
+++ b/cylc/uiserver/workflows_mgr.py
@@ -123,12 +123,12 @@ async def gather_workflows(self):
                     info['port'] == scanflows[w_id]['port']):
                 client = scanflows[w_id]['req_client']
                 with suppress(IOError):
-                    client.socket.close()
+                    client.stop(stop_loop=False)
                 scanflows.pop(w_id)
                 continue
             client = self.workflows[w_id]['req_client']
             with suppress(IOError):
-                client.socket.close()
+                client.stop(stop_loop=False)
             self.workflows.pop(w_id)
 
         # update with new

From be928a87b6708f16b8adbf45ecee7aab4460c53c Mon Sep 17 00:00:00 2001
From: David Sutherland
Date: Wed, 13 Nov 2019 14:19:34 +1300
Subject: [PATCH 3/5] published shutdown handling & sub on discovery

---
 cylc/uiserver/data_mgr.py                 | 82 ++++++++++++++---------
 cylc/uiserver/main.py                     | 12 ++--
 cylc/uiserver/tests/test_workflows_mgr.py |  4 +-
 cylc/uiserver/workflows_mgr.py            | 38 ++++++++---
 4 files changed, 85 insertions(+), 51 deletions(-)

diff --git a/cylc/uiserver/data_mgr.py b/cylc/uiserver/data_mgr.py
index ce094227..398430c5 100644
--- a/cylc/uiserver/data_mgr.py
+++ b/cylc/uiserver/data_mgr.py
@@ -54,55 +54,63 @@ class DataManager:
     """Manage the local data-store acquisition/updates for all workflows."""
 
+    INIT_DATA_WAIT_TIME = 5.  # seconds
+    INIT_DATA_RETRY_DELAY = 0.5  # seconds
+    RECONCILE_TIMEOUT = 5.  # seconds
+
     def __init__(self, workflows_mgr):
         self.workflows_mgr = workflows_mgr
         self.data = {}
         self.w_subs = {}
         self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
+        self.topics.add(b'shutdown')
         self.loop = None
+        # Might be options other than threads to achieve
+        # non-blocking subscriptions, but this works.
+        self.executor = ThreadPoolExecutor()
 
-    async def sync_workflows(self):
+    async def sync_workflow(self, w_id, *args, **kwargs):
         """Run data store sync with workflow services.
 
         Subscriptions and sync management are instantiated and run in
         a separate thread for each workflow. This is to avoid the sync loop
         blocking the main loop.
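+        Called on workflow discovery; a no-op if the workflow is
+        already subscribed.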
+
         """
+        if self.loop is None:
+            self.loop = asyncio.get_running_loop()
+        if w_id in self.w_subs:
+            return
+        self.executor.submit(
+            partial(self.start_subscription, w_id, *args, **kwargs)
+        )
+        await self.entire_workflow_update(ids=[w_id])
+
+    def purge_workflow(self, w_id):
+        """Purge the manager of a workflow's subscription and data."""
+        if w_id in self.w_subs:
+            self.w_subs[w_id].stop()
+            del self.w_subs[w_id]
+        if w_id in self.data:
+            del self.data[w_id]
+
+    def start_subscription(self, w_id, reg, host, port):
+        """Instantiate and run subscriber data-store sync.
 
         Args:
             w_id (str): Workflow external ID.
+            reg (str): Registered workflow name.
             host (str): Hostname of target workflow.
             port (int): Port of target workflow.
 
         """
         self.w_subs[w_id] = WorkflowSubscriber(
-            host, port,
-            context=self.workflows_mgr.context, topics=self.topics)
+            reg,
+            host=host,
+            port=port,
+            context=self.workflows_mgr.context,
+            topics=self.topics
+        )
         self.w_subs[w_id].loop.run_until_complete(
             self.w_subs[w_id].subscribe(
                 process_delta_msg,
@@ -120,16 +128,20 @@ def update_workflow_data(self, topic, delta, w_id):
         """
         # wait until data-store is populated for this workflow
         loop_cnt = 0
-        while loop_cnt < 5:
+        while loop_cnt < self.INIT_DATA_WAIT_TIME:
             if w_id not in self.data:
-                sleep(0.5)
+                sleep(self.INIT_DATA_RETRY_DELAY)
                 loop_cnt += 1
                 continue
             break
         if w_id not in self.data:
             return
+        if topic == 'shutdown':
+            self.workflows_mgr.stopping.add(w_id)
+            self.w_subs[w_id].stop()
+            return
         delta_time = getattr(
-        delta, 'time', getattr(delta, 'last_updated', 0.0))
+            delta, 'time', getattr(delta, 'last_updated', 0.0))
@@ -174,9 +186,13 @@ def reconcile_update(self, topic, delta, w_id):
             try:
                 _, new_delta_msg = future.result(5.0)
             except asyncio.TimeoutError:
-                logger.info(f'The reconcile update coroutine {w_id} {topic}'
-                            f'took too long, cancelling the task...')
+                logger.info(
+                    f'The reconcile update coroutine {w_id} {topic} '
+                    f'took too long, cancelling the subscription/sync.'
+ ) future.cancel() + self.workflows_mgr.stopping.add(w_id) + self.w_subs[w_id].stop() except Exception as exc: logger.exception(exc) else: diff --git a/cylc/uiserver/main.py b/cylc/uiserver/main.py index 3b524416..f4b4c354 100644 --- a/cylc/uiserver/main.py +++ b/cylc/uiserver/main.py @@ -19,13 +19,13 @@ import argparse import json import logging +from logging.config import dictConfig import os +from os.path import join, abspath, dirname import signal from functools import partial from cylc.flow.network.schema import schema -from logging.config import dictConfig -from os.path import join, abspath, dirname from tornado import web, ioloop from jupyterhub.services.auth import HubOAuthCallbackHandler @@ -51,6 +51,8 @@ def try_exit(self, uis): # stop the subscribers running in the thread pool executor for sub in uis.data_mgr.w_subs.values(): sub.stop() + # Shutdown the thread pool executor + uis.data_mgr.executor.shutdown(wait=False) # Destroy ZeroMQ context of all sockets uis.workflows_mgr.context.destroy() ioloop.IOLoop.instance().stop() @@ -67,7 +69,7 @@ def __init__(self, port, static, jupyter_hub_service_prefix): script_dir = os.path.dirname(__file__) self._static = os.path.abspath(os.path.join(script_dir, static)) self._jupyter_hub_service_prefix = jupyter_hub_service_prefix - self.workflows_mgr = WorkflowsManager() + self.workflows_mgr = WorkflowsManager(self) self.data_mgr = DataManager(self.workflows_mgr) self.resolvers = Resolvers( self.data_mgr.data, @@ -138,12 +140,10 @@ def start(self, debug: bool): # Discover workflows on initial start up. ioloop.IOLoop.current().add_callback( self.workflows_mgr.gather_workflows) - # Start the workflow data-store sync - ioloop.IOLoop.current().add_callback(self.data_mgr.sync_workflows) # If the client is already established it's not overridden, # so the following callbacks can happen at the same time. ioloop.PeriodicCallback( - self.workflows_mgr.gather_workflows, 10000).start() + self.workflows_mgr.gather_workflows, 7000).start() try: ioloop.IOLoop.current().start() except KeyboardInterrupt: diff --git a/cylc/uiserver/tests/test_workflows_mgr.py b/cylc/uiserver/tests/test_workflows_mgr.py index 74f4ec41..5b20b460 100644 --- a/cylc/uiserver/tests/test_workflows_mgr.py +++ b/cylc/uiserver/tests/test_workflows_mgr.py @@ -140,12 +140,12 @@ async def test_est_workflow( def test_workflows_manager_constructor(): - mgr = WorkflowsManager() + mgr = WorkflowsManager(None) assert not mgr.workflows def test_workflows_manager_spawn_workflow(): - mgr = WorkflowsManager() + mgr = WorkflowsManager(None) mgr.spawn_workflow() assert not mgr.workflows diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py index f60d2ad6..e3f86381 100644 --- a/cylc/uiserver/workflows_mgr.py +++ b/cylc/uiserver/workflows_mgr.py @@ -23,12 +23,12 @@ - ? """ -import socket import asyncio +from contextlib import suppress import logging -import zmq.asyncio +import socket -from contextlib import suppress +import zmq.asyncio from cylc.flow import flags from cylc.flow.exceptions import ClientError, ClientTimeout @@ -66,7 +66,7 @@ async def est_workflow(reg, host, port, pub_port, context=None, timeout=None): except socket.error as exc: if flags.debug: raise - logger.error("ERROR: %s: %s\n" % (exc, host)) + logger.error("ERROR: %s: %s\n", exc, host) return (reg, host, port, pub_port, None) # NOTE: Connect to the suite by host:port. 
This way the
     # SuiteRuntimeClient will not attempt to check the contact file
     # which would be unnecessary as we have already done so.
     # NOTE: This part of the scan *is* IO blocking.
     client = SuiteRuntimeClient(reg, host=host, port=port,
                                 context=context, timeout=timeout)
-    req_context, result = await workflow_request(client, 'identify')
+    _, result = await workflow_request(client, 'identify')
     return (reg, host, port, pub_port, client, result)
 
 
 class WorkflowsManager:
+    """Discover and manage workflows."""
 
-    def __init__(self, context=None):
+    def __init__(self, uiserver, context=None):
+        self.uiserver = uiserver
         if context is None:
             self.context = zmq.asyncio.Context()
         else:
             self.context = context
         self.workflows = {}
+        self.stopping = set()
 
     def spawn_workflow(self):
+        """Start/spawn a workflow."""
         # TODO - Spawn workflows
         pass
 
     async def gather_workflows(self):
+        """Scan, establish, and discard workflows."""
         scanflows = {}
         cre_owner, cre_name = re_compile_filters(None, ['.*'])
         scan_args = (
             (reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
             for reg, host, port, pub_port in
-            get_scan_items_from_fs(cre_owner, cre_name))
+            get_scan_items_from_fs(cre_owner, cre_name)
+            if reg not in self.stopping)
+        # clear stopping set
+        self.stopping.clear()
+
         gathers = ()
         for arg in scan_args:
             gathers += (est_workflow(*arg),)
         items = await asyncio.gather(*gathers)
@@ -130,9 +139,18 @@ async def gather_workflows(self):
             with suppress(IOError):
                 client.stop(stop_loop=False)
             self.workflows.pop(w_id)
+            self.uiserver.data_mgr.purge_workflow(w_id)
 
-        # update with new
-        self.workflows.update(scanflows)
+        # update with new, and start data sync
+        gathers = ()
+        for w_id, w_info in scanflows.items():
+            self.workflows[w_id] = w_info
+            gathers += (
+                self.uiserver.data_mgr.sync_workflow(
+                    w_id, w_info['name'], w_info['host'], w_info['pub_port']
+                ),
+            )
+        await asyncio.gather(*gathers)
 
     async def multi_request(self, command, workflows, args=None,
                             multi_args=None, timeout=None):
@@ -155,7 +173,7 @@ async def multi_request(self, command, workflows, args=None,
             gathers += (workflow_request(req_context=info, *request_args),)
         results = await asyncio.gather(*gathers)
         res = []
-        for key, val in results:
+        for _, val in results:
             res.extend([
                 msg_core
                 for msg_core in list(val.values())[0].get('result')

From 5fafcf13befe8d12942ed2f14cae9fc1bab011d7 Mon Sep 17 00:00:00 2001
From: David Sutherland
Date: Fri, 29 Nov 2019 13:19:17 +1300
Subject: [PATCH 4/5] API version filter, module name change

---
 cylc/uiserver/{data_mgr.py => data_store_mgr.py} |  6 ++---
 cylc/uiserver/main.py                            | 12 ++++-----
 cylc/uiserver/resolvers.py                       |  2 +-
 cylc/uiserver/workflows_mgr.py                   | 25 ++++++++++++++-----
 4 files changed, 29 insertions(+), 16 deletions(-)
 rename cylc/uiserver/{data_mgr.py => data_store_mgr.py} (98%)

diff --git a/cylc/uiserver/data_mgr.py b/cylc/uiserver/data_store_mgr.py
similarity index 98%
rename from cylc/uiserver/data_mgr.py
rename to cylc/uiserver/data_store_mgr.py
index 398430c5..b6d5cf86 100644
--- a/cylc/uiserver/data_mgr.py
+++ b/cylc/uiserver/data_store_mgr.py
@@ -42,7 +42,7 @@
 from cylc.flow.network.server import PB_METHOD_MAP
 from cylc.flow.network.scan import MSG_TIMEOUT
 from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
-from cylc.flow.ws_data_mgr import (
+from cylc.flow.data_store_mgr import (
     EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
     DELTAS_MAP, apply_delta, generate_checksum
 )
@@ -51,7 +51,7 @@
 logger = logging.getLogger(__name__)
 
 
-class DataManager:
+class DataStoreMgr:
     """Manage the local data-store acquisition/updates for all workflows."""
workflows.""" INIT_DATA_WAIT_TIME = 5. # seconds @@ -184,7 +184,7 @@ def reconcile_update(self, topic, delta, w_id): self.loop ) try: - _, new_delta_msg = future.result(5.0) + _, new_delta_msg = future.result(self.RECONCILE_TIMEOUT) except asyncio.TimeoutError: logger.info( f'The reconcile update coroutine {w_id} {topic}' diff --git a/cylc/uiserver/main.py b/cylc/uiserver/main.py index f4b4c354..157e916a 100644 --- a/cylc/uiserver/main.py +++ b/cylc/uiserver/main.py @@ -17,20 +17,20 @@ # along with this program. If not, see . import argparse +from functools import partial import json import logging from logging.config import dictConfig import os from os.path import join, abspath, dirname import signal -from functools import partial from cylc.flow.network.schema import schema from tornado import web, ioloop from jupyterhub.services.auth import HubOAuthCallbackHandler from jupyterhub.utils import url_path_join -from .data_mgr import DataManager +from .data_store_mgr import DataStoreMgr from .handlers import * from .resolvers import Resolvers from .workflows_mgr import WorkflowsManager @@ -49,10 +49,10 @@ def try_exit(self, uis): # clean up and stop in here if self.is_closing: # stop the subscribers running in the thread pool executor - for sub in uis.data_mgr.w_subs.values(): + for sub in uis.data_store_mgr.w_subs.values(): sub.stop() # Shutdown the thread pool executor - uis.data_mgr.executor.shutdown(wait=False) + uis.data_store_mgr.executor.shutdown(wait=False) # Destroy ZeroMQ context of all sockets uis.workflows_mgr.context.destroy() ioloop.IOLoop.instance().stop() @@ -70,9 +70,9 @@ def __init__(self, port, static, jupyter_hub_service_prefix): self._static = os.path.abspath(os.path.join(script_dir, static)) self._jupyter_hub_service_prefix = jupyter_hub_service_prefix self.workflows_mgr = WorkflowsManager(self) - self.data_mgr = DataManager(self.workflows_mgr) + self.data_store_mgr = DataStoreMgr(self.workflows_mgr) self.resolvers = Resolvers( - self.data_mgr.data, + self.data_store_mgr.data, workflows_mgr=self.workflows_mgr) def _make_app(self, debug: bool): diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index 438c1bb9..1deb9b44 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -17,7 +17,7 @@ """GraphQL resolvers for use in data accessing and mutation of workflows.""" from cylc.flow.network.resolvers import BaseResolvers -from cylc.flow.ws_data_mgr import WORKFLOW +from cylc.flow.data_store_mgr import WORKFLOW class Resolvers(BaseResolvers): diff --git a/cylc/uiserver/workflows_mgr.py b/cylc/uiserver/workflows_mgr.py index e3f86381..3b6d6e21 100644 --- a/cylc/uiserver/workflows_mgr.py +++ b/cylc/uiserver/workflows_mgr.py @@ -33,10 +33,11 @@ from cylc.flow import flags from cylc.flow.exceptions import ClientError, ClientTimeout from cylc.flow.hostuserutil import is_remote_host, get_host_ip_by_name +from cylc.flow.network import API from cylc.flow.network.client import SuiteRuntimeClient from cylc.flow.network.scan import ( get_scan_items_from_fs, re_compile_filters, MSG_TIMEOUT) -from cylc.flow.ws_data_mgr import ID_DELIM +from cylc.flow.data_store_mgr import ID_DELIM logger = logging.getLogger(__name__) CLIENT_TIMEOUT = 2.0 @@ -44,7 +45,19 @@ async def workflow_request(client, command, args=None, timeout=None, req_context=None): - """Workflow request command.""" + """Workflow request command. + + Args: + client (object): Instantiated workflow client. + command (str): Command/Endpoint name. + args (dict): Endpoint arguments. 
+        timeout (float): Client request timeout (secs).
+        req_context (str): A string identifier for the request;
+            defaults to the command name.
+
+    Returns:
+        tuple: (req_context, result)
+
+    """
     if req_context is None:
         req_context = command
     try:
@@ -102,9 +115,9 @@ async def gather_workflows(self):
         cre_owner, cre_name = re_compile_filters(None, ['.*'])
         scan_args = (
             (reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
-            for reg, host, port, pub_port in
+            for reg, host, port, pub_port, api in
             get_scan_items_from_fs(cre_owner, cre_name)
-            if reg not in self.stopping)
+            if reg not in self.stopping and api == str(API))
         # clear stopping set
         self.stopping.clear()
 
@@ -139,14 +152,14 @@ async def gather_workflows(self):
             with suppress(IOError):
                 client.stop(stop_loop=False)
             self.workflows.pop(w_id)
-            self.uiserver.data_mgr.purge_workflow(w_id)
+            self.uiserver.data_store_mgr.purge_workflow(w_id)
 
         # update with new, and start data sync
         gathers = ()
         for w_id, w_info in scanflows.items():
             self.workflows[w_id] = w_info
             gathers += (
-                self.uiserver.data_mgr.sync_workflow(
+                self.uiserver.data_store_mgr.sync_workflow(
                     w_id, w_info['name'], w_info['host'], w_info['pub_port']
                 ),
             )

From 251e0df92fb95cb28c02d937e6128d2565bb3c4a Mon Sep 17 00:00:00 2001
From: David Sutherland
Date: Tue, 3 Dec 2019 22:29:53 +1300
Subject: [PATCH 5/5] cylc-flow 8.0a2-dev pip

---
 setup.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index abc88be9..9fd5c9cb 100644
--- a/setup.py
+++ b/setup.py
@@ -42,7 +42,8 @@ def find_version(*file_paths):
     'jupyterhub==1.0.*',
     'tornado==6.0.*',
     'graphene-tornado==2.1.*',
-    'cylc-flow==8.0a1'
+    ('cylc-flow @ https://github.com/cylc/cylc-flow'
+     '/tarball/master#egg=cylc-8.0a2.dev')
 ]
 
 # Only include pytest-runner in setup_requires if we're invoking tests