diff --git a/cylc/flow/option_parsers.py b/cylc/flow/option_parsers.py
index cb21f5e8bcf..96073ca4aef 100644
--- a/cylc/flow/option_parsers.py
+++ b/cylc/flow/option_parsers.py
@@ -44,6 +44,7 @@
)
WORKFLOW_ID_ARG_DOC = ('WORKFLOW', 'Workflow ID')
+OPT_WORKFLOW_ID_ARG_DOC = ('[WORKFLOW]', 'Workflow ID')
WORKFLOW_ID_MULTI_ARG_DOC = ('WORKFLOW ...', 'Workflow ID(s)')
WORKFLOW_ID_OR_PATH_ARG_DOC = ('WORKFLOW | PATH', 'Workflow ID or path')
ID_MULTI_ARG_DOC = ('ID ...', 'Workflow/Cycle/Family/Task ID(s)')
diff --git a/cylc/flow/scripts/tui.py b/cylc/flow/scripts/tui.py
index 86970052b46..c21fa0bd043 100644
--- a/cylc/flow/scripts/tui.py
+++ b/cylc/flow/scripts/tui.py
@@ -15,30 +15,35 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""cylc tui WORKFLOW
+"""cylc tui [WORKFLOW]
View and control running workflows in the terminal.
(Tui = Terminal User Interface)
-WARNING: Tui is experimental and may break with large flows.
-An upcoming change to the way Tui receives data from the scheduler will make it
-much more efficient in the future.
+Tui allows you to monitor and interact with workflows in a manner similar
+to the GUI.
+
+Press "h" whilst running Tui to bring up the help screen, use the arrow
+keys to navigage.
+
"""
-# TODO: remove this warning once Tui is delta-driven
-# https://github.com/cylc/cylc-flow/issues/3527
+from getpass import getuser
from textwrap import indent
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
from urwid import html_fragment
+from cylc.flow.exceptions import InputError
+from cylc.flow.id import Tokens
from cylc.flow.id_cli import parse_id
from cylc.flow.option_parsers import (
- WORKFLOW_ID_ARG_DOC,
+ OPT_WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP,
)
from cylc.flow.terminal import cli_function
from cylc.flow.tui import TUI
+from cylc.flow.tui.util import suppress_logging
from cylc.flow.tui.app import (
TuiApp,
TREE_EXPAND_DEPTH
@@ -55,7 +60,7 @@
def get_option_parser() -> COP:
parser = COP(
__doc__,
- argdoc=[WORKFLOW_ID_ARG_DOC],
+ argdoc=[OPT_WORKFLOW_ID_ARG_DOC],
# auto_add=False, NOTE: at present auto_add can not be turned off
color=False
)
@@ -80,32 +85,58 @@ def get_option_parser() -> COP:
action='store',
default='80,24'
)
+ parser.add_option(
+ '--single-workflow', '-s',
+ help=(
+ 'Only show a single workflow rather than all workflows'
+ ),
+ action='store_true',
+ default=False,
+ )
return parser
-@cli_function(get_option_parser)
-def main(_, options: 'Values', workflow_id: str) -> None:
- workflow_id, *_ = parse_id(
- workflow_id,
- constraint='workflows',
+def configure_screenshot(v_term_size):
+ screen = html_fragment.HtmlGenerator()
+ screen.set_terminal_properties(256)
+ screen.register_palette(TuiApp.palette)
+ html_fragment.screenshot_init(
+ [tuple(map(int, v_term_size.split(',')))],
+ []
)
+ return screen, html_fragment
+
+
+@cli_function(get_option_parser)
+def main(_, options: 'Values', workflow_id: Optional[str] = None) -> None:
+ if options.single_workflow and not workflow_id:
+ raise InputError('--single-workflow requires a workflow ID')
+
+ # get workflow ID if specified
+ if workflow_id:
+ workflow_id, *_ = parse_id(
+ workflow_id,
+ constraint='workflows',
+ )
+ tokens = Tokens(workflow_id)
+ workflow_id = tokens.duplicate(user=getuser()).id
+
+ # configure Tui
screen = None
+ html_fragment = None
if options.display == 'html':
- TREE_EXPAND_DEPTH[0] = -1 # expand tree fully
- screen = html_fragment.HtmlGenerator()
- screen.set_terminal_properties(256)
- screen.register_palette(TuiApp.palette)
- html_fragment.screenshot_init(
- [tuple(map(int, options.v_term_size.split(',')))],
- []
- )
+ screen, html_fragment = configure_screenshot(options.v_term_size)
+ # start Tui
try:
- TuiApp(workflow_id, screen=screen).main()
+ with suppress_logging():
+ TuiApp(screen=screen).main(
+ workflow_id,
+ options.single_workflow,
+ )
if options.display == 'html':
- for fragment in html_fragment.screenshot_collect():
- print(fragment)
+ print(html_fragment.screenshot_collect()[-1])
except KeyboardInterrupt:
pass
diff --git a/cylc/flow/tui/__init__.py b/cylc/flow/tui/__init__.py
index 92e3bce0268..78ce0f36d62 100644
--- a/cylc/flow/tui/__init__.py
+++ b/cylc/flow/tui/__init__.py
@@ -143,6 +143,3 @@ def list_groups(self):
if binding['group'] == name
]
)
-
-
-BINDINGS = Bindings()
diff --git a/cylc/flow/tui/app.py b/cylc/flow/tui/app.py
index cf09f75db0e..72748de5ad7 100644
--- a/cylc/flow/tui/app.py
+++ b/cylc/flow/tui/app.py
@@ -16,22 +16,17 @@
# along with this program. If not, see .
"""The application control logic for Tui."""
+from contextlib import contextmanager
+from multiprocessing import Process
+import re
import sys
import urwid
from urwid import html_fragment
from urwid.wimp import SelectableIcon
-from pathlib import Path
-from cylc.flow.network.client_factory import get_client
-from cylc.flow.exceptions import (
- ClientError,
- ClientTimeout,
- WorkflowStopped
-)
-from cylc.flow.pathutil import get_workflow_run_dir
+from cylc.flow.id import Tokens
from cylc.flow.task_state import (
- TASK_STATUSES_ORDERED,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED,
@@ -41,7 +36,7 @@
)
import cylc.flow.tui.overlay as overlay
from cylc.flow.tui import (
- BINDINGS,
+ Bindings,
FORE,
BACK,
JOB_COLOURS,
@@ -49,21 +44,25 @@
)
from cylc.flow.tui.tree import (
find_closest_focus,
- translate_collapsing
+ translate_collapsing,
+ expand_tree,
+)
+from cylc.flow.tui.updater import (
+ Updater,
+ get_default_filters,
)
from cylc.flow.tui.util import (
- compute_tree,
dummy_flow,
- get_task_status_summary,
- get_workflow_status_str,
render_node
)
-from cylc.flow.workflow_files import WorkflowFiles
+from cylc.flow.workflow_status import (
+ WorkflowStatus,
+)
urwid.set_encoding('utf8') # required for unicode task icons
-TREE_EXPAND_DEPTH = [2]
+TREE_EXPAND_DEPTH = 5
class TuiWidget(urwid.TreeWidget):
@@ -82,18 +81,20 @@ class TuiWidget(urwid.TreeWidget):
# will skip rows when the user navigates
unexpandable_icon = SelectableIcon(' ', 0)
- def __init__(self, node, max_depth=None):
- if not max_depth:
- max_depth = TREE_EXPAND_DEPTH[0]
+ def __init__(self, app, node, max_depth=None):
+ self.app = app
self._node = node
self._innerwidget = None
self.is_leaf = not node.get_child_keys()
- if max_depth > 0:
- self.expanded = node.get_depth() < max_depth
- else:
- self.expanded = True
+ # if max_depth > 0:
+ # self.expanded = node.get_depth() < max_depth
+ # else:
+ # self.expanded = True
+ self.expanded = False
widget = self.get_indented_widget()
urwid.WidgetWrap.__init__(self, widget)
+ # self.expanded = False
+ # self.update_expanded_icon()
def selectable(self):
"""Return True if this node is selectable.
@@ -158,6 +159,19 @@ def get_indented_widget(self):
)
return self.__super.get_indented_widget()
+ def update_expanded_icon(self):
+ node = self.get_node()
+ value = node.get_value()
+ data = value['data']
+ type_ = value['type_']
+ if type_ == 'workflow':
+ if self.expanded:
+ self.app.updater.subscribe(data['id'])
+ self.app.expand_on_load = data['id']
+ else:
+ self.app.updater.unsubscribe(data['id'])
+ return urwid.TreeWidget.update_expanded_icon(self)
+
class TuiNode(urwid.TreeNode):
"""Data storage object for leaf nodes."""
@@ -169,8 +183,12 @@ def load_widget(self):
class TuiParentNode(urwid.ParentNode):
"""Data storage object for interior/parent nodes."""
+ def __init__(self, app, *args, **kwargs):
+ self.app = app
+ urwid.ParentNode.__init__(self, *args, **kwargs)
+
def load_widget(self):
- return TuiWidget(self)
+ return TuiWidget(self.app, self)
def load_child_keys(self):
# Note: keys are really indices.
@@ -185,6 +203,7 @@ def load_child_node(self, key):
else:
childclass = TuiNode
return childclass(
+ self.app,
childdata,
parent=self,
key=key,
@@ -192,6 +211,20 @@ def load_child_node(self, key):
)
+@contextmanager
+def updater_subproc(filters):
+ """Runs the Updater in its own process."""
+ # start the updater
+ updater = Updater()
+ p = Process(target=updater.start, args=(filters,))
+ try:
+ p.start()
+ yield updater
+ finally:
+ updater.terminate()
+ p.join()
+
+
class TuiApp:
"""An application to display a single Cylc workflow.
@@ -206,8 +239,7 @@ class TuiApp:
"""
- UPDATE_INTERVAL = 1
- CLIENT_TIMEOUT = 1
+ UPDATE_INTERVAL = 0.1
palette = [
('head', FORE, BACK),
@@ -227,16 +259,14 @@ class TuiApp:
for status, spec in WORKFLOW_COLOURS.items()
]
- def __init__(self, id_, screen=None):
- self.id_ = id_
- self.client = None
+ def __init__(self, screen=None):
self.loop = None
- self.screen = None
+ self.screen = screen
self.stack = 0
self.tree_walker = None
# create the template
- topnode = TuiParentNode(dummy_flow({'id': 'Loading...'}))
+ topnode = TuiParentNode(self, dummy_flow({'id': 'Loading...'}))
self.listbox = urwid.TreeListBox(urwid.TreeWalker(topnode))
header = urwid.Text('\n')
footer = urwid.AttrWrap(
@@ -249,27 +279,87 @@ def __init__(self, id_, screen=None):
header=urwid.AttrWrap(header, 'head'),
footer=footer
)
- self.filter_states = {
- state: True
- for state in TASK_STATUSES_ORDERED
- }
- if isinstance(screen, html_fragment.HtmlGenerator):
- # the HtmlGenerator only captures one frame
- # so we need to pre-populate the GUI before
- # starting the event loop
- self.update()
+ self.filters = {}
- def main(self):
+ def main(self, w_id=None, single_workflow=False, id_filter=None):
"""Start the event loop."""
- self.loop = urwid.MainLoop(
- self.view,
- self.palette,
- unhandled_input=self.unhandled_input,
- screen=self.screen
- )
- # schedule the first update
- self.loop.set_alarm_in(0, self._update)
- self.loop.run()
+ # TODO: move to INIT
+ self.filters = get_default_filters()
+ if single_workflow and w_id:
+ workflow = str(Tokens(w_id)['workflow'])
+ self.filters['workflows']['id'] = rf'^{re.escape(workflow)}$'
+ elif id_filter:
+ self.filters['workflows']['id'] = id_filter
+
+ with updater_subproc(self.filters) as updater:
+ self.updater = updater
+
+ # pre-subscribe to the provided workflow if requested
+ self.expand_on_load = w_id or 'root'
+ if w_id:
+ self.updater.subscribe(w_id)
+
+ # configure the urwid main loop
+ self.loop = urwid.MainLoop(
+ self.view,
+ self.palette,
+ unhandled_input=self.unhandled_input,
+ screen=self.screen
+ )
+
+ if isinstance(self.screen, html_fragment.HtmlGenerator):
+ # Tui has been configured to generate HTML screenshots
+ while self.expand_on_load:
+ # load the data and wait for any requested workflows to be
+ # loaded
+ self.update()
+ return
+ else:
+ # Tui is being run normally as an interactive application
+ # schedule the first update
+ self.loop.set_alarm_in(0, self._update)
+
+ # start the urwid main loop
+ self.loop.run()
+
+ @contextmanager
+ def start_noninteractive(self, w_id=None, single_workflow=False, id_filter=None):
+ """Start the event loop."""
+ # TODO: move to INIT
+ self.filters = get_default_filters()
+ if single_workflow and w_id:
+ workflow = str(Tokens(w_id)['workflow'])
+ self.filters['workflows']['id'] = rf'^{re.escape(workflow)}$'
+ elif id_filter:
+ self.filters['workflows']['id'] = id_filter
+
+ with updater_subproc(self.filters) as updater:
+ self.updater = updater
+
+ # pre-subscribe to the provided workflow if requested
+ self.expand_on_load = w_id or 'root'
+ if w_id:
+ self.updater.subscribe(w_id)
+
+ # configure the urwid main loop
+ self.loop = urwid.MainLoop(
+ self.view,
+ self.palette,
+ unhandled_input=self.unhandled_input,
+ screen=self.screen
+ )
+
+ self.wait_until_loaded(w_id or 'root')
+
+ yield self
+
+ def wait_until_loaded(self, *ids):
+ """Wait for any requested nodes to be created.
+ """
+ for id_ in ids:
+ self.expand_on_load = id_
+ while self.expand_on_load:
+ self.update()
def unhandled_input(self, key):
"""Catch key presses, uncaught events are passed down the chain."""
@@ -285,74 +375,6 @@ def unhandled_input(self, key):
meth(self, *args)
return
- def get_snapshot(self):
- """Contact the workflow, return a tree structure
-
- In the event of error contacting the workflow the
- message is written to this Widget's header.
-
- Returns:
- dict if successful, else False
-
- """
- try:
- if not self.client:
- self.client = get_client(self.id_, timeout=self.CLIENT_TIMEOUT)
- data = self.client(
- 'graphql',
- {
- 'request_string': QUERY,
- 'variables': {
- # list of task states we want to see
- 'taskStates': [
- state
- for state, is_on in self.filter_states.items()
- if is_on
- ]
- }
- }
- )
- except WorkflowStopped:
- # Distinguish stopped flow from non-existent flow.
- self.client = None
- full_path = Path(get_workflow_run_dir(self.id_))
- if (
- (full_path / WorkflowFiles.SUITE_RC).is_file()
- or (full_path / WorkflowFiles.FLOW_FILE).is_file()
- ):
- message = "stopped"
- else:
- message = (
- f"No {WorkflowFiles.SUITE_RC} or {WorkflowFiles.FLOW_FILE}"
- f"found in {self.id_}."
- )
-
- return dummy_flow({
- 'name': self.id_,
- 'id': self.id_,
- 'status': message,
- 'stateTotals': {}
- })
- except (ClientError, ClientTimeout) as exc:
- # catch network / client errors
- self.set_header([('workflow_error', str(exc))])
- return False
-
- if isinstance(data, list):
- # catch GraphQL errors
- try:
- message = data[0]['error']['message']
- except (IndexError, KeyError):
- message = str(data)
- self.set_header([('workflow_error', message)])
- return False
-
- if len(data['workflows']) != 1:
- # multiple workflows in returned data - shouldn't happen
- raise ValueError()
-
- return compute_tree(data['workflows'][0])
-
@staticmethod
def get_node_id(node):
"""Return a unique identifier for a node.
@@ -377,23 +399,13 @@ def set_header(self, message: list):
"""
# put in a one line gap
message.append('\n')
-
- # TODO: remove once Tui is delta-driven
- # https://github.com/cylc/cylc-flow/issues/3527
- message.extend([
- (
- 'workflow_error',
- 'TUI is experimental and may break with large flows'
- ),
- '\n'
- ])
-
self.view.header = urwid.Text(message)
def _update(self, *_):
try:
self.update()
except Exception as exc:
+ raise # TODO
sys.exit(exc)
def update(self):
@@ -402,26 +414,32 @@ def update(self):
Preserves the current focus and collapse/expand state.
"""
- # update the data store
- # TODO: this can be done incrementally using deltas
- # once this interface is available
- snapshot = self.get_snapshot()
- if snapshot is False:
+ # attempt to fetch an update
+ update = False
+ while not self.updater.update_queue.empty():
+ # fetch the most recent update
+ update = self.updater.update_queue.get()
+
+ if update is False:
+ if self.loop:
+ self.loop.set_alarm_in(self.UPDATE_INTERVAL, self._update)
return False
# update the workflow status message
- header = [get_workflow_status_str(snapshot['data'])]
- status_summary = get_task_status_summary(snapshot['data'])
- if status_summary:
- header.extend([' ('] + status_summary + [' )'])
- if not all(self.filter_states.values()):
- header.extend([' ', '*filtered* "R" to reset', ' '])
+ header = []
+ # header = [get_workflow_status_str(snapshot['data'])]
+ # status_summary = get_task_status_summary(snapshot['data'])
+ # if status_summary:
+ # header.extend([' ('] + status_summary + [' )'])
+
+ if not all(self.filters['tasks'].values()):
+ header.extend([' ', '*tasks filtered* "F" to edit, "R" to reset', ' '])
self.set_header(header)
# global update - the nuclear option - slow but simple
# TODO: this can be done incrementally by adding and
# removing nodes from the existing tree
- topnode = TuiParentNode(snapshot)
+ topnode = TuiParentNode(self, update)
# NOTE: because we are nuking the tree we need to manually
# preserve the focus and collapse status of tree nodes
@@ -436,6 +454,13 @@ def update(self):
# get the new focus
_, new_node = self.listbox._body.get_focus()
+ if self.expand_on_load:
+ depth = TREE_EXPAND_DEPTH
+ if self.expand_on_load == 'root':
+ depth = 1
+ if expand_tree(self, new_node, self.expand_on_load, depth):
+ self.expand_on_load = None
+
# preserve the focus or walk to the nearest parent
closest_focus = find_closest_focus(self, old_node, new_node)
self.listbox._body.set_focus(closest_focus)
@@ -457,11 +482,25 @@ def filter_by_task_state(self, filtered_state=None):
A task state to filter by or None.
"""
- self.filter_states = {
+ self.filters['tasks'] = {
state: (state == filtered_state) or not filtered_state
- for state in self.filter_states
+ for state in self.filters['tasks']
+ }
+ self.updater.update_filters(self.filters)
+
+ def filter_by_workflow_state(self, *filtered_states):
+ """Filter workflows.
+
+ Args:
+ filtered_state (str):
+ A task state to filter by or None.
+
+ """
+ self.filters['workflows'] = {
+ state: (state in filtered_states) or not filtered_states
+ for state in self.filters['workflows']
}
- return
+ self.updater.update_filters(self.filters)
def open_overlay(self, fcn):
self.create_overlay(*fcn(self))
@@ -521,6 +560,7 @@ def close_topmost(self):
self.stack -= 1
+BINDINGS = Bindings()
BINDINGS.add_group(
'',
'Application Controls'
@@ -603,40 +643,62 @@ def close_topmost(self):
)
BINDINGS.add_group(
- 'filter',
+ 'filter tasks',
'Filter by task state'
)
BINDINGS.bind(
- ('F',),
- 'filter',
+ ('T',),
+ 'filter tasks',
'Select task states to filter by',
(TuiApp.open_overlay, overlay.filter_task_state)
)
BINDINGS.bind(
('f',),
- 'filter',
+ 'filter tasks',
'Show only failed tasks',
(TuiApp.filter_by_task_state, TASK_STATUS_FAILED)
)
BINDINGS.bind(
('s',),
- 'filter',
+ 'filter tasks',
'Show only submitted tasks',
(TuiApp.filter_by_task_state, TASK_STATUS_SUBMITTED)
)
BINDINGS.bind(
('r',),
- 'filter',
+ 'filter tasks',
'Show only running tasks',
(TuiApp.filter_by_task_state, TASK_STATUS_RUNNING)
)
BINDINGS.bind(
('R',),
- 'filter',
+ 'filter tasks',
'Reset task state filtering',
(TuiApp.filter_by_task_state,)
)
+BINDINGS.add_group(
+ 'filter workflows',
+ 'Filter by workflow state'
+)
+BINDINGS.bind(
+ ('W',),
+ 'filter workflows',
+ 'Select workflow states to filter by',
+ (TuiApp.open_overlay, overlay.filter_workflow_state)
+)
+BINDINGS.bind(
+ ('p',),
+ 'filter workflows',
+ 'Show only running workflows',
+ (
+ TuiApp.filter_by_workflow_state,
+ WorkflowStatus.RUNNING.value,
+ WorkflowStatus.PAUSED.value,
+ WorkflowStatus.STOPPING.value
+ )
+)
+
def list_bindings():
"""Write out an in-line list of the key bindings."""
diff --git a/cylc/flow/tui/data.py b/cylc/flow/tui/data.py
index 4fd538d8783..a20fe8d8396 100644
--- a/cylc/flow/tui/data.py
+++ b/cylc/flow/tui/data.py
@@ -19,6 +19,7 @@
import sys
from cylc.flow.exceptions import ClientError
+from cylc.flow.id import Tokens
from cylc.flow.tui.util import (
extract_context
)
@@ -29,6 +30,7 @@
workflows {
id
name
+ port
status
stateTotals
taskProxies(states: $taskStates) {
@@ -154,23 +156,20 @@ def cli_cmd(*cmd):
raise ClientError(f'Error in command {" ".join(cmd)}\n{err}')
-def _clean(workflow):
- # for now we will exit tui when the workflow is cleaned
- # this will change when tui supports multiple workflows
- cli_cmd('clean', workflow)
- sys.exit(0)
-
-
OFFLINE_MUTATIONS = {
+ 'user': {
+ 'stop-all': partial(cli_cmd, 'stop', '*'),
+ },
'workflow': {
'play': partial(cli_cmd, 'play'),
- 'clean': _clean,
+ 'clean': partial(cli_cmd, 'clean', '--yes'),
'reinstall-reload': partial(cli_cmd, 'vr', '--yes'),
}
}
def generate_mutation(mutation, arguments):
+ arguments.pop('user')
graphql_args = ', '.join([
f'${argument}: {ARGUMENT_TYPES[argument]}'
for argument in arguments
@@ -206,28 +205,40 @@ def context_to_variables(context):
Examples:
>>> context_to_variables(extract_context(['~a/b//c/d']))
- {'workflow': ['b'], 'task': ['c/d']}
+ {'user': ['a'], 'workflow': ['b'], 'task': ['c/d']}
>>> context_to_variables(extract_context(['~a/b//c']))
- {'workflow': ['b'], 'task': ['c/*']}
+ {'user': ['a'], 'workflow': ['b'], 'task': ['c/*']}
>>> context_to_variables(extract_context(['~a/b']))
- {'workflow': ['b']}
+ {'user': ['a'], 'workflow': ['b']}
"""
# context_to_variables because it can only handle single-selection ATM
- variables = {'workflow': context['workflow']}
+ variables = {'user': context['user']}
+
+ if 'workflow' in context:
+ variables['workflow'] = context['workflow']
if 'task' in context:
variables['task'] = [
- f'{context["cycle"][0]}/{context["task"][0]}'
+ Tokens(
+ cycle=context['cycle'][0],
+ task=context['task'][0]
+ ).relative_id
]
elif 'cycle' in context:
- variables['task'] = [f'{context["cycle"][0]}/*']
+ variables['task'] = [
+ Tokens(cycle=context['cycle'][0], task='*').relative_id
+ ]
return variables
def mutate(client, mutation, selection):
- if mutation in OFFLINE_MUTATIONS['workflow']:
+ if mutation in {
+ _mutation
+ for section in OFFLINE_MUTATIONS.values()
+ for _mutation in section
+ }:
offline_mutate(mutation, selection)
elif client:
online_mutate(client, mutation, selection)
@@ -256,6 +267,9 @@ def offline_mutate(mutation, selection):
"""Issue a mutation over the CLI or other offline interface."""
context = extract_context(selection)
variables = context_to_variables(context)
- for workflow in variables['workflow']:
- # NOTE: this currently only supports workflow mutations
- OFFLINE_MUTATIONS['workflow'][mutation](workflow)
+ if 'workflow' in variables:
+ for workflow in variables['workflow']:
+ # NOTE: this currently only supports workflow mutations
+ OFFLINE_MUTATIONS['workflow'][mutation](workflow)
+ else:
+ OFFLINE_MUTATIONS['user'][mutation]()
diff --git a/cylc/flow/tui/overlay.py b/cylc/flow/tui/overlay.py
index cddcc8d5093..54e1fad261c 100644
--- a/cylc/flow/tui/overlay.py
+++ b/cylc/flow/tui/overlay.py
@@ -37,55 +37,122 @@
"""
+from contextlib import suppress
from functools import partial
+import re
import sys
import urwid
from cylc.flow.exceptions import (
ClientError,
+ ClientTimeout,
+ WorkflowStopped,
)
+from cylc.flow.id import Tokens
+from cylc.flow.network.client_factory import get_client
from cylc.flow.task_state import (
TASK_STATUSES_ORDERED,
TASK_STATUS_WAITING
)
+import cylc.flow.tui.app
from cylc.flow.tui import (
- BINDINGS,
JOB_COLOURS,
JOB_ICON,
TUI
)
from cylc.flow.tui.data import (
+ extract_context,
list_mutations,
mutate,
)
from cylc.flow.tui.util import (
- get_task_icon
+ get_task_icon,
)
+def _toggle_filter(app, filter_group, status, *_):
+ """Toggle a filter state."""
+ app.filters[filter_group][status] = not app.filters[filter_group][status]
+ app.updater.update_filters(app.filters)
+
+
+def _invert_filter(checkboxes, *_):
+ """Invert the state of all filters."""
+ for checkbox in checkboxes:
+ checkbox.set_state(not checkbox.state)
+
+
+def filter_workflow_state(app):
+ """Return a widget for adjusting the workflow filter options."""
+ checkboxes = [
+ urwid.CheckBox(
+ [status],
+ state=is_on,
+ on_state_change=partial(_toggle_filter, app, 'workflows', status)
+ )
+ for status, is_on in app.filters['workflows'].items()
+ if status != 'id'
+ ]
+
+ workflow_id_prompt = 'id (regex)'
+
+ def update_id_filter(widget, value):
+ nonlocal app
+ try:
+ # ensure the filter is value before updating the filter
+ re.compile(value)
+ except re.error:
+ # error in the regex -> inform the user
+ widget.set_caption(f'{workflow_id_prompt} - error: \n')
+ else:
+ # valid regex -> update the filter
+ widget.set_caption(f'{workflow_id_prompt}: \n')
+ app.filters['workflows']['id'] = value
+ app.updater.update_filters(app.filters)
+
+ id_filter_widget = urwid.Edit(
+ caption=f'{workflow_id_prompt}: \n',
+ edit_text=app.filters['workflows']['id'],
+ )
+ urwid.connect_signal(id_filter_widget, 'change', update_id_filter)
+
+ widget = urwid.ListBox(
+ urwid.SimpleFocusListWalker([
+ urwid.Text('Filter Workflow States'),
+ urwid.Divider(),
+ urwid.Padding(
+ urwid.Button(
+ 'Invert',
+ on_press=partial(_invert_filter, checkboxes)
+ ),
+ right=19
+ )
+ ] + checkboxes + [
+ urwid.Divider(),
+ id_filter_widget,
+ ])
+ )
+
+ return (
+ widget,
+ {'width': 35, 'height': 23}
+ )
+
+
def filter_task_state(app):
"""Return a widget for adjusting the task state filter."""
- def toggle(state, *_):
- """Toggle a filter state."""
- app.filter_states[state] = not app.filter_states[state]
-
checkboxes = [
urwid.CheckBox(
get_task_icon(state)
+ [' ' + state],
state=is_on,
- on_state_change=partial(toggle, state)
+ on_state_change=partial(_toggle_filter, app, 'tasks', state)
)
- for state, is_on in app.filter_states.items()
+ for state, is_on in app.filters['tasks'].items()
]
- def invert(*_):
- """Invert the state of all filters."""
- for checkbox in checkboxes:
- checkbox.set_state(not checkbox.state)
-
widget = urwid.ListBox(
urwid.SimpleFocusListWalker([
urwid.Text('Filter Task States'),
@@ -93,7 +160,7 @@ def invert(*_):
urwid.Padding(
urwid.Button(
'Invert',
- on_press=invert
+ on_press=partial(_invert_filter, checkboxes)
),
right=19
)
@@ -127,7 +194,7 @@ def help_info(app):
]
# list key bindings
- for group, bindings in BINDINGS.list_groups():
+ for group, bindings in cylc.flow.tui.app.BINDINGS.list_groups():
items.append(
urwid.Text([
f'{group["desc"]}:'
@@ -214,22 +281,38 @@ def context(app):
"""An overlay for context menus."""
value = app.tree_walker.get_focus()[0].get_node().get_value()
selection = [value['id_']] # single selection ATM
+ context = extract_context(selection)
+
+ client = None
+ if 'workflow' in context:
+ w_id = context['workflow'][0]
+ with suppress(WorkflowStopped, ClientError, ClientTimeout):
+ client = get_client(w_id)
def _mutate(mutation, _):
- nonlocal app
+ nonlocal app, client
app.open_overlay(partial(progress, text='Running Command'))
try:
- mutate(app.client, mutation, selection)
+ mutate(client, mutation, selection)
except ClientError as exc:
app.open_overlay(partial(error, text=str(exc)))
else:
app.close_topmost()
app.close_topmost()
+ # determine the ID to display for the context menu
+ tokens = Tokens(value["id_"])
+ if tokens.is_task_like:
+ # if it's a cycle/task/job, then use the relative id
+ display_id = tokens.relative_id
+ else:
+ # otherwise use the full id
+ display_id = tokens.id
+
widget = urwid.ListBox(
urwid.SimpleFocusListWalker(
[
- urwid.Text(f'id: {value["id_"]}'),
+ urwid.Text(f'id: {display_id}'),
urwid.Divider(),
urwid.Text('Action'),
urwid.Button(
@@ -242,14 +325,14 @@ def _mutate(mutation, _):
mutation,
on_press=partial(_mutate, mutation)
)
- for mutation in list_mutations(app.client, selection)
+ for mutation in list_mutations(client, selection)
]
)
)
return (
widget,
- {'width': 30, 'height': 20}
+ {'width': 40, 'height': 20}
)
diff --git a/cylc/flow/tui/tree.py b/cylc/flow/tui/tree.py
index 84ed55ab53a..f933acfe95f 100644
--- a/cylc/flow/tui/tree.py
+++ b/cylc/flow/tui/tree.py
@@ -56,6 +56,76 @@ def find_closest_focus(app, old_node, new_node):
)
+def expand_tree(app, new_node, head, depth, node_types=None):
+ """Expand the Tui tree to the desired level.
+
+ Arguments:
+ app:
+ The Tui application instance.
+ new_node:
+ The Tui widget representing the tree view.
+ head:
+ If specified, the tree below this node will be expanded.
+ depth:
+ The max depth to expand nodes too.
+ node_types:
+ Whitelist of node types to expand, note "task", "job" and "spring"
+ nodes are excluded by default.
+
+ Returns:
+ True, if the node was found in the tree, is loaded and has been
+ expanded.
+
+ Examples:
+ # expand the top three levels of the tree
+ compute_tree(app, node, None, 3)
+
+ # expand the "root" node AND the top five levels of the tree under
+ # ~user/workflow
+ compute_tree(app, node, '~user/workflow', 5)
+
+ """
+ if not node_types:
+ # don't auto-expand job nodes by default
+ node_types = {'root', 'workflow', 'cycle', 'family'}
+
+ new_root = new_node.get_root()
+ head_node = new_root
+
+ # locate the "head" if specified
+ if head:
+ for node in walk_tree(new_root):
+ key = app.get_node_id(node)
+ if key == head:
+ head_node = node
+ child_keys = node.get_child_keys()
+ if (
+ # if the node only has one child
+ len(child_keys) == 1
+ # and that child is a "#spring" node (i.e. a loading node)
+ and node.get_child_node([k for k in child_keys][0]).get_value()['type_'] == '#spring'
+ ):
+ # then the content hasn't loaded yet so the node cannot be
+ # expanded
+ return False
+ break
+ else:
+ # the requested node does not exist yet
+ # it might still be loading
+ return False
+
+ # expand the specified nodes
+ for node in (*walk_tree(head_node, depth), new_root):
+ if node.get_value()['type_'] not in node_types:
+ continue
+ widget = node.get_widget()
+ widget.expanded = True
+ widget.update_expanded_icon()
+ # print('EXPAND',node.get_value()['id_'])
+
+ return True
+
+
def translate_collapsing(app, old_node, new_node):
"""Transfer the collapse state from one tree to another.
@@ -88,22 +158,25 @@ def translate_collapsing(app, old_node, new_node):
widget.update_expanded_icon()
-def walk_tree(node):
+def walk_tree(node, depth=None):
"""Yield nodes in order.
Arguments:
node (urwid.TreeNode):
Yield this node and all nodes beneath it.
+ depth:
+ The maximum depth to walk to or None to walk all children.
Yields:
urwid.TreeNode
"""
- stack = [node]
+ stack = [(node, 1)]
while stack:
- node = stack.pop()
+ node, _depth = stack.pop()
yield node
- stack.extend([
- node.get_child_node(index)
- for index in node.get_child_keys()
- ])
+ if depth is None or _depth < depth:
+ stack.extend([
+ (node.get_child_node(index), _depth + 1)
+ for index in node.get_child_keys()
+ ])
diff --git a/cylc/flow/tui/updater.py b/cylc/flow/tui/updater.py
new file mode 100644
index 00000000000..3203bc761ce
--- /dev/null
+++ b/cylc/flow/tui/updater.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""Contains the logic for updating the Tui app."""
+
+from asyncio import (
+ run,
+ sleep,
+ gather,
+)
+from copy import deepcopy
+from multiprocessing import Queue
+from time import time
+
+from cylc.flow.exceptions import (
+ ClientError,
+ ClientTimeout,
+ CylcError,
+ WorkflowStopped,
+)
+from cylc.flow.id import Tokens
+from cylc.flow.network.client_factory import get_client
+from cylc.flow.network.scan import (
+ filter_name,
+ graphql_query,
+ is_active,
+ scan,
+)
+from cylc.flow.task_state import (
+ TASK_STATUSES_ORDERED,
+)
+from cylc.flow.tui.data import (
+ QUERY
+)
+from cylc.flow.tui.util import (
+ compute_tree,
+ suppress_logging,
+)
+from cylc.flow.workflow_status import (
+ WorkflowStatus,
+)
+
+
+def get_default_filters():
+ """Return default task/workflow filters.
+
+ These filters show everything.
+ """
+ return {
+ 'tasks': {
+ # filtered task statuses
+ state: True
+ for state in TASK_STATUSES_ORDERED
+ },
+ 'workflows': {
+ # filtered workflow statuses
+ **{
+ state.value: True
+ for state in WorkflowStatus
+ },
+ # filtered workflow ids
+ 'id': '.*',
+ }
+ }
+
+
+class Updater():
+
+ CLIENT_TIMEOUT = 1
+ BASE_UPDATE_INTERVAL = 1
+ BASE_SCAN_INTERVAL = 10
+
+ SIGNAL_TERMINATE = 'terminate'
+
+ def __init__(self, w_id=None):
+ # Cylc comms clients for each workflow we're connected to
+ self._clients = {}
+
+ # iterate over this to get a list of workflows
+ self._scan_pipe = None
+ # the new pipe if the workflow filter options are changed
+ self.__scan_pipe = None
+
+ # task/workflow filters
+ self.filters = None # note set on self.run()
+ # queue for pushing out updates
+ self.update_queue = Queue()
+ # queue for commands to the updater
+ self._command_queue = Queue()
+
+ def subscribe(self, w_id):
+ """Subscribe to updates from a workflow."""
+ self._command_queue.put((self._subscribe.__name__, w_id))
+
+ def unsubscribe(self, w_id):
+ """Unsubscribe to updates from a workflow."""
+ self._command_queue.put((self._unsubscribe.__name__, w_id))
+
+ def update_filters(self, filters):
+ """Update the task state filter."""
+ self._command_queue.put((self._update_filters.__name__, filters))
+
+ def terminate(self):
+ """Stop the updater."""
+ self._command_queue.put((self.SIGNAL_TERMINATE, None))
+
+ def start(self, filters):
+ """Start the updater in a new asyncio.loop.
+
+ The Tui app will call this within a dedicated process.
+ """
+ run(self.run(filters))
+
+ async def run(self, filters):
+ """Start the updater in an existing asyncio.loop.
+
+ The tests call this within the same process.
+ """
+ with suppress_logging():
+ self._update_filters(filters)
+ await self._update()
+
+ def _subscribe(self, w_id):
+ if w_id not in self._clients:
+ self._clients[w_id] = None
+
+ def _unsubscribe(self, w_id):
+ if w_id in self._clients:
+ self._clients.pop(w_id)
+
+ def _update_filters(self, filters):
+ if (
+ not self.filters
+ or filters['workflows']['id'] != self.filters['workflows']['id']
+ ):
+ # update the scan pipe
+ self.__scan_pipe = (
+ # scan all workflows
+ scan
+ # filter by workflow name
+ | filter_name(filters['workflows']['id'])
+ # if the workflow is active, retrieve its status
+ | is_active(True, filter_stop=False)
+ | graphql_query({'status': None})
+ )
+
+ self.filters = filters
+
+ async def _update(self):
+ last_scan_time = 0
+ while True:
+ if not self._command_queue.empty():
+ (command, payload) = self._command_queue.get()
+ if command == self.SIGNAL_TERMINATE:
+ break
+ getattr(self, command)(payload)
+ continue
+
+ update_start_time = time()
+ if update_start_time - last_scan_time > self.BASE_SCAN_INTERVAL:
+ data = await self._scan()
+
+ self.update_queue.put(await self._run_update(data))
+
+ update_time = time() - update_start_time
+ await sleep(self.BASE_UPDATE_INTERVAL - update_time)
+
+ async def _run_update(self, data):
+ # copy the scanned data so it can be reused for future updates
+ data = deepcopy(data)
+
+ # connect to schedulers if needed
+ self._connect(data)
+
+ # update data with the response from each workflow
+ # TODO: make this even more async so each workflow can update on a
+ # different timescale?
+ await gather(
+ *(
+ self._update_workflow(w_id, client, data)
+ for w_id, client in self._clients.items()
+ )
+ )
+
+ return compute_tree(data)
+
+ async def _update_workflow(self, w_id, client, data):
+ if not client:
+ return
+
+ def _append(workflow_data):
+ for workflow in data['workflows']:
+ if workflow['id'] == workflow_data['id']:
+ workflow.update(workflow_data)
+ break
+
+ try:
+ workflow_data = await client.async_request(
+ 'graphql',
+ {
+ 'request_string': QUERY,
+ 'variables': {
+ # list of task states we want to see
+ 'taskStates': [
+ state
+ for state, is_on in self.filters['tasks'].items()
+ if is_on
+ ]
+ }
+ }
+ )
+ except WorkflowStopped:
+ # TODO
+ pass
+ except (ClientError, ClientTimeout):
+ # TODO
+ pass
+ except CylcError:
+ pass
+ else:
+ _append(workflow_data['workflows'][0])
+
+ def _connect(self, data):
+ for w_id, client in self._clients.items():
+ if not client:
+ try:
+ self._clients[w_id] = get_client(
+ Tokens(w_id)['workflow'],
+ timeout=self.CLIENT_TIMEOUT
+ )
+ except WorkflowStopped:
+ for workflow in data['workflows']:
+ if workflow['id'] == w_id:
+ workflow['_tui_data'] = f'Workflow is not running'
+ except (ClientError, ClientTimeout) as exc:
+ for workflow in data['workflows']:
+ if workflow['id'] == w_id:
+ workflow['_tui_data'] = f'Error: {exc}'
+ break
+
+ async def _scan(self):
+ # TODO async!
+ data = {'workflows': []}
+ workflow_filter_statuses = {
+ status
+ for status, filtered in self.filters['workflows'].items()
+ if filtered
+ }
+ if self.__scan_pipe:
+ # switch to the new pipe if it has been changed
+ self._scan_pipe = self.__scan_pipe
+ async for workflow in self._scan_pipe:
+ status = workflow.get('status', WorkflowStatus.STOPPED.value)
+ if status not in workflow_filter_statuses:
+ # this workflow is filtered out
+ continue
+ data['workflows'].append({
+ 'id': f'~osanders/{workflow["name"]}',
+ 'name': workflow['name'],
+ 'status': status,
+ 'stateTotals': {},
+ })
+
+ data['workflows'].sort(key=lambda x: x['id'])
+ return data
diff --git a/cylc/flow/tui/util.py b/cylc/flow/tui/util.py
index 575fc693437..6587cd4be1d 100644
--- a/cylc/flow/tui/util.py
+++ b/cylc/flow/tui/util.py
@@ -16,10 +16,13 @@
# along with this program. If not, see .
"""Common utilities for Tui."""
+from contextlib import contextmanager
+from getpass import getuser
from itertools import zip_longest
import re
from time import time
+from cylc.flow import LOG
from cylc.flow.id import Tokens
from cylc.flow.task_state import (
TASK_STATUS_RUNNING
@@ -33,6 +36,26 @@
from cylc.flow.wallclock import get_unix_time_from_time_string
+# the Tui user, note this is always the same as the workflow owner
+# (Tui doesn't do multi-user stuff)
+ME = getuser()
+
+
+@contextmanager
+def suppress_logging():
+ """Suppress Cylc logging.
+
+ Log goes to stdout/err which can pollute Urwid apps.
+ Patching sys.stdout/err is insufficient so we set the level to something
+ silly for the duration of this context manager then set it back again
+ afterwards.
+ """
+ level = LOG.getEffectiveLevel()
+ LOG.setLevel(99999)
+ yield
+ LOG.setLevel(level)
+
+
def get_task_icon(
status,
*,
@@ -113,80 +136,90 @@ def idpop(id_):
return tokens.id
-def compute_tree(flow):
- """Digest GraphQL data to produce a tree.
+def compute_tree(data):
+ """Digest GraphQL data to produce a tree."""
+ root_node = add_node('root', 'root', {}, data={})
- Arguments:
- flow (dict):
- A dictionary representing a single workflow.
+ for flow in data['workflows']:
+ nodes = {}
+ flow_node = add_node(
+ 'workflow', flow['id'], nodes, data=flow)
+ root_node['children'].append(flow_node)
- Returns:
- dict - A top-level workflow node.
+ # populate cycle nodes
+ for cycle in flow.get('cyclePoints', []):
+ cycle['id'] = idpop(cycle['id']) # strip the family off of the id
+ cycle_node = add_node('cycle', cycle['id'], nodes, data=cycle)
+ flow_node['children'].append(cycle_node)
- """
- nodes = {}
- flow_node = add_node(
- 'workflow', flow['id'], nodes, data=flow)
-
- # populate cycle nodes
- for cycle in flow['cyclePoints']:
- cycle['id'] = idpop(cycle['id']) # strip the family off of the id
- cycle_node = add_node('cycle', cycle['id'], nodes, data=cycle)
- flow_node['children'].append(cycle_node)
-
- # populate family nodes
- for family in flow['familyProxies']:
- add_node('family', family['id'], nodes, data=family)
-
- # create cycle/family tree
- for family in flow['familyProxies']:
- family_node = add_node(
- 'family', family['id'], nodes)
- first_parent = family['firstParent']
- if (
- first_parent
- and first_parent['name'] != 'root'
- ):
- parent_node = add_node(
- 'family', first_parent['id'], nodes)
- parent_node['children'].append(family_node)
- else:
- add_node(
- 'cycle', idpop(family['id']), nodes
- )['children'].append(family_node)
-
- # add leaves
- for task in flow['taskProxies']:
- # If there's no first parent, the child will have been deleted
- # during/after API query resolution. So ignore.
- if not task['firstParent']:
- continue
- task_node = add_node(
- 'task', task['id'], nodes, data=task)
- if task['firstParent']['name'] == 'root':
- family_node = add_node(
- 'cycle', idpop(task['id']), nodes)
- else:
+ # populate family nodes
+ for family in flow.get('familyProxies', []):
+ add_node('family', family['id'], nodes, data=family)
+
+ # create cycle/family tree
+ for family in flow.get('familyProxies', []):
family_node = add_node(
- 'family', task['firstParent']['id'], nodes)
- family_node['children'].append(task_node)
- for job in task['jobs']:
- job_node = add_node(
- 'job', job['id'], nodes, data=job)
- job_info_node = add_node(
- 'job_info', job['id'] + '_info', nodes, data=job)
- job_node['children'] = [job_info_node]
- task_node['children'].append(job_node)
-
- # sort
- for (type_, _), node in nodes.items():
- if type_ != 'task':
- # NOTE: jobs are sorted by submit-num in the GraphQL query
- node['children'].sort(
- key=lambda x: NaturalSort(x['id_'])
+ 'family', family['id'], nodes)
+ first_parent = family['firstParent']
+ if (
+ first_parent
+ and first_parent['name'] != 'root'
+ ):
+ parent_node = add_node(
+ 'family', first_parent['id'], nodes)
+ parent_node['children'].append(family_node)
+ else:
+ add_node(
+ 'cycle', idpop(family['id']), nodes
+ )['children'].append(family_node)
+
+ # add leaves
+ for task in flow.get('taskProxies', []):
+ # If there's no first parent, the child will have been deleted
+ # during/after API query resolution. So ignore.
+ if not task['firstParent']:
+ continue
+ task_node = add_node(
+ 'task', task['id'], nodes, data=task)
+ if task['firstParent']['name'] == 'root':
+ family_node = add_node(
+ 'cycle', idpop(task['id']), nodes)
+ else:
+ family_node = add_node(
+ 'family', task['firstParent']['id'], nodes)
+ family_node['children'].append(task_node)
+ for job in task['jobs']:
+ job_node = add_node(
+ 'job', job['id'], nodes, data=job)
+ job_info_node = add_node(
+ 'job_info', job['id'] + '_info', nodes, data=job)
+ job_node['children'] = [job_info_node]
+ task_node['children'].append(job_node)
+
+ # sort
+ for (type_, _), node in nodes.items():
+ if type_ != 'task':
+ # NOTE: jobs are sorted by submit-num in the GraphQL query
+ node['children'].sort(
+ key=lambda x: NaturalSort(x['id_'])
+ )
+
+ # spring nodes
+ if 'port' not in flow:
+ # the "port" field is only available via GraphQL
+ # so we are not connected to this workflow yet
+ flow_node['children'].append(
+ add_node(
+ '#spring',
+ '#spring',
+ nodes,
+ data={
+ 'id': flow.get('_tui_data', 'Loading ...'),
+ }
+ )
)
- return flow_node
+ return root_node
class NaturalSort:
@@ -361,20 +394,113 @@ def get_workflow_status_str(flow):
list - Text list for the urwid.Text widget.
"""
- status = flow['status']
+
+
+def _render_user(node, data):
+ return f'~{ME}'
+
+
+def _render_job_info(node, data):
+ key_len = max(len(key) for key in data)
+ ret = [
+ f'{key} {" " * (key_len - len(key))} {value}\n'
+ for key, value in data.items()
+ ]
+ ret[-1] = ret[-1][:-1] # strip trailing newline
+ return ret
+
+
+def _render_job(node, data):
return [
- (
- 'title',
- flow['name'],
+ f'#{data["submitNum"]:02d} ',
+ get_job_icon(data['state'])
+ ]
+
+
+def _render_task(node, data):
+ start_time = None
+ mean_time = None
+ try:
+ # due to sorting this is the most recent job
+ first_child = node.get_child_node(0)
+ except IndexError:
+ first_child = None
+
+ # progress information
+ if data['state'] == TASK_STATUS_RUNNING and first_child:
+ start_time = first_child.get_value()['data']['startedTime']
+ mean_time = data['task']['meanElapsedTime']
+
+ # the task icon
+ ret = get_task_icon(
+ data['state'],
+ is_held=data['isHeld'],
+ is_queued=data['isQueued'],
+ is_runahead=data['isRunahead'],
+ start_time=start_time,
+ mean_time=mean_time
+ )
+
+ # the most recent job status
+ ret.append(' ')
+ if first_child:
+ state = first_child.get_value()['data']['state']
+ ret += [(f'job_{state}', f'{JOB_ICON}'), ' ']
+
+ # the task name
+ ret.append(f'{data["name"]}')
+ return ret
+
+
+def _render_family(node, data):
+ return [
+ get_task_icon(
+ data['state'],
+ is_held=data['isHeld'],
+ is_queued=data['isQueued'],
+ is_runahead=data['isRunahead']
),
- ' - ',
- (
- f'workflow_{status}',
- status
- )
+ ' ',
+ Tokens(data['id']).pop_token()[1]
]
+def _render_unknown(node, data):
+ try:
+ state_totals = get_task_status_summary(data)
+ status = data['status']
+ status_msg = [
+ (
+ 'title',
+ _display_workflow_id(data),
+ ),
+ ' - ',
+ (
+ f'workflow_{status}',
+ status
+ )
+ ]
+ except KeyError:
+ return Tokens(data['id']).pop_token()[1]
+
+ return [*status_msg, *state_totals]
+
+
+def _display_workflow_id(data):
+ return data['name']
+
+
+RENDER_FUNCTIONS = {
+ 'user': _render_user,
+ 'root': _render_user,
+ 'job_info': _render_job_info,
+ 'job': _render_job,
+ 'task': _render_task,
+ 'cycle': _render_family,
+ 'family': _render_family,
+}
+
+
def render_node(node, data, type_):
"""Render a tree node as text.
@@ -387,68 +513,7 @@ def render_node(node, data, type_):
The node type (e.g. `task`, `job`, `family`).
"""
- if type_ == 'job_info':
- key_len = max(len(key) for key in data)
- ret = [
- f'{key} {" " * (key_len - len(key))} {value}\n'
- for key, value in data.items()
- ]
- ret[-1] = ret[-1][:-1] # strip trailing newline
- return ret
-
- if type_ == 'job':
- return [
- f'#{data["submitNum"]:02d} ',
- get_job_icon(data['state'])
- ]
-
- if type_ == 'task':
- start_time = None
- mean_time = None
- try:
- # due to sorting this is the most recent job
- first_child = node.get_child_node(0)
- except IndexError:
- first_child = None
-
- # progress information
- if data['state'] == TASK_STATUS_RUNNING and first_child:
- start_time = first_child.get_value()['data']['startedTime']
- mean_time = data['task']['meanElapsedTime']
-
- # the task icon
- ret = get_task_icon(
- data['state'],
- is_held=data['isHeld'],
- is_queued=data['isQueued'],
- is_runahead=data['isRunahead'],
- start_time=start_time,
- mean_time=mean_time
- )
-
- # the most recent job status
- ret.append(' ')
- if first_child:
- state = first_child.get_value()['data']['state']
- ret += [(f'job_{state}', f'{JOB_ICON}'), ' ']
-
- # the task name
- ret.append(f'{data["name"]}')
- return ret
-
- if type_ in ['family', 'cycle']:
- return [
- get_task_icon(
- data['state'],
- is_held=data['isHeld'],
- is_queued=data['isQueued'],
- is_runahead=data['isRunahead']
- ),
- ' ',
- Tokens(data['id']).pop_token()[1]
- ]
-
- return Tokens(data['id']).pop_token()[1]
+ return RENDER_FUNCTIONS.get(type_, _render_unknown)(node, data)
PARTS = [
@@ -476,9 +541,18 @@ def extract_context(selection):
{'user': ['a'], 'workflow': ['b'], 'cycle': ['c'],
'task': ['d'], 'job': ['e']}
+ >>> list(extract_context(['root']).keys())
+ ['user']
+
"""
ret = {}
for item in selection:
+ if item == 'root':
+ # special handling for the Tui "root" node
+ # (this represents the workflow owner which is always the same as
+ # user for Tui)
+ ret['user'] = ME
+ continue
tokens = Tokens(item)
for key, value in tokens.items():
if (
diff --git a/tests/integration/tui/test_app.py b/tests/integration/tui/test_app.py
new file mode 100644
index 00000000000..de16f72063f
--- /dev/null
+++ b/tests/integration/tui/test_app.py
@@ -0,0 +1,200 @@
+from cylc.flow.cycling.integer import IntegerPoint
+# from cylc.flow.task_state import (
+# TASK_STATUS_RUNNING,
+# TASK_STATUS_SUCCEEDED,
+# TASK_STATUS_FAILED,
+# TASK_STATUS_WAITING,
+# )
+from cylc.flow.workflow_status import StopMode
+
+
+def set_task_state(schd, task_states):
+ """Force tasks into the desired states.
+
+ Task states should be of the format (cycle, task, state, is_held).
+ """
+ for cycle, task, state, is_held in task_states:
+ itask = schd.pool.get_task(cycle, task)
+ if not itask:
+ itask = schd.pool.spawn_task(task, cycle, {1})
+ itask.state_reset(state, is_held=is_held)
+ schd.data_store_mgr.delta_task_state(itask)
+ schd.data_store_mgr.increment_graph_window(
+ itask.tokens,
+ cycle,
+ {1},
+ )
+
+
+async def test_tui_basics(raikura):
+ """Test basic Tui interaction with no workflows."""
+ with raikura(size='80,40') as rk:
+ # the app should open
+ rk.compare_screenshot('test-raikura')
+
+ # "h" should bring up the onscreen help
+ rk.user_input('h')
+ rk.compare_screenshot('test-raikura-help')
+
+ # "q" should close the popup
+ rk.user_input('q')
+ rk.compare_screenshot('test-raikura')
+
+ # "enter" should bring up the context menu
+ rk.user_input('enter')
+ rk.compare_screenshot('test-raikura-enter')
+
+ # "enter" again should close it via the "cancel" button
+ rk.user_input('enter')
+ rk.compare_screenshot('test-raikura')
+
+
+async def test_single_workflow(one_conf, flow, scheduler, start, raikura):
+ """Test a simple workflow with one task."""
+ id_ = flow(one_conf, name='one')
+ schd = scheduler(id_)
+ async with start(schd):
+ await schd.update_data_structure()
+ with raikura(size='80,15') as rk:
+ rk.compare_screenshot('simple')
+
+
+async def test_workflow_states(one_conf, flow, scheduler, start, raikura):
+ """Test viewing multiple workflows in different states."""
+ # one => stopping
+ id_1 = flow(one_conf, name='one')
+ schd_1 = scheduler(id_1)
+ # two => paused
+ id_2 = flow(one_conf, name='two')
+ schd_2 = scheduler(id_2)
+ # tre => running
+ flow(one_conf, name='tre')
+
+ async with start(schd_1):
+ schd_1.stop_mode = StopMode.AUTO # make it look like we're stopping
+ await schd_1.update_data_structure()
+
+ async with start(schd_2):
+ await schd_2.update_data_structure()
+ with raikura(size='80,15') as rk:
+ rk.compare_screenshot('unfiltered')
+
+ # filter out paused workflows
+ rk.user_input('W', 'down', 'enter', 'q')
+ rk.force_update()
+ rk.compare_screenshot('filter-not-paused')
+
+ # invert the filter so we are filtering for paused workflows
+ rk.user_input('W', 'enter', 'q')
+ rk.force_update()
+ rk.compare_screenshot('filter-paused')
+
+
+# TODO: Task state filtering is currently broken
+# see: https://github.com/cylc/cylc-flow/issues/5716
+#
+# async def test_task_states(flow, scheduler, start, raikura):
+# id_ = flow({
+# 'scheduler': {
+# 'allow implicit tasks': 'true',
+# },
+# 'scheduling': {
+# 'initial cycle point': '1',
+# 'cycling mode': 'integer',
+# 'runahead limit': 'P1',
+# 'graph': {
+# 'P1': '''
+# a => b => c
+# b[-P1] => b
+# '''
+# }
+# }
+# }, name='test_task_states')
+# schd = scheduler(id_)
+# async with start(schd):
+# set_task_state(
+# schd,
+# [
+# (IntegerPoint('1'), 'a', TASK_STATUS_SUCCEEDED, False),
+# # (IntegerPoint('1'), 'b', TASK_STATUS_FAILED, False),
+# (IntegerPoint('1'), 'c', TASK_STATUS_RUNNING, False),
+# # (IntegerPoint('2'), 'a', TASK_STATUS_RUNNING, False),
+# (IntegerPoint('2'), 'b', TASK_STATUS_WAITING, True),
+# ]
+# )
+# await schd.update_data_structure()
+#
+# with raikura(schd.tokens.id, size='80,20') as rk:
+# rk.compare_screenshot('unfiltered')
+#
+# # filter out waiting tasks
+# rk.user_input('T', 'down', 'enter', 'q')
+# rk.force_update()
+# rk.compare_screenshot('filter-not-waiting')
+
+
+async def test_navigation(flow, scheduler, start, raikura):
+ """Test navigating with the arrow keys."""
+ id_ = flow({
+ 'scheduling': {
+ 'graph': {
+ 'R1': 'A & B1 & B2',
+ }
+ },
+ 'runtime': {
+ 'A': {},
+ 'B': {},
+ 'B1': {'inherit': 'B'},
+ 'B2': {'inherit': 'B'},
+ 'a1': {'inherit': 'A'},
+ 'a2': {'inherit': 'A'},
+ 'b11': {'inherit': 'B1'},
+ 'b12': {'inherit': 'B1'},
+ 'b21': {'inherit': 'B2'},
+ 'b22': {'inherit': 'B2'},
+ }
+ }, name='one')
+ schd = scheduler(id_)
+ async with start(schd):
+ await schd.update_data_structure()
+
+ with raikura(size='80,30') as rk:
+ # the workflow should be collapsed when Tui is loaded
+ rk.compare_screenshot('on-load')
+
+ # pressing "right" should connect to the workflow
+ # and expand it once the data arrives
+ rk.user_input('down', 'right')
+ rk.wait_until_loaded(schd.tokens.id)
+ rk.force_update()
+ rk.compare_screenshot('workflow-expanded')
+
+ # pressing "left" should collapse the node
+ rk.user_input('down', 'down', 'left')
+ rk.compare_screenshot('family-A-collapsed')
+
+ # the "page up" and "page down" buttons should navigate to the top
+ # and bottom of the screen
+ rk.user_input('page down')
+ rk.compare_screenshot('cursor-at-bottom-of-screen')
+
+
+# async def test_restart_reconnect(one_conf, flow, scheduler, start, raikura):
+# with raikura(size='80,20') as rk:
+# schd = scheduler(flow(one_conf, name='one'))
+# async with start(schd):
+# await schd.update_data_structure()
+# rk.force_update()
+# rk.user_input('down', 'right')
+# rk.wait_until_loaded(schd.tokens.id)
+# rk.force_update()
+# rk.compare_screenshot('workflow-running')
+#
+# rk.force_update()
+# rk.compare_screenshot('workflow-stopped')
+#
+# schd = scheduler(flow(one_conf))
+# async with start(schd):
+# await schd.update_data_structure()
+# rk.force_update()
+# rk.compare_screenshot('workflow-restarted')
diff --git a/tests/integration/tui/test_updater.py b/tests/integration/tui/test_updater.py
new file mode 100644
index 00000000000..a0e2dde275a
--- /dev/null
+++ b/tests/integration/tui/test_updater.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+import asyncio
+from copy import deepcopy
+from pathlib import Path
+import re
+
+from async_timeout import timeout
+
+from cylc.flow.cycling.integer import IntegerPoint
+from cylc.flow.id import Tokens
+from cylc.flow.tui.updater import (
+ Updater,
+ get_default_filters,
+)
+from cylc.flow.workflow_status import WorkflowStatus
+
+
+async def await_update(updater):
+ """Wait for the latest update from the Updater.
+
+ Note:
+ If multiple updates are waiting, this returns the most recent.Q
+
+ Returns:
+ The latest update from the Updater's "update_queue".
+
+ """
+ while updater.update_queue.empty():
+ await asyncio.sleep(0.1)
+ while not updater.update_queue.empty():
+ # flush out any older updates to avoid race conditions in tests
+ update = updater.update_queue.get()
+ return update
+
+
+async def wait_workflow_connected(updater, tokens, connected=True, time=3):
+ """Wait for the Updater to connect to a workflow.
+
+ This will return once the updater has connected to a workflow and returned
+ the first data from it.
+
+ Arugments:
+ tokens:
+ The tokens of the workflow you're waiting for.
+ connected:
+ If True this waits for the updater to connect to the workflow, if
+ False it waits for the updater to disconnect from it.
+ time:
+ The maximum time to wait for this to happen.
+
+ Returns:
+ The first update from the Updater which contains the workflow data.
+
+ """
+ async with timeout(time):
+ while True:
+ root_node = await await_update(updater)
+ workflow_node = root_node['children'][0]
+ for workflow_node in root_node['children']:
+ if (
+ workflow_node['id_'] == tokens.id
+ and (
+ workflow_node['children'][0]['id_'] != '#spring'
+ ) == connected
+ ):
+ # if the spring node is still there then we haven't
+ # recieved the first update from the workflow yet
+ return root_node
+
+
+def get_child_tokens(root_node, types, relative=False):
+ """Return all ID of the specified types contained within the provided tree.
+
+ Args:
+ root_node:
+ The Tui tree you want to look for IDs in.
+ types:
+ The Tui types (e.g. 'workflow' or 'task') you want to extract.
+ relative:
+ If True, the relative IDs will be returned.
+
+ """
+ ret = set()
+ stack = [root_node]
+ while stack:
+ node = stack.pop()
+ stack.extend(node['children'])
+ if node['type_'] in types:
+
+ tokens = Tokens(node['id_'])
+ if relative:
+ ret.add(tokens.relative_id)
+ else:
+ ret.add(tokens.id)
+ return ret
+
+
+async def test_subscribe(one_conf, flow, scheduler, run, test_dir):
+ """It should subscribe and unsubscribe from workflows."""
+ id_ = flow(one_conf)
+ schd = scheduler(id_)
+
+ updater = Updater()
+
+ async def the_test():
+ nonlocal updater
+
+ # wait for the first update
+ root_node = await await_update(updater)
+
+ # there should be a root root_node
+ assert root_node['id_'] == 'root'
+ # a single root_node representing the workflow
+ assert root_node['children'][0]['id_'] == schd.tokens.id
+ # and a "spring" root_node used to active the subscription mechanism
+ assert root_node['children'][0]['children'][0]['id_'] == '#spring'
+
+ # subscribe to the workflow
+ updater.subscribe(schd.tokens.id)
+
+ # wait for it to connect to the workflow
+ root_node = await wait_workflow_connected(updater, schd.tokens)
+
+ # check the workflow contains one cycle with one task in it
+ workflow_node = root_node['children'][0]
+ assert len(workflow_node['children']) == 1
+ cycle_node = workflow_node['children'][0]
+ assert Tokens(cycle_node['id_']).relative_id == '1' # cycle ID
+ assert len(cycle_node['children']) == 1
+ task_node = cycle_node['children'][0]
+ assert Tokens(task_node['id_']).relative_id == '1/one' # task ID
+
+ # unsubscribe from the workflow
+ updater.unsubscribe(schd.tokens.id)
+
+ # wait for it to disconnect from the workflow
+ root_node = await wait_workflow_connected(
+ updater,
+ schd.tokens,
+ connected=False,
+ )
+
+ # shut down the updater
+ updater.terminate()
+
+ async with run(schd):
+ filters = get_default_filters()
+ # (test_dir.relative_to(Path("~/cylc-run").expanduser())
+ filters['workflows']['id'] = f'{re.escape(str(test_dir.relative_to(Path("~/cylc-run").expanduser())))}/.*'
+
+ # run the updater and the test
+ await asyncio.gather(
+ asyncio.create_task(updater.run(filters)),
+ asyncio.create_task(the_test()),
+ )
+
+
+async def test_filters(one_conf, flow, scheduler, run, test_dir):
+ """It should filter workflow and task states.
+
+ Note:
+ The workflow ID filter is not explicitly tested here, but it is
+ indirectly tested, otherwise other workflows would show up in the
+ updater results.
+
+ """
+ one = scheduler(flow({
+ 'scheduler': {
+ 'allow implicit tasks': 'True',
+ },
+ 'scheduling': {
+ 'graph': {
+ 'R1': 'a & b & c',
+ }
+ }
+ }, name='one'), paused_start=True)
+ two = scheduler(flow(one_conf, name='two'))
+ tre = scheduler(flow(one_conf, name='tre'))
+
+ filters = get_default_filters()
+ id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser()))
+ filters['workflows']['id'] = f'{re.escape(id_base)}/.*'
+
+ updater = Updater()
+
+ async def the_test():
+ nonlocal filters
+ root_node = await await_update(updater)
+ assert {child['id_'] for child in root_node['children']} == {
+ one.tokens.id,
+ two.tokens.id,
+ tre.tokens.id,
+ }
+
+ # filter out paused workflows
+ filters = deepcopy(filters)
+ filters['workflows'][WorkflowStatus.STOPPED.value] = True
+ filters['workflows'][WorkflowStatus.PAUSED.value] = False
+ updater.update_filters(filters)
+
+ # "one" and "two" should now be filtered out
+ root_node = await await_update(updater)
+ assert {child['id_'] for child in root_node['children']} == {
+ tre.tokens.id,
+ }
+
+ # filter out stopped workflows
+ filters = deepcopy(filters)
+ filters['workflows'][WorkflowStatus.STOPPED.value] = False
+ filters['workflows'][WorkflowStatus.PAUSED.value] = True
+ updater.update_filters(filters)
+
+ # "tre" should now be filtered out
+ root_node = await await_update(updater)
+ assert {child['id_'] for child in root_node['children']} == {
+ one.tokens.id,
+ two.tokens.id,
+ }
+
+ # subscribe to "one"
+ updater.subscribe(one.tokens.id)
+ root_node = await wait_workflow_connected(updater, one.tokens)
+ assert get_child_tokens(root_node, types={'task'}, relative=True) == {
+ '1/a',
+ '1/b',
+ '1/c',
+ }
+
+ # filter out running tasks
+ # TODO: see https://github.com/cylc/cylc-flow/issues/5716
+ # filters = deepcopy(filters)
+ # filters['tasks'][TASK_STATUS_RUNNING] = False
+ # updater.update_filters(filters)
+
+ # root_node = await await_update(updater)
+ # assert get_child_tokens(
+ # root_node,
+ # types={'task'},
+ # relative=True
+ # ) == {
+ # '1/b',
+ # '1/c',
+ # }
+
+ # shut down the updater
+ updater.terminate()
+
+ # start workflow "one"
+ async with run(one):
+ # mark "1/a" as running and "1/b" as succeeded
+ one_a = one.pool.get_task(IntegerPoint('1'), 'a')
+ one_a.state_reset('running')
+ one.data_store_mgr.delta_task_state(one_a)
+ one.pool.get_task(IntegerPoint('1'), 'b').state_reset('succeeded')
+
+ # start workflow "two"
+ async with run(two):
+ # run the updater and the test
+ await asyncio.gather(
+ asyncio.create_task(the_test()),
+ asyncio.create_task(updater.run(filters)),
+ )
diff --git a/tests/unit/tui/test_data.py b/tests/unit/tui/test_data.py
index a2d17bf2e76..85805a5d1ea 100644
--- a/tests/unit/tui/test_data.py
+++ b/tests/unit/tui/test_data.py
@@ -28,7 +28,7 @@ def test_generate_mutation(monkeypatch):
monkeypatch.setattr(cylc.flow.tui.data, 'ARGUMENT_TYPES', arg_types)
assert generate_mutation(
'my_mutation',
- ['foo', 'bar']
+ {'foo': 'foo', 'bar': 'bar', 'user': 'user'}
) == '''
mutation($foo: String!, $bar: [Int]) {
my_mutation (foos: $foo, bars: $bar) {
diff --git a/tests/unit/tui/test_overlay.py b/tests/unit/tui/test_overlay.py
index 42334aac009..18e3c87daab 100644
--- a/tests/unit/tui/test_overlay.py
+++ b/tests/unit/tui/test_overlay.py
@@ -39,6 +39,7 @@ def overlay_functions():
getattr(cylc.flow.tui.overlay, obj.name)
for obj in tree.body
if isinstance(obj, ast.FunctionDef)
+ and not obj.name.startswith('_')
]
@@ -47,14 +48,14 @@ def test_interface(overlay_functions):
for function in overlay_functions:
# mock up an app object to keep things working
app = Mock(
- filter_states={},
+ filters={'tasks': {}, 'workflows': {'id': '.*'}},
tree_walker=Mock(
get_focus=Mock(
return_value=[
Mock(
get_node=Mock(
return_value=Mock(
- get_value=lambda: {'id_': 'a'}
+ get_value=lambda: {'id_': '~u/a'}
)
)
)
diff --git a/tests/unit/tui/test_util.py b/tests/unit/tui/test_util.py
index 00ac9fa95be..2b3231e0f7e 100644
--- a/tests/unit/tui/test_util.py
+++ b/tests/unit/tui/test_util.py
@@ -189,77 +189,87 @@ def test_compute_tree():
"""
tree = compute_tree({
- 'id': 'workflow id',
- 'cyclePoints': [
- {
- 'id': '1/family-suffix',
- 'cyclePoint': '1'
- }
- ],
- 'familyProxies': [
- { # top level family
- 'name': 'FOO',
- 'id': '1/FOO',
- 'cyclePoint': '1',
- 'firstParent': {'name': 'root', 'id': '1/root'}
- },
- { # nested family
- 'name': 'FOOT',
- 'id': '1/FOOT',
- 'cyclePoint': '1',
- 'firstParent': {'name': 'FOO', 'id': '1/FOO'}
- },
- ],
- 'taskProxies': [
- { # top level task
- 'name': 'pub',
- 'id': '1/pub',
- 'firstParent': {'name': 'root', 'id': '1/root'},
- 'cyclePoint': '1',
- 'jobs': []
- },
- { # child task (belongs to family)
- 'name': 'fan',
- 'id': '1/fan',
- 'firstParent': {'name': 'fan', 'id': '1/fan'},
- 'cyclePoint': '1',
- 'jobs': []
- },
- { # nested child task (belongs to incestuous family)
- 'name': 'fool',
- 'id': '1/fool',
- 'firstParent': {'name': 'FOOT', 'id': '1/FOOT'},
- 'cyclePoint': '1',
- 'jobs': []
- },
- { # a task which has jobs
- 'name': 'worker',
- 'id': '1/worker',
- 'firstParent': {'name': 'root', 'id': '1/root'},
- 'cyclePoint': '1',
- 'jobs': [
- {'id': '1/worker/03', 'submitNum': '3'},
- {'id': '1/worker/02', 'submitNum': '2'},
- {'id': '1/worker/01', 'submitNum': '1'}
- ]
- }
- ]
+ 'workflows': [{
+ 'id': 'workflow id',
+ 'port': 1234,
+ 'cyclePoints': [
+ {
+ 'id': '1/family-suffix',
+ 'cyclePoint': '1'
+ }
+ ],
+ 'familyProxies': [
+ { # top level family
+ 'name': 'FOO',
+ 'id': '1/FOO',
+ 'cyclePoint': '1',
+ 'firstParent': {'name': 'root', 'id': '1/root'}
+ },
+ { # nested family
+ 'name': 'FOOT',
+ 'id': '1/FOOT',
+ 'cyclePoint': '1',
+ 'firstParent': {'name': 'FOO', 'id': '1/FOO'}
+ },
+ ],
+ 'taskProxies': [
+ { # top level task
+ 'name': 'pub',
+ 'id': '1/pub',
+ 'firstParent': {'name': 'root', 'id': '1/root'},
+ 'cyclePoint': '1',
+ 'jobs': []
+ },
+ { # child task (belongs to family)
+ 'name': 'fan',
+ 'id': '1/fan',
+ 'firstParent': {'name': 'fan', 'id': '1/fan'},
+ 'cyclePoint': '1',
+ 'jobs': []
+ },
+ { # nested child task (belongs to incestuous family)
+ 'name': 'fool',
+ 'id': '1/fool',
+ 'firstParent': {'name': 'FOOT', 'id': '1/FOOT'},
+ 'cyclePoint': '1',
+ 'jobs': []
+ },
+ { # a task which has jobs
+ 'name': 'worker',
+ 'id': '1/worker',
+ 'firstParent': {'name': 'root', 'id': '1/root'},
+ 'cyclePoint': '1',
+ 'jobs': [
+ {'id': '1/worker/03', 'submitNum': '3'},
+ {'id': '1/worker/02', 'submitNum': '2'},
+ {'id': '1/worker/01', 'submitNum': '1'}
+ ]
+ }
+ ]
+ }]
})
+ # the root node
+ assert tree['type_'] == 'root'
+ assert tree['id_'] == 'root'
+ assert len(tree['children']) == 1
+
# the workflow node
- assert tree['type_'] == 'workflow'
- assert tree['id_'] == 'workflow id'
- assert list(tree['data']) == [
+ workflow = tree['children'][0]
+ assert workflow['type_'] == 'workflow'
+ assert workflow['id_'] == 'workflow id'
+ assert set(workflow['data']) == {
# whatever if present on the node should end up in data
- 'id',
'cyclePoints',
'familyProxies',
+ 'id',
+ 'port',
'taskProxies'
- ]
- assert len(tree['children']) == 1
+ }
+ assert len(workflow['children']) == 1
# the cycle point node
- cycle = tree['children'][0]
+ cycle = workflow['children'][0]
assert cycle['type_'] == 'cycle'
assert cycle['id_'] == '//1'
assert list(cycle['data']) == [