Skip to content

Commit

Permalink
Data sync functional
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Oct 18, 2019
1 parent 7b04c20 commit adc4f12
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 84 deletions.
170 changes: 134 additions & 36 deletions cylc/uiserver/data_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,129 @@
import asyncio
import logging

from cylc.flow.exceptions import ClientError, ClientTimeout
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import sleep

from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network.scan import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.ws_data_mgr import (
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
DELTAS_MAP, apply_workflow_delta, generate_checksum
)
from .workflows_mgr import workflow_request

logger = logging.getLogger(__name__)


async def get_workflow_data(w_id, client, method):
"""Call WS endpoint for entire workflow protobuf message."""
# Use already established client
try:
pb_msg = await client.async_request(method)
except ClientTimeout as exc:
logger.exception(exc)
return (w_id, MSG_TIMEOUT)
except ClientError as exc:
logger.exception(exc)
return (w_id, None)
else:
ws_data = PB_METHOD_MAP[method]()
ws_data.ParseFromString(pb_msg)
return (w_id, ws_data)


class DataManager:
"""Manage the local data-store acquisition/updates from all workflows."""

def __init__(self, ws_mgr):
self.ws_mgr = ws_mgr
def __init__(self, workfloworkflows_mgr):
self.workflows_mgr = workfloworkflows_mgr
self.data = {}
self.w_subs = {}
self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
self.loop = None

async def sync_workflows(self):
"""Run data store sync with workflow services.
Subscriptions and sync management is instantiated and run in
a separate thread for each workflow. This is to avoid the sync loop
blocking the main loop.
"""
self.loop = asyncio.get_running_loop()
# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
with ThreadPoolExecutor() as pool:
while True:
for w_id, info in self.workflows_mgr.workflows.items():
if w_id not in self.w_subs:
pool.submit(
partial(
self._start_subscription,
w_id,
info['host'],
info['pub_port']))
await self.entire_workflow_update(ids=[w_id])
for w_id in list(self.w_subs):
if w_id not in self.workflows_mgr.workflows:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.data:
del self.data[w_id]

await asyncio.sleep(1.0)

def _start_subscription(self, w_id, host, port):
"""Instatiate and run subscriber and data-store management."""
self.w_subs[w_id] = WorkflowSubscriber(
host, port,
context=self.workflows_mgr.context, topics=self.topics)
self.w_subs[w_id].loop.run_until_complete(
self.w_subs[w_id].subscribe(
process_delta_msg,
func=self.update_workflow_data,
w_id=w_id))

