diff --git a/Dockerfile b/Dockerfile index a9bbd1268..423017d29 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ COPY main.go main.go COPY api/ api/ COPY pkg/controllers/ pkg/controllers/ COPY pkg/util/ pkg/util/ +COPY pkg/webhooks pkg/webhooks # Build # the GOARCH has not a default value to allow the binary be built according to the host where the command diff --git a/api/jobset/v1alpha2/jobset_webhook.go b/api/jobset/v1alpha2/jobset_webhook.go index 01c682666..dd4cb5654 100644 --- a/api/jobset/v1alpha2/jobset_webhook.go +++ b/api/jobset/v1alpha2/jobset_webhook.go @@ -29,7 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" "sigs.k8s.io/jobset/pkg/util/collections" - "sigs.k8s.io/jobset/pkg/util/names" + shared "sigs.k8s.io/jobset/pkg/util/shared" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -109,7 +109,7 @@ func (js *JobSet) ValidateCreate() (admission.Warnings, error) { } // Check that the generated job names for this replicated job will be DNS 1035 compliant. // Use the largest job index as it will have the longest name. - testJobName := names.GenJobName(js.Name, rjob.Name, int(rjob.Replicas-1)) + testJobName := shared.GenJobName(js.Name, rjob.Name, int(rjob.Replicas-1)) for _, errMessage := range validation.IsDNS1035Label(testJobName) { allErrs = append(allErrs, fmt.Errorf(errMessage)) } diff --git a/config/components/manager/manager.yaml b/config/components/manager/manager.yaml index b4626ec60..de6a57a31 100644 --- a/config/components/manager/manager.yaml +++ b/config/components/manager/manager.yaml @@ -92,11 +92,8 @@ spec: # TODO(user): Configure the resources accordingly based on the project requirements. # More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ resources: - limits: - cpu: 500m - memory: 128Mi requests: - cpu: 10m - memory: 64Mi + cpu: 2 + memory: 512Mi serviceAccountName: controller-manager terminationGracePeriodSeconds: 10 diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 7444f9bb4..31e1bd4ea 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -60,6 +60,26 @@ rules: - get - patch - update +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: diff --git a/config/components/webhook/manifests.yaml b/config/components/webhook/manifests.yaml index 08846173c..a0a70815f 100644 --- a/config/components/webhook/manifests.yaml +++ b/config/components/webhook/manifests.yaml @@ -24,6 +24,25 @@ webhooks: resources: - jobsets sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate--v1-pod + failurePolicy: Fail + name: mpod.kb.io + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + resources: + - pods + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration @@ -50,3 +69,22 @@ webhooks: resources: - jobsets sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate--v1-pod + failurePolicy: Fail + name: vpod.kb.io + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + resources: + - pods + sideEffects: None diff --git a/examples/simple/exclusive-placement.yaml 
b/examples/simple/exclusive-placement.yaml new file mode 100644 index 000000000..d4af78e7c --- /dev/null +++ b/examples/simple/exclusive-placement.yaml @@ -0,0 +1,26 @@ +apiVersion: jobset.x-k8s.io/v1alpha2 +kind: JobSet +metadata: + name: exclusive-placement + annotations: + alpha.jobset.sigs.k8s.io/exclusive-topology: cloud.google.com/gke-nodepool # 1:1 job replica to node pool assignment +spec: + failurePolicy: + maxRestarts: 3 + replicatedJobs: + - name: workers + replicas: 3 + template: + spec: + parallelism: 3 + completions: 3 + backoffLimit: 10 + template: + spec: + containers: + - name: sleep + image: busybox + command: + - sleep + args: + - 1000s diff --git a/main.go b/main.go index 3d3b58a27..ee9d8c2cd 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,7 @@ import ( jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" "sigs.k8s.io/jobset/pkg/controllers" "sigs.k8s.io/jobset/pkg/util/cert" + "sigs.k8s.io/jobset/pkg/webhooks" //+kubebuilder:scaffold:imports ) @@ -68,7 +69,11 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + kubeConfig := ctrl.GetConfigOrDie() + kubeConfig.QPS = 500 + kubeConfig.Burst = 500 + + mgr, err := ctrl.NewManager(kubeConfig, ctrl.Options{ Scheme: scheme, Metrics: server.Options{ BindAddress: metricsAddr, @@ -104,8 +109,12 @@ func main() { } ctx := ctrl.SetupSignalHandler() - if err := controllers.SetupIndexes(ctx, mgr.GetFieldIndexer()); err != nil { - setupLog.Error(err, "unable to setup indexes") + if err := controllers.SetupJobSetIndexes(ctx, mgr.GetFieldIndexer()); err != nil { + setupLog.Error(err, "unable to setup jobset reconciler indexes") + os.Exit(1) + } + if err := controllers.SetupPodReconcilerIndexes(ctx, mgr.GetFieldIndexer()); err != nil { + setupLog.Error(err, "unable to setup pod reconciler indexes") os.Exit(1) } @@ -130,13 +139,30 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) { <-certsReady setupLog.Info("certs ready") + // Set up JobSet controller. jobSetController := controllers.NewJobSetReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("jobset")) if err := jobSetController.SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "JobSet") os.Exit(1) } + + // Set up pod reconciler. + podController := controllers.NewPodReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("pod")) + if err := podController.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Pod") + os.Exit(1) + } + + // Set up JobSet validating/defaulting webhook. if err := (&jobset.JobSet{}).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "JobSet") + setupLog.Error(err, "unable to create validation/defaulting webhook", "webhook", "JobSet") + os.Exit(1) + } + + // Set up pod mutating and admission webhook. 
+ podWebhook := webhooks.NewPodWebhook(mgr) + if err := podWebhook.SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create mutating webhook", "webhook", "Pod") os.Exit(1) } //+kubebuilder:scaffold:builder diff --git a/pkg/controllers/jobset_controller.go b/pkg/controllers/jobset_controller.go index 27f004f5d..f0808c6d5 100644 --- a/pkg/controllers/jobset_controller.go +++ b/pkg/controllers/jobset_controller.go @@ -37,7 +37,7 @@ import ( jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" "sigs.k8s.io/jobset/pkg/util/collections" - "sigs.k8s.io/jobset/pkg/util/names" + shared "sigs.k8s.io/jobset/pkg/util/shared" ) const ( @@ -176,7 +176,7 @@ func (r *JobSetReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { +func SetupJobSetIndexes(ctx context.Context, indexer client.FieldIndexer) error { return indexer.IndexField(ctx, &batchv1.Job{}, jobOwnerKey, func(obj client.Object) []string { o := obj.(*batchv1.Job) owner := metav1.GetControllerOf(o) @@ -232,6 +232,7 @@ func (r *JobSetReconciler) getChildJobs(ctx context.Context, js *jobset.JobSet) } return &ownedJobs, nil } + func (r *JobSetReconciler) calculateAndUpdateReplicatedJobsStatuses(ctx context.Context, js *jobset.JobSet, jobs *childJobs) error { status := r.calculateReplicatedJobStatuses(ctx, js, jobs) // Check if status ReplicatedJobsStatus has changed @@ -572,7 +573,7 @@ func updateCondition(js *jobset.JobSet, condition metav1.Condition) bool { func constructJobsFromTemplate(js *jobset.JobSet, rjob *jobset.ReplicatedJob, ownedJobs *childJobs) ([]*batchv1.Job, error) { var jobs []*batchv1.Job for jobIdx := 0; jobIdx < int(rjob.Replicas); jobIdx++ { - jobName := names.GenJobName(js.Name, rjob.Name, jobIdx) + jobName := shared.GenJobName(js.Name, rjob.Name, jobIdx) if create := shouldCreateJob(jobName, ownedJobs); !create { continue } @@ -590,7 +591,7 @@ func constructJob(js *jobset.JobSet, rjob *jobset.ReplicatedJob, jobIdx int) (*b ObjectMeta: metav1.ObjectMeta{ Labels: collections.CloneMap(rjob.Template.Labels), Annotations: collections.CloneMap(rjob.Template.Annotations), - Name: names.GenJobName(js.Name, rjob.Name, jobIdx), + Name: shared.GenJobName(js.Name, rjob.Name, jobIdx), Namespace: js.Namespace, }, Spec: *rjob.Template.Spec.DeepCopy(), @@ -606,17 +607,17 @@ func constructJob(js *jobset.JobSet, rjob *jobset.ReplicatedJob, jobIdx int) (*b } // If this job should be exclusive per topology, configure the scheduling constraints accordingly. - if topologyDomain, ok := js.Annotations[jobset.ExclusiveKey]; ok { + if _, ok := js.Annotations[jobset.ExclusiveKey]; ok { // If user has set the nodeSelectorStrategy annotation flag, add the job name label as a // nodeSelector, and add a toleration for the no schedule taint. // The node label and node taint must be added separately by a user/script. if _, exists := js.Annotations[jobset.NodeSelectorStrategyKey]; exists { - addNodeSelector(job) + addNamespacedJobNodeSelector(job) addTaintToleration(job) - } else { - // Otherwise, default to using exclusive pod affinities/anti-affinities strategy. - setExclusiveAffinities(job, topologyDomain) } + // Otherwise, we fall back to the default strategy of the pod webhook assigning exclusive affinities + // to the leader pod, preventing follower pod creation until leader pod is scheduled, then + // assigning the follower pods to the same node as the leader pods. } // if Suspend is set, then we assume all jobs will be suspended also. 
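As a rough illustration (not part of this change) of what the node-selector strategy above produces on a generated Job, here is a hypothetical test sketch; the JobSet, job, and namespace names are invented, and it assumes the test sits in pkg/controllers next to the unexported helpers it calls:

package controllers

import (
	"testing"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// Sketch: with the node selector strategy, each child job is pinned to nodes that
// were pre-labeled (e.g. by hack/label_nodes.py) with the namespaced job name.
func TestAddNamespacedJobNodeSelectorSketch(t *testing.T) {
	job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: "myjobset-workers-0", Namespace: "default"}}

	addNamespacedJobNodeSelector(job)

	// namespacedJobName joins namespace and job name with '_' because '/' is not a valid label value.
	want := "default_myjobset-workers-0"
	if got := job.Spec.Template.Spec.NodeSelector[jobset.NamespacedJobKey]; got != want {
		t.Errorf("unexpected node selector value: got %q, want %q", got, want)
	}
}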
@@ -626,60 +627,6 @@ func constructJob(js *jobset.JobSet, rjob *jobset.ReplicatedJob, jobIdx int) (*b return job, nil } -// Appends pod affinity/anti-affinity terms to the job pod template spec, -// ensuring that exclusively one job runs per topology domain and that all pods -// from each job land on the same topology domain. -func setExclusiveAffinities(job *batchv1.Job, topologyKey string) { - if job.Spec.Template.Spec.Affinity == nil { - job.Spec.Template.Spec.Affinity = &corev1.Affinity{} - } - if job.Spec.Template.Spec.Affinity.PodAffinity == nil { - job.Spec.Template.Spec.Affinity.PodAffinity = &corev1.PodAffinity{} - } - if job.Spec.Template.Spec.Affinity.PodAntiAffinity == nil { - job.Spec.Template.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{} - } - - // Pod affinity ensures the pods of this job land on the same topology domain. - job.Spec.Template.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(job.Spec.Template.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution, - corev1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpIn, - Values: []string{job.Labels[jobset.JobKey]}, - }, - }}, - TopologyKey: topologyKey, - NamespaceSelector: &metav1.LabelSelector{}, - }) - - // Pod anti-affinity ensures exclusively this job lands on the topology, preventing multiple jobs per topology domain. - job.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(job.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, - corev1.PodAffinityTerm{ - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpExists, - }, - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{job.Labels[jobset.JobKey]}, - }, - }}, - TopologyKey: topologyKey, - NamespaceSelector: &metav1.LabelSelector{}, - }) -} - -func addNodeSelector(job *batchv1.Job) { - if job.Spec.Template.Spec.NodeSelector == nil { - job.Spec.Template.Spec.NodeSelector = make(map[string]string) - } - job.Spec.Template.Spec.NodeSelector[jobset.NamespacedJobKey] = namespacedJobName(job.Namespace, job.Name) -} - func addTaintToleration(job *batchv1.Job) { job.Spec.Template.Spec.Tolerations = append(job.Spec.Template.Spec.Tolerations, corev1.Toleration{ @@ -704,7 +651,7 @@ func shouldCreateJob(jobName string, ownedJobs *childJobs) bool { } func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.ReplicatedJob, jobIdx int) { - jobName := names.GenJobName(js.Name, rjob.Name, jobIdx) + jobName := shared.GenJobName(js.Name, rjob.Name, jobIdx) labels := collections.CloneMap(obj.GetLabels()) labels[jobset.JobSetNameKey] = js.Name labels[jobset.ReplicatedJobNameKey] = rjob.Name @@ -716,8 +663,16 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R annotations := collections.CloneMap(obj.GetAnnotations()) annotations[jobset.JobSetNameKey] = js.Name annotations[jobset.ReplicatedJobNameKey] = rjob.Name + annotations[RestartsKey] = strconv.Itoa(int(js.Status.Restarts)) annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas)) annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx) + annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName) + + // Only set exclusive key label/annotation on jobs/pods if the 
parent JobSet + // is using exclusive placement. + if _, exists := js.Annotations[jobset.ExclusiveKey]; exists { + annotations[jobset.ExclusiveKey] = js.Annotations[jobset.ExclusiveKey] + } obj.SetLabels(labels) obj.SetAnnotations(annotations) @@ -740,9 +695,11 @@ func GenSubdomain(js *jobset.JobSet) string { return js.Name } -// jobHashKey returns the SHA1 hash of the namespaced job name (i.e. /). -func jobHashKey(ns, jobName string) string { - return sha1Hash(fmt.Sprintf("%s/%s", ns, jobName)) +func addNamespacedJobNodeSelector(job *batchv1.Job) { + if job.Spec.Template.Spec.NodeSelector == nil { + job.Spec.Template.Spec.NodeSelector = make(map[string]string) + } + job.Spec.Template.Spec.NodeSelector[jobset.NamespacedJobKey] = namespacedJobName(job.Namespace, job.Name) } // Human readable namespaced job name. We must use '_' to separate namespace and job instead of '/' @@ -751,6 +708,18 @@ func namespacedJobName(ns, jobName string) string { return fmt.Sprintf("%s_%s", ns, jobName) } +// jobHashKey returns the SHA1 hash of the namespaced job name (i.e. /). +func jobHashKey(ns string, jobName string) string { + return sha1Hash(fmt.Sprintf("%s/%s", ns, jobName)) +} + +// sha1Hash accepts an input string and returns the 40 character SHA1 hash digest of the input string. +func sha1Hash(s string) string { + h := sha1.New() + h.Write([]byte(s)) + return hex.EncodeToString(h.Sum(nil)) +} + func jobSetFinished(js *jobset.JobSet) bool { for _, c := range js.Status.Conditions { if (c.Type == string(jobset.JobSetCompleted) || c.Type == string(jobset.JobSetFailed)) && c.Status == metav1.ConditionTrue { @@ -796,10 +765,3 @@ func numJobsExpectedToSucceed(js *jobset.JobSet) int { } return total } - -// sha1Hash accepts an input string and returns the 40 character SHA1 hash digest of the input string. -func sha1Hash(s string) string { - h := sha1.New() - h.Write([]byte(s)) - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/pkg/controllers/jobset_controller_test.go b/pkg/controllers/jobset_controller_test.go index d191880ef..91155b50b 100644 --- a/pkg/controllers/jobset_controller_test.go +++ b/pkg/controllers/jobset_controller_test.go @@ -111,174 +111,6 @@ func TestIsJobFinished(t *testing.T) { } } -func TestSetExclusiveAffinities(t *testing.T) { - var ( - topologyKey = "test-topology-key" - jobName = "test-job" - ns = "default" - ) - tests := []struct { - name string - job *batchv1.Job - wantAffinity corev1.Affinity - }{ - { - name: "no existing affinities", - job: testutils.MakeJob(jobName, ns). 
- JobLabels(map[string]string{ - jobset.JobKey: jobHashKey(ns, jobName), - }).Obj(), - wantAffinity: corev1.Affinity{ - PodAffinity: &corev1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpIn, - Values: []string{jobHashKey(ns, jobName)}, - }, - }}, - TopologyKey: topologyKey, - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpExists, - }, - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{jobHashKey(ns, jobName)}, - }, - }}, - TopologyKey: topologyKey, - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - }, - }, - { - name: "existing affinities should be appended to, not replaced", - job: testutils.MakeJob(jobName, ns). - JobLabels(map[string]string{ - jobset.JobKey: jobHashKey(ns, jobName), - }).Affinity(&corev1.Affinity{ - PodAffinity: &corev1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "label-foo", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"value-foo"}, - }, - }}, - TopologyKey: "topology.kubernetes.io/zone", - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "label-bar", - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{"value-bar"}, - }, - }}, - TopologyKey: "topology.kubernetes.io/zone", - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - }).Obj(), - wantAffinity: corev1.Affinity{ - PodAffinity: &corev1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "label-foo", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"value-foo"}, - }, - }}, - TopologyKey: "topology.kubernetes.io/zone", - NamespaceSelector: &metav1.LabelSelector{}, - }, - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpIn, - Values: []string{jobHashKey(ns, jobName)}, - }, - }}, - TopologyKey: topologyKey, - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "label-bar", - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{"value-bar"}, - }, - }}, - TopologyKey: "topology.kubernetes.io/zone", - NamespaceSelector: &metav1.LabelSelector{}, - }, - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpExists, - }, - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpNotIn, - Values: 
[]string{jobHashKey(ns, jobName)}, - }, - }}, - TopologyKey: topologyKey, - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - setExclusiveAffinities(tc.job, topologyKey) - // Check pod affinities. - if diff := cmp.Diff(tc.wantAffinity.PodAffinity, tc.job.Spec.Template.Spec.Affinity.PodAffinity); diff != "" { - t.Errorf("unexpected diff in pod affinity (-want/+got): %s", diff) - } - // Check pod anti-affinities. - if diff := cmp.Diff(tc.wantAffinity.PodAntiAffinity, tc.job.Spec.Template.Spec.Affinity.PodAntiAffinity); diff != "" { - t.Errorf("unexpected diff in pod anti-affinity (-want/+got): %s", diff) - } - - }) - } -} - func TestConstructJobsFromTemplate(t *testing.T) { var ( jobSetName = "test-jobset" @@ -486,44 +318,9 @@ func TestConstructJobsFromTemplate(t *testing.T) { jobName: "test-jobset-replicated-job-0", ns: ns, replicas: 1, - jobIdx: 0}). - Suspend(false). - Affinity(&corev1.Affinity{ - PodAffinity: &corev1.PodAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpIn, - Values: []string{jobHashKey(ns, "test-jobset-replicated-job-0")}, - }, - }}, - TopologyKey: topologyDomain, - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - PodAntiAffinity: &corev1.PodAntiAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ - { - LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpExists, - }, - { - Key: jobset.JobKey, - Operator: metav1.LabelSelectorOpNotIn, - Values: []string{jobHashKey(ns, "test-jobset-replicated-job-0")}, - }, - }}, - TopologyKey: topologyDomain, - NamespaceSelector: &metav1.LabelSelector{}, - }, - }, - }, - }).Obj(), + jobIdx: 0, + topology: topologyDomain}). + Suspend(false).Obj(), }, }, { @@ -623,7 +420,8 @@ func TestConstructJobsFromTemplate(t *testing.T) { jobName: "test-jobset-replicated-job-0", ns: ns, replicas: 1, - jobIdx: 0}). + jobIdx: 0, + topology: "cloud.google.com/gke-nodepool"}). Suspend(false). Subdomain(jobSetName). NodeSelector(map[string]string{ @@ -1060,37 +858,34 @@ type makeJobArgs struct { replicas int jobIdx int restarts int + topology string } func makeJob(args *makeJobArgs) *testutils.JobWrapper { + labels := map[string]string{ + jobset.JobSetNameKey: args.jobSetName, + jobset.ReplicatedJobNameKey: args.replicatedJobName, + jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas), + jobset.JobIndexKey: strconv.Itoa(args.jobIdx), + RestartsKey: strconv.Itoa(args.restarts), + jobset.JobKey: jobHashKey(args.ns, args.jobName), + } + annotations := map[string]string{ + jobset.JobSetNameKey: args.jobSetName, + jobset.ReplicatedJobNameKey: args.replicatedJobName, + jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas), + jobset.JobIndexKey: strconv.Itoa(args.jobIdx), + RestartsKey: strconv.Itoa(args.restarts), + jobset.JobKey: jobHashKey(args.ns, args.jobName), + } + // Only set exclusive key if we are using exclusive placement per topology. + if args.topology != "" { + annotations[jobset.ExclusiveKey] = args.topology + } jobWrapper := testutils.MakeJob(args.jobName, args.ns). 
- JobLabels(map[string]string{ - jobset.JobSetNameKey: args.jobSetName, - jobset.ReplicatedJobNameKey: args.replicatedJobName, - jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas), - jobset.JobIndexKey: strconv.Itoa(args.jobIdx), - RestartsKey: strconv.Itoa(args.restarts), - jobset.JobKey: jobHashKey(args.ns, args.jobName), - }). - JobAnnotations(map[string]string{ - jobset.JobSetNameKey: args.jobSetName, - jobset.ReplicatedJobNameKey: args.replicatedJobName, - jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas), - jobset.JobIndexKey: strconv.Itoa(args.jobIdx), - }). - PodLabels(map[string]string{ - jobset.JobSetNameKey: args.jobSetName, - jobset.ReplicatedJobNameKey: args.replicatedJobName, - jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas), - jobset.JobIndexKey: strconv.Itoa(args.jobIdx), - RestartsKey: strconv.Itoa(args.restarts), - jobset.JobKey: jobHashKey(args.ns, args.jobName), - }). - PodAnnotations(map[string]string{ - jobset.JobSetNameKey: args.jobSetName, - jobset.ReplicatedJobNameKey: args.replicatedJobName, - jobset.ReplicatedJobReplicas: strconv.Itoa(args.replicas), - jobset.JobIndexKey: strconv.Itoa(args.jobIdx), - }) + JobLabels(labels). + JobAnnotations(annotations). + PodLabels(labels). + PodAnnotations(annotations) return jobWrapper } diff --git a/pkg/controllers/pod_controller.go b/pkg/controllers/pod_controller.go new file mode 100644 index 000000000..c061cdd01 --- /dev/null +++ b/pkg/controllers/pod_controller.go @@ -0,0 +1,322 @@ +/* +Copyright 2023 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "sigs.k8s.io/jobset/pkg/util/shared" + + jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" +) + +const ( + // PodNameKey is the key used for building an index where the key is the + // pod name (without the random suffix), and the value is the pod itself. + PodNameKey string = "podName" + + // podJobKey is the key used for building an index where the key is the hash of + // the namespaced job name of the job that owns this pod, and value is + // the pod itself. + podJobKey string = "podJobKey" + + // parallelDeletions defines the maximum number of pod deletions that can be done concurrently.. + parallelDeletions int = 50 +) + +// PodReconciler reconciles a Pod owned by a JobSet using exclusive placement. 
+type PodReconciler struct { + client.Client + Scheme *runtime.Scheme + Record record.EventRecorder +} + +func NewPodReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *PodReconciler { + return &PodReconciler{Client: client, Scheme: scheme, Record: record} +} + +// SetupWithManager sets up the controller with the Manager. +func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + WithEventFilter(predicate.NewPredicateFuncs(func(object client.Object) bool { + pod, ok := object.(*corev1.Pod) + // Only reconcile pods that are part of JobSets using exclusive placement. + return ok && usingExclusivePlacement(pod) + })). + Complete(r) +} + +func SetupPodReconcilerIndexes(ctx context.Context, indexer client.FieldIndexer) error { + // Build index where key is the hash of the namespaced job name of the job that owns this pod, + // and value is the pod itself. + if err := indexer.IndexField(ctx, &corev1.Pod{}, podJobKey, func(obj client.Object) []string { + pod := obj.(*corev1.Pod) + // Make sure the pod is part of a JobSet using exclusive placement. + if _, exists := pod.Annotations[jobset.ExclusiveKey]; !exists { + return nil + } + jobKey, exists := pod.Labels[jobset.JobKey] + if !exists { + return nil + } + return []string{jobKey} + }); err != nil { + return err + } + + // Build index where the key is the pod name (without the random suffix), and the value is the pod itself. + return indexer.IndexField(ctx, &corev1.Pod{}, PodNameKey, func(obj client.Object) []string { + pod := obj.(*corev1.Pod) + // Make sure the pod is part of a JobSet using exclusive placement. + if _, exists := pod.Annotations[jobset.ExclusiveKey]; !exists { + return nil + } + podName, err := removePodNameSuffix(pod.Name) + if err != nil { + return nil + } + return []string{podName} + }) +} + +// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // Get Pod from apiserver. + var pod corev1.Pod + if err := r.Get(ctx, req.NamespacedName, &pod); err != nil { + // we'll ignore not-found errors, since there is nothing we can do here. + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(&pod)) + ctx = ctrl.LoggerInto(ctx, log) + log.V(2).Info("Reconciling Pod") + + // Check if this is the leader pod. If it is the leader pod and it hasn't been + // scheduled, do nothing and return early. + leader := shared.IsLeaderPod(&pod) + if leader { + log.Info(fmt.Sprintf("%q is a leader pod", pod.Name)) + if pod.Spec.NodeName == "" { + log.Info("leader pod not scheduled") + return ctrl.Result{}, nil + } + } + + // We need a reference to the scheduled leader pod of this job, to find the topology domain + // it is scheduled in. + leaderPod, err := r.getLeader(ctx, &pod) + if err != nil { + return ctrl.Result{}, err + } + + // Get all the pods owned by the same job as this pod. 
+	jobKey, exists := leaderPod.Labels[jobset.JobKey]
+	if !exists {
+		return ctrl.Result{}, fmt.Errorf("job key label not found on leader pod: %q", leaderPod.Name)
+	}
+	podList, err := r.listPodsForJob(ctx, leaderPod.Namespace, jobKey)
+	if err != nil {
+		log.Error(err, "listing pods for job")
+		return ctrl.Result{}, err
+	}
+
+	// Validate all follower pods in this job are assigned to the same topology as the leader pod.
+	// If not, then delete all the job's pods so they can be recreated and rescheduled correctly.
+	valid, err := r.validatePodPlacements(ctx, leaderPod, podList)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+	if !valid {
+		return ctrl.Result{}, r.deletePods(ctx, podList.Items)
+	}
+	return ctrl.Result{}, nil
+}
+
+// listPodsForJob returns a list of pods owned by a specific job, using the
+// jobKey (SHA1 hash of the namespaced job name) label selector.
+func (r *PodReconciler) listPodsForJob(ctx context.Context, ns, jobKey string) (*corev1.PodList, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	var podList corev1.PodList
+	if err := r.List(ctx, &podList, client.InNamespace(ns), &client.MatchingFields{podJobKey: jobKey}); err != nil {
+		log.Error(err, "listing pods")
+		return nil, err
+	}
+
+	return &podList, nil
+}
+
+func (r *PodReconciler) getPodByName(ctx context.Context, ns, podName string) (*corev1.Pod, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	var podList corev1.PodList
+	if err := r.List(ctx, &podList, client.InNamespace(ns), &client.MatchingFields{PodNameKey: podName}); err != nil {
+		log.Error(err, "listing pods")
+		return nil, err
+	}
+
+	// Validate only 1 pod with this name exists.
+	if len(podList.Items) != 1 {
+		return nil, fmt.Errorf("expected 1 pod with name %q, got %d", podName, len(podList.Items))
+	}
+
+	return &podList.Items[0], nil
+}
+
+func usingExclusivePlacement(pod *corev1.Pod) bool {
+	_, exists := pod.Annotations[jobset.ExclusiveKey]
+	return exists
+}
+
+// removePodNameSuffix removes the random suffix appended to pod names.
+func removePodNameSuffix(podName string) (string, error) {
+	parts := strings.Split(podName, "-")
+
+	// For a pod that belongs to a jobset, the minimum number of parts (split on "-" character)
+	// is 5, given the pod name format is defined as: <jobSetName>-<replicatedJobName>-<jobIndex>-<podIndex>-<randomSuffix>
+	if len(parts) < 5 {
+		return "", fmt.Errorf("invalid pod name: %s", podName)
+	}
+	return strings.Join(parts[:len(parts)-1], "-"), nil
+}
+
+// genLeaderPodName accepts the name of a pod that is part of a jobset as input, and
+// returns the name of the pod with completion index 0 in the same child job.
+func genLeaderPodName(pod *corev1.Pod) (string, error) {
+	// Pod name format: <jobSetName>-<replicatedJobName>-<jobIndex>-<podIndex>-<randomSuffix>
+	jobSet, ok := pod.Labels[jobset.JobSetNameKey]
+	if !ok {
+		return "", fmt.Errorf("pod missing label: %s", jobset.JobSetNameKey)
+	}
+	replicatedJob, ok := pod.Labels[jobset.ReplicatedJobNameKey]
+	if !ok {
+		return "", fmt.Errorf("pod missing label: %s", jobset.ReplicatedJobNameKey)
+	}
+	jobIndex, ok := pod.Labels[jobset.JobIndexKey]
+	if !ok {
+		return "", fmt.Errorf("pod missing label: %s", jobset.JobIndexKey)
+	}
+	return shared.GenLeaderPodName(jobSet, replicatedJob, jobIndex), nil
+}
+
+func (r *PodReconciler) getLeader(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	var leaderPod *corev1.Pod
+	if shared.IsLeaderPod(pod) {
+		log.Info(fmt.Sprintf("%q is a leader pod", pod.Name))
+		leaderPod = pod
+	} else {
+		log.Info(fmt.Sprintf("%q is a follower pod", pod.Name))
+		leaderPodName, err := genLeaderPodName(pod)
+		if err != nil {
+			log.Error(err, "generating leader pod name")
+			return nil, err
+		}
+		// Use pod name index to quickly fetch the leader pod object.
+		leaderPod, err = r.getPodByName(ctx, pod.Namespace, leaderPodName)
+		if err != nil {
+			log.Error(err, "getting leader pod by name")
+			return nil, err
+		}
+	}
+	return leaderPod, nil
+}
+
+func (r *PodReconciler) validatePodPlacements(ctx context.Context, leaderPod *corev1.Pod, podList *corev1.PodList) (bool, error) {
+	// We know exclusive key is set since we have an event filter for this.
+	topologyKey := leaderPod.Annotations[jobset.ExclusiveKey]
+	leaderTopology, err := r.topologyFromPod(ctx, leaderPod, topologyKey)
+	if err != nil {
+		return false, err
+	}
+
+	// Validate all follower pods are assigned to the same topology as the leader pod.
+	for _, pod := range podList.Items {
+		if shared.IsLeaderPod(&pod) {
+			continue
+		}
+		followerTopology, err := r.topologyFromPod(ctx, &pod, topologyKey)
+		if err != nil {
+			return false, err
+		}
+		if followerTopology != leaderTopology {
+			return false, fmt.Errorf("follower topology %q != leader topology %q", followerTopology, leaderTopology)
+		}
+	}
+	return true, nil
+}
+
+func (r *PodReconciler) topologyFromPod(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	nodeName := pod.Spec.NodeName
+	ns := pod.Namespace
+
+	// Get node the leader pod is running on.
+	var node corev1.Node
+	if err := r.Get(ctx, types.NamespacedName{Name: nodeName, Namespace: ns}, &node); err != nil {
+		// We'll ignore not-found errors, since there is nothing we can do here.
+		// A node may not exist temporarily due to a maintenance event or other scenarios.
+		log.Error(err, fmt.Sprintf("getting node %s", nodeName))
+		return "", client.IgnoreNotFound(err)
+	}
+
+	// Get topology (e.g. node pool name) from node labels.
+	topology, exists := node.Labels[topologyKey]
+	if !exists {
+		return "", fmt.Errorf("node does not have topology label: %s", topologyKey)
+	}
+	return topology, nil
+}
+
+// deletePods deletes the given pods, parallelizing up to `parallelDeletions` requests.
+func (r *PodReconciler) deletePods(ctx context.Context, pods []corev1.Pod) error {
+	lock := &sync.Mutex{}
+	var finalErrs []error
+
+	workqueue.ParallelizeUntil(ctx, parallelDeletions, len(pods), func(i int) {
+		pod := pods[i]
+		backgroundPolicy := metav1.DeletePropagationBackground
+		if err := r.Delete(ctx, &pod, &client.DeleteOptions{PropagationPolicy: &backgroundPolicy}); client.IgnoreNotFound(err) != nil {
+			lock.Lock()
+			defer lock.Unlock()
+			finalErrs = append(finalErrs, err)
+			return
+		}
+	})
+	return errors.Join(finalErrs...)
+}
diff --git a/pkg/util/names/names.go b/pkg/util/names/names.go
deleted file mode 100644
index 6584ce72f..000000000
--- a/pkg/util/names/names.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package names
-
-import (
-	"fmt"
-)
-
-func GenJobName(jsName, rjobName string, jobIndex int) string {
-	return fmt.Sprintf("%s-%s-%d", jsName, rjobName, jobIndex)
-}
diff --git a/pkg/util/shared/shared.go b/pkg/util/shared/shared.go
new file mode 100644
index 000000000..cd2f40864
--- /dev/null
+++ b/pkg/util/shared/shared.go
@@ -0,0 +1,24 @@
+// Package shared provides utility functions that are shared between the
+// webhooks and the controllers.
+package shared
+
+import (
+	"fmt"
+
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+)
+
+func GenJobName(jsName, rjobName string, jobIndex int) string {
+	return fmt.Sprintf("%s-%s-%d", jsName, rjobName, jobIndex)
+}
+
+func IsLeaderPod(pod *corev1.Pod) bool {
+	return pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0"
+}
+
+// GenLeaderPodName returns the name of the leader pod (pod with completion index 0)
+// for a given job in a jobset.
+func GenLeaderPodName(jobSet, replicatedJob, jobIndex string) string {
+	return fmt.Sprintf("%s-%s-%s-0", jobSet, replicatedJob, jobIndex)
+}
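The helpers in pkg/util/shared are pure string/annotation utilities; a minimal usage sketch (the JobSet and replicated-job names below are illustrative only):

package shared_test

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"sigs.k8s.io/jobset/pkg/util/shared"
)

func Example() {
	// Child job name for replica index 2 of the "workers" replicated job.
	fmt.Println(shared.GenJobName("myjobset", "workers", 2)) // myjobset-workers-2

	// Leader pod name (completion index 0) for that job, without the random pod suffix.
	fmt.Println(shared.GenLeaderPodName("myjobset", "workers", "2")) // myjobset-workers-2-0

	// A pod is the leader if and only if its completion index annotation is "0".
	leader := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Annotations: map[string]string{batchv1.JobCompletionIndexAnnotation: "0"},
	}}
	fmt.Println(shared.IsLeaderPod(leader)) // true
}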
diff --git a/pkg/webhooks/pod_admission_webhook.go b/pkg/webhooks/pod_admission_webhook.go
new file mode 100644
index 000000000..f2afb65e0
--- /dev/null
+++ b/pkg/webhooks/pod_admission_webhook.go
@@ -0,0 +1,125 @@
+package webhooks
+
+import (
+	"context"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+
+	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+	"sigs.k8s.io/jobset/pkg/controllers"
+	"sigs.k8s.io/jobset/pkg/util/shared"
+)
+
+//+kubebuilder:webhook:path=/validate--v1-pod,mutating=false,failurePolicy=fail,sideEffects=None,groups="",resources=pods,verbs=create,versions=v1,name=vpod.kb.io,sideEffects=None,admissionReviewVersions=v1
+
+// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
+func (p *podWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
+	return p.validate(ctx, obj)
+}
+
+func (p *podWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
+	return nil, nil
+}
+
+func (p *podWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
+	return nil, nil
+}
+
+func (p *podWebhook) validate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
+	pod, ok := obj.(*corev1.Pod)
+	if !ok {
+		return nil, fmt.Errorf("expected a Pod but got a %T", obj)
+	}
+
+	// If pod is part of a JobSet that is using the node selector exclusive placement strategy,
+	// we don't need to validate anything.
+	_, usingNodeSelectorStrategy := pod.Annotations[jobset.NodeSelectorStrategyKey]
+	if usingNodeSelectorStrategy {
+		return nil, nil
+	}
+
+	// If pod is not part of a JobSet using exclusive placement, we don't need to validate anything.
+	topologyKey, usingExclusivePlacement := pod.Annotations[jobset.ExclusiveKey]
+	if !usingExclusivePlacement {
+		return nil, nil
+	}
+
+	// Leader pods need no further validation and proceed with creation immediately;
+	// only follower pods are checked below.
+	if !shared.IsLeaderPod(pod) {
+		// If a follower pod node selector has not been set, reject the creation.
+		if pod.Spec.NodeSelector == nil {
+			return nil, fmt.Errorf("follower pod node selector not set")
+		}
+		if _, exists := pod.Spec.NodeSelector[topologyKey]; !exists {
+			return nil, fmt.Errorf("follower pod node selector not set")
+		}
+		// For follower pods, validate leader pod exists and is scheduled.
+		leaderScheduled, err := p.leaderPodScheduled(ctx, pod)
+		if err != nil {
+			return nil, err
+		}
+		if !leaderScheduled {
+			return nil, fmt.Errorf("leader pod not yet scheduled, not creating follower pod %q", pod.Name)
+		}
+	}
+	return nil, nil
+}
+
+func (p *podWebhook) leaderPodScheduled(ctx context.Context, pod *corev1.Pod) (bool, error) {
+	leaderPod, err := p.leaderPodForFollower(ctx, pod)
+	if err != nil {
+		return false, err
+	}
+	return leaderPod.Spec.NodeName != "", nil
+}
+
+func (p *podWebhook) leaderPodForFollower(ctx context.Context, pod *corev1.Pod) (*corev1.Pod, error) {
+	// Generate the expected leader pod name for this follower pod.
+	log := ctrl.LoggerFrom(ctx)
+	log.Info(fmt.Sprintf("generating leader pod name for follower pod: %s", pod.Name))
+	leaderPodName, err := genLeaderPodName(pod)
+	if err != nil {
+		log.Error(err, "getting leader pod name for follower pod")
+		return nil, err
+	}
+
+	// Get the leader pod object via the pod name index.
+	var podList corev1.PodList
+	if err := p.client.List(ctx, &podList, client.InNamespace(pod.Namespace), &client.MatchingFields{controllers.PodNameKey: leaderPodName}); err != nil {
+		return nil, err
+	}
+
+	// Validate there is only 1 leader pod for this job.
+	if len(podList.Items) != 1 {
+		return nil, fmt.Errorf("too many leader pods for this job (expected 1, got %d)", len(podList.Items))
+	}
+
+	// Check if the leader pod is scheduled.
+	leaderPod := &podList.Items[0]
+	return leaderPod, nil
+}
+
+// genLeaderPodName accepts the name of a pod that is part of a jobset as input, and
+// returns the name of the pod with completion index 0 in the same child job.
+func genLeaderPodName(pod *corev1.Pod) (string, error) {
+	// Pod name format: <jobSetName>-<replicatedJobName>-<jobIndex>-<podIndex>-<randomSuffix>
+	jobSet, ok := pod.Labels[jobset.JobSetNameKey]
+	if !ok {
+		return "", fmt.Errorf("pod missing label: %s", jobset.JobSetNameKey)
+	}
+	replicatedJob, ok := pod.Labels[jobset.ReplicatedJobNameKey]
+	if !ok {
+		return "", fmt.Errorf("pod missing label: %s", jobset.ReplicatedJobNameKey)
+	}
+	jobIndex, ok := pod.Labels[jobset.JobIndexKey]
+	if !ok {
+		return "", fmt.Errorf("pod missing label: %s", jobset.JobIndexKey)
+	}
+	return shared.GenLeaderPodName(jobSet, replicatedJob, jobIndex), nil
+}
diff --git a/pkg/webhooks/pod_admission_webhook_test.go b/pkg/webhooks/pod_admission_webhook_test.go
new file mode 100644
index 000000000..85bc4e48e
--- /dev/null
+++ b/pkg/webhooks/pod_admission_webhook_test.go
@@ -0,0 +1,66 @@
+package webhooks
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+)
+
+func TestGenLeaderPodName(t *testing.T) {
+	cases := []struct {
+		desc    string
+		pod     *corev1.Pod
+		want    string
+		wantErr bool
+	}{
+		{
+			desc: "valid pod",
+			pod: &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "pod",
+					Labels: map[string]string{
+						jobset.JobSetNameKey:        "js",
+						jobset.ReplicatedJobNameKey: "rjob",
+						jobset.JobIndexKey:          "0",
+					},
+				},
+			},
+			want: "js-rjob-0-0",
+		},
+		{
+			desc: "pod missing labels",
+			pod: &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "pod",
+					Labels: map[string]string{
+						jobset.JobSetNameKey:        "js",
+						jobset.ReplicatedJobNameKey: "rjob",
+					},
+				},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.desc, func(t *testing.T) {
+			got, err := genLeaderPodName(tc.pod)
+			// Handle unexpected error.
+			if err != nil && !tc.wantErr {
+				t.Errorf("unexpected error: %v", err)
+				return
+			}
+			// Handle missing an expected error.
+			if err == nil && tc.wantErr {
+				t.Errorf("expected error, got nil")
+				return
+			}
+			if diff := cmp.Diff(tc.want, got); diff != "" {
+				t.Errorf("unexpected diff (-want/+got): %s", diff)
+			}
+		})
+	}
+}
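The table above only exercises leader pod name generation; a hypothetical companion case for the follower validation path could look like the sketch below (it stays on the client-free branch of validate, and the annotation values are illustrative):

package webhooks

import (
	"context"
	"testing"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

func TestValidateRejectsFollowerWithoutNodeSelector(t *testing.T) {
	p := &podWebhook{} // no client needed: this path fails before any API lookup
	follower := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "js-rjob-0-1-abc12",
			Annotations: map[string]string{
				// Part of a JobSet using exclusive placement...
				jobset.ExclusiveKey: "cloud.google.com/gke-nodepool",
				// ...and not the leader (completion index != 0).
				batchv1.JobCompletionIndexAnnotation: "1",
			},
		},
	}
	// The follower carries no node selector for the topology key, so creation must be rejected.
	if _, err := p.validate(context.Background(), follower); err == nil {
		t.Errorf("expected follower pod without a node selector to be rejected")
	}
}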
diff --git a/pkg/webhooks/pod_mutating_webhook.go b/pkg/webhooks/pod_mutating_webhook.go
new file mode 100644
index 000000000..a887883db
--- /dev/null
+++ b/pkg/webhooks/pod_mutating_webhook.go
@@ -0,0 +1,188 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package webhooks
+
+import (
+	"context"
+	"fmt"
+
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+
+	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
+)
+
+// +kubebuilder:webhook:path=/mutate--v1-pod,mutating=true,failurePolicy=fail,groups="",resources=pods,verbs=create,versions=v1,name=mpod.kb.io,sideEffects=None,admissionReviewVersions=v1
+
+// podWebhook for mutating webhook.
+type podWebhook struct {
+	client  client.Client
+	decoder *admission.Decoder
+}
+
+func NewPodWebhook(mgr ctrl.Manager) *podWebhook {
+	return &podWebhook{client: mgr.GetClient()}
+}
+
+// SetupWebhookWithManager configures the pod mutating and validating webhooks.
+func (p *podWebhook) SetupWebhookWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewWebhookManagedBy(mgr).
+		For(&corev1.Pod{}).
+		WithDefaulter(p).
+		WithValidator(p).
+		Complete()
+}
+
+// InjectDecoder, when defined, will result in the decoder automatically being set.
+func (p *podWebhook) InjectDecoder(d *admission.Decoder) error {
+	p.decoder = d
+	return nil
+}
+
+func (p *podWebhook) Default(ctx context.Context, obj runtime.Object) error {
+	pod, ok := obj.(*corev1.Pod)
+	if !ok {
+		return nil
+	}
+	// If this pod is part of a JobSet that is NOT using the exclusive placement feature,
+	// or if this jobset is using the node selector exclusive placement strategy (running
+	// the hack/label_nodes.py script beforehand), we don't need to mutate the pod here.
+	_, usingExclusivePlacement := pod.Annotations[jobset.ExclusiveKey]
+	_, usingNodeSelectorStrategy := pod.Annotations[jobset.NodeSelectorStrategyKey]
+	if !usingExclusivePlacement || usingNodeSelectorStrategy {
+		return nil
+	}
+	return p.patchPod(ctx, pod)
+}
+
+// patchPod adds exclusive affinities to the leader pod (completion index 0), and for all
+// other pods it adds a nodeSelector targeting the same topology the leader pod is
+// scheduled on.
+func (p *podWebhook) patchPod(ctx context.Context, pod *corev1.Pod) error {
+	log := ctrl.LoggerFrom(ctx)
+	if pod.Annotations[batchv1.JobCompletionIndexAnnotation] == "0" {
+		log.Info(fmt.Sprintf("pod webhook: setting exclusive affinities for pod: %s", pod.Name))
+		setExclusiveAffinities(pod)
+		return nil
+	} else {
+		log.Info(fmt.Sprintf("pod webhook: adding node selector for follower pod: %s", pod.Name))
+		return p.setNodeSelector(ctx, pod)
+	}
+}
+
+func setExclusiveAffinities(pod *corev1.Pod) {
+	if pod.Spec.Affinity == nil {
+		pod.Spec.Affinity = &corev1.Affinity{}
+	}
+	if pod.Spec.Affinity.PodAffinity == nil {
+		pod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
+	}
+	if pod.Spec.Affinity.PodAntiAffinity == nil {
+		pod.Spec.Affinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
+	}
+	// Pod affinity ensures the pods of this job land on the same topology domain.
+	pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
+		corev1.PodAffinityTerm{
+			LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
+				{
+					Key:      jobset.JobKey,
+					Operator: metav1.LabelSelectorOpIn,
+					Values:   []string{pod.Labels[jobset.JobKey]},
+				},
+			}},
+			TopologyKey:       pod.Annotations[jobset.ExclusiveKey],
+			NamespaceSelector: &metav1.LabelSelector{},
+		})
+	// Pod anti-affinity ensures exclusively this job lands on the topology, preventing multiple jobs per topology domain.
+	pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(pod.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
+		corev1.PodAffinityTerm{
+			LabelSelector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
+				{
+					Key:      jobset.JobKey,
+					Operator: metav1.LabelSelectorOpExists,
+				},
+				{
+					Key:      jobset.JobKey,
+					Operator: metav1.LabelSelectorOpNotIn,
+					Values:   []string{pod.Labels[jobset.JobKey]},
+				},
+			}},
+			TopologyKey:       pod.Annotations[jobset.ExclusiveKey],
+			NamespaceSelector: &metav1.LabelSelector{},
+		})
+}
+
+func (p *podWebhook) setNodeSelector(ctx context.Context, pod *corev1.Pod) error {
+	log := ctrl.LoggerFrom(ctx)
+	// Find leader pod (completion index 0) for this job.
+	leaderPod, err := p.leaderPodForFollower(ctx, pod)
+	if err != nil {
+		log.Error(err, "finding leader pod for follower")
+		// Return no error, validation webhook will reject creation of this follower pod.
+		return nil
+	}
+
+	// If the leader pod is not scheduled yet, skip the mutation.
+	if leaderPod.Spec.NodeName == "" {
+		// Return no error, validation webhook will reject creation of this follower pod.
+		return nil
+	}
+
+	// Get the exclusive topology value for the leader pod (i.e. name of nodepool, rack, etc.)
+	topologyKey, ok := pod.Annotations[jobset.ExclusiveKey]
+	if !ok {
+		return fmt.Errorf("pod missing annotation: %s", jobset.ExclusiveKey)
+	}
+	topologyValue, err := p.topologyFromPod(ctx, leaderPod, topologyKey)
+	if err != nil {
+		log.Error(err, "getting topology from leader pod")
+		return err
+	}
+
+	// Set node selector of follower pod so it's scheduled on the same topology as the leader.
+	if pod.Spec.NodeSelector == nil {
+		pod.Spec.NodeSelector = make(map[string]string)
+	}
+	pod.Spec.NodeSelector[topologyKey] = topologyValue
+	return nil
+}
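For the leader pod, Default only injects the affinity terms shown above; a rough sketch of that path follows (sharing the imports of the previous test sketch; the topology and job-hash values are made up):

func TestDefaultAddsExclusiveAffinitiesToLeader(t *testing.T) {
	p := &podWebhook{} // the leader branch never touches the client
	leader := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "js-rjob-0-0-abc12",
			Labels: map[string]string{jobset.JobKey: "somejobhash"},
			Annotations: map[string]string{
				jobset.ExclusiveKey:                  "cloud.google.com/gke-nodepool",
				batchv1.JobCompletionIndexAnnotation: "0",
			},
		},
	}
	if err := p.Default(context.Background(), leader); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	// Exactly one pod affinity term keyed on the exclusive topology should be injected.
	terms := leader.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
	if len(terms) != 1 || terms[0].TopologyKey != "cloud.google.com/gke-nodepool" {
		t.Errorf("expected one pod affinity term keyed on the exclusive topology, got %+v", terms)
	}
}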
+func (p *podWebhook) topologyFromPod(ctx context.Context, pod *corev1.Pod, topologyKey string) (string, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	nodeName := pod.Spec.NodeName
+	ns := pod.Namespace
+
+	// Get node the leader pod is running on.
+	var node corev1.Node
+	if err := p.client.Get(ctx, types.NamespacedName{Name: nodeName, Namespace: ns}, &node); err != nil {
+		// We'll ignore not-found errors, since there is nothing we can do here.
+		// A node may not exist temporarily due to a maintenance event or other scenarios.
+		log.Error(err, fmt.Sprintf("getting node %s", nodeName))
+		return "", client.IgnoreNotFound(err)
+	}
+
+	// Get topology (e.g. node pool name) from node labels.
+	topology, exists := node.Labels[topologyKey]
+	if !exists {
+		return "", fmt.Errorf("node does not have topology label: %s", topologyKey)
+	}
+	return topology, nil
+}
diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go
index 3b330cc2f..00116a461 100644
--- a/test/e2e/suite_test.go
+++ b/test/e2e/suite_test.go
@@ -31,7 +31,7 @@ import (
 )
 
 const (
-	timeout  = 5 * time.Minute
+	timeout  = 10 * time.Minute
 	interval = time.Millisecond * 250
 )
 
diff --git a/test/integration/controller/suite_test.go b/test/integration/controller/suite_test.go
index f2a7114b6..cd0c32630 100644
--- a/test/integration/controller/suite_test.go
+++ b/test/integration/controller/suite_test.go
@@ -84,12 +84,23 @@ var _ = BeforeSuite(func() {
 		Scheme: scheme.Scheme,
 	})
 	Expect(err).ToNot(HaveOccurred())
-	jobSetController := controllers.NewJobSetReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), k8sManager.GetEventRecorderFor("jobset"))
 
-	err = controllers.SetupIndexes(ctx, k8sManager.GetFieldIndexer())
+	// Set up JobSet reconciler and indexes.
+	jobSetReconciler := controllers.NewJobSetReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), k8sManager.GetEventRecorderFor("jobset"))
+
+	err = controllers.SetupJobSetIndexes(ctx, k8sManager.GetFieldIndexer())
+	Expect(err).ToNot(HaveOccurred())
+
+	err = jobSetReconciler.SetupWithManager(k8sManager)
+	Expect(err).ToNot(HaveOccurred())
+
+	// Set up pod reconciler and indexes.
+	podReconciler := controllers.NewPodReconciler(k8sManager.GetClient(), k8sManager.GetScheme(), k8sManager.GetEventRecorderFor("pod"))
+
+	err = controllers.SetupPodReconcilerIndexes(ctx, k8sManager.GetFieldIndexer())
 	Expect(err).ToNot(HaveOccurred())
 
-	err = jobSetController.SetupWithManager(k8sManager)
+	err = podReconciler.SetupWithManager(k8sManager)
 	Expect(err).ToNot(HaveOccurred())
 
 	go func() {
diff --git a/test/integration/webhook/suite_test.go b/test/integration/webhook/suite_test.go
index c0dfdc428..e6d4a3958 100644
--- a/test/integration/webhook/suite_test.go
+++ b/test/integration/webhook/suite_test.go
@@ -40,6 +40,7 @@ import (
 
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 	"sigs.k8s.io/jobset/pkg/controllers"
+	"sigs.k8s.io/jobset/pkg/webhooks"
 )
 
 // These tests use Ginkgo (BDD-style Go testing framework). Refer to
@@ -107,12 +108,21 @@ var _ = BeforeSuite(func() {
 	})
 	Expect(err).NotTo(HaveOccurred())
 
-	err = controllers.SetupIndexes(ctx, mgr.GetFieldIndexer())
+	// Set up JobSet webhook and indexes.
+	err = controllers.SetupJobSetIndexes(ctx, mgr.GetFieldIndexer())
 	Expect(err).NotTo(HaveOccurred())
 
 	err = (&jobset.JobSet{}).SetupWebhookWithManager(mgr)
 	Expect(err).NotTo(HaveOccurred())
 
+	// Set up pod webhook and indexes.
+	podWebhook := webhooks.NewPodWebhook(mgr)
+	err = controllers.SetupPodReconcilerIndexes(ctx, mgr.GetFieldIndexer())
+	Expect(err).NotTo(HaveOccurred())
+
+	err = podWebhook.SetupWebhookWithManager(mgr)
+	Expect(err).NotTo(HaveOccurred())
+
 	//+kubebuilder:scaffold:webhook
 
 	go func() {