
Rescheduled Sensors fail with "not found in serialized_dag table" #54939


Apache Airflow version

3.0.4

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We are running Airflow 3.0.4 (the latest version available on PyPI at the time of writing) with LocalExecutor and a PostgreSQL 16 metadata database hosted on another machine on the same network.

Since upgrading from Airflow 2.11 to Airflow 3, we’ve been encountering an issue where sensors are killed before their configured timeout. This only happens during periods of high load, when multiple DAGs and sensors are running in parallel.

The sensors are configured as follows:

ExternalTaskSensor(
    task_id='my_sensor_Task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    allowed_states=['success'],
    execution_delta=timedelta(seconds=0),
    poke_interval=60,
    mode='reschedule',
    timeout=60 * 60 * 3,
    dag=dag,
)
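
In reschedule mode each poke runs as a short-lived task: the worker slot is released between pokes and the task instance goes back through the scheduler (up_for_reschedule), so every poke is a full scheduling round-trip. That is presumably why the problem only shows up when many sensors poke in parallel.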

Here is our Airflow configuration:

[core]
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16

[database]
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 40
sql_alchemy_pool_recycle = 1800

Yesterday we increased the pool size and the max overflow parameters for the database. The issue has improved, but it is still not fully resolved.
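
If we read the SQLAlchemy pool semantics correctly (our assumption, not something we verified against the Airflow docs), each Airflow process holds its own engine pool, so a single process can open up to sql_alchemy_pool_size + sql_alchemy_max_overflow = 20 + 40 = 60 connections; with the scheduler, the API server, and up to 32 LocalExecutor task processes running at once, the total against PostgreSQL can be considerably higher.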

We also checked the database to see if it was overloaded or unable to handle all the requests, but it looks fine.
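
For anyone who wants to run the same kind of check, something along these lines against pg_stat_activity works (a minimal sketch, not the exact queries we ran; the DSN is a placeholder, and psycopg2 is assumed to be available, e.g. via the postgres provider):

import psycopg2

# Placeholder DSN -- point it at the Airflow metadata database.
conn = psycopg2.connect("dbname=airflow user=airflow host=metastore-host")
with conn, conn.cursor() as cur:
    # How many sessions are currently active, idle, or waiting.
    cur.execute("SELECT state, count(*) FROM pg_stat_activity GROUP BY state;")
    for state, count in cur.fetchall():
        print(state, count)

    # Queries active for more than 5 seconds would hint at an overloaded database.
    cur.execute("""
        SELECT pid, now() - query_start AS runtime, left(query, 80) AS query
        FROM pg_stat_activity
        WHERE state = 'active' AND now() - query_start > interval '5 seconds';
    """)
    for row in cur.fetchall():
        print(row)
conn.close()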

Looking at the logs, we find the following errors in the scheduler log:

{base_executor.py:512} DEBUG - Changing state: TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
{base_executor.py:517} DEBUG - Could not find key: TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
{scheduler_job_runner.py:811} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='sensor_race_test', task_id='test_sensor_38', run_id='scheduled__2024-01-10T00:00:00+00:00', try_number=2, map_index=-1)
{scheduler_job_runner.py:853} INFO - TaskInstance Finished: dag_id=sensor_race_test, task_id=test_sensor_38, run_id=scheduled__2024-01-10T00:00:00+00:00, map_index=-1, run_start_date=2025-08-26 09:33:41.915710+00:00, run_end_date=2025-08-26 09:33:48.797037+00:00, run_duration=6.881327, state=scheduled, executor=LocalExecutor(parallelism=32), executor_state=success, try_number=2, max_tries=1, pool=default_pool, queue=default, priority_weight=1, operator=PythonSensor, queued_dttm=2025-08-26 09:33:41.700085+00:00, scheduled_dttm=2025-08-26 09:33:54.249313+00:00,queued_by_job_id=10439623, pid=1969715
{scheduler_job_runner.py:926} ERROR - DAG 'sensor_race_test' for task instance <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [scheduled]> not found in serialized_dag table
{taskinstance.py:1882} ERROR - Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [scheduled]> finished with state success, but the task instance's state attribute is scheduled. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
{listener.py:37} DEBUG - Calling 'on_task_instance_failed' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [failed]>, 'error': "Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: sensor_race_test.test_sensor_38 scheduled__2024-01-10T00:00:00+00:00 [scheduled]> finished with state success, but the task instance's state attribute is scheduled. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"}
{taskinstance.py:2005} INFO - Marking task as FAILED. dag_id=sensor_race_test, task_id=test_sensor_38, run_id=scheduled__2024-01-10T00:00:00+00:00, logical_date=20240110T000000, start_date=20250826T093341, end_date=20250826T093354

No errors appear in the task logs or any other logs.
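
One way to check whether the row is genuinely missing at the moment the error fires is to query the serialized_dag table directly (a sketch on our side; it only assumes the dag_id column that the error message itself refers to):

from sqlalchemy import text
from airflow.utils.session import create_session

# Does the DAG the scheduler claims is missing actually have a row right now?
with create_session() as session:
    rows = session.execute(
        text("SELECT dag_id FROM serialized_dag WHERE dag_id = :dag_id"),
        {"dag_id": "sensor_race_test"},
    ).fetchall()

print("serialized_dag rows for sensor_race_test:", len(rows))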

What do you think should happen instead?

No response

How to reproduce

We can reproduce the issue using this minimal DAG that creates 50 Python sensors:

from datetime import datetime
import random

from airflow import DAG
from airflow.sensors.base import PokeReturnValue
from airflow.sensors.python import PythonSensor

# Dummy function that "simulates" an external condition.
# It returns False almost every time, so the sensor keeps getting rescheduled
# and generates many requests to the scheduler.

def fake_condition():
    # Simulate a low probability of "success"
    return random.random() < 0.01


def sensor_fn():
    return PokeReturnValue(is_done=fake_condition())


def make_sensor(task_id):
    return PythonSensor(
        task_id=task_id,
        python_callable=sensor_fn,
        poke_interval=5,        # very short, to stress the scheduler
        mode="reschedule",      # key to generating load on the scheduler
        timeout=600,            # give up after 10 minutes
    )


def make_dag(dag_id):
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2024, 1, 10),
        schedule="@once",
        catchup=False,
        default_args={"owner": "airflow", "retries": 0},
        tags=["test", "sensor"],
    ) as dag:

        # Generate many parallel sensors
        for i in range(50):  # can be raised to 50-100 for more stress
            make_sensor(f"test_sensor_{i}")

        return dag


globals()["sensor_race_test"] = make_dag("sensor_race_test")
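
With this file in the DAGs folder and sensor_race_test unpaused, the 50 reschedule sensors poking every 5 seconds are enough to produce the scheduler errors shown above on our setup (LocalExecutor, configuration as listed earlier).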

Operating System

Ubuntu 22.04.5 LTS

Versions of Apache Airflow Providers

apache-airflow==3.0.4
apache-airflow-core==3.0.4
apache-airflow-providers-apache-hdfs==4.10.2
apache-airflow-providers-apache-spark==5.3.2
apache-airflow-providers-apache-sqoop==3.2.0
apache-airflow-providers-common-compat==1.7.3
apache-airflow-providers-common-io==1.6.2
apache-airflow-providers-common-sql==1.27.4
apache-airflow-providers-elasticsearch==6.3.2
apache-airflow-providers-fab==2.3.1
apache-airflow-providers-http==5.3.3
apache-airflow-providers-jdbc==5.2.2
apache-airflow-providers-postgres==6.2.2
apache-airflow-providers-salesforce==5.11.2
apache-airflow-providers-smtp==2.1.2
apache-airflow-providers-ssh==4.1.2
apache-airflow-providers-standard==1.5.0
apache-airflow-providers-tableau==5.2.0
apache-airflow-task-sdk==1.0.4

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

