Skip to content

Commit 9e6972a

Browse files
feat: Auto-inject kai-scheduler annotations and label (#2748)
Signed-off-by: Julien Mancuso <jmancuso@nvidia.com>
1 parent 700d345 commit 9e6972a

File tree

10 files changed

+628
-36
lines changed

10 files changed

+628
-36
lines changed

deploy/cloud/helm/platform/components/operator/templates/manager-rbac.yaml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,13 @@ rules:
137137
- get
138138
- patch
139139
- update
140+
- apiGroups:
141+
- scheduling.run.ai
142+
resources:
143+
- queues
144+
verbs:
145+
- get
146+
- list
140147
- apiGroups:
141148
- apps
142149
resources:
@@ -475,6 +482,45 @@ roleRef:
475482
{{- end }}
476483
name: '{{ include "dynamo-operator.fullname" . }}-manager-role'
477484
subjects:
485+
- kind: ServiceAccount
486+
name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
487+
namespace: '{{ .Release.Namespace }}'
488+
---
489+
# ClusterRole for kai-scheduler queue access
490+
# This is always a ClusterRole since Queue resources are cluster-scoped
491+
apiVersion: rbac.authorization.k8s.io/v1
492+
kind: ClusterRole
493+
metadata:
494+
name: {{ include "dynamo-operator.fullname" . }}-queue-reader
495+
labels:
496+
app.kubernetes.io/component: rbac
497+
app.kubernetes.io/created-by: dynamo-operator
498+
app.kubernetes.io/part-of: dynamo-operator
499+
{{- include "dynamo-operator.labels" . | nindent 4 }}
500+
rules:
501+
- apiGroups:
502+
- scheduling.run.ai
503+
resources:
504+
- queues
505+
verbs:
506+
- get
507+
- list
508+
---
509+
# ClusterRoleBinding for kai-scheduler queue access
510+
apiVersion: rbac.authorization.k8s.io/v1
511+
kind: ClusterRoleBinding
512+
metadata:
513+
name: {{ include "dynamo-operator.fullname" . }}-queue-reader-binding
514+
labels:
515+
app.kubernetes.io/component: rbac
516+
app.kubernetes.io/created-by: dynamo-operator
517+
app.kubernetes.io/part-of: dynamo-operator
518+
{{- include "dynamo-operator.labels" . | nindent 4 }}
519+
roleRef:
520+
apiGroup: rbac.authorization.k8s.io
521+
kind: ClusterRole
522+
name: {{ include "dynamo-operator.fullname" . }}-queue-reader
523+
subjects:
478524
- kind: ServiceAccount
479525
name: '{{ include "dynamo-operator.fullname" . }}-controller-manager'
480526
namespace: '{{ .Release.Namespace }}'

deploy/cloud/operator/cmd/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,9 @@ func main() {
172172
LWS: commonController.LWSConfig{
173173
Enabled: false, // Will be set after LWS discovery
174174
},
175+
KaiScheduler: commonController.KaiSchedulerConfig{
176+
Enabled: false, // Will be set after Kai-scheduler discovery
177+
},
175178
EtcdAddress: etcdAddr,
176179
NatsAddress: natsAddr,
177180
IngressConfig: commonController.IngressConfig{
@@ -247,6 +250,11 @@ func main() {
247250
lwsEnabled := commonController.DetectLWSAvailability(mainCtx, mgr)
248251
ctrlConfig.LWS.Enabled = lwsEnabled
249252

253+
// Detect Kai-scheduler availability using discovery client
254+
setupLog.Info("Detecting Kai-scheduler availability...")
255+
kaiSchedulerEnabled := commonController.DetectKaiSchedulerAvailability(mainCtx, mgr)
256+
ctrlConfig.KaiScheduler.Enabled = kaiSchedulerEnabled
257+
250258
// Create etcd client
251259
cli, err := clientv3.New(clientv3.Config{
252260
Endpoints: []string{etcdAddr},

deploy/cloud/operator/config/rbac/role.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ rules:
185185
- get
186186
- patch
187187
- update
188+
- apiGroups:
189+
- scheduling.run.ai
190+
resources:
191+
- queues
192+
verbs:
193+
- get
194+
- list
188195
- apiGroups:
189196
- scheduling.volcano.sh
190197
resources:

deploy/cloud/operator/internal/consts/consts.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package consts
22

3-
import "time"
3+
import (
4+
"time"
5+
6+
"k8s.io/apimachinery/pkg/runtime/schema"
7+
)
48

59
const (
610
HPACPUDefaultAverageUtilization = 80
@@ -55,6 +59,12 @@ const (
5559
DefaultSharedMemoryMountPath = "/dev/shm"
5660
DefaultSharedMemorySize = "8Gi"
5761

62+
// Kai-scheduler related constants
63+
KubeAnnotationKaiSchedulerQueue = "nvidia.com/kai-scheduler-queue" // User-provided annotation to specify queue name
64+
KubeLabelKaiSchedulerQueue = "kai.scheduler/queue" // Label injected into pods for kai-scheduler
65+
KaiSchedulerName = "kai-scheduler" // Scheduler name for kai-scheduler
66+
DefaultKaiSchedulerQueue = "dynamo" // Default queue name when none specified
67+
5868
// Grove multinode role suffixes
5969
GroveRoleSuffixLeader = "ldr"
6070
GroveRoleSuffixWorker = "wkr"
@@ -68,3 +78,25 @@ const (
6878
MultinodeDeploymentTypeGrove MultinodeDeploymentType = "grove"
6979
MultinodeDeploymentTypeLWS MultinodeDeploymentType = "lws"
7080
)
81+
82+
// GroupVersionResources for external APIs
83+
var (
84+
// Grove GroupVersionResources for scaling operations
85+
PodCliqueGVR = schema.GroupVersionResource{
86+
Group: "grove.io",
87+
Version: "v1alpha1",
88+
Resource: "podcliques",
89+
}
90+
PodCliqueScalingGroupGVR = schema.GroupVersionResource{
91+
Group: "grove.io",
92+
Version: "v1alpha1",
93+
Resource: "podcliquescalinggroups",
94+
}
95+
96+
// KAI-Scheduler GroupVersionResource for queue validation
97+
QueueGVR = schema.GroupVersionResource{
98+
Group: "scheduling.run.ai",
99+
Version: "v2",
100+
Resource: "queues",
101+
}
102+
)

deploy/cloud/operator/internal/controller/dynamographdeployment_controller.go

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,6 @@ const (
5555
PendingState State = "pending"
5656
)
5757

58-
var (
59-
// Grove GroupVersionResources for scaling operations
60-
podCliqueGVR = schema.GroupVersionResource{
61-
Group: "grove.io",
62-
Version: "v1alpha1",
63-
Resource: "podcliques",
64-
}
65-
podCliqueScalingGroupGVR = schema.GroupVersionResource{
66-
Group: "grove.io",
67-
Version: "v1alpha1",
68-
Resource: "podcliquescalinggroups",
69-
}
70-
)
71-
7258
type etcdStorage interface {
7359
DeleteKeys(ctx context.Context, prefix string) error
7460
}
@@ -88,6 +74,7 @@ type DynamoGraphDeploymentReconciler struct {
8874
// +kubebuilder:rbac:groups=grove.io,resources=podgangsets,verbs=get;list;watch;create;update;patch;delete
8975
// +kubebuilder:rbac:groups=grove.io,resources=podcliques/scale,verbs=get;update;patch
9076
// +kubebuilder:rbac:groups=grove.io,resources=podcliquescalinggroups/scale,verbs=get;update;patch
77+
// +kubebuilder:rbac:groups=scheduling.run.ai,resources=queues,verbs=get;list
9178

9279
// Reconcile is part of the main kubernetes reconciliation loop which aims to
9380
// move the current state of the cluster closer to the desired state.
@@ -201,9 +188,9 @@ func (r *DynamoGraphDeploymentReconciler) scaleGroveResource(ctx context.Context
201188
var gvr schema.GroupVersionResource
202189
switch resourceType {
203190
case "PodClique":
204-
gvr = podCliqueGVR
191+
gvr = consts.PodCliqueGVR
205192
case "PodCliqueScalingGroup":
206-
gvr = podCliqueScalingGroupGVR
193+
gvr = consts.PodCliqueScalingGroupGVR
207194
default:
208195
return fmt.Errorf("unsupported Grove resource type: %s", resourceType)
209196
}

deploy/cloud/operator/internal/controller_common/predicate.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,17 @@ type LWSConfig struct {
4242
Enabled bool
4343
}
4444

45+
// KaiSchedulerConfig holds the Kai-scheduler integration settings of the operator.
type KaiSchedulerConfig struct {
	// Enabled is not meant to be set by hand: it is determined automatically by
	// checking whether the Kai-scheduler CRDs are installed in the cluster.
	Enabled bool
}
49+
4550
type Config struct {
4651
// Enable resources filtering, only the resources belonging to the given namespace will be handled.
4752
RestrictedNamespace string
4853
Grove GroveConfig
4954
LWS LWSConfig
55+
KaiScheduler KaiSchedulerConfig
5056
EtcdAddress string
5157
NatsAddress string
5258
IngressConfig IngressConfig
@@ -75,6 +81,12 @@ func DetectLWSAvailability(ctx context.Context, mgr ctrl.Manager) bool {
7581
return detectAPIGroupAvailability(ctx, mgr, "leaderworkerset.x-k8s.io")
7682
}
7783

84+
// DetectKaiSchedulerAvailability checks if Kai-scheduler is available by checking if the scheduling.run.ai API group is registered
85+
// This approach uses the discovery client which is simpler and more reliable
86+
func DetectKaiSchedulerAvailability(ctx context.Context, mgr ctrl.Manager) bool {
87+
return detectAPIGroupAvailability(ctx, mgr, "scheduling.run.ai")
88+
}
89+
7890
// detectAPIGroupAvailability checks if a specific API group is registered in the cluster
7991
func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName string) bool {
8092
logger := log.FromContext(ctx)
@@ -107,6 +119,7 @@ func detectAPIGroupAvailability(ctx context.Context, mgr ctrl.Manager, groupName
107119
logger.Info("API group not available", "group", groupName)
108120
return false
109121
}
122+
110123
func EphemeralDeploymentEventFilter(config Config) predicate.Predicate {
111124
return predicate.NewPredicateFuncs(func(o client.Object) bool {
112125
l := log.FromContext(context.Background())

deploy/cloud/operator/internal/dynamo/graph.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,17 @@ func GenerateGrovePodGangSet(
882882
if controllerConfig.Grove.TerminationDelay > 0 {
883883
gangSet.Spec.Template.TerminationDelay = &metav1.Duration{Duration: controllerConfig.Grove.TerminationDelay}
884884
}
885+
886+
// Validate kai-scheduler queue once if kai-scheduler is enabled
887+
var validatedQueueName string
888+
if controllerConfig.Grove.Enabled && controllerConfig.KaiScheduler.Enabled {
889+
var err error
890+
validatedQueueName, err = DetermineKaiSchedulerQueue(ctx, dynamoDeployment.Annotations)
891+
if err != nil {
892+
return nil, fmt.Errorf("failed to determine kai-scheduler queue: %w", err)
893+
}
894+
}
895+
885896
dynamoNamespace, err := getDynamoNamespace(dynamoDeployment)
886897
if err != nil {
887898
return nil, fmt.Errorf("failed to get the graph dynamo namespace: %w", err)
@@ -935,6 +946,10 @@ func GenerateGrovePodGangSet(
935946
return nil, fmt.Errorf("failed to generate annotations: %w", err)
936947
}
937948
clique.Annotations = annotations
949+
950+
// Inject kai-scheduler settings if enabled
951+
injectKaiSchedulerIfEnabled(clique, controllerConfig, validatedQueueName)
952+
938953
gangSet.Spec.Template.Cliques = append(gangSet.Spec.Template.Cliques, clique)
939954
cliqueNames = append(cliqueNames, strings.ToLower(r.Name))
940955
}

deploy/cloud/operator/internal/dynamo/grove.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ import (
1414

1515
nvidiacomv1alpha1 "github.com/ai-dynamo/dynamo/deploy/cloud/operator/api/v1alpha1"
1616
commonconsts "github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/consts"
17+
"github.com/ai-dynamo/dynamo/deploy/cloud/operator/internal/controller_common"
18+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
19+
"k8s.io/client-go/dynamic"
20+
ctrl "sigs.k8s.io/controller-runtime"
1721
)
1822

1923
type GroveMultinodeDeployer struct {
@@ -130,3 +134,95 @@ func checkPCSGReady(ctx context.Context, client client.Client, resourceName, nam
130134

131135
return true, ""
132136
}
137+
138+
// resolveKaiSchedulerQueueName extracts the queue name from annotations or returns default
139+
// This is the shared logic between DetermineKaiSchedulerQueue and ResolveKaiSchedulerQueue
140+
func resolveKaiSchedulerQueueName(annotations map[string]string) string {
141+
queueName := commonconsts.DefaultKaiSchedulerQueue
142+
if annotations != nil {
143+
if annotationQueue, exists := annotations[commonconsts.KubeAnnotationKaiSchedulerQueue]; exists && strings.TrimSpace(annotationQueue) != "" {
144+
queueName = strings.TrimSpace(annotationQueue)
145+
}
146+
}
147+
return queueName
148+
}
149+
150+
// ensureQueueExists validates that a Queue resource with the given name exists in the cluster
151+
// Returns an error if the queue doesn't exist or if validation fails
152+
func ensureQueueExists(ctx context.Context, dynamicClient dynamic.Interface, queueName string) error {
153+
logger := log.FromContext(ctx)
154+
155+
// Try to get the queue resource using the predefined GVR
156+
_, err := dynamicClient.Resource(commonconsts.QueueGVR).Get(ctx, queueName, metav1.GetOptions{})
157+
if err != nil {
158+
if errors.IsNotFound(err) {
159+
logger.Error(err, "Queue not found", "queueName", queueName)
160+
return fmt.Errorf("queue '%s' not found in cluster. Ensure the queue exists before using kai-scheduler", queueName)
161+
}
162+
logger.Error(err, "Failed to validate queue", "queueName", queueName)
163+
return fmt.Errorf("failed to validate queue '%s': %w", queueName, err)
164+
}
165+
166+
logger.Info("Queue validation successful", "queueName", queueName)
167+
return nil
168+
}
169+
170+
// DetermineKaiSchedulerQueue determines the queue name for kai-scheduler from deployment annotations or returns default
171+
// Also validates that the queue exists in the cluster
172+
func DetermineKaiSchedulerQueue(ctx context.Context, annotations map[string]string) (string, error) {
173+
// Get the queue name from annotation or use default
174+
queueName := resolveKaiSchedulerQueueName(annotations)
175+
176+
// Create a dynamic client for CRD validation (Queue CRD might not be in the standard client scheme)
177+
cfg, err := ctrl.GetConfig()
178+
if err != nil {
179+
return "", fmt.Errorf("failed to get kubernetes config for queue validation: %w", err)
180+
}
181+
182+
dynamicClient, err := dynamic.NewForConfig(cfg)
183+
if err != nil {
184+
return "", fmt.Errorf("failed to create dynamic client for queue validation: %w", err)
185+
}
186+
187+
// Validate that the queue exists
188+
if err := ensureQueueExists(ctx, dynamicClient, queueName); err != nil {
189+
return "", fmt.Errorf("kai-scheduler queue validation failed: %w", err)
190+
}
191+
192+
return queueName, nil
193+
}
194+
195+
// ResolveKaiSchedulerQueue determines the queue name for kai-scheduler from deployment annotations or returns default
196+
// Does NOT validate - use DetermineKaiSchedulerQueue for validation
197+
func ResolveKaiSchedulerQueue(annotations map[string]string) string {
198+
return resolveKaiSchedulerQueueName(annotations)
199+
}
200+
201+
// injectKaiSchedulerIfEnabled injects kai-scheduler settings into a clique if kai-scheduler is enabled and grove is enabled
202+
func injectKaiSchedulerIfEnabled(
203+
clique *grovev1alpha1.PodCliqueTemplateSpec,
204+
controllerConfig controller_common.Config,
205+
validatedQueueName string,
206+
) {
207+
// Only proceed if grove is enabled, kai-scheduler is enabled, and no manual schedulerName is set
208+
if !controllerConfig.Grove.Enabled || !controllerConfig.KaiScheduler.Enabled {
209+
return
210+
}
211+
212+
// Check if user has manually set schedulerName - if so, respect their choice
213+
if clique.Spec.PodSpec.SchedulerName != "" && clique.Spec.PodSpec.SchedulerName != commonconsts.KaiSchedulerName {
214+
return
215+
}
216+
217+
// Use the pre-validated queue name
218+
queueName := validatedQueueName
219+
220+
// Inject schedulerName
221+
clique.Spec.PodSpec.SchedulerName = commonconsts.KaiSchedulerName
222+
223+
// Inject queue label
224+
if clique.Labels == nil {
225+
clique.Labels = make(map[string]string)
226+
}
227+
clique.Labels[commonconsts.KubeLabelKaiSchedulerQueue] = queueName
228+
}

0 commit comments

Comments
 (0)