Skip to content

Commit

Permalink
Avoid queueing workloads that don't match CQ namespaceSelector
Browse files Browse the repository at this point in the history
  • Loading branch information
ahg-g committed Aug 12, 2022
1 parent 5402db6 commit 84cd6d7
Show file tree
Hide file tree
Showing 17 changed files with 490 additions and 326 deletions.
4 changes: 2 additions & 2 deletions apis/config/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

// Package v1alpha1 contains API Schema definitions for the config v1alpha1 API group
//+kubebuilder:object:generate=true
//+groupName=config.x-k8s.io
// +kubebuilder:object:generate=true
// +groupName=config.x-k8s.io
package v1alpha1

import (
Expand Down
4 changes: 2 additions & 2 deletions apis/kueue/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

// Package v1alpha1 contains API Schema definitions for the kueue v1alpha1 API group
//+kubebuilder:object:generate=true
//+groupName=kueue.x-k8s.io
// +kubebuilder:object:generate=true
// +groupName=kueue.x-k8s.io
package v1alpha1

import (
Expand Down
2 changes: 1 addition & 1 deletion apis/kueue/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
return allErrs
}

/// validateQueueNameUpdate validates that queueName is set once
// validateQueueNameUpdate validates that queueName is set once
func validateQueueNameUpdate(new, old string, path *field.Path) field.ErrorList {
if len(old) == 0 {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
if err := r.cache.UpdateClusterQueue(newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in cache")
}
if err := r.qManager.UpdateClusterQueue(newCq); err != nil {
if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in queue manager")
}
return true
Expand Down Expand Up @@ -265,7 +265,7 @@ func (h *cqNamespaceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitin
cqs.Insert(cq)
}
}
h.qManager.QueueInadmissibleWorkloads(cqs)
h.qManager.QueueInadmissibleWorkloads(context.Background(), cqs)
}

func (h *cqNamespaceHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/core/resourceflavor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *ResourceFlavorReconciler) Create(e event.CreateEvent) bool {
// As long as one clusterQueue becomes active,
// we should inform clusterQueue controller to broadcast the event.
if cqNames := r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
r.qManager.QueueInadmissibleWorkloads(context.Background(), cqNames)
// If at least one CQ becomes active, then those CQs should now get evaluated by the scheduler;
// note that the workloads in those CQs are not necessarily "inadmissible", and hence we trigger a
// broadcast here in all cases.
Expand All @@ -128,7 +128,7 @@ func (r *ResourceFlavorReconciler) Delete(e event.DeleteEvent) bool {
log.V(2).Info("ResourceFlavor delete event")

if cqNames := r.cache.DeleteResourceFlavor(flv); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
r.qManager.QueueInadmissibleWorkloads(context.Background(), cqNames)
}
return false
}
Expand All @@ -147,7 +147,7 @@ func (r *ResourceFlavorReconciler) Update(e event.UpdateEvent) bool {
}

if cqNames := r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
r.qManager.QueueInadmissibleWorkloads(context.Background(), cqNames)
}
return false
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
}
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
log.V(2).Info("Workload delete event")
ctx := ctrl.LoggerInto(context.Background(), log)

// When assigning a clusterQueue to a workload, we assume it in the cache. If
// the state is unknown, the workload could have been assumed and we need
// to clear it from the cache.
Expand All @@ -155,7 +157,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
}

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
}

