Skip to content

Commit

Permalink
Fast admission annotation (kubernetes-sigs#3189)
Browse files Browse the repository at this point in the history
* Fast admission annotation

* Fast admission annotation

* Simplify constructGroupPodSetsFast

* Fast admission annotation

* Restructure integration test
  • Loading branch information
vladikkuzn authored Oct 9, 2024
1 parent 460e737 commit a768127
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 8 deletions.
28 changes: 27 additions & 1 deletion pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,34 @@ func (p *Pod) Load(ctx context.Context, c client.Client, key *types.NamespacedNa
}

func (p *Pod) constructGroupPodSets() ([]kueue.PodSet, error) {
if _, useFastAdmission := p.pod.GetAnnotations()[GroupFastAdmissionAnnotation]; useFastAdmission {
tc, err := p.groupTotalCount()
if err != nil {
return nil, err
}
return constructGroupPodSetsFast(p, tc)
}
return constructGroupPodSets(p.list.Items)
}

func constructGroupPodSetsFast(p *Pod, groupTotalCount int) ([]kueue.PodSet, error) {
for _, podInGroup := range p.list.Items {
if !isPodRunnableOrSucceeded(&podInGroup) {
continue
}
roleHash, err := getRoleHash(podInGroup)
if err != nil {
return nil, fmt.Errorf("failed to calculate pod role hash: %w", err)
}
podSets := FromObject(&podInGroup).PodSets()
podSets[0].Name = roleHash
podSets[0].Count = int32(groupTotalCount)
return podSets, nil
}

return nil, errors.New("failed to find a runnable pod in the group")
}

func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) {
var resultPodSets []kueue.PodSet

Expand Down Expand Up @@ -713,8 +738,9 @@ func (p *Pod) validatePodGroupMetadata(r record.EventRecorder, activePods []core
return err
}
originalQueue := jobframework.QueueName(p)
_, useFastAdmission := p.pod.GetAnnotations()[GroupFastAdmissionAnnotation]

if len(activePods) < groupTotalCount {
if !useFastAdmission && len(activePods) < groupTotalCount {
errMsg := fmt.Sprintf("'%s' group has fewer runnable pods than expected", podGroupName(p.pod))
r.Eventf(p.Object(), corev1.EventTypeWarning, jobframework.ReasonErrWorkloadCompose, errMsg)
return jobframework.UnretryableError(errMsg)
Expand Down
54 changes: 54 additions & 0 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,60 @@ func TestReconciler(t *testing.T) {
},
},
},
"workload is composed and created for the pod group with fast admission": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
KueueSchedulingGate().
Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val").
Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2").
Group("test-group").
GroupTotalCount("3").
Annotation(GroupFastAdmissionAnnotation, "true").
Obj(),
},
wantPods: []corev1.Pod{
// Other pods are created on second reconcile
*basePodWrapper.
Clone().
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
KueueSchedulingGate().
Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val").
Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2").
Group("test-group").
GroupTotalCount("3").
Annotation(GroupFastAdmissionAnnotation, "true").
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("dc85db45", 3).
Request(corev1.ResourceCPU, "1").
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Obj(),
).
Queue("user-queue").
Priority(0).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid").
Annotations(map[string]string{
"kueue.x-k8s.io/is-group-workload": "true",
controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "pod", Namespace: "ns"},
EventType: "Normal",
Reason: "CreatedWorkload",
Message: "Created Workload: ns/test-group",
},
},
},
"workload is found for the pod group": {
pods: []corev1.Pod{
*basePodWrapper.
Expand Down
15 changes: 8 additions & 7 deletions pkg/controller/jobs/pod/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ import (
)

const (
ManagedLabelKey = constants.ManagedByKueueLabel
ManagedLabelValue = "true"
PodFinalizer = ManagedLabelKey
GroupNameLabel = "kueue.x-k8s.io/pod-group-name"
GroupTotalCountAnnotation = "kueue.x-k8s.io/pod-group-total-count"
RoleHashAnnotation = "kueue.x-k8s.io/role-hash"
RetriableInGroupAnnotation = "kueue.x-k8s.io/retriable-in-group"
ManagedLabelKey = constants.ManagedByKueueLabel
ManagedLabelValue = "true"
PodFinalizer = ManagedLabelKey
GroupNameLabel = "kueue.x-k8s.io/pod-group-name"
GroupTotalCountAnnotation = "kueue.x-k8s.io/pod-group-total-count"
GroupFastAdmissionAnnotation = "kueue.x-k8s.io/pod-group-fast-admission"
RoleHashAnnotation = "kueue.x-k8s.io/role-hash"
RetriableInGroupAnnotation = "kueue.x-k8s.io/retriable-in-group"
)

var (
Expand Down
79 changes: 79 additions & 0 deletions test/integration/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,85 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
})
})

