Skip to content

Commit

Permalink
Add support for running functions on GPUs (#1515)
Browse files Browse the repository at this point in the history
* add support for running functions on gpus

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

* lint

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

* more lint

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

* lint 3

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

* lint

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

* label kind nodes

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

* adding config

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>

---------

Signed-off-by: Paul S. Schweigert <paul@paulschweigert.com>
  • Loading branch information
psschwei authored Oct 22, 2024
1 parent a297ef1 commit 4f29082
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/kubernetes-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
with:
k8s-version: 1.29.x
kind-worker-count: 0
- name: Label nodes
run: kubectl label node kind-control-plane has-gpu=gpu has-cpu=cpu
- name: Build and load gateway
run: |
docker build -t gateway:test -f ./gateway/Dockerfile .
Expand Down
14 changes: 14 additions & 0 deletions charts/qiskit-serverless/charts/gateway/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ spec:
value: {{ .Values.application.ray.minReplicas | quote }}
- name: RAY_CLUSTER_WORKER_MAX_REPLICAS
value: {{ .Values.application.ray.maxReplicas | quote }}
- name: RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL
value: {{ .Values.application.nodeSelector.cpu | quote }}
- name: RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL
value: {{ .Values.application.nodeSelector.gpu | quote }}
- name: LIMITS_CPU_PER_TASK
value: {{ .Values.application.ray.cpu | quote }}
- name: LIMITS_GPU_PER_TASK
value: {{ .Values.application.ray.gpu | quote }}
- name: LIMITS_MEMORY_PER_TASK
value: {{ .Values.application.ray.memory | quote }}
{{- if .Values.application.superuser.enable }}
Expand Down Expand Up @@ -310,10 +316,18 @@ spec:
value: {{ .Release.Namespace }}
- name: RAY_NODE_IMAGE
value: {{ .Values.application.ray.nodeImage | quote }}
- name: RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL
value: {{ .Values.application.nodeSelector.cpu | quote }}
- name: RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL
value: {{ .Values.application.nodeSelector.gpu | quote }}
- name: LIMITS_JOBS_PER_USER
value: {{ .Values.application.limits.maxJobsPerUser | quote }}
- name: LIMITS_MAX_CLUSTERS
value: {{ .Values.application.limits.maxComputeResources | quote }}
- name: LIMITS_GPU_CLUSTERS
value: {{ .Values.application.limits.maxGpuResources | quote }}
- name: GATEWAY_GPU_JOBS_CONFIG
value: {{ .Values.application.ray.gpuJobsConfig | quote }}
{{- if .Values.application.limits.keepClusterOnComplete }}
- name: RAY_CLUSTER_NO_DELETE_ON_COMPLETE
value: "True"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ data:
protocol: TCP
resources:
limits:
nvidia.com/gpu: {{`{{gpu_request}}`}}
cpu: {{ .Values.application.ray.cpu }}
memory: {{ .Values.application.ray.memory }}Gi
requests:
nvidia.com/gpu: {{`{{gpu_request}}`}}
cpu: {{ .Values.application.ray.cpu }}
memory: {{ .Values.application.ray.memory }}Gi
securityContext:
Expand Down Expand Up @@ -230,6 +232,7 @@ data:
serviceAccount: ray-cluster-sa
{{- end }}
nodeSelector:
{{`{{node_selector_label}}`}}
tolerations: []
securityContext:
fsGroup: 1000
Expand Down
3 changes: 3 additions & 0 deletions charts/qiskit-serverless/charts/gateway/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ application:
nodeImage: "icr.io/quantum-public/qiskit-serverless/ray-node:0.17.1"
cpu: 2
memory: 2
gpu: 1
replicas: 1
minReplicas: 1
maxReplicas: 4
Expand All @@ -36,6 +37,7 @@ application:
port: 4317
insecure: 0
useTLS: true
gpuJobsConfig: "api/v1/gpu-jobs.json"
proxy:
enabled: true
cpu: 1
Expand All @@ -44,6 +46,7 @@ application:
limits:
maxJobsPerUser: 2
maxComputeResources: 4
maxGpuResources: 1
keepClusterOnComplete: False
programTimeoutDays: 14
qiskitRuntime:
Expand Down
3 changes: 3 additions & 0 deletions charts/qiskit-serverless/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ gateway:
limits:
maxJobsPerUser: 2
maxComputeResources: 4
nodeSelector:
cpu: "has-cpu: cpu"
gpu: "has-gpu: gpu"
cos:
claimName: gateway-claim

Expand Down
4 changes: 2 additions & 2 deletions gateway/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ RUN ln -s /usr/bin/python3.11 /usr/bin/python
WORKDIR /usr/src/app

# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

COPY gateway/requirements.txt .
# Install pip
Expand Down
28 changes: 26 additions & 2 deletions gateway/api/management/commands/schedule_queued_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,30 @@ class Command(BaseCommand):

def handle(self, *args, **options):
max_ray_clusters_possible = settings.LIMITS_MAX_CLUSTERS
number_of_clusters_running = ComputeResource.objects.filter(active=True).count()
max_gpu_clusters_possible = settings.LIMITS_GPU_CLUSTERS
number_of_clusters_running = ComputeResource.objects.filter(
active=True, gpu=False
).count()
number_of_gpu_clusters_running = ComputeResource.objects.filter(
active=True, gpu=True
).count()

self.schedule_jobs_if_slots_available(
max_ray_clusters_possible, number_of_clusters_running, False
)
self.schedule_jobs_if_slots_available(
max_gpu_clusters_possible, number_of_gpu_clusters_running, True
)

def schedule_jobs_if_slots_available(
self, max_ray_clusters_possible, number_of_clusters_running, gpu_job
):
"""Schedule jobs depending on free cluster slots."""
free_clusters_slots = max_ray_clusters_possible - number_of_clusters_running
logger.info("%s free cluster slots.", free_clusters_slots)
if gpu_job:
logger.info("%s free GPU cluster slots.", free_clusters_slots)
else:
logger.info("%s free CPU cluster slots.", free_clusters_slots)

if free_clusters_slots < 1:
# no available resources
Expand All @@ -45,6 +66,9 @@ def handle(self, *args, **options):
# we have available resources
jobs = get_jobs_to_schedule_fair_share(slots=free_clusters_slots)

# only process jobs of the appropriate compute type
jobs = [job for job in jobs if job.gpu is gpu_job]

for job in jobs:
# only for local mode
if settings.RAY_CLUSTER_MODE.get(
Expand Down
23 changes: 23 additions & 0 deletions gateway/api/migrations/0032_computeresource_gpu_job_gpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.15 on 2024-10-09 20:15

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("api", "0031_program_readable_title_provider_readable_name"),
]

operations = [
migrations.AddField(
model_name="computeresource",
name="gpu",
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name="job",
name="gpu",
field=models.BooleanField(default=False),
),
]
4 changes: 4 additions & 0 deletions gateway/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class ComputeResource(models.Model):
blank=True,
)

