Skip to content

Commit

Permalink
second round of reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
ahg-g committed Aug 11, 2022
1 parent 74c2fef commit 7d5bdf3
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 25 deletions.
5 changes: 3 additions & 2 deletions pkg/controller/core/clusterqueue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,11 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
}
defer r.notifyWatchers(oldCq, newCq)

ctx := ctrl.LoggerInto(context.Background(), log)
if err := r.cache.UpdateClusterQueue(newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in cache")
}
if err := r.qManager.UpdateClusterQueue(newCq); err != nil {
if err := r.qManager.UpdateClusterQueue(ctx, newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in queue manager")
}
return true
Expand Down Expand Up @@ -265,7 +266,7 @@ func (h *cqNamespaceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitin
cqs.Insert(cq)
}
}
h.qManager.QueueInadmissibleWorkloads(cqs)
h.qManager.QueueInadmissibleWorkloads(context.Background(), cqs)
}

func (h *cqNamespaceHandler) Delete(event.DeleteEvent, workqueue.RateLimitingInterface) {
Expand Down
9 changes: 6 additions & 3 deletions pkg/controller/core/resourceflavor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func (r *ResourceFlavorReconciler) Create(e event.CreateEvent) bool {
// As long as one clusterQueue becomes active,
// we should inform clusterQueue controller to broadcast the event.
if cqNames := r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
ctx := ctrl.LoggerInto(context.Background(), log)
r.qManager.QueueInadmissibleWorkloads(ctx, cqNames)
// If at least one CQ becomes active, then those CQs should now get evaluated by the scheduler;
// note that the workloads in those CQs are not necessarily "inadmissible", and hence we trigger a
// broadcast here in all cases.
Expand All @@ -128,7 +129,8 @@ func (r *ResourceFlavorReconciler) Delete(e event.DeleteEvent) bool {
log.V(2).Info("ResourceFlavor delete event")

if cqNames := r.cache.DeleteResourceFlavor(flv); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
ctx := ctrl.LoggerInto(context.Background(), log)
r.qManager.QueueInadmissibleWorkloads(ctx, cqNames)
}
return false
}
Expand All @@ -147,7 +149,8 @@ func (r *ResourceFlavorReconciler) Update(e event.UpdateEvent) bool {
}

if cqNames := r.cache.AddOrUpdateResourceFlavor(flv.DeepCopy()); len(cqNames) > 0 {
r.qManager.QueueInadmissibleWorkloads(cqNames)
ctx := ctrl.LoggerInto(context.Background(), log)
r.qManager.QueueInadmissibleWorkloads(ctx, cqNames)
}
return false
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
}
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
log.V(2).Info("Workload delete event")
ctx := ctrl.LoggerInto(context.Background(), log)

// When assigning a clusterQueue to a workload, we assume it in the cache. If
// the state is unknown, the workload could have been assumed and we need
// to clear it from the cache.
Expand All @@ -155,7 +157,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {
}

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)
}

