Skip to content

Commit d83436b

Browse files
fix: fixes ring.SetAddrs and rebalance race (#2283)
* fix: fixes ring.SetAddrs and rebalance race The change ensures atomic update of `c.hash` and `c.shards`. `BenchmarkRingRebalanceLocked` shows rebalance latency: ``` go test . -run=NONE -bench=BenchmarkRingRebalanceLocked -v -count=10 | benchstat /dev/stdin name time/op RingRebalanceLocked-8 8.50µs ±14% ``` (Note: it essentially reverts a46b053)
1 parent bbff4dd commit d83436b

File tree

3 files changed

+110
-24
lines changed

3 files changed

+110
-24
lines changed

export_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,6 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []
9595
}
9696

9797
func (c *Ring) ShardByName(name string) *ringShard {
98-
return c.sharding.ShardByName(name)
99-
}
100-
101-
func (c *ringSharding) ShardByName(name string) *ringShard {
102-
return c.shards.m[name]
98+
shard, _ := c.sharding.GetByName(name)
99+
return shard
103100
}

internal_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package redis
22

33
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
48
. "github.com/onsi/ginkgo"
59
. "github.com/onsi/gomega"
610
)
@@ -65,3 +69,92 @@ var _ = Describe("newClusterState", func() {
6569
})
6670
})
6771
})
72+
73+
type fixedHash string
74+
75+
func (h fixedHash) Get(string) string {
76+
return string(h)
77+
}
78+
79+
func TestRingSetAddrsAndRebalanceRace(t *testing.T) {
80+
const (
81+
ringShard1Name = "ringShardOne"
82+
ringShard2Name = "ringShardTwo"
83+
84+
ringShard1Port = "6390"
85+
ringShard2Port = "6391"
86+
)
87+
88+
ring := NewRing(&RingOptions{
89+
Addrs: map[string]string{
90+
ringShard1Name: ":" + ringShard1Port,
91+
},
92+
// Disable heartbeat
93+
HeartbeatFrequency: 1 * time.Hour,
94+
NewConsistentHash: func(shards []string) ConsistentHash {
95+
switch len(shards) {
96+
case 1:
97+
return fixedHash(ringShard1Name)
98+
case 2:
99+
return fixedHash(ringShard2Name)
100+
default:
101+
t.Fatalf("Unexpected number of shards: %v", shards)
102+
return nil
103+
}
104+
},
105+
})
106+
107+
// Continuously update addresses by adding and removing one address
108+
updatesDone := make(chan struct{})
109+
defer func() { close(updatesDone) }()
110+
go func() {
111+
for i := 0; ; i++ {
112+
select {
113+
case <-updatesDone:
114+
return
115+
default:
116+
if i%2 == 0 {
117+
ring.SetAddrs(map[string]string{
118+
ringShard1Name: ":" + ringShard1Port,
119+
})
120+
} else {
121+
ring.SetAddrs(map[string]string{
122+
ringShard1Name: ":" + ringShard1Port,
123+
ringShard2Name: ":" + ringShard2Port,
124+
})
125+
}
126+
}
127+
}
128+
}()
129+
130+
timer := time.NewTimer(1 * time.Second)
131+
for running := true; running; {
132+
select {
133+
case <-timer.C:
134+
running = false
135+
default:
136+
shard, err := ring.sharding.GetByKey("whatever")
137+
if err == nil && shard == nil {
138+
t.Fatal("shard is nil")
139+
}
140+
}
141+
}
142+
}
143+
144+
func BenchmarkRingShardingRebalanceLocked(b *testing.B) {
145+
opts := &RingOptions{
146+
Addrs: make(map[string]string),
147+
// Disable heartbeat
148+
HeartbeatFrequency: 1 * time.Hour,
149+
}
150+
for i := 0; i < 100; i++ {
151+
opts.Addrs[fmt.Sprintf("shard%d", i)] = fmt.Sprintf(":63%02d", i)
152+
}
153+
154+
ring := NewRing(opts)
155+
156+
b.ResetTimer()
157+
for i := 0; i < b.N; i++ {
158+
ring.sharding.rebalanceLocked()
159+
}
160+
}

ring.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,9 @@ func (c *ringSharding) SetAddrs(addrs map[string]string) {
254254

255255
shards, cleanup := c.newRingShards(addrs, c.shards)
256256
c.shards = shards
257+
c.rebalanceLocked()
257258
c.mu.Unlock()
258259

259-
c.rebalance()
260260
cleanup()
261261
}
262262

@@ -388,40 +388,36 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
388388
}
389389

390390
if rebalance {
391-
c.rebalance()
391+
c.mu.Lock()
392+
c.rebalanceLocked()
393+
c.mu.Unlock()
392394
}
393395
case <-ctx.Done():
394396
return
395397
}
396398
}
397399
}
398400

399-
// rebalance removes dead shards from the Ring.
400-
func (c *ringSharding) rebalance() {
401-
c.mu.RLock()
402-
shards := c.shards
403-
c.mu.RUnlock()
404-
405-
if shards == nil {
401+
// rebalanceLocked removes dead shards from the Ring.
402+
// Requires c.mu locked.
403+
func (c *ringSharding) rebalanceLocked() {
404+
if c.closed {
405+
return
406+
}
407+
if c.shards == nil {
406408
return
407409
}
408410

409-
liveShards := make([]string, 0, len(shards.m))
411+
liveShards := make([]string, 0, len(c.shards.m))
410412

411-
for name, shard := range shards.m {
413+
for name, shard := range c.shards.m {
412414
if shard.IsUp() {
413415
liveShards = append(liveShards, name)
414416
}
415417
}
416418

417-
hash := c.opt.NewConsistentHash(liveShards)
418-
419-
c.mu.Lock()
420-
if !c.closed {
421-
c.hash = hash
422-
c.numShard = len(liveShards)
423-
}
424-
c.mu.Unlock()
419+
c.hash = c.opt.NewConsistentHash(liveShards)
420+
c.numShard = len(liveShards)
425421
}
426422

427423
func (c *ringSharding) Len() int {

0 commit comments

Comments
 (0)