Skip to content

Commit

Permalink
initial commit for coordinator support
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Jul 15, 2024
1 parent 345cc92 commit a9b40a6
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 13 deletions.
28 changes: 28 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const (
// JobSetControllerName is the reserved value for the managedBy field for the built-in
// JobSet controller.
JobSetControllerName = "jobset.sigs.k8s.io/jobset-controller"

// CoordinatorKey is used as an annotation and label on Jobs and Pods. If the JobSet spec
// defines the .spec.coordinator field, this annotation/label will be added to store a stable
// network endpoint where the coordinator pod can be reached.
CoordinatorKey = "jobset.sigs.k8s.io/coordinator"
)

type JobSetConditionType string
Expand Down Expand Up @@ -95,6 +100,14 @@ type JobSetSpec struct {
// Suspend suspends all running child Jobs when set to true.
Suspend *bool `json:"suspend,omitempty"`

// Coordinator can be used to assign a specific pod as the coordinator for
// the JobSet. If defined, a label and an annotation will be added to all Jobs and
// pods, containing the stable network endpoint where the coordinator pod can be
// reached:
// jobset.sigs.k8s.io/coordinator=<pod hostname>.<headless service>
// +optional
Coordinator *Coordinator `json:"coordinator,omitempty"`

// ManagedBy is used to indicate the controller or entity that manages a JobSet.
// The built-in JobSet controller reconciles JobSets which don't have this
// field at all or the field value is the reserved string
Expand Down Expand Up @@ -323,6 +336,21 @@ type StartupPolicy struct {
StartupPolicyOrder StartupPolicyOptions `json:"startupPolicyOrder"`
}

// Coordinator defines which pod can be marked as the coordinator for the JobSet workload.
// The pod is identified by the ReplicatedJob it belongs to, the index of the Job
// that contains it, and its completion index within that Job.
type Coordinator struct {
// ReplicatedJob is the name of the ReplicatedJob which contains
// the coordinator pod.
ReplicatedJob string `json:"replicatedJob"`

// JobIndex is the index of the Job which contains the coordinator pod.
// Defaults to 0 if unset.
JobIndex int `json:"jobIndex,omitempty"`

// PodIndex is the Job completion index of the coordinator pod.
// Defaults to 0 if unset.
PodIndex int `json:"podIndex,omitempty"`
}

// init registers the JobSet and JobSetList types with SchemeBuilder when the
// package is initialized.
func init() {
SchemeBuilder.Register(&JobSet{}, &JobSetList{})
}
26 changes: 26 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,32 @@ spec:
spec:
description: JobSetSpec defines the desired state of JobSet
properties:
coordinator:
description: |-
Coordinator can be used to assign a specific pod as the coordinator for
the JobSet. If defined, a label and an annotation will be added to all Jobs and
pods, containing the stable network endpoint where the coordinator pod can be
reached:
jobset.sigs.k8s.io/coordinator=<pod hostname>.<headless service>
properties:
jobIndex:
description: |-
JobIndex is the index of Job which contains the coordinator pod.
Defaults to 0 if unset.
type: integer
podIndex:
description: |-
PodIndex is the Job completion index of the coordinator pod.
Defaults to 0 if unset.
type: integer
replicatedJob:
description: |-
ReplicatedJob is the name of the ReplicatedJob which contains
the coordinator pod.
type: string
required:
- replicatedJob
type: object
failurePolicy:
description: |-
FailurePolicy, if set, configures when to declare the JobSet as
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,12 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)

// Apply coordinator annotation/label if a coordinator is defined in the JobSet spec.
if js.Spec.Coordinator != nil {
labels[jobset.CoordinatorKey] = coordinatorEndpoint(js)
annotations[jobset.CoordinatorKey] = coordinatorEndpoint(js)
}

// Check for JobSet level exclusive placement.
if topologyDomain, exists := js.Annotations[jobset.ExclusiveKey]; exists {
annotations[jobset.ExclusiveKey] = topologyDomain
Expand Down Expand Up @@ -1024,3 +1030,9 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
cond2.Type == string(jobset.JobSetStartupPolicyInProgress)
return inProgressAndCompleted || completedAndInProgress
}

