diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 5e6d6d613b47f..b88f65868a037 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -603,6 +603,7 @@ def update_triggers(self, requested_trigger_ids: set[int]): self.running_triggers.union(x[0] for x in self.events) .union(self.cancelling_triggers) .union(trigger[0] for trigger in self.failed_triggers) + .union(trigger.id for trigger in self.creating_triggers) ) # Work out the two difference sets new_trigger_ids = requested_trigger_ids - known_trigger_ids diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index ef065cf2c8d16..b6892e6232551 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -1006,3 +1006,80 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d assert task_instance.next_kwargs == { "event": {"ti_count": 1, "dr_count": 1, "task_states": {"test": {"parent_task": "success"}}} } + + +def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supervisor_builder): + """ + Test that update_triggers prevents adding triggers to the creation queue + if they are already queued for creation. + """ + trigger = TimeDeltaTrigger(datetime.timedelta(days=7)) + dag_model, run, trigger_orm, task_instance = create_trigger_in_db(session, trigger) + + supervisor = supervisor_builder() + + # First call to update_triggers should add the trigger to creating_triggers + supervisor.update_triggers({trigger_orm.id}) + assert len(supervisor.creating_triggers) == 1 + assert supervisor.creating_triggers[0].id == trigger_orm.id + + # Second call to update_triggers with the same trigger_id should not add it again + supervisor.update_triggers({trigger_orm.id}) + assert len(supervisor.creating_triggers) == 1 + assert supervisor.creating_triggers[0].id == trigger_orm.id + + # Verify that the trigger is not in running_triggers yet (it's still queued) + assert trigger_orm.id not in supervisor.running_triggers + + # Verify that the trigger is not in any other tracking sets + assert trigger_orm.id not in supervisor.cancelling_triggers + assert not any(trigger_id == trigger_orm.id for trigger_id, _ in supervisor.events) + assert not any(trigger_id == trigger_orm.id for trigger_id, _ in supervisor.failed_triggers) + + +def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple_triggers( + session, supervisor_builder, dag_maker +): + """ + Test that update_triggers prevents adding multiple triggers to the creation queue + if they are already queued for creation. + """ + trigger1 = TimeDeltaTrigger(datetime.timedelta(days=7)) + trigger2 = TimeDeltaTrigger(datetime.timedelta(days=14)) + + dag_model1, run1, trigger_orm1, task_instance1 = create_trigger_in_db(session, trigger1) + + with dag_maker("test_dag_2"): + EmptyOperator(task_id="test_ti_2") + + run2 = dag_maker.create_dagrun() + trigger_orm2 = Trigger.from_object(trigger2) + ti2 = run2.task_instances[0] + session.add(trigger_orm2) + session.flush() + ti2.trigger_id = trigger_orm2.id + session.merge(ti2) + session.flush() + # Create a supervisor + supervisor = supervisor_builder() + + # First call to update_triggers should add both triggers to creating_triggers + supervisor.update_triggers({trigger_orm1.id, trigger_orm2.id}) + assert len(supervisor.creating_triggers) == 2 + trigger_ids = {trigger.id for trigger in supervisor.creating_triggers} + assert trigger_orm1.id in trigger_ids + assert trigger_orm2.id in trigger_ids + + # Second call to update_triggers with the same trigger_ids should not add them again + supervisor.update_triggers({trigger_orm1.id, trigger_orm2.id}) + assert len(supervisor.creating_triggers) == 2 + trigger_ids = {trigger.id for trigger in supervisor.creating_triggers} + assert trigger_orm1.id in trigger_ids + assert trigger_orm2.id in trigger_ids + + # Third call with just one trigger should not add duplicates + supervisor.update_triggers({trigger_orm1.id}) + assert len(supervisor.creating_triggers) == 2 + trigger_ids = {trigger.id for trigger in supervisor.creating_triggers} + assert trigger_orm1.id in trigger_ids + assert trigger_orm2.id in trigger_ids