Skip to content

Commit

Permalink
Add E2E test for Scheduler Plugins with coscheduling
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Jan 22, 2023
1 parent 0126d96 commit eb319db
Show file tree
Hide file tree
Showing 13 changed files with 868 additions and 149 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ name: integration test
on:
- pull_request

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
integration-test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
kubernetes-version: ["v1.23.12", "v1.24.6", "v1.25.2"]
# TODO (tenzen-y): Add volcano.
gang-scheduler-name: ["none", "scheduler-plugins"]

steps:
- name: Checkout
uses: actions/checkout@v3

- name: Create k8s Kind Cluster
uses: helm/kind-action@v1.3.0
with:
Expand All @@ -25,15 +32,18 @@ jobs:
./scripts/gha/build-image.sh
env:
TRAINING_CI_IMAGE: kubeflowtraining/training-operator:test

- name: Deploy training operator
run: |
./scripts/gha/setup-training-operator.sh
env:
KIND_CLUSTER: training-operator-cluster
TRAINING_CI_IMAGE: kubeflowtraining/training-operator:test
GANG_SCHEDULER_NAME: ${{ matrix.gang-scheduler-name }}

- name: Run tests
run: |
pip install pytest
python3 -m pip install -e sdk/python; pytest sdk/python/test --log-cli-level=info
env:
GANG_SCHEDULER_NAME: ${{ matrix.gang-scheduler-name }}
15 changes: 14 additions & 1 deletion scripts/gha/setup-training-operator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ set -o errexit
set -o nounset
set -o pipefail


echo "Kind load newly locally built image"
# use cluster name which is used in github actions kind create
kind load docker-image ${TRAINING_CI_IMAGE} --name ${KIND_CLUSTER}
Expand All @@ -31,6 +30,15 @@ echo "Update training operator manifest with newly built image"
cd manifests/overlays/standalone
kustomize edit set image kubeflow/training-operator=${TRAINING_CI_IMAGE}

if [ "${GANG_SCHEDULER_NAME}" = "scheduler-plugins" ]; then
echo "Installing Scheduler Plugins..."
git clone https://github.com/kubernetes-sigs/scheduler-plugins.git
pushd scheduler-plugins/manifests/install/charts
helm install scheduler-plugins as-a-second-scheduler/
popd
rm -rf scheduler-plugins
fi

echo "Installing training operator manifests"
kustomize build . | kubectl apply -f -