def update_workflow_data(self, topic, delta, w_id):
"""Manage and apply incomming data-store deltas."""
loop_cnt = 0
while loop_cnt < 5:
if w_id not in self.data:
sleep(0.5)
loop_cnt += 1
continue
break
if w_id not in self.data:
return
delta_time = getattr(
delta, 'time', getattr(delta, 'last_updated', 0.0))
if delta_time >= self.data[w_id]['delta_times'][topic]:
apply_workflow_delta(topic, delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = delta_time
self.reconcile_update(topic, delta, w_id)

def reconcile_update(self, topic, delta, w_id):
"""Reconcile local with workflow data-store.
Verify data-store is in sync by topic/element-type
and on failure request entire set of respective data elements.
"""
if topic == WORKFLOW:
return
if topic == EDGES:
s_att = 'id'
else:
s_att = 'stamp'
local_checksum = generate_checksum(
[getattr(e, s_att)
for e in self.data[w_id][topic].values()])
if local_checksum != delta.checksum:
future = asyncio.run_coroutine_threadsafe(
workflow_request(
self.workflows_mgr.workflows[w_id]['req_client'],
'pb_data_elements',
args={'element_type': topic}
),
self.loop
)
try:
_, new_delta_msg = future.result(5.0)
except asyncio.TimeoutError:
logger.info(f'The reconcile update coroutine {w_id} {topic}'
f'took too long, cancelling the task...')
future.cancel()
except Exception as exc:
logger.exception(exc)
else:
new_delta = DELTAS_MAP[topic]()
new_delta.ParseFromString(new_delta_msg)
apply_workflow_delta(topic, new_delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = new_delta.time

# Data syncing
async def entire_workflow_update(self, ids=None):
Expand All @@ -61,33 +151,41 @@ async def entire_workflow_update(self, ids=None):

# Prune old data
for w_id in list(self.data):
if w_id not in self.ws_mgr.workflows:
if w_id not in self.workflows_mgr.workflows:
del self.data[w_id]

# Fetch new data
ws_args = (
(w_id, info['req_client'], 'pb_entire_workflow')
for w_id, info in self.ws_mgr.workflows.items())
# Request new data
req_method = 'pb_entire_workflow'
req_kwargs = (
{'client': info['req_client'],
'command': req_method,
'req_context': w_id}
for w_id, info in self.workflows_mgr.workflows.items())
gathers = ()
for args in ws_args:
if not ids or args[0] in ids:
gathers += (get_workflow_data(*args),)
for kwargs in req_kwargs:
if not ids or kwargs['req_context'] in ids:
gathers += (workflow_request(**kwargs),)
items = await asyncio.gather(*gathers)
new_data = {}
for w_id, result in items:
if result is not None and result != MSG_TIMEOUT:
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data[w_id] = {
EDGES: {e.id: e for e in getattr(result, EDGES)},
FAMILIES: {f.id: f for f in getattr(result, FAMILIES)},
EDGES: {e.id: e for e in getattr(pb_data, EDGES)},
FAMILIES: {f.id: f for f in getattr(pb_data, FAMILIES)},
FAMILY_PROXIES: {
n.id: n
for n in getattr(result, FAMILY_PROXIES)},
JOBS: {j.id: j for j in getattr(result, JOBS)},
TASKS: {t.id: t for t in getattr(result, TASKS)},
for n in getattr(pb_data, FAMILY_PROXIES)},
JOBS: {j.id: j for j in getattr(pb_data, JOBS)},
TASKS: {t.id: t for t in getattr(pb_data, TASKS)},
TASK_PROXIES: {
n.id: n
for n in getattr(result, TASK_PROXIES)},
WORKFLOW: getattr(result, WORKFLOW),
for n in getattr(pb_data, TASK_PROXIES)},
WORKFLOW: getattr(pb_data, WORKFLOW),
'delta_times': {
topic: getattr(pb_data, WORKFLOW).last_updated
for topic in DELTAS_MAP.keys()}
}

self.data.update(new_data)
23 changes: 15 additions & 8 deletions cylc/uiserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import os
import signal
from functools import partial

from cylc.flow.network.schema import schema
from logging.config import dictConfig
Expand All @@ -44,9 +45,12 @@ def signal_handler(self, signum, frame):
logger.info('exiting...')
self.is_closing = True

def try_exit(self):
def try_exit(self, uis):
if self.is_closing:
# clean up here
for sub in uis.data_mgr.w_subs.values():
sub.stop()
uis.workflows_mgr.context.destroy()
ioloop.IOLoop.instance().stop()
logger.info('exit success')

Expand All @@ -61,11 +65,11 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
script_dir = os.path.dirname(__file__)
self._static = os.path.abspath(os.path.join(script_dir, static))
self._jupyter_hub_service_prefix = jupyter_hub_service_prefix
self.ws_mgr = WorkflowsManager()
self.data_mgr = DataManager(self.ws_mgr)
self.workflows_mgr = WorkflowsManager()
self.data_mgr = DataManager(self.workflows_mgr)
self.resolvers = Resolvers(
self.data_mgr.data,
ws_mgr=self.ws_mgr)
workflows_mgr=self.workflows_mgr)