// Even if the state is unknown, the last cached state tells us whether the
Expand All @@ -174,6 +176,8 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {

status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
ctx := ctrl.LoggerInto(context.Background(), log)

prevQueue := oldWl.Spec.QueueName
if prevQueue != wl.Spec.QueueName {
log = log.WithValues("prevQueue", prevQueue)
Expand Down Expand Up @@ -202,7 +206,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
r.queues.DeleteWorkload(oldWl)

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)

case prevStatus == pending && status == pending:
if !r.queues.UpdateWorkload(oldWl, wlCopy) {
Expand All @@ -220,7 +224,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
log.Error(err, "Failed to delete workload from cache")
}
// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)

if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
Expand Down
92 changes: 5 additions & 87 deletions pkg/queue/cluster_queue_best_effort_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package queue

import (
"k8s.io/apimachinery/pkg/api/equality"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand All @@ -27,9 +25,6 @@ import (
// BestEffortFIFO.
type ClusterQueueBestEffortFIFO struct {
*ClusterQueueImpl

// inadmissibleWorkloads are workloads that have been tried at least once and couldn't be admitted.
inadmissibleWorkloads map[string]*workload.Info
}

var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}
Expand All @@ -39,90 +34,13 @@ const BestEffortFIFO = kueue.BestEffortFIFO
func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqBE := &ClusterQueueBestEffortFIFO{
ClusterQueueImpl: cqImpl,
inadmissibleWorkloads: make(map[string]*workload.Info),
}

cqBE.Update(cq)
return cqBE, nil
}

func (cq *ClusterQueueBestEffortFIFO) PushOrUpdate(wInfo *workload.Info) {
key := workload.Key(wInfo.Obj)
oldInfo := cq.inadmissibleWorkloads[key]
if oldInfo != nil {
// update in place if the workload was inadmissible and didn't change
// to potentially become admissible.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) {
cq.inadmissibleWorkloads[key] = wInfo
return
}
// otherwise move or update in place in the queue.
delete(cq.inadmissibleWorkloads, key)
}

cq.ClusterQueueImpl.PushOrUpdate(wInfo)
}

func (cq *ClusterQueueBestEffortFIFO) Delete(w *kueue.Workload) {
delete(cq.inadmissibleWorkloads, workload.Key(w))
cq.ClusterQueueImpl.Delete(w)
}

func (cq *ClusterQueueBestEffortFIFO) DeleteFromQueue(q *Queue) {
for _, w := range q.items {
key := workload.Key(w.Obj)
if wl := cq.inadmissibleWorkloads[key]; wl != nil {
delete(cq.inadmissibleWorkloads, key)
}
}
cq.ClusterQueueImpl.DeleteFromQueue(q)
}

// RequeueIfNotPresent inserts a workload that cannot be admitted into
// ClusterQueue, unless it is already in the queue. If immediate is true,
// the workload will be pushed back to heap directly. If not,
// the workload will be put into the inadmissibleWorkloads.
func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, immediate bool) bool {
key := workload.Key(wInfo.Obj)
if immediate {
// If the workload was inadmissible, move it back into the queue.
inadmissibleWl := cq.inadmissibleWorkloads[key]
if inadmissibleWl != nil {
wInfo = inadmissibleWl
delete(cq.inadmissibleWorkloads, key)
}
return cq.ClusterQueueImpl.pushIfNotPresent(wInfo)
}

if cq.inadmissibleWorkloads[key] != nil {
return false
}

if data := cq.heap.GetByKey(key); data != nil {
return false
}

cq.inadmissibleWorkloads[key] = wInfo

return true
}

// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
// If at least one workload is moved, returns true. Otherwise returns false.
func (cq *ClusterQueueBestEffortFIFO) QueueInadmissibleWorkloads() bool {
if len(cq.inadmissibleWorkloads) == 0 {
return false
}

for _, wInfo := range cq.inadmissibleWorkloads {
cq.ClusterQueueImpl.pushIfNotPresent(wInfo)
ClusterQueueImpl: cqImpl,
}

cq.inadmissibleWorkloads = make(map[string]*workload.Info)
return true
err := cqBE.Update(cq)
return cqBE, err
}

func (cq *ClusterQueueBestEffortFIFO) Pending() int32 {
return cq.ClusterQueueImpl.Pending() + int32(len(cq.inadmissibleWorkloads))
func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
return cq.ClusterQueueImpl.requeueIfNotPresent(wInfo, reason == RequeueReasonFailedAfterNomination)
}
Loading

0 comments on commit 84cd6d7

Please sign in to comment.