-
Notifications
You must be signed in to change notification settings - Fork 0
/
twunproxy.go
194 lines (165 loc) · 6.01 KB
/
twunproxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package twunproxy
import (
"errors"
"gopkg.in/yaml.v2"
"io/ioutil"
"sync"
)
// Conn represents the minimum implemented signature for underlying Redis
// connections: they can be closed and can execute a single command.
type Conn interface {
	Close() error
	Do(commandName string, args ...interface{}) (reply interface{}, err error)
}
// ConnGetter is the interface that underlying Redis connection pools should
// implement: Get returns a single usable connection from the pool.
type ConnGetter interface {
	Get() Conn
}
// redisPoolConfig represents one named pool from a Twemproxy configuration
// file: the Redis server address strings and the optional auth password.
type redisPoolConfig struct {
	Servers []string `yaml:"servers"`
	Auth    string   `yaml:"redis_auth"`
}
// redisReturn bundles a Redis command's reply and error into a single value
// so the pair can be passed around (e.g. through a channel) together.
type redisReturn struct {
	val interface{}
	err error
}
// RedisCmd is a container for all the requisite properties of a Redis command.
// Assumed usage is for commands where the key is the first argument after the
// command name.
type RedisCmd struct {
	name string        // Redis command name passed to Conn.Do.
	key  string        // key the command operates on; sent as the first argument.
	args []interface{} // any remaining arguments after the key.
}
// getArgs flattens the command key and the remaining arguments into the
// single variadic slice that the 'Do' command expects, with the key first.
func (c *RedisCmd) getArgs() []interface{} {
	flat := make([]interface{}, 0, len(c.args)+1)
	flat = append(flat, c.key)
	return append(flat, c.args...)
}
// ProxyConn maintains its own slice of Redis connection pools and a mapping
// of Redis keys to the pool that has been confirmed to own them.
type ProxyConn struct {
	Pools []ConnGetter // one connection pool per Redis instance behind Twemproxy.
	// KeyInstance maps a Redis key to the pool it was resolved to.
	// Guarded by keyInstanceMutex for concurrent access.
	KeyInstance      map[string]ConnGetter
	keyInstanceMutex *sync.RWMutex
}
// CreatePool is the signature for returning a connection pool based on the
// input Redis address and auth strings.
type CreatePool func(string, string) ConnGetter
// NewProxyConn creates a proxy for all of the connections in a Twemproxy-fronted pool.
// It reads the Twemproxy YAML configuration file at confPath, selects the
// pool named poolName, builds one connection pool per configured Redis
// server (verifying each with a PING), and initialises the key-to-pool
// mapping with the input initial capacity keyCap.
func NewProxyConn(confPath, poolName string, keyCap int, create CreatePool) (*ProxyConn, error) {
	f, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil, err
	}
	var m map[string]redisPoolConfig
	if err := yaml.Unmarshal(f, &m); err != nil {
		return nil, err
	}
	// Fail fast on an unknown pool name: indexing the map directly would
	// silently yield a zero-valued config and a proxy with no pools.
	conf, ok := m[poolName]
	if !ok {
		return nil, errors.New("twunproxy: pool '" + poolName + "' not found in configuration")
	}
	pools := make([]ConnGetter, len(conf.Servers))
	// For each instance described in the Twemproxy configuration, create a
	// connection pool and execute a PING to check that it is valid and available.
	for i, def := range conf.Servers {
		p := create(def, conf.Auth)
		c := p.Get()
		// Close the probe connection immediately rather than defer it:
		// deferring inside the loop would hold every probe connection
		// open until the function returns.
		_, pingErr := c.Do("PING")
		c.Close()
		if pingErr != nil {
			return nil, pingErr
		}
		pools[i] = p
	}
	return &ProxyConn{
		Pools:            pools,
		KeyInstance:      make(map[string]ConnGetter, keyCap),
		keyInstanceMutex: new(sync.RWMutex),
	}, nil
}
// Do runs the input command against the cluster.
// If we already have a pool mapped to the command key, just run it there and
// return the result. Otherwise the command is raced against every pool on
// its own Goroutine; the race ends on the first result that canMap accepts,
// and that pool is recorded as the key's owner (inside doInstance).
// NOTE: Blocking commands should be issued with a timeout or risk blocking permanently.
func (r *ProxyConn) Do(cmd *RedisCmd, canMap func(interface{}) bool) (interface{}, error) {
	// Fast path: the instance for this key is already known. The lookup is
	// wrapped in a closure so the read lock is released as soon as possible.
	pool, ok := func() (ConnGetter, bool) {
		r.keyInstanceMutex.RLock()
		defer r.keyInstanceMutex.RUnlock()
		pool, ok := r.KeyInstance[cmd.key]
		return pool, ok
	}()
	if ok {
		conn := pool.Get()
		defer conn.Close()
		return conn.Do(cmd.name, cmd.getArgs()...)
	}
	// Slow path: start the command on each of the pools and receive results
	// on a shared channel.
	results := make(chan redisReturn)
	wg := new(sync.WaitGroup)
	stop := make([]chan bool, len(r.Pools))
	for i := range r.Pools {
		// Buffer of 1 prevents blocking when sending stop commands to
		// Goroutines that have already completed.
		stop[i] = make(chan bool, 1)
		wg.Add(1)
		go r.doInstance(i, cmd, canMap, results, stop[i], wg)
	}
	// Default return when no Goroutine produces a result that canMap accepts.
	res := redisReturn{val: nil, err: errors.New("No results returned that could determine a key mapping.")}
	// Collector: take the first accepted Redis command result, then signal
	// every other Goroutine to stop.
	// NOTE(review): if canMap accepts replies from more than one instance, a
	// second loop iteration re-sends on stop channels whose 1-slot buffers
	// may already be full, blocking this Goroutine forever after Do returns —
	// confirm canMap implementations accept at most one instance's reply.
	go func() {
		for rr := range results {
			res = rr
			for _, c := range stop {
				c <- true
			}
		}
	}()
	// Wait for all the per-pool Goroutines to finish, then close the results
	// channel so the collector's range loop terminates.
	wg.Wait()
	close(results)
	return res.val, res.err
}
// doInstance runs the input Redis command against a connection taken from the
// pool at index poolIdx. If the canMap test accepts the reply, the command
// key is mapped to that pool and the reply is forwarded on the res channel
// (which triggers a subsequent message on stop for the other Goroutines).
// A message on stop before the command returns aborts the wait instead.
// In every path the wait group is notified before the method returns.
func (r *ProxyConn) doInstance(
	poolIdx int,
	cmd *RedisCmd,
	canMap func(interface{}) bool,
	res chan redisReturn,
	stop chan bool,
	wg *sync.WaitGroup) {
	defer wg.Done()
	pool := r.Pools[poolIdx]
	// Acquire the connection outside the Goroutine below so the deferred
	// Close is guaranteed to run when this method returns.
	conn := pool.Get()
	defer conn.Close()
	// Run the command on its own Goroutine so we can simultaneously react
	// to a stop signal while the command is in flight.
	// NOTE: Bad canMap definitions can cause panics here — if canMap accepts
	// results from more than one instance, a send can hit a closed channel.
	cmdDone := make(chan bool)
	go func() {
		reply, err := conn.Do(cmd.name, cmd.getArgs()...)
		if !canMap(reply) {
			cmdDone <- true
			return
		}
		// Record the key-to-pool mapping. The lock is intentionally held
		// across the send on res, preserving the original locking scope.
		r.keyInstanceMutex.Lock()
		defer r.keyInstanceMutex.Unlock()
		r.KeyInstance[cmd.key] = pool
		res <- redisReturn{val: reply, err: err}
	}()
	// Block until either this command completes, or another Goroutine's
	// accepted return arrives and we are told to stop.
	select {
	case <-stop:
	case <-cmdDone:
	}
}