taking snapshot of cluster queue

Anton Stuchinskii committed Aug 24, 2023
1 parent dc7eccf commit 032c77f
Showing 21 changed files with 194 additions and 28 deletions.
20 changes: 18 additions & 2 deletions main.go
@@ -25,6 +25,7 @@ import (

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

_ "k8s.io/client-go/plugin/pkg/client/auth"

zaplog "go.uber.org/zap"
@@ -146,7 +147,7 @@ func main() {
}

cCache := cache.New(mgr.GetClient(), cache.WithPodsReadyTracking(blockForPodsReady(&cfg)))
queues := queue.NewManager(mgr.GetClient(), cCache)
queues := setupManager(mgr, cCache, &cfg)

ctx := ctrl.SetupSignalHandler()
if err := setupIndexes(ctx, mgr, &cfg); err != nil {
@@ -178,6 +179,15 @@ func main() {
}
}

func setupManager(mgr ctrl.Manager, cCache *cache.Cache, cfg *configapi.Configuration) *queue.Manager {
queues := queue.NewManager(mgr.GetClient(), cCache, cfg)
if err := mgr.Add(queues); err != nil {
setupLog.Error(err, "Unable to add queue manager to the controller manager")
os.Exit(1)
}
return queues
}

func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configuration) error {
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
if err != nil {
@@ -195,7 +205,13 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
return err
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(
mgr ctrl.Manager,
cCache *cache.Cache,
queues *queue.Manager,
certsReady chan struct{},
cfg *configapi.Configuration,
serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -163,5 +163,6 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
}
}
addTo(&options, &cfg)
err = validate(&cfg)
return options, cfg, err
}
42 changes: 42 additions & 0 deletions pkg/config/config_test.go
@@ -688,3 +688,45 @@ func TestEncode(t *testing.T) {
})
}
}

func TestValidate(t *testing.T) {
testcases := []struct {
name string
cfg *configapi.Configuration
wantErr error
}{
{
name: "empty",
cfg: &configapi.Configuration{},
wantErr: nil,
},
{
name: "invalid queue visibility UpdateIntervalSeconds",
cfg: &configapi.Configuration{
QueueVisibility: &configapi.QueueVisibility{
UpdateIntervalSeconds: 0,
},
},
wantErr: errInvalidUpdateIntervalSeconds,
},
{
name: "invalid queue visibility cluster queue max count",
cfg: &configapi.Configuration{
QueueVisibility: &configapi.QueueVisibility{
ClusterQueues: &configapi.ClusterQueueVisibility{
MaxCount: 4001,
},
},
},
wantErr: errInvalidMaxValue,
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.EquateErrors()); diff != "" {
t.Errorf("Unexpected returned error (-want,+got):\n%s", diff)
}
})
}
}
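
These cases compare errors with cmpopts.EquateErrors, which reports equality when errors.Is matches rather than when messages are identical, so the table can return the package-level sentinels directly. A standalone illustration of that semantics (hypothetical wrapping, not part of this change; assumes the errors and fmt imports):

// errors.Is also matches wrapped sentinels, so a future validate that
// wrapped its errors would still satisfy this test.
wrapped := fmt.Errorf("loading config: %w", errInvalidMaxValue)
fmt.Println(errors.Is(wrapped, errInvalidMaxValue)) // true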
31 changes: 31 additions & 0 deletions pkg/config/validation.go
@@ -0,0 +1,31 @@
package config

import (
"fmt"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
)

var (
errInvalidMaxValue = fmt.Errorf("QueueVisibility.ClusterQueues.MaxCount must not exceed %d", queueVisibilityClusterQueuesMaxValue)
errInvalidUpdateIntervalSeconds = fmt.Errorf("QueueVisibility.UpdateIntervalSeconds must be at least %d", queueVisibilityClusterQueuesUpdateIntervalSeconds)
)

const (
queueVisibilityClusterQueuesMaxValue = 4000
queueVisibilityClusterQueuesUpdateIntervalSeconds = 1
)

// validate rejects configurations with out-of-range queue visibility values.
func validate(cfg *configapi.Configuration) error {
if cfg.QueueVisibility != nil {
if cfg.QueueVisibility.ClusterQueues != nil {
if cfg.QueueVisibility.ClusterQueues.MaxCount > queueVisibilityClusterQueuesMaxValue {
return errInvalidMaxValue
}
}
if cfg.QueueVisibility.UpdateIntervalSeconds < queueVisibilityClusterQueuesUpdateIntervalSeconds {
return errInvalidUpdateIntervalSeconds
}
}
return nil
}
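
A usage sketch with hypothetical values, showing that the MaxCount check fires before the interval check when both fields are out of range:

// Hypothetical configuration failing both checks:
cfg := &configapi.Configuration{
	QueueVisibility: &configapi.QueueVisibility{
		UpdateIntervalSeconds: 0, // below the minimum of 1
		ClusterQueues: &configapi.ClusterQueueVisibility{
			MaxCount: 5000, // above the maximum of 4000
		},
	},
}
err := validate(cfg) // returns errInvalidMaxValue

Note that validate is nil-safe: a Configuration that omits queueVisibility entirely passes, which the "empty" test case above relies on.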
1 change: 1 addition & 0 deletions pkg/controller/core/clusterqueue_controller.go
@@ -486,6 +486,7 @@ func (r *ClusterQueueReconciler) updateCqStatusIfChanged(
cq.Status.FlavorsUsage = usage
cq.Status.AdmittedWorkloads = int32(workloads)
cq.Status.PendingWorkloads = int32(pendingWorkloads)
cq.Status.PendingWorkloadsStatus = r.qManager.GetWorkloadsStatus()
meta.SetStatusCondition(&cq.Status.Conditions, metav1.Condition{
Type: kueue.ClusterQueueActive,
Status: conditionStatus,
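For context, the status shapes being populated here, sketched from kueue's v1beta1 API (fields abridged, JSON tags omitted; see apis/kueue/v1beta1 for the authoritative definitions):

// ClusterQueuePendingWorkloadsStatus holds the snapshot published on the
// ClusterQueue status.
type ClusterQueuePendingWorkloadsStatus struct {
	// Head lists the top pending workloads.
	Head []ClusterQueuePendingWorkload
	// LastChangeTime records when the snapshot was last refreshed.
	LastChangeTime metav1.Time
}

// ClusterQueuePendingWorkload identifies one pending workload.
type ClusterQueuePendingWorkload struct {
	Name      string
	Namespace string
}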
2 changes: 1 addition & 1 deletion pkg/controller/core/clusterqueue_controller_test.go
@@ -174,7 +174,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) {
cl := utiltesting.NewClientBuilder().WithLists(defaultWls).WithObjects(lq, cq).WithStatusSubresource(lq, cq).
Build()
cqCache := cache.New(cl)
qManager := queue.NewManager(cl, cqCache)
qManager := queue.NewManager(cl, cqCache, nil)
if err := cqCache.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Inserting clusterQueue in cache: %v", err)
}
18 changes: 18 additions & 0 deletions pkg/queue/cluster_queue_impl.go
@@ -210,6 +210,24 @@ func (c *clusterQueueBase) Dump() (sets.Set[string], bool) {
return elements, true
}

// Snapshot returns up to maxCount workloads from the queue: heap elements
// first, then inadmissible workloads. It returns false when the queue is
// empty or maxCount is zero.
func (c *clusterQueueBase) Snapshot(maxCount int32) ([]*kueue.Workload, bool) {
if c.heap.Len() == 0 || maxCount == 0 {
return nil, false
}
elements := make([]*kueue.Workload, 0, c.heap.Len())
for i, e := range c.heap.List() {
if int32(i) >= maxCount {
return elements, true
}
info := e.(*workload.Info)
elements = append(elements, info.Obj)
}
for _, info := range c.inadmissibleWorkloads {
if int32(len(elements)) >= maxCount {
break
}
elements = append(elements, info.Obj)
}
return elements, true
}
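
To illustrate the bounds above with hypothetical numbers: a queue holding 3 heap elements and 2 inadmissible workloads, snapshotted with maxCount=4, yields the 3 heap elements followed by 1 inadmissible workload.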

func (c *clusterQueueBase) DumpInadmissible() (sets.Set[string], bool) {
if len(c.inadmissibleWorkloads) == 0 {
return nil, false
1 change: 1 addition & 0 deletions pkg/queue/cluster_queue_interface.go
@@ -92,6 +92,7 @@ type ClusterQueue interface {
// Otherwise returns true.
Dump() (sets.Set[string], bool)
DumpInadmissible() (sets.Set[string], bool)
// Snapshot returns up to maxCount workloads from the queue, or false if
// the queue is empty or maxCount is zero.
Snapshot(maxCount int32) ([]*kueue.Workload, bool)
// Info returns workload.Info for the workload key.
// Users of this method should not modify the returned object.
Info(string) *workload.Info
58 changes: 57 additions & 1 deletion pkg/queue/manager.go
@@ -21,12 +21,15 @@ import (
"errors"
"fmt"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/metrics"
@@ -48,14 +51,18 @@ type Manager struct {
clusterQueues map[string]ClusterQueue
localQueues map[string]*LocalQueue

workloadsStatus *kueue.ClusterQueuePendingWorkloadsStatus
cfg *configapi.Configuration

// Key is cohort's name. Value is a set of associated ClusterQueue names.
cohorts map[string]sets.Set[string]
}

func NewManager(client client.Client, checker StatusChecker) *Manager {
func NewManager(client client.Client, checker StatusChecker, cfg *configapi.Configuration) *Manager {
m := &Manager{
client: client,
statusChecker: checker,
cfg: cfg,
localQueues: make(map[string]*LocalQueue),
clusterQueues: make(map[string]ClusterQueue),
cohorts: make(map[string]sets.Set[string]),
@@ -469,6 +476,22 @@ func (m *Manager) Dump() map[string]sets.Set[string] {
return dump
}

// Snapshot returns a copy of the pending workloads of every cluster queue,
// including the inadmissible ones, capped at MaxCount entries per queue
// (so the combined total may exceed MaxCount).
func (m *Manager) Snapshot() []*kueue.Workload {
m.Lock()
defer m.Unlock()
if len(m.clusterQueues) == 0 {
return nil
}
snapshot := make([]*kueue.Workload, 0)
for _, cq := range m.clusterQueues {
if elements, ok := cq.Snapshot(m.cfg.QueueVisibility.ClusterQueues.MaxCount); ok {
snapshot = append(snapshot, elements...)
}
}
return snapshot
}

// DumpInadmissible is a dump of the inadmissible workloads list.
// Only use for testing purposes.
func (m *Manager) DumpInadmissible() map[string]sets.Set[string] {
@@ -547,3 +570,36 @@ func (m *Manager) reportPendingWorkloads(cqName string, cq ClusterQueue) {
}
metrics.ReportPendingWorkloads(cqName, active, inadmissible)
}

func (m *Manager) Start(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx).WithName("clusterQueueSnapshot")
ctx = ctrl.LoggerInto(ctx, log)

// The validated configuration guarantees UpdateIntervalSeconds >= 1, so
// the ticker interval is always positive.
ticker := time.NewTicker(
time.Duration(m.cfg.QueueVisibility.UpdateIntervalSeconds) * time.Second,
)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.V(2).Info("Context cancelled; stopping cluster queue snapshots")
return nil
case t := <-ticker.C:
// Build the status outside the lock, then publish it under the lock so
// readers in GetWorkloadsStatus never observe a half-built snapshot.
status := &kueue.ClusterQueuePendingWorkloadsStatus{
Head: make([]kueue.ClusterQueuePendingWorkload, 0),
LastChangeTime: metav1.Time{Time: t},
}
for _, wl := range m.Snapshot() {
status.Head = append(status.Head, kueue.ClusterQueuePendingWorkload{
Name: wl.Name,
Namespace: wl.Namespace,
})
}
m.Lock()
m.workloadsStatus = status
m.Unlock()
}
}
}

func (m *Manager) GetWorkloadsStatus() *kueue.ClusterQueuePendingWorkloadsStatus {
m.Lock()
defer m.Unlock()
return m.workloadsStatus
}
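
The reconciler change in clusterqueue_controller.go above is the real consumer; as a standalone sketch (hypothetical qManager and log variables), the read side only needs a nil check because the snapshot pointer is published whole under the lock:

if status := qManager.GetWorkloadsStatus(); status != nil {
	log.Info("Pending workloads snapshot", "count", len(status.Head), "lastChange", status.LastChangeTime)
}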
24 changes: 12 additions & 12 deletions pkg/queue/manager_test.go
@@ -49,7 +49,7 @@ func TestAddLocalQueueOrphans(t *testing.T) {
Admit(utiltesting.MakeAdmission("cq").Obj()).Obj(),
utiltesting.MakeWorkload("a", "moon").Queue("foo").Obj(),
)
manager := NewManager(kClient, nil)
manager := NewManager(kClient, nil, nil)
q := utiltesting.MakeLocalQueue("foo", "earth").Obj()
if err := manager.AddLocalQueue(context.Background(), q); err != nil {
t.Fatalf("Failed adding queue: %v", err)
@@ -83,7 +83,7 @@ func TestAddClusterQueueOrphans(t *testing.T) {
queues[1],
)
ctx := context.Background()
manager := NewManager(kClient, nil)
manager := NewManager(kClient, nil, nil)
cq := utiltesting.MakeClusterQueue("cq").Obj()
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding cluster queue %s: %v", cq.Name, err)
@@ -139,7 +139,7 @@ func TestUpdateClusterQueue(t *testing.T) {
cl := utiltesting.NewFakeClient(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: defaultNamespace}},
)
manager := NewManager(cl, nil)
manager := NewManager(cl, nil, nil)
for _, cq := range clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
@@ -204,7 +204,7 @@ func TestUpdateLocalQueue(t *testing.T) {
t.Fatalf("Failed adding kueue scheme: %s", err)
}
ctx := context.Background()
manager := NewManager(utiltesting.NewFakeClient(), nil)
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
for _, cq := range clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
@@ -248,7 +248,7 @@ func TestDeleteLocalQueue(t *testing.T) {

ctx := context.Background()
cl := utiltesting.NewFakeClient(wl)
manager := NewManager(cl, nil)
manager := NewManager(cl, nil, nil)

if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Could not create ClusterQueue: %v", err)
@@ -272,7 +272,7 @@
}

func TestAddWorkload(t *testing.T) {
manager := NewManager(utiltesting.NewFakeClient(), nil)
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
cq := utiltesting.MakeClusterQueue("cq").Obj()
if err := manager.AddClusterQueue(context.Background(), cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
@@ -390,7 +390,7 @@ func TestStatus(t *testing.T) {
},
}

manager := NewManager(utiltesting.NewFakeClient(), nil)
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
for _, q := range queues {
if err := manager.AddLocalQueue(ctx, &q); err != nil {
t.Errorf("Failed adding queue: %s", err)
@@ -500,7 +500,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) {
for _, tc := range cases {
t.Run(tc.workload.Name, func(t *testing.T) {
cl := utiltesting.NewFakeClient()
manager := NewManager(cl, nil)
manager := NewManager(cl, nil, nil)
ctx := context.Background()
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding cluster queue %s: %v", cq.Name, err)
@@ -657,7 +657,7 @@ func TestUpdateWorkload(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
manager := NewManager(utiltesting.NewFakeClient(), nil)
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
ctx := context.Background()
for _, cq := range tc.clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
@@ -773,7 +773,7 @@ func TestHeads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), headsTimeout)
defer cancel()
fakeC := &fakeStatusChecker{}
manager := NewManager(utiltesting.NewFakeClient(), fakeC)
manager := NewManager(utiltesting.NewFakeClient(), fakeC, nil)
for _, cq := range clusterQueues {
if err := manager.AddClusterQueue(ctx, cq); err != nil {
t.Fatalf("Failed adding clusterQueue %s to manager: %v", cq.Name, err)
@@ -1006,7 +1006,7 @@ func TestHeadsAsync(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), headsTimeout)
defer cancel()
client := utiltesting.NewFakeClient(tc.initialObjs...)
manager := NewManager(client, nil)
manager := NewManager(client, nil, nil)
go manager.CleanUpOnContext(ctx)
tc.op(ctx, manager)
heads := manager.Heads(ctx)
@@ -1019,7 +1019,7 @@

// TestHeadsCancelled ensures that the Heads call returns when the context is closed.
func TestHeadsCancelled(t *testing.T) {
manager := NewManager(utiltesting.NewFakeClient(), nil)
manager := NewManager(utiltesting.NewFakeClient(), nil, nil)
ctx, cancel := context.WithCancel(context.Background())
go func() {
cancel()
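Passing nil for the new Configuration parameter is safe in these tests because none of them run Start or Snapshot, the only Manager paths that dereference cfg.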
