diff --git a/.github/actions/migration_tests/action.yml b/.github/actions/migration_tests/action.yml
index 34951bac5ffe0..ee8888e5f7446 100644
--- a/.github/actions/migration_tests/action.yml
+++ b/.github/actions/migration_tests/action.yml
@@ -24,9 +24,11 @@ runs:
     - name: "Test migration file 2 to 3 migration: ${{env.BACKEND}}"
       shell: bash
       run: |
-        breeze shell "${AIRFLOW_2_CMD}" --use-airflow-version 2.11.0 --answer y &&
-        breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${DB_MANGERS}
-        ${AIRFLOW_3_CMD}" --no-db-cleanup
+        breeze shell "export _AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1' &&
+        ${AIRFLOW_2_CMD}" --use-airflow-version 2.11.0 --answer y &&
+        breeze shell "export _AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1' &&
+        export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${env.DB_MANGERS} &&
+        ${AIRFLOW_3_CMD}" --no-db-cleanup
       env:
         COMPOSE_PROJECT_NAME: "docker-compose"
         DB_RESET: "false"
@@ -38,7 +40,6 @@ runs:
         airflow db migrate --to-revision heads &&
         airflow db downgrade -n 2.7.0 -y &&
         airflow db migrate
-      if: env.BACKEND != 'sqlite'
     - name: "Bring composer down"
       shell: bash
       run: breeze down
@@ -47,9 +48,11 @@ runs:
     - name: "Test ORM migration 2 to 3: ${{env.BACKEND}}"
       shell: bash
       run: >
-        breeze shell "${AIRFLOW_2_CMD}" --use-airflow-version 2.11.0 --answer y &&
-        breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${DB_MANGERS}
-        ${AIRFLOW_3_CMD}" --no-db-cleanup
+        breeze shell "export _AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1' &&
+        ${AIRFLOW_2_CMD}" --use-airflow-version 2.11.0 --answer y &&
+        breeze shell "export _AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1' &&
+        export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${env.DB_MANGERS} &&
+        ${AIRFLOW_3_CMD}" --no-db-cleanup
       env:
         COMPOSE_PROJECT_NAME: "docker-compose"
         DB_RESET: "false"
@@ -60,7 +63,6 @@ runs:
         airflow db migrate --to-revision heads &&
         airflow db downgrade -n 2.7.0 -y &&
         airflow db migrate
-      if: env.BACKEND != 'sqlite'
     - name: "Bring compose down again"
       shell: bash
       run: breeze down
@@ -69,7 +71,8 @@ runs:
     - name: "Test ORM migration ${{env.BACKEND}}"
       shell: bash
       run: >
-        breeze shell "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${DB_MANAGERS} &&
+        breeze shell "export _AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1' &&
+        export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${env.DB_MANAGERS} &&
         airflow db reset -y &&
         airflow db migrate --to-revision heads &&
         airflow db downgrade -n 2.7.0 -y &&
@@ -85,15 +88,14 @@ runs:
     - name: "Test offline migration ${{env.BACKEND}}"
       shell: bash
       run: >
-        breeze shell
-        "export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${DB_MANAGERS} &&
+        breeze shell "export _AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1' &&
+        export AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS=${env.DB_MANAGERS} &&
         airflow db reset -y &&
         airflow db downgrade -n 2.7.0 -y &&
         airflow db migrate -s"
       env:
         COMPOSE_PROJECT_NAME: "docker-compose"
         DB_MANAGERS: "airflow.providers.fab.auth_manager.models.db.FABDBManager"
-      if: env.BACKEND != 'sqlite'
     - name: "Bring any containers left down"
       shell: bash
       run: breeze down
diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index e935d2a08ab18..3b697226de6a2 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-2e49ab99fe1076b0f3f22a52b9ee37eeb7fc20a5a043ea504cc26022f4315277
\ No newline at end of file
+8e298c007f5604f2ddf278ee60f5a622f5a9c76220a55134805f74fbfadadfa3
\ No newline at end of file
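Aside on the workflow change above (not part of the diff itself): both rewritten `breeze shell` invocations now export configuration through environment variables, and the `if: env.BACKEND != 'sqlite'` guards are removed, presumably so the same downgrade/upgrade cycle also runs on SQLite with the compatibility check skipped via `_AIRFLOW__SKIP_DATABASE_EXECUTOR_COMPATIBILITY_CHECK='1'`. As a reminder, Airflow's `AIRFLOW__{SECTION}__{KEY}` convention means the second export is equivalent to setting `[database] external_db_managers`. A minimal sketch of that convention only; `get_external_db_managers` is a hypothetical helper, not Airflow code:

```python
import os

# AIRFLOW__{SECTION}__{KEY} maps onto the [section] key config option, so this export
# is equivalent to setting [database] external_db_managers in airflow.cfg.
os.environ["AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS"] = (
    "airflow.providers.fab.auth_manager.models.db.FABDBManager"
)


def get_external_db_managers() -> list[str]:
    """Hypothetical helper: read the comma-separated list of external DB managers."""
    raw = os.environ.get("AIRFLOW__DATABASE__EXTERNAL_DB_MANAGERS", "")
    return [item.strip() for item in raw.split(",") if item.strip()]


print(get_external_db_managers())
# ['airflow.providers.fab.auth_manager.models.db.FABDBManager']
```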
diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg
index 2f9f9b4becc5e..7801cd7329893 100644
--- a/airflow-core/docs/img/airflow_erd.svg
+++ b/airflow-core/docs/img/airflow_erd.svg
[The bulk of the regenerated, auto-generated ERD SVG diff is omitted here: the re-render only shuffles node ids, coordinates, and element ordering for the existing tables and edges (task_instance, hitl_detail, task_map, task_reschedule, xcom, task_instance_note, task_instance_history, rendered_task_instance_fields, dag_run, deadline, backfill_dag_run, dag_run_note, log_template, backfill, ...); no schema change is recoverable from this text dump.]
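The next two files move the SQLite `PRAGMA foreign_keys` toggling into the shared `disable_sqlite_fkeys` context manager and change it to accept a SQLAlchemy connection instead of the Alembic `op`. A minimal standalone sketch of the intended usage, with the helper body copied from the diff below; the in-memory engine and example DDL are illustrative, and inside a real migration `conn` would come from `op.get_bind()`:

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, text


@contextmanager
def disable_sqlite_fkeys(conn):
    """Same shape as the refactored helper: toggle FK enforcement only on SQLite."""
    if conn.dialect.name == "sqlite":
        conn.execute(text("PRAGMA foreign_keys=off"))
        yield conn
        conn.execute(text("PRAGMA foreign_keys=on"))
    else:
        yield conn


engine = create_engine("sqlite://")  # illustrative in-memory database
with engine.connect() as conn:
    with disable_sqlite_fkeys(conn):
        # SQLite cannot DROP CONSTRAINT, so the migrations below rebuild tables here
        # (CREATE new table, INSERT ... SELECT, DROP old, RENAME) without FK checks.
        conn.execute(text("CREATE TABLE example (id INTEGER PRIMARY KEY)"))
```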
diff --git a/airflow-core/src/airflow/migrations/utils.py b/airflow-core/src/airflow/migrations/utils.py
index 2dbbbece01a57..957ca0a949083 100644
--- a/airflow-core/src/airflow/migrations/utils.py
+++ b/airflow-core/src/airflow/migrations/utils.py
@@ -52,13 +52,13 @@ def get_mssql_table_constraints(conn, table_name) -> dict[str, dict[str, list[st
 
 
 @contextmanager
-def disable_sqlite_fkeys(op):
-    if op.get_bind().dialect.name == "sqlite":
-        op.execute("PRAGMA foreign_keys=off")
-        yield op
-        op.execute("PRAGMA foreign_keys=on")
+def disable_sqlite_fkeys(conn):
+    if conn.dialect.name == "sqlite":
+        conn.execute(text("PRAGMA foreign_keys=off"))
+        yield conn
+        conn.execute(text("PRAGMA foreign_keys=on"))
     else:
-        yield op
+        yield conn
 
 
 def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):
diff --git a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
index fa24916df6faa..0aa329b20f796 100644
--- a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
+++ b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
@@ -31,6 +31,8 @@
 from alembic import op
 from sqlalchemy import literal
 
+from airflow.migrations.utils import disable_sqlite_fkeys
+
 # revision identifiers, used by Alembic.
 revision = "686269002441"
 down_revision = "bff083ad727d"
@@ -76,31 +78,30 @@ def upgrade():
     elif conn.dialect.name == "sqlite":
         # SQLite does not support DROP CONSTRAINT
         # We have to recreate the table without the constraint
-        conn.execute(sa.text("PRAGMA foreign_keys=off"))
-        conn.execute(
-            sa.text("""
-            CREATE TABLE connection_new (
-                id INTEGER NOT NULL,
-                conn_id VARCHAR(250) NOT NULL,
-                conn_type VARCHAR(500) NOT NULL,
-                host VARCHAR(500),
-                schema VARCHAR(500),
-                login TEXT,
-                password TEXT,
-                port INTEGER,
-                extra TEXT,
-                is_encrypted BOOLEAN,
-                is_extra_encrypted BOOLEAN,
-                description VARCHAR(5000),
-                CONSTRAINT connection_pkey PRIMARY KEY (id),
-                CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
-            )
-            """)
-        )
-        conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM connection"))
-        conn.execute(sa.text("DROP TABLE connection"))
-        conn.execute(sa.text("ALTER TABLE connection_new RENAME TO connection"))
-        conn.execute(sa.text("PRAGMA foreign_keys=on"))
+        with disable_sqlite_fkeys(conn):
+            conn.execute(
+                sa.text("""
+                CREATE TABLE connection_new (
+                    id INTEGER NOT NULL,
+                    conn_id VARCHAR(250) NOT NULL,
+                    conn_type VARCHAR(500) NOT NULL,
+                    host VARCHAR(500),
+                    schema VARCHAR(500),
+                    login TEXT,
+                    password TEXT,
+                    port INTEGER,
+                    extra TEXT,
+                    is_encrypted BOOLEAN,
+                    is_extra_encrypted BOOLEAN,
+                    description VARCHAR(5000),
+                    CONSTRAINT connection_pkey PRIMARY KEY (id),
+                    CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
+                )
+                """)
+            )
+            conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM connection"))
+            conn.execute(sa.text("DROP TABLE connection"))
+            conn.execute(sa.text("ALTER TABLE connection_new RENAME TO connection"))
     else:
         op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS unique_conn_id")
         # Dropping and recreating cause there's no IF NOT EXISTS
@@ -213,45 +214,45 @@ def upgrade():
     elif conn.dialect.name == "sqlite":
         # SQLite does not support DROP CONSTRAINT
         # We have to recreate the table without the constraint
-        conn.execute(sa.text("PRAGMA foreign_keys=off"))
-        conn.execute(
-            sa.text("""
-            CREATE TABLE dag_run_new (
-                id INTEGER NOT NULL,
-                dag_id VARCHAR(250) NOT NULL,
-                queued_at TIMESTAMP,
-                execution_date TIMESTAMP NOT NULL,
-                start_date TIMESTAMP,
-                end_date TIMESTAMP,
-                state VARCHAR(50),
-                run_id VARCHAR(250) NOT NULL,
-                creating_job_id INTEGER,
-                external_trigger BOOLEAN,
-                run_type VARCHAR(50) NOT NULL,
-                conf BLOB,
-                data_interval_start TIMESTAMP,
-                data_interval_end TIMESTAMP,
-                last_scheduling_decision TIMESTAMP,
-                dag_hash VARCHAR(32),
-                log_template_id INTEGER,
-                updated_at TIMESTAMP,
-                clear_number INTEGER DEFAULT '0' NOT NULL,
-                CONSTRAINT dag_run_pkey PRIMARY KEY (id),
-                CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date),
-                CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
-                CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
+        with disable_sqlite_fkeys(conn):
+            conn.execute(
+                sa.text("""
+                CREATE TABLE dag_run_new (
+                    id INTEGER NOT NULL,
+                    dag_id VARCHAR(250) NOT NULL,
+                    queued_at TIMESTAMP,
+                    execution_date TIMESTAMP NOT NULL,
+                    start_date TIMESTAMP,
+                    end_date TIMESTAMP,
+                    state VARCHAR(50),
+                    run_id VARCHAR(250) NOT NULL,
+                    creating_job_id INTEGER,
+                    external_trigger BOOLEAN,
+                    run_type VARCHAR(50) NOT NULL,
+                    conf BLOB,
+                    data_interval_start TIMESTAMP,
+                    data_interval_end TIMESTAMP,
+                    last_scheduling_decision TIMESTAMP,
+                    dag_hash VARCHAR(32),
+                    log_template_id INTEGER,
+                    updated_at TIMESTAMP,
+                    clear_number INTEGER DEFAULT '0' NOT NULL,
+                    CONSTRAINT dag_run_pkey PRIMARY KEY (id),
+                    CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date),
+                    CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
+                    CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
+                )
+                """)
             )
-            """)
-        )
-        headers = (
-            "id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, "
-            "external_trigger, run_type, conf, data_interval_start, data_interval_end, "
-            "last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number"
-        )
-        conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run"))
-        conn.execute(sa.text("DROP TABLE dag_run"))
-        conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
-        conn.execute(sa.text("PRAGMA foreign_keys=on"))
+            headers = (
+                "id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, "
+                "external_trigger, run_type, conf, data_interval_start, data_interval_end, "
+                "last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number"
+            )
+            conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run"))
+            conn.execute(sa.text("DROP TABLE dag_run"))
+            conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
+
     with op.batch_alter_table("dag_run") as batch_op:
         batch_op.create_index("dag_id_state", ["dag_id", "state"], if_not_exists=True)
         batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], if_not_exists=True)
diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py
index c99972a8b1e96..07765c6c703f6 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -1066,7 +1066,7 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
         except ImportError:
Skipping the check.") return - if not inspect(settings.engine).has_table("ab_user") and not unitest_mode: + if "ab_user" not in inspect(settings.engine).get_table_names() and not unitest_mode: raise AirflowException( "Downgrade to revision less than 3.0.0 requires that `ab_user` table is present. " "Please add FabDBManager to [core] external_db_managers and run fab migrations before proceeding"