From fa5c5c604eee0c8718af3fc667c70d51515051fe Mon Sep 17 00:00:00 2001
From: Wei Lee
Date: Thu, 5 Jun 2025 20:13:09 +0800
Subject: [PATCH 001/122] fix(migrations): from 2.7.0 to 3.0.0 for SQLite (#51431)

---
 airflow-core/docs/img/airflow_erd.sha256      |   2 +-
 airflow-core/docs/img/airflow_erd.svg         | 816 +++++++++---------
 airflow-core/src/airflow/migrations/utils.py  |   9 +
 ...istency_between_ORM_and_migration_files.py |   8 +-
 .../versions/0047_3_0_0_add_dag_versioning.py |  16 +-
 5 files changed, 436 insertions(+), 415 deletions(-)

diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index da3765115e31d..d7bbf4bb8c209 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-066cb891884eea1ee0496b5c507d4a52c20d0440387f9ec8bacb1d616a26e40e
\ No newline at end of file
+ba7271a819353797fae12c105853ef4d30d0029ca705f5611577496e20e1234f
\ No newline at end of file
diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg
index 879c9f17f903e..8c66ac9da76dc 100644
--- a/airflow-core/docs/img/airflow_erd.svg
+++ b/airflow-core/docs/img/airflow_erd.svg
@@ -880,470 +880,470 @@
[816 changed lines of generated ERD SVG markup omitted: the diff only re-renders the diagram, repositioning the rendered_task_instance_fields node and its task_instance edges relative to the task_map, task_reschedule, xcom, task_instance_note, and task_instance_history nodes; no table or column definitions change.]
diff --git a/airflow-core/src/airflow/migrations/utils.py b/airflow-core/src/airflow/migrations/utils.py
index 9305606873549..2dbbbece01a57 100644
--- a/airflow-core/src/airflow/migrations/utils.py
+++ b/airflow-core/src/airflow/migrations/utils.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import contextlib
 from collections import defaultdict
 from contextlib import contextmanager
 
@@ -103,3 +104,11 @@ def mysql_drop_index_if_exists(index_name, table_name, op):
             SELECT 1;
         END IF;
         """)
+
+
+def ignore_sqlite_value_error():
+    from alembic import op
+
+    if op.get_bind().dialect.name == "sqlite":
+        return contextlib.suppress(ValueError)
+    return contextlib.nullcontext()
diff --git a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
index 0a62b550d40b9..fa24916df6faa 100644
--- a/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
+++ b/airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py
@@ -243,8 +243,12 @@ def upgrade():
     )
     """)
     )
-
-    conn.execute(sa.text("INSERT INTO dag_run_new SELECT * FROM dag_run"))
+    headers = (
+        "id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, "
+        "external_trigger, run_type, conf, data_interval_start, data_interval_end, "
+        "last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number"
+    )
+    conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run"))
     conn.execute(sa.text("DROP TABLE dag_run"))
     conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
     conn.execute(sa.text("PRAGMA foreign_keys=on"))
diff --git a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
index a4d4238816a2c..e411c8f43c8b7 100644
--- a/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
+++ b/airflow-core/src/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
@@ -32,6 +32,7 @@
 from sqlalchemy_utils import UUIDType
 
 from airflow.migrations.db_types import TIMESTAMP, StringID
+from airflow.migrations.utils import ignore_sqlite_value_error
 from airflow.models.base import naming_convention
 from airflow.utils import timezone
 
@@ -55,15 +56,22 @@ def upgrade():
         sa.Column("dag_id", StringID(), nullable=False),
         sa.Column("created_at", TIMESTAMP(), nullable=False, default=timezone.utcnow),
         sa.Column(
-            "last_updated", TIMESTAMP(), nullable=False, default=timezone.utcnow, onupdate=timezone.utcnow
+            "last_updated",
+            TIMESTAMP(),
+            nullable=False,
+            default=timezone.utcnow,
+            onupdate=timezone.utcnow,
         ),
         sa.ForeignKeyConstraint(
-            ("dag_id",), ["dag.dag_id"], name=op.f("dag_version_dag_id_fkey"), ondelete="CASCADE"
+            ("dag_id",),
+            ["dag.dag_id"],
name=op.f("dag_version_dag_id_fkey"), + ondelete="CASCADE", ), sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")), sa.UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"), ) - with op.batch_alter_table("dag_code") as batch_op: + with ignore_sqlite_value_error(), op.batch_alter_table("dag_code") as batch_op: batch_op.drop_constraint("dag_code_pkey", type_="primary") batch_op.drop_column("fileloc_hash") batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False)) @@ -81,7 +89,7 @@ def upgrade(): ) batch_op.create_unique_constraint("dag_code_dag_version_id_uq", ["dag_version_id"]) - with op.batch_alter_table("serialized_dag") as batch_op: + with ignore_sqlite_value_error(), op.batch_alter_table("serialized_dag") as batch_op: batch_op.drop_constraint("serialized_dag_pkey", type_="primary") batch_op.drop_index("idx_fileloc_hash") batch_op.drop_column("fileloc_hash") From c6435b01b103a833b131ba624e50f641f38ef212 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 5 Jun 2025 15:47:44 +0200 Subject: [PATCH 002/122] [v3-0-test] Structure endpoint attach downstream asset to task level (#51401) (#51425) * Attach downstream assets to task * Adjust tests (cherry picked from commit 14ee1f4f60019b4ff90f09ede1d10abf42feea4d) Co-authored-by: Pierre Jeambrun --- .../core_api/routes/ui/structure.py | 7 ++++- .../core_api/services/ui/structure.py | 31 +++++++++++++++++++ .../core_api/routes/ui/test_structure.py | 2 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py index 05fb79bd0bf29..25c8134203b4d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -26,7 +26,10 @@ from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import requires_access_dag -from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets +from airflow.api_fastapi.core_api.services.ui.structure import ( + bind_output_assets_to_tasks, + get_upstream_assets, +) from airflow.models.dag_version import DagVersion from airflow.models.serialized_dag import SerializedDagModel from airflow.utils.dag_edges import dag_edges @@ -139,4 +142,6 @@ def structure_data( data["edges"] += start_edges + edges + end_edges + bind_output_assets_to_tasks(data["edges"], serialized_dag) + return StructureDataResponse(**data) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py index 128dc93b7706d..6f5f415d3fdb7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py @@ -23,6 +23,8 @@ from __future__ import annotations +from airflow.models.serialized_dag import SerializedDagModel + def get_upstream_assets( asset_expression: dict, entry_node_ref: str, level: int = 0 @@ -112,3 +114,32 @@ def get_upstream_assets( edges = edges + e return nodes, edges + + +def bind_output_assets_to_tasks(edges: list[dict], serialized_dag: SerializedDagModel) -> None: + """ + Try to bind the downstream 
assets to the relevant task that produces them.
+
+    This function will mutate the `edges` in place.
+    """
+    outlet_asset_references = serialized_dag.dag_model.task_outlet_asset_references
+
+    downstream_asset_related_edges = [edge for edge in edges if edge["target_id"].startswith("asset:")]
+
+    for edge in downstream_asset_related_edges:
+        asset_id = int(edge["target_id"].strip("asset:"))
+        try:
+            # Try to attach the outlet asset to the relevant task
+            outlet_asset_reference = next(
+                outlet_asset_reference
+                for outlet_asset_reference in outlet_asset_references
+                if outlet_asset_reference.asset_id == asset_id
+            )
+            edge["source_id"] = outlet_asset_reference.task_id
+            continue
+        except StopIteration:
+            # If no asset reference is found, fall back to using the exit node reference.
+            # This can happen because asset aliases are not yet handled: they do not populate
+            # `outlet_asset_references` when resolved, so an extra lookup is needed. The same
+            # applies to asset-name-ref and asset-uri-ref.
+            pass
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
index 928c0b2cc8490..035721a1dcbe4 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
@@ -369,7 +369,7 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a
             {
                 "is_setup_teardown": None,
                 "label": None,
-                "source_id": "task_2",
+                "source_id": "task_1",
                 "target_id": f"asset:{asset3_id}",
                 "is_source_asset": None,
             },

From dded4b6ab9218e5876eb15f353284b74c59d0d3b Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Fri, 6 Jun 2025 18:54:13 +0530
Subject: [PATCH 003/122] Make ``dag.test`` consistent with ``airflow dags test`` CLI command (#51476)

(cherry picked from commit 3fb3fb36ad52df7b33e738175806758bc0d6059f)

---
 task-sdk/src/airflow/sdk/definitions/dag.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 70db3a2035e29..877d56e330864 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -54,7 +54,7 @@
 from airflow.sdk.bases.operator import BaseOperator
 from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator
 from airflow.sdk.definitions._internal.node import validate_key
-from airflow.sdk.definitions._internal.types import NOTSET
+from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet
 from airflow.sdk.definitions.asset import AssetAll, BaseAsset
 from airflow.sdk.definitions.context import Context
 from airflow.sdk.definitions.param import DagParam, ParamsDict
@@ -1019,7 +1019,7 @@ def _validate_owner_links(self, _, owner_links):
     def test(
         self,
         run_after: datetime | None = None,
-        logical_date: datetime | None = None,
+        logical_date: datetime | None | ArgNotSet = NOTSET,
         run_conf: dict[str, Any] | None = None,
         conn_file_path: str | None = None,
         variable_file_path: str | None = None,
@@ -1087,6 +1087,10 @@ def add_logger_if_needed(ti: TaskInstance):
 
         with exit_stack:
             self.validate()
+
+            # Allow users to explicitly pass None. If it isn't set, we default to the current time.
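+            # e.g. dag.test() runs with logical_date=timezone.utcnow(), while
+            # dag.test(logical_date=None) keeps it None, matching ``airflow dags test``.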
+ logical_date = logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow() + log.debug("Clearing existing task instances for logical date %s", logical_date) # TODO: Replace with calling client.dag_run.clear in Execution API at some point SchedulerDAG.clear_dags( From a3f55452f43df358924de218ea837ca54b5980da Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 6 Jun 2025 21:17:17 +0200 Subject: [PATCH 004/122] [v3-0-test] Fix structure edges (#51484) (#51489) (cherry picked from commit 414407b7d72f85e43ad67352c4da5090d8e24a56) Co-authored-by: Pierre Jeambrun --- .../core_api/routes/ui/structure.py | 4 +-- .../core_api/routes/ui/test_structure.py | 28 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py index 25c8134203b4d..a84f36ab010d3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -138,9 +138,9 @@ def structure_data( asset_expression, entry_node_ref["id"] ) data["nodes"] += upstream_asset_nodes - data["edges"] = upstream_asset_edges + data["edges"] += upstream_asset_edges - data["edges"] += start_edges + edges + end_edges + data["edges"] += start_edges + end_edges bind_output_assets_to_tasks(data["edges"], serialized_dag) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py index 035721a1dcbe4..2c6425db1b395 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py @@ -310,6 +310,20 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a } expected = { "edges": [ + { + "is_setup_teardown": None, + "label": None, + "source_id": "external_task_sensor", + "target_id": "task_2", + "is_source_asset": None, + }, + { + "is_setup_teardown": None, + "label": None, + "source_id": "task_1", + "target_id": "external_task_sensor", + "is_source_asset": None, + }, { "is_setup_teardown": None, "label": None, @@ -352,20 +366,6 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a "target_id": "task_1", "is_source_asset": None, }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "external_task_sensor", - "target_id": "task_2", - "is_source_asset": None, - }, - { - "is_setup_teardown": None, - "label": None, - "source_id": "task_1", - "target_id": "external_task_sensor", - "is_source_asset": None, - }, { "is_setup_teardown": None, "label": None, From 62ed2bcd2b6d8484a386c84451a9e8bfba02282b Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Tue, 10 Jun 2025 15:13:53 +0200 Subject: [PATCH 005/122] Minor fix in python client docs (#51216) (#51571) (cherry picked from commit a08ea0a223f6b0c536d4294593edcc601d575621) Co-authored-by: Dheeraj Turaga --- clients/python/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/python/README.md b/clients/python/README.md index 0fa289542a581..58568af8b8d30 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -552,7 +552,7 @@ You can also set it by env variable: `export AIRFLOW__CORE__LOAD_EXAMPLES=True` * optionally expose configuration (NOTE! 
that this is dangerous setting). The script will happily run with the default setting, but if you want to see the configuration, you need to expose it. - In the `[webserver]` section of your `airflow.cfg` set: + In the `[api]` section of your `airflow.cfg` set: ```ini [api] From c4ffd8670a5b036e6fddf59df8690563c3175b58 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Tue, 10 Jun 2025 16:26:49 +0200 Subject: [PATCH 006/122] Update docs for Python client (#50850) (#51572) These were outdated. Still need to automate this process! (cherry picked from commit 687decf2988f33498ba1b49d81ab71dc816f45cd) Co-authored-by: Kaxil Naik --- clients/python/README.md | 455 +++++++++++++++------------ clients/python/test_python_client.py | 8 +- 2 files changed, 252 insertions(+), 211 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 58568af8b8d30..2253aec8bcfef 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -70,7 +70,7 @@ of resources' metadata in the response body. When reading resources, some common query parameters are usually available. e.g.: ``` -v1/connections?limit=25&offset=25 +/api/v2/connections?limit=25&offset=25 ``` |Query Parameter|Type|Description| @@ -138,17 +138,18 @@ You can use a third party client, such as [curl](https://curl.haxx.se/), [HTTPie [Postman](https://www.postman.com/) or [the Insomnia rest client](https://insomnia.rest/) to test the Apache Airflow API. -Note that you will need to pass credentials data. +Note that you will need to pass authentication credentials. If your Airflow deployment supports +**Bearer token authentication**, you can use the following example: -For e.g., here is how to pause a DAG with [curl](https://curl.haxx.se/), when basic authorization is used: +For example, here is how to pause a DAG with `curl`, using a Bearer token: ```bash -curl -X PATCH 'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused' \\ --H 'Content-Type: application/json' \\ ---user \"username:password\" \\ --d '{ - \"is_paused\": true -}' +curl -X PATCH 'https://example.com/api/v2/dags/{dag_id}?update_mask=is_paused' \ + -H 'Content-Type: application/json' \ + -H 'Authorization: Bearer YOUR_ACCESS_TOKEN' \ + -d '{ + \"is_paused\": true + }' ``` Using a graphical tool such as [Postman](https://www.postman.com/) or [Insomnia](https://insomnia.rest/), @@ -281,232 +282,277 @@ import airflow_client.client Please follow the [installation procedure](#installation--usage) and then run the following: ```python -import time import airflow_client.client +from airflow_client.client.rest import ApiException from pprint import pprint -from airflow_client.client.api import config_api -from airflow_client.client.model.config import Config -from airflow_client.client.model.error import Error -# Defining the host is optional and defaults to /api/v1 +# Defining the host is optional and defaults to http://localhost # See configuration.py for a list of all supported configuration parameters. -configuration = client.Configuration(host="/api/v1") +configuration = airflow_client.client.Configuration(host="http://localhost") # The client must configure the authentication and authorization parameters # in accordance with the API server security policy. # Examples for each auth method are provided below, use the example that # satisfies your auth use case. 
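+# The access token can be obtained from the Airflow API server's /auth/token endpoint.
+import os  # imported here, next to its first use below (os.environ)
+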
-# Configure HTTP basic authorization: Basic -configuration = client.Configuration(username="YOUR_USERNAME", password="YOUR_PASSWORD") +configuration.access_token = os.environ["ACCESS_TOKEN"] # Enter a context with an instance of the API client -with client.ApiClient(configuration) as api_client: +with airflow_client.client.ApiClient(configuration) as api_client: # Create an instance of the API class - api_instance = config_api.ConfigApi(api_client) + api_instance = airflow_client.client.AssetApi(api_client) + create_asset_events_body = airflow_client.client.CreateAssetEventsBody() # CreateAssetEventsBody | try: - # Get current configuration - api_response = api_instance.get_config() + # Create Asset Event + api_response = api_instance.create_asset_event(create_asset_events_body) + print("The response of AssetApi->create_asset_event:\n") pprint(api_response) - except client.ApiException as e: - print("Exception when calling ConfigApi->get_config: %s\n" % e) + except ApiException as e: + print("Exception when calling AssetApi->create_asset_event: %s\n" % e) ``` ## Documentation for API Endpoints -All URIs are relative to */api/v1* +All URIs are relative to *http://localhost* Class | Method | HTTP request | Description ------------ | ------------- | ------------- | ------------- -*ConfigApi* | [**get_config**](docs/ConfigApi.md#get_config) | **GET** /config | Get current configuration -*ConnectionApi* | [**delete_connection**](docs/ConnectionApi.md#delete_connection) | **DELETE** /connections/{connection_id} | Delete a connection -*ConnectionApi* | [**get_connection**](docs/ConnectionApi.md#get_connection) | **GET** /connections/{connection_id} | Get a connection -*ConnectionApi* | [**get_connections**](docs/ConnectionApi.md#get_connections) | **GET** /connections | List connections -*ConnectionApi* | [**patch_connection**](docs/ConnectionApi.md#patch_connection) | **PATCH** /connections/{connection_id} | Update a connection -*ConnectionApi* | [**post_connection**](docs/ConnectionApi.md#post_connection) | **POST** /connections | Create a connection -*ConnectionApi* | [**test_connection**](docs/ConnectionApi.md#test_connection) | **POST** /connections/test | Test a connection -*DAGApi* | [**delete_dag**](docs/DAGApi.md#delete_dag) | **DELETE** /dags/{dag_id} | Delete a DAG -*DAGApi* | [**get_dag**](docs/DAGApi.md#get_dag) | **GET** /dags/{dag_id} | Get basic information about a DAG -*DAGApi* | [**get_dag_details**](docs/DAGApi.md#get_dag_details) | **GET** /dags/{dag_id}/details | Get a simplified representation of DAG -*DAGApi* | [**get_dag_source**](docs/DAGApi.md#get_dag_source) | **GET** /dagSources/{file_token} | Get a source code -*DAGApi* | [**get_dags**](docs/DAGApi.md#get_dags) | **GET** /dags | List DAGs -*DAGApi* | [**get_task**](docs/DAGApi.md#get_task) | **GET** /dags/{dag_id}/tasks/{task_id} | Get simplified representation of a task -*DAGApi* | [**get_tasks**](docs/DAGApi.md#get_tasks) | **GET** /dags/{dag_id}/tasks | Get tasks for DAG -*DAGApi* | [**patch_dag**](docs/DAGApi.md#patch_dag) | **PATCH** /dags/{dag_id} | Update a DAG -*DAGApi* | [**patch_dags**](docs/DAGApi.md#patch_dags) | **PATCH** /dags | Update DAGs -*DAGApi* | [**post_clear_task_instances**](docs/DAGApi.md#post_clear_task_instances) | **POST** /dags/{dag_id}/clearTaskInstances | Clear a set of task instances -*DAGApi* | [**post_set_task_instances_state**](docs/DAGApi.md#post_set_task_instances_state) | **POST** /dags/{dag_id}/updateTaskInstancesState | Set a state of task instances -*DAGRunApi* | 
[**clear_dag_run**](docs/DAGRunApi.md#clear_dag_run) | **POST** /dags/{dag_id}/dagRuns/{dag_run_id}/clear | Clear a DAG run -*DAGRunApi* | [**delete_dag_run**](docs/DAGRunApi.md#delete_dag_run) | **DELETE** /dags/{dag_id}/dagRuns/{dag_run_id} | Delete a DAG run -*DAGRunApi* | [**get_dag_run**](docs/DAGRunApi.md#get_dag_run) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id} | Get a DAG run -*DAGRunApi* | [**get_dag_runs**](docs/DAGRunApi.md#get_dag_runs) | **GET** /dags/{dag_id}/dagRuns | List DAG runs -*DAGRunApi* | [**get_dag_runs_batch**](docs/DAGRunApi.md#get_dag_runs_batch) | **POST** /dags/~/dagRuns/list | List DAG runs (batch) -*DAGRunApi* | [**get_upstream_asset_events**](docs/DAGRunApi.md#get_upstream_asset_events) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents | Get asset events for a DAG run -*DAGRunApi* | [**post_dag_run**](docs/DAGRunApi.md#post_dag_run) | **POST** /dags/{dag_id}/dagRuns | Trigger a new DAG run -*DAGRunApi* | [**set_dag_run_note**](docs/DAGRunApi.md#set_dag_run_note) | **PATCH** /dags/{dag_id}/dagRuns/{dag_run_id}/setNote | Update the DagRun note. -*DAGRunApi* | [**update_dag_run_state**](docs/DAGRunApi.md#update_dag_run_state) | **PATCH** /dags/{dag_id}/dagRuns/{dag_run_id} | Modify a DAG run -*DagWarningApi* | [**get_dag_warnings**](docs/DagWarningApi.md#get_dag_warnings) | **GET** /dagWarnings | List dag warnings -*AssetApi* | [**get_asset**](docs/DatasetApi.md#get_asset) | **GET** /assets/{uri} | Get an asset -*AssetApi* | [**get_asset_events**](docs/DatasetApi.md#get_asset_events) | **GET** /assets/events | Get asset events -*DatasetApi* | [**get_assets**](docs/DatasetApi.md#get_assets) | **GET** /assets | List assets -*DatasetApi* | [**get_upstream_asset_events**](docs/DatasetApi.md#get_upstream_asset_events) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents | Get dataset events for a DAG run -*EventLogApi* | [**get_event_log**](docs/EventLogApi.md#get_event_log) | **GET** /eventLogs/{event_log_id} | Get a log entry -*EventLogApi* | [**get_event_logs**](docs/EventLogApi.md#get_event_logs) | **GET** /eventLogs | List log entries -*ImportErrorApi* | [**get_import_error**](docs/ImportErrorApi.md#get_import_error) | **GET** /importErrors/{import_error_id} | Get an import error -*ImportErrorApi* | [**get_import_errors**](docs/ImportErrorApi.md#get_import_errors) | **GET** /importErrors | List import errors -*MonitoringApi* | [**get_health**](docs/MonitoringApi.md#get_health) | **GET** /health | Get instance status -*MonitoringApi* | [**get_version**](docs/MonitoringApi.md#get_version) | **GET** /version | Get version information -*PermissionApi* | [**get_permissions**](docs/PermissionApi.md#get_permissions) | **GET** /permissions | List permissions -*PluginApi* | [**get_plugins**](docs/PluginApi.md#get_plugins) | **GET** /plugins | Get a list of loaded plugins -*PoolApi* | [**delete_pool**](docs/PoolApi.md#delete_pool) | **DELETE** /pools/{pool_name} | Delete a pool -*PoolApi* | [**get_pool**](docs/PoolApi.md#get_pool) | **GET** /pools/{pool_name} | Get a pool -*PoolApi* | [**get_pools**](docs/PoolApi.md#get_pools) | **GET** /pools | List pools -*PoolApi* | [**patch_pool**](docs/PoolApi.md#patch_pool) | **PATCH** /pools/{pool_name} | Update a pool -*PoolApi* | [**post_pool**](docs/PoolApi.md#post_pool) | **POST** /pools | Create a pool -*ProviderApi* | [**get_providers**](docs/ProviderApi.md#get_providers) | **GET** /providers | List providers -*RoleApi* | [**delete_role**](docs/RoleApi.md#delete_role) | **DELETE** 
/roles/{role_name} | Delete a role -*RoleApi* | [**get_role**](docs/RoleApi.md#get_role) | **GET** /roles/{role_name} | Get a role -*RoleApi* | [**get_roles**](docs/RoleApi.md#get_roles) | **GET** /roles | List roles -*RoleApi* | [**patch_role**](docs/RoleApi.md#patch_role) | **PATCH** /roles/{role_name} | Update a role -*RoleApi* | [**post_role**](docs/RoleApi.md#post_role) | **POST** /roles | Create a role -*TaskInstanceApi* | [**get_extra_links**](docs/TaskInstanceApi.md#get_extra_links) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links | List extra links -*TaskInstanceApi* | [**get_log**](docs/TaskInstanceApi.md#get_log) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number} | Get logs -*TaskInstanceApi* | [**get_mapped_task_instance**](docs/TaskInstanceApi.md#get_mapped_task_instance) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | Get a mapped task instance -*TaskInstanceApi* | [**get_mapped_task_instances**](docs/TaskInstanceApi.md#get_mapped_task_instances) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped | List mapped task instances -*TaskInstanceApi* | [**get_task_instance**](docs/TaskInstanceApi.md#get_task_instance) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | Get a task instance -*TaskInstanceApi* | [**get_task_instances**](docs/TaskInstanceApi.md#get_task_instances) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances | List task instances -*TaskInstanceApi* | [**get_task_instances_batch**](docs/TaskInstanceApi.md#get_task_instances_batch) | **POST** /dags/~/dagRuns/~/taskInstances/list | List task instances (batch) -*TaskInstanceApi* | [**patch_mapped_task_instance**](docs/TaskInstanceApi.md#patch_mapped_task_instance) | **PATCH** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | Updates the state of a mapped task instance -*TaskInstanceApi* | [**patch_task_instance**](docs/TaskInstanceApi.md#patch_task_instance) | **PATCH** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | Updates the state of a task instance -*TaskInstanceApi* | [**set_mapped_task_instance_note**](docs/TaskInstanceApi.md#set_mapped_task_instance_note) | **PATCH** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/setNote | Update the TaskInstance note. -*TaskInstanceApi* | [**set_task_instance_note**](docs/TaskInstanceApi.md#set_task_instance_note) | **PATCH** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/setNote | Update the TaskInstance note. 
-*UserApi* | [**delete_user**](docs/UserApi.md#delete_user) | **DELETE** /users/{username} | Delete a user -*UserApi* | [**get_user**](docs/UserApi.md#get_user) | **GET** /users/{username} | Get a user -*UserApi* | [**get_users**](docs/UserApi.md#get_users) | **GET** /users | List users -*UserApi* | [**patch_user**](docs/UserApi.md#patch_user) | **PATCH** /users/{username} | Update a user -*UserApi* | [**post_user**](docs/UserApi.md#post_user) | **POST** /users | Create a user -*VariableApi* | [**delete_variable**](docs/VariableApi.md#delete_variable) | **DELETE** /variables/{variable_key} | Delete a variable -*VariableApi* | [**get_variable**](docs/VariableApi.md#get_variable) | **GET** /variables/{variable_key} | Get a variable -*VariableApi* | [**get_variables**](docs/VariableApi.md#get_variables) | **GET** /variables | List variables -*VariableApi* | [**patch_variable**](docs/VariableApi.md#patch_variable) | **PATCH** /variables/{variable_key} | Update a variable -*VariableApi* | [**post_variables**](docs/VariableApi.md#post_variables) | **POST** /variables | Create a variable -*XComApi* | [**get_xcom_entries**](docs/XComApi.md#get_xcom_entries) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | List XCom entries -*XComApi* | [**get_xcom_entry**](docs/XComApi.md#get_xcom_entry) | **GET** /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | Get an XCom entry +*AssetApi* | [**create_asset_event**](docs/AssetApi.md#create_asset_event) | **POST** /api/v2/assets/events | Create Asset Event +*AssetApi* | [**delete_asset_queued_events**](docs/AssetApi.md#delete_asset_queued_events) | **DELETE** /api/v2/assets/{asset_id}/queuedEvents | Delete Asset Queued Events +*AssetApi* | [**delete_dag_asset_queued_event**](docs/AssetApi.md#delete_dag_asset_queued_event) | **DELETE** /api/v2/dags/{dag_id}/assets/{asset_id}/queuedEvents | Delete Dag Asset Queued Event +*AssetApi* | [**delete_dag_asset_queued_events**](docs/AssetApi.md#delete_dag_asset_queued_events) | **DELETE** /api/v2/dags/{dag_id}/assets/queuedEvents | Delete Dag Asset Queued Events +*AssetApi* | [**get_asset**](docs/AssetApi.md#get_asset) | **GET** /api/v2/assets/{asset_id} | Get Asset +*AssetApi* | [**get_asset_alias**](docs/AssetApi.md#get_asset_alias) | **GET** /api/v2/assets/aliases/{asset_alias_id} | Get Asset Alias +*AssetApi* | [**get_asset_aliases**](docs/AssetApi.md#get_asset_aliases) | **GET** /api/v2/assets/aliases | Get Asset Aliases +*AssetApi* | [**get_asset_events**](docs/AssetApi.md#get_asset_events) | **GET** /api/v2/assets/events | Get Asset Events +*AssetApi* | [**get_asset_queued_events**](docs/AssetApi.md#get_asset_queued_events) | **GET** /api/v2/assets/{asset_id}/queuedEvents | Get Asset Queued Events +*AssetApi* | [**get_assets**](docs/AssetApi.md#get_assets) | **GET** /api/v2/assets | Get Assets +*AssetApi* | [**get_dag_asset_queued_event**](docs/AssetApi.md#get_dag_asset_queued_event) | **GET** /api/v2/dags/{dag_id}/assets/{asset_id}/queuedEvents | Get Dag Asset Queued Event +*AssetApi* | [**get_dag_asset_queued_events**](docs/AssetApi.md#get_dag_asset_queued_events) | **GET** /api/v2/dags/{dag_id}/assets/queuedEvents | Get Dag Asset Queued Events +*AssetApi* | [**materialize_asset**](docs/AssetApi.md#materialize_asset) | **POST** /api/v2/assets/{asset_id}/materialize | Materialize Asset +*BackfillApi* | [**cancel_backfill**](docs/BackfillApi.md#cancel_backfill) | **PUT** /api/v2/backfills/{backfill_id}/cancel | Cancel Backfill +*BackfillApi* | 
[**create_backfill**](docs/BackfillApi.md#create_backfill) | **POST** /api/v2/backfills | Create Backfill +*BackfillApi* | [**create_backfill_dry_run**](docs/BackfillApi.md#create_backfill_dry_run) | **POST** /api/v2/backfills/dry_run | Create Backfill Dry Run +*BackfillApi* | [**get_backfill**](docs/BackfillApi.md#get_backfill) | **GET** /api/v2/backfills/{backfill_id} | Get Backfill +*BackfillApi* | [**list_backfills**](docs/BackfillApi.md#list_backfills) | **GET** /api/v2/backfills | List Backfills +*BackfillApi* | [**pause_backfill**](docs/BackfillApi.md#pause_backfill) | **PUT** /api/v2/backfills/{backfill_id}/pause | Pause Backfill +*BackfillApi* | [**unpause_backfill**](docs/BackfillApi.md#unpause_backfill) | **PUT** /api/v2/backfills/{backfill_id}/unpause | Unpause Backfill +*ConfigApi* | [**get_config**](docs/ConfigApi.md#get_config) | **GET** /api/v2/config | Get Config +*ConfigApi* | [**get_config_value**](docs/ConfigApi.md#get_config_value) | **GET** /api/v2/config/section/{section}/option/{option} | Get Config Value +*ConnectionApi* | [**bulk_connections**](docs/ConnectionApi.md#bulk_connections) | **PATCH** /api/v2/connections | Bulk Connections +*ConnectionApi* | [**create_default_connections**](docs/ConnectionApi.md#create_default_connections) | **POST** /api/v2/connections/defaults | Create Default Connections +*ConnectionApi* | [**delete_connection**](docs/ConnectionApi.md#delete_connection) | **DELETE** /api/v2/connections/{connection_id} | Delete Connection +*ConnectionApi* | [**get_connection**](docs/ConnectionApi.md#get_connection) | **GET** /api/v2/connections/{connection_id} | Get Connection +*ConnectionApi* | [**get_connections**](docs/ConnectionApi.md#get_connections) | **GET** /api/v2/connections | Get Connections +*ConnectionApi* | [**patch_connection**](docs/ConnectionApi.md#patch_connection) | **PATCH** /api/v2/connections/{connection_id} | Patch Connection +*ConnectionApi* | [**post_connection**](docs/ConnectionApi.md#post_connection) | **POST** /api/v2/connections | Post Connection +*ConnectionApi* | [**test_connection**](docs/ConnectionApi.md#test_connection) | **POST** /api/v2/connections/test | Test Connection +*DAGApi* | [**delete_dag**](docs/DAGApi.md#delete_dag) | **DELETE** /api/v2/dags/{dag_id} | Delete Dag +*DAGApi* | [**get_dag**](docs/DAGApi.md#get_dag) | **GET** /api/v2/dags/{dag_id} | Get Dag +*DAGApi* | [**get_dag_details**](docs/DAGApi.md#get_dag_details) | **GET** /api/v2/dags/{dag_id}/details | Get Dag Details +*DAGApi* | [**get_dag_tags**](docs/DAGApi.md#get_dag_tags) | **GET** /api/v2/dagTags | Get Dag Tags +*DAGApi* | [**get_dags**](docs/DAGApi.md#get_dags) | **GET** /api/v2/dags | Get Dags +*DAGApi* | [**patch_dag**](docs/DAGApi.md#patch_dag) | **PATCH** /api/v2/dags/{dag_id} | Patch Dag +*DAGApi* | [**patch_dags**](docs/DAGApi.md#patch_dags) | **PATCH** /api/v2/dags | Patch Dags +*DAGParsingApi* | [**reparse_dag_file**](docs/DAGParsingApi.md#reparse_dag_file) | **PUT** /api/v2/parseDagFile/{file_token} | Reparse Dag File +*DagReportApi* | [**get_dag_reports**](docs/DagReportApi.md#get_dag_reports) | **GET** /api/v2/dagReports | Get Dag Reports +*DagRunApi* | [**clear_dag_run**](docs/DagRunApi.md#clear_dag_run) | **POST** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear | Clear Dag Run +*DagRunApi* | [**delete_dag_run**](docs/DagRunApi.md#delete_dag_run) | **DELETE** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Delete Dag Run +*DagRunApi* | [**get_dag_run**](docs/DagRunApi.md#get_dag_run) | **GET** 
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Get Dag Run +*DagRunApi* | [**get_dag_runs**](docs/DagRunApi.md#get_dag_runs) | **GET** /api/v2/dags/{dag_id}/dagRuns | Get Dag Runs +*DagRunApi* | [**get_list_dag_runs_batch**](docs/DagRunApi.md#get_list_dag_runs_batch) | **POST** /api/v2/dags/{dag_id}/dagRuns/list | Get List Dag Runs Batch +*DagRunApi* | [**get_upstream_asset_events**](docs/DagRunApi.md#get_upstream_asset_events) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents | Get Upstream Asset Events +*DagRunApi* | [**patch_dag_run**](docs/DagRunApi.md#patch_dag_run) | **PATCH** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Patch Dag Run +*DagRunApi* | [**trigger_dag_run**](docs/DagRunApi.md#trigger_dag_run) | **POST** /api/v2/dags/{dag_id}/dagRuns | Trigger Dag Run +*DagSourceApi* | [**get_dag_source**](docs/DagSourceApi.md#get_dag_source) | **GET** /api/v2/dagSources/{dag_id} | Get Dag Source +*DagStatsApi* | [**get_dag_stats**](docs/DagStatsApi.md#get_dag_stats) | **GET** /api/v2/dagStats | Get Dag Stats +*DagVersionApi* | [**get_dag_version**](docs/DagVersionApi.md#get_dag_version) | **GET** /api/v2/dags/{dag_id}/dagVersions/{version_number} | Get Dag Version +*DagVersionApi* | [**get_dag_versions**](docs/DagVersionApi.md#get_dag_versions) | **GET** /api/v2/dags/{dag_id}/dagVersions | Get Dag Versions +*DagWarningApi* | [**list_dag_warnings**](docs/DagWarningApi.md#list_dag_warnings) | **GET** /api/v2/dagWarnings | List Dag Warnings +*EventLogApi* | [**get_event_log**](docs/EventLogApi.md#get_event_log) | **GET** /api/v2/eventLogs/{event_log_id} | Get Event Log +*EventLogApi* | [**get_event_logs**](docs/EventLogApi.md#get_event_logs) | **GET** /api/v2/eventLogs | Get Event Logs +*ExtraLinksApi* | [**get_extra_links**](docs/ExtraLinksApi.md#get_extra_links) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links | Get Extra Links +*ImportErrorApi* | [**get_import_error**](docs/ImportErrorApi.md#get_import_error) | **GET** /api/v2/importErrors/{import_error_id} | Get Import Error +*ImportErrorApi* | [**get_import_errors**](docs/ImportErrorApi.md#get_import_errors) | **GET** /api/v2/importErrors | Get Import Errors +*JobApi* | [**get_jobs**](docs/JobApi.md#get_jobs) | **GET** /api/v2/jobs | Get Jobs +*LoginApi* | [**login**](docs/LoginApi.md#login) | **GET** /api/v2/auth/login | Login +*LoginApi* | [**logout**](docs/LoginApi.md#logout) | **GET** /api/v2/auth/logout | Logout +*MonitorApi* | [**get_health**](docs/MonitorApi.md#get_health) | **GET** /api/v2/monitor/health | Get Health +*PluginApi* | [**get_plugins**](docs/PluginApi.md#get_plugins) | **GET** /api/v2/plugins | Get Plugins +*PoolApi* | [**bulk_pools**](docs/PoolApi.md#bulk_pools) | **PATCH** /api/v2/pools | Bulk Pools +*PoolApi* | [**delete_pool**](docs/PoolApi.md#delete_pool) | **DELETE** /api/v2/pools/{pool_name} | Delete Pool +*PoolApi* | [**get_pool**](docs/PoolApi.md#get_pool) | **GET** /api/v2/pools/{pool_name} | Get Pool +*PoolApi* | [**get_pools**](docs/PoolApi.md#get_pools) | **GET** /api/v2/pools | Get Pools +*PoolApi* | [**patch_pool**](docs/PoolApi.md#patch_pool) | **PATCH** /api/v2/pools/{pool_name} | Patch Pool +*PoolApi* | [**post_pool**](docs/PoolApi.md#post_pool) | **POST** /api/v2/pools | Post Pool +*ProviderApi* | [**get_providers**](docs/ProviderApi.md#get_providers) | **GET** /api/v2/providers | Get Providers +*TaskApi* | [**get_task**](docs/TaskApi.md#get_task) | **GET** /api/v2/dags/{dag_id}/tasks/{task_id} | Get Task +*TaskApi* | 
[**get_tasks**](docs/TaskApi.md#get_tasks) | **GET** /api/v2/dags/{dag_id}/tasks | Get Tasks +*TaskInstanceApi* | [**get_extra_links**](docs/TaskInstanceApi.md#get_extra_links) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links | Get Extra Links +*TaskInstanceApi* | [**get_log**](docs/TaskInstanceApi.md#get_log) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number} | Get Log +*TaskInstanceApi* | [**get_mapped_task_instance**](docs/TaskInstanceApi.md#get_mapped_task_instance) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | Get Mapped Task Instance +*TaskInstanceApi* | [**get_mapped_task_instance_tries**](docs/TaskInstanceApi.md#get_mapped_task_instance_tries) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries | Get Mapped Task Instance Tries +*TaskInstanceApi* | [**get_mapped_task_instance_try_details**](docs/TaskInstanceApi.md#get_mapped_task_instance_try_details) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number} | Get Mapped Task Instance Try Details +*TaskInstanceApi* | [**get_mapped_task_instances**](docs/TaskInstanceApi.md#get_mapped_task_instances) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped | Get Mapped Task Instances +*TaskInstanceApi* | [**get_task_instance**](docs/TaskInstanceApi.md#get_task_instance) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | Get Task Instance +*TaskInstanceApi* | [**get_task_instance_dependencies**](docs/TaskInstanceApi.md#get_task_instance_dependencies) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies | Get Task Instance Dependencies +*TaskInstanceApi* | [**get_task_instance_dependencies_by_map_index**](docs/TaskInstanceApi.md#get_task_instance_dependencies_by_map_index) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies | Get Task Instance Dependencies +*TaskInstanceApi* | [**get_task_instance_tries**](docs/TaskInstanceApi.md#get_task_instance_tries) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries | Get Task Instance Tries +*TaskInstanceApi* | [**get_task_instance_try_details**](docs/TaskInstanceApi.md#get_task_instance_try_details) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number} | Get Task Instance Try Details +*TaskInstanceApi* | [**get_task_instances**](docs/TaskInstanceApi.md#get_task_instances) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances | Get Task Instances +*TaskInstanceApi* | [**get_task_instances_batch**](docs/TaskInstanceApi.md#get_task_instances_batch) | **POST** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/list | Get Task Instances Batch +*TaskInstanceApi* | [**patch_task_instance**](docs/TaskInstanceApi.md#patch_task_instance) | **PATCH** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | Patch Task Instance +*TaskInstanceApi* | [**patch_task_instance_by_map_index**](docs/TaskInstanceApi.md#patch_task_instance_by_map_index) | **PATCH** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | Patch Task Instance +*TaskInstanceApi* | [**patch_task_instance_dry_run**](docs/TaskInstanceApi.md#patch_task_instance_dry_run) | **PATCH** 
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dry_run | Patch Task Instance Dry Run +*TaskInstanceApi* | [**patch_task_instance_dry_run_by_map_index**](docs/TaskInstanceApi.md#patch_task_instance_dry_run_by_map_index) | **PATCH** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dry_run | Patch Task Instance Dry Run +*TaskInstanceApi* | [**post_clear_task_instances**](docs/TaskInstanceApi.md#post_clear_task_instances) | **POST** /api/v2/dags/{dag_id}/clearTaskInstances | Post Clear Task Instances +*VariableApi* | [**bulk_variables**](docs/VariableApi.md#bulk_variables) | **PATCH** /api/v2/variables | Bulk Variables +*VariableApi* | [**delete_variable**](docs/VariableApi.md#delete_variable) | **DELETE** /api/v2/variables/{variable_key} | Delete Variable +*VariableApi* | [**get_variable**](docs/VariableApi.md#get_variable) | **GET** /api/v2/variables/{variable_key} | Get Variable +*VariableApi* | [**get_variables**](docs/VariableApi.md#get_variables) | **GET** /api/v2/variables | Get Variables +*VariableApi* | [**patch_variable**](docs/VariableApi.md#patch_variable) | **PATCH** /api/v2/variables/{variable_key} | Patch Variable +*VariableApi* | [**post_variable**](docs/VariableApi.md#post_variable) | **POST** /api/v2/variables | Post Variable +*VersionApi* | [**get_version**](docs/VersionApi.md#get_version) | **GET** /api/v2/version | Get Version +*XComApi* | [**create_xcom_entry**](docs/XComApi.md#create_xcom_entry) | **POST** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | Create Xcom Entry +*XComApi* | [**get_xcom_entries**](docs/XComApi.md#get_xcom_entries) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | Get Xcom Entries +*XComApi* | [**get_xcom_entry**](docs/XComApi.md#get_xcom_entry) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | Get Xcom Entry +*XComApi* | [**update_xcom_entry**](docs/XComApi.md#update_xcom_entry) | **PATCH** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | Update Xcom Entry ## Documentation For Models - - [Action](docs/Action.md) - - [ActionCollection](docs/ActionCollection.md) - - [ActionCollectionAllOf](docs/ActionCollectionAllOf.md) - - [ActionResource](docs/ActionResource.md) - - [AssetCollection](docs/AssetCollection.md) - - [AssetCollectionAllOf](docs/AssetCollectionAllOf.md) - - [AssetEvent](docs/AssetEvent.md) - - [AssetEventCollection](docs/AssetEventCollection.md) - - [AssetEventCollectionAllOf](docs/AssetEventCollectionAllOf.md) - - [BasicDAGRun](docs/BasicDAGRun.md) - - [ClassReference](docs/ClassReference.md) - - [ClearDagRun](docs/ClearDagRun.md) - - [ClearTaskInstances](docs/ClearTaskInstances.md) - - [CollectionInfo](docs/CollectionInfo.md) - - [Color](docs/Color.md) + - [AppBuilderMenuItemResponse](docs/AppBuilderMenuItemResponse.md) + - [AppBuilderViewResponse](docs/AppBuilderViewResponse.md) + - [AssetAliasCollectionResponse](docs/AssetAliasCollectionResponse.md) + - [AssetAliasResponse](docs/AssetAliasResponse.md) + - [AssetCollectionResponse](docs/AssetCollectionResponse.md) + - [AssetEventCollectionResponse](docs/AssetEventCollectionResponse.md) + - [AssetEventResponse](docs/AssetEventResponse.md) + - [AssetResponse](docs/AssetResponse.md) + - [BackfillCollectionResponse](docs/BackfillCollectionResponse.md) + - [BackfillPostBody](docs/BackfillPostBody.md) + - [BackfillResponse](docs/BackfillResponse.md) + - 
[BaseInfoResponse](docs/BaseInfoResponse.md) + - [BulkAction](docs/BulkAction.md) + - [BulkActionNotOnExistence](docs/BulkActionNotOnExistence.md) + - [BulkActionOnExistence](docs/BulkActionOnExistence.md) + - [BulkActionResponse](docs/BulkActionResponse.md) + - [BulkBodyConnectionBody](docs/BulkBodyConnectionBody.md) + - [BulkBodyConnectionBodyActionsInner](docs/BulkBodyConnectionBodyActionsInner.md) + - [BulkBodyPoolBody](docs/BulkBodyPoolBody.md) + - [BulkBodyPoolBodyActionsInner](docs/BulkBodyPoolBodyActionsInner.md) + - [BulkBodyVariableBody](docs/BulkBodyVariableBody.md) + - [BulkBodyVariableBodyActionsInner](docs/BulkBodyVariableBodyActionsInner.md) + - [BulkCreateActionConnectionBody](docs/BulkCreateActionConnectionBody.md) + - [BulkCreateActionPoolBody](docs/BulkCreateActionPoolBody.md) + - [BulkCreateActionVariableBody](docs/BulkCreateActionVariableBody.md) + - [BulkDeleteActionConnectionBody](docs/BulkDeleteActionConnectionBody.md) + - [BulkDeleteActionPoolBody](docs/BulkDeleteActionPoolBody.md) + - [BulkDeleteActionVariableBody](docs/BulkDeleteActionVariableBody.md) + - [BulkResponse](docs/BulkResponse.md) + - [BulkUpdateActionConnectionBody](docs/BulkUpdateActionConnectionBody.md) + - [BulkUpdateActionPoolBody](docs/BulkUpdateActionPoolBody.md) + - [BulkUpdateActionVariableBody](docs/BulkUpdateActionVariableBody.md) + - [ClearTaskInstancesBody](docs/ClearTaskInstancesBody.md) + - [ClearTaskInstancesBodyTaskIdsInner](docs/ClearTaskInstancesBodyTaskIdsInner.md) - [Config](docs/Config.md) - [ConfigOption](docs/ConfigOption.md) - [ConfigSection](docs/ConfigSection.md) - - [Connection](docs/Connection.md) - - [ConnectionAllOf](docs/ConnectionAllOf.md) - - [ConnectionCollection](docs/ConnectionCollection.md) - - [ConnectionCollectionAllOf](docs/ConnectionCollectionAllOf.md) - - [ConnectionCollectionItem](docs/ConnectionCollectionItem.md) - - [ConnectionTest](docs/ConnectionTest.md) - - [CronExpression](docs/CronExpression.md) - - [DAG](docs/DAG.md) - - [DAGCollection](docs/DAGCollection.md) - - [DAGCollectionAllOf](docs/DAGCollectionAllOf.md) - - [DAGDetail](docs/DAGDetail.md) - - [DAGDetailAllOf](docs/DAGDetailAllOf.md) - - [DAGRun](docs/DAGRun.md) - - [DAGRunCollection](docs/DAGRunCollection.md) - - [DAGRunCollectionAllOf](docs/DAGRunCollectionAllOf.md) + - [ConnectionBody](docs/ConnectionBody.md) + - [ConnectionCollectionResponse](docs/ConnectionCollectionResponse.md) + - [ConnectionResponse](docs/ConnectionResponse.md) + - [ConnectionTestResponse](docs/ConnectionTestResponse.md) + - [Content](docs/Content.md) + - [CreateAssetEventsBody](docs/CreateAssetEventsBody.md) + - [DAGCollectionResponse](docs/DAGCollectionResponse.md) + - [DAGDetailsResponse](docs/DAGDetailsResponse.md) + - [DAGPatchBody](docs/DAGPatchBody.md) + - [DAGResponse](docs/DAGResponse.md) + - [DAGRunClearBody](docs/DAGRunClearBody.md) + - [DAGRunCollectionResponse](docs/DAGRunCollectionResponse.md) + - [DAGRunPatchBody](docs/DAGRunPatchBody.md) + - [DAGRunPatchStates](docs/DAGRunPatchStates.md) + - [DAGRunResponse](docs/DAGRunResponse.md) + - [DAGRunsBatchBody](docs/DAGRunsBatchBody.md) + - [DAGSourceResponse](docs/DAGSourceResponse.md) + - [DAGTagCollectionResponse](docs/DAGTagCollectionResponse.md) + - [DAGVersionCollectionResponse](docs/DAGVersionCollectionResponse.md) + - [DAGWarningCollectionResponse](docs/DAGWarningCollectionResponse.md) + - [DAGWarningResponse](docs/DAGWarningResponse.md) + - [DagProcessorInfoResponse](docs/DagProcessorInfoResponse.md) + - 
[DagRunAssetReference](docs/DagRunAssetReference.md) + - [DagRunState](docs/DagRunState.md) + - [DagRunTriggeredByType](docs/DagRunTriggeredByType.md) + - [DagRunType](docs/DagRunType.md) - [DagScheduleAssetReference](docs/DagScheduleAssetReference.md) - - [DagState](docs/DagState.md) - - [DagWarning](docs/DagWarning.md) - - [DagWarningCollection](docs/DagWarningCollection.md) - - [DagWarningCollectionAllOf](docs/DagWarningCollectionAllOf.md) - - [Dataset](docs/Dataset.md) - - [Error](docs/Error.md) - - [EventLog](docs/EventLog.md) - - [EventLogCollection](docs/EventLogCollection.md) - - [EventLogCollectionAllOf](docs/EventLogCollectionAllOf.md) - - [ExtraLink](docs/ExtraLink.md) - - [ExtraLinkCollection](docs/ExtraLinkCollection.md) - - [HealthInfo](docs/HealthInfo.md) - - [HealthStatus](docs/HealthStatus.md) - - [ImportError](docs/ImportError.md) - - [ImportErrorCollection](docs/ImportErrorCollection.md) - - [ImportErrorCollectionAllOf](docs/ImportErrorCollectionAllOf.md) - - [InlineResponse200](docs/InlineResponse200.md) - - [InlineResponse2001](docs/InlineResponse2001.md) - - [Job](docs/Job.md) - - [ListDagRunsForm](docs/ListDagRunsForm.md) - - [ListTaskInstanceForm](docs/ListTaskInstanceForm.md) - - [MetadatabaseStatus](docs/MetadatabaseStatus.md) - - [PluginCollection](docs/PluginCollection.md) - - [PluginCollectionAllOf](docs/PluginCollectionAllOf.md) - - [PluginCollectionItem](docs/PluginCollectionItem.md) - - [Pool](docs/Pool.md) - - [PoolCollection](docs/PoolCollection.md) - - [PoolCollectionAllOf](docs/PoolCollectionAllOf.md) - - [Provider](docs/Provider.md) - - [ProviderCollection](docs/ProviderCollection.md) - - [RelativeDelta](docs/RelativeDelta.md) - - [Resource](docs/Resource.md) - - [Role](docs/Role.md) - - [RoleCollection](docs/RoleCollection.md) - - [RoleCollectionAllOf](docs/RoleCollectionAllOf.md) - - [SLAMiss](docs/SLAMiss.md) - - [ScheduleInterval](docs/ScheduleInterval.md) - - [SchedulerStatus](docs/SchedulerStatus.md) - - [SetDagRunNote](docs/SetDagRunNote.md) - - [SetTaskInstanceNote](docs/SetTaskInstanceNote.md) - - [Tag](docs/Tag.md) - - [Task](docs/Task.md) - - [TaskCollection](docs/TaskCollection.md) - - [TaskExtraLinks](docs/TaskExtraLinks.md) - - [TaskInstance](docs/TaskInstance.md) - - [TaskInstanceCollection](docs/TaskInstanceCollection.md) - - [TaskInstanceCollectionAllOf](docs/TaskInstanceCollectionAllOf.md) - - [TaskInstanceReference](docs/TaskInstanceReference.md) - - [TaskInstanceReferenceCollection](docs/TaskInstanceReferenceCollection.md) + - [DagStatsCollectionResponse](docs/DagStatsCollectionResponse.md) + - [DagStatsResponse](docs/DagStatsResponse.md) + - [DagStatsStateResponse](docs/DagStatsStateResponse.md) + - [DagTagResponse](docs/DagTagResponse.md) + - [DagVersionResponse](docs/DagVersionResponse.md) + - [DagWarningType](docs/DagWarningType.md) + - [Detail](docs/Detail.md) + - [DryRunBackfillCollectionResponse](docs/DryRunBackfillCollectionResponse.md) + - [DryRunBackfillResponse](docs/DryRunBackfillResponse.md) + - [EventLogCollectionResponse](docs/EventLogCollectionResponse.md) + - [EventLogResponse](docs/EventLogResponse.md) + - [ExtraLinkCollectionResponse](docs/ExtraLinkCollectionResponse.md) + - [FastAPIAppResponse](docs/FastAPIAppResponse.md) + - [FastAPIRootMiddlewareResponse](docs/FastAPIRootMiddlewareResponse.md) + - [HTTPExceptionResponse](docs/HTTPExceptionResponse.md) + - [HTTPValidationError](docs/HTTPValidationError.md) + - [HealthInfoResponse](docs/HealthInfoResponse.md) + - 
[ImportErrorCollectionResponse](docs/ImportErrorCollectionResponse.md) + - [ImportErrorResponse](docs/ImportErrorResponse.md) + - [JobCollectionResponse](docs/JobCollectionResponse.md) + - [JobResponse](docs/JobResponse.md) + - [PatchTaskInstanceBody](docs/PatchTaskInstanceBody.md) + - [PluginCollectionResponse](docs/PluginCollectionResponse.md) + - [PluginResponse](docs/PluginResponse.md) + - [PoolBody](docs/PoolBody.md) + - [PoolCollectionResponse](docs/PoolCollectionResponse.md) + - [PoolPatchBody](docs/PoolPatchBody.md) + - [PoolResponse](docs/PoolResponse.md) + - [ProviderCollectionResponse](docs/ProviderCollectionResponse.md) + - [ProviderResponse](docs/ProviderResponse.md) + - [QueuedEventCollectionResponse](docs/QueuedEventCollectionResponse.md) + - [QueuedEventResponse](docs/QueuedEventResponse.md) + - [ReprocessBehavior](docs/ReprocessBehavior.md) + - [ResponseClearDagRun](docs/ResponseClearDagRun.md) + - [ResponseGetXcomEntry](docs/ResponseGetXcomEntry.md) + - [SchedulerInfoResponse](docs/SchedulerInfoResponse.md) + - [StructuredLogMessage](docs/StructuredLogMessage.md) + - [TaskCollectionResponse](docs/TaskCollectionResponse.md) + - [TaskDependencyCollectionResponse](docs/TaskDependencyCollectionResponse.md) + - [TaskDependencyResponse](docs/TaskDependencyResponse.md) + - [TaskInstanceCollectionResponse](docs/TaskInstanceCollectionResponse.md) + - [TaskInstanceHistoryCollectionResponse](docs/TaskInstanceHistoryCollectionResponse.md) + - [TaskInstanceHistoryResponse](docs/TaskInstanceHistoryResponse.md) + - [TaskInstanceResponse](docs/TaskInstanceResponse.md) + - [TaskInstanceState](docs/TaskInstanceState.md) + - [TaskInstancesBatchBody](docs/TaskInstancesBatchBody.md) + - [TaskInstancesLogResponse](docs/TaskInstancesLogResponse.md) - [TaskOutletAssetReference](docs/TaskOutletAssetReference.md) - - [TaskState](docs/TaskState.md) + - [TaskResponse](docs/TaskResponse.md) - [TimeDelta](docs/TimeDelta.md) - - [Trigger](docs/Trigger.md) - - [TriggerRule](docs/TriggerRule.md) - - [UpdateDagRunState](docs/UpdateDagRunState.md) - - [UpdateTaskInstance](docs/UpdateTaskInstance.md) - - [UpdateTaskInstancesState](docs/UpdateTaskInstancesState.md) - - [User](docs/User.md) - - [UserAllOf](docs/UserAllOf.md) - - [UserCollection](docs/UserCollection.md) - - [UserCollectionAllOf](docs/UserCollectionAllOf.md) - - [UserCollectionItem](docs/UserCollectionItem.md) - - [UserCollectionItemRoles](docs/UserCollectionItemRoles.md) - - [Variable](docs/Variable.md) - - [VariableAllOf](docs/VariableAllOf.md) - - [VariableCollection](docs/VariableCollection.md) - - [VariableCollectionAllOf](docs/VariableCollectionAllOf.md) - - [VariableCollectionItem](docs/VariableCollectionItem.md) + - [TriggerDAGRunPostBody](docs/TriggerDAGRunPostBody.md) + - [TriggerResponse](docs/TriggerResponse.md) + - [TriggererInfoResponse](docs/TriggererInfoResponse.md) + - [ValidationError](docs/ValidationError.md) + - [ValidationErrorLocInner](docs/ValidationErrorLocInner.md) + - [Value](docs/Value.md) + - [VariableBody](docs/VariableBody.md) + - [VariableCollectionResponse](docs/VariableCollectionResponse.md) + - [VariableResponse](docs/VariableResponse.md) - [VersionInfo](docs/VersionInfo.md) - - [WeightRule](docs/WeightRule.md) - - [XCom](docs/XCom.md) - - [XComAllOf](docs/XComAllOf.md) - - [XComCollection](docs/XComCollection.md) - - [XComCollectionAllOf](docs/XComCollectionAllOf.md) - - [XComCollectionItem](docs/XComCollectionItem.md) + - [XComCollectionResponse](docs/XComCollectionResponse.md) + - 
[XComCreateBody](docs/XComCreateBody.md) + - [XComResponse](docs/XComResponse.md) + - [XComResponseNative](docs/XComResponseNative.md) + - [XComResponseString](docs/XComResponseString.md) + - [XComUpdateBody](docs/XComUpdateBody.md) ## Documentation For Authorization @@ -515,6 +561,7 @@ By default the generated client supports the following authentication schemes: * Basic * GoogleOpenID * Kerberos +* OAuth2PasswordBearer However, you can generate client and documentation with your own schemes by adding your own schemes in the security section of the OpenAPI specification. You can do it with Breeze CLI by adding the @@ -566,10 +613,8 @@ You can also set it by env variable: `export AIRFLOW__API__EXPOSE_CONFIG=True` ```python import airflow_client -# Configure HTTP basic authorization: Basic -configuration = airflow_client.client.Configuration( - host="http://localhost:8080/api/v1", username="admin", password="admin" -) +# get the access token from Airflow API Server via /auth/token +configuration = airflow_client.client.Configuration(host="http://localhost:8080", access_token=access_token) ``` * Run the scheduler (or the dag file processor, if you have set up a standalone dag file processor) for a few parsing @@ -602,7 +647,7 @@ import sys sys.setrecursionlimit(1500) import airflow_client.client -from airflow_client.client.apis import * +from airflow_client.client.api import * from airflow_client.client.models import * ``` diff --git a/clients/python/test_python_client.py b/clients/python/test_python_client.py index d3e982d6b512b..bf04d68500ea6 100644 --- a/clients/python/test_python_client.py +++ b/clients/python/test_python_client.py @@ -63,9 +63,7 @@ access_token = generate_access_token("admin", "admin", "localhost:8080") -configuration = airflow_client.client.Configuration( - host="http://localhost:8080", -) +configuration = airflow_client.client.Configuration(host="http://localhost:8080", access_token=access_token) # Make sure in the [core] section, the `load_examples` config is set to True in your airflow.cfg # or AIRFLOW__CORE__LOAD_EXAMPLES environment variable set to True @@ -75,9 +73,7 @@ # Enter a context with an instance of the API client @pytest.mark.execution_timeout(400) def test_python_client(): - with airflow_client.client.ApiClient( - configuration, header_name="Authorization", header_value=f"Bearer {access_token}" - ) as api_client: + with airflow_client.client.ApiClient(configuration) as api_client: errors = False print("[blue]Getting DAG list") From 95d67083b795cd4dd5f48cd1e4b5f333d8231466 Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Tue, 10 Jun 2025 16:48:04 +0200 Subject: [PATCH 007/122] Sync python client readme (#51536) (#51573) * Sync python client readme Sync python client readme with https://github.com/apache/airflow-client-python/pull/135/files * Revert bad change from airflow-client-python * pre-commit pass (cherry picked from commit 1ccca8198229fbc04491fa35b6066b6872254208) Co-authored-by: r-richmond --- clients/python/README.md | 52 +++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/clients/python/README.md b/clients/python/README.md index 2253aec8bcfef..f071d077354a2 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -178,14 +178,6 @@ For details on enabling/configuring CORS, see To be able to meet the requirements of many organizations, Airflow supports many authentication methods, and it is even possible to add your own method.
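For reference, the token exchange that the new `access_token` configuration relies on can also be sketched with plain `curl`. This is an illustration only: the host and the `admin`/`admin` credentials are the test defaults shown above, and, per the helper added later in this README, a successful call returns HTTP 201 with a JSON body containing `access_token`:

```bash
# fetch an API token from the Airflow API server (illustrative host and credentials)
curl -X POST "http://localhost:8080/auth/token" \
  -H "Content-Type: application/json" \
  -d '{"username": "admin", "password": "admin"}'
```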
-If you want to check which auth backend is currently set, you can use -`airflow config get-value api auth_backends` command as in the example below. - -```bash -$ airflow config get-value api auth_backends -airflow.providers.fab.auth_manager.api.auth.backend.basic_auth -``` - The default is to deny all requests. For details on configuring the authentication, see @@ -279,24 +271,62 @@ import airflow_client.client ## Getting Started +Before attempting the following examples, ensure you have an account with API access. +As an example, you can create an account for use with the API via the Airflow CLI as follows. + +```bash +airflow users create -u admin-api -e admin-api@example.com -f admin-api -l admin-api -p $PASSWORD -r Admin +``` + Please follow the [installation procedure](#installation--usage) and then run the following: ```python import airflow_client.client +import os +import requests from airflow_client.client.rest import ApiException from pprint import pprint +from pydantic import BaseModel + + +# What we expect back from auth/token +class AirflowAccessTokenResponse(BaseModel): + access_token: str + + +# An optional helper function to retrieve an access token +def get_airflow_client_access_token( + host: str, + username: str, + password: str, +) -> str: + url = f"{host}/auth/token" + payload = { + "username": username, + "password": password, + } + headers = {"Content-Type": "application/json"} + response = requests.post(url, json=payload, headers=headers) + if response.status_code != 201: + raise RuntimeError(f"Failed to get access token: {response.status_code} {response.text}") + response_success = AirflowAccessTokenResponse(**response.json()) + return response_success.access_token + + # Defining the host is optional and defaults to http://localhost # See configuration.py for a list of all supported configuration parameters. -configuration = airflow_client.client.Configuration(host="http://localhost") +host = "http://localhost" +configuration = airflow_client.client.Configuration(host=host) # The client must configure the authentication and authorization parameters # in accordance with the API server security policy. # Examples for each auth method are provided below, use the example that # satisfies your auth use case. -configuration.access_token = os.environ["ACCESS_TOKEN"] - +configuration.access_token = get_airflow_client_access_token( + host=host, + username="admin-api", + password=os.environ["PASSWORD"], +) # Enter a context with an instance of the API client with airflow_client.client.ApiClient(configuration) as api_client: From c822f23f3638efd8dc6b94e70cd96d4174b5755c Mon Sep 17 00:00:00 2001 From: Pierre Jeambrun Date: Tue, 10 Jun 2025 18:02:13 +0200 Subject: [PATCH 008/122] Pass dag_run_conf form value to backfill payload. (#51577) (#51584) (cherry picked from commit 6ab6d53236af030260adcc126683c815df6143ad) Co-authored-by: Karthikeyan Singaravelan --- airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts b/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts index 1aa9a42e4a865..feed7010b3b55 100644 --- a/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts +++ b/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts @@ -77,7 +77,7 @@ export const useCreateBackfill = ({ onSuccessConfirm }: { onSuccessConfirm: () = mutate({ requestBody: { dag_id: dagId, - dag_run_conf: {}, + dag_run_conf: data.requestBody.dag_run_conf ??
{}, from_date: formattedDataIntervalStart, max_active_runs: data.requestBody.max_active_runs, reprocess_behavior: data.requestBody.reprocess_behavior, From 8d3876873c644cb780e7453f7ed13d480a4eb988 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 18:43:16 +0200 Subject: [PATCH 009/122] [v3-0-test] Add git binary to PROD docker image (#51580) (#51587) Since the Git provider and the git bundle require the git binary, we should add it to the PROD docker image (cherry picked from commit bf0bfe9dc1f3812989acfe6ed7118acf9ba5b586) Co-authored-by: Jarek Potiuk --- Dockerfile | 2 +- Dockerfile.ci | 2 +- docker-stack-docs/changelog.rst | 5 +++++ scripts/docker/install_os_dependencies.sh | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0ca2986087de6..5ee3b0c829d94 100644 --- a/Dockerfile +++ b/Dockerfile @@ -137,7 +137,7 @@ function get_runtime_apt_deps() { echo if [[ "${RUNTIME_APT_DEPS=}" == "" ]]; then RUNTIME_APT_DEPS="apt-transport-https apt-utils ca-certificates \ -curl dumb-init freetds-bin krb5-user libev4 libgeos-dev \ +curl dumb-init freetds-bin git krb5-user libev4 libgeos-dev \ ldap-utils libsasl2-2 libsasl2-modules libxmlsec1 locales ${debian_version_apt_deps} \ lsb-release openssh-client python3-selinux rsync sasl2-bin sqlite3 sudo unixodbc" export RUNTIME_APT_DEPS diff --git a/Dockerfile.ci b/Dockerfile.ci index 4c46efbf2dc2c..3637654a54fce 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -76,7 +76,7 @@ function get_runtime_apt_deps() { echo if [[ "${RUNTIME_APT_DEPS=}" == "" ]]; then RUNTIME_APT_DEPS="apt-transport-https apt-utils ca-certificates \ -curl dumb-init freetds-bin krb5-user libev4 libgeos-dev \ +curl dumb-init freetds-bin git krb5-user libev4 libgeos-dev \ ldap-utils libsasl2-2 libsasl2-modules libxmlsec1 locales ${debian_version_apt_deps} \ lsb-release openssh-client python3-selinux rsync sasl2-bin sqlite3 sudo unixodbc" export RUNTIME_APT_DEPS diff --git a/docker-stack-docs/changelog.rst b/docker-stack-docs/changelog.rst index 6c5dfa7866998..52918cbdb317d 100644 --- a/docker-stack-docs/changelog.rst +++ b/docker-stack-docs/changelog.rst @@ -34,6 +34,11 @@ the Airflow team. any Airflow version from the ``Airflow 2`` line. There is no guarantee that it will work, but if it does, then you can use latest features from that image to build images for previous Airflow versions. +Airflow 3.0.3 +~~~~~~~~~~~~~ + + * The ``git`` binary was added to the image by default, as it is needed for the git provider to work.
+ Airflow 3.0.1 ~~~~~~~~~~~~~ diff --git a/scripts/docker/install_os_dependencies.sh b/scripts/docker/install_os_dependencies.sh index 8bd63fe5c024b..dab5966008252 100644 --- a/scripts/docker/install_os_dependencies.sh +++ b/scripts/docker/install_os_dependencies.sh @@ -58,7 +58,7 @@ function get_runtime_apt_deps() { echo if [[ "${RUNTIME_APT_DEPS=}" == "" ]]; then RUNTIME_APT_DEPS="apt-transport-https apt-utils ca-certificates \ -curl dumb-init freetds-bin krb5-user libev4 libgeos-dev \ +curl dumb-init freetds-bin git krb5-user libev4 libgeos-dev \ ldap-utils libsasl2-2 libsasl2-modules libxmlsec1 locales ${debian_version_apt_deps} \ lsb-release openssh-client python3-selinux rsync sasl2-bin sqlite3 sudo unixodbc" export RUNTIME_APT_DEPS From e4b19768eec6890c0c7215793ea6e574c9d76e02 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Sat, 17 May 2025 13:35:20 +0530 Subject: [PATCH 010/122] Change v1 to v2 in generated OpenAPI schema files (#50705) (cherry picked from commit baf5faec690b09703c38e5b05b459058c2914a83) --- airflow-core/docs/conf.py | 4 ++-- ...r-generated.yaml => v2-simple-auth-manager-generated.yaml} | 0 .../airflow/api_fastapi/auth/managers/simple/ui/package.json | 2 +- ...{v1-rest-api-generated.yaml => v2-rest-api-generated.yaml} | 0 airflow-core/src/airflow/ui/openapi-merge.json | 2 +- airflow-ctl/pyproject.toml | 2 +- airflow-ctl/src/airflowctl/api/datamodels/auth_generated.py | 2 +- airflow-ctl/src/airflowctl/api/datamodels/generated.py | 2 +- contributing-docs/16_adding_api_endpoints.rst | 2 +- dev/README_RELEASE_PYTHON_CLIENT.md | 2 +- dev/airflow-github | 2 +- .../airflow_breeze/commands/release_management_commands.py | 2 +- dev/breeze/tests/test_selective_checks.py | 2 +- devel-common/src/docs/provider_conf.py | 2 +- ...ager-generated.yaml => v2-fab-auth-manager-generated.yaml} | 0 scripts/in_container/run_generate_openapi_spec.py | 4 ++-- 16 files changed, 15 insertions(+), 15 deletions(-) rename airflow-core/src/airflow/api_fastapi/auth/managers/simple/openapi/{v1-simple-auth-manager-generated.yaml => v2-simple-auth-manager-generated.yaml} (100%) rename airflow-core/src/airflow/api_fastapi/core_api/openapi/{v1-rest-api-generated.yaml => v2-rest-api-generated.yaml} (100%) rename providers/fab/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/{v1-fab-auth-manager-generated.yaml => v2-fab-auth-manager-generated.yaml} (100%) diff --git a/airflow-core/docs/conf.py b/airflow-core/docs/conf.py index dfcafe1ed99a8..6ee9437509cc4 100644 --- a/airflow-core/docs/conf.py +++ b/airflow-core/docs/conf.py @@ -349,8 +349,8 @@ def add_airflow_core_exclude_patterns_to_sphinx(exclude_patterns: list[str]): graphviz_output_format = "svg" -main_openapi_path = Path(main_openapi_file).parent.joinpath("v1-rest-api-generated.yaml") -sam_openapi_path = Path(sam_openapi_file).parent.joinpath("v1-simple-auth-manager-generated.yaml") +main_openapi_path = Path(main_openapi_file).parent.joinpath("v2-rest-api-generated.yaml") +sam_openapi_path = Path(sam_openapi_file).parent.joinpath("v2-simple-auth-manager-generated.yaml") redoc = [ { "name": "Simple auth manager token API", diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/openapi/v1-simple-auth-manager-generated.yaml b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/openapi/v2-simple-auth-manager-generated.yaml similarity index 100% rename from airflow-core/src/airflow/api_fastapi/auth/managers/simple/openapi/v1-simple-auth-manager-generated.yaml rename to 
airflow-core/src/airflow/api_fastapi/auth/managers/simple/openapi/v2-simple-auth-manager-generated.yaml diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/package.json b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/package.json index 61e99e7773143..451b1939ffcdf 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/package.json +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/package.json @@ -10,7 +10,7 @@ "lint:fix": "eslint --fix && tsc --p tsconfig.app.json", "format": "pnpm prettier --write .", "preview": "vite preview", - "codegen": "openapi-rq -i \"../openapi/v1-simple-auth-manager-generated.yaml\" -c axios --format prettier -o openapi-gen --operationId", + "codegen": "openapi-rq -i \"../openapi/v2-simple-auth-manager-generated.yaml\" -c axios --format prettier -o openapi-gen --operationId", "test": "vitest run", "coverage": "vitest run --coverage" }, diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml similarity index 100% rename from airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml rename to airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml diff --git a/airflow-core/src/airflow/ui/openapi-merge.json b/airflow-core/src/airflow/ui/openapi-merge.json index 7b4bcb75ca15f..059cadf6fd507 100644 --- a/airflow-core/src/airflow/ui/openapi-merge.json +++ b/airflow-core/src/airflow/ui/openapi-merge.json @@ -1,7 +1,7 @@ { "inputs": [ { - "inputFile": "../api_fastapi/core_api/openapi/v1-rest-api-generated.yaml" + "inputFile": "../api_fastapi/core_api/openapi/v2-rest-api-generated.yaml" }, { "inputFile": "../api_fastapi/core_api/openapi/_private_ui.yaml" diff --git a/airflow-ctl/pyproject.toml b/airflow-ctl/pyproject.toml index e9bbc394453b2..02f6919aee21f 100644 --- a/airflow-ctl/pyproject.toml +++ b/airflow-ctl/pyproject.toml @@ -127,7 +127,7 @@ use-standard-collections=true # list[] not List[] use-subclass-enum=true # enum, not union of Literals use-union-operator=true # 3.9+annotations, not `Union[]` -url = "http://0.0.0.0:28080/openapi.json" +input = "../airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml" output = "src/airflowctl/api/datamodels/generated.py" ## pytest settings ## diff --git a/airflow-ctl/src/airflowctl/api/datamodels/auth_generated.py b/airflow-ctl/src/airflowctl/api/datamodels/auth_generated.py index 4f2bc0a9b0eb9..5898f1928ea7b 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/auth_generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/auth_generated.py @@ -16,7 +16,7 @@ # under the License. # generated by datamodel-codegen: -# filename: http://0.0.0.0:8080/auth/openapi.json +# filename: v2-simple-auth-manager-generated.yaml # version: 0.28.2 # mypy: disable-error-code="assignment" diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 74fdedbc628c0..dbfaef6f398b9 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -16,7 +16,7 @@ # under the License. 
# generated by datamodel-codegen: -# filename: http://0.0.0.0:28080/openapi.json +# filename: v2-rest-api-generated.yaml # version: 0.28.2 from __future__ import annotations diff --git a/contributing-docs/16_adding_api_endpoints.rst b/contributing-docs/16_adding_api_endpoints.rst index 02d2940b4182a..1881e7a2ba8a2 100644 --- a/contributing-docs/16_adding_api_endpoints.rst +++ b/contributing-docs/16_adding_api_endpoints.rst @@ -74,7 +74,7 @@ Step 4: Run Pre-commit Hooks ----------------------------- 1. Ensure all code meets the project's quality standards by running pre-commit hooks. 2. Pre-commit hooks include static code checks, formatting, and other validations. -3. Persisted openapi spec is located in ``v1-rest-api-generated.yaml` and a hook will take care of updating it based on your new endpoint, you just need to add and commit the change. +3. Persisted openapi spec is located in ``v2-rest-api-generated.yaml`` and a hook will take care of updating it based on your new endpoint; you just need to add and commit the change. 4. Run the following command to execute all pre-commit hooks: .. code-block:: bash diff --git a/dev/README_RELEASE_PYTHON_CLIENT.md b/dev/README_RELEASE_PYTHON_CLIENT.md index 126f9ebe9d257..8fe8909e054bb 100644 --- a/dev/README_RELEASE_PYTHON_CLIENT.md +++ b/dev/README_RELEASE_PYTHON_CLIENT.md @@ -91,7 +91,7 @@ echo "${VERSION}" > clients/python/version.txt ```shell script cd ${AIRFLOW_REPO_ROOT} -git log 2.8.0..HEAD --pretty=oneline -- airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +git log 2.8.0..HEAD --pretty=oneline -- airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml ``` - Update CHANGELOG.md with the details. diff --git a/dev/airflow-github b/dev/airflow-github index 562c28c3fc28e..5866f13f4ffea 100755 --- a/dev/airflow-github +++ b/dev/airflow-github @@ -408,7 +408,7 @@ def api_clients_policy(previous_version, target_version): repo, previous_version, target_version, - files=[f"{repo.working_dir}/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml"], + files=[f"{repo.working_dir}/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml"], ) clients_need_release = False diff --git a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py index c2fa9b0807c89..c54d76467acb7 100644 --- a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py @@ -3067,7 +3067,7 @@ def split_version_and_suffix(file_name: str, suffix: str) -> VersionedFile: VERSION_FILE = PYTHON_CLIENT_DIR_PATH / "version.txt" SOURCE_API_YAML_PATH = ( - AIRFLOW_ROOT_PATH / "airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml" + AIRFLOW_ROOT_PATH / "airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml" ) TARGET_API_YAML_PATH = PYTHON_CLIENT_DIR_PATH / "v2.yaml" OPENAPI_GENERATOR_CLI_VER = "7.13.0" diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 4b4ecdf0bb3b4..956b151261e2c 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -2009,7 +2009,7 @@ def test_expected_output_push( id="Tests for all airflow core types except providers should run if model file changed", ), pytest.param( -
("airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml",), + ("airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml",), { "selected-providers-list-as-string": "", "all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']", diff --git a/devel-common/src/docs/provider_conf.py b/devel-common/src/docs/provider_conf.py index ce5f09188d336..7e19a79b2d77f 100644 --- a/devel-common/src/docs/provider_conf.py +++ b/devel-common/src/docs/provider_conf.py @@ -360,7 +360,7 @@ "v1-flask-api.yaml" ) fab_auth_manager_fastapi_api_path = Path(fab_auth_manager_fastapi_api_file).parent.joinpath( - "v1-fab-auth-manager-generated.yaml" + "v2-fab-auth-manager-generated.yaml" ) redoc = [ { diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v1-fab-auth-manager-generated.yaml b/providers/fab/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v2-fab-auth-manager-generated.yaml similarity index 100% rename from providers/fab/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v1-fab-auth-manager-generated.yaml rename to providers/fab/src/airflow/providers/fab/auth_manager/api_fastapi/openapi/v2-fab-auth-manager-generated.yaml diff --git a/scripts/in_container/run_generate_openapi_spec.py b/scripts/in_container/run_generate_openapi_spec.py index 1caea62e67d5f..cd8c0d5d4f4f8 100755 --- a/scripts/in_container/run_generate_openapi_spec.py +++ b/scripts/in_container/run_generate_openapi_spec.py @@ -29,12 +29,12 @@ sys.path.insert(0, str(Path(__file__).parent.resolve())) from in_container_utils import console, generate_openapi_file, validate_openapi_file -OPENAPI_SPEC_FILE = Path(CORE_API_PATH).parent / "openapi" / "v1-rest-api-generated.yaml" +OPENAPI_SPEC_FILE = Path(CORE_API_PATH).parent / "openapi" / "v2-rest-api-generated.yaml" # We need a "combined" spec file to generate the UI code with, but we don't want to include this in the repo # nor in the rendered docs, so we make this a separate file which is gitignored OPENAPI_UI_SPEC_FILE = Path(CORE_API_PATH).parent / "openapi" / "_private_ui.yaml" SIMPLE_AUTH_MANAGER_OPENAPI_SPEC_FILE = ( - Path(SIMPLE_AUTH_MANAGER_PATH).parent / "openapi" / "v1-simple-auth-manager-generated.yaml" + Path(SIMPLE_AUTH_MANAGER_PATH).parent / "openapi" / "v2-simple-auth-manager-generated.yaml" ) # Generate main application openapi spec From 9a8a50386fca28961e73c81c702342a913fac8d6 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 21 May 2025 01:06:31 +0530 Subject: [PATCH 011/122] Fix the version for generated client (#50859) The command that was generated before: ``` docker run --rm -u 501:20 -v /Users/kaxilnaik/Documents/GitHub/astronomer/airflow/clients/python/v2.yaml:/spec.yaml -v /Users/kaxilnaik/Documents/GitHub/astronomer/airflow/clients/python/tmp:/output openapitools/openapi-generator-cli:v7.13.0 generate --input-spec /spec.yaml --generator-name python --git-user-id None --output /output --package-name airflow_client.client --git-repo-id airflow-client-python --additional-properties 'packageVersion="3.0.0"' ``` which caused things like `__version__ = ""2.10.0""` ( [here](https://github.com/apache/airflow-client-python/blob/4bd5b2544e30b05bd8cd03502dc99bd6784a20d6/airflow_client/client/__init__.py) ) or `__version__ = ""3.0.0""` in [here](https://github.com/apache/airflow-client-python/blob/38367b1158b777c87f51fe4d6283273031a4fb60/airflow_client/client/__init__.py#L17) While this does not cause error the user-agent and things are weird: ```py self.user_agent = 
'OpenAPI-Generator/"3.0.0"/python' ``` vs ```py self.user_agent = 'OpenAPI-Generator/3.0.0/python' ``` (cherry picked from commit e1f4290de84966b087166b42999ae54383d8c1c1) --- .../src/airflow_breeze/commands/release_management_commands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py index c54d76467acb7..50307e73b42f1 100644 --- a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py @@ -3135,7 +3135,7 @@ def _generate_python_client_sources(python_client_version: str) -> None: "--git-repo-id", "airflow-client-python", "--additional-properties", - f'packageVersion="{python_client_version}"', + f"packageVersion={python_client_version}", ], capture_output=True, text=True, From 6192aa20aa430cba1f6297950edc64f4b23d7731 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 11 Jun 2025 01:18:53 +0530 Subject: [PATCH 012/122] Update Python Client to 3.0.2 --- clients/python/CHANGELOG.md | 18 ++++++++++++++++++ clients/python/version.txt | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/clients/python/CHANGELOG.md b/clients/python/CHANGELOG.md index d44d7d8f5edba..095e52c8ee292 100644 --- a/clients/python/CHANGELOG.md +++ b/clients/python/CHANGELOG.md @@ -17,6 +17,24 @@ under the License. --> +# v3.0.2 + +## Major changes: + +- Add `owner_links` field to DAGDetailsResponse ([#50557](https://github.com/apache/airflow/pull/50557)) +- Allow non-string valid JSON values in Variable import ([#49844](https://github.com/apache/airflow/pull/49844)) +- Add `bundle_version` to DagRun response ([#49726](https://github.com/apache/airflow/pull/49726)) +- Use `NonNegativeInt` for `backfill_id` ([#49691](https://github.com/apache/airflow/pull/49691)) +- Rename operation IDs for task instance endpoints to include map indexes ([#49608](https://github.com/apache/airflow/pull/49608)) +- Remove filtering by last dag run state in patch dags endpoint ([#51176](https://github.com/apache/airflow/pull/51176)) +- Make `dag_run` nullable in Details page ([#50719](https://github.com/apache/airflow/pull/50719)) + +## Bug Fixes + +- Fix OpenAPI schema for `get_log` API ([#50547](https://github.com/apache/airflow/pull/50547)) +- Fix bulk action annotation ([#50852](https://github.com/apache/airflow/pull/50852)) +- Fix `patch_task_instance` endpoint ([#50550](https://github.com/apache/airflow/pull/50550)) + # v3.0.0 This is the first release of the **Airflow 3.0.0** Python client. It introduces compatibility with the new [Airflow 3.0 REST API](https://airflow.apache.org/docs/apache-airflow/3.0.0/stable-rest-api-ref.html), and includes several **breaking changes** and behavior updates. 
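For completeness, picking up the client release that corresponds to this bump would look roughly as follows; `apache-airflow-client` is assumed here to be the published package name, as with earlier client releases:

```bash
pip install apache-airflow-client==3.0.2
```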
diff --git a/clients/python/version.txt b/clients/python/version.txt index 4a36342fcab70..b50214693056f 100644 --- a/clients/python/version.txt +++ b/clients/python/version.txt @@ -1 +1 @@ -3.0.0 +3.0.2 From 43df0ac5e03c13f0c865b0973f6e9a3dcdb8d71c Mon Sep 17 00:00:00 2001 From: Vincent <97131062+vincbeck@users.noreply.github.com> Date: Tue, 13 May 2025 12:35:11 -0400 Subject: [PATCH 013/122] Upgrade `flask-appbuilder` to 4.6.3 in FAB provider (#50513) (cherry picked from commit e6430c25cdb00d8cf56093deb1b1561ae615a5f6) --- providers/fab/pyproject.toml | 2 +- .../role_and_permission_endpoint.py | 4 ++ .../api_endpoints/user_endpoint.py | 4 ++ .../fab/auth_manager/models/__init__.py | 62 +++++++++++++++++-- .../fab/auth_manager/models/anonymous_user.py | 6 +- .../auth_manager/security_manager/override.py | 46 +++++++------- .../auth_manager/api_endpoints/test_auth.py | 2 - .../models/test_anonymous_user.py | 2 +- .../unit/fab/auth_manager/test_security.py | 12 ++-- 9 files changed, 102 insertions(+), 38 deletions(-) diff --git a/providers/fab/pyproject.toml b/providers/fab/pyproject.toml index 905be02a20cf8..080aea46f177a 100644 --- a/providers/fab/pyproject.toml +++ b/providers/fab/pyproject.toml @@ -71,7 +71,7 @@ dependencies = [ # Every time we update FAB version here, please make sure that you review the classes and models in # `airflow/providers/fab/auth_manager/security_manager/override.py` with their upstream counterparts. # In particular, make sure any breaking changes, for example any new methods, are accounted for. - "flask-appbuilder==4.5.3", + "flask-appbuilder==4.6.3", "flask-login>=0.6.2", # Flask-Session 0.6 add new arguments into the SqlAlchemySessionInterface constructor as well as # all parameters now are mandatory which make AirflowDatabaseSessionInterface incompatible with this version. 
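For anyone reproducing this upgrade outside the provider's own metadata, the pin from the `pyproject.toml` change above amounts to the following one-liner (a sketch, not an officially supported install path):

```bash
pip install "flask-appbuilder==4.6.3"
```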
diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/role_and_permission_endpoint.py b/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/role_and_permission_endpoint.py index e8aacedea8c83..fa5e29782b9c2 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/role_and_permission_endpoint.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/role_and_permission_endpoint.py @@ -123,6 +123,8 @@ def patch_role(*, role_name: str, update_mask: UpdateMask = None) -> APIResponse """Update a role.""" security_manager = cast("FabAuthManager", get_auth_manager()).security_manager body = request.json + if body is None: + raise BadRequest("Request body is required") try: data = role_schema.load(body) except ValidationError as err: @@ -156,6 +158,8 @@ def post_role() -> APIResponse: """Create a new role.""" security_manager = cast("FabAuthManager", get_auth_manager()).security_manager body = request.json + if body is None: + raise BadRequest("Request body is required") try: data = role_schema.load(body) except ValidationError as err: diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py b/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py index 8c504d6446694..e8f9fc83d9059 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py @@ -88,6 +88,8 @@ def get_users(*, limit: int, order_by: str = "id", offset: str | None = None) -> @requires_access_custom_view("POST", permissions.RESOURCE_USER) def post_user() -> APIResponse: """Create a new user.""" + if request.json is None: + raise BadRequest("Request body is required") try: data = user_schema.load(request.json) except ValidationError as e: @@ -131,6 +133,8 @@ def post_user() -> APIResponse: @requires_access_custom_view("PUT", permissions.RESOURCE_USER) def patch_user(*, username: str, update_mask: UpdateMask = None) -> APIResponse: """Update a user.""" + if request.json is None: + raise BadRequest("Request body is required") try: data = user_schema.load(request.json) except ValidationError as e: diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/__init__.py b/providers/fab/src/airflow/providers/fab/auth_manager/models/__init__.py index 1a34c9b6884eb..cb6f59f6ad576 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/models/__init__.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/__init__.py @@ -102,11 +102,37 @@ def __repr__(self): "ab_permission_view_role", Model.metadata, Column("id", Integer, primary_key=True), - Column("permission_view_id", Integer, ForeignKey("ab_permission_view.id")), - Column("role_id", Integer, ForeignKey("ab_role.id")), + Column( + "permission_view_id", + Integer, + ForeignKey("ab_permission_view.id", ondelete="CASCADE"), + ), + Column("role_id", Integer, ForeignKey("ab_role.id", ondelete="CASCADE")), UniqueConstraint("permission_view_id", "role_id"), ) +assoc_user_group = Table( + "ab_user_group", + Model.metadata, + Column("id", Integer, primary_key=True), + Column("user_id", Integer, ForeignKey("ab_user.id", ondelete="CASCADE")), + Column("group_id", Integer, ForeignKey("ab_group.id", ondelete="CASCADE")), + UniqueConstraint("user_id", "group_id"), + Index("idx_user_id", "user_id"), + Index("idx_user_group_id", "group_id"), +) + +assoc_group_role = Table( + "ab_group_role", + 
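+    # many-to-many link between groups and roles; the ondelete="CASCADE" rules clear memberships when either side is deleted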
Model.metadata, + Column("id", Integer, primary_key=True), + Column("group_id", Integer, ForeignKey("ab_group.id", ondelete="CASCADE")), + Column("role_id", Integer, ForeignKey("ab_role.id", ondelete="CASCADE")), + UniqueConstraint("group_id", "role_id"), + Index("idx_group_id", "group_id"), + Index("idx_group_role_id", "role_id"), +) + class Role(Model): """Represents a user role to which permissions can be assigned.""" @@ -115,7 +141,29 @@ class Role(Model): id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) - permissions = relationship("Permission", secondary=assoc_permission_role, backref="role", lazy="joined") + permissions = relationship( + "Permission", + secondary=assoc_permission_role, + backref="role", + lazy="joined", + passive_deletes=True, + ) + + def __repr__(self): + return self.name + + +class Group(Model): + """Represents a user group.""" + + __tablename__ = "ab_group" + + id = Column(Integer, primary_key=True) + name = Column(String(100), unique=True, nullable=False) + label = Column(String(150)) + description = Column(String(512)) + users = relationship("User", secondary=assoc_user_group, backref="groups", passive_deletes=True) + roles = relationship("Role", secondary=assoc_group_role, backref="groups", passive_deletes=True) def __repr__(self): return self.name @@ -148,8 +196,8 @@ def __repr__(self): "ab_user_role", Model.metadata, Column("id", Integer, primary_key=True), - Column("user_id", Integer, ForeignKey("ab_user.id")), - Column("role_id", Integer, ForeignKey("ab_role.id")), + Column("user_id", Integer, ForeignKey("ab_user.id", ondelete="CASCADE")), + Column("role_id", Integer, ForeignKey("ab_role.id", ondelete="CASCADE")), UniqueConstraint("user_id", "role_id"), ) @@ -170,7 +218,9 @@ class User(Model, BaseUser): last_login = Column(DateTime) login_count = Column(Integer) fail_login_count = Column(Integer) - roles = relationship("Role", secondary=assoc_user_role, backref="user", lazy="selectin") + roles = relationship( + "Role", secondary=assoc_user_role, backref="user", lazy="selectin", passive_deletes=True + ) created_on = Column(DateTime, default=datetime.datetime.now, nullable=True) changed_on = Column(DateTime, default=datetime.datetime.now, nullable=True) diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/models/anonymous_user.py b/providers/fab/src/airflow/providers/fab/auth_manager/models/anonymous_user.py index b9abd5f165378..f50341b2304c2 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/models/anonymous_user.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/models/anonymous_user.py @@ -37,13 +37,17 @@ def roles(self): if not self._roles: public_role = current_app.config.get("AUTH_ROLE_PUBLIC", None) self._roles = {current_app.appbuilder.sm.find_role(public_role)} if public_role else set() - return self._roles + return list(self._roles) @roles.setter def roles(self, roles): self._roles = roles self._perms = set() + @property + def groups(self): + return [] + @property def perms(self): if not self._perms: diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py index 5bd05cd131da3..7cb4377a956e8 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py @@ -55,6 +55,7 @@ AuthOIDView, AuthRemoteUserView, RegisterUserModelView, + 
UserGroupModelView, ) from flask_babel import lazy_gettext from flask_jwt_extended import JWTManager @@ -71,6 +72,7 @@ from airflow.models import DagBag from airflow.providers.fab.auth_manager.models import ( Action, + Group, Permission, RegisterUser, Resource, @@ -100,10 +102,7 @@ from airflow.providers.fab.auth_manager.views.user_stats import CustomUserStatsChartView from airflow.providers.fab.www.security import permissions from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2 -from airflow.providers.fab.www.session import ( - AirflowDatabaseSessionInterface, - AirflowDatabaseSessionInterface as FabAirflowDatabaseSessionInterface, -) +from airflow.providers.fab.www.session import AirflowDatabaseSessionInterface from airflow.security.permissions import RESOURCE_BACKFILL if TYPE_CHECKING: @@ -149,6 +148,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): """ Models """ user_model = User role_model = Role + group_model = Group action_model = Action resource_model = Resource permission_model = Permission @@ -173,6 +173,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): actionmodelview = ActionModelView permissionmodelview = PermissionPairModelView rolemodelview = CustomRoleModelView + groupmodelview = UserGroupModelView registeruser_model = RegisterUser registerusermodelview = RegisterUserModelView resourcemodelview = ResourceModelView @@ -450,7 +451,7 @@ def register_views(self): role_view = self.appbuilder.add_view( self.rolemodelview, "List Roles", - icon="fa-group", + icon="fa-user-gear", label=lazy_gettext("List Roles"), category="Security", category_icon="fa-cogs", @@ -532,12 +533,7 @@ def reset_password(self, userid: int, password: str) -> bool: return self.update_user(user) def reset_user_sessions(self, user: User) -> None: - if isinstance( - self.appbuilder.get_app.session_interface, AirflowDatabaseSessionInterface - ) or isinstance( - self.appbuilder.get_app.session_interface, - FabAirflowDatabaseSessionInterface, - ): + if isinstance(self.appbuilder.get_app.session_interface, AirflowDatabaseSessionInterface): interface = self.appbuilder.get_app.session_interface session = interface.db.session user_session_model = interface.sql_session_model @@ -859,6 +855,7 @@ def _init_data_model(self): self.registerusermodelview.datamodel = SQLAInterface(self.registeruser_model) self.rolemodelview.datamodel = SQLAInterface(self.role_model) + self.groupmodelview.datamodel = SQLAInterface(self.group_model) self.actionmodelview.datamodel = SQLAInterface(self.action_model) self.resourcemodelview.datamodel = SQLAInterface(self.resource_model) self.permissionmodelview.datamodel = SQLAInterface(self.permission_model) @@ -875,7 +872,8 @@ def create_db(self): try: engine = self.get_session.get_bind(mapper=None, clause=None) inspector = inspect(engine) - if "ab_user" not in inspector.get_table_names(): + existing_tables = inspector.get_table_names() + if "ab_user" not in existing_tables or "ab_group" not in existing_tables: log.info(const.LOGMSG_INF_SEC_NO_DB) Base.metadata.create_all(engine) log.info(const.LOGMSG_INF_SEC_ADD_DB) @@ -1323,15 +1321,20 @@ def get_public_role(self): def add_user( self, - username, - first_name, - last_name, - email, - role, - password="", - hashed_password="", + username: str, + first_name: str, + last_name: str, + email: str, + role: list[Role] | Role | None = None, + password: str = "", + hashed_password: str = "", + groups: list[Group] | None = None, ): """Create a user.""" + roles: list[Role] = [] + if 
role: + roles = role if isinstance(role, list) else [role] + try: user = self.user_model() user.first_name = first_name @@ -1340,7 +1343,8 @@ def add_user( user.email = email user.active = True self.get_session.add(user) - user.roles = role if isinstance(role, list) else [role] + user.roles = roles + user.groups = groups or [] if hashed_password: user.password = hashed_password else: @@ -1704,7 +1708,7 @@ def get_user_roles(user=None): """ if user is None: user = g.user - return user.roles + return user.roles + [role for group in user.groups for role in group.roles] """ -------------------- diff --git a/providers/fab/tests/unit/fab/auth_manager/api_endpoints/test_auth.py b/providers/fab/tests/unit/fab/auth_manager/api_endpoints/test_auth.py index 63b88495715f6..c59c7dcf78221 100644 --- a/providers/fab/tests/unit/fab/auth_manager/api_endpoints/test_auth.py +++ b/providers/fab/tests/unit/fab/auth_manager/api_endpoints/test_auth.py @@ -19,7 +19,6 @@ from base64 import b64encode import pytest -from flask_login import current_user from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_pools @@ -74,6 +73,5 @@ def test_success(self): with self.app.test_client() as test_client: response = test_client.get("/fab/v1/users", headers={"Authorization": token}) - assert current_user.email == "test@fab.org" assert response.status_code == 200 diff --git a/providers/fab/tests/unit/fab/auth_manager/models/test_anonymous_user.py b/providers/fab/tests/unit/fab/auth_manager/models/test_anonymous_user.py index eaf6b357f9264..ea959bd25bfb2 100644 --- a/providers/fab/tests/unit/fab/auth_manager/models/test_anonymous_user.py +++ b/providers/fab/tests/unit/fab/auth_manager/models/test_anonymous_user.py @@ -25,7 +25,7 @@ class TestAnonymousUser: def test_roles(self): - roles = {"role1"} + roles = ["role1"] user = AnonymousUser() user.roles = roles assert user.roles == roles diff --git a/providers/fab/tests/unit/fab/auth_manager/test_security.py b/providers/fab/tests/unit/fab/auth_manager/test_security.py index 6d8e7f2521a36..0f3314b80c59f 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_security.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_security.py @@ -353,7 +353,7 @@ def test_verify_default_anon_user_has_no_accessible_dag_ids( mock_is_logged_in.return_value = False user = AnonymousUser() app.config["AUTH_ROLE_PUBLIC"] = "Public" - assert security_manager.get_user_roles(user) == {security_manager.get_public_role()} + assert security_manager.get_user_roles(user) == [security_manager.get_public_role()] with _create_dag_model_context("test_dag_id", session, security_manager): security_manager.sync_roles() @@ -365,7 +365,7 @@ def test_verify_default_anon_user_has_no_access_to_specific_dag(app, session, se with app.app_context(): user = AnonymousUser() app.config["AUTH_ROLE_PUBLIC"] = "Public" - assert security_manager.get_user_roles(user) == {security_manager.get_public_role()} + assert security_manager.get_user_roles(user) == [security_manager.get_public_role()] dag_id = "test_dag_id" with _create_dag_model_context(dag_id, session, security_manager): @@ -392,7 +392,7 @@ def test_verify_anon_user_with_admin_role_has_all_dag_access( mock_is_logged_in.return_value = False user = AnonymousUser() - assert security_manager.get_user_roles(user) == {security_manager.get_public_role()} + assert security_manager.get_user_roles(user) == [security_manager.get_public_role()] security_manager.sync_roles() @@ -408,7 +408,7 @@ def 
test_verify_anon_user_with_admin_role_has_access_to_each_dag( # Call `.get_user_roles` bc `user` is a mock and the `user.roles` prop needs to be set. user.roles = security_manager.get_user_roles(user) - assert user.roles == {security_manager.get_public_role()} + assert user.roles == [security_manager.get_public_role()] test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"] @@ -425,8 +425,8 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag( def test_get_user_roles(app_builder, security_manager): user = mock.MagicMock() roles = app_builder.sm.find_role("Admin") - user.roles = roles - assert security_manager.get_user_roles(user) == roles + user.roles = [roles] + assert security_manager.get_user_roles(user) == [roles] def test_get_user_roles_for_anonymous_user(app, security_manager): From 09353bf8666839d0d6eb71287fdb54b9a84f9c73 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 11 Jun 2025 11:29:32 +0200 Subject: [PATCH 014/122] [v3-0-test] Structure endpoint task level resolved aliases (#51481) (#51579) * Structure endpoint task level resolved aliases * Address PR comments (cherry picked from commit acf1e77478a42aa11a80bfa3a3d44b4bcf3a7f08) Co-authored-by: Pierre Jeambrun --- .../core_api/routes/ui/structure.py | 12 +- .../core_api/services/ui/structure.py | 80 ++++++++--- .../core_api/routes/ui/test_structure.py | 129 ++++++++++++++++-- 3 files changed, 190 insertions(+), 31 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py index a84f36ab010d3..738e4c6edf65f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py @@ -122,7 +122,15 @@ def structure_data( elif ( dependency.target == dependency.dependency_type or dependency.source == dag_id ) and exit_node_ref: - end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id}) + end_edges.append( + { + "source_id": exit_node_ref["id"], + "target_id": dependency.node_id, + "resolved_from_alias": dependency.source.replace("asset-alias:", "", 1) + if dependency.source.startswith("asset-alias:") + else None, + } + ) # Add nodes nodes.append( @@ -142,6 +150,6 @@ def structure_data( data["edges"] += start_edges + end_edges - bind_output_assets_to_tasks(data["edges"], serialized_dag) + bind_output_assets_to_tasks(data["edges"], serialized_dag, version_number, session) return StructureDataResponse(**data) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py index 6f5f415d3fdb7..db3d1ba6deac4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py @@ -23,6 +23,14 @@ from __future__ import annotations +from collections import defaultdict + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from airflow.models.asset import AssetAliasModel, AssetEvent +from airflow.models.dag_version import DagVersion +from airflow.models.dagrun import DagRun from airflow.models.serialized_dag import SerializedDagModel @@ -116,30 +124,62 @@ def get_upstream_assets( return nodes, edges -def bind_output_assets_to_tasks(edges: list[dict], serialized_dag: SerializedDagModel) -> None: +def 
bind_output_assets_to_tasks( + edges: list[dict], serialized_dag: SerializedDagModel, version_number: int, session: Session +) -> None: """ Try to bind the downstream assets to the relevant task that produces them. This function will mutate the `edges` in place. """ + # bind normal assets present in the `task_outlet_asset_references` outlet_asset_references = serialized_dag.dag_model.task_outlet_asset_references - downstream_asset_related_edges = [edge for edge in edges if edge["target_id"].startswith("asset:")] - - for edge in downstream_asset_related_edges: - asset_id = int(edge["target_id"].strip("asset:")) - try: - # Try to attach the outlet asset to the relevant task - outlet_asset_reference = next( - outlet_asset_reference - for outlet_asset_reference in outlet_asset_references - if outlet_asset_reference.asset_id == asset_id - ) - edge["source_id"] = outlet_asset_reference.task_id - continue - except StopIteration: - # If no asset reference found, fallback to using the exit node reference - # This can happen because asset aliases are not yet handled, they do no populate - # the `outlet_asset_references` when resolved. Extra lookup is needed. Same for asset-name-ref and - # asset-uri-ref. - pass + downstream_asset_edges = [ + edge + for edge in edges + if edge["target_id"].startswith("asset:") and not edge.get("resolved_from_alias") + ] + + for edge in downstream_asset_edges: + # Try to attach the outlet assets to the relevant tasks + asset_id = int(edge["target_id"].replace("asset:", "", 1)) + outlet_asset_reference = next( + outlet_asset_reference + for outlet_asset_reference in outlet_asset_references + if outlet_asset_reference.asset_id == asset_id + ) + edge["source_id"] = outlet_asset_reference.task_id + + # bind assets resolved from aliases; they do not populate the `outlet_asset_references` + downstream_alias_resolved_edges = [ + edge for edge in edges if edge["target_id"].startswith("asset:") and edge.get("resolved_from_alias") + ] + + aliases_names = {edge["resolved_from_alias"] for edge in downstream_alias_resolved_edges} + + result = session.scalars( + select(AssetEvent) + .join(AssetEvent.source_aliases) + .join(AssetEvent.source_dag_run) + # This is a simplification; a stricter check would verify `version_number` against `DagRun.dag_versions`.
+ .join(DagRun.created_dag_version) + .where(AssetEvent.source_aliases.any(AssetAliasModel.name.in_(aliases_names))) + .where(AssetEvent.source_dag_run.has(DagRun.dag_id == serialized_dag.dag_model.dag_id)) + .where(DagVersion.version_number == version_number) + ).unique() + + asset_id_to_task_ids = defaultdict(set) + for asset_event in result: + asset_id_to_task_ids[asset_event.asset_id].add(asset_event.source_task_id) + + for edge in downstream_alias_resolved_edges: + asset_id = int(edge["target_id"].replace("asset:", "", 1)) + task_ids = asset_id_to_task_ids.get(asset_id, set()) + + for index, task_id in enumerate(task_ids): + if index == 0: + edge["source_id"] = task_id + continue + edge_copy = {**edge, "source_id": task_id} + edges.append(edge_copy) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py index 2c6425db1b395..1587797256a7a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py @@ -25,18 +25,21 @@ from sqlalchemy.orm import Session from airflow.models import DagBag -from airflow.models.asset import AssetModel +from airflow.models.asset import AssetAliasModel, AssetEvent, AssetModel from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.standard.sensors.external_task import ExternalTaskSensor +from airflow.sdk import Metadata, task from airflow.sdk.definitions.asset import Asset, AssetAlias, Dataset +from airflow.utils import timezone -from tests_common.test_utils.db import clear_db_runs +from tests_common.test_utils.db import clear_db_assets, clear_db_runs pytestmark = pytest.mark.db_test DAG_ID = "dag_with_multiple_versions" DAG_ID_EXTERNAL_TRIGGER = "external_trigger" +DAG_ID_RESOLVED_ASSET_ALIAS = "dag_with_resolved_asset_alias" LATEST_VERSION_DAG_RESPONSE: dict = { "edges": [], "nodes": [ @@ -95,8 +98,10 @@ def examples_dag_bag() -> DagBag: @pytest.fixture(autouse=True) def clean(): clear_db_runs() + clear_db_assets() yield clear_db_runs() + clear_db_assets() @pytest.fixture @@ -115,7 +120,7 @@ def asset3() -> Dataset: @pytest.fixture -def make_dag(dag_maker, session, time_machine, asset1: Asset, asset2: Asset, asset3: Dataset) -> None: +def make_dags(dag_maker, session, time_machine, asset1: Asset, asset2: Asset, asset3: Dataset) -> None: with dag_maker( dag_id=DAG_ID_EXTERNAL_TRIGGER, serialized=True, @@ -123,7 +128,6 @@ def make_dag(dag_maker, session, time_machine, asset1: Asset, asset2: Asset, ass start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), ): TriggerDagRunOperator(task_id="trigger_dag_run_operator", trigger_dag_id=DAG_ID) - dag_maker.sync_dagbag_to_db() with dag_maker( @@ -138,7 +142,45 @@ def make_dag(dag_maker, session, time_machine, asset1: Asset, asset2: Asset, ass >> ExternalTaskSensor(task_id="external_task_sensor", external_dag_id=DAG_ID) >> EmptyOperator(task_id="task_2") ) + dag_maker.sync_dagbag_to_db() + + with dag_maker( + dag_id=DAG_ID_RESOLVED_ASSET_ALIAS, + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + + @task(outlets=[AssetAlias("example-alias-resolved")]) + def task_1(**context): + yield Metadata( + asset=Asset("resolved_example_asset_alias"), + extra={"k": "v"}, # extra has to be provided, can be {} + 
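+                # passing `alias` ties this asset event to the AssetAlias declared in the task's outlets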
alias=AssetAlias("example-alias-resolved"), + ) + task_1() >> EmptyOperator(task_id="task_2") + + dr = dag_maker.create_dagrun() + asset_alias = session.scalar( + select(AssetAliasModel).where(AssetAliasModel.name == "example-alias-resolved") + ) + asset_model = AssetModel(name="resolved_example_asset_alias") + session.add(asset_model) + session.flush() + asset_alias.assets.append(asset_model) + asset_alias.asset_events.append( + AssetEvent( + id=1, + timestamp=timezone.parse("2021-01-01T00:00:00"), + asset_id=asset_model.id, + source_dag_id=DAG_ID_RESOLVED_ASSET_ALIAS, + source_task_id="task_1", + source_run_id=dr.run_id, + source_map_index=-1, + ) + ) + session.commit() dag_maker.sync_dagbag_to_db() @@ -151,17 +193,17 @@ def _fetch_asset_id(asset: Asset, session: Session) -> str: @pytest.fixture -def asset1_id(make_dag, asset1, session: Session) -> str: +def asset1_id(make_dags, asset1, session: Session) -> str: return _fetch_asset_id(asset1, session) @pytest.fixture -def asset2_id(make_dag, asset2, session) -> str: +def asset2_id(make_dags, asset2, session) -> str: return _fetch_asset_id(asset2, session) @pytest.fixture -def asset3_id(make_dag, asset3, session) -> str: +def asset3_id(make_dags, asset3, session) -> str: return _fetch_asset_id(asset3, session) @@ -296,13 +338,13 @@ class TestStructureDataEndpoint: ), ], ) - @pytest.mark.usefixtures("make_dag") + @pytest.mark.usefixtures("make_dags") def test_should_return_200(self, test_client, params, expected): response = test_client.get("/structure/structure_data", params=params) assert response.status_code == 200 assert response.json() == expected - @pytest.mark.usefixtures("make_dag") + @pytest.mark.usefixtures("make_dags") def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, asset3_id): params = { "dag_id": DAG_ID, @@ -492,6 +534,75 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a assert response.status_code == 200 assert response.json() == expected + @pytest.mark.usefixtures("make_dags") + def test_should_return_200_with_resolved_asset_alias_attached_to_the_correct_producing_task( + self, test_client, session + ): + resolved_asset = session.scalar( + session.query(AssetModel).filter_by(name="resolved_example_asset_alias") + ) + params = { + "dag_id": DAG_ID_RESOLVED_ASSET_ALIAS, + "external_dependencies": True, + } + expected = { + "edges": [ + { + "source_id": "task_1", + "target_id": "task_2", + "is_setup_teardown": None, + "label": None, + "is_source_asset": None, + }, + { + "source_id": "task_1", + "target_id": f"asset:{resolved_asset.id}", + "is_setup_teardown": None, + "label": None, + "is_source_asset": None, + }, + ], + "nodes": [ + { + "id": "task_1", + "label": "task_1", + "type": "task", + "children": None, + "is_mapped": None, + "tooltip": None, + "setup_teardown_type": None, + "operator": "@task", + "asset_condition_type": None, + }, + { + "id": "task_2", + "label": "task_2", + "type": "task", + "children": None, + "is_mapped": None, + "tooltip": None, + "setup_teardown_type": None, + "operator": "EmptyOperator", + "asset_condition_type": None, + }, + { + "id": f"asset:{resolved_asset.id}", + "label": "resolved_example_asset_alias", + "type": "asset", + "children": None, + "is_mapped": None, + "tooltip": None, + "setup_teardown_type": None, + "operator": None, + "asset_condition_type": None, + }, + ], + } + + response = test_client.get("/structure/structure_data", params=params) + assert response.status_code == 200 + assert response.json() == expected +
@pytest.mark.parametrize(
 "params, expected",
 [

From 7c9b2978f1d1c93ee261fad071659aac325c2148 Mon Sep 17 00:00:00 2001
From: Pierre Jeambrun
Date: Wed, 11 Jun 2025 19:53:59 +0200
Subject: [PATCH 015/122] Add config setting to control exposing stacktraces
 (#51509) (#51617)

* feat: add a config setting to expose stacktraces

* fix: remove the starting newline to simplify the traceback info

* test: update tests to align with the config change

* feat: add exception id for better correlation between ui messages and log entries

* test: update tests

* fix: use random string as exception id instead of python object id

* fix: update tests with patched random strings

* test: use patch fixtures in tests to prevent side effects

(cherry picked from commit c4cd58cc30aeb2ba0c6cb92a5f1d00d5c25bdada)

Co-authored-by: Zhen-Lun (Kevin) Hong
---
 .../airflow/api_fastapi/common/exceptions.py | 23 ++++++++++++
 .../src/airflow/config_templates/config.yml | 6 ++++
 .../api_fastapi/common/test_exceptions.py | 35 +++++++++++++++++--
 .../routes/public/test_connections.py | 2 +-
 .../core_api/routes/public/test_dag_run.py | 9 ++---
 .../core_api/routes/public/test_pools.py | 2 +-
 6 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/exceptions.py b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
index 061eec55d3d84..39909b7a46395 100644
--- a/airflow-core/src/airflow/api_fastapi/common/exceptions.py
+++ b/airflow-core/src/airflow/api_fastapi/common/exceptions.py
@@ -17,6 +17,8 @@

 from __future__ import annotations

+import logging
+import traceback
 from abc import ABC, abstractmethod
 from enum import Enum
 from typing import Generic, TypeVar
@@ -24,8 +26,13 @@
 from fastapi import HTTPException, Request, status
 from sqlalchemy.exc import IntegrityError

+from airflow.configuration import conf
+from airflow.utils.strings import get_random_string
+
 T = TypeVar("T", bound=Exception)

+log = logging.getLogger(__name__)
+

 class BaseErrorHandler(Generic[T], ABC):
 """Base class for error handlers."""
@@ -61,12 +68,28 @@ def __init__(self):

 def exception_handler(self, request: Request, exc: IntegrityError):
 """Handle IntegrityError exception."""
 if self._is_dialect_matched(exc):
+ exception_id = get_random_string()
+ stacktrace = ""
+ for tb in traceback.format_tb(exc.__traceback__):
+ stacktrace += tb
+
+ log_message = f"Error with id {exception_id}\n{stacktrace}"
+ log.error(log_message)
+ if conf.getboolean("api", "expose_stacktrace"):
+ message = log_message
+ else:
+ message = (
+ "Serious error when handling your request. Check logs for more details - "
+ f"you will find it in api server when you look for ID {exception_id}"
+ )
+
 raise HTTPException(
 status_code=status.HTTP_409_CONFLICT,
 detail={
 "reason": "Unique constraint violation",
 "statement": str(exc.statement),
 "orig_error": str(exc.orig),
+ "message": message,
 },
 )
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 948299dec0d5c..437b04e771e22 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1348,6 +1348,12 @@ api:
 type: string
 example: ~
 default: "False"
+ expose_stacktrace:
+ description: Expose stacktrace in API server error responses
+ version_added: ~
+ type: string
+ example: ~
+ default: "False"
 base_url:
 description: |
 The base url of the API server. Airflow cannot guess what domain or CNAME you are using.
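Note: the two hunks above combine into a log-correlation pattern that is easier to see end to end outside the diff. The stdlib-only sketch below condenses it; EXPOSE_STACKTRACE stands in for conf.getboolean("api", "expose_stacktrace"), and secrets.token_urlsafe stands in for Airflow's get_random_string, so neither name is part of the patch.

import logging
import secrets
import traceback

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger("api.errors")
EXPOSE_STACKTRACE = False  # mirrors the shipped default of "False"

def build_client_message(exc: Exception) -> str:
    # Always log the full traceback under a random ID; clients only get the
    # full text when the deployment explicitly opts in.
    exception_id = secrets.token_urlsafe(6)  # stand-in for get_random_string()
    stacktrace = "".join(traceback.format_tb(exc.__traceback__))
    log_message = f"Error with id {exception_id}\n{stacktrace}"
    log.error(log_message)
    if EXPOSE_STACKTRACE:
        return log_message
    return f"Check the api-server logs for ID {exception_id}"

try:
    raise ValueError("boom")
except ValueError as exc:
    print(build_client_message(exc))  # opaque message; the full trace went to the log

Keeping the default off means stack frames (which may leak table and path names) stay out of API responses, while the random ID still lets a UI error message be matched to the exact server-side traceback.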
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py index bed77a67a2723..b5136310611e0 100644 --- a/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py +++ b/airflow-core/tests/unit/api_fastapi/common/test_exceptions.py @@ -16,6 +16,8 @@ # under the License. from __future__ import annotations +from unittest.mock import patch + import pytest from fastapi import HTTPException, status from sqlalchemy.exc import IntegrityError @@ -26,6 +28,7 @@ from airflow.utils.session import provide_session from airflow.utils.state import DagRunState +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_connections, clear_db_dags, clear_db_pools, clear_db_runs pytestmark = pytest.mark.db_test @@ -50,6 +53,11 @@ "reason": f"Test for {_DatabaseDialect.POSTGRES.value} only", }, ] +MOCKED_ID = "TgVcT3QW" +MESSAGE = ( + "Serious error when handling your request. Check logs for more details - " + f"you will find it in api server when you look for ID {MOCKED_ID}" +) def generate_test_cases_parametrize( @@ -109,6 +117,7 @@ def teardown_method(self) -> None: "reason": "Unique constraint violation", "statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred) VALUES (?, ?, ?, ?)", "orig_error": "UNIQUE constraint failed: slot_pool.pool", + "message": MESSAGE, }, ), HTTPException( @@ -117,6 +126,7 @@ def teardown_method(self) -> None: "reason": "Unique constraint violation", "statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred) VALUES (%s, %s, %s, %s)", "orig_error": "(1062, \"Duplicate entry 'test_pool' for key 'slot_pool.slot_pool_pool_uq'\")", + "message": MESSAGE, }, ), HTTPException( @@ -125,6 +135,7 @@ def teardown_method(self) -> None: "reason": "Unique constraint violation", "statement": "INSERT INTO slot_pool (pool, slots, description, include_deferred) VALUES (%(pool)s, %(slots)s, %(description)s, %(include_deferred)s) RETURNING slot_pool.id", "orig_error": 'duplicate key value violates unique constraint "slot_pool_pool_uq"\nDETAIL: Key (pool)=(test_pool) already exists.\n', + "message": MESSAGE, }, ), ], @@ -135,6 +146,7 @@ def teardown_method(self) -> None: "reason": "Unique constraint violation", "statement": 'INSERT INTO variable ("key", val, description, is_encrypted) VALUES (?, ?, ?, ?)', "orig_error": "UNIQUE constraint failed: variable.key", + "message": MESSAGE, }, ), HTTPException( @@ -143,6 +155,7 @@ def teardown_method(self) -> None: "reason": "Unique constraint violation", "statement": "INSERT INTO variable (`key`, val, description, is_encrypted) VALUES (%s, %s, %s, %s)", "orig_error": "(1062, \"Duplicate entry 'test_key' for key 'variable.variable_key_uq'\")", + "message": MESSAGE, }, ), HTTPException( @@ -151,14 +164,23 @@ def teardown_method(self) -> None: "reason": "Unique constraint violation", "statement": "INSERT INTO variable (key, val, description, is_encrypted) VALUES (%(key)s, %(val)s, %(description)s, %(is_encrypted)s) RETURNING variable.id", "orig_error": 'duplicate key value violates unique constraint "variable_key_uq"\nDETAIL: Key (key)=(test_key) already exists.\n', + "message": MESSAGE, }, ), ], ], ), ) + @patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID) + @conf_vars({("api", "expose_stacktrace"): "False"}) @provide_session - def test_handle_single_column_unique_constraint_error(self, session, table, expected_exception) -> None: + def 
test_handle_single_column_unique_constraint_error( + self, + mock_get_random_string, + session, + table, + expected_exception, + ) -> None: # Take Pool and Variable tables as test cases if table == "Pool": session.add(Pool(pool=TEST_POOL, slots=1, description="test pool", include_deferred=False)) @@ -188,6 +210,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe "reason": "Unique constraint violation", "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, (SELECT max(log_template.id) AS max_1 \nFROM log_template), ?, ?, ?, ?, ?, ?, ?)", "orig_error": "UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id", + "message": MESSAGE, }, ), HTTPException( @@ -196,6 +219,7 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe "reason": "Unique constraint violation", "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %s, %s, %s, %s, %s, %s, %s)", "orig_error": "(1062, \"Duplicate entry 'test_dag_id-test_run_id' for key 'dag_run.dag_run_dag_id_run_id_key'\")", + "message": MESSAGE, }, ), HTTPException( @@ -204,15 +228,22 @@ def test_handle_single_column_unique_constraint_error(self, session, table, expe "reason": "Unique constraint violation", "statement": "INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%(dag_id)s, %(queued_at)s, %(logical_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(creating_job_id)s, %(run_type)s, %(triggered_by)s, %(conf)s, %(data_interval_start)s, %(data_interval_end)s, %(run_after)s, %(last_scheduling_decision)s, (SELECT max(log_template.id) AS max_1 \nFROM log_template), %(updated_at)s, %(clear_number)s, %(backfill_id)s, %(bundle_version)s, %(scheduled_by_job_id)s, %(context_carrier)s, %(created_dag_version_id)s) RETURNING dag_run.id", "orig_error": 'duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"\nDETAIL: Key (dag_id, run_id)=(test_dag_id, test_run_id) already exists.\n', + "message": MESSAGE, }, ), ], ], ), ) + @patch("airflow.api_fastapi.common.exceptions.get_random_string", return_value=MOCKED_ID) + @conf_vars({("api", "expose_stacktrace"): "False"}) @provide_session def test_handle_multiple_columns_unique_constraint_error( - self, session, table, expected_exception + self, + mock_get_random_string, + session, + table, + expected_exception, ) -> None: if table == "DagRun": session.add( diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py index 502621169dbc4..ddff799431cf7 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py @@ -306,7 +306,7 @@ def test_post_should_respond_already_exist(self, test_client, body): assert response.status_code == 409 response_json = response.json() assert "detail" in response_json - assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"] + assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"] @pytest.mark.enable_redact @pytest.mark.parametrize( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 2d60fedbd191c..4ab197c2f599d 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -38,12 +38,7 @@ from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.api_fastapi import _check_dag_run_note, _check_last_log -from tests_common.test_utils.db import ( - clear_db_dags, - clear_db_logs, - clear_db_runs, - clear_db_serialized_dags, -) +from tests_common.test_utils.db import clear_db_dags, clear_db_logs, clear_db_runs, clear_db_serialized_dags from tests_common.test_utils.format_datetime import from_datetime_to_zulu, from_datetime_to_zulu_without_ms if TYPE_CHECKING: @@ -1569,7 +1564,7 @@ def test_response_409(self, test_client): assert response.status_code == 409 response_json = response.json() assert "detail" in response_json - assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"] + assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"] @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") def test_should_respond_200_with_null_logical_date(self, test_client): diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py index fb27a1f4c911d..68c2375913657 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py @@ -417,7 +417,7 @@ def test_should_response_409( else: response_json = response.json() assert "detail" in response_json - assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"] + assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error", "message"] assert session.query(Pool).count() == n_pools + 1 From d3b33a94a3e1f890dbe111763975c00c234321da Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 11 Jun 2025 20:20:17 +0200 Subject: [PATCH 016/122] [v3-0-test] Use Connection Hook Names for Dropdown instead of connection IDs (#51599) (#51613) (cherry picked from commit f41f5f23396a6dc97373ba1109f9dce28860026d) Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com> --- .../src/airflow/ui/src/pages/Connections/ConnectionForm.tsx | 3 ++- .../src/airflow/ui/src/queries/useConnectionTypeMeta.ts | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) 
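Note: the intent of the two TypeScript hunks below is a display-name mapping with a fallback; option labels and sort order come from each connection type's human-readable hook name, with the raw type string used when no hook name is known. A Python rendering of the same rule, kept in Python to match the other sketches in these notes; the sample data is invented, and casefold() only approximates localeCompare.

connection_types = ["sqlite", "aws", "fs"]
hook_names = {"aws": "Amazon Web Services", "sqlite": "SQLite"}  # "fs" has no hook name

# Sort by display name (falling back to the raw type), then build the options.
connection_types.sort(key=lambda conn: hook_names.get(conn, conn).casefold())
options = [{"label": hook_names.get(conn, conn), "value": conn} for conn in connection_types]
print(options)
# [{'label': 'Amazon Web Services', 'value': 'aws'},
#  {'label': 'fs', 'value': 'fs'},
#  {'label': 'SQLite', 'value': 'sqlite'}]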
diff --git a/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx b/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx
index 6a17f1794f6bf..866d555c06857 100644
--- a/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Connections/ConnectionForm.tsx
@@ -49,6 +49,7 @@ const ConnectionForm = ({
 const [errors, setErrors] = useState<{ conf?: string }>({});
 const {
 formattedData: connectionTypeMeta,
+ hookNames: hookNameMap,
 isPending: isMetaPending,
 keysList: connectionTypes,
 } = useConnectionTypeMeta();
@@ -116,7 +117,7 @@
 };

 const connTypesOptions = connectionTypes.map((conn) => ({
- label: conn,
+ label: hookNameMap[conn] ?? conn,
 value: conn,
 }));

diff --git a/airflow-core/src/airflow/ui/src/queries/useConnectionTypeMeta.ts b/airflow-core/src/airflow/ui/src/queries/useConnectionTypeMeta.ts
index 8f9d4137c2683..5f6f5c8826771 100644
--- a/airflow-core/src/airflow/ui/src/queries/useConnectionTypeMeta.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useConnectionTypeMeta.ts
@@ -58,6 +58,7 @@ export const useConnectionTypeMeta = () => {
 }

 const formattedData: Record = {};
+ const hookNames: Record<string, string> = {};
 const keysList: Array<string> = [];

 const defaultStandardFields: StandardFieldSpec | undefined = {
@@ -91,6 +92,7 @@

 data?.forEach((item) => {
 const key = item.connection_type;
+ hookNames[key] = item.hook_name;
 keysList.push(key);

 const populatedStandardFields: StandardFieldSpec = mergeWithDefaults(
@@ -109,7 +111,7 @@
 };
 });

- keysList.sort((first, second) => first.localeCompare(second));
+ keysList.sort((first, second) => (hookNames[first] ?? first).localeCompare(hookNames[second] ?? 
second));

- return { formattedData, isPending, keysList };
+ return { formattedData, hookNames, isPending, keysList };
 };

From aa95a37a3ee4cd5fb1154382516fc2eb5677f83a Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Wed, 11 Jun 2025 21:23:58 +0200
Subject: [PATCH 017/122] [v3-0-test] Exclude libcst 1.8.1 for Python 3.9
 (#51606) (#51609)

* build(pre-commit): ignore 1.8.1 as it misses typing-extensions which is needed in Python 3.9

* build(pyproject.toml): exclude libcst==1.8.1 for python 3.9 as it misses typing-extensions

(cherry picked from commit 2bd4c1707633d602c8d2ebc95212aeab0727748c)

Co-authored-by: Wei Lee
---
 .pre-commit-config.yaml | 3 ++-
 airflow-core/pyproject.toml | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 01a585c19c456..e5c88598f172a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -226,7 +226,8 @@ repos:
 entry: ./scripts/ci/pre_commit/check_deferrable_default.py
 pass_filenames: false
 # libcst doesn't have source wheels for all PY except PY3.12, excluding it
- additional_dependencies: ['libcst>=1.1.0,!=1.8.0']
+ # libcst 1.8.1 doesn't include typing-extensions which is needed for Python 3.9
+ additional_dependencies: ['libcst>=1.1.0,!=1.8.0,!=1.8.1']
 files: ^(providers/.*/)?airflow/.*/(sensors|operators)/.*\.py$
- repo: https://github.com/asottile/blacken-docs
 rev: 1.19.1
diff --git a/airflow-core/pyproject.toml b/airflow-core/pyproject.toml
index e29f1589cdefa..f42fad6dc764b 100644
--- a/airflow-core/pyproject.toml
+++ b/airflow-core/pyproject.toml
@@ -95,7 +95,8 @@ dependencies = [
 "jinja2>=3.1.5",
 "jsonschema>=4.19.1",
 "lazy-object-proxy>=1.2.0",
- "libcst >=1.1.0",
+ 'libcst >=1.1.0,!=1.8.1;python_version<"3.10"',
+ 'libcst >=1.1.0;python_version>="3.10"',
 "linkify-it-py>=2.0.0",
 "lockfile>=0.12.2",
 "methodtools>=0.4.7",

From 414b75215d03d02e51f47c55a66cdf5bedd3f165 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Wed, 11 Jun 2025 16:33:40 -0400
Subject: [PATCH 018/122] [v3-0-test] Refactor `structuredLog` structure and
 layout (#51567) (#51626)

(cherry picked from commit b6db7a757b44b36bce821195bf4a5123aa62723f)

Co-authored-by: Guan Ming(Wesley) Chiu <105915352+guan404ming@users.noreply.github.com>
---
 .../ui/src/components/renderStructuredLog.tsx | 54 +++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)

diff --git a/airflow-core/src/airflow/ui/src/components/renderStructuredLog.tsx b/airflow-core/src/airflow/ui/src/components/renderStructuredLog.tsx
index 122c68d3a1fed..ec1bc622f590d 100644
--- a/airflow-core/src/airflow/ui/src/components/renderStructuredLog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/renderStructuredLog.tsx
@@ -124,25 +124,6 @@ export const renderStructuredLog = ({
 return "";
 }

- elements.push(
- 
- {index}
- ,
- );
-
 if (Boolean(timestamp)) {
 elements.push("[",