Scheduling on latest Master broken when using Pooled #1397

Closed
r39132 opened this issue Apr 18, 2016 · 5 comments
Labels
kind:bug This is a clearly a bug

Comments

@r39132
Contributor

r39132 commented Apr 18, 2016

Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

Environment

  • Version of Airflow (e.g. a release version, running your own fork, running off master -- provide a git log snippet):
I'm running on the latest master:

Sid-As-MacBook-Pro:r39132_airflow siddharth$ git log --pretty=format:"%h - %an, %ar : %s" | head -n 4
8750d75 - Siddharth Anand, 38 minutes ago : Fixed a bug in the scheduler: num_runs used where runs intended
c3f6892 - Sid Anand, 9 hours ago : Merge pull request #1366 from angelgao/master
7da6a94 - bolkedebruin, 14 hours ago : Merge pull request #1376 from bolkedebruin/multiprocessing_scheduler
a36861a - Bolke de Bruin, 2 days ago : Add multiprocessing support to the scheduler

I was also seeing this error when git log showed:
7da6a94 - bolkedebruin, 14 hours ago : Merge pull request #1376 from bolkedebruin/multiprocessing_scheduler
a36861a - Bolke de Bruin, 2 days ago : Add multiprocessing support to the scheduler

  • Airflow components and configuration, if applicable (e.g. "Running a Scheduler with CeleryExecutor") : LocalExecutor
  • Example code to reproduce the bug (as a code snippet in markdown)
import datetime

import airflow
from airflow import DAG
from airflow.models import Pool
from airflow.operators import BashOperator

session = airflow.settings.Session()

# Create a 4-slot pool named 'pool' if it does not already exist.
if not (
        session.query(Pool)
            .filter(Pool.pool == 'pool')
            .first()):
    session.add(Pool(pool='pool', slots=4))
    # Persist the new pool (assumes the session does not autocommit).
    session.commit()


default_args = dict(owner='airflow', start_date=datetime.datetime(2016, 4, 12))
dag = DAG('test_dag_garthcn', default_args=default_args)

# Three independent tasks that all draw slots from the same pool.
for i in range(3):
    BashOperator(
        task_id='op_{}'.format(i),
        bash_command="sleep 1",
        dag=dag,
        pool='pool')
  • Screen shots of your DAG's graph and tree views:

These views don't change!

[Four screenshots of the DAG's graph and tree views, taken 2016-04-18 between 08:19:50 and 08:20:36]

  • Stack trace if applicable:
  • Operating System: (Windows Version or `Darwin Sid-As-MacBook-Pro.local 14.5.0 Darwin Kernel Version 14.5.0: Tue Sep 1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 x86_64`)
  • Python Version: `Python 2.7.9`
  • (Optional) Python packages: `$ pip freeze` or `$ conda list`

Now that you know a little about me, let me tell you about the issue I am having:

Description of Issue

  • What did you expect to happen?
  • What happened instead?
  • Here is how you can reproduce this issue on your machine:

Reproduction Steps

  1. Run airflow scheduler -f
  2. After a short while, no forward progress is made, and the scheduler output appears to keep marking the same DAG runs as successful:
[2016-04-18 08:43:04,077] {jobs.py:751} INFO - Starting 1 scheduler jobs
[2016-04-18 08:43:04,111] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-12 00:00:00: scheduled__2016-04-12T00:00:00, externally triggered: False>
[2016-04-18 08:43:04,112] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False>
[2016-04-18 08:43:04,112] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:04,112] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-14 00:00:00: scheduled__2016-04-14T00:00:00, externally triggered: False>
[2016-04-18 08:43:04,112] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-14 00:00:00: scheduled__2016-04-14T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:04,113] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False>
[2016-04-18 08:43:04,113] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:04,113] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-16 00:00:00: scheduled__2016-04-16T00:00:00, externally triggered: False>
[2016-04-18 08:43:04,113] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-17 00:00:00: scheduled__2016-04-17T00:00:00, externally triggered: False>
[2016-04-18 08:43:04,114] {jobs.py:499} INFO - Getting list of tasks to skip for active runs.
[2016-04-18 08:43:04,116] {jobs.py:515} INFO - Checking dependencies on 9 tasks instances, minus 3 skippable ones
[2016-04-18 08:43:04,139] {jobs.py:764} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-04-18 08:43:04,139] {jobs.py:767} INFO - Loop took: 0.074213 seconds
[2016-04-18 08:43:04,145] {models.py:303} INFO - Finding 'running' jobs without a recent heartbeat
[2016-04-18 08:43:04,145] {models.py:309} INFO - Failing jobs without heartbeat after 2016-04-18 08:40:49.145599
[2016-04-18 08:43:09,070] {jobs.py:597} INFO - Prioritizing 0 queued jobs
[2016-04-18 08:43:09,079] {jobs.py:751} INFO - Starting 1 scheduler jobs
[2016-04-18 08:43:09,115] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-12 00:00:00: scheduled__2016-04-12T00:00:00, externally triggered: False>
[2016-04-18 08:43:09,116] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False>
[2016-04-18 08:43:09,116] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:09,116] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-14 00:00:00: scheduled__2016-04-14T00:00:00, externally triggered: False>
[2016-04-18 08:43:09,116] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-14 00:00:00: scheduled__2016-04-14T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:09,117] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False>
[2016-04-18 08:43:09,117] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:09,117] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-16 00:00:00: scheduled__2016-04-16T00:00:00, externally triggered: False>
[2016-04-18 08:43:09,118] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-17 00:00:00: scheduled__2016-04-17T00:00:00, externally triggered: False>
[2016-04-18 08:43:09,118] {jobs.py:499} INFO - Getting list of tasks to skip for active runs.
[2016-04-18 08:43:09,120] {jobs.py:515} INFO - Checking dependencies on 9 tasks instances, minus 3 skippable ones
[2016-04-18 08:43:09,143] {jobs.py:764} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-04-18 08:43:09,143] {jobs.py:767} INFO - Loop took: 0.075661 seconds
[2016-04-18 08:43:09,149] {models.py:303} INFO - Finding 'running' jobs without a recent heartbeat
[2016-04-18 08:43:09,149] {models.py:309} INFO - Failing jobs without heartbeat after 2016-04-18 08:40:54.149653
[2016-04-18 08:43:14,071] {jobs.py:597} INFO - Prioritizing 0 queued jobs
[2016-04-18 08:43:14,074] {models.py:152} INFO - Filling up the DagBag from /Users/siddharth/Projects/r39132_airflow/my_dags
[2016-04-18 08:43:14,089] {jobs.py:751} INFO - Starting 1 scheduler jobs
[2016-04-18 08:43:14,122] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-12 00:00:00: scheduled__2016-04-12T00:00:00, externally triggered: False>
[2016-04-18 08:43:14,123] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False>
[2016-04-18 08:43:14,123] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:14,124] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-14 00:00:00: scheduled__2016-04-14T00:00:00, externally triggered: False>
[2016-04-18 08:43:14,124] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-14 00:00:00: scheduled__2016-04-14T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:14,124] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False>
[2016-04-18 08:43:14,124] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:14,124] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-16 00:00:00: scheduled__2016-04-16T00:00:00, externally triggered: False>
[2016-04-18 08:43:14,125] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-17 00:00:00: scheduled__2016-04-17T00:00:00, externally triggered: False>
[2016-04-18 08:43:14,125] {jobs.py:499} INFO - Getting list of tasks to skip for active runs.
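
The repetition in the output above can be checked mechanically. The following is a minimal sketch, not part of Airflow: it assumes the log line format shown above, and the names `MARK_RE`, `repeated_success_marks`, and `SAMPLE` are hypothetical helpers introduced only for illustration. It counts how often each DAG run is reported as "Marking run ... successful"; a run that is marked on every scheduler loop suggests the scheduler is not moving forward.

```python
import re
from collections import Counter

# Matches "Marking run <DagRun DAG_ID @ YYYY-MM-DD HH:MM:SS: ..." lines.
# The pattern is an assumption based on the log excerpt in this issue.
MARK_RE = re.compile(
    r"Marking run <DagRun (?P<dag>\S+) @ (?P<when>[\d-]+ [\d:]+):"
)


def repeated_success_marks(log_text):
    """Return {(dag_id, execution_date): count} for runs marked more than once."""
    counts = Counter()
    for line in log_text.splitlines():
        match = MARK_RE.search(line)
        if match:
            counts[(match.group("dag"), match.group("when"))] += 1
    return {run: n for run, n in counts.items() if n > 1}


# Three lines copied from the output above: the 2016-04-13 run is marked
# successful in two consecutive scheduler loops.
SAMPLE = """\
[2016-04-18 08:43:04,112] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:09,116] {models.py:2648} INFO - Marking run <DagRun test_dag_garthcn @ 2016-04-13 00:00:00: scheduled__2016-04-13T00:00:00, externally triggered: False> successful
[2016-04-18 08:43:09,117] {models.py:2626} INFO - Checking state for <DagRun test_dag_garthcn @ 2016-04-15 00:00:00: scheduled__2016-04-15T00:00:00, externally triggered: False>
"""

if __name__ == "__main__":
    for (dag_id, when), n in sorted(repeated_success_marks(SAMPLE).items()):
        print("{} @ {} marked successful {} times".format(dag_id, when, n))
```

Running it against a full scheduler log (rather than the three-line sample) would list every run that keeps being re-marked.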
@r39132
Contributor Author

r39132 commented Apr 18, 2016

This may be the same as #1365

@bolkedebruin @jlowin @garthcn

FYI, my code example above is borrowed from the above-referenced issue (1365)

r39132 added the kind:bug label Apr 18, 2016
@bolkedebruin
Contributor

I don't think it is related to the multiprocessing scheduler changes. Did you try @jlowin's #1378?

@jlowin
Member

jlowin commented Apr 19, 2016

@r39132 try #1378, I think that fixes it.

@jlowin
Member

jlowin commented Apr 21, 2016

@r39132 I checked out the same commit as you (8750d75) and confirmed exactly the same issue (though a different set of tasks were in queued limbo -- the issue must be non-deterministic due to the use of a set). #1378 fixes it:
screen shot 2016-04-21 at 9 38 42 am
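
As a rough illustration of why a set could make the stuck tasks differ between machines, here is a toy sketch. It is not Airflow's scheduler code: `pick_for_slots` is a hypothetical stand-in that starts the first few tasks it happens to iterate over. Because a Python set has no guaranteed iteration order, which tasks start and which are left waiting can vary between interpreters and processes, while the counts stay the same.

```python
def pick_for_slots(queued, open_slots):
    """Start the first `open_slots` task ids encountered; return (started, left)."""
    started = set()
    for task_id in queued:  # set iteration order is arbitrary
        if len(started) >= open_slots:
            break
        started.add(task_id)
    return started, set(queued) - started


if __name__ == "__main__":
    # Task ids follow the op_{i} naming from the reproduction DAG.
    queued = {"op_{}".format(i) for i in range(3)}
    started, left = pick_for_slots(queued, open_slots=2)
    # The sizes are fixed, but the membership of each set is not guaranteed.
    print("started:", sorted(started))
    print("left waiting:", sorted(left))
```

Iterating over `sorted(queued)` instead would make the selection reproducible, at the cost of always favoring the same task ids.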

@bolkedebruin
Contributor

@r39132 closing this, as @jlowin and I confirmed the fix in #1378. Please open a Jira if it persists.
