Update conf column in dag_run table type from bytea to JSON #44533
Conversation
While testing this I noticed an issue in the downgrade case: we are not removing the data on downgrade, and just perform the conversion below.

Should we also move data to the archive table in the case of a downgrade as well?

Yeah, it is different from XCom, because the XCom code had handling for both JSON and pickle types; for the dag run conf it would be different. For the downgrade you might just want to insert all records from the archive table back here.
@@ -137,7 +138,7 @@ class DagRun(Base, LoggingMixin):
    triggered_by = Column(
        Enum(DagRunTriggeredByType, native_enum=False, length=50)
    )  # Airflow component that triggered the run.
-   conf = Column(PickleType)
+   conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql"))
Do we not need any more handling than these?
Check if the DagRun edit view works in the FAB UI
I think you might need to change these lines (lines 131 to 132 in 3c1124e):

if item.conf:
    item.conf = json.loads(item.conf)
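The reason this line needs attention: once the column is a JSON type, SQLAlchemy hands `conf` back as an already-deserialized dict, so calling `json.loads` on it would raise a `TypeError`. A minimal sketch of a defensive prefill helper (hypothetical name, not Airflow's actual FAB view code), assuming both legacy string payloads and new dict payloads can appear:

```python
import json

def prefill_conf(conf):
    # Legacy path: conf stored as a JSON string or bytes still needs decoding.
    if isinstance(conf, (bytes, str)):
        return json.loads(conf)
    # New path: the JSON column type already returns a dict; pass it through.
    return conf

print(prefill_conf('{"a": 1}'))
print(prefill_conf({"a": 1}))
```

Both calls return the same dict, which is the behavior the edit view would need after the column type change.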
You will also need to add a newsfragment, something like:
fyi: the following is what I had to do for XComs:
PRs:
airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_dagrun_table.py
In [PR](#44166) we added a migration for removing pickled data from the `xcom` table. During my testing I noticed that with `SQLite` the [insert](https://github.com/apache/airflow/blob/main/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py#L88) statement does not work in the upgrade case. Changing the condition to `hex(substr(value, 1, 1)) = '80'` works. Tested [here](#44533 (comment)). Related: #44166
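The `'80'` condition works because every pickle stream written with protocol 2 or later begins with the `PROTO` opcode, byte `0x80`, while a JSON payload starts with a printable character such as `{`. A quick stdlib check of that assumption:

```python
import json
import pickle

# Pickle protocol 2+ streams start with the PROTO opcode (0x80) -- the same
# byte the migration's hex(substr(value, 1, 1)) = '80' SQLite condition keys on.
pickled = pickle.dumps({"key": "value"})
as_json = json.dumps({"key": "value"}).encode("utf-8")

print(pickled[0] == 0x80)   # True: pickled payload
print(as_json[0] == 0x80)   # False: JSON text starts with '{' (0x7b)
```

This is why first-byte sniffing is a reliable way to tell old pickled conf blobs apart from JSON text during the migration.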
)

# Delete the pickled data from the dag_run table so that we can update the column type
conn.execute(text(f"DELETE FROM dag_run WHERE conf IS NOT NULL AND {condition}"))
We don't want to delete the rows, right? That'll cascade down and delete other stuff too, like TIs. Should we just set `conf` to null instead?
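The cascade concern can be demonstrated with a toy schema (illustrative only, not Airflow's real DDL): with an `ON DELETE CASCADE` foreign key, deleting a run also deletes its task instances, while nulling out `conf` leaves them untouched.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite FKs are off by default
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, conf BLOB)")
conn.execute(
    "CREATE TABLE task_instance ("
    "id INTEGER PRIMARY KEY, "
    "run_id INTEGER REFERENCES dag_run(id) ON DELETE CASCADE)"
)
conn.execute("INSERT INTO dag_run VALUES (1, x'80'), (2, x'80')")
conn.execute("INSERT INTO task_instance VALUES (1, 1), (2, 2)")

# Nulling conf keeps the task instance; deleting the run cascades it away.
conn.execute("UPDATE dag_run SET conf = NULL WHERE id = 1")
conn.execute("DELETE FROM dag_run WHERE id = 2")

print(conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0])  # 1
```

Only the task instance attached to the deleted run disappears, which is exactly why `UPDATE ... SET conf = NULL` is the safer migration step.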
One difference between this situation and XCom is that the XCom value is the "valuable" part of the row - this conf isn't.
> Should we just set conf to null instead?

Yes, this is better.
> Should we just set conf to null instead?
>
> Yes, this is better.

This is done.
insert_query = text(
    """
    INSERT INTO dag_run (dag_id, run_id, queued_at, logical_date, start_date, end_date, state, creating_job_id, external_trigger, run_type, triggered_by, conf, data_interval_start, data_interval_end, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, dag_version_id)
This doesn't feel right - what if I upgraded 2 weeks ago and have since done a db clean?!
This will only run if `_dag_run_archive` exists.
Removed this
sa.Column("logical_date", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("start_date", sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column("end_date", sa.TIMESTAMP(timezone=True), nullable=True),
sa.Column("state", sa.String(length=50), nullable=True),
Not sure we really need a full record for the archive. I also question whether we even need one to keep the old pickled confs at all.
We can simplify and nuke
nuking will simplify a lot.
We are now not archiving the record.
dialect = conn.dialect.name

# Update the dag_run.conf column value to NULL
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL"))
This just gets rid of all of the values? They may not all be pickled, right?
I assumed that if we upgrade to this new version, all values in the configuration would be pickled. Should we add a condition to check for pickled data?
cc: @kaxil
Since DagRun.conf uses PickleType, all values should be pickled:

airflow/airflow/models/dagrun.py, line 140 in b69441d:

conf = Column(PickleType)
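For context on why every non-NULL `conf` blob is a pickle stream: SQLAlchemy's `PickleType` runs values through `pickle.dumps` on the way into the database and `pickle.loads` on the way out. A stdlib round trip mirroring that behavior (hypothetical values, illustrating the mechanism rather than Airflow's code):

```python
import pickle

# What PickleType effectively does: serialize on bind, deserialize on load.
conf = {"run_mode": "backfill", "retries": 3}
stored = pickle.dumps(conf)        # what lands in the bytea column
restored = pickle.loads(stored)    # what the ORM hands back

print(restored == conf)            # True: lossless round trip
print(stored[0] == 0x80)           # True: pickle protocol marker byte
```

That leading `0x80` byte is also what lets the migration distinguish old pickled blobs from JSON text.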
END
"""
)
elif dialect == "mysql":
And for mysql/sqlite, even if we had data, we don't move any of it from the old to new column.
@jedcunningham as discussed, I have updated the mysql and sqlite commands for the downgrade.
"""
ALTER TABLE dag_run
ALTER COLUMN conf TYPE JSONB
USING CAST(CONVERT_FROM(conf, 'UTF8') AS JSONB);
Since everything is already null, do we need this line?
Assuming we don't, we can have same code for all 3 DB types
Apart from JSONB (Postgres) vs JSON (MySQL & SQLite), where you can do something like: `JSON().with_variant(postgresql.JSONB, "postgresql")`
Implemented the same logic for all 3 DBs with `JSON().with_variant(postgresql.JSONB, "postgresql")`.
op.add_column("dag_run", sa.Column("conf_json", sa.JSON(), nullable=True))
op.drop_column("dag_run", "conf")
op.alter_column("dag_run", "conf_json", existing_type=sa.JSON(), new_column_name="conf")
Do we need to drop the column? Since all values are NULL, can’t we just change the type?
For SQLite, altering a column's type with `ALTER` is not supported.
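SQLite's `ALTER TABLE` has no `ALTER COLUMN ... TYPE`, which is why the migration adds a new JSON column, drops the old one, and renames (Alembic's `batch_alter_table` does an equivalent table rebuild under the hood). A minimal sketch of the manual equivalent against an illustrative table, assuming SQLite 3.35+ for `DROP COLUMN` support:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, conf BLOB)")

# add-new / drop-old / rename: the only way to "change" a column type in SQLite
conn.execute("ALTER TABLE dag_run ADD COLUMN conf_json JSON")
conn.execute("ALTER TABLE dag_run DROP COLUMN conf")  # requires SQLite >= 3.35
conn.execute("ALTER TABLE dag_run RENAME COLUMN conf_json TO conf")

cols = [row[1] for row in conn.execute("PRAGMA table_info(dag_run)")]
print(cols)  # ['id', 'conf']
```

The table ends up with a `conf` column of the new type under the original name, matching what the `op.add_column` / `op.drop_column` / `op.alter_column` sequence above achieves.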
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL"))

# Update the dag_run.conf column value to NULL to avoid issues during the type change
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL"))
Duplicate?
conn.execute(text("UPDATE dag_run set conf=null WHERE conf IS NOT NULL"))

conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")

with op.batch_alter_table("dag_run", schema=None) as batch_op:
    # Drop the existing column for SQLite (due to its limitations)
    batch_op.drop_column("conf")
Do we need to set conf=null, if we are going to drop that column anyway?
Worth for you to take a look too @jedcunningham @ephraimbuddy
closes: #43933
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.