Skip to content

Commit

Permalink
Enable Cromwell-tools to add labels at submission time. (394) (#9)
Browse files Browse the repository at this point in the history
* Add cromwell label validator.

* Add backport support for python2.7. (394)

* Make validators optional, also polish the code and elaborate docstrings based on comments. (394)
  • Loading branch information
rexwangcc authored Mar 24, 2018
1 parent dfe1af5 commit 5bb5aa2
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 10 deletions.
116 changes: 107 additions & 9 deletions cromwell_tools/cromwell_tools.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
"""This module contains utility functions to interact with Cromwell.
"""
import io
import sys
import zipfile
import json
from datetime import datetime, timedelta
import time
import requests
from requests.auth import HTTPBasicAuth
import six
import re


# Workflow status strings collected under the "failed" name.
_failed_statuses = ['Failed', 'Aborted', 'Aborting']

# Note: the following rules for validating labels are based on Cromwell's documentation:
# https://cromwell.readthedocs.io/en/develop/Labels/ and they could be changed in the future.
# Maximum number of characters allowed in a label key or a label value.
_CROMWELL_LABEL_LENGTH = 63
# Keys (matched against the whole string): start with a lowercase letter, continue with lowercase
# letters, digits or hyphens, and end with a lowercase letter or digit. Empty keys do not match.
_CROMWELL_LABEL_KEY_REGEX = '[a-z]([-a-z0-9]*[a-z0-9])?'
# Values (matched against the whole string): either empty, or lowercase letters, digits and hyphens
# ending with a lowercase letter or digit. NOTE: as written, this pattern also accepts a leading hyphen.
_CROMWELL_LABEL_VALUE_REGEX = '([a-z0-9]*[-a-z0-9]*[a-z0-9])?'


def harmonize_credentials(secrets_file=None, cromwell_username=None, cromwell_password=None):
"""
takes all of the valid ways of providing authentication to cromwell and returns a username
Takes all of the valid ways of providing authentication to cromwell and returns a username
and password
:param str cromwell_password:
Expand All @@ -40,7 +46,7 @@ def harmonize_credentials(secrets_file=None, cromwell_username=None, cromwell_pa

def get_workflow_statuses(
ids, cromwell_url, cromwell_user=None, cromwell_password=None, secrets_file=None):
""" given a list of workflow ids, query cromwell url for their statuses
""" Given a list of workflow ids, query cromwell url for their statuses
:param list ids:
:param str cromwell_url:
Expand Down Expand Up @@ -73,7 +79,7 @@ def wait_until_workflow_completes(
cromwell_url, workflow_ids, timeout_minutes, poll_interval_seconds=30, cromwell_user=None,
cromwell_password=None, secrets_file=None):
"""
given a list of workflow ids, wait until cromwell returns successfully for each status, or
Given a list of workflow ids, wait until cromwell returns successfully for each status, or
one of the workflows fails or is aborted.
:param list workflow_ids:
Expand Down Expand Up @@ -110,20 +116,29 @@ def wait_until_workflow_completes(

def start_workflow(
wdl_file, inputs_file, url, options_file=None, inputs_file2=None, zip_file=None, user=None,
password=None):
password=None, label=None, validate_labels=True):
"""Use HTTP POST to start workflow in Cromwell.
The requests library could accept both Bytes and String objects as parameters of files, so there is no
strict restrictions on the type of inputs of this function.
:param _io.BytesIO wdl_file: wdl file.
:param _io.BytesIO inputs_file: inputs file.
:param str url: cromwell url.
:param _io.BytesIO options_file: (optional) cromwell configs file.
:param _io.BytesIO inputs_file2: (optional) inputs file 2.
:param _io.BytesIO zip_file: (optional) zip file containing dependencies.
:param str url: cromwell url
:param str user: cromwell username
:param str password: cromwell password
:param str user: (optional) cromwell username.
:param str password: (optional) cromwell password.
:param str|_io.BytesIO label: (optional) JSON file containing a collection of key/value pairs for workflow labels.
:param bool validate_labels: (optional) Whether to validate labels or not, using cromwell-tools' built-in
validators. It is set to True by default.
:return requests.Response response: HTTP response from cromwell.
"""
if validate_labels:
validate_cromwell_label(label)

files = {
'workflowSource': wdl_file,
'workflowInputs': inputs_file,
Expand All @@ -140,8 +155,11 @@ def start_workflow(
auth = HTTPBasicAuth(user, password)
else:
auth = None
response = requests.post(url, files=files, auth=auth)

if label:
files['labels'] = label

response = requests.post(url, files=files, auth=auth)
return response


Expand Down Expand Up @@ -208,3 +226,83 @@ def read_local_file(path):
with open(path) as f:
contents = f.read()
return contents


def _content_checker(regex, content):
"""Helper function to check if a string is obeying the rule described by a regex string or not.
:param str regex: A regex string defines valid content.
:param str content: A string to be validated.
:return str: A string of error message if validation fails, or an empty string if validation succeeds.
"""
if "fullmatch" in dir(re): # For Python3.4+
matched = re.fullmatch(regex, content)
else: # For Python3.3/2.7 or earlier versions
matched = _emulate_python_fullmatch(regex, content)

if not matched:
return 'Invalid label: {0} did not match the regex {1}.\n'.format(content, regex)
else:
return ''


def _length_checker(length, content):
"""Helper function to check if a string is shorter than expected length of not.
:param int length: Maximum length of an expected string.
:param str content: A string to be validated.
:return str: A string of error message if validation fails, or an empty string if validation succeeds.
"""
if len(content) > length:
return 'Invalid label: {0} has {1} characters. The maximum is {2}.\n'.format(content, len(content), length)
else:
return ''


def _emulate_python_fullmatch(regex, string, flags=0):
"""Backport Python 3.4's regular expression "fullmatch()" to Python 2 by emulating python-3.4 re.fullmatch().
If the whole string matches the regular expression pattern, return a corresponding match object.
Return None if the string does not match the pattern; note that this is different from a zero-length match.
:param str regex: A regex string.
:param str string: The string that you want to apply regex match to.
:param str|int flags: The expression's behaviour can be modified by specifying a flags value. Values can be any of
the variables listed in https://docs.python.org/3/library/re.html
:return SRE_Match/None: return a corresponding match object, or None if the string does not match the pattern.
"""
return re.match("(?:" + regex + r")\Z", string, flags=flags)


def validate_cromwell_label(label_object):
    """Check if the label object is valid for Cromwell.

    Note: this function as well as the global variables _CROMWELL_LABEL_LENGTH, _CROMWELL_LABEL_KEY_REGEX
    and _CROMWELL_LABEL_VALUE_REGEX are implemented based on the Cromwell's documentation:
    https://cromwell.readthedocs.io/en/develop/Labels/ and the Cromwell's code base:
    https://github.com/broadinstitute/cromwell/blob/master/core/src/main/scala/cromwell/core/labels/Label.scala#L16
    Both the docs and the code base of Cromwell could possibly change in the future, please update this
    checker on demand.

    :param dict|str|bytes|_io.BytesIO|None label_object: The labels to validate: a dictionary of key/value
        pairs, a JSON object serialized as text or UTF-8 bytes, or a BytesIO holding such bytes. None means
        "no labels were supplied" and is accepted without any checks.
    :raises ValueError: If the input is not valid JSON, does not describe a collection of key/value pairs,
        or if any key or value is not a string, does not match the Cromwell label rules, or is too long.
        The message lists every problem found, one per line.
    """
    # Callers such as start_workflow() pass label=None by default; there is nothing to validate then.
    if label_object is None:
        return

    # `str` plus the text type of the running interpreter (`unicode` on Python 2, `str` on Python 3).
    text_types = (str, type(u''))

    # getvalue() reads the buffer without moving its position, so the same BytesIO can still be
    # submitted to Cromwell after validation.
    if isinstance(label_object, io.BytesIO):
        label_object = label_object.getvalue()
    # json.loads() only accepts bytes from Python 3.6 on, so decode explicitly.
    # A UnicodeDecodeError raised here is itself a ValueError.
    if isinstance(label_object, bytes):
        label_object = label_object.decode('utf-8')
    if isinstance(label_object, text_types):
        label_object = json.loads(label_object)

    try:
        label_pairs = label_object.items()
    except AttributeError:
        raise ValueError(
            'Invalid labels: expected a collection of key/value pairs but got {0}.\n'.format(
                type(label_object).__name__))

    errors = []
    for label_key, label_value in label_pairs:
        # JSON numbers, booleans and nulls cannot be matched against the regexes; report them as
        # invalid labels instead of letting the regex engine raise a TypeError.
        non_strings = [item for item in (label_key, label_value) if not isinstance(item, text_types)]
        if non_strings:
            errors.extend('Invalid label: {0!r} is not a string.\n'.format(item) for item in non_strings)
            continue

        errors.append(_content_checker(_CROMWELL_LABEL_KEY_REGEX, label_key))
        errors.append(_content_checker(_CROMWELL_LABEL_VALUE_REGEX, label_value))
        errors.append(_length_checker(_CROMWELL_LABEL_LENGTH, label_key))
        errors.append(_length_checker(_CROMWELL_LABEL_LENGTH, label_value))

    err_msg = ''.join(errors)
    if err_msg:
        raise ValueError(err_msg)
39 changes: 38 additions & 1 deletion cromwell_tools/tests/test_cromwell_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cromwell_tools import cromwell_tools
import zipfile
import os
import json


class TestUtils(unittest.TestCase):
Expand All @@ -21,6 +22,20 @@ def setUpClass(cls):
# Change to test directory, as tests may have been invoked from another dir
dir = os.path.abspath(os.path.dirname(__file__))
os.chdir(dir)
cls.invalid_labels = {
"0-label-key-1": "0-label-value-1",
"the-maximum-allowed-character-length-for-label-pairs-is-sixty-three":
"cromwell-please-dont-validate-these-labels",
"": "not a great label key",
"Comment": "This-is-a-test-label"
}
cls.valid_labels = {
"label-key-1": "label-value-1",
"label-key-2": "label-value-2",
"only-key": "",
"fc-id": "0123-abcd-4567-efgh",
"comment": "this-is-a-test-label"
}

@requests_mock.mock()
def test_start_workflow(self, mock_request):
Expand All @@ -31,6 +46,7 @@ def test_start_workflow(self, mock_request):
inputs_file = io.BytesIO(b"inputs_file_content")
inputs_file2 = io.BytesIO(b"inputs_file2_content")
options_file = io.BytesIO(b"options_file_content")
label = io.BytesIO(b'{"test-label-key": "test-label-value"}')

def _request_callback(request, context):
context.status_code = 200
Expand All @@ -43,7 +59,7 @@ def _request_callback(request, context):
# Check request actions
mock_request.post(url, json=_request_callback)
result = cromwell_tools.start_workflow(
wdl_file, inputs_file, url, options_file, inputs_file2, zip_file, user, password)
wdl_file, inputs_file, url, options_file, inputs_file2, zip_file, user, password, label)
self.assertEqual(result.status_code, 200)
self.assertEqual(result.headers.get('test'), 'header')

Expand Down Expand Up @@ -118,6 +134,27 @@ def test_make_zip_in_memory(self):
self.assertEqual(f1_contents, b'aaa\n')
self.assertEqual(f2_contents, b'bbb\n')

def test_validate_cromwell_label_on_invalid_labels_object(self):
    """A dictionary containing invalid labels is rejected with ValueError."""
    labels = self.invalid_labels
    with self.assertRaises(ValueError):
        cromwell_tools.validate_cromwell_label(labels)

def test_validate_cromwell_label_on_invalid_labels_str_object(self):
    """Invalid labels serialized as a JSON string are rejected with ValueError."""
    # Serialize outside the assertRaises block so only the validator call is under test.
    labels_json = json.dumps(self.invalid_labels)
    with self.assertRaises(ValueError):
        cromwell_tools.validate_cromwell_label(labels_json)

def test_validate_cromwell_label_on_invalid_labels_bytes_object(self):
    """Invalid labels serialized as UTF-8 encoded JSON bytes are rejected with ValueError."""
    # Serialize outside the assertRaises block so only the validator call is under test.
    labels_bytes = json.dumps(self.invalid_labels).encode('utf-8')
    with self.assertRaises(ValueError):
        cromwell_tools.validate_cromwell_label(labels_bytes)

def test_validate_cromwell_label_on_valid_labels_object(self):
    """A dictionary of valid labels passes validation, which returns None."""
    outcome = cromwell_tools.validate_cromwell_label(self.valid_labels)
    self.assertIsNone(outcome)

def test_validate_cromwell_label_on_valid_labels_str_object(self):
    """Valid labels serialized as a JSON string pass validation, which returns None."""
    labels_json = json.dumps(self.valid_labels)
    outcome = cromwell_tools.validate_cromwell_label(labels_json)
    self.assertIsNone(outcome)

def test_validate_cromwell_label_on_valid_labels_bytes_object(self):
    """Valid labels serialized as UTF-8 encoded JSON bytes pass validation, which returns None."""
    labels_bytes = json.dumps(self.valid_labels).encode('utf-8')
    outcome = cromwell_tools.validate_cromwell_label(labels_bytes)
    self.assertIsNone(outcome)


if __name__ == '__main__':
unittest.main()

0 comments on commit 5bb5aa2

Please sign in to comment.