ginkgo.It("Should ungate pods when admitted with fast admission", func() {
ginkgo.By("Creating pod1 and delaying creation of pod2")
pod1 := testingpod.MakePod("test-pod1", ns.Name).
Group("test-group").
GroupTotalCount("2").
Annotation(podcontroller.GroupFastAdmissionAnnotation, "true").
Queue("test-queue").
Obj()
pod1LookupKey := client.ObjectKeyFromObject(pod1)
gomega.Expect(k8sClient.Create(ctx, pod1)).Should(gomega.Succeed())

ginkgo.By("checking that workload is created for the pod group with the queue name")
wlLookupKey := types.NamespacedName{
Namespace: pod1.Namespace,
Name: "test-group",
}
createdWorkload := &kueue.Workload{}
gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1))
gomega.Expect(createdWorkload.Spec.PodSets[0].Count).To(gomega.Equal(int32(2)))

gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), "The Workload should have .spec.queueName set")

ginkgo.By("checking that all pods in group are unsuspended when workload is admitted", func() {
admission := testing.MakeAdmission(clusterQueue.Name).PodSets(
kueue.PodSetAssignment{
Name: "bf90803c",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "default",
},
Count: ptr.To[int32](2),
},
).Obj()
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)

util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, createdWorkload)

util.ExpectPodUnsuspendedWithNodeSelectors(ctx, k8sClient, pod1LookupKey, map[string]string{"kubernetes.io/arch": "arm64"})
})

pod2 := testingpod.MakePod("test-pod2", ns.Name).
Group("test-group").
GroupTotalCount("2").
Annotation(podcontroller.GroupFastAdmissionAnnotation, "true").
Queue("test-queue").
Obj()
pod2LookupKey := client.ObjectKeyFromObject(pod2)

ginkgo.By("Creating pod2 and checking that it is unsuspended", func() {
gomega.Expect(k8sClient.Create(ctx, pod2)).Should(gomega.Succeed())
util.ExpectPodUnsuspendedWithNodeSelectors(ctx, k8sClient, pod2LookupKey, map[string]string{"kubernetes.io/arch": "arm64"})
})

ginkgo.By("checking that pod group is finalized when all pods in the group succeed", func() {
util.SetPodsPhase(ctx, k8sClient, corev1.PodSucceeded, pod1)
util.SetPodsPhase(ctx, k8sClient, corev1.PodSucceeded, pod2)

gomega.Eventually(func() []metav1.Condition {
err := k8sClient.Get(ctx, wlLookupKey, createdWorkload)
if err != nil {
return nil
}
return createdWorkload.Status.Conditions
}, util.Timeout, util.Interval).Should(gomega.ContainElement(
gomega.BeComparableTo(
metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue},
wlConditionCmpOpts...,
),
), "Expected 'Finished' workload condition")

util.ExpectPodsFinalized(ctx, k8sClient, pod1LookupKey)
util.ExpectPodsFinalized(ctx, k8sClient, pod2LookupKey)
})
})

ginkgo.It("Should keep the running pod group with the queue name if workload is evicted", func() {
ginkgo.By("Creating pods with queue name")
pod1 := testingpod.MakePod("test-pod1", ns.Name).
Expand Down

0 comments on commit a768127

Please sign in to comment.