diff --git a/CHANGELOG.md b/CHANGELOG.md index 040523c9e67..51578c12fc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ - ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016)) - Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092)) - Add Cassandra Scaler ([#2211](https://github.com/kedacore/keda/pull/2211)) +- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) ### Improvements diff --git a/go.mod b/go.mod index b5bbe87fcd9..186c8198a74 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/denisenkom/go-mssqldb v0.11.0 github.com/go-logr/logr v0.4.0 github.com/go-playground/assert/v2 v2.0.1 - github.com/go-redis/redis v6.15.9+incompatible + github.com/go-redis/redis/v8 v8.11.4 github.com/go-sql-driver/mysql v1.6.0 github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 github.com/golang/mock v1.6.0 diff --git a/go.sum b/go.sum index 2a179abf2c8..8bd4f0c8857 100644 --- a/go.sum +++ b/go.sum @@ -186,8 +186,9 @@ github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6 github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -245,6 +246,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-gk v0.0.0-20140819190930-201884a44051/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E= github.com/dgryski/go-gk v0.0.0-20200319235926-a69029f61654/go.mod h1:qm+vckxRlDt0aOla0RYJJVeqHZlWfOm2UIxHaqPB46E= github.com/dgryski/go-lttb v0.0.0-20180810165845-318fcdf10a77/go.mod h1:Va5MyIzkU0rAM92tn3hb3Anb7oz7KcnixF49+2wOMe4= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= @@ -353,8 +356,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87 github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= +github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index d38ec74274e..87aad6afaf6 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -7,7 +7,7 @@ import ( "strconv" "strings" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v8" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,11 +33,15 @@ type redisScaler struct { } type redisConnectionInfo struct { - addresses []string - password string - hosts []string - ports []string - enableTLS bool + addresses []string + username string + password string + sentinelUsername string + sentinelPassword string + sentinelMaster string + hosts []string + ports []string + enableTLS bool } type redisMetadata struct { @@ -51,7 +55,7 @@ type redisMetadata struct { var redisLog = logf.Log.WithName("redis_scaler") // NewRedisScaler creates a new redisScaler -func NewRedisScaler(ctx context.Context, isClustered bool, config *ScalerConfig) (Scaler, error) { +func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) { luaScript := ` local listName = KEYS[1] local listType = redis.call('type', listName).ok @@ -71,7 +75,14 @@ func NewRedisScaler(ctx context.Context, isClustered bool, config *ScalerConfig) return nil, fmt.Errorf("error parsing redis metadata: %s", err) } return createClusteredRedisScaler(ctx, meta, luaScript) + } else if isSentinel { + meta, err := parseRedisMetadata(config, parseRedisSentinelAddress) + if err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %s", err) + } + return createSentinelRedisScaler(ctx, meta, luaScript) } + meta, err := parseRedisMetadata(config, parseRedisAddress) if err != nil { return nil, fmt.Errorf("error parsing redis metadata: %s", err) @@ -94,8 +105,37 @@ func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script } listLengthFn := func(ctx context.Context) (int64, error) { - cl := client.WithContext(ctx) - cmd := cl.Eval(script, []string{meta.listName}) + cmd := client.Eval(ctx, script, []string{meta.listName}) + if cmd.Err() != nil { + return -1, cmd.Err() + } + + return cmd.Int64() + } + + return &redisScaler{ + metadata: meta, + closeFn: closeFn, + getListLengthFn: listLengthFn, + }, nil +} + +func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, script string) (Scaler, error) { + client, err := getRedisSentinelClient(ctx, meta.connectionInfo, meta.databaseIndex) + if err != nil { + return nil, fmt.Errorf("connection to redis sentinel failed: %s", err) + } + + closeFn := func() error { + if err := client.Close(); err != nil { + redisLog.Error(err, "error closing redis client") + return err + } + return nil + } + + listLengthFn := func(ctx context.Context) (int64, error) { + cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() } @@ -125,8 +165,7 @@ func createRedisScaler(ctx context.Context, meta *redisMetadata, script string) } listLengthFn := func(ctx context.Context) (int64, error) { - cl := client.WithContext(ctx) - cmd := cl.Eval(script, []string{meta.listName}) + cmd := client.Eval(ctx, script, []string{meta.listName}) if cmd.Err() != nil { return -1, cmd.Err() } @@ -267,6 +306,15 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values") } + switch { + case authParams["username"] != "": + info.username = authParams["username"] + case metadata["username"] != "": + info.username = metadata["username"] + case metadata["usernameFromEnv"] != "": + info.username = resolvedEnv[metadata["usernameFromEnv"]] + } + if authParams["password"] != "" { info.password = authParams["password"] } else if metadata["passwordFromEnv"] != "" { @@ -285,7 +333,7 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red return info, nil } -func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { +func parseRedisMultipleAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { info := redisConnectionInfo{} switch { case authParams["addresses"] != "": @@ -327,12 +375,87 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values") } + return info, nil +} + +func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { + info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams) + if err != nil { + return info, err + } + + switch { + case authParams["username"] != "": + info.username = authParams["username"] + case metadata["username"] != "": + info.username = metadata["username"] + case metadata["usernameFromEnv"] != "": + info.username = resolvedEnv[metadata["usernameFromEnv"]] + } + + if authParams["password"] != "" { + info.password = authParams["password"] + } else if metadata["passwordFromEnv"] != "" { + info.password = resolvedEnv[metadata["passwordFromEnv"]] + } + + info.enableTLS = defaultEnableTLS + if val, ok := metadata["enableTLS"]; ok { + tls, err := strconv.ParseBool(val) + if err != nil { + return info, fmt.Errorf("enableTLS parsing error %s", err.Error()) + } + info.enableTLS = tls + } + + return info, nil +} + +func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { + info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams) + if err != nil { + return info, err + } + + switch { + case authParams["username"] != "": + info.username = authParams["username"] + case metadata["username"] != "": + info.username = metadata["username"] + case metadata["usernameFromEnv"] != "": + info.username = resolvedEnv[metadata["usernameFromEnv"]] + } + if authParams["password"] != "" { info.password = authParams["password"] } else if metadata["passwordFromEnv"] != "" { info.password = resolvedEnv[metadata["passwordFromEnv"]] } + switch { + case authParams["sentinelUsername"] != "": + info.sentinelUsername = authParams["sentinelUsername"] + case metadata["sentinelUsername"] != "": + info.sentinelUsername = metadata["sentinelUsername"] + case metadata["sentinelUsernameFromEnv"] != "": + info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]] + } + + if authParams["sentinelPassword"] != "" { + info.sentinelPassword = authParams["sentinelPassword"] + } else if metadata["sentinelPasswordFromEnv"] != "" { + info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]] + } + + switch { + case authParams["sentinelMaster"] != "": + info.sentinelMaster = authParams["sentinelMaster"] + case metadata["sentinelMaster"] != "": + info.sentinelMaster = metadata["sentinelMaster"] + case metadata["sentinelMasterFromEnv"] != "": + info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]] + } + info.enableTLS = defaultEnableTLS if val, ok := metadata["enableTLS"]; ok { tls, err := strconv.ParseBool(val) @@ -348,6 +471,7 @@ func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]strin func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redis.ClusterClient, error) { options := &redis.ClusterOptions{ Addrs: info.addresses, + Username: info.username, Password: info.password, } if info.enableTLS { @@ -358,7 +482,31 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi // confirm if connected c := redis.NewClusterClient(options) - if err := c.WithContext(ctx).Ping().Err(); err != nil { + if err := c.Ping(ctx).Err(); err != nil { + return nil, err + } + return c, nil +} + +func getRedisSentinelClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) { + options := &redis.FailoverOptions{ + Username: info.username, + Password: info.password, + DB: dbIndex, + SentinelAddrs: info.addresses, + SentinelUsername: info.sentinelUsername, + SentinelPassword: info.sentinelPassword, + MasterName: info.sentinelMaster, + } + if info.enableTLS { + options.TLSConfig = &tls.Config{ + InsecureSkipVerify: info.enableTLS, + } + } + + // confirm if connected + c := redis.NewFailoverClient(options) + if err := c.Ping(ctx).Err(); err != nil { return nil, err } return c, nil @@ -367,6 +515,7 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) { options := &redis.Options{ Addr: info.addresses[0], + Username: info.username, Password: info.password, DB: dbIndex, } @@ -378,7 +527,8 @@ func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int) // confirm if connected c := redis.NewClient(options) - if err := c.WithContext(ctx).Ping().Err(); err != nil { + err := c.Ping(ctx).Err() + if err != nil { return nil, err } return c, nil diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index 235ed6fe9e6..2b64c30d8ec 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -9,9 +9,13 @@ import ( ) var testRedisResolvedEnv = map[string]string{ - "REDIS_HOST": "none", - "REDIS_PORT": "6379", - "REDIS_PASSWORD": "none", + "REDIS_HOST": "none", + "REDIS_PORT": "6379", + "REDIS_USERNAME": "none", + "REDIS_PASSWORD": "none", + "REDIS_SENTINEL_MASTER": "none", + "REDIS_SENTINEL_USERNAME": "none", + "REDIS_SENTINEL_PASSWORD": "none", } type parseRedisMetadataTestData struct { @@ -174,6 +178,115 @@ func TestParseRedisClusterMetadata(t *testing.T) { }, wantErr: nil, }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, } for _, testCase := range cases { @@ -194,3 +307,408 @@ func TestParseRedisClusterMetadata(t *testing.T) { }) } } + +func TestParseRedisSentinelMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + wantMeta *redisMetadata + wantErr error + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values"), + }, + { + name: "unequal number of hosts/ports", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2", + }, + wantMeta: nil, + wantErr: errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports"), + }, + { + name: "no list name", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listLength": "5", + }, + wantMeta: nil, + wantErr: errors.New("no list name given"), + }, + { + name: "invalid list length", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "listLength": "invalid", + }, + wantMeta: nil, + wantErr: errors.New("list length parsing error"), + }, + { + name: "address is defined in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "listName": "mylist", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "sentinelUsername": "sentinelUsername", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelUsername": "sentinelUsername", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelUsernameFromEnv": "REDIS_SENTINEL_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "sentinelPassword": "sentinelPassword", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "sentinelPassword", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelPasswordFromEnv": "REDIS_SENTINEL_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + }, + authParams: map[string]string{ + "sentinelMaster": "sentinelMaster", + }, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelMaster": "sentinelMaster", + }, + authParams: map[string]string{}, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "listName": "mylist", + "sentinelMasterFromEnv": "REDIS_SENTINEL_MASTER", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisMetadata{ + targetListLength: 5, + listName: "mylist", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "none", + }, + }, + wantErr: nil, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + ResolvedEnv: c.resolvedEnv, + AuthParams: c.authParams, + } + meta, err := parseRedisMetadata(config, parseRedisSentinelAddress) + if c.wantErr != nil { + assert.Contains(t, err.Error(), c.wantErr.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index 59b5954706d..9858f10adbe 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -24,6 +24,7 @@ const ( pendingEntriesCountMetadata = "pendingEntriesCount" streamNameMetadata = "stream" consumerGroupNameMetadata = "consumerGroup" + usernameMetadata = "username" passwordMetadata = "password" databaseIndexMetadata = "databaseIndex" enableTLSMetadata = "enableTLS" @@ -32,7 +33,7 @@ const ( type redisStreamsScaler struct { metadata *redisStreamsMetadata closeFn func() error - getPendingEntriesCountFn func() (int64, error) + getPendingEntriesCountFn func(ctx context.Context) (int64, error) } type redisStreamsMetadata struct { @@ -47,13 +48,19 @@ type redisStreamsMetadata struct { var redisStreamsLog = logf.Log.WithName("redis_streams_scaler") // NewRedisStreamsScaler creates a new redisStreamsScaler -func NewRedisStreamsScaler(ctx context.Context, isClustered bool, config *ScalerConfig) (Scaler, error) { +func NewRedisStreamsScaler(ctx context.Context, isClustered, isSentinel bool, config *ScalerConfig) (Scaler, error) { if isClustered { meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) if err != nil { return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) } return createClusteredRedisStreamsScaler(ctx, meta) + } else if isSentinel { + meta, err := parseRedisStreamsMetadata(config, parseRedisSentinelAddress) + if err != nil { + return nil, fmt.Errorf("error parsing redis streams metadata: %s", err) + } + return createSentinelRedisStreamsScaler(ctx, meta) } meta, err := parseRedisStreamsMetadata(config, parseRedisAddress) if err != nil { @@ -76,8 +83,37 @@ func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMe return nil } - pendingEntriesCountFn := func() (int64, error) { - pendingEntries, err := client.XPending(meta.streamName, meta.consumerGroupName).Result() + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() + if err != nil { + return -1, err + } + return pendingEntries.Count, nil + } + + return &redisStreamsScaler{ + metadata: meta, + closeFn: closeFn, + getPendingEntriesCountFn: pendingEntriesCountFn, + }, nil +} + +func createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata) (Scaler, error) { + client, err := getRedisSentinelClient(ctx, meta.connectionInfo, meta.databaseIndex) + if err != nil { + return nil, fmt.Errorf("connection to redis sentinel failed: %s", err) + } + + closeFn := func() error { + if err := client.Close(); err != nil { + redisStreamsLog.Error(err, "error closing redis client") + return err + } + return nil + } + + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err } @@ -105,8 +141,8 @@ func createRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata) ( return nil } - pendingEntriesCountFn := func() (int64, error) { - pendingEntries, err := client.XPending(meta.streamName, meta.consumerGroupName).Result() + pendingEntriesCountFn := func(ctx context.Context) (int64, error) { + pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() if err != nil { return -1, err } @@ -166,7 +202,7 @@ func parseRedisStreamsMetadata(config *ScalerConfig, parseFn redisAddressParser) // IsActive checks if there are pending entries in the 'Pending Entries List' for consumer group of a stream func (s *redisStreamsScaler) IsActive(ctx context.Context) (bool, error) { - count, err := s.getPendingEntriesCountFn() + count, err := s.getPendingEntriesCountFn(ctx) if err != nil { redisStreamsLog.Error(err, "error") @@ -199,7 +235,7 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2beta2. // GetMetrics fetches the number of pending entries for a consumer group in a stream func (s *redisStreamsScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - pendingEntriesCount, err := s.getPendingEntriesCountFn() + pendingEntriesCount, err := s.getPendingEntriesCountFn(ctx) if err != nil { redisStreamsLog.Error(err, "error fetching pending entries count") diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index a5890b699f4..ba9b9da5fee 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -17,14 +17,15 @@ func TestParseRedisStreamsMetadata(t *testing.T) { authParams map[string]string } - authParams := map[string]string{"password": "foobarred"} + authParams := map[string]string{"username": "foobarred", "password": "foobarred"} testCases := []testCase{ { name: "with address", - metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "addressFromEnv": "REDIS_SERVICE", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, + metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "5", "addressFromEnv": "REDIS_SERVICE", "usernameFromEnv": "REDIS_USERNAME", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "true"}, resolvedEnv: map[string]string{ "REDIS_SERVICE": "myredis:6379", + "REDIS_USERNAME": "foobarred", "REDIS_PASSWORD": "foobarred", }, authParams: nil, @@ -32,10 +33,11 @@ func TestParseRedisStreamsMetadata(t *testing.T) { { name: "with host and port", - metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "hostFromEnv": "REDIS_HOST", "port": "REDIS_PORT", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "false"}, + metadata: map[string]string{"stream": "my-stream", "consumerGroup": "my-stream-consumer-group", "pendingEntriesCount": "15", "hostFromEnv": "REDIS_HOST", "port": "REDIS_PORT", "usernameFromEnv": "REDIS_USERNAME", "passwordFromEnv": "REDIS_PASSWORD", "databaseIndex": "0", "enableTLS": "false"}, resolvedEnv: map[string]string{ "REDIS_HOST": "myredis", "REDIS_PORT": "6379", + "REDIS_USERNAME": "foobarred", "REDIS_PASSWORD": "foobarred", }, authParams: authParams, @@ -52,11 +54,14 @@ func TestParseRedisStreamsMetadata(t *testing.T) { assert.Equal(t, strconv.Itoa(m.targetPendingEntriesCount), tc.metadata[pendingEntriesCountMetadata]) if authParams != nil { // if authParam is used + assert.Equal(t, m.connectionInfo.username, authParams[usernameMetadata]) assert.Equal(t, m.connectionInfo.password, authParams[passwordMetadata]) } else { - // if metadata is used to pass password env var name + // if metadata is used to pass credentials' env var names + assert.Equal(t, m.connectionInfo.username, tc.resolvedEnv[tc.metadata[usernameMetadata]]) assert.Equal(t, m.connectionInfo.password, tc.resolvedEnv[tc.metadata[passwordMetadata]]) } + assert.Equal(t, strconv.Itoa(m.databaseIndex), tc.metadata[databaseIndexMetadata]) b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata]) assert.Nil(t, err) @@ -139,7 +144,7 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) { t.Fatal("Could not parse metadata:", err) } closeFn := func() error { return nil } - getPendingEntriesCountFn := func() (int64, error) { return -1, nil } + getPendingEntriesCountFn := func(ctx context.Context) (int64, error) { return -1, nil } mockRedisStreamsScaler := redisStreamsScaler{meta, closeFn, getPendingEntriesCountFn} metricSpec := mockRedisStreamsScaler.GetMetricSpecForScaling(context.Background()) @@ -246,6 +251,130 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { }, wantErr: nil, }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, } for _, testCase := range cases { @@ -266,3 +395,442 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { }) } } + +func TestParseRedisSentinelStreamsMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + wantMeta *redisStreamsMetadata + wantErr error + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: errors.New("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values"), + }, + { + name: "unequal number of hosts/ports", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2", + }, + wantMeta: nil, + wantErr: errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports"), + }, + { + name: "no stream name", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "pendingEntriesCount": "5", + }, + wantMeta: nil, + wantErr: errors.New("missing redis stream name"), + }, + { + name: "missing pending entries count", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + }, + wantMeta: nil, + wantErr: errors.New("missing pending entries count"), + }, + { + name: "invalid pending entries count", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "pendingEntriesCount": "invalid", + }, + wantMeta: nil, + wantErr: errors.New("error parsing pending entries count"), + }, + { + name: "address is defined in auth params", + metadata: map[string]string{ + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "addresses": ":7001, :7002", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{":7001", ":7002"}, + }, + }, + wantErr: nil, + }, + { + name: "hosts and ports given in auth params", + metadata: map[string]string{ + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "hosts": " a, b, c ", + "ports": "1, 2, 3", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + }, + }, + wantErr: nil, + }, + { + name: "username given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "username": "username", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "username": "username", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "username", + }, + }, + wantErr: nil, + }, + { + name: "username given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "usernameFromEnv": "REDIS_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + username: "none", + }, + }, + wantErr: nil, + }, + { + name: "password given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "password": "password", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "password", + }, + }, + wantErr: nil, + }, + { + name: "password given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "passwordFromEnv": "REDIS_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + password: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "sentinelUsername": "sentinelUsername", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelUsername": "sentinelUsername", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "sentinelUsername", + }, + }, + wantErr: nil, + }, + { + name: "sentinelUsername given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelUsernameFromEnv": "REDIS_SENTINEL_USERNAME", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelUsername: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "sentinelPassword": "sentinelPassword", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "sentinelPassword", + }, + }, + wantErr: nil, + }, + { + name: "sentinelPassword given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelPasswordFromEnv": "REDIS_SENTINEL_PASSWORD", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelPassword: "none", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in authParams", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + }, + authParams: map[string]string{ + "sentinelMaster": "sentinelMaster", + }, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelMaster": "sentinelMaster", + }, + authParams: map[string]string{}, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "sentinelMaster", + }, + }, + wantErr: nil, + }, + { + name: "sentinelMaster given in metadata from env", + metadata: map[string]string{ + "hosts": "a, b, c", + "ports": "1, 2, 3", + "stream": "my-stream", + "pendingEntriesCount": "10", + "consumerGroup": "consumer1", + "sentinelMasterFromEnv": "REDIS_SENTINEL_MASTER", + }, + authParams: map[string]string{}, + resolvedEnv: testRedisResolvedEnv, + wantMeta: &redisStreamsMetadata{ + streamName: "my-stream", + targetPendingEntriesCount: 10, + consumerGroupName: "consumer1", + connectionInfo: redisConnectionInfo{ + addresses: []string{"a:1", "b:2", "c:3"}, + hosts: []string{"a", "b", "c"}, + ports: []string{"1", "2", "3"}, + sentinelMaster: "none", + }, + }, + wantErr: nil, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + ResolvedEnv: c.resolvedEnv, + AuthParams: c.authParams, + } + meta, err := parseRedisStreamsMetadata(config, parseRedisSentinelAddress) + if c.wantErr != nil { + assert.Contains(t, err.Error(), c.wantErr.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index be2794295a9..c2bb8638939 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -382,13 +382,17 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, case "rabbitmq": return scalers.NewRabbitMQScaler(config) case "redis": - return scalers.NewRedisScaler(ctx, false, config) + return scalers.NewRedisScaler(ctx, false, false, config) case "redis-cluster": - return scalers.NewRedisScaler(ctx, true, config) + return scalers.NewRedisScaler(ctx, true, false, config) case "redis-cluster-streams": - return scalers.NewRedisStreamsScaler(ctx, true, config) + return scalers.NewRedisStreamsScaler(ctx, true, false, config) + case "redis-sentinel": + return scalers.NewRedisScaler(ctx, false, true, config) + case "redis-sentinel-streams": + return scalers.NewRedisStreamsScaler(ctx, false, true, config) case "redis-streams": - return scalers.NewRedisStreamsScaler(ctx, false, config) + return scalers.NewRedisStreamsScaler(ctx, false, false, config) case "selenium-grid": return scalers.NewSeleniumGridScaler(config) case "solace-event-queue": diff --git a/tests/scalers/redis-sentinel-lists.test.ts b/tests/scalers/redis-sentinel-lists.test.ts new file mode 100644 index 00000000000..cc95430222d --- /dev/null +++ b/tests/scalers/redis-sentinel-lists.test.ts @@ -0,0 +1,567 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' +import {waitForRollout} from "./helpers"; + +const redisNamespace = 'redis-sentinel' +const redisService = 'redis-sentinel' +const testNamespace = 'redis-sentinel-lists-test' +const redisStatefulSetName = 'redis-sentinel-node' +const redisSentinelName = 'redis-sentinel' +const redisSentinelMasterName = 'mymaster' +const redisPassword = 'my-password' +let redisHost = '' +const redisPort = 26379 +let redisAddress = '' +const listNameForHostPortRef = 'my-test-list-host-port-ref' +const listNameForAddressRef = 'my-test-list-address-ref' +const listNameForHostPortTriggerAuth = 'my-test-list-host-port-trigger' +const redisWorkerHostPortRefDeploymentName = 'redis-worker-test-hostport' +const redisWorkerAddressRefDeploymentName = 'redis-worker-test-address' +const redisWorkerHostPortRefTriggerAuthDeploymentName = 'redis-worker-test-hostport-triggerauth' +const itemsToWrite = 200 +const deploymentContainerImage = 'ghcr.io/kedacore/tests-redis-sentinel-lists' +const writeJobNameForHostPortRef = 'redis-writer-host-port-ref' +const writeJobNameForAddressRef = 'redis-writer-address-ref' +const writeJobNameForHostPortInTriggerAuth = 'redis-writer-host-port-trigger-auth' + +test.before(t => { + // Deploy Redis sentinel. + sh.exec(`kubectl create namespace ${redisNamespace}`) + sh.exec(`helm repo add bitnami https://charts.bitnami.com/bitnami`) + + let sentinelStatus = sh.exec(`helm install --timeout 600s ${redisSentinelName} --namespace ${redisNamespace} --set "sentinel.enabled=true" --set "global.redis.password=${redisPassword}" bitnami/redis`).code + t.is(0, + sentinelStatus, + 'creating a Redis sentinel setup should work.' + ) + + // Wait for Redis sentinel to be ready. + t.is(0, waitForRollout('statefulset', redisStatefulSetName, redisNamespace)) + + // Get Redis sentinel address. + redisHost = sh.exec(`kubectl get svc ${redisService} -n ${redisNamespace} -o jsonpath='{.spec.clusterIP}'`) + redisAddress = `${redisHost}:${redisPort}` + + // Create test namespace. + sh.exec(`kubectl create namespace ${testNamespace}`) + + const triggerAuthTmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + fs.writeFileSync(triggerAuthTmpFile.name, scaledObjectTriggerAuthYaml.replace('{{REDIS_PASSWORD}}', base64Password).replace('{{REDIS_SENTINEL_PASSWORD}}', base64Password)) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth should work..' + ) + + const triggerAuthHostPortTmpFile = tmp.fileSync() + + fs.writeFileSync(triggerAuthHostPortTmpFile.name, + scaledObjectTriggerAuthHostPortYaml.replace('{{REDIS_PASSWORD}}', base64Password) + .replace('{{REDIS_SENTINEL_PASSWORD}}', base64Password) + .replace('{{REDIS_SENTINEL_MASTER}}', Buffer.from(redisSentinelMasterName).toString('base64')) + .replace('{{REDIS_HOSTS}}', Buffer.from(redisHost).toString('base64')) + .replace('{{REDIS_PORTS}}', Buffer.from(redisPort.toString()).toString('base64')) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${triggerAuthHostPortTmpFile.name} --namespace ${testNamespace}`).code, + 'creating trigger auth with host port should work..' + ) + + // Create a deployment with host and port. + const deploymentHostPortRefTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTmpFile.name, redisListDeployHostPortYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_MASTER}}/g, redisSentinelMasterName) + .replace(/{{REDIS_HOSTS}}/g, redisHost) + .replace(/{{REDIS_PORTS}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortRef) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host and port envs should work..' + ) + + const deploymentAddressRefTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentAddressRefTmpFile.name, redisListDeployAddressYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_MASTER}}/g, redisSentinelMasterName) + .replace(/{{REDIS_ADDRESSES}}/g, redisAddress) + .replace(/{{LIST_NAME}}/g, listNameForAddressRef) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerAddressRefDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentAddressRefTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis address var should work..' + ) + + + const deploymentHostPortRefTriggerAuthTmpFile = tmp.fileSync() + + fs.writeFileSync(deploymentHostPortRefTriggerAuthTmpFile.name, redisListDeployHostPortInTriggerAuhYaml.replace(/{{REDIS_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_PASSWORD}}/g, redisPassword) + .replace(/{{REDIS_SENTINEL_MASTER}}/g, redisSentinelMasterName) + .replace(/{{REDIS_HOSTS}}/g, redisHost) + .replace(/{{REDIS_PORTS}}/g, redisPort.toString()) + .replace(/{{LIST_NAME}}/g, listNameForHostPortTriggerAuth) + .replace(/{{DEPLOYMENT_NAME}}/g, redisWorkerHostPortRefTriggerAuthDeploymentName) + .replace(/{{CONTAINER_IMAGE}}/g, deploymentContainerImage) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${deploymentHostPortRefTriggerAuthTmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment using redis host port in trigger auth should work..' + ) +}) + +test.serial('Deployment for redis host and port env vars should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port env vars should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + runWriteJob(t, writeJobNameForHostPortRef, listNameForHostPortRef) + + let replicaCount = '0' + for (let i = 0; i < 30 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + +test.serial('Deployment for redis address env var should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + + +test.serial(`Deployment using redis address env var should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForAddressRef, listNameForAddressRef) + + let replicaCount = '0' + for (let i = 0; i < 30 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerAddressRefDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.serial('Deployment for redis host and port in the trigger auth should have 0 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + + +test.serial(`Deployment using redis host port in triggerAuth should max and scale to 5 with ${itemsToWrite} items written to list and back to 0`, t => { + + runWriteJob(t, writeJobNameForHostPortInTriggerAuth, listNameForHostPortTriggerAuth) + + let replicaCount = '0' + for (let i = 0; i < 30 && replicaCount !== '5'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale up) replica count is:' + replicaCount) + if (replicaCount !== '5') { + sh.exec('sleep 3s') + } + } + + t.is('5', replicaCount, 'Replica count should be 5 within 60 seconds') + + for (let i = 0; i < 12 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.log('(scale down) replica count is:' + replicaCount) + if (replicaCount !== '0') { + sh.exec('sleep 10s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 within 2 minutes') +}) + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + `job/${writeJobNameForHostPortRef}`, + `job/${writeJobNameForAddressRef}`, + `job/${writeJobNameForHostPortInTriggerAuth}`, + `scaledobject.keda.sh/${redisWorkerHostPortRefDeploymentName}`, + `scaledobject.keda.sh/${redisWorkerAddressRefDeploymentName}`, + `scaledobject.keda.sh/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + 'triggerauthentication.keda.sh/keda-redis-sentinel-list-triggerauth', + 'triggerauthentication.keda.sh/keda-redis-sentinel-list-triggerauth-host-port', + `deployment/${redisWorkerAddressRefDeploymentName}`, + `deployment/${redisWorkerHostPortRefTriggerAuthDeploymentName}`, + `deployment/${redisWorkerHostPortRefDeploymentName}`, + 'secret/redis-password', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`helm delete ${redisSentinelName} --namespace ${redisNamespace}`) + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +function runWriteJob(t, jobName, listName) { + // write to list + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, writeJobYaml.replace('{{REDIS_ADDRESSES}}', redisAddress).replace('{{REDIS_PASSWORD}}', redisPassword) + .replace('{{REDIS_SENTINEL_PASSWORD}}', redisPassword) + .replace('{{REDIS_SENTINEL_MASTER}}', redisSentinelMasterName) + .replace('{{LIST_NAME}}', listName) + .replace('{{NUMBER_OF_ITEMS_TO_WRITE}}', itemsToWrite.toString()) + .replace('{{CONTAINER_IMAGE}}', deploymentContainerImage) + .replace('{{JOB_NAME}}', jobName) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'list writer job should apply.' + ) + + // wait for the write job to complete + for (let i = 0; i < 20; i++) { + const succeeded = sh.exec(`kubectl get job ${writeJobNameForHostPortRef} --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } +} + +const redisListDeployHostPortYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "{{REDIS_PORTS}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + listName: {{LIST_NAME}} + listLength: "5" + sentinelMaster: {{REDIS_SENTINEL_MASTER}} + authenticationRef: + name: keda-redis-sentinel-list-triggerauth +` + + +const redisListDeployAddressYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_ADDRESSES + value: {{REDIS_ADDRESSES}} + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel + metadata: + addressesFromEnv: REDIS_ADDRESSES + listName: {{LIST_NAME}} + listLength: "5" + sentinelMaster: {{REDIS_SENTINEL_MASTER}} + authenticationRef: + name: keda-redis-sentinel-list-triggerauth +` + +const redisListDeployHostPortInTriggerAuhYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{DEPLOYMENT_NAME}} + labels: + app: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - name: redis-worker + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + args: ["read"] + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "{{REDIS_PORTS}}" + - name: LIST_NAME + value: {{LIST_NAME}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: READ_PROCESS_TIME + value: "500" +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{DEPLOYMENT_NAME}} +spec: + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + pollingInterval: 5 + cooldownPeriod: 30 + minReplicaCount: 0 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel + metadata: + listName: {{LIST_NAME}} + listLength: "5" + sentinelMaster: {{REDIS_SENTINEL_MASTER}} + authenticationRef: + name: keda-redis-sentinel-list-triggerauth-host-port +` + +const scaledObjectTriggerAuthHostPortYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-config +type: Opaque +data: + password: {{REDIS_PASSWORD}} + sentinelPassword: {{REDIS_SENTINEL_PASSWORD}} + redisHost: {{REDIS_HOSTS}} + redisPort: {{REDIS_PORTS}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-sentinel-list-triggerauth-host-port +spec: + secretTargetRef: + - parameter: password + name: redis-config + key: password + - parameter: sentinelPassword + name: redis-config + key: sentinelPassword + - parameter: hosts + name: redis-config + key: redisHost + - parameter: ports + name: redis-config + key: redisPort +` + +const scaledObjectTriggerAuthYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} + sentinelPassword: {{REDIS_SENTINEL_PASSWORD}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-sentinel-list-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password + - parameter: sentinelPassword + name: redis-password + key: sentinelPassword +` + + +const writeJobYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: {{JOB_NAME}} +spec: + template: + spec: + containers: + - name: redis + image: {{CONTAINER_IMAGE}} + imagePullPolicy: IfNotPresent + command: ["./main"] + env: + - name: REDIS_ADDRESSES + value: {{REDIS_ADDRESSES}} + - name: REDIS_PASSWORD + value: {{REDIS_PASSWORD}} + - name: REDIS_SENTINEL_PASSWORD + value: {{REDIS_SENTINEL_PASSWORD}} + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: LIST_NAME + value: {{LIST_NAME}} + - name: NO_LIST_ITEMS_TO_WRITE + value: "{{NUMBER_OF_ITEMS_TO_WRITE}}" + args: ["write"] + restartPolicy: Never + backoffLimit: 4 +` diff --git a/tests/scalers/redis-sentinel-streams.test.ts b/tests/scalers/redis-sentinel-streams.test.ts new file mode 100644 index 00000000000..61e43173803 --- /dev/null +++ b/tests/scalers/redis-sentinel-streams.test.ts @@ -0,0 +1,227 @@ +import test from 'ava' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' +import {waitForDeploymentReplicaCount, waitForRollout} from "./helpers"; + +const redisNamespace = 'redis-sentinel-streams' +const redisSentinelName = 'redis-sentinel-streams' +const redisSentinelMasterName = 'mymaster' +const redisStatefulSetName = 'redis-sentinel-streams-node' +const redisService = 'redis-sentinel-streams' +const testNamespace = 'redis-sentinel-streams-test' +const redisPassword = 'foobared' +let redisHost = '' +const numMessages = 100 + +test.before(t => { + // Deploy Redis Sentinel. + sh.exec(`kubectl create namespace ${redisNamespace}`) + sh.exec(`helm repo add bitnami https://charts.bitnami.com/bitnami`) + + let sentinelStatus = sh.exec(`helm install --timeout 600s ${redisSentinelName} --namespace ${redisNamespace} --set "sentinel.enabled=true" --set "global.redis.password=${redisPassword}" bitnami/redis`).code + t.is(0, + sentinelStatus, + 'creating a Redis Sentinel setup should work.' + ) + + // Wait for Redis Sentinel to be ready. + let exitCode = waitForRollout('statefulset', redisStatefulSetName, redisNamespace) + t.is(0, exitCode, 'expected rollout status for redis to finish successfully') + + // Get Redis Sentinel address. + redisHost = sh.exec(`kubectl get svc ${redisService} -n ${redisNamespace} -o jsonpath='{.spec.clusterIP}'`) + + // Create test namespace. + sh.exec(`kubectl create namespace ${testNamespace}`) + + // Deploy streams consumer app, scaled object etc. + const tmpFile = tmp.fileSync() + const base64Password = Buffer.from(redisPassword).toString('base64') + + fs.writeFileSync(tmpFile.name, redisStreamsDeployYaml.replace('{{REDIS_PASSWORD}}', base64Password).replace('{{REDIS_SENTINEL_PASSWORD}}', base64Password).replace('{{REDIS_SENTINEL_MASTER}}', redisSentinelMasterName).replace('{{REDIS_HOSTS}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 1 replica on start', t => { + + const replicaCount = sh.exec( + `kubectl get deployment/redis-streams-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '1', 'replica count should start out as 1') +}) + +test.serial(`Deployment should scale to 5 with ${numMessages} messages and back to 1`, async t => { + // Publish messages to redis streams. + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, producerDeployYaml.replace('{{NUM_MESSAGES}}', numMessages.toString()) + .replace('{{REDIS_SENTINEL_MASTER}}', redisSentinelMasterName) + .replace('{{REDIS_HOSTS}}', redisHost)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'producer job should apply.' + ) + + // Wait for producer job to finish. + for (let i = 0; i < 40; i++) { + const succeeded = sh.exec(`kubectl get job --namespace ${testNamespace} -o jsonpath='{.items[0].status.succeeded}'`).stdout + if (succeeded == '1') { + break + } + sh.exec('sleep 1s') + } + // With messages published, the consumer deployment should start receiving the messages. + t.true(await waitForDeploymentReplicaCount(5, 'redis-streams-consumer', testNamespace, 30, 3000), 'Replica count should be 5 within 60 seconds') + t.true(await waitForDeploymentReplicaCount(1, 'redis-streams-consumer', testNamespace, 60, 10000), 'Replica count should be 1 within 10 minutes') +}) + + + +test.after.always.cb('clean up deployment', t => { + const resources = [ + 'scaledobject.keda.sh/redis-streams-scaledobject', + 'triggerauthentications.keda.sh/keda-redis-stream-triggerauth', + 'secret/redis-password', + 'deployment/redis-streams-consumer', + 'job/redis-streams-producer', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + sh.exec(`helm delete ${redisSentinelName} --namespace ${redisNamespace}`) + sh.exec(`kubectl delete namespace ${redisNamespace}`) + t.end() +}) + +const redisStreamsDeployYaml = `apiVersion: v1 +kind: Secret +metadata: + name: redis-password +type: Opaque +data: + password: {{REDIS_PASSWORD}} + sentinelPassword: {{REDIS_SENTINEL_PASSWORD}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-redis-stream-triggerauth +spec: + secretTargetRef: + - parameter: password + name: redis-password + key: password + - parameter: sentinelPassword + name: redis-password + key: sentinelPassword +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis-streams-consumer +spec: + replicas: 1 + selector: + matchLabels: + app: redis-streams-consumer + template: + metadata: + labels: + app: redis-streams-consumer + spec: + containers: + - name: redis-streams-consumer + image: ghcr.io/kedacore/tests-redis-sentinel-streams + command: ["./main"] + args: ["consumer"] + imagePullPolicy: Always + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: REDIS_STREAM_CONSUMER_GROUP_NAME + value: consumer-group-1 + - name: REDIS_SENTINEL_MASTER + value: {{REDIS_SENTINEL_MASTER}} + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password + - name: REDIS_SENTINEL_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: sentinelPassword +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: redis-streams-scaledobject +spec: + scaleTargetRef: + name: redis-streams-consumer + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 1 + maxReplicaCount: 5 + triggers: + - type: redis-sentinel-streams + metadata: + hostsFromEnv: REDIS_HOSTS + portsFromEnv: REDIS_PORTS + sentinelPasswordFromEnv: REDIS_SENTINEL_MASTER + stream: my-stream + consumerGroup: consumer-group-1 + pendingEntriesCount: "10" + authenticationRef: + name: keda-redis-stream-triggerauth +` + +const producerDeployYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: redis-streams-producer +spec: + template: + spec: + containers: + - name: producer + image: ghcr.io/kedacore/tests-redis-sentinel-streams + command: ["./main"] + args: ["producer"] + imagePullPolicy: Always + env: + - name: REDIS_HOSTS + value: {{REDIS_HOSTS}} + - name: REDIS_PORTS + value: "26379" + - name: REDIS_STREAM_NAME + value: my-stream + - name: NUM_MESSAGES + value: "{{NUM_MESSAGES}}" + - name: REDIS_SENTINEL_MASTER + value: "{{REDIS_SENTINEL_MASTER}}" + - name: REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: password + - name: REDIS_SENTINEL_PASSWORD + valueFrom: + secretKeyRef: + name: redis-password + key: sentinelPassword + restartPolicy: Never +`