Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Storage API #192

Open
wants to merge 34 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
d987a0e
Introduce Storage API
PBundyra Sep 25, 2024
3bd157e
Fix imports
PBundyra Sep 30, 2024
667d384
Improve applying manifest logic
PBundyra Sep 30, 2024
20db482
Add storage e2e sanity checks
PBundyra Sep 30, 2024
a46d5cb
Fix linter
PBundyra Sep 30, 2024
c9e117e
Improve ci/cd
PBundyra Sep 30, 2024
0c20153
Improve ci/cd
PBundyra Sep 30, 2024
ebff4f6
Improve ci/cd
PBundyra Sep 30, 2024
91a1648
Improve ci/cd
PBundyra Sep 30, 2024
08799f9
Improve ci/cd
PBundyra Sep 30, 2024
8d35892
Improve ci/cd
PBundyra Sep 30, 2024
8cf6a0f
Improve ci/cd
PBundyra Sep 30, 2024
19faaf9
Improve ci/cd
PBundyra Sep 30, 2024
922d731
Improve ci/cd
PBundyra Sep 30, 2024
e79edc8
Improve ci/cd
PBundyra Sep 30, 2024
1c1de0c
Improve ci/cd
PBundyra Sep 30, 2024
fdc29e2
Fix gcloud parsing
PBundyra Sep 30, 2024
7b13205
Fix gcloud parsing
PBundyra Sep 30, 2024
4dea70e
Fix gcloud parsing
PBundyra Sep 30, 2024
cdc1c0b
Fix gcloud parsing
PBundyra Sep 30, 2024
88e49fd
Fix gcloud parsing
PBundyra Sep 30, 2024
2ab639a
Fix gcloud parsing
PBundyra Sep 30, 2024
2b54642
Fix gcloud parsing
PBundyra Sep 30, 2024
7849cff
Improve ci/cd
PBundyra Oct 1, 2024
450a5ea
Improve ci/cd
PBundyra Oct 1, 2024
391fa23
Improve ci/cd
PBundyra Oct 1, 2024
e26f0a3
Improve ci/cd
PBundyra Oct 2, 2024
4079019
Improve ci/cd
PBundyra Oct 2, 2024
9e31b2a
Improve ci/cd
PBundyra Oct 2, 2024
ba70658
Improve ci/cd
PBundyra Oct 2, 2024
3aba2b5
Improve ci/cd
PBundyra Oct 2, 2024
3e6c07f
Improve ci/cd
PBundyra Oct 2, 2024
bc9a8c0
Improve ci/cd
PBundyra Oct 2, 2024
c42cd4c
Improve ci/cd
PBundyra Oct 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions .github/workflows/build_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ env:
TPU_CLUSTER_NAME: build-xpk-2-v4-8-nodepools
WORKLOAD_NAME: xpktest-build-${{ github.run_attempt }}
PATHWAYS_WORKLOAD_NAME: xpkpw-build-${{ github.run_attempt }}
STORAGE_NAME: test-storage

