Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix infinite retries with pools #1299

Conversation

pradhanpk
Copy link
Contributor

SchedulerJob contains a set of TaskInstances called queued_tis. SchedulerJob.process_events loops through queued_tis and tries to remove completed tasks. However, without customizing __eq__ and __hash__, the following two lines in jobs.py's SchedulerJob.process_events have no effect, never removing elements from queued_tis leading to infinite retries on failure. This is related to my comment on #216. The following code was introduced in the fix to #1225.

elif ti in self.queued_tis:
    self.queued_tis.remove(ti)

`SchedulerJob` contains a `set` of `TaskInstance`s called `queued_tis`. `SchedulerJob.process_events` loops through `queued_tis` and tries to remove completed tasks. However, without customizing `__eq__` and `__hash__`, the following two lines have no effect, never removing elements from `queued_tis` leading to infinite retries on failure. This is related to my comment on apache#216. The following code was introduced in the fix to apache#1225.

```
elif ti in self.queued_tis:
    self.queued_tis.remove(ti)
````
@landscape-bot
Copy link

Code Health
Repository health decreased by 9% when pulling 345368c on pradhanpk:pradhanpk-fix-SchedulerJob.process_events into 0bae60f on airbnb:master.

@landscape-bot
Copy link

Code Health
Repository health decreased by 9% when pulling 9ba70f3 on pradhanpk:pradhanpk-fix-SchedulerJob.process_events into 0bae60f on airbnb:master.

@jlowin
Copy link
Member

jlowin commented Apr 4, 2016

@pradhanpk task instances are hashable and added/removed from sets and dicts in many places in Airflow's code. Does this not work for you?

import airflow
import datetime
from airflow.models import DAG, TaskInstance as TI
from airflow.operators import DummyOperator

start_date = datetime.datetime(2016, 1, 1)

dag = DAG('dag')
op = DummyOperator(task_id='op', dag=dag, start_date=start_date, owner='airflow')
ti = TI(op, start_date)
ti.__hash__() # [hash]

test_set = set()
test_set.add(ti)
len(test_set) # 1
test_set.remove(ti)
len(test_set) # 0

It looks like the problem you're seeing is simply because prioritized_queued doesn't clear the queued_tis set after it iterates over it. I will add that to the second round of job fixes that are coming in #1290.

@jlowin
Copy link
Member

jlowin commented Apr 4, 2016

I've added the fix to #1290 with a test case, it will be available once that one is merged

@jlowin jlowin closed this Apr 4, 2016
@pradhanpk
Copy link
Contributor Author

@jlowin I think the issue is that you want two TaskInstances to be equal when their (dag_id, task_id, execution_date)'s match(like in the __eq__ implementation in this PR), since this is the primary key of the TaskInstance ORM. The problem without this can be seen in this modification of your example:

import airflow
import datetime
import copy
from airflow.models import DAG, TaskInstance as TI
from airflow.operators import DummyOperator

start_date = datetime.datetime(2016, 1, 1)

dag = DAG('dag')
op = DummyOperator(task_id='op', dag=dag, start_date=start_date, owner='airflow')
ti = TI(op, start_date)
ti.__hash__() # [hash]

test_set = set()
test_set.add(ti)
len(test_set) # 1

copy_ti = copy.deepcopy(ti)
copy_ti in test_set #False
test_set.remove(copy_ti) #KeyError

Isn't elif ti in self.queued_tis: more like copy_ti in test_set, rather than ti in test_set?

@jlowin
Copy link
Member

jlowin commented Apr 4, 2016

Ah, I understand what you mean. I don't think we want to force TI equality, though. If I create two Tis, they are not the same thing. However there is already a .key property which accomplishes much of what you're looking for if you do need to determine equality.

The issue you're trying to correct, where Tis are removed from queued_tasks, is a red herring -- it's not impacting the execution anymore (because queued_tasks is completely cleared shortly thereafter). I will delete the line entirely to make that clear -- as you point outed, it's not actually doing anything anyway!

@pradhanpk
Copy link
Contributor Author

@jlowin Heh, should have looked harder for key :) Is the fix in master or is there a PR? I have pulled till 0f28090 and am unable to find code clearing queued_tis. Thanks!

jlowin added a commit to jlowin/airflow that referenced this pull request Apr 5, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants