Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,13 @@ def update_triggers(self, requested_trigger_ids: set[int]):
)
if new_trigger_orm.task_instance:
log_path = render_log_fname(ti=new_trigger_orm.task_instance)

if not new_trigger_orm.task_instance.dag_version_id:
# This is to handle 2 to 3 upgrade where TI.dag_version_id can be none
log.warning(
"TaskInstance associated with Trigger has no associated Dag Version, skipping the trigger",
ti_id=new_trigger_orm.task_instance.id,
)
continue
ser_ti = workloads.TaskInstance.model_validate(
new_trigger_orm.task_instance, from_attributes=True
)
Expand Down
32 changes: 32 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,38 @@ def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple
assert trigger_orm2.id in trigger_ids


def test_update_triggers_skips_when_ti_has_no_dag_version(session, supervisor_builder, dag_maker):
"""
Ensure supervisor skips creating a trigger when the linked TaskInstance has no dag_version_id.
"""
with dag_maker(dag_id="test_no_dag_version"):
EmptyOperator(task_id="t1")
dr = dag_maker.create_dagrun()
ti = dr.task_instances[0]

# Create a Trigger and link it to the TaskInstance
trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
trigger_orm = Trigger.from_object(trigger)
session.add(trigger_orm)
session.flush()

ti.trigger_id = trigger_orm.id
# Explicitly remove dag_version_id
ti.dag_version_id = None
session.merge(ti)
session.commit()

supervisor = supervisor_builder()

# Attempt to enqueue creation of this trigger
supervisor.update_triggers({trigger_orm.id})

# Assert that nothing was queued for creation and no subprocess writes happened
assert len(supervisor.creating_triggers) == 0
assert trigger_orm.id not in supervisor.running_triggers
supervisor.stdin.write.assert_not_called()


class TestTriggererMessageTypes:
def test_message_types_in_triggerer(self):
"""
Expand Down