jobs:
cluster-create-and-delete:
Expand All @@ -43,6 +44,10 @@ jobs:
with:
version: '>= 363.0.0'
install_components: 'beta,gke-gcloud-auth-plugin'
- name: Generate random seed
run: |
RANDOM_SEED=$((RANDOM % 10000)) # Generate a random number between 0 and 9999
echo "RANDOM_SEED=$RANDOM_SEED" >> $GITHUB_ENV
- name: Verify gcp setup
run: gcloud info
- name: Set Google Cloud CLI properties to a unused zone to verify --zone arg is passed properly in commands.
Expand All @@ -54,19 +59,43 @@ jobs:
pip install .
xpk --help
- name: Create a Pathways-enabled XPK Cluster with 2x v4-8 nodepools. Larger num-nodes to avoid master resizing.
run: python xpk.py cluster create-pathways --cluster $TPU_CLUSTER_NAME --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --default-pool-cpu-machine-type=n1-standard-16 --default-pool-cpu-num-nodes=16 --reservation='${{ secrets.GCP_TPU_V4_RESERVATION }}'
run: |
python3 xpk.py cluster create-pathways --cluster $TPU_CLUSTER_NAME --tpu-type=v4-8 --num-slices=2 \
--zone=us-central2-b --default-pool-cpu-machine-type=n1-standard-16 --default-pool-cpu-num-nodes=16 \
--reservation='${{ secrets.GCP_TPU_V4_RESERVATION }}' --enable-workload-identity --enable-gcsfuse-csi-driver
- name: Authenticate Docker
run: gcloud auth configure-docker --quiet
- name: Create auto-mount Storage instance
run: |
python3 xpk.py storage create $STORAGE_NAME --cluster=$TPU_CLUSTER_NAME --zone=us-central2-b --type=test-type \
--auto-mount=true \
--mount-point='/test-mount-point' --readonly=false --manifest='./tests/data/pv-pvc-templates.yaml'
- name: List and verify existing Storages
run: python3 xpk.py storage list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b | tee output.txt | grep 'test-storage' || (echo 'No storage found' && cat output.txt && exit 1)
- name: Create test script to execute in workloads
run: echo -e '#!/bin/bash \n echo "Hello world from a test script!"' > test.sh
run: |
echo -e \
'#!/bin/bash \n
echo "Hello world from a test script!"
cd ~/../test-mount-point && echo "Hello world from a Github Action CI/CD test script!" > '$RANDOM_SEED'.txt' \
> test.sh
- name: Run a base-docker-image workload
run: python xpk.py workload create --cluster $TPU_CLUSTER_NAME --workload $WORKLOAD_NAME --command "bash test.sh" --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b
run: |
python3 xpk.py workload create --cluster $TPU_CLUSTER_NAME --workload $WORKLOAD_NAME --command "bash test.sh" \
--tpu-type=v4-8 --num-slices=2 --zone=us-central2-b
- name: Run xpk inspector with the workload created above
run: python3 xpk.py inspector --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --workload $WORKLOAD_NAME
- name: Wait for workload completion and confirm it succeeded
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $WORKLOAD_NAME --timeout 300
- name: Run a Pathways workload on Ubuntu base image
run: python xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME --docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b --command "echo \"Hello world from a test script! \""
run: |
python3 xpk.py workload create-pathways --cluster $TPU_CLUSTER_NAME --workload $PATHWAYS_WORKLOAD_NAME \
--docker-image='marketplace.gcr.io/google/ubuntu2004' --tpu-type=v4-8 --num-slices=2 --zone=us-central2-b \
--command "echo \"Hello world from a test script! \""
- name: Verify if the file was created in the GCS bucket
run: gsutil cp gs://xpk-ci-cd-tests/$RANDOM_SEED.txt .
- name: Check if the file contains desired content
run: grep 'Hello world from a Github Action CI/CD test script!' $RANDOM_SEED.txt
- name: Wait for Pathways workload completion and confirm it succeeded
run: python3 xpk.py workload list --cluster $TPU_CLUSTER_NAME --zone=us-central2-b --wait-for-job-completion $PATHWAYS_WORKLOAD_NAME --timeout 300
- name: List out the workloads on the cluster
Expand All @@ -75,9 +104,13 @@ jobs:
run: python3 xpk.py workload delete --workload $WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
- name: Delete the Pathways workload on the cluster
run: python3 xpk.py workload delete --workload $PATHWAYS_WORKLOAD_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
- name: Delete created GCS file
run: gsutil rm gs://xpk-ci-cd-tests/$RANDOM_SEED.txt
- name: Delete existing Storage
run: python3 xpk.py storage delete $STORAGE_NAME --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
- name: Delete the cluster created
if: always()
run: python xpk.py cluster delete --cluster $TPU_CLUSTER_NAME --zone=us-central2-b
run: python3 xpk.py cluster delete --cluster $TPU_CLUSTER_NAME --zone=us-central2-b



Expand Down
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ keywords = []

# pip dependencies installed with `pip install -e .`
dependencies = [
"cloud-accelerator-diagnostics"
"cloud-accelerator-diagnostics",
"kubernetes",
"google-cloud",
"google-api-core",
"tabulate",
]

