1.7.1rc1 seems to keep re-scheduling successful tasks #1365
Comments
Thanks for reporting this @garthcn (I will try to reproduce this shortly). cc/ @mistercrunch @criccomini @jlowin
@r39132 I think this might be the same thing that I'm seeing. When I grep for "previously succeeded on", I find a ton of entries after upgrading to 1.7.1.
Note: I was using the LocalExecutor, so the problem seems to happen on both Celery and Local executors.
I might have found the problem. I was making many changes to the scheduler's behavior when this release candidate was cut, and it looks like one of my PRs made it in while another just missed -- and the one that missed included a bugfix for this situation. Specifically, this one line (no. 823): https://github.com/airbnb/airflow/pull/1290/files?diff=unified#diff-54a57ccc2c8e73d12c812798bf79ccb2R823

Without this line, the Scheduler's set of queued TIs (task instances) doesn't clear even when they succeed, and the Scheduler tries to put them through again and again. BUT this morning @abridgett raised a situation that may have been caused by the new logic when used in conjunction with […]. Long story short: don't use 1.7.1rc 😞
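To make the failure mode concrete, here is a minimal sketch of the bookkeeping described above; this is my own illustration in plain Python, not the actual Airflow source, and the names `queued_tis` and `handle_executor_event` are hypothetical:

```python
# Hypothetical sketch of the scheduler bookkeeping described above --
# an illustration, not the real Airflow scheduler code.

def handle_executor_event(queued_tis, key, state):
    """Drop a task instance key from the scheduler's queued set.

    Without the discard below (the one-line fix that missed the
    1.7.1rc1 cut), a succeeded task instance's key stays in the set
    and the scheduler keeps pushing it through on every loop.
    """
    if state in ('success', 'failed'):
        queued_tis.discard(key)

# Usage: once ('dag', 'op_0') succeeds, it must leave the queued set,
# otherwise it will be scheduled again and again.
queued = {('dag', 'op_0'), ('dag', 'op_1')}
handle_executor_event(queued, ('dag', 'op_0'), 'success')
assert ('dag', 'op_0') not in queued
```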
By the way @r39132, this was my test DAG. @garthcn, does it exhibit the behavior for you too? I actually did NOT see the "previously succeeded" message in the logs for some reason, but I could see that it was trying to run the tasks.

```python
import datetime

import airflow
from airflow import DAG
from airflow.models import Pool
from airflow.operators import BashOperator

# Create the 4-slot test pool if it doesn't already exist.
session = airflow.settings.Session()
if not (
        session.query(Pool)
        .filter(Pool.pool == 'pool')
        .first()):
    session.add(Pool(pool='pool', slots=4))
    session.commit()  # persist the pool; without this the insert is lost

default_args = dict(owner='airflow', start_date=datetime.datetime(2016, 4, 12))
dag = DAG('dag', default_args=default_args)

# Three trivial tasks that all run in the pool.
for i in range(3):
    BashOperator(
        task_id='op_{}'.format(i),
        bash_command="sleep 1",
        dag=dag,
        pool='pool')
```
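For what it's worth, one way to exercise a DAG like this on a 1.7-era install is a backfill over its start date, e.g. `airflow backfill dag -s 2016-04-12 -e 2016-04-12`; that exact invocation is my assumption, not something stated in the thread.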
@garthcn ping. It would be nice to have this fixed before a new release.
I'm not using […]
Please reopen if the issue persists.
Please note that this is only happening to us on 1.7.1rc1, which is not an official release.
Reverting to 1.7.0 seems to have solved the issue.
Environment
Description of Issue
We have a Pool (size 4) for our task instances. Airflow is supposed to add task instances to the queue, pick at most 4 from the queue, finish them, and then pull more task instances from the queue. This was working properly before 1.7.1rc1.
However, after updating to 1.7.1rc1, Airflow adds task instances to the queue, takes 4 from the queue, and finishes them, but then stops taking more from the queue. Instead, it re-adds the successful task instances repeatedly, tries to run them, and exits without doing anything (because it finds they are already successful). It keeps re-running each already-finished task instance this way, and each task instance's log file fills with repeated lines noting that it "previously succeeded on" an earlier run.
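For contrast with the buggy behavior, here is a toy simulation of what we expect from a 4-slot pool. This is my own illustration of the queue/slot semantics described above, not Airflow code, and `drain_pool` is a hypothetical name:

```python
from collections import deque

def drain_pool(task_ids, slots=4):
    """Simulate the expected pool behavior: each task runs exactly once."""
    queue = deque(task_ids)
    completed = []
    while queue:
        # Take at most `slots` tasks off the queue...
        batch = [queue.popleft() for _ in range(min(slots, len(queue)))]
        # ...run them to completion, which frees every slot for the next batch.
        completed.extend(batch)
    return completed

# The queue drains and nothing runs twice -- unlike 1.7.1rc1, which kept
# re-adding already-successful task instances instead of pulling new ones.
tasks = ['op_{}'.format(i) for i in range(10)]
assert drain_pool(tasks) == tasks
```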
Reproduce
I'm not sure if this is reliably reproducible, but our setup is:
airflow flower
[…] UI.