Skip to content

Commit 464d53d

Browse files
Tactionpkedy
andauthored
Move the common Redis code to internal (#859)
* Move pubsub redis client to internal * Add failover for common redis. Refactor state redis. * refactor binding redis * fix lint * Refactor to setting partten * fix db and lint * fix lint * for backward compatibility * Using a type alias to handle decoding duration values of -1 for Redis. * Linter fixes Co-authored-by: Phil Kedy <phil.kedy@gmail.com>
1 parent d499387 commit 464d53d

File tree

13 files changed

+419
-534
lines changed

13 files changed

+419
-534
lines changed

bindings/redis/metadata.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ package redis
88
import "time"
99

1010
type metadata struct {
11-
host string
12-
password string
13-
enableTLS bool
1411
maxRetries int
1512
maxRetryBackoff time.Duration
1613
}

bindings/redis/redis.go

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,32 @@ package redis
77

88
import (
99
"context"
10-
"crypto/tls"
1110
"errors"
1211
"fmt"
1312
"strconv"
1413
"time"
1514

15+
"github.com/go-redis/redis/v8"
16+
1617
"github.com/dapr/components-contrib/bindings"
18+
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
1719
"github.com/dapr/kit/logger"
18-
redis "github.com/go-redis/redis/v8"
1920
)
2021

2122
const (
22-
host = "redisHost"
23-
password = "redisPassword"
24-
enableTLS = "enableTLS"
2523
maxRetries = "maxRetries"
2624
maxRetryBackoff = "maxRetryBackoff"
2725
defaultBase = 10
2826
defaultBitSize = 0
29-
defaultDB = 0
3027
defaultMaxRetries = 3
3128
defaultMaxRetryBackoff = time.Second * 2
32-
defaultEnableTLS = false
3329
)
3430

3531
// Redis is a redis output binding
3632
type Redis struct {
37-
client *redis.Client
38-
logger logger.Logger
33+
client redis.UniversalClient
34+
clientSettings *rediscomponent.Settings
35+
logger logger.Logger
3936

4037
ctx context.Context
4138
cancel context.CancelFunc
@@ -48,32 +45,21 @@ func NewRedis(logger logger.Logger) *Redis {
4845

4946
// Init performs metadata parsing and connection creation
5047
func (r *Redis) Init(meta bindings.Metadata) error {
51-
m, err := r.parseMetadata(meta)
48+
_, err := r.parseMetadata(meta)
5249
if err != nil {
5350
return err
5451
}
5552

56-
opts := &redis.Options{
57-
Addr: m.host,
58-
Password: m.password,
59-
DB: defaultDB,
60-
MaxRetries: m.maxRetries,
61-
MaxRetryBackoff: m.maxRetryBackoff,
62-
}
63-
64-
/* #nosec */
65-
if m.enableTLS {
66-
opts.TLSConfig = &tls.Config{
67-
InsecureSkipVerify: m.enableTLS,
68-
}
53+
r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(meta.Properties, nil)
54+
if err != nil {
55+
return err
6956
}
7057

71-
r.client = redis.NewClient(opts)
7258
r.ctx, r.cancel = context.WithCancel(context.Background())
7359

7460
_, err = r.client.Ping(r.ctx).Result()
7561
if err != nil {
76-
return fmt.Errorf("redis binding: error connecting to redis at %s: %s", m.host, err)
62+
return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err)
7763
}
7864

7965
return err
@@ -82,25 +68,6 @@ func (r *Redis) Init(meta bindings.Metadata) error {
8268
func (r *Redis) parseMetadata(meta bindings.Metadata) (metadata, error) {
8369
m := metadata{}
8470

85-
if val, ok := meta.Properties[host]; ok && val != "" {
86-
m.host = val
87-
} else {
88-
return m, errors.New("redis binding error: missing host address")
89-
}
90-
91-
if val, ok := meta.Properties[password]; ok && val != "" {
92-
m.password = val
93-
}
94-
95-
m.enableTLS = defaultEnableTLS
96-
if val, ok := meta.Properties[enableTLS]; ok && val != "" {
97-
tls, err := strconv.ParseBool(val)
98-
if err != nil {
99-
return m, fmt.Errorf("redis binding error: can't parse enableTLS field: %s", err)
100-
}
101-
m.enableTLS = tls
102-
}
103-
10471
m.maxRetries = defaultMaxRetries
10572
if val, ok := meta.Properties[maxRetries]; ok && val != "" {
10673
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)

bindings/redis/redis_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ func TestParseMetadata(t *testing.T) {
2020
r := Redis{logger: logger.NewLogger("test")}
2121
redisM, err := r.parseMetadata(m)
2222
assert.Nil(t, err)
23-
assert.Equal(t, "host", redisM.host)
24-
assert.Equal(t, "password", redisM.password)
25-
assert.Equal(t, true, redisM.enableTLS)
2623
assert.Equal(t, 3, redisM.maxRetries)
2724
assert.Equal(t, time.Duration(10000), redisM.maxRetryBackoff)
2825
}

internal/component/redis/redis.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation and Dapr Contributors.
3+
// Licensed under the MIT License.
4+
// ------------------------------------------------------------
5+
6+
package redis
7+
8+
import (
9+
"crypto/tls"
10+
"fmt"
11+
"strings"
12+
"time"
13+
14+
"github.com/go-redis/redis/v8"
15+
)
16+
17+
const (
18+
ClusterType = "cluster"
19+
NodeType = "node"
20+
)
21+
22+
func ParseClientFromProperties(properties map[string]string, defaultSettings *Settings) (client redis.UniversalClient, settings *Settings, err error) {
23+
if defaultSettings == nil {
24+
settings = &Settings{}
25+
} else {
26+
settings = defaultSettings
27+
}
28+
err = settings.Decode(properties)
29+
if err != nil {
30+
return nil, nil, fmt.Errorf("redis client configuration error: %w", err)
31+
}
32+
if settings.Failover {
33+
return newFailoverClient(settings), settings, nil
34+
}
35+
36+
return newClient(settings), settings, nil
37+
}
38+
39+
func newFailoverClient(s *Settings) redis.UniversalClient {
40+
if s == nil {
41+
return nil
42+
}
43+
opts := &redis.FailoverOptions{
44+
DB: s.DB,
45+
MasterName: s.SentinelMasterName,
46+
SentinelAddrs: []string{s.Host},
47+
Password: s.Password,
48+
MaxRetries: s.RedisMaxRetries,
49+
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
50+
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
51+
DialTimeout: time.Duration(s.DialTimeout),
52+
ReadTimeout: time.Duration(s.ReadTimeout),
53+
WriteTimeout: time.Duration(s.WriteTimeout),
54+
PoolSize: s.PoolSize,
55+
MaxConnAge: time.Duration(s.MaxConnAge),
56+
MinIdleConns: s.MinIdleConns,
57+
PoolTimeout: time.Duration(s.PoolTimeout),
58+
IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
59+
IdleTimeout: time.Duration(s.IdleTimeout),
60+
}
61+
62+
/* #nosec */
63+
if s.EnableTLS {
64+
opts.TLSConfig = &tls.Config{
65+
InsecureSkipVerify: s.EnableTLS,
66+
}
67+
}
68+
69+
if s.RedisType == ClusterType {
70+
opts.SentinelAddrs = strings.Split(s.Host, ",")
71+
72+
return redis.NewFailoverClusterClient(opts)
73+
}
74+
75+
return redis.NewFailoverClient(opts)
76+
}
77+
78+
func newClient(s *Settings) redis.UniversalClient {
79+
if s == nil {
80+
return nil
81+
}
82+
if s.RedisType == ClusterType {
83+
options := &redis.ClusterOptions{
84+
Addrs: strings.Split(s.Host, ","),
85+
Password: s.Password,
86+
MaxRetries: s.RedisMaxRetries,
87+
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
88+
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
89+
DialTimeout: time.Duration(s.DialTimeout),
90+
ReadTimeout: time.Duration(s.ReadTimeout),
91+
WriteTimeout: time.Duration(s.WriteTimeout),
92+
PoolSize: s.PoolSize,
93+
MaxConnAge: time.Duration(s.MaxConnAge),
94+
MinIdleConns: s.MinIdleConns,
95+
PoolTimeout: time.Duration(s.PoolTimeout),
96+
IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
97+
IdleTimeout: time.Duration(s.IdleTimeout),
98+
}
99+
/* #nosec */
100+
if s.EnableTLS {
101+
options.TLSConfig = &tls.Config{
102+
InsecureSkipVerify: s.EnableTLS,
103+
}
104+
}
105+
106+
return redis.NewClusterClient(options)
107+
}
108+
109+
options := &redis.Options{
110+
Addr: s.Host,
111+
Password: s.Password,
112+
DB: s.DB,
113+
MaxRetries: s.RedisMaxRetries,
114+
MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
115+
MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
116+
DialTimeout: time.Duration(s.DialTimeout),
117+
ReadTimeout: time.Duration(s.ReadTimeout),
118+
WriteTimeout: time.Duration(s.WriteTimeout),
119+
PoolSize: s.PoolSize,
120+
MaxConnAge: time.Duration(s.MaxConnAge),
121+
MinIdleConns: s.MinIdleConns,
122+
PoolTimeout: time.Duration(s.PoolTimeout),
123+
IdleCheckFrequency: time.Duration(s.IdleCheckFrequency),
124+
IdleTimeout: time.Duration(s.IdleTimeout),
125+
}
126+
127+
/* #nosec */
128+
if s.EnableTLS {
129+
options.TLSConfig = &tls.Config{
130+
InsecureSkipVerify: s.EnableTLS,
131+
}
132+
}
133+
134+
return redis.NewClient(options)
135+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation and Dapr Contributors.
3+
// Licensed under the MIT License.
4+
// ------------------------------------------------------------
5+
6+
package redis
7+
8+
import (
9+
"errors"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
const (
17+
host = "redisHost"
18+
password = "redisPassword"
19+
db = "redisDB"
20+
redisType = "redisType"
21+
redisMaxRetries = "redisMaxRetries"
22+
redisMinRetryInterval = "redisMinRetryInterval"
23+
redisMaxRetryInterval = "redisMaxRetryInterval"
24+
dialTimeout = "dialTimeout"
25+
readTimeout = "readTimeout"
26+
writeTimeout = "writeTimeout"
27+
poolSize = "poolSize"
28+
minIdleConns = "minIdleConns"
29+
poolTimeout = "poolTimeout"
30+
idleTimeout = "idleTimeout"
31+
idleCheckFrequency = "idleCheckFrequency"
32+
maxConnAge = "maxConnAge"
33+
enableTLS = "enableTLS"
34+
failover = "failover"
35+
sentinelMasterName = "sentinelMasterName"
36+
)
37+
38+
func getFakeProperties() map[string]string {
39+
return map[string]string{
40+
host: "fake.redis.com",
41+
password: "fakePassword",
42+
redisType: "node",
43+
enableTLS: "true",
44+
dialTimeout: "5s",
45+
readTimeout: "5s",
46+
writeTimeout: "50000",
47+
poolSize: "20",
48+
maxConnAge: "200s",
49+
db: "1",
50+
redisMaxRetries: "1",
51+
redisMinRetryInterval: "8ms",
52+
redisMaxRetryInterval: "1s",
53+
minIdleConns: "1",
54+
poolTimeout: "1s",
55+
idleTimeout: "1s",
56+
idleCheckFrequency: "1s",
57+
failover: "true",
58+
sentinelMasterName: "master",
59+
}
60+
}
61+
62+
func TestParseRedisMetadata(t *testing.T) {
63+
t.Run("ClientMetadata is correct", func(t *testing.T) {
64+
fakeProperties := getFakeProperties()
65+
66+
// act
67+
m := &Settings{}
68+
err := m.Decode(fakeProperties)
69+
70+
// assert
71+
assert.NoError(t, err)
72+
assert.Equal(t, fakeProperties[host], m.Host)
73+
assert.Equal(t, fakeProperties[password], m.Password)
74+
assert.Equal(t, fakeProperties[redisType], m.RedisType)
75+
assert.Equal(t, true, m.EnableTLS)
76+
assert.Equal(t, 5*time.Second, time.Duration(m.DialTimeout))
77+
assert.Equal(t, 5*time.Second, time.Duration(m.ReadTimeout))
78+
assert.Equal(t, 50000*time.Millisecond, time.Duration(m.WriteTimeout))
79+
assert.Equal(t, 20, m.PoolSize)
80+
assert.Equal(t, 200*time.Second, time.Duration(m.MaxConnAge))
81+
assert.Equal(t, 1, m.DB)
82+
assert.Equal(t, 1, m.RedisMaxRetries)
83+
assert.Equal(t, 8*time.Millisecond, time.Duration(m.RedisMinRetryInterval))
84+
assert.Equal(t, 1*time.Second, time.Duration(m.RedisMaxRetryInterval))
85+
assert.Equal(t, 1, m.MinIdleConns)
86+
assert.Equal(t, 1*time.Second, time.Duration(m.PoolTimeout))
87+
assert.Equal(t, 1*time.Second, time.Duration(m.IdleTimeout))
88+
assert.Equal(t, 1*time.Second, time.Duration(m.IdleCheckFrequency))
89+
assert.Equal(t, true, m.Failover)
90+
assert.Equal(t, "master", m.SentinelMasterName)
91+
})
92+
93+
t.Run("host is not given", func(t *testing.T) {
94+
fakeProperties := getFakeProperties()
95+
96+
fakeProperties[host] = ""
97+
98+
// act
99+
m := &Settings{}
100+
err := m.Decode(fakeProperties)
101+
102+
// assert
103+
assert.Error(t, errors.New("redis streams error: missing host address"), err)
104+
assert.Empty(t, m.Host)
105+
})
106+
107+
t.Run("check values can be set as -1", func(t *testing.T) {
108+
fakeProperties := getFakeProperties()
109+
110+
fakeProperties[readTimeout] = "-1"
111+
fakeProperties[idleTimeout] = "-1"
112+
fakeProperties[idleCheckFrequency] = "-1"
113+
fakeProperties[redisMaxRetryInterval] = "-1"
114+
fakeProperties[redisMinRetryInterval] = "-1"
115+
116+
// act
117+
m := &Settings{}
118+
err := m.Decode(fakeProperties)
119+
// assert
120+
assert.NoError(t, err)
121+
assert.True(t, m.ReadTimeout == -1)
122+
assert.True(t, m.IdleTimeout == -1)
123+
assert.True(t, m.IdleCheckFrequency == -1)
124+
assert.True(t, m.RedisMaxRetryInterval == -1)
125+
assert.True(t, m.RedisMinRetryInterval == -1)
126+
})
127+
}

0 commit comments

Comments
 (0)