Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid queueing workloads that don't match CQ namespaceSelector #322

Merged
merged 1 commit into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apis/config/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

// Package v1alpha1 contains API Schema definitions for the config v1alpha1 API group
//+kubebuilder:object:generate=true
//+groupName=config.x-k8s.io
// +kubebuilder:object:generate=true
// +groupName=config.x-k8s.io
package v1alpha1

import (
Expand Down
4 changes: 2 additions & 2 deletions apis/kueue/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
*/

// Package v1alpha1 contains API Schema definitions for the kueue v1alpha1 API group
//+kubebuilder:object:generate=true
//+groupName=kueue.x-k8s.io
// +kubebuilder:object:generate=true
// +groupName=kueue.x-k8s.io
package v1alpha1

import (
Expand Down
2 changes: 1 addition & 1 deletion apis/kueue/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
return allErrs
}

/// validateQueueNameUpdate validates that queueName is set once
// validateQueueNameUpdate validates that queueName is set once
func validateQueueNameUpdate(new, old string, path *field.Path) field.ErrorList {
if len(old) == 0 {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
if err := r.cache.UpdateClusterQueue(newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in cache")
}
if err := r.qManager.UpdateClusterQueue(newCq); err != nil {
if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in queue manager")
}
return true
Expand Down Expand Up @@ -265,7 +265,7 @@ func (h *cqNamespaceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitin
cqs.Insert(cq)
}
}
h.qManager.QueueInadmissibleWorkloads(cqs)
h.qManager.QueueInadmissibleWorkloads(context.Background(), cqs)
}

func (h *cqNamespaceHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/core/resourceflavor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *ResourceFlavorReconciler) Create(e event.CreateEvent) bool {
// As long as one clusterQueue becomes active,
// we should inform clusterQueue controller to broadcast the event.
if cqNames := r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
r.qManager.QueueInadmissibleWorkloads(context.Background(), cqNames)
// If at least one CQ becomes active, then those CQs should now get evaluated by the scheduler;
// note that the workloads in those CQs are not necessarily "inadmissible", and hence we trigger a
// broadcast here in all cases.
Expand All @@ -128,7 +128,7 @@ func (r *ResourceFlavorReconciler) Delete(e event.DeleteEvent) bool {
log.V(2).Info("ResourceFlavor delete event")

if cqNames := r.cache.DeleteResourceFlavor(flv); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
r.qManager.QueueInadmissibleWorkloads(context.Background(), cqNames)
}
return false
}
Expand All @@ -147,7 +147,7 @@ func (r *ResourceFlavorReconciler) Update(e event.UpdateEvent) bool {
}

if cqNames := r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
r.qManager.QueueInadmissibleWorkloads(context.Background(), cqNames)
}
return false
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
}
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
log.V(2).Info("Workload delete event")
ctx := ctrl.LoggerInto(context.Background(), log)

// When assigning a clusterQueue to a workload, we assume it in the cache. If
// the state is unknown, the workload could have been assumed and we need
// to clear it from the cache.
Expand All @@ -155,7 +157,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
}

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
}

// Even if the state is unknown, the last cached state tells us whether the
Expand All @@ -174,6 +176,8 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {

status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
ctx := ctrl.LoggerInto(context.Background(), log)

prevQueue := oldWl.Spec.QueueName
if prevQueue != wl.Spec.QueueName {
log = log.WithValues("prevQueue", prevQueue)
Expand Down Expand Up @@ -202,7 +206,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
r.queues.DeleteWorkload(oldWl)

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)

case prevStatus == pending && status == pending:
if !r.queues.UpdateWorkload(oldWl, wlCopy) {
Expand All @@ -220,7 +224,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
log.Error(err, "Failed to delete workload from cache")
}
// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)

if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
Expand Down
92 changes: 5 additions & 87 deletions pkg/queue/cluster_queue_best_effort_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package queue

import (
"k8s.io/apimachinery/pkg/api/equality"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand All @@ -27,9 +25,6 @@ import (
// BestEffortFIFO.
type ClusterQueueBestEffortFIFO struct {
*ClusterQueueImpl

// inadmissibleWorkloads are workloads that have been tried at least once and couldn't be admitted.
inadmissibleWorkloads map[string]*workload.Info
}

var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}
Expand All @@ -39,90 +34,13 @@ const BestEffortFIFO = kueue.BestEffortFIFO
func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqBE := &ClusterQueueBestEffortFIFO{
ClusterQueueImpl: cqImpl,
inadmissibleWorkloads: make(map[string]*workload.Info),
}

cqBE.Update(cq)
return cqBE, nil
}

func (cq *ClusterQueueBestEffortFIFO) PushOrUpdate(wInfo *workload.Info) {
key := workload.Key(wInfo.Obj)
oldInfo := cq.inadmissibleWorkloads[key]
if oldInfo != nil {
// update in place if the workload was inadmissible and didn't change
// to potentially become admissible.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) {
cq.inadmissibleWorkloads[key] = wInfo
return
}
// otherwise move or update in place in the queue.
delete(cq.inadmissibleWorkloads, key)
}

cq.ClusterQueueImpl.PushOrUpdate(wInfo)
}

func (cq *ClusterQueueBestEffortFIFO) Delete(w *kueue.Workload) {
delete(cq.inadmissibleWorkloads, workload.Key(w))
cq.ClusterQueueImpl.Delete(w)
}

func (cq *ClusterQueueBestEffortFIFO) DeleteFromQueue(q *Queue) {
for _, w := range q.items {
key := workload.Key(w.Obj)
if wl := cq.inadmissibleWorkloads[key]; wl != nil {
delete(cq.inadmissibleWorkloads, key)
}
}
cq.ClusterQueueImpl.DeleteFromQueue(q)
}

// RequeueIfNotPresent inserts a workload that cannot be admitted into
// ClusterQueue, unless it is already in the queue. If immediate is true,
// the workload will be pushed back to heap directly. If not,
// the workload will be put into the inadmissibleWorkloads.
func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, immediate bool) bool {
key := workload.Key(wInfo.Obj)
if immediate {
// If the workload was inadmissible, move it back into the queue.
inadmissibleWl := cq.inadmissibleWorkloads[key]
if inadmissibleWl != nil {
wInfo = inadmissibleWl
delete(cq.inadmissibleWorkloads, key)
}
return cq.ClusterQueueImpl.pushIfNotPresent(wInfo)
}

if cq.inadmissibleWorkloads[key] != nil {
return false
}

if data := cq.heap.GetByKey(key); data != nil {
return false
}

cq.inadmissibleWorkloads[key] = wInfo

return true
}

// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
// If at least one workload is moved, returns true. Otherwise returns false.
func (cq *ClusterQueueBestEffortFIFO) QueueInadmissibleWorkloads() bool {
if len(cq.inadmissibleWorkloads) == 0 {
return false
}

for _, wInfo := range cq.inadmissibleWorkloads {
cq.ClusterQueueImpl.pushIfNotPresent(wInfo)
ClusterQueueImpl: cqImpl,
}

cq.inadmissibleWorkloads = make(map[string]*workload.Info)
return true
err := cqBE.Update(cq)
return cqBE, err
}

func (cq *ClusterQueueBestEffortFIFO) Pending() int32 {
return cq.ClusterQueueImpl.Pending() + int32(len(cq.inadmissibleWorkloads))
func (cq *ClusterQueueBestEffortFIFO) RequeueIfNotPresent(wInfo *workload.Info, reason RequeueReason) bool {
return cq.ClusterQueueImpl.requeueIfNotPresent(wInfo, reason == RequeueReasonFailedAfterNomination)
}
Loading