From b750e1d6472744766bed90afc32480b7eb60d1c8 Mon Sep 17 00:00:00 2001 From: Dev-iL <6509619+Dev-iL@users.noreply.github.com> Date: Fri, 6 Feb 2026 15:03:33 +0200 Subject: [PATCH] Use SQLA's native Uuid/JSON instead of sqlalchemy-utils' types --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/docs/img/airflow_erd.svg | 1670 ++++++++--------- airflow-core/docs/migrations-ref.rst | 4 +- airflow-core/pyproject.toml | 1 - .../core_api/datamodels/task_instances.py | 3 +- .../core_api/openapi/_private_ui.yaml | 1 + .../openapi/v2-rest-api-generated.yaml | 1 + .../airflow/api_fastapi/execution_api/deps.py | 2 +- .../api_fastapi/execution_api/routes/hitl.py | 13 +- .../execution_api/routes/task_instances.py | 74 +- .../execution_api/routes/task_reschedules.py | 2 +- .../src/airflow/jobs/scheduler_job_runner.py | 11 +- .../versions/0047_3_0_0_add_dag_versioning.py | 17 +- .../0052_3_0_0_add_deadline_alerts_table.py | 3 +- .../0068_3_0_0_ti_table_id_unique_per_try.py | 9 +- ..._and_template_params_to_dagbundle_model.py | 3 +- .../versions/0083_3_1_0_add_teams.py | 11 +- .../0091_3_2_0_restructure_callback_table.py | 3 +- ...lace_deadline_inline_callback_with_fkey.py | 19 +- ...099_3_2_0_ui_improvements_for_deadlines.py | 7 +- .../0103_3_2_0_fix_uuid_column_types.py | 276 +++ airflow-core/src/airflow/models/callback.py | 6 +- .../src/airflow/models/dag_version.py | 5 +- airflow-core/src/airflow/models/dagbag.py | 5 +- airflow-core/src/airflow/models/dagbundle.py | 4 +- airflow-core/src/airflow/models/dagcode.py | 9 +- airflow-core/src/airflow/models/dagrun.py | 25 +- airflow-core/src/airflow/models/deadline.py | 16 +- .../src/airflow/models/deadline_alert.py | 17 +- airflow-core/src/airflow/models/hitl.py | 8 +- .../src/airflow/models/hitl_history.py | 8 +- .../src/airflow/models/serialized_dag.py | 15 +- .../src/airflow/models/taskinstance.py | 28 +- .../src/airflow/models/taskinstancehistory.py | 11 +- .../src/airflow/models/taskreschedule.py | 13 +- .../ui/openapi-gen/requests/schemas.gen.ts | 1 + airflow-core/src/airflow/utils/db.py | 2 +- .../common/test_uuid_serialization.py | 49 + .../core_api/routes/public/test_hitl.py | 2 +- .../core_api/routes/public/test_log.py | 4 +- .../execution_api/versions/head/test_hitl.py | 10 +- .../versions/head/test_task_instances.py | 10 +- .../tests/unit/jobs/test_scheduler_job.py | 18 +- airflow-core/tests/unit/models/test_dagrun.py | 4 +- .../tests/unit/models/test_taskinstance.py | 4 +- .../airflowctl/api/datamodels/generated.py | 2 +- .../tests_common/test_utils/api_fastapi.py | 3 + .../unit/openlineage/plugins/test_listener.py | 3 +- .../unit/standard/operators/test_hitl.py | 2 +- 49 files changed, 1369 insertions(+), 1047 deletions(-) create mode 100644 airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py create mode 100644 airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 55462aff7bf6b..9922fde77099e 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -7d6ced1b0a60a60c192ccc102b750c2d893d1c388625f8a90bb95ce457d0d9c4 \ No newline at end of file +5fb3647b8dc66e0c22e13797f2b3b05a9c09ac9e4a6d0d82b03f1556197fa219 \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index 29eb9a94b2906..363c06bede71a 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -698,7 +698,7 @@ NOT NULL - + trigger:id--asset_watcher:trigger_id 0..N @@ -707,178 +707,178 @@ task_instance - -task_instance - -id - - [UUID] - NOT NULL - -context_carrier - - [JSONB] - -custom_operator_name - - [VARCHAR(1000)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - NOT NULL - -external_executor_id - - [TEXT] - -hostname - - [VARCHAR(1000)] - NOT NULL - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - NOT NULL - -next_kwargs - - [JSONB] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - NOT NULL - -queue - - [VARCHAR(256)] - NOT NULL - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -scheduled_dttm - - [TIMESTAMP] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - NOT NULL - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +context_carrier + + [JSONB] + +custom_operator_name + + [VARCHAR(1000)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + NOT NULL + +external_executor_id + + [TEXT] + +hostname + + [VARCHAR(1000)] + NOT NULL + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + NOT NULL + +next_kwargs + + [JSONB] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + NOT NULL + +queue + + [VARCHAR(256)] + NOT NULL + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +scheduled_dttm + + [TIMESTAMP] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + NOT NULL + +updated_at + + [TIMESTAMP] - + trigger:id--task_instance:trigger_id - -0..N + +0..N {0,1} @@ -926,7 +926,7 @@ NOT NULL - + callback:id--deadline:callback_id 0..N @@ -935,102 +935,102 @@ asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL - + asset_alias:id--asset_alias_asset:alias_id - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL - + asset_alias:id--asset_alias_asset_event:alias_id - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + asset_alias:id--dag_schedule_asset_alias_reference:alias_id - -0..N -1 + +0..N +1 @@ -1074,261 +1074,261 @@ NOT NULL - + asset:id--asset_alias_asset:asset_id - -0..N + +0..N 1 - + asset:id--asset_watcher:asset_id - + 0..N 1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL -asset:name--asset_active:name - -1 -1 +asset:uri--asset_active:uri + +1 +1 -asset:uri--asset_active:uri - -1 -1 +asset:name--asset_active:name + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset:id--dag_schedule_asset_reference:asset_id - -0..N + +0..N 1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset:id--task_outlet_asset_reference:asset_id - -0..N + +0..N 1 task_inlet_asset_reference - -task_inlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_inlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset:id--task_inlet_asset_reference:asset_id - -0..N + +0..N 1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL asset:id--asset_dag_run_queue:asset_id - -0..N + +0..N 1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -partition_key - - [VARCHAR(250)] - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +partition_key + + [VARCHAR(250)] + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL - + asset_event:id--asset_alias_asset_event:event_id - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event:id--dagrun_asset_event:event_id - -0..N -1 + +0..N +1 @@ -1387,84 +1387,84 @@ 1 - + dag:dag_id--dag_schedule_asset_alias_reference:dag_id - -0..N + +0..N 1 dag:dag_id--dag_schedule_asset_reference:dag_id - -0..N + +0..N 1 dag:dag_id--task_outlet_asset_reference:dag_id - -0..N + +0..N 1 dag:dag_id--task_inlet_asset_reference:dag_id - -0..N + +0..N 1 dag:dag_id--asset_dag_run_queue:target_dag_id - -0..N + +0..N 1 dag_version - -dag_version - -id - - [UUID] - NOT NULL - -bundle_name - - [VARCHAR(250)] - -bundle_version - - [VARCHAR(250)] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +bundle_name + + [VARCHAR(250)] + +bundle_version + + [VARCHAR(250)] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +version_number + + [INTEGER] + NOT NULL dag:dag_id--dag_version:dag_id - -0..N + +0..N 1 @@ -1577,133 +1577,133 @@ dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -bundle_version - - [VARCHAR(250)] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [JSONB] - -context_carrier - - [JSONB] - -created_dag_version_id - - [UUID] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - NOT NULL - -logical_date - - [TIMESTAMP] - -partition_key - - [VARCHAR(250)] - -queued_at - - [TIMESTAMP] - -run_after - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -scheduled_by_job_id - - [INTEGER] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - NOT NULL - -triggered_by - - [VARCHAR(50)] - -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +bundle_version + + [VARCHAR(250)] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [JSONB] + +context_carrier + + [JSONB] + +created_dag_version_id + + [UUID] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + NOT NULL + +logical_date + + [TIMESTAMP] + +partition_key + + [VARCHAR(250)] + +queued_at + + [TIMESTAMP] + +run_after + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +scheduled_by_job_id + + [INTEGER] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + NOT NULL + +triggered_by + + [VARCHAR(50)] + +triggering_user_name + + [VARCHAR(512)] + +updated_at + + [TIMESTAMP] + NOT NULL - + dag_version:id--dag_run:created_dag_version_id - -0..N -{0,1} + +0..N +{0,1} @@ -1754,9 +1754,9 @@ dag_version:id--dag_code:dag_version_id - + 0..N -1 +1 @@ -1805,112 +1805,112 @@ dag_version:id--serialized_dag:dag_version_id - + 0..N -1 +1 - + dag_version:id--task_instance:dag_version_id - -0..N -{0,1} + +0..N +{0,1} log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL - + log_template:id--dag_run:log_template_id - -0..N -1 + +0..N +1 dag_run:id--dagrun_asset_event:dag_run_id - -0..N -1 + +0..N +1 asset_partition_dag_run - -asset_partition_dag_run - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -created_dag_run_id - - [INTEGER] - -partition_key - - [VARCHAR(250)] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +asset_partition_dag_run + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +created_dag_run_id + + [INTEGER] + +partition_key + + [VARCHAR(250)] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL dag_run:id--asset_partition_dag_run:created_dag_run_id - -0..N -{0,1} + +0..N +{0,1} - -dag_run:dag_id--task_instance:dag_id - -0..N -1 + +dag_run:run_id--task_instance:run_id + +0..N +1 -dag_run:run_id--task_instance:run_id - -0..N -1 +dag_run:dag_id--task_instance:dag_id + +0..N +1 @@ -1947,131 +1947,131 @@ NOT NULL - + dag_run:id--backfill_dag_run:dag_run_id - + 0..N -{0,1} +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run:id--dag_run_note:dag_run_id - -1 -1 + +1 +1 - + dag_run:id--deadline:dagrun_id - + 0..N -{0,1} +{0,1} backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +triggering_user_name + + [VARCHAR(512)] + +updated_at + + [TIMESTAMP] + NOT NULL backfill:id--dag_run:backfill_id - -0..N -{0,1} + +0..N +{0,1} - + backfill:id--backfill_dag_run:backfill_id - + 0..N -1 +1 @@ -2191,9 +2191,9 @@ task_instance:id--hitl_detail:ti_id - + 1 -1 +1 @@ -2233,30 +2233,30 @@ task_instance:task_id--task_map:task_id - + 0..N -1 +1 task_instance:map_index--task_map:map_index - + 0..N -1 +1 task_instance:run_id--task_map:run_id - + 0..N -1 +1 task_instance:dag_id--task_map:dag_id - + 0..N -1 +1 @@ -2297,9 +2297,9 @@ task_instance:id--task_reschedule:ti_id - + 0..N -1 +1 @@ -2348,31 +2348,31 @@ -task_instance:run_id--xcom:run_id - -0..N -1 +task_instance:dag_id--xcom:dag_id + +0..N +1 -task_instance:dag_id--xcom:dag_id - -0..N -1 +task_instance:run_id--xcom:run_id + +0..N +1 task_instance:map_index--xcom:map_index - + 0..N -1 +1 task_instance:task_id--xcom:task_id - + 0..N -1 +1 @@ -2406,9 +2406,9 @@ task_instance:id--task_instance_note:ti_id - + 1 -1 +1 @@ -2572,30 +2572,30 @@ task_instance:task_id--task_instance_history:task_id - + 0..N -1 +1 -task_instance:run_id--task_instance_history:run_id - -0..N -1 +task_instance:map_index--task_instance_history:map_index + +0..N +1 task_instance:dag_id--task_instance_history:dag_id - + 0..N -1 +1 -task_instance:map_index--task_instance_history:map_index - -0..N -1 +task_instance:run_id--task_instance_history:run_id + +0..N +1 @@ -2635,33 +2635,33 @@ task_instance:run_id--rendered_task_instance_fields:run_id - + 0..N -1 +1 -task_instance:map_index--rendered_task_instance_fields:map_index - -0..N -1 +task_instance:dag_id--rendered_task_instance_fields:dag_id + +0..N +1 task_instance:task_id--rendered_task_instance_fields:task_id - + 0..N -1 +1 -task_instance:dag_id--rendered_task_instance_fields:dag_id - -0..N -1 +task_instance:map_index--rendered_task_instance_fields:map_index + +0..N +1 - + deadline_alert:id--deadline:deadline_alert_id 0..N diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 61cff3374e17b..01f0fc1c3cee9 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``53ff648b8a26`` (head) | ``a5a3e5eb9b8d`` | ``3.2.0`` | Add revoked_token table. | +| ``f8c9d7e6b5a4`` (head) | ``53ff648b8a26`` | ``3.2.0`` | Standardize UUID column format for non-PostgreSQL databases. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``53ff648b8a26`` | ``a5a3e5eb9b8d`` | ``3.2.0`` | Add revoked_token table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``a5a3e5eb9b8d`` | ``82dbd68e6171`` | ``3.2.0`` | Make external_executor_id TEXT to allow for longer | | | | | external_executor_ids. | diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml index 052e4df855eed..3638c47aaa3e0 100644 --- a/airflow-core/pyproject.toml +++ b/airflow-core/pyproject.toml @@ -137,7 +137,6 @@ dependencies = [ "setproctitle>=1.3.3", # SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__) "sqlalchemy[asyncio]>=2.0.36", - "sqlalchemy-utils>=0.41.2", "svcs>=25.1.0", "tabulate>=0.9.0", "tenacity>=8.3.0", diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index 61f711d0980f7..397389994a09f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -19,6 +19,7 @@ from collections.abc import Iterable from datetime import datetime from typing import Annotated, Any +from uuid import UUID from pydantic import ( AliasPath, @@ -42,7 +43,7 @@ class TaskInstanceResponse(BaseModel): """TaskInstance serializer for responses.""" - id: str + id: UUID task_id: str dag_id: str run_id: str = Field(alias="dag_run_id") diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 53f7b5c2c817e..72832270c4d54 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -2545,6 +2545,7 @@ components: properties: id: type: string + format: uuid title: Id task_id: type: string diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 0d6663c3377c3..847409b00769b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -12582,6 +12582,7 @@ components: properties: id: type: string + format: uuid title: Id task_id: type: string diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py index fce188d48ed6d..9fc8c30cb926e 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/deps.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/deps.py @@ -114,6 +114,6 @@ async def get_team_name_dep(session: AsyncSessionDep, token=JWTBearerDep) -> str .join(DagModel, DagModel.dag_id == TaskInstance.dag_id) .join(DagBundleModel, DagBundleModel.name == DagModel.bundle_name) .join(DagBundleModel.teams) - .where(TaskInstance.id == str(token.id)) + .where(TaskInstance.id == token.id) ) return await session.scalar(stmt) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py index 1c91c3e5b3b84..220bd357ed222 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py @@ -60,11 +60,10 @@ def upsert_hitl_detail( This happens when a task instance is cleared after a response has been received. This design ensures that each task instance has only one HITLDetail. """ - ti_id_str = str(task_instance_id) - hitl_detail_model = session.scalar(select(HITLDetail).where(HITLDetail.ti_id == ti_id_str)) + hitl_detail_model = session.scalar(select(HITLDetail).where(HITLDetail.ti_id == task_instance_id)) if not hitl_detail_model: hitl_detail_model = HITLDetail( - ti_id=ti_id_str, + ti_id=task_instance_id, options=payload.options, subject=payload.subject, body=payload.body, @@ -109,15 +108,14 @@ def update_hitl_detail( session: SessionDep, ) -> HITLDetailResponse: """Update the response part of a Human-in-the-loop detail for a specific Task Instance.""" - ti_id_str = str(task_instance_id) hitl_detail_model_result = session.execute( - select(HITLDetail).where(HITLDetail.ti_id == ti_id_str) + select(HITLDetail).where(HITLDetail.ti_id == task_instance_id) ).scalar() hitl_detail_model = _check_hitl_detail_exists(hitl_detail_model_result) if hitl_detail_model.response_received: raise HTTPException( status.HTTP_409_CONFLICT, - f"Human-in-the-loop detail for Task Instance with id {ti_id_str} already exists.", + f"Human-in-the-loop detail for Task Instance with id {task_instance_id} already exists.", ) hitl_detail_model.responded_by = None @@ -138,9 +136,8 @@ def get_hitl_detail( session: SessionDep, ) -> HITLDetailResponse: """Get Human-in-the-loop detail for a specific Task Instance.""" - ti_id_str = str(task_instance_id) hitl_detail_model_result = session.execute( - select(HITLDetail).where(HITLDetail.ti_id == ti_id_str), + select(HITLDetail).where(HITLDetail.ti_id == task_instance_id), ).scalar() hitl_detail_model = _check_hitl_detail_exists(hitl_detail_model_result) return HITLDetailResponse.from_hitl_detail_orm(hitl_detail_model) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index df9d6d9bdfc00..f22d7c125853d 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -109,9 +109,7 @@ def ti_run( This endpoint is used to start a TaskInstance that is in the QUEUED state. """ - # We only use UUID above for validation purposes - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) log.debug( "Starting task instance run", hostname=ti_run_payload.hostname, @@ -140,7 +138,7 @@ def ti_run( column("next_kwargs", JSON), ) .select_from(TI) - .where(TI.id == ti_id_str) + .where(TI.id == task_instance_id) .with_for_update() ) try: @@ -164,7 +162,7 @@ def ti_run( data.pop("start_date") log.debug("Removed start_date from update as task is resuming from deferral") - query = update(TI).where(TI.id == ti_id_str).values(data) + query = update(TI).where(TI.id == task_instance_id).values(data) previous_state = ti.state @@ -244,7 +242,9 @@ def ti_run( xcom_keys = list(session.scalars(xcom_query)) task_reschedule_count = ( - session.scalar(select(func.count(TaskReschedule.id)).where(TaskReschedule.ti_id == ti_id_str)) + session.scalar( + select(func.count(TaskReschedule.id)).where(TaskReschedule.ti_id == task_instance_id) + ) or 0 ) @@ -293,12 +293,14 @@ def ti_update_state( Not all state transitions are valid, and transitioning to some states requires extra information to be passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately) """ - # We only use UUID above for validation purposes - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) log.debug("Updating task instance state", new_state=ti_patch_payload.state) - old = select(TI.state, TI.try_number, TI.max_tries, TI.dag_id).where(TI.id == ti_id_str).with_for_update() + old = ( + select(TI.state, TI.try_number, TI.max_tries, TI.dag_id) + .where(TI.id == task_instance_id) + .with_for_update() + ) try: ( previous_state, @@ -338,12 +340,12 @@ def ti_update_state( # We exclude_unset to avoid updating fields that are not set in the payload data = ti_patch_payload.model_dump(exclude={"task_outlets", "outlet_events"}, exclude_unset=True) - query = update(TI).where(TI.id == ti_id_str).values(data) + query = update(TI).where(TI.id == task_instance_id).values(data) try: query, updated_state = _create_ti_state_update_query_and_update_state( ti_patch_payload=ti_patch_payload, - ti_id_str=ti_id_str, + task_instance_id=task_instance_id, session=session, query=query, dag_id=dag_id, @@ -355,7 +357,7 @@ def ti_update_state( "Error updating Task Instance state. Setting the task to failed.", payload=ti_patch_payload, ) - ti = session.get(TI, ti_id_str, with_for_update=True) + ti = session.get(TI, task_instance_id, with_for_update=True) if session.bind is not None: query = TI.duration_expression_update(timezone.utcnow(), query, session.bind) query = query.values(state=(updated_state := TaskInstanceState.FAILED)) @@ -398,14 +400,14 @@ def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: def _create_ti_state_update_query_and_update_state( *, ti_patch_payload: TIStateUpdate, - ti_id_str: str, + task_instance_id: UUID, query: Update, session: SessionDep, dag_bag: DagBagDep, dag_id: str, ) -> tuple[Update, TaskInstanceState]: if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)): - ti = session.get(TI, ti_id_str, with_for_update=True) + ti = session.get(TI, task_instance_id, with_for_update=True) updated_state = TaskInstanceState(ti_patch_payload.state.value) if session.bind is not None: query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind) @@ -450,7 +452,7 @@ def _create_ti_state_update_query_and_update_state( # TODO: HANDLE execution timeout later as it requires a call to the DB # either get it from the serialised DAG or get it from the API - query = update(TI).where(TI.id == ti_id_str) + query = update(TI).where(TI.id == task_instance_id) # Store next_kwargs directly (already serialized by worker) query = query.values( @@ -477,7 +479,7 @@ def _create_ti_state_update_query_and_update_state( mysql_timestamp_max=_MYSQL_TIMESTAMP_MAX, ) data = ti_patch_payload.model_dump(exclude={"reschedule_date"}, exclude_unset=True) - query = update(TI).where(TI.id == ti_id_str).values(data) + query = update(TI).where(TI.id == task_instance_id).values(data) if session.bind is not None: query = TI.duration_expression_update(timezone.utcnow(), query, session.bind) query = query.values(state=TaskInstanceState.FAILED) @@ -486,19 +488,19 @@ def _create_ti_state_update_query_and_update_state( # in SQLA2. The task is marked as FAILED regardless. return query, TaskInstanceState.FAILED - # We can directly use ti_id_str instead of fetching the TaskInstance object to avoid SQLA2 + # We can directly use task_instance_id instead of fetching the TaskInstance object to avoid SQLA2 # lock contention issues when the TaskInstance row is already locked from before. actual_start_date = timezone.utcnow() session.add( TaskReschedule( - ti_id_str, + task_instance_id, actual_start_date, ti_patch_payload.end_date, ti_patch_payload.reschedule_date, ) ) - query = update(TI).where(TI.id == ti_id_str) + query = update(TI).where(TI.id == task_instance_id) # calculate the duration for TI table too if session.bind is not None: query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind) @@ -524,14 +526,13 @@ def ti_skip_downstream( ti_patch_payload: TISkippedDownstreamTasksStatePayload, session: SessionDep, ): - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) log.info("Skipping downstream tasks", task_count=len(ti_patch_payload.tasks)) now = timezone.utcnow() tasks = ti_patch_payload.tasks - query_result = session.execute(select(TI.dag_id, TI.run_id).where(TI.id == ti_id_str)) + query_result = session.execute(select(TI.dag_id, TI.run_id).where(TI.id == task_instance_id)) row_result = query_result.fetchone() if row_result is None: raise HTTPException( @@ -572,14 +573,13 @@ def ti_heartbeat( session: SessionDep, ): """Update the heartbeat of a TaskInstance to mark it as alive & still running.""" - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) log.debug("Processing heartbeat", hostname=ti_payload.hostname, pid=ti_payload.pid) # Hot path: since heartbeating a task is a very common operation, we try to do minimize the number of queries # and DB round trips as much as possible. - old = select(TI.state, TI.hostname, TI.pid).where(TI.id == ti_id_str).with_for_update() + old = select(TI.state, TI.hostname, TI.pid).where(TI.id == task_instance_id).with_for_update() try: (previous_state, hostname, pid) = session.execute(old).one() @@ -626,7 +626,7 @@ def ti_heartbeat( ) # Update the last heartbeat time! - session.execute(update(TI).where(TI.id == ti_id_str).values(last_heartbeat_at=timezone.utcnow())) + session.execute(update(TI).where(TI.id == task_instance_id).values(last_heartbeat_at=timezone.utcnow())) log.debug("Heartbeat updated", state=previous_state) @@ -651,11 +651,10 @@ def ti_put_rtif( session: SessionDep, ): """Add an RTIF entry for a task instance, sent by the worker.""" - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) log.info("Updating RenderedTaskInstanceFields", field_count=len(put_rtif_payload)) - task_instance = session.scalar(select(TI).where(TI.id == ti_id_str)) + task_instance = session.scalar(select(TI).where(TI.id == task_instance_id)) if not task_instance: log.error("Task Instance not found") raise HTTPException( @@ -681,8 +680,7 @@ def ti_patch_rendered_map_index( session: SessionDep, ): """Update rendered_map_index for a task instance, sent by the worker during task execution.""" - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) if not rendered_map_index: log.error("rendered_map_index cannot be empty") @@ -693,7 +691,7 @@ def ti_patch_rendered_map_index( log.debug("Updating rendered_map_index", length=len(rendered_map_index)) - query = update(TI).where(TI.id == ti_id_str).values(rendered_map_index=rendered_map_index) + query = update(TI).where(TI.id == task_instance_id).values(rendered_map_index=rendered_map_index) result = session.execute(query) result = cast("CursorResult[Any]", result) @@ -720,11 +718,10 @@ def get_previous_successful_dagrun( The data from this endpoint is used to get values for Task Context. """ - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) log.debug("Retrieving previous successful DAG run") - task_instance = session.scalar(select(TI).where(TI.id == ti_id_str)) + task_instance = session.scalar(select(TI).where(TI.id == task_instance_id)) if not task_instance or not task_instance.logical_date: log.debug("No task instance or logical date found") return PrevSuccessfulDagRunResponse() @@ -977,10 +974,9 @@ def validate_inlets_and_outlets( dag_bag: DagBagDep, ) -> InactiveAssetsResponse: """Validate whether there're inactive assets in inlets and outlets of a given task instance.""" - ti_id_str = str(task_instance_id) - bind_contextvars(ti_id=ti_id_str) + bind_contextvars(ti_id=str(task_instance_id)) - ti = session.scalar(select(TI).where(TI.id == ti_id_str)) + ti = session.scalar(select(TI).where(TI.id == task_instance_id)) if not ti: log.error("Task Instance not found") raise HTTPException( diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py index f763858f9b9c1..3c7d4c8070f34 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_reschedules.py @@ -39,7 +39,7 @@ def get_start_date(task_instance_id: UUID, session: SessionDep) -> UtcDateTime | """Get the first reschedule date if found, None if no records exist.""" start_date = session.scalar( select(TaskReschedule.start_date) - .where(TaskReschedule.ti_id == str(task_instance_id)) + .where(TaskReschedule.ti_id == task_instance_id) .order_by(TaskReschedule.id.asc()) .limit(1) ) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 765bb9e5db1a7..44c8910791c6f 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -32,6 +32,7 @@ from functools import lru_cache, partial from itertools import groupby from typing import TYPE_CHECKING, Any +from uuid import UUID from sqlalchemy import ( and_, @@ -1369,7 +1370,7 @@ def _end_active_spans(self, session: Session = NEW_SESSION): prefix, sep, key = prefixed_key.partition(":") if prefix == "ti": - ti_result = session.get(TaskInstance, key) + ti_result = session.get(TaskInstance, UUID(key)) if ti_result is None: continue ti: TaskInstance = ti_result @@ -1438,14 +1439,14 @@ def _end_spans_of_externally_ended_ops(self, session: Session): dag_run.span_status = SpanStatus.ENDED for ti in tis_should_end: - active_ti_span = self.active_spans.get("ti:" + ti.id) + active_ti_span = self.active_spans.get(f"ti:{ti.id}") if active_ti_span is not None: if ti.state in State.finished: self.set_ti_span_attrs(span=active_ti_span, state=ti.state, ti=ti) active_ti_span.end(end_time=datetime_to_nano(ti.end_date)) else: active_ti_span.end() - self.active_spans.delete("ti:" + ti.id) + self.active_spans.delete(f"ti:{ti.id}") ti.span_status = SpanStatus.ENDED def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session: Session): @@ -1495,7 +1496,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session for ti in tis # If it has started and there is a reference on the active_spans dict, # then it was started by the current scheduler. - if ti.start_date is not None and self.active_spans.get("ti:" + ti.id) is None + if ti.start_date is not None and self.active_spans.get(f"ti:{ti.id}") is None ] dr_context = Trace.extract(dag_run.context_carrier) @@ -1515,7 +1516,7 @@ def _recreate_unhealthy_scheduler_spans_if_needed(self, dag_run: DagRun, session ti.span_status = SpanStatus.ENDED else: ti.span_status = SpanStatus.ACTIVE - self.active_spans.set("ti:" + ti.id, ti_span) + self.active_spans.set(f"ti:{ti.id}", ti_span) def _run_scheduler_loop(self) -> None: """ diff --git a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py index 184c0a98218e0..c55416aaaf2a7 100644 --- a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py +++ b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.migrations.db_types import TIMESTAMP, StringID @@ -51,7 +50,7 @@ def upgrade(): op.create_table( "dag_version", - sa.Column("id", UUIDType(binary=False), nullable=False), + sa.Column("id", sa.Uuid(), nullable=False), sa.Column("version_number", sa.Integer(), nullable=False), sa.Column("dag_id", StringID(), nullable=False), sa.Column("created_at", TIMESTAMP(), nullable=False, default=timezone.utcnow), @@ -77,9 +76,9 @@ def upgrade(): with op.batch_alter_table("dag_code") as batch_op: batch_op.drop_column("fileloc_hash") - batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False)) + batch_op.add_column(sa.Column("id", sa.Uuid(), nullable=False)) batch_op.create_primary_key("dag_code_pkey", ["id"]) - batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False)) + batch_op.add_column(sa.Column("dag_version_id", sa.Uuid(), nullable=False)) batch_op.add_column(sa.Column("source_code_hash", sa.String(length=32), nullable=False)) batch_op.add_column(sa.Column("dag_id", StringID(), nullable=False)) batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow, nullable=False)) @@ -99,8 +98,8 @@ def upgrade(): batch_op.drop_index("idx_fileloc_hash") batch_op.drop_column("fileloc_hash") batch_op.drop_column("fileloc") - batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False)) - batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False)) + batch_op.add_column(sa.Column("id", sa.Uuid(), nullable=False)) + batch_op.add_column(sa.Column("dag_version_id", sa.Uuid(), nullable=False)) batch_op.add_column(sa.Column("created_at", TIMESTAMP(), default=timezone.utcnow, nullable=False)) batch_op.create_primary_key("serialized_dag_pkey", ["id"]) batch_op.create_foreign_key( @@ -113,7 +112,7 @@ def upgrade(): batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"]) with op.batch_alter_table("task_instance", schema=None) as batch_op: - batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) + batch_op.add_column(sa.Column("dag_version_id", sa.Uuid())) batch_op.create_foreign_key( batch_op.f("task_instance_dag_version_id_fkey"), "dag_version", @@ -123,11 +122,11 @@ def upgrade(): ) with op.batch_alter_table("task_instance_history", schema=None) as batch_op: - batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False))) + batch_op.add_column(sa.Column("dag_version_id", sa.Uuid())) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_column("dag_hash") - batch_op.add_column(sa.Column("created_dag_version_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("created_dag_version_id", sa.Uuid(), nullable=True)) batch_op.create_foreign_key( "created_dag_version_id_fkey", "dag_version", diff --git a/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py b/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py index e310d8e28f13e..0868ae4fdd145 100644 --- a/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py +++ b/airflow-core/src/airflow/migrations/versions/0052_3_0_0_add_deadline_alerts_table.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import UUIDType from airflow.migrations.db_types import StringID from airflow.models import ID_LEN @@ -45,7 +44,7 @@ def upgrade(): op.create_table( "deadline", - sa.Column("id", UUIDType(binary=False), nullable=False), + sa.Column("id", sa.Uuid(), nullable=False), sa.Column("dag_id", StringID(length=ID_LEN), nullable=True), sa.Column("dagrun_id", sa.Integer(), nullable=True), sa.Column("deadline", sa.DateTime(), nullable=False), diff --git a/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py b/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py index da67f0e4f9ebf..3f11cbb4b74b1 100644 --- a/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py +++ b/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import UUIDType # revision identifiers, used by Alembic. revision = "29ce7909c52b" @@ -40,9 +39,9 @@ def _get_uuid_type(dialect_name: str) -> sa.types.TypeEngine: - if dialect_name != "postgres": + if dialect_name != "postgresql": return sa.String(36) - return UUIDType(binary=False) + return sa.Uuid() def upgrade(): @@ -116,6 +115,4 @@ def downgrade(): # This has to be in a separate batch, else on sqlite it throws `sqlalchemy.exc.CircularDependencyError` # (and on non sqlite batching isn't "a thing", it issue alter tables fine) with op.batch_alter_table("task_instance_history", schema=None) as batch_op: - batch_op.add_column( - sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=True) - ) + batch_op.add_column(sa.Column("task_instance_id", sa.Uuid(), autoincrement=False, nullable=True)) diff --git a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py index a5b3787c8758a..7cc5586b6c73f 100644 --- a/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py +++ b/airflow-core/src/airflow/migrations/versions/0079_3_1_0_add_url_and_template_params_to_dagbundle_model.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import JSONType # revision identifiers, used by Alembic. revision = "3bda03debd04" @@ -43,7 +42,7 @@ def upgrade(): """Apply Add url and template params to DagBundleModel.""" with op.batch_alter_table("dag_bundle", schema=None) as batch_op: batch_op.add_column(sa.Column("signed_url_template", sa.String(length=200), nullable=True)) - batch_op.add_column(sa.Column("template_params", JSONType(), nullable=True)) + batch_op.add_column(sa.Column("template_params", sa.JSON(), nullable=True)) def downgrade(): diff --git a/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py b/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py index 8379b1e82828a..d73784eb285f1 100644 --- a/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py +++ b/airflow-core/src/airflow/migrations/versions/0083_3_1_0_add_teams.py @@ -31,7 +31,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import UUIDType from airflow.migrations.db_types import StringID @@ -47,7 +46,7 @@ def upgrade(): """Create team table.""" op.create_table( "team", - sa.Column("id", UUIDType(binary=False), nullable=False), + sa.Column("id", sa.Uuid(), nullable=False), sa.Column("name", sa.String(50), nullable=False), sa.PrimaryKeyConstraint("id", name=op.f("team_pkey")), sa.UniqueConstraint("name", name="team_name_uq"), @@ -57,7 +56,7 @@ def upgrade(): op.create_table( "dag_bundle_team", sa.Column("dag_bundle_name", StringID(), nullable=False), - sa.Column("team_id", UUIDType(binary=False), nullable=False), + sa.Column("team_id", sa.Uuid(), nullable=False), sa.PrimaryKeyConstraint("dag_bundle_name", "team_id", name=op.f("dag_bundle_team_pkey")), sa.ForeignKeyConstraint( columns=("dag_bundle_name",), @@ -79,15 +78,15 @@ def upgrade(): """Update `connection` table to add `team_id` column""" with op.batch_alter_table("connection") as batch_op: - batch_op.add_column(sa.Column("team_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("team_id", sa.Uuid(), nullable=True)) batch_op.create_foreign_key(batch_op.f("connection_team_id_fkey"), "team", ["team_id"], ["id"]) """Update `variable` table to add `team_id` column""" with op.batch_alter_table("variable") as batch_op: - batch_op.add_column(sa.Column("team_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("team_id", sa.Uuid(), nullable=True)) batch_op.create_foreign_key(batch_op.f("variable_team_id_fkey"), "team", ["team_id"], ["id"]) """Update `slot_pool` table to add `team_id` column""" with op.batch_alter_table("slot_pool") as batch_op: - batch_op.add_column(sa.Column("team_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("team_id", sa.Uuid(), nullable=True)) batch_op.create_foreign_key(batch_op.f("slot_pool_team_id_fkey"), "team", ["team_id"], ["id"]) diff --git a/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py b/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py index 4a2a94a8dc4f6..d7037393ce6b1 100644 --- a/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py +++ b/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy_utils import UUIDType from airflow.utils.sqlalchemy import ExtendedJSON @@ -64,7 +63,7 @@ def upgrade(): # Replace INTEGER id with UUID id batch_op.drop_column("id") - batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False)) + batch_op.add_column(sa.Column("id", sa.Uuid(), nullable=False)) batch_op.create_primary_key("callback_pkey", ["id"]) batch_op.drop_column("callback_data") diff --git a/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py index 0e023825e6bc6..245c8c0cc6f15 100644 --- a/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py +++ b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py @@ -33,7 +33,6 @@ import sqlalchemy as sa from alembic import context, op from sqlalchemy import column, select, table -from sqlalchemy_utils import UUIDType from airflow.serialization.serde import deserialize from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime @@ -125,13 +124,13 @@ def migrate_all_data(): deadline_table = table( "deadline", - column("id", UUIDType(binary=False)), + column("id", sa.Uuid()), column("dagrun_id", sa.Integer()), column("deadline_time", UtcDateTime(timezone=True)), column("callback", sa.JSON()), column("callback_state", sa.String(20)), column("missed", sa.Boolean()), - column("callback_id", UUIDType(binary=False)), + column("callback_id", sa.Uuid()), ) dag_run_table = table( @@ -142,7 +141,7 @@ def migrate_all_data(): callback_table = table( "callback", - column("id", UUIDType(binary=False)), + column("id", sa.Uuid()), column("type", sa.String(20)), column("fetch_method", sa.String(20)), column("data", ExtendedJSON()), @@ -178,14 +177,14 @@ def migrate_all_data(): # Add new columns (temporarily nullable until data has been migrated) with op.batch_alter_table("deadline") as batch_op: batch_op.add_column(sa.Column("missed", sa.Boolean(), nullable=True)) - batch_op.add_column(sa.Column("callback_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("callback_id", sa.Uuid(), nullable=True)) migrate_all_data() with op.batch_alter_table("deadline") as batch_op: # Data for `missed` and `callback_id` has been migrated so make them non-nullable batch_op.alter_column("missed", existing_type=sa.Boolean(), nullable=False) - batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=False) + batch_op.alter_column("callback_id", existing_type=sa.Uuid(), nullable=False) batch_op.create_index("deadline_missed_deadline_time_idx", ["missed", "deadline_time"], unique=False) batch_op.drop_index(batch_op.f("deadline_callback_state_time_idx")) @@ -274,8 +273,8 @@ def migrate_all_data(): deadline_table = table( "deadline", - column("id", UUIDType(binary=False)), - column("callback_id", UUIDType(binary=False)), + column("id", sa.Uuid()), + column("callback_id", sa.Uuid()), column("callback", sa.JSON()), column("callback_state", sa.String(20)), column("trigger_id", sa.Integer()), @@ -283,7 +282,7 @@ def migrate_all_data(): callback_table = table( "callback", - column("id", UUIDType(binary=False)), + column("id", sa.Uuid()), column("data", ExtendedJSON()), column("state", sa.String(10)), ) @@ -319,7 +318,7 @@ def migrate_all_data(): batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=True)) # Make callback_id nullable so the associated callbacks can be cleared during migration - batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=True) + batch_op.alter_column("callback_id", existing_type=sa.Uuid(), nullable=True) migrate_all_data() diff --git a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py index 14a1ac855b147..581b38caaa271 100644 --- a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py +++ b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py @@ -39,7 +39,6 @@ import sqlalchemy as sa import uuid6 from alembic import context, op -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.configuration import conf @@ -82,9 +81,9 @@ def upgrade() -> None: op.create_table( "deadline_alert", - sa.Column("id", UUIDType(binary=False), default=uuid6.uuid7), + sa.Column("id", sa.Uuid(), default=uuid6.uuid7), sa.Column("created_at", UtcDateTime, nullable=False), - sa.Column("serialized_dag_id", UUIDType(binary=False), nullable=False), + sa.Column("serialized_dag_id", sa.Uuid(), nullable=False), sa.Column("name", sa.String(250), nullable=True), sa.Column("description", sa.Text(), nullable=True), sa.Column("reference", sa.JSON(), nullable=False), @@ -100,7 +99,7 @@ def upgrade() -> None: conn.execute(sa.text("PRAGMA foreign_keys=OFF")) with op.batch_alter_table("deadline", schema=None) as batch_op: - batch_op.add_column(sa.Column("deadline_alert_id", UUIDType(binary=False), nullable=True)) + batch_op.add_column(sa.Column("deadline_alert_id", sa.Uuid(), nullable=True)) batch_op.add_column(sa.Column("created_at", UtcDateTime, nullable=True)) batch_op.add_column(sa.Column("last_updated_at", UtcDateTime, nullable=True)) batch_op.create_foreign_key( diff --git a/airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py b/airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py new file mode 100644 index 0000000000000..be901ecf656b3 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0103_3_2_0_fix_uuid_column_types.py @@ -0,0 +1,276 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Standardize UUID column format for non-PostgreSQL databases. + +Several columns used String(36) to store UUIDs in the 36-character dashed format +(e.g., "019c32fd-f36f-77a4-b5ba-098064c38b15"). This migration standardizes UUID +storage to the 32-character hex format (e.g., "019c32fdf36f77a4b5ba098064c38b15") +used by SQLAlchemy's native sa.Uuid() type on non-PostgreSQL databases. + +Affected columns: + - task_instance.id (PK) + - task_instance_note.ti_id (FK -> task_instance.id) + - task_reschedule.ti_id (FK -> task_instance.id) + - hitl_detail.ti_id (FK -> task_instance.id) + - task_instance_history.task_instance_id (PK) + - hitl_detail_history.ti_history_id (FK -> task_instance_history.task_instance_id) + +Revision ID: f8c9d7e6b5a4 +Revises: 53ff648b8a26 +Create Date: 2026-02-06 12:00:00.000000 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f8c9d7e6b5a4" +down_revision = "53ff648b8a26" +branch_labels = None +depends_on = None +airflow_version = "3.2.0" + + +def upgrade(): + """ + Standardize UUID storage format for non-PostgreSQL databases. + + On PostgreSQL: No action needed (uses native UUID type). + On MySQL/SQLite: Convert UUID values from 36-char dashed format to 32-char hex format + and change column type from VARCHAR(36) to CHAR(32) to match sa.Uuid() behavior. + """ + conn = op.get_bind() + dialect_name = conn.dialect.name + + # PostgreSQL uses native UUID type, no fix needed + if dialect_name == "postgresql": + return + + # Step 1: Drop all incoming FK constraints that reference the columns being changed. + # Even with ON UPDATE CASCADE, we need to drop FKs because we're changing the column + # TYPE (not just data). On SQLite, batch_alter_table recreates tables entirely, so + # incoming FK references would point to the old (dropped) table. + + # FKs referencing task_instance.id + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey") + + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.drop_constraint("task_reschedule_ti_fkey", type_="foreignkey") + + with op.batch_alter_table("hitl_detail", schema=None) as batch_op: + batch_op.drop_constraint("hitl_detail_ti_fkey", type_="foreignkey") + + # FK referencing task_instance_history.task_instance_id + with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op: + batch_op.drop_constraint("hitl_detail_history_tih_fkey", type_="foreignkey") + + # Step 2: Convert UUID values from 36-char (with dashes) to 32-char (hex only) + # This must happen BEFORE changing the column type to avoid truncation. + for table, column in [ + ("task_instance", "id"), + ("task_instance_note", "ti_id"), + ("task_reschedule", "ti_id"), + ("hitl_detail", "ti_id"), + ("task_instance_history", "task_instance_id"), + ("hitl_detail_history", "ti_history_id"), + ]: + conn.execute(sa.text(f"UPDATE {table} SET {column} = REPLACE({column}, '-', '')")) + + # Step 3: Change column types from String(36) to Uuid() + # Using sa.Uuid() directly so Alembic renders the correct dialect-specific type + # (CHAR(32) on SQLite/MySQL) and schema comparison matches the ORM models. + for table, column in [ + ("task_instance", "id"), + ("task_instance_note", "ti_id"), + ("task_reschedule", "ti_id"), + ("hitl_detail", "ti_id"), + ("task_instance_history", "task_instance_id"), + ("hitl_detail_history", "ti_history_id"), + ]: + with op.batch_alter_table(table, schema=None) as batch_op: + batch_op.alter_column( + column, + existing_type=sa.String(36), + type_=sa.Uuid(), + existing_nullable=False, + ) + + # Step 4: Recreate all FK constraints + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.create_foreign_key( + "task_instance_note_ti_fkey", + "task_instance", + ["ti_id"], + ["id"], + ondelete="CASCADE", + onupdate="CASCADE", + ) + + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.create_foreign_key( + "task_reschedule_ti_fkey", + "task_instance", + ["ti_id"], + ["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("hitl_detail", schema=None) as batch_op: + batch_op.create_foreign_key( + "hitl_detail_ti_fkey", + "task_instance", + ["ti_id"], + ["id"], + ondelete="CASCADE", + onupdate="CASCADE", + ) + + with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op: + batch_op.create_foreign_key( + "hitl_detail_history_tih_fkey", + "task_instance_history", + ["ti_history_id"], + ["task_instance_id"], + ondelete="CASCADE", + onupdate="CASCADE", + ) + + +def downgrade(): + """ + Revert UUID storage format standardization. + + On PostgreSQL: No action needed (uses native UUID type). + On MySQL/SQLite: Revert CHAR(32) back to VARCHAR(36) and convert UUID values + from 32-char hex format to 36-char dashed format. + """ + conn = op.get_bind() + dialect_name = conn.dialect.name + + # PostgreSQL uses native UUID type, no fix needed + if dialect_name == "postgresql": + return + + # Step 1: Drop all FK constraints before modifying columns + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey") + + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.drop_constraint("task_reschedule_ti_fkey", type_="foreignkey") + + with op.batch_alter_table("hitl_detail", schema=None) as batch_op: + batch_op.drop_constraint("hitl_detail_ti_fkey", type_="foreignkey") + + with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op: + batch_op.drop_constraint("hitl_detail_history_tih_fkey", type_="foreignkey") + + # Step 2: Expand column types back to String(36) to make room for dashes + for table, column in [ + ("task_instance", "id"), + ("task_instance_note", "ti_id"), + ("task_reschedule", "ti_id"), + ("hitl_detail", "ti_id"), + ("task_instance_history", "task_instance_id"), + ("hitl_detail_history", "ti_history_id"), + ]: + with op.batch_alter_table(table, schema=None) as batch_op: + batch_op.alter_column( + column, + existing_type=sa.Uuid(), + type_=sa.String(36), + existing_nullable=False, + ) + + # Step 3: Convert UUID values from 32-char (hex only) to 36-char (with dashes) + # Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (8-4-4-4-12) + for table, column in [ + ("task_instance", "id"), + ("task_instance_note", "ti_id"), + ("task_reschedule", "ti_id"), + ("hitl_detail", "ti_id"), + ("task_instance_history", "task_instance_id"), + ("hitl_detail_history", "ti_history_id"), + ]: + if dialect_name == "mysql": + conn.execute( + sa.text( + f"UPDATE {table} SET {column} = CONCAT(" + f"SUBSTR({column}, 1, 8), '-', " + f"SUBSTR({column}, 9, 4), '-', " + f"SUBSTR({column}, 13, 4), '-', " + f"SUBSTR({column}, 17, 4), '-', " + f"SUBSTR({column}, 21, 12))" + ) + ) + else: # sqlite + conn.execute( + sa.text( + f"UPDATE {table} SET {column} = " + f"SUBSTR({column}, 1, 8) || '-' || " + f"SUBSTR({column}, 9, 4) || '-' || " + f"SUBSTR({column}, 13, 4) || '-' || " + f"SUBSTR({column}, 17, 4) || '-' || " + f"SUBSTR({column}, 21, 12)" + ) + ) + + # Step 4: Recreate all FK constraints + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.create_foreign_key( + "task_instance_note_ti_fkey", + "task_instance", + ["ti_id"], + ["id"], + ondelete="CASCADE", + onupdate="CASCADE", + ) + + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.create_foreign_key( + "task_reschedule_ti_fkey", + "task_instance", + ["ti_id"], + ["id"], + ondelete="CASCADE", + ) + + with op.batch_alter_table("hitl_detail", schema=None) as batch_op: + batch_op.create_foreign_key( + "hitl_detail_ti_fkey", + "task_instance", + ["ti_id"], + ["id"], + ondelete="CASCADE", + onupdate="CASCADE", + ) + + with op.batch_alter_table("hitl_detail_history", schema=None) as batch_op: + batch_op.create_foreign_key( + "hitl_detail_history_tih_fkey", + "task_instance_history", + ["ti_history_id"], + ["task_instance_id"], + ondelete="CASCADE", + onupdate="CASCADE", + ) diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py index 30d3ba93b9ed5..ea45a10f1f1ae 100644 --- a/airflow-core/src/airflow/models/callback.py +++ b/airflow-core/src/airflow/models/callback.py @@ -20,12 +20,12 @@ from enum import Enum from importlib import import_module from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable +from uuid import UUID import structlog import uuid6 -from sqlalchemy import ForeignKey, Integer, String, Text +from sqlalchemy import ForeignKey, Integer, String, Text, Uuid from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy_utils import UUIDType from airflow._shared.observability.metrics.stats import Stats from airflow._shared.timezones import timezone @@ -108,7 +108,7 @@ class Callback(Base): __tablename__ = "callback" - id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) + id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7) # This is used by SQLAlchemy to be able to deserialize DB rows to subclasses __mapper_args__ = { diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index 4af7a4c5505cf..cfdb301d0410e 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -20,11 +20,12 @@ import logging from datetime import datetime from typing import TYPE_CHECKING +from uuid import UUID +import sqlalchemy as sa import uuid6 from sqlalchemy import ForeignKey, Integer, UniqueConstraint, select from sqlalchemy.orm import Mapped, joinedload, mapped_column, relationship -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.dag_processing.bundles.manager import DagBundlesManager @@ -44,7 +45,7 @@ class DagVersion(Base): """Model to track the versions of DAGs in the database.""" __tablename__ = "dag_version" - id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) + id: Mapped[UUID] = mapped_column(sa.Uuid(), primary_key=True, default=uuid6.uuid7) version_number: Mapped[int] = mapped_column(Integer, nullable=False, default=1) dag_id: Mapped[str] = mapped_column( StringID(), ForeignKey("dag.dag_id", ondelete="CASCADE"), nullable=False diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py index 9b965a9460734..3d20c323e031c 100644 --- a/airflow-core/src/airflow/models/dagbag.py +++ b/airflow-core/src/airflow/models/dagbag.py @@ -19,6 +19,7 @@ import hashlib from typing import TYPE_CHECKING, Any +from uuid import UUID from sqlalchemy import String, inspect, select from sqlalchemy.orm import Mapped, joinedload, mapped_column @@ -45,7 +46,7 @@ class DBDagBag: """ def __init__(self, load_op_links: bool = True) -> None: - self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag + self._dags: dict[UUID, SerializedDAG] = {} # dag_version_id to dag self.load_op_links = load_op_links def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None: @@ -54,7 +55,7 @@ def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None: self._dags[serdag.dag_version_id] = dag return dag - def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None: + def _get_dag(self, version_id: UUID, session: Session) -> SerializedDAG | None: if dag := self._dags.get(version_id): return dag dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)]) diff --git a/airflow-core/src/airflow/models/dagbundle.py b/airflow-core/src/airflow/models/dagbundle.py index c2d99f2560d24..b11467b7b6614 100644 --- a/airflow-core/src/airflow/models/dagbundle.py +++ b/airflow-core/src/airflow/models/dagbundle.py @@ -18,9 +18,9 @@ from datetime import datetime +import sqlalchemy as sa from sqlalchemy import Boolean, String from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy_utils import JSONType from airflow.models.base import Base, StringID from airflow.models.team import dag_bundle_team_association_table @@ -49,7 +49,7 @@ class DagBundleModel(Base, LoggingMixin): version: Mapped[str | None] = mapped_column(String(200), nullable=True) last_refreshed: Mapped[datetime | None] = mapped_column(UtcDateTime, nullable=True) signed_url_template: Mapped[str | None] = mapped_column(String(200), nullable=True) - template_params: Mapped[dict | None] = mapped_column(JSONType, nullable=True) + template_params: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True) teams = relationship("Team", secondary=dag_bundle_team_association_table, back_populates="dag_bundles") def __init__(self, *, name: str, version: str | None = None): diff --git a/airflow-core/src/airflow/models/dagcode.py b/airflow-core/src/airflow/models/dagcode.py index 6f1207d376bfe..60ee91c8b59b5 100644 --- a/airflow-core/src/airflow/models/dagcode.py +++ b/airflow-core/src/airflow/models/dagcode.py @@ -19,13 +19,14 @@ import logging from datetime import datetime from typing import TYPE_CHECKING +from uuid import UUID +import sqlalchemy as sa import uuid6 from sqlalchemy import ForeignKey, String, Text, select from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.sql.expression import literal -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.configuration import conf @@ -55,7 +56,7 @@ class DagCode(Base): """ __tablename__ = "dag_code" - id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) + id: Mapped[UUID] = mapped_column(sa.Uuid(), primary_key=True, default=uuid6.uuid7) dag_id: Mapped[str] = mapped_column(String(ID_LEN), nullable=False) fileloc: Mapped[str] = mapped_column(String(2000), nullable=False) # The max length of fileloc exceeds the limit of indexing. @@ -65,8 +66,8 @@ class DagCode(Base): ) source_code: Mapped[str] = mapped_column(Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False) source_code_hash: Mapped[str] = mapped_column(String(32), nullable=False) - dag_version_id: Mapped[str] = mapped_column( - UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True + dag_version_id: Mapped[UUID] = mapped_column( + sa.Uuid(), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True ) dag_version = relationship("DagVersion", back_populates="dag_code", uselist=False) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index c5eb3a7087bb7..e05e3c7da3e3c 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -24,6 +24,7 @@ from collections.abc import Callable, Iterable, Iterator, Sequence from datetime import datetime from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar, cast, overload +from uuid import UUID import structlog from natsort import natsorted @@ -38,6 +39,7 @@ String, Text, UniqueConstraint, + Uuid, and_, case, func, @@ -54,7 +56,6 @@ from sqlalchemy.orm import Mapped, declared_attr, joinedload, mapped_column, relationship, synonym, validates from sqlalchemy.sql.expression import false, select from sqlalchemy.sql.functions import coalesce -from sqlalchemy_utils import UUIDType from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager from airflow._shared.observability.metrics.stats import Stats @@ -214,8 +215,8 @@ class DagRun(Base, LoggingMixin): span_status: Mapped[str] = mapped_column( String(250), server_default=SpanStatus.NOT_STARTED, nullable=False ) - created_dag_version_id: Mapped[str | None] = mapped_column( - UUIDType(binary=False), + created_dag_version_id: Mapped[UUID | None] = mapped_column( + Uuid(), ForeignKey("dag_version.id", name="created_dag_version_id_fkey", ondelete="set null"), nullable=True, ) @@ -438,7 +439,7 @@ def duration(cls, session: Session = NEW_SESSION) -> Case: return case(when_condition, else_=None) @provide_session - def check_version_id_exists_in_dr(self, dag_version_id: UUIDType, session: Session = NEW_SESSION): + def check_version_id_exists_in_dr(self, dag_version_id: UUID, session: Session = NEW_SESSION): select_stmt = ( select(TI.dag_version_id) .where(TI.dag_id == self.dag_id, TI.dag_version_id == dag_version_id, TI.run_id == self.run_id) @@ -1093,7 +1094,7 @@ def start_dr_spans_if_needed(self, tis: list[TI]): ti_carrier = Trace.inject() ti.context_carrier = ti_carrier ti.span_status = SpanStatus.ACTIVE - self.active_spans.set("ti:" + ti.id, ti_span) + self.active_spans.set(f"ti:{ti.id}", ti_span) else: self.log.debug( "Found span_status '%s', while updating state for dag_run '%s'", @@ -1689,7 +1690,7 @@ def _emit_duration_stats_for_finished_state(self): ) @provide_session - def verify_integrity(self, *, session: Session = NEW_SESSION, dag_version_id: UUIDType) -> None: + def verify_integrity(self, *, session: Session = NEW_SESSION, dag_version_id: UUID) -> None: """ Verify the DagRun by checking for removed tasks or tasks that are not in the database yet. @@ -1826,7 +1827,7 @@ def _get_task_creator( created_counts: dict[str, int], ti_mutation_hook: Callable, hook_is_noop: Literal[True], - dag_version_id: UUIDType, + dag_version_id: UUID, ) -> Callable[[Operator, Iterable[int]], Iterator[dict[str, Any]]]: ... @overload @@ -1835,7 +1836,7 @@ def _get_task_creator( created_counts: dict[str, int], ti_mutation_hook: Callable, hook_is_noop: Literal[False], - dag_version_id: UUIDType, + dag_version_id: UUID, ) -> Callable[[Operator, Iterable[int]], Iterator[TI]]: ... def _get_task_creator( @@ -1843,7 +1844,7 @@ def _get_task_creator( created_counts: dict[str, int], ti_mutation_hook: Callable, hook_is_noop: Literal[True, False], - dag_version_id: UUIDType, + dag_version_id: UUID, ) -> Callable[[Operator, Iterable[int]], Iterator[dict[str, Any]] | Iterator[TI]]: """ Get the task creator function. @@ -1959,7 +1960,7 @@ def _create_task_instances( session.rollback() def _revise_map_indexes_if_mapped( - self, task: Operator, *, dag_version_id: UUIDType, session: Session + self, task: Operator, *, dag_version_id: UUID | None, session: Session ) -> Iterator[TI]: """ Check if task increased or reduced in length and handle appropriately. @@ -2052,8 +2053,8 @@ def schedule_tis( """ # Get list of TI IDs that do not need to executed, these are # tasks using EmptyOperator and without on_execute_callback / on_success_callback - empty_ti_ids: list[str] = [] - schedulable_ti_ids: list[str] = [] + empty_ti_ids: list[UUID] = [] + schedulable_ti_ids: list[UUID] = [] for ti in schedulable_tis: if ti.is_schedulable: schedulable_ti_ids.append(ti.id) diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 070304f30a728..bbf24cc2842d2 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -22,12 +22,12 @@ from dataclasses import dataclass from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, cast +from uuid import UUID import uuid6 -from sqlalchemy import Boolean, ForeignKey, Index, Integer, and_, func, inspect, select, text +from sqlalchemy import Boolean, ForeignKey, Index, Integer, Uuid, and_, func, inspect, select, text from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy_utils import UUIDType from airflow._shared.observability.metrics.stats import Stats from airflow._shared.timezones import timezone @@ -82,7 +82,7 @@ class Deadline(Base): __tablename__ = "deadline" - id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) + id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7) created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow) last_updated_at: Mapped[datetime] = mapped_column( UtcDateTime, nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow @@ -101,14 +101,14 @@ class Deadline(Base): missed: Mapped[bool] = mapped_column(Boolean, nullable=False) # Callback that will run when this deadline is missed - callback_id: Mapped[str] = mapped_column( - UUIDType(binary=False), ForeignKey("callback.id", ondelete="CASCADE"), nullable=False + callback_id: Mapped[UUID] = mapped_column( + Uuid(), ForeignKey("callback.id", ondelete="CASCADE"), nullable=False ) callback = relationship("Callback", uselist=False, cascade="all, delete-orphan", single_parent=True) # The DeadlineAlert that generated this deadline - deadline_alert_id: Mapped[str | None] = mapped_column( - UUIDType(binary=False), ForeignKey("deadline_alert.id", ondelete="SET NULL"), nullable=True + deadline_alert_id: Mapped[UUID | None] = mapped_column( + Uuid(), ForeignKey("deadline_alert.id", ondelete="SET NULL"), nullable=True ) deadline_alert: Mapped[DeadlineAlert | None] = relationship("DeadlineAlert") @@ -119,7 +119,7 @@ def __init__( deadline_time: datetime, callback: CallbackDefinitionProtocol, dagrun_id: int, - deadline_alert_id: str | None, + deadline_alert_id: UUID | None, dag_id: str | None = None, ): super().__init__() diff --git a/airflow-core/src/airflow/models/deadline_alert.py b/airflow-core/src/airflow/models/deadline_alert.py index 557d905354aff..7af61ff70ba43 100644 --- a/airflow-core/src/airflow/models/deadline_alert.py +++ b/airflow-core/src/airflow/models/deadline_alert.py @@ -18,12 +18,12 @@ from datetime import datetime from typing import TYPE_CHECKING +from uuid import UUID import uuid6 -from sqlalchemy import JSON, Float, ForeignKey, String, Text, select +from sqlalchemy import JSON, Float, ForeignKey, String, Text, Uuid, select from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import Mapped, mapped_column -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.models import Base @@ -40,11 +40,11 @@ class DeadlineAlert(Base): __tablename__ = "deadline_alert" - id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) + id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7) created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow) - serialized_dag_id: Mapped[str] = mapped_column( - UUIDType(binary=False), ForeignKey("serialized_dag.id", ondelete="CASCADE"), nullable=False + serialized_dag_id: Mapped[UUID] = mapped_column( + Uuid(), ForeignKey("serialized_dag.id", ondelete="CASCADE"), nullable=False ) name: Mapped[str | None] = mapped_column(String(250), nullable=True) @@ -94,13 +94,16 @@ def reference_class(self) -> type[SerializedReferenceModels.SerializedBaseDeadli @classmethod @provide_session - def get_by_id(cls, deadline_alert_id: str, session: Session = NEW_SESSION) -> DeadlineAlert: + def get_by_id(cls, deadline_alert_id: str | UUID, session: Session = NEW_SESSION) -> DeadlineAlert: """ Retrieve a DeadlineAlert record by its UUID. - :param deadline_alert_id: The UUID of the DeadlineAlert to retrieve + :param deadline_alert_id: The UUID of the DeadlineAlert to retrieve (as string or UUID object) :param session: Database session """ + # Convert string to UUID if needed + if isinstance(deadline_alert_id, str): + deadline_alert_id = UUID(deadline_alert_id) result = session.scalar(select(cls).where(cls.id == deadline_alert_id)) if result is None: raise NoResultFound(f"No DeadlineAlert found with id {deadline_alert_id}") diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index 3077bc3c8084a..38e2d2f112f87 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -18,10 +18,10 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, TypedDict +from uuid import UUID import sqlalchemy as sa -from sqlalchemy import Boolean, ForeignKeyConstraint, String, Text, func, literal -from sqlalchemy.dialects import postgresql +from sqlalchemy import Boolean, ForeignKeyConstraint, String, Text, Uuid, func, literal from sqlalchemy.ext.compiler import compiles from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -137,8 +137,8 @@ class HITLDetail(Base, HITLDetailPropertyMixin): """Human-in-the-loop request and corresponding response.""" __tablename__ = "hitl_detail" - ti_id: Mapped[str] = mapped_column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + ti_id: Mapped[UUID] = mapped_column( + Uuid(), primary_key=True, nullable=False, ) diff --git a/airflow-core/src/airflow/models/hitl_history.py b/airflow-core/src/airflow/models/hitl_history.py index a0951a32df217..7974143814008 100644 --- a/airflow-core/src/airflow/models/hitl_history.py +++ b/airflow-core/src/airflow/models/hitl_history.py @@ -18,10 +18,10 @@ from datetime import datetime from typing import TYPE_CHECKING +from uuid import UUID import sqlalchemy as sa -from sqlalchemy import Boolean, ForeignKeyConstraint, String, Text -from sqlalchemy.dialects import postgresql +from sqlalchemy import Boolean, ForeignKeyConstraint, Text, Uuid from sqlalchemy.orm import Mapped, mapped_column, relationship from airflow._shared.timezones import timezone @@ -41,8 +41,8 @@ class HITLDetailHistory(Base, HITLDetailPropertyMixin): """ __tablename__ = "hitl_detail_history" - ti_history_id: Mapped[str] = mapped_column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + ti_history_id: Mapped[UUID] = mapped_column( + Uuid(), primary_key=True, nullable=False, ) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index a633d764bfe51..3d8ca6c85227a 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -24,14 +24,13 @@ from collections.abc import Callable, Iterable, Iterator, Sequence from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, Literal +from uuid import UUID -import sqlalchemy as sa import uuid6 -from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, tuple_, update +from sqlalchemy import JSON, ForeignKey, LargeBinary, String, Uuid, exists, select, tuple_, update from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship from sqlalchemy.sql.expression import func, literal -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.configuration import conf @@ -293,10 +292,10 @@ class SerializedDagModel(Base): """ __tablename__ = "serialized_dag" - id: Mapped[str] = mapped_column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7) + id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, default=uuid6.uuid7) dag_id: Mapped[str] = mapped_column(String(ID_LEN), nullable=False) _data: Mapped[dict | None] = mapped_column( - "data", sa.JSON().with_variant(JSONB, "postgresql"), nullable=True + "data", JSON().with_variant(JSONB, "postgresql"), nullable=True ) _data_compressed: Mapped[bytes | None] = mapped_column("data_compressed", LargeBinary, nullable=True) created_at: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False, default=timezone.utcnow) @@ -319,8 +318,8 @@ class SerializedDagModel(Base): innerjoin=True, backref=backref("serialized_dag", uselist=False, innerjoin=True), ) - dag_version_id: Mapped[str] = mapped_column( - UUIDType(binary=False), + dag_version_id: Mapped[UUID] = mapped_column( + Uuid(), ForeignKey("dag_version.id", ondelete="CASCADE"), nullable=False, unique=True, @@ -435,7 +434,7 @@ def _create_deadline_alert_records( for uuid_str, deadline_data in uuid_mapping.items(): alert = DeadlineAlertModel( - id=uuid_str, + id=UUID(uuid_str), reference=deadline_data[DeadlineAlertFields.REFERENCE], interval=deadline_data[DeadlineAlertFields.INTERVAL], callback_def=deadline_data[DeadlineAlertFields.CALLBACK], diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index a0676cfc0ac00..f2e4c03960b99 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -22,12 +22,12 @@ import json import logging import math -import uuid from collections import defaultdict from collections.abc import Collection, Iterable from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any from urllib.parse import quote +from uuid import UUID import attrs import dill @@ -43,6 +43,7 @@ String, Text, UniqueConstraint, + Uuid, and_, case, delete, @@ -62,7 +63,6 @@ from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import Mapped, lazyload, mapped_column, reconstructor, relationship from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value -from sqlalchemy_utils import UUIDType from airflow import settings from airflow._shared.observability.metrics.dual_stats_manager import DualStatsManager @@ -251,13 +251,11 @@ def clear_task_instances( :meta private: """ - task_instance_ids: list[str] = [] from airflow.exceptions import AirflowClearRunningTaskException from airflow.models.dagbag import DBDagBag scheduler_dagbag = DBDagBag(load_op_links=False) for ti in tis: - task_instance_ids.append(ti.id) ti.prepare_db_for_next_try(session) if ti.state == TaskInstanceState.RUNNING: @@ -403,9 +401,9 @@ def _date_or_empty(*, task_instance: TaskInstance, attr: str) -> str: return result.strftime("%Y%m%dT%H%M%S") if result else "" -def uuid7() -> str: - """Generate a new UUID7 string.""" - return str(uuid6.uuid7()) +def uuid7() -> UUID: + """Generate a new UUID7.""" + return uuid6.uuid7() class TaskInstance(Base, LoggingMixin): @@ -428,8 +426,8 @@ class TaskInstance(Base, LoggingMixin): """ __tablename__ = "task_instance" - id: Mapped[str] = mapped_column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + id: Mapped[UUID] = mapped_column( + Uuid(), primary_key=True, default=uuid7, nullable=False, @@ -488,8 +486,8 @@ class TaskInstance(Base, LoggingMixin): _task_display_property_value: Mapped[str | None] = mapped_column( "task_display_name", String(2000), nullable=True ) - dag_version_id: Mapped[str | uuid.UUID | None] = mapped_column( - UUIDType(binary=False), + dag_version_id: Mapped[UUID | None] = mapped_column( + Uuid(), ForeignKey("dag_version.id", ondelete="RESTRICT"), nullable=True, ) @@ -558,7 +556,7 @@ class TaskInstance(Base, LoggingMixin): def __init__( self, task: Operator, - dag_version_id: UUIDType | uuid.UUID, + dag_version_id: UUID | None, run_id: str | None = None, state: str | None = None, map_index: int = -1, @@ -601,7 +599,7 @@ def stats_tags(self) -> dict[str, str]: @staticmethod def insert_mapping( - run_id: str, task: Operator, map_index: int, dag_version_id: UUIDType + run_id: str, task: Operator, map_index: int, dag_version_id: UUID | None ) -> dict[str, Any]: """ Insert mapping. @@ -2172,8 +2170,8 @@ class TaskInstanceNote(Base): """For storage of arbitrary notes concerning the task instance.""" __tablename__ = "task_instance_note" - ti_id: Mapped[str] = mapped_column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + ti_id: Mapped[UUID] = mapped_column( + Uuid(), primary_key=True, nullable=False, ) diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py index 23a1796ac1e0e..df7b0a2876f0e 100644 --- a/airflow-core/src/airflow/models/taskinstancehistory.py +++ b/airflow-core/src/airflow/models/taskinstancehistory.py @@ -19,6 +19,7 @@ from datetime import datetime from typing import TYPE_CHECKING +from uuid import UUID import dill from sqlalchemy import ( @@ -31,13 +32,13 @@ String, Text, UniqueConstraint, + Uuid, func, select, ) from sqlalchemy.dialects import postgresql from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone from airflow.models.base import Base, StringID @@ -67,8 +68,8 @@ class TaskInstanceHistory(Base): """ __tablename__ = "task_instance_history" - task_instance_id: Mapped[str] = mapped_column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + task_instance_id: Mapped[UUID] = mapped_column( + Uuid(), nullable=False, primary_key=True, ) @@ -114,7 +115,7 @@ class TaskInstanceHistory(Base): ) task_display_name: Mapped[str | None] = mapped_column(String(2000), nullable=True) - dag_version_id: Mapped[str | None] = mapped_column(UUIDType(binary=False), nullable=True) + dag_version_id: Mapped[UUID | None] = mapped_column(Uuid(), nullable=True) dag_version = relationship( "DagVersion", @@ -174,7 +175,7 @@ def __init__( ) @property - def id(self) -> str: + def id(self) -> UUID: """Alias for primary key field to support TaskInstance.""" return self.task_instance_id diff --git a/airflow-core/src/airflow/models/taskreschedule.py b/airflow-core/src/airflow/models/taskreschedule.py index 88f87121fd717..a7c3022b8d77c 100644 --- a/airflow-core/src/airflow/models/taskreschedule.py +++ b/airflow-core/src/airflow/models/taskreschedule.py @@ -20,17 +20,16 @@ from __future__ import annotations import datetime -import uuid from typing import TYPE_CHECKING +from uuid import UUID from sqlalchemy import ( ForeignKey, Index, Integer, - String, + Uuid, select, ) -from sqlalchemy.dialects import postgresql from sqlalchemy.orm import Mapped, mapped_column, relationship from airflow.models.base import Base @@ -49,8 +48,8 @@ class TaskReschedule(Base): __tablename__ = "task_reschedule" id: Mapped[int] = mapped_column(Integer, primary_key=True) - ti_id: Mapped[str] = mapped_column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + ti_id: Mapped[UUID] = mapped_column( + Uuid(), ForeignKey("task_instance.id", ondelete="CASCADE", name="task_reschedule_ti_fkey"), nullable=False, ) @@ -67,12 +66,12 @@ class TaskReschedule(Base): def __init__( self, - ti_id: uuid.UUID | str, + ti_id: UUID, start_date: datetime.datetime, end_date: datetime.datetime, reschedule_date: datetime.datetime, ) -> None: - self.ti_id = str(ti_id) + self.ti_id = ti_id self.start_date = start_date self.end_date = end_date self.reschedule_date = reschedule_date diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 27a1d9e15006d..8b0bf124203b2 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5360,6 +5360,7 @@ export const $TaskInstanceResponse = { properties: { id: { type: 'string', + format: 'uuid', title: 'Id' }, task_id: { diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 2b3ea410c5f76..01407f3b004b3 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -112,7 +112,7 @@ class MappedClassProtocol(Protocol): "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", "3.1.0": "cc92b33c6709", - "3.2.0": "53ff648b8a26", + "3.2.0": "f8c9d7e6b5a4", } # Prefix used to identify tables holding data moved during migration. diff --git a/airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py b/airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py new file mode 100644 index 0000000000000..28d7117af542c --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/common/test_uuid_serialization.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from uuid import UUID + +import pytest + +from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse +from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse + +TEST_UUID_36 = "019c39c7-4fea-76b6-891d-ebb3267d427d" +TEST_UUID_32 = TEST_UUID_36.replace("-", "") +TEST_UUID = UUID(TEST_UUID_32) + + +@pytest.mark.parametrize( + "model_cls", + [ + TaskInstanceResponse, + DagVersionResponse, + ], + ids=["TaskInstanceResponse", "DagVersionResponse"], +) +def test_uuid_field_serializes_as_dashed_string(model_cls): + """Ensure UUID fields serialize to dashed string format in JSON responses. + + This guards against regressions that could break airflow-client-python + and other consumers of the Public API that expect UUID fields to be + plain dashed strings in JSON. + """ + instance = model_cls.model_construct(id=TEST_UUID) + json_data = instance.model_dump(mode="json") + assert isinstance(json_data["id"], str) + assert json_data["id"] == TEST_UUID_36 diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index 3f370760577e9..65b156fd71a39 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -244,7 +244,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "executor": None, "executor_config": "{}", "hostname": "", - "id": sample_ti.id, + "id": str(sample_ti.id), "logical_date": mock.ANY, "map_index": -1, "max_tries": 0, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py index 5df670c3a6054..113401f85fb71 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py @@ -79,7 +79,7 @@ def add_one(x: int): dag.clear() for ti in dr.task_instances: ti.try_number = 2 - ti.id = str(uuid7()) + ti.id = uuid7() ti.hostname = "localhost" session.merge(ti) # Commit changes to avoid locks @@ -104,7 +104,7 @@ def add_one(x: int): dummy_dag.clear() for ti in dr2.task_instances: ti.try_number = 2 - ti.id = str(uuid7()) + ti.id = uuid7() ti.hostname = "localhost" session.merge(ti) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py index 61c74bf70f342..0b51dfead307a 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py @@ -119,13 +119,13 @@ def test_upsert_hitl_detail( response = client.post( f"/execution/hitlDetails/{ti.id}", json={ - "ti_id": ti.id, + "ti_id": str(ti.id), **default_hitl_detail_request_kwargs, }, ) expected_json = { - "ti_id": ti.id, + "ti_id": str(ti.id), **default_hitl_detail_request_kwargs, } expected_json["assigned_users"] = expected_json.pop("assignees") or [] @@ -145,7 +145,7 @@ def test_upsert_hitl_detail_with_empty_option( response = client.post( f"/execution/hitlDetails/{ti.id}", json={ - "ti_id": ti.id, + "ti_id": str(ti.id), "subject": "This is subject", "body": "this is body", "options": [], @@ -163,7 +163,7 @@ def test_update_hitl_detail(client: Client, sample_ti: TaskInstance) -> None: response = client.patch( f"/execution/hitlDetails/{sample_ti.id}", json={ - "ti_id": sample_ti.id, + "ti_id": str(sample_ti.id), "chosen_options": ["Reject"], "params_input": {"input_1": 2}, }, @@ -182,7 +182,7 @@ def test_update_hitl_detail_without_option(client: Client, sample_ti: TaskInstan response = client.patch( f"/execution/hitlDetails/{sample_ti.id}", json={ - "ti_id": sample_ti.id, + "ti_id": str(sample_ti.id), "chosen_options": [], "params_input": {"input_1": 2}, }, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index cfee6c9d46cb1..2eba8a045a82b 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -20,7 +20,7 @@ from datetime import datetime from typing import TYPE_CHECKING from unittest import mock -from uuid import uuid4 +from uuid import UUID, uuid4 import pytest import uuid6 @@ -95,7 +95,7 @@ def test_id_matches_sub_claim(client, session, create_task_instance): def side_effect(cred, validators): if not validators: return claims - if validators["sub"]["value"] != ti.id: + if str(validators["sub"]["value"]) != str(ti.id): raise RuntimeError("Fake auth denied") return claims @@ -1011,7 +1011,7 @@ def test_ti_update_state_not_found(self, client, session): """ Test that a 404 error is returned when the Task Instance does not exist. """ - task_instance_id = "0182e924-0f1e-77e6-ab50-e977118bc139" + task_instance_id = UUID("0182e924-0f1e-77e6-ab50-e977118bc139") # Pre-condition: the Task Instance does not exist assert session.get(TaskInstance, task_instance_id) is None @@ -1575,7 +1575,7 @@ def test_ti_heartbeat( def test_ti_heartbeat_non_existent_task(self, client, session, create_task_instance): """Test that a 404 error is returned when the Task Instance does not exist.""" - task_instance_id = "0182e924-0f1e-77e6-ab50-e977118bc139" + task_instance_id = UUID("0182e924-0f1e-77e6-ab50-e977118bc139") # Pre-condition: the Task Instance does not exist assert session.get(TaskInstance, task_instance_id) is None @@ -1771,7 +1771,7 @@ def test_ti_previous_dag_run(self, client, session, create_task_instance, dag_ma } def test_ti_previous_dag_run_not_found(self, client, session): - ti_id = "0182e924-0f1e-77e6-ab50-e977118bc139" + ti_id = UUID("0182e924-0f1e-77e6-ab50-e977118bc139") assert session.get(TaskInstance, ti_id) is None diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index e1db0b698cc0d..fee8dbb7f6617 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3164,7 +3164,7 @@ def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state, final_ti_s assert dr.span_status == SpanStatus.ACTIVE assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None - assert self.job_runner.active_spans.get("ti:" + ti.id) is None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None assert ti.state == ti_state assert ti.span_status == SpanStatus.ACTIVE @@ -3173,10 +3173,10 @@ def test_recreate_unhealthy_scheduler_spans_if_needed(self, ti_state, final_ti_s assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None if final_ti_span_status == SpanStatus.ACTIVE: - assert self.job_runner.active_spans.get("ti:" + ti.id) is not None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None assert len(self.job_runner.active_spans.get_all()) == 2 else: - assert self.job_runner.active_spans.get("ti:" + ti.id) is None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None assert len(self.job_runner.active_spans.get_all()) == 1 assert dr.span_status == SpanStatus.ACTIVE @@ -3217,13 +3217,13 @@ def test_end_spans_of_externally_ended_ops(self, dag_maker): ti_span = Trace.start_child_span(span_name="ti_span", start_as_current=False) self.job_runner.active_spans.set("dr:" + str(dr.id), dr_span) - self.job_runner.active_spans.set("ti:" + ti.id, ti_span) + self.job_runner.active_spans.set(f"ti:{ti.id}", ti_span) assert dr.span_status == SpanStatus.SHOULD_END assert ti.span_status == SpanStatus.SHOULD_END assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None - assert self.job_runner.active_spans.get("ti:" + ti.id) is not None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None self.job_runner._end_spans_of_externally_ended_ops(session) @@ -3231,7 +3231,7 @@ def test_end_spans_of_externally_ended_ops(self, dag_maker): assert ti.span_status == SpanStatus.ENDED assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None - assert self.job_runner.active_spans.get("ti:" + ti.id) is None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None @pytest.mark.parametrize( ("state", "final_span_status"), @@ -3274,13 +3274,13 @@ def test_end_active_spans(self, state, final_span_status, dag_maker): ti_span = Trace.start_child_span(span_name="ti_span", start_as_current=False) self.job_runner.active_spans.set("dr:" + str(dr.id), dr_span) - self.job_runner.active_spans.set("ti:" + ti.id, ti_span) + self.job_runner.active_spans.set(f"ti:{ti.id}", ti_span) assert dr.span_status == SpanStatus.ACTIVE assert ti.span_status == SpanStatus.ACTIVE assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is not None - assert self.job_runner.active_spans.get("ti:" + ti.id) is not None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is not None assert len(self.job_runner.active_spans.get_all()) == 2 self.job_runner._end_active_spans(session) @@ -3289,7 +3289,7 @@ def test_end_active_spans(self, state, final_span_status, dag_maker): assert ti.span_status == final_span_status assert self.job_runner.active_spans.get("dr:" + str(dr.id)) is None - assert self.job_runner.active_spans.get("ti:" + ti.id) is None + assert self.job_runner.active_spans.get(f"ti:{ti.id}") is None assert len(self.job_runner.active_spans.get_all()) == 0 def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session): diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 7449a69b5f0cd..070e0f62a4209 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -563,7 +563,7 @@ def test_start_dr_spans_if_needed_span_with_continuance(self, dag_maker, session assert dag_run.active_spans is not None assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is None - assert dag_run.active_spans.get("ti:" + first_ti.id) is None + assert dag_run.active_spans.get(f"ti:{first_ti.id}") is None assert dag_run.span_status == SpanStatus.NEEDS_CONTINUANCE assert first_ti.span_status == SpanStatus.NEEDS_CONTINUANCE @@ -572,7 +572,7 @@ def test_start_dr_spans_if_needed_span_with_continuance(self, dag_maker, session assert dag_run.span_status == SpanStatus.ACTIVE assert first_ti.span_status == SpanStatus.ACTIVE assert dag_run.active_spans.get("dr:" + str(dag_run.id)) is not None - assert dag_run.active_spans.get("ti:" + first_ti.id) is not None + assert dag_run.active_spans.get(f"ti:{first_ti.id}") is not None def test_end_dr_span_if_needed(self, testing_dag_bundle, dag_maker, session): with dag_maker( diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 20faa78946646..8172656ec050e 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2428,7 +2428,7 @@ def test_refresh_from_db(self, create_task_instance): "try_number": 1, "max_tries": 1, "hostname": "some_unique_hostname", - "id": str(uuid6.uuid7()), + "id": uuid6.uuid7(), "unixname": "some_unique_unixname", "pool": "some_fake_pool_id", "pool_slots": 25, @@ -2576,7 +2576,7 @@ def test_task_instance_history_is_created_when_ti_goes_for_retry(self, dag_maker tih = session.scalars(select(TaskInstanceHistory)).all() assert len(tih) == 1 # the new try_id should be different from what's recorded in tih - assert str(tih[0].task_instance_id) == try_id + assert tih[0].task_instance_id == try_id @pytest.mark.parametrize("pool_override", [None, "test_pool2"]) diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index d965f317c55ae..fbb0881b90936 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1728,7 +1728,7 @@ class TaskInstanceResponse(BaseModel): TaskInstance serializer for responses. """ - id: Annotated[str, Field(title="Id")] + id: Annotated[UUID, Field(title="Id")] task_id: Annotated[str, Field(title="Task Id")] dag_id: Annotated[str, Field(title="Dag Id")] dag_run_id: Annotated[str, Field(title="Dag Run Id")] diff --git a/devel-common/src/tests_common/test_utils/api_fastapi.py b/devel-common/src/tests_common/test_utils/api_fastapi.py index eee7f48aec8c3..a59f05bf10827 100644 --- a/devel-common/src/tests_common/test_utils/api_fastapi.py +++ b/devel-common/src/tests_common/test_utils/api_fastapi.py @@ -17,6 +17,7 @@ from __future__ import annotations import json +from uuid import UUID from airflow.models import DagRun, Log from airflow.models.dagrun import DagRunNote @@ -93,6 +94,8 @@ def _check_dag_run_note(session, dr_id, note_data): def _check_task_instance_note(session, ti_id, note_data): from sqlalchemy import select + if isinstance(ti_id, str): + ti_id = UUID(ti_id) ti_note = session.scalar(select(TaskInstanceNote).where(TaskInstanceNote.ti_id == ti_id)) if note_data is None: assert ti_note is None diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index 8c44a920ebcf7..608e789436e96 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -205,10 +205,11 @@ def sample_callable(**kwargs): execution_date=date, # type: ignore ) if AIRFLOW_V_3_0_PLUS: + assert dagrun.created_dag_version_id is not None task_instance = create_task_instance( t, run_id=run_id, - dag_version_id=uuid.UUID(dagrun.created_dag_version_id), + dag_version_id=dagrun.created_dag_version_id, ) else: task_instance = TaskInstance(t, run_id=run_id) # type: ignore diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py index 97c4ce9160b61..108a6935b3858 100644 --- a/providers/standard/tests/unit/standard/operators/test_hitl.py +++ b/providers/standard/tests/unit/standard/operators/test_hitl.py @@ -260,7 +260,7 @@ def test_execute(self, dag_maker: DagMaker, session: Session) -> None: if AIRFLOW_V_3_2_PLUS: expected_params_in_trigger_kwargs = expected_params # trigger_kwargs are encoded via serde from task sdk in versions >= 3.2 - expected_ti_id = UUID(ti.id) + expected_ti_id = ti.id else: expected_params_in_trigger_kwargs = {"input_1": {"value": 1, "description": None, "schema": {}}}