Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track ECS container exit codes #2

Merged
merged 5 commits into from
Jul 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions luigi/contrib/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
POLL_TIME = 2


def _get_task_statuses(cluster, task_ids):
def _get_task_descriptions(cluster, task_ids):
"""
Retrieve task statuses from ECS API
Retrieve task descriptions from ECS API

Returns list of {RUNNING|PENDING|STOPPED} for each id in task_ids
"""
Expand All @@ -79,19 +79,57 @@ def _get_task_statuses(cluster, task_ids):
if response['failures'] != []:
raise Exception('There were some failures:\n{0}'.format(
response['failures']))

status_code = response['ResponseMetadata']['HTTPStatusCode']
if status_code != 200:
msg = 'Task status request received status code {0}:\n{1}'
raise Exception(msg.format(status_code, response))

return [t['lastStatus'] for t in response['tasks']]
return response['tasks']

def _get_task_statuses(tasks):
"""
Returns list of {RUNNING|PENDING|STOPPED} for each task in tasks

Args:
- tasks (list): list of ECS taskDescriptions
"""
return [t['lastStatus'] for t in tasks]

def _check_exit_codes(tasks, essential_containers):
"""
Checks each essential task in tasks for a failure reason or a
non-zero exitCode

Args:
- tasks (list): list of ECS taskDescriptions
- essential_containers (list): list of essential container names

def _track_tasks(cluster, task_ids):
Raises:
- Exception: A failing essential task was found
"""
dirty_exits = []
for t in tasks:
for container in t['containers']:
if container['name'] in essential_containers:
if container['lastStatus'] == 'STOPPED':
# Check if container's command returned error
# or if ECS had an error running the command
if 'exitCode' in container and container['exitCode'] != 0 \
or 'reason' in container:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine that the reasons are not extraordinarily informative, but something's better than nothing - maybe we should let them bubble up?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by bubble up exactly?

I'm collecting all the failures into one exception and then raising that, because there might be multiple failing essential containers in one task. ECS does not seem to kill all the tasks immediately on a failed essential container: there is some delay, and the containers that are subsequently terminated exit with exitCode 137. As a result, multiple containers may have reason/exitCode set.

dirty_exits.append(container)

if len(dirty_exits):
raise Exception('Some containers had non-zero exit codes:\n{0}'.format(
dirty_exits))

def _track_tasks(cluster, task_ids, essential_containers):
    """
    Poll task status until every task is STOPPED.

    Args:
        - cluster: forwarded unchanged to _get_task_descriptions
        - task_ids (list): ids of the tasks to poll; also joined with ','
          for the final log message
        - essential_containers (list): container names forwarded to
          _check_exit_codes once all tasks have stopped

    Raises:
        - Exception: propagated from _get_task_descriptions or from
          _check_exit_codes
    """
    while True:
        # Fetch the descriptions once per poll and derive the statuses from
        # them; _get_task_statuses takes task descriptions, not
        # (cluster, task_ids).
        task_descriptions = _get_task_descriptions(cluster, task_ids)
        statuses = _get_task_statuses(task_descriptions)
        if all(status == 'STOPPED' for status in statuses):
            # Check exit codes before logging so that a failure raises
            # instead of being reported as a normal stop.
            _check_exit_codes(task_descriptions, essential_containers)
            logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids)))
            break
        time.sleep(POLL_TIME)
Expand Down Expand Up @@ -183,7 +221,17 @@ def run(self):
cluster=self.cluster,
taskDefinition=self.task_def_arn,
overrides=overrides)

self._task_ids = [task['taskArn'] for task in response['tasks']]

# Get essential container names to fail early on errors
task_definition = client.describe_task_definition(
taskDefinition=self.task_def_arn)
task_definition = task_definition['taskDefinition']
self._essential_containers = \
[cont['name']
for cont in task_definition['containerDefinitions']
if cont['essential']]

# Wait on task completion
_track_tasks(self.cluster, self._task_ids)
_track_tasks(self.cluster, self._task_ids, self._essential_containers)
70 changes: 68 additions & 2 deletions test/contrib/ecs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import unittest

import luigi
from luigi.contrib.ecs import ECSTask, _get_task_statuses
from luigi.contrib.ecs import ECSTask, _get_task_statuses, _get_task_descriptions

try:
import boto3
Expand All @@ -56,13 +56,64 @@
]
}

