diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 2b3b4511b5200..2761496534f41 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -04e335ccd5d3497a851c2a0f1c0fd8870ef9faca5d9b42e159238198f1f99e2a \ No newline at end of file +35a5a5795446d6a0826115873f4484a35a3676e63c9227d01862766d244a81c7 \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index c69990ebb229d..412738971254f 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -4,2537 +4,2534 @@
[Regenerated Graphviz ERD markup omitted: the updated diagram drops the deadline table's callback, callback_state, and trigger_id columns, adds missed and callback_id (foreign key to callback) with the new callback:id--deadline:callback_id relationship, and repositions other, otherwise unchanged tables.]
diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 442b0c87ae53b..c062a1a7312ef 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``b87d2135fa50`` (head) | ``69ddce9a7247`` | ``3.2.0`` | Restructure callback table. | +| ``e812941398f4`` (head) | ``b87d2135fa50`` | ``3.2.0`` | Replace deadline's inline callback fields with foreign key | +| | | | to callback table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``b87d2135fa50`` | ``69ddce9a7247`` | ``3.2.0`` | Restructure callback table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``69ddce9a7247`` | ``5cc8117e9285`` | ``3.2.0`` | Add ``fail_fast`` column to dag table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index b86c12c9ff4bf..4d88668d2c696 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -65,6 +65,7 @@ TaskOutletAssetReference, ) from airflow.models.backfill import Backfill +from airflow.models.callback import Callback from airflow.models.dag import DagModel, get_next_data_interval, get_run_data_interval from airflow.models.dag_version import DagVersion from airflow.models.dagbag import DBDagBag @@ -1414,11 +1415,12 @@ def _run_scheduler_loop(self) -> None: with create_session() as session: # Only retrieve expired deadlines that haven't been processed yet. - # `callback_state` is null/None by default until the handler set it. + # `missed` is False by default until the handler sets it.
for deadline in session.scalars( select(Deadline) .where(Deadline.deadline_time < datetime.now(timezone.utc)) - .where(Deadline.callback_state.is_(None)) + .where(~Deadline.missed) + .options(selectinload(Deadline.callback), selectinload(Deadline.dagrun)) ): deadline.handle_miss(session) @@ -2555,7 +2557,7 @@ def _remove_unreferenced_triggers(self, *, session: Session = NEW_SESSION) -> No delete(Trigger) .where( Trigger.id.not_in(select(AssetWatcherModel.trigger_id)), - Trigger.id.not_in(select(Deadline.trigger_id)), + Trigger.id.not_in(select(Callback.trigger_id)), Trigger.id.not_in(select(TaskInstance.trigger_id)), ) .execution_options(synchronize_session="fetch") diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 617643c3f24ad..ddb52618988b1 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -627,7 +627,7 @@ def update_triggers(self, requested_trigger_ids: set[int]): new_trigger_orm = new_triggers[new_id] - # If the trigger is not associated to a task, an asset, or a deadline, this means the TaskInstance + # If the trigger is not associated to a task, an asset, or a callback, this means the TaskInstance # row was updated by either Trigger.submit_event or Trigger.submit_failure # and can happen when a single trigger Job is being run on multiple TriggerRunners # in a High-Availability setup. diff --git a/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py new file mode 100644 index 0000000000000..778c2302b7291 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py @@ -0,0 +1,338 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Replace deadline's inline callback fields with foreign key to callback table. + +Revision ID: e812941398f4 +Revises: b87d2135fa50 +Create Date: 2025-10-24 00:34:57.111239 + +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from textwrap import dedent + +import sqlalchemy as sa +from alembic import context, op +from sqlalchemy import column, select, table +from sqlalchemy_jsonfield import JSONField +from sqlalchemy_utils import UUIDType + +from airflow.serialization.serde import deserialize +from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime + +# revision identifiers, used by Alembic. 
+revision = "e812941398f4" +down_revision = "b87d2135fa50" +branch_labels = None +depends_on = None +airflow_version = "3.2.0" + +BATCH_SIZE = 1000 + + +def upgrade(): + """Replace Deadline table's inline callback fields with callback_id foreign key.""" + import uuid6 + + from airflow.models.base import StringID + from airflow.models.callback import CallbackFetchMethod, CallbackState, CallbackType + from airflow.models.deadline import CALLBACK_METRICS_PREFIX + + timestamp = datetime.now(timezone.utc) + + def migrate_batch(conn, deadline_table, callback_table, batch): + callback_inserts = [] + deadline_updates = [] + + for deadline in batch: + try: + callback_id = uuid6.uuid7() + + # Transform serialized callback to the new representation + callback_data = deserialize(deadline.callback).serialize() | { + "prefix": CALLBACK_METRICS_PREFIX, + "dag_id": deadline.dag_id, + } + + if deadline.callback_state and deadline.callback_state in { + CallbackState.FAILED, + CallbackState.SUCCESS, + }: + deadline_missed = True + callback_state = deadline.callback_state + else: + # Mark the deadlines in non-terminal states as not missed so the scheduler handles them + deadline_missed = False + callback_state = CallbackState.PENDING + + callback_inserts.append( + { + "id": callback_id, + "type": CallbackType.TRIGGERER, # Past versions only support triggerer callbacks + "fetch_method": CallbackFetchMethod.IMPORT_PATH, # Past versions only support import_path + "data": callback_data, + "state": callback_state, + "priority_weight": 1, # Default priority weight + "created_at": timestamp, + } + ) + + deadline_updates.append( + {"deadline_id": deadline.id, "callback_id": callback_id, "missed": deadline_missed} + ) + except Exception: + print(f"Failed to migrate deadline: {deadline}") + raise + + conn.execute(callback_table.insert(), callback_inserts) + conn.execute( + deadline_table.update() + .where(deadline_table.c.id == sa.bindparam("deadline_id")) + .values(callback_id=sa.bindparam("callback_id"), missed=sa.bindparam("missed")), + deadline_updates, + ) + + def migrate_all_data(): + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the deadline table while in offline mode! + -- All the rows in the deadline table will be deleted in this mode. 
+ ------------ + """) + ) + op.execute("DELETE FROM deadline") + return + + deadline_table = table( + "deadline", + column("id", UUIDType(binary=False)), + column("dagrun_id", sa.Integer()), + column("deadline_time", UtcDateTime(timezone=True)), + column("callback", JSONField()), + column("callback_state", sa.String(20)), + column("missed", sa.Boolean()), + column("callback_id", UUIDType(binary=False)), + ) + + dag_run_table = table( + "dag_run", + column("id", sa.Integer()), + column("dag_id", StringID()), + ) + + callback_table = table( + "callback", + column("id", UUIDType(binary=False)), + column("type", sa.String(20)), + column("fetch_method", sa.String(20)), + column("data", ExtendedJSON()), + column("state", sa.String(10)), + column("priority_weight", sa.Integer()), + column("created_at", UtcDateTime(timezone=True)), + ) + + conn = op.get_bind() + batch_num = 0 + while True: + batch_num += 1 + batch = conn.execute( + select( + deadline_table.c.id, + deadline_table.c.dagrun_id, + deadline_table.c.deadline_time, + deadline_table.c.callback, + deadline_table.c.callback_state, + dag_run_table.c.dag_id, + ) + .join(dag_run_table, deadline_table.c.dagrun_id == dag_run_table.c.id) + .where(deadline_table.c.callback_id.is_(None)) # Only get rows that haven't been migrated yet + .limit(BATCH_SIZE) + ).fetchall() + + if not batch: + break + + migrate_batch(conn, deadline_table, callback_table, batch) + print(f"Migrated {len(batch)} deadline records in batch {batch_num}") + + # Add new columns (temporarily nullable until data has been migrated) + with op.batch_alter_table("deadline") as batch_op: + batch_op.add_column(sa.Column("missed", sa.Boolean(), nullable=True)) + batch_op.add_column(sa.Column("callback_id", UUIDType(binary=False), nullable=True)) + + migrate_all_data() + + with op.batch_alter_table("deadline") as batch_op: + # Data for `missed` and `callback_id` has been migrated so make them non-nullable + batch_op.alter_column("missed", existing_type=sa.Boolean(), nullable=False) + batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=False) + + batch_op.create_index("deadline_missed_deadline_time_idx", ["missed", "deadline_time"], unique=False) + batch_op.drop_index(batch_op.f("deadline_callback_state_time_idx")) + batch_op.create_foreign_key( + batch_op.f("deadline_callback_id_fkey"), "callback", ["callback_id"], ["id"], ondelete="CASCADE" + ) + batch_op.drop_constraint(batch_op.f("deadline_trigger_id_fkey"), type_="foreignkey") + batch_op.drop_column("callback") + batch_op.drop_column("trigger_id") + batch_op.drop_column("callback_state") + + +def downgrade(): + """Restore Deadline table's inline callback fields from callback_id foreign key.""" + from airflow.models.callback import CallbackState + + def migrate_batch(conn, deadline_table, callback_table, batch): + deadline_updates = [] + callback_ids_to_delete = [] + + for row in batch: + try: + filtered_cb_data = {k: row.callback_data[k] for k in ("path", "kwargs")} + + # Hard-coding the serialization to avoid SDK import. 
+ # Since only AsyncCallback was supported in the previous versions, this is equivalent to: + # from airflow.serialization.serde import serialize + # from airflow.sdk.definitions.deadline import AsyncCallback + # callback_serialized = serialize(AsyncCallback.deserialize(filtered_data, 0)) + callback_serialized = { + "__data__": filtered_cb_data, + "__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", + "__version__": 0, + } + + # Mark the deadline as not handled if its callback is not in a terminal state so that the + # scheduler handles it appropriately + if row.callback_state in {CallbackState.SUCCESS, CallbackState.FAILED}: + callback_state = row.callback_state + else: + callback_state = None + + deadline_updates.append( + { + "deadline_id": row.deadline_id, + "callback": callback_serialized, + "callback_state": callback_state, + "trigger_id": None, + "callback_id": None, + } + ) + + callback_ids_to_delete.append(row.callback_id) + + except Exception: + print(f"Failed to migrate row: {row}") + raise + + conn.execute( + deadline_table.update() + .where(deadline_table.c.id == sa.bindparam("deadline_id")) + .values( + callback=sa.bindparam("callback"), + callback_state=sa.bindparam("callback_state"), + trigger_id=sa.bindparam("trigger_id"), + callback_id=sa.bindparam("callback_id"), + ), + deadline_updates, + ) + conn.execute(callback_table.delete().where(callback_table.c.id.in_(callback_ids_to_delete))) + + def migrate_all_data(): + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the + -- deadline and callback tables while in offline mode! + -- All the rows in the deadline table and the referenced rows in + -- the callback table will be deleted in this mode. + ------------ + """) + ) + op.execute("DELETE FROM deadline") + return + + deadline_table = table( + "deadline", + column("id", UUIDType(binary=False)), + column("callback_id", UUIDType(binary=False)), + column("callback", JSONField()), + column("callback_state", sa.String(20)), + column("trigger_id", sa.Integer()), + ) + + callback_table = table( + "callback", + column("id", UUIDType(binary=False)), + column("data", ExtendedJSON()), + column("state", sa.String(10)), + ) + + conn = op.get_bind() + batch_num = 0 + + while True: + batch_num += 1 + batch = conn.execute( + select( + deadline_table.c.id.label("deadline_id"), + deadline_table.c.callback_id, + callback_table.c.data.label("callback_data"), + callback_table.c.state.label("callback_state"), + ) + .join(callback_table, deadline_table.c.callback_id == callback_table.c.id) + .where(deadline_table.c.callback.is_(None)) # Only get rows that haven't been downgraded yet + .limit(BATCH_SIZE) + ).fetchall() + + if not batch: + break + + migrate_batch(conn, deadline_table, callback_table, batch) + print(f"Migrated {len(batch)} deadline records in batch {batch_num}") + + with op.batch_alter_table("deadline") as batch_op: + batch_op.add_column(sa.Column("callback_state", sa.VARCHAR(length=20), nullable=True)) + batch_op.add_column(sa.Column("trigger_id", sa.INTEGER(), autoincrement=False, nullable=True)) + + # Temporarily nullable until data has been migrated + batch_op.add_column(sa.Column("callback", JSONField(), nullable=True)) + + # Make callback_id nullable so the associated callbacks can be cleared during migration + batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=True) + + migrate_all_data() + + with op.batch_alter_table("deadline") as batch_op: + # Data for `callback` 
has been migrated so make it non-nullable + batch_op.alter_column("callback", existing_type=JSONField(), nullable=False) + + batch_op.drop_constraint(batch_op.f("deadline_callback_id_fkey"), type_="foreignkey") + batch_op.create_foreign_key(batch_op.f("deadline_trigger_id_fkey"), "trigger", ["trigger_id"], ["id"]) + batch_op.drop_index("deadline_missed_deadline_time_idx") + batch_op.create_index( + batch_op.f("deadline_callback_state_time_idx"), ["callback_state", "deadline_time"], unique=False + ) + batch_op.drop_column("callback_id") + batch_op.drop_column("missed") diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py index b1da459b8afe2..bfaa43db6b2b3 100644 --- a/airflow-core/src/airflow/models/callback.py +++ b/airflow-core/src/airflow/models/callback.py @@ -29,6 +29,7 @@ from airflow._shared.timezones import timezone from airflow.models import Base +from airflow.stats import Stats from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, mapped_column if TYPE_CHECKING: @@ -50,11 +51,15 @@ class CallbackState(str, Enum): FAILED = "failed" +ACTIVE_STATES = frozenset((CallbackState.QUEUED, CallbackState.RUNNING)) +TERMINAL_STATES = frozenset((CallbackState.SUCCESS, CallbackState.FAILED)) + + class CallbackType(str, Enum): """ Types of Callbacks. - Used for figuring out what class to instantiate while deserialization. + Used for figuring out what class to instantiate during deserialization. """ TRIGGERER = "triggerer" @@ -85,7 +90,7 @@ class ImportPathCallbackDefProtocol(CallbackDefinitionProtocol, Protocol): """Protocol for callbacks that use the import path fetch method.""" path: str - kwargs: dict | None + kwargs: dict @runtime_checkable @@ -131,26 +136,49 @@ class Callback(Base): trigger_id: Mapped[int] = mapped_column(Integer, ForeignKey("trigger.id"), nullable=True) trigger = relationship("Trigger", back_populates="callback", uselist=False) - def __init__(self, priority_weight: int = 1): + def __init__(self, priority_weight: int = 1, prefix: str = "", **kwargs): + """ + Initialize a Callback. This is the base class so it shouldn't usually need to be initialized. 
+ + :param priority_weight: Priority for callback execution (higher value -> higher priority) + :param prefix: Optional prefix for metric names + :param kwargs: Additional data emitted in metric tags + """ self.state = CallbackState.PENDING self.priority_weight = priority_weight + self.data = kwargs # kwargs can be used to include additional info in metric tags + if prefix: + self.data["prefix"] = prefix def queue(self): self.state = CallbackState.QUEUED + def get_metric_info(self, status: str, result: Any) -> dict: + tags = {"result": result, **self.data} + tags.pop("prefix", None) + + if "kwargs" in tags: + # Remove the context (if exists) to keep the tags simple + tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k != "context"} + + prefix = self.data.get("prefix", "") + name = f"{prefix}.callback_{status}" if prefix else f"callback_{status}" + + return {"stat": name, "tags": tags} + @staticmethod - def create_from_sdk_def(callback_def: CallbackDefinitionProtocol) -> Callback: + def create_from_sdk_def(callback_def: CallbackDefinitionProtocol, **kwargs) -> Callback: # Cannot check actual type using isinstance() because that would require SDK import match type(callback_def).__name__: case "AsyncCallback": if TYPE_CHECKING: assert isinstance(callback_def, ImportPathCallbackDefProtocol) - return TriggererCallback(callback_def) + return TriggererCallback(callback_def, **kwargs) case "SyncCallback": if TYPE_CHECKING: assert isinstance(callback_def, ImportPathExecutorCallbackDefProtocol) - return ExecutorCallback(callback_def, fetch_method=CallbackFetchMethod.IMPORT_PATH) + return ExecutorCallback(callback_def, fetch_method=CallbackFetchMethod.IMPORT_PATH, **kwargs) case _: raise ValueError(f"Cannot handle Callback of type {type(callback_def)}") @@ -162,17 +190,44 @@ class TriggererCallback(Callback): __mapper_args__ = {"polymorphic_identity": CallbackType.TRIGGERER} def __init__(self, callback_def: ImportPathCallbackDefProtocol, **kwargs): + """ + Initialize a TriggererCallback from a callback definition. 
+ + :param callback_def: Callback definition with path and kwargs + :param kwargs: Passed to parent Callback.__init__ (see base class for details) + """ super().__init__(**kwargs) self.fetch_method = CallbackFetchMethod.IMPORT_PATH - self.data = callback_def.serialize() + self.data |= callback_def.serialize() + + def __repr__(self): + return f"{self.data['path']}({self.data['kwargs'] or ''}) on a triggerer" def queue(self): - # TODO: queue the trigger + from airflow.models.trigger import Trigger + from airflow.triggers.callback import CallbackTrigger + + self.trigger = Trigger.from_object( + CallbackTrigger( + callback_path=self.data["path"], + callback_kwargs=self.data["kwargs"], + ) + ) super().queue() def handle_event(self, event: TriggerEvent, session: Session): - # TODO: modify fields based on the event - pass + from airflow.triggers.callback import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY + + if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in (ACTIVE_STATES | TERMINAL_STATES): + self.state = status + if status in TERMINAL_STATES: + self.trigger = None + self.output = event.payload.get(PAYLOAD_BODY_KEY) + Stats.incr(**self.get_metric_info(status, self.output)) + + session.add(self) + else: + log.error("Unexpected event received: %s", event.payload) class ExecutorCallback(Callback): @@ -183,9 +238,19 @@ class ExecutorCallback(Callback): def __init__( self, callback_def: ImportPathExecutorCallbackDefProtocol, fetch_method: CallbackFetchMethod, **kwargs ): + """ + Initialize an ExecutorCallback from a callback definition and fetch method. + + :param callback_def: Callback definition with path, kwargs, and executor + :param fetch_method: Method to fetch the callback at runtime + :param kwargs: Passed to parent Callback.__init__ (see base class for details) + """ super().__init__(**kwargs) self.fetch_method = fetch_method - self.data = callback_def.serialize() + self.data |= callback_def.serialize() + + def __repr__(self): + return f"{self.data['path']}({self.data['kwargs'] or ''}) on {self.data.get('executor', 'default')} executor" class DagProcessorCallback(Callback): @@ -194,11 +259,12 @@ class DagProcessorCallback(Callback): __mapper_args__ = {"polymorphic_identity": CallbackType.DAG_PROCESSOR} def __init__(self, priority_weight: int, callback: CallbackRequest): + """Initialize a DagProcessorCallback from a callback request.""" super().__init__(priority_weight=priority_weight) self.fetch_method = CallbackFetchMethod.DAG_ATTRIBUTE self.state = None - self.data = {"req_class": callback.__class__.__name__, "req_data": callback.to_json()} + self.data |= {"req_class": callback.__class__.__name__, "req_data": callback.to_json()} def get_callback_request(self) -> CallbackRequest: module = import_module("airflow.callbacks.callback_requests") diff --git a/airflow-core/src/airflow/models/deadline.py b/airflow-core/src/airflow/models/deadline.py index 3ecac2679dc9d..9e55f02c30754 100644 --- a/airflow-core/src/airflow/models/deadline.py +++ b/airflow-core/src/airflow/models/deadline.py @@ -20,24 +20,18 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime, timedelta -from enum import Enum -from functools import cached_property from typing import TYPE_CHECKING, Any, cast -import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import ForeignKey, Index, Integer, String, and_, func, inspect, select, text +from sqlalchemy import Boolean, ForeignKey, Index, Integer, and_, func, inspect, select, text from sqlalchemy.exc import 
SQLAlchemyError from sqlalchemy.orm import Mapped, relationship from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone -from airflow.models import Trigger from airflow.models.base import Base -from airflow.serialization.serde import deserialize, serialize -from airflow.settings import json +from airflow.models.callback import Callback, CallbackDefinitionProtocol from airflow.stats import Stats -from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, mapped_column @@ -46,12 +40,11 @@ from sqlalchemy.orm import Session from sqlalchemy.sql import ColumnElement - from airflow.sdk.definitions.deadline import Callback - from airflow.triggers.base import TriggerEvent - logger = logging.getLogger(__name__) +CALLBACK_METRICS_PREFIX = "deadline_alerts" + class classproperty: """ @@ -80,19 +73,6 @@ def __get__(self, instance, cls=None): return self.method(cls) -class DeadlineCallbackState(str, Enum): - """ - All possible states of deadline callbacks once the deadline is missed. - - `None` state implies that the deadline is pending (`deadline_time` hasn't passed yet). - """ - - QUEUED = "queued" - RUNNING = "running" - SUCCESS = "success" - FAILED = "failed" - - class Deadline(Base): """A Deadline is a 'need-by' date which triggers a callback if the provided time has passed.""" @@ -104,37 +84,36 @@ class Deadline(Base): dagrun_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("dag_run.id", ondelete="CASCADE"), nullable=True ) + dagrun = relationship("DagRun", back_populates="deadlines") # The time after which the Deadline has passed and the callback should be triggered. deadline_time: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False) - # The (serialized) callback to be called when the Deadline has passed. 
- _callback: Mapped[dict[str, Any]] = mapped_column( - "callback", sqlalchemy_jsonfield.JSONField(json=json), nullable=False - ) - # The state of the deadline callback - callback_state: Mapped[str | None] = mapped_column(String(20), nullable=True) - dagrun = relationship("DagRun", back_populates="deadlines") + # Whether the deadline has been marked as missed by the scheduler + missed: Mapped[bool] = mapped_column(Boolean, nullable=False) - # The Trigger where the callback is running - trigger_id: Mapped[int | None] = mapped_column(Integer, ForeignKey("trigger.id"), nullable=True) - trigger = relationship("Trigger", back_populates="deadline") + # Callback that will run when this deadline is missed + callback_id: Mapped[str] = mapped_column( + UUIDType(binary=False), ForeignKey("callback.id", ondelete="CASCADE"), nullable=False + ) + callback = relationship("Callback", uselist=False, cascade="all, delete-orphan", single_parent=True) - __table_args__ = (Index("deadline_callback_state_time_idx", callback_state, deadline_time, unique=False),) + __table_args__ = (Index("deadline_missed_deadline_time_idx", missed, deadline_time, unique=False),) def __init__( self, deadline_time: datetime, - callback: Callback, + callback: CallbackDefinitionProtocol, dagrun_id: int, + dag_id: str | None = None, ): super().__init__() self.deadline_time = deadline_time - result = serialize(callback) - if not isinstance(result, dict): - raise TypeError("Expected callback to serialize into a dict") - self._callback: dict[str, Any] = result self.dagrun_id = dagrun_id + self.missed = False + self.callback = Callback.create_from_sdk_def( + callback_def=callback, prefix=CALLBACK_METRICS_PREFIX, dag_id=dag_id + ) def __repr__(self): def _determine_resource() -> tuple[str, str]: @@ -149,7 +128,7 @@ def _determine_resource() -> tuple[str, str]: return ( f"[{resource_type} Deadline] {resource_details} needed by " - f"{self.deadline_time} or run: {self.callback.path}({self.callback.kwargs or ''})" + f"{self.deadline_time} or run: {self.callback}" ) @classmethod @@ -207,69 +186,34 @@ def prune_deadlines(cls, *, session: Session, conditions: dict[Mapped, Any]) -> return deleted_count - @cached_property - def callback(self) -> Callback: - return cast("Callback", deserialize(self._callback)) - def handle_miss(self, session: Session): - """Handle a missed deadline by running the callback in the appropriate host and updating the `callback_state`.""" - from airflow.sdk.definitions.deadline import AsyncCallback, SyncCallback + """Handle a missed deadline by queueing the callback.""" def get_simple_context(): from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse + from airflow.models import DagRun # TODO: Use the TaskAPI from within Triggerer to fetch full context instead of sending this context # from the scheduler + + # Fetch the DagRun from the database again to avoid errors when self.dagrun's relationship fields + # are not in the current session. 
+ dagrun = session.get(DagRun, self.dagrun_id) + return { - "dag_run": DAGRunResponse.model_validate(self.dagrun).model_dump(mode="json"), + "dag_run": DAGRunResponse.model_validate(dagrun).model_dump(mode="json"), "deadline": {"id": self.id, "deadline_time": self.deadline_time}, } - if isinstance(self.callback, AsyncCallback): - callback_trigger = DeadlineCallbackTrigger( - callback_path=self.callback.path, - callback_kwargs=(self.callback.kwargs or {}) | {"context": get_simple_context()}, - ) - trigger_orm = Trigger.from_object(callback_trigger) - session.add(trigger_orm) - session.flush() - self.trigger = trigger_orm - - elif isinstance(self.callback, SyncCallback): - raise NotImplementedError("SyncCallback is currently not supported") - - else: - raise TypeError("Unknown Callback type") - - self.callback_state = DeadlineCallbackState.QUEUED + self.callback.data["kwargs"] = self.callback.data["kwargs"] | {"context": get_simple_context()} + self.missed = True + self.callback.queue() session.add(self) Stats.incr( "deadline_alerts.deadline_missed", tags={"dag_id": self.dagrun.dag_id, "dagrun_id": self.dagrun.run_id}, ) - def handle_callback_event(self, event: TriggerEvent, session: Session): - if (status := event.payload.get(PAYLOAD_STATUS_KEY)) and status in { - DeadlineCallbackState.SUCCESS, - DeadlineCallbackState.FAILED, - DeadlineCallbackState.RUNNING, - }: - self.callback_state = status - if status != DeadlineCallbackState.RUNNING: - self.trigger = None - metric_tags = { - "dag_id": self.dagrun.dag_id, - "callback": self._callback, - "result": event.payload.get(PAYLOAD_BODY_KEY), - } - if status == DeadlineCallbackState.FAILED: - Stats.incr("deadline_alerts.deadline_callback_failure", tags=metric_tags) - elif status == DeadlineCallbackState.SUCCESS: - Stats.incr("deadline_alerts.deadline_callback_success", tags=metric_tags) - session.add(self) - else: - logger.error("Unexpected event received: %s", event.payload) - class ReferenceModels: """ diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index cd337c6913972..fb84d6f8db530 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -31,6 +31,7 @@ from airflow._shared.timezones import timezone from airflow.assets.manager import AssetManager +from airflow.models import Callback from airflow.models.asset import AssetWatcherModel from airflow.models.base import Base from airflow.models.taskinstance import TaskInstance @@ -111,8 +112,6 @@ class Trigger(Base): callback = relationship("Callback", back_populates="trigger", uselist=False) - deadline = relationship("Deadline", back_populates="trigger", uselist=False) - def __init__( self, classpath: str, @@ -195,11 +194,9 @@ def bulk_fetch(cls, ids: Iterable[int], session: Session = NEW_SESSION) -> dict[ @classmethod @provide_session def fetch_trigger_ids_with_non_task_associations(cls, session: Session = NEW_SESSION) -> set[str]: - """Fetch all trigger IDs actively associated with non-task entities like assets and deadlines.""" - from airflow.models import Deadline - + """Fetch all trigger IDs actively associated with non-task entities like assets and callbacks.""" query = select(AssetWatcherModel.trigger_id).union_all( - select(Deadline.trigger_id).where(Deadline.trigger_id.is_not(None)) + select(Callback.trigger_id).where(Callback.trigger_id.is_not(None)) ) return set(session.scalars(query)) @@ -224,10 +221,10 @@ def clean_unused(cls, session: Session = NEW_SESSION) -> None: 
.values(trigger_id=None) ) - # Get all triggers that have no task instances, assets, or deadlines depending on them and delete them + # Get all triggers that have no task instances, assets, or callbacks depending on them and delete them ids = ( select(cls.id) - .where(~cls.assets.any(), ~cls.deadline.has()) + .where(~cls.assets.any(), ~cls.callback.has()) .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True) .group_by(cls.id) .having(func.count(TaskInstance.trigger_id) == 0) @@ -271,8 +268,8 @@ def submit_event(cls, trigger_id, event: TriggerEvent, session: Session = NEW_SE extra={"from_trigger": True, "payload": event.payload}, session=session, ) - if trigger.deadline: - trigger.deadline.handle_callback_event(event, session) + if trigger.callback: + trigger.callback.handle_event(event, session) @classmethod @provide_session @@ -372,11 +369,11 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel """ result: list[Row[Any]] = [] - # Add triggers associated to deadlines first, then tasks, then assets - # It prioritizes deadline triggers, then DAGs over event driven scheduling which is fair + # Add triggers associated to callbacks first, then tasks, then assets + # It prioritizes callbacks, then DAGs over event driven scheduling which is fair queries = [ - # Deadline triggers - select(cls.id).where(cls.deadline.has()).order_by(cls.created_date), + # Callback triggers + select(cls.id).where(cls.callback.has()).order_by(cls.created_date), # Task Instance triggers select(cls.id) .prefix_with("STRAIGHT_JOIN", dialect="mysql") diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 939f99fce2e07..6eb5017a6affd 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -3313,6 +3313,7 @@ def create_dagrun( deadline_time=deadline_time, callback=deadline.callback, dagrun_id=orm_dagrun.id, + dag_id=orm_dagrun.dag_id, ) ) Stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id}) diff --git a/airflow-core/src/airflow/triggers/deadline.py b/airflow-core/src/airflow/triggers/callback.py similarity index 77% rename from airflow-core/src/airflow/triggers/deadline.py rename to airflow-core/src/airflow/triggers/callback.py index bd8a665b9fc62..c3f21d84faf68 100644 --- a/airflow-core/src/airflow/triggers/deadline.py +++ b/airflow-core/src/airflow/triggers/callback.py @@ -22,6 +22,7 @@ from collections.abc import AsyncIterator from typing import Any +from airflow.models.callback import CallbackState from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.utils.module_loading import import_string, qualname @@ -31,8 +32,8 @@ PAYLOAD_BODY_KEY = "body" -class DeadlineCallbackTrigger(BaseTrigger): - """Trigger that executes a deadline callback function asynchronously.""" +class CallbackTrigger(BaseTrigger): + """Trigger that executes a callback function asynchronously.""" def __init__(self, callback_path: str, callback_kwargs: dict[str, Any] | None = None): super().__init__() @@ -46,28 +47,26 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: - from airflow.models.deadline import DeadlineCallbackState # to avoid cyclic imports - try: - yield TriggerEvent({PAYLOAD_STATUS_KEY: DeadlineCallbackState.RUNNING}) + yield TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.RUNNING}) callback = 
import_string(self.callback_path) # TODO: get full context and run template rendering. Right now, a simple context in included in `callback_kwargs` result = await callback(**self.callback_kwargs) - yield TriggerEvent({PAYLOAD_STATUS_KEY: DeadlineCallbackState.SUCCESS, PAYLOAD_BODY_KEY: result}) + yield TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.SUCCESS, PAYLOAD_BODY_KEY: result}) except Exception as e: if isinstance(e, ImportError): - message = "Failed to import this deadline callback on the triggerer" + message = "Failed to import the callable on the triggerer" elif isinstance(e, TypeError) and "await" in str(e): - message = "Failed to run this deadline callback because it is not awaitable" + message = "Failed to run the callable because it's not awaitable" else: - message = "An error occurred during execution of this deadline callback" + message = "An error occurred during execution of the callable" log.exception("%s: %s; kwargs: %s\n%s", message, self.callback_path, self.callback_kwargs, e) yield TriggerEvent( { - PAYLOAD_STATUS_KEY: DeadlineCallbackState.FAILED, + PAYLOAD_STATUS_KEY: CallbackState.FAILED, PAYLOAD_BODY_KEY: f"{message}: {traceback.format_exception(e)}", } ) diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index d969bccc43bbb..5455e7b08bbfb 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -112,7 +112,7 @@ class MappedClassProtocol(Protocol): "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", "3.1.0": "cc92b33c6709", - "3.2.0": "b87d2135fa50", + "3.2.0": "e812941398f4", } diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 9fd022b744c4c..e74de5f8e63ce 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -59,7 +59,7 @@ from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.db_callback_request import DbCallbackRequest -from airflow.models.deadline import Deadline, DeadlineCallbackState +from airflow.models.deadline import Deadline from airflow.models.log import Log from airflow.models.pool import Pool from airflow.models.serialized_dag import SerializedDagModel @@ -69,7 +69,7 @@ from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.triggers.temporal import DateTimeTrigger from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher, task -from airflow.sdk.definitions.deadline import AsyncCallback +from airflow.sdk.definitions.deadline import AsyncCallback, SyncCallback from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG from airflow.timetables.base import DataInterval from airflow.traces.tracer import Trace @@ -86,6 +86,7 @@ from tests_common.test_utils.db import ( clear_db_assets, clear_db_backfills, + clear_db_callbacks, clear_db_dag_bundles, clear_db_dags, clear_db_deadline, @@ -193,6 +194,7 @@ def clean_db(): clear_db_jobs() clear_db_assets() clear_db_deadline() + clear_db_callbacks() clear_db_triggers() @pytest.fixture(autouse=True) @@ -7046,28 +7048,49 @@ def test_process_expired_deadlines(self, mock_handle_miss, session, dag_maker): callback_path = "classpath.notify" # Create a test Dag run for Deadline - with dag_maker(dag_id="test_deadline_dag"): + dag_id = "test_deadline_dag" + with dag_maker(dag_id=dag_id): EmptyOperator(task_id="empty") dagrun_id = dag_maker.create_dagrun().id - handled_deadlines = 
[] - for state in DeadlineCallbackState: - deadline = Deadline( - deadline_time=past_date, callback=AsyncCallback(callback_path), dagrun_id=dagrun_id - ) - deadline.callback_state = state - handled_deadlines.append(deadline) + handled_deadline_async = Deadline( + deadline_time=past_date, + callback=AsyncCallback(callback_path), + dagrun_id=dagrun_id, + dag_id=dag_id, + ) + handled_deadline_async.missed = True + + handled_deadline_sync = Deadline( + deadline_time=past_date, + callback=SyncCallback(callback_path), + dagrun_id=dagrun_id, + dag_id=dag_id, + ) + handled_deadline_sync.missed = True + expired_deadline1 = Deadline( - deadline_time=past_date, callback=AsyncCallback(callback_path), dagrun_id=dagrun_id + deadline_time=past_date, callback=AsyncCallback(callback_path), dagrun_id=dagrun_id, dag_id=dag_id ) expired_deadline2 = Deadline( - deadline_time=past_date, callback=AsyncCallback(callback_path), dagrun_id=dagrun_id + deadline_time=past_date, callback=SyncCallback(callback_path), dagrun_id=dagrun_id, dag_id=dag_id ) future_deadline = Deadline( - deadline_time=future_date, callback=AsyncCallback(callback_path), dagrun_id=dagrun_id + deadline_time=future_date, + callback=AsyncCallback(callback_path), + dagrun_id=dagrun_id, + dag_id=dag_id, ) - session.add_all([expired_deadline1, expired_deadline2, future_deadline] + handled_deadlines) + session.add_all( + [ + expired_deadline1, + expired_deadline2, + future_deadline, + handled_deadline_async, + handled_deadline_sync, + ] + ) session.flush() self.job_runner._execute() diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py index 7664dc6ab9b22..545361608382d 100644 --- a/airflow-core/tests/unit/models/test_callback.py +++ b/airflow-core/tests/unit/models/test_callback.py @@ -18,6 +18,7 @@ import pytest +from airflow.models import Trigger from airflow.models.callback import ( Callback, CallbackFetchMethod, @@ -26,8 +27,14 @@ TriggererCallback, ) from airflow.sdk.definitions.deadline import AsyncCallback, SyncCallback +from airflow.triggers.base import TriggerEvent +from airflow.triggers.callback import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY from airflow.utils.session import create_session +from tests_common.test_utils.db import clear_db_callbacks + +pytestmark = [pytest.mark.db_test] + async def async_callback(): """Empty awaitable callable used for unit tests.""" @@ -42,6 +49,7 @@ def sync_callback(): TEST_CALLBACK_KWARGS = {"arg1": "value1"} TEST_ASYNC_CALLBACK = AsyncCallback(async_callback, kwargs=TEST_CALLBACK_KWARGS) TEST_SYNC_CALLBACK = SyncCallback(sync_callback, kwargs=TEST_CALLBACK_KWARGS) +TEST_DAG_ID = "test_dag" @pytest.fixture @@ -51,6 +59,12 @@ def session(): yield session +@pytest.fixture(scope="module", autouse=True) +def clean_db(request): + yield + clear_db_callbacks() + + class TestCallback: @pytest.mark.parametrize( "callback_def, expected_cb_instance", @@ -72,7 +86,7 @@ def test_create_from_sdk_def(self, callback_def, expected_cb_instance): assert isinstance(returned_cb, type(expected_cb_instance)) assert returned_cb.data == expected_cb_instance.data - def test_test_create_from_sdk_def_unknown_type(self): + def test_create_from_sdk_def_unknown_type(self): """Test that unknown callback type raises ValueError""" class UnknownCallback: @@ -83,9 +97,21 @@ class UnknownCallback: with pytest.raises(ValueError, match="Cannot handle Callback of type"): Callback.create_from_sdk_def(unknown_callback) + def test_get_metric_info(self): + callback = 
TriggererCallback(TEST_ASYNC_CALLBACK, prefix="deadline_alerts", dag_id=TEST_DAG_ID) + callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID}, "email": "test@example.com"} + metric_info = callback.get_metric_info(CallbackState.SUCCESS, "0") + + assert metric_info["stat"] == "deadline_alerts.callback_success" + assert metric_info["tags"] == { + "result": "0", + "path": TEST_ASYNC_CALLBACK.path, + "kwargs": {"email": "test@example.com"}, + "dag_id": TEST_DAG_ID, + } + class TestTriggererCallback: - @pytest.mark.db_test def test_polymorphic_serde(self, session): """Test that TriggererCallback can be serialized and deserialized""" callback = TriggererCallback(TEST_ASYNC_CALLBACK) @@ -102,16 +128,60 @@ def test_polymorphic_serde(self, session): assert retrieved.created_at is not None assert retrieved.trigger_id is None - def test_queue(self): + def test_queue(self, session): callback = TriggererCallback(TEST_ASYNC_CALLBACK) assert callback.state == CallbackState.PENDING + assert callback.trigger is None callback.queue() + assert isinstance(callback.trigger, Trigger) + assert callback.trigger.kwargs["callback_path"] == TEST_ASYNC_CALLBACK.path + assert callback.trigger.kwargs["callback_kwargs"] == TEST_ASYNC_CALLBACK.kwargs assert callback.state == CallbackState.QUEUED + @pytest.mark.parametrize( + "event, terminal_state", + [ + pytest.param( + TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.SUCCESS, PAYLOAD_BODY_KEY: "test_result"}), + True, + id="success_event", + ), + pytest.param( + TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.FAILED, PAYLOAD_BODY_KEY: "RuntimeError"}), + True, + id="failed_event", + ), + pytest.param( + TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.RUNNING}), + False, + id="running_event", + ), + pytest.param( + TriggerEvent({PAYLOAD_STATUS_KEY: CallbackState.QUEUED, PAYLOAD_BODY_KEY: ""}), + False, + id="invalid_event", + ), + pytest.param(TriggerEvent({PAYLOAD_STATUS_KEY: "unknown_state"}), False, id="unknown_event"), + ], + ) + def test_handle_event(self, session, event, terminal_state): + callback = TriggererCallback(TEST_ASYNC_CALLBACK) + callback.queue() + callback.handle_event(event, session) + + status = event.payload[PAYLOAD_STATUS_KEY] + if status in set(CallbackState): + assert callback.state == status + else: + assert callback.state == CallbackState.QUEUED + + if terminal_state: + assert callback.trigger is None + assert callback.output == event.payload[PAYLOAD_BODY_KEY] + class TestExecutorCallback: - @pytest.mark.db_test def test_polymorphic_serde(self, session): """Test that ExecutorCallback can be serialized and deserialized""" callback = ExecutorCallback(TEST_SYNC_CALLBACK, fetch_method=CallbackFetchMethod.IMPORT_PATH) diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index 4ce17772cde24..9642a21f3cbfb 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -25,12 +25,10 @@ from sqlalchemy.exc import SQLAlchemyError from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse -from airflow.models import DagRun, Trigger -from airflow.models.deadline import Deadline, DeadlineCallbackState, ReferenceModels, _fetch_from_db +from airflow.models import DagRun +from airflow.models.deadline import Deadline, ReferenceModels, _fetch_from_db from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sdk.definitions.deadline import AsyncCallback, DeadlineReference, SyncCallback -from 
airflow.triggers.base import TriggerEvent -from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY from airflow.utils.state import DagRunState from tests_common.test_utils import db @@ -114,7 +112,7 @@ def teardown_method(): id="multiple_conditions", ), pytest.param( - {Deadline.dagrun_id: "valid_placeholder", Deadline.callback_state: "invalid"}, + {Deadline.dagrun_id: "valid_placeholder", Deadline.callback: None}, id="mixed_conditions", ), ], @@ -141,127 +139,39 @@ def test_prune_deadlines(self, mock_session, conditions, dagrun): else: mock_session.query.assert_not_called() - def test_repr_with_callback_kwargs(self, deadline_orm, dagrun): + def test_repr(self, deadline_orm, dagrun): assert ( repr(deadline_orm) == f"[DagRun Deadline] Dag: {DAG_ID} Run: {dagrun.id} needed by " - f"{DEFAULT_DATE} or run: {TEST_CALLBACK_PATH}({TEST_CALLBACK_KWARGS})" - ) - - def test_repr_without_callback_kwargs(self, deadline_orm, dagrun, session): - deadline_orm = Deadline( - deadline_time=DEFAULT_DATE, - callback=AsyncCallback(TEST_CALLBACK_PATH), - dagrun_id=dagrun.id, - ) - session.add(deadline_orm) - session.flush() - - assert deadline_orm.callback.kwargs is None - assert ( - repr(deadline_orm) == f"[DagRun Deadline] Dag: {DAG_ID} Run: {dagrun.id} needed by " - f"{DEFAULT_DATE} or run: {TEST_CALLBACK_PATH}()" + f"{DEFAULT_DATE} or run: {deadline_orm.callback}" ) @pytest.mark.db_test - @pytest.mark.parametrize( - "kwargs", - [ - pytest.param(TEST_CALLBACK_KWARGS, id="non-empty kwargs"), - pytest.param(None, id="null kwargs"), - ], - ) - def test_handle_miss_async_callback(self, dagrun, session, kwargs): + def test_handle_miss(self, dagrun, session): deadline_orm = Deadline( deadline_time=DEFAULT_DATE, - callback=AsyncCallback(TEST_CALLBACK_PATH, kwargs), + callback=AsyncCallback(TEST_CALLBACK_PATH, TEST_CALLBACK_KWARGS), dagrun_id=dagrun.id, + dag_id=dagrun.dag_id, ) session.add(deadline_orm) session.flush() - deadline_orm.handle_miss(session=session) - session.flush() + assert not deadline_orm.missed - assert deadline_orm.trigger_id is not None - trigger = session.query(Trigger).filter(Trigger.id == deadline_orm.trigger_id).one() - assert trigger is not None + with mock.patch.object(deadline_orm.callback, "queue") as mock_queue: + deadline_orm.handle_miss(session) + session.flush() + mock_queue.assert_called_once() - assert trigger.kwargs["callback_path"] == TEST_CALLBACK_PATH + assert deadline_orm.missed - trigger_kwargs = trigger.kwargs["callback_kwargs"] - context = trigger_kwargs.pop("context") - assert trigger_kwargs == (kwargs or {}) + callback_kwargs = deadline_orm.callback.data["kwargs"] + context = callback_kwargs.pop("context") + assert callback_kwargs == TEST_CALLBACK_KWARGS - assert context["deadline"]["id"] == str(deadline_orm.id) + assert context["deadline"]["id"] == deadline_orm.id assert context["deadline"]["deadline_time"].timestamp() == deadline_orm.deadline_time.timestamp() assert context["dag_run"] == DAGRunResponse.model_validate(dagrun).model_dump(mode="json") - @pytest.mark.db_test - def test_handle_miss_sync_callback(self, dagrun, session): - deadline_orm = Deadline( - deadline_time=DEFAULT_DATE, - callback=TEST_SYNC_CALLBACK, - dagrun_id=dagrun.id, - ) - session.add(deadline_orm) - session.flush() - - with pytest.raises(NotImplementedError): - deadline_orm.handle_miss(session=session) - session.flush() - assert deadline_orm.trigger_id is None - - @pytest.mark.db_test - @pytest.mark.parametrize( - "event, none_trigger_expected", - [ - pytest.param( - 
TriggerEvent( - {PAYLOAD_STATUS_KEY: DeadlineCallbackState.SUCCESS, PAYLOAD_BODY_KEY: "test_result"} - ), - True, - id="success_event", - ), - pytest.param( - TriggerEvent( - {PAYLOAD_STATUS_KEY: DeadlineCallbackState.FAILED, PAYLOAD_BODY_KEY: "RuntimeError"} - ), - True, - id="failed_event", - ), - pytest.param( - TriggerEvent({PAYLOAD_STATUS_KEY: DeadlineCallbackState.RUNNING}), - False, - id="running_event", - ), - pytest.param( - TriggerEvent({PAYLOAD_STATUS_KEY: DeadlineCallbackState.QUEUED, PAYLOAD_BODY_KEY: ""}), - False, - id="invalid_event", - ), - pytest.param(TriggerEvent({PAYLOAD_STATUS_KEY: "unknown_state"}), False, id="unknown_event"), - ], - ) - def test_handle_callback_event(self, dagrun, deadline_orm, session, event, none_trigger_expected): - deadline_orm.handle_miss(session=session) - session.flush() - - deadline_orm.handle_callback_event(event, session) - session.flush() - - assert none_trigger_expected == (deadline_orm.trigger is None) - - status = event.payload[PAYLOAD_STATUS_KEY] - if status in set(DeadlineCallbackState): - assert deadline_orm.callback_state == status - else: - assert deadline_orm.callback_state == DeadlineCallbackState.QUEUED - - def test_handle_miss_sets_callback_state(self, dagrun, deadline_orm, session): - """Test that handle_miss sets the callback state to QUEUED.""" - deadline_orm.handle_miss(session) - - assert deadline_orm.callback_state == DeadlineCallbackState.QUEUED - @pytest.mark.db_test class TestCalculatedDeadlineDatabaseCalls: diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py index cfabf0e505eb5..1f2dd60d18796 100644 --- a/airflow-core/tests/unit/models/test_trigger.py +++ b/airflow-core/tests/unit/models/test_trigger.py @@ -30,8 +30,9 @@ from airflow._shared.timezones import timezone from airflow.jobs.job import Job from airflow.jobs.triggerer_job_runner import TriggererJobRunner -from airflow.models import Deadline, TaskInstance, Trigger +from airflow.models import TaskInstance, Trigger from airflow.models.asset import AssetEvent, AssetModel, AssetWatcherModel +from airflow.models.callback import Callback, TriggererCallback from airflow.models.xcom import XComModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.sdk.definitions.deadline import AsyncCallback @@ -47,7 +48,6 @@ from airflow.utils.state import State from tests_common.test_utils.config import conf_vars -from unit.models import DEFAULT_DATE pytestmark = pytest.mark.db_test @@ -63,7 +63,7 @@ def session(): def clear_db(session): session.query(TaskInstance).delete() session.query(AssetWatcherModel).delete() - session.query(Deadline).delete() + session.query(Callback).delete() session.query(Trigger).delete() session.query(AssetModel).delete() session.query(AssetEvent).delete() @@ -71,7 +71,7 @@ def clear_db(session): yield session session.query(TaskInstance).delete() session.query(AssetWatcherModel).delete() - session.query(Deadline).delete() + session.query(Callback).delete() session.query(Trigger).delete() session.query(AssetModel).delete() session.query(AssetEvent).delete() @@ -79,20 +79,18 @@ def clear_db(session): session.commit() -def test_fetch_trigger_ids_with_non_task_associations(session, create_task_instance): +def test_fetch_trigger_ids_with_non_task_associations(session): # Create triggers asset_trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger1", kwargs={}) - deadline_trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger2", kwargs={}) + 
callback_trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger2", kwargs={}) other_trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger3", kwargs={}) - session.add_all([asset_trigger, deadline_trigger, other_trigger]) + session.add_all([asset_trigger, callback_trigger, other_trigger]) + session.commit() - # Create deadline association - dagrun_id = create_task_instance().dag_run.id - deadline = Deadline( - deadline_time=DEFAULT_DATE, callback=AsyncCallback("classpath.log.error"), dagrun_id=dagrun_id - ) - deadline.trigger = deadline_trigger - session.add(deadline) + # Create callback association + callback = TriggererCallback(callback_def=AsyncCallback("classpath.log.error")) + callback.trigger = callback_trigger + session.add(callback) # Create asset association asset = AssetModel("test") @@ -101,7 +99,7 @@ def test_fetch_trigger_ids_with_non_task_associations(session, create_task_insta session.commit() results = Trigger.fetch_trigger_ids_with_non_task_associations() - assert results == {asset_trigger.id, deadline_trigger.id} + assert results == {asset_trigger.id, callback_trigger.id} def test_clean_unused(session, create_task_instance): @@ -154,14 +152,12 @@ def test_clean_unused(session, create_task_instance): session.commit() assert session.query(AssetModel).count() == 1 - # Create deadline with trigger - deadline = Deadline( - deadline_time=DEFAULT_DATE, - callback=AsyncCallback("classpath.callback"), - dagrun_id=task_instance.dag_run.id, + # Create callback with trigger + callback = TriggererCallback( + callback_def=AsyncCallback("classpath.callback"), ) - deadline.trigger = trigger6 - session.add(deadline) + callback.trigger = trigger6 + session.add(callback) session.commit() # Run clear operation @@ -171,11 +167,11 @@ def test_clean_unused(session, create_task_instance): assert {result.id for result in results} == {trigger1.id, trigger4.id, trigger5.id, trigger6.id} -@patch.object(Deadline, "handle_callback_event") -def test_submit_event(mock_deadline_submit_event, session, create_task_instance): +@patch.object(TriggererCallback, "handle_event") +def test_submit_event(mock_callback_handle_event, session, create_task_instance): """ Tests that events submitted to a trigger re-wake their dependent - task instances and notify associated assets and deadlines. + task instances and notify associated assets and callbacks. 
""" # Make a trigger trigger = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) @@ -191,14 +187,12 @@ def test_submit_event(mock_deadline_submit_event, session, create_task_instance) asset.add_trigger(trigger, "test_asset_watcher") session.add(asset) - # Create a deadline with the same trigger - deadline = Deadline( - deadline_time=DEFAULT_DATE, - callback=AsyncCallback("classpath.callback"), - dagrun_id=task_instance.dag_run.id, + # Create a callback with the same trigger + callback = TriggererCallback( + callback_def=AsyncCallback("classpath.callback"), ) - deadline.trigger = trigger - session.add(deadline) + callback.trigger = trigger + session.add(callback) session.commit() # Check that the asset has 0 event prior to sending an event to the trigger @@ -220,8 +214,8 @@ def test_submit_event(mock_deadline_submit_event, session, create_task_instance) asset_event = session.query(AssetEvent).filter_by(asset_id=asset.id).first() assert asset_event.extra == {"from_trigger": True, "payload": payload} - # Check that the deadline's handle_callback_event was called - mock_deadline_submit_event.assert_called_once_with(event, session) + # Check that the callback's handle_event was called + mock_callback_handle_event.assert_called_once_with(event, session) def test_submit_failure(session, create_task_instance): @@ -452,31 +446,29 @@ def test_get_sorted_triggers_same_priority_weight(session, create_task_instance) created_date=new_logical_date, ) session.add(trigger_asset) - trigger_deadline = Trigger( - classpath="airflow.triggers.testing.TriggerDeadline", + trigger_callback = Trigger( + classpath="airflow.triggers.testing.TriggerCallback", kwargs={}, created_date=new_logical_date, ) - session.add(trigger_deadline) + session.add(trigger_callback) session.commit() assert session.query(Trigger).count() == 5 # Create assets asset = AssetModel("test") asset.add_trigger(trigger_asset, "test_asset_watcher") session.add(asset) - # Create deadline with trigger - deadline = Deadline( - deadline_time=DEFAULT_DATE, callback=AsyncCallback("classpath.callback"), dagrun_id=TI_old.dag_run.id - ) - deadline.trigger = trigger_deadline - session.add(deadline) + # Create callback with trigger + callback = TriggererCallback(callback_def=AsyncCallback("classpath.callback")) + callback.trigger = trigger_callback + session.add(callback) session.commit() trigger_ids_query = Trigger.get_sorted_triggers(capacity=100, alive_triggerer_ids=[], session=session) - # Deadline triggers should be first, followed by task triggers, then asset triggers + # Callback triggers should be first, followed by task triggers, then asset triggers assert trigger_ids_query == [ - (trigger_deadline.id,), + (trigger_callback.id,), (trigger_old.id,), (trigger_new.id,), (trigger_asset.id,), diff --git a/airflow-core/tests/unit/triggers/test_deadline.py b/airflow-core/tests/unit/triggers/test_callback.py similarity index 79% rename from airflow-core/tests/unit/triggers/test_deadline.py rename to airflow-core/tests/unit/triggers/test_callback.py index 955b6cb49c0ff..9155aea9207ee 100644 --- a/airflow-core/tests/unit/triggers/test_deadline.py +++ b/airflow-core/tests/unit/triggers/test_callback.py @@ -21,14 +21,14 @@ import pytest -from airflow.models.deadline import DeadlineCallbackState +from airflow.models.callback import CallbackState from airflow.sdk import BaseNotifier -from airflow.triggers.deadline import PAYLOAD_BODY_KEY, PAYLOAD_STATUS_KEY, DeadlineCallbackTrigger +from airflow.triggers.callback import PAYLOAD_BODY_KEY, 
PAYLOAD_STATUS_KEY, CallbackTrigger TEST_MESSAGE = "test_message" -TEST_CALLBACK_PATH = "classpath.test_callback_for_deadline" +TEST_CALLBACK_PATH = "classpath.test_callback" TEST_CALLBACK_KWARGS = {"message": TEST_MESSAGE, "context": {"dag_run": "test"}} -TEST_TRIGGER = DeadlineCallbackTrigger(callback_path=TEST_CALLBACK_PATH, callback_kwargs=TEST_CALLBACK_KWARGS) +TEST_TRIGGER = CallbackTrigger(callback_path=TEST_CALLBACK_PATH, callback_kwargs=TEST_CALLBACK_KWARGS) class ExampleAsyncNotifier(BaseNotifier): @@ -45,10 +45,10 @@ def notify(self, context): return f"Sync notification: {self.message}, context: {context}" -class TestDeadlineCallbackTrigger: +class TestCallbackTrigger: @pytest.fixture def mock_import_string(self): - with mock.patch("airflow.triggers.deadline.import_string") as m: + with mock.patch("airflow.triggers.callback.import_string") as m: yield m @pytest.mark.parametrize( @@ -59,13 +59,13 @@ def mock_import_string(self): ], ) def test_serialization(self, callback_init_kwargs, expected_serialized_kwargs): - trigger = DeadlineCallbackTrigger( + trigger = CallbackTrigger( callback_path=TEST_CALLBACK_PATH, callback_kwargs=callback_init_kwargs, ) classpath, kwargs = trigger.serialize() - assert classpath == "airflow.triggers.deadline.DeadlineCallbackTrigger" + assert classpath == "airflow.triggers.callback.CallbackTrigger" assert kwargs == { "callback_path": TEST_CALLBACK_PATH, "callback_kwargs": expected_serialized_kwargs, @@ -81,12 +81,12 @@ async def test_run_success_with_async_function(self, mock_import_string): trigger_gen = TEST_TRIGGER.run() running_event = await anext(trigger_gen) - assert running_event.payload[PAYLOAD_STATUS_KEY] == DeadlineCallbackState.RUNNING + assert running_event.payload[PAYLOAD_STATUS_KEY] == CallbackState.RUNNING success_event = await anext(trigger_gen) mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH) mock_callback.assert_called_once_with(**TEST_CALLBACK_KWARGS) - assert success_event.payload[PAYLOAD_STATUS_KEY] == DeadlineCallbackState.SUCCESS + assert success_event.payload[PAYLOAD_STATUS_KEY] == CallbackState.SUCCESS assert success_event.payload[PAYLOAD_BODY_KEY] == callback_return_value @pytest.mark.asyncio @@ -97,11 +97,11 @@ async def test_run_success_with_notifier(self, mock_import_string): trigger_gen = TEST_TRIGGER.run() running_event = await anext(trigger_gen) - assert running_event.payload[PAYLOAD_STATUS_KEY] == DeadlineCallbackState.RUNNING + assert running_event.payload[PAYLOAD_STATUS_KEY] == CallbackState.RUNNING success_event = await anext(trigger_gen) mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH) - assert success_event.payload[PAYLOAD_STATUS_KEY] == DeadlineCallbackState.SUCCESS + assert success_event.payload[PAYLOAD_STATUS_KEY] == CallbackState.SUCCESS assert ( success_event.payload[PAYLOAD_BODY_KEY] == f"Async notification: {TEST_MESSAGE}, context: {{'dag_run': 'test'}}" @@ -116,10 +116,10 @@ async def test_run_failure(self, mock_import_string): trigger_gen = TEST_TRIGGER.run() running_event = await anext(trigger_gen) - assert running_event.payload[PAYLOAD_STATUS_KEY] == DeadlineCallbackState.RUNNING + assert running_event.payload[PAYLOAD_STATUS_KEY] == CallbackState.RUNNING failure_event = await anext(trigger_gen) mock_import_string.assert_called_once_with(TEST_CALLBACK_PATH) mock_callback.assert_called_once_with(**TEST_CALLBACK_KWARGS) - assert failure_event.payload[PAYLOAD_STATUS_KEY] == DeadlineCallbackState.FAILED + assert failure_event.payload[PAYLOAD_STATUS_KEY] == CallbackState.FAILED 
assert all(s in failure_event.payload[PAYLOAD_BODY_KEY] for s in ["raise", "RuntimeError", exc_msg]) diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py index 6f6e1c3e86d2f..29cd4eeff5367 100644 --- a/devel-common/src/tests_common/test_utils/db.py +++ b/devel-common/src/tests_common/test_utils/db.py @@ -326,7 +326,13 @@ def clear_db_dag_code(): def clear_db_callbacks(): with create_session() as session: - session.query(DbCallbackRequest).delete() + if AIRFLOW_V_3_2_PLUS: + from airflow.models.callback import Callback + + session.query(Callback).delete() + + else: + session.query(DbCallbackRequest).delete() def set_default_pool_slots(slots): diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py b/task-sdk/src/airflow/sdk/definitions/deadline.py index 1f31dd7ace5ce..7d209d722e6c1 100644 --- a/task-sdk/src/airflow/sdk/definitions/deadline.py +++ b/task-sdk/src/airflow/sdk/definitions/deadline.py @@ -121,13 +121,13 @@ class Callback(ABC): """ path: str - kwargs: dict | None + kwargs: dict def __init__(self, callback_callable: Callable | str, kwargs: dict[str, Any] | None = None): self.path = self.get_callback_path(callback_callable) if kwargs and "context" in kwargs: raise ValueError("context is a reserved kwarg for this class") - self.kwargs = kwargs + self.kwargs = kwargs or {} @classmethod def get_callback_path(cls, _callback: str | Callable) -> str: @@ -192,7 +192,7 @@ def __hash__(self): serialized = self.serialize() hashable_items = [] for k, v in serialized.items(): - if isinstance(v, dict) and v: + if isinstance(v, dict): hashable_items.append((k, tuple(sorted(v.items())))) else: hashable_items.append((k, v))
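
Editor's note: the final hunk above tightens the SDK Callback definition so kwargs is always a dict (defaulting to {} instead of None), which lets __hash__ treat every serialized dict field uniformly. Below is a minimal, standalone sketch of that hashing logic; it deliberately does not import the SDK class, and the serialize() output shape {"path": ..., "kwargs": ...} is an assumption taken from the surrounding code rather than a documented contract.

# Standalone sketch of the post-change hashing behaviour: dict-valued fields are
# always reduced to a sorted tuple of items, so empty kwargs go through the same
# path as populated ones and no None-handling branch is needed.

def callback_hash(serialized: dict) -> int:
    hashable_items = []
    for k, v in serialized.items():
        if isinstance(v, dict):
            # Sort items so equal kwargs with different insertion order hash equally.
            hashable_items.append((k, tuple(sorted(v.items()))))
        else:
            hashable_items.append((k, v))
    return hash(tuple(hashable_items))


# Assumed serialize() shape: {"path": <import path>, "kwargs": <dict>}.
a = callback_hash({"path": "pkg.notify", "kwargs": {}})
b = callback_hash({"path": "pkg.notify", "kwargs": {}})
c = callback_hash({"path": "pkg.notify", "kwargs": {"channel": "#alerts"}})

# With kwargs normalized to {}, a callback created without kwargs serializes the
# same as one created with kwargs={}, so the two hash identically, while genuinely
# different kwargs still produce a different hash.
assert a == b
assert a != c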