SubDag Operator broken when used with pools - infinite retry of tasks #1225

Closed
syvineckruyk opened this issue Mar 27, 2016 · 31 comments
Assignees: jlowin
Labels: kind:bug (This is clearly a bug)


@syvineckruyk
Contributor

Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

Environment

CentOS 6
CeleryExecutor (Redis)
Two workers, 1 scheduler, 1 webserver
MySQL

  • Version of Airflow 1.6.2.10, 1.7.0rc1, HEAD
  • Example code to reproduce the bug (as a code snippet in markdown)
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators import BashOperator, SubDagOperator



dag_name = 'test_18'
test_pool = 'test_pool'
email_to = 'your@email.com'

now = datetime.now()
start_time = datetime(now.year, now.month, now.day, 12, 0, 0)
start_time = start_time + timedelta(days=-1)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    'email': [email_to],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 * * * *')


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 10',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 10 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=30),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)
  • Screen shots of your DAG's graph and tree views:
  1. sub-task of non-pooled subdag fails [screenshot: af_issue_1]
  2. subdag enters retry state [screenshot: af_issue_2]
  3. after all retries are exhausted, the non-pooled subdag fails [screenshot: af_issue_6]
  4. sub-task of pooled subdag fails [screenshot: af_issue_3]
  5. pooled subdag remains in running state; sub-task of pooled subdag enters queued state [screenshot: af_issue_4]
  6. sub-task of pooled subdag enters running state [screenshot: af_issue_5]
  7. sub-task failures of pooled subdags generate email alerts for all failures [screenshot: af_issue_7]
  8. logs (from the UI) also show the observed behavior:
Id  Dttm    Dag Id  Task Id Event   Execution Date  Owner   Extra
1910    03-27T15:37:36  test_18.test_subdag test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1911    03-27T15:37:46  test_18.test_subdag test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1916    03-27T15:39:18  test_18.test_subdag test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1918    03-27T15:39:28  test_18.test_subdag test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1924    03-27T15:41:18  test_18.test_subdag test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1930    03-27T15:41:28  test_18.test_subdag test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1912    03-27T15:37:54  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1914    03-27T15:38:05  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1917    03-27T15:39:23  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1920    03-27T15:39:33  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1921    03-27T15:40:54  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1922    03-27T15:41:04  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1925    03-27T15:41:26  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1926    03-27T15:41:36  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1927    03-27T15:41:52  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1928    03-27T15:42:02  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1929    03-27T15:42:18  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1931    03-27T15:42:28  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1933    03-27T15:43:52  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1934    03-27T15:44:02  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1935    03-27T15:44:30  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1936    03-27T15:44:40  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1937    03-27T15:44:56  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1938    03-27T15:45:06  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1939    03-27T15:46:35  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1940    03-27T15:46:45  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1941    03-27T15:47:42  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1942    03-27T15:47:52  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
1943    03-27T15:49:20  test_18.test_subdag_with_pool   test_task_2_with_exit_1 running 03-26T12:00:00  Test    
1944    03-27T15:49:30  test_18.test_subdag_with_pool   test_task_2_with_exit_1 failed  03-26T12:00:00  Test    
...

Now that you know a little about me, let me tell you about the issue I am having:

Description of Issue

When subdag operators are used with pools, sub-task failures are not "bubbled" up to the subdag operator. The contained failed task will retry indefinitely even when retries is set to 0.

When a sub-task failure occurs, the subdag operator remains in the running state; the sub-task quickly enters a queued state and shortly thereafter re-enters a running state.

  • What did you expect to happen?
  • sub-task fails
  • subdag operator enters retry state
  • subdag operator enters running state when retry_delay has been met
  • sub-tasks get queued
  • sub-tasks enter running state
  • What happened instead?
  • sub-task fails
  • sub-task enters queued state
  • sub-task runs
  • rinse, repeat
  • Here is how you can reproduce this issue on your machine:

    Reproduction Steps

  • create a pool (see the sketch after this list)
  • set the email and pool in the included DAG code
  • submit to your dags folder
  • observe the non-pooled subdag handle failure as specified
  • observe the pooled subdag enter an infinite loop
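
For reference, the pool can be created in the Admin UI or programmatically. A minimal sketch using the same session-based approach that appears later in this thread:

import airflow
from airflow.models import Pool

# create the test pool if it doesn't exist yet (10 slots, matching the later examples)
session = airflow.settings.Session()
if not session.query(Pool).filter(Pool.pool == 'test_pool').first():
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()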

This behavior has been observed on multiple versions. I did the testing for submitting this issue on 1.7.0rc1.

I also tested on HEAD as of 2016-03-26. The issue seems to get worse: the subdag which is not pooled (and which works as expected on 1.7.0rc1) enters the retry state as it should, but then never re-enters a running state, hanging indefinitely.

@jlowin jlowin self-assigned this Mar 27, 2016
@jlowin
Member

jlowin commented Mar 27, 2016

@syvineckruyk Thanks so much for taking the time to fill out such a complete report. I believe I've identified the issue, but would you mind verifying it? The fix is in my branch, so you can either clone it or install it directly like this:

pip install git+https://github.com/jlowin/airflow@subdag_pool_issue_1225

This branch includes the fixes from #1220, so we'd have to merge that first, but I think it's just about ready to go. I don't think #1220 directly impacts your issue, but it touches the same code so I'd like to avoid a messy conflict. Please let me know if you have any issues getting it running -- if this works for you I'll put through the PR.

If you're curious, here's what I believe is going on:

  • When you run a task in a pool, it doesn't actually run right away; first it becomes queued. This means that running a task in a pool actually requires executing the run command at least twice: first to place it in a queue, and second to move it from the queue to the executor.
  • When you run a SchedulerJob, it constantly checks for queued tasks and schedules them appropriately, so the "run it twice" problem is taken care of.
  • The catch is that SubDagOperators don't submit their subdags to the scheduler. Instead, they kick them off with a BackfillJob. And BackfillJob had a little logic hole that would never run queued tasks the critical second time. Instead, queued tasks would just sit in the backfill loop forever and the backfill job would never end. (A rough sketch of the missing step follows this list.)
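
Here is that sketch -- illustrative only, not the actual patch; the State import path, queue_task_instance, and the tasks_to_run dict are assumptions based on the 1.x-era codebase:

from airflow.utils.state import State  # import path varies across 1.x releases

def resubmit_queued_tasks(executor, tasks_to_run):
    """Illustrative: the step the backfill loop was missing.

    tasks_to_run maps task keys to TaskInstances the backfill still owns.
    A pooled task parks in QUEUED after its first `airflow run`; it must be
    handed to the executor a second time or the loop spins forever.
    """
    for key, ti in list(tasks_to_run.items()):
        ti.refresh_from_db()
        if ti.state == State.SUCCESS:
            tasks_to_run.pop(key)             # done; stop tracking it
        elif ti.state == State.QUEUED:
            executor.queue_task_instance(ti)  # the critical second run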

@syvineckruyk
Contributor Author

@jlowin Awesome ... I'll give it a go ... will update later today

@syvineckruyk
Contributor Author

@jlowin still getting some weird (but different) behavior ... the subdag with pool did what it should the first time, but then actually succeeded, which should be impossible, as the task returns an exit code of 1. I need a bit more time for a better analysis.

@jlowin
Member

jlowin commented Mar 28, 2016

Strange! These subdags might be more trouble than they're worth :) @syvineckruyk instead of watching your DAG through the scheduler and airflow UI, could you just try kicking off a backfill and see if it completes or hangs (and if it properly sets the task states -- you can check those in the UI)? I want to try to minimize any other variables that could be affecting things.

airflow backfill test_18 -s 2016-03-25 -e 2016-03-25

(note that with your settings this will kick off more than one run -- you might want to set your interval to 1 day for simplicity)

It's a more controllable way to see what's happening than the scheduler, and in your case it should give the same result, because the subdag is ultimately just running that exact code -- see the sketch below.
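
A sketch of that point, pieced together from the tracebacks later in this thread (simplified and illustrative, not the verbatim source): SubDagOperator.execute hands the subdag to the same backfill machinery the CLI drives.

def run_subdag(subdag, execution_date, executor):
    # roughly what SubDagOperator.execute boils down to (per the tracebacks
    # below): DAG.run() constructs and runs a BackfillJob over one interval
    subdag.run(start_date=execution_date, end_date=execution_date,
               donot_pickle=True, executor=executor)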

Sorry for the iterative debugging process -- if you post any modifications you make to your code I'll run it as well.

@jlowin
Member

jlowin commented Mar 28, 2016

If I run that backfill command (with a fresh resetdb and a new test_pool with 10 slots), I get failures as expected when I check the UI after it finishes:

[screenshot: 2016-03-27 at 11:12:06 pm]

That's Python 2.7.11 using the branch I linked above. Next I'll run it in the scheduler, and let's see if we can figure out what's going on.

I modified your code slightly to make it run faster (literally just shortened the sleep and retry delays):

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators import BashOperator, SubDagOperator



dag_name = 'test_18'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 1',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 1 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=1,
        retry_delay=timedelta(seconds=3),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

@jlowin
Member

jlowin commented Mar 28, 2016

Running with the scheduler (and full airflow resetdb), I also get the expected failures (and I think nothing else runs because depends_on_past=True):
[screenshot: 2016-03-27 at 11:24:13 pm]

Please let me know if you think something else is going on -- I definitely want to get to the bottom of this.

@jlowin jlowin added the kind:bug label Mar 28, 2016
@syvineckruyk
Contributor Author

@jlowin just got through a clean run ... (after resetdb) ... I want to perform a couple more tests/runs to make sure the behavior is consistent, as I cannot explain my previous runs that had issues on your branch.

@syvineckruyk
Contributor Author

@jlowin still some issues happening. My current testing round is based on issuing backfill from the CLI ... but I suspect the behavior is the same from the scheduler.

I ran
airflow backfill test_25 -s 2016-03-25 -e 2016-03-25
and it worked as expected ... got the same result as you.
Then I ran
airflow backfill test_25 -s 2016-03-26 -e 2016-03-26
and got a weird condition which should not be possible:

[screenshot]

The tasks stuck in retry (stuck there, btw) have retries set to 0 ... so this should not be possible. Also, as you can see, the subdag operators that are parents of the sub-tasks are in success ... so I think we both saw clean runs due to resetdb, but something is still off when there are previous dag runs / task instances.

Thanks for your help with all of this and let me know if you need anything from me.

@jlowin
Member

jlowin commented Mar 29, 2016

@syvineckruyk is test_25 the code I posted?

@syvineckruyk
Contributor Author

@jlowin yeah ... I'm bumping up the version for clean logs.

@jlowin
Member

jlowin commented Mar 29, 2016

@syvineckruyk unfortunately I can't reproduce that weird case in your screenshot. I'm putting the exact code I'm running at the bottom of this message. Thanks for your patience trying to solve this. If I had a gold medal to hand out for the "issue that identifies the most problems at once", you'd get it!

I run:

airflow resetdb -y
airflow backfill test_25 -s 2016-03-25 -e 2016-03-25
airflow backfill test_25 -s 2016-03-26 -e 2016-03-26

The 3/26 backfill simply doesn't run because its dependencies haven't been met. That's the correct behavior, since depends_on_past=True and the previous day failed.

(Something weird happens that I do need to fix, but I don't think it's what you're seeing. When a task doesn't run because of dependencies, the command exits without failing, so the executor thinks it succeeded. But the task doesn't report a success, so the executor concludes that something went wrong, puts up the "The airflow run command failed at reporting an error message" warning and, after three loops, fails the task. So you'll see the 3/26 task show up as failed with try_number 0 ... that's wrong.)
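
A rough sketch of that mismatch, with assumed names -- not the real jobs.py logic:

from airflow.utils.state import State  # import path varies across 1.x releases

def looks_like_silent_failure(executor_state, ti):
    # the subprocess exited cleanly, so the executor reports SUCCESS, but the
    # task instance never changed state because its dependencies weren't met;
    # after a few loops of this, the job gives up and marks the task failed
    return (executor_state == State.SUCCESS and
            ti.state not in (State.SUCCESS, State.FAILED, State.UP_FOR_RETRY))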

I've updated my branch to pull the latest fixes from master -- do you mind checking it out again?

In my dags folder:

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()


dag_name = 'test_25'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=0.2),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

@syvineckruyk
Contributor Author

@jlowin I'll check it out now ... thanks! No need for apologies. Let me know if I can do more ... I don't want to create a bunch of messy commits since you are already in deep. Let me know if you need anything from me other than testing.

@syvineckruyk
Contributor Author

@jlowin are you using CeleryExecutor?

@jlowin
Member

jlowin commented Mar 30, 2016

Not for testing. I'm using SequentialExecutor -- are you using CeleryExecutor? I'm using a completely fresh, default version of Airflow.
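
(For context: the executor is chosen in airflow.cfg -- the logs in this thread show "Using executor CeleryExecutor" on the Celery side. The relevant knob, with the value varying per setup:)

# airflow.cfg -- executor setting (value varies per setup)
[core]
executor = SequentialExecutor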

@syvineckruyk
Contributor Author

@jlowin
So running your current branch with your current code and depends_on_past=False seems to work as expected. I ran two consecutive days here.

(airflow) [airflow@*** ~]$ airflow backfill test_27 -s 2016-03-25 -e 2016-03-25
[2016-03-30 01:28:35,636] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:28:35,636] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:28:35,720] {models.py:317} INFO - Loaded DAG <DAG: test_27.test_subdag_with_pool>
[2016-03-30 01:28:35,720] {models.py:317} INFO - Loaded DAG <DAG: test_27>
[2016-03-30 01:28:35,749] {base_executor.py:35} INFO - Adding to queue: airflow run test_27 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 2 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:28:41,021] {celery_executor.py:66} INFO - [celery] queuing ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:28:41,127] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:28:46,066] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:28:51,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:28:56,054] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:01,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:06,080] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:11,062] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:16,091] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:21,068] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:26,087] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:31,068] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:36,049] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:41,061] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:46,060] {jobs.py:806} ERROR - Task instance ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 01:29:46,060] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0))]






(airflow) [airflow@*** ~]$ airflow backfill test_27 -s 2016-03-26 -e 2016-03-26
[2016-03-30 01:32:14,677] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-03-30 01:32:15,096] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:32:15,096] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:32:15,180] {models.py:317} INFO - Loaded DAG <DAG: test_27.test_subdag_with_pool>
[2016-03-30 01:32:15,181] {models.py:317} INFO - Loaded DAG <DAG: test_27>
[2016-03-30 01:32:15,208] {base_executor.py:35} INFO - Adding to queue: airflow run test_27 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 3 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:32:20,018] {celery_executor.py:66} INFO - [celery] queuing ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:32:20,122] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:25,086] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:30,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:35,051] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:40,086] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:45,094] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:50,049] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:55,071] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:00,081] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:05,119] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:10,056] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:15,081] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:20,084] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:25,044] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:30,106] {jobs.py:806} ERROR - Task instance ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) failed
[2016-03-30 01:33:30,107] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0))]

[screenshot]

I'll continue testing different configurations.

@syvineckruyk
Contributor Author

Set retries=2 and retry_delay=10 seconds:

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()


dag_name = 'test_28'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)
(airflow) [airflow@*** ~]$ airflow backfill test_28 -s 2016-03-25 -e 2016-03-25
[2016-03-30 01:42:52,081] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-03-30 01:42:52,504] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:42:52,504] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:42:52,568] {models.py:317} INFO - Loaded DAG <DAG: test_28.test_subdag_with_pool>
[2016-03-30 01:42:52,568] {models.py:317} INFO - Loaded DAG <DAG: test_28>
[2016-03-30 01:42:52,596] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 4 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:42:58,018] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:42:58,130] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:03,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:08,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:13,056] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:18,079] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:23,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:28,091] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:33,058] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:38,205] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:43,063] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:48,099] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:53,196] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:58,099] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:03,093] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:08,062] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:13,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:18,023] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:18,028] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 4 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:44:23,015] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:44:23,059] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:28,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:33,146] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:38,159] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:43,074] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:48,059] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:53,083] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:58,145] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:03,037] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:08,120] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:13,142] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:18,013] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:18,018] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 4 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:45:23,021] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:45:23,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:28,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:33,107] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:38,036] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:43,105] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:48,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:53,084] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:58,176] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:46:03,155] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:46:08,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:46:13,079] {jobs.py:806} ERROR - Task instance ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 01:46:13,079] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0))]



(airflow) [airflow@*** ~]$ airflow backfill test_28 -s 2016-03-26 -e 2016-03-26
[2016-03-30 01:47:35,730] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-03-30 01:47:36,161] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:47:36,161] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:47:36,249] {models.py:317} INFO - Loaded DAG <DAG: test_28.test_subdag_with_pool>
[2016-03-30 01:47:36,249] {models.py:317} INFO - Loaded DAG <DAG: test_28>
[2016-03-30 01:47:36,276] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 5 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:47:41,020] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:47:41,121] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:47:46,050] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:47:51,158] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:47:56,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:01,175] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:06,138] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:11,055] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:16,097] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:21,053] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:26,175] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:31,082] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:36,104] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:41,168] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:46,047] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:51,053] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:56,020] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:56,025] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 5 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:49:01,023] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:49:01,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:06,100] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:11,094] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:16,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:21,066] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:26,113] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:31,071] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:36,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:41,064] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:46,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:46,030] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 5 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:49:51,022] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:49:51,169] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:56,040] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:01,064] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:06,077] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:11,094] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:16,044] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:21,042] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:26,037] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:31,080] {jobs.py:806} ERROR - Task instance ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) failed
[2016-03-30 01:50:31,081] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0))]

[screenshot]

This also seems to work as expected ... will drill through the logs though to make sure.

Should I be able to test with depends_on_past=True? Or is there still an open issue?

Thanks !

@syvineckruyk
Contributor Author

@jlowin
Same DAG (with a version bump) enters a weird loop when run by the scheduler.

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()


dag_name = 'test_30'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

The loop is not infinite ... but the sub-task queues, fails, and re-runs many times before the error bubbles up to the subdag operator. (In the log below, test_task_2_with_exit_1 is re-queued nine times within a single SubDagOperator attempt.)

Subdag operator log (node 1):

[2016-03-30 02:20:05,379] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:05,379] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:05,456] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:05,456] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:06,627] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:06,627] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:06,662] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:06,662] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:06,675] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 3
--------------------------------------------------------------------------------

[2016-03-30 02:20:06,689] {models.py:1060} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-25 00:00:00
[2016-03-30 02:20:06,733] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:12,012] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:12,120] {jobs.py:863} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:17,081] {jobs.py:863} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:22,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:22,094] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:27,017] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:27,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:32,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:37,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:37,085] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:42,016] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:42,189] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:47,055] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:52,066] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:52,082] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:57,021] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:57,032] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:02,076] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:07,071] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:07,082] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:12,015] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:12,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:17,050] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:22,043] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:22,055] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:27,013] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:27,023] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:32,043] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:37,057] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:37,070] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:42,013] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:42,043] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:47,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:52,053] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:52,067] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:57,018] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:57,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:02,062] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:07,059] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:07,071] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:22:12,012] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:22:12,024] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:17,054] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:22,050] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:22,063] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:22:27,026] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:22:27,034] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:32,063] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:37,054] {jobs.py:806} ERROR - Task instance ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 02:22:37,054] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 2 | failed: 1 | wont_run: 0 
[2016-03-30 02:22:37,055] {models.py:1125} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:22:37,056] {models.py:1137} INFO - Marking task as UP_FOR_RETRY
[2016-03-30 02:22:37,075] {models.py:1166} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:22:52,255] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:52,256] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:53,514] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:53,514] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:53,551] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:53,551] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:53,564] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 2 of 3
--------------------------------------------------------------------------------

[2016-03-30 02:22:53,582] {models.py:1060} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-25 00:00:00
[2016-03-30 02:22:53,629] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:22:59,017] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:22:59,118] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:04,083] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:09,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:09,083] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:14,011] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:14,021] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:19,064] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:24,079] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:24,092] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:29,017] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:29,030] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:34,063] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:39,093] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:39,106] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:44,018] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:44,029] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:49,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:54,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:54,079] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:59,015] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:59,027] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:04,042] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:09,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:09,051] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:24:14,014] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:24:14,024] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:19,040] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:24,042] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:24,055] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:24:29,012] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:24:29,023] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:34,067] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:39,061] {jobs.py:806} ERROR - Task instance ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 02:24:39,061] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-03-30 02:24:39,062] {models.py:1125} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:24:39,063] {models.py:1137} INFO - Marking task as UP_FOR_RETRY
[2016-03-30 02:24:39,077] {models.py:1166} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]

**subdag operator log: node 2:**

[2016-03-30 02:24:55,354] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:55,354] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:55,411] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:55,412] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:56,395] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:56,395] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:56,431] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:56,431] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:56,445] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 3 of 3
--------------------------------------------------------------------------------

[2016-03-30 02:24:56,459] {models.py:1060} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-25 00:00:00
[2016-03-30 02:24:56,499] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:25:01,022] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:25:01,113] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:25:06,047] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:25:11,056] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:25:16,049] {jobs.py:806} ERROR - Task instance ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 02:25:16,049] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-03-30 02:25:16,049] {models.py:1125} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:25:16,050] {models.py:1143} INFO - All retries failed; marking task as FAILED
[2016-03-30 02:25:16,066] {models.py:1166} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]

**sub-task log (from the UI):**

[2016-03-30 02:20:36,783] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:36,783] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:36,819] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:36,819] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:37,974] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:37,974] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:38,012] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:38,012] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:38,027] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:20:38,041] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:20:38,059] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:20:38,061] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmp00DHHF//tmp/airflowtmp00DHHF/test_task_2_with_exit_1WRaiEi
[2016-03-30 02:20:38,061] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:20:38,065] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:20:38,066] {bash_operator.py:77} INFO - hello
[2016-03-30 02:20:38,267] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:20:38,268] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:20:38,269] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:20:38,286] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:20:52,315] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:52,315] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:53,528] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:53,528] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:53,564] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:53,564] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:53,581] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:20:53,596] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:20:53,613] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:20:53,614] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpxFKcuu//tmp/airflowtmpxFKcuu/test_task_2_with_exit_1Z9sXI8
[2016-03-30 02:20:53,615] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:20:53,619] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:20:53,620] {bash_operator.py:77} INFO - hello
[2016-03-30 02:20:53,821] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:20:53,821] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:20:53,823] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:20:53,837] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:21:13,209] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:13,210] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:13,265] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:13,265] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:14,413] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:14,414] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:14,452] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:14,452] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:14,468] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:21:14,478] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:21:28,904] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:28,904] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:28,977] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:28,977] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:30,144] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:30,145] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:30,182] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:30,182] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:30,198] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:21:30,213] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:21:44,361] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:44,362] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:44,432] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:44,432] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:45,615] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:45,615] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:45,658] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:45,658] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:45,677] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:21:45,690] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:22:21,050] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:21,050] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:21,086] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:21,087] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:22,268] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:22,268] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:22,303] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:22,303] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:22,318] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:22:22,338] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:22:22,357] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:22:22,359] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpRDlYlN//tmp/airflowtmpRDlYlN/test_task_2_with_exit_1RCl2T3
[2016-03-30 02:22:22,359] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:22:22,363] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:22:22,364] {bash_operator.py:77} INFO - hello
[2016-03-30 02:22:22,565] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:22:22,566] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:22:22,567] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:22:22,582] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:23:07,922] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:07,922] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:07,960] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:07,960] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:09,128] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:09,128] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:09,168] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:09,168] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:09,184] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:23:09,198] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:23:09,215] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:23:09,216] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmp80YCR0//tmp/airflowtmp80YCR0/test_task_2_with_exit_1PCUde2
[2016-03-30 02:23:09,216] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:23:09,221] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:23:09,222] {bash_operator.py:77} INFO - hello
[2016-03-30 02:23:09,423] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:23:09,423] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:23:09,425] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:23:09,437] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:23:23,540] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:23,540] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:23,578] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:23,578] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:24,749] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:24,749] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:24,787] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:24,788] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:24,804] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:23:24,818] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:23:24,835] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:23:24,836] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpGTyqlC//tmp/airflowtmpGTyqlC/test_task_2_with_exit_14j9Ctl
[2016-03-30 02:23:24,836] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:23:24,841] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:23:24,842] {bash_operator.py:77} INFO - hello
[2016-03-30 02:23:25,043] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:23:25,043] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:23:25,045] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:23:25,062] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:24:00,177] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:00,177] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:00,242] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:00,242] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:01,406] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:01,406] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:01,446] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:01,446] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:01,461] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:24:01,471] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:24:15,665] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:15,665] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:15,737] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:15,737] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:16,911] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:16,911] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:16,953] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:16,953] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:16,968] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:24:16,978] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:24:31,183] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:31,184] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:31,240] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:31,240] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:32,404] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:32,404] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:32,441] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:32,441] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:32,457] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:24:32,476] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:25:12,436] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:12,436] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:12,502] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:25:12,502] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:25:13,684] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:13,684] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:13,720] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:25:13,720] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:25:13,735] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:25:13,749] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:25:13,767] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:25:13,768] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmp_3kW3N//tmp/airflowtmp_3kW3N/test_task_2_with_exit_13LwrXU
[2016-03-30 02:25:13,768] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:25:13,773] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:25:13,774] {bash_operator.py:77} INFO - hello
[2016-03-30 02:25:13,975] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:25:13,975] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:25:13,977] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:25:13,991] {models.py:1166} ERROR - Bash command failed

@syvineckruyk
Contributor Author

@jlowin I noticed I had not set depends_on_past=False in the SubDagOperator specification. I performed another test with it set and observed the same behavior as above (multiple attempts before the failure bubbles up to the subdag operator).

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

# Ensure the test pool exists (10 slots)
pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()


dag_name = 'test_31'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=False,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

@jlowin
Member

jlowin commented Mar 30, 2016

I've managed to replicate this behavior by dropping my scheduler_heartbeat_sec to 1 in airflow.cfg. I haven't found exactly where yet, but it looks like if you flood the CeleryExecutor with requests to run tasks, there's some critical moment where its guard is down and it schedules tasks that it's already run. That's my working hypothesis, anyway. What is your scheduler heartbeat interval set to?
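
For reference, a minimal sketch of the stress setting in airflow.cfg -- assuming the [scheduler] section layout of this era's config:

[scheduler]
# Seconds between scheduler loop iterations. Dropping this floods the
# CeleryExecutor with run requests and makes the double-scheduling easy to hit.
scheduler_heartbeat_sec = 1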

@syvineckruyk
Contributor Author

@jlowin
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5

Interesting observation! I'll run some tests with increased heartbeat times.

@jlowin
Member

jlowin commented Mar 30, 2016

@syvineckruyk please give my latest commits a try.

Here's what I found (I know I keep writing that, but the bittersweet news is you're hitting lots of pain points!): the SchedulerJob has a prioritize_queued() method that looks for queued tasks and tries to run them. Critically, it includes any queued task, no matter which job queued it. The problem is that the subdag is being run by a BackfillJob with its OWN loop checking for queued tasks. So:

  1. the BackfillJob running a CeleryExecutor puts the task in "queued" status
  2. the BackfillJob running a CeleryExecutor submits the task to the executor
  3. near-simultaneously, the SchedulerJob running a different CeleryExecutor (because it's on a different machine or process) submits the task to its own executor
  4. Whichever request is executed first flips the task from queued -> running. Normally, further run requests would be ignored, but the second request comes in and is honored, flipping the state back to queued.
  5. Repeat.

Solution: prioritize_queued only considers tasks in the Scheduler's DagBag and ignores subdags, on the assumption they are being run on their own (a rough sketch of that filter follows).
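
For illustration only, a minimal sketch of that filtering idea -- not the actual patch. The should_prioritize helper and the scheduler_dag_ids set are hypothetical stand-ins for the real SchedulerJob internals, and the dot-check assumes the parent_dag_id.subdag_task_id naming convention used throughout this thread:

def should_prioritize(dag_id, scheduler_dag_ids):
    """Decide whether prioritize_queued() may re-submit a queued task."""
    if '.' in dag_id:
        # Subdag convention: '<parent>.<subdag>'. Subdag tasks are driven by
        # the SubDagOperator's own BackfillJob loop, which already re-submits
        # queued tasks; racing it flips the state queued -> running -> queued,
        # exactly as in the logs above.
        return False
    if dag_id not in scheduler_dag_ids:
        # Queued by some job this scheduler doesn't own; leave it alone.
        return False
    return True

# With the scheduler owning only the parent DAG, the pooled subdag task
# from the logs above is skipped:
should_prioritize('test_30.test_subdag_with_pool', {'test_30'})  # False
should_prioritize('test_30', {'test_30'})                        # True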

This isn't a complete solution -- the problem will crop up if someone runs a scheduler alongside any other kind of job, because the scheduler will try to run all queued tasks (in fact, I wonder if it's the cause of some of the weird backfill errors people have been seeing). To do this properly, I will make the scheduler work only with its "own" tasks -- but that will involve more work. Please let me know if this solution works for now.

p.s. if you want to really stress this scenario, set your scheduler_heartbeat_sec to 0

Thanks!

@syvineckruyk
Contributor Author

@jlowin this seems to be working much better. As far as actual execution, errors, and the bubbling-up of errors go, everything seems in order. I am seeing more queuing activity than I would expect as I spin up workers, though; I need to get the logs together. I'm at the Strata conference all day today, so more than likely I will not get to this before this evening.

Thanks again for all your help.

Also just wanted to clarify your last statement.

the problem will crop up if someone runs a scheduler alongside any other kind of job, because the scheduler will try to run all queued tasks

Do you mean, for example, if someone were to run a backfill from the CLI while the scheduler is running? Are the SubDagOperator's "backfills" protected from this?

Thanks

@jlowin
Member

jlowin commented Mar 31, 2016

Yes, that's exactly what I mean -- the Scheduler is subtly greedy and tries to run ALL queued tasks, even if they were queued by a SubDagOperator's backfill (or potentially a CLI backfill). I will fix that with the PR that addresses all of your issues.
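
To make the collision concrete, a hypothetical example: with the scheduler running, kicking off a CLI backfill of the same DAG and date used in this thread, e.g.

airflow backfill test_35 -s 2016-03-28 -e 2016-03-28

lets the scheduler's prioritize_queued() race the backfill's own loop over the same queued task instances.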

@syvineckruyk
Contributor Author

I will fix that with the PR that addresses all of your issues.

You're making me sound like a basket case ;)

@jlowin
Member

jlowin commented Mar 31, 2016

I should have said: all of Airflow's issues!

@syvineckruyk
Contributor Author

@jlowin
So here are my observations based on your latest commits:

In the subdag operator logs, it appears that every task gets queued twice. Is that normal?

The task instance logs look fine.

Going to try with depends_on_past next.

Code

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

# Ensure the test pool exists (10 slots)
pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()


dag_name = 'test_35'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 28)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=False,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

Subdag operator log from worker 2 (no log on worker 1)

[root@***02 ~]# cat /home/airflow/airflow/logs/test_35/test_subdag_with_pool/2016-03-28T00\:00\:00 
[2016-04-01 18:21:00,748] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:00,748] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:00,811] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:00,811] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:01,968] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:01,968] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:02,006] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:02,006] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:02,024] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:21:02,044] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:21:02,089] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:07,016] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:07,128] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:12,046] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:17,042] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:17,059] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:22,015] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:22,023] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:27,101] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:32,077] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:32,107] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:37,086] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:37,106] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:42,109] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:47,068] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:47,086] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:52,016] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:52,026] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:57,055] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:02,074] {jobs.py:883} ERROR - Task instance ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:22:02,074] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 2 | failed: 1 | wont_run: 0 
[2016-04-01 18:22:02,075] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:22:02,076] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:22:02,098] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:22:18,834] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:18,834] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:18,877] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:18,877] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:20,021] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:20,021] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:20,071] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:20,071] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:20,087] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 2 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:22:20,105] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:22:20,143] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:22:25,014] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:22:25,118] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:30,061] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:35,046] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:35,065] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:22:40,021] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:22:40,033] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:45,061] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:50,062] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:55,066] {jobs.py:883} ERROR - Task instance ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:22:55,072] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:22:55,072] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:22:55,073] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:22:55,087] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:23:10,900] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:10,900] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:10,964] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:10,964] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:12,115] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:12,116] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:12,157] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:12,157] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:12,174] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 3 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:23:12,189] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:23:12,229] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:23:17,013] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:23:17,162] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:22,057] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:27,049] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:27,067] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:23:32,019] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:23:32,027] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:37,052] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:42,067] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:47,079] {jobs.py:883} ERROR - Task instance ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:23:47,079] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:23:47,079] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:23:47,081] {models.py:1193} INFO - All retries failed; marking task as FAILED
[2016-04-01 18:23:47,101] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]

Subtask log from worker 1

[root@***01 ~]# cat /home/airflow/airflow/logs/test_35.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00 
[2016-04-01 18:21:38,306] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:38,307] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:38,339] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:38,339] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:39,339] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:39,339] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:39,372] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:39,372] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:39,387] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:21:39,397] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:21:54,182] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:54,182] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:54,215] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:54,215] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:55,211] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:55,212] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:55,247] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:55,247] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:55,261] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:21:55,274] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:21:55,293] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:21:55,295] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmphCzdRV//tmp/airflowtmphCzdRV/test_task_2_with_exit_1oD_rdc
[2016-04-01 18:21:55,295] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:21:55,298] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:21:55,299] {bash_operator.py:77} INFO - hello
[2016-04-01 18:21:55,501] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:21:55,502] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:21:55,503] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:21:55,516] {models.py:1216} ERROR - Bash command failed

Subtask log from worker 2

[root@usw2dcdpag02 ~]# cat /home/airflow/airflow/logs/test_35.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00 
[2016-04-01 18:22:29,202] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:29,202] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:29,264] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:29,264] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:30,440] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:30,440] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:30,477] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:30,477] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:30,496] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:22:30,507] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:22:44,889] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:44,889] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:44,954] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:44,954] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:46,122] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:46,122] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:46,162] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:46,162] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:46,178] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:22:46,193] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:22:46,210] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:22:46,211] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpAafXtN//tmp/airflowtmpAafXtN/test_task_2_with_exit_1QMBYdK
[2016-04-01 18:22:46,211] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:22:46,215] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:22:46,216] {bash_operator.py:77} INFO - hello
[2016-04-01 18:22:46,417] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:22:46,417] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:22:46,419] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:22:46,434] {models.py:1216} ERROR - Bash command failed
[2016-04-01 18:23:21,401] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:21,401] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:21,491] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:21,491] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:22,643] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:22,643] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:22,686] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:22,687] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:22,705] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:23:22,716] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:23:37,244] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:37,244] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:37,280] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:37,280] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:38,435] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:38,436] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:38,474] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:38,474] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:38,490] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:23:38,505] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:23:38,524] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:23:38,525] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpnU0bjL//tmp/airflowtmpnU0bjL/test_task_2_with_exit_1kAx8gd
[2016-04-01 18:23:38,525] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:23:38,529] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:23:38,530] {bash_operator.py:77} INFO - hello
[2016-04-01 18:23:38,732] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:23:38,732] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:23:38,733] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:23:38,748] {models.py:1216} ERROR - Bash command failed

jlowin commented Apr 1, 2016

@syvineckruyk you're referring to the duplicated queuing lines near the top of the log, right? (After "starting attempt 1 of 3".) If so, that's normal. The line appears any time the task is sent to the executor: the first time, the task only gets queued into the pool; the second time, it actually runs. That's why you don't see it for the non-pooled tasks; they go straight to running.

Maybe we can make that log a little clearer about executor queuing vs. celery queuing vs. pool queuing.
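
A minimal, hypothetical sketch of that two-phase dispatch (illustrative names only, not Airflow's actual executor code):

```python
# Illustrative only: a pooled task is sent to the executor twice.
# The first pass just marks it QUEUED in its pool; the second pass,
# once a slot is open, actually starts it running.
class TI(object):
    def __init__(self, pool=None):
        self.pool = pool
        self.state = None

def dispatch(ti, open_slots=1):
    if ti.pool and ti.state != 'QUEUED':
        ti.state = 'QUEUED'    # logged as "Queuing into pool ..."
    elif ti.pool is None or open_slots > 0:
        ti.state = 'RUNNING'   # logged as "Starting attempt ..."

pooled, unpooled = TI(pool='test_pool'), TI()
dispatch(unpooled)  # no pool: goes straight to RUNNING
dispatch(pooled)    # first dispatch: QUEUED into the pool
dispatch(pooled)    # second dispatch: RUNNING
```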

syvineckruyk commented:

@jlowin I think I understand ... just to be 100% sure, these are the messages I am talking about:

[2016-04-01 18:21:02,089] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:07,016] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc

These are also duplicated for "starting attempt 2 of 3" and "starting attempt 3 of 3"... so it sounds like we are talking about the same thing.

syvineckruyk commented:

@jlowin

  • Subdag Operator logs look good ... same as with `depends_on_past=False`
  • Sub-task logs look good as well
  • 👍

Code run

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

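# Create the test pool at parse time if it does not already exist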
pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()


dag_name = 'test_36'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 28)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

Subdag Operator - Worker 1 - Log

[root@***01 ~]# cat /home/airflow/airflow/logs/test_36/test_subdag_with_pool/2016-03-28T00\:00\:00 
[2016-04-01 18:47:53,011] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:53,011] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:53,062] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:47:53,062] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:47:54,082] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:54,082] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:54,117] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:47:54,117] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:47:54,132] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:47:54,147] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:47:54,186] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:47:59,020] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:47:59,116] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:04,088] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:09,087] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:14,066] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:14,087] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:48:19,013] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:48:19,021] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:24,060] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:29,065] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:29,075] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:48:34,062] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:48:34,070] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:39,053] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:44,063] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:49,069] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:49,079] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:48:54,015] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:48:54,158] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:59,036] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:04,054] {jobs.py:883} ERROR - Task instance ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:49:04,055] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 2 | failed: 1 | wont_run: 0 
[2016-04-01 18:49:04,055] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:49:04,056] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:49:04,069] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]

Subdag Operator - Worker 2 - Log

[root@***02 ~]# cat /home/airflow/airflow/logs/test_36/test_subdag_with_pool/2016-03-28T00\:00\:00 
[2016-04-01 18:49:21,265] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:21,265] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:21,366] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:21,366] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:22,509] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:22,509] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:22,546] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:22,546] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:22,560] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 2 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:49:22,575] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:49:22,613] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:49:28,013] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:49:28,106] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:33,048] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:38,050] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:38,064] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:49:43,012] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:49:43,019] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:48,045] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:53,042] {jobs.py:883} ERROR - Task instance ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:49:53,042] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:49:53,042] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:49:53,043] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:49:53,059] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:50:08,057] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:08,057] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:08,134] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:08,134] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:09,274] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:09,274] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:09,314] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:09,314] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:09,332] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 3 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:50:09,347] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:50:09,382] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:50:14,018] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:50:14,142] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:19,053] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:24,079] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:29,045] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:29,057] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:50:34,013] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:50:34,025] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:39,080] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:44,062] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:49,073] {jobs.py:883} ERROR - Task instance ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:50:49,073] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:50:49,078] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:50:49,080] {models.py:1193} INFO - All retries failed; marking task as FAILED
[2016-04-01 18:50:49,092] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]

Sub-Task - Worker 1 - Log

[root@***01 ~]# cat /home/airflow/airflow/logs/test_36.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00 
[2016-04-01 18:48:55,206] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:55,207] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:55,297] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:55,297] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:56,281] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:56,282] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:56,314] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:56,314] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:56,329] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:48:56,344] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:48:56,359] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:48:56,361] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpHOJK_m//tmp/airflowtmpHOJK_m/test_task_2_with_exit_18ZJESg
[2016-04-01 18:48:56,361] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:48:56,365] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:48:56,366] {bash_operator.py:77} INFO - hello
[2016-04-01 18:48:56,567] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:48:56,567] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:48:56,569] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:48:56,584] {models.py:1216} ERROR - Bash command failed

Sub-task - Worker 2 - Log

[root@***02 ~]# cat /home/airflow/airflow/logs/test_36.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00
[2016-04-01 18:48:39,716] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:39,717] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:39,772] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:39,772] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:40,917] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:40,917] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:40,955] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:40,955] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:40,971] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:48:40,982] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:49:31,616] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:31,616] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:31,673] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:31,673] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:32,830] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:32,831] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:32,908] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:32,908] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:32,923] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:49:32,932] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:49:47,250] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:47,250] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:47,321] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:47,321] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:48,476] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:48,476] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:48,515] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:48,515] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:48,531] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:49:48,545] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:49:48,562] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:49:48,563] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpk5MaUu//tmp/airflowtmpk5MaUu/test_task_2_with_exit_1hmLgze
[2016-04-01 18:49:48,563] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:49:48,567] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:49:48,568] {bash_operator.py:77} INFO - hello
[2016-04-01 18:49:48,769] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:49:48,769] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:49:48,771] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:49:48,786] {models.py:1216} ERROR - Bash command failed
[2016-04-01 18:50:18,447] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:18,448] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:18,536] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:18,536] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:19,678] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:19,678] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:19,715] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:19,715] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:19,731] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:50:19,741] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:50:39,222] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:39,223] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:39,260] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:39,260] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:40,413] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:40,413] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:40,449] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:40,449] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:40,465] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:50:40,485] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:50:40,502] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:50:40,504] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpmrPZZ6//tmp/airflowtmpmrPZZ6/test_task_2_with_exit_1ApB6qp
[2016-04-01 18:50:40,504] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:50:40,508] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:50:40,509] {bash_operator.py:77} INFO - hello
[2016-04-01 18:50:40,710] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:50:40,710] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:50:40,712] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:50:40,727] {models.py:1216} ERROR - Bash command failed

jlowin commented Apr 3, 2016

@syvineckruyk #1271 was just merged into master so I'm going to mark this issue as closed. If you have any other problems please let me know!

jlowin closed this as completed Apr 3, 2016

syvineckruyk commented:

Thanks @jlowin, great work on this fix.

pradhanpk added a commit to pradhanpk/airflow that referenced this issue Apr 4, 2016
`SchedulerJob` contains a `set` of `TaskInstance`s called `queued_tis`. `SchedulerJob.process_events` loops through `queued_tis` and tries to remove completed tasks. However, because `TaskInstance` does not customize `__eq__` and `__hash__`, the following two lines have no effect: the membership test compares object identity, so elements are never removed from `queued_tis`, leading to infinite retries on failure. This is related to my comment on apache#216. The following code was introduced in the fix to apache#1225.

```
elif ti in self.queued_tis:
    self.queued_tis.remove(ti)
```
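
A self-contained sketch of that pitfall (using a hypothetical stand-in class, not Airflow's real `TaskInstance`):

```python
# Without __eq__/__hash__, set membership falls back to object identity,
# so a freshly constructed instance never matches the one in queued_tis.
class FakeTI(object):
    def __init__(self, dag_id, task_id, execution_date):
        self.dag_id, self.task_id = dag_id, task_id
        self.execution_date = execution_date

queued_tis = {FakeTI('test_35', 'test_task', '2016-03-28')}
ti = FakeTI('test_35', 'test_task', '2016-03-28')
print(ti in queued_tis)  # False -- so remove() is never reached

# Keying equality and hashing on the task's identity fixes the lookup:
class KeyedTI(FakeTI):
    def _key(self):
        return (self.dag_id, self.task_id, self.execution_date)
    def __eq__(self, other):
        return self._key() == other._key()
    def __hash__(self):
        return hash(self._key())

queued_tis = {KeyedTI('test_35', 'test_task', '2016-03-28')}
print(KeyedTI('test_35', 'test_task', '2016-03-28') in queued_tis)  # True
```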