diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 41685f9b66e7b..99e99c3941894 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -873,6 +873,11 @@ def test_scheduler_change_after_the_first_task_finishes( will handle the rest of the dag processing. The paused thread will be resumed afterwards. """ + # For this test, scheduler1 must be idle but still considered healthy by scheduler2. + # If scheduler2 marks the job as unhealthy, then it will recreate scheduler1's spans + # because it will consider them lost. + os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = "90" + celery_worker_process = None scheduler_process_1 = None apiserver_process = None @@ -937,13 +942,12 @@ def test_scheduler_change_after_the_first_task_finishes( with open(self.control_file, "w") as file: file.write("continue") - # Wait for scheduler2 to be up and running. - time.sleep(10) - wait_for_dag_run_and_check_span_status( dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.SHOULD_END ) + # Stop scheduler2 in case it still has a db lock on the dag_run. + scheduler_process_2.terminate() scheduler_process_1.send_signal(signal.SIGCONT) # Wait for the scheduler to start again and continue running. @@ -959,6 +963,9 @@ def test_scheduler_change_after_the_first_task_finishes( with create_session() as session: dump_airflow_metadata_db(session) + # Reset for the rest of the tests. + os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = "15" + # Terminate the processes. celery_worker_process.terminate() celery_worker_process.wait() @@ -969,7 +976,6 @@ def test_scheduler_change_after_the_first_task_finishes( apiserver_process.terminate() apiserver_process.wait() - scheduler_process_2.terminate() scheduler_process_2.wait() out, err = capfd.readouterr() @@ -991,7 +997,6 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task( """ celery_worker_process = None - scheduler_process_1 = None apiserver_process = None scheduler_process_2 = None try: @@ -1032,7 +1037,7 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task( with capfd.disabled(): scheduler_process_1.terminate() - assert scheduler_process_1.wait(timeout=30) == 0 + assert scheduler_process_1.wait() == 0 check_dag_run_state_and_span_status( dag_id=dag_id, run_id=run_id, state=State.RUNNING, span_status=SpanStatus.NEEDS_CONTINUANCE @@ -1049,9 +1054,6 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task( with open(self.control_file, "w") as file: file.write("continue") - # Wait for scheduler2 to be up and running. - time.sleep(10) - wait_for_dag_run_and_check_span_status( dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.ENDED ) @@ -1066,8 +1068,6 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task( celery_worker_process.terminate() celery_worker_process.wait() - scheduler_process_1.wait() - apiserver_process.terminate() apiserver_process.wait() @@ -1253,9 +1253,6 @@ def test_scheduler_exits_forcefully_after_the_first_task_finishes( stderr=None, ) - # Wait for scheduler2 to be up and running. - time.sleep(10) - wait_for_dag_run_and_check_span_status( dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.ENDED )