Skip to content

Commit

Permalink
transport: remove RequireHandshakeHybrid support
Browse files Browse the repository at this point in the history
This removes RequireHandshakeHybrid support and changes the default behavior
to RequireHandshakeOn. Dial calls will now block and wait for a successful
handshake before proceeding. Users relying on the old hybrid behavior (cmux
users) should consult soheilhy/cmux#64.

Also, several tests have been updated to take this into consideration by
sending settings frames.
  • Loading branch information
jeanbza committed Feb 8, 2019
1 parent 2259ee6 commit 1e08f17
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 223 deletions.
154 changes: 52 additions & 102 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,35 +1022,8 @@ func (ac *addrConn) resetTransport() {
reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
if err == nil {
ac.mu.Lock()
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()

healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
} else {
if err != nil {
ac.cc.blockingpicker.updateConnectionError(err)
hcancel()
if err == errConnClosing {
return
Expand All @@ -1063,55 +1036,46 @@ func (ac *addrConn) resetTransport() {
}

ac.mu.Lock()
reqHandshake := ac.dopts.reqHandshake
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()

<-reconnect.Done()
hcancel()

if reqHandshake == envconfig.RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <-prefaceReceived:
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
default:
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}

ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()

if tryNextAddrFromStart.HasFired() {
break addrLoop
}
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.backoffIdx = 0
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
break addrLoop
}

<-reconnect.Done()
hcancel()

// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}

// After exhausting all addresses, or after need to reconnect after a
Expand Down Expand Up @@ -1184,38 +1148,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
}

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)

if err == nil {
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
err = errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
case <-prefaceReceived:
// We got the preface just in the nick of time - huzzah!
case <-onCloseCalled:
// The transport has already closed - noop.
}
}()
}
}

if err != nil {
// newTr is either nil, or closed.
ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
Expand All @@ -1228,6 +1162,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return nil, err
}

if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
}

// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand Down
Loading

0 comments on commit 1e08f17

Please sign in to comment.