// Even if the state is unknown, the last cached state tells us whether the
Expand All @@ -174,6 +176,8 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {

status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
ctx := ctrl.LoggerInto(context.Background(), log)

prevQueue := oldWl.Spec.QueueName
if prevQueue != wl.Spec.QueueName {
log = log.WithValues("prevQueue", prevQueue)
Expand Down Expand Up @@ -202,7 +206,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
r.queues.DeleteWorkload(oldWl)

// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)

case prevStatus == pending && status == pending:
if !r.queues.UpdateWorkload(oldWl, wlCopy) {
Expand All @@ -220,7 +224,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
log.Error(err, "Failed to delete workload from cache")
}
// trigger the move of associated inadmissibleWorkloads if required.
r.queues.QueueAssociatedInadmissibleWorkloads(wl)
r.queues.QueueAssociatedInadmissibleWorkloads(ctx, wl)

if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
Expand Down
62 changes: 62 additions & 0 deletions pkg/queue/cluster_queue_best_effort_fifo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queue

import (
"testing"

"github.com/google/go-cmp/cmp"
kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
)

func TestBestEffortFIFORequeueIfNotPresent(t *testing.T) {
tests := map[RequeueReason]struct {
wantInadmissible bool
}{
RequeueReasonFailedAfterNomination: {
wantInadmissible: false,
},
RequeueReasonNamespaceMismatch: {
wantInadmissible: true,
},
RequeueReasonGeneric: {
wantInadmissible: true,
},
}

for reason, test := range tests {
t.Run(string(reason), func(t *testing.T) {
cq, _ := newClusterQueueBestEffortFIFO(&kueue.ClusterQueue{
Spec: kueue.ClusterQueueSpec{
QueueingStrategy: kueue.StrictFIFO,
},
})
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if ok := cq.RequeueIfNotPresent(workload.NewInfo(wl), reason); !ok {
t.Error("failed to requeue nonexistent workload")
}

_, gotInadmissible := cq.(*ClusterQueueBestEffortFIFO).inadmissibleWorkloads[workload.Key(wl)]
if diff := cmp.Diff(test.wantInadmissible, gotInadmissible); diff != "" {
t.Errorf("Unexpected inadmissible status (-want,+got):\n%s", diff)
}

})
}
}
9 changes: 5 additions & 4 deletions pkg/queue/cluster_queue_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func (c *ClusterQueueImpl) PushOrUpdate(wInfo *workload.Info) {
}

func (c *ClusterQueueImpl) Delete(w *kueue.Workload) {
delete(c.inadmissibleWorkloads, workload.Key(w))
c.heap.Delete(workload.Key(w))
key := workload.Key(w)
delete(c.inadmissibleWorkloads, key)
c.heap.Delete(key)
}

func (c *ClusterQueueImpl) DeleteFromQueue(q *Queue) {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (c *ClusterQueueImpl) requeueIfNotPresent(wInfo *workload.Info, immediate b

// QueueInadmissibleWorkloads moves all workloads from inadmissibleWorkloads to heap.
// If at least one workload is moved, returns true. Otherwise returns false.
func (c *ClusterQueueImpl) QueueInadmissibleWorkloads(client client.Client) bool {
func (c *ClusterQueueImpl) QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool {
if len(c.inadmissibleWorkloads) == 0 {
return false
}
Expand All @@ -160,7 +161,7 @@ func (c *ClusterQueueImpl) QueueInadmissibleWorkloads(client client.Client) bool
moved := false
for key, wInfo := range c.inadmissibleWorkloads {
ns := corev1.Namespace{}
err := client.Get(context.Background(), types.NamespacedName{Name: wInfo.Obj.Namespace}, &ns)
err := client.Get(ctx, types.NamespacedName{Name: wInfo.Obj.Namespace}, &ns)
if err != nil || !c.namespaceSelector.Matches(labels.Set(ns.Labels)) {
inadmissibleWorkloads[key] = wInfo
} else {
Expand Down
4 changes: 3 additions & 1 deletion pkg/queue/cluster_queue_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package queue

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -319,7 +320,8 @@ func TestClusterQueueImpl(t *testing.T) {
}

if test.queueInadmissibleWorkloads {
if diff := cmp.Diff(test.wantInadmissibleWorkloadsRequeued, cq.QueueInadmissibleWorkloads(cl)); diff != "" {
if diff := cmp.Diff(test.wantInadmissibleWorkloadsRequeued,
cq.QueueInadmissibleWorkloads(context.Background(), cl)); diff != "" {
t.Errorf("Unexpected requeueing of inadmissible workloads (-want,+got):\n%s", diff)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/queue/cluster_queue_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package queue

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -72,7 +73,7 @@ type ClusterQueue interface {
// QueueInadmissibleWorkloads moves all workloads put in temporary placeholder stage
// to the ClusterQueue. If at least one workload is moved,
// returns true. Otherwise returns false.
QueueInadmissibleWorkloads(client client.Client) bool
QueueInadmissibleWorkloads(ctx context.Context, client client.Client) bool

// Pending returns the number of pending workloads.
Pending() int32
Expand Down
20 changes: 10 additions & 10 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,15 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
}
}

queued := m.queueAllInadmissibleWorkloadsInCohort(cqImpl)
queued := m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl)
reportPendingWorkloads(cq.Name, cqImpl.Pending())
if queued || addedWorkloads {
m.Broadcast()
}
return nil
}

func (m *Manager) UpdateClusterQueue(cq *kueue.ClusterQueue) error {
func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error {
m.Lock()
defer m.Unlock()
cqImpl, ok := m.clusterQueues[cq.Name]
Expand All @@ -133,7 +133,7 @@ func (m *Manager) UpdateClusterQueue(cq *kueue.ClusterQueue) error {
}

// TODO(#8): Selectively move workloads based on the exact event.
if m.queueAllInadmissibleWorkloadsInCohort(cqImpl) {
if m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl) {
m.Broadcast()
}

Expand Down Expand Up @@ -343,7 +343,7 @@ func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey
// QueueAssociatedInadmissibleWorkloads moves all associated workloads from
// inadmissibleWorkloads to heap. If at least one workload is moved,
// returns true. Otherwise returns false.
func (m *Manager) QueueAssociatedInadmissibleWorkloads(w *kueue.Workload) {
func (m *Manager) QueueAssociatedInadmissibleWorkloads(ctx context.Context, w *kueue.Workload) {
m.Lock()
defer m.Unlock()

Expand All @@ -357,15 +357,15 @@ func (m *Manager) QueueAssociatedInadmissibleWorkloads(w *kueue.Workload) {
return
}

if m.queueAllInadmissibleWorkloadsInCohort(cq) {
if m.queueAllInadmissibleWorkloadsInCohort(ctx, cq) {
m.Broadcast()
}
}

// QueueInadmissibleWorkloads moves all inadmissibleWorkloads in
// corresponding ClusterQueues to heap. If at least one workload queued,
// we will broadcast the event.
func (m *Manager) QueueInadmissibleWorkloads(cqNames sets.String) {
func (m *Manager) QueueInadmissibleWorkloads(ctx context.Context, cqNames sets.String) {
m.Lock()
defer m.Unlock()
if len(cqNames) == 0 {
Expand All @@ -378,7 +378,7 @@ func (m *Manager) QueueInadmissibleWorkloads(cqNames sets.String) {
if !exists {
continue
}
if m.queueAllInadmissibleWorkloadsInCohort(cq) {
if m.queueAllInadmissibleWorkloadsInCohort(ctx, cq) {
queued = true
}
}
Expand All @@ -398,16 +398,16 @@ func (m *Manager) QueueInadmissibleWorkloads(cqNames sets.String) {
// 1. delete events for any admitted workload in the cohort.
// 2. add events of any cluster queue in the cohort.
// 3. update events of any cluster queue in the cohort.
func (m *Manager) queueAllInadmissibleWorkloadsInCohort(cq ClusterQueue) bool {
func (m *Manager) queueAllInadmissibleWorkloadsInCohort(ctx context.Context, cq ClusterQueue) bool {
cohort := cq.Cohort()
if cohort == "" {
return cq.QueueInadmissibleWorkloads(m.client)
return cq.QueueInadmissibleWorkloads(ctx, m.client)
}

queued := false
for cqName := range m.cohorts[cohort] {
if clusterQueue, ok := m.clusterQueues[cqName]; ok {
queued = clusterQueue.QueueInadmissibleWorkloads(m.client) || queued
queued = clusterQueue.QueueInadmissibleWorkloads(ctx, m.client) || queued
}
}
return queued
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ const (
skipped entryStatus = "skipped"
// indicates if the workload was assumed to have been admitted.
assumed entryStatus = "assumed"
// indicates that the workload was never nominated for admission.
notNominated entryStatus = ""
)

// entry holds requirements for a workload to be admitted by a clusterQueue.
Expand Down Expand Up @@ -529,7 +531,7 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
added := s.queues.RequeueWorkload(ctx, &e.Info, e.requeueReason)
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", e.ClusterQueue, "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "added", added, "status", e.status)

if e.status == "" {
if e.status == notNominated {
err := workload.UpdateStatus(ctx, s.client, e.Obj, kueue.WorkloadAdmitted, corev1.ConditionFalse, "Pending", e.inadmissibleMsg)
if err != nil {
log.Error(err, "Could not update Workload status")
Expand Down

0 comments on commit 7d5bdf3

Please sign in to comment.