From 345368c365c050adbe7aa7e812f825a504e107db Mon Sep 17 00:00:00 2001 From: pradhanpk Date: Mon, 4 Apr 2016 17:16:46 -0400 Subject: [PATCH 1/2] Fix infinite retries with pools `SchedulerJob` contains a `set` of `TaskInstance`s called `queued_tis`. `SchedulerJob.process_events` loops through `queued_tis` and tries to remove completed tasks. However, because `TaskInstance` does not customize `__eq__` and `__hash__`, the following two lines have no effect: elements are never removed from `queued_tis`, leading to infinite retries on failure. This is related to my comment on #216. The following code was introduced in the fix to #1225. ``` elif ti in self.queued_tis: self.queued_tis.remove(ti) ``` --- airflow/models.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airflow/models.py b/airflow/models.py index fd1349ecf5860..9cc7d2f4371a8 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -619,7 +619,16 @@ def __init__(self, task, execution_date, state=None): if state: self.state = state - def command( + def _key(self): + return (self.task_id, self.dag_id, self.execution_date) + + def __eq__(self, other): + return self._key() == other._key() + + def __hash__(self): + return hash(self.__key()) + + def command( self, mark_success=False, ignore_dependencies=False, From 9ba70f356047b00b1360bacc0292a654429114c4 Mon Sep 17 00:00:00 2001 From: pradhanpk Date: Mon, 4 Apr 2016 17:22:22 -0400 Subject: [PATCH 2/2] Update models.py --- airflow/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models.py b/airflow/models.py index 9cc7d2f4371a8..2b6c5c9e6fac7 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -626,7 +626,7 @@ def __eq__(self, other): return self._key() == other._key() def __hash__(self): - return hash(self.__key()) + return hash(self._key()) def command( self,