Fix retry number handling #1230

Merged
merged 1 commit on Mar 29, 2016
34 changes: 21 additions & 13 deletions airflow/models.py
@@ -1000,32 +1000,35 @@ def run(
             )
         elif force or self.state in State.runnable():
             HR = "\n" + ("-" * 80) + "\n"  # Line break
-            tot_tries = task.retries + 1
+
             # For reporting purposes, we report based on 1-indexed,
             # not 0-indexed lists (i.e. Attempt 1 instead of
-            # Attempt 0 for the first attempt)
-            msg = "Attempt {} out of {}".format(self.try_number+1,
-                                                tot_tries)
-            self.try_number += 1
-            msg = msg.format(**locals())
-            logging.info(HR + msg + HR)
+            # Attempt 0 for the first attempt).
+            msg = "Starting attempt {attempt} of {total}".format(
+                attempt=self.try_number % (task.retries + 1) + 1,
+                total=task.retries + 1)
             self.start_date = datetime.now()
+
             if not mark_success and self.state != State.QUEUED and (
                     self.pool or self.task.dag.concurrency_reached):
                 # If a pool is set for this task, marking the task instance
                 # as QUEUED
                 self.state = State.QUEUED
-                # Since we are just getting enqueued, we need to undo
-                # the try_number increment above and update the message as well
-                self.try_number -= 1
-                msg = "Queuing attempt {} out of {}".format(self.try_number+1,
-                                                            tot_tries)
+                msg = "Queuing attempt {attempt} of {total}".format(
+                    attempt=self.try_number % (task.retries + 1) + 1,
+                    total=task.retries + 1)
                 logging.info(HR + msg + HR)
+
                 self.queued_dttm = datetime.now()
                 session.merge(self)
                 session.commit()
                 logging.info("Queuing into pool {}".format(self.pool))
                 return
+
+            # print status message
+            logging.info(HR + msg + HR)
+            self.try_number += 1
+
             if not test_mode:
                 session.add(Log(State.RUNNING, self))
                 self.state = State.RUNNING
@@ -1121,12 +1124,17 @@ def handle_failure(self, error, test_mode=False, context=None):
 
         # Let's go deeper
         try:
-            if self.try_number <= task.retries:
+            if task.retries and self.try_number % (task.retries + 1) != 0:
                 self.state = State.UP_FOR_RETRY
+                logging.info('Marking task as UP_FOR_RETRY')
                 if task.email_on_retry and task.email:
                     self.email_alert(error, is_retry=True)
             else:
                 self.state = State.FAILED
+                if task.retries:
+                    logging.info('All retries failed; marking task as FAILED')
+                else:
+                    logging.info('Marking task as FAILED.')
                 if task.email_on_failure and task.email:
                     self.email_alert(error, is_retry=False)
         except Exception as e2:
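For context on the change above: run() now derives the reported attempt number from self.try_number % (task.retries + 1) + 1 before incrementing try_number, and handle_failure() keeps retrying until the incremented try_number is a multiple of task.retries + 1. A minimal standalone sketch of that arithmetic (the helper names are hypothetical, not Airflow APIs; only the expressions mirror the diff):

# Illustrative sketch only: the helper names are hypothetical, not Airflow
# APIs, but the expressions mirror the ones added in the diff above.

def attempt_label(try_number, retries):
    # 1-indexed attempt number that run() logs before incrementing try_number.
    return try_number % (retries + 1) + 1

def state_after_failure(try_number, retries):
    # handle_failure() sees the already-incremented try_number and keeps
    # retrying until it is a multiple of (retries + 1).
    if retries and try_number % (retries + 1) != 0:
        return "UP_FOR_RETRY"
    return "FAILED"

retries = 1
for try_number in range(4):  # value of try_number before each run
    print("Starting attempt {} of {} -> {}".format(
        attempt_label(try_number, retries), retries + 1,
        state_after_failure(try_number + 1, retries)))
# Starting attempt 1 of 2 -> UP_FOR_RETRY
# Starting attempt 2 of 2 -> FAILED
# Starting attempt 1 of 2 -> UP_FOR_RETRY
# Starting attempt 2 of 2 -> FAILED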
118 changes: 109 additions & 9 deletions tests/models.py
@@ -1,14 +1,29 @@
 # -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-import unittest
 import datetime
+import unittest
+import time
 
 from airflow import models
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators import DummyOperator, BashOperator
+from airflow.utils import State, AirflowException
 
 
 class DagTest(unittest.TestCase):
@@ -95,14 +110,15 @@ class TaskInstanceTest(unittest.TestCase):
 
     def test_run_pooling_task(self):
         """
-        test that running task with mark_success param update task state as SUCCESS
-        without running task.
+        test that running task with mark_success param update task state as
+        SUCCESS without running task.
         """
         dag = models.DAG(dag_id='test_run_pooling_task')
         task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag,
                              pool='test_run_pooling_task_pool', owner='airflow',
                              start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
-        ti = models.TaskInstance(task=task, execution_date=datetime.datetime.now())
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
         ti.run()
         assert ti.state == models.State.QUEUED
 
@@ -112,9 +128,93 @@ def test_run_pooling_task_with_mark_success(self):
         without running task.
         """
         dag = models.DAG(dag_id='test_run_pooling_task_with_mark_success')
-        task = DummyOperator(task_id='test_run_pooling_task_with_mark_success_op', dag=dag,
-                             pool='test_run_pooling_task_with_mark_success_pool', owner='airflow',
-                             start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
-        ti = models.TaskInstance(task=task, execution_date=datetime.datetime.now())
+        task = DummyOperator(
+            task_id='test_run_pooling_task_with_mark_success_op',
+            dag=dag,
+            pool='test_run_pooling_task_with_mark_success_pool',
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
         ti.run(mark_success=True)
         assert ti.state == models.State.SUCCESS
+
+    def test_retry_delay(self):
+        """
+        Test that retry delays are respected
+        """
+        dag = models.DAG(dag_id='test_retry_handling')
+        task = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            retry_delay=datetime.timedelta(seconds=3),
+            dag=dag,
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+
+        def run_with_error(ti):
+            try:
+                ti.run()
+            except AirflowException:
+                pass
+
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
+
+        # first run -- up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # second run -- still up for retry because retry_delay hasn't expired
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+
+        # third run -- failed
+        time.sleep(3)
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.FAILED)
+
+    def test_retry_handling(self):
+        """
+        Test that task retries are handled properly
+        """
+        dag = models.DAG(dag_id='test_retry_handling')
+        task = BashOperator(
+            task_id='test_retry_handling_op',
+            bash_command='exit 1',
+            retries=1,
+            retry_delay=datetime.timedelta(seconds=0),
+            dag=dag,
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+
+        def run_with_error(ti):
+            try:
+                ti.run()
+            except AirflowException:
+                pass
+
+        ti = models.TaskInstance(
+            task=task, execution_date=datetime.datetime.now())
+
+        # first run -- up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 1)
+
+        # second run -- fail
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.try_number, 2)
+
+        # third run -- up for retry
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.UP_FOR_RETRY)
+        self.assertEqual(ti.try_number, 3)
+
+        # fourth run -- fail
+        run_with_error(ti)
+        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.try_number, 4)
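The assertions in test_retry_handling follow directly from the new modulo decision. A standalone cross-check of the expected try_number and state after each of the four failing runs with retries=1 (illustrative only, not Airflow code; it relies on the test's observation that each call to ti.run() increments try_number once):

# Illustrative cross-check of the sequence asserted above (retries=1,
# retry_delay=0); not Airflow code, just the same modulo decision.
retries = 1

def state_after_run(try_number_after_run):
    # Same branch as the patched handle_failure(): retry until the
    # incremented try_number is a multiple of (retries + 1).
    if retries and try_number_after_run % (retries + 1) != 0:
        return "UP_FOR_RETRY"
    return "FAILED"

for run in range(1, 5):
    # After run N, try_number is N (one increment per attempt).
    print("run {}: try_number={}, state={}".format(run, run, state_after_run(run)))
# run 1: try_number=1, state=UP_FOR_RETRY
# run 2: try_number=2, state=FAILED
# run 3: try_number=3, state=UP_FOR_RETRY
# run 4: try_number=4, state=FAILED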