Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting MaxConcurrentReconciles for reconcilers #2272

Merged
merged 7 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
- Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264))
- Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve logs of Azure Pipelines Scaler. ([#2297](https://github.com/kedacore/keda/pull/2297))
- Allow setting `MaxConcurrentReconciles` for controllers ([#2272](https://github.com/kedacore/keda/pull/2272))

### Deprecations

Expand Down
25 changes: 13 additions & 12 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -50,6 +49,7 @@ import (
prommetrics "github.com/kedacore/keda/v2/pkg/metrics"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/kedacore/keda/v2/version"
)

Expand All @@ -70,7 +70,7 @@ var (
adapterClientRequestBurst int
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) {
func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration, maxConcurrentReconciles int) (provider.MetricsProvider, <-chan struct{}, error) {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if cfg != nil {
Expand Down Expand Up @@ -116,14 +116,14 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, externalMetricsInfo, externalMetricsInfoLock, stopCh); err != nil {
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh); err != nil {
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}) error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Namespace: namespace,
Expand All @@ -137,7 +137,7 @@ func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, n
ScaleHandler: scaleHandler,
ExternalMetricsInfo: externalMetricsInfo,
ExternalMetricsInfoLock: externalMetricsInfoLock,
}).SetupWithManager(mgr, controller.Options{}); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
return err
}

Expand Down Expand Up @@ -200,19 +200,20 @@ func main() {

ctrl.SetLogger(logger)

globalHTTPTimeoutStr := os.Getenv("KEDA_HTTP_DEFAULT_TIMEOUT")
if globalHTTPTimeoutStr == "" {
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutStr = "3000"
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

globalHTTPTimeoutMS, err := strconv.Atoi(globalHTTPTimeoutStr)
controllerMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_METRICS_CTRL_MAX_RECONCILES", 1)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
logger.Error(err, "Invalid KEDA_METRICS_CTRL_MAX_RECONCILES")
return
}

kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond, controllerMaxReconciles)
if err != nil {
logger.Error(err, "making provider")
return
Expand Down
1 change: 1 addition & 0 deletions controllers/keda/metrics_adapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type MetricsScaledObjectReconciler struct {
ScaleHandler scaling.ScaleHandler
ExternalMetricsInfo *[]provider.ExternalMetricInfo
ExternalMetricsInfoLock *sync.RWMutex
MaxConcurrentReconciles int
}

var (
Expand Down
7 changes: 5 additions & 2 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand All @@ -49,14 +50,16 @@ type ScaledJobReconciler struct {
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
scaleHandler scaling.ScaleHandler

scaleHandler scaling.ScaleHandler
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))

return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Expand Down
17 changes: 9 additions & 8 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -75,18 +76,17 @@ type ScaledObjectReconciler struct {
}

// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
var isScalableCache map[string]bool
var isScalableCache *sync.Map

func init() {
// Prefill the cache with some known values for core resources in case of future parallelism to avoid stampeding herd on startup.
isScalableCache = map[string]bool{
"deployments.apps": true,
"statefulsets.apps": true,
}
isScalableCache = &sync.Map{}
isScalableCache.Store("deployments.apps", true)
isScalableCache.Store("statefulsets.apps", true)
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
setupLog := log.Log.WithName("setup")

// create Discovery clientset
Expand Down Expand Up @@ -117,6 +117,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {

// Start controller
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
// predicate.GenerationChangedPredicate{} ignore updates to ScaledObject Status
// (in this case metadata.Generation does not change)
// so reconcile loop is not started on Status updates
Expand Down Expand Up @@ -284,7 +285,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
isScalable := isScalableCache[gr.String()]
_, isScalable := isScalableCache.Load(gr.String())
if !isScalable || wantStatusUpdate {
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
Expand All @@ -303,7 +304,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
}
isScalableCache[gr.String()] = true
isScalableCache.Store(gr.String(), true)
}
zroubalik marked this conversation as resolved.
Show resolved Hide resolved

// if it is not already present in ScaledObject Status:
Expand Down
3 changes: 2 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -85,7 +86,7 @@ var _ = BeforeSuite(func(done Done) {
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
}).SetupWithManager(k8sManager)
}).SetupWithManager(k8sManager, controller.Options{})
Expect(err).ToNot(HaveOccurred())

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expand Down
28 changes: 18 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"time"

apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/kedacore/keda/v2/version"
//+kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -96,16 +97,23 @@ func main() {
os.Exit(1)
}

globalHTTPTimeoutStr := os.Getenv("KEDA_HTTP_DEFAULT_TIMEOUT")
if globalHTTPTimeoutStr == "" {
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutStr = "3000"
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
setupLog.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
os.Exit(1)
}

globalHTTPTimeoutMS, err := strconv.Atoi(globalHTTPTimeoutStr)
scaledObjectMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_SCALEDOBJECT_CTRL_MAX_RECONCILES", 5)
if err != nil {
setupLog.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
setupLog.Error(err, "Invalid KEDA_SCALEDOBJECT_CTRL_MAX_RECONCILES")
os.Exit(1)
}

scaledJobMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_SCALEDJOB_CTRL_MAX_RECONCILES", 1)
if err != nil {
setupLog.Error(err, "Invalid KEDA_SCALEDJOB_CTRL_MAX_RECONCILES")
os.Exit(1)
}

globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
Expand All @@ -116,7 +124,7 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledObjectMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
}
Expand All @@ -125,7 +133,7 @@ func main() {
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/env_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package util

import (
"os"
"strconv"
)

func ResolveOsEnvInt(envName string, defaultValue int) (int, error) {
valueStr, found := os.LookupEnv(envName)

if found && valueStr != "" {
return strconv.Atoi(valueStr)
}

return defaultValue, nil
}