diff --git a/airflow/migrations/versions/0140_2_9_0_update_trigger_kwargs_type.py b/airflow/migrations/versions/0140_2_9_0_update_trigger_kwargs_type.py
new file mode 100644
index 0000000000000..dbde1201e4cd0
--- /dev/null
+++ b/airflow/migrations/versions/0140_2_9_0_update_trigger_kwargs_type.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""update trigger kwargs type
+
+Revision ID: 1949afb29106
+Revises: ee1467d4aa35
+Create Date: 2024-03-17 22:09:09.406395
+
+"""
+import sqlalchemy as sa
+
+from airflow.models.trigger import Trigger
+from alembic import op
+
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# revision identifiers, used by Alembic.
+revision = "1949afb29106"
+down_revision = "ee1467d4aa35"
+branch_labels = None
+depends_on = None
+airflow_version = "2.9.0"
+
+
+def upgrade():
+ """Update trigger kwargs type to string"""
+ with op.batch_alter_table("trigger") as batch_op:
+ batch_op.alter_column("kwargs", type_=sa.Text(), )
+
+
+def downgrade():
+ """Unapply update trigger kwargs type to string"""
+ with op.batch_alter_table("trigger") as batch_op:
+ batch_op.alter_column("kwargs", type_=ExtendedJSON(), postgresql_using="kwargs::json")
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 6cc0d7159b2f2..26697bb118c8d 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -90,7 +90,7 @@
"2.7.0": "405de8318b3a",
"2.8.0": "10b52ebd31f7",
"2.8.1": "88344c1d9134",
- "2.9.0": "1fd565369930",
+ "2.9.0": "1949afb29106",
}
@@ -972,6 +972,33 @@ def synchronize_log_template(*, session: Session = NEW_SESSION) -> None:
session.add(LogTemplate(filename=filename, elasticsearch_id=elasticsearch_id))
+def encrypt_trigger_kwargs(*, session: Session) -> None:
+ """Encrypt trigger kwargs."""
+ from airflow.models.trigger import Trigger
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ for trigger in session.query(Trigger):
+ # convert serialized dict to string and encrypt it
+ trigger.kwargs = BaseSerialization.deserialize(json.loads(trigger.encrypted_kwargs))
+ session.commit()
+
+
+def decrypt_trigger_kwargs(*, session: Session) -> None:
+ """Decrypt trigger kwargs."""
+ from airflow.models.trigger import Trigger
+ from airflow.serialization.serialized_objects import BaseSerialization
+
+ if not inspect(session.bind).has_table(Trigger.__tablename__):
+ # table does not exist, nothing to do
+ # this can happen when we downgrade to an old version before the Trigger table was added
+ return
+
+ for trigger in session.query(Trigger):
+ # decrypt the string and convert it to serialized dict
+ trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs))
+ session.commit()
+
+
def check_conn_id_duplicates(session: Session) -> Iterable[str]:
"""
Check unique conn_id in connection table.
@@ -1639,6 +1666,12 @@ def upgradedb(
_reserialize_dags(session=session)
add_default_pool_if_not_exists(session=session)
synchronize_log_template(session=session)
+ if _revision_greater(
+ config,
+ _REVISION_HEADS_MAP["2.9.0"],
+ _get_current_revision(session=session),
+ ):
+ encrypt_trigger_kwargs(session=session)
@provide_session
@@ -1711,6 +1744,12 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
else:
log.info("Applying downgrade migrations.")
command.downgrade(config, revision=to_revision, sql=show_sql_only)
+ if _revision_greater(
+ config,
+ _REVISION_HEADS_MAP["2.9.0"],
+ to_revision,
+ ):
+ decrypt_trigger_kwargs(session=session)
def drop_airflow_models(connection):
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 7a1c0a9fba53e..09f84daea2acb 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-3ec33f4a14388277f9aba431c06c3bfa9d044ab2eae466aa394aa9618d2f3eb5
\ No newline at end of file
+2a24225537326f38be5df14e0b7a8dca867122093e0fa932f1a11ac12d1fb11c
\ No newline at end of file
diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg
index 144668b90032a..dc32fe0566902 100644
--- a/docs/apache-airflow/img/airflow_erd.svg
+++ b/docs/apache-airflow/img/airflow_erd.svg
@@ -1361,28 +1361,28 @@
task_instance--xcom
-0..N
+1
1
task_instance--xcom
-1
+0..N
1
task_instance--xcom
-0..N
+1
1
task_instance--xcom
-1
+0..N
1
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index a28f2ee766907..13c70abe9d64f 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
-| ``ee1467d4aa35`` (head) | ``b4078ac230a1`` | ``2.9.0`` | add display name for dag and task instance |
+| ``1949afb29106`` (head) | ``ee1467d4aa35`` | ``2.9.0`` | update trigger kwargs type |
++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
+| ``ee1467d4aa35`` | ``b4078ac230a1`` | ``2.9.0`` | add display name for dag and task instance |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``b4078ac230a1`` | ``8e1c784a4fc7`` | ``2.9.0`` | Change value column type to longblob in xcom table for mysql |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+