Skip to content

Commit

Permalink
Use Keda context in some cases
Browse files Browse the repository at this point in the history
Signed-off-by: Jeroen Bobbeldijk <jeroen@klippa.com>
  • Loading branch information
jerbob92 committed Oct 13, 2021
1 parent db96f64 commit b5f1b40
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 19 deletions.
20 changes: 9 additions & 11 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ const (
defaultEnableTLS = false
)

var ctx = context.Background()

type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error)

type redisScaler struct {
metadata *redisMetadata
closeFn func() error
getListLengthFn func() (int64, error)
getListLengthFn func(ctx context.Context) (int64, error)
}

type redisConnectionInfo struct {
Expand Down Expand Up @@ -105,7 +103,7 @@ func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, err
return nil
}

listLengthFn := func() (int64, error) {
listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
Expand Down Expand Up @@ -135,7 +133,7 @@ func createSentinelRedisScaler(meta *redisMetadata, script string) (Scaler, erro
return nil
}

listLengthFn := func() (int64, error) {
listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
Expand Down Expand Up @@ -165,7 +163,7 @@ func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) {
return nil
}

listLengthFn := func() (int64, error) {
listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
Expand Down Expand Up @@ -219,7 +217,7 @@ func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*red

// IsActive checks if there is any element in the Redis list
func (s *redisScaler) IsActive(ctx context.Context) (bool, error) {
length, err := s.getListLengthFn()
length, err := s.getListLengthFn(ctx)

if err != nil {
redisLog.Error(err, "error")
Expand Down Expand Up @@ -253,7 +251,7 @@ func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {

// GetMetrics connects to Redis and finds the length of the list
func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
listLen, err := s.getListLengthFn()
listLen, err := s.getListLengthFn(ctx)

if err != nil {
redisLog.Error(err, "error getting list length")
Expand Down Expand Up @@ -482,7 +480,7 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro

// confirm if connected
c := redis.NewClusterClient(options)
err := c.Ping(ctx).Err()
err := c.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
Expand All @@ -507,7 +505,7 @@ func getRedisSentinelClient(info redisConnectionInfo, dbIndex int) (*redis.Clien

// confirm if connected
c := redis.NewFailoverClient(options)
err := c.Ping(ctx).Err()
err := c.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
Expand All @@ -529,7 +527,7 @@ func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error

// confirm if connected
c := redis.NewClient(options)
err := c.Ping(ctx).Err()
err := c.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/redis_scaler_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scalers

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -79,7 +80,7 @@ func TestRedisGetMetricSpecForScaling(t *testing.T) {
t.Fatal("Could not parse metadata:", err)
}
closeFn := func() error { return nil }
lengthFn := func() (int64, error) { return -1, nil }
lengthFn := func(ctx context.Context) (int64, error) { return -1, nil }
mockRedisScaler := redisScaler{
meta,
closeFn,
Expand Down
12 changes: 6 additions & 6 deletions pkg/scalers/redis_streams_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
type redisStreamsScaler struct {
metadata *redisStreamsMetadata
closeFn func() error
getPendingEntriesCountFn func() (int64, error)
getPendingEntriesCountFn func(ctx context.Context) (int64, error)
}

type redisStreamsMetadata struct {
Expand Down Expand Up @@ -82,7 +82,7 @@ func createClusteredRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, erro
return nil
}

pendingEntriesCountFn := func() (int64, error) {
pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
Expand Down Expand Up @@ -111,7 +111,7 @@ func createSentinelRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error
return nil
}

pendingEntriesCountFn := func() (int64, error) {
pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
Expand Down Expand Up @@ -140,7 +140,7 @@ func createRedisStreamsScaler(meta *redisStreamsMetadata) (Scaler, error) {
return nil
}

pendingEntriesCountFn := func() (int64, error) {
pendingEntriesCountFn := func(ctx context.Context) (int64, error) {
pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result()
if err != nil {
return -1, err
Expand Down Expand Up @@ -201,7 +201,7 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser)

// IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream
func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) {
count, err := s.getPendingEntriesCountFn()
count, err := s.getPendingEntriesCountFn(ctx)

if err != nil {
redisStreamsLog.Error(err, "error")
Expand Down Expand Up @@ -233,7 +233,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {

// GetMetrics fetches the number of pending entries for a consumer group in a stream
func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
pendingEntriesCount, err := s.getPendingEntriesCountFn()
pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx)

if err != nil {
redisStreamsLog.Error(err, "error fetching pending entries count")
Expand Down
3 changes: 2 additions & 1 deletion pkg/scalers/redis_streams_scaler_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scalers

import (
"context"
"errors"
"strconv"
"testing"
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) {
t.Fatal("Could not parse metadata:", err)
}
closeFn := func() error { return nil }
getPendingEntriesCountFn := func() (int64, error) { return -1, nil }
getPendingEntriesCountFn := func(ctx context.Context) (int64, error) { return -1, nil }
mockRedisStreamsScaler := redisStreamsScaler{meta, closeFn, getPendingEntriesCountFn}

metricSpec := mockRedisStreamsScaler.GetMetricSpecForScaling()
Expand Down

0 comments on commit b5f1b40

Please sign in to comment.