// coordinatorEndpoint builds the stable network endpoint at which the coordinator
// pod can be reached, in the form
// <jobset name>-<replicated job>-<job index>-<pod index>.<subdomain>.
// The caller must have checked that js.Spec.Coordinator is non-nil; a nil
// Coordinator causes a nil-pointer panic here.
func coordinatorEndpoint(js *jobset.JobSet) string {
	coordinator := js.Spec.Coordinator

	// Host part: JobSet name, ReplicatedJob name, Job index and pod index joined by dashes.
	host := fmt.Sprintf("%s-%s-%d-%d", js.Name, coordinator.ReplicatedJob, coordinator.JobIndex, coordinator.PodIndex)

	// Domain part comes from GetSubdomain for this JobSet.
	return host + "." + GetSubdomain(js)
}
47 changes: 42 additions & 5 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controllers
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -126,11 +127,14 @@ func TestIsJobFinished(t *testing.T) {

func TestConstructJobsFromTemplate(t *testing.T) {
var (
jobSetName = "test-jobset"
replicatedJobName = "replicated-job"
jobName = "test-job"
ns = "default"
topologyDomain = "test-topology-domain"
jobSetName = "test-jobset"
replicatedJobName = "replicated-job"
jobName = "test-job"
ns = "default"
topologyDomain = "test-topology-domain"
coordinatorKeyValue = map[string]string{
jobset.CoordinatorKey: fmt.Sprintf("%s-%s-%d-%d.%s", jobSetName, replicatedJobName, 0, 0, jobSetName),
}
)

tests := []struct {
Expand Down Expand Up @@ -618,6 +622,39 @@ func TestConstructJobsFromTemplate(t *testing.T) {
}).Obj(),
},
},
{
name: "coordinator",
js: testutils.MakeJobSet(jobSetName, ns).
Coordinator(&jobset.Coordinator{
ReplicatedJob: replicatedJobName,
JobIndex: 0,
PodIndex: 0,
}).
EnableDNSHostnames(true).
NetworkSubdomain(jobSetName).
ReplicatedJob(testutils.MakeReplicatedJob(replicatedJobName).
Job(testutils.MakeJobTemplate(jobName, ns).Obj()).
Subdomain(jobSetName).
Replicas(1).
Obj()).
Obj(),
ownedJobs: &childJobs{},
want: []*batchv1.Job{
makeJob(&makeJobArgs{
jobSetName: jobSetName,
replicatedJobName: replicatedJobName,
jobName: "test-jobset-replicated-job-0",
ns: ns,
replicas: 1,
jobIdx: 0}).
JobAnnotations(coordinatorKeyValue).
JobLabels(coordinatorKeyValue).
PodAnnotations(coordinatorKeyValue).
PodLabels(coordinatorKeyValue).
Suspend(false).
Subdomain(jobSetName).Obj(),
},
},
{
name: "startup-policy",
js: testutils.MakeJobSet(jobSetName, ns).
Expand Down
43 changes: 35 additions & 8 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/utils/ptr"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/util/collections"
)

// TestPodSpec is the default pod spec used for testing.
Expand Down Expand Up @@ -121,6 +122,12 @@ func (j *JobSetWrapper) Suspend(suspend bool) *JobSetWrapper {
return j
}

// Coordinator sets the Coordinator field on the JobSet spec and returns the
// wrapper so that calls can be chained. The given pointer is stored as-is,
// without copying the Coordinator it points to.
func (j *JobSetWrapper) Coordinator(coordinator *jobset.Coordinator) *JobSetWrapper {
j.JobSet.Spec.Coordinator = coordinator
return j
}