# Task definition with a single essential container whose command
# ('doesnotexist') is not expected to be runnable; used by
# test_missing_command_failure_task.
MISSING_COMMAND_TASK_DEF = {
    'family': 'hello-world',
    'volumes': [],
    'containerDefinitions': [
        {
            'memory': 1,
            'essential': True,
            'name': 'hello-world',
            'image': 'ubuntu',
            'command': ['doesnotexist'],
        },
    ],
}

# Task definition with a single essential container whose shell command
# exits with status 123; used by test_non_zero_exit_code_failure_task.
NON_ZERO_EXIT_CODE_TASK_DEF = {
    'family': 'hello-world',
    'volumes': [],
    'containerDefinitions': [
        {
            'memory': 1,
            'essential': True,
            'name': 'hello-world',
            'image': 'ubuntu',
            'command': ['sh', '-c', 'exit 123'],
        },
    ],
}


# Task definition with two containers: an essential one that echoes
# 'hello world', and a non-essential one whose command ('doesnotexist') is
# not expected to be runnable; used by test_ignore_non_essential_failure_task.
NON_ESSENTIAL_FAILURE = {
    'family': 'hello-world',
    'volumes': [],
    'containerDefinitions': [
        {
            'memory': 1,
            'essential': True,
            'name': 'hello-world',
            'image': 'ubuntu',
            'command': ['/bin/echo', 'hello world'],
        },
        {
            'memory': 1,
            'essential': False,
            'name': 'non-essential-hello-world',
            'image': 'ubuntu',
            'command': ['doesnotexist'],
        },
    ],
}


class ECSTaskNoOutput(ECSTask):
    # ECSTask subclass used by the tests in this module; completeness is
    # derived from the status of the launched ECS tasks.

    def complete(self):
        """
        Return True once every launched ECS task reports STOPPED.

        Returns False while ``self.ecs_task_ids`` is empty/unset.
        """
        if self.ecs_task_ids:
            # _get_task_descriptions takes (cluster, task_ids), and
            # _get_task_statuses takes the resulting descriptions.
            # NOTE(review): assumes ECSTask exposes ``cluster`` (its run()
            # passes self.cluster to the ECS client) - confirm.
            task_descriptions = _get_task_descriptions(
                self.cluster, self.ecs_task_ids)
            return all(status == 'STOPPED'
                       for status in _get_task_statuses(task_descriptions))
        return False


Expand Down Expand Up @@ -91,3 +142,18 @@ def test_registered_task(self):
def test_override_command(self):
t = ECSTaskOverrideCommand(task_def_arn=self.arn)
luigi.build([t], local_scheduler=True)

def test_missing_command_failure_task(self):
    # A task definition whose command is 'doesnotexist' must make the
    # build report failure.
    task = ECSTaskNoOutput(task_def=MISSING_COMMAND_TASK_DEF)
    succeeded = luigi.build([task], local_scheduler=True)
    self.assertFalse(succeeded)

def test_non_zero_exit_code_failure_task(self):
    # A task definition whose command runs 'exit 123' must make the build
    # report failure.
    task = ECSTaskNoOutput(task_def=NON_ZERO_EXIT_CODE_TASK_DEF)
    succeeded = luigi.build([task], local_scheduler=True)
    self.assertFalse(succeeded)

def test_ignore_non_essential_failure_task(self):
    # Only the non-essential container has the 'doesnotexist' command, so
    # the build is expected to report success.
    task = ECSTaskNoOutput(task_def=NON_ESSENTIAL_FAILURE)
    succeeded = luigi.build([task], local_scheduler=True)
    self.assertTrue(succeeded)
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ deps=
nose<2.0
unittest2<2.0
boto<3.0
boto3>=1.3.1
sqlalchemy<2.0
elasticsearch<2.0.0
psutil<4.0
Expand All @@ -27,7 +28,7 @@ deps=
pygments
hypothesis[datetime]
passenv =
USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI
USER JAVA_HOME POSTGRES_USER DATAPROC_TEST_PROJECT_ID GCS_TEST_PROJECT_ID GCS_TEST_BUCKET GOOGLE_APPLICATION_CREDENTIALS TRAVIS_BUILD_ID TRAVIS TRAVIS_BRANCH TRAVIS_JOB_NUMBER TRAVIS_PULL_REQUEST TRAVIS_JOB_ID TRAVIS_REPO_SLUG TRAVIS_COMMIT CI AWS_DEFAULT_PROFILE
setenv =
LC_ALL = en_US.utf-8
cdh: HADOOP_DISTRO=cdh
Expand Down