From c944ab6bf7404f1b02e0046f81a7508258db15e8 Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Tue, 23 Dec 2025 15:20:33 +0530 Subject: [PATCH 1/5] Update sqlalchemy compatible change for test_types.py --- .pre-commit-config.yaml | 1 + airflow-core/tests/unit/utils/test_types.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 63d0b0c5c6095..5bbc14a5a56e6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -457,6 +457,7 @@ repos: ^airflow-core/tests/unit/utils/test_db_cleanup.py$| ^airflow-core/tests/unit/utils/test_state.py$| ^airflow-core/tests/unit/utils/test_log_handlers.py$| + ^airflow-core/tests/unit/utils/test_types.py$| ^dev/airflow_perf/scheduler_dag_execution_timing.py$| ^providers/celery/.*\.py$| ^providers/cncf/kubernetes/.*\.py$| diff --git a/airflow-core/tests/unit/utils/test_types.py b/airflow-core/tests/unit/utils/test_types.py index 4a6831f40354d..19991bf785abb 100644 --- a/airflow-core/tests/unit/utils/test_types.py +++ b/airflow-core/tests/unit/utils/test_types.py @@ -19,6 +19,7 @@ from datetime import timedelta import pytest +from sqlalchemy import select from airflow.models.dagrun import DagRun from airflow.utils.state import State @@ -36,14 +37,16 @@ def test_runtype_enum_escape(dag_maker, session): pass dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) - query = session.query( - DagRun.dag_id, - DagRun.state, - DagRun.run_type, - ).filter( - DagRun.dag_id == "test_enum_dags", - # make sure enum value can be used in filter queries - DagRun.run_type == DagRunType.SCHEDULED, + query = session.scalars( + select( + DagRun.dag_id, + DagRun.state, + DagRun.run_type, + ).where( + DagRun.dag_id == "test_enum_dags", + # make sure enum value can be used in filter queries + DagRun.run_type == DagRunType.SCHEDULED, + ) ) assert str(query.statement.compile(compile_kwargs={"literal_binds": True})) == ( "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n" From 76253ef5553f223ab3d5474e4ff954be8239d45e Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Tue, 23 Dec 2025 19:00:47 +0530 Subject: [PATCH 2/5] Compatibility fixes --- airflow-core/tests/unit/utils/test_types.py | 22 ++++++++++----------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/airflow-core/tests/unit/utils/test_types.py b/airflow-core/tests/unit/utils/test_types.py index 19991bf785abb..277a0d8460723 100644 --- a/airflow-core/tests/unit/utils/test_types.py +++ b/airflow-core/tests/unit/utils/test_types.py @@ -37,24 +37,22 @@ def test_runtype_enum_escape(dag_maker, session): pass dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) - query = session.scalars( - select( - DagRun.dag_id, - DagRun.state, - DagRun.run_type, - ).where( - DagRun.dag_id == "test_enum_dags", - # make sure enum value can be used in filter queries - DagRun.run_type == DagRunType.SCHEDULED, - ) + query = select( + DagRun.dag_id, + DagRun.state, + DagRun.run_type, + ).where( + DagRun.dag_id == "test_enum_dags", + # make sure enum value can be used in filter queries + DagRun.run_type == DagRunType.SCHEDULED, ) - assert str(query.statement.compile(compile_kwargs={"literal_binds": True})) == ( + rows = session.execute(query).all() + assert str(query.compile(compile_kwargs={"literal_binds": True})) == ( "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n" "FROM dag_run \n" "WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type = 'scheduled'" ) - rows = query.all() assert len(rows) == 1 assert rows[0].dag_id == "test_enum_dags" assert rows[0].state == State.RUNNING From ca3cd4fb46b60b3f4a4c316051c3d1eae0a8bcec Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Tue, 23 Dec 2025 20:51:45 +0530 Subject: [PATCH 3/5] Update sqlalchemy compatible change for test_manager.py --- .pre-commit-config.yaml | 1 + .../tests/unit/dag_processing/test_manager.py | 44 ++++++++----------- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5bbc14a5a56e6..6f97733d8bb3b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -458,6 +458,7 @@ repos: ^airflow-core/tests/unit/utils/test_state.py$| ^airflow-core/tests/unit/utils/test_log_handlers.py$| ^airflow-core/tests/unit/utils/test_types.py$| + ^airflow-core/tests/unit/dag_processing/test_manager.py$| ^dev/airflow_perf/scheduler_dag_execution_timing.py$| ^providers/celery/.*\.py$| ^providers/cncf/kubernetes/.*\.py$| diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 6bb1aea8d8f1d..9813074be5373 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -177,7 +177,7 @@ def test_remove_file_clears_import_error(self, tmp_path, configure_testing_dag_b manager.run() with create_session() as session: - import_errors = session.query(ParseImportError).all() + import_errors = session.scalars(select(ParseImportError)).all() assert len(import_errors) == 1 path_to_parse.unlink() @@ -186,7 +186,7 @@ def test_remove_file_clears_import_error(self, tmp_path, configure_testing_dag_b manager.run() with create_session() as session: - import_errors = session.query(ParseImportError).all() + import_errors = session.scalars(select(ParseImportError)).all() assert len(import_errors) == 0 session.rollback() @@ -440,7 +440,7 @@ def test_parsing_requests_only_bundles_being_parsed(self, testing_dag_bundle): manager._queue_requested_files_for_parsing() assert manager._file_queue == deque([file1]) with create_session() as session2: - parsing_request_after = session2.query(DagPriorityParsingRequest).all() + parsing_request_after = session2.scalars(select(DagPriorityParsingRequest)).all() assert len(parsing_request_after) == 1 assert parsing_request_after[0].relative_fileloc == "file_x.py" @@ -485,38 +485,32 @@ def test_scan_stale_dags(self, session): manager._files = [test_dag_path] manager._file_stats[test_dag_path] = stat - active_dag_count = ( - session.query(func.count(DagModel.dag_id)) - .filter( + active_dag_count = session.scalars( + select(func.count(DagModel.dag_id)).where( ~DagModel.is_stale, DagModel.relative_fileloc == str(test_dag_path.rel_path), DagModel.bundle_name == test_dag_path.bundle_name, ) - .scalar() - ) - assert active_dag_count == 1 + ).all() + assert active_dag_count == [1] manager._scan_stale_dags() - active_dag_count = ( - session.query(func.count(DagModel.dag_id)) - .filter( + active_dag_count = session.scalars( + select(func.count(DagModel.dag_id)).where( ~DagModel.is_stale, DagModel.relative_fileloc == str(test_dag_path.rel_path), DagModel.bundle_name == test_dag_path.bundle_name, ) - .scalar() - ) - assert active_dag_count == 0 + ).all() + assert active_dag_count == [0] - serialized_dag_count = ( - session.query(func.count(SerializedDagModel.dag_id)) - .filter(SerializedDagModel.dag_id == dag.dag_id) - .scalar() - ) + serialized_dag_count = session.scalars( + select(func.count(SerializedDagModel.dag_id)).where(SerializedDagModel.dag_id == dag.dag_id) + ).all() # Deactivating the DagModel should not delete the SerializedDagModel # SerializedDagModel gives history about Dags - assert serialized_dag_count == 1 + assert serialized_dag_count == [1] def test_kill_timed_out_processors_kill(self): manager = DagFileProcessorManager(max_runs=1, processor_timeout=5) @@ -876,7 +870,7 @@ def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle): assert callbacks[0].run_id == "123" assert callbacks[1].run_id == "456" - assert session.query(DbCallbackRequest).count() == 0 + assert len(session.scalars(select(DbCallbackRequest)).all()) == 0 @conf_vars( { @@ -905,11 +899,11 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te with create_session() as session: manager.run() - assert session.query(DbCallbackRequest).count() == 3 + assert len(session.scalars(select(DbCallbackRequest)).all()) == 3 with create_session() as session: manager.run() - assert session.query(DbCallbackRequest).count() == 1 + assert len(session.scalars(select(DbCallbackRequest)).all()) == 1 @conf_vars({("core", "load_examples"): "False"}) def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): @@ -950,7 +944,7 @@ def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundl assert [c.run_id for c in callbacks] == ["match"] # The non-matching callback should remain in the DB - remaining = session.query(DbCallbackRequest).all() + remaining = session.scalars(select(DbCallbackRequest)).all() assert len(remaining) == 1 # Decode remaining request and verify it's for the other bundle remaining_req = remaining[0].get_callback_request() From 939044fe387f7d7563de16b13ad33f80d50a7c60 Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Tue, 23 Dec 2025 21:23:31 +0530 Subject: [PATCH 4/5] Update sqlalchemy compatible change for test_processor.py --- .pre-commit-config.yaml | 1 + airflow-core/tests/unit/dag_processing/test_processor.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d11604203f72b..3f28cb467cb64 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -459,6 +459,7 @@ repos: ^airflow-core/tests/unit/utils/test_log_handlers.py$| ^airflow-core/tests/unit/utils/test_types.py$| ^airflow-core/tests/unit/dag_processing/test_manager.py$| + ^airflow-core/tests/unit/dag_processing/test_processor.py$| ^dev/airflow_perf/scheduler_dag_execution_timing.py$| ^providers/celery/.*\.py$| ^providers/cncf/kubernetes/.*\.py$| diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 34e4f61941f8a..1348a428d5fcb 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -31,6 +31,7 @@ import pytest import structlog from pydantic import TypeAdapter +from sqlalchemy import select from structlog.typing import FilteringBoundLogger from airflow._shared.timezones import timezone @@ -247,7 +248,7 @@ def dag_in_a_fn(): assert result.import_errors == {} assert result.serialized_dags[0].dag_id == "test_myvalue" - all_vars = session.query(VariableORM).all() + all_vars = session.scalars(select(VariableORM)).all() assert len(all_vars) == 1 assert all_vars[0].key == "mykey" @@ -291,7 +292,7 @@ def dag_in_a_fn(): assert result.import_errors == {} assert result.serialized_dags[0].dag_id == "not-found" - all_vars = session.query(VariableORM).all() + all_vars = session.scalars(select(VariableORM)).all() assert len(all_vars) == 0 def test_top_level_connection_access( From e36d409c66688b00848d57c8164422421de0b8a2 Mon Sep 17 00:00:00 2001 From: Kunal Bhattacharya Date: Wed, 24 Dec 2025 12:33:42 +0530 Subject: [PATCH 5/5] Review fix --- .../tests/unit/dag_processing/test_manager.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 9813074be5373..5402ab20492de 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -485,32 +485,32 @@ def test_scan_stale_dags(self, session): manager._files = [test_dag_path] manager._file_stats[test_dag_path] = stat - active_dag_count = session.scalars( + active_dag_count = session.scalar( select(func.count(DagModel.dag_id)).where( ~DagModel.is_stale, DagModel.relative_fileloc == str(test_dag_path.rel_path), DagModel.bundle_name == test_dag_path.bundle_name, ) - ).all() - assert active_dag_count == [1] + ) + assert active_dag_count == 1 manager._scan_stale_dags() - active_dag_count = session.scalars( + active_dag_count = session.scalar( select(func.count(DagModel.dag_id)).where( ~DagModel.is_stale, DagModel.relative_fileloc == str(test_dag_path.rel_path), DagModel.bundle_name == test_dag_path.bundle_name, ) - ).all() - assert active_dag_count == [0] + ) + assert active_dag_count == 0 - serialized_dag_count = session.scalars( + serialized_dag_count = session.scalar( select(func.count(SerializedDagModel.dag_id)).where(SerializedDagModel.dag_id == dag.dag_id) - ).all() + ) # Deactivating the DagModel should not delete the SerializedDagModel # SerializedDagModel gives history about Dags - assert serialized_dag_count == [1] + assert serialized_dag_count == 1 def test_kill_timed_out_processors_kill(self): manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)