API version filter, module name change
dwsutherland committed Nov 29, 2019
1 parent be928a8 commit 6039dae
Showing 4 changed files with 14 additions and 13 deletions.
4 changes: 2 additions & 2 deletions cylc/uiserver/data_mgr.py → cylc/uiserver/data_store_mgr.py
@@ -42,7 +42,7 @@
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network.scan import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
- from cylc.flow.ws_data_mgr import (
+ from cylc.flow.data_store_mgr import (
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
DELTAS_MAP, apply_delta, generate_checksum
)
@@ -51,7 +51,7 @@
logger = logging.getLogger(__name__)


- class DataManager:
+ class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""

INIT_DATA_WAIT_TIME = 5. # seconds
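The first file is a straight rename of the module and its class, tracking the upstream cylc-flow move of ws_data_mgr to data_store_mgr. For reference, a minimal before/after sketch of the import paths, using only the identifiers visible in the hunks above and the absolute form of the relative imports shown in main.py below:

    # before this commit
    from cylc.flow.ws_data_mgr import DELTAS_MAP, apply_delta
    from cylc.uiserver.data_mgr import DataManager

    # after this commit
    from cylc.flow.data_store_mgr import DELTAS_MAP, apply_delta
    from cylc.uiserver.data_store_mgr import DataStoreMgr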
10 changes: 5 additions & 5 deletions cylc/uiserver/main.py
@@ -30,7 +30,7 @@

from jupyterhub.services.auth import HubOAuthCallbackHandler
from jupyterhub.utils import url_path_join
- from .data_mgr import DataManager
+ from .data_store_mgr import DataStoreMgr
from .handlers import *
from .resolvers import Resolvers
from .workflows_mgr import WorkflowsManager
@@ -49,10 +49,10 @@ def try_exit(self, uis):
# clean up and stop in here
if self.is_closing:
# stop the subscribers running in the thread pool executor
- for sub in uis.data_mgr.w_subs.values():
+ for sub in uis.data_store_mgr.w_subs.values():
sub.stop()
# Shutdown the thread pool executor
- uis.data_mgr.executor.shutdown(wait=False)
+ uis.data_store_mgr.executor.shutdown(wait=False)
# Destroy ZeroMQ context of all sockets
uis.workflows_mgr.context.destroy()
ioloop.IOLoop.instance().stop()
@@ -70,9 +70,9 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
self._static = os.path.abspath(os.path.join(script_dir, static))
self._jupyter_hub_service_prefix = jupyter_hub_service_prefix
self.workflows_mgr = WorkflowsManager(self)
- self.data_mgr = DataManager(self.workflows_mgr)
+ self.data_store_mgr = DataStoreMgr(self.workflows_mgr)
self.resolvers = Resolvers(
- self.data_mgr.data,
+ self.data_store_mgr.data,
workflows_mgr=self.workflows_mgr)

def _make_app(self, debug: bool):
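Taken together, the main.py hunks show how the renamed manager is wired into the UI Server and how it is torn down. A minimal sketch of that lifecycle, assuming only the attribute and method names visible in the diff (w_subs, executor, data, context) and leaving out the Tornado/JupyterHub plumbing:

    from cylc.uiserver.data_store_mgr import DataStoreMgr
    from cylc.uiserver.resolvers import Resolvers
    from cylc.uiserver.workflows_mgr import WorkflowsManager

    class UIServerSketch:
        def __init__(self):
            self.workflows_mgr = WorkflowsManager(self)
            # one local data store per workflow, kept in sync by threaded subscribers
            self.data_store_mgr = DataStoreMgr(self.workflows_mgr)
            # GraphQL resolvers read straight from the in-memory data store
            self.resolvers = Resolvers(
                self.data_store_mgr.data,
                workflows_mgr=self.workflows_mgr)

        def shutdown(self):
            # stop the per-workflow subscribers running in the thread pool ...
            for sub in self.data_store_mgr.w_subs.values():
                sub.stop()
            # ... then the thread pool executor itself ...
            self.data_store_mgr.executor.shutdown(wait=False)
            # ... and finally the shared ZeroMQ context
            self.workflows_mgr.context.destroy()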
2 changes: 1 addition & 1 deletion cylc/uiserver/resolvers.py
@@ -17,7 +17,7 @@
"""GraphQL resolvers for use in data accessing and mutation of workflows."""

from cylc.flow.network.resolvers import BaseResolvers
- from cylc.flow.ws_data_mgr import WORKFLOW
+ from cylc.flow.data_store_mgr import WORKFLOW


class Resolvers(BaseResolvers):
11 changes: 6 additions & 5 deletions cylc/uiserver/workflows_mgr.py
@@ -33,10 +33,11 @@
from cylc.flow import flags
from cylc.flow.exceptions import ClientError, ClientTimeout
from cylc.flow.hostuserutil import is_remote_host, get_host_ip_by_name
+ from cylc.flow.network import API
from cylc.flow.network.client import SuiteRuntimeClient
from cylc.flow.network.scan import (
get_scan_items_from_fs, re_compile_filters, MSG_TIMEOUT)
- from cylc.flow.ws_data_mgr import ID_DELIM
+ from cylc.flow.data_store_mgr import ID_DELIM

logger = logging.getLogger(__name__)
CLIENT_TIMEOUT = 2.0
@@ -102,9 +103,9 @@ async def gather_workflows(self):
cre_owner, cre_name = re_compile_filters(None, ['.*'])
scan_args = (
(reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
- for reg, host, port, pub_port in
+ for reg, host, port, pub_port, api in
get_scan_items_from_fs(cre_owner, cre_name)
- if reg not in self.stopping)
+ if reg not in self.stopping and api == str(API))
# clear stopping set
self.stopping.clear()

@@ -139,14 +140,14 @@ async def gather_workflows(self):
with suppress(IOError):
client.stop(stop_loop=False)
self.workflows.pop(w_id)
- self.uiserver.data_mgr.purge_workflow(w_id)
+ self.uiserver.data_store_mgr.purge_workflow(w_id)

# update with new, and start data sync
gathers = ()
for w_id, w_info in scanflows.items():
self.workflows[w_id] = w_info
gathers += (
- self.uiserver.data_mgr.sync_workflow(
+ self.uiserver.data_store_mgr.sync_workflow(
w_id, w_info['name'], w_info['host'], w_info['pub_port']
),
)
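The functional change in this commit is the API filter in gather_workflows: a scanned workflow is only gathered when the API version string recorded for it matches this installation's cylc.flow.network.API. A minimal standalone sketch of that filter, assuming an iterable that yields the (reg, host, port, pub_port, api) tuples shown in the hunk above; the client/context arguments are left out:

    from cylc.flow.network import API  # API version constant of this cylc-flow install

    def filter_scan_items(scan_items, stopping):
        """Yield only workflows that speak this API version and are not stopping."""
        for reg, host, port, pub_port, api in scan_items:
            if reg in stopping:
                continue  # workflow is already being shut down
            if api != str(API):
                continue  # workflow speaks a different API version
            yield reg, host, port, pub_port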
