Skip to content

Commit

Permalink
test states and fix data purging
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Sep 1, 2020
1 parent 47c3480 commit ce27269
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 28 deletions.
36 changes: 21 additions & 15 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import logging
import time

from cylc.flow import ID_DELIM
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
Expand Down Expand Up @@ -80,11 +81,14 @@ def update_contact(self, w_id, contact_data=None):
flow.owner = contact_data['owner']
flow.host = contact_data[CFF.HOST]
flow.port = int(contact_data[CFF.PORT])
# flow.pub_port = int(contact_data[CFF.PUBLISH_PORT])
flow.api_version = int(contact_data[CFF.API])
else:
# wipe pre-existing contact-file data
flow.owner, flow.name = w_id.split(ID_DELIM)
flow.host = ''
flow.port = 0
# flow.pub_port = 0
flow.api_version = 0
flow.status = 'stopped'

Expand All @@ -104,16 +108,16 @@ async def sync_workflow(self, w_id, contact_data):
blocking the main loop.
"""
print(f'$ sync_workflow({w_id})')

self.update_contact(w_id, contact_data)

logger.debug(f'sync_workflow({w_id})')
if self.loop is None:
self.loop = asyncio.get_running_loop()

# don't sync if subscription exists
if w_id in self.w_subs:
return

self.delta_queues[w_id] = {}
self.update_contact(w_id, contact_data)

# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
Expand All @@ -124,13 +128,13 @@ async def sync_workflow(self, w_id, contact_data):
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PORT]
contact_data[CFF.PUBLISH_PORT]
)
)
await self.entire_workflow_update(ids=[w_id])

async def register_workflow(self, w_id, name, owner):
print(f'$ register_workflow({w_id})')
logger.debug(f'register_workflow({w_id})')
self.delta_queues[w_id] = {}

# create new entry in the data store
Expand All @@ -141,21 +145,23 @@ async def register_workflow(self, w_id, name, owner):
self.update_contact(w_id)

def stop_workflow(self, w_id):
print(f'$ stop_workflow({w_id})')
logger.debug(f'stop_workflow({w_id})')
self.update_contact(w_id)

def purge_workflow(self, w_id):
def purge_workflow(self, w_id, data=True):
"""Purge the manager of a workflow's subscription and data."""
print(f'$ purge_workflow({w_id})')
logger.debug(f'purge_workflow({w_id})')
if w_id in self.w_subs:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
if w_id in self.data:
del self.data[w_id]
if w_id in self.delta_queues:
del self.delta_queues[w_id]
self.executors[w_id].shutdown(wait=True)
del self.executors[w_id]
if data:
if w_id in self.data:
del self.data[w_id]
if w_id in self.delta_queues:
del self.delta_queues[w_id]
if w_id in self.executors:
self.executors[w_id].shutdown(wait=True)
del self.executors[w_id]

