Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new job-id annotation to assign globally unique job index to each job #650

Merged
merged 3 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ const (
ReplicatedJobReplicas string = "jobset.sigs.k8s.io/replicatedjob-replicas"
// ReplicatedJobNameKey is used to index into a Jobs labels and retrieve the name of the parent ReplicatedJob
ReplicatedJobNameKey string = "jobset.sigs.k8s.io/replicatedjob-name"
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
JobKey string = "jobset.sigs.k8s.io/job-key"
// JobIndexKey is a label/annotation set to the index of the Job replica within its parent replicatedJob.
// For each replicatedJob, this value will range from 0 to replicas-1, where `replicas`
// is equal to jobset.spec.replicatedJobs[*].replicas.
JobIndexKey string = "jobset.sigs.k8s.io/job-index"
// JobGlobalIndexKey is a label/annotation set to an integer that is unique across the entire JobSet.
// For each JobSet, this value will range from 0 to N-1, where N=total number of jobs in the jobset.
JobGlobalIndexKey string = "jobset.sigs.k8s.io/job-global-index"
// JobKey holds the SHA256 hash of the namespaced job name, which can be used to uniquely identify the job.
JobKey string = "jobset.sigs.k8s.io/job-key"
// ExclusiveKey is an annotation that can be set on the JobSet or on a ReplicatedJob template.
// If set at the JobSet level, all child jobs from all ReplicatedJobs will be scheduled using exclusive
// job placement per topology group (defined as the label value).
Expand Down
31 changes: 31 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
labels[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx)

// Set annotations on the object.
annotations := collections.CloneMap(obj.GetAnnotations())
Expand All @@ -739,6 +740,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
annotations[jobset.ReplicatedJobReplicas] = strconv.Itoa(int(rjob.Replicas))
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)
annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjob.Name, jobIdx)

// Apply coordinator annotation/label if a coordinator is defined in the JobSet spec.
if js.Spec.Coordinator != nil {
Expand Down Expand Up @@ -1032,3 +1034,32 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
func coordinatorEndpoint(js *jobset.JobSet) string {
return fmt.Sprintf("%s-%s-%d-%d.%s", js.Name, js.Spec.Coordinator.ReplicatedJob, js.Spec.Coordinator.JobIndex, js.Spec.Coordinator.PodIndex, GetSubdomain(js))
}

// globalJobIndex determines the job global index for a given job. The job global index is a unique
// global index for the job, with values ranging from 0 to N-1,
// where N=total number of jobs in the jobset. The job global index is calculated by
// iterating through the replicatedJobs in the order, as defined in the JobSet
// spec, keeping a cumulative sum of total replicas seen so far, then when we
// arrive at the parent replicatedJob of the target job, we add the local job
// index to our running sum of total jobs seen so far, in order to arrive at
// the final job global index value.
//
// Below is a diagram illustrating how job global indexs differ from job indexes.
//
// | my-jobset |
// | replicated job A | replicated job B |
// | job index 0 | job index 1 | job index 0 | job index 1 |
// | global index 0 | global index 2 | global index 3 | global index 4 |
//
// Returns an empty string if the parent replicated Job does not exist,
// although this should never happen in practice.
func globalJobIndex(js *jobset.JobSet, replicatedJobName string, jobIdx int) string {
currTotalJobs := 0
for _, rjob := range js.Spec.ReplicatedJobs {
if rjob.Name == replicatedJobName {
return strconv.Itoa(currTotalJobs + jobIdx)
}
currTotalJobs += int(rjob.Replicas)
}
return ""
}
90 changes: 90 additions & 0 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,15 @@ func TestConstructJobsFromTemplate(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Here we update the expected Jobs with certain features which require
// direct access to the JobSet object itself to calculate. For example,
// the `jobset.sigs.k8s.io/job-global-index` annotation requires access to the
// full JobSet spec to calculate a unique ID for each Job.
for _, expectedJob := range tc.want {
addJobGlobalIndex(t, tc.js, expectedJob)
}

// Now get the actual output of constructJobsFromTemplate, and diff the results.
var got []*batchv1.Job
for _, rjob := range tc.js.Spec.ReplicatedJobs {
jobs := constructJobsFromTemplate(tc.js, &rjob, tc.ownedJobs)
Expand All @@ -699,6 +708,26 @@ func TestConstructJobsFromTemplate(t *testing.T) {
}
}

// addJobGlobalIndex modifies the Job object in place by adding
// the `jobset.sigs.k8s.io/job-global-index` label/annotation to both the
// Job itself and the Job template spec.`
func addJobGlobalIndex(t *testing.T, js *jobset.JobSet, job *batchv1.Job) {
t.Helper()

rjobName := job.Annotations[jobset.ReplicatedJobNameKey]
jobIdx, err := strconv.Atoi(job.Annotations[jobset.JobIndexKey])
if err != nil {
t.Fatalf("invalid test case: %v", err)
}
// Job label/annotation
job.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
job.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)

// Job template spec label/annotation
job.Spec.Template.Labels[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
job.Spec.Template.Annotations[jobset.JobGlobalIndexKey] = globalJobIndex(js, rjobName, jobIdx)
}

func TestUpdateConditions(t *testing.T) {
var (
jobSetName = "test-jobset"
Expand Down Expand Up @@ -1381,3 +1410,64 @@ func TestCreateHeadlessSvcIfNecessary(t *testing.T) {
})
}
}

func TestGlobalJobIndex(t *testing.T) {
tests := []struct {
name string
jobSet *jobset.JobSet
replicatedJob string
jobIdx int
expectedJobGlobalIndex string
}{
{
name: "single replicated job",
jobSet: &jobset.JobSet{
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{Name: "rjob", Replicas: 3},
},
},
},
replicatedJob: "rjob",
jobIdx: 1,
expectedJobGlobalIndex: "1",
},
{
name: "multiple replicated jobs",
jobSet: &jobset.JobSet{
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{Name: "rjob1", Replicas: 2},
{Name: "rjob2", Replicas: 4},
{Name: "rjob3", Replicas: 1},
},
},
},
replicatedJob: "rjob2",
jobIdx: 3,
expectedJobGlobalIndex: "5",
},
{
name: "replicated job not found",
jobSet: &jobset.JobSet{
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{Name: "rjob1", Replicas: 2},
},
},
},
replicatedJob: "rjob2",
jobIdx: 0,
expectedJobGlobalIndex: "",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actualJobGlobalIndex := globalJobIndex(tc.jobSet, tc.replicatedJob, tc.jobIdx)
if diff := cmp.Diff(tc.expectedJobGlobalIndex, actualJobGlobalIndex); diff != "" {
t.Errorf("unexpected global job index (-want/+got): %s", diff)
}
})
}
}