Skip to content

Commit

Permalink
Track workflow step input definitions in our model.
Browse files Browse the repository at this point in the history
We don't track workflow step inputs in any formal way in our model currently. This has resulted in some current hacks and prevents future enhancements. This commit splits WorkflowStepConnection into two models WorkflowStepInput and WorkflowStepConnection - normalizing the previous table workflow_step_connection on input step and input name.

In terms of current hacks forced on it by restricting all of tool state to be confined to a big JSON blob in the database - we have problems distinguishing keys and values when walking tool state. As we store more and more JSON blobs inside of the giant tool state blob - the worse this problem gets. Take for instance checking for runtime parameters or the rules parameter values - these both use JSON blobs that aren't simple values, so it is hard, looking at the tool state blob in the database or the workflow export, to tell what is a key and what is a value. Tracking state as normalized inputs with default values and explicit attributes for runtime values should allow much more precise state definition and construction.

This variant of the models would also potentially allow defining runtime values with non-tool default values (so default values defined for the workflow but still explicitly settable at runtime). The combinations of overriding defaults and defining runtime values were not representable before.

In terms of future enhancements, there is a lot we cannot track with the current models - such as map/reduce options for collection operations (galaxyproject#4623 (comment)). This should enable a lot of that. Obviously there are a lot of attributes defined here that are not yet utilized, but I'm using most (all?) of them downstream in the CWL branch. I'd rather populate this table fully realized and fill in the implementation around it as work continues to stream in from the CWL branch - to keep things simple and avoid extra database migrations. But I understand if this feels like speculative complexity we want to avoid despite the implementation being readily available for inspection downstream.
  • Loading branch information
jmchilton committed Nov 6, 2018
1 parent 31465cf commit 8bc9591
Show file tree
Hide file tree
Showing 16 changed files with 448 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ functools32==3.2.3.post2 ; python_version == '2.7'
future==0.16.0
futures==3.2.0 ; python_version == '2.6' or python_version == '2.7'
galaxy-sequence-utils==1.1.3
gxformat2==0.7.1
gxformat2
h5py==2.8.0
idna==2.7
ipaddress==1.0.22 ; python_version < '3.3'
Expand Down
32 changes: 25 additions & 7 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,14 @@ def _workflow_to_dict_export(self, trans, stored=None, workflow=None):
for output in module.get_data_outputs():
step_dict['outputs'].append({'name': output['name'], 'type': output['extensions'][0]})

step_in = {}
for step_input in step.inputs:
if step_input.default_value_set:
step_in[step_input.name] = {"default": step_input.default_value}

if step_in:
step_dict["in"] = step_in

# Connections
input_connections = step.input_connections
if step.type is None or step.type == 'tool':
Expand Down Expand Up @@ -998,6 +1006,18 @@ def __module_from_dict(self, trans, steps, steps_by_external_id, step_dict, **kw
label=label,
)
trans.sa_session.add(m)

if "in" in step_dict:
for input_name, input_dict in step_dict["in"].items():
step_input = step.get_or_add_input(input_name)
NO_DEFAULT_DEFINED = object()
default = input_dict.get("default", NO_DEFAULT_DEFINED)
if default is not NO_DEFAULT_DEFINED:
step_input.default_value = default
step_input.default_value_set = True

step.get_or_add_input(input_name)

return module, step

def __load_subworkflow_from_step_dict(self, trans, step_dict, subworkflow_id_map, **kwds):
Expand Down Expand Up @@ -1041,23 +1061,21 @@ def __connect_workflow_steps(self, steps, steps_by_external_id):
continue
if not isinstance(conn_list, list): # Older style singleton connection
conn_list = [conn_list]

for conn_dict in conn_list:
if 'output_name' not in conn_dict or 'id' not in conn_dict:
template = "Invalid connection [%s] - must be dict with output_name and id fields."
message = template % conn_dict
raise exceptions.MessageException(message)
conn = model.WorkflowStepConnection()
conn.input_step = step
conn.input_name = input_name
conn.output_name = conn_dict['output_name']
external_id = conn_dict['id']
if external_id not in steps_by_external_id:
raise KeyError("Failed to find external id %s in %s" % (external_id, steps_by_external_id.keys()))
conn.output_step = steps_by_external_id[external_id]
output_step = steps_by_external_id[external_id]

output_name = conn_dict["output_name"]
input_subworkflow_step_index = conn_dict.get('input_subworkflow_step_id', None)
if input_subworkflow_step_index is not None:
conn.input_subworkflow_step = step.subworkflow.step_by_index(input_subworkflow_step_index)

step.add_connection(input_name, output_name, output_step, input_subworkflow_step_index)

del step.temp_input_connections

Expand Down
89 changes: 82 additions & 7 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4058,13 +4058,49 @@ def __init__(self):
self.tool_inputs = None
self.tool_errors = None
self.position = None
self.input_connections = []
self.inputs = []
self.config = None
self.label = None
self.uuid = uuid4()
self.workflow_outputs = []
self._input_connections_by_name = None

def get_input(self, input_name):
    """Return this step's input definition named *input_name*, or None."""
    matches = (candidate for candidate in self.inputs if candidate.name == input_name)
    return next(matches, None)

def get_or_add_input(self, input_name):
    """Return the WorkflowStepInput named *input_name*, creating it if absent.

    Constructing a WorkflowStepInput with this step as its workflow_step is
    expected to attach it to ``self.inputs`` via the ORM backref (the
    "inputs" backref configured in the mapping), so no explicit append is
    performed here.
    """
    step_input = self.get_input(input_name)
    if step_input is None:
        step_input = WorkflowStepInput(self)
        step_input.name = input_name
    return step_input

def add_connection(self, input_name, output_name, output_step, input_subworkflow_step_index=None):
    """Connect *output_step*'s output *output_name* to this step's input
    named *input_name* and return the new WorkflowStepConnection.

    When *input_subworkflow_step_index* is given, the connection is also
    bound to the corresponding step inside ``self.subworkflow``.
    """
    step_input = self.get_or_add_input(input_name)
    connection = WorkflowStepConnection()
    connection.input_step_input = step_input
    connection.output_name = output_name
    connection.output_step = output_step
    if input_subworkflow_step_index is not None:
        connection.input_subworkflow_step = self.subworkflow.step_by_index(input_subworkflow_step_index)
    return connection

@property
def input_connections(self):
    """All WorkflowStepConnections attached to any of this step's inputs."""
    return [conn for step_input in self.inputs for conn in step_input.connections]

@property
def unique_workflow_outputs(self):
# Older Galaxy workflows may have multiple WorkflowOutputs
Expand Down Expand Up @@ -4138,7 +4174,7 @@ def copy_to(self, copied_step, step_mapping):
copied_step.position = self.position
copied_step.config = self.config
copied_step.label = self.label
copied_step.input_connections = copy_list(self.input_connections)
copied_step.inputs = copy_list(self.inputs, copied_step)

subworkflow_step_mapping = {}
subworkflow = self.subworkflow
Expand All @@ -4149,8 +4185,7 @@ def copy_to(self, copied_step, step_mapping):
subworkflow_step_mapping[subworkflow_step.id] = copied_subworkflow_step

for old_conn, new_conn in zip(self.input_connections, copied_step.input_connections):
# new_conn.input_step = new_
new_conn.input_step = step_mapping[old_conn.input_step_id]
new_conn.input_step_input = copied_step.get_or_add_input(old_conn.input_name)
new_conn.output_step = step_mapping[old_conn.output_step_id]
if old_conn.input_subworkflow_step_id:
new_conn.input_subworkflow_step = subworkflow_step_mapping[old_conn.input_subworkflow_step_id]
Expand All @@ -4165,6 +4200,35 @@ def log_str(self):
return "WorkflowStep[index=%d,type=%s]" % (self.order_index, self.type)


class WorkflowStepInput(object):
    """A single named input of a WorkflowStep, with optional default value
    and merge/scatter behavior (normalized out of workflow_step_connection).
    """

    # Class-level fallbacks applied to freshly created inputs.
    default_merge_type = None
    default_scatter_type = None

    def __init__(self, workflow_step):
        """Create an input definition attached to *workflow_step*."""
        self.workflow_step = workflow_step
        self.name = None
        self.default_value = None
        # default_value is only meaningful when default_value_set is True.
        self.default_value_set = False
        self.merge_type = self.default_merge_type
        self.scatter_type = self.default_scatter_type

    def copy(self, copied_step):
        """Return a copy of this input attached to *copied_step*; its
        connections are copied via copy_list."""
        duplicate = WorkflowStepInput(copied_step)
        for attribute in ("name", "default_value", "default_value_set", "merge_type", "scatter_type"):
            setattr(duplicate, attribute, getattr(self, attribute))
        duplicate.connections = copy_list(self.connections)
        return duplicate

    def log_str(self):
        return "WorkflowStepInput[name=%s]" % (
            self.name,
        )


class WorkflowStepConnection(object):
# Constant used in lieu of output_name and input_name to indicate an
# implicit connection between two steps that is not dependent on a dataset
Expand All @@ -4176,18 +4240,29 @@ class WorkflowStepConnection(object):
def __init__(self):
    """Start unbound: no output step, no output name, no input binding."""
    self.output_step_id = self.output_name = self.input_step_input_id = None

@property
def non_data_connection(self):
    """True when both endpoint names are the NON_DATA_CONNECTION sentinel."""
    sentinel = WorkflowStepConnection.NON_DATA_CONNECTION
    return self.output_name == sentinel and self.input_name == sentinel

@property
def input_name(self):
    """Name of the WorkflowStepInput this connection feeds (derived, read-only)."""
    step_input = self.input_step_input
    return step_input.name

@property
def input_step(self):
    """The WorkflowStep owning this connection's input, or a falsy value when unbound."""
    bound_input = self.input_step_input
    if bound_input:
        return bound_input.workflow_step
    return bound_input

@property
def input_step_id(self):
    """Database id of the input-side step, or a falsy value when unbound."""
    step = self.input_step
    if step:
        return step.id
    return step

def copy(self):
    """Return a partial copy carrying only output_name.

    input_name is now a read-only property derived from input_step_input,
    so it can no longer be assigned on the copy (doing so raises
    AttributeError); callers such as WorkflowStep.copy_to re-bind the input
    side by assigning input_step_input on the copied connection.
    """
    # TODO: handle subworkflow ids...
    copied_connection = WorkflowStepConnection()
    copied_connection.output_name = self.output_name
    return copied_connection


Expand Down
31 changes: 26 additions & 5 deletions lib/galaxy/model/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,21 @@
# Column( "input_connections", JSONType ),
Column("label", Unicode(255)))


