From 84db8fdaea1857ef8712adc6f532bae7079d8214 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 11 Sep 2017 07:03:59 -0700
Subject: [PATCH 1/4] clientv3: health check balancer

---
 clientv3/balancer.go        |  70 +++++++++---
 clientv3/client.go          |  16 ++-
 clientv3/health_balancer.go | 212 ++++++++++++++++++++++++++++++++++++
 3 files changed, 275 insertions(+), 23 deletions(-)
 create mode 100644 clientv3/health_balancer.go

diff --git a/clientv3/balancer.go b/clientv3/balancer.go
index 83b4d1aaa20..ea9308b741a 100644
--- a/clientv3/balancer.go
+++ b/clientv3/balancer.go
@@ -29,11 +29,31 @@ import (
 // This error is returned only when opts.BlockingWait is true.
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 
+type balancer interface {
+	grpc.Balancer
+	ConnectNotify() <-chan struct{}
+
+	endpoint(host string) string
+	endpoints() []string
+
+	// up is Up but includes whether the balancer will use the connection.
+	up(addr grpc.Address) (func(error), bool)
+
+	// updateAddrs changes the balancer's endpoints.
+	updateAddrs(endpoints ...string)
+	// ready returns a channel that closes when the balancer first connects.
+	ready() <-chan struct{}
+}
+
 // simpleBalancer does the bare minimum to expose multiple eps
 // to the grpc reconnection code path
 type simpleBalancer struct {
-	// addrs are the client's endpoints for grpc
+	// addrs are the client's endpoint addresses for grpc
 	addrs []grpc.Address
+
+	// eps holds the raw endpoints from the client
+	eps []string
+
 	// notifyCh notifies grpc of the set of addresses for connecting
 	notifyCh chan []grpc.Address
 
@@ -73,12 +93,10 @@ type simpleBalancer struct {
 
 func newSimpleBalancer(eps []string) *simpleBalancer {
 	notifyCh := make(chan []grpc.Address, 1)
-	addrs := make([]grpc.Address, len(eps))
-	for i := range eps {
-		addrs[i].Addr = getHost(eps[i])
-	}
+	addrs := eps2addrs(eps)
 	sb := &simpleBalancer{
 		addrs:    addrs,
+		eps:      eps,
 		notifyCh: notifyCh,
 		readyc:   make(chan struct{}),
 		upc:      make(chan struct{}),
@@ -101,12 +119,20 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
 	return b.upc
 }
 
-func (b *simpleBalancer) getEndpoint(host string) string {
+func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }
+
+func (b *simpleBalancer) endpoint(host string) string {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	return b.host2ep[host]
 }
 
+func (b *simpleBalancer) endpoints() []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	return b.eps
+}
+
 func getHost2ep(eps []string) map[string]string {
 	hm := make(map[string]string, len(eps))
 	for i := range eps {
@@ -116,7 +142,7 @@ func getHost2ep(eps []string) map[string]string {
 	return hm
 }
 
-func (b *simpleBalancer) updateAddrs(eps []string) {
+func (b *simpleBalancer) updateAddrs(eps ...string) {
 	np := getHost2ep(eps)
 
 	b.mu.Lock()
@@ -135,17 +161,12 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 
 	b.host2ep = np
-
-	addrs := make([]grpc.Address, 0, len(eps))
-	for i := range eps {
-		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
-	}
-	b.addrs = addrs
+	b.addrs, b.eps = eps2addrs(eps), eps
 
 	// updating notifyCh can trigger new connections,
 	// only update addrs if all connections are down
 	// or addrs does not include pinAddr.
-	update := !hasAddr(addrs, b.pinAddr)
+	update := !hasAddr(b.addrs, b.pinAddr)
 	b.mu.Unlock()
 
 	if update {
@@ -230,6 +251,11 @@ func (b *simpleBalancer) notifyAddrs() {
 }
 
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
+	f, _ := b.up(addr)
+	return f
+}
+
+func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
@@ -237,15 +263,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	// to "fix" it up at application layer. Otherwise, will panic
 	// if b.upc is already closed.
 	if b.closed {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	// gRPC might call Up on a stale address.
 	// Prevent updating pinAddr with a stale address.
 	if !hasAddr(b.addrs, addr.Addr) {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	if b.pinAddr != "" {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	// notify waiting Get()s and pin first connected address
 	close(b.upc)
@@ -259,7 +285,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 		close(b.downc)
 		b.pinAddr = ""
 		b.mu.Unlock()
-	}
+	}, true
 }
 
 func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
@@ -354,3 +380,11 @@ func getHost(ep string) string {
 	}
 	return url.Host
 }
+
+func eps2addrs(eps []string) []grpc.Address {
+	addrs := make([]grpc.Address, len(eps))
+	for i := range eps {
+		addrs[i].Addr = getHost(eps[i])
+	}
+	return addrs
+}
diff --git a/clientv3/client.go b/clientv3/client.go
index dec664605aa..06e2d771366 100644
--- a/clientv3/client.go
+++ b/clientv3/client.go
@@ -55,7 +55,8 @@ type Client struct {
 
 	cfg      Config
 	creds    *credentials.TransportCredentials
-	balancer *simpleBalancer
+	balancer balancer
+	mu       sync.Mutex
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -116,8 +117,10 @@ func (c *Client) Endpoints() (eps []string) {
 
 // SetEndpoints updates client's endpoints.
 func (c *Client) SetEndpoints(eps ...string) {
+	c.mu.Lock()
 	c.cfg.Endpoints = eps
-	c.balancer.updateAddrs(eps)
+	c.mu.Unlock()
+	c.balancer.updateAddrs(eps...)
 }
 
 // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@@ -227,7 +230,7 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
 	opts = append(opts, dopts...)
 
 	f := func(host string, t time.Duration) (net.Conn, error) {
-		proto, host, _ := parseEndpoint(c.balancer.getEndpoint(host))
+		proto, host, _ := parseEndpoint(c.balancer.endpoint(host))
 		if host == "" && endpoint != "" {
 			// dialing an endpoint not in the balancer; use
 			// endpoint passed into dial
@@ -375,7 +378,10 @@ func newClient(cfg *Config) (*Client, error) {
 		client.Password = cfg.Password
 	}
 
-	client.balancer = newSimpleBalancer(cfg.Endpoints)
+	sb := newSimpleBalancer(cfg.Endpoints)
+	hc := func(ep string) (bool, error) { return grpcHealthCheck(client, ep) }
+	client.balancer = newHealthBalancer(sb, cfg.DialTimeout, hc)
+
 	// use Endpoints[0] so that for https:// without any tls config given, then
 	// grpc will assume the certificate server name is the endpoint host.
 	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
@@ -391,7 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
 		hasConn := false
 		waitc := time.After(cfg.DialTimeout)
 		select {
-		case <-client.balancer.readyc:
+		case <-client.balancer.ready():
 			hasConn = true
 		case <-ctx.Done():
 		case <-waitc:
diff --git a/clientv3/health_balancer.go b/clientv3/health_balancer.go
new file mode 100644
index 00000000000..25c2b1c6b9c
--- /dev/null
+++ b/clientv3/health_balancer.go
@@ -0,0 +1,212 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
+)
+
+const minHealthRetryDuration = 3 * time.Second
+const unknownService = "unknown service grpc.health.v1.Health"
+
+type healthCheckFunc func(ep string) (bool, error)
+
+// healthBalancer wraps a balancer so that it uses health checking
+// to choose its endpoints.
+type healthBalancer struct {
+	balancer
+
+	// healthCheck checks an endpoint's health.
+	healthCheck healthCheckFunc
+
+	// mu protects addrs, eps, unhealthy map, and stopc.
+	mu sync.RWMutex
+
+	// addrs stores all grpc addresses associated with the balancer.
+	addrs []grpc.Address
+
+	// eps stores all client endpoints
+	eps []string
+
+	// unhealthy tracks the last unhealthy time of endpoints.
+	unhealthy map[string]time.Time
+
+	stopc    chan struct{}
+	stopOnce sync.Once
+
+	host2ep map[string]string
+
+	wg sync.WaitGroup
+}
+
+func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
+	hb := &healthBalancer{
+		balancer:    b,
+		healthCheck: hc,
+		eps:         b.endpoints(),
+		addrs:       eps2addrs(b.endpoints()),
+		host2ep:     getHost2ep(b.endpoints()),
+		unhealthy:   make(map[string]time.Time),
+		stopc:       make(chan struct{}),
+	}
+	if timeout < minHealthRetryDuration {
+		timeout = minHealthRetryDuration
+	}
+
+	hb.wg.Add(1)
+	go func() {
+		defer hb.wg.Done()
+		hb.updateUnhealthy(timeout)
+	}()
+
+	return hb
+}
+
+func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
+	f, used := hb.up(addr)
+	if !used {
+		return f
+	}
+	return func(err error) {
+		// If connected to a black hole endpoint or a killed server, the gRPC ping
+		// timeout will induce a network I/O error and gRPC will keep retrying until
+		// it succeeds; finding a healthy endpoint on retry could take several
+		// timeouts and redials. To avoid wasting retries, gray-list unhealthy endpoints.
+		hb.mu.Lock()
+		hb.unhealthy[addr.Addr] = time.Now()
+		hb.mu.Unlock()
+		f(err)
+	}
+}
+
+func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
+	if !hb.mayPin(addr) {
+		return func(err error) {}, false
+	}
+	return hb.balancer.up(addr)
+}
+
+func (hb *healthBalancer) Close() error {
+	hb.stopOnce.Do(func() { close(hb.stopc) })
+	hb.wg.Wait()
+	return hb.balancer.Close()
+}
+
+func (hb *healthBalancer) updateAddrs(eps ...string) {
+	addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
+	hb.mu.Lock()
+	hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
+	hb.mu.Unlock()
+	hb.balancer.updateAddrs(eps...)
+}
+
+func (hb *healthBalancer) endpoint(host string) string {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	return hb.host2ep[host]
+}
+
+func (hb *healthBalancer) endpoints() []string {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	return hb.eps
+}
+
+func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
+	for {
+		select {
+		case <-time.After(timeout):
+			hb.mu.Lock()
+			for k, v := range hb.unhealthy {
+				if time.Since(v) > timeout {
+					delete(hb.unhealthy, k)
+				}
+			}
+			hb.mu.Unlock()
+			eps := []string{}
+			for _, addr := range hb.liveAddrs() {
+				eps = append(eps, hb.endpoint(addr.Addr))
+			}
+			hb.balancer.updateAddrs(eps...)
+		case <-hb.stopc:
+			return
+		}
+	}
+}
+
+func (hb *healthBalancer) liveAddrs() []grpc.Address {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	hbAddrs := hb.addrs
+	if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
+		return hbAddrs
+	}
+	addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
+	for _, addr := range hb.addrs {
+		if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
+			addrs = append(addrs, addr)
+		}
+	}
+	return addrs
+}
+
+func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
+	hb.mu.RLock()
+	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
+	_, bad := hb.unhealthy[addr.Addr]
+	hb.mu.RUnlock()
+	if skip || !bad {
+		return true
+	}
+	if ok, _ := hb.healthCheck(addr.Addr); ok {
+		hb.mu.Lock()
+		delete(hb.unhealthy, addr.Addr)
+		hb.mu.Unlock()
+		return true
+	}
+	hb.mu.Lock()
+	hb.unhealthy[addr.Addr] = time.Now()
+	hb.mu.Unlock()
+	return false
+}
+
+func grpcHealthCheck(client *Client, ep string) (bool, error) {
+	conn, err := client.dial(ep)
+	if err != nil {
+		return false, err
+	}
+	defer conn.Close()
+	cli := healthpb.NewHealthClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
+	cancel()
+	if err != nil {
+		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+			if s.Message() == unknownService {
+				// etcd < v3.3.0
+				return true, nil
+			}
+		}
+		return false, err
+	}
+	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
+}

From e3deb9f4822e1c581c50841a9888b91d0925427f Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Tue, 12 Sep 2017 02:13:47 -0700
Subject: [PATCH 2/4] clientv3: test health balancer gray listing

---
 clientv3/balancer_test.go | 60 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/clientv3/balancer_test.go b/clientv3/balancer_test.go
index 4485a474be6..7048f939c45 100644
--- a/clientv3/balancer_test.go
+++ b/clientv3/balancer_test.go
@@ -133,6 +133,66 @@ func TestBalancerGetBlocking(t *testing.T) {
 	}
 }
 
+// TestHealthBalancerGraylist checks that one endpoint is tried after the other
+// due to gray listing.
+func TestHealthBalancerGraylist(t *testing.T) {
+	var wg sync.WaitGroup
+	// Use 3 endpoints so the gray list doesn't fall back to all connections
+	// after failing on 2 endpoints.
+	lns, eps := make([]net.Listener, 3), make([]string, 3)
+	wg.Add(3)
+	connc := make(chan string, 2)
+	for i := range eps {
+		ln, err := net.Listen("tcp", ":0")
+		testutil.AssertNil(t, err)
+		lns[i], eps[i] = ln, ln.Addr().String()
+		go func() {
+			defer wg.Done()
+			for {
+				conn, err := ln.Accept()
+				if err != nil {
+					return
+				}
+				_, err = conn.Read(make([]byte, 512))
+				conn.Close()
+				if err == nil {
+					select {
+					case connc <- ln.Addr().String():
+						// sleep a little so the balancer catches up
+						// before the next reconnect attempt.
+						time.Sleep(50 * time.Millisecond)
+					default:
+					}
+				}
+			}
+		}()
+	}
+
+	sb := newSimpleBalancer(eps)
+	tf := func(s string) (bool, error) { return false, nil }
+	hb := newHealthBalancer(sb, 5*time.Second, tf)
+
+	conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb))
+	testutil.AssertNil(t, err)
+	defer conn.Close()
+
+	kvc := pb.NewKVClient(conn)
+	<-hb.ready()
+
+	kvc.Range(context.TODO(), &pb.RangeRequest{})
+	ep1 := <-connc
+	kvc.Range(context.TODO(), &pb.RangeRequest{})
+	ep2 := <-connc
+	for _, ln := range lns {
+		ln.Close()
+	}
+	wg.Wait()
+
+	if ep1 == ep2 {
+		t.Fatalf("expected %q != %q", ep1, ep2)
+	}
+}
+
 // TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other
 // due to rapid open/close conn. The deadlock causes balancer.Close() to block forever.
 // See issue: https://github.com/coreos/etcd/issues/7283 for more detail.

From efd7800e0fd5b3e5342479a04a90fca8cb017954 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 11 Sep 2017 21:05:53 -0700
Subject: [PATCH 3/4] clientv3: try next endpoint on unavailable error

---
 clientv3/balancer.go | 52 +++++++++++++++++++++++++++++++++++---------
 clientv3/retry.go    | 12 ++++++++--
 2 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/clientv3/balancer.go b/clientv3/balancer.go
index ea9308b741a..75c5cd43823 100644
--- a/clientv3/balancer.go
+++ b/clientv3/balancer.go
@@ -29,6 +29,13 @@ import (
 // This error is returned only when opts.BlockingWait is true.
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 
+type notifyMsg int
+
+const (
+	notifyReset notifyMsg = iota
+	notifyNext
+)
+
 type balancer interface {
 	grpc.Balancer
 	ConnectNotify() <-chan struct{}
@@ -43,6 +50,8 @@ type balancer interface {
 	updateAddrs(endpoints ...string)
 	// ready returns a channel that closes when the balancer first connects.
 	ready() <-chan struct{}
+	// next forces the balancer to switch endpoints.
+	next()
 }
 
 // simpleBalancer does the bare minimum to expose multiple eps
 // to the grpc reconnection code path
 type simpleBalancer struct {
@@ -77,7 +86,7 @@ type simpleBalancer struct {
 	donec chan struct{}
 
 	// updateAddrsC notifies updateNotifyLoop to update addrs.
-	updateAddrsC chan struct{}
+	updateAddrsC chan notifyMsg
 
 	// grpc issues TLS cert checks using the string passed into dial so
 	// that string must be the host. To recover the full scheme://host URL,
@@ -92,7 +101,7 @@ type simpleBalancer struct {
 }
 
 func newSimpleBalancer(eps []string) *simpleBalancer {
-	notifyCh := make(chan []grpc.Address, 1)
+	notifyCh := make(chan []grpc.Address)
 	addrs := eps2addrs(eps)
 	sb := &simpleBalancer{
 		addrs:        addrs,
@@ -103,7 +112,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		stopc:        make(chan struct{}),
 		downc:        make(chan struct{}),
 		donec:        make(chan struct{}),
-		updateAddrsC: make(chan struct{}, 1),
+		updateAddrsC: make(chan notifyMsg),
 		host2ep:      getHost2ep(eps),
 	}
 	close(sb.downc)
@@ -171,12 +180,27 @@ func (b *simpleBalancer) updateAddrs(eps ...string) {
 
 	if update {
 		select {
-		case b.updateAddrsC <- struct{}{}:
+		case b.updateAddrsC <- notifyReset:
 		case <-b.stopc:
 		}
 	}
 }
 
+func (b *simpleBalancer) next() {
+	b.mu.RLock()
+	downc := b.downc
+	b.mu.RUnlock()
+	select {
+	case b.updateAddrsC <- notifyNext:
+	case <-b.stopc:
+	}
+	// wait until disconnect so new RPCs are not issued on old connection
+	select {
+	case <-downc:
+	case <-b.stopc:
+	}
+}
+
 func hasAddr(addrs []grpc.Address, targetAddr string) bool {
 	for _, addr := range addrs {
 		if targetAddr == addr.Addr {
@@ -213,11 +237,11 @@ func (b *simpleBalancer) updateNotifyLoop() {
 			default:
 			}
 		case downc == nil:
-			b.notifyAddrs()
+			b.notifyAddrs(notifyReset)
 			select {
 			case <-upc:
-			case <-b.updateAddrsC:
-				b.notifyAddrs()
+			case msg := <-b.updateAddrsC:
+				b.notifyAddrs(msg)
 			case <-b.stopc:
 				return
 			}
@@ -231,16 +255,24 @@ func (b *simpleBalancer) updateNotifyLoop() {
 			}
 			select {
 			case <-downc:
+				b.notifyAddrs(notifyReset)
-			case <-b.updateAddrsC:
+			case msg := <-b.updateAddrsC:
+				b.notifyAddrs(msg)
 			case <-b.stopc:
 				return
 			}
-			b.notifyAddrs()
 		}
 	}
 }
 
-func (b *simpleBalancer) notifyAddrs() {
+func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
+	if msg == notifyNext {
+		select {
+		case b.notifyCh <- []grpc.Address{}:
+		case <-b.stopc:
+			return
+		}
+	}
 	b.mu.RLock()
 	addrs := b.addrs
 	b.mu.RUnlock()
diff --git a/clientv3/retry.go b/clientv3/retry.go
index 272b62b9210..aab2c9235e9 100644
--- a/clientv3/retry.go
+++ b/clientv3/retry.go
@@ -51,11 +51,19 @@ func isWriteStopError(err error) bool {
 func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
 	return func(rpcCtx context.Context, f rpcFunc) error {
 		for {
-			if err := f(rpcCtx); err == nil || isStop(err) {
+			err := f(rpcCtx)
+			if err == nil {
+				return nil
+			}
+			notify := c.balancer.ConnectNotify()
+			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+				c.balancer.next()
+			}
+			if isStop(err) {
 				return err
 			}
 			select {
-			case <-c.balancer.ConnectNotify():
+			case <-notify:
 			case <-rpcCtx.Done():
 				return rpcCtx.Err()
 			case <-c.ctx.Done():

From 49e5e78d0f3cc9f7ca31e55ddd57f3873969e011 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Mon, 11 Sep 2017 21:12:52 -0700
Subject: [PATCH 4/4] clientv3/integration: test endpoint switches on partitioned member

---
 clientv3/integration/kv_test.go | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go
index f5feb0aaea0..22c71019f7b 100644
--- a/clientv3/integration/kv_test.go
+++ b/clientv3/integration/kv_test.go
@@ -933,3 +933,29 @@ func TestKVPutAtMostOnce(t *testing.T) {
 		t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0])
 	}
 }
+
+func TestKVSwitchUnavailable(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	clus.Members[0].InjectPartition(t, clus.Members[1:])
+	// try to connect with a dead node in the endpoint list
+	cfg := clientv3.Config{
+		Endpoints: []string{
+			clus.Members[0].GRPCAddr(),
+			clus.Members[1].GRPCAddr(),
+		},
+		DialTimeout: 1 * time.Second}
+	cli, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+	timeout := 3 * clus.Members[0].ServerConfig.ReqTimeout()
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	if _, err := cli.Get(ctx, "abc"); err != nil {
+		t.Fatal(err)
+	}
+	cancel()
+}
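
For reviewers who want to exercise the series end to end, here is a minimal usage sketch; the endpoint addresses and timeouts are hypothetical placeholders, and only the public clientv3 API that these patches leave intact is assumed. With the health balancer from patch 1 and the retry change from patch 3, a request issued while the pinned member is unreachable should be reissued against another endpoint rather than blocking on the dead one:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Placeholder endpoints; any reachable multi-member cluster works.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"},
		DialTimeout: 2 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// On codes.Unavailable the retry wrapper calls balancer.next(), so the
	// retried RPC is routed to a healthy, non-gray-listed endpoint.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	resp, err := cli.Get(ctx, "abc")
	fmt.Println(resp, err)
}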