Skip to content

Commit

Permalink
Add and use connectivity package for states (#1430)
Browse files Browse the repository at this point in the history
* Add and use connectivity package
* Mark cc state APIs as experimental
  • Loading branch information
menghanl authored Aug 9, 2017
1 parent 73041be commit e81b569
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 79 deletions.
113 changes: 41 additions & 72 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package grpc

import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -445,39 +445,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return cc, nil
}

// ConnectivityState indicates the state of a client connection.
type ConnectivityState int

const (
// Idle indicates the ClientConn is idle.
Idle ConnectivityState = iota
// Connecting indicates the ClienConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)

func (s ConnectivityState) String() string {
switch s {
case Idle:
return "IDLE"
case Connecting:
return "CONNECTING"
case Ready:
return "READY"
case TransientFailure:
return "TRANSIENT_FAILURE"
case Shutdown:
return "SHUTDOWN"
default:
panic(fmt.Sprintf("unknown connectivity state: %d", s))
}
}

// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
Expand All @@ -492,55 +459,55 @@ type connectivityStateEvaluator struct {

// recordTransition records state change happening in every addrConn and based on
// that it evaluates what state the ClientConn is in.
// It can only transition between Ready, Connecting and TransientFailure. Other states,
// Idle and Shutdown are transitioned into by ClientConn; in the begining of the connection
// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
// closes it is in Shutdown state.
// closes it is in connectivity.Shutdown state.
// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState ConnectivityState) {
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
cse.mu.Lock()
defer cse.mu.Unlock()

// Update counters.
for idx, state := range []ConnectivityState{oldState, newState} {
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case Ready:
case connectivity.Ready:
cse.numReady += updateVal
case Connecting:
case connectivity.Connecting:
cse.numConnecting += updateVal
case TransientFailure:
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}

// Evaluate.
if cse.numReady > 0 {
cse.csMgr.updateState(Ready)
cse.csMgr.updateState(connectivity.Ready)
return
}
if cse.numConnecting > 0 {
cse.csMgr.updateState(Connecting)
cse.csMgr.updateState(connectivity.Connecting)
return
}
cse.csMgr.updateState(TransientFailure)
cse.csMgr.updateState(connectivity.TransientFailure)
}

// connectivityStateManager keeps the ConnectivityState of ClientConn.
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync.Mutex
state ConnectivityState
state connectivity.State
notifyChan chan struct{}
}

// updateState updates the ConnectivityState of ClientConn.
// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state ConnectivityState) {
func (csm *connectivityStateManager) updateState(state connectivity.State) {
csm.mu.Lock()
defer csm.mu.Unlock()
if csm.state == Shutdown {
if csm.state == connectivity.Shutdown {
return
}
if csm.state == state {
Expand All @@ -554,7 +521,7 @@ func (csm *connectivityStateManager) updateState(state ConnectivityState) {
}
}

func (csm *connectivityStateManager) getState() ConnectivityState {
func (csm *connectivityStateManager) getState() connectivity.State {
csm.mu.Lock()
defer csm.mu.Unlock()
return csm.state
Expand Down Expand Up @@ -587,9 +554,10 @@ type ClientConn struct {
mkp keepalive.ClientParameters
}

// WaitForStateChange waits until the ConnectivityState of ClientConn changes from sourceState or
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) bool {
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
ch := cc.csMgr.getNotifyChan()
if cc.csMgr.getState() != sourceState {
return true
Expand All @@ -602,8 +570,9 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState Connec
}
}

// GetState returns the ConnectivityState of ClientConn.
func (cc *ClientConn) GetState() ConnectivityState {
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}

Expand Down Expand Up @@ -855,7 +824,7 @@ func (cc *ClientConn) Close() error {
}
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(Shutdown)
cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock()
if cc.dopts.balancer != nil {
cc.dopts.balancer.Close()
Expand All @@ -879,7 +848,7 @@ type addrConn struct {
csEvltr *connectivityStateEvaluator

mu sync.Mutex
state ConnectivityState
state connectivity.State
down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
Expand Down Expand Up @@ -926,7 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
// - otherwise, it will be closed.
func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
Expand All @@ -936,7 +905,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.down = nil
}
oldState := ac.state
ac.state = Connecting
ac.state = connectivity.Connecting
ac.csEvltr.recordTransition(oldState, ac.state)
t := ac.transport
ac.transport = nil
Expand All @@ -949,7 +918,7 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
Expand Down Expand Up @@ -977,14 +946,14 @@ func (ac *addrConn) resetTransport(drain bool) error {
}
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
ac.errorf("transient failure: %v", err)
oldState = ac.state
ac.state = TransientFailure
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.ready != nil {
close(ac.ready)
Expand All @@ -1003,14 +972,14 @@ func (ac *addrConn) resetTransport(drain bool) error {
}
ac.mu.Lock()
ac.printf("ready")
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return errConnClosing
}
oldState = ac.state
ac.state = Ready
ac.state = connectivity.Ready
ac.csEvltr.recordTransition(oldState, ac.state)
ac.transport = newTransport
if ac.ready != nil {
Expand Down Expand Up @@ -1081,13 +1050,13 @@ func (ac *addrConn) transportMonitor() {
default:
}
ac.mu.Lock()
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
// ac has been shutdown.
ac.mu.Unlock()
return
}
oldState := ac.state
ac.state = TransientFailure
ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
ac.mu.Unlock()
if err := ac.resetTransport(false); err != nil {
Expand All @@ -1107,12 +1076,12 @@ func (ac *addrConn) transportMonitor() {
}

// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
case ac.state == Shutdown:
case ac.state == connectivity.Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
Expand All @@ -1121,11 +1090,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
ac.mu.Unlock()
return nil, errConnClosing
case ac.state == Ready:
case ac.state == connectivity.Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
case ac.state == TransientFailure:
case ac.state == connectivity.TransientFailure:
if failfast || hasBalancer {
ac.mu.Unlock()
return nil, errConnUnavailable
Expand Down Expand Up @@ -1167,11 +1136,11 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway.
ac.transport.GracefulClose()
}
if ac.state == Shutdown {
if ac.state == connectivity.Shutdown {
return
}
oldState := ac.state
ac.state = Shutdown
ac.state = connectivity.Shutdown
ac.tearDownErr = err
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.events != nil {
Expand Down
11 changes: 6 additions & 5 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ import (

"golang.org/x/net/context"

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/testdata"
)

func assertState(wantState ConnectivityState, cc *ClientConn) (ConnectivityState, bool) {
func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var state ConnectivityState
var state connectivity.State
for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
return state, state == wantState
Expand All @@ -54,7 +55,7 @@ func TestConnectivityStates(t *testing.T) {
t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ <nil>", err)
}
defer cc.Close()
wantState := Ready
wantState := connectivity.Ready
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
Expand All @@ -66,7 +67,7 @@ func TestConnectivityStates(t *testing.T) {
},
}
resolver.w.inject(update)
wantState = TransientFailure
wantState = connectivity.TransientFailure
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
Expand All @@ -75,7 +76,7 @@ func TestConnectivityStates(t *testing.T) {
Addr: "localhost:" + servers[1].port,
}
resolver.w.inject(update)
wantState = Ready
wantState = connectivity.Ready
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
Expand Down
Loading

0 comments on commit e81b569

Please sign in to comment.