// NetworkSubdomain sets the value of JobSet.Network.Subdomain
func (j *JobSetWrapper) NetworkSubdomain(val string) *JobSetWrapper {
j.JobSet.Spec.Network.Subdomain = val
Expand Down Expand Up @@ -299,21 +306,36 @@ func (j *JobWrapper) Affinity(affinity *corev1.Affinity) *JobWrapper {
return j
}

// JobLabels merges the given labels into the existing Job labels.
// Duplicate keys will be overwritten by the new labels (given in the function
// parameter). Labels already present on the Job under other keys are kept.
// Returns the wrapper so that calls can be chained.
func (j *JobWrapper) JobLabels(labels map[string]string) *JobWrapper {
	// Ensure there is a non-nil destination map; existing labels must not be
	// replaced wholesale, otherwise the merge below would be a no-op.
	if j.Labels == nil {
		j.Labels = make(map[string]string)
	}
	j.Labels = collections.MergeMaps(j.Labels, labels)
	return j
}

// JobAnnotations merges the given annotations into the existing Job annotations.
// Duplicate keys will be overwritten by the new annotations (given in the function
// parameter). Annotations already present on the Job under other keys are kept.
// Returns the wrapper so that calls can be chained.
func (j *JobWrapper) JobAnnotations(annotations map[string]string) *JobWrapper {
	// Ensure there is a non-nil destination map; existing annotations must not
	// be replaced wholesale, otherwise the merge below would be a no-op.
	if j.Annotations == nil {
		j.Annotations = make(map[string]string)
	}
	j.Annotations = collections.MergeMaps(j.Annotations, annotations)
	return j
}

// PodLabels merges the given labels into the existing pod template labels.
// Duplicate keys will be overwritten by the new labels (given in the function
// parameter). Labels already present on the pod template under other keys are kept.
// Returns the wrapper so that calls can be chained.
func (j *JobWrapper) PodLabels(labels map[string]string) *JobWrapper {
	// Ensure there is a non-nil destination map; existing labels must not be
	// replaced wholesale, otherwise the merge below would be a no-op.
	if j.Spec.Template.Labels == nil {
		j.Spec.Template.Labels = make(map[string]string)
	}
	j.Spec.Template.Labels = collections.MergeMaps(j.Spec.Template.Labels, labels)
	return j
}

Expand All @@ -323,9 +345,14 @@ func (j *JobWrapper) Suspend(suspend bool) *JobWrapper {
return j
}

// PodAnnotations merges the given annotations into the existing pod template
// annotations. Duplicate keys will be overwritten by the new annotations (given
// in the function parameter). Annotations already present on the pod template
// under other keys are kept. Returns the wrapper so that calls can be chained.
func (j *JobWrapper) PodAnnotations(annotations map[string]string) *JobWrapper {
	// Ensure there is a non-nil destination map; existing annotations must not
	// be replaced wholesale, otherwise the merge below would be a no-op.
	if j.Spec.Template.Annotations == nil {
		j.Spec.Template.Annotations = make(map[string]string)
	}
	j.Spec.Template.Annotations = collections.MergeMaps(j.Spec.Template.Annotations, annotations)
	return j
}

Expand Down
43 changes: 43 additions & 0 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,25 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
},
}),
ginkgo.Entry("jobset with coordinator set should have annotation and label set on all jobs", &testCase{
makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper {
return testJobSet(ns).Coordinator(&jobset.Coordinator{
ReplicatedJob: "replicated-job-a",
JobIndex: 0,
PodIndex: 0,
})
},
steps: []*step{
{
checkJobSetState: func(js *jobset.JobSet) {
gomega.Eventually(func() (bool, error) {
expectedCoordinator := fmt.Sprintf("%s-%s-%d-%d.%s", "test-js", "replicated-job-a", 0, 0, "test-js")
return checkCoordinator(js, expectedCoordinator)
}, timeout, interval).Should(gomega.Succeed())
},
},
},
}),
) // end of DescribeTable

ginkgo.When("A JobSet is managed by another controller", ginkgo.Ordered, func() {
Expand Down Expand Up @@ -2118,6 +2137,30 @@ func matchJobSetReplicatedStatus(js *jobset.JobSet, expectedStatus []jobset.Repl
}, timeout, interval).Should(gomega.Equal(expectedStatus))
}

// checkCoordinator reports whether every Job listed in the JobSet's namespace
// carries both the label and the annotation
// jobset.sigs.k8s.io/coordinator=<expectedCoordinator>
// It returns (false, nil) while the number of listed Jobs does not yet match
// testutil.NumExpectedJobs(js) or when any Job is missing the expected value,
// and (false, err) if listing the Jobs fails.
func checkCoordinator(js *jobset.JobSet, expectedCoordinator string) (bool, error) {
	var jobs batchv1.JobList
	err := k8sClient.List(ctx, &jobs, client.InNamespace(js.Namespace))
	if err != nil {
		return false, err
	}

	// The check is only meaningful once the expected number of Jobs is listed.
	if len(jobs.Items) != testutil.NumExpectedJobs(js) {
		return false, nil
	}

	// Every Job must carry the coordinator value as both a label and an annotation.
	for i := range jobs.Items {
		item := &jobs.Items[i]
		labelOK := item.Labels[jobset.CoordinatorKey] == expectedCoordinator
		annotationOK := item.Annotations[jobset.CoordinatorKey] == expectedCoordinator
		if !labelOK || !annotationOK {
			return false, nil
		}
	}
	return true, nil
}

// 2 replicated jobs:
// - one with 1 replica
// - one with 3 replicas and DNS hostnames enabled
Expand Down

0 comments on commit a9b40a6

Please sign in to comment.