Skip to content

Commit

Permalink
initial commit for coordinator support
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvegamyhre committed Jul 15, 2024
1 parent 345cc92 commit 95fc11b
Show file tree
Hide file tree
Showing 23 changed files with 640 additions and 15 deletions.
29 changes: 29 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const (
// JobSetControllerName is the reserved value for the managedBy field for the built-in
// JobSet controller.
JobSetControllerName = "jobset.sigs.k8s.io/jobset-controller"

// CoordinatorKey is used as an annotation and label on Jobs and Pods. If the JobSet spec
// defines the .spec.coordinator field, this annotation/label will be added to store a stable
// network endpoint where the coordinator pod can be reached.
CoordinatorKey = "jobset.sigs.k8s.io/coordinator"
)

type JobSetConditionType string
Expand Down Expand Up @@ -95,6 +100,14 @@ type JobSetSpec struct {
// Suspend suspends all running child Jobs when set to true.
Suspend *bool `json:"suspend,omitempty"`

// Coordinator can be used to assign a specific pod as the coordinator for
// the JobSet. If defined, an annotation will be added to all Jobs and pods with
// coordinator pod, which contains the stable network endpoint where the
// coordinator pod can be reached.
// jobset.sigs.k8s.io/coordinator=<pod hostname>.<headless service>
// +optional
Coordinator *Coordinator `json:"coordinator,omitempty"`

// ManagedBy is used to indicate the controller or entity that manages a JobSet.
// The built-in JobSet controller reconciles JobSets which don't have this
// field at all or the field value is the reserved string
Expand Down Expand Up @@ -323,6 +336,22 @@ type StartupPolicy struct {
StartupPolicyOrder StartupPolicyOptions `json:"startupPolicyOrder"`
}

// Coordinator defines which pod can be marked as the coordinator for the JobSet workload.
type Coordinator struct {
// ReplicatedJob is the name of the ReplicatedJob which contains
// the coordinator pod.
ReplicatedJob string `json:"replicatedJob"`

// JobIndex is the index of Job which contains the coordinator pod
// (i.e., for a ReplicatedJob with N replicas, there are Job indexes 0 to N-1).
// Defaults to 0 if unset.
JobIndex int `json:"jobIndex,omitempty"`

// PodIndex is the Job completion index of the coordinator pod.
// Defaults to 0 if unset.
PodIndex int `json:"podIndex,omitempty"`
}

func init() {
SchemeBuilder.Register(&JobSet{}, &JobSetList{})
}
45 changes: 44 additions & 1 deletion api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions api/jobset/v1alpha2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions client-go/applyconfiguration/jobset/v1alpha2/coordinator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions client-go/applyconfiguration/jobset/v1alpha2/jobsetspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client-go/applyconfiguration/utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ spec:
spec:
description: JobSetSpec defines the desired state of JobSet
properties:
coordinator:
description: |-
Coordinator can be used to assign a specific pod as the coordinator for
the JobSet. If defined, an annotation will be added to all Jobs and pods with
coordinator pod, which contains the stable network endpoint where the
coordinator pod can be reached.
jobset.sigs.k8s.io/coordinator=<pod hostname>.<headless service>
properties:
jobIndex:
description: |-
JobIndex is the index of Job which contains the coordinator pod
(i.e., for a ReplicatedJob with N replicas, there are Job indexes 0 to N-1).
Defaults to 0 if unset.
type: integer
podIndex:
description: |-
PodIndex is the Job completion index of the coordinator pod.
Defaults to 0 if unset.
type: integer
replicatedJob:
description: |-
ReplicatedJob is the name of the ReplicatedJob which contains
the coordinator pod.
type: string
required:
- replicatedJob
type: object
failurePolicy:
description: |-
FailurePolicy, if set, configures when to declare the JobSet as
Expand Down
28 changes: 28 additions & 0 deletions hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,30 @@
},
"paths": {},
"definitions": {
"jobset.v1alpha2.Coordinator": {
"description": "Coordinator defines which pod can be marked as the coordinator for the JobSet workload.",
"type": "object",
"required": [
"replicatedJob"
],
"properties": {
"jobIndex": {
"description": "JobIndex is the index of Job which contains the coordinator pod (i.e., for a ReplicatedJob with N replicas, there are Job indexes 0 to N-1). Defaults to 0 if unset.",
"type": "integer",
"format": "int32"
},
"podIndex": {
"description": "PodIndex is the Job completion index of the coordinator pod. Defaults to 0 if unset.",
"type": "integer",
"format": "int32"
},
"replicatedJob": {
"description": "ReplicatedJob is the name of the ReplicatedJob which contains the coordinator pod.",
"type": "string",
"default": ""
}
}
},
"jobset.v1alpha2.FailurePolicy": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -120,6 +144,10 @@
"description": "JobSetSpec defines the desired state of JobSet",
"type": "object",
"properties": {
"coordinator": {
"description": "Coordinator can be used to assign a specific pod as the coordinator for the JobSet. If defined, an annotation will be added to all Jobs and pods with coordinator pod, which contains the stable network endpoint where the coordinator pod can be reached. jobset.sigs.k8s.io/coordinator=\u003cpod hostname\u003e.\u003cheadless service\u003e",
"$ref": "#/definitions/jobset.v1alpha2.Coordinator"
},
"failurePolicy": {
"description": "FailurePolicy, if set, configures when to declare the JobSet as failed. The JobSet is always declared failed if any job in the set finished with status failed.",
"$ref": "#/definitions/jobset.v1alpha2.FailurePolicy"
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,12 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
annotations[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
annotations[jobset.JobKey] = jobHashKey(js.Namespace, jobName)

// Apply coordinator annotation/label if a coordinator is defined in the JobSet spec.
if js.Spec.Coordinator != nil {
labels[jobset.CoordinatorKey] = coordinatorEndpoint(js)
annotations[jobset.CoordinatorKey] = coordinatorEndpoint(js)
}

// Check for JobSet level exclusive placement.
if topologyDomain, exists := js.Annotations[jobset.ExclusiveKey]; exists {
annotations[jobset.ExclusiveKey] = topologyDomain
Expand Down Expand Up @@ -1024,3 +1030,9 @@ func exclusiveConditions(cond1, cond2 metav1.Condition) bool {
cond2.Type == string(jobset.JobSetStartupPolicyInProgress)
return inProgressAndCompleted || completedAndInProgress
}

// coordinatorEndpoint returns the stable network endpoint where the coordinator pod can be reached.
// This function assumes the caller has validated that jobset.Spec.Coordinator != nil.
func coordinatorEndpoint(js *jobset.JobSet) string {
return fmt.Sprintf("%s-%s-%d-%d.%s", js.Name, js.Spec.Coordinator.ReplicatedJob, js.Spec.Coordinator.JobIndex, js.Spec.Coordinator.PodIndex, GetSubdomain(js))
}
Loading

0 comments on commit 95fc11b

Please sign in to comment.