Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_k8s_dev.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion dev/breeze/doc/images/output_k8s_dev.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
99d0fd2bbb0308c67b89d704ed4b7b97
588761dd319e43e8ae2e38d5d43d2a9c
99 changes: 78 additions & 21 deletions dev/breeze/src/airflow_breeze/commands/kubernetes_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from airflow_breeze.params.build_prod_params import BuildProdParams
from airflow_breeze.utils.ci_group import ci_group
from airflow_breeze.utils.click_utils import BreezeGroup
from airflow_breeze.utils.confirm import confirm_action
from airflow_breeze.utils.console import Output, get_console
from airflow_breeze.utils.custom_param_types import CacheableChoice, CacheableDefault
from airflow_breeze.utils.kubernetes_utils import (
Expand Down Expand Up @@ -185,7 +186,7 @@ def kubernetes_group():
"--dags-path",
help="Local dags directory to sync.",
type=click.Path(file_okay=False, dir_okay=True, path_type=Path),
default="dags",
default="files/dags",
show_default=True,
)
option_dags_dest = click.option(
Expand Down Expand Up @@ -286,6 +287,8 @@ def _create_cluster(
output: Output | None,
num_tries: int,
force_recreate_cluster: bool,
*,
show_hints: bool = True,
) -> tuple[int, str]:
while True:
if force_recreate_cluster:
Expand Down Expand Up @@ -319,10 +322,17 @@ def _create_cluster(
kubeconfig_file = get_kubeconfig_file(python=python, kubernetes_version=kubernetes_version)
(KUBERNETES_TEST_PATH / ".env").write_text(f"KUBECONFIG={quote(kubeconfig_file.as_posix())}\n")
get_console(output=output).print(f"[success]KinD cluster {cluster_name} created!\n")
get_console(output=output).print(
"\n[warning]NEXT STEP:[/][info] You might now configure your cluster by:\n"
)
get_console(output=output).print("\nbreeze k8s configure-cluster\n")

if show_hints:
get_console(output=output).print(
"\n[warning]NEXT STEP:[/][info] You might now configure your cluster by:\n"
)
get_console(output=output).print("\nbreeze k8s configure-cluster\n")
# or breeze k8s dev to both configure and deploy
get_console(output=output).print(
f"\n[warning]Alternatively, jump straight into development on Kubernetes with:[/]\n\n"
f"breeze k8s dev --python {python} --kubernetes-version {kubernetes_version}\n"
)
return result.returncode, f"K8S cluster {cluster_name}."
num_tries -= 1
if num_tries == 0:
Expand Down Expand Up @@ -854,51 +864,71 @@ def _build_skaffold_config(

params = BuildProdParams(python=python)
use_flask_appbuilder = Version(python) < Version("3.13")
if use_flask_appbuilder:
auth_manager = "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
else:
auth_manager = "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager"
auth_manager = (
"airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
if use_flask_appbuilder
else "airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager"
)

_, api_server_port = get_kubernetes_port_numbers(python=python, kubernetes_version=kubernetes_version)

# --------------------
# Helm values (NON-image)
# --------------------
set_values: dict[str, object] = {
"defaultAirflowRepository": params.airflow_image_kubernetes,
"defaultAirflowTag": "latest",
"images.airflow.repository": params.airflow_image_kubernetes,
"images.airflow.tag": "latest",
"config.logging.logging_level": log_level,
"executor": executor,
"airflowVersion": params.airflow_semver_version,
"config.api_auth.jwt_secret": "foo",
"config.core.auth_manager": auth_manager,
"config.api.base_url": f"http://localhost:{api_server_port}",
}

if multi_namespace_mode:
set_values["multiNamespaceMode"] = True
if not use_flask_appbuilder:
set_values["webserver.defaultUser.enabled"] = False
if use_standard_naming:
set_values["useStandardNaming"] = True

# --------------------
# Sync configuration
# --------------------
sync_entries: list[dict[str, str]] = []
sync_entry: dict[str, str] = {
dependencies_paths: list[str]

dags_sync_entry = {
"src": f"{dags_relative_path}/**",
"dest": dags_dest,
}

if dags_relative_path != ".":
sync_entry["strip"] = f"{dags_relative_path}/"
dags_sync_entry["strip"] = f"{dags_relative_path}/"
dependencies_paths = [f"{dags_relative_path}/**"]
else:
dependencies_paths = ["**"]
sync_entries.append(sync_entry)

sync_entries.append(dags_sync_entry)

core_relative_path = "airflow-core/src/airflow"
core_dest = f"{AIRFLOW_SOURCES_TO}/airflow-core/src/airflow"

sync_entries.append(
{
"src": f"{core_relative_path}/**",
"dest": core_dest,
"strip": f"{core_relative_path}/",
}
)

if dependencies_paths != ["**"]:
dependencies_paths.append(f"{core_relative_path}/**")

# --------------------
# Skaffold config
# --------------------
image_var_suffix = params.airflow_image_kubernetes.replace("/", "_").replace(".", "_").replace("-", "_")

return {
"apiVersion": "skaffold/v4beta13",
"kind": "Config",
Expand All @@ -925,7 +955,15 @@ def _build_skaffold_config(
"name": "airflow",
"chartPath": CHART_PATH.as_posix(),
"namespace": HELM_AIRFLOW_NAMESPACE,
"createNamespace": True,
"skipBuildDependencies": True,
# Let Skaffold inject the resolved image instead of hardcoding as `latest` here
"setValueTemplates": {
"defaultAirflowRepository": f"{{{{.IMAGE_REPO_{image_var_suffix}}}}}",
"defaultAirflowTag": f"{{{{.IMAGE_TAG_{image_var_suffix}}}}}",
"images.airflow.repository": f"{{{{.IMAGE_REPO_{image_var_suffix}}}}}",
"images.airflow.tag": f"{{{{.IMAGE_TAG_{image_var_suffix}}}}}",
},
"setValues": set_values,
}
]
Expand Down Expand Up @@ -1442,11 +1480,27 @@ def dev(
f"\n[warning]Cluster for Python {python} and Kubernetes {kubernetes_version} "
"has not been created yet.\n"
)
get_console().print(
"[info]Run: "
f"`breeze k8s create-cluster --python {python} --kubernetes-version {kubernetes_version}`\n"
)
sys.exit(1)

if confirm_action(
f"Do you want to create cluster for Python {python} and Kubernetes {kubernetes_version} now?"
):
return_code, _ = _create_cluster(
python=python,
kubernetes_version=kubernetes_version,
output=None,
force_recreate_cluster=False,
num_tries=1,
# Since we are using skaffold dev, so we don't need to show the hints for configuring the cluster
show_hints=False,
)
if return_code != 0:
sys.exit(return_code)
else:
get_console().print(
"\n[info]To create the cluster, please run: [/]\n\n"
f"breeze k8s create-cluster --python {python} --kubernetes-version {kubernetes_version}\n"
)
sys.exit(0)
skaffold_config = _build_skaffold_config(
python=python,
kubernetes_version=kubernetes_version,
Expand All @@ -1473,6 +1527,9 @@ def dev(
skaffold_config["deploy"]["helm"]["releases"][0]["valuesFiles"] = [dev_env_values_path.as_posix()]
skaffold_config_path = Path(tmp_dir) / "skaffold.yaml"
skaffold_config_path.write_text(yaml.safe_dump(skaffold_config, sort_keys=False))

get_console().print(f"[info]Generated skaffold config at {skaffold_config_path}")

skaffold_command = [
"skaffold",
"dev",
Expand Down
Loading