diff --git a/airflow-core/docs/authoring-and-scheduling/deferring.rst b/airflow-core/docs/authoring-and-scheduling/deferring.rst index 1c72a3d1f8356..1cfa238414462 100644 --- a/airflow-core/docs/authoring-and-scheduling/deferring.rst +++ b/airflow-core/docs/authoring-and-scheduling/deferring.rst @@ -468,6 +468,71 @@ According to `benchmarks `` to the Triggerers' startup command. This option ensures the triggerer will only pick up ``trigger`` instances deferred by tasks from the specified task queue(s). + +For example, let's say you are running two triggerer hosts (labeled "X" and "Y") with the following commands: + +.. code-block:: bash + + # triggerer "X" startup command + airflow triggerer --queues=alice,bob + # triggerer "Y" startup command + airflow triggerer --queues=test_q + +In this scenario, triggerer "X" will exclusively run triggers deferred from tasks originating from task queues ``"alice"`` or ``"bob"``. +Similarly, triggerer "Y" will exclusively run triggers deferred from tasks originating from task queue ``"test_q"``. + +Trigger Queue Assignment Caveats +'''''''''''''''''''''''''''''''' + +This feature is only compatible with executors which utilize the task ``queue`` concept +(such as the :ref:`CeleryExecutor`). + +Additionally, queue assignment is currently only compatible with the subset of triggers originating from a task's ``defer`` method. + ++------------------------------------------------------------------------------------------------------+----------------------+----------------------------------------------------------------------------------+ +| Trigger Type | Supports queues? | Triggerer assignment when :ref:`config:triggerer__queues_enabled` is ``True`` | ++======================================================================================================+======================+==================================================================================+ +| Task-created Trigger instances | Yes | Any triggerer with the task queue present in its ``--queues`` option | ++------------------------------------------------------------------------------------------------------+----------------------+----------------------------------------------------------------------------------+ +| :doc:`Event-Driven Triggers<../authoring-and-scheduling/event-scheduling>` | No | Any triggerer running without the ``--queues`` option | ++------------------------------------------------------------------------------------------------------+----------------------+----------------------------------------------------------------------------------+ +| Triggers from async :doc:`Callbacks<../administration-and-deployment/logging-monitoring/callbacks>` | No | Any triggerer running without the ``--queues`` option | ++------------------------------------------------------------------------------------------------------+----------------------+----------------------------------------------------------------------------------+ + +If you use queues for task-based triggers while **also** using event-based triggers and/or callback triggers, +you must run one or more triggerer hosts **without** the ``--queues`` option, so the latter two types of triggers are still run. + +.. note:: + To enable trigger queues, you must set the ``--queues`` option on one or more triggerers' startup commands (these values may differ between the various triggerers). + If you set the ``--queues`` value of a triggerer to a value for which no task queues exist, that triggerer will never run any triggers.
+ Similarly, all ``triggerer`` instances running without the ``--queues`` option will only consume event-driven and callback-based triggers. + + +The following example shows how to run HA triggerers so that all trigger types are run (assuming all tasks' +queue values are set to either ``"team_A"`` or ``"team_B"``): + +.. code-block:: bash + + # triggerer "A" startup command, only consumes triggers registered by tasks in queue "team_A" + airflow triggerer --queues=team_A + # triggerer "B" startup command, only consumes triggers registered by tasks in queue "team_B" + airflow triggerer --queues=team_B + # triggerer "C" startup command, consumes only event-based triggers and callback-based triggers. + airflow triggerer Difference between Mode='reschedule' and Deferrable=True in Sensors
diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 98f8adadd86af..5215cded803b9 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -1c79db933fe961f2d23605e1dcf73125923ea39bbd719c800e8adc1aa5bedb52 \ No newline at end of file +26217aaa3d4efa7c2b9cc4687e713024649729e0d23ce36ddc4ad84754ae4f1d \ No newline at end of file
diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index fd0e44ec76b95..002ebdf7d9408 100644 [regenerated ERD diagram; only the rendered text of the SVG survived extraction. The sole schema-visible change is the addition of a ``queue`` [VARCHAR(256)] column to the ``trigger`` entity alongside the existing ``triggerer_id`` [INTEGER] column; the rest of the SVG diff is re-rendered diagram markup with no content changes.]
diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 85e52e4fc08fd..4bed891331bf4 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``edc4f85a4619`` (head) | ``b12d4f98a91e`` | ``3.2.0`` | Enforce the new ``NOT NULL`` expectations for ``log.event`` | +| ``c47f2e1ab9d4`` (head) | ``edc4f85a4619`` | ``3.2.0`` | Add ``queue`` column to ``trigger`` table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``edc4f85a4619`` | ``b12d4f98a91e`` | ``3.2.0`` | Enforce the new ``NOT NULL`` expectations for ``log.event`` | | | | | and ``dag.is_stale``. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``b12d4f98a91e`` | ``665854ef0536`` | ``3.2.0`` | Drop ``id`` column from ``team`` table and make ``name`` the |
diff --git a/airflow-core/newsfragments/59239.feature.rst b/airflow-core/newsfragments/59239.feature.rst new file mode 100644 index 0000000000000..e4292a4b906fd --- /dev/null +++ b/airflow-core/newsfragments/59239.feature.rst @@ -0,0 +1 @@ +Support for task queue-based Trigger assignment to specific Triggerer hosts via the new ``--queues`` CLI option for the ``triggerer`` command.
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/trigger.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/trigger.py index 265d40ff19bfd..9b67ff9dc17e7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/trigger.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/trigger.py @@ -33,4 +33,5 @@ class TriggerResponse(BaseModel): classpath: str kwargs: Annotated[str, BeforeValidator(str)] created_date: datetime + queue: str | None triggerer_id: int | None
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index cf241db0d076b..ccb45d06727ac 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -2729,6 +2729,11 @@ components: type: string format: date-time title: Created Date + queue: + anyOf: + - type: string + - type: 'null' + title: Queue triggerer_id: anyOf: - type: integer @@ -2740,6 +2745,7 @@ components: - classpath - kwargs - created_date + - queue - triggerer_id title: TriggerResponse description: Trigger serializer for responses.
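The ``queue`` field added to ``TriggerResponse`` above is nullable end to end. As a minimal sketch of how a consumer might exercise the extended model (not taken from this PR; the ID, classpath, and other values are illustrative assumptions, and pydantic v2 semantics are assumed):

.. code-block:: python

    # Illustrative only: validate a trigger payload that now carries the nullable ``queue`` field.
    from datetime import datetime, timezone

    from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse

    payload = {
        "id": 1,  # hypothetical values throughout
        "classpath": "airflow.providers.standard.triggers.temporal.DateTimeTrigger",
        "kwargs": "{}",
        "created_date": datetime(2025, 12, 9, tzinfo=timezone.utc),
        "queue": "team_A",  # ``None`` when the deferring task recorded no queue
        "triggerer_id": None,
    }
    response = TriggerResponse.model_validate(payload)
    assert response.queue == "team_A"

Rows created before the migration simply report ``queue`` as ``null``, since the new column is added as nullable.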
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 63b0bda97f554..550d6e936e116 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -13101,6 +13101,11 @@ components: type: string format: date-time title: Created Date + queue: + anyOf: + - type: string + - type: 'null' + title: Queue triggerer_id: anyOf: - type: integer @@ -13112,6 +13117,7 @@ components: - classpath - kwargs - created_date + - queue - triggerer_id title: TriggerResponse description: Trigger serializer for responses. diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index a4c5355a826b8..e7ebee9ebe790 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -137,6 +137,7 @@ class TIDeferredStatePayload(StrictBaseModel): """ trigger_timeout: timedelta | None = None + queue: str | None = None next_method: str """The name of the method on the operator to call in the worker after the trigger has fired.""" next_kwargs: Annotated[dict[str, JsonValue], Field(default_factory=dict)] diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 60711ea255ef7..a729a1ee83b30 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -497,6 +497,7 @@ def _create_ti_state_update_query_and_update_state( trigger_row = Trigger( classpath=ti_patch_payload.classpath, kwargs={}, + queue=ti_patch_payload.queue, ) trigger_row.encrypted_kwargs = trigger_kwargs session.add(trigger_row) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 3eacfa8e11d86..e105b16b5ed73 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -905,6 +905,11 @@ def string_lower_type(val): type=positive_int(allow_zero=False), help="The maximum number of triggers that a Triggerer will run at one time.", ) +ARG_QUEUES = Arg( + ("--queues",), + type=string_list_type, + help="Optional comma-separated list of task queues which the triggerer should consume from.", +) ARG_DAG_LIST_COLUMNS = Arg( ("--columns",), @@ -1962,6 +1967,7 @@ class GroupCommand(NamedTuple): ARG_VERBOSE, ARG_SKIP_SERVE_LOGS, ARG_DEV, + ARG_QUEUES, ), ), ActionCommand( diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py b/airflow-core/src/airflow/cli/commands/triggerer_command.py index a293d44603051..ed8a08d1d730f 100644 --- a/airflow-core/src/airflow/cli/commands/triggerer_command.py +++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py @@ -26,6 +26,7 @@ from airflow import settings from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.configuration import conf +from airflow.exceptions import AirflowConfigException from airflow.jobs.job import Job, run_job from airflow.jobs.triggerer_job_runner import TriggererJobRunner from airflow.utils import cli as cli_utils @@ -49,9 +50,13 @@ def _serve_logs(skip_serve_logs: bool 
= False) -> Generator[None, None, None]: sub_proc.terminate() -def triggerer_run(skip_serve_logs: bool, capacity: int, triggerer_heartrate: float): +def triggerer_run( + skip_serve_logs: bool, capacity: int, triggerer_heartrate: float, queues: set[str] | None = None +): with _serve_logs(skip_serve_logs): - triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=capacity) + triggerer_job_runner = TriggererJobRunner( + job=Job(heartrate=triggerer_heartrate), capacity=capacity, queues=queues + ) run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) @@ -64,13 +69,19 @@ def triggerer(args): SecretsMasker.enable_log_masking() print(settings.HEADER) + if args.queues and not conf.getboolean("triggerer", "queues_enabled", fallback=False): + raise AirflowConfigException( + "--queues option may only be used when triggerer.queues_enabled is `True`." + ) + + queues = set(args.queues) if args.queues else None triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC") if cli_utils.should_enable_hot_reload(args): from airflow.cli.hot_reload import run_with_reloader run_with_reloader( - lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), + lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate, queues), process_name="triggerer", ) return @@ -78,6 +89,6 @@ def triggerer(args): run_command_with_daemon_option( args=args, process_name="triggerer", - callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), + callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate, queues), should_setup_logging=True, ) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 311bd6f54d7e7..bc00638ae1e86 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2395,6 +2395,15 @@ triggerer: type: integer example: ~ default: "50" + queues_enabled: + description: | + When set to True, deferred tasks will register triggers with the task queue they originated from, + and triggerers can selectively run triggers based on their queue assignment. Only relevant when using + executors which support task queue assignment. For more details, refer to the trigger docs. 
+ version_added: 3.2.0 + type: boolean + example: ~ + default: "False" kerberos: description: ~ options: diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index 258dd47f6025c..7396fb56c868d 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -116,6 +116,7 @@ def __init__( self, job: Job, capacity=None, + queues: set[str] | None = None, ): super().__init__(job) if capacity is None: @@ -124,6 +125,7 @@ def __init__( self.capacity = capacity else: raise ValueError(f"Capacity number {capacity!r} is invalid") + self.queues = queues def register_signals(self) -> None: """Register signals that stop child processes.""" @@ -165,7 +167,10 @@ def _execute(self) -> int | None: try: # Kick off runner sub-process without DB access self.trigger_runner = TriggerRunnerSupervisor.start( - job=self.job, capacity=self.capacity, logger=log + job=self.job, + capacity=self.capacity, + logger=log, + queues=self.queues, ) # Run the main DB comms loop in this process @@ -334,6 +339,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess): job: Job capacity: int + queues: set[str] | None = None health_check_threshold = conf.getint("triggerer", "triggerer_health_check_threshold") @@ -555,8 +561,13 @@ def heartbeat_callback(self, session: Session | None = None) -> None: @add_debug_span def load_triggers(self): """Query the database for the triggers we're supposed to be running and update the runner.""" - Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold) - ids = Trigger.ids_for_triggerer(self.job.id) + Trigger.assign_unassigned( + self.job.id, + self.capacity, + self.health_check_threshold, + queues=self.queues, + ) + ids = Trigger.ids_for_triggerer(self.job.id, queues=self.queues) self.update_triggers(set(ids)) @add_debug_span diff --git a/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py b/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py index 08843afb0e3b2..d90cd60c79bf1 100644 --- a/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py +++ b/airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py @@ -27,15 +27,11 @@ from __future__ import annotations -import json from textwrap import dedent import sqlalchemy as sa from alembic import context, op -from sqlalchemy.orm import lazyload -from airflow.models.trigger import Trigger -from airflow.serialization.serialized_objects import BaseSerialization from airflow.utils.sqlalchemy import ExtendedJSON # revision identifiers, used by Alembic. 
@@ -55,16 +51,9 @@ def get_session() -> sa.orm.Session: def upgrade(): """Update trigger kwargs type to string and encrypt.""" with op.batch_alter_table("trigger") as batch_op: - batch_op.alter_column("kwargs", type_=sa.Text(), existing_nullable=False) - - if not context.is_offline_mode(): - session = get_session() - try: - for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)): - trigger.kwargs = trigger.kwargs - session.commit() - finally: - session.close() + batch_op.alter_column( + "kwargs", existing_type=ExtendedJSON(), type_=sa.Text(), existing_nullable=False + ) def downgrade(): @@ -78,16 +67,12 @@ def downgrade(): ------------ """) ) - else: - session = get_session() - try: - for trigger in session.query(Trigger).options(lazyload(Trigger.task_instance)): - trigger.encrypted_kwargs = json.dumps(BaseSerialization.serialize(trigger.kwargs)) - session.commit() - finally: - session.close() with op.batch_alter_table("trigger") as batch_op: batch_op.alter_column( - "kwargs", type_=ExtendedJSON(), postgresql_using="kwargs::json", existing_nullable=False + "kwargs", + existing_type=sa.Text(), + type_=ExtendedJSON(), + postgresql_using="kwargs::json", + existing_nullable=False, ) diff --git a/airflow-core/src/airflow/migrations/versions/0096_3_2_0_add_queue_column_to_trigger.py b/airflow-core/src/airflow/migrations/versions/0096_3_2_0_add_queue_column_to_trigger.py new file mode 100644 index 0000000000000..e4f006125efb1 --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0096_3_2_0_add_queue_column_to_trigger.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add ``queue`` column to ``trigger`` table. + +Revision ID: c47f2e1ab9d4 +Revises: edc4f85a4619 +Create Date: 2025-12-09 20:30:40.500001 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "c47f2e1ab9d4" +down_revision = "edc4f85a4619" +branch_labels = None +depends_on = None +airflow_version = "3.2.0" + + +def upgrade(): + """Add ``queue`` column in trigger table.""" + with op.batch_alter_table("trigger") as batch_op: + batch_op.add_column(sa.Column("queue", sa.String(length=256), nullable=True)) + + +def downgrade(): + """Remove ``queue`` column from trigger table.""" + with op.batch_alter_table("trigger") as batch_op: + batch_op.drop_column("queue") diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index a601636374213..3e5d3a95ec978 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -98,6 +98,7 @@ class Trigger(Base): encrypted_kwargs: Mapped[str] = mapped_column("kwargs", Text, nullable=False) created_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False) triggerer_id: Mapped[int | None] = mapped_column(Integer, nullable=True) + queue: Mapped[str] = mapped_column(String(256), nullable=True) triggerer_job = relationship( "Job", @@ -120,11 +121,13 @@ def __init__( classpath: str, kwargs: dict[str, Any], created_date: datetime.datetime | None = None, + queue: str | None = None, ) -> None: super().__init__() self.classpath = classpath self.encrypted_kwargs = self.encrypt_kwargs(kwargs) self.created_date = created_date or timezone.utcnow() + self.queue = queue @property def kwargs(self) -> dict[str, Any]: @@ -320,21 +323,37 @@ def submit_failure(cls, trigger_id, exc=None, session: Session = NEW_SESSION) -> @classmethod @provide_session - def ids_for_triggerer(cls, triggerer_id, session: Session = NEW_SESSION) -> list[int]: + def ids_for_triggerer( + cls, triggerer_id, queues: set[str] | None = None, session: Session = NEW_SESSION + ) -> list[int]: """Retrieve a list of trigger ids.""" - return list(session.scalars(select(cls.id).where(cls.triggerer_id == triggerer_id)).all()) + query = select(cls.id).where(cls.triggerer_id == triggerer_id) + # By default, there is no trigger queue assignment. Only filter by queue when explicitly set in the triggerer CLI. + # Filter by queues if the triggerer explicitly was called with `--queues`, otherwise, filter out + # Triggers which have an explicit `queue` value since there may be other triggerer hosts explicitly assigned to that queue. + if queues: + query = query.filter(cls.queue.in_(queues)) + else: + query = query.filter(cls.queue.is_(None)) + + return list(session.scalars(query).all()) @classmethod @provide_session def assign_unassigned( - cls, triggerer_id, capacity, health_check_threshold, session: Session = NEW_SESSION + cls, + triggerer_id, + capacity, + health_check_threshold, + queues: set[str] | None = None, + session: Session = NEW_SESSION, ) -> None: """ Assign unassigned triggers based on a number of conditions. - Takes a triggerer_id, the capacity for that triggerer and the Triggerer job heartrate - health check threshold, and assigns unassigned triggers until that capacity is reached, - or there are no more unassigned triggers. + Takes a triggerer_id, the capacity for that triggerer, the Triggerer job heartrate + health check threshold, and the queues and assigns unassigned triggers until that + capacity is reached, or there are no more unassigned triggers. """ from airflow.jobs.job import Job # To avoid circular import @@ -358,7 +377,10 @@ def assign_unassigned( # Find triggers who do NOT have an alive triggerer_id, and then assign # up to `capacity` of those to us. 
trigger_ids_query = cls.get_sorted_triggers( - capacity=capacity, alive_triggerer_ids=alive_triggerer_ids, session=session + capacity=capacity, + alive_triggerer_ids=alive_triggerer_ids, + queues=queues, + session=session, ) if trigger_ids_query: session.execute( @@ -371,12 +393,19 @@ def assign_unassigned( session.commit() @classmethod - def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Select, session: Session): + def get_sorted_triggers( + cls, + capacity: int, + alive_triggerer_ids: list[int] | Select, + queues: set[str] | None, + session: Session, + ): """ Get sorted triggers based on capacity and alive triggerer ids. :param capacity: The capacity of the triggerer. :param alive_triggerer_ids: The alive triggerer ids as a list or a select query. + :param queues: The optional set of trigger queues to filter triggers by. :param session: The database session. """ result: list[Row[Any]] = [] @@ -408,7 +437,15 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel # picking up too many triggers and starving other triggerers for HA setup. remaining_capacity = min(remaining_capacity, cls.max_trigger_to_select_per_loop) - locked_query = with_row_locks(query.limit(remaining_capacity), session, skip_locked=True) + # Filter by queues if the triggerer explicitly was called with `--queues`, otherwise, filter out + # Triggers which have an explicit `queue` value since there may be other triggerer hosts explicitly + # assigned to that queue. + if queues: + filtered_query = query.filter(cls.queue.in_(queues)) + else: + filtered_query = query.filter(cls.queue.is_(None)) + + locked_query = with_row_locks(filtered_query.limit(remaining_capacity), session, skip_locked=True) result.extend(session.execute(locked_query).all()) return result diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 9ca36e3c90b0c..868a43380fc93 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -6499,6 +6499,17 @@ export const $TriggerResponse = { format: 'date-time', title: 'Created Date' }, + queue: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Queue' + }, triggerer_id: { anyOf: [ { @@ -6512,7 +6523,7 @@ export const $TriggerResponse = { } }, type: 'object', - required: ['id', 'classpath', 'kwargs', 'created_date', 'triggerer_id'], + required: ['id', 'classpath', 'kwargs', 'created_date', 'queue', 'triggerer_id'], title: 'TriggerResponse', description: 'Trigger serializer for responses.' 
} as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 3ef363b800072..59afa7eb8429d 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1566,6 +1566,7 @@ export type TriggerResponse = { classpath: string; kwargs: string; created_date: string; + queue: string | null; triggerer_id: number | null; }; diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 9d99d6e5a3fb6..8b9d095deb394 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -112,7 +112,7 @@ class MappedClassProtocol(Protocol): "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", "3.1.0": "cc92b33c6709", - "3.2.0": "edc4f85a4619", + "3.2.0": "c47f2e1ab9d4", } # Prefix used to identify tables holding data moved during migration. diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index e0b119beaaa4b..8499a6370697b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -376,6 +376,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi "trigger": { "classpath": "none", "kwargs": "{}", + "queue": None, }, "triggerer_job": { "dag_display_name": None, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index e95b09f1ea1b4..e99b7d9380f03 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -40,6 +40,7 @@ from airflow.sdk import Asset, TaskGroup, TriggerRule, task, task_group from airflow.utils.state import DagRunState, State, TaskInstanceState, TerminalTIState +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import ( clear_db_assets, clear_db_dags, @@ -1131,43 +1132,78 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta assert response.status_code == 500 assert response.json()["detail"] == "Database error occurred" - def test_ti_update_state_to_deferred(self, client, session, create_task_instance, time_machine): + @pytest.mark.parametrize("queues_enabled", [False, True]) + def test_ti_update_state_to_deferred( + self, client, session, create_task_instance, time_machine, queues_enabled: bool + ): """ Test that tests if the transition to deferred state is handled correctly. 
""" - ti = create_task_instance( - task_id="test_ti_update_state_to_deferred", - state=State.RUNNING, - session=session, - ) - session.commit() - - instant = timezone.datetime(2024, 11, 22) - time_machine.move_to(instant, tick=False) - - payload = { - "state": "deferred", - # expected format is now in serde serialized format - "trigger_kwargs": { - "key": "value", - "moment": { - "__classname__": "datetime.datetime", - "__version__": 2, - "__data__": { - "timestamp": 1734480001.0, - "tz": { - "__classname__": "builtins.tuple", - "__version__": 1, - "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True], + with conf_vars({("triggerer", "queues_enabled"): str(queues_enabled)}): + ti = create_task_instance( + task_id="test_ti_update_state_to_deferred", + state=State.RUNNING, + session=session, + ) + session.commit() + + instant = timezone.datetime(2024, 11, 22) + time_machine.move_to(instant, tick=False) + + payload = { + "state": "deferred", + # expected format is now in serde serialized format + "trigger_kwargs": { + "key": "value", + "moment": { + "__classname__": "datetime.datetime", + "__version__": 2, + "__data__": { + "timestamp": 1734480001.0, + "tz": { + "__classname__": "builtins.tuple", + "__version__": 1, + "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True], + }, }, }, }, - }, - "trigger_timeout": "P1D", # 1 day - "classpath": "my-classpath", - "next_method": "execute_callback", - # expected format is now in serde serialized format - "next_kwargs": { + "trigger_timeout": "P1D", # 1 day + "queue": "default" if queues_enabled else None, + "classpath": "my-classpath", + "next_method": "execute_callback", + # expected format is now in serde serialized format + "next_kwargs": { + "foo": { + "__classname__": "datetime.datetime", + "__version__": 2, + "__data__": { + "timestamp": 1734480000.0, + "tz": { + "__classname__": "builtins.tuple", + "__version__": 1, + "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True], + }, + }, + }, + "bar": "abc", + }, + } + + response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) + + assert response.status_code == 204 + assert response.text == "" + + session.expire_all() + + tis = session.scalars(select(TaskInstance)).all() + assert len(tis) == 1 + + assert tis[0].state == TaskInstanceState.DEFERRED + assert tis[0].next_method == "execute_callback" + + assert tis[0].next_kwargs == { "foo": { "__classname__": "datetime.datetime", "__version__": 2, @@ -1181,47 +1217,23 @@ def test_ti_update_state_to_deferred(self, client, session, create_task_instance }, }, "bar": "abc", - }, - } - - response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) - - assert response.status_code == 204 - assert response.text == "" - - session.expire_all() - - tis = session.scalars(select(TaskInstance)).all() - assert len(tis) == 1 - - assert tis[0].state == TaskInstanceState.DEFERRED - assert tis[0].next_method == "execute_callback" + } + assert tis[0].trigger_timeout == timezone.make_aware( + datetime(2024, 11, 23), timezone=timezone.utc + ) - assert tis[0].next_kwargs == { - "foo": { - "__classname__": "datetime.datetime", - "__version__": 2, - "__data__": { - "timestamp": 1734480000.0, - "tz": { - "__classname__": "builtins.tuple", - "__version__": 1, - "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True], - }, - }, - }, - "bar": "abc", - } - assert tis[0].trigger_timeout == timezone.make_aware(datetime(2024, 11, 23), timezone=timezone.utc) - - t = session.scalars(select(Trigger)).all() - 
assert len(t) == 1 - assert t[0].created_date == instant - assert t[0].classpath == "my-classpath" - assert t[0].kwargs == { - "key": "value", - "moment": datetime(2024, 12, 18, 00, 00, 1, tzinfo=timezone.utc), - } + t = session.scalars(select(Trigger)).all() + assert len(t) == 1 + assert t[0].created_date == instant + assert t[0].classpath == "my-classpath" + assert t[0].kwargs == { + "key": "value", + "moment": datetime(2024, 12, 18, 00, 00, 1, tzinfo=timezone.utc), + } + if queues_enabled: + assert t[0].queue == "default" + else: + assert t[0].queue is None def test_ti_update_state_to_reschedule(self, client, session, create_task_instance, time_machine): """ diff --git a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py index 44120f27fc981..fb8f6b347d3fe 100644 --- a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py +++ b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py @@ -24,6 +24,8 @@ from airflow.cli import cli_parser from airflow.cli.commands import triggerer_command +from tests_common.test_utils.config import conf_vars + pytestmark = pytest.mark.db_test @@ -49,14 +51,30 @@ def test_capacity_argument( triggerer_command.triggerer(args) mock_serve.return_value.__enter__.assert_called_once() mock_serve.return_value.__exit__.assert_called_once() - mock_triggerer_job_runner.assert_called_once_with(job=mock.ANY, capacity=42) + mock_triggerer_job_runner.assert_called_once_with(job=mock.ANY, capacity=42, queues=None) + + @conf_vars({("triggerer", "queues_enabled"): "True"}) + @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner") + @mock.patch("airflow.cli.commands.triggerer_command._serve_logs") + def test_queues_argument(self, mock_serve, mock_triggerer_job_runner): + """Ensure that the queues argument is passed correctly""" + mock_triggerer_job_runner.return_value.job_type = "TriggererJob" + args = self.parser.parse_args(["triggerer", "--capacity=4", "--queues=my_queue,other_queue"]) + triggerer_command.triggerer(args) + mock_serve.return_value.__enter__.assert_called_once() + mock_serve.return_value.__exit__.assert_called_once() + mock_triggerer_job_runner.assert_called_once_with( + job=mock.ANY, capacity=4, queues=set(["my_queue", "other_queue"]) + ) @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner") @mock.patch("airflow.cli.commands.triggerer_command.run_job") @mock.patch("airflow.cli.commands.triggerer_command.Process") def test_trigger_run_serve_logs(self, mock_process, mock_run_job, mock_trigger_job_runner): """Ensure that trigger runner and server log functions execute as intended""" - triggerer_command.triggerer_run(False, 1, 10.3) + triggerer_command.triggerer_run( + skip_serve_logs=False, capacity=1, triggerer_heartrate=10.3, queues=None + ) mock_process.assert_called_once() mock_run_job.assert_called_once_with( diff --git a/airflow-core/tests/unit/models/test_trigger.py b/airflow-core/tests/unit/models/test_trigger.py index e2b2b7e70be54..dfd0f2e99cd0a 100644 --- a/airflow-core/tests/unit/models/test_trigger.py +++ b/airflow-core/tests/unit/models/test_trigger.py @@ -19,7 +19,7 @@ import datetime import json from collections.abc import AsyncIterator -from typing import Any +from typing import TYPE_CHECKING, Any from unittest.mock import patch import pendulum @@ -50,6 +50,9 @@ from tests_common.test_utils.config import conf_vars +if TYPE_CHECKING: + from sqlalchemy.orm import Session + pytestmark = pytest.mark.db_test @@ -297,254 
+300,422 @@ def get_xcoms(ti): assert actual_xcoms == expected_xcoms +@pytest.fixture +def create_triggerer(): + """Fixture factory which creates individual test Triggerer instances.""" + + def _create_triggerer( + session: Session, + state: State, + queues: set[str] | None = None, + end_date: datetime.datetime | None = None, + latest_heartbeat: datetime.datetime | None = None, + ) -> Job: + test_triggerer = Job(heartrate=10, state=state, latest_heartbeat=latest_heartbeat) + TriggererJobRunner(test_triggerer, queues=queues) + if end_date: + test_triggerer.end_date = end_date + session.add(test_triggerer) + return test_triggerer + + return _create_triggerer + + +@pytest.fixture +def create_trigger(create_task_instance): + """Fixture factory which creates individual test trigger instances.""" + + def _create_test_trigger( + session: Session, + name: str, + logical_date: datetime.datetime, + triggerer_id: int | None = None, + queue: str | None = None, + ) -> Trigger: + trig = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}, queue=queue) + trig.triggerer_id = triggerer_id + session.add(trig) + ti = create_task_instance(task_id=f"ti_{name}", logical_date=logical_date, run_id=f"{name}_run_id") + ti.trigger_id = trig.id + session.add(ti) + return trig + + return _create_test_trigger + + @pytest.mark.need_serialized_dag -def test_assign_unassigned(session, create_task_instance): - """ - Tests that unassigned triggers of all appropriate states are assigned. - """ +@pytest.mark.parametrize("use_queues", [False, True]) +def test_assign_unassigned(session, create_triggerer, create_trigger, use_queues: bool): + """Tests that unassigned triggers of all appropriate states are assigned.""" time_now = timezone.utcnow() - triggerer_heartrate = 10 - finished_triggerer = Job(heartrate=triggerer_heartrate, state=State.SUCCESS) - TriggererJobRunner(finished_triggerer) - finished_triggerer.end_date = time_now - datetime.timedelta(hours=1) - session.add(finished_triggerer) - assert not finished_triggerer.is_alive() - healthy_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING) - TriggererJobRunner(healthy_triggerer) - session.add(healthy_triggerer) - assert healthy_triggerer.is_alive() - new_triggerer = Job(heartrate=triggerer_heartrate, state=State.RUNNING) - TriggererJobRunner(new_triggerer) - session.add(new_triggerer) - assert new_triggerer.is_alive() - # This trigger's last heartbeat is older than the check threshold, expect - # its triggers to be taken by other healthy triggerers below - unhealthy_triggerer = Job( - heartrate=triggerer_heartrate, - state=State.RUNNING, - latest_heartbeat=time_now - datetime.timedelta(seconds=100), + queue = "custom_q_name" if use_queues else None + queues = {queue} if isinstance(queue, str) else None + with conf_vars({("triggerer", "queues_enabled"): str(use_queues)}): + finished_triggerer = create_triggerer( + session, State.SUCCESS, end_date=time_now - datetime.timedelta(hours=1), queues=queues + ) + assert not finished_triggerer.is_alive() + healthy_triggerer = create_triggerer(session, State.RUNNING, latest_heartbeat=time_now, queues=queues) + assert healthy_triggerer.is_alive() + new_triggerer = create_triggerer(session, State.RUNNING, latest_heartbeat=time_now, queues=queues) + assert new_triggerer.is_alive() + # This triggerer's last heartbeat is older than the check threshold, expect + # its triggers to be taken by other healthy triggerers below + unhealthy_triggerer = create_triggerer( + session, State.RUNNING, 
latest_heartbeat=time_now - datetime.timedelta(seconds=100), queues=queues + ) + # Triggerer is not healthy, its last heartbeat was too long ago + assert not unhealthy_triggerer.is_alive() + session.commit() + + trigger_on_healthy_triggerer = create_trigger( + session=session, + name="trigger_on_healthy_triggerer", + logical_date=time_now, + triggerer_id=healthy_triggerer.id, + queue=queue, + ) + + trigger_on_unhealthy_triggerer = create_trigger( + session=session, + name="trigger_on_unhealthy_triggerer", + logical_date=time_now + datetime.timedelta(hours=1), + triggerer_id=unhealthy_triggerer.id, + queue=queue, + ) + + trigger_on_killed_triggerer = create_trigger( + session=session, + name="trigger_on_killed_triggerer", + logical_date=time_now + datetime.timedelta(hours=2), + triggerer_id=finished_triggerer.id, + queue=queue, + ) + + trigger_unassigned_to_triggerer = create_trigger( + session=session, + name="trigger_unassigned_to_triggerer", + logical_date=time_now + datetime.timedelta(hours=3), + triggerer_id=None, + queue=queue, + ) + + trigger_explicit_bad_queue_unassigned_to_triggerer = create_trigger( + session=session, + name="trigger_explicit_bad_queue_unassigned_to_triggerer_bad_q_name", + logical_date=time_now + datetime.timedelta(hours=4), + triggerer_id=None, + queue="bad_q_name", + ) + + assert trigger_unassigned_to_triggerer.triggerer_id is None + assert trigger_explicit_bad_queue_unassigned_to_triggerer.triggerer_id is None + session.commit() + assert session.scalar(select(func.count()).select_from(Trigger)) == 5 + Trigger.assign_unassigned(new_triggerer.id, capacity=100, health_check_threshold=30, queues=queues) + session.expire_all() + # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer + assert ( + session.scalar(select(Trigger).where(Trigger.id == trigger_on_killed_triggerer.id)).triggerer_id + == new_triggerer.id + ) + assert ( + session.scalar( + select(Trigger).where(Trigger.id == trigger_unassigned_to_triggerer.id) + ).triggerer_id + == new_triggerer.id + ) + # Check that unassigned trigger with a queue value which has no consuming triggerers remains unassigned + assert ( + session.scalar( + select(Trigger).where(Trigger.id == trigger_explicit_bad_queue_unassigned_to_triggerer.id) + ).triggerer_id + is None + ) + # Check that trigger on healthy triggerer still assigned to existing triggerer + assert ( + session.scalar(select(Trigger).where(Trigger.id == trigger_on_healthy_triggerer.id)).triggerer_id + == healthy_triggerer.id + ) + # Check that trigger on unhealthy triggerer is assigned to new triggerer + assert ( + session.scalar( + select(Trigger).where(Trigger.id == trigger_on_unhealthy_triggerer.id) + ).triggerer_id + == new_triggerer.id + ) + + +@pytest.mark.need_serialized_dag +@conf_vars({("triggerer", "queues_enabled"): "True"}) +def test_assign_unassigned_with_qeueus(session, create_triggerer, create_trigger) -> None: + """Ensures trigger assignment is handled properly when the `triggerer.queues_enabled` option is set to `True`.""" + time_now = timezone.utcnow() + single_q = "custom_q_name" + team1_q = "team1" + team2_q = "team2" + # Queue name which no triggerers are set to consume from + bad_q_name = "bad_q" + triggerers_and_qs: list[tuple[Job, set[str] | None]] = [] + one_q_triggerer = create_triggerer(session, State.RUNNING, latest_heartbeat=time_now, queues={single_q}) + triggerers_and_qs.append((one_q_triggerer, {single_q})) + assert one_q_triggerer.is_alive() + multi_q_triggerer = create_triggerer( + session, 
State.RUNNING, latest_heartbeat=time_now, queues={team1_q, team2_q} ) - TriggererJobRunner(unhealthy_triggerer) - session.add(unhealthy_triggerer) - # Triggerer is not healtht, its last heartbeat was too long ago - assert not unhealthy_triggerer.is_alive() + triggerers_and_qs.append((multi_q_triggerer, {team1_q, team2_q})) + assert multi_q_triggerer.is_alive() + no_q_triggerer = create_triggerer(session, State.RUNNING, latest_heartbeat=time_now, queues=None) + triggerers_and_qs.append((no_q_triggerer, None)) + assert no_q_triggerer.is_alive() session.commit() - trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id - session.add(trigger_on_healthy_triggerer) - ti_trigger_on_healthy_triggerer = create_task_instance( - task_id="ti_trigger_on_healthy_triggerer", - logical_date=time_now, - run_id="trigger_on_healthy_triggerer_run_id", + dates = [time_now + datetime.timedelta(hours=i) for i in range(5)] + # This should only be assigned to one_q_triggerer. + trigger_single_q = create_trigger( + session=session, name="trigger_single_q", logical_date=dates[0], triggerer_id=None, queue=single_q + ) + # This should only be assigned to multi_q_triggerer. + trigger_team1_q = create_trigger( + session=session, name="trigger_team1_q", logical_date=dates[1], triggerer_id=None, queue=team1_q ) - ti_trigger_on_healthy_triggerer.trigger_id = trigger_on_healthy_triggerer.id - session.add(ti_trigger_on_healthy_triggerer) - trigger_on_unhealthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_on_unhealthy_triggerer.triggerer_id = unhealthy_triggerer.id - session.add(trigger_on_unhealthy_triggerer) - ti_trigger_on_unhealthy_triggerer = create_task_instance( - task_id="ti_trigger_on_unhealthy_triggerer", - logical_date=time_now + datetime.timedelta(hours=1), - run_id="trigger_on_unhealthy_triggerer_run_id", + # This should only be assigned to multi_q_triggerer. + trigger_team2_q = create_trigger( + session=session, name="trigger_team2_q", logical_date=dates[2], triggerer_id=None, queue=team2_q ) - ti_trigger_on_unhealthy_triggerer.trigger_id = trigger_on_unhealthy_triggerer.id - session.add(ti_trigger_on_unhealthy_triggerer) - trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id - session.add(trigger_on_killed_triggerer) - ti_trigger_on_killed_triggerer = create_task_instance( - task_id="ti_trigger_on_killed_triggerer", - logical_date=time_now + datetime.timedelta(hours=2), - run_id="trigger_on_killed_triggerer_run_id", + # This should only be assigned to the triggerer which is not consuming from any queues (e.g. no_q_triggerer). + trigger_no_queue = create_trigger( + session=session, name="trigger_no_queue", logical_date=dates[3], triggerer_id=None, queue=None ) - ti_trigger_on_killed_triggerer.trigger_id = trigger_on_killed_triggerer.id - session.add(ti_trigger_on_killed_triggerer) - trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}) - session.add(trigger_unassigned_to_triggerer) - ti_trigger_unassigned_to_triggerer = create_task_instance( - task_id="ti_trigger_unassigned_to_triggerer", - logical_date=time_now + datetime.timedelta(hours=3), - run_id="trigger_unassigned_to_triggerer_run_id", + # This should never get assigned since no triggerers are consuming from this queue value. 
+ trigger_bad_q = create_trigger( + session=session, name="trigger_bad_q", logical_date=dates[4], triggerer_id=None, queue=bad_q_name ) - ti_trigger_unassigned_to_triggerer.trigger_id = trigger_unassigned_to_triggerer.id - session.add(ti_trigger_unassigned_to_triggerer) - assert trigger_unassigned_to_triggerer.triggerer_id is None + + for trig in [trigger_single_q, trigger_team1_q, trigger_team2_q, trigger_no_queue, trigger_bad_q]: + assert trig.triggerer_id is None session.commit() - assert session.scalar(select(func.count()).select_from(Trigger)) == 4 - Trigger.assign_unassigned(new_triggerer.id, 100, health_check_threshold=30) + assert session.scalar(select(func.count()).select_from(Trigger)) == 5 + # Call assign_unassigned against all triggerers + for triggerer, triggerer_queues in triggerers_and_qs: + Trigger.assign_unassigned( + triggerer.id, capacity=100, health_check_threshold=30, queues=triggerer_queues + ) session.expire_all() - # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer + # Ensure trigger_single_q (queue value: 'custom_q_name') is assigned to one_q_triggerer assert ( - session.scalar(select(Trigger).where(Trigger.id == trigger_on_killed_triggerer.id)).triggerer_id - == new_triggerer.id - ) + session.scalar(select(Trigger).where(Trigger.id == trigger_single_q.id)).triggerer_id + == one_q_triggerer.id + ), f"Trigger with queue '{single_q}' was not assigned to triggerer consuming from that queue." + # Ensure trigger_team1_q (queue value: 'team1') is assigned to multi_q_triggerer assert ( - session.scalar(select(Trigger).where(Trigger.id == trigger_unassigned_to_triggerer.id)).triggerer_id - == new_triggerer.id - ) - # Check that trigger on healthy triggerer still assigned to existing triggerer + session.scalar(select(Trigger).where(Trigger.id == trigger_team1_q.id)).triggerer_id + == multi_q_triggerer.id + ), f"Trigger with queue '{team1_q}' was not assigned to triggerer consuming from that queue." + # Ensure trigger_team2_q (queue value: 'team2') is assigned to multi_q_triggerer assert ( - session.scalar(select(Trigger).where(Trigger.id == trigger_on_healthy_triggerer.id)).triggerer_id - == healthy_triggerer.id - ) - # Check that trigger on unhealthy triggerer is assigned to new triggerer + session.scalar(select(Trigger).where(Trigger.id == trigger_team2_q.id)).triggerer_id + == multi_q_triggerer.id + ), f"Trigger with queue '{team2_q}' was not assigned to triggerer consuming from that queue." + # Ensure trigger_no_queue (queue value: `None`) is assigned to no_q_triggerer (queues: `None`) assert ( - session.scalar(select(Trigger).where(Trigger.id == trigger_on_unhealthy_triggerer.id)).triggerer_id - == new_triggerer.id + session.scalar(select(Trigger).where(Trigger.id == trigger_no_queue.id)).triggerer_id + == no_q_triggerer.id + ), ( + "Trigger with no queue should only be assigned to triggerers with no `--queues` constraint, but was not." + ) + # Check that unassigned trigger with a queue value which has no consuming triggerers remains unassigned + assert session.scalar(select(Trigger).where(Trigger.id == trigger_bad_q.id)).triggerer_id is None, ( + f"Trigger with queue '{bad_q_name}' should not be assigned to any triggerers since no triggerers reference it." 
) +def test_queue_column_max_len_matches_ti_column_max_len() -> None: + """Ensures that the `trigger.queue` column has the same max length as the `task_instance.queue` column.""" + expected_queue_col_max_length_from_ti = TaskInstance.queue.property.columns[0].type.length + trigger_queue_col_max_length = Trigger.queue.property.columns[0].type.length + assert trigger_queue_col_max_length == expected_queue_col_max_length_from_ti + + @pytest.mark.need_serialized_dag -def test_get_sorted_triggers_same_priority_weight(session, create_task_instance): +@pytest.mark.parametrize("use_queues", [False, True]) +def test_get_sorted_triggers_same_priority_weight(session, create_task_instance, use_queues: bool): """ Tests that triggers are sorted by the creation_date if they have the same priority. """ old_logical_date = datetime.datetime( 2023, 5, 9, 12, 16, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") ) - trigger_old = Trigger( - classpath="airflow.triggers.testing.SuccessTrigger", - kwargs={}, - created_date=old_logical_date + datetime.timedelta(seconds=30), - ) - session.add(trigger_old) - TI_old = create_task_instance( - task_id="old", - logical_date=old_logical_date, - run_id="old_run_id", - ) - TI_old.priority_weight = 1 - TI_old.trigger_id = trigger_old.id - session.add(TI_old) - - new_logical_date = datetime.datetime( - 2023, 5, 9, 12, 17, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") - ) - trigger_new = Trigger( - classpath="airflow.triggers.testing.SuccessTrigger", - kwargs={}, - created_date=new_logical_date + datetime.timedelta(seconds=30), - ) - session.add(trigger_new) - TI_new = create_task_instance( - task_id="new", - logical_date=new_logical_date, - run_id="new_run_id", - ) - TI_new.priority_weight = 1 - TI_new.trigger_id = trigger_new.id - session.add(TI_new) - trigger_orphan = Trigger( - classpath="airflow.triggers.testing.TriggerOrphan", - kwargs={}, - created_date=new_logical_date, - ) - session.add(trigger_orphan) - trigger_asset = Trigger( - classpath="airflow.triggers.testing.TriggerAsset", - kwargs={}, - created_date=new_logical_date, - ) - session.add(trigger_asset) - trigger_callback = Trigger( - classpath="airflow.triggers.testing.TriggerCallback", - kwargs={}, - created_date=new_logical_date, - ) - session.add(trigger_callback) - session.commit() - assert session.scalar(select(func.count()).select_from(Trigger)) == 5 - # Create assets - asset = AssetModel("test") - asset.add_trigger(trigger_asset, "test_asset_watcher") - session.add(asset) - # Create callback with trigger - callback = TriggererCallback(callback_def=AsyncCallback("classpath.callback")) - callback.trigger = trigger_callback - session.add(callback) - session.commit() + # Whether or not trigger queues are used should have no impact on the matched trigger sort order. 
+ queue = "fake_trigger_q_name" if use_queues else None + queues = {queue} if isinstance(queue, str) else None + with conf_vars({("triggerer", "queues_enabled"): str(use_queues)}): + trigger_old = Trigger( + classpath="airflow.triggers.testing.SuccessTrigger", + kwargs={}, + created_date=old_logical_date + datetime.timedelta(seconds=30), + queue=queue, + ) + session.add(trigger_old) + TI_old = create_task_instance(task_id="old", logical_date=old_logical_date, run_id="old_run_id") + TI_old.priority_weight = 1 + TI_old.trigger_id = trigger_old.id + session.add(TI_old) + + new_logical_date = datetime.datetime( + 2023, 5, 9, 12, 17, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") + ) + trigger_new = Trigger( + classpath="airflow.triggers.testing.SuccessTrigger", + kwargs={}, + created_date=new_logical_date + datetime.timedelta(seconds=30), + queue=queue, + ) + session.add(trigger_new) + TI_new = create_task_instance( + task_id="new", + logical_date=new_logical_date, + run_id="new_run_id", + ) + TI_new.priority_weight = 1 + TI_new.trigger_id = trigger_new.id + session.add(TI_new) + trigger_orphan = Trigger( + classpath="airflow.triggers.testing.TriggerOrphan", + kwargs={}, + created_date=new_logical_date, + queue=queue, + ) + session.add(trigger_orphan) + trigger_asset = Trigger( + classpath="airflow.triggers.testing.TriggerAsset", + kwargs={}, + created_date=new_logical_date, + queue=queue, + ) + session.add(trigger_asset) + trigger_callback = Trigger( + classpath="airflow.triggers.testing.TriggerCallback", + kwargs={}, + created_date=new_logical_date, + queue=queue, + ) + session.add(trigger_callback) + session.commit() + assert session.scalar(select(func.count()).select_from(Trigger)) == 5 + # Create assets + asset = AssetModel("test") + asset.add_trigger(trigger_asset, "test_asset_watcher") + session.add(asset) + # Create callback with trigger + callback = TriggererCallback(callback_def=AsyncCallback("classpath.callback")) + callback.trigger = trigger_callback + session.add(callback) + session.commit() - trigger_ids_query = Trigger.get_sorted_triggers(capacity=100, alive_triggerer_ids=[], session=session) + trigger_ids_query = Trigger.get_sorted_triggers( + capacity=100, alive_triggerer_ids=[], queues=queues, session=session + ) - # Callback triggers should be first, followed by task triggers, then asset triggers - assert trigger_ids_query == [ - (trigger_callback.id,), - (trigger_old.id,), - (trigger_new.id,), - (trigger_asset.id,), - ] + # Callback triggers should be first, followed by task triggers, then asset triggers + assert trigger_ids_query == [ + (trigger_callback.id,), + (trigger_old.id,), + (trigger_new.id,), + (trigger_asset.id,), + ] @pytest.mark.need_serialized_dag -def test_get_sorted_triggers_different_priority_weights(session, create_task_instance): +@pytest.mark.parametrize("use_queues", [False, True]) +def test_get_sorted_triggers_different_priority_weights(session, create_task_instance, use_queues: bool): """ Tests that triggers are sorted by the priority_weight. 
""" - callback_triggers = [ - Trigger(classpath="airflow.triggers.testing.CallbackTrigger", kwargs={}), - Trigger(classpath="airflow.triggers.testing.CallbackTrigger", kwargs={}), - Trigger(classpath="airflow.triggers.testing.CallbackTrigger", kwargs={}), - ] - session.add_all(callback_triggers) - session.flush() - - callbacks = [ - TriggererCallback(callback_def=AsyncCallback("classpath.low"), priority_weight=1), - TriggererCallback(callback_def=AsyncCallback("classpath.mid"), priority_weight=5), - TriggererCallback(callback_def=AsyncCallback("classpath.high"), priority_weight=10), - ] - for callback, trigger in zip(callbacks, callback_triggers): - callback.trigger = trigger - session.add_all(callbacks) + # Whether or not trigger queues are used should have no impact on the matched trigger sort order. + queue = "fake_trigger_q_name" if use_queues else None + queues = {queue} if isinstance(queue, str) else None + with conf_vars({("triggerer", "queues_enabled"): str(use_queues)}): + callback_triggers = [ + Trigger(classpath="airflow.triggers.testing.CallbackTrigger", kwargs={}), + Trigger(classpath="airflow.triggers.testing.CallbackTrigger", kwargs={}), + Trigger(classpath="airflow.triggers.testing.CallbackTrigger", kwargs={}), + ] + session.add_all(callback_triggers) + session.flush() - old_logical_date = datetime.datetime( - 2023, 5, 9, 12, 16, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") - ) - trigger_old = Trigger( - classpath="airflow.triggers.testing.SuccessTrigger", - kwargs={}, - created_date=old_logical_date + datetime.timedelta(seconds=30), - ) - session.add(trigger_old) - TI_old = create_task_instance( - task_id="old", - logical_date=old_logical_date, - run_id="old_run_id", - ) - TI_old.priority_weight = 1 - TI_old.trigger_id = trigger_old.id - session.add(TI_old) + callbacks = [ + TriggererCallback(callback_def=AsyncCallback("classpath.low"), priority_weight=1), + TriggererCallback(callback_def=AsyncCallback("classpath.mid"), priority_weight=5), + TriggererCallback(callback_def=AsyncCallback("classpath.high"), priority_weight=10), + ] + for callback, trigger in zip(callbacks, callback_triggers): + callback.trigger = trigger + session.add_all(callbacks) + + old_logical_date = datetime.datetime( + 2023, 5, 9, 12, 16, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") + ) + trigger_old = Trigger( + classpath="airflow.triggers.testing.SuccessTrigger", + kwargs={}, + created_date=old_logical_date + datetime.timedelta(seconds=30), + queue=queue, + ) + session.add(trigger_old) + TI_old = create_task_instance( + task_id="old", + logical_date=old_logical_date, + run_id="old_run_id", + ) + TI_old.priority_weight = 1 + TI_old.trigger_id = trigger_old.id + session.add(TI_old) - new_logical_date = datetime.datetime( - 2023, 5, 9, 12, 17, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") - ) - trigger_new = Trigger( - classpath="airflow.triggers.testing.SuccessTrigger", - kwargs={}, - created_date=new_logical_date + datetime.timedelta(seconds=30), - ) - session.add(trigger_new) - TI_new = create_task_instance( - task_id="new", - logical_date=new_logical_date, - run_id="new_run_id", - ) - TI_new.priority_weight = 2 - TI_new.trigger_id = trigger_new.id - session.add(TI_new) + new_logical_date = datetime.datetime( + 2023, 5, 9, 12, 17, 14, 474415, tzinfo=pytz.timezone("Africa/Abidjan") + ) + trigger_new = Trigger( + classpath="airflow.triggers.testing.SuccessTrigger", + kwargs={}, + created_date=new_logical_date + datetime.timedelta(seconds=30), + queue=queue, + ) + 
session.add(trigger_new) + TI_new = create_task_instance( + task_id="new", + logical_date=new_logical_date, + run_id="new_run_id", + ) + TI_new.priority_weight = 2 + TI_new.trigger_id = trigger_new.id + session.add(TI_new) - session.commit() - assert session.scalar(select(func.count()).select_from(Trigger)) == 5 + session.commit() + assert session.scalar(select(func.count()).select_from(Trigger)) == 5 - trigger_ids_query = Trigger.get_sorted_triggers(capacity=100, alive_triggerer_ids=[], session=session) + trigger_ids_query = Trigger.get_sorted_triggers( + capacity=100, queues=queues, alive_triggerer_ids=[], session=session + ) - assert trigger_ids_query == [ - (callback_triggers[2].id,), - (callback_triggers[1].id,), - (callback_triggers[0].id,), - (trigger_new.id,), - (trigger_old.id,), - ] + if use_queues: + assert trigger_ids_query == [(trigger_new.id,), (trigger_old.id,)] + trigger_ids_query_no_queue = Trigger.get_sorted_triggers( + capacity=100, queues=None, alive_triggerer_ids=[], session=session + ) + # Callback triggers should not have a queue assignment + assert trigger_ids_query_no_queue == [ + (callback_triggers[2].id,), + (callback_triggers[1].id,), + (callback_triggers[0].id,), + ] + else: + assert trigger_ids_query == [ + (callback_triggers[2].id,), + (callback_triggers[1].id,), + (callback_triggers[0].id,), + (trigger_new.id,), + (trigger_old.id,), + ] @pytest.mark.need_serialized_dag @@ -554,6 +725,7 @@ def test_get_sorted_triggers_dont_starve_for_ha(session, create_task_instance): starvation in HA setups. When capacity is large, it should limit triggers per loop to avoid one triggerer picking up too many triggers. """ + # Whether or not trigger queues are used should not fundamentally change trigger capacity logic. # Create 20 callback triggers with different priorities callback_triggers = [] for i in range(20): @@ -572,9 +744,7 @@ def test_get_sorted_triggers_dont_starve_for_ha(session, create_task_instance): for i in range(20): logical_date = datetime.datetime(2023, 5, 9, 12, i, 0, tzinfo=pytz.timezone("UTC")) trigger = Trigger( - classpath="airflow.triggers.testing.SuccessTrigger", - kwargs={}, - created_date=logical_date, + classpath="airflow.triggers.testing.SuccessTrigger", kwargs={}, created_date=logical_date ) session.add(trigger) session.flush() @@ -593,9 +763,7 @@ def test_get_sorted_triggers_dont_starve_for_ha(session, create_task_instance): for i in range(20): logical_date = datetime.datetime(2023, 5, 9, 13, i, 0, tzinfo=pytz.timezone("UTC")) trigger = Trigger( - classpath="airflow.triggers.testing.AssetTrigger", - kwargs={}, - created_date=logical_date, + classpath="airflow.triggers.testing.AssetTrigger", kwargs={}, created_date=logical_date ) session.add(trigger) session.flush() @@ -611,7 +779,9 @@ def test_get_sorted_triggers_dont_starve_for_ha(session, create_task_instance): with patch.object(Trigger, "max_trigger_to_select_per_loop", 5): # Test with large capacity (100) - should respect max_trigger_to_select_per_loop (5) # and return only 5 triggers from each category (callback, task, asset) - trigger_ids_query = Trigger.get_sorted_triggers(capacity=100, alive_triggerer_ids=[], session=session) + trigger_ids_query = Trigger.get_sorted_triggers( + capacity=100, alive_triggerer_ids=[], session=session, queues=None + ) # Should get 5 callbacks (max_trigger_to_select_per_loop), then 5 tasks, then 5 assets # Total: 15 triggers instead of all 60 @@ -631,13 +801,73 @@ def test_get_sorted_triggers_dont_starve_for_ha(session, create_task_instance): # Test with 
capacity smaller than max_trigger_to_select_per_loop # Should respect capacity instead - trigger_ids_query = Trigger.get_sorted_triggers(capacity=3, alive_triggerer_ids=[], session=session) + trigger_ids_query = Trigger.get_sorted_triggers( + capacity=3, alive_triggerer_ids=[], session=session, queues=None + ) # Should get only 3 callback triggers (capacity limit) assert len(trigger_ids_query) == 3 assert [row[0] for row in trigger_ids_query] == callback_ids[:3] +@pytest.mark.need_serialized_dag +@conf_vars({("triggerer", "queues_enabled"): "True"}) +def test_get_sorted_triggers_dont_starve_for_ha_with_queues(session, create_task_instance): + """ + Tests that get_sorted_triggers respects max_trigger_to_select_per_loop when trigger queue assignment + is enabled. When capacity is large, it should limit triggers per loop to avoid one triggerer picking + up too many triggers. + """ + # Whether or not trigger queues are used should not fundamentally change trigger capacity logic. + + # Create 20 task instance triggers with different priorities + task_triggers = [] + queue = "fake_q" + for i in range(20): + logical_date = datetime.datetime(2023, 5, 9, 12, i, 0, tzinfo=pytz.timezone("UTC")) + trigger = Trigger( + classpath="airflow.triggers.testing.SuccessTrigger", + kwargs={}, + created_date=logical_date, + queue=queue, + ) + session.add(trigger) + session.flush() + ti = create_task_instance(task_id=f"task_{i}", logical_date=logical_date, run_id=f"run_{i}") + ti.priority_weight = 20 - i + ti.trigger_id = trigger.id + session.add(ti) + task_triggers.append(trigger) + + session.commit() + assert session.scalar(select(func.count()).select_from(Trigger)) == 20 + + # Mock max_trigger_to_select_per_loop to 5 for testing + with patch.object(Trigger, "max_trigger_to_select_per_loop", 5): + # Test with large capacity (100) - should respect max_trigger_to_select_per_loop (5) + # and return only 5 task triggers + trigger_ids_query = Trigger.get_sorted_triggers( + capacity=100, alive_triggerer_ids=[], session=session, queues={queue} + ) + + # Should get 5 task triggers out of a total of 20 + assert len(trigger_ids_query) == 5 + + # 5 task triggers should be the highest priority ones + task_ids = [t.id for t in task_triggers[:5]] + assert [row[0] for row in trigger_ids_query[:5]] == task_ids + + # Test with capacity smaller than max_trigger_to_select_per_loop + # Should respect capacity instead + trigger_ids_query = Trigger.get_sorted_triggers( + capacity=3, alive_triggerer_ids=[], session=session, queues={queue} + ) + + # Should get only 3 task triggers (capacity limit) + assert len(trigger_ids_query) == 3 + assert [row[0] for row in trigger_ids_query] == task_ids[:3] + + class SensitiveKwargsTrigger(BaseTrigger): """ A trigger that has sensitive kwargs. 
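The tests above exercise the routing rule this patch adds to ``Trigger.get_sorted_triggers``: a triggerer started with ``--queues`` only claims triggers whose ``queue`` value is in that set, while a triggerer started without ``--queues`` only claims triggers whose ``queue`` is ``NULL``. The following is a minimal sketch of that filter for reference only; the helper name ``apply_queue_filter`` and the standalone usage are assumptions for illustration, not part of this patch:

.. code-block:: python

    from sqlalchemy import select

    from airflow.models.trigger import Trigger


    def apply_queue_filter(query, queues: set[str] | None):
        """Sketch of the per-triggerer queue filter used when selecting sorted triggers."""
        if queues:
            # Triggerer started with --queues: claim only triggers deferred from those task queues.
            return query.filter(Trigger.queue.in_(queues))
        # Triggerer started without --queues: claim only triggers with no queue value
        # (event-driven and callback triggers, plus task triggers when queues are disabled).
        return query.filter(Trigger.queue.is_(None))


    # Hypothetical usage: a triggerer started with --queues=team_A,team_B would filter like this.
    team_query = apply_queue_filter(select(Trigger.id), {"team_A", "team_B"})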
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 71bff5eea1534..6472a722397df 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -894,6 +894,7 @@ class TriggerResponse(BaseModel): classpath: Annotated[str, Field(title="Classpath")] kwargs: Annotated[str, Field(title="Kwargs")] created_date: Annotated[datetime, Field(title="Created Date")] + queue: Annotated[str | None, Field(title="Queue")] = None triggerer_id: Annotated[int | None, Field(title="Triggerer Id")] = None diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 5ffcd7a42d4a8..6a3a07f5e8a92 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -200,6 +200,7 @@ class TIDeferredStatePayload(BaseModel): classpath: Annotated[str, Field(title="Classpath")] trigger_kwargs: Annotated[dict[str, JsonValue] | str | None, Field(title="Trigger Kwargs")] = None trigger_timeout: Annotated[timedelta | None, Field(title="Trigger Timeout")] = None + queue: Annotated[str | None, Field(title="Queue")] = None next_method: Annotated[str, Field(title="Next Method")] next_kwargs: Annotated[dict[str, JsonValue] | None, Field(title="Next Kwargs")] = None rendered_map_index: Annotated[str | None, Field(title="Rendered Map Index")] = None diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 1d15b9b87781b..761486279eba9 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -120,6 +120,8 @@ from airflow.sdk.execution_time.xcom import XCom from airflow.sdk.observability.stats import Stats from airflow.sdk.timezone import coerce_datetime +from airflow.triggers.base import BaseEventTrigger +from airflow.triggers.callback import CallbackTrigger if TYPE_CHECKING: import jinja2 @@ -1004,6 +1006,13 @@ def _defer_task( log.info("Pausing task as DEFERRED. ", dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id) classpath, trigger_kwargs = defer.trigger.serialize() + queue: str | None = None + # Currently, only task-associated BaseTrigger instances may have a non-None queue, + # and only when triggerer.queues_enabled is True. 
+ if not isinstance(defer.trigger, (BaseEventTrigger, CallbackTrigger)) and conf.getboolean( + "triggerer", "queues_enabled", fallback=False + ): + queue = ti.task.queue from airflow.sdk.serde import serialize as serde_serialize @@ -1018,6 +1027,7 @@ def _defer_task( classpath=classpath, trigger_kwargs=trigger_kwargs, trigger_timeout=defer.timeout, + queue=queue, next_method=defer.method_name, next_kwargs=next_kwargs, ) diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py index 22b85eeff6f73..ea9d403fe21ad 100644 --- a/task-sdk/tests/task_sdk/api/test_client.py +++ b/task-sdk/tests/task_sdk/api/test_client.py @@ -344,9 +344,11 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_client(transport=httpx.MockTransport(handle_request)) client.task_instances.heartbeat(ti_id, 100) - def test_task_instance_defer(self): + @pytest.mark.parametrize("queues_enabled", [False, True]) + def test_task_instance_defer(self, queues_enabled: bool): # Simulate a successful response from the server that defers a task ti_id = uuid6.uuid7() + task_queue = "test" msg = DeferTask( classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger", @@ -359,6 +361,7 @@ def test_task_instance_defer(self): }, }, next_kwargs={"__type": "dict", "__var": {}}, + queue=task_queue if queues_enabled else None, ) def handle_request(request: httpx.Request) -> httpx.Response: @@ -370,6 +373,10 @@ def handle_request(request: httpx.Request) -> httpx.Response: actual_body["classpath"] == "airflow.providers.standard.triggers.temporal.DateTimeTrigger" ) assert actual_body["next_method"] == "execute_complete" + if queues_enabled: + assert actual_body["queue"] == task_queue + else: + assert actual_body.get("queue") is None return httpx.Response( status_code=204, ) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 90cbe6a4f2b9f..406a48a7ba836 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -26,7 +26,7 @@ from collections.abc import Iterable from datetime import datetime, timedelta from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from unittest import mock from unittest.mock import call, patch @@ -71,6 +71,7 @@ AirflowTaskTimeout, DownstreamTasksSkipped, ErrorType, + TaskDeferred, ) from airflow.sdk.execution_time.comms import ( AssetEventResult, @@ -121,6 +122,7 @@ from airflow.sdk.execution_time.task_runner import ( RuntimeTaskInstance, TaskRunnerMarker, + _defer_task, _execute_task, _push_xcom_if_needed, _xcom_push, @@ -130,6 +132,9 @@ startup, ) from airflow.sdk.execution_time.xcom import XCom +from airflow.triggers.base import BaseEventTrigger, BaseTrigger, TriggerEvent +from airflow.triggers.callback import CallbackTrigger +from airflow.triggers.testing import SuccessTrigger from tests_common.test_utils.config import conf_vars from tests_common.test_utils.mock_operators import AirflowLink @@ -352,54 +357,105 @@ def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context): assert ti.task.dag.dag_id == "dag_name" -def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms): +@pytest.mark.parametrize("use_queues", [False, True]) +def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms, use_queues: bool): """Test that a task can transition to a deferred state.""" from 
airflow.providers.standard.sensors.date_time import DateTimeSensorAsync + task_queue = "fake_q" + deferred_queue = task_queue if use_queues else None # Use the time machine to set the current time instant = timezone.datetime(2024, 11, 22) - task = DateTimeSensorAsync( - task_id="async", - target_time=str(instant + timedelta(seconds=3)), - poke_interval=60, - timeout=600, - ) - time_machine.move_to(instant, tick=False) + with conf_vars({("triggerer", "queues_enabled"): str(use_queues)}): + task = DateTimeSensorAsync( + task_id="async", + target_time=str(instant + timedelta(seconds=3)), + poke_interval=60, + timeout=600, + queue=task_queue, + ) + time_machine.move_to(instant, tick=False) - # Expected DeferTask, it is constructed by _defer_task from exception and is sent to supervisor - expected_defer_task = DeferTask( - state="deferred", - classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger", - trigger_kwargs={ - "moment": { - "__classname__": "pendulum.datetime.DateTime", - "__version__": 2, - "__data__": { - "timestamp": 1732233603.0, - "tz": { - "__classname__": "builtins.tuple", - "__version__": 1, - "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True], + # Expected DeferTask, it is constructed by _defer_task from exception and is sent to supervisor + expected_defer_task = DeferTask( + state="deferred", + classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger", + trigger_kwargs={ + "moment": { + "__classname__": "pendulum.datetime.DateTime", + "__version__": 2, + "__data__": { + "timestamp": 1732233603.0, + "tz": { + "__classname__": "builtins.tuple", + "__version__": 1, + "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True], + }, }, }, + "end_from_trigger": False, }, - "end_from_trigger": False, - }, - trigger_timeout=None, - next_method="execute_complete", - next_kwargs={}, - rendered_map_index=None, - type="DeferTask", - ) + trigger_timeout=None, + queue=deferred_queue, + next_method="execute_complete", + next_kwargs={}, + rendered_map_index=None, + type="DeferTask", + ) - # Run the task - ti = create_runtime_ti(dag_id="basic_deferred_run", task=task) - run(ti, context=ti.get_template_context(), log=mock.MagicMock()) + # Run the task + ti = create_runtime_ti(dag_id="basic_deferred_run", task=task) + run(ti, context=ti.get_template_context(), log=mock.MagicMock()) + + assert ti.state == TaskInstanceState.DEFERRED - assert ti.state == TaskInstanceState.DEFERRED + # send will only be called when the TaskDeferred exception is raised + mock_supervisor_comms.send.assert_any_call(expected_defer_task) - # send will only be called when the TaskDeferred exception is raised - mock_supervisor_comms.send.assert_any_call(expected_defer_task) + +class FakeEventTrigger(BaseEventTrigger): + """Fake event trigger class for testing""" + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ("tests.task_sdk.execution_time.test_task_runner.FakeEventTrigger", {}) + + async def run(self): + yield TriggerEvent(True) + + +@conf_vars({("triggerer", "queues_enabled"): "True"}) +@pytest.mark.parametrize( + ("mock_trigger", "expected_trigger_queue"), + [ + (SuccessTrigger(kwargs={}), "task_q_test"), + (FakeEventTrigger(kwarg={}), None), + ( + CallbackTrigger( + callback_path="classpath.test_callback", + callback_kwargs={"message": "test_msg", "context": {"dag_run": "test"}}, + ), + None, + ), + ], +) +def test_defer_task_queue_assignment( + create_runtime_ti, mock_trigger: BaseTrigger, expected_trigger_queue: str | None +) -> None: + """Ensure 
`_defer_task` only passes the originating task's queue along to the trigger message when expected."""
+    from airflow.providers.standard.operators.empty import EmptyOperator
+
+    mock_task_queue = "task_q_test"
+    task = EmptyOperator(task_id="empty_trig_queue_test", queue=mock_task_queue)
+    runtime_ti = create_runtime_ti(dag_id="deferred_run", task=task)
+    actual_msg, actual_state = _defer_task(
+        defer=TaskDeferred(trigger=mock_trigger, method_name="foo"), ti=runtime_ti, log=mock.MagicMock()
+    )
+    assert isinstance(actual_msg, DeferTask)
+    assert actual_state == TaskInstanceState.DEFERRED
+    actual_queue = actual_msg.queue
+    assert actual_queue == expected_trigger_queue, (
+        f"Expected DeferTask's queue value to be {expected_trigger_queue}, but got {actual_queue}"
+    )


 def test_run_downstream_skipped(mocked_parse, create_runtime_ti, mock_supervisor_comms):
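For DAG authors the task-runner change is transparent: a deferrable task keeps using the ordinary ``queue`` argument, and that queue is copied onto the trigger only when ``[triggerer] queues_enabled`` is ``True``. The sketch below is illustrative only — the DAG id, queue name, and target time are assumptions — and it simply mirrors the behaviour asserted in the tests above: the deferred sensor's trigger would carry ``queue="team_A"`` and be claimed only by a triggerer started with ``--queues=team_A``.

.. code-block:: python

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync

    with DAG(dag_id="queue_routed_deferral_example", start_date=datetime(2025, 1, 1), schedule=None):
        # With [triggerer] queues_enabled = True, this sensor defers with queue="team_A",
        # so only a triggerer started with `airflow triggerer --queues=team_A` runs its trigger.
        DateTimeSensorAsync(
            task_id="wait_five_minutes",
            target_time=str(datetime(2025, 1, 1) + timedelta(minutes=5)),
            queue="team_A",
        )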