diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index d689874c2a0e9..935d3d7dfb787 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,11 +1,11 @@ -main:32f8c659348c161980e59398a17f1bb0 +main:8d768c837899829dfd21d37253d2fb44 assets:b3ae2b933e54528bf486ff28e887804d auth:f396d4bce90215599dde6ad0a8f30f29 backfills:725109470cd2613de8cc8af022fb54bc config:cb175bedf29e8a2c2c6a2ebd13d770a7 connections:44e4da38aa214ccab4a1414a0c8967bb -dag:f4a1936ebf330773001d6d04f65c3249 -dagrun:8381fea6a9119b9ebba1b39261ac68a4 +dag:0c06fff60c0cc6618c8de05915506605 +dagrun:ec1b6098822419967e621687bd7e5e4b jobs:7f8680afff230eb9940bc7fca727bd52 pools:03fc7d948cbecf16ff8d640eb8f0ce43 providers:1c0afb2dff31d93ab2934b032a2250ab diff --git a/airflow-ctl/docs/images/output_dag.svg b/airflow-ctl/docs/images/output_dag.svg index 2f840734e4821..f3482d722e083 100644 --- a/airflow-ctl/docs/images/output_dag.svg +++ b/airflow-ctl/docs/images/output_dag.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - - - - Command: dag + Command: dag - + - - Usage: airflowctl dag [-h] COMMAND ... - -Perform Dag operations - -Positional Arguments: -  COMMAND -    delete           Perform delete operation -    get              Perform get operation -    get-details      Perform get_details operation -    get-import-error -                     Perform get_import_error operation -    get-stats        Perform get_stats operation -    get-tags         Perform get_tags operation -    get-version      Perform get_version operation -    list             Perform list operation -    list-import-error -                     Perform list_import_error operation -    list-version     Perform list_version operation -    list-warning     Perform list_warning operation -    patch            Perform patch operation - -Options: -  -h, --help         show this help message and exit + + Usage: airflowctl dag [-h] COMMAND ... + +Perform Dag operations + +Positional Arguments: +  COMMAND +    delete            Perform delete operation +    get               Perform get operation +    get-details       Perform get_details operation +    get-import-error  Perform get_import_error operation +    get-stats         Perform get_stats operation +    get-tags          Perform get_tags operation +    get-version       Perform get_version operation +    list              Perform list operation +    list-import-errors +                      Perform list_import_errors operation +    list-version      Perform list_version operation +    list-warning      Perform list_warning operation +    patch             Perform patch operation + +Options: +  -h, --help          show this help message and exit diff --git a/airflow-ctl/docs/images/output_dagrun.svg b/airflow-ctl/docs/images/output_dagrun.svg index f095c0e32fcc7..5a66197e1ac84 100644 --- a/airflow-ctl/docs/images/output_dagrun.svg +++ b/airflow-ctl/docs/images/output_dagrun.svg @@ -19,83 +19,83 @@ font-weight: 700; } - .terminal-1550757437-matrix { + .terminal-86224669-matrix { font-family: Fira Code, monospace; font-size: 20px; line-height: 24.4px; font-variant-east-asian: full-width; } - .terminal-1550757437-title { + .terminal-86224669-title { font-size: 18px; font-weight: bold; font-family: arial; } - .terminal-1550757437-r1 { fill: #c5c8c6 } + .terminal-86224669-r1 { fill: #c5c8c6 } - + - + - + - + - + - + - + - + - + - + - + - + - Command: dagrun + Command: dagrun - + - - Usage: airflowctl dagrun [-h] COMMAND ... - -Perform DagRun operations - -Positional Arguments: -  COMMAND -    create    Perform create operation -    get       Perform get operation -    list      Perform list operation - -Options: -  -h, --help  show this help message and exit + + Usage: airflowctl dagrun [-h] COMMAND ... + +Perform DagRun operations + +Positional Arguments: +  COMMAND +    get       Perform get operation +    list      Perform list operation +    trigger   Perform trigger operation + +Options: +  -h, --help  show this help message and exit diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index a5f8703a8549e..98bf85d2fd28f 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -19,135 +19,135 @@ font-weight: 700; } - .terminal-2610352951-matrix { + .terminal-101498644-matrix { font-family: Fira Code, monospace; font-size: 20px; line-height: 24.4px; font-variant-east-asian: full-width; } - .terminal-2610352951-title { + .terminal-101498644-title { font-size: 18px; font-weight: bold; font-family: arial; } - .terminal-2610352951-r1 { fill: #c5c8c6 } + .terminal-101498644-r1 { fill: #c5c8c6 } - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - Command: main + Command: main - + - - Usage: airflowctl [-h] GROUP_OR_COMMAND ... - -Positional Arguments: -  GROUP_OR_COMMAND - -    Groups -      assets        Perform Assets operations -      auth          Manage authentication for CLI. Either pass token from -                    environment variable/parameter or pass username and -                    password. -      backfills     Perform Backfills operations -      config        Perform Config operations -      connections   Perform Connections operations -      dag           Perform Dag operations -      dagrun        Perform DagRun operations -      jobs          Perform Jobs operations -      pools         Perform Pools operations -      providers     Perform Providers operations -      variables     Perform Variables operations - -    Commands: -      version       Show version information - -Options: -  -h, --help        show this help message and exit + + Usage: airflowctl [-h] GROUP_OR_COMMAND ... + +Positional Arguments: +  GROUP_OR_COMMAND + +    Groups +      assets        Perform Assets operations +      auth          Manage authentication for CLI. Either pass token from +                    environment variable/parameter or pass username and +                    password. +      backfills     Perform Backfills operations +      config        Perform Config operations +      connections   Perform Connections operations +      dag           Perform Dag operations +      dagrun        Perform DagRun operations +      jobs          Perform Jobs operations +      pools         Perform Pools operations +      providers     Perform Providers operations +      variables     Perform Variables operations + +    Commands: +      version       Show version information + +Options: +  -h, --help        show this help message and exit diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index 807b7f2f38404..4f2ee2fbd8d14 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -156,7 +156,9 @@ def load(self) -> Credentials: # Saving the URL set from the Auth Commands if Kind is AUTH self.save() elif self.client_kind == ClientKind.CLI: - raise AirflowCtlCredentialNotFoundException(f"No credentials found in {default_config_dir}") + raise AirflowCtlCredentialNotFoundException( + f"No credentials found in {default_config_dir} for environment {self.api_environment}." + ) else: raise AirflowCtlException(f"Unknown client kind: {self.client_kind}") diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 05c6f2b1a6255..d7f62040ec76e 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -197,7 +197,6 @@ def login_with_username_and_password(self, login: LoginBody) -> LoginResponse | # Operations -# TODO: Get all with limit and offset to overcome default 100 limit for all list operations class AssetsOperations(BaseOperations): """Assets operations.""" @@ -494,7 +493,7 @@ def get_import_error(self, import_error_id: str) -> ImportErrorResponse | Server except ServerResponseError as e: raise e - def list_import_error(self) -> ImportErrorCollectionResponse | ServerResponseError: + def list_import_errors(self) -> ImportErrorCollectionResponse | ServerResponseError: return super().execute_list(path="importErrors", data_model=ImportErrorCollectionResponse) def get_stats(self, dag_ids: list) -> DagStatsCollectionResponse | ServerResponseError: # type: ignore @@ -523,10 +522,10 @@ def list_warning(self) -> DAGWarningCollectionResponse | ServerResponseError: class DagRunOperations(BaseOperations): """Dag run operations.""" - def get(self, dag_run_id: str) -> DAGRunResponse | ServerResponseError: + def get(self, dag_id: str, dag_run_id: str) -> DAGRunResponse | ServerResponseError: """Get a dag run.""" try: - self.response = self.client.get(f"dag_runs/{dag_run_id}") + self.response = self.client.get(f"/dags/{dag_id}/dagRuns/{dag_run_id}") return DAGRunResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e @@ -545,16 +544,21 @@ def list( "end_date": end_date, "state": state, "limit": limit, + "dag_id": dag_id, } - return super().execute_list(path="dag_runs", data_model=DAGRunCollectionResponse, params=params) + return super().execute_list( + path=f"/dags/{dag_id}/dagRuns", data_model=DAGRunCollectionResponse, params=params + ) - def create( + def trigger( self, dag_id: str, trigger_dag_run: TriggerDAGRunPostBody ) -> DAGRunResponse | ServerResponseError: """Create a dag run.""" try: # It is model_dump_json() because it has unparsable json datetime objects - self.response = self.client.post(f"dag_runs/{dag_id}", json=trigger_dag_run.model_dump_json()) + self.response = self.client.post( + f"/dags/{dag_id}/dagRuns", json=trigger_dag_run.model_dump_json() + ) return DAGRunResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 9ef416b077073..29e55d4a36c38 100644 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -77,16 +77,23 @@ def safe_call_command(function: Callable, args: Iterable[Arg]) -> None: rich.print(f"command failed due to {e}") sys.exit(1) except httpx.RemoteProtocolError as e: + rich.print(f"[red]Remote protocol error: {e}[/red]") if "Server disconnected without sending a response." in str(e): rich.print( f"[red]Server response error: {e}. " "Please check if the server is running and the API URL is correct.[/red]" ) except httpx.ReadTimeout as e: + rich.print(f"[red]Read timeout error: {e}[/red]") if "timed out" in str(e): + rich.print("Please check if the server is running and the API ready to accept calls.[/red]") + except ServerResponseError as e: + rich.print(f"Server response error: {e}") + if "Client error message:" in str(e): rich.print( - f"[red]Request timed out: {e}. " - "Please check if the server is running and the API ready to accept calls.[/red]" + "[red]Client error, [/red] " + "Please check the command and its parameters. " + "If you need help, run the command with --help." ) @@ -338,7 +345,9 @@ class CommandFactory: func_map: dict[tuple, Callable] commands_map: dict[str, list[ActionCommand]] group_commands_list: list[CLICommand] - ouput_command_list: list[str] + output_command_list: list[str] + exclude_operation_names: list[str] + exclude_method_names: list[str] def __init__(self, file_path: str | Path | None = None): self.datamodels_extended_map = {} @@ -348,10 +357,20 @@ def __init__(self, file_path: str | Path | None = None): self.commands_map = {} self.group_commands_list = [] self.file_path = inspect.getfile(BaseOperations) if file_path is None else file_path + # Excluded Lists are in Class Level for further usage and avoid searching them # Exclude parameters that are not needed for CLI from datamodels self.excluded_parameters = ["schema_"] # This list is used to determine if the command/operation needs to output data - self.output_command_list = ["list", "get", "create", "delete"] + self.output_command_list = ["list", "get", "create", "delete", "update"] + self.exclude_operation_names = ["LoginOperations", "VersionOperations", "BaseOperations"] + self.exclude_method_names = [ + "error", + "__init__", + "__init_subclass__", + "_check_flag_and_exit_if_server_response_error", + # Excluding bulk operation. Out of scope for CLI. Should use implemented commands. + "bulk", + ] def _inspect_operations(self) -> None: """Parse file and return matching Operation Method with details.""" @@ -386,24 +405,15 @@ def get_function_details(node: ast.FunctionDef, parent_node: ast.ClassDef) -> di with open(self.file_path, encoding="utf-8") as file: tree = ast.parse(file.read(), filename=self.file_path) - exclude_operation_names = ["LoginOperations", "VersionOperations"] - exclude_method_names = [ - "error", - "__init__", - "__init_subclass__", - "_check_flag_and_exit_if_server_response_error", - # Excluding bulk operation. Out of scope for CLI. Should use implemented commands. - "bulk", - ] for node in ast.walk(tree): if ( isinstance(node, ast.ClassDef) and "Operations" in node.name - and node.name not in exclude_operation_names + and node.name not in self.exclude_operation_names and node.body ): for child in node.body: - if isinstance(child, ast.FunctionDef) and child.name not in exclude_method_names: + if isinstance(child, ast.FunctionDef) and child.name not in self.exclude_method_names: self.operations.append(get_function_details(node=child, parent_node=node)) @staticmethod @@ -595,6 +605,8 @@ def convert_to_dict(obj: Any, api_operation_name: str) -> dict | Any: def check_operation_and_collect_list_of_dict(dict_obj: dict) -> list: """Check if the object is a nested dictionary and collect list of dictionaries.""" + if isinstance(dict_obj, dict): + return [dict_obj] def is_dict_nested(obj: dict) -> bool: """Check if the object is a nested dictionary.""" diff --git a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py index e3c3a840c8582..e66a1d835d40a 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/auth_command.py @@ -65,6 +65,7 @@ def login(args, api_client=NEW_API_CLIENT) -> None: rich.print( "[green]Please pass:[/green] [blue]--api-token[/blue] or set " "[blue]AIRFLOW_CLI_TOKEN[/blue] environment variable to login." + "[blue] Alternatively, you can use --username and --password to login.[/blue]" ) sys.exit(1) diff --git a/airflow-ctl/src/airflowctl/ctl/console_formatting.py b/airflow-ctl/src/airflowctl/ctl/console_formatting.py index ac9bc43159722..38e065375162b 100644 --- a/airflow-ctl/src/airflowctl/ctl/console_formatting.py +++ b/airflow-ctl/src/airflowctl/ctl/console_formatting.py @@ -87,7 +87,7 @@ def print_as_plain_table(self, data: list[dict]): return rows = [d.values() for d in data] output = tabulate(rows, tablefmt="plain", headers=list(data[0])) - print(output) + self.print(output) def _normalize_data(self, value: Any, output: str) -> list | str | dict | None: if isinstance(value, (tuple, list)): diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 62099d0f1a98b..edb81007d46cb 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -832,7 +832,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: response = client.dags.get_import_error(import_error_id=0) assert response == self.import_error_response - def test_list_import_error(self): + def test_list_import_errors(self): def handle_request(request: httpx.Request) -> httpx.Response: assert request.url.path == "/api/v2/importErrors" return httpx.Response( @@ -840,7 +840,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.dags.list_import_error() + response = client.dags.list_import_errors() assert response == self.import_error_collection_response def test_get_stats(self): @@ -932,16 +932,16 @@ class TestDagRunOperations: def test_get(self): def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == f"/api/v2/dag_runs/{self.dag_run_id}" + assert request.url.path == f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}" return httpx.Response(200, json=json.loads(self.dag_run_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.dag_runs.get(dag_run_id=self.dag_run_id) + response = client.dag_runs.get(dag_id=self.dag_id, dag_run_id=self.dag_run_id) assert response == self.dag_run_response def test_list(self): def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == "/api/v2/dag_runs" + assert request.url.path == f"/api/v2/dags/{self.dag_id}/dagRuns" return httpx.Response(200, json=json.loads(self.dag_run_collection_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) @@ -954,13 +954,13 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) assert response == self.dag_run_collection_response - def test_create(self): + def test_trigger(self): def handle_request(request: httpx.Request) -> httpx.Response: - assert request.url.path == f"/api/v2/dag_runs/{self.dag_id}" + assert request.url.path == f"/api/v2/dags/{self.dag_id}/dagRuns" return httpx.Response(200, json=json.loads(self.dag_run_response.model_dump_json())) client = make_api_client(transport=httpx.MockTransport(handle_request)) - response = client.dag_runs.create(dag_id=self.dag_id, trigger_dag_run=self.trigger_dag_run) + response = client.dag_runs.trigger(dag_id=self.dag_id, trigger_dag_run=self.trigger_dag_run) assert response == self.dag_run_response