diff --git a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py index 202f5d3ca4e83..720403e3c642d 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/conftest.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/conftest.py @@ -235,7 +235,7 @@ def test_commands(login_command, date_param): # Define test commands to run with actual running API server return [ login_command, - "backfills list", + "backfill list", "config get --section core --option executor", "connections create --connection-id=test_con --conn-type=mysql --password=TEST_PASS -o json", "connections list", diff --git a/airflow-ctl/docs/howto/index.rst b/airflow-ctl/docs/howto/index.rst index 602c0174e6ebf..12946aed4498b 100644 --- a/airflow-ctl/docs/howto/index.rst +++ b/airflow-ctl/docs/howto/index.rst @@ -28,13 +28,24 @@ configuring an airflowctl environment. How to use airflowctl ----------------------- +--------------------- **Important Note** '''''''''''''''''' airflowctl needs the Airflow API running to be able to work. Please, see the login section below before use. Otherwise, you may get errors. +Datetime Usage +'''''''''''''' +For datetime parameters, date should be timezone aware and in ISO format. +For example: ``2025-10-10T10:00:00+00:00`` +Let's take example of triggering a DAG run with a logical date, run after and a note. + +.. code-block:: bash + + airflowctl dagrun trigger --dag-id="example_bash_operator" --logical-date="2025-09-06T00:00:00+00:00" --run-after="2025-09-06T00:00:00+00:00" --note="Triggered from airflowctl" + + Login ''''' airflowctl needs to be able to connect to the Airflow API. You should pass API URL as a parameter to the command @@ -66,7 +77,39 @@ In both cases token is securely stored in the keyring backend. Only configuratio is the API URL and the environment name. The token is stored in the keyring backend and is not persisted in the configuration file. The keyring backend is used to securely store the token and is not accessible to the user. +What is authentication for ``airflowctl``? +`````````````````````````````````````````` +For ``airflowctl`` to be able to communicate with the Airflow API, it needs to authenticate itself and acquire token. +This is done using either a token or a username and password. +The token can be acquired from the Airflow API or generated using a username and password. + +.. image:: ../images/diagrams/airflowctl_api_network_architecture_diagram.png + :target: https://raw.githubusercontent.com/apache/airflow/main/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.png + :width: 100% + :alt: airflowctl Auth Login Help + +Parameter Details for airflowctl auth login +``````````````````````````````````````````` + +**--api-url**: This parameter is required. (e.g. ``http://localhost:8080``) +The default value is ``http://localhost:8080``. Full URL of the Airflow API. Without any ``/api/*`` suffixes. +If you are running the ``airflowctl`` in ``breeze`` container, it is optional. + +**--api-token**: This parameter is optional. +If you are setting the token via the environment variable ``AIRFLOW_CLI_TOKEN``, you can skip using this parameter. + +**--username**: This parameter is optional. +If you are not using ``--api-token`` or the environment variable ``AIRFLOW_CLI_TOKEN``, you must provide a username to authentication along with ``--password``. +**--password**: This parameter is optional. +If you provide a username via ``--username`` this is the required password to authenticate. + +**--env**: This parameter is optional. +The name of the environment to create or update. The default value is ``production``. +This parameter is useful when you want to manage multiple Airflow environments. + +More Usage and Help Pictures +'''''''''''''''''''''''''''' For more information use .. code-block:: bash @@ -88,7 +131,6 @@ You can use the command ``airflowctl --help`` to see the list of available comma :width: 60% :alt: airflowctl Help - All Available Group Command References -------------------------------------- @@ -109,10 +151,10 @@ These visual references show the full command syntax, options, and parameters fo :width: 60% :alt: airflowctl Auth Command -**Backfills** +**Backfill** ''''''''''''' -.. image:: ../images/output_backfills.svg - :target: https://raw.githubusercontent.com/apache/airflow/main/airflow-ctl/docs/images/output_backfills.svg +.. image:: ../images/output_backfill.svg + :target: https://raw.githubusercontent.com/apache/airflow/main/airflow-ctl/docs/images/output_backfill.svg :width: 60% :alt: airflowctl Backfills Command @@ -132,8 +174,8 @@ These visual references show the full command syntax, options, and parameters fo **Dags** '''''''' -.. image:: ../images/output_dag.svg - :target: https://raw.githubusercontent.com/apache/airflow/main/airflow-ctl/docs/images/output_dag.svg +.. image:: ../images/output_dags.svg + :target: https://raw.githubusercontent.com/apache/airflow/main/airflow-ctl/docs/images/output_dags.svg :width: 60% :alt: airflowctl Dag Command diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 8865ab3e19827..80a7ac0cbf48e 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,14 +1,14 @@ -main:8d768c837899829dfd21d37253d2fb44 +main:deacf21c6300eae16afbf8cbd538f1ef assets:b3ae2b933e54528bf486ff28e887804d auth:f396d4bce90215599dde6ad0a8f30f29 -backfills:725109470cd2613de8cc8af022fb54bc +backfill:bbce9859a2d1ce054ad22db92dea8c05 config:cb175bedf29e8a2c2c6a2ebd13d770a7 connections:a16225e1c7d28488d0da612752669b4b -dag:0c06fff60c0cc6618c8de05915506605 +dags:6928d0192e95fde5b0c092e0ea5a0703 dagrun:ec1b6098822419967e621687bd7e5e4b jobs:7f8680afff230eb9940bc7fca727bd52 pools:03fc7d948cbecf16ff8d640eb8f0ce43 providers:1c0afb2dff31d93ab2934b032a2250ab variables:0b04188937b3c364204ef4cc9a541c62 -version:000176f03a175890b12181c8569e2d0f +version:d4a7a6229b3a204f114283b62eac789b auth login:5277c653ff6dce51f37472dc0bda9775 diff --git a/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.md5sum b/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.md5sum new file mode 100644 index 0000000000000..d3ecab853e6fd --- /dev/null +++ b/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.md5sum @@ -0,0 +1 @@ +ea755b0b93dd0524e7ed0ef42b83a888 diff --git a/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.png b/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.png new file mode 100644 index 0000000000000..b79bafcc61813 Binary files /dev/null and b/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.png differ diff --git a/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.py b/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.py new file mode 100644 index 0000000000000..0894fb955df7a --- /dev/null +++ b/airflow-ctl/docs/images/diagrams/airflowctl_api_network_architecture_diagram.py @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pathlib import Path + +from diagrams import Cluster, Diagram, Edge +from diagrams.onprem.client import User +from diagrams.onprem.compute import Server +from rich.console import Console + +MY_DIR = Path(__file__).parent +MY_FILENAME = Path(__file__).with_suffix("").name + +console = Console(width=400, color_system="standard") + +graph_attr = { + "concentrate": "false", + "splines": "splines", +} + +edge_attr = { + "minlen": "1", +} + + +def generate_airflowctl_api_network_diagram(): + image_file = (MY_DIR / MY_FILENAME).with_suffix(".png") + + console.print(f"[bright_blue]Generating network diagram {image_file}") + with Diagram( + name="airflowctl<->API Network Diagram", + show=False, + direction="LR", + filename=MY_FILENAME, + edge_attr=edge_attr, + graph_attr=graph_attr, + ): + # Machine network with client + with Cluster("Machine Network", graph_attr={"margin": "30", "width": "10"}): + client = User("Client\n(The machine/host has the airflowctl installed)") + + # Airflow deployment network with API server + with Cluster("Apache Airflow Deployment Network", graph_attr={"margin": "30"}): + api_server = Server("Apache Airflow API Server\n(e.g. DNS: https://airflow.internal.api.com)") + + # Edges representing the flows + ( + client + >> Edge( + color="blue", + style="solid", + label="Login Request\n(if not manually used in --api-token or env var. Authentication done with username/password)", + ) + >> api_server + ) + + ( + api_server + >> Edge( + color="darkgreen", + style="solid", + label="Returns Token", + ) + >> client + ) + + console.print(f"[green]Generated network diagram {image_file}") + + +if __name__ == "__main__": + generate_airflowctl_api_network_diagram() diff --git a/airflow-ctl/docs/images/output_backfill.svg b/airflow-ctl/docs/images/output_backfill.svg new file mode 100644 index 0000000000000..239a37ab8ea0d --- /dev/null +++ b/airflow-ctl/docs/images/output_backfill.svg @@ -0,0 +1,125 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Usage:airflowctl backfill [-hCOMMAND... + +Perform Backfill operations + +Positional Arguments: +COMMAND +cancelPerform cancel operation +createPerform create operation +create-dry-run +Perform create_dry_run operation +getPerform get operation +listPerform list operation +pausePerform pause operation +unpausePerform unpause operation + +Options: +-h--helpshow this help message and exit + + + + diff --git a/airflow-ctl/docs/images/output_backfills.svg b/airflow-ctl/docs/images/output_backfills.svg deleted file mode 100644 index d620c4962b494..0000000000000 --- a/airflow-ctl/docs/images/output_backfills.svg +++ /dev/null @@ -1,125 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Usage:airflowctl backfills [-hCOMMAND... - -Perform Backfills operations - -Positional Arguments: -COMMAND -cancelPerform cancel operation -createPerform create operation -create-dry-run -Perform create_dry_run operation -getPerform get operation -listPerform list operation -pausePerform pause operation -unpausePerform unpause operation - -Options: --h--helpshow this help message and exit - - - - diff --git a/airflow-ctl/docs/images/output_dag.svg b/airflow-ctl/docs/images/output_dag.svg deleted file mode 100644 index f9192776445fe..0000000000000 --- a/airflow-ctl/docs/images/output_dag.svg +++ /dev/null @@ -1,145 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - Usage:airflowctl dag [-hCOMMAND... - -Perform Dag operations - -Positional Arguments: -COMMAND -deletePerform delete operation -getPerform get operation -get-detailsPerform get_details operation -get-import-errorPerform get_import_error operation -get-statsPerform get_stats operation -get-tagsPerform get_tags operation -get-versionPerform get_version operation -listPerform list operation -list-import-errors -Perform list_import_errors operation -list-versionPerform list_version operation -list-warningPerform list_warning operation -patchPerform patch operation - -Options: --h--helpshow this help message and exit - - - - diff --git a/airflow-ctl/docs/images/output_dags.svg b/airflow-ctl/docs/images/output_dags.svg new file mode 100644 index 0000000000000..73c7b63a48fd8 --- /dev/null +++ b/airflow-ctl/docs/images/output_dags.svg @@ -0,0 +1,145 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Usage:airflowctl dags [-hCOMMAND... + +Perform Dags operations + +Positional Arguments: +COMMAND +deletePerform delete operation +getPerform get operation +get-detailsPerform get_details operation +get-import-errorPerform get_import_error operation +get-statsPerform get_stats operation +get-tagsPerform get_tags operation +get-versionPerform get_version operation +listPerform list operation +list-import-errors +Perform list_import_errors operation +list-versionPerform list_version operation +list-warningPerform list_warning operation +updatePerform update operation + +Options: +-h--helpshow this help message and exit + + + + diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index c2489613f0824..f6c7225a4ebc9 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -19,99 +19,99 @@ font-weight: 700; } - .terminal-1369337233-matrix { + .terminal-3400494481-matrix { font-family: Fira Code, monospace; font-size: 20px; line-height: 24.4px; font-variant-east-asian: full-width; } - .terminal-1369337233-title { + .terminal-3400494481-title { font-size: 18px; font-weight: bold; font-family: arial; } - .terminal-1369337233-r1 { fill: #ff8700 } -.terminal-1369337233-r2 { fill: #c5c8c6 } -.terminal-1369337233-r3 { fill: #808080 } -.terminal-1369337233-r4 { fill: #68a0b3 } + .terminal-3400494481-r1 { fill: #ff8700 } +.terminal-3400494481-r2 { fill: #c5c8c6 } +.terminal-3400494481-r3 { fill: #808080 } +.terminal-3400494481-r4 { fill: #68a0b3 } - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + @@ -123,34 +123,34 @@ - + - - Usage:airflowctl [-hGROUP_OR_COMMAND... - -Positional Arguments: -GROUP_OR_COMMAND - -    Groups -assetsPerform Assets operations -authManage authentication for CLI. Either pass -token from environment variable/parameter -or pass username and password. -backfillsPerform Backfills operations -configPerform Config operations -connectionsPerform Connections operations -dagPerform Dag operations -dagrunPerform DagRun operations -jobsPerform Jobs operations -poolsPerform Pools operations -providersPerform Providers operations -variablesPerform Variables operations - -    Commands: -versionShow version information - -Options: --h--helpshow this help message and exit + + Usage:airflowctl [-hGROUP_OR_COMMAND... + +Positional Arguments: +GROUP_OR_COMMAND + +    Groups +assetsPerform Assets operations +authManage authentication for CLI. Either pass +token from environment variable/parameter +or pass username and password. +backfillPerform Backfill operations +configPerform Config operations +connectionsPerform Connections operations +dagrunPerform DagRun operations +dagsPerform Dags operations +jobsPerform Jobs operations +poolsPerform Pools operations +providersPerform Providers operations +variablesPerform Variables operations + +    Commands: +versionShow version information + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/docs/images/output_version.svg b/airflow-ctl/docs/images/output_version.svg index 8a1da141ff4ed..a7ce436c8b8aa 100644 --- a/airflow-ctl/docs/images/output_version.svg +++ b/airflow-ctl/docs/images/output_version.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + + + + + + + + + + - + - + - - Usage:airflowctl version [-h] - -Show version information - -Options: --h--helpshow this help message and exit + + Usage:airflowctl version [-h] [-eENV] [--remote] + +Show version information + +Options: +-h--helpshow this help message and exit +-e--envENVThe environment to run the command in +--remoteFetch the Airflow version in remote server,  +otherwise only shows the local airflowctl version diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index 09947833bafa3..355304ae37a14 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -36,11 +36,11 @@ from airflowctl import __version__ as version from airflowctl.api.operations import ( AssetsOperations, - BackfillsOperations, + BackfillOperations, ConfigOperations, ConnectionsOperations, - DagOperations, DagRunOperations, + DagsOperations, JobsOperations, LoginOperations, PoolsOperations, @@ -244,7 +244,7 @@ def assets(self): @property def backfills(self): """Operations related to backfills.""" - return BackfillsOperations(self) + return BackfillOperations(self) @lru_cache() # type: ignore[prop-decorator] @property @@ -262,7 +262,7 @@ def connections(self): @property def dags(self): """Operations related to DAGs.""" - return DagOperations(self) + return DagsOperations(self) @lru_cache() # type: ignore[prop-decorator] @property @@ -316,7 +316,7 @@ def get_client(kind: Literal[ClientKind.CLI, ClientKind.AUTH] = ClientKind.CLI): api_client = Client( base_url=credentials.api_url or "http://localhost:8080", limits=httpx.Limits(max_keepalive_connections=1, max_connections=1), - token=credentials.api_token or "", + token=credentials.api_token or str(os.getenv("AIRFLOW_CLI_TOKEN", "")), kind=kind, ) yield api_client diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index fe56661d2d0b8..a2cb27eafc46b 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -190,7 +190,7 @@ def login_with_username_and_password(self, login: LoginBody) -> LoginResponse | """Login to the API server.""" try: return LoginResponse.model_validate_json( - self.client.post("/token/cli", json=login.model_dump()).content + self.client.post("/token/cli", json=login.model_dump(mode="json")).content ) except ServerResponseError as e: raise e @@ -229,7 +229,10 @@ def create_event( ) -> AssetEventResponse | ServerResponseError: """Create an asset event.""" try: - self.response = self.client.post("assets/events", json=asset_event_body.model_dump()) + # Ensure extra is initialised before sent to API + if asset_event_body.extra is None: + asset_event_body.extra = {} + self.response = self.client.post("assets/events", json=asset_event_body.model_dump(mode="json")) return AssetEventResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -293,13 +296,13 @@ def delete_queued_event(self, dag_id: str, asset_id: str) -> str | ServerRespons raise e -class BackfillsOperations(BaseOperations): +class BackfillOperations(BaseOperations): """Backfill operations.""" def create(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError: """Create a backfill.""" try: - self.response = self.client.post("backfills", data=backfill.model_dump()) + self.response = self.client.post("backfills", data=backfill.model_dump(mode="json")) return BackfillResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -307,7 +310,7 @@ def create(self, backfill: BackfillPostBody) -> BackfillResponse | ServerRespons def create_dry_run(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError: """Create a dry run backfill.""" try: - self.response = self.client.post("backfills/dry_run", data=backfill.model_dump()) + self.response = self.client.post("backfills/dry_run", data=backfill.model_dump(mode="json")) return BackfillResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -391,7 +394,7 @@ def create( ) -> ConnectionResponse | ServerResponseError: """Create a connection.""" try: - self.response = self.client.post("connections", json=connection.model_dump()) + self.response = self.client.post("connections", json=connection.model_dump(mode="json")) return ConnectionResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -399,7 +402,7 @@ def create( def bulk(self, connections: BulkBodyConnectionBody) -> BulkResponse | ServerResponseError: """CRUD multiple connections.""" try: - self.response = self.client.patch("connections", json=connections.model_dump()) + self.response = self.client.patch("connections", json=connections.model_dump(mode="json")) return BulkResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -427,7 +430,7 @@ def update( """Update a connection.""" try: self.response = self.client.patch( - f"connections/{connection.connection_id}", json=connection.model_dump() + f"connections/{connection.connection_id}", json=connection.model_dump(mode="json") ) return ConnectionResponse.model_validate_json(self.response.content) except ServerResponseError as e: @@ -439,14 +442,14 @@ def test( ) -> ConnectionTestResponse | ServerResponseError: """Test a connection.""" try: - self.response = self.client.post("connections/test", json=connection.model_dump()) + self.response = self.client.post("connections/test", json=connection.model_dump(mode="json")) return ConnectionTestResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e -class DagOperations(BaseOperations): - """Dag operations.""" +class DagsOperations(BaseOperations): + """Dags operations.""" def get(self, dag_id: str) -> DAGResponse | ServerResponseError: """Get a DAG.""" @@ -472,9 +475,9 @@ def list(self) -> DAGCollectionResponse | ServerResponseError: """List DAGs.""" return super().execute_list(path="dags", data_model=DAGCollectionResponse) - def patch(self, dag_id: str, dag_body: DAGPatchBody) -> DAGResponse | ServerResponseError: + def update(self, dag_id: str, dag_body: DAGPatchBody) -> DAGResponse | ServerResponseError: try: - self.response = self.client.patch(f"dags/{dag_id}", json=dag_body.model_dump()) + self.response = self.client.patch(f"dags/{dag_id}", json=dag_body.model_dump(mode="json")) return DAGResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -557,7 +560,9 @@ def trigger( if trigger_dag_run.conf is None: trigger_dag_run.conf = {} try: - self.response = self.client.post(f"dags/{dag_id}/dagRuns", json=trigger_dag_run.model_dump()) + self.response = self.client.post( + f"dags/{dag_id}/dagRuns", json=trigger_dag_run.model_dump(mode="json") + ) return DAGRunResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -592,7 +597,7 @@ def list(self) -> PoolCollectionResponse | ServerResponseError: def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError: """Create a pool.""" try: - self.response = self.client.post("pools", json=pool.model_dump()) + self.response = self.client.post("pools", json=pool.model_dump(mode="json")) return PoolResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -600,7 +605,7 @@ def create(self, pool: PoolBody) -> PoolResponse | ServerResponseError: def bulk(self, pools: BulkBodyPoolBody) -> BulkResponse | ServerResponseError: """CRUD multiple pools.""" try: - self.response = self.client.patch("pools", json=pools.model_dump()) + self.response = self.client.patch("pools", json=pools.model_dump(mode="json")) return BulkResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -616,7 +621,9 @@ def delete(self, pool: str) -> str | ServerResponseError: def update(self, pool_body: PoolPatchBody) -> PoolResponse | ServerResponseError: """Update a pool.""" try: - self.response = self.client.patch(f"pools/{pool_body.pool}", json=pool_body.model_dump()) + self.response = self.client.patch( + f"pools/{pool_body.pool}", json=pool_body.model_dump(mode="json") + ) return PoolResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -648,7 +655,7 @@ def list(self) -> VariableCollectionResponse | ServerResponseError: def create(self, variable: VariableBody) -> VariableResponse | ServerResponseError: """Create a variable.""" try: - self.response = self.client.post("variables", json=variable.model_dump()) + self.response = self.client.post("variables", json=variable.model_dump(mode="json")) return VariableResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -656,7 +663,7 @@ def create(self, variable: VariableBody) -> VariableResponse | ServerResponseErr def bulk(self, variables: BulkBodyVariableBody) -> BulkResponse | ServerResponseError: """CRUD multiple variables.""" try: - self.response = self.client.patch("variables", json=variables.model_dump()) + self.response = self.client.patch("variables", json=variables.model_dump(mode="json")) return BulkResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -672,7 +679,9 @@ def delete(self, variable_key: str) -> str | ServerResponseError: def update(self, variable: VariableBody) -> VariableResponse | ServerResponseError: """Update a variable.""" try: - self.response = self.client.patch(f"variables/{variable.key}", json=variable.model_dump()) + self.response = self.client.patch( + f"variables/{variable.key}", json=variable.model_dump(mode="json") + ) return VariableResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index fb6a7dfde0d24..b4b283c658a3b 100644 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -83,7 +83,7 @@ def safe_call_command(function: Callable, args: Iterable[Arg]) -> None: except AirflowCtlNotFoundException as e: rich.print(f"command failed due to {e}") sys.exit(1) - except httpx.RemoteProtocolError as e: + except (httpx.RemoteProtocolError, httpx.ReadError) as e: rich.print(f"[red]Remote protocol error: {e}[/red]") if "Server disconnected without sending a response." in str(e): rich.print( @@ -93,7 +93,7 @@ def safe_call_command(function: Callable, args: Iterable[Arg]) -> None: except httpx.ReadTimeout as e: rich.print(f"[red]Read timeout error: {e}[/red]") if "timed out" in str(e): - rich.print("Please check if the server is running and the API ready to accept calls.[/red]") + rich.print("[red]Please check if the server is running and the API ready to accept calls.[/red]") except ServerResponseError as e: rich.print(f"Server response error: {e}") if "Client error message:" in str(e): @@ -296,6 +296,14 @@ def __call__(self, parser, namespace, values, option_string=None): action="store_true", ) +# Version Command Args +ARG_REMOTE = Arg( + flags=("--remote",), + help="Fetch the Airflow version in remote server, otherwise only shows the local airflowctl version", + default=False, + action="store_true", +) + class ActionCommand(NamedTuple): """Single CLI command.""" @@ -452,7 +460,7 @@ def _is_primitive_type(type_name: str) -> bool: return type_name in primitive_types @staticmethod - def _python_type_from_string(type_name: str) -> type: + def _python_type_from_string(type_name: str | type) -> type | Callable: """ Return the corresponding Python *type* for a primitive type name string. @@ -462,7 +470,9 @@ def _python_type_from_string(type_name: str) -> type: leading to type errors or unexpected behaviour when invoking the REST API. """ - mapping: dict[str, type] = { + if "|" in str(type_name): + type_name = [t.strip() for t in str(type_name).split("|") if t.strip() != "None"].pop() + mapping: dict[str, type | Callable] = { "int": int, "float": float, "bool": bool, @@ -473,15 +483,18 @@ def _python_type_from_string(type_name: str) -> type: "tuple": tuple, "set": set, "datetime.datetime": datetime.datetime, + "dict[str, typing.Any]": dict, } # Default to ``str`` to preserve previous behaviour for any unrecognised # type names while still allowing the CLI to function. - return mapping.get(type_name, str) + if isinstance(type_name, type): + type_name = type_name.__name__ + return mapping.get(str(type_name), str) @staticmethod def _create_arg( arg_flags: tuple, - arg_type: type, + arg_type: type | Callable, arg_help: str, arg_action: argparse.BooleanOptionalAction | None, arg_dest: str | None = None, @@ -514,7 +527,7 @@ def _create_arg_for_non_primitive_type( commands.append( self._create_arg( arg_flags=("--" + self._sanitize_arg_parameter_key(field),), - arg_type=field_type.annotation, + arg_type=self._python_type_from_string(field_type.annotation), arg_action=argparse.BooleanOptionalAction if field_type.annotation is bool else None, # type: ignore arg_help=f"{field} for {parameter_key} operation", arg_default=False if field_type.annotation is bool else None, @@ -529,7 +542,7 @@ def _create_arg_for_non_primitive_type( commands.append( self._create_arg( arg_flags=("--" + self._sanitize_arg_parameter_key(field),), - arg_type=annotation, + arg_type=self._python_type_from_string(annotation), arg_action=argparse.BooleanOptionalAction if annotation is bool else None, # type: ignore arg_help=f"{field} for {parameter_key} operation", arg_default=False if annotation is bool else None, @@ -544,12 +557,11 @@ def _create_args_map_from_operation(self): for parameter in operation.get("parameters"): for parameter_key, parameter_type in parameter.items(): if self._is_primitive_type(type_name=parameter_type): - python_type = self._python_type_from_string(parameter_type) is_bool = parameter_type == "bool" args.append( self._create_arg( arg_flags=("--" + self._sanitize_arg_parameter_key(parameter_key),), - arg_type=None if is_bool else python_type, + arg_type=self._python_type_from_string(parameter_type), arg_action=argparse.BooleanOptionalAction if is_bool else None, arg_help=f"{parameter_key} for {operation.get('name')} operation in {operation.get('parent').name}", arg_default=False if is_bool else None, @@ -563,7 +575,7 @@ def _create_args_map_from_operation(self): ) if any(operation.get("name").startswith(cmd) for cmd in self.output_command_list): - args.extend([ARG_OUTPUT]) + args.extend([ARG_OUTPUT, ARG_AUTH_ENVIRONMENT]) self.args_map[(operation.get("name"), operation.get("parent").name)] = args @@ -852,7 +864,10 @@ def merge_commands( help="Show version information", description="Show version information", func=lazy_load_command("airflowctl.ctl.commands.version_command.version_info"), - args=(), + args=( + ARG_AUTH_ENVIRONMENT, + ARG_REMOTE, + ), ), GroupCommand( name="variables", diff --git a/airflow-ctl/src/airflowctl/ctl/commands/version_command.py b/airflow-ctl/src/airflowctl/ctl/commands/version_command.py index 8eae1fcea392d..bfaa49a668240 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/version_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/version_command.py @@ -16,8 +16,6 @@ # under the License. from __future__ import annotations -import sys - import rich from airflowctl import __version__ as airflowctl_version @@ -28,11 +26,9 @@ def version_info(arg, api_client=NEW_API_CLIENT): """Get version information.""" version_dict = {"airflowctl_version": airflowctl_version} - try: + if arg.remote: version_response = api_client.version.get() version_dict.update(version_response.model_dump()) rich.print(version_dict) - except Exception as e: - rich.print(f"[red]Error fetching version information: {e}[/red]") + else: rich.print(version_dict) - sys.exit(1) diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 4aa5da45b1b1c..785fed87dcc34 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -811,13 +811,13 @@ def handle_request(request: httpx.Request) -> httpx.Response: response = client.dags.list() assert response == self.dag_collection_response - def test_patch(self): + def test_update(self): def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v2/dags/dag_id" return httpx.Response(200, json=json.loads(self.dag_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.dags.patch(dag_id="dag_id", dag_body=self.dag_patch_body) + response = client.dags.update(dag_id="dag_id", dag_body=self.dag_patch_body) assert response == self.dag_response def test_delete(self): diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_version_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_version_command.py index b5deacabb1bd7..fee0487d3f74f 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_version_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_version_command.py @@ -48,19 +48,16 @@ class TestVersionCommand: parser = cli_parser.get_parser() - def test_ctl_version(self, mock_client): + def test_ctl_version_remote(self, mock_client): with redirect_stdout(StringIO()) as stdout: - version_info(self.parser.parse_args(["version"]), api_client=mock_client) + version_info(self.parser.parse_args(["version", "--remote"]), api_client=mock_client) assert "version" in stdout.getvalue() assert "git_version" in stdout.getvalue() assert "airflowctl_version" in stdout.getvalue() - def test_ctl_version_exception(self, mock_client): + def test_ctl_version_only_local_version(self, mock_client): """Test the version command with an exception.""" - mock_client.version.get.side_effect = Exception("Test exception") with redirect_stdout(StringIO()) as stdout: - with pytest.raises(SystemExit): - version_info(self.parser.parse_args(["version"]), api_client=mock_client) + version_info(self.parser.parse_args(["version"]), api_client=mock_client) output = stdout.getvalue() - assert "Error fetching version information: Test exception" in output assert "airflowctl_version" in output diff --git a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py index 1e4f5728a25ec..c0ac022a2560f 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/test_cli_config.py @@ -17,14 +17,11 @@ from __future__ import annotations -import datetime from argparse import BooleanOptionalAction from textwrap import dedent -from typing import Any import pytest -from airflowctl.api.datamodels.generated import ReprocessBehavior from airflowctl.ctl.cli_config import ActionCommand, CommandFactory, GroupCommand, merge_commands @@ -59,7 +56,7 @@ def test_args_create(): "help": "from_date for backfill operation", "action": None, "default": None, - "type": datetime.datetime, + "type": str, "dest": None, }, ), @@ -69,7 +66,7 @@ def test_args_create(): "help": "to_date for backfill operation", "action": None, "default": None, - "type": datetime.datetime, + "type": str, "dest": None, }, ), @@ -89,7 +86,7 @@ def test_args_create(): "help": "dag_run_conf for backfill operation", "action": None, "default": None, - "type": dict[str, Any], + "type": dict, "dest": None, }, ), @@ -99,7 +96,7 @@ def test_args_create(): "help": "reprocess_behavior for backfill operation", "action": None, "default": None, - "type": ReprocessBehavior, + "type": str, "dest": None, }, ), @@ -228,7 +225,7 @@ def test_method(self): class BackfillsOperations(BaseOperations): def create(self, backfill: BackfillPostBody) -> BackfillResponse | ServerResponseError: try: - self.response = self.client.post("backfills", data=backfill.model_dump()) + self.response = self.client.post("backfills", json=backfill.model_dump(mode="json")) return BackfillResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e diff --git a/scripts/in_container/run_capture_airflowctl_help.py b/scripts/in_container/run_capture_airflowctl_help.py index 874b0bc6a8213..665325e3e413a 100644 --- a/scripts/in_container/run_capture_airflowctl_help.py +++ b/scripts/in_container/run_capture_airflowctl_help.py @@ -38,10 +38,10 @@ "", # for `airflowctl -h`, main help "assets", "auth", - "backfills", + "backfill", "config", "connections", - "dag", + "dags", "dagrun", "jobs", "pools",