# Normalized per-input state for a workflow step - replaces the input
# name/default information previously carried on workflow_step_connection.
model.WorkflowStepInput.table = Table(
    "workflow_step_input", metadata,
    Column("id", Integer, primary_key=True),
    # Owning step; the "inputs" backref on WorkflowStep is configured in the mapper.
    Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
    # NOTE(review): migration 0145 creates this column as TEXT while the
    # mapping declares Unicode(255) - confirm the mismatch is intentional.
    Column("name", Unicode(255)),
    Column("merge_type", TEXT),
    Column("scatter_type", TEXT),
    Column("value_from", JSONType),
    Column("value_from_type", TEXT),
    # default_value is only meaningful when default_value_set is true
    # (see the workflow export code, which checks default_value_set).
    Column("default_value", JSONType),
    Column("default_value_set", Boolean),
    Column("runtime_value", Boolean))


model.WorkflowRequestStepState.table = Table(
"workflow_request_step_states", metadata,
Column("id", Integer, primary_key=True),
Expand Down Expand Up @@ -953,9 +968,8 @@
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
Column("output_name", TEXT),
Column("input_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)

Expand Down Expand Up @@ -2218,17 +2232,24 @@ def simple_mapping(model, **kwds):
backref="workflow_steps")
))

# Each WorkflowStepInput belongs to exactly one WorkflowStep; the backref
# exposes the step's inputs as WorkflowStep.inputs.
# NOTE(review): cascade="all" is declared on the many-to-one side here -
# confirm the intended cascade direction.
mapper(model.WorkflowStepInput, model.WorkflowStepInput.table, properties=dict(
    workflow_step=relation(model.WorkflowStep,
        backref=backref("inputs", uselist=True),
        cascade="all",
        primaryjoin=(model.WorkflowStep.table.c.id == model.WorkflowStepInput.table.c.workflow_step_id))
))

