Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Oct 30, 2024
1 parent f9fa268 commit 55daaf8
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 42 deletions.
12 changes: 9 additions & 3 deletions apis/kueue/v1beta1/provisioningrequestconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

const (
Expand Down Expand Up @@ -59,16 +60,21 @@ type ProvisioningRequestConfigSpec struct {
ManagedResources []corev1.ResourceName `json:"managedResources,omitempty"`

// retryStrategy defines the strategy for retrying a ProvisioningRequest.
// If null, then the default configuration is applied. To switch off retry mechanism
// set retryStrategy.backoffLimitCount to 0.
// If null, then the default configuration is applied with the following parameter values:
// BackoffLimitCount: 3
// BackoffBaseSeconds: 60 - 1 min
// BackoffMaxSeconds: 1800 - 30 mins
//
// To switch off the retry mechanism,
// set retryStrategy.backoffLimitCount to 0.
//
// +optional
RetryStrategy *ProvisioningRequestRetryStrategy `json:"retryStrategy,omitempty"`
}

var (
DefaultRetryStrategy = ProvisioningRequestRetryStrategy{
BackoffLimitCount: nil,
BackoffLimitCount: ptr.To(int32(3)),
BackoffBaseSeconds: 60, // 1 min
BackoffMaxSeconds: 1800, // 30 min
}
Expand Down
31 changes: 12 additions & 19 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,17 @@ func (c *Controller) deleteUnusedProvisioningRequests(ctx context.Context, owned

func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Workload, wlInfo *workloadInfo, relevantChecks []string, activeOrLastPRForChecks map[string]*autoscaling.ProvisioningRequest) (*time.Duration, error) {
log := ctrl.LoggerFrom(ctx)
var requeueAfter *time.Duration
var requeAfter *time.Duration
for _, checkName := range relevantChecks {
// get the config
prc, err := c.helper.ConfigForAdmissionCheck(ctx, checkName)
if err != nil {
// the check is not active
continue
}
retryStrategy := ptr.Deref(prc.Spec.RetryStrategy, kueue.DefaultRetryStrategy)
if !c.reqIsNeeded(wl, prc) {
continue
}
fmt.Printf("check states: %+v, checkName: %+v", wlInfo.checkStates, checkName)
ac := workload.FindAdmissionCheck(wlInfo.checkStates, checkName)
if ac != nil && ac.State == kueue.CheckStateReady {
log.V(2).Info("Skip syncing of the ProvReq for admission check which is Ready", "workload", klog.KObj(wl), "admissionCheck", checkName)
Expand All @@ -225,18 +223,16 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
ac != nil && ac.State == kueue.CheckStatePending {
// if the workload is in Retry/Rejected state we don't create another ProvReq
attempt = getAttempt(log, oldPr, wl.Name, checkName)
if retryStrategy.BackoffLimitCount == nil || attempt <= *retryStrategy.BackoffLimitCount {
if !features.Enabled(features.KeepQuotaForProvReqRetry) {
shouldCreatePr = true
attempt += 1
}
remainingTime := c.remainingTimeToRetry(oldPr, attempt, prc)
if remainingTime <= 0 {
shouldCreatePr = true
attempt += 1
} else if requeueAfter == nil || remainingTime < *requeueAfter {
requeueAfter = &remainingTime
}
if !features.Enabled(features.KeepQuotaForProvReqRetry) {
shouldCreatePr = true
attempt += 1
}
remainingTime := c.remainingTimeToRetry(oldPr, attempt, prc)
if remainingTime <= 0 {
shouldCreatePr = true
attempt += 1
} else if requeAfter == nil || remainingTime < *requeAfter {
requeAfter = &remainingTime
}
}
} else {
Expand Down Expand Up @@ -296,14 +292,11 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
return nil, err
}
}
return requeueAfter, nil
return requeAfter, nil
}

func (c *Controller) remainingTimeToRetry(pr *autoscaling.ProvisioningRequest, failuresCount int32, prc *kueue.ProvisioningRequestConfig) time.Duration {
retryStrategy := ptr.Deref(prc.Spec.RetryStrategy, kueue.DefaultRetryStrategy)
if prc.Spec.RetryStrategy != nil {

}
backoffDuration := time.Duration(retryStrategy.BackoffBaseSeconds) * time.Second
maxBackoffDuration := time.Duration(retryStrategy.BackoffMaxSeconds) * time.Second
var cond *metav1.Condition
Expand Down
51 changes: 48 additions & 3 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,13 @@ func TestSyncCheckStates(t *testing.T) {

var (
workloadCmpOpts = []cmp.Option{
cmpopts.EquateApproxTime(2 * time.Second),
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(
kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion",
),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.RequeueState{}, "RequeueAt"),
cmpopts.SortSlices(func(a, b metav1.Condition) bool { return a.Type < b.Type }),
}
)
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
"should keep the WorkloadRequeued condition until the backoff expires": {
"should keep the WorkloadRequeued condition until the WaitForPodsReady backoff expires": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Condition(metav1.Condition{
Expand All @@ -759,7 +759,7 @@ func TestReconcile(t *testing.T) {
Obj(),
wantResult: reconcile.Result{RequeueAfter: time.Minute},
},
"should set the WorkloadRequeued condition when backoff expires": {
"should set the WorkloadRequeued condition when the WaitForPodsReady backoff expires": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Condition(metav1.Condition{
Expand All @@ -781,6 +781,51 @@ func TestReconcile(t *testing.T) {
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
},
"should keep the WorkloadRequeued condition until the AdmissionCheck backoff expires": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Condition(metav1.Condition{
Type: kueue.WorkloadRequeued,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadEvictedByAdmissionCheck,
Message: "Exceeded the AdmissionCheck timeout ns",
}).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Add(60*time.Second).Truncate(time.Second)))).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Condition(metav1.Condition{
Type: kueue.WorkloadRequeued,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadEvictedByAdmissionCheck,
Message: "Exceeded the AdmissionCheck timeout ns",
}).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Add(60*time.Second).Truncate(time.Second)))).
Obj(),
wantResult: reconcile.Result{RequeueAfter: time.Minute},
},
"should set the WorkloadRequeued condition when the AdmissionCheck backoff expires": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Condition(metav1.Condition{
Type: kueue.WorkloadRequeued,
Status: metav1.ConditionFalse,
Reason: kueue.WorkloadEvictedByAdmissionCheck,
Message: "Exceeded the AdmissionCheck timeout ns",
}).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Condition(metav1.Condition{
Type: kueue.WorkloadRequeued,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadBackoffFinished,
Message: "The workload backoff was finished",
}).
RequeueState(ptr.To[int32](1), ptr.To(metav1.NewTime(testStartTime.Truncate(time.Second)))).
Obj(),
},
"shouldn't set the WorkloadRequeued condition when backoff expires and workload finished": {
workload: utiltesting.MakeWorkload("wl", "ns").
Active(true).
Expand Down
10 changes: 0 additions & 10 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,3 @@ func LogFeatureGates(log logr.Logger) {
}
log.V(2).Info("Loaded feature gates", "featureGates", features)
}

// PrintFeatureGates writes the current state of the feature gates to standard
// output.
//
// It iterates over every gate returned by
// utilfeature.DefaultMutableFeatureGate.GetAll(), keeps only those that are
// also keys of defaultFeatureGates, and records the result of Enabled for each
// of them. The collected map is printed as a single, newline-terminated line.
func PrintFeatureGates() {
	features := make(map[featuregate.Feature]bool, len(defaultFeatureGates))
	for f := range utilfeature.DefaultMutableFeatureGate.GetAll() {
		// Skip gates that are not declared in defaultFeatureGates.
		if _, ok := defaultFeatureGates[f]; ok {
			features[f] = Enabled(f)
		}
	}
	// Terminate the line so the output does not run into whatever is
	// printed next.
	fmt.Printf("Loaded feature gates %+v\n", features)
}
18 changes: 11 additions & 7 deletions pkg/workload/admissionchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func FindAdmissionCheck(checks []kueue.AdmissionCheckState, checkName string) *k
return &checks[i]
}
}

