Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel context kueue - No mutex/locks #8

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
8 changes: 4 additions & 4 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func main() {
cCache := cache.New(mgr.GetClient(), cacheOptions...)
queues := queue.NewManager(mgr.GetClient(), cCache, queueOptions...)

ctx := ctrl.SetupSignalHandler()
ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
if err := setupIndexes(ctx, mgr, &cfg); err != nil {
setupLog.Error(err, "Unable to setup indexes")
os.Exit(1)
Expand All @@ -182,7 +182,7 @@ func main() {
// Cert won't be ready until manager starts, so start a goroutine here which
// will block until the cert is ready before setting up the controllers.
// Controllers who register after manager starts will start directly.
go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)
go setupControllers(ctx, cancel, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)

go queues.CleanUpOnContext(ctx)
go cCache.CleanUpOnContext(ctx)
Expand Down Expand Up @@ -229,7 +229,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
return jobframework.SetupIndexes(ctx, mgr.GetFieldIndexer(), opts...)
}

func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
cert.WaitForCertsReady(setupLog, certsReady)
Expand Down Expand Up @@ -282,7 +282,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
jobframework.WithCache(cCache),
jobframework.WithQueues(queues),
}
if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
if err := jobframework.SetupControllers(ctx, cancel, mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
os.Exit(1)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/integrationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func EnableIntegration(name string) {
// Mark the frameworks identified by names and return a revert function.
func EnableIntegrationsForTest(tb testing.TB, names ...string) func() {
tb.Helper()
old := manager.enabledIntegrations.Clone()
old := manager.enabledIntegrations
for _, name := range names {
manager.enableIntegration(name)
}
Expand Down
85 changes: 69 additions & 16 deletions pkg/controller/jobframework/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
)

type checkAPIAvailableFunc func(mgr ctrl.Manager, gvk schema.GroupVersionKind) (bool, error)

var (
errFailedMappingResource = errors.New("restMapper failed mapping resource")
)
Expand All @@ -44,11 +49,11 @@ var (
// this function needs to be called after the certs get ready because the controllers won't work
// until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
return manager.setupControllers(mgr, log, opts...)
func SetupControllers(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
return manager.setupControllers(ctx, cancel, mgr, log, opts...)
}

func (m *integrationManager) setupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
func (m *integrationManager) setupControllers(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
options := ProcessOptions(opts...)

for fwkName := range options.EnabledExternalFrameworks {
Expand All @@ -71,25 +76,19 @@ func (m *integrationManager) setupControllers(mgr ctrl.Manager, log logr.Logger,
if err != nil {
return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if _, err := getRESTMapping(mgr, gvk); err != nil {
if !meta.IsNoMatchError(err) {
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
}
logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
go waitForAPI(ctx, mgr, defaultCheckAPIAvailable, log, gvk, func() {
log.Info(fmt.Sprintf("API now available, starting controller and webhook for %v", gvk))
cancel()
})
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
opts...,
).SetupWithManager(mgr); err != nil {
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
if err := m.setupControllerAndWebhook(mgr, name, fwkNamePrefix, cb, options, opts...); err != nil {
return err
}
m.enableIntegration(name)
logger.Info("Set up controller and webhook for job framework")
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
Expand All @@ -99,6 +98,60 @@ func (m *integrationManager) setupControllers(mgr ctrl.Manager, log logr.Logger,
})
}

// setupControllerAndWebhook wires a single job-framework integration into mgr.
//
// It builds the integration's reconciler via cb.NewReconciler (using the
// manager's client and an event recorder named "<name>-<ManagerName>-controller"),
// registers it with mgr, then registers the integration's webhook via
// cb.SetupWebhook. Only when both steps succeed is the integration marked as
// enabled on m.
//
// Any failure is returned wrapped and prefixed with fwkNamePrefix; in that
// case the integration is not marked as enabled.
func (m *integrationManager) setupControllerAndWebhook(mgr ctrl.Manager, name string, fwkNamePrefix string, cb IntegrationCallbacks, options Options, opts ...Option) error {
	recorderName := fmt.Sprintf("%s-%s-controller", name, options.ManagerName)
	reconciler := cb.NewReconciler(mgr.GetClient(), mgr.GetEventRecorderFor(recorderName), opts...)

	// Register the controller first; the webhook is only set up afterwards.
	if setupErr := reconciler.SetupWithManager(mgr); setupErr != nil {
		return fmt.Errorf("%s: %w", fwkNamePrefix, setupErr)
	}

	webhookErr := cb.SetupWebhook(mgr, opts...)
	if webhookErr != nil {
		return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, webhookErr)
	}

	m.enableIntegration(name)
	return nil
}

// waitForAPI polls checkAPIAvailable for gvk until it reports the API as
// available or ctx is done.
//
// The first check happens immediately. Each subsequent check is delayed by the
// interval returned from an exponential-failure rate limiter configured with a
// 100ms base delay and a 100s maximum delay. As soon as a check reports the API
// as available, action is invoked once and the function returns. Errors from
// checkAPIAvailable that are not "no match" errors are logged; they never stop
// the polling.
//
// The function blocks until one of the two exit conditions is met, so callers
// that must not block should run it in its own goroutine. If ctx is done
// before the API becomes available, action is never called.
func waitForAPI(ctx context.Context, mgr ctrl.Manager, checkAPIAvailable checkAPIAvailableFunc, log logr.Logger, gvk schema.GroupVersionKind, action func()) {
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 100*time.Second)
	item := gvk.String()

	for {
		isAvailable, err := checkAPIAvailable(mgr, gvk)
		if isAvailable {
			rateLimiter.Forget(item)
			action()
			return
		}
		// A "no match" error is the expected state while the API is still
		// missing, so only unexpected errors are worth logging.
		if err != nil && !meta.IsNoMatchError(err) {
			log.Error(err, "Failed to get REST mapping for gvk", "gvk", gvk)
		}

		// Use an explicit timer rather than time.After so that it can be
		// stopped on cancellation instead of staying pending for up to the
		// maximum backoff after this function has already returned.
		timer := time.NewTimer(rateLimiter.When(item))
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// getRESTMapping looks up the REST mapping for gvk's group/kind and version
// using the manager's REST mapper.
//
// On success the mapping is returned as-is. On failure the mapping is nil and
// the underlying error is wrapped (with %w) in a message naming the gvk, so
// callers can still inspect the original cause.
func getRESTMapping(mgr ctrl.Manager, gvk schema.GroupVersionKind) (*meta.RESTMapping, error) {
	mapper := mgr.GetRESTMapper()
	mapping, lookupErr := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if lookupErr == nil {
		return mapping, nil
	}
	return nil, fmt.Errorf("failed to get REST mapping for %v: %w", gvk, lookupErr)
}

// defaultCheckAPIAvailable reports whether gvk can currently be resolved to a
// REST mapping through mgr. It returns (true, nil) when the lookup succeeds and
// (false, err) with the lookup error otherwise.
//
// It is declared as a package-level variable (not a plain function) so that it
// can be reassigned, e.g. by tests that stub out the availability check.
var defaultCheckAPIAvailable checkAPIAvailableFunc = func(mgr ctrl.Manager, gvk schema.GroupVersionKind) (bool, error) {
	_, lookupErr := getRESTMapping(mgr, gvk)
	return lookupErr == nil, lookupErr
}

// SetupIndexes sets up the indexers for integrations.
// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs,
// they can easily set up indexers for the in-house custom jobs.
Expand Down
46 changes: 44 additions & 2 deletions pkg/controller/jobframework/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ import (
)

func TestSetupControllers(t *testing.T) {
// Simulate Job Framework API checks
defaultCheckAPIAvailable = func(mgr ctrlmgr.Manager, gvk schema.GroupVersionKind) (bool, error) {
// Simulate API being unavailable for MPIJob
if gvk.Kind == "MPIJob" {
return false, nil
}
return true, nil // Simulate API becoming available
}
availableIntegrations := map[string]IntegrationCallbacks{
"batch/job": {
NewReconciler: testNewReconciler,
Expand All @@ -68,13 +76,22 @@ func TestSetupControllers(t *testing.T) {
AddToScheme: testAddToScheme,
CanSupportIntegration: testCanSupportIntegration,
},
"ray.io/raycluster": {
NewReconciler: testNewReconciler,
SetupWebhook: testSetupWebhook,
JobType: &rayv1.RayCluster{},
SetupIndexes: testSetupIndexes,
AddToScheme: testAddToScheme,
CanSupportIntegration: testCanSupportIntegration,
},
}

cases := map[string]struct {
opts []Option
mapperGVKs []schema.GroupVersionKind
wantError error
wantEnabledIntegrations []string
afterSetup func(t *testing.T, ctx context.Context)
}{
"setup controllers succeed": {
opts: []Option{
Expand All @@ -99,9 +116,23 @@ func TestSetupControllers(t *testing.T) {
},
wantEnabledIntegrations: []string{"batch/job"},
},
"mapper doesn't have ray.io/raycluster when Controllers have been setup, but eventually does": {
opts: []Option{
WithEnabledFrameworks([]string{"batch/job", "kubeflow.org/mpijob", "ray.io/raycluster"}),
},
mapperGVKs: []schema.GroupVersionKind{
batchv1.SchemeGroupVersion.WithKind("Job"),
kubeflow.SchemeGroupVersionKind,
// Not including RayCluster
},
wantEnabledIntegrations: []string{"batch/job", "kubeflow.org/mpijob", "ray.io/raycluster"},
afterSetup: testDelayedIntegration,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager := integrationManager{}
for name, cbs := range availableIntegrations {
err := manager.register(name, cbs)
Expand Down Expand Up @@ -135,18 +166,29 @@ func TestSetupControllers(t *testing.T) {
t.Fatalf("Failed to setup manager: %v", err)
}

gotError := manager.setupControllers(mgr, logger, tc.opts...)
gotError := manager.setupControllers(ctx, cancel, mgr, logger, tc.opts...)
if diff := cmp.Diff(tc.wantError, gotError, cmpopts.EquateErrors()); len(diff) != 0 {
t.Errorf("Unexpected error from SetupControllers (-want,+got):\n%s", diff)
}

if diff := cmp.Diff(tc.wantEnabledIntegrations, manager.enabledIntegrations.SortedList()); len(diff) != 0 {
if tc.afterSetup != nil {
tc.afterSetup(t, ctx)
}

diff := cmp.Diff(tc.wantEnabledIntegrations, manager.enabledIntegrations.SortedList())
if len(diff) != 0 {
t.Errorf("Unexpected enabled integrations (-want,+got):\n%s", diff)
}
})
}
}

// testDelayedIntegration is an afterSetup hook for test cases in which an
// integration's API is not available at setup time.
//
// If ctx has been canceled by the time the hook runs, the calling test is
// skipped (t.Skip does not return). For any other context state the hook
// returns without doing anything.
func testDelayedIntegration(t *testing.T, ctx context.Context) {
	if ctx.Err() != context.Canceled {
		return
	}
	t.Skip("Context has been canceled, Kueue pod will restart...")
}

func TestSetupIndexes(t *testing.T) {
testNamespace := "test"

Expand Down