mapper(model.WorkflowOutput, model.WorkflowOutput.table, properties=dict(
workflow_step=relation(model.WorkflowStep,
backref='workflow_outputs',
primaryjoin=(model.WorkflowStep.table.c.id == model.WorkflowOutput.table.c.workflow_step_id))
))

mapper(model.WorkflowStepConnection, model.WorkflowStepConnection.table, properties=dict(
input_step=relation(model.WorkflowStep,
backref="input_connections",
input_step_input=relation(model.WorkflowStepInput,
backref="connections",
cascade="all",
primaryjoin=(model.WorkflowStepConnection.table.c.input_step_id == model.WorkflowStep.table.c.id)),
primaryjoin=(model.WorkflowStepConnection.table.c.input_step_input_id == model.WorkflowStepInput.table.c.id)),
input_subworkflow_step=relation(model.WorkflowStep,
backref=backref("parent_workflow_input_connections", uselist=True),
primaryjoin=(model.WorkflowStepConnection.table.c.input_subworkflow_step_id == model.WorkflowStep.table.c.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ def upgrade(migrate_engine):
for table in tables.values():
__create(table)

def nextval(table, col='id'):
if migrate_engine.name in ['postgres', 'postgresql']:
return "nextval('%s_%s_seq')" % (table, col)
elif migrate_engine.name in ['mysql', 'sqlite']:
return "null"
else:
raise Exception("Unhandled database type")

# Set default for creation to scheduled, actual mapping has new as default.
workflow_invocation_step_state_column = Column("state", TrimmedString(64), default="scheduled")
if migrate_engine.name in ['postgres', 'postgresql']:
Expand Down
105 changes: 105 additions & 0 deletions lib/galaxy/model/migrate/versions/0145_add_workflow_step_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
Migration script for workflow step input table.
"""
from __future__ import print_function

import logging

from sqlalchemy import Boolean, Column, ForeignKey, Integer, MetaData, Table, TEXT

from galaxy.model.custom_types import JSONType

log = logging.getLogger(__name__)
metadata = MetaData()


def get_new_tables():
    """Define (without creating) the two tables introduced by this
    migration: workflow_step_input, and the normalized replacement for
    workflow_step_connection that references it.
    """
    step_input_table = Table(
        "workflow_step_input", metadata,
        Column("id", Integer, primary_key=True),
        Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
        Column("name", TEXT),
        Column("merge_type", TEXT),
        Column("scatter_type", TEXT),
        Column("value_from", JSONType),
        Column("value_from_type", TEXT),
        Column("default_value", JSONType),
        Column("default_value_set", Boolean, default=False),
        Column("runtime_value", Boolean, default=False),
    )

    step_connection_table = Table(
        "workflow_step_connection", metadata,
        Column("id", Integer, primary_key=True),
        Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
        Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
        Column("output_name", TEXT),
        Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
    )

    return [step_input_table, step_connection_table]


def upgrade(migrate_engine):
    """Rename the legacy workflow_step_connection table aside, create the
    new normalized tables, and migrate the legacy connection rows into them.
    """
    metadata.bind = migrate_engine
    print(__doc__)
    metadata.reflect()

    LegacyWorkflowStepConnection_table = Table("workflow_step_connection", metadata, autoload=True)
    # Drop the legacy indexes first so their names are free for the indexes
    # created with the replacement table below.
    for index in LegacyWorkflowStepConnection_table.indexes:
        index.drop()
    # NOTE(review): the backup suffix says 144 although this is migration
    # 0145 - confirm nothing else depends on this exact name.
    LegacyWorkflowStepConnection_table.rename("workflow_step_connection_premigrate144")
    # Try to deregister that table to work around some caching problems it seems.
    LegacyWorkflowStepConnection_table.deregister()
    metadata._remove_table("workflow_step_connection", metadata.schema)

    metadata.reflect()
    tables = get_new_tables()
    for table in tables:
        __create(table)

    # One workflow_step_input row per distinct (input step, input name)
    # pair. The step id - not the legacy connection row id - must populate
    # workflow_step_id, because the join below matches
    # wsc.input_step_id = wsi.workflow_step_id; DISTINCT collapses steps
    # that had several connections on the same input to a single row.
    insert_step_inputs_cmd = \
        "INSERT INTO workflow_step_input (workflow_step_id, name) " + \
        "SELECT DISTINCT input_step_id, input_name FROM workflow_step_connection_premigrate144"

    migrate_engine.execute(insert_step_inputs_cmd)

    # TODO: verify order here.
    insert_step_connections_cmd = \
        "INSERT INTO workflow_step_connection (output_step_id, input_step_input_id, output_name, input_subworkflow_step_id) " + \
        "SELECT wsc.output_step_id, wsi.id, wsc.output_name, wsc.input_subworkflow_step_id " + \
        "FROM workflow_step_connection_premigrate144 as wsc left outer join workflow_step_input as wsi on wsc.input_step_id = wsi.workflow_step_id and wsc.input_name = wsi.name ORDER BY wsc.id"

    migrate_engine.execute(insert_step_connections_cmd)


def downgrade(migrate_engine):
    """Drop the normalized tables and restore the pre-migration
    workflow_step_connection table from the backup made by upgrade()."""
    metadata.bind = migrate_engine

    tables = get_new_tables()
    for table in tables:
        __drop(table)

    # Forget the (now dropped) new workflow_step_connection definition so the
    # reflected metadata does not clash with the restored legacy table.
    metadata._remove_table("workflow_step_connection", metadata.schema)
    metadata.reflect()

    # Restore the legacy workflow_step_connection table preserved by upgrade().
    LegacyWorkflowStepConnection_table = Table("workflow_step_connection_premigrate144", metadata, autoload=True)
    LegacyWorkflowStepConnection_table.rename("workflow_step_connection")


def __create(table):
    """Create *table*, logging rather than raising on failure so the
    migration can continue with the remaining tables."""
    try:
        table.create()
    except Exception:
        log.exception("Creating %s table failed.", table.name)


def __drop(table):
    """Drop *table*, logging rather than raising on failure so the
    downgrade can continue with the remaining tables."""
    try:
        table.drop()
    except Exception:
        log.exception("Dropping %s table failed.", table.name)
Loading

0 comments on commit 8bc9591

Please sign in to comment.