Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Redis support #2181

Merged
merged 8 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))

### Improvements

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/denisenkom/go-mssqldb v0.11.0
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.4
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -241,6 +242,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-lttb v0.0.0-20180810165845-318fcdf10a77/go.mod h1:Va5MyIzkU0rAM92tn3hb3Anb7oz7KcnixF49+2wOMe4=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U=
Expand Down Expand Up @@ -349,8 +352,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down
186 changes: 169 additions & 17 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strconv"
"strings"

"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -29,15 +29,19 @@ type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string
type redisScaler struct {
metadata *redisMetadata
closeFn func() error
getListLengthFn func() (int64, error)
getListLengthFn func(ctx context.Context) (int64, error)
}

type redisConnectionInfo struct {
addresses []string
password string
hosts []string
ports []string
enableTLS bool
addresses []string
username string
password string
sentinelUsername string
sentinelPassword string
sentinelMaster string
hosts []string
ports []string
enableTLS bool
}

type redisMetadata struct {
Expand All @@ -50,7 +54,7 @@ type redisMetadata struct {
var redisLog = logf.Log.WithName("redis_scaler")

// NewRedisScaler creates a new redisScaler
func NewRedisScaler(isClustered bool, config *ScalerConfig) (Scaler, error) {
func NewRedisScaler(isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) {
luaScript := `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
Expand All @@ -70,7 +74,14 @@ func NewRedisScaler(isClustered bool, config *ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createClusteredRedisScaler(meta, luaScript)
} else if isSentinel {
meta, err := parseRedisMetadata(config, parseRedisSentinelAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createSentinelRedisScaler(meta, luaScript)
}

meta, err := parseRedisMetadata(config, parseRedisAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
Expand All @@ -92,8 +103,38 @@ func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, err
return nil
}

listLengthFn := func() (int64, error) {
cmd := client.Eval(script, []string{meta.listName})
listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
jerbob92 marked this conversation as resolved.
Show resolved Hide resolved
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
}, nil
}

func createSentinelRedisScaler(meta *redisMetadata, script string) (Scaler, error) {
client, err := getRedisSentinelClient(meta.connectionInfo, meta.databaseIndex)
if err != nil {
return nil, fmt.Errorf("connection to redis sentinel failed: %s", err)
}

closeFn := func() error {
if err := client.Close(); err != nil {
redisLog.Error(err, "error closing redis client")
return err
}
return nil
}

listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
jerbob92 marked this conversation as resolved.
Show resolved Hide resolved
if cmd.Err() != nil {
return -1, cmd.Err()
}
Expand Down Expand Up @@ -122,8 +163,8 @@ func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) {
return nil
}

listLengthFn := func() (int64, error) {
cmd := client.Eval(script, []string{meta.listName})
listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}
Expand Down Expand Up @@ -176,7 +217,7 @@ func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*red

// IsActive checks if there is any element in the Redis list
func (s *redisScaler) IsActive(ctx context.Context) (bool, error) {
length, err := s.getListLengthFn()
length, err := s.getListLengthFn(ctx)

if err != nil {
redisLog.Error(err, "error")
Expand Down Expand Up @@ -210,7 +251,7 @@ func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {

// GetMetrics connects to Redis and finds the length of the list
func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
listLen, err := s.getListLengthFn()
listLen, err := s.getListLengthFn(ctx)

if err != nil {
redisLog.Error(err, "error getting list length")
Expand Down Expand Up @@ -263,6 +304,15 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red
return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values")
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
Expand All @@ -281,7 +331,7 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red
return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
func parseRedisMultipleAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info := redisConnectionInfo{}
switch {
case authParams["addresses"] != "":
Expand Down Expand Up @@ -323,6 +373,24 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin
return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values")
}

return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams)
if err != nil {
return info, err
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
Expand All @@ -341,9 +409,67 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin
return info, nil
}

func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams)
if err != nil {
return info, err
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
info.password = resolvedEnv[metadata["passwordFromEnv"]]
}

switch {
case authParams["sentinelUsername"] != "":
info.sentinelUsername = authParams["sentinelUsername"]
case metadata["sentinelUsername"] != "":
info.sentinelUsername = metadata["sentinelUsername"]
case metadata["sentinelUsernameFromEnv"] != "":
info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]]
}

if authParams["sentinelPassword"] != "" {
info.sentinelPassword = authParams["sentinelPassword"]
} else if metadata["sentinelPasswordFromEnv"] != "" {
info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]]
}

switch {
case authParams["sentinelMaster"] != "":
info.sentinelMaster = authParams["sentinelMaster"]
case metadata["sentinelMaster"] != "":
info.sentinelMaster = metadata["sentinelMaster"]
case metadata["sentinelMasterFromEnv"] != "":
info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]]
}

info.enableTLS = defaultEnableTLS
if val, ok := metadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return info, fmt.Errorf("enableTLS parsing error %s", err.Error())
}
info.enableTLS = tls
}

return info, nil
}

func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, error) {
options := &redis.ClusterOptions{
Addrs: info.addresses,
Username: info.username,
Password: info.password,
}
if info.enableTLS {
Expand All @@ -354,7 +480,32 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro

// confirm if connected
c := redis.NewClusterClient(options)
err := c.Ping().Err()
err := c.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
return c, nil
}

func getRedisSentinelClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.FailoverOptions{
Username: info.username,
Password: info.password,
DB: dbIndex,
SentinelAddrs: info.addresses,
SentinelUsername: info.sentinelUsername,
SentinelPassword: info.sentinelPassword,
MasterName: info.sentinelMaster,
}
if info.enableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: info.enableTLS,
}
}

// confirm if connected
c := redis.NewFailoverClient(options)
err := c.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
Expand All @@ -364,6 +515,7 @@ func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, erro
func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.Options{
Addr: info.addresses[0],
Username: info.username,
Password: info.password,
DB: dbIndex,
}
Expand All @@ -375,7 +527,7 @@ func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error

// confirm if connected
c := redis.NewClient(options)
err := c.Ping().Err()
err := c.Ping(context.Background()).Err()
if err != nil {
return nil, err
}
Expand Down
Loading