Skip to content

Commit

Permalink
data_store_mgr: refactor workflow connection
Browse files Browse the repository at this point in the history
* Move error handling up a level into `connect_workflow` from the
  `_start_subscription` and `_entire_workflow_update` methods.
* Simplify tests (all exceptions are now caught in the same way).
* Remove the multi-workflow handling ability of
  `_entire_workflow_update`; this is unused and can now be achieved more
  easily via `asyncio.gather` as the threading has been removed.
  • Loading branch information
oliver-sanders committed Apr 22, 2024
1 parent c730b51 commit 583b71d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 191 deletions.
128 changes: 69 additions & 59 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from functools import wraps
from pathlib import Path
import time
from typing import Dict, Optional, Set, NamedTuple
from typing import Dict, NamedTuple

from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
Expand Down Expand Up @@ -197,20 +197,21 @@ async def connect_workflow(self, w_id, contact_data):

self.delta_queues[w_id] = {}

self._start_subscription(
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT],
)
successful_updates = await self._entire_workflow_update(ids=[w_id])

if w_id not in successful_updates:
# something went wrong, undo any changes to allow for subsequent
# connection attempts
self.log.info(f'failed to connect to {w_id}')
try:
# start the subscriber to keep this store updated
self._start_subscription(
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT],
)
# make a one-off request to provide the initial data
await self._entire_workflow_update(w_id)
except WorkflowStopped:
self.disconnect_workflow(w_id)

Check warning on line 211 in cylc/uiserver/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/data_store_mgr.py#L211

Added line #L211 was not covered by tests
except Exception as exc:
self.log.error(f'Failed to connect to {w_id}: {exc}')
self.disconnect_workflow(w_id)
return False
else:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)
Expand Down Expand Up @@ -268,7 +269,13 @@ def _purge_workflow(self, w_id):
if w_id in self.delta_queues:
del self.delta_queues[w_id]

def _start_subscription(self, w_id: str, reg: str, host: str, port: int):
def _start_subscription(
self,
w_id: str,
reg: str,
host: str,
port: int,
) -> None:
"""Instantiate and run subscriber data-store sync.
Args:
Expand All @@ -277,22 +284,30 @@ def _start_subscription(self, w_id: str, reg: str, host: str, port: int):
host: Hostname of target workflow.
port: Port of target workflow.
Raises:
WorkflowStopped
"""
# create the subscription client
subscriber = WorkflowSubscriber(

Check warning on line 292 in cylc/uiserver/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/data_store_mgr.py#L292

Added line #L292 was not covered by tests
reg,
host=host,
port=port,
context=self.workflows_mgr.context,
topics=self.topics,
)

# start the subscription task
subscriber_task = asyncio.create_task(

Check warning on line 301 in cylc/uiserver/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/data_store_mgr.py#L301

Added line #L301 was not covered by tests
subscriber.subscribe(
process_delta_msg,
func=_call_to_tuple(self.message_queue.put_nowait),
w_id=w_id,
)
)
self.active_subscriptions[w_id] = ActiveSubscription(subscriber, subscriber_task)
self.active_subscriptions[w_id] = ActiveSubscription(

Check warning on line 308 in cylc/uiserver/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/data_store_mgr.py#L308

Added line #L308 was not covered by tests
subscriber, subscriber_task
)

def _stop_subscription(self, w_id: str) -> None:
"""Stop an active subscription.
Expand Down Expand Up @@ -426,55 +441,50 @@ def _reconcile_update(self, topic, delta, w_id):
self.log.exception(exc)

async def _entire_workflow_update(
self, ids: Optional[list] = None
) -> Set[str]:
"""Update entire local data-store of workflow(s).
self,
w_id: str,
req_method: str = 'pb_entire_workflow',
) -> None:
"""Call "req_method" on a workflow and put the data in the store.
Args:
ids: List of workflow external IDs.
w_id:
The workflow ID to fetch data from.
req_method:
The protobuf data endpoint to call on the workflow.
"""
if ids is None:
ids = []
Raises:
WorkflowStopped
# Request new data
req_method = 'pb_entire_workflow'

requests = {
w_id: workflow_request(
client=info['req_client'], command=req_method, log=self.log
)
for w_id, info in self.workflows_mgr.workflows.items()
if info.get('req_client') # skip stopped workflows
and (not ids or w_id in ids)
}
results = await asyncio.gather(
*requests.values(), return_exceptions=True
"""
# get the workflow client
client = self.workflows_mgr.workflows.get(w_id, {}).get(
'req_client', None
)
successes: Set[str] = set()
for w_id, result in zip(requests, results):
if isinstance(result, Exception):
if not isinstance(result, WorkflowStopped):
self.log.error(
'Failed to update entire local data-store '
f'of a workflow: {result}'
)
if not client:
raise WorkflowStopped(w_id)

Check warning on line 465 in cylc/uiserver/data_store_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/uiserver/data_store_mgr.py#L465

Added line #L465 was not covered by tests

# request data
result = await workflow_request(
client=client,
command=req_method,
log=self.log,
)

# update the store
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data = deepcopy(DATA_TEMPLATE)
for field, value in pb_data.ListFields():
if field.name == WORKFLOW:
new_data[field.name].CopyFrom(value)
new_data['delta_times'] = {
key: value.last_updated
for key in DATA_TEMPLATE
}
continue
pb_data = PB_METHOD_MAP[req_method]()
pb_data.ParseFromString(result)
new_data = deepcopy(DATA_TEMPLATE)
for field, value in pb_data.ListFields():
if field.name == WORKFLOW:
new_data[field.name].CopyFrom(value)
new_data['delta_times'] = {
key: value.last_updated
for key in DATA_TEMPLATE
}
continue
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data
successes.add(w_id)
return successes
new_data[field.name] = {n.id: n for n in value}
self.data[w_id] = new_data

def _update_contact(
self,
Expand Down
Loading

0 comments on commit 583b71d

Please sign in to comment.