From 8742d5eaadd150a8632805c2375b15246bacb028 Mon Sep 17 00:00:00 2001
From: Oliver Sanders
Date: Fri, 1 Sep 2023 12:03:50 +0100
Subject: [PATCH] Tui 1.0

* Move the updater into another process. This removes the limitation that
  caused Tui to crash with active workflows. Closes #3527
* Add multi-workflow capability. Closes #3464
* Add visual regression testing framework. Closes #3530
---
 cylc/flow/option_parsers.py           |   1 +
 cylc/flow/scripts/tui.py              |  81 ++++--
 cylc/flow/tui/__init__.py             |   3 -
 cylc/flow/tui/app.py                  | 364 +++++++++++++++-----------
 cylc/flow/tui/data.py                 |  50 ++--
 cylc/flow/tui/overlay.py              | 123 +++++++--
 cylc/flow/tui/tree.py                 |  87 +++++-
 cylc/flow/tui/updater.py              | 279 ++++++++++++++++++++
 cylc/flow/tui/util.py                 | 352 +++++++++++++++----------
 tests/integration/tui/conftest.py     | 186 +++++++++++++
 tests/integration/tui/test_app.py     | 200 ++++++++++++++
 tests/integration/tui/test_updater.py | 278 ++++++++++++++++++++
 tests/unit/tui/test_data.py           |   2 +-
 tests/unit/tui/test_overlay.py        |   5 +-
 tests/unit/tui/test_util.py           | 134 +++++-----
 15 files changed, 1717 insertions(+), 428 deletions(-)
 create mode 100644 cylc/flow/tui/updater.py
 create mode 100644 tests/integration/tui/conftest.py
 create mode 100644 tests/integration/tui/test_app.py
 create mode 100644 tests/integration/tui/test_updater.py

diff --git a/cylc/flow/option_parsers.py b/cylc/flow/option_parsers.py
index cb21f5e8bcf..96073ca4aef 100644
--- a/cylc/flow/option_parsers.py
+++ b/cylc/flow/option_parsers.py
@@ -44,6 +44,7 @@ )
 WORKFLOW_ID_ARG_DOC = ('WORKFLOW', 'Workflow ID')
+OPT_WORKFLOW_ID_ARG_DOC = ('[WORKFLOW]', 'Workflow ID')
 WORKFLOW_ID_MULTI_ARG_DOC = ('WORKFLOW ...', 'Workflow ID(s)')
 WORKFLOW_ID_OR_PATH_ARG_DOC = ('WORKFLOW | PATH', 'Workflow ID or path')
 ID_MULTI_ARG_DOC = ('ID ...', 'Workflow/Cycle/Family/Task ID(s)')
diff --git a/cylc/flow/scripts/tui.py b/cylc/flow/scripts/tui.py
index 86970052b46..c21fa0bd043 100644
--- a/cylc/flow/scripts/tui.py
+++ b/cylc/flow/scripts/tui.py
@@ -15,30 +15,35 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

-"""cylc tui WORKFLOW
+"""cylc tui [WORKFLOW]

 View and control running workflows in the terminal.

 (Tui = Terminal User Interface)

-WARNING: Tui is experimental and may break with large flows.
-An upcoming change to the way Tui receives data from the scheduler will
-make it much more efficient in the future.
+Tui allows you to monitor and interact with workflows in a manner similar
+to the GUI.
+
+Press "h" whilst running Tui to bring up the help screen, use the arrow
+keys to navigate.
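+
+Tui can monitor multiple workflows at once; expand a workflow in the
+tree to connect to it and view its tasks.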
+
 """
-# TODO: remove this warning once Tui is delta-driven
-# https://github.com/cylc/cylc-flow/issues/3527
+from getpass import getuser
 from textwrap import indent
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional

 from urwid import html_fragment

+from cylc.flow.exceptions import InputError
+from cylc.flow.id import Tokens
 from cylc.flow.id_cli import parse_id
 from cylc.flow.option_parsers import (
-    WORKFLOW_ID_ARG_DOC,
+    OPT_WORKFLOW_ID_ARG_DOC,
     CylcOptionParser as COP,
 )
 from cylc.flow.terminal import cli_function
 from cylc.flow.tui import TUI
+from cylc.flow.tui.util import suppress_logging
 from cylc.flow.tui.app import (
     TuiApp,
     TREE_EXPAND_DEPTH
@@ -55,7 +60,7 @@ def get_option_parser() -> COP:
     parser = COP(
         __doc__,
-        argdoc=[WORKFLOW_ID_ARG_DOC],
+        argdoc=[OPT_WORKFLOW_ID_ARG_DOC],
         # auto_add=False, NOTE: at present auto_add can not be turned off
         color=False
     )
@@ -80,32 +85,58 @@
         action='store',
         default='80,24'
     )
+    parser.add_option(
+        '--single-workflow', '-s',
+        help=(
+            'Only show a single workflow rather than all workflows'
+        ),
+        action='store_true',
+        default=False,
+    )
     return parser


-@cli_function(get_option_parser)
-def main(_, options: 'Values', workflow_id: str) -> None:
-    workflow_id, *_ = parse_id(
-        workflow_id,
-        constraint='workflows',
+def configure_screenshot(v_term_size):
+    screen = html_fragment.HtmlGenerator()
+    screen.set_terminal_properties(256)
+    screen.register_palette(TuiApp.palette)
+    html_fragment.screenshot_init(
+        [tuple(map(int, v_term_size.split(',')))],
+        []
     )
+    return screen, html_fragment
+
+
+@cli_function(get_option_parser)
+def main(_, options: 'Values', workflow_id: Optional[str] = None) -> None:
+    if options.single_workflow and not workflow_id:
+        raise InputError('--single-workflow requires a workflow ID')
+
+    # get workflow ID if specified
+    if workflow_id:
+        workflow_id, *_ = parse_id(
+            workflow_id,
+            constraint='workflows',
+        )
+        tokens = Tokens(workflow_id)
+        workflow_id = tokens.duplicate(user=getuser()).id
+
+    # configure Tui
     screen = None
+    html_fragment = None
     if options.display == 'html':
-        TREE_EXPAND_DEPTH[0] = -1  # expand tree fully
-        screen = html_fragment.HtmlGenerator()
-        screen.set_terminal_properties(256)
-        screen.register_palette(TuiApp.palette)
-        html_fragment.screenshot_init(
-            [tuple(map(int, options.v_term_size.split(',')))],
-            []
-        )
+        screen, html_fragment = configure_screenshot(options.v_term_size)

+    # start Tui
     try:
-        TuiApp(workflow_id, screen=screen).main()
+        with suppress_logging():
+            TuiApp(screen=screen).main(
+                workflow_id,
+                options.single_workflow,
+            )
         if options.display == 'html':
-            for fragment in html_fragment.screenshot_collect():
-                print(fragment)
+            print(html_fragment.screenshot_collect()[-1])
     except KeyboardInterrupt:
         pass
diff --git a/cylc/flow/tui/__init__.py b/cylc/flow/tui/__init__.py
index 92e3bce0268..78ce0f36d62 100644
--- a/cylc/flow/tui/__init__.py
+++ b/cylc/flow/tui/__init__.py
@@ -143,6 +143,3 @@ def list_groups(self):
                 if binding['group'] == name
             ]
         )
-
-
-BINDINGS = Bindings()
diff --git a/cylc/flow/tui/app.py b/cylc/flow/tui/app.py
index cf09f75db0e..72748de5ad7 100644
--- a/cylc/flow/tui/app.py
+++ b/cylc/flow/tui/app.py
@@ -16,22 +16,17 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
"""The application control logic for Tui.""" +from contextlib import contextmanager +from multiprocessing import Process +import re import sys import urwid from urwid import html_fragment from urwid.wimp import SelectableIcon -from pathlib import Path -from cylc.flow.network.client_factory import get_client -from cylc.flow.exceptions import ( - ClientError, - ClientTimeout, - WorkflowStopped -) -from cylc.flow.pathutil import get_workflow_run_dir +from cylc.flow.id import Tokens from cylc.flow.task_state import ( - TASK_STATUSES_ORDERED, TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING, TASK_STATUS_FAILED, @@ -41,7 +36,7 @@ ) import cylc.flow.tui.overlay as overlay from cylc.flow.tui import ( - BINDINGS, + Bindings, FORE, BACK, JOB_COLOURS, @@ -49,21 +44,25 @@ ) from cylc.flow.tui.tree import ( find_closest_focus, - translate_collapsing + translate_collapsing, + expand_tree, +) +from cylc.flow.tui.updater import ( + Updater, + get_default_filters, ) from cylc.flow.tui.util import ( - compute_tree, dummy_flow, - get_task_status_summary, - get_workflow_status_str, render_node ) -from cylc.flow.workflow_files import WorkflowFiles +from cylc.flow.workflow_status import ( + WorkflowStatus, +) urwid.set_encoding('utf8') # required for unicode task icons -TREE_EXPAND_DEPTH = [2] +TREE_EXPAND_DEPTH = 5 class TuiWidget(urwid.TreeWidget): @@ -82,18 +81,20 @@ class TuiWidget(urwid.TreeWidget): # will skip rows when the user navigates unexpandable_icon = SelectableIcon(' ', 0) - def __init__(self, node, max_depth=None): - if not max_depth: - max_depth = TREE_EXPAND_DEPTH[0] + def __init__(self, app, node, max_depth=None): + self.app = app self._node = node self._innerwidget = None self.is_leaf = not node.get_child_keys() - if max_depth > 0: - self.expanded = node.get_depth() < max_depth - else: - self.expanded = True + # if max_depth > 0: + # self.expanded = node.get_depth() < max_depth + # else: + # self.expanded = True + self.expanded = False widget = self.get_indented_widget() urwid.WidgetWrap.__init__(self, widget) + # self.expanded = False + # self.update_expanded_icon() def selectable(self): """Return True if this node is selectable. @@ -158,6 +159,19 @@ def get_indented_widget(self): ) return self.__super.get_indented_widget() + def update_expanded_icon(self): + node = self.get_node() + value = node.get_value() + data = value['data'] + type_ = value['type_'] + if type_ == 'workflow': + if self.expanded: + self.app.updater.subscribe(data['id']) + self.app.expand_on_load = data['id'] + else: + self.app.updater.unsubscribe(data['id']) + return urwid.TreeWidget.update_expanded_icon(self) + class TuiNode(urwid.TreeNode): """Data storage object for leaf nodes.""" @@ -169,8 +183,12 @@ def load_widget(self): class TuiParentNode(urwid.ParentNode): """Data storage object for interior/parent nodes.""" + def __init__(self, app, *args, **kwargs): + self.app = app + urwid.ParentNode.__init__(self, *args, **kwargs) + def load_widget(self): - return TuiWidget(self) + return TuiWidget(self.app, self) def load_child_keys(self): # Note: keys are really indices. 
@@ -185,6 +203,7 @@ def load_child_node(self, key):
         else:
             childclass = TuiNode
         return childclass(
+            self.app,
             childdata,
             parent=self,
             key=key,
@@ -192,6 +211,20 @@
         )


+@contextmanager
+def updater_subproc(filters):
+    """Runs the Updater in its own process."""
+    # start the updater
+    updater = Updater()
+    p = Process(target=updater.start, args=(filters,))
+    try:
+        p.start()
+        yield updater
+    finally:
+        updater.terminate()
+        p.join()
+
+
 class TuiApp:
     """An application to display a single Cylc workflow.
@@ -206,8 +239,7 @@ class TuiApp:

     """

-    UPDATE_INTERVAL = 1
-    CLIENT_TIMEOUT = 1
+    UPDATE_INTERVAL = 0.1

     palette = [
         ('head', FORE, BACK),
@@ -227,16 +259,14 @@ class TuiApp:
         for status, spec in WORKFLOW_COLOURS.items()
     ]

-    def __init__(self, id_, screen=None):
-        self.id_ = id_
-        self.client = None
+    def __init__(self, screen=None):
         self.loop = None
-        self.screen = None
+        self.screen = screen
         self.stack = 0
         self.tree_walker = None

         # create the template
-        topnode = TuiParentNode(dummy_flow({'id': 'Loading...'}))
+        topnode = TuiParentNode(self, dummy_flow({'id': 'Loading...'}))
         self.listbox = urwid.TreeListBox(urwid.TreeWalker(topnode))
         header = urwid.Text('\n')
         footer = urwid.AttrWrap(
@@ -249,27 +279,87 @@ def __init__(self, id_, screen=None):
             header=urwid.AttrWrap(header, 'head'),
             footer=footer
         )
-        self.filter_states = {
-            state: True
-            for state in TASK_STATUSES_ORDERED
-        }
-        if isinstance(screen, html_fragment.HtmlGenerator):
-            # the HtmlGenerator only captures one frame
-            # so we need to pre-populate the GUI before
-            # starting the event loop
-            self.update()
+        self.filters = {}

-    def main(self):
+    def main(self, w_id=None, single_workflow=False, id_filter=None):
         """Start the event loop."""
-        self.loop = urwid.MainLoop(
-            self.view,
-            self.palette,
-            unhandled_input=self.unhandled_input,
-            screen=self.screen
-        )
-        # schedule the first update
-        self.loop.set_alarm_in(0, self._update)
-        self.loop.run()
+        # TODO: move to INIT
+        self.filters = get_default_filters()
+        if single_workflow and w_id:
+            workflow = str(Tokens(w_id)['workflow'])
+            self.filters['workflows']['id'] = rf'^{re.escape(workflow)}$'
+        elif id_filter:
+            self.filters['workflows']['id'] = id_filter
+
+        with updater_subproc(self.filters) as updater:
+            self.updater = updater
+
+            # pre-subscribe to the provided workflow if requested
+            self.expand_on_load = w_id or 'root'
+            if w_id:
+                self.updater.subscribe(w_id)
+
+            # configure the urwid main loop
+            self.loop = urwid.MainLoop(
+                self.view,
+                self.palette,
+                unhandled_input=self.unhandled_input,
+                screen=self.screen
+            )
+
+            if isinstance(self.screen, html_fragment.HtmlGenerator):
+                # Tui has been configured to generate HTML screenshots
+                while self.expand_on_load:
+                    # load the data and wait for any requested workflows
+                    # to be loaded
+                    self.update()
+                return
+            else:
+                # Tui is being run normally as an interactive application
+                # schedule the first update
+                self.loop.set_alarm_in(0, self._update)
+
+            # start the urwid main loop
+            self.loop.run()
+
+    @contextmanager
+    def start_noninteractive(
+        self, w_id=None, single_workflow=False, id_filter=None
+    ):
+        """Start Tui without running the event loop (for testing)."""
+        # TODO: move to INIT
+        self.filters = get_default_filters()
+        if single_workflow and w_id:
+            workflow = str(Tokens(w_id)['workflow'])
+            self.filters['workflows']['id'] = rf'^{re.escape(workflow)}$'
+        elif id_filter:
+            self.filters['workflows']['id'] = id_filter
+
+        with updater_subproc(self.filters) as updater:
+            self.updater = updater
+
+            # pre-subscribe to the provided workflow if requested
+            self.expand_on_load = w_id or 'root'
+            if w_id:
+                self.updater.subscribe(w_id)
+
+            # configure the urwid main loop
+            self.loop = urwid.MainLoop(
+                self.view,
+                self.palette,
+                unhandled_input=self.unhandled_input,
+                screen=self.screen
+            )
+
+            self.wait_until_loaded(w_id or 'root')
+
+            yield self
+
+    def wait_until_loaded(self, *ids):
+        """Wait for any requested nodes to be created."""
+        for id_ in ids:
+            self.expand_on_load = id_
+            while self.expand_on_load:
+                self.update()

     def unhandled_input(self, key):
         """Catch key presses, uncaught events are passed down the chain."""
@@ -285,74 +375,6 @@ def unhandled_input(self, key):
                 meth(self, *args)
                 return

-    def get_snapshot(self):
-        """Contact the workflow, return a tree structure
-
-        In the event of error contacting the workflow the
-        message is written to this Widget's header.
-
-        Returns:
-            dict if successful, else False
-
-        """
-        try:
-            if not self.client:
-                self.client = get_client(self.id_, timeout=self.CLIENT_TIMEOUT)
-            data = self.client(
-                'graphql',
-                {
-                    'request_string': QUERY,
-                    'variables': {
-                        # list of task states we want to see
-                        'taskStates': [
-                            state
-                            for state, is_on in self.filter_states.items()
-                            if is_on
-                        ]
-                    }
-                }
-            )
-        except WorkflowStopped:
-            # Distinguish stopped flow from non-existent flow.
-            self.client = None
-            full_path = Path(get_workflow_run_dir(self.id_))
-            if (
-                (full_path / WorkflowFiles.SUITE_RC).is_file()
-                or (full_path / WorkflowFiles.FLOW_FILE).is_file()
-            ):
-                message = "stopped"
-            else:
-                message = (
-                    f"No {WorkflowFiles.SUITE_RC} or {WorkflowFiles.FLOW_FILE}"
-                    f"found in {self.id_}."
-                )
-
-            return dummy_flow({
-                'name': self.id_,
-                'id': self.id_,
-                'status': message,
-                'stateTotals': {}
-            })
-        except (ClientError, ClientTimeout) as exc:
-            # catch network / client errors
-            self.set_header([('workflow_error', str(exc))])
-            return False
-
-        if isinstance(data, list):
-            # catch GraphQL errors
-            try:
-                message = data[0]['error']['message']
-            except (IndexError, KeyError):
-                message = str(data)
-            self.set_header([('workflow_error', message)])
-            return False
-
-        if len(data['workflows']) != 1:
-            # multiple workflows in returned data - shouldn't happen
-            raise ValueError()
-
-        return compute_tree(data['workflows'][0])
-
     @staticmethod
     def get_node_id(node):
         """Return a unique identifier for a node.
@@ -377,23 +399,13 @@ def set_header(self, message: list):
         """
         # put in a one line gap
         message.append('\n')
-
-        # TODO: remove once Tui is delta-driven
-        # https://github.com/cylc/cylc-flow/issues/3527
-        message.extend([
-            (
-                'workflow_error',
-                'TUI is experimental and may break with large flows'
-            ),
-            '\n'
-        ])
         self.view.header = urwid.Text(message)

     def _update(self, *_):
         try:
             self.update()
         except Exception as exc:
             sys.exit(exc)

     def update(self):
@@ -402,26 +414,32 @@ def update(self):

         Preserves the current focus and collapse/expand state.
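+
+        Returns False if there is no new update to display.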
""" - # update the data store - # TODO: this can be done incrementally using deltas - # once this interface is available - snapshot = self.get_snapshot() - if snapshot is False: + # attempt to fetch an update + update = False + while not self.updater.update_queue.empty(): + # fetch the most recent update + update = self.updater.update_queue.get() + + if update is False: + if self.loop: + self.loop.set_alarm_in(self.UPDATE_INTERVAL, self._update) return False # update the workflow status message - header = [get_workflow_status_str(snapshot['data'])] - status_summary = get_task_status_summary(snapshot['data']) - if status_summary: - header.extend([' ('] + status_summary + [' )']) - if not all(self.filter_states.values()): - header.extend([' ', '*filtered* "R" to reset', ' ']) + header = [] + # header = [get_workflow_status_str(snapshot['data'])] + # status_summary = get_task_status_summary(snapshot['data']) + # if status_summary: + # header.extend([' ('] + status_summary + [' )']) + + if not all(self.filters['tasks'].values()): + header.extend([' ', '*tasks filtered* "F" to edit, "R" to reset', ' ']) self.set_header(header) # global update - the nuclear option - slow but simple # TODO: this can be done incrementally by adding and # removing nodes from the existing tree - topnode = TuiParentNode(snapshot) + topnode = TuiParentNode(self, update) # NOTE: because we are nuking the tree we need to manually # preserve the focus and collapse status of tree nodes @@ -436,6 +454,13 @@ def update(self): # get the new focus _, new_node = self.listbox._body.get_focus() + if self.expand_on_load: + depth = TREE_EXPAND_DEPTH + if self.expand_on_load == 'root': + depth = 1 + if expand_tree(self, new_node, self.expand_on_load, depth): + self.expand_on_load = None + # preserve the focus or walk to the nearest parent closest_focus = find_closest_focus(self, old_node, new_node) self.listbox._body.set_focus(closest_focus) @@ -457,11 +482,25 @@ def filter_by_task_state(self, filtered_state=None): A task state to filter by or None. """ - self.filter_states = { + self.filters['tasks'] = { state: (state == filtered_state) or not filtered_state - for state in self.filter_states + for state in self.filters['tasks'] + } + self.updater.update_filters(self.filters) + + def filter_by_workflow_state(self, *filtered_states): + """Filter workflows. + + Args: + filtered_state (str): + A task state to filter by or None. 
+
+        """
+        self.filters['workflows'] = {
+            state: (state in filtered_states) or not filtered_states
+            for state in self.filters['workflows']
         }
-        return
+        self.updater.update_filters(self.filters)

     def open_overlay(self, fcn):
         self.create_overlay(*fcn(self))
@@ -521,6 +560,7 @@ def close_topmost(self):
         self.stack -= 1


+BINDINGS = Bindings()
 BINDINGS.add_group(
     '',
     'Application Controls'
@@ -603,40 +643,62 @@
 )

 BINDINGS.add_group(
-    'filter',
+    'filter tasks',
     'Filter by task state'
 )
 BINDINGS.bind(
-    ('F',),
-    'filter',
+    ('T',),
+    'filter tasks',
     'Select task states to filter by',
     (TuiApp.open_overlay, overlay.filter_task_state)
 )
 BINDINGS.bind(
     ('f',),
-    'filter',
+    'filter tasks',
     'Show only failed tasks',
     (TuiApp.filter_by_task_state, TASK_STATUS_FAILED)
 )
 BINDINGS.bind(
     ('s',),
-    'filter',
+    'filter tasks',
     'Show only submitted tasks',
     (TuiApp.filter_by_task_state, TASK_STATUS_SUBMITTED)
 )
 BINDINGS.bind(
     ('r',),
-    'filter',
+    'filter tasks',
     'Show only running tasks',
     (TuiApp.filter_by_task_state, TASK_STATUS_RUNNING)
 )
 BINDINGS.bind(
     ('R',),
-    'filter',
+    'filter tasks',
     'Reset task state filtering',
     (TuiApp.filter_by_task_state,)
 )

+BINDINGS.add_group(
+    'filter workflows',
+    'Filter by workflow state'
+)
+BINDINGS.bind(
+    ('W',),
+    'filter workflows',
+    'Select workflow states to filter by',
+    (TuiApp.open_overlay, overlay.filter_workflow_state)
+)
+BINDINGS.bind(
+    ('p',),
+    'filter workflows',
+    'Show only active (running, paused or stopping) workflows',
+    (
+        TuiApp.filter_by_workflow_state,
+        WorkflowStatus.RUNNING.value,
+        WorkflowStatus.PAUSED.value,
+        WorkflowStatus.STOPPING.value
+    )
+)
+

 def list_bindings():
     """Write out an in-line list of the key bindings."""
diff --git a/cylc/flow/tui/data.py b/cylc/flow/tui/data.py
index 4fd538d8783..a20fe8d8396 100644
--- a/cylc/flow/tui/data.py
+++ b/cylc/flow/tui/data.py
@@ -19,6 +19,7 @@
 import sys

 from cylc.flow.exceptions import ClientError
+from cylc.flow.id import Tokens
 from cylc.flow.tui.util import (
     extract_context
 )
@@ -29,6 +30,7 @@
     workflows {
       id
       name
+      port
       status
       stateTotals
       taskProxies(states: $taskStates) {
@@ -154,23 +156,20 @@ def cli_cmd(*cmd):
         raise ClientError(f'Error in command {" ".join(cmd)}\n{err}')


-def _clean(workflow):
-    # for now we will exit tui when the workflow is cleaned
-    # this will change when tui supports multiple workflows
-    cli_cmd('clean', workflow)
-    sys.exit(0)
-
-
 OFFLINE_MUTATIONS = {
+    'user': {
+        'stop-all': partial(cli_cmd, 'stop', '*'),
+    },
     'workflow': {
         'play': partial(cli_cmd, 'play'),
-        'clean': _clean,
+        'clean': partial(cli_cmd, 'clean', '--yes'),
         'reinstall-reload': partial(cli_cmd, 'vr', '--yes'),
     }
 }


 def generate_mutation(mutation, arguments):
+    arguments.pop('user')
     graphql_args = ', '.join([
         f'${argument}: {ARGUMENT_TYPES[argument]}'
         for argument in arguments
@@ -206,28 +205,40 @@ def context_to_variables(context):

     Examples:
         >>> context_to_variables(extract_context(['~a/b//c/d']))
-        {'workflow': ['b'], 'task': ['c/d']}
+        {'user': ['a'], 'workflow': ['b'], 'task': ['c/d']}

         >>> context_to_variables(extract_context(['~a/b//c']))
-        {'workflow': ['b'], 'task': ['c/*']}
+        {'user': ['a'], 'workflow': ['b'], 'task': ['c/*']}

         >>> context_to_variables(extract_context(['~a/b']))
-        {'workflow': ['b']}
+        {'user': ['a'], 'workflow': ['b']}

     """
     # context_to_variables because it can only handle single-selection ATM
-    variables = {'workflow': context['workflow']}
+    variables = {'user': context['user']}
+
+    if 'workflow' in context:
+        variables['workflow'] = context['workflow']
     if 'task' in context:
         variables['task'] = [
-            f'{context["cycle"][0]}/{context["task"][0]}'
+            Tokens(
+                cycle=context['cycle'][0],
+                task=context['task'][0]
+            ).relative_id
         ]
     elif 'cycle' in context:
-        variables['task'] = [f'{context["cycle"][0]}/*']
+        variables['task'] = [
+            Tokens(cycle=context['cycle'][0], task='*').relative_id
+        ]
     return variables


 def mutate(client, mutation, selection):
-    if mutation in OFFLINE_MUTATIONS['workflow']:
+    if mutation in {
+        _mutation
+        for section in OFFLINE_MUTATIONS.values()
+        for _mutation in section
+    }:
         offline_mutate(mutation, selection)
     elif client:
         online_mutate(client, mutation, selection)
@@ -256,6 +267,9 @@ def offline_mutate(mutation, selection):
     """Issue a mutation over the CLI or other offline interface."""
     context = extract_context(selection)
     variables = context_to_variables(context)
-    for workflow in variables['workflow']:
-        # NOTE: this currently only supports workflow mutations
-        OFFLINE_MUTATIONS['workflow'][mutation](workflow)
+    if 'workflow' in variables:
+        for workflow in variables['workflow']:
+            # NOTE: this currently only supports workflow mutations
+            OFFLINE_MUTATIONS['workflow'][mutation](workflow)
+    else:
+        OFFLINE_MUTATIONS['user'][mutation]()
diff --git a/cylc/flow/tui/overlay.py b/cylc/flow/tui/overlay.py
index cddcc8d5093..54e1fad261c 100644
--- a/cylc/flow/tui/overlay.py
+++ b/cylc/flow/tui/overlay.py
@@ -37,55 +37,122 @@

 """

+from contextlib import suppress
 from functools import partial
+import re
 import sys

 import urwid

 from cylc.flow.exceptions import (
     ClientError,
+    ClientTimeout,
+    WorkflowStopped,
 )
+from cylc.flow.id import Tokens
+from cylc.flow.network.client_factory import get_client
 from cylc.flow.task_state import (
     TASK_STATUSES_ORDERED,
     TASK_STATUS_WAITING
 )
+import cylc.flow.tui.app
 from cylc.flow.tui import (
-    BINDINGS,
     JOB_COLOURS,
     JOB_ICON,
     TUI
 )
 from cylc.flow.tui.data import (
+    extract_context,
     list_mutations,
     mutate,
 )
 from cylc.flow.tui.util import (
-    get_task_icon
+    get_task_icon,
 )


+def _toggle_filter(app, filter_group, status, *_):
+    """Toggle a filter state."""
+    app.filters[filter_group][status] = not app.filters[filter_group][status]
+    app.updater.update_filters(app.filters)
+
+
+def _invert_filter(checkboxes, *_):
+    """Invert the state of all filters."""
+    for checkbox in checkboxes:
+        checkbox.set_state(not checkbox.state)
+
+
+def filter_workflow_state(app):
+    """Return a widget for adjusting the workflow filter options."""
+    checkboxes = [
+        urwid.CheckBox(
+            [status],
+            state=is_on,
+            on_state_change=partial(_toggle_filter, app, 'workflows', status)
+        )
+        for status, is_on in app.filters['workflows'].items()
+        if status != 'id'
+    ]
+
+    workflow_id_prompt = 'id (regex)'
+
+    def update_id_filter(widget, value):
+        nonlocal app
+        try:
+            # ensure the regex is valid before updating the filter
+            re.compile(value)
+        except re.error:
+            # error in the regex -> inform the user
+            widget.set_caption(f'{workflow_id_prompt} - error: \n')
+        else:
+            # valid regex -> update the filter
+            widget.set_caption(f'{workflow_id_prompt}: \n')
+            app.filters['workflows']['id'] = value
+            app.updater.update_filters(app.filters)
+
+    id_filter_widget = urwid.Edit(
+        caption=f'{workflow_id_prompt}: \n',
+        edit_text=app.filters['workflows']['id'],
+    )
+    urwid.connect_signal(id_filter_widget, 'change', update_id_filter)
+
+    widget = urwid.ListBox(
+        urwid.SimpleFocusListWalker([
+            urwid.Text('Filter Workflow States'),
+            urwid.Divider(),
+            urwid.Padding(
+                urwid.Button(
+                    'Invert',
+                    on_press=partial(_invert_filter, checkboxes)
+                ),
+                right=19
+            )
+        ] + checkboxes + [
+            urwid.Divider(),
+            id_filter_widget,
+        ])
+    )
+
+    return (
+        widget,
+        {'width': 35, 'height': 23}
+    )
+
+
 def filter_task_state(app):
     """Return a widget for adjusting the task state filter."""

-    def toggle(state, *_):
-        """Toggle a filter state."""
-        app.filter_states[state] = not app.filter_states[state]
-
     checkboxes = [
         urwid.CheckBox(
             get_task_icon(state) + [' ' + state],
             state=is_on,
-            on_state_change=partial(toggle, state)
+            on_state_change=partial(_toggle_filter, app, 'tasks', state)
         )
-        for state, is_on in app.filter_states.items()
+        for state, is_on in app.filters['tasks'].items()
     ]

-    def invert(*_):
-        """Invert the state of all filters."""
-        for checkbox in checkboxes:
-            checkbox.set_state(not checkbox.state)
-
     widget = urwid.ListBox(
         urwid.SimpleFocusListWalker([
             urwid.Text('Filter Task States'),
@@ -93,7 +160,7 @@ def filter_task_state(app):
             urwid.Padding(
                 urwid.Button(
                     'Invert',
-                    on_press=invert
+                    on_press=partial(_invert_filter, checkboxes)
                 ),
                 right=19
             )
@@ -127,7 +194,7 @@ def help_info(app):
     ]

     # list key bindings
-    for group, bindings in BINDINGS.list_groups():
+    for group, bindings in cylc.flow.tui.app.BINDINGS.list_groups():
         items.append(
             urwid.Text([
                 f'{group["desc"]}:'
@@ -214,22 +281,38 @@ def context(app):
     """An overlay for context menus."""
     value = app.tree_walker.get_focus()[0].get_node().get_value()
     selection = [value['id_']]  # single selection ATM
+    context = extract_context(selection)
+
+    client = None
+    if 'workflow' in context:
+        w_id = context['workflow'][0]
+        with suppress(WorkflowStopped, ClientError, ClientTimeout):
+            client = get_client(w_id)

     def _mutate(mutation, _):
-        nonlocal app
+        nonlocal app, client
         app.open_overlay(partial(progress, text='Running Command'))
         try:
-            mutate(app.client, mutation, selection)
+            mutate(client, mutation, selection)
         except ClientError as exc:
             app.open_overlay(partial(error, text=str(exc)))
         else:
             app.close_topmost()
             app.close_topmost()

+    # determine the ID to display for the context menu
+    tokens = Tokens(value["id_"])
+    if tokens.is_task_like:
+        # if it's a cycle/task/job, then use the relative id
+        display_id = tokens.relative_id
+    else:
+        # otherwise use the full id
+        display_id = tokens.id
+
     widget = urwid.ListBox(
         urwid.SimpleFocusListWalker(
             [
-                urwid.Text(f'id: {value["id_"]}'),
+                urwid.Text(f'id: {display_id}'),
                 urwid.Divider(),
                 urwid.Text('Action'),
                 urwid.Button(
@@ -242,14 +325,14 @@ def _mutate(mutation, _):
                     mutation,
                     on_press=partial(_mutate, mutation)
                 )
-                for mutation in list_mutations(app.client, selection)
+                for mutation in list_mutations(client, selection)
             ]
         )
     )

     return (
         widget,
-        {'width': 30, 'height': 20}
+        {'width': 40, 'height': 20}
     )
diff --git a/cylc/flow/tui/tree.py b/cylc/flow/tui/tree.py
index 84ed55ab53a..f933acfe95f 100644
--- a/cylc/flow/tui/tree.py
+++ b/cylc/flow/tui/tree.py
@@ -56,6 +56,76 @@ def find_closest_focus(app, old_node, new_node):
     )


+def expand_tree(app, new_node, head, depth, node_types=None):
+    """Expand the Tui tree to the desired level.
+
+    Arguments:
+        app:
+            The Tui application instance.
+        new_node:
+            The Tui widget representing the tree view.
+        head:
+            If specified, the tree below this node will be expanded.
+        depth:
+            The max depth to expand nodes to.
+        node_types:
+            Whitelist of node types to expand, note "task", "job" and
+            "spring" nodes are excluded by default.
+
+    Returns:
+        True, if the node was found in the tree, is loaded and has been
+        expanded.
+
+    Examples:
+        # expand the top three levels of the tree
+        expand_tree(app, node, None, 3)
+
+        # expand the "root" node AND the top five levels of the tree under
+        # ~user/workflow
+        expand_tree(app, node, '~user/workflow', 5)
+
+    """
+    if not node_types:
+        # don't auto-expand job nodes by default
+        node_types = {'root', 'workflow', 'cycle', 'family'}
+
+    new_root = new_node.get_root()
+    head_node = new_root
+
+    # locate the "head" if specified
+    if head:
+        for node in walk_tree(new_root):
+            key = app.get_node_id(node)
+            if key == head:
+                head_node = node
+                child_keys = node.get_child_keys()
+                if (
+                    # if the node only has one child
+                    len(child_keys) == 1
+                    # and that child is a "#spring" node (i.e. a loading
+                    # node)
+                    and node.get_child_node(
+                        child_keys[0]
+                    ).get_value()['type_'] == '#spring'
+                ):
+                    # then the content hasn't loaded yet so the node
+                    # cannot be expanded
+                    return False
+                break
+        else:
+            # the requested node does not exist yet
+            # it might still be loading
+            return False
+
+    # expand the specified nodes
+    for node in (*walk_tree(head_node, depth), new_root):
+        if node.get_value()['type_'] not in node_types:
+            continue
+        widget = node.get_widget()
+        widget.expanded = True
+        widget.update_expanded_icon()
+
+    return True
+
+
 def translate_collapsing(app, old_node, new_node):
     """Transfer the collapse state from one tree to another.
@@ -88,22 +158,25 @@ def translate_collapsing(app, old_node, new_node):
                 widget.update_expanded_icon()


-def walk_tree(node):
+def walk_tree(node, depth=None):
     """Yield nodes in order.

     Arguments:
         node (urwid.TreeNode):
             Yield this node and all nodes beneath it.
+        depth:
+            The maximum depth to walk to or None to walk all children.

     Yields:
         urwid.TreeNode

     """
-    stack = [node]
+    stack = [(node, 1)]
     while stack:
-        node = stack.pop()
+        node, _depth = stack.pop()
         yield node
-        stack.extend([
-            node.get_child_node(index)
-            for index in node.get_child_keys()
-        ])
+        if depth is None or _depth < depth:
+            stack.extend([
+                (node.get_child_node(index), _depth + 1)
+                for index in node.get_child_keys()
+            ])
diff --git a/cylc/flow/tui/updater.py b/cylc/flow/tui/updater.py
new file mode 100644
index 00000000000..3203bc761ce
--- /dev/null
+++ b/cylc/flow/tui/updater.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""Contains the logic for updating the Tui app."""
+
+from asyncio import (
+    run,
+    sleep,
+    gather,
+)
+from copy import deepcopy
+from getpass import getuser
+from multiprocessing import Queue
+from time import time
+
+from cylc.flow.exceptions import (
+    ClientError,
+    ClientTimeout,
+    CylcError,
+    WorkflowStopped,
+)
+from cylc.flow.id import Tokens
+from cylc.flow.network.client_factory import get_client
+from cylc.flow.network.scan import (
+    filter_name,
+    graphql_query,
+    is_active,
+    scan,
+)
+from cylc.flow.task_state import (
+    TASK_STATUSES_ORDERED,
+)
+from cylc.flow.tui.data import (
+    QUERY
+)
+from cylc.flow.tui.util import (
+    compute_tree,
+    suppress_logging,
+)
+from cylc.flow.workflow_status import (
+    WorkflowStatus,
+)
+
+
+def get_default_filters():
+    """Return default task/workflow filters.
+
+    These filters show everything.
+    """
+    return {
+        'tasks': {
+            # filtered task statuses
+            state: True
+            for state in TASK_STATUSES_ORDERED
+        },
+        'workflows': {
+            # filtered workflow statuses
+            **{
+                state.value: True
+                for state in WorkflowStatus
+            },
+            # filtered workflow ids
+            'id': '.*',
+        }
+    }
+
+
+class Updater:
+
+    CLIENT_TIMEOUT = 1
+    BASE_UPDATE_INTERVAL = 1
+    BASE_SCAN_INTERVAL = 10
+
+    SIGNAL_TERMINATE = 'terminate'
+
+    def __init__(self, w_id=None):
+        # Cylc comms clients for each workflow we're connected to
+        self._clients = {}
+
+        # iterate over this to get a list of workflows
+        self._scan_pipe = None
+        # the new pipe if the workflow filter options are changed
+        self.__scan_pipe = None
+
+        # task/workflow filters
+        self.filters = None  # note: set on self.run()
+        # queue for pushing out updates
+        self.update_queue = Queue()
+        # queue for commands to the updater
+        self._command_queue = Queue()
+
+    def subscribe(self, w_id):
+        """Subscribe to updates from a workflow."""
+        self._command_queue.put((self._subscribe.__name__, w_id))
+
+    def unsubscribe(self, w_id):
+        """Unsubscribe from updates from a workflow."""
+        self._command_queue.put((self._unsubscribe.__name__, w_id))
+
+    def update_filters(self, filters):
+        """Update the task state filter."""
+        self._command_queue.put((self._update_filters.__name__, filters))
+
+    def terminate(self):
+        """Stop the updater."""
+        self._command_queue.put((self.SIGNAL_TERMINATE, None))
+
+    def start(self, filters):
+        """Start the updater in a new asyncio.loop.
+
+        The Tui app will call this within a dedicated process.
+        """
+        run(self.run(filters))
+
+    async def run(self, filters):
+        """Start the updater in an existing asyncio.loop.
+
+        The tests call this within the same process.
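+
+        Runs until a "terminate" command is received.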
+        """
+        with suppress_logging():
+            self._update_filters(filters)
+            await self._update()
+
+    def _subscribe(self, w_id):
+        if w_id not in self._clients:
+            self._clients[w_id] = None
+
+    def _unsubscribe(self, w_id):
+        if w_id in self._clients:
+            self._clients.pop(w_id)
+
+    def _update_filters(self, filters):
+        if (
+            not self.filters
+            or filters['workflows']['id'] != self.filters['workflows']['id']
+        ):
+            # update the scan pipe
+            self.__scan_pipe = (
+                # scan all workflows
+                scan
+                # filter by workflow name
+                | filter_name(filters['workflows']['id'])
+                # if the workflow is active, retrieve its status
+                | is_active(True, filter_stop=False)
+                | graphql_query({'status': None})
+            )
+
+        self.filters = filters
+
+    async def _update(self):
+        last_scan_time = 0
+        while True:
+            if not self._command_queue.empty():
+                (command, payload) = self._command_queue.get()
+                if command == self.SIGNAL_TERMINATE:
+                    break
+                getattr(self, command)(payload)
+                continue

+            update_start_time = time()
+            if update_start_time - last_scan_time > self.BASE_SCAN_INTERVAL:
+                data = await self._scan()
+                last_scan_time = update_start_time
+
+            self.update_queue.put(await self._run_update(data))
+
+            update_time = time() - update_start_time
+            await sleep(self.BASE_UPDATE_INTERVAL - update_time)
+
+    async def _run_update(self, data):
+        # copy the scanned data so it can be reused for future updates
+        data = deepcopy(data)
+
+        # connect to schedulers if needed
+        self._connect(data)
+
+        # update data with the response from each workflow
+        # TODO: make this even more async so each workflow can update on a
+        #       different timescale?
+        await gather(
+            *(
+                self._update_workflow(w_id, client, data)
+                for w_id, client in self._clients.items()
+            )
+        )
+
+        return compute_tree(data)
+
+    async def _update_workflow(self, w_id, client, data):
+        if not client:
+            return
+
+        def _append(workflow_data):
+            for workflow in data['workflows']:
+                if workflow['id'] == workflow_data['id']:
+                    workflow.update(workflow_data)
+                    break
+
+        try:
+            workflow_data = await client.async_request(
+                'graphql',
+                {
+                    'request_string': QUERY,
+                    'variables': {
+                        # list of task states we want to see
+                        'taskStates': [
+                            state
+                            for state, is_on in self.filters['tasks'].items()
+                            if is_on
+                        ]
+                    }
+                }
+            )
+        except WorkflowStopped:
+            # TODO
+            pass
+        except (ClientError, ClientTimeout):
+            # TODO
+            pass
+        except CylcError:
+            pass
+        else:
+            _append(workflow_data['workflows'][0])
+
+    def _connect(self, data):
+        for w_id, client in self._clients.items():
+            if not client:
+                try:
+                    self._clients[w_id] = get_client(
+                        Tokens(w_id)['workflow'],
+                        timeout=self.CLIENT_TIMEOUT
+                    )
+                except WorkflowStopped:
+                    for workflow in data['workflows']:
+                        if workflow['id'] == w_id:
+                            workflow['_tui_data'] = 'Workflow is not running'
+                            break
+                except (ClientError, ClientTimeout) as exc:
+                    for workflow in data['workflows']:
+                        if workflow['id'] == w_id:
+                            workflow['_tui_data'] = f'Error: {exc}'
+                            break
+
+    async def _scan(self):
+        # TODO async!
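+        # NOTE: the workflow id regex is applied by the scan pipe itself
+        # (see _update_filters), only the status filter is applied here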
+        data = {'workflows': []}
+        workflow_filter_statuses = {
+            status
+            for status, filtered in self.filters['workflows'].items()
+            if filtered
+        }
+        if self.__scan_pipe:
+            # switch to the new pipe if it has been changed
+            self._scan_pipe = self.__scan_pipe
+        async for workflow in self._scan_pipe:
+            status = workflow.get('status', WorkflowStatus.STOPPED.value)
+            if status not in workflow_filter_statuses:
+                # this workflow is filtered out
+                continue
+            data['workflows'].append({
+                'id': f'~{getuser()}/{workflow["name"]}',
+                'name': workflow['name'],
+                'status': status,
+                'stateTotals': {},
+            })
+
+        data['workflows'].sort(key=lambda x: x['id'])
+        return data
diff --git a/cylc/flow/tui/util.py b/cylc/flow/tui/util.py
index 575fc693437..6587cd4be1d 100644
--- a/cylc/flow/tui/util.py
+++ b/cylc/flow/tui/util.py
@@ -16,10 +16,13 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 """Common utilities for Tui."""
+from contextlib import contextmanager
+from getpass import getuser
 from itertools import zip_longest
 import re
 from time import time

+from cylc.flow import LOG
 from cylc.flow.id import Tokens
 from cylc.flow.task_state import (
     TASK_STATUS_RUNNING
@@ -33,6 +36,26 @@
 from cylc.flow.wallclock import get_unix_time_from_time_string


+# the Tui user, note this is always the same as the workflow owner
+# (Tui doesn't do multi-user stuff)
+ME = getuser()
+
+
+@contextmanager
+def suppress_logging():
+    """Suppress Cylc logging.
+
+    Log goes to stdout/err which can pollute Urwid apps.
+    Patching sys.stdout/err is insufficient so we set the level to
+    something silly for the duration of this context manager then set it
+    back again afterwards.
+    """
+    level = LOG.getEffectiveLevel()
+    LOG.setLevel(99999)
+    yield
+    LOG.setLevel(level)
+
+
 def get_task_icon(
     status,
     *,
@@ -113,80 +136,90 @@ def idpop(id_):
     return tokens.id


-def compute_tree(flow):
-    """Digest GraphQL data to produce a tree.
+def compute_tree(data):
+    """Digest GraphQL data to produce a tree."""
+    root_node = add_node('root', 'root', {}, data={})

-    Arguments:
-        flow (dict):
-            A dictionary representing a single workflow.
+    for flow in data['workflows']:
+        nodes = {}
+        flow_node = add_node(
+            'workflow', flow['id'], nodes, data=flow)
+        root_node['children'].append(flow_node)

-    Returns:
-        dict
-            A top-level workflow node.
+        # populate cycle nodes
+        for cycle in flow.get('cyclePoints', []):
+            cycle['id'] = idpop(cycle['id'])  # strip the family off the id
+            cycle_node = add_node('cycle', cycle['id'], nodes, data=cycle)
+            flow_node['children'].append(cycle_node)

-    """
-    nodes = {}
-    flow_node = add_node(
-        'workflow', flow['id'], nodes, data=flow)
-
-    # populate cycle nodes
-    for cycle in flow['cyclePoints']:
-        cycle['id'] = idpop(cycle['id'])  # strip the family off of the id
-        cycle_node = add_node('cycle', cycle['id'], nodes, data=cycle)
-        flow_node['children'].append(cycle_node)
-
-    # populate family nodes
-    for family in flow['familyProxies']:
-        add_node('family', family['id'], nodes, data=family)
-
-    # create cycle/family tree
-    for family in flow['familyProxies']:
-        family_node = add_node(
-            'family', family['id'], nodes)
-        first_parent = family['firstParent']
-        if (
-            first_parent
-            and first_parent['name'] != 'root'
-        ):
-            parent_node = add_node(
-                'family', first_parent['id'], nodes)
-            parent_node['children'].append(family_node)
-        else:
-            add_node(
-                'cycle', idpop(family['id']), nodes
-            )['children'].append(family_node)
-
-    # add leaves
-    for task in flow['taskProxies']:
-        # If there's no first parent, the child will have been deleted
-        # during/after API query resolution. So ignore.
-        if not task['firstParent']:
-            continue
-        task_node = add_node(
-            'task', task['id'], nodes, data=task)
-        if task['firstParent']['name'] == 'root':
-            family_node = add_node(
-                'cycle', idpop(task['id']), nodes)
-        else:
+        # populate family nodes
+        for family in flow.get('familyProxies', []):
+            add_node('family', family['id'], nodes, data=family)
+
+        # create cycle/family tree
+        for family in flow.get('familyProxies', []):
             family_node = add_node(
-                'family', task['firstParent']['id'], nodes)
-        family_node['children'].append(task_node)
-        for job in task['jobs']:
-            job_node = add_node(
-                'job', job['id'], nodes, data=job)
-            job_info_node = add_node(
-                'job_info', job['id'] + '_info', nodes, data=job)
-            job_node['children'] = [job_info_node]
-            task_node['children'].append(job_node)
-
-    # sort
-    for (type_, _), node in nodes.items():
-        if type_ != 'task':
-            # NOTE: jobs are sorted by submit-num in the GraphQL query
-            node['children'].sort(
-                key=lambda x: NaturalSort(x['id_'])
+                'family', family['id'], nodes)
+            first_parent = family['firstParent']
+            if (
+                first_parent
+                and first_parent['name'] != 'root'
+            ):
+                parent_node = add_node(
+                    'family', first_parent['id'], nodes)
+                parent_node['children'].append(family_node)
+            else:
+                add_node(
+                    'cycle', idpop(family['id']), nodes
+                )['children'].append(family_node)
+
+        # add leaves
+        for task in flow.get('taskProxies', []):
+            # If there's no first parent, the child will have been deleted
+            # during/after API query resolution. So ignore.
+            if not task['firstParent']:
+                continue
+            task_node = add_node(
+                'task', task['id'], nodes, data=task)
+            if task['firstParent']['name'] == 'root':
+                family_node = add_node(
+                    'cycle', idpop(task['id']), nodes)
+            else:
+                family_node = add_node(
+                    'family', task['firstParent']['id'], nodes)
+            family_node['children'].append(task_node)
+            for job in task['jobs']:
+                job_node = add_node(
+                    'job', job['id'], nodes, data=job)
+                job_info_node = add_node(
+                    'job_info', job['id'] + '_info', nodes, data=job)
+                job_node['children'] = [job_info_node]
+                task_node['children'].append(job_node)
+
+        # sort
+        for (type_, _), node in nodes.items():
+            if type_ != 'task':
+                # NOTE: jobs are sorted by submit-num in the GraphQL query
+                node['children'].sort(
+                    key=lambda x: NaturalSort(x['id_'])
+                )
+
+        # spring nodes
+        if 'port' not in flow:
+            # the "port" field is only available via GraphQL
+            # so we are not connected to this workflow yet
+            flow_node['children'].append(
+                add_node(
+                    '#spring',
+                    '#spring',
+                    nodes,
+                    data={
+                        'id': flow.get('_tui_data', 'Loading ...'),
+                    }
+                )
             )

-    return flow_node
+    return root_node


 class NaturalSort:
@@ -361,20 +394,113 @@ def get_workflow_status_str(flow):
         list - Text list for the urwid.Text widget.
     """
-    status = flow['status']
+
+
+def _render_user(node, data):
+    return f'~{ME}'
+
+
+def _render_job_info(node, data):
+    key_len = max(len(key) for key in data)
+    ret = [
+        f'{key} {" " * (key_len - len(key))} {value}\n'
+        for key, value in data.items()
+    ]
+    ret[-1] = ret[-1][:-1]  # strip trailing newline
+    return ret
+
+
+def _render_job(node, data):
     return [
-        (
-            'title',
-            flow['name'],
+        f'#{data["submitNum"]:02d} ',
+        get_job_icon(data['state'])
+    ]
+
+
+def _render_task(node, data):
+    start_time = None
+    mean_time = None
+    try:
+        # due to sorting this is the most recent job
+        first_child = node.get_child_node(0)
+    except IndexError:
+        first_child = None
+
+    # progress information
+    if data['state'] == TASK_STATUS_RUNNING and first_child:
+        start_time = first_child.get_value()['data']['startedTime']
+        mean_time = data['task']['meanElapsedTime']
+
+    # the task icon
+    ret = get_task_icon(
+        data['state'],
+        is_held=data['isHeld'],
+        is_queued=data['isQueued'],
+        is_runahead=data['isRunahead'],
+        start_time=start_time,
+        mean_time=mean_time
+    )
+
+    # the most recent job status
+    ret.append(' ')
+    if first_child:
+        state = first_child.get_value()['data']['state']
+        ret += [(f'job_{state}', f'{JOB_ICON}'), ' ']
+
+    # the task name
+    ret.append(f'{data["name"]}')
+    return ret
+
+
+def _render_family(node, data):
+    return [
+        get_task_icon(
+            data['state'],
+            is_held=data['isHeld'],
+            is_queued=data['isQueued'],
+            is_runahead=data['isRunahead']
         ),
-        ' - ',
-        (
-            f'workflow_{status}',
-            status
-        )
+        ' ',
+        Tokens(data['id']).pop_token()[1]
     ]


+def _render_unknown(node, data):
+    try:
+        state_totals = get_task_status_summary(data)
+        status = data['status']
+        status_msg = [
+            (
+                'title',
+                _display_workflow_id(data),
+            ),
+            ' - ',
+            (
+                f'workflow_{status}',
+                status
+            )
+        ]
+    except KeyError:
+        return Tokens(data['id']).pop_token()[1]
+
+    return [*status_msg, *state_totals]
+
+
+def _display_workflow_id(data):
+    return data['name']
+
+
+RENDER_FUNCTIONS = {
+    'user': _render_user,
+    'root': _render_user,
+    'job_info': _render_job_info,
+    'job': _render_job,
+    'task': _render_task,
+    'cycle': _render_family,
+    'family': _render_family,
+}
+
+
 def render_node(node, data, type_):
     """Render a tree node as text.
@@ -387,68 +513,7 @@
             The node type (e.g. `task`, `job`, `family`).

     """
-    if type_ == 'job_info':
-        key_len = max(len(key) for key in data)
-        ret = [
-            f'{key} {" " * (key_len - len(key))} {value}\n'
-            for key, value in data.items()
-        ]
-        ret[-1] = ret[-1][:-1]  # strip trailing newline
-        return ret
-
-    if type_ == 'job':
-        return [
-            f'#{data["submitNum"]:02d} ',
-            get_job_icon(data['state'])
-        ]
-
-    if type_ == 'task':
-        start_time = None
-        mean_time = None
-        try:
-            # due to sorting this is the most recent job
-            first_child = node.get_child_node(0)
-        except IndexError:
-            first_child = None
-
-        # progress information
-        if data['state'] == TASK_STATUS_RUNNING and first_child:
-            start_time = first_child.get_value()['data']['startedTime']
-            mean_time = data['task']['meanElapsedTime']
-
-        # the task icon
-        ret = get_task_icon(
-            data['state'],
-            is_held=data['isHeld'],
-            is_queued=data['isQueued'],
-            is_runahead=data['isRunahead'],
-            start_time=start_time,
-            mean_time=mean_time
-        )
-
-        # the most recent job status
-        ret.append(' ')
-        if first_child:
-            state = first_child.get_value()['data']['state']
-            ret += [(f'job_{state}', f'{JOB_ICON}'), ' ']
-
-        # the task name
-        ret.append(f'{data["name"]}')
-        return ret
-
-    if type_ in ['family', 'cycle']:
-        return [
-            get_task_icon(
-                data['state'],
-                is_held=data['isHeld'],
-                is_queued=data['isQueued'],
-                is_runahead=data['isRunahead']
-            ),
-            ' ',
-            Tokens(data['id']).pop_token()[1]
-        ]
-
-    return Tokens(data['id']).pop_token()[1]
+    return RENDER_FUNCTIONS.get(type_, _render_unknown)(node, data)


 PARTS = [
@@ -476,9 +541,18 @@ def extract_context(selection):
         {'user': ['a'], 'workflow': ['b'], 'cycle': ['c'], 'task': ['d'],
          'job': ['e']}

+        >>> list(extract_context(['root']).keys())
+        ['user']
+
     """
     ret = {}
     for item in selection:
+        if item == 'root':
+            # special handling for the Tui "root" node
+            # (this represents the workflow owner which is always the
+            # same as the user for Tui)
+            ret['user'] = [ME]
+            continue
         tokens = Tokens(item)
         for key, value in tokens.items():
             if (
diff --git a/tests/integration/tui/conftest.py b/tests/integration/tui/conftest.py
new file mode 100644
index 00000000000..0b9315b165a
--- /dev/null
+++ b/tests/integration/tui/conftest.py
@@ -0,0 +1,186 @@
+from contextlib import contextmanager
+from difflib import unified_diff
+import os
+from pathlib import Path
+import re
+from time import sleep
+
+import pytest
+
+from cylc.flow.tui.app import TuiApp
+from cylc.flow.scripts.tui import configure_screenshot
+
+
+SCREENSHOT_DIR = Path(__file__).parent / 'screenshots'
+
+
+class RaikuraSession:
+    """Convenience class for accessing Raikura functionality."""
+
+    def __init__(self, app, html_fragment, test_dir, test_name):
+        self.app = app
+        self.html_fragment = html_fragment
+        self.test_dir = test_dir
+        self.test_name = test_name
+
+    def user_input(self, *keys):
+        """Simulate a user pressing keys.
+
+        Each "key" is a keyboard button e.g. "x" or "enter".
+
+        If you provide more than one key, each one will be pressed, one
+        after another.
+
+        You can combine keys in a single string, e.g. "ctrl d".
+        """
+        return self.app.loop.process_input(keys)
+
+    def compare_screenshot(self, name, retries=5, delay=0.1):
+        """Take a screenshot and compare it to one taken previously.
+
+        To update the screenshot, set the environment variable
+        "CYLC_UPDATE_SCREENSHOTS" to "true".
+
+        Arguments:
+            name:
+                The name to use for the screenshot, this is used in the
+                filename for the generated HTML fragment.
+            retries:
+                The maximum number of retries for this test before
+                failing.
+            delay:
+                The delay between retries. This helps overcome timing
+                issues with data provision.
+
+        Raises:
+            Exception:
+                If the screenshot does not match the reference.
+
+        """
+        filename = SCREENSHOT_DIR / f'{self.test_name}.{name}.html'
+
+        exc = None
+        for _try in range(retries):
+            self.app.loop.draw_screen()
+            screenshot = self.html_fragment.screenshot_collect()[-1]
+            try:
+                expected = ''
+                if filename.exists():
+                    with open(filename, 'r') as expected_file:
+                        expected = expected_file.read()
+
+                if expected != screenshot:
+                    raise Exception(
+                        f'Screenshot "{filename}" does not match.'
+                        ' Set "CYLC_UPDATE_SCREENSHOTS=true" to update.\n\n'
+                        + '\n'.join(
+                            unified_diff(
+                                expected.splitlines(),
+                                screenshot.splitlines(),
+                                fromfile='expected',
+                                tofile='got',
+                            )
+                        )
+                    )
+
+                break
+            except Exception as exc_:
+                exc = exc_
+                sleep(delay)
+        else:
+            if os.environ.get(
+                'CYLC_UPDATE_SCREENSHOTS', ''
+            ).lower() == 'true':
+                with open(filename, 'w+') as expected_file:
+                    expected_file.write(screenshot)
+            else:
+                raise exc
+
+    def force_update(self):
+        """Run Tui's update method.
+
+        DON'T CLOBBER THIS METHOD!
+
+        With Raikura, the Tui event loop is not running so the data is
+        never refreshed.
+
+        You do NOT need to call this method for key presses, but you do
+        need to call this if the data has changed (e.g. if you've changed
+        a task state) OR if you've changed any filters (because filters
+        are handled by the update code).
+
+        """
+        while not self.app.update():
+            pass
+
+    def wait_until_loaded(self, *ids):
+        """Wait until the given ID appears in the Tui tree.
+
+        Useful for waiting whilst Tui loads a workflow.
+        """
+        return self.app.wait_until_loaded(*ids)
+
+
+@pytest.fixture
+def raikura(test_dir, request, monkeypatch):
+    """Visual regression test framework for Urwid apps.
+
+    Like Cypress but for Tui, so named after a NZ island with lots of
+    Tuis.
+
+    When called this yields a RaikuraSession object loaded with test
+    utilities. All tests have default retries to avoid flaky tests.
+
+    Similar to the "start" fixture, which starts a Scheduler without
+    running the main loop, raikura starts Tui without running the main
+    loop.
+
+    Arguments:
+        workflow_id:
+            The "WORKFLOW" argument of the "cylc tui" command line.
+        single_workflow:
+            The "-s" option of the "cylc tui" command line.
+        size:
+            The virtual terminal size for screenshots as a comma
+            separated string e.g. "80,50" for 80 cols wide by 50 rows
+            tall.
+
+    Returns:
+        A RaikuraSession context manager which provides useful utilities
+        for testing.
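+
+    Example:
+        with raikura(size='80,50') as rk:
+            rk.user_input('down', 'enter')
+            rk.compare_screenshot('my-test')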
+
+    """
+    # make the workflow and scan update intervals match (more reliable)
+    # and speed things up a little whilst we're at it
+    monkeypatch.setattr(
+        'cylc.flow.tui.updater.Updater.BASE_UPDATE_INTERVAL',
+        0.5,
+    )
+    monkeypatch.setattr(
+        'cylc.flow.tui.updater.Updater.BASE_SCAN_INTERVAL',
+        0.5,
+    )
+
+    # the user name and the prefix of workflow IDs are both variable
+    # so we patch the render functions to make test output stable
+    monkeypatch.setattr('cylc.flow.tui.util.ME', 'cylc')
+    monkeypatch.setattr(
+        'cylc.flow.tui.util._display_workflow_id',
+        lambda data: data['name'].rsplit('/', 1)[-1]
+    )
+
+    # filter Tui so that only workflows created within our test show up
+    id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser()))
+    workflow_filter = re.escape(id_base) + r'/.*'
+
+    @contextmanager
+    def _raikura(workflow_id=None, single_workflow=False, size='80,50'):
+        screen, html_fragment = configure_screenshot(size)
+        app = TuiApp(screen=screen)
+        with app.start_noninteractive(
+            workflow_id,
+            single_workflow,
+            id_filter=workflow_filter,
+        ):
+            yield RaikuraSession(
+                app,
+                html_fragment,
+                test_dir,
+                request.function.__name__,
+            )
+
+    return _raikura
diff --git a/tests/integration/tui/test_app.py b/tests/integration/tui/test_app.py
new file mode 100644
index 00000000000..de16f72063f
--- /dev/null
+++ b/tests/integration/tui/test_app.py
@@ -0,0 +1,200 @@
+from cylc.flow.cycling.integer import IntegerPoint
+# from cylc.flow.task_state import (
+#     TASK_STATUS_RUNNING,
+#     TASK_STATUS_SUCCEEDED,
+#     TASK_STATUS_FAILED,
+#     TASK_STATUS_WAITING,
+# )
+from cylc.flow.workflow_status import StopMode
+
+
+def set_task_state(schd, task_states):
+    """Force tasks into the desired states.
+
+    Task states should be of the format (cycle, task, state, is_held).
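+
+    e.g. (IntegerPoint('1'), 'a', TASK_STATUS_RUNNING, False)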
+ """ + for cycle, task, state, is_held in task_states: + itask = schd.pool.get_task(cycle, task) + if not itask: + itask = schd.pool.spawn_task(task, cycle, {1}) + itask.state_reset(state, is_held=is_held) + schd.data_store_mgr.delta_task_state(itask) + schd.data_store_mgr.increment_graph_window( + itask.tokens, + cycle, + {1}, + ) + + +async def test_tui_basics(raikura): + """Test basic Tui interaction with no workflows.""" + with raikura(size='80,40') as rk: + # the app should open + rk.compare_screenshot('test-raikura') + + # "h" should bring up the onscreen help + rk.user_input('h') + rk.compare_screenshot('test-raikura-help') + + # "q" should close the popup + rk.user_input('q') + rk.compare_screenshot('test-raikura') + + # "enter" should bring up the context menu + rk.user_input('enter') + rk.compare_screenshot('test-raikura-enter') + + # "enter" again should close it via the "cancel" button + rk.user_input('enter') + rk.compare_screenshot('test-raikura') + + +async def test_single_workflow(one_conf, flow, scheduler, start, raikura): + """Test a simple workflow with one task.""" + id_ = flow(one_conf, name='one') + schd = scheduler(id_) + async with start(schd): + await schd.update_data_structure() + with raikura(size='80,15') as rk: + rk.compare_screenshot('simple') + + +async def test_workflow_states(one_conf, flow, scheduler, start, raikura): + """Test viewing multiple workflows in different states.""" + # one => stopping + id_1 = flow(one_conf, name='one') + schd_1 = scheduler(id_1) + # two => paused + id_2 = flow(one_conf, name='two') + schd_2 = scheduler(id_2) + # tre => running + flow(one_conf, name='tre') + + async with start(schd_1): + schd_1.stop_mode = StopMode.AUTO # make it look like we're stopping + await schd_1.update_data_structure() + + async with start(schd_2): + await schd_2.update_data_structure() + with raikura(size='80,15') as rk: + rk.compare_screenshot('unfiltered') + + # filter out paused workflows + rk.user_input('W', 'down', 'enter', 'q') + rk.force_update() + rk.compare_screenshot('filter-not-paused') + + # invert the filter so we are filtering for paused workflows + rk.user_input('W', 'enter', 'q') + rk.force_update() + rk.compare_screenshot('filter-paused') + + +# TODO: Task state filtering is currently broken +# see: https://github.com/cylc/cylc-flow/issues/5716 +# +# async def test_task_states(flow, scheduler, start, raikura): +# id_ = flow({ +# 'scheduler': { +# 'allow implicit tasks': 'true', +# }, +# 'scheduling': { +# 'initial cycle point': '1', +# 'cycling mode': 'integer', +# 'runahead limit': 'P1', +# 'graph': { +# 'P1': ''' +# a => b => c +# b[-P1] => b +# ''' +# } +# } +# }, name='test_task_states') +# schd = scheduler(id_) +# async with start(schd): +# set_task_state( +# schd, +# [ +# (IntegerPoint('1'), 'a', TASK_STATUS_SUCCEEDED, False), +# # (IntegerPoint('1'), 'b', TASK_STATUS_FAILED, False), +# (IntegerPoint('1'), 'c', TASK_STATUS_RUNNING, False), +# # (IntegerPoint('2'), 'a', TASK_STATUS_RUNNING, False), +# (IntegerPoint('2'), 'b', TASK_STATUS_WAITING, True), +# ] +# ) +# await schd.update_data_structure() +# +# with raikura(schd.tokens.id, size='80,20') as rk: +# rk.compare_screenshot('unfiltered') +# +# # filter out waiting tasks +# rk.user_input('T', 'down', 'enter', 'q') +# rk.force_update() +# rk.compare_screenshot('filter-not-waiting') + + +async def test_navigation(flow, scheduler, start, raikura): + """Test navigating with the arrow keys.""" + id_ = flow({ + 'scheduling': { + 'graph': { + 'R1': 'A & B1 & B2', + } + }, + 
+        'runtime': {
+            'A': {},
+            'B': {},
+            'B1': {'inherit': 'B'},
+            'B2': {'inherit': 'B'},
+            'a1': {'inherit': 'A'},
+            'a2': {'inherit': 'A'},
+            'b11': {'inherit': 'B1'},
+            'b12': {'inherit': 'B1'},
+            'b21': {'inherit': 'B2'},
+            'b22': {'inherit': 'B2'},
+        }
+    }, name='one')
+    schd = scheduler(id_)
+    async with start(schd):
+        await schd.update_data_structure()
+
+        with raikura(size='80,30') as rk:
+            # the workflow should be collapsed when Tui is loaded
+            rk.compare_screenshot('on-load')
+
+            # pressing "right" should connect to the workflow
+            # and expand it once the data arrives
+            rk.user_input('down', 'right')
+            rk.wait_until_loaded(schd.tokens.id)
+            rk.force_update()
+            rk.compare_screenshot('workflow-expanded')
+
+            # pressing "left" should collapse the node
+            rk.user_input('down', 'down', 'left')
+            rk.compare_screenshot('family-A-collapsed')
+
+            # the "page up" and "page down" buttons should navigate to the top
+            # and bottom of the screen
+            rk.user_input('page down')
+            rk.compare_screenshot('cursor-at-bottom-of-screen')
+
+
+# async def test_restart_reconnect(one_conf, flow, scheduler, start, raikura):
+#     with raikura(size='80,20') as rk:
+#         schd = scheduler(flow(one_conf, name='one'))
+#         async with start(schd):
+#             await schd.update_data_structure()
+#             rk.force_update()
+#             rk.user_input('down', 'right')
+#             rk.wait_until_loaded(schd.tokens.id)
+#             rk.force_update()
+#             rk.compare_screenshot('workflow-running')
+#
+#         rk.force_update()
+#         rk.compare_screenshot('workflow-stopped')
+#
+#         schd = scheduler(flow(one_conf))
+#         async with start(schd):
+#             await schd.update_data_structure()
+#             rk.force_update()
+#             rk.compare_screenshot('workflow-restarted')
diff --git a/tests/integration/tui/test_updater.py b/tests/integration/tui/test_updater.py
new file mode 100644
index 00000000000..a0e2dde275a
--- /dev/null
+++ b/tests/integration/tui/test_updater.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import asyncio
+from copy import deepcopy
+from pathlib import Path
+import re
+
+from async_timeout import timeout
+
+from cylc.flow.cycling.integer import IntegerPoint
+from cylc.flow.id import Tokens
+from cylc.flow.tui.updater import (
+    Updater,
+    get_default_filters,
+)
+from cylc.flow.workflow_status import WorkflowStatus
+
+
+async def await_update(updater):
+    """Wait for the latest update from the Updater.
+
+    Note:
+        If multiple updates are waiting, this returns the most recent.
+
+    Returns:
+        The latest update from the Updater's "update_queue".
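+
+    Example (illustrative usage, as in the tests below):
+        root_node = await await_update(updater)
+        assert root_node['id_'] == 'root'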
+
+    """
+    while updater.update_queue.empty():
+        await asyncio.sleep(0.1)
+    while not updater.update_queue.empty():
+        # flush out any older updates to avoid race conditions in tests
+        update = updater.update_queue.get()
+    return update
+
+
+async def wait_workflow_connected(updater, tokens, connected=True, time=3):
+    """Wait for the Updater to connect to a workflow.
+
+    This will return once the updater has connected to a workflow and returned
+    the first data from it.
+
+    Args:
+        tokens:
+            The tokens of the workflow you're waiting for.
+        connected:
+            If True this waits for the updater to connect to the workflow, if
+            False it waits for the updater to disconnect from it.
+        time:
+            The maximum time to wait for this to happen.
+
+    Returns:
+        The first update from the Updater which contains the workflow data.
+
+    """
+    async with timeout(time):
+        while True:
+            root_node = await await_update(updater)
+            for workflow_node in root_node['children']:
+                if (
+                    workflow_node['id_'] == tokens.id
+                    and (
+                        workflow_node['children'][0]['id_'] != '#spring'
+                    ) == connected
+                ):
+                    # if the spring node is still there then we haven't
+                    # received the first update from the workflow yet
+                    return root_node
+
+
+def get_child_tokens(root_node, types, relative=False):
+    """Return all IDs of the specified types contained within the given tree.
+
+    Args:
+        root_node:
+            The Tui tree you want to look for IDs in.
+        types:
+            The Tui types (e.g. 'workflow' or 'task') you want to extract.
+        relative:
+            If True, the relative IDs will be returned.
+
+    """
+    ret = set()
+    stack = [root_node]
+    while stack:
+        node = stack.pop()
+        stack.extend(node['children'])
+        if node['type_'] in types:
+            tokens = Tokens(node['id_'])
+            if relative:
+                ret.add(tokens.relative_id)
+            else:
+                ret.add(tokens.id)
+    return ret
+
+
+async def test_subscribe(one_conf, flow, scheduler, run, test_dir):
+    """It should subscribe and unsubscribe from workflows."""
+    id_ = flow(one_conf)
+    schd = scheduler(id_)
+
+    updater = Updater()
+
+    async def the_test():
+        nonlocal updater
+
+        # wait for the first update
+        root_node = await await_update(updater)
+
+        # there should be a root node
+        assert root_node['id_'] == 'root'
+        # a single child node representing the workflow
+        assert root_node['children'][0]['id_'] == schd.tokens.id
+        # and a "spring" node used to activate the subscription mechanism
+        assert root_node['children'][0]['children'][0]['id_'] == '#spring'
+
+        # subscribe to the workflow
+        updater.subscribe(schd.tokens.id)
+
+        # wait for it to connect to the workflow
+        root_node = await wait_workflow_connected(updater, schd.tokens)
+
+        # check the workflow contains one cycle with one task in it
+        workflow_node = root_node['children'][0]
+        assert len(workflow_node['children']) == 1
+        cycle_node = workflow_node['children'][0]
+        assert Tokens(cycle_node['id_']).relative_id == '1'  # cycle ID
+        assert len(cycle_node['children']) == 1
+        task_node = cycle_node['children'][0]
+        assert Tokens(task_node['id_']).relative_id == '1/one'  # task ID
+
+        # unsubscribe from the workflow
+        updater.unsubscribe(schd.tokens.id)
+
+        # wait for it to disconnect from the workflow
+        root_node = await wait_workflow_connected(
+            updater,
+            schd.tokens,
+            connected=False,
+        )
+
+        # shut down the updater
+        updater.terminate()
+
+    async with run(schd):
+        filters = get_default_filters()
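+        # the filters are a nested dict, roughly of this form
+        # (illustrative sketch inferred from the usages in this module):
+        #     {'tasks': {<task state>: bool, ...},
+        #      'workflows': {'id': <regex>, <workflow status>: bool, ...}}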
+        id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser()))
+        filters['workflows']['id'] = f'{re.escape(id_base)}/.*'
+
+        # run the updater and the test
+        await asyncio.gather(
+            asyncio.create_task(updater.run(filters)),
+            asyncio.create_task(the_test()),
+        )
+
+
+async def test_filters(one_conf, flow, scheduler, run, test_dir):
+    """It should filter workflow and task states.
+
+    Note:
+        The workflow ID filter is not explicitly tested here, but it is
+        indirectly tested, otherwise other workflows would show up in the
+        updater results.
+
+    """
+    one = scheduler(flow({
+        'scheduler': {
+            'allow implicit tasks': 'True',
+        },
+        'scheduling': {
+            'graph': {
+                'R1': 'a & b & c',
+            }
+        }
+    }, name='one'), paused_start=True)
+    two = scheduler(flow(one_conf, name='two'))
+    tre = scheduler(flow(one_conf, name='tre'))
+
+    filters = get_default_filters()
+    id_base = str(test_dir.relative_to(Path("~/cylc-run").expanduser()))
+    filters['workflows']['id'] = f'{re.escape(id_base)}/.*'
+
+    updater = Updater()
+
+    async def the_test():
+        nonlocal filters
+        root_node = await await_update(updater)
+        assert {child['id_'] for child in root_node['children']} == {
+            one.tokens.id,
+            two.tokens.id,
+            tre.tokens.id,
+        }
+
+        # filter out paused workflows
+        filters = deepcopy(filters)
+        filters['workflows'][WorkflowStatus.STOPPED.value] = True
+        filters['workflows'][WorkflowStatus.PAUSED.value] = False
+        updater.update_filters(filters)
+
+        # "one" and "two" should now be filtered out
+        root_node = await await_update(updater)
+        assert {child['id_'] for child in root_node['children']} == {
+            tre.tokens.id,
+        }
+
+        # filter out stopped workflows
+        filters = deepcopy(filters)
+        filters['workflows'][WorkflowStatus.STOPPED.value] = False
+        filters['workflows'][WorkflowStatus.PAUSED.value] = True
+        updater.update_filters(filters)
+
+        # "tre" should now be filtered out
+        root_node = await await_update(updater)
+        assert {child['id_'] for child in root_node['children']} == {
+            one.tokens.id,
+            two.tokens.id,
+        }
+
+        # subscribe to "one"
+        updater.subscribe(one.tokens.id)
+        root_node = await wait_workflow_connected(updater, one.tokens)
+        assert get_child_tokens(root_node, types={'task'}, relative=True) == {
+            '1/a',
+            '1/b',
+            '1/c',
+        }
+
+        # filter out running tasks
+        # TODO: see https://github.com/cylc/cylc-flow/issues/5716
+        # filters = deepcopy(filters)
+        # filters['tasks'][TASK_STATUS_RUNNING] = False
+        # updater.update_filters(filters)
+
+        # root_node = await await_update(updater)
+        # assert get_child_tokens(
+        #     root_node,
+        #     types={'task'},
+        #     relative=True
+        # ) == {
+        #     '1/b',
+        #     '1/c',
+        # }
+
+        # shut down the updater
+        updater.terminate()
+
+    # start workflow "one"
+    async with run(one):
+        # mark "1/a" as running and "1/b" as succeeded
+        one_a = one.pool.get_task(IntegerPoint('1'), 'a')
+        one_a.state_reset('running')
+        one.data_store_mgr.delta_task_state(one_a)
+        one_b = one.pool.get_task(IntegerPoint('1'), 'b')
+        one_b.state_reset('succeeded')
+        one.data_store_mgr.delta_task_state(one_b)
+
+        # start workflow "two"
+        async with run(two):
+            # run the updater and the test
+            await asyncio.gather(
+                asyncio.create_task(the_test()),
+                asyncio.create_task(updater.run(filters)),
+            )
diff --git a/tests/unit/tui/test_data.py b/tests/unit/tui/test_data.py
index a2d17bf2e76..85805a5d1ea 100644
--- a/tests/unit/tui/test_data.py
+++ b/tests/unit/tui/test_data.py
@@ -28,7 +28,7 @@ def test_generate_mutation(monkeypatch):
     monkeypatch.setattr(cylc.flow.tui.data, 'ARGUMENT_TYPES', arg_types)
     assert generate_mutation(
         'my_mutation',
-        ['foo', 'bar']
+        {'foo': 'foo', 'bar': 'bar', 'user': 'user'}
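+        # NOTE: 'user' does not appear in the expected mutation below, so
+        # generate_mutation presumably strips it before templating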
     ) == '''
         mutation($foo: String!, $bar: [Int]) {
             my_mutation (foos: $foo, bars: $bar) {
diff --git a/tests/unit/tui/test_overlay.py b/tests/unit/tui/test_overlay.py
index 42334aac009..18e3c87daab 100644
--- a/tests/unit/tui/test_overlay.py
+++ b/tests/unit/tui/test_overlay.py
@@ -39,6 +39,7 @@ def overlay_functions():
         getattr(cylc.flow.tui.overlay, obj.name)
         for obj in tree.body
         if isinstance(obj, ast.FunctionDef)
+        and not obj.name.startswith('_')
     ]
 
 
@@ -47,14 +48,14 @@ def test_interface(overlay_functions):
     for function in overlay_functions:
         # mock up an app object to keep things working
         app = Mock(
-            filter_states={},
+            filters={'tasks': {}, 'workflows': {'id': '.*'}},
             tree_walker=Mock(
                 get_focus=Mock(
                     return_value=[
                         Mock(
                             get_node=Mock(
                                 return_value=Mock(
-                                    get_value=lambda: {'id_': 'a'}
+                                    get_value=lambda: {'id_': '~u/a'}
                                 )
                             )
                         )
diff --git a/tests/unit/tui/test_util.py b/tests/unit/tui/test_util.py
index 00ac9fa95be..2b3231e0f7e 100644
--- a/tests/unit/tui/test_util.py
+++ b/tests/unit/tui/test_util.py
@@ -189,77 +189,87 @@ def test_compute_tree():
     """
     tree = compute_tree({
-        'id': 'workflow id',
-        'cyclePoints': [
-            {
-                'id': '1/family-suffix',
-                'cyclePoint': '1'
-            }
-        ],
-        'familyProxies': [
-            {  # top level family
-                'name': 'FOO',
-                'id': '1/FOO',
-                'cyclePoint': '1',
-                'firstParent': {'name': 'root', 'id': '1/root'}
-            },
-            {  # nested family
-                'name': 'FOOT',
-                'id': '1/FOOT',
-                'cyclePoint': '1',
-                'firstParent': {'name': 'FOO', 'id': '1/FOO'}
-            },
-        ],
-        'taskProxies': [
-            {  # top level task
-                'name': 'pub',
-                'id': '1/pub',
-                'firstParent': {'name': 'root', 'id': '1/root'},
-                'cyclePoint': '1',
-                'jobs': []
-            },
-            {  # child task (belongs to family)
-                'name': 'fan',
-                'id': '1/fan',
-                'firstParent': {'name': 'fan', 'id': '1/fan'},
-                'cyclePoint': '1',
-                'jobs': []
-            },
-            {  # nested child task (belongs to incestuous family)
-                'name': 'fool',
-                'id': '1/fool',
-                'firstParent': {'name': 'FOOT', 'id': '1/FOOT'},
-                'cyclePoint': '1',
-                'jobs': []
-            },
-            {  # a task which has jobs
-                'name': 'worker',
-                'id': '1/worker',
-                'firstParent': {'name': 'root', 'id': '1/root'},
-                'cyclePoint': '1',
-                'jobs': [
-                    {'id': '1/worker/03', 'submitNum': '3'},
-                    {'id': '1/worker/02', 'submitNum': '2'},
-                    {'id': '1/worker/01', 'submitNum': '1'}
-                ]
-            }
-        ]
+        'workflows': [{
+            'id': 'workflow id',
+            'port': 1234,
+            'cyclePoints': [
+                {
+                    'id': '1/family-suffix',
+                    'cyclePoint': '1'
+                }
+            ],
+            'familyProxies': [
+                {  # top level family
+                    'name': 'FOO',
+                    'id': '1/FOO',
+                    'cyclePoint': '1',
+                    'firstParent': {'name': 'root', 'id': '1/root'}
+                },
+                {  # nested family
+                    'name': 'FOOT',
+                    'id': '1/FOOT',
+                    'cyclePoint': '1',
+                    'firstParent': {'name': 'FOO', 'id': '1/FOO'}
+                },
+            ],
+            'taskProxies': [
+                {  # top level task
+                    'name': 'pub',
+                    'id': '1/pub',
+                    'firstParent': {'name': 'root', 'id': '1/root'},
+                    'cyclePoint': '1',
+                    'jobs': []
+                },
+                {  # child task (belongs to family)
+                    'name': 'fan',
+                    'id': '1/fan',
+                    'firstParent': {'name': 'fan', 'id': '1/fan'},
+                    'cyclePoint': '1',
+                    'jobs': []
+                },
+                {  # nested child task (belongs to incestuous family)
+                    'name': 'fool',
+                    'id': '1/fool',
+                    'firstParent': {'name': 'FOOT', 'id': '1/FOOT'},
+                    'cyclePoint': '1',
+                    'jobs': []
+                },
+                {  # a task which has jobs
+                    'name': 'worker',
+                    'id': '1/worker',
+                    'firstParent': {'name': 'root', 'id': '1/root'},
+                    'cyclePoint': '1',
+                    'jobs': [
+                        {'id': '1/worker/03', 'submitNum': '3'},
+                        {'id': '1/worker/02', 'submitNum': '2'},
+                        {'id': '1/worker/01', 'submitNum': '1'}
+                    ]
+                }
+            ]
+        }]
     })
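+
+    # expected result (rough sketch): one root node containing one workflow
+    # node, containing one cycle node, under which the families nest
+    # (FOO -> FOOT) and tasks hang off their first parents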
+
+    # the root node
+    assert tree['type_'] == 'root'
+    assert tree['id_'] == 'root'
+    assert len(tree['children']) == 1
+
     # the workflow node
-    assert tree['type_'] == 'workflow'
-    assert tree['id_'] == 'workflow id'
-    assert list(tree['data']) == [
+    workflow = tree['children'][0]
+    assert workflow['type_'] == 'workflow'
+    assert workflow['id_'] == 'workflow id'
+    assert set(workflow['data']) == {
         # whatever is present on the node should end up in data
-        'id', 'cyclePoints', 'familyProxies',
+        'id',
+        'port',
         'taskProxies'
-    ]
-    assert len(tree['children']) == 1
+    }
+    assert len(workflow['children']) == 1
 
     # the cycle point node
-    cycle = tree['children'][0]
+    cycle = workflow['children'][0]
     assert cycle['type_'] == 'cycle'
     assert cycle['id_'] == '//1'
     assert list(cycle['data']) == [