published shutdown handling & sub on discovery
dwsutherland committed Nov 27, 2019
1 parent 347189f commit 4f4b6f2
Showing 4 changed files with 73 additions and 42 deletions.
80 changes: 48 additions & 32 deletions cylc/uiserver/data_mgr.py
@@ -54,55 +54,63 @@
class DataManager:
"""Manage the local data-store acquisition/updates for all workflows."""

INIT_DATA_WAIT_TIME = 5. # seconds
INIT_DATA_RETRY_DELAY = 0.5 # seconds
RECONCILE_TIMEOUT = 5. # seconds

def __init__(self, workflows_mgr):
self.workflows_mgr = workflows_mgr
self.data = {}
self.w_subs = {}
self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
self.topics.add(b'shutdown')
self.loop = None
# There may be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executor = ThreadPoolExecutor()

async def sync_workflows(self):
async def sync_workflow(self, w_id, *args, **kwargs):
"""Run data store sync with workflow services.
Subscription and sync management are instantiated and run in
a separate thread for each workflow, to avoid the sync loop
blocking the main loop.
"""
self.loop = asyncio.get_running_loop()
# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
with ThreadPoolExecutor() as pool:
while True:
for w_id, info in self.workflows_mgr.workflows.items():
if w_id not in self.w_subs:
pool.submit(
partial(
self._start_subscription,
w_id,
info['host'],
info['pub_port']))
await self.entire_workflow_update(ids=[w_id])
for w_id in list(self.w_subs):
if w_id not in self.workflows_mgr.workflows:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.data:
del self.data[w_id]

await asyncio.sleep(1.0)

def _start_subscription(self, w_id, host, port):
"""Instatiate and run subscriber and data-store management.
if self.loop is None:
self.loop = asyncio.get_running_loop()
if w_id in self.w_subs:
return
self.executor.submit(
partial(self.start_subscription, w_id, *args, **kwargs)
)
await self.entire_workflow_update(ids=[w_id])

def purge_workflow(self, w_id):
"""Purge the manager of a workflow's subscription and data."""
if w_id in self.w_subs:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.data:
del self.data[w_id]

def start_subscription(self, w_id, reg, host, port):
"""Instatiate and run subscriber data-store sync.
Args:
w_id (str): Workflow external ID.
reg (str): Registered workflow name.
host (str): Hostname of target workflow.
port (int): Port of target workflow.
"""
self.w_subs[w_id] = WorkflowSubscriber(
host, port,
context=self.workflows_mgr.context, topics=self.topics)
reg,
host=host,
port=port,
context=self.workflows_mgr.context,
topics=self.topics
)
self.w_subs[w_id].loop.run_until_complete(
self.w_subs[w_id].subscribe(
process_delta_msg,
@@ -120,14 +128,18 @@ def update_workflow_data(self, topic, delta, w_id):
"""
# wait until data-store is populated for this workflow
loop_cnt = 0
while loop_cnt < 5:
while loop_cnt * self.INIT_DATA_RETRY_DELAY < self.INIT_DATA_WAIT_TIME:
if w_id not in self.data:
sleep(0.5)
sleep(self.INIT_DATA_RETRY_DELAY)
loop_cnt += 1
continue
break
if w_id not in self.data:
return
if topic == 'shutdown':
self.workflows_mgr.stopping.add(w_id)
self.w_subs[w_id].stop()
return
delta_time = getattr(
delta, 'time', getattr(delta, 'last_updated', 0.0))
# If the workflow has reloaded, recreate the data
@@ -174,9 +186,13 @@ def reconcile_update(self, topic, delta, w_id):
try:
_, new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
except asyncio.TimeoutError:
logger.info(f'The reconcile update coroutine {w_id} {topic}'
f'took too long, cancelling the task...')
logger.info(
f'The reconcile update coroutine {w_id} {topic} '
f'took too long, cancelling the subscription/sync.'
)
future.cancel()
self.workflows_mgr.stopping.add(w_id)
self.w_subs[w_id].stop()
except Exception as exc:
logger.exception(exc)
else:
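To orient readers of this diff: the pattern above runs each workflow's blocking subscription loop on a `ThreadPoolExecutor` worker, leaving the main asyncio loop free, and ends a workflow's sync when a `b'shutdown'` topic arrives. A minimal, self-contained sketch of that pattern (the class name and simulated topics are invented for illustration; this is not the project's API):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class MiniDataManager:
    """Toy stand-in for DataManager: one subscriber thread per workflow."""

    def __init__(self):
        self.w_subs = {}
        self.executor = ThreadPoolExecutor()

    async def sync_workflow(self, w_id):
        # Fire-and-forget: the blocking subscribe loop runs on a worker
        # thread, so this coroutine (and the main loop) never blocks.
        if w_id in self.w_subs:
            return
        self.w_subs[w_id] = True
        self.executor.submit(partial(self._subscribe, w_id))

    def _subscribe(self, w_id):
        # Each worker thread drives its own event loop, mirroring the
        # run_until_complete() call on the real WorkflowSubscriber.
        async def listen():
            for topic in (b'workflow', b'shutdown'):
                await asyncio.sleep(0.1)  # stand-in for awaiting a delta
                if topic == b'shutdown':
                    print(f'{w_id}: shutdown delta, ending subscription')
                    return

        asyncio.new_event_loop().run_until_complete(listen())


asyncio.run(MiniDataManager().sync_workflow('user|flow'))
```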
8 changes: 4 additions & 4 deletions cylc/uiserver/main.py
@@ -51,6 +51,8 @@ def try_exit(self, uis):
# stop the subscribers running in the thread pool executor
for sub in uis.data_mgr.w_subs.values():
sub.stop()
# Shut down the thread pool executor
uis.data_mgr.executor.shutdown()
# Destroy ZeroMQ context of all sockets
uis.workflows_mgr.context.destroy()
ioloop.IOLoop.instance().stop()
@@ -67,7 +69,7 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
script_dir = os.path.dirname(__file__)
self._static = os.path.abspath(os.path.join(script_dir, static))
self._jupyter_hub_service_prefix = jupyter_hub_service_prefix
self.workflows_mgr = WorkflowsManager()
self.workflows_mgr = WorkflowsManager(self)
self.data_mgr = DataManager(self.workflows_mgr)
self.resolvers = Resolvers(
self.data_mgr.data,
@@ -138,12 +140,10 @@ def start(self, debug: bool):
# Discover workflows on initial start-up.
ioloop.IOLoop.current().add_callback(
self.workflows_mgr.gather_workflows)
# Start the workflow data-store sync
ioloop.IOLoop.current().add_callback(self.data_mgr.sync_workflows)
# An already-established client is not overridden, so the
# following callbacks can safely run at the same time.
ioloop.PeriodicCallback(
self.workflows_mgr.gather_workflows, 10000).start()
self.workflows_mgr.gather_workflows, 7000).start()
try:
ioloop.IOLoop.current().start()
except KeyboardInterrupt:
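The ordering in `try_exit` matters; here it is as an annotated sketch (the `uis` attributes mirror the diff, but this is commentary, not a drop-in for the real handler):

```python
from tornado import ioloop


def try_exit(uis):
    # 1. ask each subscriber to stop so its worker thread's loop returns
    for sub in uis.data_mgr.w_subs.values():
        sub.stop()
    # 2. join the executor's worker threads before touching shared state
    uis.data_mgr.executor.shutdown()
    # 3. only now destroy the shared ZeroMQ context the sockets used
    uis.workflows_mgr.context.destroy()
    # 4. finally stop the Tornado IOLoop itself
    ioloop.IOLoop.instance().stop()
```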
4 changes: 2 additions & 2 deletions cylc/uiserver/tests/test_workflows_mgr.py
@@ -140,12 +140,12 @@ async def test_est_workflow(


def test_workflows_manager_constructor():
mgr = WorkflowsManager()
mgr = WorkflowsManager(None)
assert not mgr.workflows


def test_workflows_manager_spawn_workflow():
mgr = WorkflowsManager()
mgr = WorkflowsManager(None)
mgr.spawn_workflow()
assert not mgr.workflows

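A possible follow-on test, hypothetical and not part of this commit, pinning down the new constructor contract (any stand-in, including `None`, works as the `uiserver` handle, and the `stopping` set starts empty):

```python
from cylc.uiserver.workflows_mgr import WorkflowsManager


def test_workflows_manager_stopping_starts_empty():
    mgr = WorkflowsManager(None)
    assert mgr.stopping == set()
```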
23 changes: 19 additions & 4 deletions cylc/uiserver/workflows_mgr.py
@@ -81,12 +81,14 @@ async def est_workflow(reg, host, port, pub_port, context=None, timeout=None):

class WorkflowsManager:

def __init__(self, context=None):
def __init__(self, uiserver, context=None):
self.uiserver = uiserver
if context is None:
self.context = zmq.asyncio.Context()
else:
self.context = context
self.workflows = {}
self.stopping = set()

def spawn_workflow(self):
# TODO - Spawn workflows
@@ -98,7 +100,11 @@ async def gather_workflows(self):
scan_args = (
(reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
for reg, host, port, pub_port in
get_scan_items_from_fs(cre_owner, cre_name))
get_scan_items_from_fs(cre_owner, cre_name)
if reg not in self.stopping)
# materialise now: the generator's "stopping" filter is lazy, so the
# set must still be populated when the filter runs
scan_args = tuple(scan_args)
self.stopping.clear()

gathers = ()
for arg in scan_args:
gathers += (est_workflow(*arg),)
@@ -130,9 +136,18 @@ async def gather_workflows(self):
with suppress(IOError):
client.stop(stop_loop=False)
self.workflows.pop(w_id)
self.uiserver.data_mgr.purge_workflow(w_id)

# update with new
self.workflows.update(scanflows)
# update with new, and start data sync
gathers = ()
for w_id, w_info in scanflows.items():
self.workflows[w_id] = w_info
gathers += (
self.uiserver.data_mgr.sync_workflow(
w_id, w_info['name'], w_info['host'], w_info['pub_port']
),
)
await asyncio.gather(*gathers)

async def multi_request(self, command, workflows, args=None,
multi_args=None, timeout=None):
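One scan pass of the new `stopping` filter, as a minimal sketch with invented workflow IDs: a workflow that published a shutdown delta is skipped once so it is not immediately re-added, then the set is cleared so a genuinely restarted workflow is re-discovered on the next pass.

```python
stopping = {'flow-a'}           # announced shutdown last pass
scanned = ['flow-a', 'flow-b']  # stand-in for the filesystem scan

# materialise while "stopping" is still populated; a lazy generator
# consumed after clear() would filter nothing
to_sync = [reg for reg in scanned if reg not in stopping]
stopping.clear()

assert to_sync == ['flow-b']
```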