return nil
}

Expand Down Expand Up @@ -127,7 +128,8 @@ func SetAdmissionCheckState(checks *[]kueue.AdmissionCheckState, newCheck kueue.
// RejectedChecks returns the list of Rejected admission checks
func RejectedChecks(wl *kueue.Workload) []kueue.AdmissionCheckState {
rejectedChecks := make([]kueue.AdmissionCheckState, 0, len(wl.Status.AdmissionChecks))
for _, ac := range wl.Status.AdmissionChecks {
for i := range wl.Status.AdmissionChecks {
ac := wl.Status.AdmissionChecks[i]
if ac.State == kueue.CheckStateRejected {
rejectedChecks = append(rejectedChecks, ac)
}
Expand All @@ -137,8 +139,8 @@ func RejectedChecks(wl *kueue.Workload) []kueue.AdmissionCheckState {

// HasAllChecksReady returns true if all the checks of the workload are ready.
func HasAllChecksReady(wl *kueue.Workload) bool {
for _, ac := range wl.Status.AdmissionChecks {
if ac.State != kueue.CheckStateReady {
for i := range wl.Status.AdmissionChecks {
if wl.Status.AdmissionChecks[i].State != kueue.CheckStateReady {
return false
}
}
Expand All @@ -164,8 +166,9 @@ func HasAllChecks(wl *kueue.Workload, mustHaveChecks sets.Set[string]) bool {

// HasRetryChecks returns true if any of the workloads checks is Retry
func HasRetryChecks(wl *kueue.Workload) bool {
for _, ac := range wl.Status.AdmissionChecks {
if ac.State == kueue.CheckStateRetry {
for i := range wl.Status.AdmissionChecks {
state := wl.Status.AdmissionChecks[i].State
if state == kueue.CheckStateRetry {
return true
}
}
Expand All @@ -174,8 +177,9 @@ func HasRetryChecks(wl *kueue.Workload) bool {

// HasRejectedChecks returns true if any of the workloads checks is Rejected
func HasRejectedChecks(wl *kueue.Workload) bool {
for _, ac := range wl.Status.AdmissionChecks {
if ac.State == kueue.CheckStateRejected {
for i := range wl.Status.AdmissionChecks {
state := wl.Status.AdmissionChecks[i].State
if state == kueue.CheckStateRejected {
return true
}
}
Expand Down

0 comments on commit 55daaf8

Please sign in to comment.