diff --git a/dev/breeze/doc/images/output_k8s_dev.svg b/dev/breeze/doc/images/output_k8s_dev.svg index bd43ff2b65d4c..db84670cf7f81 100644 --- a/dev/breeze/doc/images/output_k8s_dev.svg +++ b/dev/breeze/doc/images/output_k8s_dev.svg @@ -172,7 +172,7 @@ [default: INFO]                                      --use-standard-namingUse standard naming. --multi-namespace-modeUse multi namespace mode. ---dags-pathLocal dags directory to sync.(DIRECTORY)[default: dags] +--dags-pathLocal dags directory to sync.(DIRECTORY)[default: files/dags] --dags-destDestination path inside the Airflow container for dags.(TEXT) [default: /opt/airflow/dags]                            --deploy/--no-deployLet skaffold deploy/upgrade Airflow via Helm.[default: no-deploy] diff --git a/dev/breeze/doc/images/output_k8s_dev.txt b/dev/breeze/doc/images/output_k8s_dev.txt index 6b642713c15c9..df0c2471a345a 100644 --- a/dev/breeze/doc/images/output_k8s_dev.txt +++ b/dev/breeze/doc/images/output_k8s_dev.txt @@ -1 +1 @@ -99d0fd2bbb0308c67b89d704ed4b7b97 +588761dd319e43e8ae2e38d5d43d2a9c diff --git a/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py b/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py index 0de0496405bd4..5e248750e7529 100644 --- a/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py @@ -58,6 +58,7 @@ from airflow_breeze.params.build_prod_params import BuildProdParams from airflow_breeze.utils.ci_group import ci_group from airflow_breeze.utils.click_utils import BreezeGroup +from airflow_breeze.utils.confirm import confirm_action from airflow_breeze.utils.console import Output, get_console from airflow_breeze.utils.custom_param_types import CacheableChoice, CacheableDefault from airflow_breeze.utils.kubernetes_utils import ( @@ -185,7 +186,7 @@ def kubernetes_group(): "--dags-path", help="Local dags directory to sync.", type=click.Path(file_okay=False, dir_okay=True, path_type=Path), - default="dags", + default="files/dags", show_default=True, ) option_dags_dest = click.option( @@ -286,6 +287,8 @@ def _create_cluster( output: Output | None, num_tries: int, force_recreate_cluster: bool, + *, + show_hints: bool = True, ) -> tuple[int, str]: while True: if force_recreate_cluster: @@ -319,10 +322,17 @@ def _create_cluster( kubeconfig_file = get_kubeconfig_file(python=python, kubernetes_version=kubernetes_version) (KUBERNETES_TEST_PATH / ".env").write_text(f"KUBECONFIG={quote(kubeconfig_file.as_posix())}\n") get_console(output=output).print(f"[success]KinD cluster {cluster_name} created!\n") - get_console(output=output).print( - "\n[warning]NEXT STEP:[/][info] You might now configure your cluster by:\n" - ) - get_console(output=output).print("\nbreeze k8s configure-cluster\n") + + if show_hints: + get_console(output=output).print( + "\n[warning]NEXT STEP:[/][info] You might now configure your cluster by:\n" + ) + get_console(output=output).print("\nbreeze k8s configure-cluster\n") + # or breeze k8s dev to both configure and deploy + get_console(output=output).print( + f"\n[warning]Alternatively, jump straight into development on Kubernetes with:[/]\n\n" + f"breeze k8s dev --python {python} --kubernetes-version {kubernetes_version}\n" + ) return result.returncode, f"K8S cluster {cluster_name}." num_tries -= 1 if num_tries == 0: @@ -854,16 +864,18 @@ def _build_skaffold_config( params = BuildProdParams(python=python) use_flask_appbuilder = Version(python) < Version("3.13") - if use_flask_appbuilder: - auth_manager = "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager" - else: - auth_manager = "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager" + auth_manager = ( + "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager" + if use_flask_appbuilder + else "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager" + ) + _, api_server_port = get_kubernetes_port_numbers(python=python, kubernetes_version=kubernetes_version) + + # -------------------- + # Helm values (NON-image) + # -------------------- set_values: dict[str, object] = { - "defaultAirflowRepository": params.airflow_image_kubernetes, - "defaultAirflowTag": "latest", - "images.airflow.repository": params.airflow_image_kubernetes, - "images.airflow.tag": "latest", "config.logging.logging_level": log_level, "executor": executor, "airflowVersion": params.airflow_semver_version, @@ -871,25 +883,36 @@ def _build_skaffold_config( "config.core.auth_manager": auth_manager, "config.api.base_url": f"http://localhost:{api_server_port}", } + if multi_namespace_mode: set_values["multiNamespaceMode"] = True if not use_flask_appbuilder: set_values["webserver.defaultUser.enabled"] = False if use_standard_naming: set_values["useStandardNaming"] = True + + # -------------------- + # Sync configuration + # -------------------- sync_entries: list[dict[str, str]] = [] - sync_entry: dict[str, str] = { + dependencies_paths: list[str] + + dags_sync_entry = { "src": f"{dags_relative_path}/**", "dest": dags_dest, } + if dags_relative_path != ".": - sync_entry["strip"] = f"{dags_relative_path}/" + dags_sync_entry["strip"] = f"{dags_relative_path}/" dependencies_paths = [f"{dags_relative_path}/**"] else: dependencies_paths = ["**"] - sync_entries.append(sync_entry) + + sync_entries.append(dags_sync_entry) + core_relative_path = "airflow-core/src/airflow" core_dest = f"{AIRFLOW_SOURCES_TO}/airflow-core/src/airflow" + sync_entries.append( { "src": f"{core_relative_path}/**", @@ -897,8 +920,15 @@ def _build_skaffold_config( "strip": f"{core_relative_path}/", } ) + if dependencies_paths != ["**"]: dependencies_paths.append(f"{core_relative_path}/**") + + # -------------------- + # Skaffold config + # -------------------- + image_var_suffix = params.airflow_image_kubernetes.replace("/", "_").replace(".", "_").replace("-", "_") + return { "apiVersion": "skaffold/v4beta13", "kind": "Config", @@ -925,7 +955,15 @@ def _build_skaffold_config( "name": "airflow", "chartPath": CHART_PATH.as_posix(), "namespace": HELM_AIRFLOW_NAMESPACE, + "createNamespace": True, "skipBuildDependencies": True, + # Let Skaffold inject the resolved image instead of hardcoding as `latest` here + "setValueTemplates": { + "defaultAirflowRepository": f"{{{{.IMAGE_REPO_{image_var_suffix}}}}}", + "defaultAirflowTag": f"{{{{.IMAGE_TAG_{image_var_suffix}}}}}", + "images.airflow.repository": f"{{{{.IMAGE_REPO_{image_var_suffix}}}}}", + "images.airflow.tag": f"{{{{.IMAGE_TAG_{image_var_suffix}}}}}", + }, "setValues": set_values, } ] @@ -1442,11 +1480,27 @@ def dev( f"\n[warning]Cluster for Python {python} and Kubernetes {kubernetes_version} " "has not been created yet.\n" ) - get_console().print( - "[info]Run: " - f"`breeze k8s create-cluster --python {python} --kubernetes-version {kubernetes_version}`\n" - ) - sys.exit(1) + + if confirm_action( + f"Do you want to create cluster for Python {python} and Kubernetes {kubernetes_version} now?" + ): + return_code, _ = _create_cluster( + python=python, + kubernetes_version=kubernetes_version, + output=None, + force_recreate_cluster=False, + num_tries=1, + # Since we are using skaffold dev, so we don't need to show the hints for configuring the cluster + show_hints=False, + ) + if return_code != 0: + sys.exit(return_code) + else: + get_console().print( + "\n[info]To create the cluster, please run: [/]\n\n" + f"breeze k8s create-cluster --python {python} --kubernetes-version {kubernetes_version}\n" + ) + sys.exit(0) skaffold_config = _build_skaffold_config( python=python, kubernetes_version=kubernetes_version, @@ -1473,6 +1527,9 @@ def dev( skaffold_config["deploy"]["helm"]["releases"][0]["valuesFiles"] = [dev_env_values_path.as_posix()] skaffold_config_path = Path(tmp_dir) / "skaffold.yaml" skaffold_config_path.write_text(yaml.safe_dump(skaffold_config, sort_keys=False)) + + get_console().print(f"[info]Generated skaffold config at {skaffold_config_path}") + skaffold_command = [ "skaffold", "dev",