fix: reduce SetAddrs shards lock contention #2292

Merged
internal_test.go (118 additions, 0 deletions)

@@ -3,6 +3,9 @@ package redis
import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

@@ -107,6 +110,7 @@ func TestRingSetAddrsAndRebalanceRace(t *testing.T) {
}
},
})
defer ring.Close()

// Continuously update addresses by adding and removing one address
updatesDone := make(chan struct{})
@@ -156,13 +160,127 @@ func BenchmarkRingShardingRebalanceLocked(b *testing.B) {
}

ring := NewRing(opts)
defer ring.Close()

b.ResetTimer()
for i := 0; i < b.N; i++ {
ring.sharding.rebalanceLocked()
}
}

type testCounter struct {
mu sync.Mutex
t *testing.T
m map[string]int
}

func newTestCounter(t *testing.T) *testCounter {
return &testCounter{t: t, m: make(map[string]int)}
}

func (ct *testCounter) increment(key string) {
ct.mu.Lock()
defer ct.mu.Unlock()
ct.m[key]++
}

func (ct *testCounter) expect(values map[string]int) {
ct.mu.Lock()
defer ct.mu.Unlock()
ct.t.Helper()
if !reflect.DeepEqual(values, ct.m) {
ct.t.Errorf("expected %v != actual %v", values, ct.m)
}
}

func TestRingShardsCleanup(t *testing.T) {
const (
ringShard1Name = "ringShardOne"
ringShard2Name = "ringShardTwo"

ringShard1Addr = "shard1.test"
ringShard2Addr = "shard2.test"
)

t.Run("closes unused shards", func(t *testing.T) {
closeCounter := newTestCounter(t)

ring := NewRing(&RingOptions{
Addrs: map[string]string{
ringShard1Name: ringShard1Addr,
ringShard2Name: ringShard2Addr,
},
NewClient: func(opt *Options) *Client {
c := NewClient(opt)
c.baseClient.onClose = func() error {
closeCounter.increment(opt.Addr)
return nil
}
return c
},
})
closeCounter.expect(map[string]int{})

// no change due to the same addresses
ring.SetAddrs(map[string]string{
ringShard1Name: ringShard1Addr,
ringShard2Name: ringShard2Addr,
})
closeCounter.expect(map[string]int{})

ring.SetAddrs(map[string]string{
ringShard1Name: ringShard1Addr,
})
closeCounter.expect(map[string]int{ringShard2Addr: 1})

ring.SetAddrs(map[string]string{
ringShard2Name: ringShard2Addr,
})
closeCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 1})

ring.Close()
closeCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 2})
})

t.Run("closes created shards if ring was closed", func(t *testing.T) {
createCounter := newTestCounter(t)
closeCounter := newTestCounter(t)

var (
ring *Ring
shouldClose int32
)

ring = NewRing(&RingOptions{
Addrs: map[string]string{
ringShard1Name: ringShard1Addr,
},
NewClient: func(opt *Options) *Client {
if atomic.LoadInt32(&shouldClose) != 0 {
ring.Close()
}
createCounter.increment(opt.Addr)
c := NewClient(opt)
c.baseClient.onClose = func() error {
closeCounter.increment(opt.Addr)
return nil
}
return c
},
})
createCounter.expect(map[string]int{ringShard1Addr: 1})
closeCounter.expect(map[string]int{})

atomic.StoreInt32(&shouldClose, 1)

ring.SetAddrs(map[string]string{
ringShard2Name: ringShard2Addr,
})
createCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 1})
closeCounter.expect(map[string]int{ringShard1Addr: 1, ringShard2Addr: 1})
})
}

//------------------------------------------------------------------------------

type timeoutErr struct {
ring.go (41 additions, 27 deletions)

@@ -219,6 +219,10 @@ type ringSharding struct {
hash ConsistentHash
numShard int
onNewNode []func(rdb *Client)

// ensures exclusive access to SetAddrs so there is no need
// to hold mu for the duration of potentially long shard creation
setAddrsMu sync.Mutex
}

type ringShards struct {
@@ -245,46 +249,62 @@ func (c *ringSharding) OnNewNode(fn func(rdb *Client)) {
// decrease number of shards, that you use. It will reuse shards that
// existed before and close the ones that will not be used anymore.
func (c *ringSharding) SetAddrs(addrs map[string]string) {
c.mu.Lock()
c.setAddrsMu.Lock()
defer c.setAddrsMu.Unlock()

cleanup := func(shards map[string]*ringShard) {
for addr, shard := range shards {
if err := shard.Client.Close(); err != nil {
internal.Logger.Printf(context.Background(), "shard.Close %s failed: %s", addr, err)
}
}
}

c.mu.RLock()
if c.closed {
c.mu.Unlock()
c.mu.RUnlock()
return
}
existing := c.shards
c.mu.RUnlock()

shards, created, unused := c.newRingShards(addrs, existing)

shards, cleanup := c.newRingShards(addrs, c.shards)
c.mu.Lock()
if c.closed {
cleanup(created)
c.mu.Unlock()
Collaborator:

Just wondering if we can call cleanup(created) after the mutex is released?

Contributor (author):

I thought about it but decided to keep it like that: there would not be much difference, as the ring is closed anyway at this point. I can move the unlock for aesthetics, though.
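For illustration, the reordering suggested above (release c.mu before closing the newly created shards) might look roughly like the fragment below inside SetAddrs. This is a sketch of the suggestion only, not the code that was merged:

// Hypothetical variant of the closed-check in SetAddrs: unlock first,
// then close the shards that were created while the ring was shutting down.
c.mu.Lock()
if c.closed {
	c.mu.Unlock()
	cleanup(created) // runs without holding c.mu
	return
}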

return
}
c.shards = shards
c.rebalanceLocked()
c.mu.Unlock()

cleanup()
cleanup(unused)
}

func (c *ringSharding) newRingShards(
addrs map[string]string, existingShards *ringShards,
) (*ringShards, func()) {
shardMap := make(map[string]*ringShard) // indexed by addr
unusedShards := make(map[string]*ringShard) // indexed by addr

if existingShards != nil {
for _, shard := range existingShards.list {
addr := shard.Client.opt.Addr
shardMap[addr] = shard
unusedShards[addr] = shard
}
}
addrs map[string]string, existing *ringShards,
) (shards *ringShards, created, unused map[string]*ringShard) {

shards = &ringShards{m: make(map[string]*ringShard, len(addrs))}
created = make(map[string]*ringShard) // indexed by addr
unused = make(map[string]*ringShard) // indexed by addr

shards := &ringShards{
m: make(map[string]*ringShard),
if existing != nil {
for _, shard := range existing.list {
unused[shard.addr] = shard
}
}

for name, addr := range addrs {
if shard, ok := shardMap[addr]; ok {
if shard, ok := unused[addr]; ok {
shards.m[name] = shard
delete(unusedShards, addr)
delete(unused, addr)
} else {
shard := newRingShard(c.opt, addr)
shards.m[name] = shard
created[addr] = shard

for _, fn := range c.onNewNode {
fn(shard.Client)
@@ -296,13 +316,7 @@ func (c *ringSharding) newRingShards(
shards.list = append(shards.list, shard)
}

return shards, func() {
for addr, shard := range unusedShards {
if err := shard.Client.Close(); err != nil {
internal.Logger.Printf(context.Background(), "shard.Close %s failed: %s", addr, err)
}
}
}
return
}

func (c *ringSharding) List() []*ringShard {
ring_test.go (93 additions, 6 deletions)

@@ -7,6 +7,7 @@ import (
"net"
"strconv"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo"
@@ -123,15 +124,15 @@ var _ = Describe("Redis Ring", func() {
})
Expect(ring.Len(), 1)
gotShard := ring.ShardByName("ringShardOne")
Expect(gotShard).To(Equal(wantShard))
Expect(gotShard).To(BeIdenticalTo(wantShard))

ring.SetAddrs(map[string]string{
"ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port,
})
Expect(ring.Len(), 2)
gotShard = ring.ShardByName("ringShardOne")
Expect(gotShard).To(Equal(wantShard))
Expect(gotShard).To(BeIdenticalTo(wantShard))
})

It("uses 3 shards after setting it to 3 shards", func() {
@@ -155,8 +156,8 @@ var _ = Describe("Redis Ring", func() {
gotShard1 := ring.ShardByName(shardName1)
gotShard2 := ring.ShardByName(shardName2)
gotShard3 := ring.ShardByName(shardName3)
Expect(gotShard1).To(Equal(wantShard1))
Expect(gotShard2).To(Equal(wantShard2))
Expect(gotShard1).To(BeIdenticalTo(wantShard1))
Expect(gotShard2).To(BeIdenticalTo(wantShard2))
Expect(gotShard3).ToNot(BeNil())

ring.SetAddrs(map[string]string{
@@ -167,8 +168,8 @@ var _ = Describe("Redis Ring", func() {
gotShard1 = ring.ShardByName(shardName1)
gotShard2 = ring.ShardByName(shardName2)
gotShard3 = ring.ShardByName(shardName3)
Expect(gotShard1).To(Equal(wantShard1))
Expect(gotShard2).To(Equal(wantShard2))
Expect(gotShard1).To(BeIdenticalTo(wantShard1))
Expect(gotShard2).To(BeIdenticalTo(wantShard2))
Expect(gotShard3).To(BeNil())
})
})
@@ -739,3 +740,89 @@ var _ = Describe("Ring Tx timeout", func() {
testTimeout()
})
})

func TestRingSetAddrsContention(t *testing.T) {
const (
ringShard1Name = "ringShardOne"
ringShard2Name = "ringShardTwo"
)

for _, port := range []string{ringShard1Port, ringShard2Port} {
if _, err := startRedis(port); err != nil {
t.Fatal(err)
}
}

t.Cleanup(func() {
for _, p := range processes {
if err := p.Close(); err != nil {
t.Errorf("Failed to stop redis process: %v", err)
}
}
processes = nil
})

ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"ringShardOne": ":" + ringShard1Port,
},
NewClient: func(opt *redis.Options) *redis.Client {
// Simulate slow shard creation
time.Sleep(100 * time.Millisecond)
return redis.NewClient(opt)
},
})
defer ring.Close()

if _, err := ring.Ping(context.Background()).Result(); err != nil {
t.Fatal(err)
}

// Continuously update addresses by adding and removing one address
updatesDone := make(chan struct{})
defer func() { close(updatesDone) }()
go func() {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for i := 0; ; i++ {
select {
case <-ticker.C:
if i%2 == 0 {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
})
} else {
ring.SetAddrs(map[string]string{
ringShard1Name: ":" + ringShard1Port,
ringShard2Name: ":" + ringShard2Port,
})
}
case <-updatesDone:
return
}
}
}()

var pings, errClosed int
timer := time.NewTimer(1 * time.Second)
for running := true; running; pings++ {
select {
case <-timer.C:
running = false
default:
if _, err := ring.Ping(context.Background()).Result(); err != nil {
if err == redis.ErrClosed {
// The shard client could be closed while ping command is in progress
errClosed++
} else {
t.Fatal(err)
}
}
}
}

t.Logf("Number of pings: %d, errClosed: %d", pings, errClosed)
if pings < 10_000 {
t.Errorf("Expected at least 10k pings, got: %d", pings)
}
}