diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index cd2a25b074c80..82b00f0ff2d55 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -691,6 +691,12 @@ def string_lower_type(val):
action="store_true",
default=False,
)
+ARG_DB_USE_MIGRATION_FILES = Arg(
+ ("-m", "--use-migration-files"),
+ help="Use migration files to perform migration",
+ action="store_true",
+ default=False,
+)
# webserver
ARG_PORT = Arg(
@@ -1525,7 +1531,7 @@ class GroupCommand(NamedTuple):
name="reset",
help="Burn down and rebuild the metadata database",
func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
- args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
+ args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_DB_USE_MIGRATION_FILES, ARG_VERBOSE),
),
ActionCommand(
name="upgrade",
@@ -1545,6 +1551,7 @@ class GroupCommand(NamedTuple):
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_RESERIALIZE_DAGS,
+ ARG_DB_USE_MIGRATION_FILES,
ARG_VERBOSE,
),
hide=True,
@@ -1568,6 +1575,7 @@ class GroupCommand(NamedTuple):
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_RESERIALIZE_DAGS,
+ ARG_DB_USE_MIGRATION_FILES,
ARG_VERBOSE,
),
),
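
Note for context: Airflow's `Arg` tuples are ultimately fed through to `argparse.add_argument`, so the new flag behaves like a standard store-true option. A minimal standalone sketch (plain `argparse`, outside Airflow) of how `--use-migration-files` parses:

```python
import argparse

# Standalone sketch; Airflow's Arg carries these same keyword arguments
# through to argparse.add_argument.
parser = argparse.ArgumentParser(prog="airflow db reset")
parser.add_argument(
    "-m", "--use-migration-files",
    help="Use migration files to perform the migration",
    action="store_true",
    default=False,
)

assert parser.parse_args(["--use-migration-files"]).use_migration_files is True
assert parser.parse_args([]).use_migration_files is False  # defaults off
```
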
diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py
index 776dabcdead7e..737dfa1ed469b 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -63,7 +63,7 @@ def resetdb(args):
print(f"DB: {settings.engine.url!r}")
if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
raise SystemExit("Cancelled")
- db.resetdb(skip_init=args.skip_init)
+ db.resetdb(skip_init=args.skip_init, use_migration_files=args.use_migration_files)
def upgradedb(args):
@@ -142,6 +142,7 @@ def migratedb(args):
from_revision=from_revision,
show_sql_only=args.show_sql_only,
reserialize_dags=args.reserialize_dags,
+ use_migration_files=args.use_migration_files,
)
if not args.show_sql_only:
print("Database migrating done!")
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index c521654c1c709..7a5854bb06ece 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -29,6 +29,9 @@
def include_object(_, name, type_, *args):
"""Filter objects for autogenerating revisions."""
+ # Ignore the sqlite_sequence table, which is an internal SQLite construct
+ if name == "sqlite_sequence":
+ return False
# Ignore _anything_ to do with Celery, or FlaskSession's tables
if type_ == "table" and (name.startswith("celery_") or name == "session"):
return False
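
`include_object` is Alembic's standard autogenerate filter: it is called once per reflected object, and anything for which it returns `False` is excluded from revision generation. Given the function above (and assuming it falls through to a truthy result for ordinary tables, which this hunk does not show), its behavior can be sketched as:

```python
# Hypothetical standalone calls; Alembic normally invokes the hook itself
# as include_object(object, name, type_, reflected, compare_to).
assert include_object(None, "sqlite_sequence", "table") is False  # internal SQLite bookkeeping
assert include_object(None, "celery_taskmeta", "table") is False  # Celery result-backend table
assert include_object(None, "session", "table") is False          # Flask-Session storage
assert include_object(None, "dag_run", "table")                   # real Airflow tables are kept
```
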
diff --git a/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py b/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
new file mode 100644
index 0000000000000..3d07b67e3a27b
--- /dev/null
+++ b/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
@@ -0,0 +1,297 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Fix inconsistency between ORM and migration files.
+
+Revision ID: 686269002441
+Revises: bff083ad727d
+Create Date: 2024-04-15 14:19:49.913797
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import literal
+
+# revision identifiers, used by Alembic.
+revision = "686269002441"
+down_revision = "bff083ad727d"
+branch_labels = None
+depends_on = None
+airflow_version = "2.9.2"
+
+
+def upgrade():
+ """Apply Update missing constraints."""
+ conn = op.get_bind()
+ if conn.dialect.name == "mysql":
+ # TODO: Rewrite these queries with Alembic once the lowest supported MySQL version has IF EXISTS
+ conn.execute(
+ sa.text("""
+ set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = 'connection' AND
+ CONSTRAINT_NAME = 'unique_conn_id' AND
+ CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE connection
+ drop constraint unique_conn_id','select 1');
+
+ prepare stmt from @var;
+ execute stmt;
+ deallocate prepare stmt;
+ """)
+ )
+ # Drop and recreate the constraint below because MySQL has no IF NOT EXISTS for constraints
+ conn.execute(
+ sa.text("""
+ set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = 'connection' AND
+ CONSTRAINT_NAME = 'connection_conn_id_uq' AND
+ CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE connection
+ drop constraint connection_conn_id_uq','select 1');
+
+ prepare stmt from @var;
+ execute stmt;
+ deallocate prepare stmt;
+ """)
+ )
+ elif conn.dialect.name == "sqlite":
+ # SQLite does not support DROP CONSTRAINT
+ # We have to recreate the table without the constraint
+ conn.execute(sa.text("PRAGMA foreign_keys=off"))
+ conn.execute(
+ sa.text("""
+ CREATE TABLE connection_new (
+ id INTEGER NOT NULL,
+ conn_id VARCHAR(250) NOT NULL,
+ conn_type VARCHAR(500) NOT NULL,
+ host VARCHAR(500),
+ schema VARCHAR(500),
+ login TEXT,
+ password TEXT,
+ port INTEGER,
+ extra TEXT,
+ is_encrypted BOOLEAN,
+ is_extra_encrypted BOOLEAN,
+ description VARCHAR(5000),
+ CONSTRAINT connection_pkey PRIMARY KEY (id),
+ CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
+ )
+ """)
+ )
+ conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM connection"))
+ conn.execute(sa.text("DROP TABLE connection"))
+ conn.execute(sa.text("ALTER TABLE connection_new RENAME TO connection"))
+ conn.execute(sa.text("PRAGMA foreign_keys=on"))
+ else:
+ op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS unique_conn_id")
+ # Dropping and recreating because there's no IF NOT EXISTS
+ op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS connection_conn_id_uq")
+
+ with op.batch_alter_table("connection") as batch_op:
+ batch_op.create_unique_constraint(batch_op.f("connection_conn_id_uq"), ["conn_id"])
+
+ max_cons = sa.table("dag", sa.column("max_consecutive_failed_dag_runs"))
+ op.execute(max_cons.update().values(max_consecutive_failed_dag_runs=literal("0")))
+ with op.batch_alter_table("dag") as batch_op:
+ batch_op.alter_column("max_consecutive_failed_dag_runs", existing_type=sa.Integer(), nullable=False)
+
+ with op.batch_alter_table("task_instance") as batch_op:
+ batch_op.drop_constraint("task_instance_dag_run_fkey", type_="foreignkey")
+
+ with op.batch_alter_table("task_reschedule") as batch_op:
+ batch_op.drop_constraint("task_reschedule_dr_fkey", type_="foreignkey")
+
+ if conn.dialect.name == "mysql":
+ conn.execute(
+ sa.text("""
+ set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = 'dag_run' AND
+ CONSTRAINT_NAME = 'dag_run_dag_id_execution_date_uq' AND
+ CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
+ drop constraint dag_run_dag_id_execution_date_uq','select 1');
+
+ prepare stmt from @var;
+ execute stmt;
+ deallocate prepare stmt;
+ """)
+ )
+ conn.execute(
+ sa.text("""
+ set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = 'dag_run' AND
+ CONSTRAINT_NAME = 'dag_run_dag_id_run_id_uq' AND
+ CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
+ drop constraint dag_run_dag_id_run_id_uq','select 1');
+
+ prepare stmt from @var;
+ execute stmt;
+ deallocate prepare stmt;
+ """)
+ )
+ # below we drop and recreate the constraints because there's no IF NOT EXISTS
+ conn.execute(
+ sa.text("""
+ set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = 'dag_run' AND
+ CONSTRAINT_NAME = 'dag_run_dag_id_execution_date_key' AND
+ CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
+ drop constraint dag_run_dag_id_execution_date_key','select 1');
+
+ prepare stmt from @var;
+ execute stmt;
+ deallocate prepare stmt;
+ """)
+ )
+ conn.execute(
+ sa.text("""
+ set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
+ CONSTRAINT_SCHEMA = DATABASE() AND
+ TABLE_NAME = 'dag_run' AND
+ CONSTRAINT_NAME = 'dag_run_dag_id_run_id_key' AND
+ CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
+ drop constraint dag_run_dag_id_run_id_key','select 1');
+
+ prepare stmt from @var;
+ execute stmt;
+ deallocate prepare stmt;
+ """)
+ )
+ with op.batch_alter_table("callback_request", schema=None) as batch_op:
+ batch_op.alter_column(
+ "processor_subdir",
+ existing_type=sa.Text(length=2000),
+ type_=sa.String(length=2000),
+ existing_nullable=True,
+ )
+
+ with op.batch_alter_table("dag", schema=None) as batch_op:
+ batch_op.alter_column(
+ "processor_subdir",
+ existing_type=sa.Text(length=2000),
+ type_=sa.String(length=2000),
+ existing_nullable=True,
+ )
+
+ with op.batch_alter_table("import_error", schema=None) as batch_op:
+ batch_op.alter_column(
+ "processor_subdir",
+ existing_type=sa.Text(length=2000),
+ type_=sa.String(length=2000),
+ existing_nullable=True,
+ )
+
+ with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
+ batch_op.alter_column(
+ "processor_subdir",
+ existing_type=sa.Text(length=2000),
+ type_=sa.String(length=2000),
+ existing_nullable=True,
+ )
+
+ elif conn.dialect.name == "sqlite":
+ # SQLite does not support DROP CONSTRAINT
+ # We have to recreate the table without the constraint
+ conn.execute(sa.text("PRAGMA foreign_keys=off"))
+ conn.execute(
+ sa.text("""
+ CREATE TABLE dag_run_new (
+ id INTEGER NOT NULL,
+ dag_id VARCHAR(250) NOT NULL,
+ queued_at TIMESTAMP,
+ execution_date TIMESTAMP NOT NULL,
+ start_date TIMESTAMP,
+ end_date TIMESTAMP,
+ state VARCHAR(50),
+ run_id VARCHAR(250) NOT NULL,
+ creating_job_id INTEGER,
+ external_trigger BOOLEAN,
+ run_type VARCHAR(50) NOT NULL,
+ conf BLOB,
+ data_interval_start TIMESTAMP,
+ data_interval_end TIMESTAMP,
+ last_scheduling_decision TIMESTAMP,
+ dag_hash VARCHAR(32),
+ log_template_id INTEGER,
+ updated_at TIMESTAMP,
+ clear_number INTEGER DEFAULT '0' NOT NULL,
+ CONSTRAINT dag_run_pkey PRIMARY KEY (id),
+ CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date),
+ CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
+ CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
+ )
+ """)
+ )
+
+ conn.execute(sa.text("INSERT INTO dag_run_new SELECT * FROM dag_run"))
+ conn.execute(sa.text("DROP TABLE dag_run"))
+ conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
+ conn.execute(sa.text("PRAGMA foreign_keys=on"))
+ with op.batch_alter_table("dag_run") as batch_op:
+ batch_op.create_index("dag_id_state", ["dag_id", "state"], if_not_exists=True)
+ batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], if_not_exists=True)
+ batch_op.create_index(
+ "idx_dag_run_running_dags",
+ ["state", "dag_id"],
+ sqlite_where=sa.text("state='running'"),
+ if_not_exists=True,
+ )
+ batch_op.create_index(
+ "idx_dag_run_queued_dags",
+ ["state", "dag_id"],
+ sqlite_where=sa.text("state='queued'"),
+ if_not_exists=True,
+ )
+
+ else:
+ op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_execution_date_uq")
+ op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_run_id_uq")
+ # below we drop and recreate the constraints because there's no IF NOT EXISTS
+ op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_execution_date_key")
+ op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_run_id_key")
+
+ with op.batch_alter_table("dag_run") as batch_op:
+ batch_op.create_unique_constraint("dag_run_dag_id_execution_date_key", ["dag_id", "execution_date"])
+ batch_op.create_unique_constraint("dag_run_dag_id_run_id_key", ["dag_id", "run_id"])
+
+ with op.batch_alter_table("task_instance") as batch_op:
+ batch_op.create_foreign_key(
+ "task_instance_dag_run_fkey",
+ "dag_run",
+ ["dag_id", "run_id"],
+ ["dag_id", "run_id"],
+ ondelete="CASCADE",
+ )
+
+ with op.batch_alter_table("task_reschedule") as batch_op:
+ batch_op.create_foreign_key(
+ "task_reschedule_dr_fkey",
+ "dag_run",
+ ["dag_id", "run_id"],
+ ["dag_id", "run_id"],
+ ondelete="CASCADE",
+ )
+
+
+def downgrade():
+ """NO downgrade because this is to make ORM consistent with the database."""
diff --git a/airflow/migrations/versions/0142_2_10_0_add_new_executor_field_to_db.py b/airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
similarity index 96%
rename from airflow/migrations/versions/0142_2_10_0_add_new_executor_field_to_db.py
rename to airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
index 8fda89da849ff..5adf5dbb330ec 100644
--- a/airflow/migrations/versions/0142_2_10_0_add_new_executor_field_to_db.py
+++ b/airflow/migrations/versions/0143_2_10_0_add_new_executor_field_to_db.py
@@ -19,7 +19,7 @@
"""add new executor field to db.
Revision ID: 677fdbb7fc54
-Revises: 1949afb29106
+Revises: 686269002441
Create Date: 2024-04-01 15:26:59.186579
"""
@@ -31,7 +31,7 @@
# revision identifiers, used by Alembic.
revision = "677fdbb7fc54"
-down_revision = "bff083ad727d"
+down_revision = "686269002441"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 77679f8d9a881..117dd59e288ba 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -147,7 +147,7 @@ class DagRun(Base, LoggingMixin):
# Keeps track of the number of times the dagrun had been cleared.
# This number is incremented only when the DagRun is re-Queued,
# when the DagRun is cleared.
- clear_number = Column(Integer, default=0, nullable=False)
+ clear_number = Column(Integer, default=0, nullable=False, server_default="0")
# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
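
The `dagrun.py` change is the ORM half of the same cleanup: `default=0` is applied client-side by SQLAlchemy at INSERT time only, while `server_default="0"` puts `DEFAULT '0'` into the DDL, so a schema created from the ORM matches what the migration file creates (compare `clear_number INTEGER DEFAULT '0' NOT NULL` in the SQLite table rebuild above). A minimal sketch of the distinction, using plain SQLAlchemy rather than Airflow's models:

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Demo(Base):
    __tablename__ = "demo"
    id = Column(Integer, primary_key=True)
    # default=0       -> filled in by SQLAlchemy for ORM inserts only
    # server_default  -> emitted as DEFAULT '0' in CREATE TABLE, so raw-SQL
    #                    inserts and schema-diff tools see the same default
    clear_number = Column(Integer, default=0, nullable=False, server_default="0")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
# -> CREATE TABLE demo (..., clear_number INTEGER DEFAULT '0' NOT NULL)
```
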
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index af53c241ed6c0..e20836f0e2feb 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -91,7 +91,7 @@
"2.8.0": "10b52ebd31f7",
"2.8.1": "88344c1d9134",
"2.9.0": "1949afb29106",
- "2.9.2": "bff083ad727d",
+ "2.9.2": "686269002441",
"2.10.0": "677fdbb7fc54",
}
@@ -744,13 +744,13 @@ def _create_flask_session_tbl(sql_database_uri):
@provide_session
-def initdb(session: Session = NEW_SESSION, load_connections: bool = True):
+def initdb(session: Session = NEW_SESSION, load_connections: bool = True, use_migration_files: bool = False):
"""Initialize Airflow database."""
import_all_models()
db_exists = _get_current_revision(session)
- if db_exists:
- upgradedb(session=session)
+ if db_exists or use_migration_files:
+ upgradedb(session=session, use_migration_files=use_migration_files)
else:
_create_db_from_orm(session=session)
if conf.getboolean("database", "LOAD_DEFAULT_CONNECTIONS") and load_connections:
@@ -1558,6 +1558,7 @@ def upgradedb(
show_sql_only: bool = False,
reserialize_dags: bool = True,
session: Session = NEW_SESSION,
+ use_migration_files: bool = False,
):
"""
Upgrades the DB.
@@ -1614,7 +1615,7 @@ def upgradedb(
if errors_seen:
exit(1)
- if not to_revision and not _get_current_revision(session=session):
+ if not to_revision and not _get_current_revision(session=session) and not use_migration_files:
# Don't load default connections
# New DB; initialize and exit
initdb(session=session, load_connections=False)
@@ -1644,7 +1645,7 @@ def upgradedb(
@provide_session
-def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
+def resetdb(session: Session = NEW_SESSION, skip_init: bool = False, use_migration_files: bool = False):
"""Clear out the database."""
if not settings.engine:
raise RuntimeError("The settings.engine must be set. This is a critical assertion")
@@ -1659,7 +1660,7 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
drop_airflow_moved_tables(connection)
if not skip_init:
- initdb(session=session)
+ initdb(session=session, use_migration_files=use_migration_files)
@provide_session
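
Taken together, the `db.py` changes give `use_migration_files` three effects: `initdb` routes even an empty database through the Alembic chain instead of `_create_db_from_orm`, `upgradedb` no longer short-circuits back to `initdb` when no revision exists, and `resetdb` forwards the flag. Paraphrased control flow (a sketch, not the exact Airflow code):

```python
def initdb(session, load_connections=True, use_migration_files=False):
    db_exists = _get_current_revision(session)
    if db_exists or use_migration_files:
        # Replay the migration files; slower, but exercises every revision.
        upgradedb(session=session, use_migration_files=use_migration_files)
    else:
        # Fast path: create all tables straight from the ORM metadata.
        _create_db_from_orm(session=session)
```
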
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 132655e3b39f8..464463bb3f4b0 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-e816aa2004ece1f8e07f6c5978a690ae7176b81f7d9df89115f3b65dab1c5231
\ No newline at end of file
+45e8aa557ff6c5c69995915206df2fef17495d5da04453e897cea73a8df0f492
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index fb280ee0ea7fc..7225b0ce3ef15 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1421,14 +1421,14 @@
task_instance--xcom
-1
+0..N
1
task_instance--xcom
-0..N
+1
1
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 0961b87f395ee..eb01d6aec39ed 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
-| ``677fdbb7fc54`` (head) | ``bff083ad727d`` | ``2.10.0`` | add new executor field to db. |
+| ``677fdbb7fc54`` (head) | ``686269002441`` | ``2.10.0`` | add new executor field to db. |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``686269002441`` | ``bff083ad727d`` | ``2.9.2`` | Fix inconsistency between ORM and migration files. |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``bff083ad727d`` | ``1949afb29106`` | ``2.9.2`` | Remove ``idx_last_scheduling_decision`` index on |
| | | | last_scheduling_decision in dag_run table. |
diff --git a/tests/cli/commands/test_db_command.py b/tests/cli/commands/test_db_command.py
index 3321887287b1a..352292bd61dce 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -46,12 +46,17 @@ def test_cli_initdb(self, mock_initdb):
def test_cli_resetdb(self, mock_resetdb):
db_command.resetdb(self.parser.parse_args(["db", "reset", "--yes"]))
- mock_resetdb.assert_called_once_with(skip_init=False)
+ mock_resetdb.assert_called_once_with(skip_init=False, use_migration_files=False)
@mock.patch("airflow.cli.commands.db_command.db.resetdb")
def test_cli_resetdb_skip_init(self, mock_resetdb):
db_command.resetdb(self.parser.parse_args(["db", "reset", "--yes", "--skip-init"]))
- mock_resetdb.assert_called_once_with(skip_init=True)
+ mock_resetdb.assert_called_once_with(skip_init=True, use_migration_files=False)
+
+ @mock.patch("airflow.cli.commands.db_command.db.resetdb")
+ def test_cli_resetdb_use_migration_files(self, mock_resetdb):
+ db_command.resetdb(self.parser.parse_args(["db", "reset", "--yes", "--use-migration-files"]))
+ mock_resetdb.assert_called_once_with(skip_init=False, use_migration_files=True)
@mock.patch("airflow.cli.commands.db_command.db.check_migrations")
def test_cli_check_migrations(self, mock_wait_for_migrations):
@@ -62,36 +67,73 @@ def test_cli_check_migrations(self, mock_wait_for_migrations):
@pytest.mark.parametrize(
"args, called_with",
[
- ([], dict(to_revision=None, from_revision=None, show_sql_only=False)),
- (["--show-sql-only"], dict(to_revision=None, from_revision=None, show_sql_only=True)),
- (["--to-revision", "abc"], dict(to_revision="abc", from_revision=None, show_sql_only=False)),
+ ([], dict(to_revision=None, from_revision=None, show_sql_only=False, use_migration_files=False)),
+ (
+ ["--show-sql-only"],
+ dict(to_revision=None, from_revision=None, show_sql_only=True, use_migration_files=False),
+ ),
+ (
+ ["--to-revision", "abc"],
+ dict(to_revision="abc", from_revision=None, show_sql_only=False, use_migration_files=False),
+ ),
(
["--to-revision", "abc", "--show-sql-only"],
- dict(to_revision="abc", from_revision=None, show_sql_only=True),
+ dict(to_revision="abc", from_revision=None, show_sql_only=True, use_migration_files=False),
),
(
["--to-version", "2.2.2"],
- dict(to_revision="7b2661a43ba3", from_revision=None, show_sql_only=False),
+ dict(
+ to_revision="7b2661a43ba3",
+ from_revision=None,
+ show_sql_only=False,
+ use_migration_files=False,
+ ),
),
(
["--to-version", "2.2.2", "--show-sql-only"],
- dict(to_revision="7b2661a43ba3", from_revision=None, show_sql_only=True),
+ dict(
+ to_revision="7b2661a43ba3",
+ from_revision=None,
+ show_sql_only=True,
+ use_migration_files=False,
+ ),
),
(
["--to-revision", "abc", "--from-revision", "abc123", "--show-sql-only"],
- dict(to_revision="abc", from_revision="abc123", show_sql_only=True),
+ dict(
+ to_revision="abc", from_revision="abc123", show_sql_only=True, use_migration_files=False
+ ),
),
(
["--to-revision", "abc", "--from-version", "2.2.2", "--show-sql-only"],
- dict(to_revision="abc", from_revision="7b2661a43ba3", show_sql_only=True),
+ dict(
+ to_revision="abc",
+ from_revision="7b2661a43ba3",
+ show_sql_only=True,
+ use_migration_files=False,
+ ),
),
(
["--to-version", "2.2.4", "--from-revision", "abc123", "--show-sql-only"],
- dict(to_revision="587bdf053233", from_revision="abc123", show_sql_only=True),
+ dict(
+ to_revision="587bdf053233",
+ from_revision="abc123",
+ show_sql_only=True,
+ use_migration_files=False,
+ ),
),
(
["--to-version", "2.2.4", "--from-version", "2.2.2", "--show-sql-only"],
- dict(to_revision="587bdf053233", from_revision="7b2661a43ba3", show_sql_only=True),
+ dict(
+ to_revision="587bdf053233",
+ from_revision="7b2661a43ba3",
+ show_sql_only=True,
+ use_migration_files=False,
+ ),
+ ),
+ (
+ ["--use-migration-files", "--show-sql-only"],
+ dict(to_revision=None, from_revision=None, use_migration_files=True, show_sql_only=True),
),
],
)
diff --git a/tests/conftest.py b/tests/conftest.py
index 32dbc7ec3ea5f..e0aab5551c34c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -324,7 +324,7 @@ def initial_db_init():
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_auth_manager import get_auth_manager
- db.resetdb()
+ db.resetdb(use_migration_files=True)
db.bootstrap_dagbag()
# minimal app to add roles
flask_app = Flask(__name__)
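
With this conftest change, every test database is built by replaying the full migration chain, which is exactly the setup that catches ORM/migration drift like the issues this PR fixes. Usage, as the test suite now exercises it:

```python
from airflow.utils import db

# Rebuild the metadata database by running every Alembic migration file
# instead of creating tables directly from the ORM metadata.
db.resetdb(use_migration_files=True)
```
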
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 657f5fbc7cc20..0b4347f589d22 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -84,6 +84,7 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
# Ignore flask-session table/index
lambda t: (t[0] == "remove_table" and t[1].name == "session"),
lambda t: (t[0] == "remove_index" and t[1].name == "session_id"),
+ lambda t: (t[0] == "remove_index" and t[1].name == "session_session_id_uq"),
# sqlite sequence is used for autoincrementing columns created with `sqlite_autoincrement` option
lambda t: (t[0] == "remove_table" and t[1].name == "sqlite_sequence"),
]
@@ -233,7 +234,7 @@ def test_resetdb(
if skip_init:
mock_init.assert_not_called()
else:
- mock_init.assert_called_once_with(session=session_mock)
+ mock_init.assert_called_once_with(session=session_mock, use_migration_files=False)
def test_alembic_configuration(self):
with mock.patch.dict(