From 3538b5bd9290f68f67d4ae1b4f3cc884b5c51adb Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Fri, 19 Sep 2025 12:45:50 +0100
Subject: [PATCH 1/4] Fix triggerer errors after Airflow 2 to 3 migration

When upgrading from Airflow 2, existing deferred triggers can reference
TaskInstances without a dag_version_id and DagRuns with conf=None. This
caused errors when the triggerer tried to start those triggers and when
workers consumed ti_run responses.

This change:

1. Skips starting triggers whose TaskInstance lacks dag_version_id,
   logging a warning instead of erroring
2. Coerces DagRun.conf from None to {} in the ti_run response for
   compatibility with Airflow 2-era data
3. Adds unit tests covering both behaviors

This prevents triggerer crashes and makes deferred tasks resume reliably
after migration.
---
 .../execution_api/routes/task_instances.py   |  4 +-
 .../src/airflow/jobs/triggerer_job_runner.py |  7 +++-
 .../versions/head/test_task_instances.py     | 42 +++++++++++++++++++
 .../tests/unit/jobs/test_triggerer_job.py    | 32 ++++++++++++++
 4 files changed, 83 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index e27922a63df7b..2bb919779d836 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -225,7 +225,9 @@ def ti_run(
     if not dr:
         log.error("DagRun not found", dag_id=ti.dag_id, run_id=ti.run_id)
         raise ValueError(f"DagRun with dag_id={ti.dag_id} and run_id={ti.run_id} not found.")
-
+    # for airflow 2 with dag_run.conf equal to None, we should convert it to {}
+    if dr.conf is None:
+        dr.conf = {}
     # Send the keys to the SDK so that the client requests to clear those XComs from the server.
     # The reason we cannot do this here in the server is because we need to issue a purge on custom XCom backends
     # too. With the current assumption, the workers ONLY have access to the custom XCom backends directly and they
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index f5bc2d94d0601..a242efab84b35 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -649,7 +649,12 @@ def update_triggers(self, requested_trigger_ids: set[int]):
             )
             if new_trigger_orm.task_instance:
                 log_path = render_log_fname(ti=new_trigger_orm.task_instance)
-
+                if not new_trigger_orm.task_instance.dag_version_id:
+                    log.warning(
+                        "TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
+                        ti_id=new_trigger_orm.task_instance.id,
+                    )
+                    continue
                 ser_ti = workloads.TaskInstance.model_validate(
                     new_trigger_orm.task_instance, from_attributes=True
                 )
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 863098e8e6b94..572ba655f6deb 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -239,6 +239,48 @@ def test_ti_run_state_to_running(
         )
         assert response.status_code == 409
 
+    def test_ti_run_coerces_none_conf_to_empty_dict(
+        self, client, session, create_task_instance, time_machine
+    ):
+        """
+        Ensure that when DagRun.conf is None in the DB, ti_run response returns an empty dict for conf.
+        """
+        instant_str = "2024-09-30T12:00:00Z"
+        instant = timezone.parse(instant_str)
+        time_machine.move_to(instant, tick=False)
+
+        ti = create_task_instance(
+            task_id="test_ti_run_coerces_none_conf_to_empty_dict",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            start_date=instant,
+            dag_id=str(uuid4()),
+        )
+        session.commit()
+
+        # Explicitly set the associated DagRun.conf to None to exercise the coercion path
+        from airflow.models.dagrun import DagRun as DR
+
+        dr = session.scalars(select(DR).filter_by(dag_id=ti.dag_id, run_id=ti.run_id)).one()
+        dr.conf = None
+        session.merge(dr)
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/run",
+            json={
+                "state": "running",
+                "hostname": "random-hostname",
+                "unixname": "random-unixname",
+                "pid": 100,
+                "start_date": instant_str,
+            },
+        )
+
+        assert response.status_code == 200
+        assert response.json()["dag_run"]["conf"] == {}
+
     def test_dynamic_task_mapping_with_parse_time_value(self, client, dag_maker):
         """
         Test that the Task Instance upstream_map_indexes is correctly fetched when to running the Task Instances
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 1e6eb1b9d6fc4..14302e36a8308 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -1140,6 +1140,38 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
         assert trigger_orm2.id in trigger_ids
 
 
+def test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_builder, dag_maker):
+    """
+    Ensure supervisor skips creating a trigger when the linked TaskInstance has no dag_version_id.
+    """
+    with dag_maker(dag_id="test_no_dag_version"):
+        EmptyOperator(task_id="t1")
+    dr = dag_maker.create_dagrun()
+    ti = dr.task_instances[0]
+
+    # Create a Trigger and link it to the TaskInstance
+    trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
+    trigger_orm = Trigger.from_object(trigger)
+    session.add(trigger_orm)
+    session.flush()
+
+    ti.trigger_id = trigger_orm.id
+    # Explicitly remove dag_version_id
+    ti.dag_version_id = None
+    session.merge(ti)
+    session.commit()
+
+    supervisor = supervisor_builder()
+
+    # Attempt to enqueue creation of this trigger
+    supervisor.update_triggers({trigger_orm.id})
+
+    # Assert that nothing was queued for creation and no subprocess writes happened
+    assert len(supervisor.creating_triggers) == 0
+    assert trigger_orm.id not in supervisor.running_triggers
+    supervisor.stdin.write.assert_not_called()
+
+
 class TestTriggererMessageTypes:
     def test_message_types_in_triggerer(self):
         """

