WFS Data Store Sync #95

Merged Dec 3, 2019 (5 commits)
174 changes: 138 additions & 36 deletions cylc/uiserver/data_mgr.py
@@ -19,39 +19,133 @@
import asyncio
import logging

from cylc.flow.exceptions import ClientError, ClientTimeout
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import sleep

from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network.scan import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.ws_data_mgr import (
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
DELTAS_MAP, apply_delta, generate_checksum
)
from .workflows_mgr import workflow_request

logger = logging.getLogger(__name__)


async def get_workflow_data(w_id, client, method):
"""Call WS endpoint for entire workflow protobuf message."""
# Use already established client
try:
pb_msg = await client.async_request(method)
except ClientTimeout as exc:
logger.exception(exc)
return (w_id, MSG_TIMEOUT)
except ClientError as exc:
logger.exception(exc)
return (w_id, None)
else:
ws_data = PB_METHOD_MAP[method]()
ws_data.ParseFromString(pb_msg)
return (w_id, ws_data)


class DataManager:
"""Manage the local data-store acquisition/updates from all workflows."""

def __init__(self, ws_mgr):
self.ws_mgr = ws_mgr
def __init__(self, workflows_mgr):
self.workflows_mgr = workflows_mgr
self.data = {}
self.w_subs = {}
self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
self.loop = None

async def sync_workflows(self):
"""Run data store sync with workflow services.

Subscriptions and sync management are instantiated and run in
a separate thread for each workflow. This is to avoid the sync loop
blocking the main loop.
"""
self.loop = asyncio.get_running_loop()
# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
with ThreadPoolExecutor() as pool:
while True:
for w_id, info in self.workflows_mgr.workflows.items():
if w_id not in self.w_subs:
pool.submit(
partial(
self._start_subscription,
w_id,
info['host'],
info['pub_port']))
await self.entire_workflow_update(ids=[w_id])
for w_id in list(self.w_subs):
if w_id not in self.workflows_mgr.workflows:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.data:
del self.data[w_id]

await asyncio.sleep(1.0)

def _start_subscription(self, w_id, host, port):
"""Instatiate and run subscriber and data-store management."""
self.w_subs[w_id] = WorkflowSubscriber(
host, port,
context=self.workflows_mgr.context, topics=self.topics)
self.w_subs[w_id].loop.run_until_complete(
self.w_subs[w_id].subscribe(
process_delta_msg,
func=self.update_workflow_data,
w_id=w_id))

def update_workflow_data(self, topic, delta, w_id):
"""Manage and apply incomming data-store deltas."""
loop_cnt = 0
while loop_cnt < 5:
Member

Another sleep loop to try and replace at a later date.

Member Author

Maybe (?).. Subscription happens first, but it needs to wait around for the initial data payload from the REQ client... Publishes are queued until the subscriber receives them, and deltas older than the payload are ignored.
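One possible shape for that later replacement, as a rough sketch only: it assumes a hypothetical per-workflow threading.Event (initial_data below) that entire_workflow_update() would set once the REQ payload is stored.

import threading

def update_workflow_data(self, topic, delta, w_id):
    # Hypothetical: self.initial_data maps w_id -> threading.Event,
    # set by entire_workflow_update() after self.data[w_id] is populated.
    event = self.initial_data.setdefault(w_id, threading.Event())
    # Block this subscriber thread (bounded) until the initial payload
    # arrives, instead of polling five times with sleep(0.5).
    if not event.wait(timeout=2.5):
        return
    ...  # apply the delta as in the code below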

if w_id not in self.data:
sleep(0.5)
loop_cnt += 1
continue
break
if w_id not in self.data:
return
delta_time = getattr(
delta, 'time', getattr(delta, 'last_updated', 0.0))
# If the workflow has reloaded, recreate the data
if delta.reloaded:
self.data[w_id][topic] = {ele.id: ele for ele in delta.deltas}
self.data[w_id]['delta_times'][topic] = delta_time
elif delta_time >= self.data[w_id]['delta_times'][topic]:
apply_delta(topic, delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = delta_time
self.reconcile_update(topic, delta, w_id)

Member

Can catch inconsistencies early at this point in the code rather than waiting for the next reconcile_update?

else:
    rebuild_store()

Member Author

Unless I missed your point: the elif delta_time >= self.data[w_id]['delta_times'][topic]: statement filters for valid deltas to apply, so you don't really want to do anything to the store with an invalid delta (one whose creation time is earlier than that of the latest applied delta).
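To make the guard concrete, here is the same filter as a standalone function with a couple of worked cases (illustrative only; the function name is made up):

def should_apply(delta_time, last_applied, reloaded):
    # Apply a delta if the workflow reloaded (full rebuild) or if it was
    # created no earlier than the last delta applied for this topic.
    return reloaded or delta_time >= last_applied

assert should_apply(10.0, 9.0, False)      # newer delta: apply
assert not should_apply(8.0, 9.0, False)   # stale delta: ignore
assert should_apply(8.0, 9.0, True)        # reload: rebuild regardless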

Member

I was wondering about the impact of out-of-order messages, and whether there is the potential for the data store to become corrupted without us noticing until the next message comes in.

I think you've got this covered with the topic/checksum system so it's probably irrelevant:

  • If the workflow sends messages: 1, 2, 3
  • And the UI Server receives messages: 1, 3, 2
  • Then we know something is out of whack; we don't need to wait for message 4 to find that out.

I think this system should pick up on the error, but if messages 1, 2 and 3 relate to different topics, is there the potential for anything to fall through the cracks?

Member Author
@dwsutherland Nov 28, 2019

Not sure the ZMQ message queue can be out of order? If you hit recv on the subscriber you'd just get the oldest message first (post-connect)... (unless the HWM has been hit, in which case you'd miss any pruned messages).

Different topics each hit their own respective part of the data store. There's no confusion of topics, as the topic and its respective data arrive together.
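A minimal pyzmq illustration of that last point: the topic frame and its payload arrive together as one multipart message, so they can't be mismatched (the endpoint and topic names here are placeholders):

import zmq

ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.connect('tcp://localhost:5556')  # hypothetical workflow pub port
for topic in (b'task_proxies', b'edges'):
    sub.setsockopt(zmq.SUBSCRIBE, topic)

# Per topic, messages arrive in publish order; recv_multipart returns
# the topic together with its delta payload.
topic, payload = sub.recv_multipart()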

def reconcile_update(self, topic, delta, w_id):
"""Reconcile local with workflow data-store.

Verify data-store is in sync by topic/element-type
and on failure request entire set of respective data elements.

"""
if topic == WORKFLOW:
return
if topic == EDGES:
Member

Why use id for edges and stamp for everything else?

Member Author
@dwsutherland Nov 27, 2019

Because edges don't have dynamic fields (no plan to change that (yet!)). Once created they don't change; a suite reload replaces existing edges, and thereafter they only get deleted. (I guess for consistency we could still do it the same way...)
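For anyone following along, a rough sketch of what a checksum over those attributes could look like; the real generate_checksum lives in cylc.flow.ws_data_mgr and may differ:

import zlib

def generate_checksum(in_strings):
    # Checksum the sorted identifiers so both ends can compare stores
    # without shipping the elements themselves; sorting makes the result
    # independent of dict ordering.
    return zlib.adler32(''.join(sorted(in_strings)).encode('utf-8'))

# Edges are hashed on their immutable 'id'; other elements on their
# 'stamp', which changes whenever the element's fields change.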

s_att = 'id'
else:
s_att = 'stamp'
local_checksum = generate_checksum(
[getattr(e, s_att)
for e in self.data[w_id][topic].values()])
if local_checksum != delta.checksum:
future = asyncio.run_coroutine_threadsafe(
workflow_request(
self.workflows_mgr.workflows[w_id]['req_client'],
'pb_data_elements',
args={'element_type': topic}
),
self.loop
)
try:
_, new_delta_msg = future.result(5.0)
except asyncio.TimeoutError:
logger.info(f'The reconcile update coroutine {w_id} {topic} '
f'took too long, cancelling the task...')
future.cancel()
except Exception as exc:
logger.exception(exc)
else:
new_delta = DELTAS_MAP[topic]()
new_delta.ParseFromString(new_delta_msg)
apply_delta(topic, new_delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = new_delta.time

# Data syncing
async def entire_workflow_update(self, ids=None):
@@ -61,33 +155,41 @@ async def entire_workflow_update(self, ids=None):

# Prune old data
for w_id in list(self.data):
if w_id not in self.ws_mgr.workflows:
if w_id not in self.workflows_mgr.workflows:
del self.data[w_id]

# Fetch new data
ws_args = (
(w_id, info['req_client'], 'pb_entire_workflow')
for w_id, info in self.ws_mgr.workflows.items())
# Request new data
req_method = 'pb_entire_workflow'
req_kwargs = (
{'client': info['req_client'],
'command': req_method,
'req_context': w_id}
for w_id, info in self.workflows_mgr.workflows.items())
gathers = ()
for args in ws_args:
if not ids or args[0] in ids:
gathers += (get_workflow_data(*args),)
for kwargs in req_kwargs:
if not ids or kwargs['req_context'] in ids:
gathers += (workflow_request(**kwargs),)
items = await asyncio.gather(*gathers)
new_data = {}
for w_id, result in items:
if result is not None and result != MSG_TIMEOUT:
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data[w_id] = {
EDGES: {e.id: e for e in getattr(result, EDGES)},
FAMILIES: {f.id: f for f in getattr(result, FAMILIES)},
EDGES: {e.id: e for e in getattr(pb_data, EDGES)},
FAMILIES: {f.id: f for f in getattr(pb_data, FAMILIES)},
FAMILY_PROXIES: {
n.id: n
for n in getattr(result, FAMILY_PROXIES)},
JOBS: {j.id: j for j in getattr(result, JOBS)},
TASKS: {t.id: t for t in getattr(result, TASKS)},
for n in getattr(pb_data, FAMILY_PROXIES)},
JOBS: {j.id: j for j in getattr(pb_data, JOBS)},
TASKS: {t.id: t for t in getattr(pb_data, TASKS)},
TASK_PROXIES: {
n.id: n
for n in getattr(result, TASK_PROXIES)},
WORKFLOW: getattr(result, WORKFLOW),
for n in getattr(pb_data, TASK_PROXIES)},
WORKFLOW: getattr(pb_data, WORKFLOW),
'delta_times': {
topic: getattr(pb_data, WORKFLOW).last_updated
for topic in DELTAS_MAP.keys()}
}

self.data.update(new_data)
23 changes: 15 additions & 8 deletions cylc/uiserver/main.py
Expand Up @@ -21,6 +21,7 @@
import logging
import os
import signal
from functools import partial

from cylc.flow.network.schema import schema
from logging.config import dictConfig
@@ -44,9 +45,12 @@ def signal_handler(self, signum, frame):
logger.info('exiting...')
self.is_closing = True

def try_exit(self):
def try_exit(self, uis):
if self.is_closing:
# clean up here
for sub in uis.data_mgr.w_subs.values():
sub.stop()
uis.workflows_mgr.context.destroy()
ioloop.IOLoop.instance().stop()
logger.info('exit success')

@@ -61,11 +65,11 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
script_dir = os.path.dirname(__file__)
self._static = os.path.abspath(os.path.join(script_dir, static))
self._jupyter_hub_service_prefix = jupyter_hub_service_prefix
self.ws_mgr = WorkflowsManager()
self.data_mgr = DataManager(self.ws_mgr)
self.workflows_mgr = WorkflowsManager()
self.data_mgr = DataManager(self.workflows_mgr)
self.resolvers = Resolvers(
self.data_mgr.data,
ws_mgr=self.ws_mgr)
workflows_mgr=self.workflows_mgr)

def _make_app(self, debug: bool):
"""Crete a Tornado web application.
@@ -126,14 +130,17 @@ def start(self, debug: bool):
app = self._make_app(debug)
signal.signal(signal.SIGINT, app.signal_handler)
app.listen(self._port)
ioloop.PeriodicCallback(app.try_exit, 100).start()
ioloop.PeriodicCallback(
partial(app.try_exit, uis=self), 100).start()
# Discover workflows on initial start up.
ioloop.IOLoop.current().add_callback(self.ws_mgr.gather_workflows)
ioloop.IOLoop.current().add_callback(
self.workflows_mgr.gather_workflows)
# Start the workflow data-store sync
ioloop.IOLoop.current().add_callback(self.data_mgr.sync_workflows)
# If the client is already established it's not overridden,
# so the following callbacks can happen at the same time.
ioloop.PeriodicCallback(self.ws_mgr.gather_workflows, 10000).start()
ioloop.PeriodicCallback(
self.data_mgr.entire_workflow_update, 5000).start()
self.workflows_mgr.gather_workflows, 10000).start()
try:
ioloop.IOLoop.current().start()
except KeyboardInterrupt:
6 changes: 3 additions & 3 deletions cylc/uiserver/resolvers.py
@@ -23,7 +23,7 @@
class Resolvers(BaseResolvers):
"""UI Server context GraphQL query and mutation resolvers."""

ws_mgr = None
workflows_mgr = None

def __init__(self, data, **kwargs):
super().__init__(data)
@@ -46,7 +46,7 @@ async def mutator(self, info, *m_args):
'request_string': req_str,
'variables': variables,
}
return self.ws_mgr.multi_request('graphql', w_ids, graphql_args)
return self.workflows_mgr.multi_request('graphql', w_ids, graphql_args)

async def nodes_mutator(self, info, *m_args):
"""Mutate node items of associated workflows."""
@@ -63,5 +63,5 @@ async def nodes_mutator(self, info, *m_args):
'variables': variables,
}
multi_args = {w_id: graphql_args for w_id in w_ids}
return self.ws_mgr.multi_request(
return self.workflows_mgr.multi_request(
'graphql', w_ids, multi_args=multi_args)
44 changes: 24 additions & 20 deletions cylc/uiserver/tests/test_workflows_mgr.py
@@ -43,28 +43,30 @@ async def test_workflow_request_client_error(async_client: AsyncClientFixture):


@pytest.mark.asyncio
@pytest.mark.parametrize("returns,command,context,expected_ctx,expected_msg", [
pytest.param(
42, 'cmd', None, 'cmd', 42
),
pytest.param(
42, '', None, '', 42
),
pytest.param(
42, 'cmd', 'some-context', 'some-context', 42
)
])
@pytest.mark.parametrize(
"returns,command,req_context,expected_ctx,expected_msg",
[
pytest.param(
42, 'cmd', None, 'cmd', 42
),
pytest.param(
42, '', None, '', 42
),
pytest.param(
42, 'cmd', 'some-context', 'some-context', 42
)
])
async def test_workflow_request(
async_client: AsyncClientFixture,
returns,
command,
context,
req_context,
expected_ctx,
expected_msg
):
async_client.will_return(returns)
ctx, msg = await workflow_request(
client=async_client, command=command, context=context)
client=async_client, command=command, req_context=req_context)
assert expected_ctx == ctx
assert expected_msg == msg

@@ -86,19 +88,19 @@ async def test_est_workflow_socket_error(
def side_effect(*_, **__):
raise socket.error
mocked_get_host_ip_by_name.side_effect = side_effect
reg, host, port, client = await est_workflow(
reg, host, port, pub_port, client = await est_workflow(
'', '', 0, 0)
assert not any([reg, host, port, client])
assert not any([reg, host, port, pub_port, client])
assert client is None


@pytest.mark.asyncio
@pytest.mark.parametrize('reg,host,port,timeout,expected_host', [
@pytest.mark.parametrize('reg,host,port,pub_port,timeout,expected_host', [
pytest.param(
'remote', 'remote', 8000, 1, 'remote_host'
'remote', 'remote', 8000, 8002, 1, 'remote_host'
),
pytest.param(
'local', 'localhost', 4000, 2, 'localhost'
'local', 'localhost', 4000, 4001, 2, 'localhost'
)
])
async def test_est_workflow(
@@ -107,6 +109,7 @@ async def test_est_workflow(
reg,
host,
port,
pub_port,
timeout,
expected_host
):
@@ -124,11 +127,12 @@
mocked_get_host_ip_by_name.side_effect = lambda x: f'remote_host' \
if x == 'remote' else x

r_reg, r_host, r_port, r_client, r_result = await est_workflow(
reg, host, port, timeout)
r_reg, r_host, r_port, r_pub_port, r_client, r_result = await est_workflow(
reg, host, port, pub_port, timeout)
assert reg == r_reg
assert expected_host == r_host
assert port == r_port
assert pub_port == r_pub_port
assert r_client.__class__ == AsyncClientFixture

