Skip to content

Commit

Permalink
Add logic to lock DB and avoid race condition
Browse files Browse the repository at this point in the history
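The scheduler can encounter a queued task twice before the
task actually starts to run. When a task instance is run, it now
refreshes itself from the database with a row lock (FOR UPDATE) that is
held until the session is committed, which avoids that race condition.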
  • Loading branch information
jlowin committed May 9, 2016
1 parent 43bdd7a commit c1aa93f
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,16 +817,25 @@ def error(self, session=None):
session.commit()

@provide_session
def refresh_from_db(self, session=None):
def refresh_from_db(self, session=None, lock_for_update=False):
"""
Refreshes the task instance from the database based on the primary key
:param lock_for_update: if True, indicates that the database should
lock the TaskInstance (issuing a FOR UPDATE clause) until the session
is committed.
"""
TI = TaskInstance
ti = session.query(TI).filter(

qry = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.task_id == self.task_id,
TI.execution_date == self.execution_date,
).first()
TI.execution_date == self.execution_date)

if lock_for_update:
ti = qry.with_for_update().first()
else:
ti = qry.first()
if ti:
self.state = ti.state
self.start_date = ti.start_date
Expand Down Expand Up @@ -1159,7 +1168,7 @@ def run(
self.pool = pool or task.pool
self.test_mode = test_mode
self.force = force
self.refresh_from_db()
self.refresh_from_db(session=session, lock_for_update=True)
self.clear_xcom_data()
self.job_id = job_id
iso = datetime.now().isoformat()
Expand Down

0 comments on commit c1aa93f

Please sign in to comment.