Skip to content

Commit

Permalink
♻️ ✅ Refactoring internal/db/kvs/redis package (#590)
Browse files Browse the repository at this point in the history
* fix: refactoring

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: apply gotests

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* feat: test code

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: feedback of author

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: apply suggestion

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* 🤖 Update license headers / Format go codes and yaml files

Signed-off-by: vdaas-ci <ci@vdaas.org>

Co-authored-by: vdaas-ci <ci@vdaas.org>
  • Loading branch information
hlts2 and vdaas-ci authored Jul 30, 2020
1 parent 5d8eb92 commit f793475
Show file tree
Hide file tree
Showing 4 changed files with 653 additions and 99 deletions.
1 change: 0 additions & 1 deletion internal/db/kvs/redis/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

redis "github.com/go-redis/redis/v7"
"github.com/vdaas/vald/internal/net"

"go.uber.org/goleak"
)

Expand Down
154 changes: 87 additions & 67 deletions internal/db/kvs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
)

var (
// Nil is a type alias of redis.Nil.
Nil = redis.Nil
)

// Redis is an interface to manipulate Redis server.
type Redis interface {
TxPipeline() redis.Pipeliner
Ping() *StatusCmd
Expand All @@ -42,10 +44,16 @@ type Redis interface {
Deleter
}

type Conn = redis.Conn
type IntCmd = redis.IntCmd
type StringCmd = redis.StringCmd
type StatusCmd = redis.StatusCmd
type (
// Conn is a type alias of redis.Conn.
Conn = redis.Conn
// IntCmd is a type alias of redis.IntCmd.
IntCmd = redis.IntCmd
// StringCmd is a type alias of redis.StringCmd.
StringCmd = redis.StringCmd
// StatusCmd is a type alias of redis.StatusCmd.
StatusCmd = redis.StatusCmd
)

type redisClient struct {
addrs []string
Expand Down Expand Up @@ -75,89 +83,101 @@ type redisClient struct {
routeRandomly bool
tlsConfig *tls.Config
writeTimeout time.Duration
client Redis
}

// New returns Redis implementation if no error occurs.
func New(ctx context.Context, opts ...Option) (rc Redis, err error) {
r := new(redisClient)
for _, opt := range append(defaultOpts, opts...) {
if err = opt(r); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}
switch len(r.addrs) {

r, err = r.newRedisClient(ctx)
if err != nil {
return nil, err
}

return r.ping(ctx)
}

func (rc *redisClient) newRedisClient(ctx context.Context) (*redisClient, error) {
switch len(rc.addrs) {
case 0:
return nil, errors.ErrRedisAddrsNotFound
case 1:
if len(r.addrs[0]) == 0 {
if len(rc.addrs[0]) == 0 {
return nil, errors.ErrRedisAddrsNotFound
}
rc = redis.NewClient(&redis.Options{
Addr: r.addrs[0],
Password: r.password,
Dialer: r.dialer,
OnConnect: r.onConnect,
DB: r.db,
MaxRetries: r.maxRetries,
MinRetryBackoff: r.minRetryBackoff,
MaxRetryBackoff: r.maxRetryBackoff,
DialTimeout: r.dialTimeout,
ReadTimeout: r.readTimeout,
WriteTimeout: r.writeTimeout,
PoolSize: r.poolSize,
MinIdleConns: r.minIdleConns,
MaxConnAge: r.maxConnAge,
PoolTimeout: r.poolTimeout,
IdleTimeout: r.idleTimeout,
IdleCheckFrequency: r.idleCheckFrequency,
TLSConfig: r.tlsConfig,
rc.client = redis.NewClient(&redis.Options{
Addr: rc.addrs[0],
Password: rc.password,
Dialer: rc.dialer,
OnConnect: rc.onConnect,
DB: rc.db,
MaxRetries: rc.maxRetries,
MinRetryBackoff: rc.minRetryBackoff,
MaxRetryBackoff: rc.maxRetryBackoff,
DialTimeout: rc.dialTimeout,
ReadTimeout: rc.readTimeout,
WriteTimeout: rc.writeTimeout,
PoolSize: rc.poolSize,
MinIdleConns: rc.minIdleConns,
MaxConnAge: rc.maxConnAge,
PoolTimeout: rc.poolTimeout,
IdleTimeout: rc.idleTimeout,
IdleCheckFrequency: rc.idleCheckFrequency,
TLSConfig: rc.tlsConfig,
})
default:
rc = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.addrs,
Dialer: r.dialer,
MaxRedirects: r.maxRedirects,
ReadOnly: r.readOnly,
RouteByLatency: r.routeByLatency,
RouteRandomly: r.routeRandomly,
ClusterSlots: r.clusterSlots,
OnNewNode: r.onNewNode,
OnConnect: r.onConnect,
Password: r.password,
MaxRetries: r.maxRetries,
MinRetryBackoff: r.minRetryBackoff,
MaxRetryBackoff: r.maxRetryBackoff,
DialTimeout: r.dialTimeout,
ReadTimeout: r.readTimeout,
WriteTimeout: r.writeTimeout,
PoolSize: r.poolSize,
MinIdleConns: r.minIdleConns,
MaxConnAge: r.maxConnAge,
PoolTimeout: r.poolTimeout,
IdleTimeout: r.idleTimeout,
IdleCheckFrequency: r.idleCheckFrequency,
TLSConfig: r.tlsConfig,
rc.client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: rc.addrs,
Dialer: rc.dialer,
MaxRedirects: rc.maxRedirects,
ReadOnly: rc.readOnly,
RouteByLatency: rc.routeByLatency,
RouteRandomly: rc.routeRandomly,
ClusterSlots: rc.clusterSlots,
OnNewNode: rc.onNewNode,
OnConnect: rc.onConnect,
Password: rc.password,
MaxRetries: rc.maxRetries,
MinRetryBackoff: rc.minRetryBackoff,
MaxRetryBackoff: rc.maxRetryBackoff,
DialTimeout: rc.dialTimeout,
ReadTimeout: rc.readTimeout,
WriteTimeout: rc.writeTimeout,
PoolSize: rc.poolSize,
MinIdleConns: rc.minIdleConns,
MaxConnAge: rc.maxConnAge,
PoolTimeout: rc.poolTimeout,
IdleTimeout: rc.idleTimeout,
IdleCheckFrequency: rc.idleCheckFrequency,
TLSConfig: rc.tlsConfig,
}).WithContext(ctx)
}

err = func() (err error) {
pctx, cancel := context.WithTimeout(ctx, r.initialPingTimeLimit)
defer cancel()
tick := time.NewTicker(r.initialPingDuration)
for {
select {
case <-pctx.Done():
return errors.Wrap(errors.Wrap(err, errors.ErrRedisConnectionPingFailed.Error()), ctx.Err().Error())
case <-tick.C:
err = rc.Ping().Err()
if err == nil {
return nil
}
log.Error(err)
return rc, nil
}

func (rc *redisClient) ping(ctx context.Context) (r Redis, err error) {
pctx, cancel := context.WithTimeout(ctx, rc.initialPingTimeLimit)
defer cancel()
tick := time.NewTicker(rc.initialPingDuration)
for {
select {
case <-pctx.Done():
err = errors.Wrap(errors.Wrap(err, errors.ErrRedisConnectionPingFailed.Error()), pctx.Err().Error())
log.Error(err)
return nil, err
case <-tick.C:
err = rc.client.Ping().Err()
if err == nil {
return rc.client, nil
}
log.Warn(err)
}
}()
if err != nil {
return nil, err
}
return rc, nil
}
53 changes: 53 additions & 0 deletions internal/db/kvs/redis/redis_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package redis

import redis "github.com/go-redis/redis/v7"

type MockRedis struct {
TxPipelineFunc func() redis.Pipeliner
PingFunc func() *StatusCmd
CloseFunc func() error
GetFunc func(string) *redis.StringCmd
MGetFunc func(...string) *redis.SliceCmd
DelFunc func(keys ...string) *redis.IntCmd
}

var _ = (*MockRedis)(nil)

func (m *MockRedis) TxPipeline() redis.Pipeliner {
return m.TxPipelineFunc()
}

func (m *MockRedis) Ping() *StatusCmd {
return m.PingFunc()
}

func (m *MockRedis) Close() error {
return m.CloseFunc()
}

func (m *MockRedis) Get(key string) *redis.StringCmd {
return m.GetFunc(key)
}

func (m *MockRedis) MGet(keys ...string) *redis.SliceCmd {
return m.MGetFunc(keys...)
}

func (m *MockRedis) Del(keys ...string) *redis.IntCmd {
return m.DelFunc(keys...)
}
Loading

0 comments on commit f793475

Please sign in to comment.