def start_subscription(self, w_id, reg, host, port):
"""Instantiate and run subscriber data-store sync.
Expand Down
113 changes: 112 additions & 1 deletion cylc/uiserver/tests/test_workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from itertools import product
from pathlib import Path
import pytest
from random import random

from pytest_mock import MockFixture

from cylc.uiserver.workflows_mgr import *
from cylc.flow import ID_DELIM
from cylc.flow.exceptions import ClientTimeout
from cylc.flow.network import API
from cylc.flow.suite_files import (
ContactFileFields as CFF,
SuiteFiles
)

from cylc.uiserver.workflows_mgr import *

from .conftest import AsyncClientFixture

Expand Down Expand Up @@ -152,3 +162,104 @@ def test_workflows_manager_spawn_workflow():
assert not mgr.workflows

# TODO: add tests for remaining methods in WorkflowsManager


def mk_flow(path, reg, active=True):
"""Make a workflow appear on the filesystem for scan purposes.
Args:
path (pathlib.Path):
The directory to create the mocked workflow in.
reg (str):
The registered name for this workflow.
active (bool):
If True then a contact file will be written.
"""
run_dir = path / reg
srv_dir = run_dir / SuiteFiles.Service.DIRNAME
contact = srv_dir / SuiteFiles.Service.CONTACT
fconfig = run_dir / SuiteFiles.FLOW_FILE
run_dir.mkdir()
fconfig.touch() # cylc uses this to identify a dir as a workflow
if active:
srv_dir.mkdir()
with open(contact, 'w+') as contact_file:
contact_file.write(
'\n'.join([
f'{CFF.API}={API}',
f'{CFF.HOST}=42',
f'{CFF.PORT}=42',
f'{CFF.UUID}=42'
])
)


@pytest.mark.asyncio
@pytest.mark.parametrize(
# generate all possile state changes
'active_before,active_after',
product(['active', 'inactive', None], repeat=2)
)
async def test_workflow_state_changes(tmp_path, active_before, active_after):
"""It correctly identifies workflow state changes from the filesystem."""
tmp_path /= str(random())
tmp_path.mkdir()

# mock the results of the previous scan
wfm = WorkflowsManager(None, context=None, run_dir=tmp_path)
wid = f'{wfm.owner}{ID_DELIM}a'
if active_before == 'active':
wfm.active[wid] = {
CFF.API: API,
CFF.UUID: '42'
}
elif active_before == 'inactive':
wfm.inactive.add(wid)

# mock the filesystem in the new state
if active_after == 'active':
mk_flow(tmp_path, 'a', active=True)
if active_after == 'inactive':
mk_flow(tmp_path, 'a', active=False)

# see what state changes the workflow manager detects
changes = []
async for change in wfm._workflow_state_changes():
changes.append(change)

# compare those changes to expectations
if active_before == active_after:
assert changes == []
else:
assert len(changes) == 1
assert (wid, active_before, active_after) == changes[0][:3]


@pytest.mark.asyncio
async def test_workflow_state_change_restart(tmp_path):
"""It identifies workflows which have restarted between scans."""
# mock the result of the previous scan
wfm = WorkflowsManager(None, context=None, run_dir=tmp_path)
wid = f'{wfm.owner}{ID_DELIM}a'
wfm.active[wid] = {
CFF.API: API,
CFF.UUID: '41'
}

# create a new workflow with the same name but a different UUID
mk_flow(tmp_path, 'a', active=True)

# see what state changes the workflow manager detects
changes = []
async for change in wfm._workflow_state_changes():
changes.append(change)

# the flow should be marked as becomming inactive then active again
assert [change[:3] for change in changes] == [
(wid, 'active', 'inactive'),
(wid, 'inactive', 'active')
]

# it should have picked up the new uuid too
assert changes[1][3][CFF.UUID] == '42'
16 changes: 4 additions & 12 deletions cylc/uiserver/workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ async def est_workflow(reg, host, port, pub_port, context=None, timeout=None):


class WorkflowsManager:
"""Discover and Manage workflows."""

def __init__(self, uiserver, context=None):
def __init__(self, uiserver, context=None, run_dir=None):
self.uiserver = uiserver
if context is None:
self.context = zmq.asyncio.Context()
Expand All @@ -112,7 +111,7 @@ def __init__(self, uiserver, context=None):
self.inactive = set()
self._scan_pipe = (
# all flows on the filesystem
scan
scan(run_dir)
# only flows which have a contact file
# | is_active(True)
# stop here is the flow is stopped, else...
Expand Down Expand Up @@ -171,7 +170,7 @@ async def _workflow_state_changes(self):
):
# this flow is running but it's a different run
active.add(wid)
yield (wid, 'active', 'inactive', flow)
yield (wid, 'active', 'inactive', self.active[wid])
yield (wid, 'inactive', 'active', flow)

else:
Expand All @@ -190,18 +189,14 @@ async def _workflow_state_changes(self):
for wid in inactive_before - (active | inactive):
yield (wid, 'inactive', None, None)

# return active, inactive

async def _register(self, wid, flow):
"""Register a new workflow with the data store."""
print(f'_register({wid})')
await self.uiserver.data_store_mgr.register_workflow(
wid, flow['name'], flow['owner']
)

async def _connect(self, wid, flow):
"""Open a connection to a running workflow."""
print(f'_connect({wid})')
self.active[wid] = flow
flow['req_client'] = SuiteRuntimeClient(flow['name'])
await self.uiserver.data_store_mgr.sync_workflow(
Expand All @@ -211,21 +206,20 @@ async def _connect(self, wid, flow):

async def _disconnect(self, wid):
"""Disconnect from a running workflow."""
print(f'_disconnect({wid})')
client = self.active[wid]['req_client']
with suppress(IOError):
client.stop(stop_loop=False)

async def _unregister(self, wid):
"""Unregister a workflow from the data store."""
print(f'_unregister({wid})')
self.uiserver.data_store_mgr.purge_workflow(wid)

async def _stop(self, wid):
"""Mark a workflow as stopped.
The workflow can't do this itself, because it's not running.
"""
self.uiserver.data_store_mgr.purge_workflow(wid, data=False)
self.uiserver.data_store_mgr.stop_workflow(wid)

async def update(self):
Expand All @@ -245,8 +239,6 @@ async def update(self):
self.stopping.clear()

async for wid, before, after, flow in self._workflow_state_changes():
print(f'# {wid} {before}->{after}')

# handle state changes
if before == 'active' and after == 'inactive':
await self._disconnect(wid)
Expand Down

0 comments on commit ce27269

Please sign in to comment.