Skip to content

Commit

Permalink
Create PodTemplate before ProvisionRequest. (#4086)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi authored Feb 3, 2025
1 parent 14e2ca7 commit d02a764
Show file tree
Hide file tree
Showing 3 changed files with 359 additions and 223 deletions.
137 changes: 76 additions & 61 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -241,17 +242,17 @@ func (c *Controller) syncOwnedProvisionRequest(
continue
}

oldPr, exists := activeOrLastPRForChecks[checkName]
req, exists := activeOrLastPRForChecks[checkName]
attempt := int32(1)
shouldCreatePr := false
if exists {
if (isFailed(oldPr) || (isBookingExpired(oldPr) && !workload.IsAdmitted(wl))) &&
if (isFailed(req) || (isBookingExpired(req) && !workload.IsAdmitted(wl))) &&
// if the workload is Admitted we don't want to retry on BookingExpired
ac != nil && ac.State == kueue.CheckStatePending {
// if the workload is in Retry/Rejected state we don't create another ProvReq
attempt = getAttempt(log, oldPr, wl.Name, checkName)
attempt = getAttempt(log, req, wl.Name, checkName)
if features.Enabled(features.KeepQuotaForProvReqRetry) {
remainingTime := c.remainingTimeToRetry(oldPr, attempt, prc)
remainingTime := c.remainingTimeToRetry(req, attempt, prc)
if remainingTime <= 0 {
shouldCreatePr = true
attempt++
Expand All @@ -269,7 +270,7 @@ func (c *Controller) syncOwnedProvisionRequest(
requestName := ProvisioningRequestName(wl.Name, checkName, attempt)
if shouldCreatePr {
log.V(3).Info("Creating ProvisioningRequest", "requestName", requestName, "attempt", attempt)
req := &autoscaling.ProvisioningRequest{
req = &autoscaling.ProvisioningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: requestName,
Namespace: wl.Namespace,
Expand All @@ -293,9 +294,27 @@ func (c *Controller) syncOwnedProvisionRequest(
if !psFound || !psaFound {
return nil, errInconsistentPodSetAssignments
}

ptName := getProvisioningRequestPodTemplateName(requestName, psName)

pt := &corev1.PodTemplate{}
err := c.client.Get(ctx, types.NamespacedName{Namespace: wl.Namespace, Name: ptName}, pt)
if client.IgnoreNotFound(err) != nil {
return nil, err
}
if err != nil {
// the PodTemplate was not found, so create it
_, err := c.createPodTemplate(ctx, wl, ptName, ps, psa)
if err != nil {
msg := api.TruncateEventMessage(fmt.Sprintf("Error creating PodTemplate %q: %v", ptName, err))
c.record.Eventf(wl, corev1.EventTypeWarning, "FailedCreate", msg)
return nil, c.handleError(ctx, wl, ac, msg, err)
}
}

req.Spec.PodSets = append(req.Spec.PodSets, autoscaling.PodSet{
PodTemplateRef: autoscaling.Reference{
Name: getProvisioningRequestPodTemplateName(requestName, psName),
Name: ptName,
},
Count: ptr.Deref(psa.Count, ps.Count),
})
Expand All @@ -313,7 +332,7 @@ func (c *Controller) syncOwnedProvisionRequest(
c.record.Eventf(wl, corev1.EventTypeNormal, "ProvisioningRequestCreated", "Created ProvisioningRequest: %q", req.Name)
activeOrLastPRForChecks[checkName] = req
}
if err := c.syncProvisionRequestsPodTemplates(ctx, wl, requestName, prc); err != nil {
if err := c.syncProvisionRequestsPodTemplates(ctx, wl, req); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -354,40 +373,49 @@ func (c *Controller) handleError(ctx context.Context, wl *kueue.Workload, ac *ku
return errors.Join(err, patchErr)
}

func (c *Controller) syncProvisionRequestsPodTemplates(ctx context.Context, wl *kueue.Workload, prName string, prc *kueue.ProvisioningRequestConfig) error {
request := &autoscaling.ProvisioningRequest{}
requestKey := types.NamespacedName{
Name: prName,
Namespace: wl.Namespace,
func (c *Controller) createPodTemplate(ctx context.Context, wl *kueue.Workload, name string, ps *kueue.PodSet, psa *kueue.PodSetAssignment) (*corev1.PodTemplate, error) {
newPt := &corev1.PodTemplate{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: wl.Namespace,
Labels: map[string]string{
constants.ManagedByKueueLabel: "true",
},
},
Template: ps.Template,
}

// set the controller reference to workload so that the template is not left orphaned
// if the ProvisioningRequest creation fails. The ownership is later transferred to the
// ProvisioningRequest.
if err := ctrl.SetControllerReference(wl, newPt, c.client.Scheme()); err != nil {
return nil, err
}

// apply the admission node selectors to the Template
psi, err := podset.FromAssignment(ctx, c.client, psa, ptr.Deref(psa.Count, ps.Count))
if err != nil {
return nil, err
}
err := c.client.Get(ctx, requestKey, request)

err = podset.Merge(&newPt.Template.ObjectMeta, &newPt.Template.Spec, psi)
if err != nil {
return client.IgnoreNotFound(err)
return nil, err
}

expectedPodSets := requiredPodSets(wl.Spec.PodSets, prc.Spec.ManagedResources)
podsetRefsMap := slices.ToMap(expectedPodSets, func(i int) (string, string) {
return getProvisioningRequestPodTemplateName(prName, expectedPodSets[i]), expectedPodSets[i]
})
// copy limits to requests if needed
workload.UseLimitsAsMissingRequestsInPod(&newPt.Template.Spec)

// the order of the podSets should be the same in the workload and the ProvisioningRequest.
// if the number of podSets differs, just delete the request
if len(request.Spec.PodSets) != len(expectedPodSets) {
return c.client.Delete(ctx, request)
if err := c.client.Create(ctx, newPt); err != nil {
return nil, err
}

psaMap := slices.ToRefMap(wl.Status.Admission.PodSetAssignments, func(p *kueue.PodSetAssignment) string { return p.Name })
podSetMap := slices.ToRefMap(wl.Spec.PodSets, func(ps *kueue.PodSet) string { return ps.Name })
return newPt, nil
}

func (c *Controller) syncProvisionRequestsPodTemplates(ctx context.Context, wl *kueue.Workload, request *autoscaling.ProvisioningRequest) error {
for i := range request.Spec.PodSets {
reqPS := &request.Spec.PodSets[i]
psName, refFound := podsetRefsMap[reqPS.PodTemplateRef.Name]
ps, psFound := podSetMap[psName]
psa, psaFound := psaMap[psName]

if !refFound || !psFound || !psaFound || ptr.Deref(psa.Count, 0) != reqPS.Count {
return c.client.Delete(ctx, request)
}

pt := &corev1.PodTemplate{}
ptKey := types.NamespacedName{
Expand All @@ -396,47 +424,34 @@ func (c *Controller) syncProvisionRequestsPodTemplates(ctx context.Context, wl *
}

err := c.client.Get(ctx, ptKey, pt)

if client.IgnoreNotFound(err) != nil {
return err
}

if err != nil {
// the PodTemplate was not found, so create it
newPt := &corev1.PodTemplate{
ObjectMeta: metav1.ObjectMeta{
Name: ptKey.Name,
Namespace: ptKey.Namespace,
Labels: map[string]string{
constants.ManagedByKueueLabel: "true",
},
},
Template: ps.Template,
}
if err == nil {
var shouldUpdate bool

// apply the admission node selectors to the Template
psi, err := podset.FromAssignment(ctx, c.client, psaMap[psName], reqPS.Count)
if err != nil {
return err
}

err = podset.Merge(&newPt.Template.ObjectMeta, &newPt.Template.Spec, psi)
if err != nil {
return err
// transfer the ownership of the template to the ProvisioningRequest
if metav1.IsControlledBy(pt, wl) {
if err := controllerutil.RemoveControllerReference(wl, pt, c.client.Scheme()); err != nil {
return err
}
shouldUpdate = true
}

// copy limits to requests if needed
workload.UseLimitsAsMissingRequestsInPod(&newPt.Template.Spec)

if err := ctrl.SetControllerReference(request, newPt, c.client.Scheme()); err != nil {
return err
if !metav1.IsControlledBy(pt, request) {
if err := controllerutil.SetControllerReference(request, pt, c.client.Scheme()); err != nil {
return err
}
shouldUpdate = true
}

if err = c.client.Create(ctx, newPt); err != nil {
return err
if shouldUpdate {
if err := c.client.Update(ctx, pt); err != nil {
return err
}
}
}
// maybe check the consistency deeper
}
return nil
}
Expand Down
Loading

0 comments on commit d02a764

Please sign in to comment.