From d987a0e6a1929508ef517c8c051ed8745a7de063 Mon Sep 17 00:00:00 2001 From: Patryk Bundyra Date: Wed, 25 Sep 2024 14:34:54 +0000 Subject: [PATCH] Introduce Storage API --- pyproject.toml | 8 +- src/xpk/api/__init__.py | 15 ++ src/xpk/api/storage_crd.yaml | 52 ++++ src/xpk/commands/cluster.py | 44 +--- src/xpk/commands/inspector.py | 6 +- src/xpk/commands/storage.py | 84 +++++++ src/xpk/commands/workload.py | 63 ++++- src/xpk/core/core.py | 133 +++++++++-- src/xpk/core/nap.py | 6 +- src/xpk/core/pathways.py | 23 +- src/xpk/core/storage.py | 420 +++++++++++++++++++++++++++++++++ src/xpk/parser/common.py | 7 +- src/xpk/parser/core.py | 11 +- src/xpk/parser/inspector.py | 2 +- src/xpk/parser/storage.py | 121 ++++++++++ src/xpk/parser/workload.py | 12 + src/xpk/templates/__init__.py | 15 ++ src/xpk/templates/pod.yaml | 0 src/xpk/templates/storage.yaml | 13 + src/xpk/utils.py | 58 ++++- 20 files changed, 1003 insertions(+), 90 deletions(-) create mode 100644 src/xpk/api/__init__.py create mode 100644 src/xpk/api/storage_crd.yaml create mode 100644 src/xpk/commands/storage.py create mode 100644 src/xpk/core/storage.py create mode 100644 src/xpk/parser/storage.py create mode 100644 src/xpk/templates/__init__.py create mode 100644 src/xpk/templates/pod.yaml create mode 100644 src/xpk/templates/storage.yaml diff --git a/pyproject.toml b/pyproject.toml index 923473b..9d078d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,10 @@ keywords = [] # pip dependencies installed with `pip install -e .` dependencies = [ - "cloud-accelerator-diagnostics" + "cloud-accelerator-diagnostics", + "kubernetes", + "google-cloud", + "google-api-core", ] [project.urls] @@ -57,8 +60,9 @@ dev = [ version = {attr = "xpk.core.core.__version__"} [tool.setuptools] -packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands"] +packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.api"] package-dir = {"" = "src"} +package-data = {"xpk.api" = ["storage_crd.yaml"]} [tool.pyink] # 
Formatting configuration to follow Google style-guide. diff --git a/src/xpk/api/__init__.py b/src/xpk/api/__init__.py new file mode 100644 index 0000000..e7c0b71 --- /dev/null +++ b/src/xpk/api/__init__.py @@ -0,0 +1,15 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/src/xpk/api/storage_crd.yaml b/src/xpk/api/storage_crd.yaml new file mode 100644 index 0000000..03c47f5 --- /dev/null +++ b/src/xpk/api/storage_crd.yaml @@ -0,0 +1,52 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: storages.xpk.x-k8s.io +spec: + group: xpk.x-k8s.io + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + type: + type: string + cluster: + type: string + auto_mount: + type: boolean + mount_point: + type: string + readonly: + type: boolean + manifest: + type: string + pv: + type: string + pvc: + type: string + required: + - type + - cluster + - auto_mount + - mount_point + - readonly + - manifest + - pvc + - pv + x-kubernetes-validations: + - message: Value is immutable + rule: self == oldSelf + scope: Cluster + names: + plural: storages + singular: storage + kind: Storage + shortNames: + - stg \ No newline at end of file diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 35c54a9..b824d79 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -14,11 +14,7 @@ limitations 
under the License. """ -from ..core.commands import ( - run_command_for_value, - run_command_with_updates, - run_command_with_updates_retry, -) +from ..core.commands import run_command_for_value, run_command_with_updates from ..core.core import ( VERTEX_TENSORBOARD_FEATURE_FLAG, add_zone_and_project, @@ -27,6 +23,7 @@ create_vertex_tensorboard, delete_cluster_subnets, get_all_clusters_programmatic, + get_cluster_credentials, get_gke_control_plane_version, get_gke_node_pool_version, get_gke_server_config, @@ -113,9 +110,7 @@ def cluster_create(args) -> None: if update_cluster_command_code != 0: xpk_exit(update_cluster_command_code) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) # create Vertex Tensorboard for new and existing clusters if create-vertex-tensorboard is set tensorboard_config = {} @@ -236,9 +231,7 @@ def cluster_cacheimage(args) -> None: ) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) system, return_code = get_system_characteristics(args) if return_code > 0: @@ -287,9 +280,7 @@ def cluster_describe(args) -> None: xpk_print(f'Starting nodepool list for cluster: {args.cluster}', flush=True) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) command = ( f'gcloud container node-pools list --cluster {args.cluster} ' @@ -502,28 +493,3 @@ def run_gke_cluster_create_command( xpk_print(f'GKE Cluster Create request returned ERROR {return_code}') return 1 return 0 - - -def set_cluster_command(args) -> int: - """Run cluster configuration command to set the kubectl config. - - Args: - args: user provided arguments for running the command. 
- - Returns: - 0 if successful and 1 otherwise. - """ - command = ( - 'gcloud container clusters get-credentials' - f' {args.cluster} --region={zone_to_region(args.zone)}' - f' --project={args.project} &&' - ' kubectl config view && kubectl config set-context --current' - ' --namespace=default' - ) - task = f'get-credentials to cluster {args.cluster}' - return_code = run_command_with_updates_retry( - command, task, args, verbose=False - ) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code diff --git a/src/xpk/commands/inspector.py b/src/xpk/commands/inspector.py index 8fbba5f..2871784 100644 --- a/src/xpk/commands/inspector.py +++ b/src/xpk/commands/inspector.py @@ -19,11 +19,11 @@ CLUSTER_METADATA_CONFIGMAP, CLUSTER_RESOURCES_CONFIGMAP, add_zone_and_project, + get_cluster_credentials, zone_to_region, ) from ..core.kueue import CLUSTER_QUEUE_NAME, LOCAL_QUEUE_NAME from ..utils import append_tmp_file, write_tmp_file, xpk_exit, xpk_print -from .cluster import set_cluster_command from .workload import get_workload_list @@ -124,9 +124,7 @@ def inspector(args) -> None: xpk_print(args) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) inspector_file = write_tmp_file( '==================\nXPK inspector OUTPUT:\n==================\n' diff --git a/src/xpk/commands/storage.py b/src/xpk/commands/storage.py new file mode 100644 index 0000000..959bc1a --- /dev/null +++ b/src/xpk/commands/storage.py @@ -0,0 +1,84 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from argparse import Namespace + +from kubernetes import client as k8s_client +from kubernetes.client.exceptions import ApiException + +from ..core.core import ( + setup_k8s_env, + update_cluster_with_gcsfuse_driver_if_necessary, + update_cluster_with_workload_identity_if_necessary, +) +from ..core.storage import ( + STORAGE_CRD_KIND, + XPK_API_GROUP_NAME, + XPK_API_GROUP_VERSION, + create_storage_instance, + get_storage, + install_storage_crd, + list_storages, + print_storages_for_cluster, +) +from ..utils import apply_kubectl_manifest, xpk_exit, xpk_print + + +def storage_create(args: Namespace) -> None: + k8s_api_client = setup_k8s_env(args) + + install_storage_crd(k8s_api_client) + return_code = update_cluster_with_workload_identity_if_necessary(args) + if return_code > 0: + xpk_exit(return_code) + return_code = update_cluster_with_gcsfuse_driver_if_necessary(args) + if return_code > 0: + xpk_exit(return_code) + + create_storage_instance(k8s_api_client, args) + apply_kubectl_manifest(k8s_api_client, args.manifest) + + +def storage_list(args: Namespace) -> None: + k8s_api_client = setup_k8s_env(args) + install_storage_crd(k8s_api_client) + storages = list_storages(k8s_api_client) + print_storages_for_cluster(storages, args.cluster) + + +def storage_delete(args: Namespace) -> None: + k8s_api_client = setup_k8s_env(args) + install_storage_crd(k8s_api_client) + api_instance = k8s_client.CustomObjectsApi(k8s_api_client) + core_api = k8s_client.CoreV1Api() + try: + storage = get_storage(k8s_api_client, args.name) + + api_instance.delete_cluster_custom_object( + 
name=args.name, + group=XPK_API_GROUP_NAME, + version=XPK_API_GROUP_VERSION, + plural=STORAGE_CRD_KIND.lower() + "s", + ) + + core_api.delete_namespaced_persistent_volume_claim(storage.pvc, "default") + core_api.delete_persistent_volume(storage.pv) + except ApiException as e: + if e.status == 404: + xpk_print(f"Storage: {args.name} not found. Might be already deleted.") + else: + xpk_print(f"Encountered error during storage deletion: {e}") + xpk_exit(1) diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 3c6c656..46dffac 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -27,9 +27,11 @@ check_if_workload_can_schedule, check_if_workload_exists, create_accelerator_label, + create_k8s_service_account, create_machine_label, create_vertex_experiment, get_cluster_configmap, + get_cluster_credentials, get_cpu_affinity, get_gke_outlier_dashboard, get_gpu_rxdm_cmd, @@ -41,9 +43,11 @@ get_volumes, is_cluster_using_clouddns, parse_env_config, + setup_k8s_env, wait_for_job_completion, xpk_current_version, zone_to_region, + GCS_FUSE_ANNOTATION, ) from ..core.kueue import LOCAL_QUEUE_NAME from ..core.nap import ( @@ -58,12 +62,20 @@ get_pathways_worker_args, get_user_workload_for_pathways, ) +from ..core.storage import ( + XPK_SA, + Storage, + add_bucket_iam_members, + get_storage_volume_mounts_yaml, + get_storage_volumes_yaml, + get_storages, +) from ..core.system_characteristics import ( AcceleratorType, get_system_characteristics, ) from ..utils import get_user_input, write_tmp_file, xpk_exit, xpk_print -from .cluster import set_cluster_command + workload_create_yaml = """apiVersion: jobset.x-k8s.io/v1alpha2 kind: JobSet @@ -74,6 +86,7 @@ xpk.google.com/workload: {args.workload} annotations: alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment + {gcs_fuse_annotation} spec: failurePolicy: maxRestarts: {args.max_restarts} @@ -89,6 +102,8 @@ metadata: labels: 
xpk.google.com/workload: {args.workload} + annotations: + {gcs_fuse_annotation} spec: schedulerName: {args.scheduler} restartPolicy: Never @@ -103,6 +118,7 @@ terminationGracePeriodSeconds: {args.termination_grace_period_seconds} containers: {container} + serviceAccountName: {service_account} volumes: {volumes} """ @@ -142,6 +158,7 @@ key: nvidia.com/gpu volumes: {gpu_volume} + {storage_volumes} containers: {gpu_rxdm_image} imagePullPolicy: Always @@ -155,6 +172,7 @@ privileged: true volumeMounts: {gpu_tcp_volume} + {storage_volume_mounts} - name: nvidia-install-dir-host mountPath: /usr/local/nvidia/lib64 - name: workload-terminated-volume @@ -193,8 +211,12 @@ completions: {system.vms_per_slice} parallelism: {system.vms_per_slice} template: + metadata: + annotations: + {gcs_fuse_annotation} spec: terminationGracePeriodSeconds: {args.termination_grace_period_seconds} + serviceAccountName: {service_account} containers: - args: {pathways_worker_args} @@ -213,6 +235,7 @@ volumeMounts: - mountPath: /tmp name: shared-tmp + {storage_volume_mounts} nodeSelector: {accelerator_label} {machine_label} @@ -223,6 +246,7 @@ path: /tmp type: DirectoryOrCreate name: shared-tmp + {storage_volumes} - name: rm replicas: 1 template: @@ -324,7 +348,8 @@ def workload_create(args) -> None: Returns: 0 if successful and 1 otherwise. 
""" - add_zone_and_project(args) + k8s_api_client = setup_k8s_env(args) + create_k8s_service_account(XPK_SA, 'default') if args.headless and not is_cluster_using_clouddns(args): xpk_print( @@ -333,10 +358,6 @@ def workload_create(args) -> None: ) xpk_exit(1) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) - workload_exists = check_if_workload_exists(args) if workload_exists: @@ -408,6 +429,16 @@ def workload_create(args) -> None: if return_code != 0: xpk_exit(return_code) + storages: list[Storage] = get_storages(k8s_api_client, args.storage) + gcs_fuse_annotation = '' + service_account = '' + if len(storages) > 0: + gcs_fuse_annotation = GCS_FUSE_ANNOTATION + service_account = XPK_SA + xpk_print(f'Detected Storages to add: {storages}') + else: + xpk_print('No Storages to add detected') + # Create the workload file based on accelerator type or workload type. if system.accelerator_type == AcceleratorType['GPU']: container, debugging_dashboard_id = get_user_workload_container( @@ -429,6 +460,8 @@ def workload_create(args) -> None: gpu_rxdm_image=get_gpu_rxdm_image(system), gpu_rxdm_cmd=get_gpu_rxdm_cmd(system), gpu_tcp_volume=get_gpu_tcp_volume(system), + storage_volumes=get_storage_volumes_yaml(storages), + storage_volume_mounts=get_storage_volume_mounts_yaml(storages), ) elif args.use_pathways and ensure_pathways_workload_prerequisites( args, system @@ -443,13 +476,17 @@ def workload_create(args) -> None: pathways_rm_args=get_pathways_rm_args(args, system), pathways_worker_args=get_pathways_worker_args(args), pathways_proxy_args=get_pathways_proxy_args(args), - user_workload=get_user_workload_for_pathways(args, system), + user_workload=get_user_workload_for_pathways(args, system, storages), resource_type=AcceleratorTypeToAcceleratorCharacteristics[ system.accelerator_type ].resource_type, local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, 
backoff_limit=system.vms_per_slice * 4, + gcs_fuse_annotation=gcs_fuse_annotation, + storage_volumes=get_storage_volumes_yaml(storages), + storage_volume_mounts=get_storage_volume_mounts_yaml(storages), + service_account=XPK_SA, ) else: container, debugging_dashboard_id = get_user_workload_container( @@ -467,6 +504,8 @@ def workload_create(args) -> None: local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, volumes=get_volumes(args, system), + gcs_fuse_annotation=gcs_fuse_annotation, + service_account=service_account, ) tmp = write_tmp_file(yml_string) command = f'kubectl apply -f {str(tmp.file.name)}' @@ -476,6 +515,8 @@ def workload_create(args) -> None: xpk_print(f'Create Workload request returned ERROR {return_code}') xpk_exit(return_code) + add_bucket_iam_members(args, storages) + # Get GKE outlier dashboard for TPU outlier_dashboard_id = None if system.accelerator_type == AcceleratorType['TPU']: @@ -539,9 +580,7 @@ def workload_delete(args) -> None: """ xpk_print('Starting Workload delete', flush=True) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) will_delete = True if not args.workload: @@ -607,9 +646,7 @@ def workload_list(args) -> None: xpk_print('Starting workload list', flush=True) add_zone_and_project(args) - set_cluster_command_code = set_cluster_command(args) - if set_cluster_command_code != 0: - xpk_exit(set_cluster_command_code) + get_cluster_credentials(args) if args.wait_for_job_completion: return_code = wait_for_job_completion(args) diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py index ee80de3..c467a30 100644 --- a/src/xpk/core/core.py +++ b/src/xpk/core/core.py @@ -39,8 +39,15 @@ import string import subprocess import sys +from argparse import Namespace from dataclasses import dataclass +from google.api_core.exceptions import PermissionDenied +from google.cloud import 
resourcemanager_v3 +from kubernetes import client as k8s_client +from kubernetes import config +from kubernetes.client.exceptions import ApiException + from ..utils import get_user_input, write_tmp_file, xpk_exit, xpk_print from .commands import ( run_command_for_value, @@ -48,6 +55,7 @@ run_command_with_updates_retry, run_commands, ) +from .storage import Storage, get_storages from .system_characteristics import ( AcceleratorType, AcceleratorTypeToAcceleratorCharacteristics, @@ -76,6 +84,7 @@ AUTOPROVISIONING_CONFIG_VALUE = 'AUTOPROVISION' AUTOPROVISIONING_CONFIG_MINIMUM_KEY = 'minimum_chips' AUTOPROVISIONING_CONFIG_MAXIMUM_KEY = 'maximum_chips' +GCS_FUSE_ANNOTATION = 'gke-gcsfuse/volumes: "true"' class CapacityType(enum.Enum): @@ -279,6 +288,23 @@ def get_project(): ] # The project name lives on the last line of the output +def project_id_to_project_number(project_id: str) -> str: + client = resourcemanager_v3.ProjectsClient() + request = resourcemanager_v3.GetProjectRequest() + request.name = f'projects/{project_id}' + try: + response: resourcemanager_v3.Project = client.get_project(request=request) + except PermissionDenied as e: + xpk_print( + f"Couldn't translate project id: {project_id} to project number." + f' Error: {e}' + ) + xpk_exit(1) + parts = response.name.split('/', 1) + xpk_print(f'Project number for project: {project_id} is {parts[1]}') + return parts[1] + + def get_zone(): """Get GCE zone from `gcloud config get compute/zone`. 
@@ -312,6 +338,29 @@ def zone_to_region(zone) -> str: return zone_terms[0] + '-' + zone_terms[1] +def setup_k8s_env(args: Namespace) -> k8s_client.ApiClient: + add_zone_and_project(args) + get_cluster_credentials(args) + args.project_number = project_id_to_project_number(args.project) + + config.load_kube_config() + return k8s_client.ApiClient() + + +def create_k8s_service_account(name: str, namespace: str) -> None: + k8s_core_client = k8s_client.CoreV1Api() + sa = k8s_client.V1ServiceAccount(metadata=k8s_client.V1ObjectMeta(name=name)) + + xpk_print(f'Creating a new service account: {name}') + try: + k8s_core_client.create_namespaced_service_account( + namespace, sa, pretty=True + ) + xpk_print(f'Created a new service account: {sa} successfully') + except ApiException: + xpk_print(f'Service account: {name} already exists. Skipping its creation') + + def get_total_chips_requested_from_args( args, system: SystemCharacteristics ) -> int: @@ -960,8 +1009,8 @@ def get_cluster_configmap(args, configmap_name) -> dict[str, str] | None: return_value = return_value[return_value.index('map') :] configs = return_value[4:-1].split(' ') - for config in configs: - key, value = config.strip().split(':') + for pair in configs: + key, value = pair.strip().split(':') config_map[key] = value return config_map @@ -1980,8 +2029,20 @@ def get_gke_node_pool_version( if args.gke_version is not None: node_pool_gke_version = args.gke_version else: - node_pool_gke_version = current_gke_master_version.strip() - + master_gke_version = current_gke_master_version.strip() + node_pool_gke_version = '' + # Select minimum version which is >= master_gke_version and has the same minor version. + # If this does not exist select maximum version which is < master_gke_version. 
+ for version in gke_server_config.valid_versions: + if ( + (node_pool_gke_version == '' or node_pool_gke_version < version) + and version < master_gke_version + ) or ( + (node_pool_gke_version == '' or node_pool_gke_version > version) + and master_gke_version <= version + and master_gke_version.split('.')[:2] == version.split('.')[:2] + ): + node_pool_gke_version = version is_supported_node_pool_version = ( node_pool_gke_version in gke_server_config.valid_versions ) @@ -1999,6 +2060,31 @@ def get_gke_node_pool_version( return 0, node_pool_gke_version +def get_cluster_credentials(args: Namespace) -> None: + """Run cluster configuration command to set the kubectl config. + + Args: + args: user provided arguments for running the command. + + Returns: + 0 if successful and 1 otherwise. + """ + command = ( + 'gcloud container clusters get-credentials' + f' {args.cluster} --region={zone_to_region(args.zone)}' + f' --project={args.project} &&' + ' kubectl config view && kubectl config set-context --current' + ' --namespace=default' + ) + task = f'get-credentials to cluster {args.cluster}' + return_code = run_command_with_updates_retry( + command, task, args, verbose=False + ) + if return_code != 0: + xpk_print(f'{task} returned ERROR {return_code}') + xpk_exit(return_code) + + def validate_docker_image(docker_image, args) -> int: """Validates that the user provided docker image exists in your project. 
@@ -2491,7 +2577,8 @@ def get_volumes(args, system: SystemCharacteristics) -> str: """ volumes = """- emptyDir: medium: Memory - name: dshm-2""" + name: dshm-2 + """ if ( system.accelerator_type == AcceleratorType['TPU'] @@ -2499,8 +2586,16 @@ def get_volumes(args, system: SystemCharacteristics) -> str: ): volumes += """ - name: tpu-stack-trace - - name: shared-data""" + - name: shared-data + """ + storages: list[Storage] = get_storages(setup_k8s_env(args), args.storage) + for storage in storages: + volumes += f"""- name: {storage.pv} + persistentVolumeClaim: + claimName: {storage.pvc} + readOnly: {storage.readonly} + """ return volumes @@ -2514,20 +2609,22 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str: YAML for the volumes mounted within a Pathways container or GPU container as a YAML string. """ volume_mount_yaml = """- mountPath: /dev/shm - name: dshm-2""" + name: dshm-2 + """ if args.use_pathways: volume_mount_yaml = """- mountPath: /tmp - name: shared-tmp""" + name: shared-tmp + """ elif ( system.accelerator_type == AcceleratorType['TPU'] and args.deploy_stacktrace_sidecar ): - volume_mount_yaml += """ - - name: tpu-stack-trace + volume_mount_yaml += """- name: tpu-stack-trace mountPath: /tmp/debugging - name: shared-data - mountPath: /shared-volume""" + mountPath: /shared-volume + """ elif system.accelerator_type == AcceleratorType['GPU']: if system.device_type == h100_device_type: volume_mount_yaml = """- name: nvidia-install-dir-host @@ -2539,15 +2636,23 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str: - name: shared-memory mountPath: /dev/shm - name: workload-terminated-volume - mountPath: /usr/share/workload""" + mountPath: /usr/share/workload + """ elif system.device_type == h100_mega_device_type: volume_mount_yaml = """- name: nvidia-install-dir-host mountPath: /usr/local/nvidia/lib64 - name: shared-memory mountPath: /dev/shm - name: workload-terminated-volume - mountPath: /usr/share/workload""" - + mountPath: 
/usr/share/workload + """ + + storages: list[Storage] = get_storages(setup_k8s_env(args), args.storage) + for storage in storages: + volume_mount_yaml += f"""- name: {storage.pv} + mountPath: {storage.mount_point} + readOnly: {storage.readonly} + """ return volume_mount_yaml diff --git a/src/xpk/core/nap.py b/src/xpk/core/nap.py index b6021e9..bcd06a4 100644 --- a/src/xpk/core/nap.py +++ b/src/xpk/core/nap.py @@ -264,13 +264,9 @@ def is_autoprovisioning_enabled( return False, 0 return_code, autoprovisioning_value = get_value_from_map( - system.gke_accelerator, cluster_config_map + system.gke_accelerator, cluster_config_map, verbose=False ) if return_code != 0: - xpk_print( - 'gke_accelerator type not found in config map:' - f' {resources_configmap_name}. Autoprovisioning is not enabled.' - ) return False, 0 if autoprovisioning_value == AUTOPROVISIONING_CONFIG_VALUE: diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index fa20398..cd53ad5 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -21,7 +21,9 @@ get_user_workload_container, is_cluster_using_clouddns, zone_to_region, + GCS_FUSE_ANNOTATION, ) +from .storage import (Storage, get_storage_volumes_yaml, XPK_SA) from .system_characteristics import SystemCharacteristics PathwaysExpectedInstancesMap = { @@ -225,7 +227,9 @@ def get_pathways_rm_args(args, system: SystemCharacteristics) -> str: return '' -def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: +def get_user_workload_for_pathways( + args, system: SystemCharacteristics, storages: list[Storage] +) -> str: """ Create a user workload container for Pathways. Don't create one for Pathways headless mode. 
@@ -245,14 +249,19 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: metadata: labels: xpk.google.com/workload: {args.workload} + spec: backoffLimit: 0 completions: 1 parallelism: 1 template: + metadata: + annotations: + {gcs_fuse_annotation} spec: containers: {container} + serviceAccountName: {service_account} nodeSelector: cloud.google.com/gke-nodepool: cpu-user-np restartPolicy: OnFailure @@ -260,12 +269,20 @@ def get_user_workload_for_pathways(args, system: SystemCharacteristics) -> str: - hostPath: path: /tmp type: DirectoryOrCreate - name: shared-tmp""" + name: shared-tmp + {storage_volumes}""" if args.headless: return '' else: container, _ = get_user_workload_container(args, system) - return user_workload_yaml.format(args=args, container=container) + storage_volumes = get_storage_volumes_yaml(storages) + return user_workload_yaml.format( + args=args, + container=container, + storage_volumes=storage_volumes, + service_account=XPK_SA, + gcs_fuse_annotation=GCS_FUSE_ANNOTATION, + ) def get_rm_address(args) -> str: diff --git a/src/xpk/core/storage.py b/src/xpk/core/storage.py new file mode 100644 index 0000000..044cc40 --- /dev/null +++ b/src/xpk/core/storage.py @@ -0,0 +1,420 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import os +from argparse import Namespace +from dataclasses import dataclass + +import yaml +from google.cloud import storage as gcp_storage +from kubernetes import client as k8s_client +from kubernetes import utils +from kubernetes.client import ApiClient +from kubernetes.client.exceptions import ApiException +from kubernetes.client.models.v1_persistent_volume import V1PersistentVolume +from kubernetes.utils import FailToCreateError +from tabulate import tabulate + +from ..utils import xpk_exit, xpk_print + +XPK_SA = "xpk-sa" +STORAGE_CRD_PATH = "/../api/storage_crd.yaml" +STORAGE_TEMPLATE_PATH = "/../templates/storage.yaml" +STORAGE_CRD_NAME = "storages.xpk.x-k8s.io" +STORAGE_CRD_KIND = "Storage" +XPK_API_GROUP_NAME = "xpk.x-k8s.io" +XPK_API_GROUP_VERSION = "v1" + + +@dataclass +class Storage: + """ + Represents a Storage custom resource in Kubernetes. + + Attributes: + name: The name of the Storage resource. + type: The type of storage (e.g., 'GCSFuse'). + cluster: The cluster where the storage is located. + auto_mount: Whether the storage should be automatically mounted to every workload. + mount_point: The path on which a given storage should be mounted for a workload. + readonly: Whether the storage is read-only. + manifest: The path to a yaml file containing PersistentVolume and PersistentVolumeClaim for a given storage. + pvc: The name of the PersistentVolumeClaim associated with the storage. + pv: The name of the PersistentVolume associated with the storage. + bucket: The name of the bucket PersistentVolume refers to. + """ + + name: str + type: str + cluster: str + auto_mount: bool + mount_point: str + readonly: bool + manifest: str + pvc: str + pv: str + bucket: str + + def __init__(self, data: dict): + """ + Initializes a Storage object from a dictionary. + + Args: + data: A dictionary containing the Storage resource definition. 
+ """ + metadata: k8s_client.V1ObjectMeta = data.get("metadata", {}) + self.name = metadata.get("name") + spec = data.get("spec", {}) + self.type: str = spec.get("type") + self.cluster: str = spec.get("cluster") + self.auto_mount: bool = spec.get("auto_mount") + self.mount_point: bool = spec.get("mount_point") + self.readonly: bool = spec.get("readonly") + self.manifest: str = spec.get("manifest") + self.pvc: str = spec.get("pvc") + self.pv: str = spec.get("pv") + self.bucket: str = self._get_bucket() + + def fields_as_list(self) -> list[str]: + """ + Returns a list of fields for display purposes. + + Returns: + A list of strings representing the Storage object's fields. + """ + return [ + self.name, + self.type, + self.auto_mount, + self.mount_point, + self.readonly, + self.manifest, + ] + + def _get_bucket(self) -> str: + """ + Retrieves the bucket name from PersistentVolume definition associated with the storage. + + Returns: + The name of the bucket. + """ + client = k8s_client.CoreV1Api() + try: + pv: V1PersistentVolume = client.read_persistent_volume(self.pv) + except client.ApiException as e: + xpk_print( + f"Exception when calling CoreV1Api->read_persistent_volume: {e}" + ) + return pv.spec.csi.volume_handle + + def get_mount_options(self) -> list[str]: + """ + Retrieves the mount options for the PersistentVolume. + + Returns: + A list of mount options. + """ + client = k8s_client.CoreV1Api() + try: + pv: V1PersistentVolume = client.read_persistent_volume(self.pv) + except client.ApiException as e: + xpk_print( + f"Exception when calling CoreV1Api->read_persistent_volume: {e}" + ) + return pv.spec.mount_options + + +def list_storages(k8s_api_client: ApiClient) -> list[Storage]: + """ + Lists all Storage custom resources in the cluster. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + + Returns: + A list of Storage objects representing the Storage resources. 
+ """ + api_instance = k8s_client.CustomObjectsApi(k8s_api_client) + try: + resp = api_instance.list_cluster_custom_object( + group=XPK_API_GROUP_NAME, + version=XPK_API_GROUP_VERSION, + plural=STORAGE_CRD_KIND.lower() + "s", + ) + except ApiException as e: + xpk_print(f"Kubernetes API exception while listing Storages: {e}") + xpk_exit(1) + + storages = [] + for stg in resp["items"]: + storage = Storage(stg) + storages.append(storage) + return storages + + +def get_auto_mount_storages(k8s_api_client: ApiClient) -> list[Storage]: + """ + Retrieves all Storage resources that have --auto-mount flag set to true. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + + Returns: + A list of Storage objects that have `auto_mount` set to True. + """ + auto_mount_storages: list[Storage] = [] + for storage in list_storages(k8s_api_client): + if storage.auto_mount is True: + auto_mount_storages.append(storage) + return auto_mount_storages + + +def get_storages(k8s_api_client: ApiClient, names: list[str]) -> list[Storage]: + """ + Retrieves a list of Storage resources by their names, including auto-mounted storages. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + names: A list of Storage resource names to retrieve. + + Returns: + A list of Storage objects matching the given names and any auto-mounted storages. + """ + storages: list[Storage] = [] + for storage in list_storages(k8s_api_client): + if storage.name in names: + storages.append(storage) + + for auto_mounted_stg in get_auto_mount_storages(k8s_api_client): + # prevent duplicating storages + if auto_mounted_stg.name not in names: + storages.append(auto_mounted_stg) + + return storages + + +def get_storage(k8s_api_client: ApiClient, name: str) -> Storage: + """ + Retrieves a specific Storage custom resource by its name. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. 
+ name: The name of the Storage resource to retrieve. + + Returns: + A Storage object representing the retrieved Storage resource. + """ + api_instance = k8s_client.CustomObjectsApi(k8s_api_client) + try: + resp = api_instance.get_cluster_custom_object( + name=name, + group=XPK_API_GROUP_NAME, + version=XPK_API_GROUP_VERSION, + plural=STORAGE_CRD_KIND.lower() + "s", + ) + return Storage(resp) + except ApiException as e: + xpk_print(f"Kubernetes API exception while getting Storage {name}: {e}") + xpk_exit(1) + + +def install_storage_crd(k8s_api_client: ApiClient) -> None: + """ + Installs the Storage custom resource definition (CRD) in the Kubernetes cluster. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + """ + xpk_print(f"Creating a new CRD: {STORAGE_CRD_NAME}") + try: + utils.create_from_yaml( + k8s_api_client, + f"{os.path.dirname(__file__)}{STORAGE_CRD_PATH}", + verbose=True, + ) + xpk_print(f"Created a CRD: {STORAGE_CRD_NAME} successfully") + except FailToCreateError as e: + for api_exception in e.api_exceptions: + if api_exception.status == 409: + xpk_print( + f"CRD: {STORAGE_CRD_NAME} already exists. Skipping its creation" + ) + break + else: + xpk_print(f"Encountered error during installing Storage CRD: {e}") + xpk_exit(1) + + +def get_storage_volume_mounts_yaml(storages: list[Storage]) -> str: + """ + Generates the YAML representation of the volumeMounts section for the given Storages. + + This function creates the YAML snippet that defines how the storage volumes + should be mounted within a Pod's containers. + + Args: + storages: A list of Storage objects. + + Returns: + A string containing the YAML representation of the volumeMounts section. 
+ """ + yaml_str = "" + for storage in storages: + yaml_str += f"""- name: {storage.pv} + mountPath: {storage.mount_point} + readOnly: {storage.readonly} + """ + return yaml_str + + +def get_storage_volumes_yaml(storages: list[Storage]) -> str: + """ + Generates the YAML representation of the volumes section for the given Storages. + + This function creates the YAML snippet that defines the volumes to be + mounted in a Pod, including the PersistentVolumeClaim associated with + each Storage. + + Args: + storages: A list of Storage objects. + + Returns: + A string containing the YAML representation of the volumes section. + """ + yaml_str = "" + for storage in storages: + yaml_str += f"""- name: {storage.pv} + persistentVolumeClaim: + claimName: {storage.pvc} + readOnly: {storage.readonly} + """ + return yaml_str + + +def add_bucket_iam_members(args: Namespace, storages: list[Storage]) -> None: + """ + Adds IAM members to the GCS buckets associated with the given Storages. + + This function grants the necessary permissions to the XPK service account + to access the GCS buckets. The specific role (viewer or user) is determined + based on the `readonly` attribute of each Storage object. + + Args: + args: An argparse Namespace object containing command-line arguments. + storages: A list of Storage objects. 
+ """ + storage_client = gcp_storage.Client() + + for storage in storages: + bucket = storage_client.bucket(storage.bucket) + policy = bucket.get_iam_policy(requested_policy_version=3) + if storage.readonly: + role = "roles/storage.objectViewer" + else: + role = "roles/storage.objectUser" + + member = ( + f"principal://iam.googleapis.com/projects/{args.project_number}/" + f"locations/global/workloadIdentityPools/{args.project}.svc.id.goog/" + f"subject/ns/default/sa/{XPK_SA}" + ) + + policy.bindings.append({"role": role, "members": {member}}) + bucket.set_iam_policy(policy) + print(f"Added {member} with role {role} to {storage.bucket}.") + + +def print_storages_for_cluster(storages: list[Storage], cluster: str): + """ + Prints in human readable manner a table of Storage resources that belong to the specified cluster. + + Args: + storages: A list of Storage objects. + cluster: The name of the cluster to filter by. + """ + headers = [ + "NAME", + "TYPE", + "AUTO MOUNT", + "MOUNT POINT", + "READONLY", + "MANIFEST", + ] + storage_tab = [] + for storage in storages: + if storage.cluster == cluster: + storage_tab.append(storage.fields_as_list()) + + print( + tabulate( + storage_tab, + headers=headers, + ) + ) + + +def create_storage_instance(k8s_api_client: ApiClient, args: Namespace) -> None: + """ + Creates a new Storage custom resource in the Kubernetes cluster. + + This function reads a Storage template from a YAML file, populates it with + values from the provided arguments, and then creates the Storage object + in the cluster. + + Args: + k8s_api_client: An ApiClient object for interacting with the Kubernetes API. + args: An argparse Namespace object containing the arguments for creating + the Storage resource. 
+ """ + abs_path = f"{os.path.dirname(__file__)}{STORAGE_TEMPLATE_PATH}" + with open(abs_path, "r", encoding="utf-8") as file: + data = yaml.safe_load(file) + + data["metadata"]["name"] = args.name + spec = data["spec"] + spec["cluster"] = args.cluster + spec["type"] = args.type + spec["auto_mount"] = args.auto_mount + spec["mount_point"] = args.mount_point + spec["readonly"] = args.readonly + spec["manifest"] = args.manifest + + with open(args.manifest, "r", encoding="utf-8") as f: + pv_pvc_definitions = yaml.safe_load_all(f) + for obj in pv_pvc_definitions: + if obj["kind"] == "PersistentVolume": + spec["pv"] = obj["metadata"]["name"] + elif obj["kind"] == "PersistentVolumeClaim": + spec["pvc"] = obj["metadata"]["name"] + + data["spec"] = spec + + api_instance = k8s_client.CustomObjectsApi(k8s_api_client) + xpk_print(f"Creating a new Storage: {args.name}") + try: + api_instance.create_cluster_custom_object( + group=XPK_API_GROUP_NAME, + version=XPK_API_GROUP_VERSION, + plural=STORAGE_CRD_KIND.lower() + "s", + body=data, + ) + xpk_print(f"Created {STORAGE_CRD_KIND} object: {data['metadata']['name']}") + except ApiException as e: + if e.status == 409: + xpk_print(f"Storage: {args.name} already exists. Skipping its creation") + else: + xpk_print(f"Encountered error during storage creation: {e}") + xpk_exit(1) diff --git a/src/xpk/parser/common.py b/src/xpk/parser/common.py index 390f7bd..28514bd 100644 --- a/src/xpk/parser/common.py +++ b/src/xpk/parser/common.py @@ -17,7 +17,9 @@ import argparse -def add_shared_arguments(custom_parser: argparse.ArgumentParser): +def add_shared_arguments( + custom_parser: argparse.ArgumentParser, required=False +) -> None: """Add shared arguments to the parser. 
Args: @@ -28,6 +30,7 @@ def add_shared_arguments(custom_parser: argparse.ArgumentParser): type=str, default=None, help='GCE project name, defaults to "gcloud config project."', + required=required, ) custom_parser.add_argument( '--zone', @@ -38,6 +41,7 @@ def add_shared_arguments(custom_parser: argparse.ArgumentParser): 'compute/zone." Only one of --zone or --region is allowed in a ' 'command.' ), + required=required, ) custom_parser.add_argument( '--dry-run', @@ -49,4 +53,5 @@ def add_shared_arguments(custom_parser: argparse.ArgumentParser): ' but not run them. This is imperfect in cases where xpk might' ' branch based on the output of commands' ), + required=required, ) diff --git a/src/xpk/parser/core.py b/src/xpk/parser/core.py index 5c104e0..ea5987a 100644 --- a/src/xpk/parser/core.py +++ b/src/xpk/parser/core.py @@ -19,6 +19,7 @@ from ..utils import xpk_print from .cluster import set_cluster_parser from .inspector import set_inspector_parser +from .storage import set_storage_parser from .workload import set_workload_parsers @@ -27,7 +28,10 @@ def set_parser(parser: argparse.ArgumentParser): title="xpk subcommands", dest="xpk_subcommands", help="Top level commands" ) workload_parser = xpk_subcommands.add_parser( - "workload", help="commands around workload management" + "workload", help="Commands around workload management" + ) + storage_parser = xpk_subcommands.add_parser( + "storage", help="Commands around storage management" ) cluster_parser = xpk_subcommands.add_parser( "cluster", @@ -35,7 +39,7 @@ def set_parser(parser: argparse.ArgumentParser): ) inspector_parser = xpk_subcommands.add_parser( "inspector", - help="commands around investigating workload, and Kueue failures.", + help="Commands around investigating workload, and Kueue failures.", ) def default_subcommand_function( @@ -53,12 +57,15 @@ def default_subcommand_function( parser.print_help() cluster_parser.print_help() workload_parser.print_help() + storage_parser.print_help() return 0 
parser.set_defaults(func=default_subcommand_function) workload_parser.set_defaults(func=default_subcommand_function) cluster_parser.set_defaults(func=default_subcommand_function) + storage_parser.set_defaults(func=default_subcommand_function) set_workload_parsers(workload_parser=workload_parser) set_cluster_parser(cluster_parser=cluster_parser) set_inspector_parser(inspector_parser=inspector_parser) + set_storage_parser(storage_parser=storage_parser) diff --git a/src/xpk/parser/inspector.py b/src/xpk/parser/inspector.py index dfadc6c..1f2d13b 100644 --- a/src/xpk/parser/inspector.py +++ b/src/xpk/parser/inspector.py @@ -43,9 +43,9 @@ def set_inspector_parser(inspector_parser): required=True, ) - ### "inspector" Optional Arguments add_shared_arguments(inspector_parser_optional_arguments) + ### "inspector" Optional Arguments inspector_parser_optional_arguments.add_argument( '--workload', type=workload_name_type, diff --git a/src/xpk/parser/storage.py b/src/xpk/parser/storage.py new file mode 100644 index 0000000..e804185 --- /dev/null +++ b/src/xpk/parser/storage.py @@ -0,0 +1,121 @@ +""" +Copyright 2024 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import argparse + +from ..commands.storage import storage_create, storage_delete, storage_list +from .common import add_shared_arguments + + +def set_storage_parser(storage_parser: argparse.ArgumentParser) -> None: + storage_subcommands = storage_parser.add_subparsers( + title='storage subcommands', + dest='xpk_storage_subcommands', + help=( + 'These are commands related to storage management. Look at help for' + ' specific subcommands for more details.' + ), + ) + add_storage_create_parser(storage_subcommands) + add_storage_list_parser(storage_subcommands) + add_storage_delete_parser(storage_subcommands) + + +def add_storage_create_parser( + storage_subcommands_parser: argparse.ArgumentParser, +) -> None: + + storage_create_parser: argparse.ArgumentParser = ( + storage_subcommands_parser.add_parser( + 'create', help='Create XPK Storage.' + ) + ) + storage_create_parser.set_defaults(func=storage_create) + req_args = storage_create_parser.add_argument_group( + 'Required Arguments', + 'Arguments required for storage create.', + ) + add_shared_arguments(req_args) + req_args.add_argument( + 'name', + type=str, + help='The name of storage', + ) + req_args.add_argument( + '--type', + type=str, + help='The type of storage', + required=True, + ) + req_args.add_argument( + '--cluster', + type=str, + required=True, + ) + req_args.add_argument( + '--auto-mount', type=lambda v: v.lower() == 'true', required=True + ) + req_args.add_argument( + '--mount-point', + type=str, + required=True, + ) + req_args.add_argument( + '--readonly', type=lambda v: v.lower() == 'true', required=True + ) + + req_args.add_argument( + '--manifest', + type=str, + required=True, + ) + + +def add_storage_list_parser( + storage_subcommands_parser: argparse.ArgumentParser, +): + storage_list_parser: argparse.ArgumentParser = ( + storage_subcommands_parser.add_parser('list', help='List XPK Storages.') + ) + storage_list_parser.set_defaults(func=storage_list) + 
add_shared_arguments(storage_list_parser) + req_args = storage_list_parser.add_argument_group( + 'Required Arguments', + 'Arguments required for storage list.', + ) + req_args.add_argument( + '--cluster', + type=str, + ) + + +def add_storage_delete_parser( + storage_subcommands_parser: argparse.ArgumentParser, +): + storage_delete_parser: argparse.ArgumentParser = ( + storage_subcommands_parser.add_parser( + 'delete', help='Delete XPK Storage.' + ) + ) + storage_delete_parser.set_defaults(func=storage_delete) + add_shared_arguments(storage_delete_parser) + + req_args = storage_delete_parser.add_argument_group( + 'Required Arguments', + 'Arguments required for storage delete.', + ) + req_args.add_argument('name', type=str) + req_args.add_argument('--cluster', type=str, required=True) diff --git a/src/xpk/parser/workload.py b/src/xpk/parser/workload.py index 3ea38f3..8d12a2e 100644 --- a/src/xpk/parser/workload.py +++ b/src/xpk/parser/workload.py @@ -114,6 +114,12 @@ def set_workload_parsers(workload_parser): ), ) + workload_create_parser_optional_arguments.add_argument( + '--storage', + action='append', + default=[], + help='Names of storages the workload uses', + ) workload_create_parser_optional_arguments.add_argument( '--num-nodes', type=int, @@ -243,6 +249,12 @@ def set_workload_parsers(workload_parser): ), required=False, ) + workload_create_pathways_parser_optional_arguments.add_argument( + '--storage', + action='append', + default=[], + help='Names of storages the workload uses', + ) add_shared_workload_create_required_arguments([ workload_create_parser_required_arguments, diff --git a/src/xpk/templates/__init__.py b/src/xpk/templates/__init__.py new file mode 100644 index 0000000..c133d2d --- /dev/null +++ b/src/xpk/templates/__init__.py @@ -0,0 +1,15 @@ +""" +Copyright 2023 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/src/xpk/templates/pod.yaml b/src/xpk/templates/pod.yaml new file mode 100644 index 0000000..e69de29 diff --git a/src/xpk/templates/storage.yaml b/src/xpk/templates/storage.yaml new file mode 100644 index 0000000..3a6d847 --- /dev/null +++ b/src/xpk/templates/storage.yaml @@ -0,0 +1,13 @@ +apiVersion: xpk.x-k8s.io/v1 +kind: Storage +metadata: + name: +spec: + auto_mount: + cluster: + manifest: + mount_point: + readonly: + type: + pvc: + pv: diff --git a/src/xpk/utils.py b/src/xpk/utils.py index b984887..fa47cc0 100644 --- a/src/xpk/utils.py +++ b/src/xpk/utils.py @@ -20,6 +20,10 @@ import sys import tempfile +import yaml +from kubernetes import client as k8s_client +from kubernetes.dynamic import DynamicClient + def chunks(lst: list, n: int): """Return a list of n-sized chunks from lst. @@ -34,7 +38,9 @@ def chunks(lst: list, n: int): return [lst[i : i + n] for i in range(0, len(lst), n)] -def get_value_from_map(key: str, map_to_search: dict) -> tuple[int, str | None]: +def get_value_from_map( + key: str, map_to_search: dict, verbose: bool = True +) -> tuple[int, str | None]: """Helper function to get value from a map if the key exists. Args: @@ -50,10 +56,11 @@ def get_value_from_map(key: str, map_to_search: dict) -> tuple[int, str | None]: if value: return 0, value else: - xpk_print( - f'Unable to find key: {key} in map: {map_to_search}.' - f'The map has the following keys: {map_to_search.keys()}' - ) + if verbose: + xpk_print( + f'Unable to find key: {key} in map: {map_to_search}.' 
+          f'The map has the following keys: {map_to_search.keys()}'
+      )
   return 1, value
 
 
@@ -77,7 +84,6 @@ def make_tmp_files(per_command_name):
 
 def write_tmp_file(payload):
   """Writes `payload` to a temporary file.
-
   Args:
     payload: The string to be written to the file.
 
@@ -165,3 +171,43 @@ def directory_path_type(value):
       f'Directory path is invalid. User provided path was {value}'
   )
   return value
+
+
+def apply_kubectl_manifest(client, file_path):
+  dynamic_client = DynamicClient(client)
+
+  with open(file_path, 'r', encoding='utf-8') as f:
+    manifest_data = list(yaml.safe_load_all(f))  # materialize before file closes
+
+  # Apply each object in the manifest
+  for obj in manifest_data:
+    # Determine the resource API and namespace
+    api_version = obj['apiVersion']
+    kind = obj['kind']
+    namespace = obj.get('metadata', {}).get('namespace', 'default')
+
+    # Get the appropriate API resource object
+    api_resource = dynamic_client.resources.get(
+        api_version=api_version, kind=kind
+    )
+
+    try:
+      # Apply the resource (create or update)
+      api_resource.create(body=obj, namespace=namespace)
+      print(
+          f"Applied {kind} '{obj['metadata']['name']}' in namespace"
+          f" '{namespace}'"
+      )
+
+    except k8s_client.rest.ApiException as e:
+      # Handle conflicts
+      if e.status == 409:
+        api_resource.patch(
+            body=obj, namespace=namespace, name=obj['metadata']['name']
+        )
+        print(
+            f"Updated {kind} '{obj['metadata']['name']}' in namespace"
+            f" '{namespace}'"
+        )
+      else:
+        print(f'Error applying {kind}: {e}')