From bebdfc8d96a1155f5d16eaeaf47583c161ed26b8 Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Tue, 15 Nov 2022 01:29:51 +0100 Subject: [PATCH 1/3] test: adds TestRingSetAddrsAndRebalanceRace --- export_test.go | 7 ++--- ring_test.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/export_test.go b/export_test.go index f259fca0f..d37230d81 100644 --- a/export_test.go +++ b/export_test.go @@ -95,9 +95,10 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) [] } func (c *Ring) ShardByName(name string) *ringShard { - return c.sharding.ShardByName(name) + shard, _ := c.sharding.GetByName(name) + return shard } -func (c *ringSharding) ShardByName(name string) *ringShard { - return c.shards.m[name] +func (c *Ring) ShardByKey(key string) (*ringShard, error) { + return c.sharding.GetByKey(key) } diff --git a/ring_test.go b/ring_test.go index c64e107bb..be8354e6d 100644 --- a/ring_test.go +++ b/ring_test.go @@ -7,6 +7,7 @@ import ( "net" "strconv" "sync" + "testing" "time" . "github.com/onsi/ginkgo" @@ -739,3 +740,71 @@ var _ = Describe("Ring Tx timeout", func() { testTimeout() }) }) + +type fixedHash string + +func (h fixedHash) Get(string) string { + return string(h) +} + +func TestRingSetAddrsAndRebalanceRace(t *testing.T) { + const ( + ringShard1Name = "ringShardOne" + ringShard2Name = "ringShardTwo" + ) + + ring := redis.NewRing(&redis.RingOptions{ + Addrs: map[string]string{ + ringShard1Name: ":" + ringShard1Port, + }, + // Disable heartbeat + HeartbeatFrequency: 1 * time.Hour, + NewConsistentHash: func(shards []string) redis.ConsistentHash { + switch len(shards) { + case 1: + return fixedHash(ringShard1Name) + case 2: + return fixedHash(ringShard2Name) + default: + t.Fatalf("Unexpected number of shards: %v", shards) + return nil + } + }, + }) + + // Continuously update addresses by adding and removing one address + updatesDone := make(chan struct{}) + defer func() { close(updatesDone) }() + go func() { + for i := 0; ; i++ { + select { + case <-updatesDone: + return + default: + if i%2 == 0 { + ring.SetAddrs(map[string]string{ + ringShard1Name: ":" + ringShard1Port, + }) + } else { + ring.SetAddrs(map[string]string{ + ringShard1Name: ":" + ringShard1Port, + ringShard2Name: ":" + ringShard2Port, + }) + } + } + } + }() + + timer := time.NewTimer(1 * time.Second) + for running := true; running; { + select { + case <-timer.C: + running = false + default: + shard, err := ring.ShardByKey("whatever") + if err == nil && shard == nil { + t.Fatal("shard is nil") + } + } + } +} From 433975c45f5aa459ef4d73e86c0ff8b3aa4aae48 Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Tue, 15 Nov 2022 01:30:58 +0100 Subject: [PATCH 2/3] fix: fixes ring.SetAddrs and rebalance race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The change ensures atomic update of `c.hash` and `c.shards`. `BenchmarkRingRebalanceLocked` shows rebalance latency: ``` go test . -run=NONE -bench=BenchmarkRingRebalanceLocked -v -count=10 | benchstat /dev/stdin name time/op RingRebalanceLocked-8 8.50µs ±14% ``` (Note: it essentially reverts a46b053aa626a005a30dfb1ac4e096abcce1ef76) --- export_test.go | 4 ++++ ring.go | 34 +++++++++++++++------------------- ring_test.go | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+), 19 deletions(-) diff --git a/export_test.go b/export_test.go index d37230d81..455f46008 100644 --- a/export_test.go +++ b/export_test.go @@ -102,3 +102,7 @@ func (c *Ring) ShardByName(name string) *ringShard { func (c *Ring) ShardByKey(key string) (*ringShard, error) { return c.sharding.GetByKey(key) } + +func (c *Ring) RebalanceLocked() { + c.sharding.rebalanceLocked() +} diff --git a/ring.go b/ring.go index fee2fe894..de1b0a32f 100644 --- a/ring.go +++ b/ring.go @@ -254,9 +254,9 @@ func (c *ringSharding) SetAddrs(addrs map[string]string) { shards, cleanup := c.newRingShards(addrs, c.shards) c.shards = shards + c.rebalanceLocked() c.mu.Unlock() - c.rebalance() cleanup() } @@ -388,7 +388,9 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) { } if rebalance { - c.rebalance() + c.mu.Lock() + c.rebalanceLocked() + c.mu.Unlock() } case <-ctx.Done(): return @@ -396,32 +398,26 @@ func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) { } } -// rebalance removes dead shards from the Ring. -func (c *ringSharding) rebalance() { - c.mu.RLock() - shards := c.shards - c.mu.RUnlock() - - if shards == nil { +// rebalanceLocked removes dead shards from the Ring. +// Requires c.mu locked. +func (c *ringSharding) rebalanceLocked() { + if c.closed { + return + } + if c.shards == nil { return } - liveShards := make([]string, 0, len(shards.m)) + liveShards := make([]string, 0, len(c.shards.m)) - for name, shard := range shards.m { + for name, shard := range c.shards.m { if shard.IsUp() { liveShards = append(liveShards, name) } } - hash := c.opt.NewConsistentHash(liveShards) - - c.mu.Lock() - if !c.closed { - c.hash = hash - c.numShard = len(liveShards) - } - c.mu.Unlock() + c.hash = c.opt.NewConsistentHash(liveShards) + c.numShard = len(liveShards) } func (c *ringSharding) Len() int { diff --git a/ring_test.go b/ring_test.go index be8354e6d..1447e8a70 100644 --- a/ring_test.go +++ b/ring_test.go @@ -808,3 +808,21 @@ func TestRingSetAddrsAndRebalanceRace(t *testing.T) { } } } + +func BenchmarkRingRebalanceLocked(b *testing.B) { + opts := &redis.RingOptions{ + Addrs: make(map[string]string), + // Disable heartbeat + HeartbeatFrequency: 1 * time.Hour, + } + for i := 0; i < 100; i++ { + opts.Addrs[fmt.Sprintf("shard%d", i)] = fmt.Sprintf(":63%02d", i) + } + + ring := redis.NewRing(opts) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ring.RebalanceLocked() + } +} From 85a88bc64ccd876dca2e021bc8954150715993fa Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Thu, 17 Nov 2022 22:46:52 +0100 Subject: [PATCH 3/3] test: move tests to internal_test.go --- export_test.go | 8 ----- internal_test.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ ring_test.go | 87 -------------------------------------------- 3 files changed, 93 insertions(+), 95 deletions(-) diff --git a/export_test.go b/export_test.go index 455f46008..0d032351b 100644 --- a/export_test.go +++ b/export_test.go @@ -98,11 +98,3 @@ func (c *Ring) ShardByName(name string) *ringShard { shard, _ := c.sharding.GetByName(name) return shard } - -func (c *Ring) ShardByKey(key string) (*ringShard, error) { - return c.sharding.GetByKey(key) -} - -func (c *Ring) RebalanceLocked() { - c.sharding.rebalanceLocked() -} diff --git a/internal_test.go b/internal_test.go index b1dd0bdd7..fcf1235b5 100644 --- a/internal_test.go +++ b/internal_test.go @@ -1,6 +1,10 @@ package redis import ( + "fmt" + "testing" + "time" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -65,3 +69,92 @@ var _ = Describe("newClusterState", func() { }) }) }) + +type fixedHash string + +func (h fixedHash) Get(string) string { + return string(h) +} + +func TestRingSetAddrsAndRebalanceRace(t *testing.T) { + const ( + ringShard1Name = "ringShardOne" + ringShard2Name = "ringShardTwo" + + ringShard1Port = "6390" + ringShard2Port = "6391" + ) + + ring := NewRing(&RingOptions{ + Addrs: map[string]string{ + ringShard1Name: ":" + ringShard1Port, + }, + // Disable heartbeat + HeartbeatFrequency: 1 * time.Hour, + NewConsistentHash: func(shards []string) ConsistentHash { + switch len(shards) { + case 1: + return fixedHash(ringShard1Name) + case 2: + return fixedHash(ringShard2Name) + default: + t.Fatalf("Unexpected number of shards: %v", shards) + return nil + } + }, + }) + + // Continuously update addresses by adding and removing one address + updatesDone := make(chan struct{}) + defer func() { close(updatesDone) }() + go func() { + for i := 0; ; i++ { + select { + case <-updatesDone: + return + default: + if i%2 == 0 { + ring.SetAddrs(map[string]string{ + ringShard1Name: ":" + ringShard1Port, + }) + } else { + ring.SetAddrs(map[string]string{ + ringShard1Name: ":" + ringShard1Port, + ringShard2Name: ":" + ringShard2Port, + }) + } + } + } + }() + + timer := time.NewTimer(1 * time.Second) + for running := true; running; { + select { + case <-timer.C: + running = false + default: + shard, err := ring.sharding.GetByKey("whatever") + if err == nil && shard == nil { + t.Fatal("shard is nil") + } + } + } +} + +func BenchmarkRingShardingRebalanceLocked(b *testing.B) { + opts := &RingOptions{ + Addrs: make(map[string]string), + // Disable heartbeat + HeartbeatFrequency: 1 * time.Hour, + } + for i := 0; i < 100; i++ { + opts.Addrs[fmt.Sprintf("shard%d", i)] = fmt.Sprintf(":63%02d", i) + } + + ring := NewRing(opts) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ring.sharding.rebalanceLocked() + } +} diff --git a/ring_test.go b/ring_test.go index 1447e8a70..c64e107bb 100644 --- a/ring_test.go +++ b/ring_test.go @@ -7,7 +7,6 @@ import ( "net" "strconv" "sync" - "testing" "time" . "github.com/onsi/ginkgo" @@ -740,89 +739,3 @@ var _ = Describe("Ring Tx timeout", func() { testTimeout() }) }) - -type fixedHash string - -func (h fixedHash) Get(string) string { - return string(h) -} - -func TestRingSetAddrsAndRebalanceRace(t *testing.T) { - const ( - ringShard1Name = "ringShardOne" - ringShard2Name = "ringShardTwo" - ) - - ring := redis.NewRing(&redis.RingOptions{ - Addrs: map[string]string{ - ringShard1Name: ":" + ringShard1Port, - }, - // Disable heartbeat - HeartbeatFrequency: 1 * time.Hour, - NewConsistentHash: func(shards []string) redis.ConsistentHash { - switch len(shards) { - case 1: - return fixedHash(ringShard1Name) - case 2: - return fixedHash(ringShard2Name) - default: - t.Fatalf("Unexpected number of shards: %v", shards) - return nil - } - }, - }) - - // Continuously update addresses by adding and removing one address - updatesDone := make(chan struct{}) - defer func() { close(updatesDone) }() - go func() { - for i := 0; ; i++ { - select { - case <-updatesDone: - return - default: - if i%2 == 0 { - ring.SetAddrs(map[string]string{ - ringShard1Name: ":" + ringShard1Port, - }) - } else { - ring.SetAddrs(map[string]string{ - ringShard1Name: ":" + ringShard1Port, - ringShard2Name: ":" + ringShard2Port, - }) - } - } - } - }() - - timer := time.NewTimer(1 * time.Second) - for running := true; running; { - select { - case <-timer.C: - running = false - default: - shard, err := ring.ShardByKey("whatever") - if err == nil && shard == nil { - t.Fatal("shard is nil") - } - } - } -} - -func BenchmarkRingRebalanceLocked(b *testing.B) { - opts := &redis.RingOptions{ - Addrs: make(map[string]string), - // Disable heartbeat - HeartbeatFrequency: 1 * time.Hour, - } - for i := 0; i < 100; i++ { - opts.Addrs[fmt.Sprintf("shard%d", i)] = fmt.Sprintf(":63%02d", i) - } - - ring := redis.NewRing(opts) - - b.ResetTimer() - for i := 0; i < b.N; i++ { - ring.RebalanceLocked() - } -}