Skip to content

Commit

Permalink
add transport closure per grpc#1962, and rename common() to ac.reconn…
Browse files Browse the repository at this point in the history
…ect()
  • Loading branch information
jeanbza committed Apr 6, 2018
1 parent 798878b commit a1f4bdd
Showing 1 changed file with 45 additions and 35 deletions.
80 changes: 45 additions & 35 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,71 +840,53 @@ func (ac *addrConn) connect() error {
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()

common := func() bool {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return true
}

ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.curAddr = resolver.Address{}
ac.mu.Unlock()

if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return true
}
return false
}

var timer *time.Timer
var cdeadline <-chan time.Time

onDeadline := func() bool {
ac.mu.Lock()
if ac.backoffDeadline.IsZero() {
ac.mu.Unlock()
return true
return false
}
ac.mu.Unlock()
err := ac.transport.Close()
if err != nil {
grpclog.Error(err)
}

return common()
return ac.reconnect()
}

onGoAway := func() bool {
if timer != nil {
timer.Stop()
}
ac.adjustParams(ac.transport.GetGoAwayReason())
return common()

return ac.reconnect()
}

onError := func() bool {
if timer != nil {
timer.Stop()
}
// In case this is triggered because clientConn.Close()
// was called, we want to immediately close the transport
// since no other goroutine might notice it for a while.
err := ac.transport.Close()
if err != nil {
grpclog.Error(err)
}

// If a GoAway happened, regardless of error, adjust our keepalive
// parameters as appropriate. Note this is inherently racy.
select {
case <-ac.transport.GoAway():
ac.adjustParams(ac.transport.GetGoAwayReason())
default:
}
return common()
return ac.reconnect()
}

// Start a goroutine connecting to the server asynchronously.
Expand All @@ -918,7 +900,7 @@ func (ac *addrConn) connect() error {
return
}

var shouldExit bool
var successfulReconnect bool
for {
ac.mu.Lock()
if !ac.connectDeadline.IsZero() {
Expand All @@ -929,15 +911,15 @@ func (ac *addrConn) connect() error {

select {
case <-cdeadline:
if shouldExit = onDeadline(); shouldExit {
if successfulReconnect = onDeadline(); !successfulReconnect {
return
}
case <-ac.transport.GoAway():
if shouldExit = onGoAway(); shouldExit {
if successfulReconnect = onGoAway(); !successfulReconnect {
return
}
case <-ac.transport.Error():
if shouldExit = onError(); shouldExit {
if successfulReconnect = onError(); !successfulReconnect {
return
}
}
Expand Down Expand Up @@ -1418,6 +1400,34 @@ func (ac *addrConn) getState() connectivity.State {
return ac.state
}

// Begins reconnect process; returns true if successful or false otherwise.
func (ac *addrConn) reconnect() bool {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return false
}

ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.curAddr = resolver.Address{}
ac.mu.Unlock()

if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return false
}
return true
}

// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
Expand Down

0 comments on commit a1f4bdd

Please sign in to comment.