Skip to content

Commit

Permalink
fix: Restore prometheus metrics in Metrics Server (#4766)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
Signed-off-by: Jorge Turrado Ferrero <Jorge_turrado@hotmail.es>
  • Loading branch information
JorTurFer committed Jul 7, 2023
1 parent 9dbd41e commit b4919ea
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- **General**: Metrics server exposes Prometheus metrics ([#4776](https://github.com/kedacore/keda/issues/4776))
- **AWS Pod Identity Authentication**: Use `default` service account if the workload doesn't set it ([#4767](https://github.com/kedacore/keda/issues/4767))
- **Pulsar Scaler**: Fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` ([#4681](https://github.com/kedacore/keda/issues/4681))

Expand Down
72 changes: 22 additions & 50 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@ import (
"flag"
"fmt"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -40,7 +34,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
kedaprovider "github.com/kedacore/keda/v2/pkg/provider"
"github.com/kedacore/keda/v2/pkg/scaling"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -62,44 +55,44 @@ var (
metricsServiceAddr string
)

func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.ExternalMetricsProvider, error) {
func (a *Adapter) makeProvider(ctx context.Context) (provider.ExternalMetricsProvider, <-chan struct{}, error) {
scheme := scheme.Scheme
if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
return nil, nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
}
if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add keda scheme to runtime scheme")
return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
return nil, nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
}
namespace, err := getWatchNamespace()
if err != nil {
logger.Error(err, "failed to get watch namespace")
return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
return nil, nil, fmt.Errorf("failed to get watch namespace (%s)", err)
}

leaseDuration, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
if err != nil {
logger.Error(err, "invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_LEASE_DURATION (%s)", err)
}

renewDeadline, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RENEW_DEADLINE (%s)", err)
}

retryPeriod, err := kedautil.ResolveOsEnvDuration("KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
if err != nil {
logger.Error(err, "Invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD")
return nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_METRICS_LEADER_ELECTION_RETRY_PERIOD (%s)", err)
}

useMetricsServiceGrpc, err := kedautil.ResolveOsEnvBool("KEDA_USE_METRICS_SERVICE_GRPC", true)
if err != nil {
logger.Error(err, "Invalid KEDA_USE_METRICS_SERVICE_GRPC")
return nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
return nil, nil, fmt.Errorf("invalid KEDA_USE_METRICS_SERVICE_GRPC (%s)", err)
}

// Get a config to talk to the apiserver
Expand All @@ -121,38 +114,24 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
})
if err != nil {
logger.Error(err, "failed to setup manager")
return nil, err
return nil, nil, err
}

broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})

kubeClientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Error(err, "Unable to create kube clientset")
return nil, err
}
objectNamespace, err := kedautil.GetClusterObjectNamespace()
if err != nil {
logger.Error(err, "Unable to get cluster object namespace")
return nil, err
}
// the namespaced kubeInformerFactory is used to restrict secret informer to only list/watch secrets in KEDA cluster object namespace,
// refer to https://github.com/kedacore/keda/issues/3668
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClientset, 1*time.Hour, kubeinformers.WithNamespace(objectNamespace))
secretInformer := kubeInformerFactory.Core().V1().Secrets()

handler := scaling.NewScaleHandler(mgr.GetClient(), nil, scheme, globalHTTPTimeout, recorder, secretInformer.Lister())
kubeInformerFactory.Start(ctx.Done())

logger.Info("Connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
grpcClient, err := metricsservice.NewGrpcClient(metricsServiceAddr, a.SecureServing.ServerCert.CertDirectory)
if err != nil {
logger.Error(err, "error connecting Metrics Service gRPC client to the server", "address", metricsServiceAddr)
return nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), nil
return nil, nil, err
}
stopCh := make(chan struct{})
go func() {
if err := mgr.Start(ctx); err != nil {
logger.Error(err, "controller-runtime encountered an error")
stopCh <- struct{}{}
close(stopCh)
}
}()
return kedaprovider.NewProvider(ctx, logger, mgr.GetClient(), *grpcClient, useMetricsServiceGrpc, namespace), stopCh, nil
}

// generateDefaultMetricsServiceAddr generates default Metrics Service gRPC Server address based on the current Namespace.
Expand Down Expand Up @@ -217,27 +196,20 @@ func main() {

ctrl.SetLogger(logger)

// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutMS, err := kedautil.ResolveOsEnvInt("KEDA_HTTP_DEFAULT_TIMEOUT", 3000)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

err = printWelcomeMsg(cmd)
if err != nil {
return
}

kedaProvider, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
kedaProvider, stopCh, err := cmd.makeProvider(ctx)
if err != nil {
logger.Error(err, "making provider")
return
}
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)
if err = cmd.Run(wait.NeverStop); err != nil {
if err = cmd.Run(stopCh); err != nil {
return
}
}
4 changes: 2 additions & 2 deletions config/metrics-server/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ spec:
- name: https
port: 443
targetPort: 6443
- name: http
port: 80
- name: metrics
port: 8080
targetPort: 8080
selector:
app: keda-metrics-apiserver
5 changes: 1 addition & 4 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/metricsservice"
"github.com/kedacore/keda/v2/pkg/scaling"
)

// KedaProvider implements External Metrics Provider
type KedaProvider struct {
defaults.DefaultExternalMetricsProvider

client client.Client
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context

Expand All @@ -52,10 +50,9 @@ var (
)

// NewProvider returns an instance of KedaProvider
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, grpcClient metricsservice.GrpcClient, useMetricsServiceGrpc bool, watchedNamespace string) provider.ExternalMetricsProvider {
func NewProvider(ctx context.Context, adapterLogger logr.Logger, client client.Client, grpcClient metricsservice.GrpcClient, useMetricsServiceGrpc bool, watchedNamespace string) provider.ExternalMetricsProvider {
provider := &KedaProvider{
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
grpcClient: grpcClient,
Expand Down
28 changes: 17 additions & 11 deletions tests/sequential/prometheus_metrics/prometheus_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,18 @@ const (
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName)
wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName)
cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName)
clientName = fmt.Sprintf("%s-client", testName)
kedaOperatorPrometheusURL = "http://keda-operator.keda.svc.cluster.local:8080/metrics"
kedaWebhookPrometheusURL = "http://keda-admission-webhooks.keda.svc.cluster.local:8080/metrics"
namespaceString = "namespace"
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-deployment", testName)
monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName)
wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName)
cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName)
clientName = fmt.Sprintf("%s-client", testName)
kedaOperatorPrometheusURL = "http://keda-operator.keda.svc.cluster.local:8080/metrics"
kedaMetricsServerPrometheusURL = "http://keda-metrics-apiserver.keda.svc.cluster.local:8080/metrics"
kedaWebhookPrometheusURL = "http://keda-admission-webhooks.keda.svc.cluster.local:8080/metrics"
namespaceString = "namespace"
)

type templateData struct {
Expand Down Expand Up @@ -267,6 +268,7 @@ func TestPrometheusMetrics(t *testing.T) {
testScalerErrors(t, data)
testScalerErrorsTotal(t, data)
testOperatorMetrics(t, kc, data)
testMetricServerMetrics(t)
testWebhookMetrics(t, data)
testScalableObjectMetrics(t)

Expand Down Expand Up @@ -617,6 +619,10 @@ func testWebhookMetricValues(t *testing.T) {
checkWebhookValues(t, families)
}

func testMetricServerMetrics(t *testing.T) {
_ = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaMetricsServerPrometheusURL))
}

func testOperatorMetricValues(t *testing.T, kc *kubernetes.Clientset) {
families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))
expectedTriggerTotals, expectedCrTotals := getOperatorMetricsManually(t, kc)
Expand Down

0 comments on commit b4919ea

Please sign in to comment.