Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6946] Switch to MySQL 5.7 in 2.0 as base #7570

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ RUN KEY="A4A9406876FCBD3C456770C88C718D3B5072E1F5" \
&& gpgconf --kill all \
rm -rf "${GNUPGHOME}"; \
apt-key list > /dev/null \
&& echo "deb http://repo.mysql.com/apt/debian/ stretch mysql-5.6" | tee -a /etc/apt/sources.list.d/mysql.list \
&& echo "deb http://repo.mysql.com/apt/debian/ stretch mysql-5.7" | tee -a /etc/apt/sources.list.d/mysql.list \
potiuk marked this conversation as resolved.
Show resolved Hide resolved
&& apt-get update \
&& apt-get install --no-install-recommends -y \
libmysqlclient-dev \
Expand Down
10 changes: 10 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@
type: string
example: ~
default: "utf-8"
- name: sql_engine_collation_for_ids
description: |
Collation for `dag_id`, `task_id`, `key` columns in case they have different encoding.
This is particularly useful in case of mysql with utf8mb4 encoding because
primary keys for XCom table has too big size and `sql_engine_collation_for_ids` should
be set to `utf8mb3_general_ci`
version_added: 2.0.0
type: string
example: ~
default: ~
- name: sql_alchemy_pool_enabled
description: |
If SqlAlchemy should pool database connections.
Expand Down
6 changes: 6 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
# The encoding for the databases
sql_engine_encoding = utf-8

# Collation for `dag_id`, `task_id`, `key` columns in case they have different encoding.
# This is particularly useful in case of mysql with utf8mb4 encoding because
# primary keys for XCom table has too big size and `sql_engine_collation_for_ids` should
# be set to `utf8mb3_general_ci`
# sql_engine_collation_for_ids =
potiuk marked this conversation as resolved.
Show resolved Hide resolved

# If SqlAlchemy should pool database connections.
sql_alchemy_pool_enabled = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
from airflow.models.base import COLLATION_ARGS

