diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index 55462aff7bf6b..9922fde77099e 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-7d6ced1b0a60a60c192ccc102b750c2d893d1c388625f8a90bb95ce457d0d9c4
\ No newline at end of file
+5fb3647b8dc66e0c22e13797f2b3b05a9c09ac9e4a6d0d82b03f1556197fa219
\ No newline at end of file
diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg
index 29eb9a94b2906..363c06bede71a 100644
--- a/airflow-core/docs/img/airflow_erd.svg
+++ b/airflow-core/docs/img/airflow_erd.svg
@@ -698,7 +698,7 @@
NOT NULL
-
+
trigger:id--asset_watcher:trigger_id
0..N
@@ -707,178 +707,178 @@
task_instance
-
-task_instance
-
-id
-
- [UUID]
- NOT NULL
-
-context_carrier
-
- [JSONB]
-
-custom_operator_name
-
- [VARCHAR(1000)]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_version_id
-
- [UUID]
-
-duration
-
- [DOUBLE PRECISION]
-
-end_date
-
- [TIMESTAMP]
-
-executor
-
- [VARCHAR(1000)]
-
-executor_config
-
- [BYTEA]
- NOT NULL
-
-external_executor_id
-
- [TEXT]
-
-hostname
-
- [VARCHAR(1000)]
- NOT NULL
-
-last_heartbeat_at
-
- [TIMESTAMP]
-
-map_index
-
- [INTEGER]
- NOT NULL
-
-max_tries
-
- [INTEGER]
- NOT NULL
-
-next_kwargs
-
- [JSONB]
-
-next_method
-
- [VARCHAR(1000)]
-
-operator
-
- [VARCHAR(1000)]
-
-pid
-
- [INTEGER]
-
-pool
-
- [VARCHAR(256)]
- NOT NULL
-
-pool_slots
-
- [INTEGER]
- NOT NULL
-
-priority_weight
-
- [INTEGER]
- NOT NULL
-
-queue
-
- [VARCHAR(256)]
- NOT NULL
-
-queued_by_job_id
-
- [INTEGER]
-
-queued_dttm
-
- [TIMESTAMP]
-
-rendered_map_index
-
- [VARCHAR(250)]
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-scheduled_dttm
-
- [TIMESTAMP]
-
-span_status
-
- [VARCHAR(250)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(20)]
-
-task_display_name
-
- [VARCHAR(2000)]
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-trigger_id
-
- [INTEGER]
-
-trigger_timeout
-
- [TIMESTAMP]
-
-try_number
-
- [INTEGER]
- NOT NULL
-
-unixname
-
- [VARCHAR(1000)]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
+
+task_instance
+
+id
+
+ [UUID]
+ NOT NULL
+
+context_carrier
+
+ [JSONB]
+
+custom_operator_name
+
+ [VARCHAR(1000)]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_version_id
+
+ [UUID]
+
+duration
+
+ [DOUBLE PRECISION]
+
+end_date
+
+ [TIMESTAMP]
+
+executor
+
+ [VARCHAR(1000)]
+
+executor_config
+
+ [BYTEA]
+ NOT NULL
+
+external_executor_id
+
+ [TEXT]
+
+hostname
+
+ [VARCHAR(1000)]
+ NOT NULL
+
+last_heartbeat_at
+
+ [TIMESTAMP]
+
+map_index
+
+ [INTEGER]
+ NOT NULL
+
+max_tries
+
+ [INTEGER]
+ NOT NULL
+
+next_kwargs
+
+ [JSONB]
+
+next_method
+
+ [VARCHAR(1000)]
+
+operator
+
+ [VARCHAR(1000)]
+
+pid
+
+ [INTEGER]
+
+pool
+
+ [VARCHAR(256)]
+ NOT NULL
+
+pool_slots
+
+ [INTEGER]
+ NOT NULL
+
+priority_weight
+
+ [INTEGER]
+ NOT NULL
+
+queue
+
+ [VARCHAR(256)]
+ NOT NULL
+
+queued_by_job_id
+
+ [INTEGER]
+
+queued_dttm
+
+ [TIMESTAMP]
+
+rendered_map_index
+
+ [VARCHAR(250)]
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+scheduled_dttm
+
+ [TIMESTAMP]
+
+span_status
+
+ [VARCHAR(250)]
+ NOT NULL
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(20)]
+
+task_display_name
+
+ [VARCHAR(2000)]
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+trigger_id
+
+ [INTEGER]
+
+trigger_timeout
+
+ [TIMESTAMP]
+
+try_number
+
+ [INTEGER]
+ NOT NULL
+
+unixname
+
+ [VARCHAR(1000)]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
-
+
trigger:id--task_instance:trigger_id
-
-0..N
+
+0..N
{0,1}
@@ -926,7 +926,7 @@
NOT NULL
-
+
callback:id--deadline:callback_id
0..N
@@ -935,102 +935,102 @@
asset_alias
-
-asset_alias
-
-id
-
- [INTEGER]
- NOT NULL
-
-group
-
- [VARCHAR(1500)]
- NOT NULL
-
-name
-
- [VARCHAR(1500)]
- NOT NULL
+
+asset_alias
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+group
+
+ [VARCHAR(1500)]
+ NOT NULL
+
+name
+
+ [VARCHAR(1500)]
+ NOT NULL
asset_alias_asset
-
-asset_alias_asset
-
-alias_id
-
- [INTEGER]
- NOT NULL
-
-asset_id
-
- [INTEGER]
- NOT NULL
+
+asset_alias_asset
+
+alias_id
+
+ [INTEGER]
+ NOT NULL
+
+asset_id
+
+ [INTEGER]
+ NOT NULL
-
+
asset_alias:id--asset_alias_asset:alias_id
-
-0..N
-1
+
+0..N
+1
asset_alias_asset_event
-
-asset_alias_asset_event
-
-alias_id
-
- [INTEGER]
- NOT NULL
-
-event_id
-
- [INTEGER]
- NOT NULL
+
+asset_alias_asset_event
+
+alias_id
+
+ [INTEGER]
+ NOT NULL
+
+event_id
+
+ [INTEGER]
+ NOT NULL
-
+
asset_alias:id--asset_alias_asset_event:alias_id
-
-0..N
-1
+
+0..N
+1
dag_schedule_asset_alias_reference
-
-dag_schedule_asset_alias_reference
-
-alias_id
-
- [INTEGER]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+dag_schedule_asset_alias_reference
+
+alias_id
+
+ [INTEGER]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
-
+
asset_alias:id--dag_schedule_asset_alias_reference:alias_id
-
-0..N
-1
+
+0..N
+1
@@ -1074,261 +1074,261 @@
NOT NULL
-
+
asset:id--asset_alias_asset:asset_id
-
-0..N
+
+0..N
1
-
+
asset:id--asset_watcher:asset_id
-
+
0..N
1
asset_active
-
-asset_active
-
-name
-
- [VARCHAR(1500)]
- NOT NULL
-
-uri
-
- [VARCHAR(1500)]
- NOT NULL
+
+asset_active
+
+name
+
+ [VARCHAR(1500)]
+ NOT NULL
+
+uri
+
+ [VARCHAR(1500)]
+ NOT NULL
-asset:name--asset_active:name
-
-1
-1
+asset:uri--asset_active:uri
+
+1
+1
-asset:uri--asset_active:uri
-
-1
-1
+asset:name--asset_active:name
+
+1
+1
dag_schedule_asset_reference
-
-dag_schedule_asset_reference
-
-asset_id
-
- [INTEGER]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+dag_schedule_asset_reference
+
+asset_id
+
+ [INTEGER]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
asset:id--dag_schedule_asset_reference:asset_id
-
-0..N
+
+0..N
1
task_outlet_asset_reference
-
-task_outlet_asset_reference
-
-asset_id
-
- [INTEGER]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+task_outlet_asset_reference
+
+asset_id
+
+ [INTEGER]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
asset:id--task_outlet_asset_reference:asset_id
-
-0..N
+
+0..N
1
task_inlet_asset_reference
-
-task_inlet_asset_reference
-
-asset_id
-
- [INTEGER]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-task_id
-
- [VARCHAR(250)]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+task_inlet_asset_reference
+
+asset_id
+
+ [INTEGER]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+task_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
asset:id--task_inlet_asset_reference:asset_id
-
-0..N
+
+0..N
1
asset_dag_run_queue
-
-asset_dag_run_queue
-
-asset_id
-
- [INTEGER]
- NOT NULL
-
-target_dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
+
+asset_dag_run_queue
+
+asset_id
+
+ [INTEGER]
+ NOT NULL
+
+target_dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
asset:id--asset_dag_run_queue:asset_id
-
-0..N
+
+0..N
1
asset_event
-
-asset_event
-
-id
-
- [INTEGER]
- NOT NULL
-
-asset_id
-
- [INTEGER]
- NOT NULL
-
-extra
-
- [JSON]
- NOT NULL
-
-partition_key
-
- [VARCHAR(250)]
-
-source_dag_id
-
- [VARCHAR(250)]
-
-source_map_index
-
- [INTEGER]
-
-source_run_id
-
- [VARCHAR(250)]
-
-source_task_id
-
- [VARCHAR(250)]
-
-timestamp
-
- [TIMESTAMP]
- NOT NULL
+
+asset_event
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+asset_id
+
+ [INTEGER]
+ NOT NULL
+
+extra
+
+ [JSON]
+ NOT NULL
+
+partition_key
+
+ [VARCHAR(250)]
+
+source_dag_id
+
+ [VARCHAR(250)]
+
+source_map_index
+
+ [INTEGER]
+
+source_run_id
+
+ [VARCHAR(250)]
+
+source_task_id
+
+ [VARCHAR(250)]
+
+timestamp
+
+ [TIMESTAMP]
+ NOT NULL
-
+
asset_event:id--asset_alias_asset_event:event_id
-
-0..N
-1
+
+0..N
+1
dagrun_asset_event
-
-dagrun_asset_event
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-event_id
-
- [INTEGER]
- NOT NULL
+
+dagrun_asset_event
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+event_id
+
+ [INTEGER]
+ NOT NULL
asset_event:id--dagrun_asset_event:event_id
-
-0..N
-1
+
+0..N
+1
@@ -1387,84 +1387,84 @@
1
-
+
dag:dag_id--dag_schedule_asset_alias_reference:dag_id
-
-0..N
+
+0..N
1
dag:dag_id--dag_schedule_asset_reference:dag_id
-
-0..N
+
+0..N
1
dag:dag_id--task_outlet_asset_reference:dag_id
-
-0..N
+
+0..N
1
dag:dag_id--task_inlet_asset_reference:dag_id
-
-0..N
+
+0..N
1
dag:dag_id--asset_dag_run_queue:target_dag_id
-
-0..N
+
+0..N
1
dag_version
-
-dag_version
-
-id
-
- [UUID]
- NOT NULL
-
-bundle_name
-
- [VARCHAR(250)]
-
-bundle_version
-
- [VARCHAR(250)]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-last_updated
-
- [TIMESTAMP]
- NOT NULL
-
-version_number
-
- [INTEGER]
- NOT NULL
+
+dag_version
+
+id
+
+ [UUID]
+ NOT NULL
+
+bundle_name
+
+ [VARCHAR(250)]
+
+bundle_version
+
+ [VARCHAR(250)]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+last_updated
+
+ [TIMESTAMP]
+ NOT NULL
+
+version_number
+
+ [INTEGER]
+ NOT NULL
dag:dag_id--dag_version:dag_id
-
-0..N
+
+0..N
1
@@ -1577,133 +1577,133 @@
dag_run
-
-dag_run
-
-id
-
- [INTEGER]
- NOT NULL
-
-backfill_id
-
- [INTEGER]
-
-bundle_version
-
- [VARCHAR(250)]
-
-clear_number
-
- [INTEGER]
- NOT NULL
-
-conf
-
- [JSONB]
-
-context_carrier
-
- [JSONB]
-
-created_dag_version_id
-
- [UUID]
-
-creating_job_id
-
- [INTEGER]
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-data_interval_end
-
- [TIMESTAMP]
-
-data_interval_start
-
- [TIMESTAMP]
-
-end_date
-
- [TIMESTAMP]
-
-last_scheduling_decision
-
- [TIMESTAMP]
-
-log_template_id
-
- [INTEGER]
- NOT NULL
-
-logical_date
-
- [TIMESTAMP]
-
-partition_key
-
- [VARCHAR(250)]
-
-queued_at
-
- [TIMESTAMP]
-
-run_after
-
- [TIMESTAMP]
- NOT NULL
-
-run_id
-
- [VARCHAR(250)]
- NOT NULL
-
-run_type
-
- [VARCHAR(50)]
- NOT NULL
-
-scheduled_by_job_id
-
- [INTEGER]
-
-span_status
-
- [VARCHAR(250)]
- NOT NULL
-
-start_date
-
- [TIMESTAMP]
-
-state
-
- [VARCHAR(50)]
- NOT NULL
-
-triggered_by
-
- [VARCHAR(50)]
-
-triggering_user_name
-
- [VARCHAR(512)]
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+dag_run
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+backfill_id
+
+ [INTEGER]
+
+bundle_version
+
+ [VARCHAR(250)]
+
+clear_number
+
+ [INTEGER]
+ NOT NULL
+
+conf
+
+ [JSONB]
+
+context_carrier
+
+ [JSONB]
+
+created_dag_version_id
+
+ [UUID]
+
+creating_job_id
+
+ [INTEGER]
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+data_interval_end
+
+ [TIMESTAMP]
+
+data_interval_start
+
+ [TIMESTAMP]
+
+end_date
+
+ [TIMESTAMP]
+
+last_scheduling_decision
+
+ [TIMESTAMP]
+
+log_template_id
+
+ [INTEGER]
+ NOT NULL
+
+logical_date
+
+ [TIMESTAMP]
+
+partition_key
+
+ [VARCHAR(250)]
+
+queued_at
+
+ [TIMESTAMP]
+
+run_after
+
+ [TIMESTAMP]
+ NOT NULL
+
+run_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+run_type
+
+ [VARCHAR(50)]
+ NOT NULL
+
+scheduled_by_job_id
+
+ [INTEGER]
+
+span_status
+
+ [VARCHAR(250)]
+ NOT NULL
+
+start_date
+
+ [TIMESTAMP]
+
+state
+
+ [VARCHAR(50)]
+ NOT NULL
+
+triggered_by
+
+ [VARCHAR(50)]
+
+triggering_user_name
+
+ [VARCHAR(512)]
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
-
+
dag_version:id--dag_run:created_dag_version_id
-
-0..N
-{0,1}
+
+0..N
+{0,1}
@@ -1754,9 +1754,9 @@
dag_version:id--dag_code:dag_version_id
-
+
0..N
-1
+1
@@ -1805,112 +1805,112 @@
dag_version:id--serialized_dag:dag_version_id
-
+
0..N
-1
+1
-
+
dag_version:id--task_instance:dag_version_id
-
-0..N
-{0,1}
+
+0..N
+{0,1}
log_template
-
-log_template
-
-id
-
- [INTEGER]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-elasticsearch_id
-
- [TEXT]
- NOT NULL
-
-filename
-
- [TEXT]
- NOT NULL
+
+log_template
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+elasticsearch_id
+
+ [TEXT]
+ NOT NULL
+
+filename
+
+ [TEXT]
+ NOT NULL
-
+
log_template:id--dag_run:log_template_id
-
-0..N
-1
+
+0..N
+1
dag_run:id--dagrun_asset_event:dag_run_id
-
-0..N
-1
+
+0..N
+1
asset_partition_dag_run
-
-asset_partition_dag_run
-
-id
-
- [INTEGER]
- NOT NULL
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-created_dag_run_id
-
- [INTEGER]
-
-partition_key
-
- [VARCHAR(250)]
- NOT NULL
-
-target_dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+asset_partition_dag_run
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+created_dag_run_id
+
+ [INTEGER]
+
+partition_key
+
+ [VARCHAR(250)]
+ NOT NULL
+
+target_dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
dag_run:id--asset_partition_dag_run:created_dag_run_id
-
-0..N
-{0,1}
+
+0..N
+{0,1}
-
-dag_run:dag_id--task_instance:dag_id
-
-0..N
-1
+
+dag_run:run_id--task_instance:run_id
+
+0..N
+1
-dag_run:run_id--task_instance:run_id
-
-0..N
-1
+dag_run:dag_id--task_instance:dag_id
+
+0..N
+1
@@ -1947,131 +1947,131 @@
NOT NULL
-
+
dag_run:id--backfill_dag_run:dag_run_id
-
+
0..N
-{0,1}
+{0,1}
dag_run_note
-
-dag_run_note
-
-dag_run_id
-
- [INTEGER]
- NOT NULL
-
-content
-
- [VARCHAR(1000)]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
-
-user_id
-
- [VARCHAR(128)]
+
+dag_run_note
+
+dag_run_id
+
+ [INTEGER]
+ NOT NULL
+
+content
+
+ [VARCHAR(1000)]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+user_id
+
+ [VARCHAR(128)]
dag_run:id--dag_run_note:dag_run_id
-
-1
-1
+
+1
+1
-
+
dag_run:id--deadline:dagrun_id
-
+
0..N
-{0,1}
+{0,1}
backfill
-
-backfill
-
-id
-
- [INTEGER]
- NOT NULL
-
-completed_at
-
- [TIMESTAMP]
-
-created_at
-
- [TIMESTAMP]
- NOT NULL
-
-dag_id
-
- [VARCHAR(250)]
- NOT NULL
-
-dag_run_conf
-
- [JSON]
- NOT NULL
-
-from_date
-
- [TIMESTAMP]
- NOT NULL
-
-is_paused
-
- [BOOLEAN]
-
-max_active_runs
-
- [INTEGER]
- NOT NULL
-
-reprocess_behavior
-
- [VARCHAR(250)]
- NOT NULL
-
-to_date
-
- [TIMESTAMP]
- NOT NULL
-
-triggering_user_name
-
- [VARCHAR(512)]
-
-updated_at
-
- [TIMESTAMP]
- NOT NULL
+
+backfill
+
+id
+
+ [INTEGER]
+ NOT NULL
+
+completed_at
+
+ [TIMESTAMP]
+
+created_at
+
+ [TIMESTAMP]
+ NOT NULL
+
+dag_id
+
+ [VARCHAR(250)]
+ NOT NULL
+
+dag_run_conf
+
+ [JSON]
+ NOT NULL
+
+from_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+is_paused
+
+ [BOOLEAN]
+
+max_active_runs
+
+ [INTEGER]
+ NOT NULL
+
+reprocess_behavior
+
+ [VARCHAR(250)]
+ NOT NULL
+
+to_date
+
+ [TIMESTAMP]
+ NOT NULL
+
+triggering_user_name
+
+ [VARCHAR(512)]
+
+updated_at
+
+ [TIMESTAMP]
+ NOT NULL
backfill:id--dag_run:backfill_id
-
-0..N
-{0,1}
+
+0..N
+{0,1}
-
+
backfill:id--backfill_dag_run:backfill_id
-
+
0..N
-1
+1
@@ -2191,9 +2191,9 @@
task_instance:id--hitl_detail:ti_id
-
+
1
-1
+1
@@ -2233,30 +2233,30 @@
task_instance:task_id--task_map:task_id
-
+
0..N
-1
+1
task_instance:map_index--task_map:map_index
-
+
0..N
-1
+1
task_instance:run_id--task_map:run_id
-
+
0..N
-1
+1
task_instance:dag_id--task_map:dag_id
-
+
0..N
-1
+1
@@ -2297,9 +2297,9 @@
task_instance:id--task_reschedule:ti_id
-
+
0..N
-1
+1
@@ -2348,31 +2348,31 @@
-task_instance:run_id--xcom:run_id
-
-0..N
-1
+task_instance:dag_id--xcom:dag_id
+
+0..N
+1
-task_instance:dag_id--xcom:dag_id
-
-0..N
-1
+task_instance:run_id--xcom:run_id
+
+0..N
+1
task_instance:map_index--xcom:map_index
-
+
0..N
-1
+1
task_instance:task_id--xcom:task_id
-
+
0..N
-1
+1
@@ -2406,9 +2406,9 @@
task_instance:id--task_instance_note:ti_id
-
+
1
-1
+1
@@ -2572,30 +2572,30 @@
task_instance:task_id--task_instance_history:task_id
-
+
0..N
-1
+1
-task_instance:run_id--task_instance_history:run_id
-
-0..N
-1
+task_instance:map_index--task_instance_history:map_index
+
+0..N
+1
task_instance:dag_id--task_instance_history:dag_id
-
+
0..N
-1
+1
-task_instance:map_index--task_instance_history:map_index
-
-0..N
-1
+task_instance:run_id--task_instance_history:run_id
+
+0..N
+1
@@ -2635,33 +2635,33 @@
task_instance:run_id--rendered_task_instance_fields:run_id
-
+
0..N
-1
+1
-task_instance:map_index--rendered_task_instance_fields:map_index
-
-0..N
-1
+task_instance:dag_id--rendered_task_instance_fields:dag_id
+
+0..N
+1
task_instance:task_id--rendered_task_instance_fields:task_id
-
+
0..N
-1
+1
-task_instance:dag_id--rendered_task_instance_fields:dag_id
-
-0..N
-1
+task_instance:map_index--rendered_task_instance_fields:map_index
+
+0..N
+1
-
+
deadline_alert:id--deadline:deadline_alert_id
0..N
diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst
index 61cff3374e17b..01f0fc1c3cee9 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
-| ``53ff648b8a26`` (head) | ``a5a3e5eb9b8d`` | ``3.2.0`` | Add revoked_token table. |
+| ``f8c9d7e6b5a4`` (head) | ``53ff648b8a26`` | ``3.2.0`` | Standardize UUID column format for non-PostgreSQL databases. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``53ff648b8a26`` | ``a5a3e5eb9b8d`` | ``3.2.0`` | Add revoked_token table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a5a3e5eb9b8d`` | ``82dbd68e6171`` | ``3.2.0`` | Make external_executor_id TEXT to allow for longer |
| | | | external_executor_ids. |
diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index 052e4df855eed..3638c47aaa3e0 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -137,7 +137,6 @@ dependencies = [
"setproctitle>=1.3.3",
# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
"sqlalchemy[asyncio]>=2.0.36",
- "sqlalchemy-utils>=0.41.2",
"svcs>=25.1.0",
"tabulate>=0.9.0",
"tenacity>=8.3.0",
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 61f711d0980f7..397389994a09f 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -19,6 +19,7 @@
from collections.abc import Iterable
from datetime import datetime
from typing import Annotated, Any
+from uuid import UUID
from pydantic import (
AliasPath,
@@ -42,7 +43,7 @@
class TaskInstanceResponse(BaseModel):
"""TaskInstance serializer for responses."""
- id: str
+ id: UUID
task_id: str
dag_id: str
run_id: str = Field(alias="dag_run_id")
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 53f7b5c2c817e..72832270c4d54 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -2545,6 +2545,7 @@ components:
properties:
id:
type: string
+ format: uuid
title: Id
task_id:
type: string
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 0d6663c3377c3..847409b00769b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -12582,6 +12582,7 @@ components:
properties:
id:
type: string
+ format: uuid
title: Id
task_id:
type: string
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
index fce188d48ed6d..9fc8c30cb926e 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py
@@ -114,6 +114,6 @@ async def get_team_name_dep(session: AsyncSessionDep, token=JWTBearerDep) -> str
.join(DagModel, DagModel.dag_id == TaskInstance.dag_id)
.join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name)
.join(DagBundleModel.teams)
- .where(TaskInstance.id == str(token.id))
+ .where(TaskInstance.id == token.id)
)
return await session.scalar(stmt)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py
index 1c91c3e5b3b84..220bd357ed222 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py
@@ -60,11 +60,10 @@ def upsert_hitl_detail(
This happens when a task instance is cleared after a response has been received.
This design ensures that each task instance has only one HITLDetail.
"""
- ti_id_str = str(task_instance_id)
- hitl_detail_model = session.scalar(select(HITLDetail).where(HITLDetail.ti_id == ti_id_str))
+ hitl_detail_model = session.scalar(select(HITLDetail).where(HITLDetail.ti_id == task_instance_id))
if not hitl_detail_model:
hitl_detail_model = HITLDetail(
- ti_id=ti_id_str,
+ ti_id=task_instance_id,
options=payload.options,
subject=payload.subject,
body=payload.body,
@@ -109,15 +108,14 @@ def update_hitl_detail(
session: SessionDep,
) -> HITLDetailResponse:
"""Update the response part of a Human-in-the-loop detail for a specific Task Instance."""
- ti_id_str = str(task_instance_id)
hitl_detail_model_result = session.execute(
- select(HITLDetail).where(HITLDetail.ti_id == ti_id_str)
+ select(HITLDetail).where(HITLDetail.ti_id == task_instance_id)
).scalar()
hitl_detail_model = _check_hitl_detail_exists(hitl_detail_model_result)
if hitl_detail_model.response_received:
raise HTTPException(
status.HTTP_409_CONFLICT,
- f"Human-in-the-loop detail for Task Instance with id {ti_id_str} already exists.",
+ f"Human-in-the-loop detail for Task Instance with id {task_instance_id} already exists.",
)
hitl_detail_model.responded_by = None
@@ -138,9 +136,8 @@ def get_hitl_detail(
session: SessionDep,
) -> HITLDetailResponse:
"""Get Human-in-the-loop detail for a specific Task Instance."""
- ti_id_str = str(task_instance_id)
hitl_detail_model_result = session.execute(
- select(HITLDetail).where(HITLDetail.ti_id == ti_id_str),
+ select(HITLDetail).where(HITLDetail.ti_id == task_instance_id),
).scalar()
hitl_detail_model = _check_hitl_detail_exists(hitl_detail_model_result)
return HITLDetailResponse.from_hitl_detail_orm(hitl_detail_model)
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index df9d6d9bdfc00..f22d7c125853d 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -109,9 +109,7 @@ def ti_run(
This endpoint is used to start a TaskInstance that is in the QUEUED state.
"""
- # We only use UUID above for validation purposes
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
log.debug(
"Starting task instance run",
hostname=ti_run_payload.hostname,
@@ -140,7 +138,7 @@ def ti_run(
column("next_kwargs", JSON),
)
.select_from(TI)
- .where(TI.id == ti_id_str)
+ .where(TI.id == task_instance_id)
.with_for_update()
)
try:
@@ -164,7 +162,7 @@ def ti_run(
data.pop("start_date")
log.debug("Removed start_date from update as task is resuming from deferral")
- query = update(TI).where(TI.id == ti_id_str).values(data)
+ query = update(TI).where(TI.id == task_instance_id).values(data)
previous_state = ti.state
@@ -244,7 +242,9 @@ def ti_run(
xcom_keys = list(session.scalars(xcom_query))
task_reschedule_count = (
- session.scalar(select(func.count(TaskReschedule.id)).where(TaskReschedule.ti_id == ti_id_str))
+ session.scalar(
+ select(func.count(TaskReschedule.id)).where(TaskReschedule.ti_id == task_instance_id)
+ )
or 0
)
@@ -293,12 +293,14 @@ def ti_update_state(
Not all state transitions are valid, and transitioning to some states requires extra information to be
passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately)
"""
- # We only use UUID above for validation purposes
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
log.debug("Updating task instance state", new_state=ti_patch_payload.state)
- old = select(TI.state, TI.try_number, TI.max_tries, TI.dag_id).where(TI.id == ti_id_str).with_for_update()
+ old = (
+ select(TI.state, TI.try_number, TI.max_tries, TI.dag_id)
+ .where(TI.id == task_instance_id)
+ .with_for_update()
+ )
try:
(
previous_state,
@@ -338,12 +340,12 @@ def ti_update_state(
# We exclude_unset to avoid updating fields that are not set in the payload
data = ti_patch_payload.model_dump(exclude={"task_outlets", "outlet_events"}, exclude_unset=True)
- query = update(TI).where(TI.id == ti_id_str).values(data)
+ query = update(TI).where(TI.id == task_instance_id).values(data)
try:
query, updated_state = _create_ti_state_update_query_and_update_state(
ti_patch_payload=ti_patch_payload,
- ti_id_str=ti_id_str,
+ task_instance_id=task_instance_id,
session=session,
query=query,
dag_id=dag_id,
@@ -355,7 +357,7 @@ def ti_update_state(
"Error updating Task Instance state. Setting the task to failed.",
payload=ti_patch_payload,
)
- ti = session.get(TI, ti_id_str, with_for_update=True)
+ ti = session.get(TI, task_instance_id, with_for_update=True)
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(), query, session.bind)
query = query.values(state=(updated_state := TaskInstanceState.FAILED))
@@ -398,14 +400,14 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag:
def _create_ti_state_update_query_and_update_state(
*,
ti_patch_payload: TIStateUpdate,
- ti_id_str: str,
+ task_instance_id: UUID,
query: Update,
session: SessionDep,
dag_bag: DagBagDep,
dag_id: str,
) -> tuple[Update, TaskInstanceState]:
if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)):
- ti = session.get(TI, ti_id_str, with_for_update=True)
+ ti = session.get(TI, task_instance_id, with_for_update=True)
updated_state = TaskInstanceState(ti_patch_payload.state.value)
if session.bind is not None:
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
@@ -450,7 +452,7 @@ def _create_ti_state_update_query_and_update_state(
# TODO: HANDLE execution timeout later as it requires a call to the DB
# either get it from the serialised DAG or get it from the API
- query = update(TI).where(TI.id == ti_id_str)
+ query = update(TI).where(TI.id == task_instance_id)
# Store next_kwargs directly (already serialized by worker)
query = query.values(
@@ -477,7 +479,7 @@ def _create_ti_state_update_query_and_update_state(
mysql_timestamp_max=_MYSQL_TIMESTAMP_MAX,
)
data = ti_patch_payload.model_dump(exclude={"reschedule_date"}, exclude_unset=True)
- query = update(TI).where(TI.id == ti_id_str).values(data)
+ query = update(TI).where(TI.id == task_instance_id).values(data)
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(), query, session.bind)
query = query.values(state=TaskInstanceState.FAILED)
@@ -486,19 +488,19 @@ def _create_ti_state_update_query_and_update_state(
# in SQLA2. The task is marked as FAILED regardless.
return query, TaskInstanceState.FAILED
- # We can directly use ti_id_str instead of fetching the TaskInstance object to avoid SQLA2
+ # We can directly use task_instance_id instead of fetching the TaskInstance object to avoid SQLA2
# lock contention issues when the TaskInstance row is already locked from before.
actual_start_date = timezone.utcnow()
session.add(
TaskReschedule(
- ti_id_str,
+ task_instance_id,
actual_start_date,
ti_patch_payload.end_date,
ti_patch_payload.reschedule_date,
)
)
- query = update(TI).where(TI.id == ti_id_str)
+ query = update(TI).where(TI.id == task_instance_id)
# calculate the duration for TI table too
if session.bind is not None:
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
@@ -524,14 +526,13 @@ def ti_skip_downstream(
ti_patch_payload: TISkippedDownstreamTasksStatePayload,
session: SessionDep,
):
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
log.info("Skipping downstream tasks", task_count=len(ti_patch_payload.tasks))
now = timezone.utcnow()
tasks = ti_patch_payload.tasks
- query_result = session.execute(select(TI.dag_id, TI.run_id).where(TI.id == ti_id_str))
+ query_result = session.execute(select(TI.dag_id, TI.run_id).where(TI.id == task_instance_id))
row_result = query_result.fetchone()
if row_result is None:
raise HTTPException(
@@ -572,14 +573,13 @@ def ti_heartbeat(
session: SessionDep,
):
"""Update the heartbeat of a TaskInstance to mark it as alive & still running."""
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
log.debug("Processing heartbeat", hostname=ti_payload.hostname, pid=ti_payload.pid)
# Hot path: since heartbeating a task is a very common operation, we try to do minimize the number of queries
# and DB round trips as much as possible.
- old = select(TI.state, TI.hostname, TI.pid).where(TI.id == ti_id_str).with_for_update()
+ old = select(TI.state, TI.hostname, TI.pid).where(TI.id == task_instance_id).with_for_update()
try:
(previous_state, hostname, pid) = session.execute(old).one()
@@ -626,7 +626,7 @@ def ti_heartbeat(
)
# Update the last heartbeat time!
- session.execute(update(TI).where(TI.id == ti_id_str).values(last_heartbeat_at=timezone.utcnow()))
+ session.execute(update(TI).where(TI.id == task_instance_id).values(last_heartbeat_at=timezone.utcnow()))
log.debug("Heartbeat updated", state=previous_state)
@@ -651,11 +651,10 @@ def ti_put_rtif(
session: SessionDep,
):
"""Add an RTIF entry for a task instance, sent by the worker."""
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
log.info("Updating RenderedTaskInstanceFields", field_count=len(put_rtif_payload))
- task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
+ task_instance = session.scalar(select(TI).where(TI.id == task_instance_id))
if not task_instance:
log.error("Task Instance not found")
raise HTTPException(
@@ -681,8 +680,7 @@ def ti_patch_rendered_map_index(
session: SessionDep,
):
"""Update rendered_map_index for a task instance, sent by the worker during task execution."""
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
if not rendered_map_index:
log.error("rendered_map_index cannot be empty")
@@ -693,7 +691,7 @@ def ti_patch_rendered_map_index(
log.debug("Updating rendered_map_index", length=len(rendered_map_index))
- query = update(TI).where(TI.id == ti_id_str).values(rendered_map_index=rendered_map_index)
+ query = update(TI).where(TI.id == task_instance_id).values(rendered_map_index=rendered_map_index)
result = session.execute(query)
result = cast("CursorResult[Any]", result)
@@ -720,11 +718,10 @@ def get_previous_successful_dagrun(
The data from this endpoint is used to get values for Task Context.
"""
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
log.debug("Retrieving previous successful DAG run")
- task_instance = session.scalar(select(TI).where(TI.id == ti_id_str))
+ task_instance = session.scalar(select(TI).where(TI.id == task_instance_id))
if not task_instance or not task_instance.logical_date:
log.debug("No task instance or logical date found")
return PrevSuccessfulDagRunResponse()
@@ -977,10 +974,9 @@ def validate_inlets_and_outlets(
dag_bag: DagBagDep,
) -> InactiveAssetsResponse:
"""Validate whether there're inactive assets in inlets and outlets of a given task instance."""
- ti_id_str = str(task_instance_id)
- bind_contextvars(ti_id=ti_id_str)
+ bind_contextvars(ti_id=str(task_instance_id))
- ti = session.scalar(select(TI).where(TI.id == ti_id_str))
+ ti = session.scalar(select(TI).where(TI.id == task_instance_id))
if not ti:
log.error("Task Instance not found")
raise HTTPException(
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py
index f763858f9b9c1..3c7d4c8070f34 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py
@@ -39,7 +39,7 @@ def get_start_date(task_instance_id: UUID, session: SessionDep) -> UtcDateTime |
"""Get the first reschedule date if found, None if no records exist."""
start_date = session.scalar(
select(TaskReschedule.start_date)
- .where(TaskReschedule.ti_id == str(task_instance_id))
+ .where(TaskReschedule.ti_id == task_instance_id)
.order_by(TaskReschedule.id.asc())
.limit(1)
)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 765bb9e5db1a7..44c8910791c6f 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -32,6 +32,7 @@
from functools import lru_cache, partial
from itertools import groupby
from typing import TYPE_CHECKING, Any
+from uuid import UUID
from sqlalchemy import (
and_,
@@ -1369,7 +1370,7 @@ def _end_active_spans(self, session: Session = NEW_SESSION):
prefix, sep, key = prefixed_key.partition(":")
if prefix == "ti":
- ti_result = session.get(TaskInstance, key)
+ ti_result = session.get(TaskInstance, UUID(key))
if ti_result is None:
continue
ti: TaskInstance = ti_result
@@ -1438,14 +1439,14 @@ def _end_spans_of_externally_ended_ops(self, session: Session):
dag_run.span_status = SpanStatus.ENDED
for ti in tis_should_end:
- active_ti_span = self.active_spans.get("ti:" + ti.id)
+ active_ti_span = self.active_spans.get(f"ti:{ti.id}")
if active_ti_span is not None:
if ti.state in State.finished:
self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti)
active_ti_span.end(end_time=datetime_to_nano(ti.end_date))
else:
active_ti_span.end()
- self.active_spans.delete("ti:" + ti.id)
+ self.active_spans.delete(f"ti:{ti.id}")
ti.span_status = SpanStatus.ENDED
def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session):
@@ -1495,7 +1496,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session
for ti in tis
# If it has started and there is a reference on the active_spans dict,
# then it was started by the current scheduler.
- if ti.start_date is not None and self.active_spans.get("ti:" + ti.id) is None
+ if ti.start_date is not None and self.active_spans.get(f"ti:{ti.id}") is None
]
dr_context = Trace.extract(dag_run.context_carrier)
@@ -1515,7 +1516,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session
ti.span_status = SpanStatus.ENDED
else:
ti.span_status = SpanStatus.ACTIVE
- self.active_spans.set("ti:" + ti.id, ti_span)
+ self.active_spans.set(f"ti:{ti.id}", ti_span)
def _run_scheduler_loop(self) -> None:
"""
diff --git a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
index 184c0a98218e0..c55416aaaf2a7 100644
--- a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
+++ b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
@@ -29,7 +29,6 @@
import sqlalchemy as sa
from alembic import op
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.migrations.db_types import TIMESTAMP, StringID
@@ -51,7 +50,7 @@ def upgrade():
op.create_table(
"dag_version",
- sa.Column("id", UUIDType(binary=False), nullable=False),
+ sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("version_number", sa.Integer(), nullable=False),
sa.Column("dag_id", StringID(), nullable=False),
sa.Column("created_at", TIMESTAMP(), nullable=False, default=timezone.utcnow),
@@ -77,9 +76,9 @@ def upgrade():
with op.batch_alter_table("dag_code") as batch_op:
batch_op.drop_column("fileloc_hash")
- batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False))
+ batch_op.add_column(sa.Column("id", sa.Uuid(), nullable=False))
batch_op.create_primary_key("dag_code_pkey", ["id"])
- batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False))
+ batch_op.add_column(sa.Column("dag_version_id", sa.Uuid(), nullable=False))
batch_op.add_column(sa.Column("source_code_hash", sa.String(length=32), nullable=False))
batch_op.add_column(sa.Column("dag_id", StringID(), nullable=False))
batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow, nullable=False))
@@ -99,8 +98,8 @@ def upgrade():
batch_op.drop_index("idx_fileloc_hash")
batch_op.drop_column("fileloc_hash")
batch_op.drop_column("fileloc")
- batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False))
- batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False))
+ batch_op.add_column(sa.Column("id", sa.Uuid(), nullable=False))
+ batch_op.add_column(sa.Column("dag_version_id", sa.Uuid(), nullable=False))
batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow, nullable=False))
batch_op.create_primary_key("serialized_dag_pkey", ["id"])
batch_op.create_foreign_key(
@@ -113,7 +112,7 @@ def upgrade():
batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"])
with op.batch_alter_table("task_instance", schema=None) as batch_op:
- batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
+ batch_op.add_column(sa.Column("dag_version_id", sa.Uuid()))
batch_op.create_foreign_key(
batch_op.f("task_instance_dag_version_id_fkey"),
"dag_version",
@@ -123,11 +122,11 @@ def upgrade():
)
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
- batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
+ batch_op.add_column(sa.Column("dag_version_id", sa.Uuid()))
with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.drop_column("dag_hash")
- batch_op.add_column(sa.Column("created_dag_version_id", UUIDType(binary=False), nullable=True))
+ batch_op.add_column(sa.Column("created_dag_version_id", sa.Uuid(), nullable=True))
batch_op.create_foreign_key(
"created_dag_version_id_fkey",
"dag_version",
diff --git a/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py b/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py
index e310d8e28f13e..0868ae4fdd145 100644
--- a/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py
+++ b/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py
@@ -29,7 +29,6 @@
import sqlalchemy as sa
from alembic import op
-from sqlalchemy_utils import UUIDType
from airflow.migrations.db_types import StringID
from airflow.models import ID_LEN
@@ -45,7 +44,7 @@
def upgrade():
op.create_table(
"deadline",
- sa.Column("id", UUIDType(binary=False), nullable=False),
+ sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("dag_id", StringID(length=ID_LEN), nullable=True),
sa.Column("dagrun_id", sa.Integer(), nullable=True),
sa.Column("deadline", sa.DateTime(), nullable=False),
diff --git a/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py b/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py
index da67f0e4f9ebf..3f11cbb4b74b1 100644
--- a/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py
+++ b/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py
@@ -29,7 +29,6 @@
import sqlalchemy as sa
from alembic import op
-from sqlalchemy_utils import UUIDType
# revision identifiers, used by Alembic.
revision = "29ce7909c52b"
@@ -40,9 +39,9 @@
def _get_uuid_type(dialect_name: str) -> sa.types.TypeEngine:
- if dialect_name != "postgres":
+ if dialect_name != "postgresql":
return sa.String(36)
- return UUIDType(binary=False)
+ return sa.Uuid()
def upgrade():
@@ -116,6 +115,4 @@ def downgrade():
# This has to be in a separate batch, else on sqlite it throws `sqlalchemy.exc.CircularDependencyError`
# (and on non sqlite batching isn't "a thing", it issue alter tables fine)
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
- batch_op.add_column(
- sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=True)
- )
+ batch_op.add_column(sa.Column("task_instance_id", sa.Uuid(), autoincrement=False, nullable=True))
diff --git a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py
index a5b3787c8758a..7cc5586b6c73f 100644
--- a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py
+++ b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py
@@ -29,7 +29,6 @@
import sqlalchemy as sa
from alembic import op
-from sqlalchemy_utils import JSONType
# revision identifiers, used by Alembic.
revision = "3bda03debd04"
@@ -43,7 +42,7 @@ def upgrade():
"""Apply Add url and template params to DagBundleModel."""
with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
batch_op.add_column(sa.Column("signed_url_template", sa.String(length=200), nullable=True))
- batch_op.add_column(sa.Column("template_params", JSONType(), nullable=True))
+ batch_op.add_column(sa.Column("template_params", sa.JSON(), nullable=True))
def downgrade():
diff --git a/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py b/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py
index 8379b1e82828a..d73784eb285f1 100644
--- a/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py
+++ b/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py
@@ -31,7 +31,6 @@
import sqlalchemy as sa
from alembic import op
-from sqlalchemy_utils import UUIDType
from airflow.migrations.db_types import StringID
@@ -47,7 +46,7 @@ def upgrade():
"""Create team table."""
op.create_table(
"team",
- sa.Column("id", UUIDType(binary=False), nullable=False),
+ sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("name", sa.String(50), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("team_pkey")),
sa.UniqueConstraint("name", name="team_name_uq"),
@@ -57,7 +56,7 @@ def upgrade():
op.create_table(
"dag_bundle_team",
sa.Column("dag_bundle_name", StringID(), nullable=False),
- sa.Column("team_id", UUIDType(binary=False), nullable=False),
+ sa.Column("team_id", sa.Uuid(), nullable=False),
sa.PrimaryKeyConstraint("dag_bundle_name", "team_id", name=op.f("dag_bundle_team_pkey")),
sa.ForeignKeyConstraint(
columns=("dag_bundle_name",),
@@ -79,15 +78,15 @@ def upgrade():
"""Update `connection` table to add `team_id` column"""
with op.batch_alter_table("connection") as batch_op:
- batch_op.add_column(sa.Column("team_id", UUIDType(binary=False), nullable=True))
+ batch_op.add_column(sa.Column("team_id", sa.Uuid(), nullable=True))
batch_op.create_foreign_key(batch_op.f("connection_team_id_fkey"), "team", ["team_id"], ["id"])
"""Update `variable` table to add `team_id` column"""
with op.batch_alter_table("variable") as batch_op:
- batch_op.add_column(sa.Column("team_id", UUIDType(binary=False), nullable=True))
+ batch_op.add_column(sa.Column("team_id", sa.Uuid(), nullable=True))
batch_op.create_foreign_key(batch_op.f("variable_team_id_fkey"), "team", ["team_id"], ["id"])
"""Update `slot_pool` table to add `team_id` column"""
with op.batch_alter_table("slot_pool") as batch_op:
- batch_op.add_column(sa.Column("team_id", UUIDType(binary=False), nullable=True))
+ batch_op.add_column(sa.Column("team_id", sa.Uuid(), nullable=True))
batch_op.create_foreign_key(batch_op.f("slot_pool_team_id_fkey"), "team", ["team_id"], ["id"])
diff --git a/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py b/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py
index 4a2a94a8dc4f6..d7037393ce6b1 100644
--- a/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py
+++ b/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py
@@ -29,7 +29,6 @@
import sqlalchemy as sa
from alembic import op
-from sqlalchemy_utils import UUIDType
from airflow.utils.sqlalchemy import ExtendedJSON
@@ -64,7 +63,7 @@ def upgrade():
# Replace INTEGER id with UUID id
batch_op.drop_column("id")
- batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False))
+ batch_op.add_column(sa.Column("id", sa.Uuid(), nullable=False))
batch_op.create_primary_key("callback_pkey", ["id"])
batch_op.drop_column("callback_data")
diff --git a/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py
index 0e023825e6bc6..245c8c0cc6f15 100644
--- a/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py
+++ b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py
@@ -33,7 +33,6 @@
import sqlalchemy as sa
from alembic import context, op
from sqlalchemy import column, select, table
-from sqlalchemy_utils import UUIDType
from airflow.serialization.serde import deserialize
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
@@ -125,13 +124,13 @@ def migrate_all_data():
deadline_table = table(
"deadline",
- column("id", UUIDType(binary=False)),
+ column("id", sa.Uuid()),
column("dagrun_id", sa.Integer()),
column("deadline_time", UtcDateTime(timezone=True)),
column("callback", sa.JSON()),
column("callback_state", sa.String(20)),
column("missed", sa.Boolean()),
- column("callback_id", UUIDType(binary=False)),
+ column("callback_id", sa.Uuid()),
)
dag_run_table = table(
@@ -142,7 +141,7 @@ def migrate_all_data():
callback_table = table(
"callback",
- column("id", UUIDType(binary=False)),
+ column("id", sa.Uuid()),
column("type", sa.String(20)),
column("fetch_method", sa.String(20)),
column("data", ExtendedJSON()),
@@ -178,14 +177,14 @@ def migrate_all_data():
# Add new columns (temporarily nullable until data has been migrated)
with op.batch_alter_table("deadline") as batch_op:
batch_op.add_column(sa.Column("missed", sa.Boolean(), nullable=True))
- batch_op.add_column(sa.Column("callback_id", UUIDType(binary=False), nullable=True))
+ batch_op.add_column(sa.Column("callback_id", sa.Uuid(), nullable=True))
migrate_all_data()
with op.batch_alter_table("deadline") as batch_op:
# Data for `missed` and `callback_id` has been migrated so make them non-nullable
batch_op.alter_column("missed", existing_type=sa.Boolean(), nullable=False)
- batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=False)
+ batch_op.alter_column("callback_id", existing_type=sa.Uuid(), nullable=False)
batch_op.create_index("deadline_missed_deadline_time_idx", ["missed", "deadline_time"], unique=False)
batch_op.drop_index(batch_op.f("deadline_callback_state_time_idx"))
@@ -274,8 +273,8 @@ def migrate_all_data():
deadline_table = table(
"deadline",
- column("id", UUIDType(binary=False)),
- column("callback_id", UUIDType(binary=False)),
+ column("id", sa.Uuid()),
+ column("callback_id", sa.Uuid()),
column("callback", sa.JSON()),
column("callback_state", sa.String(20)),
column("trigger_id", sa.Integer()),
@@ -283,7 +282,7 @@ def migrate_all_data():
callback_table = table(
"callback",
- column("id", UUIDType(binary=False)),
+ column("id", sa.Uuid()),
column("data", ExtendedJSON()),
column("state", sa.String(10)),
)
@@ -319,7 +318,7 @@ def migrate_all_data():
batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=True))
# Make callback_id nullable so the associated callbacks can be cleared during migration
- batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=True)
+ batch_op.alter_column("callback_id", existing_type=sa.Uuid(), nullable=True)
migrate_all_data()
diff --git a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py
index 14a1ac855b147..581b38caaa271 100644
--- a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py
+++ b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py
@@ -39,7 +39,6 @@
import sqlalchemy as sa
import uuid6
from alembic import context, op
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.configuration import conf
@@ -82,9 +81,9 @@ def upgrade() -> None:
op.create_table(
"deadline_alert",
- sa.Column("id", UUIDType(binary=False), default=uuid6.uuid7),
+ sa.Column("id", sa.Uuid(), default=uuid6.uuid7),
sa.Column("created_at", UtcDateTime, nullable=False),
- sa.Column("serialized_dag_id", UUIDType(binary=False), nullable=False),
+ sa.Column("serialized_dag_id", sa.Uuid(), nullable=False),
sa.Column("name", sa.String(250), nullable=True),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("reference", sa.JSON(), nullable=False),
@@ -100,7 +99,7 @@ def upgrade() -> None:
conn.execute(sa.text("PRAGMA foreign_keys=OFF"))
with op.batch_alter_table("deadline", schema=None) as batch_op:
- batch_op.add_column(sa.Column("deadline_alert_id", UUIDType(binary=False), nullable=True))
+ batch_op.add_column(sa.Column("deadline_alert_id", sa.Uuid(), nullable=True))
batch_op.add_column(sa.Column("created_at", UtcDateTime, nullable=True))
batch_op.add_column(sa.Column("last_updated_at", UtcDateTime, nullable=True))
batch_op.create_foreign_key(
diff --git a/airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py b/airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py
new file mode 100644
index 0000000000000..be901ecf656b3
--- /dev/null
+++ b/airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py
@@ -0,0 +1,276 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Standardize UUID column format for non-PostgreSQL databases.
+
+Several columns used String(36) to store UUIDs in the 36-character dashed format
+(e.g., "019c32fd-f36f-77a4-b5ba-098064c38b15"). This migration standardizes UUID
+storage to the 32-character hex format (e.g., "019c32fdf36f77a4b5ba098064c38b15")
+used by SQLAlchemy's native sa.Uuid() type on non-PostgreSQL databases.
+
+Affected columns:
+ - task_instance.id (PK)
+ - task_instance_note.ti_id (FK -> task_instance.id)
+ - task_reschedule.ti_id (FK -> task_instance.id)
+ - hitl_detail.ti_id (FK -> task_instance.id)
+ - task_instance_history.task_instance_id (PK)
+ - hitl_detail_history.ti_history_id (FK -> task_instance_history.task_instance_id)
+
+Revision ID: f8c9d7e6b5a4
+Revises: 53ff648b8a26
+Create Date: 2026-02-06 12:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "f8c9d7e6b5a4"
+down_revision = "53ff648b8a26"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+ """
+ Standardize UUID storage format for non-PostgreSQL databases.
+
+ On PostgreSQL: No action needed (uses native UUID type).
+ On MySQL/SQLite: Convert UUID values from 36-char dashed format to 32-char hex format
+ and change column type from VARCHAR(36) to CHAR(32) to match sa.Uuid() behavior.
+ """
+ conn = op.get_bind()
+ dialect_name = conn.dialect.name
+
+ # PostgreSQL uses native UUID type, no fix needed
+ if dialect_name == "postgresql":
+ return
+
+ # Step 1: Drop all incoming FK constraints that reference the columns being changed.
+ # Even with ON UPDATE CASCADE, we need to drop FKs because we're changing the column
+ # TYPE (not just data). On SQLite, batch_alter_table recreates tables entirely, so
+ # incoming FK references would point to the old (dropped) table.
+
+ # FKs referencing task_instance.id
+ with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
+ batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey")
+
+ with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
+ batch_op.drop_constraint("task_reschedule_ti_fkey", type_="foreignkey")
+
+ with op.batch_alter_table("hitl_detail", schema=None) as batch_op:
+ batch_op.drop_constraint("hitl_detail_ti_fkey", type_="foreignkey")
+
+ # FK referencing task_instance_history.task_instance_id
+ with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op:
+ batch_op.drop_constraint("hitl_detail_history_tih_fkey", type_="foreignkey")
+
+ # Step 2: Convert UUID values from 36-char (with dashes) to 32-char (hex only).
+ # This must happen BEFORE narrowing the column type, otherwise the 36-char values would be truncated.
+ for table, column in [
+ ("task_instance", "id"),
+ ("task_instance_note", "ti_id"),
+ ("task_reschedule", "ti_id"),
+ ("hitl_detail", "ti_id"),
+ ("task_instance_history", "task_instance_id"),
+ ("hitl_detail_history", "ti_history_id"),
+ ]:
+ conn.execute(sa.text(f"UPDATE {table} SET {column} = REPLACE({column}, '-', '')"))
+
+ # Step 3: Change column types from String(36) to Uuid().
+ # sa.Uuid() is used directly so that Alembic renders the correct dialect-specific type
+ # (CHAR(32) on SQLite/MySQL) and the schema comparison matches the ORM models.
+ for table, column in [
+ ("task_instance", "id"),
+ ("task_instance_note", "ti_id"),
+ ("task_reschedule", "ti_id"),
+ ("hitl_detail", "ti_id"),
+ ("task_instance_history", "task_instance_id"),
+ ("hitl_detail_history", "ti_history_id"),
+ ]:
+ with op.batch_alter_table(table, schema=None) as batch_op:
+ batch_op.alter_column(
+ column,
+ existing_type=sa.String(36),
+ type_=sa.Uuid(),
+ existing_nullable=False,
+ )
+
+ # Step 4: Recreate all FK constraints
+ with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "task_instance_note_ti_fkey",
+ "task_instance",
+ ["ti_id"],
+ ["id"],
+ ondelete="CASCADE",
+ onupdate="CASCADE",
+ )
+
+ with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "task_reschedule_ti_fkey",
+ "task_instance",
+ ["ti_id"],
+ ["id"],
+ ondelete="CASCADE",
+ )
+
+ with op.batch_alter_table("hitl_detail", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "hitl_detail_ti_fkey",
+ "task_instance",
+ ["ti_id"],
+ ["id"],
+ ondelete="CASCADE",
+ onupdate="CASCADE",
+ )
+
+ with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "hitl_detail_history_tih_fkey",
+ "task_instance_history",
+ ["ti_history_id"],
+ ["task_instance_id"],
+ ondelete="CASCADE",
+ onupdate="CASCADE",
+ )
+
+
+def downgrade():
+ """
+ Revert UUID storage format standardization.
+
+ On PostgreSQL: No action needed (uses native UUID type).
+ On MySQL/SQLite: Revert CHAR(32) back to VARCHAR(36) and convert UUID values
+ from 32-char hex format to 36-char dashed format.
+ """
+ conn = op.get_bind()
+ dialect_name = conn.dialect.name
+
+ # PostgreSQL uses the native UUID type, so there is nothing to revert
+ if dialect_name == "postgresql":
+ return
+
+ # Step 1: Drop all FK constraints before modifying columns
+ with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
+ batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey")
+
+ with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
+ batch_op.drop_constraint("task_reschedule_ti_fkey", type_="foreignkey")
+
+ with op.batch_alter_table("hitl_detail", schema=None) as batch_op:
+ batch_op.drop_constraint("hitl_detail_ti_fkey", type_="foreignkey")
+
+ with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op:
+ batch_op.drop_constraint("hitl_detail_history_tih_fkey", type_="foreignkey")
+
+ # Step 2: Expand column types back to String(36) to make room for dashes
+ for table, column in [
+ ("task_instance", "id"),
+ ("task_instance_note", "ti_id"),
+ ("task_reschedule", "ti_id"),
+ ("hitl_detail", "ti_id"),
+ ("task_instance_history", "task_instance_id"),
+ ("hitl_detail_history", "ti_history_id"),
+ ]:
+ with op.batch_alter_table(table, schema=None) as batch_op:
+ batch_op.alter_column(
+ column,
+ existing_type=sa.Uuid(),
+ type_=sa.String(36),
+ existing_nullable=False,
+ )
+
+ # Step 3: Convert UUID values from 32-char (hex only) to 36-char (with dashes)
+ # Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (8-4-4-4-12)
+ for table, column in [
+ ("task_instance", "id"),
+ ("task_instance_note", "ti_id"),
+ ("task_reschedule", "ti_id"),
+ ("hitl_detail", "ti_id"),
+ ("task_instance_history", "task_instance_id"),
+ ("hitl_detail_history", "ti_history_id"),
+ ]:
+ if dialect_name == "mysql":
+ conn.execute(
+ sa.text(
+ f"UPDATE {table} SET {column} = CONCAT("
+ f"SUBSTR({column}, 1, 8), '-', "
+ f"SUBSTR({column}, 9, 4), '-', "
+ f"SUBSTR({column}, 13, 4), '-', "
+ f"SUBSTR({column}, 17, 4), '-', "
+ f"SUBSTR({column}, 21, 12))"
+ )
+ )
+ else: # sqlite
+ conn.execute(
+ sa.text(
+ f"UPDATE {table} SET {column} = "
+ f"SUBSTR({column}, 1, 8) || '-' || "
+ f"SUBSTR({column}, 9, 4) || '-' || "
+ f"SUBSTR({column}, 13, 4) || '-' || "
+ f"SUBSTR({column}, 17, 4) || '-' || "
+ f"SUBSTR({column}, 21, 12)"
+ )
+ )
+
+ # Step 4: Recreate all FK constraints
+ with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "task_instance_note_ti_fkey",
+ "task_instance",
+ ["ti_id"],
+ ["id"],
+ ondelete="CASCADE",
+ onupdate="CASCADE",
+ )
+
+ with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "task_reschedule_ti_fkey",
+ "task_instance",
+ ["ti_id"],
+ ["id"],
+ ondelete="CASCADE",
+ )
+
+ with op.batch_alter_table("hitl_detail", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "hitl_detail_ti_fkey",
+ "task_instance",
+ ["ti_id"],
+ ["id"],
+ ondelete="CASCADE",
+ onupdate="CASCADE",
+ )
+
+ with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op:
+ batch_op.create_foreign_key(
+ "hitl_detail_history_tih_fkey",
+ "task_instance_history",
+ ["ti_history_id"],
+ ["task_instance_id"],
+ ondelete="CASCADE",
+ onupdate="CASCADE",
+ )
diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py
index 30d3ba93b9ed5..ea45a10f1f1ae 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -20,12 +20,12 @@
from enum import Enum
from importlib import import_module
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
+from uuid import UUID
import structlog
import uuid6
-from sqlalchemy import ForeignKey, Integer, String, Text
+from sqlalchemy import ForeignKey, Integer, String, Text, Uuid
from sqlalchemy.orm import Mapped, mapped_column, relationship
-from sqlalchemy_utils import UUIDType
from airflow._shared.observability.metrics.stats import Stats
from airflow._shared.timezones import timezone
@@ -108,7 +108,7 @@ class Callback(Base):
__tablename__ = "callback"
- id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
+ id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7)
# This is used by SQLAlchemy to be able to deserialize DB rows to subclasses
__mapper_args__ = {
diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py
index 4af7a4c5505cf..cfdb301d0410e 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -20,11 +20,12 @@
import logging
from datetime import datetime
from typing import TYPE_CHECKING
+from uuid import UUID
+import sqlalchemy as sa
import uuid6
from sqlalchemy import ForeignKey, Integer, UniqueConstraint, select
from sqlalchemy.orm import Mapped, joinedload, mapped_column, relationship
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.dag_processing.bundles.manager import DagBundlesManager
@@ -44,7 +45,7 @@ class DagVersion(Base):
"""Model to track the versions of DAGs in the database."""
__tablename__ = "dag_version"
- id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
+ id: Mapped[UUID] = mapped_column(sa.Uuid(), primary_key=True, default=uuid6.uuid7)
version_number: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
dag_id: Mapped[str] = mapped_column(
StringID(), ForeignKey("dag.dag_id", ondelete="CASCADE"), nullable=False
diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py
index 9b965a9460734..3d20c323e031c 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -19,6 +19,7 @@
import hashlib
from typing import TYPE_CHECKING, Any
+from uuid import UUID
from sqlalchemy import String, inspect, select
from sqlalchemy.orm import Mapped, joinedload, mapped_column
@@ -45,7 +46,7 @@ class DBDagBag:
"""
def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ self._dags: dict[UUID, SerializedDAG] = {} # dag_version_id to dag
self.load_op_links = load_op_links
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
@@ -54,7 +55,7 @@ def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
self._dags[serdag.dag_version_id] = dag
return dag
- def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
+ def _get_dag(self, version_id: UUID, session: Session) -> SerializedDAG | None:
if dag := self._dags.get(version_id):
return dag
dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
diff --git a/airflow-core/src/airflow/models/dagbundle.py b/airflow-core/src/airflow/models/dagbundle.py
index c2d99f2560d24..b11467b7b6614 100644
--- a/airflow-core/src/airflow/models/dagbundle.py
+++ b/airflow-core/src/airflow/models/dagbundle.py
@@ -18,9 +18,9 @@
from datetime import datetime
+import sqlalchemy as sa
from sqlalchemy import Boolean, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
-from sqlalchemy_utils import JSONType
from airflow.models.base import Base, StringID
from airflow.models.team import dag_bundle_team_association_table
@@ -49,7 +49,7 @@ class DagBundleModel(Base, LoggingMixin):
version: Mapped[str | None] = mapped_column(String(200), nullable=True)
last_refreshed: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True)
signed_url_template: Mapped[str | None] = mapped_column(String(200), nullable=True)
- template_params: Mapped[dict | None] = mapped_column(JSONType, nullable=True)
+ template_params: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True)
teams = relationship("Team", secondary=dag_bundle_team_association_table, back_populates="dag_bundles")
def __init__(self, *, name: str, version: str | None = None):
diff --git a/airflow-core/src/airflow/models/dagcode.py b/airflow-core/src/airflow/models/dagcode.py
index 6f1207d376bfe..60ee91c8b59b5 100644
--- a/airflow-core/src/airflow/models/dagcode.py
+++ b/airflow-core/src/airflow/models/dagcode.py
@@ -19,13 +19,14 @@
import logging
from datetime import datetime
from typing import TYPE_CHECKING
+from uuid import UUID
+import sqlalchemy as sa
import uuid6
from sqlalchemy import ForeignKey, String, Text, select
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql.expression import literal
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.configuration import conf
@@ -55,7 +56,7 @@ class DagCode(Base):
"""
__tablename__ = "dag_code"
- id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
+ id: Mapped[UUID] = mapped_column(sa.Uuid(), primary_key=True, default=uuid6.uuid7)
dag_id: Mapped[str] = mapped_column(String(ID_LEN), nullable=False)
fileloc: Mapped[str] = mapped_column(String(2000), nullable=False)
# The max length of fileloc exceeds the limit of indexing.
@@ -65,8 +66,8 @@ class DagCode(Base):
)
source_code: Mapped[str] = mapped_column(Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False)
source_code_hash: Mapped[str] = mapped_column(String(32), nullable=False)
- dag_version_id: Mapped[str] = mapped_column(
- UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True
+ dag_version_id: Mapped[UUID] = mapped_column(
+ sa.Uuid(), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True
)
dag_version = relationship("DagVersion", back_populates="dag_code", uselist=False)
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index c5eb3a7087bb7..e05e3c7da3e3c 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -24,6 +24,7 @@
from collections.abc import Callable, Iterable, Iterator, Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, cast, overload
+from uuid import UUID
import structlog
from natsort import natsorted
@@ -38,6 +39,7 @@
String,
Text,
UniqueConstraint,
+ Uuid,
and_,
case,
func,
@@ -54,7 +56,6 @@
from sqlalchemy.orm import Mapped, declared_attr, joinedload, mapped_column, relationship, synonym, validates
from sqlalchemy.sql.expression import false, select
from sqlalchemy.sql.functions import coalesce
-from sqlalchemy_utils import UUIDType
from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager
from airflow._shared.observability.metrics.stats import Stats
@@ -214,8 +215,8 @@ class DagRun(Base, LoggingMixin):
span_status: Mapped[str] = mapped_column(
String(250), server_default=SpanStatus.NOT_STARTED, nullable=False
)
- created_dag_version_id: Mapped[str | None] = mapped_column(
- UUIDType(binary=False),
+ created_dag_version_id: Mapped[UUID | None] = mapped_column(
+ Uuid(),
ForeignKey("dag_version.id", name="created_dag_version_id_fkey", ondelete="set null"),
nullable=True,
)
@@ -438,7 +439,7 @@ def duration(cls, session: Session = NEW_SESSION) -> Case:
return case(when_condition, else_=None)
@provide_session
- def check_version_id_exists_in_dr(self, dag_version_id: UUIDType, session: Session = NEW_SESSION):
+ def check_version_id_exists_in_dr(self, dag_version_id: UUID, session: Session = NEW_SESSION):
select_stmt = (
select(TI.dag_version_id)
.where(TI.dag_id == self.dag_id, TI.dag_version_id == dag_version_id, TI.run_id == self.run_id)
@@ -1093,7 +1094,7 @@ def start_dr_spans_if_needed(self, tis: list[TI]):
ti_carrier = Trace.inject()
ti.context_carrier = ti_carrier
ti.span_status = SpanStatus.ACTIVE
- self.active_spans.set("ti:" + ti.id, ti_span)
+ self.active_spans.set(f"ti:{ti.id}", ti_span)
else:
self.log.debug(
"Found span_status '%s', while updating state for dag_run '%s'",
@@ -1689,7 +1690,7 @@ def _emit_duration_stats_for_finished_state(self):
)
@provide_session
- def verify_integrity(self, *, session: Session = NEW_SESSION, dag_version_id: UUIDType) -> None:
+ def verify_integrity(self, *, session: Session = NEW_SESSION, dag_version_id: UUID) -> None:
"""
Verify the DagRun by checking for removed tasks or tasks that are not in the database yet.
@@ -1826,7 +1827,7 @@ def _get_task_creator(
created_counts: dict[str, int],
ti_mutation_hook: Callable,
hook_is_noop: Literal[True],
- dag_version_id: UUIDType,
+ dag_version_id: UUID,
) -> Callable[[Operator, Iterable[int]], Iterator[dict[str, Any]]]: ...
@overload
@@ -1835,7 +1836,7 @@ def _get_task_creator(
created_counts: dict[str, int],
ti_mutation_hook: Callable,
hook_is_noop: Literal[False],
- dag_version_id: UUIDType,
+ dag_version_id: UUID,
) -> Callable[[Operator, Iterable[int]], Iterator[TI]]: ...
def _get_task_creator(
@@ -1843,7 +1844,7 @@ def _get_task_creator(
created_counts: dict[str, int],
ti_mutation_hook: Callable,
hook_is_noop: Literal[True, False],
- dag_version_id: UUIDType,
+ dag_version_id: UUID,
) -> Callable[[Operator, Iterable[int]], Iterator[dict[str, Any]] | Iterator[TI]]:
"""
Get the task creator function.
@@ -1959,7 +1960,7 @@ def _create_task_instances(
session.rollback()
def _revise_map_indexes_if_mapped(
- self, task: Operator, *, dag_version_id: UUIDType, session: Session
+ self, task: Operator, *, dag_version_id: UUID | None, session: Session
) -> Iterator[TI]:
"""
Check if task increased or reduced in length and handle appropriately.
@@ -2052,8 +2053,8 @@ def schedule_tis(
"""
# Get list of TI IDs that do not need to executed, these are
# tasks using EmptyOperator and without on_execute_callback / on_success_callback
- empty_ti_ids: list[str] = []
- schedulable_ti_ids: list[str] = []
+ empty_ti_ids: list[UUID] = []
+ schedulable_ti_ids: list[UUID] = []
for ti in schedulable_tis:
if ti.is_schedulable:
schedulable_ti_ids.append(ti.id)
diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py
index 070304f30a728..bbf24cc2842d2 100644
--- a/airflow-core/src/airflow/models/deadline.py
+++ b/airflow-core/src/airflow/models/deadline.py
@@ -22,12 +22,12 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, cast
+from uuid import UUID
import uuid6
-from sqlalchemy import Boolean, ForeignKey, Index, Integer, and_, func, inspect, select, text
+from sqlalchemy import Boolean, ForeignKey, Index, Integer, Uuid, and_, func, inspect, select, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Mapped, mapped_column, relationship
-from sqlalchemy_utils import UUIDType
from airflow._shared.observability.metrics.stats import Stats
from airflow._shared.timezones import timezone
@@ -82,7 +82,7 @@ class Deadline(Base):
__tablename__ = "deadline"
- id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
+ id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7)
created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
last_updated_at: Mapped[datetime] = mapped_column(
UtcDateTime, nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow
@@ -101,14 +101,14 @@ class Deadline(Base):
missed: Mapped[bool] = mapped_column(Boolean, nullable=False)
# Callback that will run when this deadline is missed
- callback_id: Mapped[str] = mapped_column(
- UUIDType(binary=False), ForeignKey("callback.id", ondelete="CASCADE"), nullable=False
+ callback_id: Mapped[UUID] = mapped_column(
+ Uuid(), ForeignKey("callback.id", ondelete="CASCADE"), nullable=False
)
callback = relationship("Callback", uselist=False, cascade="all, delete-orphan", single_parent=True)
# The DeadlineAlert that generated this deadline
- deadline_alert_id: Mapped[str | None] = mapped_column(
- UUIDType(binary=False), ForeignKey("deadline_alert.id", ondelete="SET NULL"), nullable=True
+ deadline_alert_id: Mapped[UUID | None] = mapped_column(
+ Uuid(), ForeignKey("deadline_alert.id", ondelete="SET NULL"), nullable=True
)
deadline_alert: Mapped[DeadlineAlert | None] = relationship("DeadlineAlert")
@@ -119,7 +119,7 @@ def __init__(
deadline_time: datetime,
callback: CallbackDefinitionProtocol,
dagrun_id: int,
- deadline_alert_id: str | None,
+ deadline_alert_id: UUID | None,
dag_id: str | None = None,
):
super().__init__()
diff --git a/airflow-core/src/airflow/models/deadline_alert.py b/airflow-core/src/airflow/models/deadline_alert.py
index 557d905354aff..7af61ff70ba43 100644
--- a/airflow-core/src/airflow/models/deadline_alert.py
+++ b/airflow-core/src/airflow/models/deadline_alert.py
@@ -18,12 +18,12 @@
from datetime import datetime
from typing import TYPE_CHECKING
+from uuid import UUID
import uuid6
-from sqlalchemy import JSON, Float, ForeignKey, String, Text, select
+from sqlalchemy import JSON, Float, ForeignKey, String, Text, Uuid, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Mapped, mapped_column
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.models import Base
@@ -40,11 +40,11 @@ class DeadlineAlert(Base):
__tablename__ = "deadline_alert"
- id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
+ id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7)
created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
- serialized_dag_id: Mapped[str] = mapped_column(
- UUIDType(binary=False), ForeignKey("serialized_dag.id", ondelete="CASCADE"), nullable=False
+ serialized_dag_id: Mapped[UUID] = mapped_column(
+ Uuid(), ForeignKey("serialized_dag.id", ondelete="CASCADE"), nullable=False
)
name: Mapped[str | None] = mapped_column(String(250), nullable=True)
@@ -94,13 +94,16 @@ def reference_class(self) -> type[SerializedReferenceModels.SerializedBaseDeadli
@classmethod
@provide_session
- def get_by_id(cls, deadline_alert_id: str, session: Session = NEW_SESSION) -> DeadlineAlert:
+ def get_by_id(cls, deadline_alert_id: str | UUID, session: Session = NEW_SESSION) -> DeadlineAlert:
"""
Retrieve a DeadlineAlert record by its UUID.
- :param deadline_alert_id: The UUID of the DeadlineAlert to retrieve
+ :param deadline_alert_id: The UUID of the DeadlineAlert to retrieve (as string or UUID object)
:param session: Database session
"""
+ # Convert string to UUID if needed
+ if isinstance(deadline_alert_id, str):
+ deadline_alert_id = UUID(deadline_alert_id)
result = session.scalar(select(cls).where(cls.id == deadline_alert_id))
if result is None:
raise NoResultFound(f"No DeadlineAlert found with id {deadline_alert_id}")
diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py
index 3077bc3c8084a..38e2d2f112f87 100644
--- a/airflow-core/src/airflow/models/hitl.py
+++ b/airflow-core/src/airflow/models/hitl.py
@@ -18,10 +18,10 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, TypedDict
+from uuid import UUID
import sqlalchemy as sa
-from sqlalchemy import Boolean, ForeignKeyConstraint, String, Text, func, literal
-from sqlalchemy.dialects import postgresql
+from sqlalchemy import Boolean, ForeignKeyConstraint, String, Text, Uuid, func, literal
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import Mapped, mapped_column, relationship
@@ -137,8 +137,8 @@ class HITLDetail(Base, HITLDetailPropertyMixin):
"""Human-in-the-loop request and corresponding response."""
__tablename__ = "hitl_detail"
- ti_id: Mapped[str] = mapped_column(
- String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
+ ti_id: Mapped[UUID] = mapped_column(
+ Uuid(),
primary_key=True,
nullable=False,
)
diff --git a/airflow-core/src/airflow/models/hitl_history.py b/airflow-core/src/airflow/models/hitl_history.py
index a0951a32df217..7974143814008 100644
--- a/airflow-core/src/airflow/models/hitl_history.py
+++ b/airflow-core/src/airflow/models/hitl_history.py
@@ -18,10 +18,10 @@
from datetime import datetime
from typing import TYPE_CHECKING
+from uuid import UUID
import sqlalchemy as sa
-from sqlalchemy import Boolean, ForeignKeyConstraint, String, Text
-from sqlalchemy.dialects import postgresql
+from sqlalchemy import Boolean, ForeignKeyConstraint, Text, Uuid
from sqlalchemy.orm import Mapped, mapped_column, relationship
from airflow._shared.timezones import timezone
@@ -41,8 +41,8 @@ class HITLDetailHistory(Base, HITLDetailPropertyMixin):
"""
__tablename__ = "hitl_detail_history"
- ti_history_id: Mapped[str] = mapped_column(
- String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
+ ti_history_id: Mapped[UUID] = mapped_column(
+ Uuid(),
primary_key=True,
nullable=False,
)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index a633d764bfe51..3d8ca6c85227a 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -24,14 +24,13 @@
from collections.abc import Callable, Iterable, Iterator, Sequence
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Literal
+from uuid import UUID
-import sqlalchemy as sa
import uuid6
-from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, tuple_, update
+from sqlalchemy import JSON, ForeignKey, LargeBinary, String, Uuid, exists, select, tuple_, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship
from sqlalchemy.sql.expression import func, literal
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.configuration import conf
@@ -293,10 +292,10 @@ class SerializedDagModel(Base):
"""
__tablename__ = "serialized_dag"
- id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
+ id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7)
dag_id: Mapped[str] = mapped_column(String(ID_LEN), nullable=False)
_data: Mapped[dict | None] = mapped_column(
- "data", sa.JSON().with_variant(JSONB, "postgresql"), nullable=True
+ "data", JSON().with_variant(JSONB, "postgresql"), nullable=True
)
_data_compressed: Mapped[bytes | None] = mapped_column("data_compressed", LargeBinary, nullable=True)
created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow)
@@ -319,8 +318,8 @@ class SerializedDagModel(Base):
innerjoin=True,
backref=backref("serialized_dag", uselist=False, innerjoin=True),
)
- dag_version_id: Mapped[str] = mapped_column(
- UUIDType(binary=False),
+ dag_version_id: Mapped[UUID] = mapped_column(
+ Uuid(),
ForeignKey("dag_version.id", ondelete="CASCADE"),
nullable=False,
unique=True,
@@ -435,7 +434,7 @@ def _create_deadline_alert_records(
for uuid_str, deadline_data in uuid_mapping.items():
alert = DeadlineAlertModel(
- id=uuid_str,
+ id=UUID(uuid_str),
reference=deadline_data[DeadlineAlertFields.REFERENCE],
interval=deadline_data[DeadlineAlertFields.INTERVAL],
callback_def=deadline_data[DeadlineAlertFields.CALLBACK],
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index a0676cfc0ac00..f2e4c03960b99 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -22,12 +22,12 @@
import json
import logging
import math
-import uuid
from collections import defaultdict
from collections.abc import Collection, Iterable
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from urllib.parse import quote
+from uuid import UUID
import attrs
import dill
@@ -43,6 +43,7 @@
String,
Text,
UniqueConstraint,
+ Uuid,
and_,
case,
delete,
@@ -62,7 +63,6 @@
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Mapped, lazyload, mapped_column, reconstructor, relationship
from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
-from sqlalchemy_utils import UUIDType
from airflow import settings
from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager
@@ -251,13 +251,11 @@ def clear_task_instances(
:meta private:
"""
- task_instance_ids: list[str] = []
from airflow.exceptions import AirflowClearRunningTaskException
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
- task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
if ti.state == TaskInstanceState.RUNNING:
@@ -403,9 +401,9 @@ def _date_or_empty(*, task_instance: TaskInstance, attr: str) -> str:
return result.strftime("%Y%m%dT%H%M%S") if result else ""
-def uuid7() -> str:
- """Generate a new UUID7 string."""
- return str(uuid6.uuid7())
+def uuid7() -> UUID:
+ """Generate a new UUID7."""
+ return uuid6.uuid7()
class TaskInstance(Base, LoggingMixin):
@@ -428,8 +426,8 @@ class TaskInstance(Base, LoggingMixin):
"""
__tablename__ = "task_instance"
- id: Mapped[str] = mapped_column(
- String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
+ id: Mapped[UUID] = mapped_column(
+ Uuid(),
primary_key=True,
default=uuid7,
nullable=False,
@@ -488,8 +486,8 @@ class TaskInstance(Base, LoggingMixin):
_task_display_property_value: Mapped[str | None] = mapped_column(
"task_display_name", String(2000), nullable=True
)
- dag_version_id: Mapped[str | uuid.UUID | None] = mapped_column(
- UUIDType(binary=False),
+ dag_version_id: Mapped[UUID | None] = mapped_column(
+ Uuid(),
ForeignKey("dag_version.id", ondelete="RESTRICT"),
nullable=True,
)
@@ -558,7 +556,7 @@ class TaskInstance(Base, LoggingMixin):
def __init__(
self,
task: Operator,
- dag_version_id: UUIDType | uuid.UUID,
+ dag_version_id: UUID | None,
run_id: str | None = None,
state: str | None = None,
map_index: int = -1,
@@ -601,7 +599,7 @@ def stats_tags(self) -> dict[str, str]:
@staticmethod
def insert_mapping(
- run_id: str, task: Operator, map_index: int, dag_version_id: UUIDType
+ run_id: str, task: Operator, map_index: int, dag_version_id: UUID | None
) -> dict[str, Any]:
"""
Insert mapping.
@@ -2172,8 +2170,8 @@ class TaskInstanceNote(Base):
"""For storage of arbitrary notes concerning the task instance."""
__tablename__ = "task_instance_note"
- ti_id: Mapped[str] = mapped_column(
- String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
+ ti_id: Mapped[UUID] = mapped_column(
+ Uuid(),
primary_key=True,
nullable=False,
)
diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py
index 23a1796ac1e0e..df7b0a2876f0e 100644
--- a/airflow-core/src/airflow/models/taskinstancehistory.py
+++ b/airflow-core/src/airflow/models/taskinstancehistory.py
@@ -19,6 +19,7 @@
from datetime import datetime
from typing import TYPE_CHECKING
+from uuid import UUID
import dill
from sqlalchemy import (
@@ -31,13 +32,13 @@
String,
Text,
UniqueConstraint,
+ Uuid,
func,
select,
)
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Mapped, mapped_column, relationship
-from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
from airflow.models.base import Base, StringID
@@ -67,8 +68,8 @@ class TaskInstanceHistory(Base):
"""
__tablename__ = "task_instance_history"
- task_instance_id: Mapped[str] = mapped_column(
- String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
+ task_instance_id: Mapped[UUID] = mapped_column(
+ Uuid(),
nullable=False,
primary_key=True,
)
@@ -114,7 +115,7 @@ class TaskInstanceHistory(Base):
)
task_display_name: Mapped[str | None] = mapped_column(String(2000), nullable=True)
- dag_version_id: Mapped[str | None] = mapped_column(UUIDType(binary=False), nullable=True)
+ dag_version_id: Mapped[UUID | None] = mapped_column(Uuid(), nullable=True)
dag_version = relationship(
"DagVersion",
@@ -174,7 +175,7 @@ def __init__(
)
@property
- def id(self) -> str:
+ def id(self) -> UUID:
"""Alias for primary key field to support TaskInstance."""
return self.task_instance_id
diff --git a/airflow-core/src/airflow/models/taskreschedule.py b/airflow-core/src/airflow/models/taskreschedule.py
index 88f87121fd717..a7c3022b8d77c 100644
--- a/airflow-core/src/airflow/models/taskreschedule.py
+++ b/airflow-core/src/airflow/models/taskreschedule.py
@@ -20,17 +20,16 @@
from __future__ import annotations
import datetime
-import uuid
from typing import TYPE_CHECKING
+from uuid import UUID
from sqlalchemy import (
ForeignKey,
Index,
Integer,
- String,
+ Uuid,
select,
)
-from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Mapped, mapped_column, relationship
from airflow.models.base import Base
@@ -49,8 +48,8 @@ class TaskReschedule(Base):
__tablename__ = "task_reschedule"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
- ti_id: Mapped[str] = mapped_column(
- String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"),
+ ti_id: Mapped[UUID] = mapped_column(
+ Uuid(),
ForeignKey("task_instance.id", ondelete="CASCADE", name="task_reschedule_ti_fkey"),
nullable=False,
)
@@ -67,12 +66,12 @@ class TaskReschedule(Base):
def __init__(
self,
- ti_id: uuid.UUID | str,
+ ti_id: UUID,
start_date: datetime.datetime,
end_date: datetime.datetime,
reschedule_date: datetime.datetime,
) -> None:
- self.ti_id = str(ti_id)
+ self.ti_id = ti_id
self.start_date = start_date
self.end_date = end_date
self.reschedule_date = reschedule_date
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 27a1d9e15006d..8b0bf124203b2 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -5360,6 +5360,7 @@ export const $TaskInstanceResponse = {
properties: {
id: {
type: 'string',
+ format: 'uuid',
title: 'Id'
},
task_id: {
diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py
index 2b3ea410c5f76..01407f3b004b3 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -112,7 +112,7 @@ class MappedClassProtocol(Protocol):
"3.0.0": "29ce7909c52b",
"3.0.3": "fe199e1abd77",
"3.1.0": "cc92b33c6709",
- "3.2.0": "53ff648b8a26",
+ "3.2.0": "f8c9d7e6b5a4",
}
# Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py b/airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py
new file mode 100644
index 0000000000000..28d7117af542c
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from uuid import UUID
+
+import pytest
+
+from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
+from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse
+
+TEST_UUID_36 = "019c39c7-4fea-76b6-891d-ebb3267d427d"
+TEST_UUID_32 = TEST_UUID_36.replace("-", "")
+TEST_UUID = UUID(TEST_UUID_32)
+
+
+@pytest.mark.parametrize(
+ "model_cls",
+ [
+ TaskInstanceResponse,
+ DagVersionResponse,
+ ],
+ ids=["TaskInstanceResponse", "DagVersionResponse"],
+)
+def test_uuid_field_serializes_as_dashed_string(model_cls):
+ """Ensure UUID fields serialize to dashed string format in JSON responses.
+
+ This guards against regressions that could break airflow-client-python
+ and other consumers of the Public API that expect UUID fields to be
+ plain dashed strings in JSON.
+ """
+ instance = model_cls.model_construct(id=TEST_UUID)
+ json_data = instance.model_dump(mode="json")
+ assert isinstance(json_data["id"], str)
+ assert json_data["id"] == TEST_UUID_36
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
index 3f370760577e9..65b156fd71a39 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py
@@ -244,7 +244,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]:
"executor": None,
"executor_config": "{}",
"hostname": "",
- "id": sample_ti.id,
+ "id": str(sample_ti.id),
"logical_date": mock.ANY,
"map_index": -1,
"max_tries": 0,
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
index 5df670c3a6054..113401f85fb71 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py
@@ -79,7 +79,7 @@ def add_one(x: int):
dag.clear()
for ti in dr.task_instances:
ti.try_number = 2
- ti.id = str(uuid7())
+ ti.id = uuid7()
ti.hostname = "localhost"
session.merge(ti)
# Commit changes to avoid locks
@@ -104,7 +104,7 @@ def add_one(x: int):
dummy_dag.clear()
for ti in dr2.task_instances:
ti.try_number = 2
- ti.id = str(uuid7())
+ ti.id = uuid7()
ti.hostname = "localhost"
session.merge(ti)
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py
index 61c74bf70f342..0b51dfead307a 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py
@@ -119,13 +119,13 @@ def test_upsert_hitl_detail(
response = client.post(
f"/execution/hitlDetails/{ti.id}",
json={
- "ti_id": ti.id,
+ "ti_id": str(ti.id),
**default_hitl_detail_request_kwargs,
},
)
expected_json = {
- "ti_id": ti.id,
+ "ti_id": str(ti.id),
**default_hitl_detail_request_kwargs,
}
expected_json["assigned_users"] = expected_json.pop("assignees") or []
@@ -145,7 +145,7 @@ def test_upsert_hitl_detail_with_empty_option(
response = client.post(
f"/execution/hitlDetails/{ti.id}",
json={
- "ti_id": ti.id,
+ "ti_id": str(ti.id),
"subject": "This is subject",
"body": "this is body",
"options": [],
@@ -163,7 +163,7 @@ def test_update_hitl_detail(client: Client, sample_ti: TaskInstance) -> None:
response = client.patch(
f"/execution/hitlDetails/{sample_ti.id}",
json={
- "ti_id": sample_ti.id,
+ "ti_id": str(sample_ti.id),
"chosen_options": ["Reject"],
"params_input": {"input_1": 2},
},
@@ -182,7 +182,7 @@ def test_update_hitl_detail_without_option(client: Client, sample_ti: TaskInstan
response = client.patch(
f"/execution/hitlDetails/{sample_ti.id}",
json={
- "ti_id": sample_ti.id,
+ "ti_id": str(sample_ti.id),
"chosen_options": [],
"params_input": {"input_1": 2},
},
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index cfee6c9d46cb1..2eba8a045a82b 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -20,7 +20,7 @@
from datetime import datetime
from typing import TYPE_CHECKING
from unittest import mock
-from uuid import uuid4
+from uuid import UUID, uuid4
import pytest
import uuid6
@@ -95,7 +95,7 @@ def test_id_matches_sub_claim(client, session, create_task_instance):
def side_effect(cred, validators):
if not validators:
return claims
- if validators["sub"]["value"] != ti.id:
+ if str(validators["sub"]["value"]) != str(ti.id):
raise RuntimeError("Fake auth denied")
return claims
@@ -1011,7 +1011,7 @@ def test_ti_update_state_not_found(self, client, session):
"""
Test that a 404 error is returned when the Task Instance does not exist.
"""
- task_instance_id = "0182e924-0f1e-77e6-ab50-e977118bc139"
+ task_instance_id = UUID("0182e924-0f1e-77e6-ab50-e977118bc139")
# Pre-condition: the Task Instance does not exist
assert session.get(TaskInstance, task_instance_id) is None
@@ -1575,7 +1575,7 @@ def test_ti_heartbeat(
def test_ti_heartbeat_non_existent_task(self, client, session, create_task_instance):
"""Test that a 404 error is returned when the Task Instance does not exist."""
- task_instance_id = "0182e924-0f1e-77e6-ab50-e977118bc139"
+ task_instance_id = UUID("0182e924-0f1e-77e6-ab50-e977118bc139")
# Pre-condition: the Task Instance does not exist
assert session.get(TaskInstance, task_instance_id) is None
@@ -1771,7 +1771,7 @@ def test_ti_previous_dag_run(self, client, session, create_task_instance, dag_ma
}
def test_ti_previous_dag_run_not_found(self, client, session):
- ti_id = "0182e924-0f1e-77e6-ab50-e977118bc139"
+ ti_id = UUID("0182e924-0f1e-77e6-ab50-e977118bc139")
assert session.get(TaskInstance, ti_id) is None
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index e1db0b698cc0d..fee8dbb7f6617 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3164,7 +3164,7 @@ def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state, final_ti_s
assert dr.span_status == SpanStatus.ACTIVE
assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None
- assert self.job_runner.active_spans.get("ti:" + ti.id) is None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
assert ti.state == ti_state
assert ti.span_status == SpanStatus.ACTIVE
@@ -3173,10 +3173,10 @@ def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state, final_ti_s
assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None
if final_ti_span_status == SpanStatus.ACTIVE:
- assert self.job_runner.active_spans.get("ti:" + ti.id) is not None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None
assert len(self.job_runner.active_spans.get_all()) == 2
else:
- assert self.job_runner.active_spans.get("ti:" + ti.id) is None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
assert len(self.job_runner.active_spans.get_all()) == 1
assert dr.span_status == SpanStatus.ACTIVE
@@ -3217,13 +3217,13 @@ def test_end_spans_of_externally_ended_ops(self, dag_maker):
ti_span = Trace.start_child_span(span_name="ti_span", start_as_current=False)
self.job_runner.active_spans.set("dr:" + str(dr.id), dr_span)
- self.job_runner.active_spans.set("ti:" + ti.id, ti_span)
+ self.job_runner.active_spans.set(f"ti:{ti.id}", ti_span)
assert dr.span_status == SpanStatus.SHOULD_END
assert ti.span_status == SpanStatus.SHOULD_END
assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None
- assert self.job_runner.active_spans.get("ti:" + ti.id) is not None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None
self.job_runner._end_spans_of_externally_ended_ops(session)
@@ -3231,7 +3231,7 @@ def test_end_spans_of_externally_ended_ops(self, dag_maker):
assert ti.span_status == SpanStatus.ENDED
assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None
- assert self.job_runner.active_spans.get("ti:" + ti.id) is None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
@pytest.mark.parametrize(
("state", "final_span_status"),
@@ -3274,13 +3274,13 @@ def test_end_active_spans(self, state, final_span_status, dag_maker):
ti_span = Trace.start_child_span(span_name="ti_span", start_as_current=False)
self.job_runner.active_spans.set("dr:" + str(dr.id), dr_span)
- self.job_runner.active_spans.set("ti:" + ti.id, ti_span)
+ self.job_runner.active_spans.set(f"ti:{ti.id}", ti_span)
assert dr.span_status == SpanStatus.ACTIVE
assert ti.span_status == SpanStatus.ACTIVE
assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None
- assert self.job_runner.active_spans.get("ti:" + ti.id) is not None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None
assert len(self.job_runner.active_spans.get_all()) == 2
self.job_runner._end_active_spans(session)
@@ -3289,7 +3289,7 @@ def test_end_active_spans(self, state, final_span_status, dag_maker):
assert ti.span_status == final_span_status
assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None
- assert self.job_runner.active_spans.get("ti:" + ti.id) is None
+ assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None
assert len(self.job_runner.active_spans.get_all()) == 0
def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session):
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index 7449a69b5f0cd..070e0f62a4209 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -563,7 +563,7 @@ def test_start_dr_spans_if_needed_span_with_continuance(self, dag_maker, session
assert dag_run.active_spans is not None
assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is None
- assert dag_run.active_spans.get("ti:" + first_ti.id) is None
+ assert dag_run.active_spans.get(f"ti:{first_ti.id}") is None
assert dag_run.span_status == SpanStatus.NEEDS_CONTINUANCE
assert first_ti.span_status == SpanStatus.NEEDS_CONTINUANCE
@@ -572,7 +572,7 @@ def test_start_dr_spans_if_needed_span_with_continuance(self, dag_maker, session
assert dag_run.span_status == SpanStatus.ACTIVE
assert first_ti.span_status == SpanStatus.ACTIVE
assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is not None
- assert dag_run.active_spans.get("ti:" + first_ti.id) is not None
+ assert dag_run.active_spans.get(f"ti:{first_ti.id}") is not None
def test_end_dr_span_if_needed(self, testing_dag_bundle, dag_maker, session):
with dag_maker(
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 20faa78946646..8172656ec050e 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2428,7 +2428,7 @@ def test_refresh_from_db(self, create_task_instance):
"try_number": 1,
"max_tries": 1,
"hostname": "some_unique_hostname",
- "id": str(uuid6.uuid7()),
+ "id": uuid6.uuid7(),
"unixname": "some_unique_unixname",
"pool": "some_fake_pool_id",
"pool_slots": 25,
@@ -2576,7 +2576,7 @@ def test_task_instance_history_is_created_when_ti_goes_for_retry(self, dag_maker
tih = session.scalars(select(TaskInstanceHistory)).all()
assert len(tih) == 1
# the new try_id should be different from what's recorded in tih
- assert str(tih[0].task_instance_id) == try_id
+ assert tih[0].task_instance_id == try_id
@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index d965f317c55ae..fbb0881b90936 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1728,7 +1728,7 @@ class TaskInstanceResponse(BaseModel):
TaskInstance serializer for responses.
"""
- id: Annotated[str, Field(title="Id")]
+ id: Annotated[UUID, Field(title="Id")]
task_id: Annotated[str, Field(title="Task Id")]
dag_id: Annotated[str, Field(title="Dag Id")]
dag_run_id: Annotated[str, Field(title="Dag Run Id")]
diff --git a/devel-common/src/tests_common/test_utils/api_fastapi.py b/devel-common/src/tests_common/test_utils/api_fastapi.py
index eee7f48aec8c3..a59f05bf10827 100644
--- a/devel-common/src/tests_common/test_utils/api_fastapi.py
+++ b/devel-common/src/tests_common/test_utils/api_fastapi.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import json
+from uuid import UUID
from airflow.models import DagRun, Log
from airflow.models.dagrun import DagRunNote
@@ -93,6 +94,8 @@ def _check_dag_run_note(session, dr_id, note_data):
def _check_task_instance_note(session, ti_id, note_data):
from sqlalchemy import select
+ if isinstance(ti_id, str):
+ ti_id = UUID(ti_id)
ti_note = session.scalar(select(TaskInstanceNote).where(TaskInstanceNote.ti_id == ti_id))
if note_data is None:
assert ti_note is None
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 8c44a920ebcf7..608e789436e96 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -205,10 +205,11 @@ def sample_callable(**kwargs):
execution_date=date, # type: ignore
)
if AIRFLOW_V_3_0_PLUS:
+ assert dagrun.created_dag_version_id is not None
task_instance = create_task_instance(
t,
run_id=run_id,
- dag_version_id=uuid.UUID(dagrun.created_dag_version_id),
+ dag_version_id=dagrun.created_dag_version_id,
)
else:
task_instance = TaskInstance(t, run_id=run_id) # type: ignore
diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py
index 97c4ce9160b61..108a6935b3858 100644
--- a/providers/standard/tests/unit/standard/operators/test_hitl.py
+++ b/providers/standard/tests/unit/standard/operators/test_hitl.py
@@ -260,7 +260,7 @@ def test_execute(self, dag_maker: DagMaker, session: Session) -> None:
if AIRFLOW_V_3_2_PLUS:
expected_params_in_trigger_kwargs = expected_params
# trigger_kwargs are encoded via serde from task sdk in versions >= 3.2
- expected_ti_id = UUID(ti.id)
+ expected_ti_id = ti.id
else:
expected_params_in_trigger_kwargs = {"input_1": {"value": 1, "description": None, "schema": {}}}