
Commit

fix some review issues
Anton Stuchinskii committed Aug 25, 2023
1 parent 032c77f commit 81a2384
Showing 20 changed files with 96 additions and 49 deletions.
2 changes: 1 addition & 1 deletion apis/config/v1beta1/defaults.go
@@ -39,7 +39,7 @@ const (
DefaultClientConnectionQPS float32 = 20.0
DefaultClientConnectionBurst int32 = 30
defaultPodsReadyTimeout = 5 * time.Minute
DefaultQueueVisibilityUpdateIntervalSeconds = 5
DefaultQueueVisibilityUpdateIntervalSeconds int32 = 5
)

func addDefaultingFuncs(scheme *runtime.Scheme) error {
11 changes: 8 additions & 3 deletions main.go
@@ -147,7 +147,7 @@ func main() {
}

cCache := cache.New(mgr.GetClient(), cache.WithPodsReadyTracking(blockForPodsReady(&cfg)))
queues := setupManager(mgr, cCache, &cfg)
queues := setupQueueManager(mgr, cCache, &cfg)

ctx := ctrl.SetupSignalHandler()
if err := setupIndexes(ctx, mgr, &cfg); err != nil {
@@ -179,8 +179,13 @@ func main() {
}
}

func setupManager(mgr ctrl.Manager, cCache *cache.Cache, cfg *configapi.Configuration) *queue.Manager {
queues := queue.NewManager(mgr.GetClient(), cCache, cfg)
func setupQueueManager(mgr ctrl.Manager, cCache *cache.Cache, cfg *configapi.Configuration) *queue.Manager {
queues := queue.NewManager(
mgr.GetClient(),
cCache,
queue.WithQueueVisibilityUpdateInterval(cfg.QueueVisibility.UpdateIntervalSeconds),
queue.WithQueueVisibilityClusterQueuesMaxCount(cfg.QueueVisibility.ClusterQueues.MaxCount),
)
if err := mgr.Add(queues); err != nil {
setupLog.Error(err, "Unable to add queue manager to manager")
os.Exit(1)
2 changes: 1 addition & 1 deletion pkg/config/config.go
@@ -162,7 +162,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
return options, cfg, err
}
}
err = validate(&cfg).ToAggregate()
addTo(&options, &cfg)
err = validate(&cfg)
return options, cfg, err
}
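For context on the validate call in Load above: validate now returns a field.ErrorList (see pkg/config/validation.go below), which a caller can collapse into a single error with ToAggregate. The following is a minimal, self-contained sketch of that pattern; the path and message are illustrative, not Kueue's actual ones.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/validation/field"
)

func main() {
	// Accumulate structured validation errors, as validate() does.
	var allErrs field.ErrorList
	allErrs = append(allErrs, field.Invalid(
		field.NewPath("queueVisibility", "updateIntervalSeconds"), 0,
		"must be greater than or equal to 1"))

	// ToAggregate returns nil for an empty list; otherwise it joins each
	// error's field path and detail into a single error value.
	if err := allErrs.ToAggregate(); err != nil {
		fmt.Println(err)
	}
}
```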
15 changes: 11 additions & 4 deletions pkg/config/config_test.go
@@ -18,6 +18,7 @@ package config

import (
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
@@ -28,6 +29,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
@@ -693,7 +695,7 @@ func TestValidate(t *testing.T) {
testcases := []struct {
name string
cfg *configapi.Configuration
wantErr error
wantErr field.ErrorList
}{

{
@@ -708,7 +710,9 @@
UpdateIntervalSeconds: 0,
},
},
wantErr: errInvalidUpdateIntervalSeconds,
wantErr: field.ErrorList{
field.Invalid(field.NewPath("queueVisibility").Child("updateIntervalSeconds"), 0, fmt.Sprintf("must be greater than or equal to %d", queueVisibilityClusterQueuesUpdateIntervalSeconds)),
},
},
{
name: "invalid queue visibility cluster queue max count",
@@ -717,14 +721,17 @@
ClusterQueues: &configapi.ClusterQueueVisibility{
MaxCount: 4001,
},
UpdateIntervalSeconds: 1,
},
},
wantErr: errInvalidMaxValue,
wantErr: field.ErrorList{
field.Invalid(field.NewPath("queueVisibility").Child("clusterQueues").Child("maxCount"), 4001, fmt.Sprintf("must be less than or equal to %d", queueVisibilityClusterQueuesMaxValue)),
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.EquateErrors()); diff != "" {
if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" {
t.Errorf("Unexpected returned error (-want,+got):\n%s", diff)
}
})
18 changes: 9 additions & 9 deletions pkg/config/validation.go
@@ -3,29 +3,29 @@ package config
import (
"fmt"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
)
"k8s.io/apimachinery/pkg/util/validation/field"

var (
errInvalidMaxValue = fmt.Errorf("maximum value for QueueVisibility.ClusterQueues.MaxCount must be %d", queueVisibilityClusterQueuesMaxValue)
errInvalidUpdateIntervalSeconds = fmt.Errorf("minimum value for QueueVisibility.UpdateIntervalSeconds must be %d", queueVisibilityClusterQueuesUpdateIntervalSeconds)
configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
)

const (
queueVisibilityClusterQueuesMaxValue = 4000
queueVisibilityClusterQueuesUpdateIntervalSeconds = 1
)

func validate(cfg *configapi.Configuration) error {
func validate(cfg *configapi.Configuration) field.ErrorList {
var allErrs field.ErrorList
if cfg.QueueVisibility != nil {
queueVisibilityPath := field.NewPath("queueVisibility")
if cfg.QueueVisibility.ClusterQueues != nil {
clusterQueues := queueVisibilityPath.Child("clusterQueues")
if cfg.QueueVisibility.ClusterQueues.MaxCount > queueVisibilityClusterQueuesMaxValue {
return errInvalidMaxValue
allErrs = append(allErrs, field.Invalid(clusterQueues.Child("maxCount"), int(cfg.QueueVisibility.ClusterQueues.MaxCount), fmt.Sprintf("must be less than or equal to %d", queueVisibilityClusterQueuesMaxValue)))
}
}
if cfg.QueueVisibility.UpdateIntervalSeconds < queueVisibilityClusterQueuesUpdateIntervalSeconds {
return errInvalidUpdateIntervalSeconds
allErrs = append(allErrs, field.Invalid(queueVisibilityPath.Child("updateIntervalSeconds"), cfg.QueueVisibility.UpdateIntervalSeconds, fmt.Sprintf("must be greater than or equal to %d", queueVisibilityClusterQueuesUpdateIntervalSeconds)))
}
}
return nil
return allErrs
}
2 changes: 1 addition & 1 deletion pkg/controller/core/clusterqueue_controller_test.go
@@ -174,7 +174,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) {
cl := utiltesting.NewClientBuilder().WithLists(defaultWls).WithObjects(lq, cq).WithStatusSubresource(lq, cq).
Build()
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache, nil)
qManager := queue.NewManager(cl, cqCache)
if err := cqCache.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Inserting clusterQueue in cache: %v", err)
}
3 changes: 3 additions & 0 deletions pkg/queue/cluster_queue_interface.go
@@ -92,6 +92,9 @@ type ClusterQueue interface {
// Otherwise returns true.
Dump() (sets.Set[string], bool)
DumpInadmissible() (sets.Set[string], bool)
// Snapshot returns a copy of the current workloads in the heap of
// this ClusterQueue, limited to at most maxCount entries.
// It returns false if the queue is empty and true otherwise.
Snapshot(maxCount int32) ([]*kueue.Workload, bool)
// Info returns workload.Info for the workload key.
// Users of this method should not modify the returned object.
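The maxCount parameter caps how many workloads a snapshot may return. Below is a hypothetical, self-contained sketch of that capping behavior; snapshotUpTo is an illustrative helper, not the actual heap-backed ClusterQueue implementation.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// snapshotUpTo is a hypothetical helper illustrating the Snapshot contract:
// return at most maxCount workloads, and report false only when there is
// nothing to return.
func snapshotUpTo(workloads []*kueue.Workload, maxCount int32) ([]*kueue.Workload, bool) {
	if len(workloads) == 0 {
		return nil, false
	}
	n := len(workloads)
	if maxCount > 0 && int(maxCount) < n {
		n = int(maxCount)
	}
	out := make([]*kueue.Workload, n)
	copy(out, workloads[:n])
	return out, true
}

func main() {
	wls := []*kueue.Workload{
		{ObjectMeta: metav1.ObjectMeta{Name: "a"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "b"}},
		{ObjectMeta: metav1.ObjectMeta{Name: "c"}},
	}
	capped, ok := snapshotUpTo(wls, 2)
	fmt.Println(len(capped), ok) // 2 true
}
```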
44 changes: 38 additions & 6 deletions pkg/queue/manager.go
@@ -29,7 +29,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/metrics"
@@ -52,17 +51,48 @@ type Manager struct {
localQueues map[string]*LocalQueue

workloadsStatus *kueue.ClusterQueuePendingWorkloadsStatus
cfg *configapi.Configuration

queueVisibilityUpdateInterval int32
queueVisibilityClusterQueuesMaxCount int32

// Key is cohort's name. Value is a set of associated ClusterQueue names.
cohorts map[string]sets.Set[string]
}

func NewManager(client client.Client, checker StatusChecker, cfg *configapi.Configuration) *Manager {
type Options struct {
QueueVisibilityUpdateInterval int32
QueueVisibilityClusterQueuesMaxCount int32
}

// Option configures the queue Manager.
type Option func(*Options)

// WithQueueVisibilityUpdateInterval sets the interval, in seconds, at which
// the pending workloads status is refreshed.
func WithQueueVisibilityUpdateInterval(interval int32) Option {
return func(o *Options) {
o.QueueVisibilityUpdateInterval = interval
}
}

// WithQueueVisibilityClusterQueuesMaxCount sets the maximum number of pending
// workloads exposed per ClusterQueue in the pending workloads status.
func WithQueueVisibilityClusterQueuesMaxCount(value int32) Option {
return func(o *Options) {
o.QueueVisibilityClusterQueuesMaxCount = value
}
}

var DefaultOptions = Options{}

func NewManager(client client.Client, checker StatusChecker, opts ...Option) *Manager {
options := DefaultOptions
for _, opt := range opts {
opt(&options)
}
m := &Manager{
client: client,
statusChecker: checker,
cfg: cfg,
localQueues: make(map[string]*LocalQueue),
clusterQueues: make(map[string]ClusterQueue),
cohorts: make(map[string]sets.Set[string]),
@@ -485,7 +515,7 @@ func (m *Manager) Snapshot() []*kueue.Workload {
}
snapshot := make([]*kueue.Workload, 0)
for _, cq := range m.clusterQueues {
if elements, ok := cq.Snapshot(m.cfg.QueueVisibility.ClusterQueues.MaxCount); ok {
if elements, ok := cq.Snapshot(m.queueVisibilityClusterQueuesMaxCount); ok {
snapshot = append(snapshot, elements...)
}
}
@@ -576,7 +606,7 @@ func (m *Manager) Start(ctx context.Context) error {
ctx = ctrl.LoggerInto(ctx, log)

ticker := time.NewTicker(
time.Duration(m.cfg.QueueVisibility.UpdateIntervalSeconds) * time.Second,
time.Duration(m.queueVisibilityUpdateInterval) * time.Second,
)
defer ticker.Stop()

@@ -601,5 +631,7 @@
}

func (m *Manager) GetWorkloadsStatus() *kueue.ClusterQueuePendingWorkloadsStatus {
m.RLock()
defer m.RUnlock()
return m.workloadsStatus
}
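Putting the new functional options together: callers that don't tune queue visibility (most tests in this diff) construct the manager with no options and keep the zero-valued DefaultOptions, while main.go passes values from the configuration. A small usage sketch follows; the option values are illustrative only.

```go
package example

import (
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/kueue/pkg/cache"
	"sigs.k8s.io/kueue/pkg/queue"
)

func buildQueueManagers(cl client.Client, cCache *cache.Cache) {
	// Defaults only: queue visibility settings stay at their zero values.
	_ = queue.NewManager(cl, cCache)

	// Queue visibility tuned explicitly (values are illustrative).
	_ = queue.NewManager(cl, cCache,
		queue.WithQueueVisibilityUpdateInterval(5),
		queue.WithQueueVisibilityClusterQueuesMaxCount(10),
	)
}
```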
24 changes: 12 additions & 12 deletions pkg/queue/manager_test.go
@@ -49,7 +49,7 @@ func TestAddLocalQueueOrphans(t *testing.T) {
Admit(utiltesting.MakeAdmission("cq").Obj()).Obj(),
utiltesting.MakeWorkload("a", "moon").Queue("foo").Obj(),
)
manager := NewManager(kClient, nil, nil)
manager := NewManager(kClient, nil)
q := utiltesting.MakeLocalQueue("foo", "earth").Obj()
if err := manager.AddLocalQueue(context.Background(), q); err != nil {
t.Fatalf("Failed adding queue: %v", err)
@@ -83,7 +83,7 @@ func TestAddClusterQueueOrphans(t *testing.T) {
queues[1],
)
ctx := context.Background()
manager := NewManager(kClient, nil, nil)
manager := NewManager(kClient, nil)
cq := utiltesting.MakeClusterQueue("cq").Obj()
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding cluster queue %s: %v", cq.Name, err)
@@ -139,7 +139,7 @@ func TestUpdateClusterQueue(t *testing.T) {
cl := utiltesting.NewFakeClient(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: defaultNamespace}},
)
manager := NewManager(cl, nil, nil)
manager := NewManager(cl, nil)
for _, cq := range clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
@@ -204,7 +204,7 @@ func TestUpdateLocalQueue(t *testing.T) {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
ctx := context.Background()
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
manager := NewManager(utiltesting.NewFakeClient(), nil)
for _, cq := range clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
@@ -248,7 +248,7 @@ func TestDeleteLocalQueue(t *testing.T) {

ctx := context.Background()
cl := utiltesting.NewFakeClient(wl)
manager := NewManager(cl, nil, nil)
manager := NewManager(cl, nil)

if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Could not create ClusterQueue: %v", err)
@@ -272,7 +272,7 @@ func TestDeleteLocalQueue(t *testing.T) {
}

func TestAddWorkload(t *testing.T) {
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
manager := NewManager(utiltesting.NewFakeClient(), nil)
cq := utiltesting.MakeClusterQueue("cq").Obj()
if err := manager.AddClusterQueue(context.Background(), cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
@@ -390,7 +390,7 @@ func TestStatus(t *testing.T) {
},
}

manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
manager := NewManager(utiltesting.NewFakeClient(), nil)
for _, q := range queues {
if err := manager.AddLocalQueue(ctx, &q); err != nil {
t.Errorf("Failed adding queue: %s", err)
@@ -500,7 +500,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) {
for _, tc := range cases {
t.Run(tc.workload.Name, func(t *testing.T) {
cl := utiltesting.NewFakeClient()
manager := NewManager(cl, nil, nil)
manager := NewManager(cl, nil)
ctx := context.Background()
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding cluster queue %s: %v", cq.Name, err)
@@ -657,7 +657,7 @@ func TestUpdateWorkload(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
manager := NewManager(utiltesting.NewFakeClient(), nil)
ctx := context.Background()
for _, cq := range tc.clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
@@ -773,7 +773,7 @@ func TestHeads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), headsTimeout)
defer cancel()
fakeC := &fakeStatusChecker{}
manager := NewManager(utiltesting.NewFakeClient(), fakeC, nil)
manager := NewManager(utiltesting.NewFakeClient(), fakeC)
for _, cq := range clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s to manager: %v", cq.Name, err)
@@ -1006,7 +1006,7 @@ func TestHeadsAsync(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), headsTimeout)
defer cancel()
client := utiltesting.NewFakeClient(tc.initialObjs...)
manager := NewManager(client, nil, nil)
manager := NewManager(client, nil)
go manager.CleanUpOnContext(ctx)
tc.op(ctx, manager)
heads := manager.Heads(ctx)
@@ -1019,7 +1019,7 @@

// TestHeadsCancelled ensures that the Heads call returns when the context is closed.
func TestHeadsCancelled(t *testing.T) {
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
manager := NewManager(utiltesting.NewFakeClient(), nil)
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler_test.go
@@ -1041,7 +1041,7 @@ func TestSchedule(t *testing.T) {
recorder := broadcaster.NewRecorder(scheme,
corev1.EventSource{Component: constants.AdmissionName})
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache, nil)
qManager := queue.NewManager(cl, cqCache)
// Workloads are loaded into queues or clusterQueues as we add them.
for _, q := range allQueues {
if err := qManager.AddLocalQueue(ctx, &q); err != nil {
@@ -1339,7 +1339,7 @@ func TestRequeueAndUpdate(t *testing.T) {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName})
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache, nil)
qManager := queue.NewManager(cl, cqCache)
scheduler := New(qManager, cqCache, cl, recorder)
if err := qManager.AddLocalQueue(ctx, q1); err != nil {
t.Fatalf("Inserting queue %s/%s in manager: %v", q1.Namespace, q1.Name, err)
2 changes: 1 addition & 1 deletion test/integration/controller/core/suite_test.go
@@ -73,7 +73,7 @@ func managerSetup(mgr manager.Manager, ctx context.Context) {
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache, nil)
queues := queue.NewManager(mgr.GetClient(), cCache)

controllersCfg := &config.Configuration{}
controllersCfg.Metrics.EnableClusterQueueResources = true
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/job/suite_test.go
@@ -80,7 +80,7 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
gomega.Expect(err).NotTo(gomega.HaveOccurred())

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache, nil)
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
2 changes: 1 addition & 1 deletion test/integration/controller/jobs/jobset/suite_test.go
@@ -79,7 +79,7 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
gomega.Expect(err).NotTo(gomega.HaveOccurred())

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache, nil)
queues := queue.NewManager(mgr.GetClient(), cCache)

failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
(7 more changed files not shown.)
