published shutdown handling & sub on discovery
dwsutherland committed Nov 27, 2019
1 parent 347189f commit be928a8
Showing 4 changed files with 85 additions and 51 deletions.
cylc/uiserver/data_mgr.py (49 additions, 33 deletions)

@@ -54,55 +54,63 @@
 class DataManager:
     """Manage the local data-store acquisition/updates for all workflows."""
 
+    INIT_DATA_WAIT_TIME = 5.  # seconds
+    INIT_DATA_RETRY_DELAY = 0.5  # seconds
+    RECONCILE_TIMEOUT = 5.  # seconds
+
     def __init__(self, workflows_mgr):
         self.workflows_mgr = workflows_mgr
         self.data = {}
         self.w_subs = {}
         self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
+        self.topics.add(b'shutdown')
         self.loop = None
+        # Might be options other than threads to achieve
+        # non-blocking subscriptions, but this works.
+        self.executor = ThreadPoolExecutor()
 
-    async def sync_workflows(self):
+    async def sync_workflow(self, w_id, *args, **kwargs):
         """Run data store sync with workflow services.
 
         Subscriptions and sync management are instantiated and run in
         a separate thread for each workflow. This is to avoid the sync loop
         blocking the main loop.
 
         """
-        self.loop = asyncio.get_running_loop()
-        # Might be options other than threads to achieve
-        # non-blocking subscriptions, but this works.
-        with ThreadPoolExecutor() as pool:
-            while True:
-                for w_id, info in self.workflows_mgr.workflows.items():
-                    if w_id not in self.w_subs:
-                        pool.submit(
-                            partial(
-                                self._start_subscription,
-                                w_id,
-                                info['host'],
-                                info['pub_port']))
-                        await self.entire_workflow_update(ids=[w_id])
-                for w_id in list(self.w_subs):
-                    if w_id not in self.workflows_mgr.workflows:
-                        self.w_subs[w_id].stop()
-                        del self.w_subs[w_id]
-                        if w_id in self.data:
-                            del self.data[w_id]
-                await asyncio.sleep(1.0)
-
-    def _start_subscription(self, w_id, host, port):
-        """Instantiate and run subscriber and data-store management.
+        if self.loop is None:
+            self.loop = asyncio.get_running_loop()
+        if w_id in self.w_subs:
+            return
+        self.executor.submit(
+            partial(self.start_subscription, w_id, *args, **kwargs)
+        )
+        await self.entire_workflow_update(ids=[w_id])
+
+    def purge_workflow(self, w_id):
+        """Purge the manager of a workflow's subscription and data."""
+        if w_id in self.w_subs:
+            self.w_subs[w_id].stop()
+            del self.w_subs[w_id]
+        if w_id in self.data:
+            del self.data[w_id]
+
+    def start_subscription(self, w_id, reg, host, port):
+        """Instantiate and run subscriber data-store sync.
 
         Args:
             w_id (str): Workflow external ID.
+            reg (str): Registered workflow name.
             host (str): Hostname of target workflow.
             port (int): Port of target workflow.
 
         """
         self.w_subs[w_id] = WorkflowSubscriber(
-            host, port,
-            context=self.workflows_mgr.context, topics=self.topics)
+            reg,
+            host=host,
+            port=port,
+            context=self.workflows_mgr.context,
+            topics=self.topics
+        )
         self.w_subs[w_id].loop.run_until_complete(
             self.w_subs[w_id].subscribe(
                 process_delta_msg,
@@ -120,16 +128,20 @@ def update_workflow_data(self, topic, delta, w_id):
"""
# wait until data-store is populated for this workflow
loop_cnt = 0
while loop_cnt < 5:
while loop_cnt < self.INIT_DATA_WAIT_TIME:
if w_id not in self.data:
sleep(0.5)
sleep(self.INIT_DATA_RETRY_DELAY)
loop_cnt += 1
continue
break
if w_id not in self.data:
return
if topic == 'shutdown':
self.workflows_mgr.stopping.add(w_id)
self.w_subs[w_id].stop()
return
delta_time = getattr(
delta, 'time', getattr(delta, 'last_updated', 0.0))
delta, 'time', getattr(delta, 'last_updated', 0.0))
# If the workflow has reloaded recreate the data
# otherwise apply the delta if it's newer than the previously applied.
if delta.reloaded:
@@ -174,9 +186,13 @@ def reconcile_update(self, topic, delta, w_id):
         try:
             _, new_delta_msg = future.result(5.0)
         except asyncio.TimeoutError:
-            logger.info(f'The reconcile update coroutine {w_id} {topic}'
-                        f'took too long, cancelling the task...')
+            logger.info(
+                f'The reconcile update coroutine {w_id} {topic}'
+                f'took too long, cancelling the subscription/sync.'
+            )
             future.cancel()
+            self.workflows_mgr.stopping.add(w_id)
+            self.w_subs[w_id].stop()
         except Exception as exc:
             logger.exception(exc)
         else:
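The pattern introduced above is easier to see in isolation: each sync_workflow
call hands one blocking subscriber to a long-lived ThreadPoolExecutor and
returns immediately, so the asyncio loop stays free, and a 'shutdown' message
simply ends that workflow's subscriber. A minimal runnable sketch of the
pattern follows; MiniSync, fake_subscribe and on_delta are illustrative
stand-ins, not part of the cylc API.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def fake_subscribe(w_id, on_delta):
    # Stand-in for WorkflowSubscriber: blocks in its own thread, delivering
    # topics until 'shutdown' arrives (mirroring w_subs[w_id].stop()).
    for topic in ('workflow', 'task_proxies', 'shutdown'):
        on_delta(topic, w_id)
        if topic == 'shutdown':
            return


class MiniSync:
    def __init__(self):
        self.executor = ThreadPoolExecutor()
        self.subs = {}

    async def sync_workflow(self, w_id):
        if w_id in self.subs:
            return  # already subscribed; discovery may report it again
        self.subs[w_id] = True
        # non-blocking: the subscriber runs in an executor thread
        self.executor.submit(partial(fake_subscribe, w_id, self.on_delta))

    def on_delta(self, topic, w_id):
        print(f'{w_id}: {topic}')


async def main():
    mgr = MiniSync()
    await mgr.sync_workflow('user/flow')
    await asyncio.sleep(0.1)  # let the subscriber thread drain its topics
    mgr.executor.shutdown(wait=False)

asyncio.run(main())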
cylc/uiserver/main.py (6 additions, 6 deletions)

@@ -19,13 +19,13 @@
 import argparse
 import json
 import logging
+from logging.config import dictConfig
 import os
+from os.path import join, abspath, dirname
 import signal
 from functools import partial
 
 from cylc.flow.network.schema import schema
-from logging.config import dictConfig
-from os.path import join, abspath, dirname
 from tornado import web, ioloop
 
 from jupyterhub.services.auth import HubOAuthCallbackHandler
@@ -51,6 +51,8 @@ def try_exit(self, uis):
         # stop the subscribers running in the thread pool executor
         for sub in uis.data_mgr.w_subs.values():
             sub.stop()
+        # Shutdown the thread pool executor
+        uis.data_mgr.executor.shutdown(wait=False)
         # Destroy ZeroMQ context of all sockets
         uis.workflows_mgr.context.destroy()
         ioloop.IOLoop.instance().stop()
@@ -67,7 +69,7 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
         script_dir = os.path.dirname(__file__)
         self._static = os.path.abspath(os.path.join(script_dir, static))
         self._jupyter_hub_service_prefix = jupyter_hub_service_prefix
-        self.workflows_mgr = WorkflowsManager()
+        self.workflows_mgr = WorkflowsManager(self)
         self.data_mgr = DataManager(self.workflows_mgr)
         self.resolvers = Resolvers(
             self.data_mgr.data,
@@ -138,12 +140,10 @@ def start(self, debug: bool):
         # Discover workflows on initial start up.
         ioloop.IOLoop.current().add_callback(
             self.workflows_mgr.gather_workflows)
-        # Start the workflow data-store sync
-        ioloop.IOLoop.current().add_callback(self.data_mgr.sync_workflows)
         # If the client is already established it's not overridden,
         # so the following callbacks can happen at the same time.
         ioloop.PeriodicCallback(
-            self.workflows_mgr.gather_workflows, 10000).start()
+            self.workflows_mgr.gather_workflows, 7000).start()
         try:
             ioloop.IOLoop.current().start()
         except KeyboardInterrupt:
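The ordering in try_exit is deliberate: subscribers are stopped first so their
loops return, the executor is shut down with wait=False because its worker
threads may still be blocked inside a subscriber, and only then are the ZeroMQ
sockets torn down and the IOLoop stopped. A minimal sketch of that sequence,
using a stand-in MiniServer rather than the UI Server's real classes:

import signal
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import zmq.asyncio
from tornado import ioloop


class MiniServer:
    def __init__(self):
        self.executor = ThreadPoolExecutor()
        self.context = zmq.asyncio.Context()
        self.subs = {}  # stand-in for data_mgr.w_subs


def try_exit(uis):
    for sub in uis.subs.values():
        sub.stop()                     # 1. stop subscriber loops
    uis.executor.shutdown(wait=False)  # 2. don't wait on blocked threads
    uis.context.destroy()              # 3. close all ZeroMQ sockets
    ioloop.IOLoop.instance().stop()    # 4. stop the Tornado loop


def signal_handler(uis, signum, frame):
    # defer to the IOLoop; signal handlers must not touch it directly
    ioloop.IOLoop.instance().add_callback_from_signal(try_exit, uis)


if __name__ == '__main__':
    uis = MiniServer()
    signal.signal(signal.SIGINT, partial(signal_handler, uis))
    ioloop.IOLoop.current().call_later(0.1, try_exit, uis)  # auto-exit demo
    ioloop.IOLoop.current().start()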
cylc/uiserver/tests/test_workflows_mgr.py (2 additions, 2 deletions)

@@ -140,12 +140,12 @@ async def test_est_workflow(


 def test_workflows_manager_constructor():
-    mgr = WorkflowsManager()
+    mgr = WorkflowsManager(None)
     assert not mgr.workflows
 
 
 def test_workflows_manager_spawn_workflow():
-    mgr = WorkflowsManager()
+    mgr = WorkflowsManager(None)
     mgr.spawn_workflow()
     assert not mgr.workflows

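A hypothetical companion test, not part of this commit, could pin down the
stopping set that the new constructor argument supports:

def test_workflows_manager_stopping_set():
    # the set used to skip re-scanning workflows that published 'shutdown'
    mgr = WorkflowsManager(None)
    assert not mgr.stopping
    mgr.stopping.add('user/flow')
    assert 'user/flow' in mgr.stopping
    mgr.stopping.clear()
    assert not mgr.stopping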
cylc/uiserver/workflows_mgr.py (28 additions, 10 deletions)

@@ -23,12 +23,12 @@
 - ?
 
 """
-import socket
 import asyncio
-from contextlib import suppress
 import logging
-import zmq.asyncio
+import socket
 
+from contextlib import suppress
+import zmq.asyncio
 
 from cylc.flow import flags
 from cylc.flow.exceptions import ClientError, ClientTimeout
@@ -66,7 +66,7 @@ async def est_workflow(reg, host, port, pub_port, context=None, timeout=None):
     except socket.error as exc:
         if flags.debug:
             raise
-        logger.error("ERROR: %s: %s\n" % (exc, host))
+        logger.error("ERROR: %s: %s\n", exc, host)
         return (reg, host, port, pub_port, None)
 
     # NOTE: Connect to the suite by host:port. This way the
@@ -75,30 +75,39 @@
     # NOTE: This part of the scan *is* IO blocking.
     client = SuiteRuntimeClient(reg, host=host, port=port,
                                 context=context, timeout=timeout)
-    req_context, result = await workflow_request(client, 'identify')
+    _, result = await workflow_request(client, 'identify')
     return (reg, host, port, pub_port, client, result)
 
 
 class WorkflowsManager:
     """Discover and Manage workflows."""
 
-    def __init__(self, context=None):
+    def __init__(self, uiserver, context=None):
+        self.uiserver = uiserver
         if context is None:
             self.context = zmq.asyncio.Context()
         else:
             self.context = context
         self.workflows = {}
+        self.stopping = set()
 
     def spawn_workflow(self):
         """Start/spawn a workflow."""
         # TODO - Spawn workflows
         pass
 
     async def gather_workflows(self):
         """Scan, establish, and discard workflows."""
         scanflows = {}
         cre_owner, cre_name = re_compile_filters(None, ['.*'])
         scan_args = (
             (reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
             for reg, host, port, pub_port in
-            get_scan_items_from_fs(cre_owner, cre_name))
+            get_scan_items_from_fs(cre_owner, cre_name)
+            if reg not in self.stopping)
+        # clear stopping set
+        self.stopping.clear()
+
         gathers = ()
         for arg in scan_args:
             gathers += (est_workflow(*arg),)
@@ -130,9 +139,18 @@ async def gather_workflows(self):
             with suppress(IOError):
                 client.stop(stop_loop=False)
             self.workflows.pop(w_id)
+            self.uiserver.data_mgr.purge_workflow(w_id)
 
-        # update with new
-        self.workflows.update(scanflows)
+        # update with new, and start data sync
+        gathers = ()
+        for w_id, w_info in scanflows.items():
+            self.workflows[w_id] = w_info
+            gathers += (
+                self.uiserver.data_mgr.sync_workflow(
+                    w_id, w_info['name'], w_info['host'], w_info['pub_port']
+                ),
+            )
+        await asyncio.gather(*gathers)
 
     async def multi_request(self, command, workflows, args=None,
                             multi_args=None, timeout=None):
@@ -155,7 +173,7 @@ async def multi_request(self, command, workflows, args=None,
             gathers += (workflow_request(req_context=info, *request_args),)
         results = await asyncio.gather(*gathers)
         res = []
-        for key, val in results:
+        for _, val in results:
             res.extend([
                 msg_core
                 for msg_core in list(val.values())[0].get('result')
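Taken together, the workflows_mgr changes close the loop with data_mgr: a
workflow that publishes 'shutdown' lands in the stopping set, the next scan
skips it for one cycle (so a dying service is not immediately re-subscribed),
and the set is then cleared so a restarted workflow can be rediscovered. A
condensed runnable sketch with simplified stand-ins (MiniWorkflowsManager,
MiniDataMgr; not the cylc API):

import asyncio


class MiniDataMgr:
    async def sync_workflow(self, reg):
        print(f'sync {reg}')

    def purge_workflow(self, reg):
        print(f'purge {reg}')


class MiniWorkflowsManager:
    def __init__(self, data_mgr):
        self.data_mgr = data_mgr
        self.workflows = {}
        self.stopping = set()

    async def gather_workflows(self, scanned):
        # 'scanned' stands in for the get_scan_items_from_fs() results
        live = {reg for reg in scanned if reg not in self.stopping}
        self.stopping.clear()  # one-cycle grace, then rediscoverable
        # discard workflows that vanished from the scan
        for reg in list(self.workflows):
            if reg not in live:
                self.workflows.pop(reg)
                self.data_mgr.purge_workflow(reg)
        # register survivors/new arrivals and start their data sync
        self.workflows.update({reg: {} for reg in live})
        await asyncio.gather(
            *(self.data_mgr.sync_workflow(reg) for reg in live)
        )


mgr = MiniWorkflowsManager(MiniDataMgr())
asyncio.run(mgr.gather_workflows(['user/flow']))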
