diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py index 4be8ac32ca252..9bf9dfec0054b 100644 --- a/airflow/migrations/env.py +++ b/airflow/migrations/env.py @@ -19,6 +19,7 @@ import contextlib import sys +from logging import getLogger from logging.config import fileConfig from alembic import context @@ -48,7 +49,8 @@ def include_object(_, name, type_, *args): # Interpret the config file for Python logging. # This line sets up loggers basically. -fileConfig(config.config_file_name, disable_existing_loggers=False) +if not getLogger().handlers: + fileConfig(config.config_file_name, disable_existing_loggers=False) # add your model's MetaData object here # for 'autogenerate' support diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 34a265fc004be..2c82f5f1948b7 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -749,6 +749,7 @@ def _get_flask_db(sql_database_uri): def _create_db_from_orm(session): + log.info("Creating Airflow database tables from the ORM") from alembic import command from airflow.models.base import Base @@ -764,6 +765,7 @@ def _create_flask_session_tbl(sql_database_uri): # stamp the migration head config = _get_alembic_config() command.stamp(config, "head") + log.info("Airflow database tables created") @provide_session @@ -1169,7 +1171,7 @@ def upgradedb( previous_revision = _get_current_revision(session=session) - log.info("Creating tables") + log.info("Migrating the Airflow database") val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE") try: # Reconfigure the ORM to use _EXACTLY_ one connection, otherwise some db engines hang forever @@ -1193,6 +1195,7 @@ def upgradedb( settings.reconfigure_orm() if reserialize_dags and current_revision != previous_revision: + log.info("Reserializing the DAGs") _reserialize_dags(session=session) add_default_pool_if_not_exists(session=session) synchronize_log_template(session=session) @@ -1203,23 +1206,21 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False): """Clear out the database.""" if not settings.engine: raise RuntimeError("The settings.engine must be set. This is a critical assertion") - log.info("Dropping tables that exist") - original_logging_level = logging.root.level - try: - import_all_models() + log.info("Dropping Airflow tables that exist") + + import_all_models() - connection = settings.engine.connect() + connection = settings.engine.connect() - with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin(): - drop_airflow_models(connection) - drop_airflow_moved_tables(connection) - external_db_manager = RunDBManager() - external_db_manager.drop_tables(session, connection) + with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin(): + drop_airflow_models(connection) + drop_airflow_moved_tables(connection) + log.info("Dropped all Airflow tables") + external_db_manager = RunDBManager() + external_db_manager.drop_tables(session, connection) - if not skip_init: - initdb(session=session) - finally: - logging.root.setLevel(original_logging_level) + if not skip_init: + initdb(session=session) @provide_session @@ -1271,7 +1272,7 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session: revision_range = f"{from_revision}:{to_revision}" _offline_migration(command.downgrade, config=config, revision=revision_range) else: - log.info("Applying downgrade migrations.") + log.info("Applying downgrade migrations to Airflow database.") command.downgrade(config, revision=to_revision, sql=show_sql_only) diff --git a/airflow/utils/db_manager.py b/airflow/utils/db_manager.py index 2241a646efd48..bc91f3c8003ca 100644 --- a/airflow/utils/db_manager.py +++ b/airflow/utils/db_manager.py @@ -88,10 +88,12 @@ def check_migration(self): def _create_db_from_orm(self): """Create database from ORM.""" + self.log.info("Creating %s tables from the ORM", self.__class__.__name__) engine = self.session.get_bind().engine self.metadata.create_all(engine) config = self.get_alembic_config() command.stamp(config, "head") + self.log.info("%s tables have been created from the ORM", self.__class__.__name__) def drop_tables(self, connection): self.metadata.drop_all(connection) @@ -105,6 +107,7 @@ def resetdb(self, skip_init=False): connection = settings.engine.connect() with create_global_lock(self.session, lock=DBLocks.MIGRATIONS), connection.begin(): + self.log.info("Dropping %s tables", self.__class__.__name__) self.drop_tables(connection) if not skip_init: self.initdb() @@ -123,6 +126,7 @@ def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False): config = self.get_alembic_config() command.upgrade(config, revision=to_revision or "heads", sql=show_sql_only) + self.log.info("Migrated the %s database", self.__class__.__name__) def downgrade(self, to_version, from_version=None, show_sql_only=False): """Downgrade the database.""" diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index bb6e976178d38..f278eee7d05ee 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -5ec73e38d7ce2b81c9930d0d03c61a33ca68162a75b0ee2dc79d120913c35223 \ No newline at end of file +9b9dcf915eff051a5cd77176a78bdcca3703b227373efe83fd0a1d4d05623c28 \ No newline at end of file diff --git a/providers/src/airflow/providers/fab/migrations/env.py b/providers/src/airflow/providers/fab/migrations/env.py index 7b31f86024a31..903057ba60208 100644 --- a/providers/src/airflow/providers/fab/migrations/env.py +++ b/providers/src/airflow/providers/fab/migrations/env.py @@ -17,6 +17,7 @@ from __future__ import annotations import contextlib +from logging import getLogger from logging.config import fileConfig from alembic import context @@ -32,7 +33,7 @@ # Interpret the config file for Python logging. # This line sets up loggers basically. -if config.config_file_name is not None: +if not getLogger().handlers and config.config_file_name: fileConfig(config.config_file_name, disable_existing_loggers=False) # add your model's MetaData object here