[project.urls]
Expand All @@ -57,8 +61,9 @@ dev = [
version = {attr = "xpk.core.core.__version__"}

[tool.setuptools]
packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands"]
packages = ["xpk", "xpk.parser", "xpk.core", "xpk.commands", "xpk.api"]
package-dir = {"" = "src"}
package-data = {"xpk.api" = ["storage_crd.yaml"]}

[tool.pyink]
# Formatting configuration to follow Google style-guide.
Expand Down
15 changes: 15 additions & 0 deletions src/xpk/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
Copyright 2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
52 changes: 52 additions & 0 deletions src/xpk/api/storage_crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: storages.xpk.x-k8s.io
spec:
group: xpk.x-k8s.io
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
type:
type: string
cluster:
type: string
auto_mount:
type: boolean
mount_point:
type: string
readonly:
type: boolean
manifest:
type: string
pv:
type: string
pvc:
type: string
required:
- type
- cluster
- auto_mount
- mount_point
- readonly
- manifest
- pvc
- pv
x-kubernetes-validations:
- message: Value is immutable
rule: self == oldSelf
scope: Cluster
names:
plural: storages
singular: storage
kind: Storage
shortNames:
- stg
48 changes: 7 additions & 41 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
limitations under the License.
"""

from ..core.commands import (
run_command_for_value,
run_command_with_updates,
run_command_with_updates_retry,
)
from ..core.commands import run_command_for_value, run_command_with_updates
from ..core.core import (
VERTEX_TENSORBOARD_FEATURE_FLAG,
add_zone_and_project,
Expand All @@ -27,6 +23,7 @@
create_vertex_tensorboard,
delete_cluster_subnets,
get_all_clusters_programmatic,
get_cluster_credentials,
get_gke_control_plane_version,
get_gke_node_pool_version,
get_gke_server_config,
Expand All @@ -36,8 +33,8 @@
set_jobset_on_cluster,
set_up_cluster_network_for_gpu,
update_cluster_with_clouddns_if_necessary,
update_cluster_with_workload_identity_if_necessary,
update_cluster_with_gcsfuse_driver_if_necessary,
update_cluster_with_workload_identity_if_necessary,
zone_to_region,
)
from ..core.kueue import (
Expand Down Expand Up @@ -113,9 +110,7 @@ def cluster_create(args) -> None:
if update_cluster_command_code != 0:
xpk_exit(update_cluster_command_code)

set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)
get_cluster_credentials(args)

# create Vertex Tensorboard for new and existing clusters if create-vertex-tensorboard is set
tensorboard_config = {}
Expand Down Expand Up @@ -236,9 +231,7 @@ def cluster_cacheimage(args) -> None:
)
add_zone_and_project(args)

set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)
get_cluster_credentials(args)
system, return_code = get_system_characteristics(args)

if return_code > 0:
Expand Down Expand Up @@ -287,9 +280,7 @@ def cluster_describe(args) -> None:
xpk_print(f'Starting nodepool list for cluster: {args.cluster}', flush=True)
add_zone_and_project(args)

set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)
get_cluster_credentials(args)

command = (
f'gcloud container node-pools list --cluster {args.cluster} '
Expand Down Expand Up @@ -470,8 +461,8 @@ def run_gke_cluster_create_command(
' --enable-autoscaling'
' --total-min-nodes 1 --total-max-nodes 1000'
f' --num-nodes {args.default_pool_cpu_num_nodes}'
f' {args.custom_cluster_arguments}'
' --release-channel rapid'
f' {args.custom_cluster_arguments}'
)

if system.accelerator_type == AcceleratorType['GPU']:
Expand Down Expand Up @@ -502,28 +493,3 @@ def run_gke_cluster_create_command(
xpk_print(f'GKE Cluster Create request returned ERROR {return_code}')
return 1
return 0


def set_cluster_command(args) -> int:
  """Fetch cluster credentials and point the current kubectl context at `default`.

  Builds a single shell command that chains `gcloud container clusters
  get-credentials`, `kubectl config view` and `kubectl config set-context`,
  and runs it through `run_command_with_updates_retry`.

  Args:
    args: user provided arguments for running the command. Reads
      `args.cluster`, `args.zone` and `args.project`.

  Returns:
    The return code of the chained command; an error line is printed when it
    is non-zero.
  """
  # The three stages are chained with `&&` so a failure short-circuits the rest.
  shell_stages = (
      f'gcloud container clusters get-credentials {args.cluster}'
      f' --region={zone_to_region(args.zone)} --project={args.project}',
      'kubectl config view',
      'kubectl config set-context --current --namespace=default',
  )
  command = ' && '.join(shell_stages)
  task = f'get-credentials to cluster {args.cluster}'

  exit_status = run_command_with_updates_retry(
      command, task, args, verbose=False
  )
  if exit_status == 0:
    return exit_status

  xpk_print(f'{task} returned ERROR {exit_status}')
  return exit_status
6 changes: 2 additions & 4 deletions src/xpk/commands/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
CLUSTER_METADATA_CONFIGMAP,
CLUSTER_RESOURCES_CONFIGMAP,
add_zone_and_project,
get_cluster_credentials,
zone_to_region,
)
from ..core.kueue import CLUSTER_QUEUE_NAME, LOCAL_QUEUE_NAME
from ..utils import append_tmp_file, write_tmp_file, xpk_exit, xpk_print
from .cluster import set_cluster_command
from .workload import get_workload_list


Expand Down Expand Up @@ -124,9 +124,7 @@ def inspector(args) -> None:
xpk_print(args)

add_zone_and_project(args)
set_cluster_command_code = set_cluster_command(args)
if set_cluster_command_code != 0:
xpk_exit(set_cluster_command_code)
get_cluster_credentials(args)

inspector_file = write_tmp_file(
'==================\nXPK inspector OUTPUT:\n==================\n'
Expand Down
84 changes: 84 additions & 0 deletions src/xpk/commands/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
Copyright 2024 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from argparse import Namespace

from kubernetes import client as k8s_client
from kubernetes.client.exceptions import ApiException

from ..core.core import (
setup_k8s_env,
update_cluster_with_gcsfuse_driver_if_necessary,
update_cluster_with_workload_identity_if_necessary,
)
from ..core.storage import (
STORAGE_CRD_KIND,
XPK_API_GROUP_NAME,
XPK_API_GROUP_VERSION,
create_storage_instance,
get_storage,
install_storage_crd,
list_storages,
print_storages_for_cluster,
)
from ..utils import apply_kubectl_manifest, xpk_exit, xpk_print


def storage_create(args: Namespace) -> None:
  """Handle `xpk storage create`.

  Sets up the Kubernetes client, calls `install_storage_crd`, runs the
  workload-identity and GCSFuse-driver cluster updates, then creates the
  Storage instance and applies the manifest given in `args.manifest`.

  Args:
    args: user provided arguments for running the command.
  """
  api_client = setup_k8s_env(args)
  install_storage_crd(api_client)

  # Both cluster updates report a return code; any positive code is handed
  # to xpk_exit before the next step runs.
  cluster_updates = (
      update_cluster_with_workload_identity_if_necessary,
      update_cluster_with_gcsfuse_driver_if_necessary,
  )
  for run_update in cluster_updates:
    status = run_update(args)
    if status > 0:
      xpk_exit(status)

  create_storage_instance(api_client, args)
  apply_kubectl_manifest(api_client, args.manifest)


def storage_list(args: Namespace) -> None:
  """Handle `xpk storage list`.

  Sets up the Kubernetes client, calls `install_storage_crd`, then passes the
  result of `list_storages` to `print_storages_for_cluster` together with
  `args.cluster`.

  Args:
    args: user provided arguments for running the command.
  """
  api_client = setup_k8s_env(args)
  install_storage_crd(api_client)
  print_storages_for_cluster(list_storages(api_client), args.cluster)


def _delete_ignoring_not_found(delete_call, description: str) -> None:
  """Run one Kubernetes delete call, treating a 404 as already deleted.

  Args:
    delete_call: zero-argument callable that performs the delete request.
    description: human readable name of the resource, used in messages.
  """
  try:
    delete_call()
  except ApiException as e:
    if e.status == 404:
      xpk_print(f"{description} not found. Might be already deleted.")
      return
    xpk_print(f"Encountered error during storage deletion: {e}")
    xpk_exit(1)


def storage_delete(args: Namespace) -> None:
  """Handle `xpk storage delete`.

  Looks up the Storage named `args.name`, deletes the Storage custom object,
  then deletes the PersistentVolumeClaim (in the `default` namespace) and the
  PersistentVolume it references through `storage.pvc` and `storage.pv`.

  Each resource is deleted independently: a missing PVC is reported as such
  and does not prevent the PV from being deleted. Any non-404 API error is
  printed and ends the command through `xpk_exit(1)`.

  Args:
    args: user provided arguments for running the command.
  """
  k8s_api_client = setup_k8s_env(args)
  install_storage_crd(k8s_api_client)
  api_instance = k8s_client.CustomObjectsApi(k8s_api_client)
  # Use the same API client as the custom-objects API so both talk to the
  # cluster with identical configuration.
  core_api = k8s_client.CoreV1Api(k8s_api_client)

  try:
    storage = get_storage(k8s_api_client, args.name)

    api_instance.delete_cluster_custom_object(
        name=args.name,
        group=XPK_API_GROUP_NAME,
        version=XPK_API_GROUP_VERSION,
        plural=STORAGE_CRD_KIND.lower() + "s",
    )
  except ApiException as e:
    if e.status == 404:
      xpk_print(f"Storage: {args.name} not found. Might be already deleted.")
      # Without the Storage object the PVC/PV names are unknown, so there is
      # nothing further to clean up.
      return
    xpk_print(f"Encountered error during storage deletion: {e}")
    xpk_exit(1)
    return

  _delete_ignoring_not_found(
      lambda: core_api.delete_namespaced_persistent_volume_claim(
          storage.pvc, "default"
      ),
      f"PersistentVolumeClaim: {storage.pvc}",
  )
  _delete_ignoring_not_found(
      lambda: core_api.delete_persistent_volume(storage.pv),
      f"PersistentVolume: {storage.pv}",
  )
Loading
Loading