diff --git a/clientconn.go b/clientconn.go
index 346dcb85a445..f058d3716d56 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -98,7 +98,8 @@ type dialOptions struct {
 	// This is to support v1 balancer.
 	balancerBuilder balancer.Builder
 	// This is to support grpclb.
-	resolverBuilder resolver.Builder
+	resolverBuilder  resolver.Builder
+	waitForHandshake bool
 }
 
 const (
@@ -109,6 +110,15 @@ const (
 // DialOption configures how we set up the connection.
 type DialOption func(*dialOptions)
 
+// WithWaitForHandshake blocks until the initial settings frame is received from the
+// server before assigning RPCs to the connection.
+// Experimental API.
+func WithWaitForHandshake() DialOption {
+	return func(o *dialOptions) {
+		o.waitForHandshake = true
+	}
+}
+
 // WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
 // before doing a write on the wire.
 func WithWriteBufferSize(s int) DialOption {
@@ -240,7 +250,7 @@ func WithBackoffConfig(b BackoffConfig) DialOption {
 	return withBackoff(b)
 }
 
-// withBackoff sets the backoff strategy used for retries after a
+// withBackoff sets the backoff strategy used for connection retries (connectRetryNum) after a
 // failed connection attempt.
 //
 // This can be exported if arbitrary backoff strategies are allowed by gRPC.
@@ -804,6 +814,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
 	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
 	if curAddrFound {
 		ac.addrs = addrs
+		ac.reconnectIdx = 0 // Start reconnecting from the beginning of the new list.
 	}
 
 	return curAddrFound
@@ -899,15 +910,16 @@ type addrConn struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 
-	cc      *ClientConn
-	curAddr resolver.Address
-	addrs   []resolver.Address
-	dopts   dialOptions
-	events  trace.EventLog
-	acbw    balancer.SubConn
+	cc     *ClientConn
+	addrs  []resolver.Address
+	dopts  dialOptions
+	events trace.EventLog
+	acbw   balancer.SubConn
 
-	mu    sync.Mutex
-	state connectivity.State
+	mu           sync.Mutex
+	curAddr      resolver.Address
+	reconnectIdx int // The index in the addrs list to start reconnecting from.
+	state        connectivity.State
 	// ready is closed and becomes nil when a new transport is up or failed
 	// due to timeout.
 	ready chan struct{}
@@ -915,6 +927,14 @@ type addrConn struct {
 
 	// The reason this addrConn is torn down.
 	tearDownErr error
+
+	connectRetryNum int
+	// backoffDeadline is the time until which resetTransport needs to
+	// wait before incrementing connectRetryNum.
+	backoffDeadline time.Time
+	// connectDeadline is the time by which all connection
+	// negotiations must complete.
+	connectDeadline time.Time
 }
 
 // adjustParams updates parameters used to create transports upon
@@ -949,6 +969,15 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
 
 // resetTransport recreates a transport to the address for ac. The old
 // transport will close itself on error or when the clientconn is closed.
+// The created transport must receive the initial settings frame from the server.
+// In case that doesn't happen, transportMonitor will kill the newly created
+// transport after connectDeadline has expired.
+// In case there was an error on the transport before the settings frame was
+// received, resetTransport resumes connecting to backends after the one that
+// was previously connected to. If the end of the list is reached, resetTransport
+// backs off until the original deadline.
+// If the DialOption WithWaitForHandshake was set, resetTransport returns
+// successfully only after server settings are received.
 //
 // TODO(bar) make sure all state transitions are valid.
 func (ac *addrConn) resetTransport() error {
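For readers of this patch, the new option composes with the existing blocking-dial flow. A minimal usage sketch, not part of the patch (the target address is a placeholder, and `WithInsecure` is used only to keep the example short):

```go
package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// With both options set, Dial returns only after the server's initial
	// SETTINGS frame has been received on the chosen connection.
	conn, err := grpc.Dial(
		"localhost:50051", // placeholder backend address
		grpc.WithInsecure(),
		grpc.WithWaitForHandshake(), // added by this change; experimental
		grpc.WithBlock(),
	)
	if err != nil {
		log.Fatalf("Dial failed: %v", err)
	}
	defer conn.Close()
}
```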
@@ -962,19 +991,38 @@ func (ac *addrConn) resetTransport() error {
 		ac.ready = nil
 	}
 	ac.transport = nil
-	ac.curAddr = resolver.Address{}
+	ridx := ac.reconnectIdx
 	ac.mu.Unlock()
 	ac.cc.mu.RLock()
 	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
 	ac.cc.mu.RUnlock()
-	for retries := 0; ; retries++ {
-		sleepTime := ac.dopts.bs.backoff(retries)
-		timeout := minConnectTimeout
+	var backoffDeadline, connectDeadline time.Time
+	for connectRetryNum := 0; ; connectRetryNum++ {
 		ac.mu.Lock()
-		if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
-			timeout = time.Duration(int(sleepTime) / len(ac.addrs))
+		if ac.backoffDeadline.IsZero() {
+			// This means either a successful HTTP2 connection was established
+			// or this is the first time this addrConn is trying to establish a
+			// connection.
+			backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
+			// This will be the duration that dial gets to finish.
+			dialDuration := minConnectTimeout
+			if backoffFor > dialDuration {
+				// Give dial more time as we keep failing to connect.
+				dialDuration = backoffFor
+			}
+			start := time.Now()
+			backoffDeadline = start.Add(backoffFor)
+			connectDeadline = start.Add(dialDuration)
+			ridx = 0 // Start connecting from the beginning.
+		} else {
+			// Continue trying to connect with the same deadlines.
+			connectRetryNum = ac.connectRetryNum
+			backoffDeadline = ac.backoffDeadline
+			connectDeadline = ac.connectDeadline
+			ac.backoffDeadline = time.Time{}
+			ac.connectDeadline = time.Time{}
+			ac.connectRetryNum = 0
 		}
-		connectTime := time.Now()
 		if ac.state == connectivity.Shutdown {
 			ac.mu.Unlock()
 			return errConnClosing
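The interplay of the two deadlines computed in the hunk above can be stated in isolation. This standalone sketch mirrors the patch's logic; the constants in main are illustrative, not part of the patch:

```go
package main

import (
	"fmt"
	"time"
)

// computeDeadlines mirrors the logic above: the backoff deadline gates when
// the next retry round may begin, while the connect deadline bounds how long
// dialing plus the server handshake may take. Dial always gets at least the
// minimum connect timeout, and more as the backoff grows.
func computeDeadlines(backoffFor, minConnectTimeout time.Duration) (backoffDeadline, connectDeadline time.Time) {
	dialDuration := minConnectTimeout
	if backoffFor > dialDuration {
		dialDuration = backoffFor // give dial more time as we keep failing
	}
	start := time.Now()
	return start.Add(backoffFor), start.Add(dialDuration)
}

func main() {
	b, c := computeDeadlines(4*time.Second, 20*time.Second) // illustrative values
	fmt.Println("next retry after:", b, "handshake must finish by:", c)
}
```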
@@ -989,94 +1037,153 @@ func (ac *addrConn) resetTransport() error {
 		copy(addrsIter, ac.addrs)
 		copts := ac.dopts.copts
 		ac.mu.Unlock()
-		for _, addr := range addrsIter {
+		connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
+		if err != nil {
+			return err
+		}
+		if connected {
+			return nil
+		}
+	}
+}
+
+// createTransport creates a connection to one of the backends in addrs.
+// It returns true if a connection was established.
+func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
+	for i := ridx; i < len(addrs); i++ {
+		addr := addrs[i]
+		target := transport.TargetInfo{
+			Addr:      addr.Addr,
+			Metadata:  addr.Metadata,
+			Authority: ac.cc.authority,
+		}
+		done := make(chan struct{})
+		onPrefaceReceipt := func() {
+			close(done)
 			ac.mu.Lock()
-			if ac.state == connectivity.Shutdown {
-				// ac.tearDown(...) has been invoked.
-				ac.mu.Unlock()
-				return errConnClosing
+			if !ac.backoffDeadline.IsZero() {
+				// We haven't started reconnecting to other backends yet.
+				// Note: this can happen when the writer notices an error
+				// and triggers resetTransport while, at the same time, the
+				// reader receives the preface and invokes this closure.
+				ac.backoffDeadline = time.Time{}
+				ac.connectDeadline = time.Time{}
+				ac.connectRetryNum = 0
 			}
 			ac.mu.Unlock()
-			sinfo := transport.TargetInfo{
-				Addr:      addr.Addr,
-				Metadata:  addr.Metadata,
-				Authority: ac.cc.authority,
-			}
-			newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
-			if err != nil {
-				if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
-					ac.mu.Lock()
-					if ac.state != connectivity.Shutdown {
-						ac.state = connectivity.TransientFailure
-						ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-					}
-					ac.mu.Unlock()
-					return err
-				}
-				grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
+		}
+		// Do not cancel in the success path because of
+		// this issue in Go1.6: https://github.com/golang/go/issues/15078.
+		connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+		newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
+		if err != nil {
+			cancel()
+			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
 				ac.mu.Lock()
-				if ac.state == connectivity.Shutdown {
-					// ac.tearDown(...) has been invoked.
-					ac.mu.Unlock()
-					return errConnClosing
+				if ac.state != connectivity.Shutdown {
+					ac.state = connectivity.TransientFailure
+					ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 				}
 				ac.mu.Unlock()
-				continue
+				return false, err
 			}
 			ac.mu.Lock()
-			ac.printf("ready")
 			if ac.state == connectivity.Shutdown {
 				// ac.tearDown(...) has been invoked.
 				ac.mu.Unlock()
-				newTransport.Close()
-				return errConnClosing
-			}
-			ac.state = connectivity.Ready
-			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-			t := ac.transport
-			ac.transport = newTransport
-			if t != nil {
-				t.Close()
-			}
-			ac.curAddr = addr
-			if ac.ready != nil {
-				close(ac.ready)
-				ac.ready = nil
+				return false, errConnClosing
 			}
 			ac.mu.Unlock()
-			return nil
+			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
+			continue
+		}
+		if ac.dopts.waitForHandshake {
+			select {
+			case <-done:
+			case <-connectCtx.Done():
+				// Didn't receive the server preface; must kill this new
+				// transport now and move on to the next address.
+				grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
+				newTr.Close()
+				continue
+			case <-ac.ctx.Done():
+			}
 		}
 		ac.mu.Lock()
-		ac.state = connectivity.TransientFailure
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			// ac.tearDown(...) has been invoked.
+			newTr.Close()
+			return false, errConnClosing
+		}
+		ac.printf("ready")
+		ac.state = connectivity.Ready
 		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-		ac.cc.resolveNow(resolver.ResolveNowOption{})
+		ac.transport = newTr
+		ac.curAddr = addr
 		if ac.ready != nil {
 			close(ac.ready)
 			ac.ready = nil
 		}
+		ac.connectRetryNum = connectRetryNum
+		ac.backoffDeadline = backoffDeadline
+		ac.connectDeadline = connectDeadline
+		ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
 		ac.mu.Unlock()
-		timer := time.NewTimer(sleepTime - time.Since(connectTime))
-		select {
-		case <-timer.C:
-		case <-ac.ctx.Done():
-			timer.Stop()
-			return ac.ctx.Err()
-		}
+		return true, nil
+	}
+	ac.mu.Lock()
+	ac.state = connectivity.TransientFailure
+	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+	ac.cc.resolveNow(resolver.ResolveNowOption{})
+	if ac.ready != nil {
+		close(ac.ready)
+		ac.ready = nil
+	}
+	ac.mu.Unlock()
+	timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
+	select {
+	case <-timer.C:
+	case <-ac.ctx.Done():
 		timer.Stop()
+		return false, ac.ctx.Err()
 	}
+	return false, nil
 }
 
 // Run in a goroutine to track the error in transport and create the
 // new transport if an error happens. It returns when the channel is closing.
 func (ac *addrConn) transportMonitor() {
 	for {
+		var timer *time.Timer
+		var cdeadline <-chan time.Time
 		ac.mu.Lock()
 		t := ac.transport
+		if !ac.connectDeadline.IsZero() {
+			timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
+			cdeadline = timer.C
+		}
 		ac.mu.Unlock()
 		// Block until we receive a goaway or an error occurs.
 		select {
 		case <-t.GoAway():
 		case <-t.Error():
+		case <-cdeadline:
+			ac.mu.Lock()
+			// A cleared backoffDeadline implies that the client received
+			// the server preface in time; nothing to do.
+			if ac.backoffDeadline.IsZero() {
+				ac.mu.Unlock()
+				continue
+			}
+			ac.mu.Unlock()
+			timer = nil
+			// No server preface received until deadline.
+			// Kill the connection.
+			grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
+			t.Close()
+		}
+		if timer != nil {
+			timer.Stop()
+		}
 		// If a GoAway happened, regardless of error, adjust our keepalive
 		// parameters as appropriate.
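Before the test changes, here is the preface-wait pattern from createTransport in isolation, as a runnable sketch. The helper name and the simulated reader goroutine are illustrative, not part of the patch:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForPreface mimics the patch's handshake wait: the transport's reader
// closes done when the server's SETTINGS frame arrives, and the caller
// selects on that signal against the connect deadline.
func waitForPreface(connectCtx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil // preface received; the connection may be marked Ready
	case <-connectCtx.Done():
		return fmt.Errorf("no server preface before deadline: %v", connectCtx.Err())
	}
}

func main() {
	connectCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	done := make(chan struct{})
	go func() { close(done) }() // simulate the reader seeing the SETTINGS frame
	fmt.Println(waitForPreface(connectCtx, done))
}
```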
diff --git a/clientconn_test.go b/clientconn_test.go
index 54caf01561e5..2142ff15d095 100644
--- a/clientconn_test.go
+++ b/clientconn_test.go
@@ -19,12 +19,14 @@
 package grpc
 
 import (
+	"io"
 	"math"
 	"net"
 	"testing"
 	"time"
 
 	"golang.org/x/net/context"
+	"golang.org/x/net/http2"
 
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials"
@@ -46,6 +48,256 @@ func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
 	return state, state == wantState
 }
 
+func TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
+	defer leakcheck.Check(t)
+	numServers := 2
+	servers := make([]net.Listener, numServers)
+	var err error
+	for i := 0; i < numServers; i++ {
+		servers[i], err = net.Listen("tcp", "localhost:0")
+		if err != nil {
+			t.Fatalf("Error while listening. Err: %v", err)
+		}
+	}
+	dones := make([]chan struct{}, numServers)
+	for i := 0; i < numServers; i++ {
+		dones[i] = make(chan struct{})
+	}
+	for i := 0; i < numServers; i++ {
+		go func(i int) {
+			defer func() {
+				close(dones[i])
+			}()
+			conn, err := servers[i].Accept()
+			if err != nil {
+				t.Errorf("Error while accepting. Err: %v", err)
+				return
+			}
+			defer conn.Close()
+			switch i {
+			case 0: // 1st server accepts the connection and immediately closes it.
+			case 1: // 2nd server accepts the connection and sends settings frames.
+				framer := http2.NewFramer(conn, conn)
+				if err := framer.WriteSettings(http2.Setting{}); err != nil {
+					t.Errorf("Error while writing settings frame. %v", err)
+					return
+				}
+				conn.SetDeadline(time.Now().Add(time.Second))
+				buf := make([]byte, 1024)
+				for { // Make sure the connection stays healthy.
+					_, err = conn.Read(buf)
+					if err == nil {
+						continue
+					}
+					if nerr, ok := err.(net.Error); !ok || !nerr.Timeout() {
+						t.Errorf("Server expected conn.Read(_) to time out, instead got error: %v", err)
+					}
+					return
+				}
+			}
+		}(i)
+	}
+	r, cleanup := manual.GenerateAndRegisterManualResolver()
+	defer cleanup()
+	resolvedAddrs := make([]resolver.Address, numServers)
+	for i := 0; i < numServers; i++ {
+		resolvedAddrs[i] = resolver.Address{Addr: servers[i].Addr().String()}
+	}
+	r.InitialAddrs(resolvedAddrs)
+	client, err := Dial(r.Scheme()+":///test.server", WithInsecure())
+	if err != nil {
+		t.Errorf("Dial failed. Err: %v", err)
+	} else {
+		defer client.Close()
+	}
+	time.Sleep(time.Second) // Close the servers after a second for cleanup.
+	for _, s := range servers {
+		s.Close()
+	}
+	for _, done := range dones {
+		<-done
+	}
+}
+
+func TestDialWaitsForServerSettings(t *testing.T) {
+	defer leakcheck.Check(t)
+	server, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Error while listening. Err: %v", err)
+	}
+	defer server.Close()
+	done := make(chan struct{})
+	sent := make(chan struct{})
+	dialDone := make(chan struct{})
+	go func() { // Launch the server.
+		defer func() {
+			close(done)
+		}()
+		conn, err := server.Accept()
+		if err != nil {
+			t.Errorf("Error while accepting. Err: %v", err)
+			return
+		}
+		defer conn.Close()
+		// Sleep so that if the test were to fail it
+		// will fail more often than not.
+		time.Sleep(100 * time.Millisecond)
+		framer := http2.NewFramer(conn, conn)
+		close(sent)
+		if err := framer.WriteSettings(http2.Setting{}); err != nil {
+			t.Errorf("Error while writing settings. Err: %v", err)
+			return
+		}
+		<-dialDone // Close conn only after dial returns.
+	}()
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+	client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock())
+	close(dialDone)
+	if err != nil {
+		cancel()
+		t.Fatalf("Error while dialing. Err: %v", err)
+	}
+	defer client.Close()
+	select {
+	case <-sent:
+	default:
+		t.Fatalf("Dial returned before server settings were sent")
+	}
+	<-done
+}
+
+func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
+	defer leakcheck.Check(t)
+	mctBkp := minConnectTimeout
+	defer func() {
+		minConnectTimeout = mctBkp
+	}()
+	minConnectTimeout = time.Millisecond * 500
+	server, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Error while listening. Err: %v", err)
+	}
+	defer server.Close()
+	done := make(chan struct{})
+	clientDone := make(chan struct{})
+	go func() { // Launch the server.
+		defer func() {
+			if done != nil {
+				close(done)
+			}
+		}()
+		conn1, err := server.Accept()
+		if err != nil {
+			t.Errorf("Error while accepting. Err: %v", err)
+			return
+		}
+		defer conn1.Close()
+		// Don't send server settings and make sure the connection is closed.
+		time.Sleep(time.Millisecond * 1500) // Since the first backoff is for a second.
+		conn1.SetDeadline(time.Now().Add(time.Second))
+		b := make([]byte, 24)
+		for {
+			// Make sure the connection was closed by the client.
+			_, err = conn1.Read(b)
+			if err == nil {
+				continue
+			}
+			if err != io.EOF {
+				t.Errorf("conn1.Read(_) = _, %v, want _, io.EOF", err)
+				return
+			}
+			break
+		}
+
+		conn2, err := server.Accept() // Accept a reconnection request from the client.
+		if err != nil {
+			t.Errorf("Error while accepting. Err: %v", err)
+			return
+		}
+		defer conn2.Close()
+		framer := http2.NewFramer(conn2, conn2)
+		if err := framer.WriteSettings(http2.Setting{}); err != nil {
+			t.Errorf("Error while writing settings. Err: %v", err)
+			return
+		}
+		time.Sleep(time.Millisecond * 1500) // Since the first backoff is for a second.
+		conn2.SetDeadline(time.Now().Add(time.Millisecond * 500))
+		for {
+			// Make sure the connection stays open and is closed
+			// only by the connection timeout.
+			_, err = conn2.Read(b)
+			if err == nil {
+				continue
+			}
+			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
+				return
+			}
+			t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
+			break
+		}
+		close(done)
+		done = nil
+		<-clientDone
+	}()
+	client, err := Dial(server.Addr().String(), WithInsecure())
+	if err != nil {
+		t.Fatalf("Error while dialing. Err: %v", err)
+	}
+	<-done
+	client.Close()
+	close(clientDone)
+}
+
+func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
+	defer leakcheck.Check(t)
+	server, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Error while listening. Err: %v", err)
+	}
+	defer server.Close()
+	done := make(chan struct{})
+	go func() { // Launch the server.
+		defer func() {
+			close(done)
+		}()
+		conn, err := server.Accept() // Accept the connection only to close it immediately.
+		if err != nil {
+			t.Errorf("Error while accepting. Err: %v", err)
+			return
+		}
+		prevAt := time.Now()
+		conn.Close()
+		var prevDuration time.Duration
+		// Make sure the retry attempts are backed off properly.
+		for i := 0; i < 3; i++ {
+			conn, err := server.Accept()
+			if err != nil {
+				t.Errorf("Error while accepting. Err: %v", err)
+				return
+			}
+			meow := time.Now()
+			conn.Close()
+			dr := meow.Sub(prevAt)
+			if dr <= prevDuration {
+				t.Errorf("Client backoff did not increase with retries. Previous duration: %v, current duration: %v", prevDuration, dr)
+				return
+			}
+			prevDuration = dr
+			prevAt = meow
+		}
+	}()
+	client, err := Dial(server.Addr().String(), WithInsecure())
+	if err != nil {
+		t.Fatalf("Error while dialing. Err: %v", err)
+	}
+	defer client.Close()
+	<-done
+}
+
 func TestConnectivityStates(t *testing.T) {
 	defer leakcheck.Check(t)
 	servers, resolver, cleanup := startServers(t, 2, math.MaxUint32)
diff --git a/resolver/manual/manual.go b/resolver/manual/manual.go
index 7a79090312d8..50ed762a83d0 100644
--- a/resolver/manual/manual.go
+++ b/resolver/manual/manual.go
@@ -40,12 +40,22 @@ type Resolver struct {
 	scheme string
 
 	// Fields actually belong to the resolver.
-	cc resolver.ClientConn
+	cc             resolver.ClientConn
+	bootstrapAddrs []resolver.Address
+}
+
+// InitialAddrs adds resolved addresses to the resolver so that
+// NewAddress doesn't need to be explicitly called after Dial.
+func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
+	r.bootstrapAddrs = addrs
 }
 
 // Build returns itself for Resolver, because it's both a builder and a resolver.
 func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
 	r.cc = cc
+	if r.bootstrapAddrs != nil {
+		r.NewAddress(r.bootstrapAddrs)
+	}
 	return r, nil
 }
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 0f58a390a374..4c9f808fbcce 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -93,6 +93,11 @@ type http2Client struct {
 	bdpEst          *bdpEstimator
 	outQuotaVersion uint32
 
+	// onSuccess is a callback that client transport calls upon
+	// receiving server preface to signal that a successful HTTP2
+	// connection was established.
+	onSuccess func()
+
 	mu            sync.Mutex // guard the following variables
 	state         transportState // the state of underlying connection
 	activeStreams map[uint32]*Stream
@@ -145,16 +150,12 @@ func isTemporary(err error) bool {
 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
 // and starts to receive messages on it. Non-nil error returns if construction
 // fails.
-func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) {
+func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
 	scheme := "http"
 	ctx, cancel := context.WithCancel(ctx)
-	connectCtx, connectCancel := context.WithTimeout(ctx, timeout)
 	defer func() {
 		if err != nil {
 			cancel()
-			// Don't call connectCancel in success path due to a race in Go 1.6:
-			// https://github.com/golang/go/issues/15078.
-			connectCancel()
 		}
 	}()
 
@@ -240,6 +241,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) {
 		kp:                kp,
 		statsHandler:      opts.StatsHandler,
 		initialWindowSize: initialWindowSize,
+		onSuccess:         onSuccess,
 	}
 	if opts.InitialWindowSize >= defaultWindowSize {
 		t.initialWindowSize = opts.InitialWindowSize
@@ -1160,6 +1162,7 @@ func (t *http2Client) reader() {
 		t.Close()
 		return
 	}
+	t.onSuccess()
 	t.handleSettings(sf, true)
 
 	// loop to keep reading incoming messages on this transport.
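With the change above, newHTTP2Client separates the connection-establishment context from the transport's lifetime context and accepts the preface callback. A hedged caller-side sketch mirroring the test updates later in this patch (the target address is a placeholder that fails fast, exercising the error path):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/net/context"

	"google.golang.org/grpc/transport"
)

func main() {
	target := transport.TargetInfo{Addr: "127.0.0.1:0"} // placeholder; dialing port 0 fails fast
	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
	// connectCtx bounds only dialing plus the handshake; the second context
	// governs the transport's lifetime. The callback (a no-op here) fires
	// once the server preface is read.
	tr, err := transport.NewClientTransport(connectCtx, context.Background(), target, transport.ConnectOptions{}, func() {})
	if err != nil {
		cancel() // cancel only on failure (golang/go#15078 workaround)
		fmt.Println("connect failed:", err)
		return
	}
	defer tr.Close()
}
```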
diff --git a/transport/transport.go b/transport/transport.go
index b7a5dbe42009..86b23ef7d20c 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -26,7 +26,6 @@ import (
 	"io"
 	"net"
 	"sync"
-	"time"
 
 	"golang.org/x/net/context"
 	"golang.org/x/net/http2"
@@ -506,8 +505,8 @@ type TargetInfo struct {
 
 // NewClientTransport establishes the transport with the required ConnectOptions
 // and returns it to the caller.
-func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) {
-	return newHTTP2Client(ctx, target, opts, timeout)
+func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
+	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
 }
 
 // Options provides additional hints and information for message
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 3e29c9179596..67f3bcaf1621 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -361,8 +361,10 @@ func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hTy
 	target := TargetInfo{
 		Addr: addr,
 	}
-	ct, connErr = NewClientTransport(context.Background(), target, copts, 2*time.Second)
+	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+	ct, connErr = NewClientTransport(connectCtx, context.Background(), target, copts, func() {})
 	if connErr != nil {
+		cancel() // Do not cancel in success path.
 		t.Fatalf("failed to create transport: %v", connErr)
 	}
 	return server, ct
@@ -384,8 +386,10 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
 		}
 		done <- conn
 	}()
-	tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, 2*time.Second)
+	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+	tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {})
 	if err != nil {
+		cancel() // Do not cancel in success path.
 		// Server clean-up.
 		lis.Close()
 		if conn, ok := <-done; ok {
@@ -2091,8 +2095,10 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream
 		wh:         wh,
 	}
 	server.start(t, lis)
-	client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, 2*time.Second)
+	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+	client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {})
 	if err != nil {
+		cancel() // Do not cancel in success path.
 		t.Fatalf("Error creating client. Err: %v", err)
 	}
 	stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method", Flush: true})
diff --git a/vet.sh b/vet.sh
index 02d4bae39a98..8c5c5ba6dfb2 100755
--- a/vet.sh
+++ b/vet.sh
@@ -65,7 +65,7 @@ trap cleanup EXIT
 git ls-files "*.go" | xargs sed -i 's:"golang.org/x/net/context":"context":'
 set +o pipefail
 # TODO: Stop filtering pb.go files once golang/protobuf#214 is fixed.
-go tool vet -all . 2>&1 | grep -vF '.pb.go:' | tee /dev/stderr | (! read)
+go tool vet -all . 2>&1 | grep -vE '(clientconn|transport\/transport_test).go:.*cancel (function|var)' | grep -vF '.pb.go:' | tee /dev/stderr | (! read)
 set -o pipefail
 git reset --hard HEAD
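A closing note: the vet.sh filter above suppresses the "cancel function/var not used on all paths" warnings that the intentional no-cancel-on-success pattern triggers in clientconn.go and transport_test.go. Separately, the growth that TestBackoffWhenNoServerPrefaceReceived asserts can be sketched with gRPC's default backoff parameters; the one-second base and 1.6 multiplier are assumptions about the defaults, not values set by this patch:

```go
package main

import (
	"fmt"
	"time"
)

// nextBackoff sketches the growth the test expects between successive
// connection attempts: each gap should exceed the previous one. Jitter and
// the maximum delay cap are omitted for brevity.
func nextBackoff(prev time.Duration) time.Duration {
	if prev == 0 {
		return time.Second // assumed default base delay
	}
	return time.Duration(float64(prev) * 1.6) // assumed default multiplier
}

func main() {
	d := time.Duration(0)
	for i := 0; i < 4; i++ {
		d = nextBackoff(d)
		fmt.Printf("attempt %d: wait ~%v\n", i+1, d)
	}
}
```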