diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index f5bc2d94d0601..617643c3f24ad 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -649,7 +649,13 @@ def update_triggers(self, requested_trigger_ids: set[int]): ) if new_trigger_orm.task_instance: log_path = render_log_fname(ti=new_trigger_orm.task_instance) - + if not new_trigger_orm.task_instance.dag_version_id: + # This is to handle 2 to 3 upgrade where TI.dag_version_id can be none + log.warning( + "TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger", + ti_id=new_trigger_orm.task_instance.id, + ) + continue ser_ti = workloads.TaskInstance.model_validate( new_trigger_orm.task_instance, from_attributes=True ) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 1e6eb1b9d6fc4..14302e36a8308 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -1140,6 +1140,38 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple assert trigger_orm2.id in trigger_ids +def test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_builder, dag_maker): + """ + Ensure supervisor skips creating a trigger when the linked TaskInstance has no dag_version_id. + """ + with dag_maker(dag_id="test_no_dag_version"): + EmptyOperator(task_id="t1") + dr = dag_maker.create_dagrun() + ti = dr.task_instances[0] + + # Create a Trigger and link it to the TaskInstance + trigger = TimeDeltaTrigger(datetime.timedelta(days=7)) + trigger_orm = Trigger.from_object(trigger) + session.add(trigger_orm) + session.flush() + + ti.trigger_id = trigger_orm.id + # Explicitly remove dag_version_id + ti.dag_version_id = None + session.merge(ti) + session.commit() + + supervisor = supervisor_builder() + + # Attempt to enqueue creation of this trigger + supervisor.update_triggers({trigger_orm.id}) + + # Assert that nothing was queued for creation and no subprocess writes happened + assert len(supervisor.creating_triggers) == 0 + assert trigger_orm.id not in supervisor.running_triggers + supervisor.stdin.write.assert_not_called() + + class TestTriggererMessageTypes: def test_message_types_in_triggerer(self): """