Skip to content

Commit

Permalink
using dedicated HTTP clients (kedacore#1251)
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>
  • Loading branch information
arschles authored and ycabrer committed Mar 1, 2021
1 parent 3ded87f commit 7bfc6ac
Show file tree
Hide file tree
Showing 34 changed files with 290 additions and 198 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

### New

- KEDA uses a dedicated [HTTP client](https://pkg.go.dev/net/http#Client), connection pool, and (optional) TLS certificate for each configured scaler
- KEDA scales any CustomResource that implements Scale subresource ([#703](https://github.com/kedacore/keda/issues/703))
- Provide KEDA go-client ([#494](https://github.com/kedacore/keda/issues/494))
- Define KEDA readiness and liveness probes ([#788](https://github.com/kedacore/keda/issues/788))
Expand Down
36 changes: 27 additions & 9 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"
"runtime"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -39,44 +41,44 @@ var (
prometheusMetricsPath string
)

func (a *Adapter) makeProviderOrDie() provider.MetricsProvider {
func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, error) {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
logger.Error(err, "failed to get the config")
os.Exit(1)
return nil, fmt.Errorf("failed to get the config (%s)", err)
}

scheme := scheme.Scheme
if err := appsv1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add apps/v1 scheme to runtime scheme")
os.Exit(1)
return nil, fmt.Errorf("failed to add apps/v1 scheme to runtime scheme (%s)", err)
}
if err := kedav1alpha1.SchemeBuilder.AddToScheme(scheme); err != nil {
logger.Error(err, "failed to add keda scheme to runtime scheme")
os.Exit(1)
return nil, fmt.Errorf("failed to add keda scheme to runtime scheme (%s)", err)
}

kubeclient, err := client.New(cfg, client.Options{
Scheme: scheme,
})
if err != nil {
logger.Error(err, "unable to construct new client")
os.Exit(1)
return nil, fmt.Errorf("unable to construct new client (%s)", err)
}

handler := scaling.NewScaleHandler(kubeclient, nil, scheme)
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)

namespace, err := getWatchNamespace()
if err != nil {
logger.Error(err, "failed to get watch namespace")
os.Exit(1)
return nil, fmt.Errorf("failed to get watch namespace (%s)", err)
}

prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()

return kedaprovider.NewProvider(logger, handler, kubeclient, namespace)
return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), nil
}

func printVersion() {
Expand Down Expand Up @@ -117,7 +119,23 @@ func main() {
return
}

kedaProvider := cmd.makeProviderOrDie()
globalHTTPTimeoutStr := os.Getenv("KEDA_HTTP_DEFAULT_TIMEOUT")
if globalHTTPTimeoutStr == "" {
// default to 3 seconds if they don't pass the env var
globalHTTPTimeoutStr = "3000"
}

globalHTTPTimeoutMS, err := strconv.Atoi(globalHTTPTimeoutStr)
if err != nil {
logger.Error(err, "Invalid KEDA_HTTP_DEFAULT_TIMEOUT")
return
}

kedaProvider, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond)
if err != nil {
logger.Error(err, "making provider")
return
}
cmd.WithExternalMetrics(kedaProvider)

logger.Info(cmd.Message)
Expand Down
10 changes: 6 additions & 4 deletions controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -25,14 +26,15 @@ import (
// ScaledJobReconciler reconciles a ScaledJob object
type ScaledJobReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
scaleHandler scaling.ScaleHandler
Log logr.Logger
Scheme *runtime.Scheme
scaleHandler scaling.ScaleHandler
globalHTTPTimeout time.Duration
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme())
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.globalHTTPTimeout)

return ctrl.NewControllerManagedBy(mgr).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down
5 changes: 4 additions & 1 deletion controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -48,6 +49,8 @@ type ScaledObjectReconciler struct {
scaledObjectsGenerations *sync.Map
scaleHandler scaling.ScaleHandler
kubeVersion kedautil.K8sVersion

globalHTTPTimeout time.Duration
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -75,7 +78,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme())
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.globalHTTPTimeout)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Loading

0 comments on commit 7bfc6ac

Please sign in to comment.