From 5c8d44c924541bc38726958d04b0262914ebdf68 Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Wed, 22 Oct 2025 11:57:06 +0100
Subject: [PATCH 2/4] Remove config check as that has been addressed in a different PR

---
 .../airflow/api_fastapi/execution_api/routes/task_instances.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 2bb919779d836..e27922a63df7b 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -225,9 +225,7 @@ def ti_run(
     if not dr:
         log.error("DagRun not found", dag_id=ti.dag_id, run_id=ti.run_id)
         raise ValueError(f"DagRun with dag_id={ti.dag_id} and run_id={ti.run_id} not found.")
-    # for airflow 2 with dag_run.conf equal to None, we should convert it to {}
-    if dr.conf is None:
-        dr.conf = {}
+
     # Send the keys to the SDK so that the client requests to clear those XComs from the server.
     # The reason we cannot do this here in the server is because we need to issue a purge on custom XCom backends
     # too. With the current assumption, the workers ONLY have access to the custom XCom backends directly and they

From da899256c29de254a1bcfbc8691199a91e3385e7 Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Wed, 22 Oct 2025 18:05:58 +0100
Subject: [PATCH 3/4] Add comment on why we added this

---
 airflow-core/src/airflow/jobs/triggerer_job_runner.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index a242efab84b35..617643c3f24ad 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -650,6 +650,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
             if new_trigger_orm.task_instance:
                 log_path = render_log_fname(ti=new_trigger_orm.task_instance)
                 if not new_trigger_orm.task_instance.dag_version_id:
+                    # This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
                     log.warning(
                         "TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
                         ti_id=new_trigger_orm.task_instance.id,

From fef88973c2f4262364f5cb5ba64a126d051e979d Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Wed, 22 Oct 2025 20:17:03 +0100
Subject: [PATCH 4/4] Remove null conf test

---
 .../versions/head/test_task_instances.py | 42 -------------------
 1 file changed, 42 deletions(-)

diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 572ba655f6deb..863098e8e6b94 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -239,48 +239,6 @@ def test_ti_run_state_to_running(
         )
         assert response.status_code == 409
 
-    def test_ti_run_coerces_none_conf_to_empty_dict(
-        self, client, session, create_task_instance, time_machine
-    ):
-        """
-        Ensure that when DagRun.conf is None in the DB, ti_run response returns an empty dict for conf.
-        """
-        instant_str = "2024-09-30T12:00:00Z"
-        instant = timezone.parse(instant_str)
-        time_machine.move_to(instant, tick=False)
-
-        ti = create_task_instance(
-            task_id="test_ti_run_coerces_none_conf_to_empty_dict",
-            state=State.QUEUED,
-            dagrun_state=DagRunState.RUNNING,
-            session=session,
-            start_date=instant,
-            dag_id=str(uuid4()),
-        )
-        session.commit()
-
-        # Explicitly set the associated DagRun.conf to None to exercise the coercion path
-        from airflow.models.dagrun import DagRun as DR
-
-        dr = session.scalars(select(DR).filter_by(dag_id=ti.dag_id, run_id=ti.run_id)).one()
-        dr.conf = None
-        session.merge(dr)
-        session.commit()
-
-        response = client.patch(
-            f"/execution/task-instances/{ti.id}/run",
-            json={
-                "state": "running",
-                "hostname": "random-hostname",
-                "unixname": "random-unixname",
-                "pid": 100,
-                "start_date": instant_str,
-            },
-        )
-
-        assert response.status_code == 200
-        assert response.json()["dag_run"]["conf"] == {}
-
     def test_dynamic_task_mapping_with_parse_time_value(self, client, dag_maker):
         """
         Test that the Task Instance upstream_map_indexes is correctly fetched when to running the Task Instances