Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support generating SQL script for upgrades #20962

Merged
merged 3 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,26 @@ def _check(value):
default=ColorMode.AUTO,
)

# DB args
ARG_VERSION_RANGE = Arg(
("-r", "--range"),
help="Version range(start:end) for offline sql generation. Example: '2.0.2:2.2.3'",
default=None,
)
ARG_REVISION_RANGE = Arg(
('--revision-range',),
help=(
"Migration revision range(start:end) to use for offline sql generation. "
"Example: 'a13f7613ad25:7b2661a43ba3'"
),
default=None,
)
ARG_REVISION_RANGE = Arg(
('--revision-range',),
help='Revision range(start:end) to use for offline sql generation',
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved
default=None,
)

# list_dag_runs
ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
ARG_NO_BACKFILL = Arg(
Expand Down Expand Up @@ -1274,7 +1294,7 @@ class GroupCommand(NamedTuple):
name='upgrade',
help="Upgrade the metadata database to latest version",
func=lazy_load_command('airflow.cli.commands.db_command.upgradedb'),
args=(),
args=(ARG_VERSION_RANGE, ARG_REVISION_RANGE),
),
ActionCommand(
name='shell',
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def resetdb(args):
def upgradedb(args):
"""Upgrades the metadata database"""
print("DB: " + repr(settings.engine.url))
db.upgradedb()
db.upgradedb(version_range=args.range, revision_range=args.revision_range)
print("Upgrades done")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,48 +26,21 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
revision = '8646922c8a04'
down_revision = '449b4072c2da'
branch_labels = None
depends_on = None

Base = declarative_base()
BATCH_SIZE = 5000


class TaskInstance(Base): # type: ignore
"""Minimal model definition for migrations"""

__tablename__ = "task_instance"

task_id = Column(String(), primary_key=True)
dag_id = Column(String(), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
pool_slots = Column(Integer, default=1)


def upgrade():
"""Change default pool_slots to 1 and make pool_slots not nullable"""
connection = op.get_bind()
sessionmaker = sa.orm.sessionmaker()
session = sessionmaker(bind=connection)

session.query(TaskInstance).filter(TaskInstance.pool_slots.is_(None)).update(
{TaskInstance.pool_slots: 1}, synchronize_session=False
)
session.commit()

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column("pool_slots", existing_type=sa.Integer, nullable=False)
batch_op.alter_column("pool_slots", existing_type=sa.Integer, nullable=False, server_default='1')
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved


def downgrade():
"""Unapply Change default pool_slots to 1"""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column("pool_slots", existing_type=sa.Integer, nullable=True)
batch_op.alter_column("pool_slots", existing_type=sa.Integer, nullable=True, server_default=None)
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ class TaskInstance(Base, LoggingMixin):
unixname = Column(String(1000))
job_id = Column(Integer)
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
pool_slots = Column(Integer, default=1, nullable=False, server_default=text("1"))
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved
queue = Column(String(256))
priority_weight = Column(Integer)
operator = Column(String(1000))
Expand Down
113 changes: 111 additions & 2 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import sys
import time
from tempfile import gettempdir
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Tuple
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Tuple

from bcrypt import warnings
from sqlalchemy import Table, exc, func, inspect, or_, text
from sqlalchemy.orm.session import Session

Expand Down Expand Up @@ -68,6 +69,21 @@

log = logging.getLogger(__name__)

REVISION_HEADS_MAP = {
"2.0.0": "e959f08ac86c",
"2.0.1": "82b7c48c147f",
"2.0.2": "2e42bb497a22",
"2.1.0": "a13f7613ad25",
"2.1.1": "a13f7613ad25",
"2.1.2": "a13f7613ad25",
"2.1.3": "97cdd93827b8",
"2.1.4": "ccde3e26fe78",
"2.2.0": "7b2661a43ba3",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include all releases here? If we pass 2.2.1, users would get "Please provide valid Airflow versions that has migrations between them." right? I think it'd be worth adding them all so it "just works" from their perspective.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no migration head for that version unless we should use the head for a previous release that has migration but I think that a user using this feature is more likely to understand the message. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add all versions. It's very easy to add another command which will filter out the versions that have the same migration head as the previous one if you want "miigration versions only".

And It also gives a great opportunity to print a "happy" message to the users when they run "get me the script to migrate from "2.2.1 to 2.2.2":

# Hey this is your migration script from 2.2.1, to 2.2.2, but guess what? 
#
# There is no migration needed as the database has not changed between those versions. You are done.
#
#  /\_/\
# (='_' )
# (, (") (")
#

"2.2.1": "7b2661a43ba3",
"2.2.2": "7b2661a43ba3",
"2.2.3": "be2bfac3da23",
}


def _format_airflow_moved_table_name(source_table, version):
return "__".join([settings.AIRFLOW_MOVED_TABLE_PREFIX, version.replace(".", "_"), source_table])
Expand Down Expand Up @@ -1001,8 +1017,93 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:
session.commit()


def _offline_migration(command, config, revision):
log.info("Running offline migrations for revision range %s", revision)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
logging.disable(logging.CRITICAL)
command.upgrade(config, revision, sql=True)
logging.disable(logging.NOTSET)


def _validate_version_range(command, config, version_range):
if ':' not in version_range:
raise AirflowException(
'Please provide Airflow version range with the format "old_version:new_version"'
)
lower, upper = version_range.split(':')

if not REVISION_HEADS_MAP.get(lower) or not REVISION_HEADS_MAP.get(upper):
raise AirflowException('Please provide valid Airflow versions above 2.0.0.')
if REVISION_HEADS_MAP.get(lower) == REVISION_HEADS_MAP.get(upper):
if sys.stdout.isatty():
size = os.get_terminal_size().columns
else:
size = 0
print(f"Hey this is your migration script from {lower}, to {upper}, but guess what?".center(size))
ephraimbuddy marked this conversation as resolved.
Show resolved Hide resolved
print(
"There is no migration needed as the database has not changed between those versions. "
"You are done.".center(size)
)
print("""/\\_/\\""".center(size))
print("""(='_' )""".center(size))
print("""(,(") (")""".center(size))
print("""^^^""".center(size))
return
dbname = settings.engine.dialect.name
if dbname == 'sqlite':
raise AirflowException('SQLite is not supported for offline migration.')
elif dbname == 'mssql' and (lower != '2.2.0' or int(lower.split('.')[1]) < 2):
raise AirflowException(
'MSSQL is not supported for offline migration in Airflow versions less than 2.2.0.'
)
revision = f"{REVISION_HEADS_MAP[lower]}:{REVISION_HEADS_MAP[upper]}"
try:
command.history(config, rev_range=revision)
except Exception:
raise AirflowException(
f"Error while checking history for revision range {revision}. "
f"Check that the supplied airflow version is in the format 'old_version:new_version'."
)
return revision


def _validate_revision(command, config, revision_range):
if ':' not in revision_range:
raise AirflowException(
'Please provide Airflow revision range with the format "old_revision:new_revision"'
)
dbname = settings.engine.dialect.name
if dbname == 'sqlite':
raise AirflowException('SQLite is not supported for offline migration.')
start_version = '2.0.0'
rev_2_0_0_head = 'e959f08ac86c'
_lowerband, _upperband = revision_range.split(':')
if dbname == 'mssql':
rev_2_2_0_head = '7b2661a43ba3'
head_to_lowerband_range = f"{rev_2_2_0_head}:{_lowerband}"
head_to_upperband_range = f"{rev_2_2_0_head}:{_upperband}"
rev_2_0_0_head = rev_2_2_0_head # for logging purposes
start_version = '2.2.0'
else:
head_to_lowerband_range = f"{rev_2_0_0_head}:{_lowerband}"
head_to_upperband_range = f"{rev_2_0_0_head}:{_upperband}"
for i in [head_to_lowerband_range, head_to_upperband_range]:
try:
command.history(config, rev_range=i)
except Exception:
raise AirflowException(
f"Error while checking history for revision range {i}. "
f"Check that {i.split(':')[1]} is a valid revision. "
f"Supported revision for offline migration is from {rev_2_0_0_head} "
f"which is airflow {start_version} head"
)


@provide_session
def upgradedb(session: Session = NEW_SESSION):
def upgradedb(
version_range: Optional[str] = None, revision_range: Optional[str] = None, session: Session = NEW_SESSION
):
"""Upgrade the database."""
# alembic adds significant import time, so we import it lazily
if not settings.SQL_ALCHEMY_CONN:
Expand All @@ -1012,6 +1113,14 @@ def upgradedb(session: Session = NEW_SESSION):
config = _get_alembic_config()

config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
if version_range:
revision = _validate_version_range(command, config, version_range)
if not revision:
return
return _offline_migration(command, config, revision)
elif revision_range:
_validate_revision(command, config, revision_range)
return _offline_migration(command, config, revision_range)

errors_seen = False
for err in _check_migration_errors(session=session):
Expand Down
1 change: 1 addition & 0 deletions dev/README_RELEASE_AIRFLOW.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ The Release Candidate artifacts we vote upon should be the exact ones we vote ag
- Add a commit that updates `CHANGELOG.md` to add changes from previous version if it has not already added.
For now this is done manually, example run `git log --oneline v2-2-test..HEAD --pretty='format:- %s'` and categorize them.
- Add section for the release in `UPDATING.md`. If no new entries exist, put "No breaking changes" (e.g. `2.1.4`).
- Update the `REVISION_HEADS_MAP` at airflow/utils/db.py to include the revision head of the release even if there are no migrations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a pre-commit to verify that a tuple containing the current Airflow version is added to REVISION_HEADS_MAP to avoid forgetting it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a pre-commit won't be possible in this case as we only update the migration head when there's a new airflow release.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something as simple as:

from airflow.utils.db import REVISION_HEADS_MAP
from airflow.version import version as airflow_version
from packaging.version import Version


if not Version(airflow_version).is_prerelease and airflow_version not in REVISION_HEADS_MAP:
    raise Exception(f"Airflow version {airflow_version} is not in the revision map {REVISION_HEADS_MAP}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was also thinking about this issue. i saw that we had a table in the docs which contains this mapping. so what i was thinking to do was store that table in yaml form, and it could be used to generate the docs table and to support CLI commands which need to map version to revision, and that's what i do in #21601

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, I think the challenge would be to get the migration heads of the versions and discard every other migration in-between when going from version to version. It's still necessary to have a pre-commit or a ci job that makes sure we have the migration head updated before a release

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes that's right.

- Commit the version change.
- PR from the 'test' branch to the 'stable' branch, and manually merge it once approved.
- Check out the 'stable' branch
Expand Down
11 changes: 11 additions & 0 deletions docs/apache-airflow/installation/upgrading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ In order to manually upgrade the database you should run the ``airflow db upgrad
environment. It can be run either in your virtual environment or in the containers that give
you access to Airflow ``CLI`` :doc:`/usage-cli` and the database.

Offline SQL migration scripts
=============================
If you want to run the upgrade script offline, you can use the ``-r`` or ``--revision-range`` flag
to get the SQL statements that would be executed. This feature is supported in Postgres and MySQL
from Airflow 2.0.0 onward and in MSSQL from Airflow 2.2.0 onward.

Sample usage:
``airflow db upgrade -r "2.0.0:2.2.0"``
``airflow db upgrade --revision-range "e959f08ac86c:142555e44c17"``


Migration best practices
========================

Expand Down
2 changes: 1 addition & 1 deletion tests/cli/commands/test_db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_cli_check_migrations(self, mock_wait_for_migrations):
def test_cli_upgradedb(self, mock_upgradedb):
db_command.upgradedb(self.parser.parse_args(['db', 'upgrade']))

mock_upgradedb.assert_called_once_with()
mock_upgradedb.assert_called_once_with(version_range=None, revision_range=None)

@mock.patch("airflow.cli.commands.db_command.execute_interactive")
@mock.patch("airflow.cli.commands.db_command.NamedTemporaryFile")
Expand Down
Loading