Skip to content

feature: dynamically add and remove shards by the ring client #2093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,11 @@ func GetSlavesAddrByName(ctx context.Context, c *SentinelClient, name string) []
}
return parseReplicaAddrs(addrs, false)
}

func (c *Ring) GetAddr(addr string) *ringShard {
return c.shards.GetAddr(addr)
}

func (c *ringShards) GetAddr(addr string) *ringShard {
return c.shards[addr]
}
88 changes: 74 additions & 14 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (opt *RingOptions) clientOptions() *Options {
type ringShard struct {
Client *Client
down int32
addr string
}

func newRingShard(opt *RingOptions, name, addr string) *ringShard {
Expand All @@ -168,6 +169,7 @@ func newRingShard(opt *RingOptions, name, addr string) *ringShard {

return &ringShard{
Client: opt.NewClient(name, clopt),
addr: addr,
}
}

Expand Down Expand Up @@ -212,33 +214,68 @@ type ringShards struct {
opt *RingOptions

mu sync.RWMutex
muClose sync.Mutex
hash ConsistentHash
shards map[string]*ringShard // read only
list []*ringShard // read only
shards map[string]*ringShard // read only, updated by SetAddrs
list []*ringShard // read only, updated by SetAddrs
numShard int
closed bool
}

func newRingShards(opt *RingOptions) *ringShards {
shards := make(map[string]*ringShard, len(opt.Addrs))
list := make([]*ringShard, 0, len(shards))
c := &ringShards{
opt: opt,
}
c.SetAddrs(opt.Addrs)

for name, addr := range opt.Addrs {
shard := newRingShard(opt, name, addr)
shards[name] = shard
return c
}

list = append(list, shard)
// SetAddrs replaces the shards in use, such that you can increase and
// decrease number of shards, that you use. It will reuse shards that
// existed before and close the ones that will not be used anymore.
func (c *ringShards) SetAddrs(addrs map[string]string) {
c.muClose.Lock()
defer c.muClose.Unlock()
if c.closed {
return
}

c := &ringShards{
opt: opt,
shards := make(map[string]*ringShard)
unusedShards := make(map[string]*ringShard)

for k, shard := range c.shards {
if addr, ok := addrs[k]; ok && shard.addr == addr {
shards[k] = shard
} else {
unusedShards[k] = shard
}
}

shards: shards,
list: list,
for k, addr := range addrs {
if shard, ok := c.shards[k]; !ok || shard.addr != addr {
shards[k] = newRingShard(c.opt, k, addr)
}
}
c.rebalance()

return c
list := make([]*ringShard, 0, len(shards))
for _, shard := range shards {
list = append(list, shard)
}

c.mu.Lock()
c.shards = shards
c.list = list

c.rebalanceLocked()
c.mu.Unlock()

for k, shard := range unusedShards {
err := shard.Client.Close()
if err != nil {
internal.Logger.Printf(context.Background(), "Failed to close ring shard client %s %s: %v", k, shard.addr, err)
}
}
}

func (c *ringShards) List() []*ringShard {
Expand Down Expand Up @@ -355,6 +392,23 @@ func (c *ringShards) rebalance() {
c.mu.Unlock()
}

// rebalanceLocked removes dead shards from the Ring and callers need to hold the locl
func (c *ringShards) rebalanceLocked() {
shards := c.shards
liveShards := make([]string, 0, len(shards))

for name, shard := range shards {
if shard.IsUp() {
liveShards = append(liveShards, name)
}
}

hash := c.opt.NewConsistentHash(liveShards)

c.hash = hash
c.numShard = len(liveShards)
}

func (c *ringShards) Len() int {
c.mu.RLock()
defer c.mu.RUnlock()
Expand All @@ -363,6 +417,8 @@ func (c *ringShards) Len() int {
}

func (c *ringShards) Close() error {
c.muClose.Lock()
defer c.muClose.Unlock()
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -436,6 +492,10 @@ func NewRing(opt *RingOptions) *Ring {
return &ring
}

func (c *Ring) SetAddrs(ctx context.Context, addrs map[string]string) {
c.shards.SetAddrs(addrs)
}

// Do creates a Cmd from the args and processes the cmd.
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...)
Expand Down
73 changes: 73 additions & 0 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,79 @@ var _ = Describe("Redis Ring", func() {
Expect(ringShard2.Info(ctx, "keyspace").Val()).To(ContainSubstring("keys=100"))
})

Describe("[new] dynamic setting ring shards", func() {
It("downscale shard and check reuse shard, upscale shard and check reuse", func() {
Expect(ring.Len(), 2)

wantShard := ring.GetAddr("ringShardOne")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
"ringShardOne": ":" + ringShard1Port,
})
Expect(ring.Len(), 1)
gotShard := ring.GetAddr("ringShardOne")
Expect(gotShard).To(Equal(wantShard))

ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
"ringShardOne": ":" + ringShard1Port,
"ringShardTwo": ":" + ringShard2Port,
})
Expect(ring.Len(), 2)
gotShard = ring.GetAddr("ringShardOne")
Expect(gotShard).To(Equal(wantShard))

})

It("uses 3 shards after setting it to 3 shards", func() {
Expect(ring.Len(), 2)

// Start ringShard3.
var err error
ringShard3, err = startRedis(ringShard3Port)
Expect(err).NotTo(HaveOccurred())

shardName1 := "ringShardOne"
shardAddr1 := ":" + ringShard1Port
wantShard1 := ring.GetAddr(shardName1)
shardName2 := "ringShardTwo"
shardAddr2 := ":" + ringShard2Port
wantShard2 := ring.GetAddr(shardName2)
shardName3 := "ringShardThree"
shardAddr3 := ":" + ringShard3Port
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
shardName1: shardAddr1,
shardName2: shardAddr2,
shardName3: shardAddr3,
})
Expect(ring.Len(), 3)
gotShard1 := ring.GetAddr(shardName1)
gotShard2 := ring.GetAddr(shardName2)
gotShard3 := ring.GetAddr(shardName3)
Expect(gotShard1).To(Equal(wantShard1))
Expect(gotShard2).To(Equal(wantShard2))
Expect(gotShard3).ToNot(BeNil())

ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ring.SetAddrs(ctx, map[string]string{
shardName1: shardAddr1,
shardName2: shardAddr2,
})
Expect(ring.Len(), 2)
gotShard1 = ring.GetAddr(shardName1)
gotShard2 = ring.GetAddr(shardName2)
gotShard3 = ring.GetAddr(shardName3)
Expect(gotShard1).To(Equal(wantShard1))
Expect(gotShard2).To(Equal(wantShard2))
Expect(gotShard3).To(BeNil())
})

})
Describe("pipeline", func() {
It("doesn't panic closed ring, returns error", func() {
pipe := ring.Pipeline()
Expand Down