Merge pull request #8545 from heyitsanthony/health-balancer
clientv3: Health balancer
Anthony Romano authored Sep 18, 2017
2 parents a477708 + 49e5e78 commit 3cad5e4
Showing 6 changed files with 413 additions and 35 deletions.
122 changes: 94 additions & 28 deletions clientv3/balancer.go
@@ -29,11 +29,40 @@ import (
// This error is returned only when opts.BlockingWait is true.
var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")

type notifyMsg int

const (
notifyReset notifyMsg = iota
notifyNext
)

type balancer interface {
grpc.Balancer
ConnectNotify() <-chan struct{}

endpoint(host string) string
endpoints() []string

// up is like Up, but also reports whether the balancer will use the connection.
up(addr grpc.Address) (func(error), bool)

// updateAddrs changes the balancer's endpoints.
updateAddrs(endpoints ...string)
// ready returns a channel that closes when the balancer first connects.
ready() <-chan struct{}
// next forces the balancer to switch endpoints.
next()
}

// simpleBalancer does the bare minimum to expose multiple eps
// to the grpc reconnection code path
type simpleBalancer struct {
// addrs are the client's endpoints for grpc
// addrs are the client's endpoint addresses for grpc
addrs []grpc.Address

// eps holds the raw endpoints from the client
eps []string

// notifyCh notifies grpc of the set of addresses for connecting
notifyCh chan []grpc.Address

@@ -57,7 +86,7 @@ type simpleBalancer struct {
donec chan struct{}

// updateAddrsC notifies updateNotifyLoop to update addrs.
updateAddrsC chan struct{}
updateAddrsC chan notifyMsg

// grpc issues TLS cert checks using the string passed into dial so
// that string must be the host. To recover the full scheme://host URL,
@@ -72,20 +101,18 @@ type simpleBalancer struct {
}

func newSimpleBalancer(eps []string) *simpleBalancer {
notifyCh := make(chan []grpc.Address, 1)
addrs := make([]grpc.Address, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
}
notifyCh := make(chan []grpc.Address)
addrs := eps2addrs(eps)
sb := &simpleBalancer{
addrs: addrs,
eps: eps,
notifyCh: notifyCh,
readyc: make(chan struct{}),
upc: make(chan struct{}),
stopc: make(chan struct{}),
downc: make(chan struct{}),
donec: make(chan struct{}),
updateAddrsC: make(chan struct{}, 1),
updateAddrsC: make(chan notifyMsg),
host2ep: getHost2ep(eps),
}
close(sb.downc)
@@ -101,12 +128,20 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
return b.upc
}

func (b *simpleBalancer) getEndpoint(host string) string {
func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }

func (b *simpleBalancer) endpoint(host string) string {
b.mu.Lock()
defer b.mu.Unlock()
return b.host2ep[host]
}

func (b *simpleBalancer) endpoints() []string {
b.mu.RLock()
defer b.mu.RUnlock()
return b.eps
}

func getHost2ep(eps []string) map[string]string {
hm := make(map[string]string, len(eps))
for i := range eps {
@@ -116,7 +151,7 @@ func getHost2ep(eps []string) map[string]string {
return hm
}

func (b *simpleBalancer) updateAddrs(eps []string) {
func (b *simpleBalancer) updateAddrs(eps ...string) {
np := getHost2ep(eps)

b.mu.Lock()
@@ -135,27 +170,37 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
}

b.host2ep = np

addrs := make([]grpc.Address, 0, len(eps))
for i := range eps {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
}
b.addrs = addrs
b.addrs, b.eps = eps2addrs(eps), eps

// updating notifyCh can trigger new connections; only update addrs
// if all connections are down or addrs does not include pinAddr.
update := !hasAddr(addrs, b.pinAddr)
update := !hasAddr(b.addrs, b.pinAddr)
b.mu.Unlock()

if update {
select {
case b.updateAddrsC <- struct{}{}:
case b.updateAddrsC <- notifyReset:
case <-b.stopc:
}
}
}

func (b *simpleBalancer) next() {
b.mu.RLock()
downc := b.downc
b.mu.RUnlock()
select {
case b.updateAddrsC <- notifyNext:
case <-b.stopc:
}
// wait until disconnect so new RPCs are not issued on old connection
select {
case <-downc:
case <-b.stopc:
}
}

func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
if targetAddr == addr.Addr {
@@ -192,11 +237,11 @@ func (b *simpleBalancer) updateNotifyLoop() {
default:
}
case downc == nil:
b.notifyAddrs()
b.notifyAddrs(notifyReset)
select {
case <-upc:
case <-b.updateAddrsC:
b.notifyAddrs()
case msg := <-b.updateAddrsC:
b.notifyAddrs(msg)
case <-b.stopc:
return
}
@@ -210,16 +255,24 @@ }
}
select {
case <-downc:
case <-b.updateAddrsC:
b.notifyAddrs(notifyReset)
case msg := <-b.updateAddrsC:
b.notifyAddrs(msg)
case <-b.stopc:
return
}
b.notifyAddrs()
}
}
}

func (b *simpleBalancer) notifyAddrs() {
func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
if msg == notifyNext {
select {
case b.notifyCh <- []grpc.Address{}:
case <-b.stopc:
return
}
}
b.mu.RLock()
addrs := b.addrs
b.mu.RUnlock()
@@ -230,22 +283,27 @@ }
}

func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
f, _ := b.up(addr)
return f
}

func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
b.mu.Lock()
defer b.mu.Unlock()

// gRPC might call Up after it has called Close. We add this check
// to "fix" it up at the application layer; otherwise it will panic
// if b.upc is already closed.
if b.closed {
return func(err error) {}
return func(err error) {}, false
}
// gRPC might call Up on a stale address.
// Prevent updating pinAddr with a stale address.
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
return func(err error) {}, false
}
if b.pinAddr != "" {
return func(err error) {}
return func(err error) {}, false
}
// notify waiting Get()s and pin first connected address
close(b.upc)
@@ -259,7 +317,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
close(b.downc)
b.pinAddr = ""
b.mu.Unlock()
}
}, true
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
@@ -354,3 +412,11 @@ func getHost(ep string) string {
}
return url.Host
}

func eps2addrs(eps []string) []grpc.Address {
addrs := make([]grpc.Address, len(eps))
for i := range eps {
addrs[i].Addr = getHost(eps[i])
}
return addrs
}
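
The new balancer interface above is what makes a health-aware wrapper possible: up reports whether the wrapped balancer actually accepted a connection, next forces an endpoint switch, and notifyAddrs(notifyNext) implements that switch by pushing an empty address list to gRPC before re-sending the full set. The healthBalancer added by this PR lives in a file not shown in this excerpt; as a rough, hypothetical sketch of the per-endpoint failure bookkeeping such a wrapper might keep (none of these names come from the PR):

package clientv3

import (
	"sync"
	"time"
)

// graylist is an illustrative sketch, not the code merged by this PR:
// per-endpoint failure tracking that a health-aware wrapper around
// simpleBalancer could consult before pinning an address.
type graylist struct {
	mu       sync.RWMutex
	failed   map[string]time.Time // host -> time of the last failed health check
	failWait time.Duration        // how long a failed endpoint stays graylisted
}

func newGraylist(failWait time.Duration) *graylist {
	return &graylist{failed: make(map[string]time.Time), failWait: failWait}
}

// fail records an endpoint as unhealthy so a wrapping up() can decline to pin it.
func (g *graylist) fail(host string) {
	g.mu.Lock()
	g.failed[host] = time.Now()
	g.mu.Unlock()
}

// mayPin reports whether the endpoint is currently usable; a recorded failure
// expires after failWait so the endpoint is eventually retried.
func (g *graylist) mayPin(host string) bool {
	g.mu.RLock()
	t, bad := g.failed[host]
	g.mu.RUnlock()
	if !bad {
		return true
	}
	if time.Since(t) < g.failWait {
		return false
	}
	g.mu.Lock()
	delete(g.failed, host)
	g.mu.Unlock()
	return true
}

A wrapper's up(addr) could check mayPin(addr.Addr) before delegating to the inner balancer and return a no-op error function with false for graylisted hosts, which would produce the endpoint switching that TestHealthBalancerGraylist below checks for.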
60 changes: 60 additions & 0 deletions clientv3/balancer_test.go
@@ -133,6 +133,66 @@ func TestBalancerGetBlocking(t *testing.T) {
}
}

// TestHealthBalancerGraylist checks that one endpoint is tried after the other
// due to graylisting.
func TestHealthBalancerGraylist(t *testing.T) {
var wg sync.WaitGroup
// Use 3 endpoints so the graylist doesn't fall back to all connections
// after failing on 2 endpoints.
lns, eps := make([]net.Listener, 3), make([]string, 3)
wg.Add(3)
connc := make(chan string, 2)
for i := range eps {
ln, err := net.Listen("tcp", ":0")
testutil.AssertNil(t, err)
lns[i], eps[i] = ln, ln.Addr().String()
go func() {
defer wg.Done()
for {
conn, err := ln.Accept()
if err != nil {
return
}
_, err = conn.Read(make([]byte, 512))
conn.Close()
if err == nil {
select {
case connc <- ln.Addr().String():
// sleep a little so the balancer catches up
// before the next reconnect attempt.
time.Sleep(50 * time.Millisecond)
default:
}
}
}
}()
}

sb := newSimpleBalancer(eps)
tf := func(s string) (bool, error) { return false, nil }
hb := newHealthBalancer(sb, 5*time.Second, tf)

conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb))
testutil.AssertNil(t, err)
defer conn.Close()

kvc := pb.NewKVClient(conn)
<-hb.ready()

kvc.Range(context.TODO(), &pb.RangeRequest{})
ep1 := <-connc
kvc.Range(context.TODO(), &pb.RangeRequest{})
ep2 := <-connc
for _, ln := range lns {
ln.Close()
}
wg.Wait()

if ep1 == ep2 {
t.Fatalf("expected %q != %q", ep1, ep2)
}
}

// TestBalancerDoNotBlockOnClose ensures that the balancer and grpc don't deadlock each other
// due to rapidly opening and closing connections. The deadlock causes balancer.Close() to block forever.
// See issue: https://github.com/coreos/etcd/issues/7283 for more detail.
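
In the graylist test above, the stub check tf always reports unhealthy, so once an endpoint fails it stays graylisted and the next RPC is routed to a different endpoint. Outside a test the two constructors compose the same way; here is a minimal sketch, assuming it sits in package clientv3 (both constructors are unexported) and that the endpoint addresses are placeholders:

// exampleHealthBalancerDial is illustrative only; it assumes the surrounding
// file already imports "time" and "google.golang.org/grpc".
func exampleHealthBalancerDial() (*grpc.ClientConn, error) {
	sb := newSimpleBalancer([]string{"127.0.0.1:2379", "127.0.0.1:22379"})
	// a real check would probe the endpoint; this one trusts every endpoint
	healthCheck := func(ep string) (bool, error) { return true, nil }
	hb := newHealthBalancer(sb, 5*time.Second, healthCheck)

	conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb))
	if err != nil {
		return nil, err
	}
	<-hb.ready() // blocks until the balancer pins its first endpoint
	return conn, nil
}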
16 changes: 11 additions & 5 deletions clientv3/client.go
@@ -55,7 +55,8 @@ type Client struct {

cfg Config
creds *credentials.TransportCredentials
balancer *simpleBalancer
balancer balancer
mu sync.Mutex

ctx context.Context
cancel context.CancelFunc
@@ -116,8 +117,10 @@ func (c *Client) Endpoints() (eps []string) {

// SetEndpoints updates client's endpoints.
func (c *Client) SetEndpoints(eps ...string) {
c.mu.Lock()
c.cfg.Endpoints = eps
c.balancer.updateAddrs(eps)
c.mu.Unlock()
c.balancer.updateAddrs(eps...)
}

// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@@ -227,7 +230,7 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
opts = append(opts, dopts...)

f := func(host string, t time.Duration) (net.Conn, error) {
proto, host, _ := parseEndpoint(c.balancer.getEndpoint(host))
proto, host, _ := parseEndpoint(c.balancer.endpoint(host))
if host == "" && endpoint != "" {
// dialing an endpoint not in the balancer; use
// endpoint passed into dial
@@ -375,7 +378,10 @@ func newClient(cfg *Config) (*Client, error) {
client.Password = cfg.Password
}

client.balancer = newSimpleBalancer(cfg.Endpoints)
sb := newSimpleBalancer(cfg.Endpoints)
hc := func(ep string) (bool, error) { return grpcHealthCheck(client, ep) }
client.balancer = newHealthBalancer(sb, cfg.DialTimeout, hc)

// use Endpoints[0] so that, for https:// endpoints without any TLS config given,
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
@@ -391,7 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
hasConn := false
waitc := time.After(cfg.DialTimeout)
select {
case <-client.balancer.readyc:
case <-client.balancer.ready():
hasConn = true
case <-ctx.Done():
case <-waitc:
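
In newClient above, the health balancer is given hc, a closure over grpcHealthCheck(client, ep). That helper is defined in one of the changed files not shown in this excerpt; as a hedged sketch of the shape such a probe usually takes with gRPC's standard health service (the function name, dial options, and timeout handling below are assumptions, not etcd's actual code, which presumably reuses the client's own dial and TLS settings):

package clientv3

import (
	"context"
	"time"

	"google.golang.org/grpc"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// probeEndpoint is an illustrative health check matching the
// func(ep string) (bool, error) signature the balancer expects.
func probeEndpoint(ep string, timeout time.Duration) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// dial the single endpoint directly, bypassing the balancer
	conn, err := grpc.DialContext(ctx, ep, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return false, err
	}
	defer conn.Close()

	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
	if err != nil {
		return false, err
	}
	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
}

Given TestHealthBalancerGraylist above, a false result is enough for the balancer to steer subsequent RPCs toward a different endpoint.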