Expand All @@ -39,6 +47,11 @@ until kubectl get pods -n kubeflow | grep training-operator | grep 1/1 || [[ $TI
sleep 10
TIMEOUT=$(( TIMEOUT - 1 ))
done

if [ "${GANG_SCHEDULER_NAME}" = "scheduler-plugins" ]; then
kubectl wait pods --for=condition=ready -n scheduler-plugins --timeout "${TIMEOUT}s" --all
fi

kubectl version
kubectl cluster-info
kubectl get nodes
Expand Down
174 changes: 170 additions & 4 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,12 +752,12 @@ def list_tfjobs(
Args:
namespace: Namespace to list the TFJobs.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
Returns:
list[KubeflowOrgV1TFJob]: List of TFJobs objects. It returns
empty list if TFJobs cannot be found.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
Raises:
TimeoutError: Timeout to list TFJobs.
Expand Down Expand Up @@ -802,6 +802,33 @@ def delete_tfjob(
delete_options=delete_options,
)

def patch_tfjob(
self,
tfjob: models.KubeflowOrgV1TFJob,
name: str,
namespace: str = utils.get_default_target_namespace(),
):
"""Patch the TFJob.
Args:
tfjob: TFJob object of type KubeflowOrgV1TFJob.
name: Name for the TFJob.
namespace: Namespace for the TFJob.
Raises:
TimeoutError: Timeout to patch TFJob.
RuntimeError: Failed to patch TFJob.
"""

return utils.patch_job(
custom_api=self.custom_api,
job=tfjob,
namespace=namespace,
name=name,
job_kind=constants.TFJOB_KIND,
job_plural=constants.TFJOB_PLURAL,
)

# ------------------------------------------------------------------------ #
# PyTorchJob Training Client APIs.
# ------------------------------------------------------------------------ #
Expand Down Expand Up @@ -1000,6 +1027,33 @@ def delete_pytorchjob(
delete_options=delete_options,
)

def patch_pytorchjob(
self,
pytorchjob: models.KubeflowOrgV1PyTorchJob,
name: str,
namespace: str = utils.get_default_target_namespace(),
):
"""Patch the PyTorchJob.
Args:
pytorchjob: PyTorchJob object of type KubeflowOrgV1PyTorchJob.
name: Name for the PyTorchJob.
namespace: Namespace for the PyTorchJob.
Raises:
TimeoutError: Timeout to patch PyTorchJob.
RuntimeError: Failed to patch PyTorchJob.
"""

return utils.patch_job(
custom_api=self.custom_api,
job=pytorchjob,
namespace=namespace,
name=name,
job_kind=constants.PYTORCHJOB_KIND,
job_plural=constants.PYTORCHJOB_PLURAL,
)

# ------------------------------------------------------------------------ #
# MXJob Training Client APIs.
# ------------------------------------------------------------------------ #
Expand Down Expand Up @@ -1044,6 +1098,8 @@ def get_mxjob(
Args:
name: Name for the MXJob.
namespace: Namespace for the MXJob.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
Returns:
KubeflowOrgV1MXJob: MXJob object.
Expand Down Expand Up @@ -1123,6 +1179,33 @@ def delete_mxjob(
delete_options=delete_options,
)

def patch_mxjob(
self,
mxjob: models.KubeflowOrgV1MXJob,
name: str,
namespace: str = utils.get_default_target_namespace(),
):
"""Patch the MXJob.
Args:
mxjob: MXJob object of type KubeflowOrgV1MXJob.
name: Name for the MXJob.
namespace: Namespace for the MXJob.
Raises:
TimeoutError: Timeout to patch MXJob.
RuntimeError: Failed to patch MXJob.
"""

return utils.patch_job(
custom_api=self.custom_api,
job=mxjob,
namespace=namespace,
name=name,
job_kind=constants.MXJOB_KIND,
job_plural=constants.MXJOB_PLURAL,
)

# ------------------------------------------------------------------------ #
# XGBoostJob Training Client APIs.
# ------------------------------------------------------------------------ #
Expand Down Expand Up @@ -1248,6 +1331,33 @@ def delete_xgboostjob(
delete_options=delete_options,
)

def patch_xgboostjob(
self,
xgboostjob: models.KubeflowOrgV1XGBoostJob,
name: str,
namespace: str = utils.get_default_target_namespace(),
):
"""Patch the XGBoostJob.
Args:
xgboostjob: XGBoostJob object of type KubeflowOrgV1XGBoostJob.
name: Name for the XGBoostJob.
namespace: Namespace for the XGBoostJob.
Raises:
TimeoutError: Timeout to patch XGBoostJob.
RuntimeError: Failed to patch XGBoostJob.
"""

return utils.patch_job(
custom_api=self.custom_api,
job=xgboostjob,
namespace=namespace,
name=name,
job_kind=constants.XGBOOSTJOB_KIND,
job_plural=constants.XGBOOSTJOB_PLURAL,
)

# ------------------------------------------------------------------------ #
# MPIJob Training Client APIs.
# ------------------------------------------------------------------------ #
Expand Down Expand Up @@ -1323,12 +1433,12 @@ def list_mpijobs(
Args:
namespace: Namespace to list the MPIJobs.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
Returns:
list[KubeflowOrgV1MPIJob]: List of MPIJobs objects. It returns
empty list if MPIJobs cannot be found.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
Raises:
TimeoutError: Timeout to list MPIJobs.
Expand Down Expand Up @@ -1373,6 +1483,33 @@ def delete_mpijob(
delete_options=delete_options,
)

def patch_mpijob(
self,
mpijob: models.KubeflowOrgV1MPIJob,
name: str,
namespace: str = utils.get_default_target_namespace(),
):
"""Patch the MPIJob.
Args:
mpijob: MPIJob object of type KubeflowOrgV1MPIJob.
name: Name for the MPIJob.
namespace: Namespace for the MPIJob.
Raises:
TimeoutError: Timeout to patch MPIJob.
RuntimeError: Failed to patch MPIJob.
"""

return utils.patch_job(
custom_api=self.custom_api,
job=mpijob,
namespace=namespace,
name=name,
job_kind=constants.MPIJOB_KIND,
job_plural=constants.MPIJOB_PLURAL,
)

# ------------------------------------------------------------------------ #
# PaddleJob Training Client APIs.
# ------------------------------------------------------------------------ #
Expand Down Expand Up @@ -1417,6 +1554,8 @@ def get_paddlejob(
Args:
name: Name for the PaddleJob.
namespace: Namespace for the PaddleJob.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
Returns:
KubeflowOrgV1PaddleJob: PaddleJob object.
Expand Down Expand Up @@ -1495,3 +1634,30 @@ def delete_paddlejob(
job_plural=constants.PADDLEJOB_PLURAL,
delete_options=delete_options,
)

def patch_paddlejob(
self,
paddlejob: models.KubeflowOrgV1PaddleJob,
name: str,
namespace: str = utils.get_default_target_namespace(),
):
"""Patch the PaddleJob.
Args:
paddlejob: PaddleJob object of type KubeflowOrgV1PaddleJob.
name: Name for the PaddleJob.
namespace: Namespace for the PaddleJob.
Raises:
TimeoutError: Timeout to patch PaddleJob.
RuntimeError: Failed to patch PaddleJob.
"""

return utils.patch_job(
custom_api=self.custom_api,
job=paddlejob,
namespace=namespace,
name=name,
job_kind=constants.PADDLEJOB_KIND,
job_plural=constants.PADDLEJOB_PLURAL,
)
1 change: 0 additions & 1 deletion sdk/python/kubeflow/training/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
"docker.io/paddlepaddle/paddle:2.4.0rc0-gpu-cuda11.2-cudnn8.1-trt8.0"
)


# Dictionary to get plural and model for each Job kind.
JOB_KINDS = {
TFJOB_KIND: {"plural": TFJOB_PLURAL, "model": models.KubeflowOrgV1TFJob},
Expand Down
31 changes: 31 additions & 0 deletions sdk/python/kubeflow/training/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,37 @@ def delete_job(
logging.info(f"{job_kind} {namespace}/{name} has been deleted")


def patch_job(
custom_api: client.CustomObjectsApi,
job: object,
namespace: str,
name: str,
job_kind: str,
job_plural: str,
):
"""Patch the Training Job."""

try:
custom_api.patch_namespaced_custom_object(
constants.KUBEFLOW_GROUP,
constants.OPERATOR_VERSION,
namespace,
job_plural,
name,
job,
)
except multiprocessing.TimeoutError:
raise TimeoutError(
f"Timeout to create {job_kind}: {namespace}/{job.metadata.name}"
)
except Exception:
raise RuntimeError(
f"Failed to create {job_kind}: {namespace}/{job.metadata.name}"
)

logging.info(f"{job_kind} {namespace}/{job.metadata.name} has been patched")


def wrap_log_stream(q, stream):
while True:
try:
Expand Down
Loading

0 comments on commit eb319db

Please sign in to comment.