Skip to content

Commit

Permalink
Cancelling the context, restarting Kueue pod.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristianZaccaria committed Aug 7, 2024
1 parent 27be9c7 commit 564fb0f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
8 changes: 4 additions & 4 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func main() {
cCache := cache.New(mgr.GetClient(), cacheOptions...)
queues := queue.NewManager(mgr.GetClient(), cCache, queueOptions...)

ctx := ctrl.SetupSignalHandler()
ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
if err := setupIndexes(ctx, mgr, &cfg); err != nil {
setupLog.Error(err, "Unable to setup indexes")
os.Exit(1)
Expand All @@ -182,7 +182,7 @@ func main() {
// The cert won't be ready until the manager starts, so start a goroutine here that
// blocks until the cert is ready before setting up the controllers.
// Controllers that register after the manager starts will start immediately.
go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)
go setupControllers(ctx, cancel, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)

go queues.CleanUpOnContext(ctx)
go cCache.CleanUpOnContext(ctx)
Expand Down Expand Up @@ -229,7 +229,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
cert.WaitForCertsReady(setupLog, certsReady)
Expand Down Expand Up @@ -282,7 +282,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
jobframework.WithCache(cCache),
jobframework.WithQueues(queues),
}
if err := jobframework.SetupControllers(ctx, mgr, setupLog, opts...); err != nil {
if err := jobframework.SetupControllers(ctx, cancel, mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/controller/jobframework/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ var (
// this function needs to be called after the certs get ready because the controllers won't work
// until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
func SetupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
return manager.setupControllers(ctx, mgr, log, opts...)
func SetupControllers(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
return manager.setupControllers(ctx, cancel, mgr, log, opts...)
}

func (m *integrationManager) setupControllers(ctx context.Context, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
func (m *integrationManager) setupControllers(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
options := ProcessOptions(opts...)

for fwkName := range options.EnabledExternalFrameworks {
Expand Down Expand Up @@ -83,9 +83,7 @@ func (m *integrationManager) setupControllers(ctx context.Context, mgr ctrl.Mana
logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
go waitForAPI(ctx, mgr, defaultCheckAPIAvailable, log, gvk, func() {
log.Info(fmt.Sprintf("API now available, starting controller and webhook for %v", gvk))
if err := m.setupControllerAndWebhook(mgr, name, fwkNamePrefix, cb, options, opts...); err != nil {
log.Error(err, "Failed to setup controller and webhook for job framework")
}
cancel()
})
} else {
if err := m.setupControllerAndWebhook(mgr, name, fwkNamePrefix, cb, options, opts...); err != nil {
Expand Down
19 changes: 8 additions & 11 deletions pkg/controller/jobframework/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"net/http"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -92,7 +91,7 @@ func TestSetupControllers(t *testing.T) {
mapperGVKs []schema.GroupVersionKind
wantError error
wantEnabledIntegrations []string
afterSetup func(mgr ctrlmgr.Manager, manager *integrationManager)
afterSetup func(t *testing.T, ctx context.Context)
}{
"setup controllers succeed": {
opts: []Option{
Expand Down Expand Up @@ -132,6 +131,8 @@ func TestSetupControllers(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := integrationManager{}
for name, cbs := range availableIntegrations {
err := manager.register(name, cbs)
Expand Down Expand Up @@ -165,13 +166,13 @@ func TestSetupControllers(t *testing.T) {
t.Fatalf("Failed to setup manager: %v", err)
}

gotError := manager.setupControllers(context.Background(), mgr, logger, tc.opts...)
gotError := manager.setupControllers(ctx, cancel, mgr, logger, tc.opts...)
if diff := cmp.Diff(tc.wantError, gotError, cmpopts.EquateErrors()); len(diff) != 0 {
t.Errorf("Unexpected error from SetupControllers (-want,+got):\n%s", diff)
}

if tc.afterSetup != nil {
tc.afterSetup(mgr, &manager)
tc.afterSetup(t, ctx)
}

diff := cmp.Diff(tc.wantEnabledIntegrations, manager.getEnabledIntegrations().SortedList())
Expand All @@ -182,13 +183,9 @@ func TestSetupControllers(t *testing.T) {
}
}

func testDelayedIntegration(mgr ctrlmgr.Manager, manager *integrationManager) {
for {
_, ok := manager.getEnabledIntegrations()["ray.io/raycluster"]
if ok {
break // Exit loop if RayCluster is enabled
}
time.Sleep(10 * time.Millisecond)
// testDelayedIntegration skips the calling test when ctx has already been
// canceled, and does nothing otherwise. Only context.Canceled triggers the
// skip; any other ctx.Err() value (including nil) lets the test continue.
func testDelayedIntegration(t *testing.T, ctx context.Context) {
	// Guard clause: a context that is still live, or that ended for any
	// reason other than cancellation, leaves the test running.
	if cause := ctx.Err(); cause != context.Canceled {
		return
	}
	t.Skip("Context has been canceled, Kueue pod will restart...")
}

Expand Down

0 comments on commit 564fb0f

Please sign in to comment.