Skip to content

Commit d1c1751

Browse files
committed
feat: ring.SetAddrs to add and remove shards by the ring client and reuse old connections
test: ring scale-in and scale-out rewrite as suggested by @AlexanderYastrebov Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
1 parent e061db8 commit d1c1751

File tree

3 files changed

+155
-14
lines changed

3 files changed

+155
-14
lines changed

export_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,11 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []
9393
}
9494
return parseReplicaAddrs(addrs, false)
9595
}
96+
97+
func (c *Ring) GetAddr(addr string) *ringShard {
98+
return c.shards.GetAddr(addr)
99+
}
100+
101+
func (c *ringShards) GetAddr(addr string) *ringShard {
102+
return c.shards[addr]
103+
}

ring.go

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func (opt *RingOptions) clientOptions() *Options {
160160
type ringShard struct {
161161
Client *Client
162162
down int32
163+
addr string
163164
}
164165

165166
func newRingShard(opt *RingOptions, name, addr string) *ringShard {
@@ -168,6 +169,7 @@ func newRingShard(opt *RingOptions, name, addr string) *ringShard {
168169

169170
return &ringShard{
170171
Client: opt.NewClient(name, clopt),
172+
addr: addr,
171173
}
172174
}
173175

@@ -212,33 +214,68 @@ type ringShards struct {
212214
opt *RingOptions
213215

214216
mu sync.RWMutex
217+
muClose sync.Mutex
215218
hash ConsistentHash
216-
shards map[string]*ringShard // read only
217-
list []*ringShard // read only
219+
shards map[string]*ringShard // read only, updated by SetAddrs
220+
list []*ringShard // read only, updated by SetAddrs
218221
numShard int
219222
closed bool
220223
}
221224

222225
func newRingShards(opt *RingOptions) *ringShards {
223-
shards := make(map[string]*ringShard, len(opt.Addrs))
224-
list := make([]*ringShard, 0, len(shards))
226+
c := &ringShards{
227+
opt: opt,
228+
}
229+
c.SetAddrs(opt.Addrs)
225230

226-
for name, addr := range opt.Addrs {
227-
shard := newRingShard(opt, name, addr)
228-
shards[name] = shard
231+
return c
232+
}
229233

230-
list = append(list, shard)
234+
// SetAddrs replaces the shards in use, such that you can increase and
235+
// decrease number of shards, that you use. It will reuse shards that
236+
// existed before and close the ones that will not be used anymore.
237+
func (c *ringShards) SetAddrs(addrs map[string]string) {
238+
c.muClose.Lock()
239+
defer c.muClose.Unlock()
240+
if c.closed {
241+
return
231242
}
232243

233-
c := &ringShards{
234-
opt: opt,
244+
shards := make(map[string]*ringShard)
245+
unusedShards := make(map[string]*ringShard)
246+
247+
for k, shard := range c.shards {
248+
if addr, ok := addrs[k]; ok && shard.addr == addr {
249+
shards[k] = shard
250+
} else {
251+
unusedShards[k] = shard
252+
}
253+
}
235254

236-
shards: shards,
237-
list: list,
255+
for k, addr := range addrs {
256+
if shard, ok := c.shards[k]; !ok || shard.addr != addr {
257+
shards[k] = newRingShard(c.opt, k, addr)
258+
}
238259
}
239-
c.rebalance()
240260

241-
return c
261+
list := make([]*ringShard, 0, len(shards))
262+
for _, shard := range shards {
263+
list = append(list, shard)
264+
}
265+
266+
c.mu.Lock()
267+
c.shards = shards
268+
c.list = list
269+
270+
c.rebalanceLocked()
271+
c.mu.Unlock()
272+
273+
for k, shard := range unusedShards {
274+
err := shard.Client.Close()
275+
if err != nil {
276+
internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err)
277+
}
278+
}
242279
}
243280

244281
func (c *ringShards) List() []*ringShard {
@@ -355,6 +392,23 @@ func (c *ringShards) rebalance() {
355392
c.mu.Unlock()
356393
}
357394

395+
// rebalanceLocked removes dead shards from the Ring and callers need to hold the locl
396+
func (c *ringShards) rebalanceLocked() {
397+
shards := c.shards
398+
liveShards := make([]string, 0, len(shards))
399+
400+
for name, shard := range shards {
401+
if shard.IsUp() {
402+
liveShards = append(liveShards, name)
403+
}
404+
}
405+
406+
hash := c.opt.NewConsistentHash(liveShards)
407+
408+
c.hash = hash
409+
c.numShard = len(liveShards)
410+
}
411+
358412
func (c *ringShards) Len() int {
359413
c.mu.RLock()
360414
l := c.numShard
@@ -363,6 +417,8 @@ func (c *ringShards) Len() int {
363417
}
364418

365419
func (c *ringShards) Close() error {
420+
c.muClose.Lock()
421+
defer c.muClose.Unlock()
366422
c.mu.Lock()
367423
defer c.mu.Unlock()
368424

@@ -430,6 +486,10 @@ func NewRing(opt *RingOptions) *Ring {
430486
return &ring
431487
}
432488

489+
func (c *Ring) SetAddrs(ctx context.Context, addrs map[string]string) {
490+
c.shards.SetAddrs(addrs)
491+
}
492+
433493
// Do creates a Cmd from the args and processes the cmd.
434494
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
435495
cmd := NewCmd(ctx, args...)

ring_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,79 @@ var _ = Describe("Redis Ring", func() {
113113
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
114114
})
115115

116+
Describe("[new] dynamic setting ring shards", func() {
117+
It("downscale shard and check reuse shard, upscale shard and check reuse", func() {
118+
Expect(ring.Len(), 2)
119+
120+
wantShard := ring.GetAddr("ringShardOne")
121+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
122+
defer cancel()
123+
ring.SetAddrs(ctx, map[string]string{
124+
"ringShardOne": ":" + ringShard1Port,
125+
})
126+
Expect(ring.Len(), 1)
127+
gotShard := ring.GetAddr("ringShardOne")
128+
Expect(gotShard).To(Equal(wantShard))
129+
130+
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
131+
defer cancel()
132+
ring.SetAddrs(ctx, map[string]string{
133+
"ringShardOne": ":" + ringShard1Port,
134+
"ringShardTwo": ":" + ringShard2Port,
135+
})
136+
Expect(ring.Len(), 2)
137+
gotShard = ring.GetAddr("ringShardOne")
138+
Expect(gotShard).To(Equal(wantShard))
139+
140+
})
141+
142+
It("uses 3 shards after setting it to 3 shards", func() {
143+
Expect(ring.Len(), 2)
144+
145+
// Start ringShard3.
146+
var err error
147+
ringShard3, err = startRedis(ringShard3Port)
148+
Expect(err).NotTo(HaveOccurred())
149+
150+
shardName1 := "ringShardOne"
151+
shardAddr1 := ":" + ringShard1Port
152+
wantShard1 := ring.GetAddr(shardName1)
153+
shardName2 := "ringShardTwo"
154+
shardAddr2 := ":" + ringShard2Port
155+
wantShard2 := ring.GetAddr(shardName2)
156+
shardName3 := "ringShardThree"
157+
shardAddr3 := ":" + ringShard3Port
158+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
159+
defer cancel()
160+
ring.SetAddrs(ctx, map[string]string{
161+
shardName1: shardAddr1,
162+
shardName2: shardAddr2,
163+
shardName3: shardAddr3,
164+
})
165+
Expect(ring.Len(), 3)
166+
gotShard1 := ring.GetAddr(shardName1)
167+
gotShard2 := ring.GetAddr(shardName2)
168+
gotShard3 := ring.GetAddr(shardName3)
169+
Expect(gotShard1).To(Equal(wantShard1))
170+
Expect(gotShard2).To(Equal(wantShard2))
171+
Expect(gotShard3).ToNot(BeNil())
172+
173+
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
174+
defer cancel()
175+
ring.SetAddrs(ctx, map[string]string{
176+
shardName1: shardAddr1,
177+
shardName2: shardAddr2,
178+
})
179+
Expect(ring.Len(), 2)
180+
gotShard1 = ring.GetAddr(shardName1)
181+
gotShard2 = ring.GetAddr(shardName2)
182+
gotShard3 = ring.GetAddr(shardName3)
183+
Expect(gotShard1).To(Equal(wantShard1))
184+
Expect(gotShard2).To(Equal(wantShard2))
185+
Expect(gotShard3).To(BeNil())
186+
})
187+
188+
})
116189
Describe("pipeline", func() {
117190
It("distributes keys", func() {
118191
pipe := ring.Pipeline()

0 commit comments

Comments
 (0)