def _make_app(self, debug: bool):
"""Crete a Tornado web application.
Expand Down Expand Up @@ -126,14 +130,17 @@ def start(self, debug: bool):
app = self._make_app(debug)
signal.signal(signal.SIGINT, app.signal_handler)
app.listen(self._port)
ioloop.PeriodicCallback(app.try_exit, 100).start()
ioloop.PeriodicCallback(
partial(app.try_exit, uis=self), 100).start()
# Discover workflows on initial start up.
ioloop.IOLoop.current().add_callback(self.ws_mgr.gather_workflows)
ioloop.IOLoop.current().add_callback(
self.workflows_mgr.gather_workflows)
# Start the workflow data-store sync
ioloop.IOLoop.current().add_callback(self.data_mgr.sync_workflows)
# If the client is already established it's not overridden,
# so the following callbacks can happen at the same time.
ioloop.PeriodicCallback(self.ws_mgr.gather_workflows, 10000).start()
ioloop.PeriodicCallback(
self.data_mgr.entire_workflow_update, 5000).start()
self.workflows_mgr.gather_workflows, 10000).start()
try:
ioloop.IOLoop.current().start()
except KeyboardInterrupt:
Expand Down
6 changes: 3 additions & 3 deletions cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
class Resolvers(BaseResolvers):
"""UI Server context GraphQL query and mutation resolvers."""

ws_mgr = None
workflows_mgr = None

def __init__(self, data, **kwargs):
super().__init__(data)
Expand All @@ -46,7 +46,7 @@ async def mutator(self, info, *m_args):
'request_string': req_str,
'variables': variables,
}
return self.ws_mgr.multi_request('graphql', w_ids, graphql_args)
return self.workflows_mgr.multi_request('graphql', w_ids, graphql_args)

async def nodes_mutator(self, info, *m_args):
"""Mutate node items of associated workflows."""
Expand All @@ -63,5 +63,5 @@ async def nodes_mutator(self, info, *m_args):
'variables': variables,
}
multi_args = {w_id: graphql_args for w_id in w_ids}
return self.ws_mgr.multi_request(
return self.workflows_mgr.multi_request(
'graphql', w_ids, multi_args=multi_args)
44 changes: 24 additions & 20 deletions cylc/uiserver/tests/test_workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,30 @@ async def test_workflow_request_client_error(async_client: AsyncClientFixture):


@pytest.mark.asyncio
@pytest.mark.parametrize("returns,command,context,expected_ctx,expected_msg", [
pytest.param(
42, 'cmd', None, 'cmd', 42
),
pytest.param(
42, '', None, '', 42
),
pytest.param(
42, 'cmd', 'some-context', 'some-context', 42
)
])
@pytest.mark.parametrize(
"returns,command,req_context,expected_ctx,expected_msg",
[
pytest.param(
42, 'cmd', None, 'cmd', 42
),
pytest.param(
42, '', None, '', 42
),
pytest.param(
42, 'cmd', 'some-context', 'some-context', 42
)
])
async def test_workflow_request(
async_client: AsyncClientFixture,
returns,
command,
context,
req_context,
expected_ctx,
expected_msg
):
async_client.will_return(returns)
ctx, msg = await workflow_request(
client=async_client, command=command, context=context)
client=async_client, command=command, req_context=req_context)
assert expected_ctx == ctx
assert expected_msg == msg

Expand All @@ -86,19 +88,19 @@ async def test_est_workflow_socket_error(
def side_effect(*_, **__):
raise socket.error
mocked_get_host_ip_by_name.side_effect = side_effect
reg, host, port, client = await est_workflow(
reg, host, port, pub_port, client = await est_workflow(
'', '', 0, 0)
assert not any([reg, host, port, client])
assert not any([reg, host, port, pub_port, client])
assert client is None


@pytest.mark.asyncio
@pytest.mark.parametrize('reg,host,port,timeout,expected_host', [
@pytest.mark.parametrize('reg,host,port,pub_port,timeout,expected_host', [
pytest.param(
'remote', 'remote', 8000, 1, 'remote_host'
'remote', 'remote', 8000, 8002, 1, 'remote_host'
),
pytest.param(
'local', 'localhost', 4000, 2, 'localhost'
'local', 'localhost', 4000, 4001, 2, 'localhost'
)
])
async def test_est_workflow(
Expand All @@ -107,6 +109,7 @@ async def test_est_workflow(
reg,
host,
port,
pub_port,
timeout,
expected_host
):
Expand All @@ -124,11 +127,12 @@ async def test_est_workflow(
mocked_get_host_ip_by_name.side_effect = lambda x: f'remote_host' \
if x == 'remote' else x

r_reg, r_host, r_port, r_client, r_result = await est_workflow(
reg, host, port, timeout)
r_reg, r_host, r_port, r_pub_port, r_client, r_result = await est_workflow(
reg, host, port, pub_port, timeout)
assert reg == r_reg
assert expected_host == r_host
assert port == r_port
assert pub_port == r_pub_port
assert r_client.__class__ == AsyncClientFixture


Expand Down
Loading

0 comments on commit adc4f12

Please sign in to comment.