Skip to content

Commit

Permalink
Improve Redis support (#2181)
Browse files Browse the repository at this point in the history
Signed-off-by: Jeroen Bobbeldijk <jeroen@klippa.com>
  • Loading branch information
jerbob92 committed Nov 1, 2021
1 parent f78c16e commit 7f619bd
Show file tree
Hide file tree
Showing 10 changed files with 2,112 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))
- Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211))
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))

### Improvements

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/denisenkom/go-mssqldb v0.11.0
github.com/go-logr/logr v0.4.0
github.com/go-playground/assert/v2 v2.0.1
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.4
github.com/go-sql-driver/mysql v1.6.0
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1
github.com/golang/mock v1.6.0
Expand Down
9 changes: 6 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -245,6 +246,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E=
github.com/dgryski/go-lttb v0.0.0-20180810165845-318fcdf10a77/go.mod h1:Va5MyIzkU0rAM92tn3hb3Anb7oz7KcnixF49+2wOMe4=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U=
Expand Down Expand Up @@ -353,8 +356,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down
178 changes: 164 additions & 14 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strconv"
"strings"

"github.com/go-redis/redis"
"github.com/go-redis/redis/v8"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,11 +33,15 @@ type redisScaler struct {
}

type redisConnectionInfo struct {
addresses []string
password string
hosts []string
ports []string
enableTLS bool
addresses []string
username string
password string
sentinelUsername string
sentinelPassword string
sentinelMaster string
hosts []string
ports []string
enableTLS bool
}

type redisMetadata struct {
Expand All @@ -51,7 +55,7 @@ type redisMetadata struct {
var redisLog = logf.Log.WithName("redis_scaler")

// NewRedisScaler creates a new redisScaler
func NewRedisScaler(ctx context.Context, isClustered bool, config *ScalerConfig) (Scaler, error) {
func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) {
luaScript := `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
Expand All @@ -71,7 +75,14 @@ func NewRedisScaler(ctx context.Context, isClustered bool, config *ScalerConfig)
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createClusteredRedisScaler(ctx, meta, luaScript)
} else if isSentinel {
meta, err := parseRedisMetadata(config, parseRedisSentinelAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createSentinelRedisScaler(ctx, meta, luaScript)
}

meta, err := parseRedisMetadata(config, parseRedisAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
Expand All @@ -94,8 +105,37 @@ func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script
}

listLengthFn := func(ctx context.Context) (int64, error) {
cl := client.WithContext(ctx)
cmd := cl.Eval(script, []string{meta.listName})
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
}, nil
}

func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, script string) (Scaler, error) {
client, err := getRedisSentinelClient(ctx, meta.connectionInfo, meta.databaseIndex)
if err != nil {
return nil, fmt.Errorf("connection to redis sentinel failed: %s", err)
}

closeFn := func() error {
if err := client.Close(); err != nil {
redisLog.Error(err, "error closing redis client")
return err
}
return nil
}

listLengthFn := func(ctx context.Context) (int64, error) {
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}
Expand Down Expand Up @@ -125,8 +165,7 @@ func createRedisScaler(ctx context.Context, meta *redisMetadata, script string)
}

listLengthFn := func(ctx context.Context) (int64, error) {
cl := client.WithContext(ctx)
cmd := cl.Eval(script, []string{meta.listName})
cmd := client.Eval(ctx, script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}
Expand Down Expand Up @@ -267,6 +306,15 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red
return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values")
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
Expand All @@ -285,7 +333,7 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red
return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
func parseRedisMultipleAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info := redisConnectionInfo{}
switch {
case authParams["addresses"] != "":
Expand Down Expand Up @@ -327,12 +375,87 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin
return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values")
}

return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams)
if err != nil {
return info, err
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
info.password = resolvedEnv[metadata["passwordFromEnv"]]
}

info.enableTLS = defaultEnableTLS
if val, ok := metadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return info, fmt.Errorf("enableTLS parsing error %s", err.Error())
}
info.enableTLS = tls
}

return info, nil
}

func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams)
if err != nil {
return info, err
}

switch {
case authParams["username"] != "":
info.username = authParams["username"]
case metadata["username"] != "":
info.username = metadata["username"]
case metadata["usernameFromEnv"] != "":
info.username = resolvedEnv[metadata["usernameFromEnv"]]
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
info.password = resolvedEnv[metadata["passwordFromEnv"]]
}

switch {
case authParams["sentinelUsername"] != "":
info.sentinelUsername = authParams["sentinelUsername"]
case metadata["sentinelUsername"] != "":
info.sentinelUsername = metadata["sentinelUsername"]
case metadata["sentinelUsernameFromEnv"] != "":
info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]]
}

if authParams["sentinelPassword"] != "" {
info.sentinelPassword = authParams["sentinelPassword"]
} else if metadata["sentinelPasswordFromEnv"] != "" {
info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]]
}

switch {
case authParams["sentinelMaster"] != "":
info.sentinelMaster = authParams["sentinelMaster"]
case metadata["sentinelMaster"] != "":
info.sentinelMaster = metadata["sentinelMaster"]
case metadata["sentinelMasterFromEnv"] != "":
info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]]
}

info.enableTLS = defaultEnableTLS
if val, ok := metadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
Expand All @@ -348,6 +471,7 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin
func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redis.ClusterClient, error) {
options := &redis.ClusterOptions{
Addrs: info.addresses,
Username: info.username,
Password: info.password,
}
if info.enableTLS {
Expand All @@ -358,7 +482,31 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi

// confirm if connected
c := redis.NewClusterClient(options)
if err := c.WithContext(ctx).Ping().Err(); err != nil {
if err := c.Ping(ctx).Err(); err != nil {
return nil, err
}
return c, nil
}

func getRedisSentinelClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.FailoverOptions{
Username: info.username,
Password: info.password,
DB: dbIndex,
SentinelAddrs: info.addresses,
SentinelUsername: info.sentinelUsername,
SentinelPassword: info.sentinelPassword,
MasterName: info.sentinelMaster,
}
if info.enableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: info.enableTLS,
}
}

// confirm if connected
c := redis.NewFailoverClient(options)
if err := c.Ping(ctx).Err(); err != nil {
return nil, err
}
return c, nil
Expand All @@ -367,6 +515,7 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi
func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.Options{
Addr: info.addresses[0],
Username: info.username,
Password: info.password,
DB: dbIndex,
}
Expand All @@ -378,7 +527,8 @@ func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int)

// confirm if connected
c := redis.NewClient(options)
if err := c.WithContext(ctx).Ping().Err(); err != nil {
err := c.Ping(ctx).Err()
if err != nil {
return nil, err
}
return c, nil
Expand Down
Loading

0 comments on commit 7f619bd

Please sign in to comment.