From cc80d12a66a9e7bfc1d27a1da5412099a70ae9bd Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Wed, 13 Aug 2025 09:29:12 +0100
Subject: [PATCH] [v3-0-test] Prevent repeated warning of triggers being added
 twice in triggerer (#54438)

When running multiple deferrable tasks in parallel, triggers are
repeatedly added to the triggerer's creation queue by the
update_triggers method and then skipped with a warning when the
creation queue is processed. With many tasks deferring at once the
warning is emitted over and over, which is noisy and inefficient.

The fix excludes triggers that are already queued for creation from the
set of triggers to create. I purposely left the warning log for a
trigger already in to_create, in case there is another path by which it
could still happen.

(cherry picked from commit bfd0bbe4ce20394ca91fa8efcc946f05b4a5de7a)

Co-authored-by: Ephraim Anierobi
---
 .../src/airflow/jobs/triggerer_job_runner.py |  1 +
 .../tests/unit/jobs/test_triggerer_job.py    | 77 +++++++++++++++++++
 2 files changed, 78 insertions(+)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 1cf32ac2e4d35..916854fdf426a 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -574,6 +574,7 @@ def update_triggers(self, requested_trigger_ids: set[int]):
             self.running_triggers.union(x[0] for x in self.events)
             .union(self.cancelling_triggers)
             .union(trigger[0] for trigger in self.failed_triggers)
+            .union(trigger.id for trigger in self.creating_triggers)
         )
         # Work out the two difference sets
         new_trigger_ids = requested_trigger_ids - known_trigger_ids
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 5e543a161dbd3..89f5823f658d6 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -988,3 +988,80 @@ async def test_trigger_can_fetch_dag_run_count_ti_count_in_deferrable(session, d
     assert task_instance.next_kwargs == {
         "event": {"ti_count": 1, "dr_count": 1, "task_states": {"test": {"parent_task": "success"}}}
     }
+
+
+def test_update_triggers_prevents_duplicate_creation_queue_entries(session, supervisor_builder):
+    """
+    Test that update_triggers prevents adding triggers to the creation queue
+    if they are already queued for creation.
+    """
+    trigger = TimeDeltaTrigger(datetime.timedelta(days=7))
+    dag_model, run, trigger_orm, task_instance = create_trigger_in_db(session, trigger)
+
+    supervisor = supervisor_builder()
+
+    # First call to update_triggers should add the trigger to creating_triggers
+    supervisor.update_triggers({trigger_orm.id})
+    assert len(supervisor.creating_triggers) == 1
+    assert supervisor.creating_triggers[0].id == trigger_orm.id
+
+    # Second call to update_triggers with the same trigger_id should not add it again
+    supervisor.update_triggers({trigger_orm.id})
+    assert len(supervisor.creating_triggers) == 1
+    assert supervisor.creating_triggers[0].id == trigger_orm.id
+
+    # Verify that the trigger is not in running_triggers yet (it's still queued)
+    assert trigger_orm.id not in supervisor.running_triggers
+
+    # Verify that the trigger is not in any other tracking sets
+    assert trigger_orm.id not in supervisor.cancelling_triggers
+    assert not any(trigger_id == trigger_orm.id for trigger_id, _ in supervisor.events)
+    assert not any(trigger_id == trigger_orm.id for trigger_id, _ in supervisor.failed_triggers)
+
+
+def test_update_triggers_prevents_duplicate_creation_queue_entries_with_multiple_triggers(
+    session, supervisor_builder, dag_maker
+):
+    """
+    Test that update_triggers prevents adding multiple triggers to the creation queue
+    if they are already queued for creation.
+    """
+    trigger1 = TimeDeltaTrigger(datetime.timedelta(days=7))
+    trigger2 = TimeDeltaTrigger(datetime.timedelta(days=14))
+
+    dag_model1, run1, trigger_orm1, task_instance1 = create_trigger_in_db(session, trigger1)
+
+    with dag_maker("test_dag_2"):
+        EmptyOperator(task_id="test_ti_2")
+
+    run2 = dag_maker.create_dagrun()
+    trigger_orm2 = Trigger.from_object(trigger2)
+    ti2 = run2.task_instances[0]
+    session.add(trigger_orm2)
+    session.flush()
+    ti2.trigger_id = trigger_orm2.id
+    session.merge(ti2)
+    session.flush()
+    # Create a supervisor
+    supervisor = supervisor_builder()
+
+    # First call to update_triggers should add both triggers to creating_triggers
+    supervisor.update_triggers({trigger_orm1.id, trigger_orm2.id})
+    assert len(supervisor.creating_triggers) == 2
+    trigger_ids = {trigger.id for trigger in supervisor.creating_triggers}
+    assert trigger_orm1.id in trigger_ids
+    assert trigger_orm2.id in trigger_ids
+
+    # Second call to update_triggers with the same trigger_ids should not add them again
+    supervisor.update_triggers({trigger_orm1.id, trigger_orm2.id})
+    assert len(supervisor.creating_triggers) == 2
+    trigger_ids = {trigger.id for trigger in supervisor.creating_triggers}
+    assert trigger_orm1.id in trigger_ids
+    assert trigger_orm2.id in trigger_ids
+
+    # Third call with just one trigger should not add duplicates
+    supervisor.update_triggers({trigger_orm1.id})
+    assert len(supervisor.creating_triggers) == 2
+    trigger_ids = {trigger.id for trigger in supervisor.creating_triggers}
+    assert trigger_orm1.id in trigger_ids
+    assert trigger_orm2.id in trigger_ids
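
Note for reviewers: as an illustration of why the one-line change stops the repeated warning, below is a minimal, self-contained Python sketch of the deduplication performed by update_triggers. SimpleSupervisor and QueuedTrigger are simplified stand-ins invented for this sketch, not the actual classes in triggerer_job_runner.py; only the known_trigger_ids union mirrors the patched code, and its final .union() over creating_triggers is the line this patch adds.

from dataclasses import dataclass, field


@dataclass
class QueuedTrigger:
    # Hypothetical stand-in for an entry in the creation queue
    id: int


@dataclass
class SimpleSupervisor:
    # Simplified bookkeeping collections, assumed for illustration only
    running_triggers: set[int] = field(default_factory=set)
    cancelling_triggers: set[int] = field(default_factory=set)
    events: list[tuple[int, object]] = field(default_factory=list)
    failed_triggers: list[tuple[int, str]] = field(default_factory=list)
    creating_triggers: list[QueuedTrigger] = field(default_factory=list)

    def update_triggers(self, requested_trigger_ids: set[int]) -> None:
        # Everything the supervisor already knows about, now also including
        # triggers still waiting in the creation queue (the patched line).
        known_trigger_ids = (
            self.running_triggers.union(x[0] for x in self.events)
            .union(self.cancelling_triggers)
            .union(t[0] for t in self.failed_triggers)
            .union(t.id for t in self.creating_triggers)
        )
        # Only genuinely new ids are queued, so asking for the same trigger
        # again is a no-op instead of a repeated warning.
        for trigger_id in requested_trigger_ids - known_trigger_ids:
            self.creating_triggers.append(QueuedTrigger(trigger_id))


supervisor = SimpleSupervisor()
supervisor.update_triggers({1, 2})
supervisor.update_triggers({1, 2})  # second call adds nothing
assert len(supervisor.creating_triggers) == 2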