From c042c2e2bce638050b127d005e8b5c08a05bafa5 Mon Sep 17 00:00:00 2001 From: jx2lee Date: Sun, 18 May 2025 17:30:11 +0900 Subject: [PATCH 1/3] init import/export in airflowctl variables --- airflow-ctl/docs/images/command_hashes.txt | 2 +- airflow-ctl/docs/images/output_variables.svg | 80 +++++++++------- airflow-ctl/src/airflowctl/ctl/cli_config.py | 37 +++++++ .../ctl/commands/variable_command.py | 96 +++++++++++++++++++ 4 files changed, 178 insertions(+), 37 deletions(-) create mode 100644 airflow-ctl/src/airflowctl/ctl/commands/variable_command.py diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 22eb1729734e9..30e8f104adff0 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -9,6 +9,6 @@ dagrun:7b3e06a3664cc7ceb18457b4c0895532 jobs:806174e6c9511db669705279ed6a00b9 pools:2c17a4131b6481bd8fe9120982606db2 providers:d053e6f17ff271e1e08942378344d27b -variables:d9001295d77adefbd68e389f6622b89a +variables:cd3970589b2cb1e3ebd9a0b7f2ffdf4d version:11da98f530c37754403a87151cbe2274 auth login:348c25d49128b6007ac97dae2ef7563f diff --git a/airflow-ctl/docs/images/output_variables.svg b/airflow-ctl/docs/images/output_variables.svg index 3233311b978fd..4baba40504240 100644 --- a/airflow-ctl/docs/images/output_variables.svg +++ b/airflow-ctl/docs/images/output_variables.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + - Command: variables + Command: variables - + - - Usage: airflowctl variables [-h] COMMAND ... - -Perform Variables operations - -Positional Arguments: -  COMMAND -    create    Perform create operation -    delete    Perform delete operation -    get       Perform get operation -    list      Perform list operation -    update    Perform update operation - -Optional Arguments: -  -h, --help  show this help message and exit + + Usage: airflowctl variables [-h] COMMAND ... + +Perform Variables operations + +Positional Arguments: +  COMMAND +    create    Perform create operation +    delete    Perform delete operation +    export    Export all variables +    get       Perform get operation +    import    Import variables +    list      Perform list operation +    update    Perform update operation + +Optional Arguments: +  -h, --help  show this help message and exit diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index d2b045da32a36..a703b31cc2737 100644 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -198,6 +198,23 @@ def __call__(self, parser, namespace, values, option_string=None): action=Password, nargs="?", ) +ARG_VARIABLE_IMPORT = Arg( + flags=("file",), + metavar="file", + help="Import variables from JSON file", +) +ARG_VARIABLE_ACTION_ON_EXISTING_KEY = Arg( + flags=("-a", "--action-on-existing-key"), + type=str, + default="overwrite", + help="Action to take if we encounter a variable key that already exists.", + choices=("overwrite", "fail", "skip"), +) +ARG_VARIABLE_EXPORT = Arg( + flags=("file",), + metavar="file", + help="Export all variables to JSON file", +) ARG_OUTPUT = Arg( flags=("-o", "--output"), @@ -626,6 +643,21 @@ def merge_commands( ), ) +VARIABLE_COMMANDS = ( + ActionCommand( + name="import", + help="Import variables", + func=lazy_load_command("airflowctl.ctl.commands.variable_command.import_"), + args=(ARG_VARIABLE_IMPORT, ARG_VARIABLE_ACTION_ON_EXISTING_KEY), + ), + ActionCommand( + name="export", + help="Export all variables", + func=lazy_load_command("airflowctl.ctl.commands.variable_command.export"), + args=(ARG_VARIABLE_EXPORT,), + ), +) + core_commands: list[CLICommand] = [ GroupCommand( name="auth", @@ -638,6 +670,11 @@ def merge_commands( help="Manage Airflow pools", subcommands=POOL_COMMANDS, ), + GroupCommand( + name="variables", + help="Manage Airflow variables", + subcommands=VARIABLE_COMMANDS, + ), ] # Add generated group commands core_commands = merge_commands( diff --git a/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py b/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py new file mode 100644 index 0000000000000..57144e0d0b47f --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path + +import rich + +from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkActionOnExistence +from airflowctl.api.client import NEW_API_CLIENT, ClientKind, provide_api_client +from airflowctl.api.datamodels.generated import ( + BulkBodyVariableBody, + BulkCreateActionVariableBody, + VariableBody, +) + + +@provide_api_client(kind=ClientKind.CLI) +def import_(args, api_client=NEW_API_CLIENT): + """Import variables from a given file.""" + success_message = "[green]Import successful! success: {success}, errors: {errors}[/green]" + if not os.path.exists(args.file): + rich.print(f"[red]Missing variable file: {args.file}") + sys.exit(1) + with open(args.file) as var_file: + try: + var_json = json.load(var_file) + except json.JSONDecodeError: + rich.print(f"[red]Invalid variable file: {args.file}") + sys.exit(1) + + action_on_existence = BulkActionOnExistence(args.action_on_existing_key) + vars_to_update = [] + for k, v in var_json.items(): + value, description = v, None + if isinstance(v, dict) and v.get("value"): + value, description = v["value"], v.get("description") + + vars_to_update.append( + VariableBody( + key=k, + value=value, + description=description, + ) + ) + + bulk_body = BulkBodyVariableBody( + actions=[ + BulkCreateActionVariableBody( + action=BulkAction.CREATE, + entities=vars_to_update, + action_on_existence=action_on_existence, + ) + ] + ) + result = api_client.variables.bulk(variables=bulk_body) + rich.print(success_message.format(success=result.success, errors=result.errors)) + return result.success, result.errors + + +@provide_api_client(kind=ClientKind.CLI) +def export(args, api_client=NEW_API_CLIENT): + """Export all the variables to the file.""" + success_message = "[green]Export successful! {total_entries} variable(s) to {file}[/green]" + var_dict = {} + variables = api_client.variables.list() + + for variable in variables.variables: + if variable.description: + var_dict[variable.key] = { + "value": variable.value, + "description": variable.description, + } + else: + var_dict[variable.key] = variable.value + + with open(Path(args.file), "w") as var_file: + json.dump(var_dict, var_file, sort_keys=True, indent=4) + rich.print(success_message.format(total_entries=variables.total_entries, file=args.file)) From 9eefc7ea9a9244f6bfeff183764f0761025a3727 Mon Sep 17 00:00:00 2001 From: jx2lee Date: Sat, 24 May 2025 17:22:22 +0900 Subject: [PATCH 2/3] tests --- .../ctl/commands/test_variable_command.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py new file mode 100644 index 0000000000000..72116b73f58a0 --- /dev/null +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_variable_command.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +import os + +from airflowctl.api.client import ClientKind +from airflowctl.api.datamodels.generated import ( + BulkActionResponse, + VariableCollectionResponse, + VariableResponse, +) +from airflowctl.ctl import cli_parser +from airflowctl.ctl.commands import variable_command + + +class TestCliVariableCommands: + key = "key" + value = "value" + description = "description" + export_file_name = "exported_json.json" + parser = cli_parser.get_parser() + variable_collection_response = VariableCollectionResponse( + variables=[ + VariableResponse( + key=key, + value=value, + description=description, + is_encrypted=False, + ), + ], + total_entries=1, + ) + bulk_action_response = BulkActionResponse(success=[key], errors=[]) + + def test_import(self, api_client_maker, tmp_path, monkeypatch): + api_client = api_client_maker( + path="/api/v2/variables", + response_json=self.bulk_action_response.model_dump(), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + + monkeypatch.chdir(tmp_path) + expected_json_path = tmp_path / self.export_file_name + variable_file = { + self.key: self.value, + } + + expected_json_path.write_text(json.dumps(variable_file)) + response = variable_command.import_( + self.parser.parse_args(["variables", "import", expected_json_path.as_posix()]), + api_client=api_client, + ) + assert response == ([self.key], []) + + def test_export(self, api_client_maker, tmp_path, monkeypatch): + api_client = api_client_maker( + path="/api/v2/variables", + response_json=self.variable_collection_response.model_dump(), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + + monkeypatch.chdir(tmp_path) + expected_json_path = (tmp_path / self.export_file_name).as_posix() + variable_command.export( + self.parser.parse_args(["variables", "export", expected_json_path]), + api_client=api_client, + ) + assert os.path.exists(tmp_path / self.export_file_name) + + with open(expected_json_path) as f: + assert json.load(f) == {self.key: {"description": self.description, "value": self.value}} From 877b01893a548085ac923c5990162ca99e88edbf Mon Sep 17 00:00:00 2001 From: jx2lee Date: Wed, 28 May 2025 20:50:15 +0900 Subject: [PATCH 3/3] BulkCreateActionVariableBody.action to str --- airflow-ctl/src/airflowctl/ctl/commands/variable_command.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py b/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py index 57144e0d0b47f..1c31828a372fd 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/variable_command.py @@ -23,7 +23,7 @@ import rich -from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkActionOnExistence +from airflow.api_fastapi.core_api.datamodels.common import BulkActionOnExistence from airflowctl.api.client import NEW_API_CLIENT, ClientKind, provide_api_client from airflowctl.api.datamodels.generated import ( BulkBodyVariableBody, @@ -64,7 +64,7 @@ def import_(args, api_client=NEW_API_CLIENT): bulk_body = BulkBodyVariableBody( actions=[ BulkCreateActionVariableBody( - action=BulkAction.CREATE, + action="create", entities=vars_to_update, action_on_existence=action_on_existence, )