Subdag operators consuming all celeryd worker processes. Tasks are hanging in queued or no state #1350

Closed
syvineckruyk opened this issue Apr 11, 2016 · 15 comments
Labels
kind:bug This is clearly a bug

Comments

@syvineckruyk
Contributor

Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

Environment

  • Version of Airflow (e.g. a release version, running your own fork, running off master -- provide a git log snippet):
commit fd9388c0c27c2e469f4eb0362800323a08b76d68
Merge: 58abca2 b2844af
Author: bolkedebruin <bolkedebruin@users.noreply.github.com>
Date:   Tue Apr 5 22:24:15 2016 +0200

    Merge pull request #1290 from jlowin/subdag-backfill-status

    Make sure backfill deadlocks raise errors
  • Airflow components and configuration, if applicable (e.g. "Running a Scheduler with CeleryExecutor")
  • 2 Nodes
  • Node 1 has
    • 1 scheduler
    • 1 worker
    • 1 webserver
  • Node 2 has
    • 1 worker
  • CeleryExecutor
  • Airflow scheduler restarted every 5 runs (airflow scheduler -n 5)

I started with the default settings, then raised the concurrency limits for some test runs. This test used the following:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 128

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 64

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
  • Example code to reproduce the bug (as a code snippet in markdown)
from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_interval': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 35):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)

  • Screen shots of your DAG's graph and tree views:
    • DAG view (image)
    • Subdag view (image)
    • DAG runs (image)
    • Main DAG page showing 32 running tasks (subdags) (image)
    • 32 running task instances (3 images)
    • Pools with 15 queued sub-tasks (image)
    • 15 queued task instances (image)
    • 49 task instances (subdag operator tasks) with no state (image)
    • 1 successful task instance (not a subdag operator) (image)

Text view of task instance log (97 Total task instances)

        running hanging_subdags level_1_2   04-10T00:00:00  SubDagOperator  04-11T22:57:21          341441  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_3   04-10T00:00:00  SubDagOperator  04-11T22:57:21          341440  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        queued  hanging_subdags.level_1_28  level_1_28_step_1   04-10T00:00:00  BashOperator    04-11T22:57:11          341438  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:57:11  0   test_pool_1 
        queued  hanging_subdags.level_1_30  level_1_30_step_1   04-10T00:00:00  BashOperator    04-11T22:57:11          341439  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:57:11  0   test_pool_1 
        queued  hanging_subdags.level_1_14  level_1_14_step_1   04-10T00:00:00  BashOperator    04-11T22:57:01          341435  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:57:01  0   test_pool_1 
        queued  hanging_subdags.level_1_22  level_1_22_step_1   04-10T00:00:00  BashOperator    04-11T22:57:00          341436  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:57:00  0   test_pool_1 
        queued  hanging_subdags.level_1_20  level_1_20_step_1   04-10T00:00:00  BashOperator    04-11T22:56:51          341434  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:51  0   test_pool_1 
        queued  hanging_subdags.level_1_16  level_1_16_step_1   04-10T00:00:00  BashOperator    04-11T22:56:50          341433  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:50  0   test_pool_1 
        running hanging_subdags level_1_1   04-10T00:00:00  SubDagOperator  04-11T22:56:37          341430  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        queued  hanging_subdags.level_1_32  level_1_32_step_1   04-10T00:00:00  BashOperator    04-11T22:56:37          341428  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:37  0   test_pool_1 
        running hanging_subdags level_1_8   04-10T00:00:00  SubDagOperator  04-11T22:56:24          341425  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_13  04-10T00:00:00  SubDagOperator  04-11T22:56:22          341422  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        queued  hanging_subdags.level_1_19  level_1_19_step_1   04-10T00:00:00  BashOperator    04-11T22:56:22          341423  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:22  0   test_pool_1 
        queued  hanging_subdags.level_1_27  level_1_27_step_1   04-10T00:00:00  BashOperator    04-11T22:56:22          341421  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:22  0   test_pool_1 
        queued  hanging_subdags.level_1_29  level_1_29_step_1   04-10T00:00:00  BashOperator    04-11T22:56:22          341424  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:22  0   test_pool_1 
        running hanging_subdags level_1_4   04-10T00:00:00  SubDagOperator  04-11T22:56:08          341397  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_5   04-10T00:00:00  SubDagOperator  04-11T22:56:08          341396  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_6   04-10T00:00:00  SubDagOperator  04-11T22:56:08          341393  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_7   04-10T00:00:00  SubDagOperator  04-11T22:56:08          341394  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_9   04-10T00:00:00  SubDagOperator  04-11T22:56:08          341395  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_10  04-10T00:00:00  SubDagOperator  04-11T22:56:08          341392  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_14  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341384  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_16  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341380  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_18  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341381  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_20  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341383  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_24  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341382  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_25  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341385  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_28  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341386  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_30  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341378  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_32  04-10T00:00:00  SubDagOperator  04-11T22:56:02          341379  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_22  04-10T00:00:00  SubDagOperator  04-11T22:56:01          341374  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_27  04-10T00:00:00  SubDagOperator  04-11T22:56:01          341376  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_29  04-10T00:00:00  SubDagOperator  04-11T22:56:01          341377  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_33  04-10T00:00:00  SubDagOperator  04-11T22:56:01          341373  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_34  04-10T00:00:00  SubDagOperator  04-11T22:56:01          341375  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        queued  hanging_subdags.level_1_15  level_1_15_step_1   04-10T00:00:00  BashOperator    04-11T22:56:01          341391  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:01  0   test_pool_1 
        queued  hanging_subdags.level_1_26  level_1_26_step_1   04-10T00:00:00  BashOperator    04-11T22:56:01          341390  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:01  0   test_pool_1 
        running hanging_subdags level_1_19  04-10T00:00:00  SubDagOperator  04-11T22:56:00          341372  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_23  04-10T00:00:00  SubDagOperator  04-11T22:56:00          341371  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        queued  hanging_subdags.level_1_17  level_1_17_step_1   04-10T00:00:00  BashOperator    04-11T22:56:00          341389  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:00  0   test_pool_1 
        queued  hanging_subdags.level_1_21  level_1_21_step_1   04-10T00:00:00  BashOperator    04-11T22:56:00          341388  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:00  0   test_pool_1 
        queued  hanging_subdags.level_1_31  level_1_31_step_1   04-10T00:00:00  BashOperator    04-11T22:56:00          341387  usw2dbdpag02.glassdoor.local    airflow 2   db  04-11T22:56:00  0   test_pool_1 
        running hanging_subdags level_1_15  04-10T00:00:00  SubDagOperator  04-11T22:55:29          341363  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_17  04-10T00:00:00  SubDagOperator  04-11T22:55:29          341365  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_21  04-10T00:00:00  SubDagOperator  04-11T22:55:29          341364  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_26  04-10T00:00:00  SubDagOperator  04-11T22:55:29          341362  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags level_1_31  04-10T00:00:00  SubDagOperator  04-11T22:55:29          341361  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags start   04-10T00:00:00  DummyOperator   04-11T22:55:08  04-11T22:55:08  0:00:00.061273  341360  usw2dbdpag01.glassdoor.local    airflow 103 db      1       
        None    hanging_subdags.level_1_1   level_1_1_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_2   level_1_2_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_3   level_1_3_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_4   level_1_4_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_5   level_1_5_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_6   level_1_6_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_7   level_1_7_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_8   level_1_8_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_9   level_1_9_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_10  level_1_10_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_13  level_1_13_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_18  level_1_18_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_23  level_1_23_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_24  level_1_24_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_25  level_1_25_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_33  level_1_33_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_34  level_1_34_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags.level_1_1   level_1_1_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_2   level_1_2_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_3   level_1_3_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_4   level_1_4_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_5   level_1_5_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_6   level_1_6_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_7   level_1_7_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_8   level_1_8_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_9   level_1_9_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_10  level_1_10_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_13  level_1_13_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_14  level_1_14_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_15  level_1_15_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_16  level_1_16_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_17  level_1_17_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_18  level_1_18_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_19  level_1_19_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_20  level_1_20_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_21  level_1_21_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_22  level_1_22_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_23  level_1_23_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_24  level_1_24_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_25  level_1_25_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_26  level_1_26_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_27  level_1_27_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_28  level_1_28_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_29  level_1_29_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_30  level_1_30_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_31  level_1_31_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_32  level_1_32_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_33  level_1_33_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags.level_1_34  level_1_34_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_1 
  • Stack trace if applicable:
    NA
  • Operating System: (Windows Version or $ uname -a)
[root@*** ~]# uname -a
Linux *** 2.6.32-573.12.1.el6.x86_64 #1 SMP Tue Dec 15 21:19:08 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
  • Python Version: $ python --version
(airflow) [airflow@*** ~]$ python --version
Python 2.7.10
  • (Optional) Python packages: $ pip freeze or $ conda list

Now that you know a little about me, let me tell you about the issue I am having:

Description of Issue

  • What did you expect to happen?
    • subdag operators are not assigned to pools, so I expected them to consume the 128 slots allocated for non-pooled tasks
    • sub-tasks to be queued and then executed in their pools
  • What happened instead?
    • subdag operators are running
    • subtasks sometimes execute but mostly get stuck in the queued state or with no state
    • slot reporting seems erroneous
[2016-04-11 23:42:31,502] {base_executor.py:80} DEBUG - 2 running task instances
[2016-04-11 23:42:31,502] {base_executor.py:81} DEBUG - 0 in queue
[2016-04-11 23:42:31,503] {base_executor.py:82} DEBUG - 126 open slots
  • Here is how you can reproduce this issue on your machine:

    Reproduction Steps

  1. Use celery executor
  2. Restart scheduler every N runs (I am using 5)
  3. Run the included dag
  4. Observe
  5. Rinse / Repeat (The behavior is somewhat erratic. In my production dag sometimes it gets part way through)
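
When repeating the test, it can help to tally task instance states from a text dump like the one above instead of counting by hand. The following is a minimal sketch, assuming each row starts with its state followed by whitespace; `tally_states` and `sample_rows` are hypothetical names, and the sample rows are shortened versions of rows from the listing. For reference, the counts reported above (32 running, 15 queued, 49 with no state, 1 success) add up to the stated total of 97.

```python
from collections import Counter


def tally_states(rows):
    """Count task instances by the state found in the first column of each row."""
    counts = Counter()
    for row in rows:
        fields = row.split()
        if fields:  # skip blank lines
            counts[fields[0]] += 1
    return counts


# Shortened sample rows in the same layout as the text view (state first).
sample_rows = [
    "running hanging_subdags level_1_2 04-10T00:00:00 SubDagOperator",
    "queued  hanging_subdags.level_1_28 level_1_28_step_1 04-10T00:00:00 BashOperator",
    "None    hanging_subdags.level_1_1 level_1_1_step_1 04-10T00:00:00",
    "success hanging_subdags start 04-10T00:00:00 DummyOperator",
]

counts = tally_states(sample_rows)
for state in sorted(counts):
    print("{state}: {n}".format(state=state, n=counts[state]))
```

Rows with no state show up under the literal string "None", as they do in the text view.
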
@syvineckruyk
Contributor Author

I've observed that restarting the worker processes sometimes unblocks a few tasks.

@syvineckruyk
Contributor Author

Tested with 5 subdags per level and it runs fine. I just completed 3 runs, each covering a two-day period, so a limit is being hit somewhere.

This works (5 subdags per level):

from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n5'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_interval': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 6):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)
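
The test variants in this thread differ only in the DAG name and the upper bound of the range. A minimal sketch of how the names and subdag ids for a given size could be generated is shown here; `variant_name` and `subdag_ids` are hypothetical helpers, not part of Airflow, and they only build strings following the naming scheme used in the DAG files.

```python
def variant_name(per_level, base='hanging_subdags'):
    """DAG name for a variant with `per_level` subdags per level."""
    return '{base}_n{n}'.format(base=base, n=per_level)


def subdag_ids(per_level, levels=3):
    """Subdag ids grouped by level, following the 'level_<level>_<n>' scheme."""
    return [
        ['level_{lvl}_{n}'.format(lvl=lvl, n=n)
         for n in range(1, per_level + 1)]
        for lvl in range(1, levels + 1)
    ]


# Each chain is (level_1_n, level_2_n, level_3_n), matching the
# set_upstream calls in the DAG definition.
chains = list(zip(*subdag_ids(5)))
print(variant_name(5))
print(chains[0])
```

Each tuple in `chains` could then be passed to `get_subdag` level by level, so that one file covers the 5, 10, 16, and 34 subdag cases.
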

@syvineckruyk
Contributor Author

I believe this is a known issue, but it came up, so I took a screenshot (image).

@syvineckruyk
Contributor Author

Performed the same test with 10 subdags per level (2 days). Everything worked as expected.

from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n10'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_interval': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 11):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)

@syvineckruyk
Contributor Author

Performed another test with 16 subdags per level.

Now things start to fall apart.

from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n16'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_interval': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)

Tasks run as expected until there are 32 running subdag operators. Then everything grinds to a halt.
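
One possible reading, in line with the issue title, is that every running SubDagOperator holds a celeryd worker process while it waits for its child tasks, so once the operators occupy every process the children have nowhere to run. The sketch below illustrates that arithmetic only. The per-worker process count of 16 is an assumption (the report does not state it) chosen because 2 workers x 16 processes matches the 32 observed; `slots_left_for_children` is a hypothetical helper, not an Airflow or Celery call.

```python
def slots_left_for_children(worker_slots, running_subdag_operators):
    """Worker slots still free once each running SubDagOperator holds one."""
    return max(worker_slots - running_subdag_operators, 0)


WORKERS = 2                # from the environment description
PROCESSES_PER_WORKER = 16  # assumed; not stated in the report
WORKER_SLOTS = WORKERS * PROCESSES_PER_WORKER

for running in (5, 10, 20, 32):
    free = slots_left_for_children(WORKER_SLOTS, running)
    status = 'children can run' if free else 'children starved'
    print('{r} operators running, {f} slots free: {s}'.format(
        r=running, f=free, s=status))
```

Under this assumption, the smaller tests leave processes free for the BashOperator tasks, while 32 concurrent operators leave none, which would match the point at which the runs stall.
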
[screenshot: 32 running tasks]

[screenshots: 32 SubDagOperator task instances that are running]

Text version

    State   Dag Id  Task Id     Execution Date  Operator    Start Date  End Date    Duration    Job Id  Hostname    Unixname    Priority Weight Queue   Queued Dttm Try Number  Pool    Log
        running hanging_subdags_n16 level_1_1   04-11T00:00:00  SubDagOperator  04-12T02:52:11          343347  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_10  04-11T00:00:00  SubDagOperator  04-12T02:51:00          343313  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_11  04-11T00:00:00  SubDagOperator  04-12T02:51:21          343324  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_12  04-11T00:00:00  SubDagOperator  04-12T02:54:58          343399  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_13  04-11T00:00:00  SubDagOperator  04-12T02:52:04          343349  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_14  04-11T00:00:00  SubDagOperator  04-12T02:53:46          343391  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_15  04-11T00:00:00  SubDagOperator  04-12T02:52:00          343344  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_16  04-11T00:00:00  SubDagOperator  04-12T02:52:13          343350  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_2   04-11T00:00:00  SubDagOperator  04-12T02:52:19          343364  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_3   04-11T00:00:00  SubDagOperator  04-12T02:52:00          343340  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_4   04-11T00:00:00  SubDagOperator  04-12T02:50:43          343306  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_5   04-11T00:00:00  SubDagOperator  04-12T02:52:19          343365  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_6   04-11T00:00:00  SubDagOperator  04-12T02:52:00          343342  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_7   04-11T00:00:00  SubDagOperator  04-12T02:52:15          343354  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_8   04-11T00:00:00  SubDagOperator  04-12T02:52:12          343346  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_1_9   04-11T00:00:00  SubDagOperator  04-12T02:50:43          343307  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        running hanging_subdags_n16 level_2_1   04-10T00:00:00  SubDagOperator  04-12T02:52:07          343360  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_10  04-10T00:00:00  SubDagOperator  04-12T02:50:56          343308  usw2dbdpag01.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_11  04-10T00:00:00  SubDagOperator  04-12T02:51:22          343326  usw2dbdpag01.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_12  04-10T00:00:00  SubDagOperator  04-12T02:52:41          343384  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_13  04-10T00:00:00  SubDagOperator  04-12T02:52:11          343348  usw2dbdpag01.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_14  04-10T00:00:00  SubDagOperator  04-12T02:52:33          343382  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_15  04-10T00:00:00  SubDagOperator  04-12T02:52:00          343341  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_16  04-10T00:00:00  SubDagOperator  04-12T02:52:00          343343  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_2   04-10T00:00:00  SubDagOperator  04-12T02:55:40          343402  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_3   04-10T00:00:00  SubDagOperator  04-12T02:52:20          343370  usw2dbdpag01.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_4   04-10T00:00:00  SubDagOperator  04-12T02:50:43          343305  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_5   04-10T00:00:00  SubDagOperator  04-12T02:52:09          343363  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_6   04-10T00:00:00  SubDagOperator  04-12T02:52:13          343352  usw2dbdpag01.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_7   04-10T00:00:00  SubDagOperator  04-12T02:52:12          343351  usw2dbdpag01.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_8   04-10T00:00:00  SubDagOperator  04-12T02:52:01          343345  usw2dbdpag02.glassdoor.local    airflow 2   db      1       
        running hanging_subdags_n16 level_2_9   04-10T00:00:00  SubDagOperator  04-12T02:50:43          343304  usw2dbdpag02.glassdoor.local    airflow 2   db      1       

[screenshot: tasks stuck in 'queued' state]

[screenshot: the same tasks are present in the pool]

Text version

    queued  hanging_subdags_n16.level_2_9   level_2_9_step_two  04-10T00:00:00  BashOperator    04-12T02:54:45          343397  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:54:45  0   test_pool_2 
        queued  hanging_subdags_n16.level_2_8   level_2_8_step_1    04-10T00:00:00  BashOperator    04-12T02:54:36          343396  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:54:36  0   test_pool_2 
        queued  hanging_subdags_n16.level_2_15  level_2_15_step_1   04-10T00:00:00  BashOperator    04-12T02:54:26          343394  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:54:26  0   test_pool_2 
        queued  hanging_subdags_n16.level_2_12  level_2_12_step_1   04-10T00:00:00  BashOperator    04-12T02:53:33          343389  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:53:33  0   test_pool_2 
        queued  hanging_subdags_n16.level_1_3   level_1_3_step_1    04-11T00:00:00  BashOperator    04-12T02:52:32          343381  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:52:32  0   test_pool_1 

Task instances with no state (Text)

State   Dag Id  Task Id Execution Date  Operator    Start Date  End Date    Duration    Job Id  Hostname    Unixname    Priority Weight Queue   Queued Dttm Try Number  Pool    Log
        None    hanging_subdags_n16.level_1_1   level_1_1_step_1    04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_2   level_1_2_step_1    04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_5   level_1_5_step_1    04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_6   level_1_6_step_1    04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_7   level_1_7_step_1    04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_8   level_1_8_step_1    04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_2_1   level_2_1_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_2   level_2_2_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_3   level_2_3_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_5   level_2_5_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_6   level_2_6_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_7   level_2_7_step_1    04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_1_12  level_1_12_step_1   04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_13  level_1_13_step_1   04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_14  level_1_14_step_1   04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_15  level_1_15_step_1   04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_16  level_1_16_step_1   04-11T00:00:00                      None    airflow 2   db      0   test_pool_1 
        None    hanging_subdags_n16.level_2_13  level_2_13_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_14  level_2_14_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_16  level_2_16_step_1   04-10T00:00:00                      None    airflow 2   db      0   test_pool_2 
        None    hanging_subdags_n16.level_1_1   level_1_1_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_2   level_1_2_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_3   level_1_3_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_4   level_1_4_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_5   level_1_5_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_6   level_1_6_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_7   level_1_7_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_8   level_1_8_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_9   level_1_9_step_two  04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_2_1   level_2_1_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_2   level_2_2_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_3   level_2_3_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_4   level_2_4_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_5   level_2_5_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_6   level_2_6_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_7   level_2_7_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_8   level_2_8_step_two  04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_1_10  level_1_10_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_11  level_1_11_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_12  level_1_12_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_13  level_1_13_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_14  level_1_14_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_15  level_1_15_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_1_16  level_1_16_step_two     04-11T00:00:00                      None    airflow 1   db      0   test_pool_1 
        None    hanging_subdags_n16.level_2_10  level_2_10_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_11  level_2_11_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_12  level_2_12_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_13  level_2_13_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_14  level_2_14_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_15  level_2_15_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 
        None    hanging_subdags_n16.level_2_16  level_2_16_step_two     04-10T00:00:00                      None    airflow 1   db      0   test_pool_2 

Successful task instances

State   Dag Id  Task Id Execution Date  Operator    Start Date  End Date    Duration    Job Id  Hostname    Unixname    Priority Weight Queue   Queued Dttm Try Number  Pool    Log
        success hanging_subdags_n16.level_2_11  level_2_11_step_1   04-10T00:00:00  BashOperator    04-12T02:54:28  04-12T02:55:28  0:01:00.053700  343395  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:51:47  1   test_pool_2 
        success hanging_subdags_n16.level_1_11  level_1_11_step_1   04-11T00:00:00  BashOperator    04-12T02:53:16  04-12T02:54:17  0:01:00.049400  343388  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:51:44  1   test_pool_1 
        success hanging_subdags_n16.level_1_10  level_1_10_step_1   04-11T00:00:00  BashOperator    04-12T02:53:14  04-12T02:54:14  0:01:00.064500  343387  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:51:32  1   test_pool_1 
        success hanging_subdags_n16.level_2_10  level_2_10_step_1   04-10T00:00:00  BashOperator    04-12T02:52:19  04-12T02:53:19  0:01:00.123500  343366  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:51:30  1   test_pool_2 
        success hanging_subdags_n16.level_1_4   level_1_4_step_1    04-11T00:00:00  BashOperator    04-12T02:52:08  04-12T02:53:08  0:01:00.101500  343361  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:51:26  1   test_pool_1 
        success hanging_subdags_n16.level_2_9   level_2_9_step_1    04-10T00:00:00  BashOperator    04-12T02:51:59  04-12T02:52:59  0:01:00.275900  343339  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:51:25  1   test_pool_2 
        success hanging_subdags_n16.level_1_12  level_1_12_step_two     04-10T00:00:00  BashOperator    04-12T02:51:28  04-12T02:51:43  0:00:15.084500  343330  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:44  1   test_pool_1 
        success hanging_subdags_n16.level_1_14  level_1_14_step_two     04-10T00:00:00  BashOperator    04-12T02:51:25  04-12T02:51:40  0:00:15.180700  343327  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:39  1   test_pool_1 
        success hanging_subdags_n16.level_1_5   level_1_5_step_two  04-10T00:00:00  BashOperator    04-12T02:51:22  04-12T02:51:38  0:00:15.203100  343325  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:38  1   test_pool_1 
        success hanging_subdags_n16.level_1_9   level_1_9_step_1    04-11T00:00:00  BashOperator    04-12T02:51:20  04-12T02:52:20  0:01:00.054000  343331  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:51:02  1   test_pool_1 
        success hanging_subdags_n16.level_2_4   level_2_4_step_1    04-10T00:00:00  BashOperator    04-12T02:51:20  04-12T02:52:20  0:01:00.064000  343332  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:51:02  1   test_pool_2 
        success hanging_subdags_n16.level_1_2   level_1_2_step_two  04-10T00:00:00  BashOperator    04-12T02:51:20  04-12T02:51:36  0:00:15.271900  343323  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:37  1   test_pool_1 
        success hanging_subdags_n16.level_1_8   level_1_8_step_two  04-10T00:00:00  BashOperator    04-12T02:51:14  04-12T02:51:29  0:00:15.314200  343316  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:25  1   test_pool_1 
        success hanging_subdags_n16.level_1_1   level_1_1_step_two  04-10T00:00:00  BashOperator    04-12T02:51:13  04-12T02:51:29  0:00:15.310300  343315  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:25  1   test_pool_1 
        success hanging_subdags_n16.level_1_15  level_1_15_step_two     04-10T00:00:00  BashOperator    04-12T02:51:05  04-12T02:51:20  0:00:15.310200  343314  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:25  1   test_pool_1 
        success hanging_subdags_n16.level_1_16  level_1_16_step_two     04-10T00:00:00  BashOperator    04-12T02:51:01  04-12T02:51:16  0:00:15.092800  343317  usw2dbdpag02.glassdoor.local    airflow 1   db  04-12T02:50:39  1   test_pool_1 
        success hanging_subdags_n16.level_1_7   level_1_7_step_two  04-10T00:00:00  BashOperator    04-12T02:50:44  04-12T02:50:59  0:00:15.200000  343303  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:50:01  1   test_pool_1 
        success hanging_subdags_n16.level_1_6   level_1_6_step_two  04-10T00:00:00  BashOperator    04-12T02:50:30  04-12T02:50:45  0:00:15.208100  343293  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:49:52  1   test_pool_1 
        success hanging_subdags_n16.level_1_13  level_1_13_step_two     04-10T00:00:00  BashOperator    04-12T02:50:30  04-12T02:50:45  0:00:15.255100  343294  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:49:52  1   test_pool_1 
        success hanging_subdags_n16.level_1_3   level_1_3_step_two  04-10T00:00:00  BashOperator    04-12T02:50:29  04-12T02:50:45  0:00:15.201300  343292  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:49:50  1   test_pool_1 
        success hanging_subdags_n16.level_1_11  level_1_11_step_two     04-10T00:00:00  BashOperator    04-12T02:50:12  04-12T02:50:27  0:00:15.402300  343291  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:49:40  1   test_pool_1 
        success hanging_subdags_n16.level_1_10  level_1_10_step_two     04-10T00:00:00  BashOperator    04-12T02:50:04  04-12T02:50:19  0:00:15.076900  343290  usw2dbdpag02.glassdoor.local    airflow 1   db  04-12T02:49:46  1   test_pool_1 
        success hanging_subdags_n16.level_1_9   level_1_9_step_two  04-10T00:00:00  BashOperator    04-12T02:50:01  04-12T02:50:16  0:00:15.193400  343287  usw2dbdpag01.glassdoor.local    airflow 1   db  04-12T02:49:28  1   test_pool_1 
        success hanging_subdags_n16.level_1_4   level_1_4_step_two  04-10T00:00:00  BashOperator    04-12T02:49:59  04-12T02:50:14  0:00:15.052100  343289  usw2dbdpag02.glassdoor.local    airflow 1   db  04-12T02:49:41  1   test_pool_1 
        success hanging_subdags_n16.level_1_1   level_1_1_step_1    04-10T00:00:00  BashOperator    04-12T02:49:04  04-12T02:50:04  0:01:00.103600  343274  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:27  1   test_pool_1 
        success hanging_subdags_n16.level_1_8   level_1_8_step_1    04-10T00:00:00  BashOperator    04-12T02:49:04  04-12T02:50:04  0:01:00.107500  343275  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:28  1   test_pool_1 
        success hanging_subdags_n16.level_1_12  level_1_12_step_1   04-10T00:00:00  BashOperator    04-12T02:49:04  04-12T02:50:04  0:01:00.106700  343276  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:27  1   test_pool_1 
        success hanging_subdags_n16.level_1_15  level_1_15_step_1   04-10T00:00:00  BashOperator    04-12T02:49:04  04-12T02:50:04  0:01:00.080500  343277  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:28  1   test_pool_1 
        success hanging_subdags_n16.level_1_2   level_1_2_step_1    04-10T00:00:00  BashOperator    04-12T02:48:58  04-12T02:49:58  0:01:00.186700  343271  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:26  1   test_pool_1 
        success hanging_subdags_n16.level_1_5   level_1_5_step_1    04-10T00:00:00  BashOperator    04-12T02:48:58  04-12T02:49:59  0:01:00.130800  343273  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:27  1   test_pool_1 
        success hanging_subdags_n16.level_1_16  level_1_16_step_1   04-10T00:00:00  BashOperator    04-12T02:48:58  04-12T02:49:58  0:01:00.171300  343272  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:28  1   test_pool_1 
        success hanging_subdags_n16.level_1_14  level_1_14_step_1   04-10T00:00:00  BashOperator    04-12T02:48:57  04-12T02:49:57  0:01:00.354500  343270  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:27  1   test_pool_1 
        success hanging_subdags_n16.level_1_6   level_1_6_step_1    04-10T00:00:00  BashOperator    04-12T02:48:36  04-12T02:49:36  0:01:00.068500  343269  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:48:05  1   test_pool_1 
        success hanging_subdags_n16.level_1_13  level_1_13_step_1   04-10T00:00:00  BashOperator    04-12T02:48:34  04-12T02:49:35  0:01:00.142200  343268  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:47:58  1   test_pool_1 
        success hanging_subdags_n16.level_1_7   level_1_7_step_1    04-10T00:00:00  BashOperator    04-12T02:48:32  04-12T02:49:33  0:01:00.117000  343265  usw2dbdpag01.glassdoor.local    airflow 2   db  04-12T02:47:54  1   test_pool_1 
        success hanging_subdags_n16.level_1_3   level_1_3_step_1    04-10T00:00:00  BashOperator    04-12T02:48:30  04-12T02:49:30  0:01:00.069700  343266  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:47:59  1   test_pool_1 
        success hanging_subdags_n16.level_1_10  level_1_10_step_1   04-10T00:00:00  BashOperator    04-12T02:48:30  04-12T02:49:30  0:01:00.073500  343267  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:48:04  1   test_pool_1 
        success hanging_subdags_n16.level_1_4   level_1_4_step_1    04-10T00:00:00  BashOperator    04-12T02:48:24  04-12T02:49:25  0:01:00.095400  343264  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:48:05  1   test_pool_1 
        success hanging_subdags_n16.level_1_11  level_1_11_step_1   04-10T00:00:00  BashOperator    04-12T02:48:24  04-12T02:49:24  0:01:00.107200  343263  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:48:00  1   test_pool_1 
        success hanging_subdags_n16.level_1_9   level_1_9_step_1    04-10T00:00:00  BashOperator    04-12T02:48:10  04-12T02:49:10  0:01:00.054200  343254  usw2dbdpag02.glassdoor.local    airflow 2   db  04-12T02:47:54  1   test_pool_1 
        success hanging_subdags_n16 level_1_4   04-10T00:00:00  SubDagOperator  04-12T02:47:47  04-12T02:50:17  0:02:30.373000  343227  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_10  04-10T00:00:00  SubDagOperator  04-12T02:47:47  04-12T02:50:27  0:02:40.508000  343224  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_15  04-10T00:00:00  SubDagOperator  04-12T02:47:47  04-12T02:51:27  0:03:40.175000  343228  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_2   04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:51:41  0:03:55.360000  343219  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_5   04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:51:41  0:03:54.854000  343226  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_6   04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:50:52  0:03:05.716000  343221  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_8   04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:51:32  0:03:45.671000  343222  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_12  04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:51:47  0:04:00.732000  343225  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_14  04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:51:51  0:04:05.075000  343223  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_16  04-10T00:00:00  SubDagOperator  04-12T02:47:46  04-12T02:51:26  0:03:40.434000  343220  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_1   04-10T00:00:00  SubDagOperator  04-12T02:47:45  04-12T02:51:30  0:03:44.928000  343218  usw2dbdpag02.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_3   04-10T00:00:00  SubDagOperator  04-12T02:47:35  04-12T02:50:51  0:03:15.789000  343217  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_7   04-10T00:00:00  SubDagOperator  04-12T02:47:35  04-12T02:51:10  0:03:34.999000  343216  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_9   04-10T00:00:00  SubDagOperator  04-12T02:47:35  04-12T02:50:25  0:02:50.283000  343215  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_11  04-10T00:00:00  SubDagOperator  04-12T02:47:35  04-12T02:50:35  0:03:00.086000  343213  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 level_1_13  04-10T00:00:00  SubDagOperator  04-12T02:47:35  04-12T02:50:50  0:03:15.163000  343214  usw2dbdpag01.glassdoor.local    airflow 3   db      1       
        success hanging_subdags_n16 start   04-11T00:00:00  DummyOperator   04-12T02:47:06  04-12T02:47:06  0:00:00.040969  343211  usw2dbdpag02.glassdoor.local    airflow 49  db      1       
        success hanging_subdags_n16 start   04-10T00:00:00  DummyOperator   04-12T02:47:01  04-12T02:47:01  0:00:00.056152  343210  usw2dbdpag02.glassdoor.local    airflow 49  db      1       

@syvineckruyk
Contributor Author

So I think I'm on to something ... everything grinds to a halt when 32 subdag operators are running.

I have two workers, each with a celeryd_concurrency of 16 (32 Celery processes in total).

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

When they are all consumed nothing else can run... including the subtasks which would allow for the subdag operator to complete.

I am going to boost the celery concurrency to confirm.

Short of boosting the celery workers .... not sure what the best 'fix' for this would be.
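
To make the suspected starvation concrete, here is a toy model in plain Python. It is only a sketch under simplifying assumptions, not Airflow's scheduler: every SubDagOperator holds one worker slot until all of its sub-tasks are done, parents are started before sub-tasks (they carry the higher priority weight in the listings above), and each sub-task needs a free slot for one tick. The function name `simulate` and its parameters are hypothetical.

```python
def simulate(total_slots, n_subdags, subtasks_per_subdag=2):
    """Return "deadlock" or "completed" for a toy slot-allocation model."""
    waiting_parents = n_subdags  # SubDagOperators not yet started
    running = []                 # remaining sub-task count per running parent
    finished = 0

    while finished < n_subdags:
        # Start as many parents as there are free slots.
        free = total_slots - len(running)
        start = min(free, waiting_parents)
        running.extend([subtasks_per_subdag] * start)
        waiting_parents -= start
        free -= start

        if free == 0:
            # Every slot is held by a parent that is waiting on sub-tasks,
            # and no sub-task can ever get a slot.
            return "deadlock"

        # Sub-tasks inside one subdag are sequential, so each running parent
        # advances by at most one sub-task per tick, one slot each.
        for i in range(len(running)):
            if free == 0:
                break
            running[i] -= 1
            free -= 1

        # Parents whose sub-tasks are all done release their slot.
        finished += sum(1 for r in running if r == 0)
        running = [r for r in running if r > 0]

    return "completed"


print(simulate(total_slots=32, n_subdags=32))  # deadlock
print(simulate(total_slots=64, n_subdags=32))  # completed
```

In this model, 32 slots and 32 eligible SubDagOperators deadlock immediately, while any spare slot lets the sub-tasks trickle through. That matches what I am seeing, but it is a simplification of the real scheduling.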

@syvineckruyk
Contributor Author

@mistercrunch you mentioned using SequentialExecutor with subdags. Where do you set it?

  • sub-task ?
  • sub-dag ?
  • SubDagOperator ?

@syvineckruyk
Contributor Author

Boosting celeryd_concurrency worked ... but it's not really a solution.

I tried setting the sub-tasks to use the sequential executor but hit the same issue once 32 SubDagOperators were running

from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool,
        executor=SequentialExecutor
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool,
        executor=SequentialExecutor
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)

@jlowin jlowin added the kind:bug This is a clearly a bug label Apr 12, 2016
@jlowin jlowin self-assigned this Apr 12, 2016
@syvineckruyk
Contributor Author

So I tried @mistercrunch's suggestion from https://groups.google.com/d/msg/airbnb_airflow/8NyLPHV1Fv8/VmgSx4EXBgAJ. I should have tried this sooner, but it was not clear to me what the implications of, or reasons for, this change were.

This appears to resolve the issue, but I am a bit unclear about what the workflow actually is now, and I am trying to figure it out from the logs. Are my sub-tasks still being distributed through Celery to the workers? Is the SequentialExecutor in this case just responsible for queueing the sub-tasks for the CeleryExecutor to consume?

from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
        executor=SequentialExecutor()
    )

    return sdo

start_task = DummyOperator(
    task_id='start',
    dag=dag
)

for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')

    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)
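
One way to picture the change above is the toy model below. It is only a sketch and does not use Airflow: it assumes that a SubDagOperator given `executor=SequentialExecutor()` runs its sub-tasks one after another inside the worker process it already occupies, rather than requesting further worker slots. `run_inline` and its parameters are hypothetical names, and the thread pool stands in for the worker slots.

```python
from concurrent.futures import ThreadPoolExecutor


def run_inline(num_slots, num_parents, steps=("step_1", "step_two")):
    """Each parent runs its own steps in order, never asking the pool for more slots."""

    def parent(n):
        done = []
        for step in steps:
            # The step runs right here, in the slot the parent already holds.
            done.append("level_1_{n}_{step}".format(n=n, step=step))
        return done

    # The pool stands in for the worker slots (an assumption of this sketch).
    with ThreadPoolExecutor(max_workers=num_slots) as pool:
        return list(pool.map(parent, range(1, num_parents + 1)))


if __name__ == "__main__":
    results = run_inline(num_slots=2, num_parents=16)
    print(len(results), "parents finished on 2 slots")
```

Under that assumption, 16 parents complete even with only 2 slots, because no parent ever waits on work queued behind it.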

@syvineckruyk syvineckruyk changed the title Pooled Subdag operators not running as expected. Tasks are hanging in queued or no state Subdag operators consuming all celeryd worker processes. Tasks are hanging in queued or no state Apr 12, 2016
@bolkedebruin
Contributor

bolkedebruin commented Apr 15, 2016

@jlowin @mistercrunch The issue is that a subdag operator occupies a worker slot, calls "run" (which uses a BackfillJob), and then waits for that job to complete, keeping its slot the entire time. IMHO the subdag operator should not call "run" but should schedule the subdag instead. That would allow the operator to return right away and free its slot.

Furthermore, I consider being able to specify the executor on the subdag operator an issue, as it allows someone to go beyond the resources that have been assigned by ops (this is known and marked as such in the docs). However, by specifying the executor you can essentially tie the DAG to a node, which makes it possible to share data via, e.g., /tmp. I think that should be solved differently.
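
The slot-holding behaviour described in this comment can be pictured with a small simulation. It is a sketch only, not Airflow code: a thread pool stands in for the worker slots, each "parent" stands in for a subdag operator that blocks until its "child" finishes, and every name (`simulate`, `num_slots`, `num_parents`, `observe_seconds`) is hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait


def simulate(num_slots, num_parents, observe_seconds=0.5):
    """Count the children that finished while the parents were holding their slots."""
    pool = ThreadPoolExecutor(max_workers=num_slots)  # stand-in for worker slots
    start = threading.Event()    # set once every parent has been queued
    give_up = threading.Event()  # lets blocked parents exit so we can clean up
    finished_children = []
    lock = threading.Lock()

    def child(i):
        with lock:
            finished_children.append(i)

    def parent(i):
        start.wait()
        if give_up.is_set():
            return
        future = pool.submit(child, i)  # the child needs a slot from the same pool
        # Hold this slot until the child is done (or we are told to give up).
        while not future.done() and not give_up.is_set():
            time.sleep(0.01)

    parents = [pool.submit(parent, i) for i in range(num_parents)]
    start.set()
    wait(parents, timeout=observe_seconds)
    with lock:
        snapshot = len(finished_children)
    give_up.set()
    pool.shutdown(wait=True)
    return snapshot


if __name__ == "__main__":
    print("2 slots, 2 parents:", simulate(num_slots=2, num_parents=2))
    print("3 slots, 2 parents:", simulate(num_slots=3, num_parents=2))
```

With as many parents as slots, every slot is held by a waiting parent and no child can start, so the first call should report 0 finished children. With one spare slot the children run and the second call should report 2.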

@bolkedebruin
Contributor

bolkedebruin commented May 8, 2016

See also work on AIRFLOW-20. @syvineckruyk Can you create a Jira on this and link it to AIRFLOW-20?

@syvineckruyk
Contributor Author

@bolkedebruin I cannot seem to create issues. Do I need to be given permission?

@bolkedebruin
Contributor

Did you register? You should have access by default.

@syvineckruyk
Contributor Author

@bolkedebruin Yeah, I registered and logged in. I had to change whatever this option is to get it working.

image

