From c1530f6324ad5914d62758d6611ae463ed1c8740 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 3 Nov 2017 13:58:42 -0700 Subject: [PATCH 01/11] First commit. --- clientconn.go | 96 +++++++++++++++++++++++++++++-------- clientconn_test.go | 47 ++++++++++++++++++ transport/http2_client.go | 11 ++++- transport/transport.go | 4 +- transport/transport_test.go | 6 +-- 5 files changed, 136 insertions(+), 28 deletions(-) diff --git a/clientconn.go b/clientconn.go index 346dcb85a445..8375162a1fcb 100644 --- a/clientconn.go +++ b/clientconn.go @@ -899,15 +899,17 @@ type addrConn struct { ctx context.Context cancel context.CancelFunc - cc *ClientConn - curAddr resolver.Address - addrs []resolver.Address - dopts dialOptions - events trace.EventLog - acbw balancer.SubConn - - mu sync.Mutex - state connectivity.State + cc *ClientConn + addrs []resolver.Address + dopts dialOptions + events trace.EventLog + acbw balancer.SubConn + + mu sync.Mutex + retries int + curAddr resolver.Address + prevAddr resolver.Address + state connectivity.State // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -915,6 +917,11 @@ type addrConn struct { // The reason this addrConn is torn down. tearDownErr error + + dialDeadline time.Time // Deadline by which dialing should succeed. + // backoffUntil is the time until which resetTransport needs to + // wait before increasing retries count. + backoffUntil *time.Timer } // adjustParams updates parameters used to create transports upon @@ -962,19 +969,43 @@ func (ac *addrConn) resetTransport() error { ac.ready = nil } ac.transport = nil - ac.curAddr = resolver.Address{} + prevAddr := ac.prevAddr ac.mu.Unlock() ac.cc.mu.RLock() ac.dopts.copts.KeepaliveParams = ac.cc.mkp ac.cc.mu.RUnlock() + var ( + backoffUntil *time.Timer + dialDeadline time.Time + ) + defer func() { + if backoffUntil != nil { + backoffUntil.Stop() + } + }() for retries := 0; ; retries++ { - sleepTime := ac.dopts.bs.backoff(retries) - timeout := minConnectTimeout ac.mu.Lock() - if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) { - timeout = time.Duration(int(sleepTime) / len(ac.addrs)) + if ac.backoffUntil == nil { + // This means either a successfull HTTP2 connection was established + // or this is the first time this addrConn is trying to establish a + // connection. + backoffFor := ac.dopts.bs.backoff(retries) // time.Duration. + // This will be the duration that dial gets to finish. + dialDuration := minConnectTimeout + if backoffFor > dialDuration { + // Give dial more time as we keep failing to connect. + dialDuration = backoffFor + } + dialDeadline = time.Now().Add(dialDuration) + backoffUntil = time.NewTimer(backoffFor) + prevAddr = resolver.Address{} // Start connecting from the begining. + } else { + // Continue trying to conect with the same deadlines. + retries = ac.retries + backoffUntil = ac.backoffUntil + dialDeadline = ac.dialDeadline + ac.backoffUntil = nil } - connectTime := time.Now() if ac.state == connectivity.Shutdown { ac.mu.Unlock() return errConnClosing @@ -987,9 +1018,19 @@ func (ac *addrConn) resetTransport() error { // copy ac.addrs in case of race addrsIter := make([]resolver.Address, len(ac.addrs)) copy(addrsIter, ac.addrs) + addrIdx := 0 + for idx, addr := range addrsIter { + // Find the previous attemted address and start with + // the one after it. + if addr.Addr == prevAddr.Addr { + addrIdx = idx + 1 + break + } + } copts := ac.dopts.copts ac.mu.Unlock() - for _, addr := range addrsIter { + for i := addrIdx; i < len(addrsIter); i++ { + addr := addrsIter[i] ac.mu.Lock() if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. @@ -1002,7 +1043,14 @@ func (ac *addrConn) resetTransport() error { Metadata: addr.Metadata, Authority: ac.cc.authority, } - newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout) + newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, dialDeadline, func() { + ac.mu.Lock() + if ac.backoffUntil != nil { + ac.backoffUntil.Stop() + ac.backoffUntil = nil + } + ac.mu.Unlock() + }) if err != nil { if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { ac.mu.Lock() @@ -1043,6 +1091,13 @@ func (ac *addrConn) resetTransport() error { close(ac.ready) ac.ready = nil } + // Following will be reset by the client transport when + // it gets server preface thus signifying that a successful + // HTTP2 connection was established. + ac.retries = retries + ac.backoffUntil = backoffUntil + ac.dialDeadline = dialDeadline + backoffUntil = nil // So that the defer function doesn't stop it. ac.mu.Unlock() return nil } @@ -1055,14 +1110,12 @@ func (ac *addrConn) resetTransport() error { ac.ready = nil } ac.mu.Unlock() - timer := time.NewTimer(sleepTime - time.Since(connectTime)) select { - case <-timer.C: + case <-backoffUntil.C: case <-ac.ctx.Done(): - timer.Stop() return ac.ctx.Err() } - timer.Stop() + backoffUntil.Stop() } } @@ -1095,6 +1148,7 @@ func (ac *addrConn) transportMonitor() { ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.resolveNow(resolver.ResolveNowOption{}) + ac.prevAddr = ac.curAddr ac.curAddr = resolver.Address{} ac.mu.Unlock() if err := ac.resetTransport(); err != nil { diff --git a/clientconn_test.go b/clientconn_test.go index 54caf01561e5..70a7ec2736e7 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -46,6 +46,53 @@ func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.Sta return state, state == wantState } +func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { + defer leakcheck.Check(t) + server, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + defer server.Close() + done := make(chan struct{}) + go func() { // Launch the server. + defer func() { + close(done) + }() + conn, err := server.Accept() // Accept the connection only to close it immediately. + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + prevAt := time.Now() + conn.Close() + var prevDuration time.Duration + // Make sure the retry attempts are backed off properly. + for i := 0; i < 3; i++ { + conn, err := server.Accept() + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + now := time.Now() + conn.Close() + dr := now.Sub(prevAt) + if dr <= prevDuration { + t.Errorf("Client backoff did not increase with retries. Previoud duration: %v, current duration: %v", prevDuration, dr) + return + } + prevDuration = dr + prevAt = now + } + }() + client, err := Dial(server.Addr().String(), WithInsecure()) + if err != nil { + t.Fatalf("Error while dialing. Err: %v", err) + } + defer client.Close() + <-done + +} + func TestConnectivityStates(t *testing.T) { defer leakcheck.Check(t) servers, resolver, cleanup := startServers(t, 2, math.MaxUint32) diff --git a/transport/http2_client.go b/transport/http2_client.go index 0f58a390a374..643ef4aa5e3a 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -93,6 +93,11 @@ type http2Client struct { bdpEst *bdpEstimator outQuotaVersion uint32 + // onSuccess is a callback that client transport calls upon + // receiving server preface to signal that a succefull HTTP2 + // connection was established. + onSuccess func() + mu sync.Mutex // guard the following variables state transportState // the state of underlying connection activeStreams map[uint32]*Stream @@ -145,10 +150,10 @@ func isTemporary(err error) bool { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) { +func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, deadline time.Time, onSuccess func()) (_ ClientTransport, err error) { scheme := "http" ctx, cancel := context.WithCancel(ctx) - connectCtx, connectCancel := context.WithTimeout(ctx, timeout) + connectCtx, connectCancel := context.WithDeadline(ctx, deadline) defer func() { if err != nil { cancel() @@ -240,6 +245,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t kp: kp, statsHandler: opts.StatsHandler, initialWindowSize: initialWindowSize, + onSuccess: onSuccess, } if opts.InitialWindowSize >= defaultWindowSize { t.initialWindowSize = opts.InitialWindowSize @@ -1160,6 +1166,7 @@ func (t *http2Client) reader() { t.Close() return } + t.onSuccess() t.handleSettings(sf, true) // loop to keep reading incoming messages on this transport. diff --git a/transport/transport.go b/transport/transport.go index b7a5dbe42009..3f31ec946772 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -506,8 +506,8 @@ type TargetInfo struct { // NewClientTransport establishes the transport with the required ConnectOptions // and returns it to the caller. -func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) { - return newHTTP2Client(ctx, target, opts, timeout) +func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, deadline time.Time, onSuccess func()) (ClientTransport, error) { + return newHTTP2Client(ctx, target, opts, deadline, onSuccess) } // Options provides additional hints and information for message diff --git a/transport/transport_test.go b/transport/transport_test.go index 3e29c9179596..0a5d287df98e 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -361,7 +361,7 @@ func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hTy target := TargetInfo{ Addr: addr, } - ct, connErr = NewClientTransport(context.Background(), target, copts, 2*time.Second) + ct, connErr = NewClientTransport(context.Background(), target, copts, time.Now().Add(2*time.Second), func() {}) if connErr != nil { t.Fatalf("failed to create transport: %v", connErr) } @@ -384,7 +384,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } done <- conn }() - tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, 2*time.Second) + tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, time.Now().Add(2*time.Second), func() {}) if err != nil { // Server clean-up. lis.Close() @@ -2091,7 +2091,7 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream wh: wh, } server.start(t, lis) - client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, 2*time.Second) + client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, time.Now().Add(2*time.Second), func() {}) if err != nil { t.Fatalf("Error creating client. Err: %v", err) } From aef0bf729f6c8cb0570327aeb6933fd8b0f72f4b Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 7 Nov 2017 14:57:08 -0800 Subject: [PATCH 02/11] Added rest of the features and test cases. --- clientconn.go | 124 +++++++++++++++++++++++++++-------- clientconn_test.go | 126 ++++++++++++++++++++++++++++++++++++ transport/http2_client.go | 6 +- transport/transport.go | 5 +- transport/transport_test.go | 12 +++- 5 files changed, 234 insertions(+), 39 deletions(-) diff --git a/clientconn.go b/clientconn.go index 8375162a1fcb..7c446952d98a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -98,7 +98,8 @@ type dialOptions struct { // This is to support v1 balancer. balancerBuilder balancer.Builder // This is to support grpclb. - resolverBuilder resolver.Builder + resolverBuilder resolver.Builder + waitForServerSettings bool } const ( @@ -109,6 +110,16 @@ const ( // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// WithWaitForServerSettings makes the Dial wait until the client receives initial settings (server preface) +// from the server. +// Experimental API. +func WithWaitForServerSettings() DialOption { + return func(o *dialOptions) { + o.waitForServerSettings = true + o.block = true + } +} + // WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched // before doing a write on the wire. func WithWriteBufferSize(s int) DialOption { @@ -906,7 +917,6 @@ type addrConn struct { acbw balancer.SubConn mu sync.Mutex - retries int curAddr resolver.Address prevAddr resolver.Address state connectivity.State @@ -918,10 +928,13 @@ type addrConn struct { // The reason this addrConn is torn down. tearDownErr error - dialDeadline time.Time // Deadline by which dialing should succeed. - // backoffUntil is the time until which resetTransport needs to + retries int + // backoffDeadline is the time until which resetTransport needs to // wait before increasing retries count. - backoffUntil *time.Timer + backoffDeadline time.Time + // connectDeadline is the time by which dial and tls handshake should + // finish. + connectDeadline time.Time } // adjustParams updates parameters used to create transports upon @@ -956,6 +969,15 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { // resetTransport recreates a transport to the address for ac. The old // transport will close itself on error or when the clientconn is closed. +// The created transport must receive initial settings frame from the server. +// In case that doesnt happen, transportMonitor will kill the newly created +// transport after connectDeadline has expired. +// In case there was an error on the tranpsort before the settings frame was +// received, resetTransport resumes connecting to backends after the one that +// is previously connected to. In case end of the list is reached, resetTransport +// backs off until the original deadline. +// If the DialOption WithWaitForServerSettings was set, resetTrasport returns +// successfully only after server settings are received. // // TODO(bar) make sure all state transitions are valid. func (ac *addrConn) resetTransport() error { @@ -975,17 +997,12 @@ func (ac *addrConn) resetTransport() error { ac.dopts.copts.KeepaliveParams = ac.cc.mkp ac.cc.mu.RUnlock() var ( - backoffUntil *time.Timer - dialDeadline time.Time + backoffDeadline time.Time + connectDeadline time.Time ) - defer func() { - if backoffUntil != nil { - backoffUntil.Stop() - } - }() for retries := 0; ; retries++ { ac.mu.Lock() - if ac.backoffUntil == nil { + if ac.backoffDeadline.IsZero() { // This means either a successfull HTTP2 connection was established // or this is the first time this addrConn is trying to establish a // connection. @@ -996,15 +1013,18 @@ func (ac *addrConn) resetTransport() error { // Give dial more time as we keep failing to connect. dialDuration = backoffFor } - dialDeadline = time.Now().Add(dialDuration) - backoffUntil = time.NewTimer(backoffFor) + now := time.Now() + backoffDeadline = now.Add(backoffFor) + connectDeadline = now.Add(dialDuration) prevAddr = resolver.Address{} // Start connecting from the begining. } else { // Continue trying to conect with the same deadlines. retries = ac.retries - backoffUntil = ac.backoffUntil - dialDeadline = ac.dialDeadline - ac.backoffUntil = nil + backoffDeadline = ac.backoffDeadline + connectDeadline = ac.connectDeadline + ac.backoffDeadline = time.Time{} + ac.connectDeadline = time.Time{} + ac.retries = 0 } if ac.state == connectivity.Shutdown { ac.mu.Unlock() @@ -1043,15 +1063,22 @@ func (ac *addrConn) resetTransport() error { Metadata: addr.Metadata, Authority: ac.cc.authority, } - newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, dialDeadline, func() { + connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) + done := make(chan struct{}) + newTransport, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, sinfo, copts, func() { + close(done) ac.mu.Lock() - if ac.backoffUntil != nil { - ac.backoffUntil.Stop() - ac.backoffUntil = nil + if !ac.backoffDeadline.IsZero() { + ac.backoffDeadline = time.Time{} + ac.connectDeadline = time.Time{} + ac.retries = 0 } ac.mu.Unlock() }) if err != nil { + // Do not cancel in the success path because of + // this issue in Go1.6: TODO(mmukhi): reference the issue. + cancel() if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { ac.mu.Lock() if ac.state != connectivity.Shutdown { @@ -1071,14 +1098,26 @@ func (ac *addrConn) resetTransport() error { ac.mu.Unlock() continue } + if ac.dopts.waitForServerSettings { + select { + case <-done: + case <-connectCtx.Done(): + // Didn't receive server preface, must kill this new + // transport now. + grpclog.Errorf("grpc: addrConn.resetTransport didn't get server preface after waiting. Closing the new transport now.") + newTransport.Close() + continue + case <-ac.ctx.Done(): + } + } ac.mu.Lock() - ac.printf("ready") if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() newTransport.Close() return errConnClosing } + ac.printf("ready") ac.state = connectivity.Ready ac.cc.handleSubConnStateChange(ac.acbw, ac.state) t := ac.transport @@ -1095,9 +1134,8 @@ func (ac *addrConn) resetTransport() error { // it gets server preface thus signifying that a successful // HTTP2 connection was established. ac.retries = retries - ac.backoffUntil = backoffUntil - ac.dialDeadline = dialDeadline - backoffUntil = nil // So that the defer function doesn't stop it. + ac.backoffDeadline = backoffDeadline + ac.connectDeadline = connectDeadline ac.mu.Unlock() return nil } @@ -1110,12 +1148,14 @@ func (ac *addrConn) resetTransport() error { ac.ready = nil } ac.mu.Unlock() + backoffTimer := time.NewTimer(time.Until(backoffDeadline)) select { - case <-backoffUntil.C: + case <-backoffTimer.C: case <-ac.ctx.Done(): + backoffTimer.Stop() return ac.ctx.Err() } - backoffUntil.Stop() + backoffTimer.Stop() } } @@ -1123,13 +1163,41 @@ func (ac *addrConn) resetTransport() error { // new transport if an error happens. It returns when the channel is closing. func (ac *addrConn) transportMonitor() { for { + var timer *time.Timer + var cdeadline <-chan time.Time ac.mu.Lock() t := ac.transport + if !ac.connectDeadline.IsZero() { + timer = time.NewTimer(time.Until(ac.connectDeadline)) + cdeadline = timer.C + } ac.mu.Unlock() // Block until we receive a goaway or an error occurs. select { case <-t.GoAway(): case <-t.Error(): + case <-cdeadline: + timer.Stop() + var isConnected bool + ac.mu.Lock() + if ac.backoffDeadline.IsZero() { + // This implies that client received server + // preface. + isConnected = true + } + ac.mu.Unlock() + if isConnected { + continue + } + timer = nil + // No server preface received until deadline. + // Kill the connection. + grpclog.Errorf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.") + t.Close() + break + } + if timer != nil { + timer.Stop() } // If a GoAway happened, regardless of error, adjust our keepalive // parameters as appropriate. diff --git a/clientconn_test.go b/clientconn_test.go index 70a7ec2736e7..7e9958ec4ab8 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -19,12 +19,14 @@ package grpc import ( + "io" "math" "net" "testing" "time" "golang.org/x/net/context" + "golang.org/x/net/http2" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -46,6 +48,130 @@ func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.Sta return state, state == wantState } +func TestDialWaitsForServerSettings(t *testing.T) { + defer leakcheck.Check(t) + server, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + defer server.Close() + done := make(chan struct{}) + sent := make(chan struct{}) + dialDone := make(chan struct{}) + go func() { // Launch the server. + defer func() { + close(done) + }() + conn, err := server.Accept() + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + defer conn.Close() + // Sleep so that if the test were to fail it + // will fail more often than not. + time.Sleep(100 * time.Millisecond) + framer := http2.NewFramer(conn, conn) + close(sent) + if err := framer.WriteSettings(http2.Setting{}); err != nil { + t.Errorf("Error while writing settings. Err: %v", err) + return + } + <-dialDone // Close conn only after dial returns. + }() + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForServerSettings()) + close(dialDone) + if err != nil { + cancel() + t.Fatalf("Error while dialing. Err: %v", err) + } + defer client.Close() + select { + case <-sent: + default: + t.Fatalf("Dial returned before server settings were sent") + } + <-done + +} + +func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { + defer leakcheck.Check(t) + mctBkp := minConnectTimeout + defer func() { + minConnectTimeout = mctBkp + }() + minConnectTimeout = time.Millisecond * 500 + server, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + defer server.Close() + done := make(chan struct{}) + go func() { // Launch the server. + defer func() { + close(done) + }() + conn1, err := server.Accept() + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + defer conn1.Close() + // Don't send server settings and make sure the connection is closed. + time.Sleep(time.Millisecond * 1500) // Since the first backoff is for a second. + conn1.SetDeadline(time.Now().Add(time.Second)) + b := make([]byte, 24) + for { + // Make sure the connection was closed by client. + _, err = conn1.Read(b) + if err == nil { + continue + } + if err != io.EOF { + t.Errorf(" conn1.Read(_) = _, %v, want _, io.EOF", err) + return + } + break + } + + conn2, err := server.Accept() // Accept a reconnection request from client. + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + defer conn2.Close() + framer := http2.NewFramer(conn2, conn2) + if err := framer.WriteSettings(http2.Setting{}); err != nil { + t.Errorf("Error while writing settings. Err: %v", err) + return + } + time.Sleep(time.Millisecond * 1500) // Since the first backoff is for a second. + conn2.SetDeadline(time.Now().Add(time.Millisecond * 500)) + for { + // Make sure the conneciton stays open and is closed + // only by connection timeout. + _, err = conn2.Read(b) + if err == nil { + continue + } + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + return + } + t.Errorf("Unexpected error while reading, wanted timeout error", err) + break + } + + }() + client, err := Dial(server.Addr().String(), WithInsecure()) + if err != nil { + t.Fatalf("Error while dialing. Err: %v", err) + } + defer client.Close() + <-done +} + func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { defer leakcheck.Check(t) server, err := net.Listen("tcp", "localhost:0") diff --git a/transport/http2_client.go b/transport/http2_client.go index 643ef4aa5e3a..734ac4cade95 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -150,16 +150,12 @@ func isTemporary(err error) bool { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, deadline time.Time, onSuccess func()) (_ ClientTransport, err error) { +func newHTTP2Client(connectCtx context.Context, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) { scheme := "http" ctx, cancel := context.WithCancel(ctx) - connectCtx, connectCancel := context.WithDeadline(ctx, deadline) defer func() { if err != nil { cancel() - // Don't call connectCancel in success path due to a race in Go 1.6: - // https://github.com/golang/go/issues/15078. - connectCancel() } }() diff --git a/transport/transport.go b/transport/transport.go index 3f31ec946772..d8c723ce673d 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -26,7 +26,6 @@ import ( "io" "net" "sync" - "time" "golang.org/x/net/context" "golang.org/x/net/http2" @@ -506,8 +505,8 @@ type TargetInfo struct { // NewClientTransport establishes the transport with the required ConnectOptions // and returns it to the caller. -func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, deadline time.Time, onSuccess func()) (ClientTransport, error) { - return newHTTP2Client(ctx, target, opts, deadline, onSuccess) +func NewClientTransport(connectCtx context.Context, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) { + return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess) } // Options provides additional hints and information for message diff --git a/transport/transport_test.go b/transport/transport_test.go index 0a5d287df98e..67f3bcaf1621 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -361,8 +361,10 @@ func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hTy target := TargetInfo{ Addr: addr, } - ct, connErr = NewClientTransport(context.Background(), target, copts, time.Now().Add(2*time.Second), func() {}) + connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + ct, connErr = NewClientTransport(connectCtx, context.Background(), target, copts, func() {}) if connErr != nil { + cancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) } return server, ct @@ -384,8 +386,10 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } done <- conn }() - tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, time.Now().Add(2*time.Second), func() {}) + connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}) if err != nil { + cancel() // Do not cancel in success path. // Server clean-up. lis.Close() if conn, ok := <-done; ok { @@ -2091,8 +2095,10 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream wh: wh, } server.start(t, lis) - client, err = newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, time.Now().Add(2*time.Second), func() {}) + connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) + client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}) if err != nil { + cancel() // Do not cancel in success path. t.Fatalf("Error creating client. Err: %v", err) } stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method", Flush: true}) From cbb796599580b4298e692d8306583072150f9230 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 7 Nov 2017 15:56:22 -0800 Subject: [PATCH 03/11] Fix golang version related issues. --- clientconn.go | 4 ++-- transport/http2_client.go | 2 +- transport/transport.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clientconn.go b/clientconn.go index 7c446952d98a..a92df0476979 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1148,7 +1148,7 @@ func (ac *addrConn) resetTransport() error { ac.ready = nil } ac.mu.Unlock() - backoffTimer := time.NewTimer(time.Until(backoffDeadline)) + backoffTimer := time.NewTimer(backoffDeadline.Sub(time.Now())) select { case <-backoffTimer.C: case <-ac.ctx.Done(): @@ -1168,7 +1168,7 @@ func (ac *addrConn) transportMonitor() { ac.mu.Lock() t := ac.transport if !ac.connectDeadline.IsZero() { - timer = time.NewTimer(time.Until(ac.connectDeadline)) + timer = time.NewTimer(ac.connectDeadline.Sub(time.Now())) cdeadline = timer.C } ac.mu.Unlock() diff --git a/transport/http2_client.go b/transport/http2_client.go index 734ac4cade95..4c9f808fbcce 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -150,7 +150,7 @@ func isTemporary(err error) bool { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(connectCtx context.Context, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) { +func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) { scheme := "http" ctx, cancel := context.WithCancel(ctx) defer func() { diff --git a/transport/transport.go b/transport/transport.go index d8c723ce673d..86b23ef7d20c 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -505,7 +505,7 @@ type TargetInfo struct { // NewClientTransport establishes the transport with the required ConnectOptions // and returns it to the caller. -func NewClientTransport(connectCtx context.Context, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) { +func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) { return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess) } From 69269cbcfbd49bd7768f2a54a8cea201faeaeef5 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 7 Nov 2017 16:37:02 -0800 Subject: [PATCH 04/11] Struggles with vet. --- clientconn.go | 3 +-- clientconn_test.go | 2 +- vet.sh | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/clientconn.go b/clientconn.go index a92df0476979..745dd75f218a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1077,7 +1077,7 @@ func (ac *addrConn) resetTransport() error { }) if err != nil { // Do not cancel in the success path because of - // this issue in Go1.6: TODO(mmukhi): reference the issue. + // this issue in Go1.6: https://github.com/golang/go/issues/15078. cancel() if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { ac.mu.Lock() @@ -1194,7 +1194,6 @@ func (ac *addrConn) transportMonitor() { // Kill the connection. grpclog.Errorf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.") t.Close() - break } if timer != nil { timer.Stop() diff --git a/clientconn_test.go b/clientconn_test.go index 7e9958ec4ab8..f9d195bc346e 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -159,7 +159,7 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { if nerr, ok := err.(net.Error); ok && nerr.Timeout() { return } - t.Errorf("Unexpected error while reading, wanted timeout error", err) + t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err) break } diff --git a/vet.sh b/vet.sh index 02d4bae39a98..e1668c78a757 100755 --- a/vet.sh +++ b/vet.sh @@ -65,7 +65,7 @@ trap cleanup EXIT git ls-files "*.go" | xargs sed -i 's:"golang.org/x/net/context":"context":' set +o pipefail # TODO: Stop filtering pb.go files once golang/protobuf#214 is fixed. -go tool vet -all . 2>&1 | grep -vF '.pb.go:' | tee /dev/stderr | (! read) +go tool vet -all . 2>&1 | grep -vE 'cancel (function|var)' | grep -vF '.pb.go:' | tee /dev/stderr | (! read) set -o pipefail git reset --hard HEAD From 0db18b16a750798fd6a5ecdd839ac40f1301611e Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 9 Nov 2017 10:16:52 -0800 Subject: [PATCH 05/11] One more test. --- clientconn.go | 1 - clientconn_test.go | 79 +++++++++++++++++++++++++++++++++++++-- resolver/manual/manual.go | 12 +++++- 3 files changed, 86 insertions(+), 6 deletions(-) diff --git a/clientconn.go b/clientconn.go index 745dd75f218a..eb34197576db 100644 --- a/clientconn.go +++ b/clientconn.go @@ -116,7 +116,6 @@ type DialOption func(*dialOptions) func WithWaitForServerSettings() DialOption { return func(o *dialOptions) { o.waitForServerSettings = true - o.block = true } } diff --git a/clientconn_test.go b/clientconn_test.go index f9d195bc346e..a1f3169235e3 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -48,6 +48,77 @@ func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.Sta return state, state == wantState } +func TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) { + defer leakcheck.Check(t) + numServers := 2 + servers := make([]net.Listener, numServers) + var err error + for i := 0; i < numServers; i++ { + servers[i], err = net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + } + dones := make([]chan struct{}, numServers) + for i := 0; i < numServers; i++ { + dones[i] = make(chan struct{}) + } + for i := 0; i < numServers; i++ { + go func(i int) { + defer func() { + close(dones[i]) + }() + conn, err := servers[i].Accept() + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + defer conn.Close() + switch i { + case 0: // 1st server accepts the connection and immediately closes it. + case 1: // 2nd server accepts the connection and sends settings frames. + framer := http2.NewFramer(conn, conn) + if err := framer.WriteSettings(http2.Setting{}); err != nil { + t.Errorf("Error while writing settings frame. %v", err) + return + } + conn.SetDeadline(time.Now().Add(time.Second)) + buf := make([]byte, 1024) + for { // Make sure the connection stays healthy. + _, err = conn.Read(buf) + if err == nil { + continue + } + if nerr, ok := err.(net.Error); !ok || !nerr.Timeout() { + t.Errorf("Server expected the conn.Read(_) to timeout instead got error: %v", err) + } + return + } + } + }(i) + } + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + resolvedAddrs := make([]resolver.Address, numServers) + for i := 0; i < numServers; i++ { + resolvedAddrs[i] = resolver.Address{Addr: servers[i].Addr().String()} + } + r.BootstrapWithAddrs(resolvedAddrs) + client, err := Dial(r.Scheme()+":///test.server", WithInsecure()) + if err != nil { + t.Errorf("Dial failed. Err: %v", err) + } else { + defer client.Close() + } + time.Sleep(time.Second) // Close the servers after a second for cleanup. + for _, s := range servers { + s.Close() + } + for _, done := range dones { + <-done + } +} + func TestDialWaitsForServerSettings(t *testing.T) { defer leakcheck.Check(t) server, err := net.Listen("tcp", "localhost:0") @@ -80,7 +151,7 @@ func TestDialWaitsForServerSettings(t *testing.T) { <-dialDone // Close conn only after dial returns. }() ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForServerSettings()) + client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForServerSettings(), WithBlock()) close(dialDone) if err != nil { cancel() @@ -199,15 +270,15 @@ func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { t.Errorf("Error while accepting. Err: %v", err) return } - now := time.Now() + meow := time.Now() conn.Close() - dr := now.Sub(prevAt) + dr := meow.Sub(prevAt) if dr <= prevDuration { t.Errorf("Client backoff did not increase with retries. Previoud duration: %v, current duration: %v", prevDuration, dr) return } prevDuration = dr - prevAt = now + prevAt = meow } }() client, err := Dial(server.Addr().String(), WithInsecure()) diff --git a/resolver/manual/manual.go b/resolver/manual/manual.go index 7a79090312d8..714bd8d3cea6 100644 --- a/resolver/manual/manual.go +++ b/resolver/manual/manual.go @@ -40,12 +40,22 @@ type Resolver struct { scheme string // Fields actually belong to the resolver. - cc resolver.ClientConn + cc resolver.ClientConn + bootstrapAddrs []resolver.Address +} + +// BootstrapWithAddrs adds resloved addresses to the resolver so that +// NewAddress doesn't need to be explicitly called after Dial. +func (r *Resolver) BootstrapWithAddrs(addrs []resolver.Address) { + r.bootstrapAddrs = addrs } // Build returns itself for Resolver, because it's both a builder and a resolver. func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { r.cc = cc + if r.bootstrapAddrs != nil { + r.NewAddress(r.bootstrapAddrs) + } return r, nil } From 887c8db3380c57a477c027b1f2ff381ed1a16d3e Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 9 Nov 2017 11:44:35 -0800 Subject: [PATCH 06/11] Shame of misspelled words. --- clientconn.go | 6 +++--- clientconn_test.go | 2 +- resolver/manual/manual.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/clientconn.go b/clientconn.go index eb34197576db..f21cd296e935 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1002,7 +1002,7 @@ func (ac *addrConn) resetTransport() error { for retries := 0; ; retries++ { ac.mu.Lock() if ac.backoffDeadline.IsZero() { - // This means either a successfull HTTP2 connection was established + // This means either a successful HTTP2 connection was established // or this is the first time this addrConn is trying to establish a // connection. backoffFor := ac.dopts.bs.backoff(retries) // time.Duration. @@ -1015,7 +1015,7 @@ func (ac *addrConn) resetTransport() error { now := time.Now() backoffDeadline = now.Add(backoffFor) connectDeadline = now.Add(dialDuration) - prevAddr = resolver.Address{} // Start connecting from the begining. + prevAddr = resolver.Address{} // Start connecting from the beginning. } else { // Continue trying to conect with the same deadlines. retries = ac.retries @@ -1039,7 +1039,7 @@ func (ac *addrConn) resetTransport() error { copy(addrsIter, ac.addrs) addrIdx := 0 for idx, addr := range addrsIter { - // Find the previous attemted address and start with + // Find the previous attempted address and start with // the one after it. if addr.Addr == prevAddr.Addr { addrIdx = idx + 1 diff --git a/clientconn_test.go b/clientconn_test.go index a1f3169235e3..eba63d146e57 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -221,7 +221,7 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { time.Sleep(time.Millisecond * 1500) // Since the first backoff is for a second. conn2.SetDeadline(time.Now().Add(time.Millisecond * 500)) for { - // Make sure the conneciton stays open and is closed + // Make sure the connection stays open and is closed // only by connection timeout. _, err = conn2.Read(b) if err == nil { diff --git a/resolver/manual/manual.go b/resolver/manual/manual.go index 714bd8d3cea6..635d228d04ee 100644 --- a/resolver/manual/manual.go +++ b/resolver/manual/manual.go @@ -44,7 +44,7 @@ type Resolver struct { bootstrapAddrs []resolver.Address } -// BootstrapWithAddrs adds resloved addresses to the resolver so that +// BootstrapWithAddrs adds resolved addresses to the resolver so that // NewAddress doesn't need to be explicitly called after Dial. func (r *Resolver) BootstrapWithAddrs(addrs []resolver.Address) { r.bootstrapAddrs = addrs From 16769cf6c7169ad2a835b8620a69a28d8e478d4e Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 27 Nov 2017 16:01:05 -0800 Subject: [PATCH 07/11] Post-review update. --- clientconn.go | 247 +++++++++++++++++++------------------- clientconn_test.go | 6 +- resolver/manual/manual.go | 4 +- 3 files changed, 130 insertions(+), 127 deletions(-) diff --git a/clientconn.go b/clientconn.go index f21cd296e935..087d6d266f7e 100644 --- a/clientconn.go +++ b/clientconn.go @@ -98,8 +98,8 @@ type dialOptions struct { // This is to support v1 balancer. balancerBuilder balancer.Builder // This is to support grpclb. - resolverBuilder resolver.Builder - waitForServerSettings bool + resolverBuilder resolver.Builder + waitForHandshake bool } const ( @@ -110,12 +110,12 @@ const ( // DialOption configures how we set up the connection. type DialOption func(*dialOptions) -// WithWaitForServerSettings makes the Dial wait until the client receives initial settings (server preface) -// from the server. +// WithWaitForHandshake blocks until the initial settings frame is received from the +// server before assigning RPCs to the connection. // Experimental API. -func WithWaitForServerSettings() DialOption { +func WithWaitForHandshake() DialOption { return func(o *dialOptions) { - o.waitForServerSettings = true + o.waitForHandshake = true } } @@ -250,7 +250,7 @@ func WithBackoffConfig(b BackoffConfig) DialOption { return withBackoff(b) } -// withBackoff sets the backoff strategy used for retries after a +// withBackoff sets the backoff strategy used for connectRetryNum after a // failed connection attempt. // // This can be exported if arbitrary backoff strategies are allowed by gRPC. @@ -927,12 +927,12 @@ type addrConn struct { // The reason this addrConn is torn down. tearDownErr error - retries int + connectRetryNum int // backoffDeadline is the time until which resetTransport needs to - // wait before increasing retries count. + // wait before increasing connectRetryNum count. backoffDeadline time.Time - // connectDeadline is the time by which dial and tls handshake should - // finish. + // connectDeadline is the time by which all connection + // negotiations must complete. connectDeadline time.Time } @@ -971,11 +971,11 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { // The created transport must receive initial settings frame from the server. // In case that doesnt happen, transportMonitor will kill the newly created // transport after connectDeadline has expired. -// In case there was an error on the tranpsort before the settings frame was +// In case there was an error on the transport before the settings frame was // received, resetTransport resumes connecting to backends after the one that -// is previously connected to. In case end of the list is reached, resetTransport +// was previously connected to. In case end of the list is reached, resetTransport // backs off until the original deadline. -// If the DialOption WithWaitForServerSettings was set, resetTrasport returns +// If the DialOption WithWaitForHandshake was set, resetTrasport returns // successfully only after server settings are received. // // TODO(bar) make sure all state transitions are valid. @@ -995,35 +995,32 @@ func (ac *addrConn) resetTransport() error { ac.cc.mu.RLock() ac.dopts.copts.KeepaliveParams = ac.cc.mkp ac.cc.mu.RUnlock() - var ( - backoffDeadline time.Time - connectDeadline time.Time - ) - for retries := 0; ; retries++ { + var backoffDeadline, connectDeadline time.Time + for connectRetryNum := 0; ; connectRetryNum++ { ac.mu.Lock() if ac.backoffDeadline.IsZero() { // This means either a successful HTTP2 connection was established // or this is the first time this addrConn is trying to establish a // connection. - backoffFor := ac.dopts.bs.backoff(retries) // time.Duration. + backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration. // This will be the duration that dial gets to finish. dialDuration := minConnectTimeout if backoffFor > dialDuration { // Give dial more time as we keep failing to connect. dialDuration = backoffFor } - now := time.Now() - backoffDeadline = now.Add(backoffFor) - connectDeadline = now.Add(dialDuration) + start := time.Now() + backoffDeadline = start.Add(backoffFor) + connectDeadline = start.Add(dialDuration) prevAddr = resolver.Address{} // Start connecting from the beginning. } else { // Continue trying to conect with the same deadlines. - retries = ac.retries + connectRetryNum = ac.connectRetryNum backoffDeadline = ac.backoffDeadline connectDeadline = ac.connectDeadline ac.backoffDeadline = time.Time{} ac.connectDeadline = time.Time{} - ac.retries = 0 + ac.connectRetryNum = 0 } if ac.state == connectivity.Shutdown { ac.mu.Unlock() @@ -1037,125 +1034,132 @@ func (ac *addrConn) resetTransport() error { // copy ac.addrs in case of race addrsIter := make([]resolver.Address, len(ac.addrs)) copy(addrsIter, ac.addrs) - addrIdx := 0 - for idx, addr := range addrsIter { - // Find the previous attempted address and start with - // the one after it. - if addr.Addr == prevAddr.Addr { - addrIdx = idx + 1 - break - } - } copts := ac.dopts.copts ac.mu.Unlock() - for i := addrIdx; i < len(addrsIter); i++ { - addr := addrsIter[i] + connected, err := ac.createTransport(connectRetryNum, backoffDeadline, connectDeadline, addrsIter, prevAddr, copts) + if err != nil { + return err + } + if connected { + return nil + } + } +} + +// createTransport creates a connection to one of the backends in addrs. +// It returns true if a connection was established. +func (ac *addrConn) createTransport(connectRetryNum int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, prevAddr resolver.Address, copts transport.ConnectOptions) (bool, error) { + startFrom := 0 + for idx, addr := range addrs { + if addr.Addr == prevAddr.Addr { + // Find the previous attempted address and start with the + // one after it. + startFrom = idx + 1 + break + } + } + for i := startFrom; i < len(addrs); i++ { + addr := addrs[i] + target := transport.TargetInfo{ + Addr: addr.Addr, + Metadata: addr.Metadata, + Authority: ac.cc.authority, + } + done := make(chan struct{}) + onPrefaceReceipt := func() { + close(done) ac.mu.Lock() - if ac.state == connectivity.Shutdown { - // ac.tearDown(...) has been invoked. - ac.mu.Unlock() - return errConnClosing + if !ac.backoffDeadline.IsZero() { + // If we haven't already started reconnecting to + // other backends. + // Note, this can happen when writer notices an error + // and triggers resetTransport while at the same time + // reader receives the preface and invokes this closure. + ac.backoffDeadline = time.Time{} + ac.connectDeadline = time.Time{} + ac.connectRetryNum = 0 } ac.mu.Unlock() - sinfo := transport.TargetInfo{ - Addr: addr.Addr, - Metadata: addr.Metadata, - Authority: ac.cc.authority, - } - connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) - done := make(chan struct{}) - newTransport, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, sinfo, copts, func() { - close(done) - ac.mu.Lock() - if !ac.backoffDeadline.IsZero() { - ac.backoffDeadline = time.Time{} - ac.connectDeadline = time.Time{} - ac.retries = 0 - } - ac.mu.Unlock() - }) - if err != nil { - // Do not cancel in the success path because of - // this issue in Go1.6: https://github.com/golang/go/issues/15078. - cancel() - if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { - ac.mu.Lock() - if ac.state != connectivity.Shutdown { - ac.state = connectivity.TransientFailure - ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - } - ac.mu.Unlock() - return err - } - grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr) + } + // Do not cancel in the success path because of + // this issue in Go1.6: https://github.com/golang/go/issues/15078. + connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) + newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt) + if err != nil { + cancel() + if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { ac.mu.Lock() - if ac.state == connectivity.Shutdown { - // ac.tearDown(...) has been invoked. - ac.mu.Unlock() - return errConnClosing + if ac.state != connectivity.Shutdown { + ac.state = connectivity.TransientFailure + ac.cc.handleSubConnStateChange(ac.acbw, ac.state) } ac.mu.Unlock() - continue - } - if ac.dopts.waitForServerSettings { - select { - case <-done: - case <-connectCtx.Done(): - // Didn't receive server preface, must kill this new - // transport now. - grpclog.Errorf("grpc: addrConn.resetTransport didn't get server preface after waiting. Closing the new transport now.") - newTransport.Close() - continue - case <-ac.ctx.Done(): - } + return false, err } ac.mu.Lock() if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() - newTransport.Close() - return errConnClosing - } - ac.printf("ready") - ac.state = connectivity.Ready - ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - t := ac.transport - ac.transport = newTransport - if t != nil { - t.Close() - } - ac.curAddr = addr - if ac.ready != nil { - close(ac.ready) - ac.ready = nil + return false, errConnClosing } - // Following will be reset by the client transport when - // it gets server preface thus signifying that a successful - // HTTP2 connection was established. - ac.retries = retries - ac.backoffDeadline = backoffDeadline - ac.connectDeadline = connectDeadline ac.mu.Unlock() - return nil + grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) + continue + } + if ac.dopts.waitForHandshake { + select { + case <-done: + case <-connectCtx.Done(): + // Didn't receive server preface, must kill this new + // transport now. + grpclog.Warningf("grpc: addrConn.createTransport will close the newly established transport since no server preface was received.") + newTr.Close() + continue + case <-ac.ctx.Done(): + } } ac.mu.Lock() - ac.state = connectivity.TransientFailure + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + // ac.tearDonn(...) has been invoked. + newTr.Close() + return false, errConnClosing + } + ac.printf("ready") + ac.state = connectivity.Ready ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - ac.cc.resolveNow(resolver.ResolveNowOption{}) + if ac.transport != nil { + ac.transport.Close() + } + ac.transport = newTr + ac.curAddr = addr if ac.ready != nil { close(ac.ready) ac.ready = nil } + ac.connectRetryNum = connectRetryNum + ac.backoffDeadline = backoffDeadline + ac.connectDeadline = connectDeadline ac.mu.Unlock() - backoffTimer := time.NewTimer(backoffDeadline.Sub(time.Now())) - select { - case <-backoffTimer.C: - case <-ac.ctx.Done(): - backoffTimer.Stop() - return ac.ctx.Err() - } - backoffTimer.Stop() + return true, nil } + ac.mu.Lock() + ac.state = connectivity.TransientFailure + ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.cc.resolveNow(resolver.ResolveNowOption{}) + if ac.ready != nil { + close(ac.ready) + ac.ready = nil + } + ac.mu.Unlock() + timer := time.NewTimer(backoffDeadline.Sub(time.Now())) + select { + case <-timer.C: + case <-ac.ctx.Done(): + timer.Stop() + return false, ac.ctx.Err() + } + return false, nil } // Run in a goroutine to track the error in transport and create the @@ -1176,7 +1180,6 @@ func (ac *addrConn) transportMonitor() { case <-t.GoAway(): case <-t.Error(): case <-cdeadline: - timer.Stop() var isConnected bool ac.mu.Lock() if ac.backoffDeadline.IsZero() { @@ -1191,7 +1194,7 @@ func (ac *addrConn) transportMonitor() { timer = nil // No server preface received until deadline. // Kill the connection. - grpclog.Errorf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.") + grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.") t.Close() } if timer != nil { diff --git a/clientconn_test.go b/clientconn_test.go index eba63d146e57..0dfc7be5e6c4 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -103,7 +103,7 @@ func TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) { for i := 0; i < numServers; i++ { resolvedAddrs[i] = resolver.Address{Addr: servers[i].Addr().String()} } - r.BootstrapWithAddrs(resolvedAddrs) + r.InitialAddrs(resolvedAddrs) client, err := Dial(r.Scheme()+":///test.server", WithInsecure()) if err != nil { t.Errorf("Dial failed. Err: %v", err) @@ -151,7 +151,7 @@ func TestDialWaitsForServerSettings(t *testing.T) { <-dialDone // Close conn only after dial returns. }() ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForServerSettings(), WithBlock()) + client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock()) close(dialDone) if err != nil { cancel() @@ -274,7 +274,7 @@ func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { conn.Close() dr := meow.Sub(prevAt) if dr <= prevDuration { - t.Errorf("Client backoff did not increase with retries. Previoud duration: %v, current duration: %v", prevDuration, dr) + t.Errorf("Client backoff did not increase with retries. Previous duration: %v, current duration: %v", prevDuration, dr) return } prevDuration = dr diff --git a/resolver/manual/manual.go b/resolver/manual/manual.go index 635d228d04ee..50ed762a83d0 100644 --- a/resolver/manual/manual.go +++ b/resolver/manual/manual.go @@ -44,9 +44,9 @@ type Resolver struct { bootstrapAddrs []resolver.Address } -// BootstrapWithAddrs adds resolved addresses to the resolver so that +// InitialAddrs adds resolved addresses to the resolver so that // NewAddress doesn't need to be explicitly called after Dial. -func (r *Resolver) BootstrapWithAddrs(addrs []resolver.Address) { +func (r *Resolver) InitialAddrs(addrs []resolver.Address) { r.bootstrapAddrs = addrs } From c8bfe9a41fbb1ec62b96001278774e3651bdc222 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 30 Nov 2017 14:16:27 -0800 Subject: [PATCH 08/11] Post-review update. --- clientconn.go | 40 ++++++++++++++-------------------------- clientconn_test.go | 1 + vet.sh | 2 +- 3 files changed, 16 insertions(+), 27 deletions(-) diff --git a/clientconn.go b/clientconn.go index 087d6d266f7e..cd514aa20330 100644 --- a/clientconn.go +++ b/clientconn.go @@ -814,6 +814,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) if curAddrFound { ac.addrs = addrs + ac.reconnectIdx = 0 // Start reconnecting from begining in the new list. } return curAddrFound @@ -915,10 +916,10 @@ type addrConn struct { events trace.EventLog acbw balancer.SubConn - mu sync.Mutex - curAddr resolver.Address - prevAddr resolver.Address - state connectivity.State + mu sync.Mutex + curAddr resolver.Address + reconnectIdx int // The index in adder list to start reconnecting from. + state connectivity.State // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -990,7 +991,7 @@ func (ac *addrConn) resetTransport() error { ac.ready = nil } ac.transport = nil - prevAddr := ac.prevAddr + ridx := ac.reconnectIdx ac.mu.Unlock() ac.cc.mu.RLock() ac.dopts.copts.KeepaliveParams = ac.cc.mkp @@ -1012,7 +1013,7 @@ func (ac *addrConn) resetTransport() error { start := time.Now() backoffDeadline = start.Add(backoffFor) connectDeadline = start.Add(dialDuration) - prevAddr = resolver.Address{} // Start connecting from the beginning. + ridx = 0 // Start connecting from the beginning. } else { // Continue trying to conect with the same deadlines. connectRetryNum = ac.connectRetryNum @@ -1036,7 +1037,7 @@ func (ac *addrConn) resetTransport() error { copy(addrsIter, ac.addrs) copts := ac.dopts.copts ac.mu.Unlock() - connected, err := ac.createTransport(connectRetryNum, backoffDeadline, connectDeadline, addrsIter, prevAddr, copts) + connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts) if err != nil { return err } @@ -1048,17 +1049,8 @@ func (ac *addrConn) resetTransport() error { // createTransport creates a connection to one of the backends in addrs. // It returns true if a connection was established. -func (ac *addrConn) createTransport(connectRetryNum int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, prevAddr resolver.Address, copts transport.ConnectOptions) (bool, error) { - startFrom := 0 - for idx, addr := range addrs { - if addr.Addr == prevAddr.Addr { - // Find the previous attempted address and start with the - // one after it. - startFrom = idx + 1 - break - } - } - for i := startFrom; i < len(addrs); i++ { +func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) { + for i := ridx; i < len(addrs); i++ { addr := addrs[i] target := transport.TargetInfo{ Addr: addr.Addr, @@ -1110,11 +1102,10 @@ func (ac *addrConn) createTransport(connectRetryNum int, backoffDeadline, connec select { case <-done: case <-connectCtx.Done(): - // Didn't receive server preface, must kill this new - // transport now. - grpclog.Warningf("grpc: addrConn.createTransport will close the newly established transport since no server preface was received.") + // Didn't receive server preface, must kill this new transport now. + grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.") newTr.Close() - continue + break case <-ac.ctx.Done(): } } @@ -1128,9 +1119,6 @@ func (ac *addrConn) createTransport(connectRetryNum int, backoffDeadline, connec ac.printf("ready") ac.state = connectivity.Ready ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - if ac.transport != nil { - ac.transport.Close() - } ac.transport = newTr ac.curAddr = addr if ac.ready != nil { @@ -1140,6 +1128,7 @@ func (ac *addrConn) createTransport(connectRetryNum int, backoffDeadline, connec ac.connectRetryNum = connectRetryNum ac.backoffDeadline = backoffDeadline ac.connectDeadline = connectDeadline + ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list. ac.mu.Unlock() return true, nil } @@ -1217,7 +1206,6 @@ func (ac *addrConn) transportMonitor() { ac.state = connectivity.TransientFailure ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.resolveNow(resolver.ResolveNowOption{}) - ac.prevAddr = ac.curAddr ac.curAddr = resolver.Address{} ac.mu.Unlock() if err := ac.resetTransport(); err != nil { diff --git a/clientconn_test.go b/clientconn_test.go index 0dfc7be5e6c4..c20bd18720f4 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -151,6 +151,7 @@ func TestDialWaitsForServerSettings(t *testing.T) { <-dialDone // Close conn only after dial returns. }() ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() client, err := DialContext(ctx, server.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock()) close(dialDone) if err != nil { diff --git a/vet.sh b/vet.sh index e1668c78a757..8c5c5ba6dfb2 100755 --- a/vet.sh +++ b/vet.sh @@ -65,7 +65,7 @@ trap cleanup EXIT git ls-files "*.go" | xargs sed -i 's:"golang.org/x/net/context":"context":' set +o pipefail # TODO: Stop filtering pb.go files once golang/protobuf#214 is fixed. -go tool vet -all . 2>&1 | grep -vE 'cancel (function|var)' | grep -vF '.pb.go:' | tee /dev/stderr | (! read) +go tool vet -all . 2>&1 | grep -vE '(clientconn|transport\/transport_test).go:.*cancel (function|var)' | grep -vF '.pb.go:' | tee /dev/stderr | (! read) set -o pipefail git reset --hard HEAD From 593a8081f3e07587f849eaea0c1f01bf43228e46 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 30 Nov 2017 15:16:29 -0800 Subject: [PATCH 09/11] spellings! --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index cd514aa20330..33edd241d038 100644 --- a/clientconn.go +++ b/clientconn.go @@ -814,7 +814,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) if curAddrFound { ac.addrs = addrs - ac.reconnectIdx = 0 // Start reconnecting from begining in the new list. + ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list. } return curAddrFound From f8dda555ef2c8d7d242cc61b49e5ffb7c3d96acb Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 30 Nov 2017 16:46:15 -0800 Subject: [PATCH 10/11] Fix racy test. --- clientconn_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/clientconn_test.go b/clientconn_test.go index c20bd18720f4..2142ff15d095 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -181,9 +181,12 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { } defer server.Close() done := make(chan struct{}) + clientDone := make(chan struct{}) go func() { // Launch the server. defer func() { - close(done) + if done != nil { + close(done) + } }() conn1, err := server.Accept() if err != nil { @@ -234,14 +237,18 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err) break } + close(done) + done = nil + <-clientDone }() client, err := Dial(server.Addr().String(), WithInsecure()) if err != nil { t.Fatalf("Error while dialing. Err: %v", err) } - defer client.Close() <-done + client.Close() + close(clientDone) } func TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { From 431db65ac70e1cbe57d060e4e1baf04da7db7192 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 30 Nov 2017 17:13:47 -0800 Subject: [PATCH 11/11] Post review update. --- clientconn.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/clientconn.go b/clientconn.go index 33edd241d038..f058d3716d56 100644 --- a/clientconn.go +++ b/clientconn.go @@ -918,7 +918,7 @@ type addrConn struct { mu sync.Mutex curAddr resolver.Address - reconnectIdx int // The index in adder list to start reconnecting from. + reconnectIdx int // The index in addrs list to start reconnecting from. state connectivity.State // ready is closed and becomes nil when a new transport is up or failed // due to timeout. @@ -1169,17 +1169,13 @@ func (ac *addrConn) transportMonitor() { case <-t.GoAway(): case <-t.Error(): case <-cdeadline: - var isConnected bool ac.mu.Lock() + // This implies that client received server preface. if ac.backoffDeadline.IsZero() { - // This implies that client received server - // preface. - isConnected = true - } - ac.mu.Unlock() - if isConnected { + ac.mu.Unlock() continue } + ac.mu.Unlock() timer = nil // No server preface received until deadline. // Kill the connection.