From 4412095bc4b50992e6d31afb60784332a3ae0265 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 8 May 2025 11:03:11 +0100 Subject: [PATCH 1/3] Raise exception if downgrade can't proceed due to no `ab_user` table We only log the error that the downgrade can't proceed if the `ab_user` table is not found, but we should raise an exception instead. Currently, the CI migrations don't exit on error, which prevents the necessary tests from running. Also updated the CI migration tests to export DB manager at the shell because it seems that's not happening, hence why downgrade is running correctly. --- .github/actions/migration_tests/action.yml | 21 +++++++++++++-------- airflow-core/src/airflow/utils/db.py | 3 +-- airflow-core/tests/unit/utils/test_db.py | 13 ++++++++----- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/.github/actions/migration_tests/action.yml b/.github/actions/migration_tests/action.yml index ed71e21407d10..933c9a847a1c4 100644 --- a/.github/actions/migration_tests/action.yml +++ b/.github/actions/migration_tests/action.yml @@ -25,11 +25,12 @@ runs: shell: bash run: | breeze shell "${{ env.AIRFLOW_2_CMD }}" --use-airflow-version 2.10.5 --answer y && - breeze shell "${{ env.AIRFLOW_3_CMD }}" --no-db-cleanup + breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${{env.DB_MANGERS}} + ${{ env.AIRFLOW_3_CMD }}" --no-db-cleanup env: COMPOSE_PROJECT_NAME: "docker-compose" - AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" DB_RESET: "false" + DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" AIRFLOW_2_CMD: >- airflow db reset --skip-init -y && airflow db migrate --to-revision heads @@ -47,11 +48,12 @@ runs: shell: bash run: > breeze shell "${{ env.AIRFLOW_2_CMD }}" --use-airflow-version 2.10.5 --answer y && - breeze shell "${{ env.AIRFLOW_3_CMD }}" --no-db-cleanup + breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${{env.DB_MANGERS}} + ${{ env.AIRFLOW_3_CMD }}" --no-db-cleanup env: COMPOSE_PROJECT_NAME: "docker-compose" - AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" DB_RESET: "false" + DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" AIRFLOW_2_CMD: >- airflow db reset -y AIRFLOW_3_CMD: >- @@ -67,13 +69,14 @@ runs: - name: "Test ORM migration ${{env.BACKEND}}" shell: bash run: > - breeze shell "airflow db reset -y && + breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${{env.DB_MANAGERS}} && + airflow db reset -y && airflow db migrate --to-revision heads && airflow db downgrade -n 2.7.0 -y && airflow db migrate" env: COMPOSE_PROJECT_NAME: "docker-compose" - AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" + DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" - name: "Bring compose down again" shell: bash run: breeze down @@ -82,12 +85,14 @@ runs: - name: "Test offline migration ${{env.BACKEND}}" shell: bash run: > - breeze shell "airflow db reset -y && + breeze shell + "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${{env.DB_MANAGERS}} && + airflow db reset -y && airflow db downgrade -n 2.7.0 -y && airflow db migrate -s" env: COMPOSE_PROJECT_NAME: "docker-compose" - AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" + DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager" if: env.BACKEND != 'sqlite' - name: "Bring any containers left down" shell: bash diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index c7853adfabfbf..cc15b4da01f3c 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -1229,11 +1229,10 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session: log.warning("Import error occurred while importing FABDBManager. Skipping the check.") pass if not inspect(settings.engine).has_table("ab_user"): - log.error( + raise AirflowException( "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. " "Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding" ) - return with create_global_lock(session=session, lock=DBLocks.MIGRATIONS): if show_sql_only: log.warning("Generating sql scripts for manual migration.") diff --git a/airflow-core/tests/unit/utils/test_db.py b/airflow-core/tests/unit/utils/test_db.py index 05186d573f0d9..98aca99f1108a 100644 --- a/airflow-core/tests/unit/utils/test_db.py +++ b/airflow-core/tests/unit/utils/test_db.py @@ -34,6 +34,7 @@ from alembic.script import ScriptDirectory from sqlalchemy import Column, Integer, MetaData, Table, select +from airflow.exceptions import AirflowException from airflow.models import Base as airflow_base from airflow.providers.fab.auth_manager.models.db import FABDBManager from airflow.settings import engine @@ -295,10 +296,12 @@ def scalar(self, stmt): @conf_vars({("core", "unit_test_mode"): "False"}) @mock.patch("airflow.utils.db.inspect") - def test_upgradedb_raises_if_lower_than_v3_0_0(self, mock_inspect, caplog): + def test_downgrade_raises_if_lower_than_v3_0_0_and_no_ab_user(self, mock_inspect, caplog): mock_inspect.return_value.has_table.return_value = False - downgrade(to_revision=_REVISION_HEADS_MAP["2.7.0"]) - assert ( + msg = ( "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. " - "Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding" - ) in caplog.text + "Please add FabDBManager to [core] external_db_managers and run fab migrations before " + "proceeding" + ) + with pytest.raises(AirflowException, match=re.escape(msg)): + downgrade(to_revision=_REVISION_HEADS_MAP["2.7.0"]) From e582b6ffc3a030eb321413645cadc477f4e68126 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 8 May 2025 12:33:21 +0100 Subject: [PATCH 2/3] result can be none so check before accessing row_count --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/docs/img/airflow_erd.svg | 108 +++++++++--------- ..._add_uuid_primary_key_to_task_instance_.py | 4 +- 3 files changed, 58 insertions(+), 56 deletions(-) diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index f92146f77a88f..da3765115e31d 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -1b5221fa589cfc8652242b95f24d218c632168238933b82b533321a217c2447f \ No newline at end of file +066cb891884eea1ee0496b5c507d4a52c20d0440387f9ec8bacb1d616a26e40e \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index 7ce02c7187b1a..879c9f17f903e 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -1482,35 +1482,81 @@ {0,1} - + dag--dag_schedule_asset_alias_reference 0..N 1 - + dag--dag_schedule_asset_reference 0..N 1 - + dag--task_outlet_asset_reference 0..N 1 - + dag--asset_dag_run_queue 0..N 1 - + +dag_version + +dag_version + +id + + [UUID] + NOT NULL + +bundle_name + + [VARCHAR(250)] + +bundle_version + + [VARCHAR(250)] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +version_number + + [INTEGER] + NOT NULL + + + +dag--dag_version + +0..N +1 + + + dag_schedule_asset_name_reference dag_schedule_asset_name_reference @@ -1531,14 +1577,14 @@ NOT NULL - + dag--dag_schedule_asset_name_reference 0..N 1 - + dag_schedule_asset_uri_reference dag_schedule_asset_uri_reference @@ -1559,58 +1605,12 @@ NOT NULL - + dag--dag_schedule_asset_uri_reference 0..N 1 - - -dag_version - -dag_version - -id - - [UUID] - NOT NULL - -bundle_name - - [VARCHAR(250)] - -bundle_version - - [VARCHAR(250)] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -version_number - - [INTEGER] - NOT NULL - - - -dag--dag_version - -0..N -1 - dag_tag diff --git a/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py index 3f4d5c4e926f1..ccd0697ff63ab 100644 --- a/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py +++ b/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py @@ -223,7 +223,9 @@ def upgrade(): """ ).bindparams(batch_size=batch_size) ) - row_count = result.rowcount + row_count = 0 + if result: + row_count = result.rowcount if row_count == 0: break print(f"Migrated {row_count} task_instance rows in this batch...") From e8cc34b501056af341f5ed076390f224128d1a33 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 8 May 2025 14:20:01 +0100 Subject: [PATCH 3/3] dont raise in unittest mode --- airflow-core/src/airflow/utils/db.py | 7 ++++--- airflow-core/tests/unit/utils/test_db.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index cc15b4da01f3c..c1ecef8f52f26 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -1219,7 +1219,8 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session: config = _get_alembic_config() # Check if downgrade is less than 3.0.0 and requires that `ab_user` fab table is present if _revision_greater(config, _REVISION_HEADS_MAP["3.0.0"], to_revision): - if conf.getboolean("core", "unit_test_mode"): + unitest_mode = conf.getboolean("core", "unit_test_mode") + if unitest_mode: try: from airflow.providers.fab.auth_manager.models.db import FABDBManager @@ -1227,8 +1228,8 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session: dbm.initdb() except ImportError: log.warning("Import error occurred while importing FABDBManager. Skipping the check.") - pass - if not inspect(settings.engine).has_table("ab_user"): + return + if not inspect(settings.engine).has_table("ab_user") and not unitest_mode: raise AirflowException( "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. " "Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding" diff --git a/airflow-core/tests/unit/utils/test_db.py b/airflow-core/tests/unit/utils/test_db.py index 98aca99f1108a..cf69275025330 100644 --- a/airflow-core/tests/unit/utils/test_db.py +++ b/airflow-core/tests/unit/utils/test_db.py @@ -296,7 +296,7 @@ def scalar(self, stmt): @conf_vars({("core", "unit_test_mode"): "False"}) @mock.patch("airflow.utils.db.inspect") - def test_downgrade_raises_if_lower_than_v3_0_0_and_no_ab_user(self, mock_inspect, caplog): + def test_downgrade_raises_if_lower_than_v3_0_0_and_no_ab_user(self, mock_inspect): mock_inspect.return_value.has_table.return_value = False msg = ( "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. "