gpu = models.BooleanField(default=False, null=False)

def __str__(self):
return self.title

Expand Down Expand Up @@ -201,6 +203,8 @@ class Job(models.Model):
blank=True,
)

gpu = models.BooleanField(default=False, null=False)

def __str__(self):
return f"<Job {self.id} | {self.status}>"

Expand Down
12 changes: 12 additions & 0 deletions gateway/api/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,14 @@ def create_ray_cluster( # pylint: disable=too-many-branches
job_config.auto_scaling = settings.RAY_CLUSTER_WORKER_AUTO_SCALING
node_image = settings.RAY_NODE_IMAGE

# cpu job settings
node_selector_label = settings.RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL
gpu_request = 0
# if gpu job, use gpu nodes and resources
if job.gpu:
node_selector_label = settings.RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL
gpu_request = settings.LIMITS_GPU_PER_TASK

# if user specified image use specified image
function_data = user.username
if job.program.image is not None:
Expand All @@ -268,6 +276,8 @@ def create_ray_cluster( # pylint: disable=too-many-branches
"max_workers": job_config.max_workers,
"auto_scaling": job_config.auto_scaling,
"user": user.username,
"node_selector_label": node_selector_label,
"gpu_request": gpu_request,
}
)
cluster_data = yaml.safe_load(manifest)
Expand All @@ -292,6 +302,8 @@ def create_ray_cluster( # pylint: disable=too-many-branches
resource.owner = user
resource.title = cluster_name
resource.host = host
if job.gpu:
resource.gpu = True
resource.save()
else:
raise RuntimeError("Something went wrong during cluster creation")
Expand Down
13 changes: 12 additions & 1 deletion gateway/api/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from api.models import Job, ComputeResource
from api.ray import submit_job, create_ray_cluster, kill_ray_cluster
from api.utils import generate_cluster_name
from api.utils import generate_cluster_name, create_gpujob_allowlist
from main import settings as config


Expand All @@ -26,6 +26,7 @@
def execute_job(job: Job) -> Job:
"""Executes program.
0. configure compute resource type
1. check if cluster exists
1.1 if not: create cluster
2. connect to cluster
Expand All @@ -41,6 +42,16 @@ def execute_job(job: Job) -> Job:

tracer = trace.get_tracer("scheduler.tracer")
with tracer.start_as_current_span("execute.job") as span:
# configure functions to use gpus
gpujobs = create_gpujob_allowlist()
if (
job.program.provider
and job.program.provider.name in gpujobs["gpu-functions"].keys()
):
logger.debug("Job %s will be run on GPU nodes", job.id)
job.gpu = True
job.save()

compute_resource = ComputeResource.objects.filter(
owner=job.author, active=True
).first()
Expand Down
20 changes: 20 additions & 0 deletions gateway/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,23 @@ def sanitize_name(name: str):
sanitized_name += c
return sanitized_name
return name


def create_gpujob_allowlist():
"""
Create dictionary of jobs allowed to run on gpu nodes.
Sample format of json:
{ "gpu-functions": { "mockprovider": [ "my-first-pattern" ] } }
"""
try:
with open(settings.GATEWAY_GPU_JOBS_CONFIG, encoding="utf-8", mode="r") as f:
gpujobs = json.load(f)
except IOError as e:
logger.error("Unable to open gpu job config file: %s", e)
raise ValueError("Unable to open gpu job config file") from e
except ValueError as e:
logger.error("Unable to decode gpu job allowlist: %s", e)
raise ValueError("Unable to decode gpujob allowlist") from e

return gpujobs
3 changes: 3 additions & 0 deletions gateway/api/v1/gpu-jobs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"gpu-functions": {}
}
16 changes: 16 additions & 0 deletions gateway/main/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@
# resources limitations
LIMITS_JOBS_PER_USER = int(os.environ.get("LIMITS_JOBS_PER_USER", "2"))
LIMITS_MAX_CLUSTERS = int(os.environ.get("LIMITS_MAX_CLUSTERS", "6"))
LIMITS_GPU_CLUSTERS = int(os.environ.get("LIMITS_MAX_GPU_CLUSTERS", "1"))
LIMITS_CPU_PER_TASK = int(os.environ.get("LIMITS_CPU_PER_TASK", "4"))
LIMITS_GPU_PER_TASK = int(os.environ.get("LIMITS_GPU_PER_TASK", "1"))
LIMITS_MEMORY_PER_TASK = int(os.environ.get("LIMITS_MEMORY_PER_TASK", "8"))

# ray cluster management
Expand Down Expand Up @@ -367,12 +369,26 @@
os.environ.get("RAY_CLUSTER_NO_DELETE_ON_COMPLETE", False)
)

RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL = os.environ.get(
"RAY_CLUSTER_CPU_NODE_SELECTOR_LABEL",
"ibm-cloud.kubernetes.io/worker-pool-name: default",
)

RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL = os.environ.get(
"RAY_CLUSTER_GPU_NODE_SELECTOR_LABEL",
"ibm-cloud.kubernetes.io/worker-pool-name: gpu-workers",
)

PROGRAM_TIMEOUT = int(os.environ.get("PROGRAM_TIMEOUT", "14"))

GATEWAY_ALLOWLIST_CONFIG = str(
os.environ.get("GATEWAY_ALLOWLIST_CONFIG", "api/v1/allowlist.json")
)

GATEWAY_GPU_JOBS_CONFIG = str(
os.environ.get("GATEWAY_GPU_JOBS_CONFIG", "api/v1/gpu-jobs.json")
)

# qiskit runtime
QISKIT_IBM_CHANNEL = os.environ.get("QISKIT_IBM_CHANNEL", "ibm_quantum")
QISKIT_IBM_URL = os.environ.get(
Expand Down

0 comments on commit 4f29082

Please sign in to comment.