Changed how scattered tasks' status, duration and attempts are calculated #594

Merged 4 commits on Mar 18, 2019
CHANGELOG.md (4 changes: 4 additions & 0 deletions)

@@ -1,5 +1,9 @@
 # Job Manager Change Log
 
+## v0.6.2 Release Notes
+
+### Fixed bug where scattered tasks' status, duration, timing diagram and number of attempts were inaccurate.
+
 ## v0.6.1 Release Notes
 
 ### Fixed bug where job IDs that started with a number could not be queried for.
servers/cromwell/jobs/controllers/jobs_controller.py (24 changes: 13 additions & 11 deletions)
@@ -232,12 +232,13 @@ def format_scattered_task(task_name, task_metadata):
             else:
                 temp_status_collection[
                     status] = temp_status_collection[status] + 1
-        if minStart > _parse_datetime(shard.get('start')):
-            minStart = _parse_datetime(shard.get('start'))
-        if shard.get('executionStatus') not in ['Failed', 'Done']:
-            maxEnd = None
-        if maxEnd is not None and maxEnd < _parse_datetime(shard.get('end')):
-            maxEnd = _parse_datetime(shard.get('end'))
+            if minStart > _parse_datetime(shard.get('start')):
+                minStart = _parse_datetime(shard.get('start'))
+            if shard.get('executionStatus') not in ['Failed', 'Done']:
+                maxEnd = None
+            if maxEnd is not None and maxEnd < _parse_datetime(
+                    shard.get('end')):
+                maxEnd = _parse_datetime(shard.get('end'))
         current_shard = shard.get('shardIndex')
 
     shard_status_counts = [

Contributor: Could you remind me why this conditional logic was moved a level deeper?

Contributor (author): It was moved under the `if current_shard != shard.get('shardIndex'):` check so that only the data from the latest attempt of each shard is used.

Contributor: Thanks for the explanation, that makes sense!
@@ -248,10 +249,10 @@ def format_scattered_task(task_name, task_metadata):
     # grab attempts, path and subWorkflowId from last call
     return TaskMetadata(
         name=remove_workflow_name(task_name),
-        execution_status=_get_scattered_task_status(task_metadata),
+        execution_status=_get_scattered_task_status(shard_status_counts),
         start=minStart,
         end=maxEnd,
-        attempts=task_metadata[-1].get('attempt'),
+        attempts=task_metadata[-1].get('shardIndex') + 1,
         call_root=remove_shard_path(task_metadata[-1].get('callRoot')),
         job_id=task_metadata[-1].get('subWorkflowId'),
         shard_statuses=shard_status_counts,
@@ -448,11 +449,12 @@ def _format_query_labels(orig_query_labels):
     return query_labels
 
 
-def _get_scattered_task_status(metadata):
+def _get_scattered_task_status(shard_status_counts):
     # get all shard statuses
     statuses = {
-        task_statuses.cromwell_execution_to_api(shard.get('executionStatus'))
-        for shard in metadata
+        shard_status_count.status
+        for shard_status_count in shard_status_counts
+        if hasattr(shard_status_count, 'status')
     }
     # return status by ranked applicability
     for status in [
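To make the new resolution rule concrete, here is a condensed, runnable sketch of `_get_scattered_task_status`. It is not the verbatim module code: `ShardStatusCount` is stood in by a namedtuple, and the priority order is inferred from the test expectations later in this PR.

```python
from collections import namedtuple

# Hypothetical stand-in for the generated jobs.models ShardStatusCount
# class; the function below only reads its .status attribute.
ShardStatusCount = namedtuple('ShardStatusCount', ['status', 'count'])


def _get_scattered_task_status(shard_status_counts):
    # Collect the distinct statuses across all shards.
    statuses = {
        count.status
        for count in shard_status_counts
        if hasattr(count, 'status')
    }
    # Return the highest-priority status any shard is in. The ranking is
    # inferred from this PR's tests: one Failed shard fails the whole
    # task, while Succeeded requires every shard to have succeeded.
    for status in ['Failed', 'Aborting', 'Aborted', 'Running',
                   'Submitted', 'Succeeded']:
        if status in statuses:
            return status


# Example: a single Running shard outranks any number of Succeeded shards.
shards = [ShardStatusCount('Succeeded', 9), ShardStatusCount('Running', 1)]
assert _get_scattered_task_status(shards) == 'Running'
```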
servers/cromwell/jobs/test/test_jobs_controller.py (2 changes: 1 addition & 1 deletion)
@@ -450,7 +450,7 @@ def test_get_scattered_job_returns_200(self, mock_request):
         call_root = '/cromwell/cromwell-executions/id/call-analysis'
         std_err = '/cromwell/cromwell-executions/id/call-analysis/stderr'
         std_out = '/cromwell/cromwell-executions/id/call-analysis/stdout'
-        attempts = 1
+        attempts = 2
         return_code = 0
 
         def _request_callback(request, context):
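The expectation changes from 1 to 2 because the controller now derives a scattered task's attempts from the last call's zero-based shardIndex rather than its attempt field. A tiny illustration, with fixture values assumed rather than copied from the test's mock:

```python
# Zero-based shard indices: the last call's shardIndex + 1 equals the
# number of shards. These values are illustrative, not the actual mock.
task_metadata = [{'shardIndex': 0, 'attempt': 1},
                 {'shardIndex': 1, 'attempt': 1}]
assert task_metadata[-1].get('shardIndex') + 1 == 2
```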
servers/cromwell/jobs/test/test_task_statuses.py (76 changes: 31 additions & 45 deletions)
@@ -6,6 +6,7 @@
 
 from jobs.controllers.utils import task_statuses
 from jobs.controllers import jobs_controller
+from jobs.models.shard_status_count import ShardStatusCount
 
 import itertools
@@ -33,78 +34,63 @@ def test_unrecognized_task_status_causes_exception(self):
 
     def test_scattered_task_status(self):
         def failed_scattered_task():
-            return [{
-                'executionStatus': 'Failed',
-            }, {
-                'executionStatus': 'Aborting',
-            }, {
-                'executionStatus': 'Aborted',
-            }, {
-                'executionStatus': 'Running',
-            }, {
-                'executionStatus': 'Starting',
-            }, {
-                'executionStatus': 'Done',
-            }]
+            return [
+                ShardStatusCount(status='Failed', count=1),
+                ShardStatusCount(status='Aborting', count=1),
+                ShardStatusCount(status='Aborted', count=1),
+                ShardStatusCount(status='Running', count=1),
+                ShardStatusCount(status='Submitted', count=1),
+                ShardStatusCount(status='Succeeded', count=1)
+            ]
 
         for response in itertools.permutations(failed_scattered_task(), 6):
             self.assertEqual(
                 jobs_controller._get_scattered_task_status(response), 'Failed')
 
         def aborting_scattered_task():
-            return [{
-                'executionStatus': 'Aborting',
-            }, {
-                'executionStatus': 'Aborted',
-            }, {
-                'executionStatus': 'Running',
-            }, {
-                'executionStatus': 'Starting',
-            }, {
-                'executionStatus': 'Done',
-            }]
+            return [
+                ShardStatusCount(status='Aborting', count=1),
+                ShardStatusCount(status='Aborted', count=1),
+                ShardStatusCount(status='Running', count=1),
+                ShardStatusCount(status='Submitted', count=1),
+                ShardStatusCount(status='Succeeded', count=1)
+            ]
 
         for response in itertools.permutations(aborting_scattered_task(), 5):
             self.assertEqual(
                 jobs_controller._get_scattered_task_status(response),
                 'Aborting')
 
         def aborted_scattered_task():
-            return [{
-                'executionStatus': 'Aborted',
-            }, {
-                'executionStatus': 'Running',
-            }, {
-                'executionStatus': 'Starting',
-            }, {
-                'executionStatus': 'Done',
-            }]
+            return [
+                ShardStatusCount(status='Aborted', count=1),
+                ShardStatusCount(status='Running', count=1),
+                ShardStatusCount(status='Submitted', count=1),
+                ShardStatusCount(status='Succeeded', count=1)
+            ]
 
         for response in itertools.permutations(aborted_scattered_task(), 4):
             self.assertEqual(
                 jobs_controller._get_scattered_task_status(response),
                 'Aborted')
 
         def running_scattered_task():
-            return [{
-                'executionStatus': 'Running',
-            }, {
-                'executionStatus': 'Starting',
-            }, {
-                'executionStatus': 'Done',
-            }]
+            return [
+                ShardStatusCount(status='Running', count=1),
+                ShardStatusCount(status='Submitted', count=1),
+                ShardStatusCount(status='Succeeded', count=1)
+            ]
 
         for response in itertools.permutations(running_scattered_task(), 3):
             self.assertEqual(
                 jobs_controller._get_scattered_task_status(response),
                 'Running')
 
         def submitted_scattered_task():
-            return [{
-                'executionStatus': 'Starting',
-            }, {
-                'executionStatus': 'Done',
-            }]
+            return [
+                ShardStatusCount(status='Submitted', count=1),
+                ShardStatusCount(status='Succeeded', count=1)
+            ]
 
         for response in itertools.permutations(submitted_scattered_task(), 2):
             self.assertEqual(
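These tests construct `ShardStatusCount` directly from `jobs.models`, a class this PR does not show (it is presumably generated from the API spec). To run the tests without the generated code, a minimal stand-in needs only what the tests and controller touch; this is an assumption about the model's shape, not the generated class itself:

```python
class ShardStatusCount(object):
    """Minimal stand-in for jobs.models.shard_status_count.ShardStatusCount.

    Assumed shape: keyword construction plus the .status and .count
    attributes read by the tests and by _get_scattered_task_status.
    """

    def __init__(self, status=None, count=None):
        self.status = status
        self.count = count
```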