Propagating contexts to all remaining scalers (#2267)
Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>
arschles authored Nov 10, 2021
1 parent 37a4324 commit 89d8663
Showing 15 changed files with 120 additions and 103 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@
 
 ### Improvements
 
+- Improve context handling in appropriate functionality in which we instantiate scalers ([#2267](https://github.com/kedacore/keda/pull/2267))
 - Improve validation in Cron scaler in case start & end input is same.([#2032](https://github.com/kedacore/keda/pull/2032))
 - Improve the cron validation in Cron Scaler ([#2038](https://github.com/kedacore/keda/pull/2038))
 - Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028))
13 changes: 7 additions & 6 deletions adapter/main.go
@@ -69,7 +69,7 @@ var (
 	adapterClientRequestBurst int
 )
 
-func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) {
+func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Duration) (provider.MetricsProvider, <-chan struct{}, error) {
 	// Get a config to talk to the apiserver
 	cfg, err := config.GetConfig()
 	if cfg != nil {
@@ -113,14 +113,14 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
 	prometheusServer := &prommetrics.PrometheusMetricServer{}
 	go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
 	stopCh := make(chan struct{})
-	if err := runScaledObjectController(scheme, namespace, handler, logger, stopCh); err != nil {
+	if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, stopCh); err != nil {
 		return nil, nil, err
 	}
 
-	return kedaprovider.NewProvider(logger, handler, kubeclient, namespace), stopCh, nil
+	return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace), stopCh, nil
 }
 
-func runScaledObjectController(scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error {
+func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error {
 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
 		Scheme: scheme,
 		Namespace: namespace,
@@ -136,7 +136,7 @@ func runScaledObjectController(scheme *k8sruntime.Scheme, namespace string, scal
 	}
 
 	go func() {
-		if err := mgr.Start(context.Background()); err != nil {
+		if err := mgr.Start(ctx); err != nil {
 			logger.Error(err, "controller-runtime encountered an error")
 			stopCh <- struct{}{}
 			close(stopCh)
@@ -164,6 +164,7 @@ func getWatchNamespace() (string, error) {
 }
 
 func main() {
+	ctx := ctrl.SetupSignalHandler()
 	var err error
 	defer func() {
 		if err != nil {
@@ -205,7 +206,7 @@ func main() {
 		return
 	}
 
-	kedaProvider, stopCh, err := cmd.makeProvider(time.Duration(globalHTTPTimeoutMS) * time.Millisecond)
+	kedaProvider, stopCh, err := cmd.makeProvider(ctx, time.Duration(globalHTTPTimeoutMS)*time.Millisecond)
 	if err != nil {
 		logger.Error(err, "making provider")
 		return
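The changes in this file establish the pattern the rest of the commit follows: main creates one root context (here via controller-runtime's ctrl.SetupSignalHandler()) and threads it through every constructor, so nothing deeper in the stack has to reach for context.Background(). Below is a minimal, hypothetical sketch of that shape using only the standard library; makeProvider and its timeout handling are placeholders rather than the actual KEDA functions, and signal.NotifyContext stands in for SetupSignalHandler.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// makeProvider is a stand-in for a constructor that used to call
// context.Background() internally; it now receives the root context instead.
func makeProvider(ctx context.Context, timeout time.Duration) (func() error, error) {
	run := func() error {
		// Derive per-call deadlines from the caller's context rather than
		// from a fresh Background() context.
		callCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		<-callCtx.Done() // placeholder for real work bounded by callCtx
		return callCtx.Err()
	}
	return run, nil
}

func main() {
	// One root context for the whole process, cancelled on SIGINT/SIGTERM,
	// standing in for controller-runtime's ctrl.SetupSignalHandler().
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	run, err := makeProvider(ctx, 500*time.Millisecond)
	if err != nil {
		fmt.Fprintln(os.Stderr, "making provider:", err)
		return
	}
	if err := run(); err != nil && ctx.Err() == nil {
		fmt.Println("provider stopped:", err)
	}
}
```

Cancelling the root context propagates to every context derived from it, which is what lets the adapter wind down background work on shutdown instead of leaving it attached to contexts nobody can cancel.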
1 change: 1 addition & 0 deletions apis/keda/v1alpha1/zz_generated.deepcopy.go

(Generated file; diff not rendered.)

10 changes: 5 additions & 5 deletions controllers/keda/scaledjob_controller.go
@@ -147,7 +147,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log
 	}
 
 	// scaledJob was created or modified - let's start a new ScaleLoop
-	err = r.requestScaleLoop(logger, scaledJob)
+	err = r.requestScaleLoop(ctx, logger, scaledJob)
 	if err != nil {
 		return "Failed to start a new scale loop with scaling logic", err
 	}
@@ -187,13 +187,13 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context
 }
 
 // requestScaleLoop request ScaleLoop handler for the respective ScaledJob
-func (r *ScaledJobReconciler) requestScaleLoop(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
+func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
 	logger.V(1).Info("Starting a new ScaleLoop")
-	return r.scaleHandler.HandleScalableObject(scaledJob)
+	return r.scaleHandler.HandleScalableObject(ctx, scaledJob)
 }
 
 // stopScaleLoop stops ScaleLoop handler for the respective ScaledJob
-func (r *ScaledJobReconciler) stopScaleLoop(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
+func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
 	logger.V(1).Info("Stopping a ScaleLoop")
-	return r.scaleHandler.DeleteScalableObject(scaledJob)
+	return r.scaleHandler.DeleteScalableObject(ctx, scaledJob)
 }
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_finalizer.go
@@ -37,7 +37,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr
 	// Run finalization logic for scaledJobFinalizer. If the
 	// finalization logic fails, don't remove the finalizer so
 	// that we can retry during the next reconciliation.
-	if err := r.stopScaleLoop(logger, scaledJob); err != nil {
+	if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil {
 		return err
 	}
 
10 changes: 5 additions & 5 deletions controllers/keda/scaledobject_controller.go
@@ -241,7 +241,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
 
 	// Notify ScaleHandler if a new HPA was created or if ScaledObject was updated
 	if newHPACreated || scaleObjectSpecChanged {
-		if r.requestScaleLoop(logger, scaledObject) != nil {
+		if r.requestScaleLoop(ctx, logger, scaledObject) != nil {
 			return "Failed to start a new scale loop with scaling logic", err
 		}
 		logger.Info("Initializing Scaling logic according to ScaledObject Specification")
@@ -382,7 +382,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
 }
 
 // startScaleLoop starts ScaleLoop handler for the respective ScaledObject
-func (r *ScaledObjectReconciler) requestScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
+func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
 	logger.V(1).Info("Notify scaleHandler of an update in scaledObject")
 
 	key, err := cache.MetaNamespaceKeyFunc(scaledObject)
@@ -391,7 +391,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(logger logr.Logger, scaledObje
 		return err
 	}
 
-	if err = r.scaleHandler.HandleScalableObject(scaledObject); err != nil {
+	if err = r.scaleHandler.HandleScalableObject(ctx, scaledObject); err != nil {
 		return err
 	}
 
@@ -402,14 +402,14 @@ func (r *ScaledObjectReconciler) requestScaleLoop(logger logr.Logger, scaledObje
 }
 
 // stopScaleLoop stops ScaleLoop handler for the respective ScaleObject
-func (r *ScaledObjectReconciler) stopScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
+func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
 	key, err := cache.MetaNamespaceKeyFunc(scaledObject)
 	if err != nil {
 		logger.Error(err, "Error getting key for scaledObject")
 		return err
 	}
 
-	if err := r.scaleHandler.DeleteScalableObject(scaledObject); err != nil {
+	if err := r.scaleHandler.DeleteScalableObject(ctx, scaledObject); err != nil {
 		return err
 	}
 	// delete ScaledObject's current Generation
2 changes: 1 addition & 1 deletion controllers/keda/scaledobject_finalizer.go
@@ -39,7 +39,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
 	// Run finalization logic for scaledObjectFinalizer. If the
 	// finalization logic fails, don't remove the finalizer so
 	// that we can retry during the next reconciliation.
-	if err := r.stopScaleLoop(logger, scaledObject); err != nil {
+	if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
 		return err
 	}
 
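The four controller files above all make the same move: the context that controller-runtime passes into Reconcile is forwarded through the requestScaleLoop/stopScaleLoop helpers into the ScaleHandler interface, rather than each layer minting its own context. Below is a hedged sketch of that shape; the ScaleHandler interface and the noop handler here are simplified stand-ins, not KEDA's real types.

```go
package main

import (
	"context"
	"fmt"
)

// ScaleHandler is a simplified stand-in for an interface whose methods
// now accept the caller's context.
type ScaleHandler interface {
	HandleScalableObject(ctx context.Context, obj string) error
	DeleteScalableObject(ctx context.Context, obj string) error
}

type reconciler struct {
	scaleHandler ScaleHandler
}

// requestScaleLoop forwards the reconcile context instead of creating its own.
func (r *reconciler) requestScaleLoop(ctx context.Context, obj string) error {
	return r.scaleHandler.HandleScalableObject(ctx, obj)
}

// stopScaleLoop does the same on the teardown path (e.g. from a finalizer).
func (r *reconciler) stopScaleLoop(ctx context.Context, obj string) error {
	return r.scaleHandler.DeleteScalableObject(ctx, obj)
}

type noopHandler struct{}

func (noopHandler) HandleScalableObject(ctx context.Context, obj string) error {
	// A real handler would start a scale loop tied to ctx's lifetime.
	return ctx.Err()
}

func (noopHandler) DeleteScalableObject(ctx context.Context, obj string) error {
	return ctx.Err()
}

func main() {
	r := &reconciler{scaleHandler: noopHandler{}}
	// In a controller, ctx would come from Reconcile(ctx, req).
	ctx := context.Background()
	fmt.Println(r.requestScaleLoop(ctx, "example-scaledobject"))
	fmt.Println(r.stopScaleLoop(ctx, "example-scaledobject"))
}
```

Passing the caller's context means the downstream start/stop calls observe the same cancellation and deadlines as the request that triggered them.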
16 changes: 8 additions & 8 deletions pkg/mock/mock_scaling/mock_interface.go

(Generated file; diff not rendered.)

6 changes: 4 additions & 2 deletions pkg/provider/provider.go
@@ -42,6 +42,7 @@ type KedaProvider struct {
 	externalMetrics []externalMetric
 	scaleHandler scaling.ScaleHandler
 	watchedNamespace string
+	ctx context.Context
 }
 
 type externalMetric struct{}
@@ -50,13 +51,14 @@ var logger logr.Logger
 var metricsServer prommetrics.PrometheusMetricServer
 
 // NewProvider returns an instance of KedaProvider
-func NewProvider(adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string) provider.MetricsProvider {
+func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string) provider.MetricsProvider {
 	provider := &KedaProvider{
 		values: make(map[provider.CustomMetricInfo]int64),
 		externalMetrics: make([]externalMetric, 2, 10),
 		client: client,
 		scaleHandler: scaleHandler,
 		watchedNamespace: watchedNamespace,
+		ctx: ctx,
 	}
 	logger = adapterLogger.WithName("provider")
 	logger.Info("starting")
@@ -149,7 +151,7 @@ func (p *KedaProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
 	opts := []client.ListOption{
 		client.InNamespace(p.watchedNamespace),
 	}
-	err := p.client.List(context.TODO(), scaledObjects, opts...)
+	err := p.client.List(p.ctx, scaledObjects, opts...)
 	if err != nil {
 		logger.Error(err, "Cannot get list of ScaledObjects", "WatchedNamespace", p.watchedNamespace)
 		return nil
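provider.go shows the other half of the pattern: KedaProvider implements a metrics-provider interface whose methods (such as ListAllExternalMetrics) take no context parameter, so the constructor's context is stored on the struct and used where context.TODO() used to be. Here is a minimal, hypothetical sketch of the same technique; the Lister interface, the item list, and the timeout value are illustrative, not the metrics-adapter API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Lister mimics a third-party interface whose method signature cannot be
// changed to accept a context.
type Lister interface {
	ListAll() []string
}

type provider struct {
	// ctx is captured at construction time because ListAll's signature
	// has no context parameter of its own.
	ctx   context.Context
	items []string
}

func newProvider(ctx context.Context, items []string) *provider {
	return &provider{ctx: ctx, items: items}
}

func (p *provider) ListAll() []string {
	// Use the stored context instead of context.TODO(), so the call
	// observes process-wide cancellation.
	ctx, cancel := context.WithTimeout(p.ctx, 100*time.Millisecond)
	defer cancel()

	select {
	case <-ctx.Done():
		return nil
	default:
		return p.items
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var l Lister = newProvider(ctx, []string{"scaledobject-a", "scaledobject-b"})
	fmt.Println(l.ListAll())

	cancel()
	fmt.Println(l.ListAll()) // returns nil once the root context is cancelled
}
```

Storing a context in a struct is usually discouraged in Go, but it is the common escape hatch when an interface you must implement does not thread one through.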
14 changes: 9 additions & 5 deletions pkg/scalers/azure_servicebus_scaler.go
@@ -50,6 +50,7 @@ const (
 var azureServiceBusLog = logf.Log.WithName("azure_servicebus_scaler")
 
 type azureServiceBusScaler struct {
+	ctx context.Context
 	metadata *azureServiceBusMetadata
 	podIdentity kedav1alpha1.PodIdentityProvider
 	httpClient *http.Client
@@ -68,13 +69,14 @@ type azureServiceBusMetadata struct {
 }
 
 // NewAzureServiceBusScaler creates a new AzureServiceBusScaler
-func NewAzureServiceBusScaler(config *ScalerConfig) (Scaler, error) {
+func NewAzureServiceBusScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
 	meta, err := parseAzureServiceBusMetadata(config)
 	if err != nil {
 		return nil, fmt.Errorf("error parsing azure service bus metadata: %s", err)
 	}
 
 	return &azureServiceBusScaler{
+		ctx: ctx,
 		metadata: meta,
 		podIdentity: config.PodIdentity,
 		httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout),
@@ -180,7 +182,7 @@ func (s *azureServiceBusScaler) Close(context.Context) error {
 // Returns the metric spec to be used by the HPA
 func (s *azureServiceBusScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
 	targetLengthQty := resource.NewQuantity(int64(s.metadata.targetLength), resource.DecimalSI)
-	namespace, err := s.getServiceBusNamespace()
+	namespace, err := s.getServiceBusNamespace(s.ctx)
 	if err != nil {
 		azureServiceBusLog.Error(err, "error parsing azure service bus metadata", "namespace")
 		return nil
@@ -225,11 +227,12 @@ func (s *azureServiceBusScaler) GetMetrics(ctx context.Context, metricName strin
 
 type azureTokenProvider struct {
 	httpClient *http.Client
+	ctx context.Context
 }
 
 // GetToken implements TokenProvider interface for azureTokenProvider
 func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) {
-	ctx := context.Background()
+	ctx := a.ctx
 	// Service bus resource id is "https://servicebus.azure.net/" in all cloud environments
 	token, err := azure.GetAzureADPodIdentityToken(ctx, a.httpClient, "https://servicebus.azure.net/")
 	if err != nil {
@@ -246,7 +249,7 @@ func (a azureTokenProvider) GetToken(uri string) (*auth.Token, error) {
 // Returns the length of the queue or subscription
 func (s *azureServiceBusScaler) GetAzureServiceBusLength(ctx context.Context) (int32, error) {
 	// get namespace
-	namespace, err := s.getServiceBusNamespace()
+	namespace, err := s.getServiceBusNamespace(ctx)
 	if err != nil {
 		return -1, err
 	}
@@ -262,7 +265,7 @@ func (s *azureServiceBusScaler) GetAzureServiceBusLength(ctx context.Context) (i
 }
 
 // Returns service bus namespace object
-func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace, error) {
+func (s *azureServiceBusScaler) getServiceBusNamespace(ctx context.Context) (*servicebus.Namespace, error) {
 	var namespace *servicebus.Namespace
 	var err error
 
@@ -277,6 +280,7 @@ func (s *azureServiceBusScaler) getServiceBusNamespace() (*servicebus.Namespace,
 		return namespace, err
 	}
 	namespace.TokenProvider = azureTokenProvider{
+		ctx: ctx,
		httpClient: s.httpClient,
 	}
 	namespace.Name = s.metadata.namespace
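The service-bus scaler applies the same trick twice: the scaler keeps the constructor's context (s.ctx) for methods like GetMetricSpecForScaling, and azureTokenProvider keeps it because the SDK's TokenProvider interface defines GetToken(uri string) with no context parameter at all. The sketch below illustrates that second case under stated assumptions: tokenProvider, its endpoint field, and the httptest server are invented for the example and are not the Azure SDK types.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// tokenProvider mimics an SDK interface whose GetToken method cannot be
// given a context parameter, so the context travels on the struct instead.
type tokenProvider struct {
	ctx        context.Context
	httpClient *http.Client
	endpoint   string
}

func (t tokenProvider) GetToken(uri string) (string, error) {
	// Bound the call by the stored context rather than context.Background().
	ctx, cancel := context.WithTimeout(t.ctx, 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, t.endpoint, nil)
	if err != nil {
		return "", err
	}
	resp, err := t.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	// A fake token endpoint standing in for the real identity service.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "fake-token")
	}))
	defer srv.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tp := tokenProvider{ctx: ctx, httpClient: srv.Client(), endpoint: srv.URL}
	token, err := tp.GetToken("https://servicebus.azure.net/")
	fmt.Println(token, err)

	cancel() // once the root context is cancelled, further calls fail fast
	_, err = tp.GetToken("https://servicebus.azure.net/")
	fmt.Println(err)
}
```

Once the stored context is cancelled, the outbound HTTP request fails immediately instead of hanging on a token call that nothing is waiting for.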
(Diffs for the remaining 5 changed files did not load.)
