Skip to content

Commit

Permalink
Changed how scattered tasks' status, duration and attempts are calcul…
Browse files Browse the repository at this point in the history
…ated (#594)
  • Loading branch information
rsasch authored Mar 18, 2019
1 parent a7bf724 commit 846ddb6
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 57 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Job Manager Change Log

## v0.6.2 Release Notes

### Fixed bug where scattered tasks' status, duration, timing diagram and number of attempts were inaccurate.

## v0.6.1 Release Notes

### Fixed bug where job IDs that started with a number could not be queried for.
Expand Down
24 changes: 13 additions & 11 deletions servers/cromwell/jobs/controllers/jobs_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,13 @@ def format_scattered_task(task_name, task_metadata):
else:
temp_status_collection[
status] = temp_status_collection[status] + 1
if minStart > _parse_datetime(shard.get('start')):
minStart = _parse_datetime(shard.get('start'))
if shard.get('executionStatus') not in ['Failed', 'Done']:
maxEnd = None
if maxEnd is not None and maxEnd < _parse_datetime(shard.get('end')):
maxEnd = _parse_datetime(shard.get('end'))
if minStart > _parse_datetime(shard.get('start')):
minStart = _parse_datetime(shard.get('start'))
if shard.get('executionStatus') not in ['Failed', 'Done']:
maxEnd = None
if maxEnd is not None and maxEnd < _parse_datetime(
shard.get('end')):
maxEnd = _parse_datetime(shard.get('end'))
current_shard = shard.get('shardIndex')

shard_status_counts = [
Expand All @@ -248,10 +249,10 @@ def format_scattered_task(task_name, task_metadata):
# grab attempts, path and subWorkflowId from last call
return TaskMetadata(
name=remove_workflow_name(task_name),
execution_status=_get_scattered_task_status(task_metadata),
execution_status=_get_scattered_task_status(shard_status_counts),
start=minStart,
end=maxEnd,
attempts=task_metadata[-1].get('attempt'),
attempts=task_metadata[-1].get('shardIndex') + 1,
call_root=remove_shard_path(task_metadata[-1].get('callRoot')),
job_id=task_metadata[-1].get('subWorkflowId'),
shard_statuses=shard_status_counts,
Expand Down Expand Up @@ -448,11 +449,12 @@ def _format_query_labels(orig_query_labels):
return query_labels


def _get_scattered_task_status(metadata):
def _get_scattered_task_status(shard_status_counts):
# get all shard statuses
statuses = {
task_statuses.cromwell_execution_to_api(shard.get('executionStatus'))
for shard in metadata
shard_status_count.status
for shard_status_count in shard_status_counts
if hasattr(shard_status_count, 'status')
}
# return status by ranked applicability
for status in [
Expand Down
2 changes: 1 addition & 1 deletion servers/cromwell/jobs/test/test_jobs_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def test_get_scattered_job_returns_200(self, mock_request):
call_root = '/cromwell/cromwell-executions/id/call-analysis'
std_err = '/cromwell/cromwell-executions/id/call-analysis/stderr'
std_out = '/cromwell/cromwell-executions/id/call-analysis/stdout'
attempts = 1
attempts = 2
return_code = 0

def _request_callback(request, context):
Expand Down
76 changes: 31 additions & 45 deletions servers/cromwell/jobs/test/test_task_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from jobs.controllers.utils import task_statuses
from jobs.controllers import jobs_controller
from jobs.models.shard_status_count import ShardStatusCount

import itertools

Expand Down Expand Up @@ -33,78 +34,63 @@ def test_unrecognized_task_status_causes_exception(self):

def test_scattered_task_status(self):
def failed_scattered_task():
return [{
'executionStatus': 'Failed',
}, {
'executionStatus': 'Aborting',
}, {
'executionStatus': 'Aborted',
}, {
'executionStatus': 'Running',
}, {
'executionStatus': 'Starting',
}, {
'executionStatus': 'Done',
}]
return [
ShardStatusCount(status='Failed', count=1),
ShardStatusCount(status='Aborting', count=1),
ShardStatusCount(status='Aborted', count=1),
ShardStatusCount(status='Running', count=1),
ShardStatusCount(status='Submitted', count=1),
ShardStatusCount(status='Succeeded', count=1)
]

for response in itertools.permutations(failed_scattered_task(), 6):
self.assertEqual(
jobs_controller._get_scattered_task_status(response), 'Failed')

def aborting_scattered_task():
return [{
'executionStatus': 'Aborting',
}, {
'executionStatus': 'Aborted',
}, {
'executionStatus': 'Running',
}, {
'executionStatus': 'Starting',
}, {
'executionStatus': 'Done',
}]
return [
ShardStatusCount(status='Aborting', count=1),
ShardStatusCount(status='Aborted', count=1),
ShardStatusCount(status='Running', count=1),
ShardStatusCount(status='Submitted', count=1),
ShardStatusCount(status='Succeeded', count=1)
]

for response in itertools.permutations(aborting_scattered_task(), 5):
self.assertEqual(
jobs_controller._get_scattered_task_status(response),
'Aborting')

def aborted_scattered_task():
return [{
'executionStatus': 'Aborted',
}, {
'executionStatus': 'Running',
}, {
'executionStatus': 'Starting',
}, {
'executionStatus': 'Done',
}]
return [
ShardStatusCount(status='Aborted', count=1),
ShardStatusCount(status='Running', count=1),
ShardStatusCount(status='Submitted', count=1),
ShardStatusCount(status='Succeeded', count=1)
]

for response in itertools.permutations(aborted_scattered_task(), 4):
self.assertEqual(
jobs_controller._get_scattered_task_status(response),
'Aborted')

def running_scattered_task():
return [{
'executionStatus': 'Running',
}, {
'executionStatus': 'Starting',
}, {
'executionStatus': 'Done',
}]
return [
ShardStatusCount(status='Running', count=1),
ShardStatusCount(status='Submitted', count=1),
ShardStatusCount(status='Succeeded', count=1)
]

for response in itertools.permutations(running_scattered_task(), 3):
self.assertEqual(
jobs_controller._get_scattered_task_status(response),
'Running')

def submitted_scattered_task():
return [{
'executionStatus': 'Starting',
}, {
'executionStatus': 'Done',
}]
return [
ShardStatusCount(status='Submitted', count=1),
ShardStatusCount(status='Succeeded', count=1)
]

for response in itertools.permutations(submitted_scattered_task(), 2):
self.assertEqual(
Expand Down

0 comments on commit 846ddb6

Please sign in to comment.