Skip to content

Commit 9787726

Browse files
committed
feature: ring.SetAddrs to add and remove shards by the ring client
tests: to scale in and out Signed-off-by: Sandor Szücs <sandor.szuecs@zalando.de>
1 parent 2465baa commit 9787726

File tree

2 files changed

+69
-8
lines changed

2 files changed

+69
-8
lines changed

ring.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,25 +220,43 @@ type ringShards struct {
220220
}
221221

222222
func newRingShards(opt *RingOptions) *ringShards {
223-
shards := make(map[string]*ringShard, len(opt.Addrs))
223+
shards, list := createRingShards(opt, opt.Addrs)
224+
225+
c := &ringShards{
226+
opt: opt,
227+
228+
shards: shards,
229+
list: list,
230+
}
231+
c.rebalance()
232+
233+
return c
234+
}
235+
236+
func createRingShards(opt *RingOptions, addrs map[string]string) (map[string]*ringShard, []*ringShard) {
237+
shards := make(map[string]*ringShard, len(addrs))
224238
list := make([]*ringShard, 0, len(shards))
225239

226-
for name, addr := range opt.Addrs {
240+
for name, addr := range addrs {
227241
shard := newRingShard(opt, name, addr)
228242
shards[name] = shard
229243

230244
list = append(list, shard)
231245
}
246+
return shards, list
247+
}
232248

233-
c := &ringShards{
234-
opt: opt,
249+
func (c *ringShards) SetAddrs(addrs map[string]string) {
250+
shards, list := createRingShards(c.opt, addrs)
235251

236-
shards: shards,
237-
list: list,
252+
c.mu.Lock()
253+
if !c.closed {
254+
c.shards = shards
255+
c.list = list
238256
}
239-
c.rebalance()
257+
c.mu.Unlock()
240258

241-
return c
259+
c.rebalance()
242260
}
243261

244262
func (c *ringShards) List() []*ringShard {
@@ -447,6 +465,10 @@ func (c *Ring) WithContext(ctx context.Context) *Ring {
447465
return &clone
448466
}
449467

468+
func (c *Ring) SetAddrs(addrs map[string]string) {
469+
c.shards.SetAddrs(addrs)
470+
}
471+
450472
// Do creates a Cmd from the args and processes the cmd.
451473
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
452474
cmd := NewCmd(ctx, args...)

ring_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,45 @@ var _ = Describe("Redis Ring", func() {
113113
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
114114
})
115115

116+
Describe("[new] dynamic setting ring shards", func() {
117+
It("uses one shard after setting it to one shard", func() {
118+
Expect(ring.Len(), 2)
119+
ring.SetAddrs(map[string]string{
120+
"ringShardOne": ":" + ringShard1Port,
121+
})
122+
Expect(ring.Len(), 1)
123+
124+
ring.SetAddrs(map[string]string{
125+
"ringShardOne": ":" + ringShard1Port,
126+
"ringShardTwo": ":" + ringShard2Port,
127+
})
128+
129+
Expect(ring.Len(), 2)
130+
})
131+
132+
It("uses 3 shards after setting it to 3 shards", func() {
133+
Expect(ring.Len(), 2)
134+
135+
// Start ringShard3.
136+
var err error
137+
ringShard3, err = startRedis(ringShard3Port)
138+
Expect(err).NotTo(HaveOccurred())
139+
140+
ring.SetAddrs(map[string]string{
141+
"ringShardOne": ":" + ringShard1Port,
142+
"ringShardTwo": ":" + ringShard2Port,
143+
"ringShardThree": ":" + ringShard3Port,
144+
})
145+
Expect(ring.Len(), 3)
146+
147+
ring.SetAddrs(map[string]string{
148+
"ringShardOne": ":" + ringShard1Port,
149+
"ringShardTwo": ":" + ringShard2Port,
150+
})
151+
Expect(ring.Len(), 2)
152+
})
153+
154+
})
116155
Describe("pipeline", func() {
117156
It("distributes keys", func() {
118157
pipe := ring.Pipeline()

0 commit comments

Comments
 (0)