forked from bsm/redis-balancer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
balancer.go
145 lines (123 loc) · 3.05 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package balancer
import (
"sync/atomic"
"time"
"github.com/gomodule/redigo/redis"
)
// BalanceMode type
type BalanceMode int
const (
// ModeLeastConn picks the backend with the fewest connections.
ModeLeastConn BalanceMode = iota
// ModeFirstUp always picks the first available backend.
ModeFirstUp
// ModeMinLatency always picks the backend with the minimal latency.
ModeMinLatency
// ModeRandom selects backends randomly.
ModeRandom
// ModeWeightedLatency uses latency as a weight for random selection.
ModeWeightedLatency
// ModeRoundRobin round-robins across available backends.
ModeRoundRobin
)
const minCheckInterval = 100 * time.Millisecond
// Balancer client
type Balancer struct {
selector pool
mode BalanceMode
cursor int32
}
// New initializes a new redis balancer
func New(opts []*Options, mode BalanceMode) *Balancer {
if len(opts) == 0 {
opts = []*Options{
&Options{Network: "tcp", Addr: "127.0.0.1:6379", MaxIdle: 1},
}
}
balancer := &Balancer{
selector: make(pool, len(opts)),
mode: mode,
}
for i, opt := range opts {
if opt.MaxIdle == 0 {
opt.MaxIdle = 1
}
balancer.selector[i] = newRedisBackend(opt)
}
return balancer
}
// Next returns the next available redis client
func (b *Balancer) Next() *redis.Pool { return b.pickNext().client }
// Close closes all connecitons in the balancer
func (b *Balancer) Close() (err error) {
for _, b := range b.selector {
if e := b.Close(); e != nil {
err = e
}
}
return
}
// Pick the next backend
func (b *Balancer) pickNext() (backend *redisBackend) {
switch b.mode {
case ModeLeastConn:
backend = b.selector.MinUp(func(b *redisBackend) int64 {
return b.Connections()
})
case ModeFirstUp:
backend = b.selector.FirstUp()
case ModeMinLatency:
backend = b.selector.MinUp(func(b *redisBackend) int64 {
return int64(b.Latency())
})
case ModeRandom:
backend = b.selector.Up().Random()
case ModeWeightedLatency:
backend = b.selector.Up().WeightedRandom(func(b *redisBackend) int64 {
factor := int64(b.Latency())
return factor * factor
})
case ModeRoundRobin:
next := int(atomic.AddInt32(&b.cursor, 1))
backend = b.selector.Up().At(next)
}
// Fall back on random backend
if backend == nil {
backend = b.selector.Random()
}
// Increment the number of connections
backend.incConnections(1)
return
}
// --------------------------------------------------------------------
// Options for custom balancer
type Options struct {
Addr string
Network string
MaxIdle int
// Check interval, min 100ms, defaults to 1s
CheckInterval time.Duration
// Rise and Fall indicate the number of checks required to
// mark the instance as up or down, defaults to 1
Rise, Fall int
}
func (o *Options) getCheckInterval() time.Duration {
if o.CheckInterval == 0 {
return time.Second
} else if o.CheckInterval < minCheckInterval {
return minCheckInterval
}
return o.CheckInterval
}
func (o *Options) getRise() int {
if o.Rise < 1 {
return 1
}
return o.Rise
}
func (o *Options) getFall() int {
if o.Fall < 1 {
return 1
}
return o.Fall
}