25 changes: 11 additions & 14 deletions airflow-core/tests/integration/otel/test_otel.py
@@ -873,6 +873,11 @@ def test_scheduler_change_after_the_first_task_finishes(
will handle the rest of the dag processing. The paused thread will be resumed afterwards.
"""

# For this test, scheduler1 must be idle but still considered healthy by scheduler2.
# If scheduler2 marks the job as unhealthy, then it will recreate scheduler1's spans
# because it will consider them lost.
os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = "90"

celery_worker_process = None
scheduler_process_1 = None
apiserver_process = None
@@ -937,13 +942,12 @@ def test_scheduler_change_after_the_first_task_finishes(
with open(self.control_file, "w") as file:
file.write("continue")

# Wait for scheduler2 to be up and running.
time.sleep(10)

wait_for_dag_run_and_check_span_status(
dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.SHOULD_END
)

# Stop scheduler2 in case it still has a db lock on the dag_run.
scheduler_process_2.terminate()
scheduler_process_1.send_signal(signal.SIGCONT)

# Wait for the scheduler to start again and continue running.
@@ -959,6 +963,9 @@ def test_scheduler_change_after_the_first_task_finishes(
with create_session() as session:
dump_airflow_metadata_db(session)

# Reset for the rest of the tests.
os.environ["AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD"] = "15"

# Terminate the processes.
celery_worker_process.terminate()
celery_worker_process.wait()
@@ -969,7 +976,6 @@ def test_scheduler_change_after_the_first_task_finishes(
apiserver_process.terminate()
apiserver_process.wait()

scheduler_process_2.terminate()
scheduler_process_2.wait()

out, err = capfd.readouterr()
@@ -991,7 +997,6 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
"""

celery_worker_process = None
scheduler_process_1 = None
apiserver_process = None
scheduler_process_2 = None
try:
@@ -1032,7 +1037,7 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
with capfd.disabled():
scheduler_process_1.terminate()

assert scheduler_process_1.wait(timeout=30) == 0
assert scheduler_process_1.wait() == 0

check_dag_run_state_and_span_status(
dag_id=dag_id, run_id=run_id, state=State.RUNNING, span_status=SpanStatus.NEEDS_CONTINUANCE
@@ -1049,9 +1054,6 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
with open(self.control_file, "w") as file:
file.write("continue")

# Wait for scheduler2 to be up and running.
time.sleep(10)

wait_for_dag_run_and_check_span_status(
dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.ENDED
)
@@ -1066,8 +1068,6 @@ def test_scheduler_exits_gracefully_in_the_middle_of_the_first_task(
celery_worker_process.terminate()
celery_worker_process.wait()

scheduler_process_1.wait()

apiserver_process.terminate()
apiserver_process.wait()

@@ -1253,9 +1253,6 @@ def test_scheduler_exits_forcefully_after_the_first_task_finishes(
stderr=None,
)

# Wait for scheduler2 to be up and running.
time.sleep(10)

wait_for_dag_run_and_check_span_status(
dag_id=dag_id, run_id=run_id, max_wait_time=120, span_status=SpanStatus.ENDED
)