From e81b5698fd6a1fdbae0c62f52e9aabdeee231b99 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 9 Aug 2017 10:31:12 -0700 Subject: [PATCH] Add and use connectivity package for states (#1430) * Add and use connectivity package * Mark cc state APIs as experimental --- clientconn.go | 113 +++++++++++++---------------------- clientconn_test.go | 11 ++-- connectivity/connectivity.go | 72 ++++++++++++++++++++++ test/end2end_test.go | 5 +- 4 files changed, 122 insertions(+), 79 deletions(-) create mode 100644 connectivity/connectivity.go diff --git a/clientconn.go b/clientconn.go index 19be007d2d48..e3f6cb19a308 100644 --- a/clientconn.go +++ b/clientconn.go @@ -20,7 +20,6 @@ package grpc import ( "errors" - "fmt" "net" "strings" "sync" @@ -28,6 +27,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -445,39 +445,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return cc, nil } -// ConnectivityState indicates the state of a client connection. -type ConnectivityState int - -const ( - // Idle indicates the ClientConn is idle. - Idle ConnectivityState = iota - // Connecting indicates the ClienConn is connecting. - Connecting - // Ready indicates the ClientConn is ready for work. - Ready - // TransientFailure indicates the ClientConn has seen a failure but expects to recover. - TransientFailure - // Shutdown indicates the ClientConn has started shutting down. - Shutdown -) - -func (s ConnectivityState) String() string { - switch s { - case Idle: - return "IDLE" - case Connecting: - return "CONNECTING" - case Ready: - return "READY" - case TransientFailure: - return "TRANSIENT_FAILURE" - case Shutdown: - return "SHUTDOWN" - default: - panic(fmt.Sprintf("unknown connectivity state: %d", s)) - } -} - // connectivityStateEvaluator gets updated by addrConns when their // states transition, based on which it evaluates the state of // ClientConn. @@ -492,55 +459,55 @@ type connectivityStateEvaluator struct { // recordTransition records state change happening in every addrConn and based on // that it evaluates what state the ClientConn is in. -// It can only transition between Ready, Connecting and TransientFailure. Other states, -// Idle and Shutdown are transitioned into by ClientConn; in the begining of the connection +// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, +// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection // before any addrConn is created ClientConn is in idle state. In the end when ClientConn -// closes it is in Shutdown state. +// closes it is in connectivity.Shutdown state. // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. -func (cse *connectivityStateEvaluator) recordTransition(oldState, newState ConnectivityState) { +func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { cse.mu.Lock() defer cse.mu.Unlock() // Update counters. - for idx, state := range []ConnectivityState{oldState, newState} { + for idx, state := range []connectivity.State{oldState, newState} { updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. switch state { - case Ready: + case connectivity.Ready: cse.numReady += updateVal - case Connecting: + case connectivity.Connecting: cse.numConnecting += updateVal - case TransientFailure: + case connectivity.TransientFailure: cse.numTransientFailure += updateVal } } // Evaluate. if cse.numReady > 0 { - cse.csMgr.updateState(Ready) + cse.csMgr.updateState(connectivity.Ready) return } if cse.numConnecting > 0 { - cse.csMgr.updateState(Connecting) + cse.csMgr.updateState(connectivity.Connecting) return } - cse.csMgr.updateState(TransientFailure) + cse.csMgr.updateState(connectivity.TransientFailure) } -// connectivityStateManager keeps the ConnectivityState of ClientConn. +// connectivityStateManager keeps the connectivity.State of ClientConn. // This struct will eventually be exported so the balancers can access it. type connectivityStateManager struct { mu sync.Mutex - state ConnectivityState + state connectivity.State notifyChan chan struct{} } -// updateState updates the ConnectivityState of ClientConn. +// updateState updates the connectivity.State of ClientConn. // If there's a change it notifies goroutines waiting on state change to // happen. -func (csm *connectivityStateManager) updateState(state ConnectivityState) { +func (csm *connectivityStateManager) updateState(state connectivity.State) { csm.mu.Lock() defer csm.mu.Unlock() - if csm.state == Shutdown { + if csm.state == connectivity.Shutdown { return } if csm.state == state { @@ -554,7 +521,7 @@ func (csm *connectivityStateManager) updateState(state ConnectivityState) { } } -func (csm *connectivityStateManager) getState() ConnectivityState { +func (csm *connectivityStateManager) getState() connectivity.State { csm.mu.Lock() defer csm.mu.Unlock() return csm.state @@ -587,9 +554,10 @@ type ClientConn struct { mkp keepalive.ClientParameters } -// WaitForStateChange waits until the ConnectivityState of ClientConn changes from sourceState or +// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or // ctx expires. A true value is returned in former case and false in latter. -func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) bool { +// This is an EXPERIMENTAL API. +func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { ch := cc.csMgr.getNotifyChan() if cc.csMgr.getState() != sourceState { return true @@ -602,8 +570,9 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState Connec } } -// GetState returns the ConnectivityState of ClientConn. -func (cc *ClientConn) GetState() ConnectivityState { +// GetState returns the connectivity.State of ClientConn. +// This is an EXPERIMENTAL API. +func (cc *ClientConn) GetState() connectivity.State { return cc.csMgr.getState() } @@ -855,7 +824,7 @@ func (cc *ClientConn) Close() error { } conns := cc.conns cc.conns = nil - cc.csMgr.updateState(Shutdown) + cc.csMgr.updateState(connectivity.Shutdown) cc.mu.Unlock() if cc.dopts.balancer != nil { cc.dopts.balancer.Close() @@ -879,7 +848,7 @@ type addrConn struct { csEvltr *connectivityStateEvaluator mu sync.Mutex - state ConnectivityState + state connectivity.State down func(error) // the handler called when a connection is down. // ready is closed and becomes nil when a new transport is up or failed // due to timeout. @@ -926,7 +895,7 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { // - otherwise, it will be closed. func (ac *addrConn) resetTransport(drain bool) error { ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { ac.mu.Unlock() return errConnClosing } @@ -936,7 +905,7 @@ func (ac *addrConn) resetTransport(drain bool) error { ac.down = nil } oldState := ac.state - ac.state = Connecting + ac.state = connectivity.Connecting ac.csEvltr.recordTransition(oldState, ac.state) t := ac.transport ac.transport = nil @@ -949,7 +918,7 @@ func (ac *addrConn) resetTransport(drain bool) error { ac.cc.mu.RUnlock() for retries := 0; ; retries++ { ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing @@ -977,14 +946,14 @@ func (ac *addrConn) resetTransport(drain bool) error { } grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing } ac.errorf("transient failure: %v", err) oldState = ac.state - ac.state = TransientFailure + ac.state = connectivity.TransientFailure ac.csEvltr.recordTransition(oldState, ac.state) if ac.ready != nil { close(ac.ready) @@ -1003,14 +972,14 @@ func (ac *addrConn) resetTransport(drain bool) error { } ac.mu.Lock() ac.printf("ready") - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() newTransport.Close() return errConnClosing } oldState = ac.state - ac.state = Ready + ac.state = connectivity.Ready ac.csEvltr.recordTransition(oldState, ac.state) ac.transport = newTransport if ac.ready != nil { @@ -1081,13 +1050,13 @@ func (ac *addrConn) transportMonitor() { default: } ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac has been shutdown. ac.mu.Unlock() return } oldState := ac.state - ac.state = TransientFailure + ac.state = connectivity.TransientFailure ac.csEvltr.recordTransition(oldState, ac.state) ac.mu.Unlock() if err := ac.resetTransport(false); err != nil { @@ -1107,12 +1076,12 @@ func (ac *addrConn) transportMonitor() { } // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or -// iv) transport is in TransientFailure and there is a balancer/failfast is true. +// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { - case ac.state == Shutdown: + case ac.state == connectivity.Shutdown: if failfast || !hasBalancer { // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. err := ac.tearDownErr @@ -1121,11 +1090,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans } ac.mu.Unlock() return nil, errConnClosing - case ac.state == Ready: + case ac.state == connectivity.Ready: ct := ac.transport ac.mu.Unlock() return ct, nil - case ac.state == TransientFailure: + case ac.state == connectivity.TransientFailure: if failfast || hasBalancer { ac.mu.Unlock() return nil, errConnUnavailable @@ -1167,11 +1136,11 @@ func (ac *addrConn) tearDown(err error) { // address removal and GoAway. ac.transport.GracefulClose() } - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { return } oldState := ac.state - ac.state = Shutdown + ac.state = connectivity.Shutdown ac.tearDownErr = err ac.csEvltr.recordTransition(oldState, ac.state) if ac.events != nil { diff --git a/clientconn_test.go b/clientconn_test.go index 11071ceb5f0b..95a99c02c281 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -26,16 +26,17 @@ import ( "golang.org/x/net/context" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/naming" "google.golang.org/grpc/testdata" ) -func assertState(wantState ConnectivityState, cc *ClientConn) (ConnectivityState, bool) { +func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - var state ConnectivityState + var state connectivity.State for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() { } return state, state == wantState @@ -54,7 +55,7 @@ func TestConnectivityStates(t *testing.T) { t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ ", err) } defer cc.Close() - wantState := Ready + wantState := connectivity.Ready if state, ok := assertState(wantState, cc); !ok { t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState) } @@ -66,7 +67,7 @@ func TestConnectivityStates(t *testing.T) { }, } resolver.w.inject(update) - wantState = TransientFailure + wantState = connectivity.TransientFailure if state, ok := assertState(wantState, cc); !ok { t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState) } @@ -75,7 +76,7 @@ func TestConnectivityStates(t *testing.T) { Addr: "localhost:" + servers[1].port, } resolver.w.inject(update) - wantState = Ready + wantState = connectivity.Ready if state, ok := assertState(wantState, cc); !ok { t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState) } diff --git a/connectivity/connectivity.go b/connectivity/connectivity.go new file mode 100644 index 000000000000..568ef5dc68ba --- /dev/null +++ b/connectivity/connectivity.go @@ -0,0 +1,72 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package connectivity defines connectivity semantics. +// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md. +// All APIs in this package are experimental. +package connectivity + +import ( + "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" +) + +// State indicates the state of connectivity. +// It can be the state of a ClientConn or SubConn. +type State int + +func (s State) String() string { + switch s { + case Idle: + return "IDLE" + case Connecting: + return "CONNECTING" + case Ready: + return "READY" + case TransientFailure: + return "TRANSIENT_FAILURE" + case Shutdown: + return "SHUTDOWN" + default: + grpclog.Errorf("unknown connectivity state: %d", s) + return "Invalid-State" + } +} + +const ( + // Idle indicates the ClientConn is idle. + Idle State = iota + // Connecting indicates the ClienConn is connecting. + Connecting + // Ready indicates the ClientConn is ready for work. + Ready + // TransientFailure indicates the ClientConn has seen a failure but expects to recover. + TransientFailure + // Shutdown indicates the ClientConn has started shutting down. + Shutdown +) + +// Reporter reports the connectivity states. +type Reporter interface { + // CurrentState returns the current state of the reporter. + CurrentState() State + // WaitForStateChange blocks until the reporter's state is different from the given state, + // and returns true. + // It returns false if <-ctx.Done() can proceed (ctx got timeout or got canceled). + WaitForStateChange(context.Context, State) bool +} diff --git a/test/end2end_test.go b/test/end2end_test.go index f17448effd26..0907e203692e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -44,6 +44,7 @@ import ( spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/health" @@ -4853,9 +4854,9 @@ func testWaitForReadyConnection(t *testing.T, e env) { defer cancel() state := cc.GetState() // Wait for connection to be Ready. - for ; state != grpc.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { + for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() { } - if state != grpc.Ready { + if state != connectivity.Ready { t.Fatalf("Want connection state to be Ready, got %v", state) } ctx, cancel = context.WithTimeout(context.Background(), time.Second)