Skip to content

Commit

Permalink
Remove unnecessary loggings from offline sql generation command (#21962)
Browse files Browse the repository at this point in the history
I found a way to remove unnecessary loggings that comes from validating revisions in the offline
migration command. Also cleaned up some codes
  • Loading branch information
ephraimbuddy authored Mar 3, 2022
1 parent 7e806cf commit 0d856b1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 24 deletions.
7 changes: 1 addition & 6 deletions airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,10 @@ def string_list_type(val):
('--revision-range',),
help=(
"Migration revision range(start:end) to use for offline sql generation. "
"Example: 'a13f7613ad25:7b2661a43ba3'"
"Example: ``a13f7613ad25:7b2661a43ba3``"
),
default=None,
)
ARG_REVISION_RANGE = Arg(
('--revision-range',),
help='Revision range(start:end) to use for offline sql generation',
default=None,
)

# list_dag_runs
ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
Expand Down
37 changes: 19 additions & 18 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,15 +1035,14 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:


def _offline_migration(migration_func: Callable, config, revision):
log.info("Running offline migrations for revision range %s", revision)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
logging.disable(logging.CRITICAL)
migration_func(config, revision, sql=True)
logging.disable(logging.NOTSET)


def _validate_version_range(command, config, version_range):
def _validate_version_range(script_, version_range):
if ':' not in version_range:
raise AirflowException(
'Please provide Airflow version range with the format "old_version:new_version"'
Expand Down Expand Up @@ -1074,9 +1073,11 @@ def _validate_version_range(command, config, version_range):
raise AirflowException(
'MSSQL is not supported for offline migration in Airflow versions less than 2.2.0.'
)
revision = f"{REVISION_HEADS_MAP[lower]}:{REVISION_HEADS_MAP[upper]}"
_lower, _upper = REVISION_HEADS_MAP[lower], REVISION_HEADS_MAP[upper]
revision = f"{_lower}:{_upper}"
try:
command.history(config, rev_range=revision)
# Check if there is history between the revisions
list(script_.revision_map.iterate_revisions(_upper, _lower))
except Exception:
raise AirflowException(
f"Error while checking history for revision range {revision}. "
Expand All @@ -1085,7 +1086,7 @@ def _validate_version_range(command, config, version_range):
return revision


def _validate_revision(command, config, revision_range):
def _validate_revision(script_, revision_range):
if ':' not in revision_range:
raise AirflowException(
'Please provide Airflow revision range with the format "old_revision:new_revision"'
Expand All @@ -1097,21 +1098,17 @@ def _validate_revision(command, config, revision_range):
rev_2_0_0_head = 'e959f08ac86c'
_lowerband, _upperband = revision_range.split(':')
if dbname == 'mssql':
rev_2_2_0_head = '7b2661a43ba3'
head_to_lowerband_range = f"{rev_2_2_0_head}:{_lowerband}"
head_to_upperband_range = f"{rev_2_2_0_head}:{_upperband}"
rev_2_0_0_head = rev_2_2_0_head # for logging purposes
rev_2_0_0_head = '7b2661a43ba3'
start_version = '2.2.0'
else:
head_to_lowerband_range = f"{rev_2_0_0_head}:{_lowerband}"
head_to_upperband_range = f"{rev_2_0_0_head}:{_upperband}"
for i in [head_to_lowerband_range, head_to_upperband_range]:
for i in [_lowerband, _upperband]:
try:
command.history(config, rev_range=i)
# Check if there is history between the revisions and the start revision
# This ensures that the revisions are above 2.0.0 head or 2.2.0 head if mssql
list(script_.revision_map.iterate_revisions(upper=i, lower=rev_2_0_0_head))
except Exception:
raise AirflowException(
f"Error while checking history for revision range {i}. "
f"Check that {i.split(':')[1]} is a valid revision. "
f"Error while checking history for revision range {rev_2_0_0_head}:{i}. "
f"Check that {i} is a valid revision. "
f"Supported revision for offline migration is from {rev_2_0_0_head} "
f"which is airflow {start_version} head"
)
Expand All @@ -1126,17 +1123,21 @@ def upgradedb(
if not settings.SQL_ALCHEMY_CONN:
raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is critical assertion.")
from alembic import command
from alembic.script import ScriptDirectory

config = _get_alembic_config()
script_ = ScriptDirectory.from_config(config)

config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
if version_range:
revision = _validate_version_range(command, config, version_range)
revision = _validate_version_range(script_, version_range)
if not revision:
return
log.info("Running offline migrations for version range %s", version_range)
return _offline_migration(command.upgrade, config, revision)
elif revision_range:
_validate_revision(command, config, revision_range)
_validate_revision(script_, revision_range)
log.info("Running offline migrations for revision range %s", revision_range)
return _offline_migration(command.upgrade, config, revision_range)

errors_seen = False
Expand Down

0 comments on commit 0d856b1

Please sign in to comment.