-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update conf column in dag_run table type from bytea to JSON #44533
base: main
Are you sure you want to change the base?
Changes from 17 commits
b91446b
cbde2ed
e698769
f7f9155
18c03fb
2b6abf7
a54c435
6b3789d
07258f5
8b88d63
730470b
c2e0d09
c2b8dbd
c8fa27d
d27b7ed
603ca17
9406702
01c453c
219b694
22a5dba
70ce5df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
""" | ||
remove pickled data from dagrun table. | ||
|
||
Revision ID: e39a26ac59f6 | ||
Revises: 038dc8bc6284 | ||
Create Date: 2024-12-01 08:33:15.425141 | ||
|
||
""" | ||
|
||
from __future__ import annotations | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
from sqlalchemy import text | ||
from sqlalchemy.dialects import postgresql | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "e39a26ac59f6" | ||
down_revision = "038dc8bc6284" | ||
branch_labels = None | ||
depends_on = None | ||
airflow_version = "3.0.0" | ||
|
||
|
||
def upgrade(): | ||
"""Apply remove pickled data from dagrun table.""" | ||
conn = op.get_bind() | ||
|
||
# Update the dag_run.conf column value to NULL | ||
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) | ||
|
||
conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql") | ||
|
||
with op.batch_alter_table("dag_run", schema=None) as batch_op: | ||
# Drop the existing column for SQLite (due to its limitations) | ||
batch_op.drop_column("conf") | ||
# Add the new column with the correct type for the all dialect | ||
batch_op.add_column(sa.Column("conf", conf_type, nullable=True)) | ||
|
||
|
||
def downgrade(): | ||
"""Unapply Remove pickled data from dagrun table.""" | ||
conn = op.get_bind() | ||
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) | ||
|
||
# Update the dag_run.conf column value to NULL to avoid issues during the type change | ||
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Duplicate? |
||
|
||
conf_type = sa.LargeBinary().with_variant(postgresql.BYTEA, "postgresql") | ||
|
||
# Apply the same logic for all dialects, including SQLite | ||
with op.batch_alter_table("dag_run", schema=None) as batch_op: | ||
# Drop the existing column for SQLite (due to its limitations) | ||
batch_op.drop_column("conf") | ||
batch_op.add_column(sa.Column("conf", conf_type, nullable=True)) |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -25,14 +25,14 @@ | |||||
|
||||||
import re2 | ||||||
from sqlalchemy import ( | ||||||
JSON, | ||||||
Boolean, | ||||||
Column, | ||||||
Enum, | ||||||
ForeignKey, | ||||||
ForeignKeyConstraint, | ||||||
Index, | ||||||
Integer, | ||||||
PickleType, | ||||||
PrimaryKeyConstraint, | ||||||
String, | ||||||
Text, | ||||||
|
@@ -44,6 +44,7 @@ | |||||
text, | ||||||
update, | ||||||
) | ||||||
from sqlalchemy.dialects import postgresql | ||||||
from sqlalchemy.exc import IntegrityError | ||||||
from sqlalchemy.ext.associationproxy import association_proxy | ||||||
from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates | ||||||
|
@@ -137,7 +138,7 @@ class DagRun(Base, LoggingMixin): | |||||
triggered_by = Column( | ||||||
Enum(DagRunTriggeredByType, native_enum=False, length=50) | ||||||
) # Airflow component that triggered the run. | ||||||
conf = Column(PickleType) | ||||||
conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not need any more handling than these? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check if the DagRun edit view works in the FAB UI There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you might need to change this line: Lines 131 to 132 in 3c1124e
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You will also need to add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi: the following is what I had to do for XComs: PRs: |
||||||
# These two must be either both NULL or both datetime. | ||||||
data_interval_start = Column(UtcDateTime) | ||||||
data_interval_end = Column(UtcDateTime) | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
8f2fd91375c546b297490e701dc3853d7ba53c7cd1422ed7f7e57b9ac86f6eca | ||
3c7e69f4098d4078941590dd3a21b78473e8586683595a592dafc08decf4d534 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
Update conf column in dag_run table type from byte ( that store a python pickle ) to JSON | ||
|
||
.. Provide additional contextual information | ||
|
||
Column conf of the table dag_run is using the type byte ( and storing a python pickle ) on the database , since airflow only support postgres 12+ and mysql 8+ , we updated it to json type . | ||
|
||
.. Check the type of change that applies to this change | ||
|
||
* Types of change | ||
|
||
* [ ] DAG changes | ||
* [ ] Config changes | ||
* [ ] API changes | ||
* [ ] CLI changes | ||
* [x] Behaviour changes | ||
* [ ] Plugin changes | ||
* [ ] Dependency change | ||
|
||
.. List the migration rules needed for this change (see https://github.com/apache/airflow/issues/41641) | ||
|
||
* Migrations rules needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to set conf=null, if we are going to drop that column anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth for you to take a look too @jedcunningham @ephraimbuddy