From b76718470954b02c86ca6620edd3588714f412cd Mon Sep 17 00:00:00 2001 From: Rebecca Asch Date: Fri, 15 Mar 2019 16:49:39 -0400 Subject: [PATCH 1/4] changed how scattered tasks' status, duration and attempts are calculated --- .../jobs/controllers/jobs_controller.py | 26 ++-- .../jobs/test/test_jobs_controller.py | 2 +- .../cromwell/jobs/test/test_task_statuses.py | 115 +++++++++++------- 3 files changed, 86 insertions(+), 57 deletions(-) diff --git a/servers/cromwell/jobs/controllers/jobs_controller.py b/servers/cromwell/jobs/controllers/jobs_controller.py index 6a7dda7a3..6c41add3f 100644 --- a/servers/cromwell/jobs/controllers/jobs_controller.py +++ b/servers/cromwell/jobs/controllers/jobs_controller.py @@ -167,6 +167,8 @@ def health(**kwargs): def format_task(task_name, task_metadata): # check to see if task is scattered if task_metadata[-1].get('shardIndex') != -1: + logger.warning('scattered task response {}'.format( + format_scattered_task(task_name, task_metadata))) return format_scattered_task(task_name, task_metadata) latest_attempt = task_metadata[-1] @@ -232,12 +234,13 @@ def format_scattered_task(task_name, task_metadata): else: temp_status_collection[ status] = temp_status_collection[status] + 1 - if minStart > _parse_datetime(shard.get('start')): - minStart = _parse_datetime(shard.get('start')) - if shard.get('executionStatus') not in ['Failed', 'Done']: - maxEnd = None - if maxEnd is not None and maxEnd < _parse_datetime(shard.get('end')): - maxEnd = _parse_datetime(shard.get('end')) + if minStart > _parse_datetime(shard.get('start')): + minStart = _parse_datetime(shard.get('start')) + if shard.get('executionStatus') not in ['Failed', 'Done']: + maxEnd = None + if maxEnd is not None and maxEnd < _parse_datetime( + shard.get('end')): + maxEnd = _parse_datetime(shard.get('end')) current_shard = shard.get('shardIndex') shard_status_counts = [ @@ -248,10 +251,10 @@ def format_scattered_task(task_name, task_metadata): # grab attempts, path and subWorkflowId from last call return TaskMetadata( name=remove_workflow_name(task_name), - execution_status=_get_scattered_task_status(task_metadata), + execution_status=_get_scattered_task_status(shard_status_counts), start=minStart, end=maxEnd, - attempts=task_metadata[-1].get('attempt'), + attempts=task_metadata[-1].get('shardIndex') + 1, call_root=remove_shard_path(task_metadata[-1].get('callRoot')), job_id=task_metadata[-1].get('subWorkflowId'), shard_statuses=shard_status_counts, @@ -448,11 +451,12 @@ def _format_query_labels(orig_query_labels): return query_labels -def _get_scattered_task_status(metadata): +def _get_scattered_task_status(shard_status_counts): # get all shard statuses statuses = { - task_statuses.cromwell_execution_to_api(shard.get('executionStatus')) - for shard in metadata + shard_status_count.status + for shard_status_count in shard_status_counts + if hasattr(shard_status_count, 'status') } # return status by ranked applicability for status in [ diff --git a/servers/cromwell/jobs/test/test_jobs_controller.py b/servers/cromwell/jobs/test/test_jobs_controller.py index 4c9bae145..bb82c655c 100644 --- a/servers/cromwell/jobs/test/test_jobs_controller.py +++ b/servers/cromwell/jobs/test/test_jobs_controller.py @@ -450,7 +450,7 @@ def test_get_scattered_job_returns_200(self, mock_request): call_root = '/cromwell/cromwell-executions/id/call-analysis' std_err = '/cromwell/cromwell-executions/id/call-analysis/stderr' std_out = '/cromwell/cromwell-executions/id/call-analysis/stdout' - attempts = 1 + attempts = 2 return_code = 0 def _request_callback(request, context): diff --git a/servers/cromwell/jobs/test/test_task_statuses.py b/servers/cromwell/jobs/test/test_task_statuses.py index a7a598748..61627e69d 100644 --- a/servers/cromwell/jobs/test/test_task_statuses.py +++ b/servers/cromwell/jobs/test/test_task_statuses.py @@ -6,6 +6,7 @@ from jobs.controllers.utils import task_statuses from jobs.controllers import jobs_controller +from jobs.models.shard_status_count import ShardStatusCount import itertools @@ -31,38 +32,53 @@ def test_unrecognized_task_status_causes_exception(self): with self.assertRaises(ValueError): task_statuses.cromwell_execution_to_api('Not a valid task status') + # 'Failed', 'Aborting', 'Aborted', 'Running', 'Submitted', + # 'Succeeded' + + def test_scattered_task_status(self): def failed_scattered_task(): - return [{ - 'executionStatus': 'Failed', - }, { - 'executionStatus': 'Aborting', - }, { - 'executionStatus': 'Aborted', - }, { - 'executionStatus': 'Running', - }, { - 'executionStatus': 'Starting', - }, { - 'executionStatus': 'Done', - }] + return [ShardStatusCount( + status='Failed', + count=1 + ), ShardStatusCount( + status='Aborting', + count=1 + ), ShardStatusCount( + status='Aborted', + count=1 + ), ShardStatusCount( + status='Running', + count=1 + ), ShardStatusCount( + status='Submitted', + count=1 + ), ShardStatusCount( + status='Succeeded', + count=1 + )] for response in itertools.permutations(failed_scattered_task(), 6): self.assertEqual( jobs_controller._get_scattered_task_status(response), 'Failed') def aborting_scattered_task(): - return [{ - 'executionStatus': 'Aborting', - }, { - 'executionStatus': 'Aborted', - }, { - 'executionStatus': 'Running', - }, { - 'executionStatus': 'Starting', - }, { - 'executionStatus': 'Done', - }] + return [ShardStatusCount( + status='Aborting', + count=1 + ), ShardStatusCount( + status='Aborted', + count=1 + ), ShardStatusCount( + status='Running', + count=1 + ), ShardStatusCount( + status='Submitted', + count=1 + ), ShardStatusCount( + status='Succeeded', + count=1 + )] for response in itertools.permutations(aborting_scattered_task(), 5): self.assertEqual( @@ -70,15 +86,19 @@ def aborting_scattered_task(): 'Aborting') def aborted_scattered_task(): - return [{ - 'executionStatus': 'Aborted', - }, { - 'executionStatus': 'Running', - }, { - 'executionStatus': 'Starting', - }, { - 'executionStatus': 'Done', - }] + return [ShardStatusCount( + status='Aborted', + count=1 + ), ShardStatusCount( + status='Running', + count=1 + ), ShardStatusCount( + status='Submitted', + count=1 + ), ShardStatusCount( + status='Succeeded', + count=1 + )] for response in itertools.permutations(aborted_scattered_task(), 4): self.assertEqual( @@ -86,13 +106,16 @@ def aborted_scattered_task(): 'Aborted') def running_scattered_task(): - return [{ - 'executionStatus': 'Running', - }, { - 'executionStatus': 'Starting', - }, { - 'executionStatus': 'Done', - }] + return [ShardStatusCount( + status='Running', + count=1 + ), ShardStatusCount( + status='Submitted', + count=1 + ), ShardStatusCount( + status='Succeeded', + count=1 + )] for response in itertools.permutations(running_scattered_task(), 3): self.assertEqual( @@ -100,11 +123,13 @@ def running_scattered_task(): 'Running') def submitted_scattered_task(): - return [{ - 'executionStatus': 'Starting', - }, { - 'executionStatus': 'Done', - }] + return [ShardStatusCount( + status='Submitted', + count=1 + ), ShardStatusCount( + status='Succeeded', + count=1 + )] for response in itertools.permutations(submitted_scattered_task(), 2): self.assertEqual( From fbe7f213a3ba05cadf6e6a6f54f7797aa83bae6e Mon Sep 17 00:00:00 2001 From: Rebecca Asch Date: Fri, 15 Mar 2019 16:50:22 -0400 Subject: [PATCH 2/4] changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e565df1dc..ed4500594 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Job Manager Change Log +## v0.6.2 Release Notes + +### Fixed bug where scattered tasks' status, duration, timing diagram and number of attempts were inaccurate. + ## v0.6.1 Release Notes ### Fixed bug where job IDs that started with a number could not be queried for. From 11a6cd2ca7cc102c0d0099155705706ffa7f491a Mon Sep 17 00:00:00 2001 From: Rebecca Asch Date: Fri, 15 Mar 2019 18:07:45 -0400 Subject: [PATCH 3/4] lint --- .../cromwell/jobs/test/test_task_statuses.py | 96 ++++++------------- 1 file changed, 30 insertions(+), 66 deletions(-) diff --git a/servers/cromwell/jobs/test/test_task_statuses.py b/servers/cromwell/jobs/test/test_task_statuses.py index 61627e69d..f67a2fd01 100644 --- a/servers/cromwell/jobs/test/test_task_statuses.py +++ b/servers/cromwell/jobs/test/test_task_statuses.py @@ -35,50 +35,29 @@ def test_unrecognized_task_status_causes_exception(self): # 'Failed', 'Aborting', 'Aborted', 'Running', 'Submitted', # 'Succeeded' - def test_scattered_task_status(self): def failed_scattered_task(): - return [ShardStatusCount( - status='Failed', - count=1 - ), ShardStatusCount( - status='Aborting', - count=1 - ), ShardStatusCount( - status='Aborted', - count=1 - ), ShardStatusCount( - status='Running', - count=1 - ), ShardStatusCount( - status='Submitted', - count=1 - ), ShardStatusCount( - status='Succeeded', - count=1 - )] + return [ + ShardStatusCount(status='Failed', count=1), + ShardStatusCount(status='Aborting', count=1), + ShardStatusCount(status='Aborted', count=1), + ShardStatusCount(status='Running', count=1), + ShardStatusCount(status='Submitted', count=1), + ShardStatusCount(status='Succeeded', count=1) + ] for response in itertools.permutations(failed_scattered_task(), 6): self.assertEqual( jobs_controller._get_scattered_task_status(response), 'Failed') def aborting_scattered_task(): - return [ShardStatusCount( - status='Aborting', - count=1 - ), ShardStatusCount( - status='Aborted', - count=1 - ), ShardStatusCount( - status='Running', - count=1 - ), ShardStatusCount( - status='Submitted', - count=1 - ), ShardStatusCount( - status='Succeeded', - count=1 - )] + return [ + ShardStatusCount(status='Aborting', count=1), + ShardStatusCount(status='Aborted', count=1), + ShardStatusCount(status='Running', count=1), + ShardStatusCount(status='Submitted', count=1), + ShardStatusCount(status='Succeeded', count=1) + ] for response in itertools.permutations(aborting_scattered_task(), 5): self.assertEqual( @@ -86,19 +65,12 @@ def aborting_scattered_task(): 'Aborting') def aborted_scattered_task(): - return [ShardStatusCount( - status='Aborted', - count=1 - ), ShardStatusCount( - status='Running', - count=1 - ), ShardStatusCount( - status='Submitted', - count=1 - ), ShardStatusCount( - status='Succeeded', - count=1 - )] + return [ + ShardStatusCount(status='Aborted', count=1), + ShardStatusCount(status='Running', count=1), + ShardStatusCount(status='Submitted', count=1), + ShardStatusCount(status='Succeeded', count=1) + ] for response in itertools.permutations(aborted_scattered_task(), 4): self.assertEqual( @@ -106,16 +78,11 @@ def aborted_scattered_task(): 'Aborted') def running_scattered_task(): - return [ShardStatusCount( - status='Running', - count=1 - ), ShardStatusCount( - status='Submitted', - count=1 - ), ShardStatusCount( - status='Succeeded', - count=1 - )] + return [ + ShardStatusCount(status='Running', count=1), + ShardStatusCount(status='Submitted', count=1), + ShardStatusCount(status='Succeeded', count=1) + ] for response in itertools.permutations(running_scattered_task(), 3): self.assertEqual( @@ -123,13 +90,10 @@ def running_scattered_task(): 'Running') def submitted_scattered_task(): - return [ShardStatusCount( - status='Submitted', - count=1 - ), ShardStatusCount( - status='Succeeded', - count=1 - )] + return [ + ShardStatusCount(status='Submitted', count=1), + ShardStatusCount(status='Succeeded', count=1) + ] for response in itertools.permutations(submitted_scattered_task(), 2): self.assertEqual( From ea0e6730e1b1e6a5699e4e17b43742738a41f6ac Mon Sep 17 00:00:00 2001 From: Rebecca Asch Date: Fri, 15 Mar 2019 18:48:40 -0400 Subject: [PATCH 4/4] removed unneeded debugging and comments --- servers/cromwell/jobs/controllers/jobs_controller.py | 2 -- servers/cromwell/jobs/test/test_task_statuses.py | 3 --- 2 files changed, 5 deletions(-) diff --git a/servers/cromwell/jobs/controllers/jobs_controller.py b/servers/cromwell/jobs/controllers/jobs_controller.py index 6c41add3f..590b2dc71 100644 --- a/servers/cromwell/jobs/controllers/jobs_controller.py +++ b/servers/cromwell/jobs/controllers/jobs_controller.py @@ -167,8 +167,6 @@ def health(**kwargs): def format_task(task_name, task_metadata): # check to see if task is scattered if task_metadata[-1].get('shardIndex') != -1: - logger.warning('scattered task response {}'.format( - format_scattered_task(task_name, task_metadata))) return format_scattered_task(task_name, task_metadata) latest_attempt = task_metadata[-1] diff --git a/servers/cromwell/jobs/test/test_task_statuses.py b/servers/cromwell/jobs/test/test_task_statuses.py index f67a2fd01..93c188214 100644 --- a/servers/cromwell/jobs/test/test_task_statuses.py +++ b/servers/cromwell/jobs/test/test_task_statuses.py @@ -32,9 +32,6 @@ def test_unrecognized_task_status_causes_exception(self): with self.assertRaises(ValueError): task_statuses.cromwell_execution_to_api('Not a valid task status') - # 'Failed', 'Aborting', 'Aborted', 'Running', 'Submitted', - # 'Succeeded' - def test_scattered_task_status(self): def failed_scattered_task(): return [