Skip to content

Commit

Permalink
set/clear contact data properly
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Sep 1, 2020
1 parent ef12c36 commit 47c3480
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 22 deletions.
67 changes: 52 additions & 15 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from copy import deepcopy
from functools import partial
import logging
from time import sleep
import time

from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network import MSG_TIMEOUT
Expand All @@ -46,6 +46,8 @@
EDGES, DATA_TEMPLATE, ALL_DELTAS, DELTAS_MAP, WORKFLOW,
apply_delta, generate_checksum, create_delta_store
)
from cylc.flow.suite_files import ContactFileFields as CFF

from .workflows_mgr import workflow_request

logger = logging.getLogger(__name__)
Expand All @@ -67,7 +69,34 @@ def __init__(self, workflows_mgr):
self.executors = {}
self.delta_queues = {}

async def sync_workflow(self, w_id, *args, **kwargs):
def update_contact(self, w_id, contact_data=None):
delta = DELTAS_MAP[WORKFLOW]()
delta.time = time.time()
flow = delta.updated
flow.id = w_id
if contact_data:
# update with contact file data
flow.name = contact_data['name']
flow.owner = contact_data['owner']
flow.host = contact_data[CFF.HOST]
flow.port = int(contact_data[CFF.PORT])
flow.api_version = int(contact_data[CFF.API])
else:
# wipe pre-existing contact-file data
flow.host = ''
flow.port = 0
flow.api_version = 0
flow.status = 'stopped'

# Apply to existing workflow data
apply_delta(WORKFLOW, delta, self.data[w_id])
if 'delta_times' not in self.data[w_id]:
self.data[w_id]['delta_times'] = {}
self.data[w_id]['delta_times'][WORKFLOW] = delta.time
# Queue delta for subscription push
self.delta_store_to_queues(w_id, ALL_DELTAS, delta)

async def sync_workflow(self, w_id, contact_data):
"""Run data store sync with workflow services.
Subscriptions and sync management is instantiated and run in
Expand All @@ -76,6 +105,9 @@ async def sync_workflow(self, w_id, *args, **kwargs):
"""
print(f'$ sync_workflow({w_id})')

self.update_contact(w_id, contact_data)

if self.loop is None:
self.loop = asyncio.get_running_loop()
if w_id in self.w_subs:
Expand All @@ -87,25 +119,30 @@ async def sync_workflow(self, w_id, *args, **kwargs):
# non-blocking subscriptions, but this works.
self.executors[w_id] = ThreadPoolExecutor()
self.executors[w_id].submit(
partial(self.start_subscription, w_id, *args, **kwargs)
partial(
self.start_subscription,
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PORT]
)
)
await self.entire_workflow_update(ids=[w_id])

async def register_workflow(self, w_id, name, owner):
print(f'$ register_workflow({w_id})')
self.delta_queues[w_id] = {}

# create new entry in the data store
data = deepcopy(DATA_TEMPLATE)
flow = data[WORKFLOW]
flow.id = w_id
flow.name = name
flow.owner = owner
flow.status = 'stopped'
self.data[w_id] = data

# create new entry in the delta store
self.update_contact(w_id)

def stop_workflow(self, w_id):
print(f'$ stop_workflow({w_id})')
self.data[w_id]['status'] = 'stopped'
self.data[w_id]['host'] = None
self.data[w_id]['port'] = None
self.update_contact(w_id)

def purge_workflow(self, w_id):
"""Purge the manager of a workflow's subscription and data."""
Expand Down Expand Up @@ -158,7 +195,7 @@ def update_workflow_data(self, topic, delta, w_id):
while loop_cnt < self.INIT_DATA_WAIT_TIME:
if w_id in self.data:
break
sleep(self.INIT_DATA_RETRY_DELAY)
time.sleep(self.INIT_DATA_RETRY_DELAY)
loop_cnt += 1
continue
if topic == 'shutdown':
Expand Down Expand Up @@ -250,9 +287,9 @@ async def entire_workflow_update(self, ids=None):
ids = []

# Prune old data
for w_id in list(self.data):
if w_id not in self.workflows_mgr.active:
del self.data[w_id]
# for w_id in list(self.data):
# if w_id not in self.workflows_mgr.active:
# del self.data[w_id]

# Request new data
req_method = 'pb_entire_workflow'
Expand Down
11 changes: 4 additions & 7 deletions cylc/uiserver/workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,15 @@ async def _workflow_state_changes(self):
The scan data (i.e. the contents of the contact file).
"""
owner = getuser()

active_before = set(self.active)
inactive_before = set(self.inactive)

active = set()
inactive = set()

async for flow in self._scan_pipe:
wid = f'{owner}{ID_DELIM}{flow["name"]}'
flow['owner'] = self.owner
wid = f'{flow["owner"]}{ID_DELIM}{flow["name"]}'
flow['id'] = wid

if not flow.get('contact'):
Expand Down Expand Up @@ -197,7 +196,7 @@ async def _register(self, wid, flow):
"""Register a new workflow with the data store."""
print(f'_register({wid})')
await self.uiserver.data_store_mgr.register_workflow(
wid, flow['name'], self.owner
wid, flow['name'], flow['owner']
)

async def _connect(self, wid, flow):
Expand All @@ -207,9 +206,7 @@ async def _connect(self, wid, flow):
flow['req_client'] = SuiteRuntimeClient(flow['name'])
await self.uiserver.data_store_mgr.sync_workflow(
wid,
flow['name'],
flow[CFF.HOST],
flow[CFF.PUBLISH_PORT]
flow
)

async def _disconnect(self, wid):
Expand Down

0 comments on commit 47c3480

Please sign in to comment.