Skip to content

Commit

Permalink
fix tests, sync comments & docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Nov 12, 2019
1 parent 1f3a0b8 commit f5c16cc
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
59 changes: 51 additions & 8 deletions cylc/uiserver/data_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,24 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Manage a local data-store replica of all workflow service data-stores.
"""Create and update the data structure for all workflow services."""
A local data-store is created and synced for all workflows established by
the workflow service manager.
The workflows publish the updated fields of the updated data elements (deltas),
and these elements are grouped by type/topic. Once subscribed to, the publisher
queues these messages until they are received. If the delta creation time is
newer than that of the last update, the delta is applied (updates merged,
deleted elements pruned), and a checksum is generated from the time-stamped
IDs and compared to the published one.
Reconciliation on failed verification is done by requesting all elements of a
topic, and replacing the respective data-store elements with this.
Subscriptions are currently run in a different thread (via ThreadPoolExecutor).
"""

import asyncio
import logging
Expand All @@ -36,10 +52,10 @@


class DataManager:
"""Manage the local data-store acquisition/updates from all workflows."""
"""Manage the local data-store acquisition/updates for all workflows."""

def __init__(self, workfloworkflows_mgr):
self.workflows_mgr = workfloworkflows_mgr
def __init__(self, workflows_mgr):
self.workflows_mgr = workflows_mgr
self.data = {}
self.w_subs = {}
self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
Expand Down Expand Up @@ -76,7 +92,14 @@ async def sync_workflows(self):
await asyncio.sleep(1.0)

def _start_subscription(self, w_id, host, port):
"""Instantiate and run subscriber and data-store management."""
"""Instantiate and run subscriber and data-store management.
Args:
w_id (str): Workflow external ID.
host (str): Hostname of target workflow.
port (int): Port of target workflow.
"""
self.w_subs[w_id] = WorkflowSubscriber(
host, port,
context=self.workflows_mgr.context, topics=self.topics)
Expand All @@ -87,7 +110,15 @@ def _start_subscription(self, w_id, host, port):
w_id=w_id))

def update_workflow_data(self, topic, delta, w_id):
"""Manage and apply incoming data-store deltas."""
"""Manage and apply incoming data-store deltas.
Args:
topic (str): topic of published data.
delta (object): Published protobuf message data container.
w_id (str): Workflow external ID.
"""
# wait until data-store is populated for this workflow
loop_cnt = 0
while loop_cnt < 5:
if w_id not in self.data:
Expand All @@ -100,6 +131,7 @@ def update_workflow_data(self, topic, delta, w_id):
delta_time = getattr(
delta, 'time', getattr(delta, 'last_updated', 0.0))
# If the workflow has reloaded recreate the data
# otherwise apply the delta if it's newer than the previously applied one.
if delta.reloaded:
self.data[w_id][topic] = {ele.id: ele for ele in delta.deltas}
self.data[w_id]['delta_times'][topic] = delta_time
Expand All @@ -114,6 +146,11 @@ def reconcile_update(self, topic, delta, w_id):
Verify data-store is in sync by topic/element-type
and on failure request entire set of respective data elements.
Args:
topic (str): topic of published data.
delta (object): Published protobuf message data container.
w_id (str): Workflow external ID.
"""
if topic == WORKFLOW:
return
Expand All @@ -125,6 +162,7 @@ def reconcile_update(self, topic, delta, w_id):
[getattr(e, s_att)
for e in self.data[w_id][topic].values()])
if local_checksum != delta.checksum:
# use threadsafe as client socket is in main loop thread.
future = asyncio.run_coroutine_threadsafe(
workflow_request(
self.workflows_mgr.workflows[w_id]['req_client'],
Expand All @@ -147,9 +185,14 @@ def reconcile_update(self, topic, delta, w_id):
apply_delta(topic, new_delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = new_delta.time

# Data syncing
async def entire_workflow_update(self, ids=None):
"""Update all data of workflow(s) from associated WS."""
"""Update entire local data-store of workflow(s).
Args:
ids (list): List of workflow external IDs.
"""
if ids is None:
ids = []

Expand Down
5 changes: 4 additions & 1 deletion cylc/uiserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ def signal_handler(self, signum, frame):
self.is_closing = True

def try_exit(self, uis):
# clean up and stop in here
if self.is_closing:
# clean up here
# stop the subscribers running in the thread pool executor
for sub in uis.data_mgr.w_subs.values():
sub.stop()
# Destroy ZeroMQ context of all sockets
uis.workflows_mgr.context.destroy()
ioloop.IOLoop.instance().stop()
logger.info('exit success')
Expand Down Expand Up @@ -130,6 +132,7 @@ def start(self, debug: bool):
app = self._make_app(debug)
signal.signal(signal.SIGINT, app.signal_handler)
app.listen(self._port)
# pass in server object for clean exit
ioloop.PeriodicCallback(
partial(app.try_exit, uis=self), 100).start()
# Discover workflows on initial start up.
Expand Down
9 changes: 4 additions & 5 deletions cylc/uiserver/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
import inspect

import pytest
import zmq

from cylc.flow.network.client import ZMQClient
from cylc.flow.network import ZMQSocketBase


class AsyncClientFixture(ZMQClient):
class AsyncClientFixture(ZMQSocketBase):
pattern = zmq.REQ
host = ''
port = 0
encode_method = None
decode_method = None
secret_method = None

def __init__(self):
self.returns = None
Expand Down

0 comments on commit f5c16cc

Please sign in to comment.