
Backfill crashes with "KeyError: TaskInstanceKey" when task has retries #13322

Closed
sarvo-madhavan opened this issue Dec 26, 2020 · 19 comments

Labels: affected_version:2.0 (Issues Reported for 2.0), kind:bug (This is a clearly a bug), pending-response, stale (Stale PRs per the .github/workflows/stale.yml policy file)

sarvo-madhavan commented Dec 26, 2020

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version): No Kubernetes

Environment: Docker python environment (3.8)

What happened: The backfill command crashes with this stack trace:

Traceback (most recent call last):
  File "/opt/conda/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill
    dag.run(
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dag.py", line 1701, in run
    job.run()
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute
    self._execute_for_run_dates(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 211, in _update_counters
    ti_status.running.pop(key)
KeyError: TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2)

From the webserver, it looks like the task actually finished successfully after the second try (the first attempt failed with a network error).
Just before the error I also see this warning:
WARNING - TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2) state success not in running=dict_values([<TaskInstance: dag_id.task_name 2020-12-15 00:00:00+00:00 [queued]>])

This happens whenever a task has to retry. The subsequent tasks are not run, and the backfill command has to be re-run to continue.
What you expected to happen: The backfill command to continue to the next step.

How to reproduce it: Not sure. Create a DAG with a future start date and a task that fails on the first try but succeeds on the second, keep it turned off, and run a backfill command with a single past date (a sketch of such a task follows the DAG definition below). Command that was used: `airflow dags backfill dag_id -s 2020-12-15 -e 2020-12-15`

Anything else we need to know:


```python
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner':            'owner',
    'depends_on_past':  False,
    'email':            ['email@address.com'],
    'email_on_failure': False,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=5),
    'concurrency':      4,
}

dag_id = DAG(
    dag_id='dag_id',
    default_args=default_args,
    description='Some Description',
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(weeks=1),
    catchup=True,
    template_searchpath=templates_searchpath,  # defined elsewhere in the reporter's file
)
```
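
For reference, a minimal task that fails on its first try and succeeds on the retry could look like the sketch below. This is not part of the original report; the task id, operator, and simulated failure are assumptions added purely to make the reproduction above concrete.

```python
from airflow.operators.python import PythonOperator

def _fail_once(ti, **_):
    # ti.try_number is 1 on the first attempt and 2 on the retry,
    # so this raises once and then succeeds.
    if ti.try_number < 2:
        raise RuntimeError("simulated transient network error")

task_name = PythonOperator(
    task_id='task_name',
    python_callable=_fail_once,
    dag=dag_id,  # the DAG object above (named dag_id in the report)
)
```

With the DAG kept switched off, `airflow dags backfill dag_id -s 2020-12-15 -e 2020-12-15` should then exercise the retry path described above.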

@sarvo-madhavan sarvo-madhavan added the kind:bug This is a clearly a bug label Dec 26, 2020
boring-cyborg bot commented Dec 26, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@potiuk potiuk added the invalid label Dec 26, 2020
potiuk (Member) commented Dec 26, 2020

Hey @sarvothaman - you need to provide way more information in order to report an issue successfully. The Airflow version is a bare minimum; without logs, information about what you have done to debug and check what's wrong, what the conditions are, and how to reproduce the issue, there is no way anyone will be able to do anything with it. I am closing it as invalid until you specify enough information to be able to reproduce it or at least understand what's wrong.

@potiuk potiuk closed this as completed Dec 26, 2020
sarvo-madhavan (Author) commented Dec 26, 2020

@potiuk Sorry, I accidentally hit enter before entering all the info (with no way to delete?). In any case, I've added the details.

@sarvo-madhavan sarvo-madhavan changed the title Backfill crashes with "KeyError: TaskInstanceKey Backfill crashes with "KeyError: TaskInstanceKey when task has retries Dec 26, 2020
@potiuk potiuk reopened this Dec 26, 2020
@potiuk potiuk removed the invalid label Dec 26, 2020
@sarvo-madhavan sarvo-madhavan changed the title Backfill crashes with "KeyError: TaskInstanceKey when task has retries Backfill crashes with "KeyError: TaskInstanceKey" when task has retries Dec 26, 2020
potiuk (Member) commented Dec 26, 2020

@potiuk Sorry, accidentally hit enter before entering all the info (with no way to delete?). In any case, added the details

I was too fast then :). Sorry. I see very comprehensive information now :). Looks like an interesting one to take a look at. I believe this is something we are already aware of - there are some cases where try_number is wrongly calculated. @turbaszek and @ashb - I know you had discussions about a similar case - maybe it is related?

@potiuk potiuk added this to the Airflow 2.0.1 milestone Dec 26, 2020
turbaszek (Member) commented
@sarvothaman do you by any chance use a sensor in the DAG (especially in reschedule mode)?

kaxil (Member) commented Jan 19, 2021

Create a DAG with a future start date with a task that fails on the first try but succeeds in the second, keep it turned off, and run a backfill command with a single past date. Command that was used: airflow dags backfill dag_id -s 2020-12-15 -e 2020-12-15

future start date with backfill ?

@kaxil kaxil removed this from the Airflow 2.0.1 milestone Jan 19, 2021
@vikramkoka vikramkoka added the affected_version:2.0 Issues Reported for 2.0 label Feb 6, 2021
leonsmith (Contributor) commented
We hit this last night; I'll follow up later today with some more information on the environment & conditions it happened under.

vikramkoka (Contributor) commented
@leonsmith Looking forward to more information. Not sure we can do much with this issue as it stands.

linyfei commented Apr 15, 2021

I got the same error. Let me explain my workflow: I ran the Airflow job with the DebugExecutor on my Mac and submitted it to Amazon EMR. The weird thing is that my Airflow job (all steps) was submitted successfully, and it was confirmed to run successfully on Amazon EMR.

I searched Google for relevant information but didn't find anything.

I sincerely hope that this problem will be taken seriously and resolved as soon as possible. Thanks a lot!

Here is my error:
Traceback (most recent call last):
  File "/Users/anker/PycharmProjects/Airflow2.0_test/dags/EMRS3FileRepartitionTest.py", line 82, in <module>
    dag.run()
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/models/dag.py", line 1706, in run
    job.run()
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 805, in _execute
    session=session,
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 727, in _execute_for_run_dates
    session=session,
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/anker/.conda/envs/Airflow2.0_test/lib/python3.7/site-packages/airflow/jobs/backfill_job.py", line 231, in _update_counters
    ti_status.running.pop(reduced_key)
KeyError: TaskInstanceKey(dag_id='emr_sync_ea_apply_l_dag', task_id='watch_steps', execution_date=datetime.datetime(2021, 4, 10, 0, 0, tzinfo=Timezone('UTC')), try_number=1)

Here is my code:

```python
from airflow import DAG
from datetime import timedelta
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor

default_args = {
    'owner': 'stella',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'email': ['stella.wu@**.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

Job_Flow_Overrides = {
    'Name': 'apply_2_s_dag',
}

with DAG(
    dag_id='apply_2_s_dag',
    default_args=default_args,
    catchup=True,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(5),
    # schedule_interval='@once',
) as dag:
    PYSPARK_STEPS = [
        {
            'Name': 'sync_apply_2s',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit',
                         's3:///apply_2_s.py',
                         's3://-partitioned',
                         's3://-dev',
                         '//apply_s/',
                         '{{ task_instance.execution_date }}'],
            }
        },
    ]

    # create the job flow first
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        job_flow_overrides=Job_Flow_Overrides,
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=PYSPARK_STEPS,
    )
    # a maximum of 256 steps can be added

    # Asks for the state of the step until it reaches a terminal state.
    # If it fails, the sensor errors, failing the task.
    step_checker = EmrStepSensor(
        task_id='watch_steps',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    # finally terminate the EMR job flow
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

if __name__ == '__main__':
    dag.clear()
    dag.run()
```

aliavni (Contributor) commented May 27, 2021

I am having the same issue here on Airflow 2.0.2. In my case, the DAG works fine when deployed to the Airflow server, but fails when I run or debug it in my IDE.

The exception happens in this file airflow/jobs/backfill_job.py:221 at the ti_status.running.pop(reduced_key) line. Right before the exception:

the value of reduced_key is:

TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 7, 0, 0, tzinfo=Timezone('UTC')), try_number=2) 

and this is what I have in ti_status.running:

{
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 1, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-01 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 2, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-02 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 3, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-03 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 4, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-04 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 5, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-05 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 6, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-06 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 7, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-07 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 8, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-08 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 9, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-09 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 10, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-10 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 11, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-11 00:00:00+00:00 [failed]>, 
	TaskInstanceKey(dag_id='refactor', task_id='task-1', execution_date=datetime.datetime(2021, 5, 12, 0, 0, tzinfo=Timezone('UTC')), try_number=3): <TaskInstance: refactor.task-1 2021-05-12 00:00:00+00:00 [failed]>
}

github-actions bot commented
This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jun 27, 2021
github-actions bot commented Jul 5, 2021

This issue has been closed because it has not received response from the issue author.

@github-actions github-actions bot closed this as completed Jul 5, 2021
huozhanfeng (Contributor) commented
It happened in Airflow 2.1.0 as well; there is a bug in the backfill logic. I have fixed it in my environment and will submit a PR for it when I get a chance.

galuszkak (Contributor) commented
Hey @potiuk,

would it be possible to reopen this issue, given that there is an open PR for this in #17305?

WulkanHefajstos commented Sep 2, 2021

Hey Guys!

I'm facing a similar issue, but with the DebugExecutor (Airflow 2.1.0).

I've found two cases where BackfillJob fails with KeyError: TaskInstanceKey(... in the _update_counters method.

The first one is described in #17305, and the second occurs when I run tests for tasks that have already been executed with another executor (hence they already have an increased try number in the DB).

DebugExecutor doesn't call the TI method check_and_change_state_before_execution and therefore doesn't increase try_number.

I didn't have much time to investigate, so I might be completely wrong, but I think it works now only thanks to max(1, self.try_number - 1) in the TaskInstanceKey.reduced property (the try number in the key and in the reduced key are both always 1), and it breaks every time the initial try number differs from 0 (or is modified when handling the reschedule state).
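
To make the mechanism concrete, below is a small illustrative sketch (not the actual Airflow source; the class and values are simplified) of how a key reduced with max(1, try_number - 1) can miss the entry stored in ti_status.running and raise the KeyError reported above:

```python
from typing import NamedTuple

class TaskInstanceKey(NamedTuple):
    dag_id: str
    task_id: str
    execution_date: str  # simplified to a string for this sketch
    try_number: int

    @property
    def reduced(self):
        # drop the try number by one, but never below 1
        return self._replace(try_number=max(1, self.try_number - 1))

key = TaskInstanceKey('refactor', 'task-1', '2021-05-07', 3)
running = {key: '<TaskInstance refactor.task-1 [failed]>'}

running.pop(key.reduced)  # KeyError: the entry is keyed with try_number=3, the reduced key has 2
```

This matches the earlier debugger dump, where the popped key had try_number=2 while every key in ti_status.running had try_number=3.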

pembo13 commented Sep 4, 2021

I seem to be getting the same error with apache-airflow==2.0.0? Should I create a new issue?

In my case, I successfully ran airflow dags backfill and now can't run airflow dags test without getting this error, unless I delete the entire DB and start again.

tkang007 commented Sep 19, 2021

This issue still happens with apache-airflow==2.1.3 and 2.1.4.
In my case, `airflow dags test` failed after the DB was reinitialized, without running `airflow dags backfill`.

@potiuk
It happened with the SequentialExecutor and a PostgreSQL metastore in my case.
I can avoid it with the LocalExecutor and a PostgreSQL metastore.
Thanks.

potiuk (Member) commented Sep 19, 2021

This issue still happened with apache-airflow==2.1.3 and 2.1.4 version.

In my case, airflow dags test was failed after DB reinitialized without `airflow dags backfill'

could you please open a new issue with reproducible case?

rubbengimenez commented
Hi guys!

Just for those who are facing this issue when debugging a DAG with DebugExecutor.

If you change your start_date and use the function days_ago instead of passing a datetime, it seems to work fine (although I don't know the reasons behind this behaviour...):

from airflow.utils.dates import days_ago

default_args = {
    'owner': 'owner',
    # 'start_date': datetime(2021, 10, 5, 7, 45, 0, 0, tzinfo=TZINFO), 
    'start_date': days_ago(1), 
    'email': ['blabla@email.com'],
    'email_on_failure': False,
    'email_on_retry':   False,
    'retries': 2
}

kaxil pushed a commit that referenced this issue Oct 7, 2021
…eschedule state (#17305)

The backfill job fails to run when there are tasks that go into the rescheduling state.
The error log is as follows in issue #13322:

```
Traceback (most recent call last):
  File "/opt/conda/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/cli/commands/dag_command.py", line 103, in dag_backfill
    dag.run(
  File "/opt/conda/lib/python3.8/site-packages/airflow/models/dag.py", line 1701, in run
    job.run()
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 799, in _execute
    self._execute_for_run_dates(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 722, in _execute_for_run_dates
    processed_dag_run_dates = self._process_backfill_task_instances(
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 620, in _process_backfill_task_instances
    self._update_counters(ti_status=ti_status)
  File "/opt/conda/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/airflow/jobs/backfill_job.py", line 211, in _update_counters
    ti_status.running.pop(key)
KeyError: TaskInstanceKey(dag_id='dag_id', task_id='task_name', execution_date=datetime.datetime(2020, 12, 15, 0, 0, tzinfo=Timezone('UTC')), try_number=2)
```

The root cause is that the field `try_number` doesn't increase when the task goes into the rescheduling state, but there is still a reduce operation on `try_number`.

Currently, I can't think of a good unit test for it, so I'm only posting the code here to help anyone affected by it solve the problem.
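
As an illustration of the kind of defensive handling this implies (a sketch only, not necessarily what #17305 actually merged), the counter update could tolerate a task whose try_number was never incremented by falling back to the unreduced key instead of crashing:

```python
# Hypothetical sketch of a tolerant pop in _update_counters; the real fix
# is in PR #17305 and may differ.
ti = ti_status.running.pop(reduced_key, None)
if ti is None:
    # try_number was never incremented (e.g. reschedule or DebugExecutor),
    # so the entry is still stored under the original key
    ti = ti_status.running.pop(key, None)
```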