Skip to content

Commit

Permalink
Fix infinite retries with pools, with test
Browse files Browse the repository at this point in the history
Addresses the issue raised in apache#1299
  • Loading branch information
jlowin committed Apr 5, 2016
1 parent 2e0421a commit b2844af
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
4 changes: 2 additions & 2 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,6 @@ def process_events(self, executor, dagbag):
# collect queued tasks for prioritization
if ti.state == State.QUEUED:
self.queued_tis.add(ti)
elif ti in self.queued_tis:
self.queued_tis.remove(ti)
else:
# special instructions for failed executions could go here
pass
Expand Down Expand Up @@ -601,6 +599,8 @@ def prioritize_queued(self, session, executor, dagbag):
else:
d[ti.pool].append(ti)

self.queued_tis.clear()

dag_blacklist = set(dagbag.paused_dags())
for pool, tis in list(d.items()):
if not pool:
Expand Down
12 changes: 12 additions & 0 deletions tests/dags/test_issue_1225.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,15 @@ def fail():
subdag=subdag7)
subdag7_task1.set_downstream(subdag7_task2)
subdag7_task2.set_downstream(subdag7_task3)

# dag8: checks that a task which gets queued behind a pool is still run.
# Start and end date are both DEFAULT_DATE.
dag8 = DAG(
    dag_id='test_scheduled_queued_tasks',
    default_args=default_args,
    start_date=DEFAULT_DATE,
    end_date=DEFAULT_DATE,
)
# The only task of dag8: it calls `fail` and is assigned to the
# 'test_queued_pool' pool.
dag8_task1 = PythonOperator(
    task_id='test_queued_task',
    dag=dag8,
    pool='test_queued_pool',
    python_callable=fail,
)
31 changes: 30 additions & 1 deletion tests/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def run_with_timeout():
job.run()
self.assertRaises(AirflowException, run_with_timeout)

def test_backfill_pooled_task(self):
def test_backfill_pooled_tasks(self):
"""
Test that queued tasks are executed by BackfillJob
Expand Down Expand Up @@ -262,6 +262,35 @@ def test_dagrun_deadlock(self):
dagrun_state=State.FAILED,
advance_execution_date=True)

def test_scheduler_pooled_tasks(self):
    """
    Test that the scheduler handles queued tasks correctly.

    See issue #1299.

    Steps:
      1. Make sure a pool named 'test_queued_pool' exists (created with
         5 slots when it is missing).
      2. Clear the 'test_scheduled_queued_tasks' DAG and run a
         SchedulerJob on it with ``num_runs=10``.
      3. Assert that the task instance of the DAG's first task at
         ``dag.start_date`` ended up in ``State.FAILED``.
         NOTE(review): FAILED is presumably the expected terminal state
         because the task's callable always fails -- confirm against the
         DAG definition.
    """
    pool_name = 'test_queued_pool'

    # Create the pool on demand so the test does not depend on fixtures
    # left behind by other tests.
    session = settings.Session()
    try:
        if not (
                session.query(Pool)
                .filter(Pool.pool == pool_name)
                .first()):
            pool = Pool(pool=pool_name, slots=5)
            session.merge(pool)
            session.commit()
    finally:
        # Always release the session, even if the query, merge or commit
        # raises; otherwise it would stay open for the rest of the run.
        session.close()

    dag_id = 'test_scheduled_queued_tasks'
    dag = self.dagbag.get_dag(dag_id)
    # Start from a clean slate for this DAG.
    dag.clear()

    scheduler = SchedulerJob(dag_id, num_runs=10)
    scheduler.run()

    # Load the state of the first task's instance from the database.
    task_1 = dag.tasks[0]
    ti = TI(task_1, dag.start_date)
    ti.refresh_from_db()
    self.assertEqual(ti.state, State.FAILED)

    # Leave no state behind for subsequent tests.
    dag.clear()

def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
"""
DagRun is marked a success if ignore_first_depends_on_past=True
Expand Down

0 comments on commit b2844af

Please sign in to comment.