Skip to content

Commit

Permalink
Add logging to the migration commands (#43516)
Browse files Browse the repository at this point in the history
* Add logging to the migration commands

This PR enhances logging for the migration commands and ensures that
alembic doesn't change the logger when logging.

A PR was raised to address the issue of reset db command changing logging level;
see f166467, but that did not address
it for other commands. This PR addresses it for all the migration commands

* fixup! Add logging to the migration commands

* Apply suggestions from code review

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
  • Loading branch information
ephraimbuddy and jedcunningham authored Oct 31, 2024
1 parent 06088a3 commit 22d2aeb
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 19 deletions.
4 changes: 3 additions & 1 deletion airflow/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import contextlib
import sys
from logging import getLogger
from logging.config import fileConfig

from alembic import context
Expand Down Expand Up @@ -48,7 +49,8 @@ def include_object(_, name, type_, *args):

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name, disable_existing_loggers=False)
if not getLogger().handlers:
fileConfig(config.config_file_name, disable_existing_loggers=False)

# add your model's MetaData object here
# for 'autogenerate' support
Expand Down
33 changes: 17 additions & 16 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ def _get_flask_db(sql_database_uri):


def _create_db_from_orm(session):
log.info("Creating Airflow database tables from the ORM")
from alembic import command

from airflow.models.base import Base
Expand All @@ -764,6 +765,7 @@ def _create_flask_session_tbl(sql_database_uri):
# stamp the migration head
config = _get_alembic_config()
command.stamp(config, "head")
log.info("Airflow database tables created")


@provide_session
Expand Down Expand Up @@ -1169,7 +1171,7 @@ def upgradedb(

previous_revision = _get_current_revision(session=session)

log.info("Creating tables")
log.info("Migrating the Airflow database")
val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
try:
# Reconfigure the ORM to use _EXACTLY_ one connection, otherwise some db engines hang forever
Expand All @@ -1193,6 +1195,7 @@ def upgradedb(
settings.reconfigure_orm()

if reserialize_dags and current_revision != previous_revision:
log.info("Reserializing the DAGs")
_reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
Expand All @@ -1203,23 +1206,21 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
"""Clear out the database."""
if not settings.engine:
raise RuntimeError("The settings.engine must be set. This is a critical assertion")
log.info("Dropping tables that exist")
original_logging_level = logging.root.level
try:
import_all_models()
log.info("Dropping Airflow tables that exist")

import_all_models()

connection = settings.engine.connect()
connection = settings.engine.connect()

with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin():
drop_airflow_models(connection)
drop_airflow_moved_tables(connection)
external_db_manager = RunDBManager()
external_db_manager.drop_tables(session, connection)
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin():
drop_airflow_models(connection)
drop_airflow_moved_tables(connection)
log.info("Dropped all Airflow tables")
external_db_manager = RunDBManager()
external_db_manager.drop_tables(session, connection)

if not skip_init:
initdb(session=session)
finally:
logging.root.setLevel(original_logging_level)
if not skip_init:
initdb(session=session)


@provide_session
Expand Down Expand Up @@ -1271,7 +1272,7 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
revision_range = f"{from_revision}:{to_revision}"
_offline_migration(command.downgrade, config=config, revision=revision_range)
else:
log.info("Applying downgrade migrations.")
log.info("Applying downgrade migrations to Airflow database.")
command.downgrade(config, revision=to_revision, sql=show_sql_only)


Expand Down
4 changes: 4 additions & 0 deletions airflow/utils/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ def check_migration(self):

def _create_db_from_orm(self):
"""Create database from ORM."""
self.log.info("Creating %s tables from the ORM", self.__class__.__name__)
engine = self.session.get_bind().engine
self.metadata.create_all(engine)
config = self.get_alembic_config()
command.stamp(config, "head")
self.log.info("%s tables have been created from the ORM", self.__class__.__name__)

def drop_tables(self, connection):
self.metadata.drop_all(connection)
Expand All @@ -105,6 +107,7 @@ def resetdb(self, skip_init=False):
connection = settings.engine.connect()

with create_global_lock(self.session, lock=DBLocks.MIGRATIONS), connection.begin():
self.log.info("Dropping %s tables", self.__class__.__name__)
self.drop_tables(connection)
if not skip_init:
self.initdb()
Expand All @@ -123,6 +126,7 @@ def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False):

config = self.get_alembic_config()
command.upgrade(config, revision=to_revision or "heads", sql=show_sql_only)
self.log.info("Migrated the %s database", self.__class__.__name__)

def downgrade(self, to_version, from_version=None, show_sql_only=False):
"""Downgrade the database."""
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5ec73e38d7ce2b81c9930d0d03c61a33ca68162a75b0ee2dc79d120913c35223
9b9dcf915eff051a5cd77176a78bdcca3703b227373efe83fd0a1d4d05623c28
3 changes: 2 additions & 1 deletion providers/src/airflow/providers/fab/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import contextlib
from logging import getLogger
from logging.config import fileConfig

from alembic import context
Expand All @@ -32,7 +33,7 @@

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
if not getLogger().handlers and config.config_file_name:
fileConfig(config.config_file_name, disable_existing_loggers=False)

# add your model's MetaData object here
Expand Down

0 comments on commit 22d2aeb

Please sign in to comment.