WIP: using rayjobs
Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>
psschwei committed Jun 26, 2024
1 parent 72facf8 commit 9937d02
Showing 8 changed files with 247 additions and 66 deletions.
@@ -74,6 +74,8 @@ spec:
httpGet:
path: /liveness
port: http
initialDelaySeconds: 60
periodSeconds: 20
readinessProbe:
httpGet:
path: /readiness
@@ -0,0 +1,35 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import os
import requests
ray.init()
@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0
def inc(self):
self.counter += 1
def get_counter(self):
return "{} got {}".format(self.name, self.counter)
counter = Counter.remote()
for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))
# Verify that the correct runtime env was used for the job.
assert requests.__version__ == "2.26.0"
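
The two assertions in sample_code.py are satisfied by the runtimeEnvYAML block of the RayJob template later in this diff. As a rough illustration (not part of this commit), the same runtime environment could be requested directly from Python:

import ray

# Illustrative only: mirrors the RayJob's runtimeEnvYAML (pip pins and the counter_name env var)
ray.init(
    runtime_env={
        "pip": ["requests==2.26.0", "pendulum==2.1.2"],
        "env_vars": {"counter_name": "test_counter"},
    }
)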
@@ -13,6 +13,7 @@ rules:
- ray.io
resources:
- rayclusters
- rayjobs
verbs:
- create
- delete
@@ -48,4 +49,4 @@ roleRef:
subjects:
- kind: ServiceAccount
name: ray-cluster-sa
{{- end }}
{{- end }}
@@ -522,3 +522,117 @@ data:
- key: iptables.sh
path: iptables.sh
{{- end }}
rayjobtemplate.yaml: |
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-sample
spec:
# submissionMode specifies how RayJob submits the Ray job to the RayCluster.
# The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job.
# The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster.
# submissionMode: "K8sJobMode"
entrypoint: python /home/ray/samples/sample_code.py
# shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
shutdownAfterJobFinishes: true
# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
ttlSecondsAfterFinished: 10
# activeDeadlineSeconds is the duration in seconds that the RayJob may be active before
# KubeRay actively tries to terminate the RayJob; value must be positive integer.
# activeDeadlineSeconds: 120
# RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
# See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details.
# (New in KubeRay version 1.0.)
runtimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"
# Suspend specifies whether the RayJob controller should create a RayCluster instance.
# If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
      # If the RayCluster is already created, it will be deleted. In the case of a transition to false, a new RayCluster will be created.
# suspend: false
# rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
rayClusterSpec:
rayVersion: '2.30.0' # should match the Ray version in the image of the containers
# Ray head pod template
headGroupSpec:
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams:
dashboard-host: '0.0.0.0'
#pod template
template:
spec:
containers:
- name: ray-head
image: icr.io/quantum-public/qiskit-serverless/ray-node:0.12.0-py310
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265 # Ray dashboard
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
# You set volumes at the Pod level, then mount them into containers inside that Pod
- name: code-sample
configMap:
# Provide the name of the ConfigMap you want to mount.
name: ray-job-code-sample
# An array of keys from the ConfigMap to create as files
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
        # the Pod replicas in this group are worker Pods
- replicas: 1
minReplicas: 1
maxReplicas: 5
          # logical group name; here it is called small-group, but it can also be a functional name
groupName: small-group
# The `rayStartParams` are used to configure the `ray start` command.
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
rayStartParams: {}
#pod template
template:
spec:
containers:
- name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
image: icr.io/quantum-public/qiskit-serverless/ray-node:0.12.0-py310
lifecycle:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
# SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
# If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
# submitterPodTemplate:
# spec:
# restartPolicy: Never
# containers:
# - name: my-custom-rayjob-submitter-pod
# image: rayproject/ray:2.9.0
# # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
# # Specifying Command is not recommended.
# # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
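
The comments in the template above describe the RayJob lifecycle: how the job is submitted and when the backing RayCluster is torn down (shutdownAfterJobFinishes, ttlSecondsAfterFinished). As a minimal sketch, not part of this commit, a caller could wait for that lifecycle to complete with the same CustomObjectsApi call the gateway code below uses; the function name and the terminal status values are assumptions:

import time

from kubernetes import client as kubernetes_client, config


def wait_for_rayjob(name: str, namespace: str = "default", timeout: int = 600) -> str:
    """Poll a RayJob custom resource until jobStatus reaches a terminal value (sketch)."""
    config.load_incluster_config()  # assumes in-cluster credentials, as the gateway code does
    api = kubernetes_client.CustomObjectsApi()
    deadline = time.time() + timeout
    while time.time() < deadline:
        obj = api.get_namespaced_custom_object_status(
            "ray.io", "v1", namespace, "rayjobs", name
        )
        status = obj.get("status", {}).get("jobStatus", "")
        if status in ("SUCCEEDED", "FAILED", "STOPPED"):  # assumed terminal states
            return status
        time.sleep(5)
    raise TimeoutError(f"RayJob {name} did not reach a terminal state within {timeout}s")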
4 changes: 2 additions & 2 deletions charts/qiskit-serverless/charts/gateway/templates/role.yaml
@@ -7,6 +7,8 @@ rules:
- ray.io
resources:
- rayclusters
- rayjobs
- rayjobs/status
verbs:
- create
- delete
@@ -32,5 +34,3 @@ rules:
- get
- list
{{- end }}


47 changes: 26 additions & 21 deletions gateway/api/management/commands/update_jobs_statuses.py
@@ -6,9 +6,9 @@
from django.core.management.base import BaseCommand

from api.models import Job
from api.ray import get_job_handler
from api.schedule import check_job_timeout, handle_job_status_not_available
from api.utils import ray_job_status_to_model_job_status, check_logs
from kubernetes import client as kubernetes_client, config
from kubernetes.client.exceptions import ApiException


logger = logging.getLogger("commands")

@@ -23,22 +23,29 @@ def handle(self, *args, **options):
updated_jobs_counter = 0
jobs = Job.objects.filter(status__in=Job.RUNNING_STATES)
for job in jobs:
job_status = Job.PENDING
if job.compute_resource:
job_status = Job.PENDING
success = True
job_handler = get_job_handler(job.compute_resource.host)
if job_handler:
ray_job_status = job_handler.status(job.ray_job_id)
if ray_job_status:
job_status = ray_job_status_to_model_job_status(ray_job_status)
else:
success = False
else:
success = False

job_status = check_job_timeout(job, job_status)
if not success:
job_status = handle_job_status_not_available(job, job_status)
# get rayjob status
# TODO make util function
config.load_incluster_config()
k8s_client = kubernetes_client.api_client.ApiClient()
ray_job_name = "rayjob-sample" # TODO don't hardcode

            # Look up the RayJob status via the Kubernetes custom objects API
api_instance = kubernetes_client.CustomObjectsApi(k8s_client)
group = "ray.io"
version = "v1"
namespace = "default"
plural = "rayjobs"

try:
api_response = api_instance.get_namespaced_custom_object_status(
group, version, namespace, plural, ray_job_name
)
logger.debug(f"new job status is {api_response['status']['jobStatus']}")
job_status = api_response['status']['jobStatus']
except ApiException as e:
print("Exception when getting RayJob status: %s\n" % e)

if job_status != job.status:
logger.info(
@@ -54,9 +61,7 @@
if job.in_terminal_state():
job.env_vars = "{}"

if job_handler:
logs = job_handler.logs(job.ray_job_id)
job.logs = check_logs(logs, job)
# TODO update logs on errors?

try:
job.save()
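The "TODO make util function" above suggests extracting the status lookup from the management command. A minimal sketch of such a helper, using a hypothetical name (get_rayjob_status) and the same hardcoded namespace the WIP code uses:

import logging
from typing import Optional

from kubernetes import client as kubernetes_client, config
from kubernetes.client.exceptions import ApiException

logger = logging.getLogger("commands")


def get_rayjob_status(name: str, namespace: str = "default") -> Optional[str]:
    """Return the RayJob's jobStatus, or None if it cannot be read (hypothetical helper)."""
    config.load_incluster_config()
    api = kubernetes_client.CustomObjectsApi(kubernetes_client.api_client.ApiClient())
    try:
        response = api.get_namespaced_custom_object_status(
            "ray.io", "v1", namespace, "rayjobs", name
        )
    except ApiException as exc:
        logger.error("Exception when getting RayJob status: %s", exc)
        return None
    return response.get("status", {}).get("jobStatus")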
57 changes: 57 additions & 0 deletions gateway/api/ray.py
@@ -8,6 +8,7 @@
import time
import uuid
from typing import Optional
from time import sleep

import requests
import yaml
@@ -201,6 +202,62 @@ def submit_job(job: Job) -> Job:
return job


def create_ray_job(
job: Job,
cluster_name: Optional[str] = None,
cluster_data: Optional[str] = None,
) -> Optional[str]:
"""Create ray job.
This is still WIP and only runs a sample job.
"""

namespace = settings.RAY_KUBERAY_NAMESPACE
jobtemplate = get_template("rayjobtemplate.yaml")
manifest = jobtemplate.render()
cluster_data = yaml.safe_load(manifest)

config.load_incluster_config()
k8s_client = kubernetes_client.api_client.ApiClient()
dyn_client = DynamicClient(k8s_client)
    rayjob_client = dyn_client.resources.get(api_version="v1", kind="RayJob")
    response = rayjob_client.create(body=cluster_data, namespace=namespace)
ray_job_name = response.metadata.name

logger.debug(f"Ray Job name is {ray_job_name}")

    # Poll the RayJob until its status (cluster name and job status) is available
api_instance = kubernetes_client.CustomObjectsApi(k8s_client)
group = "ray.io"
version = "v1"
namespace = "default"
plural = "rayjobs"

status = False
while not status:
try:
print("getting status of rayjob")
api_response = api_instance.get_namespaced_custom_object_status(
group, version, namespace, plural, ray_job_name
)
if "status" in api_response.keys():
status = True
cluster_name = api_response['status']['rayClusterName']
job.status = api_response['status']['jobStatus']
else:
sleep(1)
except ApiException as e:
print("Exception when getting RayJob status: %s\n" % e)

resource = None
if status and cluster_name:
resource = ComputeResource()
resource.owner = job.author
resource.title = cluster_name
resource.save()
return resource


def create_ray_cluster(
job: Job,
cluster_name: Optional[str] = None,
51 changes: 9 additions & 42 deletions gateway/api/schedule.py
@@ -13,8 +13,8 @@

from opentelemetry import trace

from api.models import Job, ComputeResource
from api.ray import submit_job, create_ray_cluster, kill_ray_cluster
from api.models import Job
from api.ray import create_ray_job, kill_ray_cluster
from api.utils import generate_cluster_name
from main import settings as config

@@ -41,46 +41,13 @@ def execute_job(job: Job) -> Job:

tracer = trace.get_tracer("scheduler.tracer")
with tracer.start_as_current_span("execute.job") as span:
compute_resource = ComputeResource.objects.filter(
owner=job.author, active=True
).first()

if not compute_resource:
cluster_name = generate_cluster_name(job.author.username)
span.set_attribute("job.clustername", cluster_name)
try:
compute_resource = create_ray_cluster(job, cluster_name=cluster_name)
except Exception: # pylint: disable=broad-exception-caught
# if something went wrong
# try to kill resource if it was allocated
logger.warning(
"Compute resource [%s] was not created properly.\n"
"Setting job [%s] status to [FAILED].",
cluster_name,
job,
)
kill_ray_cluster(cluster_name)
job.status = Job.FAILED
job.logs += "\nCompute resource was not created properly."

if compute_resource:
try:
job.compute_resource = compute_resource
job = submit_job(job)
job.status = Job.PENDING
except Exception: # pylint: disable=broad-exception-caught:
logger.error(
"Exception was caught during scheduling job on user [%s] resource.\n"
"Resource [%s] was in DB records, but address is not reachable.\n"
"Cleaning up db record and setting job [%s] to failed",
job.author,
compute_resource.title,
job.id,
)
kill_ray_cluster(compute_resource.title)
compute_resource.delete()
job.status = Job.FAILED
job.logs += "\nCompute resource was not found."
cluster_name = generate_cluster_name(job.author.username)
span.set_attribute("job.clustername", cluster_name)

# test out running ray job
job_resource = create_ray_job(job, cluster_name)
logger.info(f"Ray Job {job_resource} created!")
job.compute_resource = job_resource

span.set_attribute("job.status", job.status)
return job
