Skip to content

Commit

Permalink
API version filter, module name change
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Dec 3, 2019
1 parent be928a8 commit 5fafcf1
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 16 deletions.
6 changes: 3 additions & 3 deletions cylc/uiserver/data_mgr.py → cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.network.scan import MSG_TIMEOUT
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.ws_data_mgr import (
from cylc.flow.data_store_mgr import (
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
DELTAS_MAP, apply_delta, generate_checksum
)
Expand All @@ -51,7 +51,7 @@
logger = logging.getLogger(__name__)


class DataManager:
class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""

INIT_DATA_WAIT_TIME = 5. # seconds
Expand Down Expand Up @@ -184,7 +184,7 @@ def reconcile_update(self, topic, delta, w_id):
self.loop
)
try:
_, new_delta_msg = future.result(5.0)
_, new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
except asyncio.TimeoutError:
logger.info(
f'The reconcile update coroutine {w_id} {topic}'
Expand Down
12 changes: 6 additions & 6 deletions cylc/uiserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import argparse
from functools import partial
import json
import logging
from logging.config import dictConfig
import os
from os.path import join, abspath, dirname
import signal
from functools import partial

from cylc.flow.network.schema import schema
from tornado import web, ioloop

from jupyterhub.services.auth import HubOAuthCallbackHandler
from jupyterhub.utils import url_path_join
from .data_mgr import DataManager
from .data_store_mgr import DataStoreMgr
from .handlers import *
from .resolvers import Resolvers
from .workflows_mgr import WorkflowsManager
Expand All @@ -49,10 +49,10 @@ def try_exit(self, uis):
# clean up and stop in here
if self.is_closing:
# stop the subscribers running in the thread pool executor
for sub in uis.data_mgr.w_subs.values():
for sub in uis.data_store_mgr.w_subs.values():
sub.stop()
# Shutdown the thread pool executor
uis.data_mgr.executor.shutdown(wait=False)
uis.data_store_mgr.executor.shutdown(wait=False)
# Destroy ZeroMQ context of all sockets
uis.workflows_mgr.context.destroy()
ioloop.IOLoop.instance().stop()
Expand All @@ -70,9 +70,9 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
self._static = os.path.abspath(os.path.join(script_dir, static))
self._jupyter_hub_service_prefix = jupyter_hub_service_prefix
self.workflows_mgr = WorkflowsManager(self)
self.data_mgr = DataManager(self.workflows_mgr)
self.data_store_mgr = DataStoreMgr(self.workflows_mgr)
self.resolvers = Resolvers(
self.data_mgr.data,
self.data_store_mgr.data,
workflows_mgr=self.workflows_mgr)

def _make_app(self, debug: bool):
Expand Down
2 changes: 1 addition & 1 deletion cylc/uiserver/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""GraphQL resolvers for use in data accessing and mutation of workflows."""

from cylc.flow.network.resolvers import BaseResolvers
from cylc.flow.ws_data_mgr import WORKFLOW
from cylc.flow.data_store_mgr import WORKFLOW


class Resolvers(BaseResolvers):
Expand Down
25 changes: 19 additions & 6 deletions cylc/uiserver/workflows_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,31 @@
from cylc.flow import flags
from cylc.flow.exceptions import ClientError, ClientTimeout
from cylc.flow.hostuserutil import is_remote_host, get_host_ip_by_name
from cylc.flow.network import API
from cylc.flow.network.client import SuiteRuntimeClient
from cylc.flow.network.scan import (
get_scan_items_from_fs, re_compile_filters, MSG_TIMEOUT)
from cylc.flow.ws_data_mgr import ID_DELIM
from cylc.flow.data_store_mgr import ID_DELIM

logger = logging.getLogger(__name__)
CLIENT_TIMEOUT = 2.0


async def workflow_request(client, command, args=None,
timeout=None, req_context=None):
"""Workflow request command."""
"""Workflow request command.
Args:
client (object): Instantiated workflow client.
command (str): Command/Endpoint name.
args (dict): Endpoint arguments.
timeout (float): Client request timeout (secs).
        req_context (str): A string identifier for the request.
Returns:
tuple: (req_context, result)
"""
if req_context is None:
req_context = command
try:
Expand Down Expand Up @@ -102,9 +115,9 @@ async def gather_workflows(self):
cre_owner, cre_name = re_compile_filters(None, ['.*'])
scan_args = (
(reg, host, port, pub_port, self.context, CLIENT_TIMEOUT)
for reg, host, port, pub_port in
for reg, host, port, pub_port, api in
get_scan_items_from_fs(cre_owner, cre_name)
if reg not in self.stopping)
if reg not in self.stopping and api == str(API))
# clear stopping set
self.stopping.clear()

Expand Down Expand Up @@ -139,14 +152,14 @@ async def gather_workflows(self):
with suppress(IOError):
client.stop(stop_loop=False)
self.workflows.pop(w_id)
self.uiserver.data_mgr.purge_workflow(w_id)
self.uiserver.data_store_mgr.purge_workflow(w_id)

# update with new, and start data sync
gathers = ()
for w_id, w_info in scanflows.items():
self.workflows[w_id] = w_info
gathers += (
self.uiserver.data_mgr.sync_workflow(
self.uiserver.data_store_mgr.sync_workflow(
w_id, w_info['name'], w_info['host'], w_info['pub_port']
),
)
Expand Down

0 comments on commit 5fafcf1

Please sign in to comment.