From 55882bb0f3cf1a22289780065ead39c99b8ddec0 Mon Sep 17 00:00:00 2001
From: John Chilton
Date: Wed, 10 Oct 2018 11:43:33 -0400
Subject: [PATCH] Track workflow step input definitions in our model.

We don't currently track workflow step inputs in any formal way in our model.
This has resulted in some hacks and prevents future enhancements. This commit
splits WorkflowStepConnection into two models - WorkflowStepInput and
WorkflowStepConnection - normalizing the previous workflow_step_connection
table on input step and input name.

In terms of the hacks forced on us by confining all of tool state to a big
JSON blob in the database: we have problems distinguishing keys and values
when walking tool state, and the more JSON blobs we store inside the giant
tool state blob, the worse this problem gets. Take, for instance, checking
for runtime parameters or for rules parameter values - both use JSON blobs
that aren't simple values, so it is hard to tell from the tool state blob in
the database or from the workflow export what is a key and what is a value.
Tracking state as normalized inputs with default values and explicit
attributes for runtime values should allow much more precise state definition
and construction. These models would also potentially allow defining runtime
values with non-tool default values (default values defined for the workflow
but still explicitly settable at runtime). Such combinations of overridden
defaults and runtime values were not representable before.

In terms of future enhancements, there is a lot we cannot track with the
current models - such as map/reduce options for collection operations
(https://github.com/galaxyproject/galaxy/issues/4623#issuecomment-389544980).
This should enable a lot of that.

Obviously there are a lot of attributes defined here that are not yet
utilized, but I'm using most (all?) of them downstream in the CWL branch. I'd
rather populate this table fully realized and fill in the implementation
around it as work continues to stream in from the CWL branch - to keep things
simple and avoid extra database migrations. But I understand if this feels
like speculative complexity we want to avoid despite the implementation being
readily available for inspection downstream.
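For illustration only (not part of the patch): a minimal sketch of what the
normalized models let us express, using the classes and columns added below.
It assumes the SQLAlchemy mappers from lib/galaxy/model/mapping.py are
configured, so the inputs/connections backrefs exist; the default-value
payload shown is a made-up example.

    # A workflow-level default that remains explicitly settable at runtime -
    # a combination the connection-only model could not represent.
    from galaxy import model

    step = model.WorkflowStep()
    step_input = step.get_or_add_input("input1")  # creates a WorkflowStepInput
    step_input.default_value = {"src": "hda", "id": 42}  # hypothetical payload
    step_input.default_value_set = True
    step_input.runtime_value = True  # still settable when the workflow runs

    connection = model.WorkflowStepConnection()
    connection.input_step_input = step_input  # replaces input_step/input_name
    connection.output_name = "output"
    assert connection.input_name == "input1"  # now a derived property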
---
 lib/galaxy/managers/workflows.py              |   6 +-
 lib/galaxy/model/__init__.py                  |  73 ++++++++++--
 lib/galaxy/model/mapping.py                   |  29 ++++-
 .../0136_collection_and_workflow_state.py     |   8 --
 .../versions/0145_add_workflow_step_input.py  | 105 ++++++++++++++++++
 lib/galaxy/workflow/extract.py                |   4 +-
 lib/tool_shed/util/workflow_util.py           |   4 +-
 test/unit/workflows/test_modules.py           |  27 +++--
 test/unit/workflows/test_render.py            |  20 ++--
 test/unit/workflows/test_workflow_progress.py |  47 ++++----
 test/unit/workflows/workflow_support.py       |  36 +++---
 11 files changed, 275 insertions(+), 84 deletions(-)
 create mode 100644 lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py

diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py
index 319b01dc8726..fbf5735c30e8 100644
--- a/lib/galaxy/managers/workflows.py
+++ b/lib/galaxy/managers/workflows.py
@@ -1044,14 +1044,16 @@ def __connect_workflow_steps(self, steps, steps_by_external_id):
                     continue
                 if not isinstance(conn_list, list):  # Older style singleton connection
                     conn_list = [conn_list]
+
                 for conn_dict in conn_list:
+                    step_input = step.get_or_add_input(input_name)
+
                     if 'output_name' not in conn_dict or 'id' not in conn_dict:
                         template = "Invalid connection [%s] - must be dict with output_name and id fields."
                         message = template % conn_dict
                         raise exceptions.MessageException(message)
                     conn = model.WorkflowStepConnection()
-                    conn.input_step = step
-                    conn.input_name = input_name
+                    conn.input_step_input = step_input
                     conn.output_name = conn_dict['output_name']
                     external_id = conn_dict['id']
                     if external_id not in steps_by_external_id:
diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index 2f1ac406e549..5bcaf663a382 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -4058,13 +4058,31 @@ def __init__(self):
         self.tool_inputs = None
         self.tool_errors = None
         self.position = None
-        self.input_connections = []
+        self.inputs = []
         self.config = None
         self.label = None
         self.uuid = uuid4()
         self.workflow_outputs = []
         self._input_connections_by_name = None
 
+    def get_or_add_input(self, input_name):
+        for step_input in self.inputs:
+            if step_input.name == input_name:
+                return step_input
+
+        step_input = WorkflowStepInput()
+        step_input.workflow_step = self
+        step_input.name = input_name
+        return step_input
+
+    @property
+    def input_connections(self):
+        connections = []
+        for step_input in self.inputs:
+            for connection in step_input.connections:
+                connections.append(connection)
+        return connections
+
     @property
     def unique_workflow_outputs(self):
         # Older Galaxy workflows may have multiple WorkflowOutputs
@@ -4138,7 +4156,7 @@ def copy_to(self, copied_step, step_mapping):
         copied_step.position = self.position
         copied_step.config = self.config
         copied_step.label = self.label
-        copied_step.input_connections = copy_list(self.input_connections)
+        copied_step.inputs = copy_list(self.inputs)
 
         subworkflow_step_mapping = {}
         subworkflow = self.subworkflow
@@ -4149,8 +4167,7 @@ def copy_to(self, copied_step, step_mapping):
             subworkflow_step_mapping[subworkflow_step.id] = copied_subworkflow_step
 
         for old_conn, new_conn in zip(self.input_connections, copied_step.input_connections):
-            # new_conn.input_step = new_
-            new_conn.input_step = step_mapping[old_conn.input_step_id]
+            new_conn.input_step_input = copied_step.get_or_add_input(old_conn.input_name)
             new_conn.output_step = step_mapping[old_conn.output_step_id]
             if old_conn.input_subworkflow_step_id:
                 new_conn.input_subworkflow_step = subworkflow_step_mapping[old_conn.input_subworkflow_step_id]
@@ -4165,6 +4182,35 @@ def log_str(self):
         return "WorkflowStep[index=%d,type=%s]" % (self.order_index, self.type)
 
 
+class WorkflowStepInput(object):
+    default_merge_type = None
+    default_scatter_type = None
+
+    def __init__(self):
+        self.id = None
+        self.name = None
+        self.default_value = None
+        self.default_value_set = False
+        self.merge_type = self.default_merge_type
+        self.scatter_type = self.default_scatter_type
+
+    def copy(self):
+        copied_step_input = WorkflowStepInput()
+        copied_step_input.name = self.name
+        copied_step_input.default_value = self.default_value
+        copied_step_input.default_value_set = self.default_value_set
+        copied_step_input.merge_type = self.merge_type
+        copied_step_input.scatter_type = self.scatter_type
+
+        copied_step_input.connections = copy_list(self.connections)
+        return copied_step_input
+
+    def log_str(self):
+        return "WorkflowStepInput[name=%s]" % (
+            self.name,
+        )
+
+
 class WorkflowStepConnection(object):
     # Constant used in lieu of output_name and input_name to indicate an
     # implicit connection between two steps that is not dependent on a dataset
@@ -4176,23 +4222,34 @@ class WorkflowStepConnection(object):
     def __init__(self):
         self.output_step_id = None
         self.output_name = None
-        self.input_step_id = None
-        self.input_name = None
+        self.input_step_input_id = None
 
     def set_non_data_connection(self):
         self.output_name = WorkflowStepConnection.NON_DATA_CONNECTION
-        self.input_name = WorkflowStepConnection.NON_DATA_CONNECTION
+        raise NotImplementedError("implement adding step here somehow...")
 
     @property
     def non_data_connection(self):
         return (self.output_name == WorkflowStepConnection.NON_DATA_CONNECTION and
                 self.input_name == WorkflowStepConnection.NON_DATA_CONNECTION)
 
+    @property
+    def input_name(self):
+        return self.input_step_input.name
+
+    @property
+    def input_step(self):
+        return self.input_step_input and self.input_step_input.workflow_step
+
+    @property
+    def input_step_id(self):
+        input_step = self.input_step
+        return input_step and input_step.id
+
     def copy(self):
         # TODO: handle subworkflow ids...
         copied_connection = WorkflowStepConnection()
         copied_connection.output_name = self.output_name
-        copied_connection.input_name = self.input_name
         return copied_connection
diff --git a/lib/galaxy/model/mapping.py b/lib/galaxy/model/mapping.py
index 0fc1203b8411..f5357e8c5ec4 100644
--- a/lib/galaxy/model/mapping.py
+++ b/lib/galaxy/model/mapping.py
@@ -908,6 +908,21 @@
     # Column( "input_connections", JSONType ),
     Column("label", Unicode(255)))
 
+
+model.WorkflowStepInput.table = Table(
+    "workflow_step_input", metadata,
+    Column("id", Integer, primary_key=True),
+    Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
+    Column("name", Unicode(255)),
+    Column("merge_type", TEXT),
+    Column("scatter_type", TEXT),
+    Column("value_from", JSONType),
+    Column("value_from_type", TEXT),
+    Column("default_value", JSONType),
+    Column("default_value_set", Boolean),
+    Column("runtime_value", Boolean))
+
+
 model.WorkflowRequestStepState.table = Table(
     "workflow_request_step_states", metadata,
     Column("id", Integer, primary_key=True),
@@ -953,9 +968,8 @@
     "workflow_step_connection", metadata,
     Column("id", Integer, primary_key=True),
     Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
-    Column("input_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
+    Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
    Column("output_name", TEXT),
-    Column("input_name", TEXT),
     Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
 )
 
@@ -2218,6 +2232,11 @@ def simple_mapping(model, **kwds):
                            backref="workflow_steps")
 ))
 
+mapper(model.WorkflowStepInput, model.WorkflowStepInput.table, properties=dict(
+    workflow_step=relation(model.WorkflowStep,
+                           backref="inputs"),
+))
+
 mapper(model.WorkflowOutput, model.WorkflowOutput.table, properties=dict(
     workflow_step=relation(model.WorkflowStep,
                            backref='workflow_outputs',
@@ -2225,10 +2244,10 @@
 ))
 
 mapper(model.WorkflowStepConnection, model.WorkflowStepConnection.table, properties=dict(
-    input_step=relation(model.WorkflowStep,
-                        backref="input_connections",
+    input_step_input=relation(model.WorkflowStepInput,
+                              backref="connections",
                         cascade="all",
-                        primaryjoin=(model.WorkflowStepConnection.table.c.input_step_id == model.WorkflowStep.table.c.id)),
+                              primaryjoin=(model.WorkflowStepConnection.table.c.input_step_input_id == model.WorkflowStepInput.table.c.id)),
     input_subworkflow_step=relation(model.WorkflowStep,
                                     backref=backref("parent_workflow_input_connections", uselist=True),
                                     primaryjoin=(model.WorkflowStepConnection.table.c.input_subworkflow_step_id == model.WorkflowStep.table.c.id),
diff --git a/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py b/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py
index 0a787a5f7bb1..d2597359041a 100644
--- a/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py
+++ b/lib/galaxy/model/migrate/versions/0136_collection_and_workflow_state.py
@@ -93,14 +93,6 @@ def upgrade(migrate_engine):
     for table in tables.values():
         __create(table)
 
-    def nextval(table, col='id'):
-        if migrate_engine.name in ['postgres', 'postgresql']:
-            return "nextval('%s_%s_seq')" % (table, col)
-        elif migrate_engine.name in ['mysql', 'sqlite']:
-            return "null"
-        else:
-            raise Exception("Unhandled database type")
-
     # Set default for creation to scheduled, actual mapping has new as default.
     workflow_invocation_step_state_column = Column("state", TrimmedString(64), default="scheduled")
     if migrate_engine.name in ['postgres', 'postgresql']:
diff --git a/lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py b/lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py
new file mode 100644
index 000000000000..bca3259b9db3
--- /dev/null
+++ b/lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py
@@ -0,0 +1,105 @@
+"""
+Migration script for the workflow step input table.
+"""
+from __future__ import print_function
+
+import logging
+
+from sqlalchemy import Boolean, Column, ForeignKey, Integer, MetaData, Table, TEXT
+
+from galaxy.model.custom_types import JSONType
+
+log = logging.getLogger(__name__)
+metadata = MetaData()
+
+
+def get_new_tables():
+
+    WorkflowStepInput_table = Table(
+        "workflow_step_input", metadata,
+        Column("id", Integer, primary_key=True),
+        Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
+        Column("name", TEXT),
+        Column("merge_type", TEXT),
+        Column("scatter_type", TEXT),
+        Column("value_from", JSONType),
+        Column("value_from_type", TEXT),
+        Column("default_value", JSONType),
+        Column("default_value_set", Boolean, default=False),
+        Column("runtime_value", Boolean, default=False),
+    )
+
+    WorkflowStepConnection_table = Table(
+        "workflow_step_connection", metadata,
+        Column("id", Integer, primary_key=True),
+        Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
+        Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
+        Column("output_name", TEXT),
+        Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
+    )
+
+    return [
+        WorkflowStepInput_table, WorkflowStepConnection_table
+    ]
+
+
+def upgrade(migrate_engine):
+    metadata.bind = migrate_engine
+    print(__doc__)
+    metadata.reflect()
+
+    LegacyWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True)
+    for index in LegacyWorkflowStepConnection_table.indexes:
+        index.drop()
+    LegacyWorkflowStepConnection_table.rename("workflow_step_connection_premigrate144")
+    # Try to deregister the table to work around what appear to be caching problems.
+    LegacyWorkflowStepConnection_table.deregister()
+    metadata._remove_table("workflow_step_connection", metadata.schema)
+
+    metadata.reflect()
+    tables = get_new_tables()
+    for table in tables:
+        __create(table)
+
+    insert_step_inputs_cmd = \
+        "INSERT INTO workflow_step_input (workflow_step_id, name) " + \
+        "SELECT id, input_name FROM workflow_step_connection_premigrate144"
+
+    migrate_engine.execute(insert_step_inputs_cmd)
+
+    # TODO: verify order here.
+    insert_step_connections_cmd = \
+        "INSERT INTO workflow_step_connection (output_step_id, input_step_input_id, output_name, input_subworkflow_step_id) " + \
+        "SELECT wsc.output_step_id, wsi.id, wsc.output_name, wsc.input_subworkflow_step_id " + \
+        "FROM workflow_step_connection_premigrate144 AS wsc LEFT OUTER JOIN workflow_step_input AS wsi ON wsc.input_step_id = wsi.workflow_step_id AND wsc.input_name = wsi.name ORDER BY wsc.id"
+
+    migrate_engine.execute(insert_step_connections_cmd)
+
+
+def downgrade(migrate_engine):
+    metadata.bind = migrate_engine
+
+    tables = get_new_tables()
+    for table in tables:
+        __drop(table)
+
+    metadata._remove_table("workflow_step_connection", metadata.schema)
+    metadata.reflect()
+
+    # Restore the legacy workflow_step_connection table and its data.
+    LegacyWorkflowStepConnection_table = Table("workflow_step_connection_premigrate144", metadata, autoload=True)
+    LegacyWorkflowStepConnection_table.rename("workflow_step_connection")
+
+
+def __create(table):
+    try:
+        table.create()
+    except Exception:
+        log.exception("Creating %s table failed.", table.name)
+
+
+def __drop(table):
+    try:
+        table.drop()
+    except Exception:
+        log.exception("Dropping %s table failed.", table.name)
diff --git a/lib/galaxy/workflow/extract.py b/lib/galaxy/workflow/extract.py
index 44bdda300a21..bf2f41657871 100644
--- a/lib/galaxy/workflow/extract.py
+++ b/lib/galaxy/workflow/extract.py
@@ -124,10 +124,10 @@ def extract_steps(trans, history=None, job_ids=None, dataset_ids=None, dataset_c
             else:
                 log.info("Cannot find implicit input collection for %s" % input_name)
             if other_hid in hid_to_output_pair:
+                step_input = step.get_or_add_input(input_name)
                 other_step, other_name = hid_to_output_pair[other_hid]
                 conn = model.WorkflowStepConnection()
-                conn.input_step = step
-                conn.input_name = input_name
+                conn.input_step_input = step_input
                 # Should always be connected to an earlier step
                 conn.output_step = other_step
                 conn.output_name = other_name
diff --git a/lib/tool_shed/util/workflow_util.py b/lib/tool_shed/util/workflow_util.py
index 0604759c0407..b8297a0374cc 100644
--- a/lib/tool_shed/util/workflow_util.py
+++ b/lib/tool_shed/util/workflow_util.py
@@ -306,10 +306,10 @@ def get_workflow_from_dict(trans, workflow_dict, tools_metadata, repository_id,
     # Input connections.
     for input_name, conn_dict in step.temp_input_connections.items():
         if conn_dict:
+            step_input = step.get_or_add_input(input_name)
             output_step = steps_by_external_id[conn_dict['id']]
             conn = trans.model.WorkflowStepConnection()
-            conn.input_step = step
-            conn.input_name = input_name
+            conn.input_step_input = step_input
             conn.output_step = output_step
             conn.output_name = conn_dict['output_name']
             step.input_connections.append(conn)
diff --git a/test/unit/workflows/test_modules.py b/test/unit/workflows/test_modules.py
index 1394a39645e8..f382c5c47a45 100644
--- a/test/unit/workflows/test_modules.py
+++ b/test/unit/workflows/test_modules.py
@@ -166,25 +166,28 @@ def test_tool_version_same():
   label: "input2"
 - type: "tool"
   tool_id: "cat1"
-  input_connections:
-  - input_name: "input1"
-    "@output_step": 0
-    output_name: "output"
+  inputs:
+    input1:
+      connections:
+      - "@output_step": 0
+        output_name: "output"
 - type: "tool"
   tool_id: "cat1"
-  input_connections:
-  - input_name: "input1"
-    "@output_step": 0
-    output_name: "output"
+  inputs:
+    input1:
+      connections:
+      - "@output_step": 0
+        output_name: "output"
   workflow_outputs:
   - output_name: "out_file1"
     label: "out1"
 - type: "tool"
   tool_id: "cat1"
-  input_connections:
-  - input_name: "input1"
-    "@output_step": 2
-    output_name: "out_file1"
+  inputs:
+    input1:
+      connections:
+      - "@output_step": 2
+        output_name: "out_file1"
   workflow_outputs:
   - output_name: "out_file1"
 """
diff --git a/test/unit/workflows/test_render.py b/test/unit/workflows/test_render.py
index b7afcd28cd16..04ee10c791cf 100644
--- a/test/unit/workflows/test_render.py
+++ b/test/unit/workflows/test_render.py
@@ -6,28 +6,28 @@
 - type: "data_input"
   order_index: 0
   tool_inputs: {"name": "input1"}
-  input_connections: []
   position: {"top": 3, "left": 3}
 - type: "data_input"
   order_index: 1
   tool_inputs: {"name": "input2"}
-  input_connections: []
   position: {"top": 6, "left": 4}
 - type: "tool"
   tool_id: "cat1"
   order_index: 2
-  input_connections:
-  - input_name: "input1"
-    "@output_step": 0
-    output_name: "di1"
+  inputs:
+    input1:
+      connections:
+      - "@output_step": 0
+        output_name: "di1"
   position: {"top": 13, "left": 10}
 - type: "tool"
   tool_id: "cat1"
   order_index: 3
-  input_connections:
-  - input_name: "input1"
-    "@output_step": 0
-    output_name: "di1"
+  inputs:
+    input1:
+      connections:
+      - "@output_step": 0
+        output_name: "di1"
   position: {"top": 33, "left": 103}
 """
diff --git a/test/unit/workflows/test_workflow_progress.py b/test/unit/workflows/test_workflow_progress.py
index 880650d9d0ec..c148fb51e0ab 100644
--- a/test/unit/workflows/test_workflow_progress.py
+++ b/test/unit/workflows/test_workflow_progress.py
@@ -12,22 +12,25 @@
     tool_inputs: {"name": "input2"}
   - type: "tool"
     tool_id: "cat1"
-    input_connections:
-    - input_name: "input1"
-      "@output_step": 0
-      output_name: "output"
+    inputs:
+      "input1":
+        connections:
+        - "@output_step": 0
+          output_name: "output"
   - type: "tool"
     tool_id: "cat1"
-    input_connections:
-    - input_name: "input1"
-      "@output_step": 0
-      output_name: "output"
+    inputs:
+      input1:
+        connections:
+        - "@output_step": 0
+          output_name: "output"
   - type: "tool"
     tool_id: "cat1"
-    input_connections:
-    - input_name: "input1"
-      "@output_step": 2
-      output_name: "out_file1"
+    inputs:
+      "input1":
+        connections:
+        - "@output_step": 2
+          output_name: "out_file1"
 """
 
 TEST_SUBWORKFLOW_YAML = """
@@ -41,15 +44,17 @@
           tool_inputs: {"name": "inner_input"}
         - type: "tool"
           tool_id: "cat1"
-          input_connections:
-          - input_name: "input1"
-            "@output_step": 0
-            output_name: "output"
-    input_connections:
-    - input_name: "inner_input"
-      "@output_step": 0
-      output_name: "output"
-      "@input_subworkflow_step": 0
+          inputs:
+            "input1":
+              connections:
+              - "@output_step": 0
+                output_name: "output"
+    inputs:
+      inner_input:
+        connections:
+        - "@output_step": 0
+          output_name: "output"
+          "@input_subworkflow_step": 0
 """
 
 UNSCHEDULED_STEP = object()
diff --git a/test/unit/workflows/workflow_support.py b/test/unit/workflows/workflow_support.py
index 6f4f567cb66d..1e1392cfd94c 100644
--- a/test/unit/workflows/workflow_support.py
+++ b/test/unit/workflows/workflow_support.py
@@ -99,20 +99,28 @@ def yaml_to_model(has_dict, id_offset=100):
         for key, value in step.items():
             if key == "input_connections":
-                connections = []
-                for conn_dict in value:
-                    conn = model.WorkflowStepConnection()
-                    for conn_key, conn_value in conn_dict.items():
-                        if conn_key == "@output_step":
-                            target_step = workflow.steps[conn_value]
-                            conn_value = target_step
-                            conn_key = "output_step"
-                        if conn_key == "@input_subworkflow_step":
-                            conn_value = step["subworkflow"].step_by_index(conn_value)
-                            conn_key = "input_subworkflow_step"
-                        setattr(conn, conn_key, conn_value)
-                    connections.append(conn)
-                value = connections
+                raise NotImplementedError()
+            if key == "inputs":
+                inputs = []
+                for input_name, input_def in value.items():
+                    step_input = model.WorkflowStepInput()
+                    step_input.name = input_name
+                    connections = []
+                    for conn_dict in input_def.get("connections", []):
+                        conn = model.WorkflowStepConnection()
+                        for conn_key, conn_value in conn_dict.items():
+                            if conn_key == "@output_step":
+                                target_step = workflow.steps[conn_value]
+                                conn_value = target_step
+                                conn_key = "output_step"
+                            if conn_key == "@input_subworkflow_step":
+                                conn_value = step["subworkflow"].step_by_index(conn_value)
+                                conn_key = "input_subworkflow_step"
+                            setattr(conn, conn_key, conn_value)
+                        connections.append(conn)
+                    step_input.connections = connections
+                    inputs.append(step_input)
+                value = inputs
             if key == "workflow_outputs":
                 value = [partial(_dict_to_workflow_output, workflow_step)(_) for _ in value]
             setattr(workflow_step, key, value)
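
For illustration only (not part of the patch): how the reworked test helper
above consumes the new YAML shape. TOY_WORKFLOW_YAML is invented for this
note, and it assumes yaml_to_model() accepts a YAML string the way the unit
tests above use it.

    # Connections now hang off WorkflowStepInput objects; the old flat list
    # is still reachable via the derived WorkflowStep.input_connections
    # property added in lib/galaxy/model/__init__.py above.
    from workflow_support import yaml_to_model  # test helper patched above

    TOY_WORKFLOW_YAML = """
    steps:
      - type: "data_input"
        tool_inputs: {"name": "input1"}
      - type: "tool"
        tool_id: "cat1"
        inputs:
          input1:
            connections:
            - "@output_step": 0
              output_name: "output"
    """

    workflow = yaml_to_model(TOY_WORKFLOW_YAML)
    tool_step = workflow.steps[1]
    for step_input in tool_step.inputs:          # WorkflowStepInput objects
        for conn in step_input.connections:      # WorkflowStepConnection objects
            print(step_input.name, "<-", conn.output_name)
    assert len(tool_step.input_connections) == 1  # derived from step.inputs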