revision = '0a2a5b66e19d'
down_revision = '9635ae0956e7'
branch_labels = None
Expand Down Expand Up @@ -64,8 +66,8 @@ def upgrade():
op.create_table(
TABLE_NAME,
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
# use explicit server_default=None otherwise mysql implies defaults for first timestamp column
sa.Column('execution_date', timestamp(), nullable=False, server_default=None),
sa.Column('try_number', sa.Integer(), nullable=False),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from alembic import op

# revision identifiers, used by Alembic.
from airflow.models.base import COLLATION_ARGS

revision = '64de9cddf6c9'
down_revision = '211e584da130'
branch_labels = None
Expand All @@ -37,8 +39,8 @@ def upgrade():
op.create_table(
'task_fail',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('start_date', sa.DateTime(), nullable=True),
sa.Column('end_date', sa.DateTime(), nullable=True),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from sqlalchemy import Column, Float, Integer, PickleType, String
from sqlalchemy.ext.declarative import declarative_base

from airflow.models.base import COLLATION_ARGS
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime

Expand Down Expand Up @@ -59,8 +60,8 @@ class TaskInstance(Base):

__tablename__ = "task_instance"

task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@

from airflow import settings
from airflow.models import DagBag

# revision identifiers, used by Alembic.
from airflow.models.base import COLLATION_ARGS

revision = 'cc1e65623dc7'
down_revision = '127d2bf2dfa7'
branch_labels = None
Expand All @@ -46,8 +47,8 @@
class TaskInstance(Base):
__tablename__ = "task_instance"

task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
execution_date = Column(sa.DateTime, primary_key=True)
max_tries = Column(Integer)
try_number = Column(Integer, default=0)
Expand Down
20 changes: 11 additions & 9 deletions airflow/migrations/versions/e3a246e0dc1_current_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
from airflow.models.base import COLLATION_ARGS

revision = 'e3a246e0dc1'
down_revision = None
branch_labels = None
Expand Down Expand Up @@ -115,8 +117,8 @@ def upgrade():
'log',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('dttm', sa.DateTime(), nullable=True),
sa.Column('dag_id', sa.String(length=250), nullable=True),
sa.Column('task_id', sa.String(length=250), nullable=True),
sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=True),
sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=True),
sa.Column('event', sa.String(length=30), nullable=True),
sa.Column('execution_date', sa.DateTime(), nullable=True),
sa.Column('owner', sa.String(length=500), nullable=True),
Expand All @@ -125,8 +127,8 @@ def upgrade():
if 'sla_miss' not in tables:
op.create_table(
'sla_miss',
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('email_sent', sa.Boolean(), nullable=True),
sa.Column('timestamp', sa.DateTime(), nullable=True),
Expand All @@ -146,8 +148,8 @@ def upgrade():
if 'task_instance' not in tables:
op.create_table(
'task_instance',
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('start_date', sa.DateTime(), nullable=True),
sa.Column('end_date', sa.DateTime(), nullable=True),
Expand Down Expand Up @@ -224,16 +226,16 @@ def upgrade():
op.create_table(
'xcom',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(length=512), nullable=True),
sa.Column('key', sa.String(length=512, **COLLATION_ARGS), nullable=True),
sa.Column('value', sa.PickleType(), nullable=True),
sa.Column(
'timestamp',
sa.DateTime(),
default=func.now(),
nullable=False),
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
sa.PrimaryKeyConstraint('id')
)

Expand Down
11 changes: 11 additions & 0 deletions airflow/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,14 @@
# used for typing
class Operator:
pass


def get_id_collation_args():
collation = conf.get('core', 'sql_engine_collation_for_ids', fallback=None)
if collation:
return {'collation': collation}
else:
return {}
potiuk marked this conversation as resolved.
Show resolved Hide resolved


COLLATION_ARGS = get_id_collation_args()
6 changes: 3 additions & 3 deletions airflow/models/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from sqlalchemy import Column, Index, Integer, String, Text

from airflow.models.base import ID_LEN, Base
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

Expand All @@ -32,8 +32,8 @@ class Log(Base):

id = Column(Integer, primary_key=True)
dttm = Column(UtcDateTime)
dag_id = Column(String(ID_LEN))
task_id = Column(String(ID_LEN))
dag_id = Column(String(ID_LEN, **COLLATION_ARGS))
task_id = Column(String(ID_LEN, **COLLATION_ARGS))
event = Column(String(30))
execution_date = Column(UtcDateTime)
owner = Column(String(500))
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/slamiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from sqlalchemy import Boolean, Column, Index, String, Text

from airflow.models.base import ID_LEN, Base
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.utils.sqlalchemy import UtcDateTime


Expand All @@ -30,8 +30,8 @@ class SlaMiss(Base):
"""
__tablename__ = "sla_miss"

task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
email_sent = Column(Boolean, default=False)
timestamp = Column(UtcDateTime)
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/taskfail.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""Taskfail tracks the failed run durations of each task instance"""
from sqlalchemy import Column, Index, Integer, String

from airflow.models.base import ID_LEN, Base
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.utils.sqlalchemy import UtcDateTime


Expand All @@ -30,8 +30,8 @@ class TaskFail(Base):
__tablename__ = "task_fail"

id = Column(Integer, primary_key=True)
task_id = Column(String(ID_LEN), nullable=False)
dag_id = Column(String(ID_LEN), nullable=False)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from airflow.exceptions import (
AirflowException, AirflowRescheduleException, AirflowSkipException, AirflowTaskTimeout,
)
from airflow.models.base import ID_LEN, Base
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.models.log import Log
from airflow.models.taskfail import TaskFail
from airflow.models.taskreschedule import TaskReschedule
Expand Down Expand Up @@ -146,8 +146,8 @@ class TaskInstance(Base, LoggingMixin):

__tablename__ = "task_instance"

task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/taskreschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""TaskReschedule tracks rescheduled task instances."""
from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc

from airflow.models.base import ID_LEN, Base
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime

Expand All @@ -31,8 +31,8 @@ class TaskReschedule(Base):
__tablename__ = "task_reschedule"

id = Column(Integer, primary_key=True)
task_id = Column(String(ID_LEN), nullable=False)
dag_id = Column(String(ID_LEN), nullable=False)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
try_number = Column(Integer, nullable=False)
start_date = Column(UtcDateTime, nullable=False)
Expand Down
8 changes: 4 additions & 4 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sqlalchemy.orm import Query, Session, reconstructor

from airflow.configuration import conf
from airflow.models.base import ID_LEN, Base
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.utils import timezone
from airflow.utils.helpers import is_container
from airflow.utils.log.logging_mixin import LoggingMixin
Expand All @@ -47,15 +47,15 @@ class XCom(Base, LoggingMixin):
"""
__tablename__ = "xcom"

key = Column(String(512), primary_key=True)
key = Column(String(512, **COLLATION_ARGS), primary_key=True)
value = Column(LargeBinary)
timestamp = Column(
UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, primary_key=True)

# source information
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)

"""
TODO: "pickling" has been deprecated and JSON is preferred.
Expand Down
8 changes: 5 additions & 3 deletions scripts/ci/docker-compose/backend-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ services:
airflow-testing:
environment:
- BACKEND=mysql
- AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow
- AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://root@mysql/airflow
- AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow?charset=utf8mb4
- AIRFLOW__CORE__SQL_ENGINE_COLLATION_FOR_IDS=utf8mb3_general_ci
- AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://root@mysql/airflow?charset=utf8mb4
depends_on:
- mysql
mysql:
image: mysql:5.6
image: mysql:5.7
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
environment:
- MYSQL_ALLOW_EMPTY_PASSWORD=true
- MYSQL_ROOT_HOST=%
Expand Down
1 change: 1 addition & 0 deletions scripts/ci/in_container/airflow_ci.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
dags_folder = ~/airflow/dags
executor = LocalExecutor
sql_alchemy_conn = # overridden by the startup scripts
#sql_engine_collation_for_ids = overridden by the startup scripts
unit_test_mode = True
load_examples = True
donot_pickle = False
Expand Down
4 changes: 4 additions & 0 deletions scripts/ci/in_container/entrypoint_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ echo
echo "Airflow home: ${AIRFLOW_HOME}"
echo "Airflow sources: ${AIRFLOW_SOURCES}"
echo "Airflow core SQL connection: ${AIRFLOW__CORE__SQL_ALCHEMY_CONN:=}"
if [[ -n "${AIRFLOW__CORE__SQL_ENGINE_COLLATION_FOR_IDS:=}" ]]; then
echo "Airflow collation for IDs: ${AIRFLOW__CORE__SQL_ENGINE_COLLATION_FOR_IDS}"
fi

echo

ARGS=( "$@" )
Expand Down