From c88a585e0b02f9826cc66559a78b41c437d5f874 Mon Sep 17 00:00:00 2001 From: Dimitry Marchenko Date: Tue, 12 Jul 2016 10:16:15 -0500 Subject: [PATCH 1/5] track exit codes --- luigi/contrib/ecs.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/ecs.py b/luigi/contrib/ecs.py index 54e52c1aa3..02ca6ed4a3 100644 --- a/luigi/contrib/ecs.py +++ b/luigi/contrib/ecs.py @@ -79,12 +79,25 @@ def _get_task_statuses(cluster, task_ids): if response['failures'] != []: raise Exception('There were some failures:\n{0}'.format( response['failures'])) + + tasks = response['tasks'] + + dirty_exits = [] + for t in tasks: + for container in t['containers']: + if 'exitCode' in container and container['exitCode'] != 0: + dirty_exits.append(container) + + if len(dirty_exits): + raise Exception('Some containers had non-zero exit codes:\n{0}'.format( + dirty_exits)) + status_code = response['ResponseMetadata']['HTTPStatusCode'] if status_code != 200: msg = 'Task status request received status code {0}:\n{1}' raise Exception(msg.format(status_code, response)) - return [t['lastStatus'] for t in response['tasks']] + return [t['lastStatus'] for t in tasks] def _track_tasks(cluster, task_ids): From 5c24dabfda65f790902c8ad3c51bde3a3d1ab8b1 Mon Sep 17 00:00:00 2001 From: Dimitry Marchenko Date: Tue, 12 Jul 2016 10:40:16 -0500 Subject: [PATCH 2/5] only throw on essential containers non-zero --- luigi/contrib/ecs.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/luigi/contrib/ecs.py b/luigi/contrib/ecs.py index 02ca6ed4a3..41f20ccc7f 100644 --- a/luigi/contrib/ecs.py +++ b/luigi/contrib/ecs.py @@ -67,7 +67,7 @@ POLL_TIME = 2 -def _get_task_statuses(cluster, task_ids): +def _get_task_statuses(cluster, task_ids, essential_containers=None): """ Retrieve task statuses from ECS API @@ -85,8 +85,9 @@ def _get_task_statuses(cluster, task_ids): dirty_exits = [] for t in tasks: for container in t['containers']: - if 'exitCode' in container and container['exitCode'] != 0: - dirty_exits.append(container) + if container['name'] in essential_containers: + if 'exitCode' in container and container['exitCode'] != 0: + dirty_exits.append(container) if len(dirty_exits): raise Exception('Some containers had non-zero exit codes:\n{0}'.format( @@ -100,10 +101,10 @@ def _get_task_statuses(cluster, task_ids): return [t['lastStatus'] for t in tasks] -def _track_tasks(cluster, task_ids): +def _track_tasks(cluster, task_ids, **kwargs): """Poll task status until STOPPED""" while True: - statuses = _get_task_statuses(cluster, task_ids) + statuses = _get_task_statuses(cluster, task_ids, **kwargs) if all([status == 'STOPPED' for status in statuses]): logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids))) break @@ -196,7 +197,19 @@ def run(self): cluster=self.cluster, taskDefinition=self.task_def_arn, overrides=overrides) + self._task_ids = [task['taskArn'] for task in response['tasks']] + # Get essential container names to fail early on errors + task_definition = client.describe_task_definition( + taskDefinition=self.task_def_arn) + task_definition = task_definition['taskDefinition'] + self._essential_containers = \ + [cont['name'] + for cont in task_definition['containerDefinitions'] + if cont['essential']] + # Wait on task completion - _track_tasks(self.cluster, self._task_ids) + _track_tasks( + self.cluster, self._task_ids, + essential_containers=self._essential_containers) From d8e59264d6ba64018bf75859c6a8b2f6249f4c2f Mon Sep 17 00:00:00 2001 From: Dimitry Marchenko Date: Tue, 12 Jul 2016 12:06:24 -0500 Subject: [PATCH 3/5] handle ECS container reason failure, add tests for ECSTask failures --- luigi/contrib/ecs.py | 8 +++-- test/contrib/ecs_test.py | 65 ++++++++++++++++++++++++++++++++++++++++ tox.ini | 1 + 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/luigi/contrib/ecs.py b/luigi/contrib/ecs.py index 41f20ccc7f..5452335ed6 100644 --- a/luigi/contrib/ecs.py +++ b/luigi/contrib/ecs.py @@ -86,8 +86,12 @@ def _get_task_statuses(cluster, task_ids, essential_containers=None): for t in tasks: for container in t['containers']: if container['name'] in essential_containers: - if 'exitCode' in container and container['exitCode'] != 0: - dirty_exits.append(container) + if container['lastStatus'] == 'STOPPED': + # Check if container's command returned error + # or if ECS had an error running the command + if 'exitCode' in container and container['exitCode'] != 0 \ + or 'reason' in container: + dirty_exits.append(container) if len(dirty_exits): raise Exception('Some containers had non-zero exit codes:\n{0}'.format( diff --git a/test/contrib/ecs_test.py b/test/contrib/ecs_test.py index 5cc2c3ebe9..4330c67566 100644 --- a/test/contrib/ecs_test.py +++ b/test/contrib/ecs_test.py @@ -56,6 +56,56 @@ ] } +MISSING_COMMAND_TASK_DEF = { + 'family': 'hello-world', + 'volumes': [], + 'containerDefinitions': [ + { + 'memory': 1, + 'essential': True, + 'name': 'hello-world', + 'image': 'ubuntu', + 'command': ['doesnotexist'] + } + ] +} + +NON_ZERO_EXIT_CODE_TASK_DEF = { + 'family': 'hello-world', + 'volumes': [], + 'containerDefinitions': [ + { + 'memory': 1, + 'essential': True, + 'name': 'hello-world', + 'image': 'ubuntu', + 'command': ['sh', '-c', 'exit 123'] + } + ] +} + + +NON_ESSENTIAL_FAILURE = { + 'family': 'hello-world', + 'volumes': [], + 'containerDefinitions': [ + { + 'memory': 1, + 'essential': True, + 'name': 'hello-world', + 'image': 'ubuntu', + 'command': ['/bin/echo', 'hello world'] + }, + { + 'memory': 1, + 'essential': False, + 'name': 'non-essential-hello-world', + 'image': 'ubuntu', + 'command': ['doesnotexist'] + } + ] +} + class ECSTaskNoOutput(ECSTask): @@ -91,3 +141,18 @@ def test_registered_task(self): def test_override_command(self): t = ECSTaskOverrideCommand(task_def_arn=self.arn) luigi.build([t], local_scheduler=True) + + def test_missing_command_failure_task(self): + t = ECSTaskNoOutput(task_def=MISSING_COMMAND_TASK_DEF) + scheduled = luigi.build([t], local_scheduler=True) + self.assertFalse(scheduled) + + def test_non_zero_exit_code_failure_task(self): + t = ECSTaskNoOutput(task_def=NON_ZERO_EXIT_CODE_TASK_DEF) + scheduled = luigi.build([t], local_scheduler=True) + self.assertFalse(scheduled) + + def test_ignore_non_essential_failure_task(self): + t = ECSTaskNoOutput(task_def=NON_ESSENTIAL_FAILURE) + scheduled = luigi.build([t], local_scheduler=True) + self.assertTrue(scheduled) diff --git a/tox.ini b/tox.ini index cc55b325a0..460d889868 100644 --- a/tox.ini +++ b/tox.ini @@ -12,6 +12,7 @@ deps= nose<2.0 unittest2<2.0 boto<3.0 + boto3>=1.3.1 sqlalchemy<2.0 elasticsearch<2.0.0 psutil<4.0 From 822965409c03c03032a13d4379e0797ecae7ecfb Mon Sep 17 00:00:00 2001 From: Dimitry Marchenko Date: Tue, 12 Jul 2016 13:35:11 -0500 Subject: [PATCH 4/5] reorganize task status tracking a bit --- luigi/contrib/ecs.py | 50 +++++++++++++++++++++++++++------------- test/contrib/ecs_test.py | 5 ++-- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/luigi/contrib/ecs.py b/luigi/contrib/ecs.py index 5452335ed6..59e9eac68b 100644 --- a/luigi/contrib/ecs.py +++ b/luigi/contrib/ecs.py @@ -67,9 +67,9 @@ POLL_TIME = 2 -def _get_task_statuses(cluster, task_ids, essential_containers=None): +def _get_task_descriptions(cluster, task_ids): """ - Retrieve task statuses from ECS API + Retrieve task descriptions from ECS API Returns list of {RUNNING|PENDING|STOPPED} for each id in task_ids """ @@ -80,8 +80,34 @@ def _get_task_statuses(cluster, task_ids, essential_containers=None): raise Exception('There were some failures:\n{0}'.format( response['failures'])) - tasks = response['tasks'] + status_code = response['ResponseMetadata']['HTTPStatusCode'] + if status_code != 200: + msg = 'Task status request received status code {0}:\n{1}' + raise Exception(msg.format(status_code, response)) + + return response['tasks'] + +def _get_task_statuses(tasks): + """ + Returns list of {RUNNING|PENDING|STOPPED} for each task in tasks + + Args: + - tasks (list): list of ECS taskDescriptions + """ + return [t['lastStatus'] for t in tasks] + +def _check_exit_codes(tasks, essential_containers): + """ + Checks each essential task in tasks for a failure reason or a + non-zero exitCode + + Args: + - tasks (list): list of ECS taskDescriptions + - essential_containers (list): list of essential container names + Raises: + - Exception: A failing essential task was found + """ dirty_exits = [] for t in tasks: for container in t['containers']: @@ -97,19 +123,13 @@ def _get_task_statuses(cluster, task_ids, essential_containers=None): raise Exception('Some containers had non-zero exit codes:\n{0}'.format( dirty_exits)) - status_code = response['ResponseMetadata']['HTTPStatusCode'] - if status_code != 200: - msg = 'Task status request received status code {0}:\n{1}' - raise Exception(msg.format(status_code, response)) - - return [t['lastStatus'] for t in tasks] - - -def _track_tasks(cluster, task_ids, **kwargs): +def _track_tasks(cluster, task_ids, essential_containers): """Poll task status until STOPPED""" while True: - statuses = _get_task_statuses(cluster, task_ids, **kwargs) + task_descriptions = _get_task_descriptions(cluster, task_ids) + statuses = _get_task_statuses(task_descriptions) if all([status == 'STOPPED' for status in statuses]): + _check_exit_codes(task_descriptions, essential_containers) logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids))) break time.sleep(POLL_TIME) @@ -214,6 +234,4 @@ def run(self): if cont['essential']] # Wait on task completion - _track_tasks( - self.cluster, self._task_ids, - essential_containers=self._essential_containers) + _track_tasks(self.cluster, self._task_ids, self._essential_containers) diff --git a/test/contrib/ecs_test.py b/test/contrib/ecs_test.py index 4330c67566..be1a448378 100644 --- a/test/contrib/ecs_test.py +++ b/test/contrib/ecs_test.py @@ -34,7 +34,7 @@ import unittest import luigi -from luigi.contrib.ecs import ECSTask, _get_task_statuses +from luigi.contrib.ecs import ECSTask, _get_task_statuses, _get_task_descriptions try: import boto3 @@ -111,8 +111,9 @@ class ECSTaskNoOutput(ECSTask): def complete(self): if self.ecs_task_ids: + task_definitions = _get_task_descriptions(self.ecs_task_ids) return all([status == 'STOPPED' - for status in _get_task_statuses(self.ecs_task_ids)]) + for status in _get_task_statuses(task_definitions)]) return False From aa057520fe1a60374291399e7d80b1338be7c7dc Mon Sep 17 00:00:00 2001 From: Pasha Katsev Date: Mon, 18 Jul 2016 09:25:51 -0500 Subject: [PATCH 5/5] add AWS_DEFAULT_PROFILE env var to tox passthru --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 460d889868..48251fd67d 100644 --- a/tox.ini +++ b/tox.ini @@ -28,7 +28,7 @@ deps= pygments hypothesis[datetime] passenv = - USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI + USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI AWS_DEFAULT_PROFILE setenv = LC_ALL = en_US.utf-8 cdh: HADOOP_DISTRO=cdh