Skip to content

Commit

Permalink
Fix : Don't treat premature tasks as could_not_run tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
r39132 authored and aoen committed May 13, 2016
1 parent 4a5f4a0 commit 563be13
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
3 changes: 3 additions & 0 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,10 @@ def process_dag(self, dag, queue):
elif ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))
queue.put((ti.key, pickle_id))
elif ti.is_premature():
continue
else:
self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
could_not_run.add(ti)

# this type of deadlock happens when dagruns can't even start and so
Expand Down
11 changes: 10 additions & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ def is_queueable(
if self.execution_date > datetime.now():
return False
# is the task still in the retry waiting period?
elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
elif self.is_premature():
return False
# does the task have an end_date prior to the execution date?
elif self.task.end_date and self.execution_date > self.task.end_date:
Expand All @@ -878,6 +878,15 @@ def is_queueable(
else:
return False


def is_premature(self):
"""
Returns whether a task is in UP_FOR_RETRY state and its retry interval
has elapsed.
"""
# is the task still in the retry waiting period?
return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()

def is_runnable(
self,
include_queued=False,
Expand Down

0 comments on commit 563be13

Please sign in to comment.