Skip to content

Commit

Permalink
feat: use Default metric providers (#4473)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>
  • Loading branch information
JorTurFer authored May 18, 2023
1 parent c1746a9 commit 8c9dd7f
Show file tree
Hide file tree
Showing 702 changed files with 44,410 additions and 23,355 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ New deprecation(s):
- **General**: Drop a transitive dependency on bou.ke/monkey ([#4364](https://github.com/kedacore/keda/issues/4364))
- **General**: Fix odd number of arguments passed as key-value pairs for logging ([#4368](https://github.com/kedacore/keda/issues/4368))
- **General**: Automatically scale test clusters in/out to reduce environmental footprint & improve cost-efficiency ([#4456](https://github.com/kedacore/keda/pull/4456))
- **General**: Use default metrics provider from sigs.k8s.io/custom-metrics-apiserver ([#4473](https://github.com/kedacore/keda/pull/4473))

## v2.10.0

Expand Down
21 changes: 11 additions & 10 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/webhook"
)
Expand All @@ -56,26 +57,26 @@ func (so *ScaledObject) SetupWebhookWithManager(mgr ctrl.Manager) error {
var _ webhook.Validator = &ScaledObject{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (so *ScaledObject) ValidateCreate() error {
func (so *ScaledObject) ValidateCreate() (admission.Warnings, error) {
val, _ := json.MarshalIndent(so, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("validating scaledobject creation for %s", string(val)))
return validateWorkload(so, "create")
}

func (so *ScaledObject) ValidateUpdate(old runtime.Object) error {
func (so *ScaledObject) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
val, _ := json.MarshalIndent(so, "", " ")
scaledobjectlog.V(1).Info(fmt.Sprintf("validating scaledobject update for %s", string(val)))

if isRemovingFinalizer(so, old) {
scaledobjectlog.V(1).Info("finalizer removal, skipping validation")
return nil
return nil, nil
}

return validateWorkload(so, "update")
}

func (so *ScaledObject) ValidateDelete() error {
return nil
func (so *ScaledObject) ValidateDelete() (admission.Warnings, error) {
return nil, nil
}

func isRemovingFinalizer(so *ScaledObject, old runtime.Object) bool {
Expand All @@ -89,23 +90,23 @@ func isRemovingFinalizer(so *ScaledObject, old runtime.Object) bool {
return len(so.ObjectMeta.Finalizers) == 0 && len(oldSo.ObjectMeta.Finalizers) == 1 && soSpecString == oldSoSpecString
}

func validateWorkload(so *ScaledObject, action string) error {
func validateWorkload(so *ScaledObject, action string) (admission.Warnings, error) {
prommetrics.RecordScaledObjectValidatingTotal(so.Namespace, action)
err := verifyCPUMemoryScalers(so, action)
if err != nil {
return err
return nil, err
}
err = verifyScaledObjects(so, action)
if err != nil {
return err
return nil, err
}
err = verifyHpas(so, action)
if err != nil {
return err
return nil, err
}

scaledobjectlog.V(1).Info(fmt.Sprintf("scaledobject %s is valid", so.Name))
return nil
return nil, nil
}

func verifyHpas(incomingSo *ScaledObject, action string) error {
Expand Down
75 changes: 16 additions & 59 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,22 @@ import (
"flag"
"fmt"
"os"
"sync"
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollers "github.com/kedacore/keda/v2/controllers/keda"
"github.com/kedacore/keda/v2/pkg/metricsservice"
prommetrics "github.com/kedacore/keda/v2/pkg/prommetrics/adapter"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
Expand Down Expand Up @@ -69,44 +64,44 @@ var (
metricsServiceAddr string
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration, maxConcurrentReconciles int) (provider.MetricsProvider, <-chan struct{}, error) {
func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.ExternalMetricsProvider, error) {
scheme := scheme.Scheme
if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
return nil, nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
}
if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add keda scheme to runtime scheme")
return nil, nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
}
namespace, err := getWatchNamespace()
if err != nil {
logger.Error(err, "failed to get watch namespace")
return nil, nil, fmt.Errorf("failed to get watch namespace (%s)", err)
return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
}

leaseDuration, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
if err != nil {
logger.Error(err, "invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
}

renewDeadline, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
}

retryPeriod, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
}

useMetricsServiceGrpc, err := kedautil.ResolveOsEnvBool("KEDA_USE_METRICS_SERVICE_GRPC", true)
if err != nil {
logger.Error(err, "Invalid KEDA_USE_METRICS_SERVICE_GRPC")
return nil, nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
return nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
}

// Get a config to talk to the apiserver
Expand All @@ -126,7 +121,7 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
})
if err != nil {
logger.Error(err, "failed to setup manager")
return nil, nil, err
return nil, err
}

broadcaster := record.NewBroadcaster()
Expand All @@ -135,12 +130,12 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "Unable to create kube clientset")
return nil, nil, err
return nil, err
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, nil, err
return nil, err
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
Expand All @@ -150,49 +145,17 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

externalMetricsInfo := &[]provider.ExternalMetricInfo{}
externalMetricsInfoLock := &sync.RWMutex{}

prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()

stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, mgr, handler, logger, externalMetricsInfo, externalMetricsInfoLock, maxConcurrentReconciles, stopCh, secretInformer.Informer().HasSynced); err != nil {
return nil, nil, err
}

logger.Info("Connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr, a.SecureServing.ServerCert.CertDirectory)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
return nil, nil, err
return nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, mgr manager.Manager, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, maxConcurrentReconciles int, stopCh chan<- struct{}, secretSynced cache.InformerSynced) error {
if err := (&kedacontrollers.MetricsScaledObjectReconciler{
Client: mgr.GetClient(),
ScaleHandler: scaleHandler,
ExternalMetricsInfo: externalMetricsInfo,
ExternalMetricsInfoLock: externalMetricsInfoLock,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
return err
}

go func() {
if err := mgr.Start(ctx); err != nil {
logger.Error(err, "controller-runtime encountered an error")
stopCh <- struct{}{}
close(stopCh)
}
}()

if ok := cache.WaitForCacheSync(ctx.Done(), secretSynced); !ok {
return fmt.Errorf("failed to wait Secrets cache synced")
}
return nil
return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), nil
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
Expand Down Expand Up @@ -266,26 +229,20 @@ func main() {
return
}

controllerMaxReconciles, err := kedautil.ResolveOsEnvInt("KEDA_METRICS_CTRL_MAX_RECONCILES", 1)
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_CTRL_MAX_RECONCILES")
return
}

err = printWelcomeMsg(cmd)
if err != nil {
return
}

kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond, controllerMaxReconciles)
kedaProvider, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
if err != nil {
logger.Error(err, "making provider")
return
}
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)
if err = cmd.Run(stopCh); err != nil {
if err = cmd.Run(wait.NeverStop); err != nil {
return
}
}
11 changes: 9 additions & 2 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -202,7 +203,10 @@ func main() {
Recorder: eventRecorder,
ScaleClient: scaleClient,
ScaleHandler: scaledHandler,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledObjectMaxReconciles}); err != nil {
}).SetupWithManager(mgr, controller.Options{
Controller: config.Controller{
MaxConcurrentReconciles: scaledObjectMaxReconciles,
}}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
}
Expand All @@ -213,7 +217,10 @@ func main() {
Recorder: eventRecorder,
SecretsLister: secretInformer.Lister(),
SecretsSynced: secretInformer.Informer().HasSynced,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: scaledJobMaxReconciles}); err != nil {
}).SetupWithManager(mgr, controller.Options{
Controller: config.Controller{
MaxConcurrentReconciles: scaledJobMaxReconciles,
}}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.11.1
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
name: clustertriggerauthentications.keda.sh
spec:
Expand Down
Loading

0 comments on commit 8c9dd7f

Please sign in to comment.