Skip to content

Commit

Permalink
grpc: report connectivity state changes on the ClientConn for Subscri…
Browse files Browse the repository at this point in the history
…bers (#6437)

Co-authored-by: Easwar Swaminathan <easwars@google.com>
  • Loading branch information
my4-dev and easwars authored Aug 8, 2023
1 parent 4832deb commit 2059c6e
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 2 deletions.
30 changes: 28 additions & 2 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -136,7 +137,6 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
czData: new(channelzData),
Expand Down Expand Up @@ -189,6 +189,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Register ClientConn with channelz.
cc.channelzRegistration(target)

cc.csMgr = newConnectivityStateManager(cc.channelzID)

if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -473,7 +475,6 @@ func (cc *ClientConn) validateTransportCredentials() error {
func (cc *ClientConn) channelzRegistration(target string) {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
cc.addTraceEvent("created")
cc.csMgr.channelzID = cc.channelzID
}

// chainUnaryClientInterceptors chains all unary client interceptors into one.
Expand Down Expand Up @@ -538,13 +539,27 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr
}
}

// newConnectivityStateManager creates an connectivityStateManager with
// the specified id.
func newConnectivityStateManager(id *channelz.Identifier) *connectivityStateManager {
return &connectivityStateManager{
channelzID: id,
pubSub: grpcsync.NewPubSub(),
}
}

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
//
// TODO: If possible, get rid of the `connectivityStateManager` type, and
// provide this functionality using the `PubSub`, to avoid keeping track of
// the connectivity state at two places.
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
pubSub *grpcsync.PubSub
}

// updateState updates the connectivity.State of ClientConn.
Expand All @@ -560,6 +575,8 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
csm.pubSub.Publish(state)

channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
Expand All @@ -583,6 +600,10 @@ func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
return csm.notifyChan
}

func (csm *connectivityStateManager) close() {
csm.pubSub.Stop()
}

// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
Expand Down Expand Up @@ -771,6 +792,10 @@ func init() {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
emptyServiceConfig = cfg.Config.(*ServiceConfig)

internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
return cc.csMgr.pubSub.Subscribe(s)
}
}

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
Expand Down Expand Up @@ -1224,6 +1249,7 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
cc.csMgr.close()

pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
Expand Down
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ var (
// deleted or changed.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption

// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a provided grpc.ClientConn
SubscribeToConnectivityStateChanges interface{} // func(*grpc.ClientConn, grpcsync.Subscriber)

// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
// the supported environment variables. The resolver.Builder is meant to be
Expand Down
108 changes: 108 additions & 0 deletions test/connectivity_state_updates_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package test

import (
"context"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)

type testSubscriber struct {
onMsgCh chan connectivity.State
}

func newTestSubscriber() *testSubscriber {
return &testSubscriber{onMsgCh: make(chan connectivity.State, 1)}
}

func (ts *testSubscriber) OnMessage(msg interface{}) {
select {
case ts.onMsgCh <- msg.(connectivity.State):
default:
}
}

func (s) TestConnectivityStateUpdates(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Create a ClientConn with a short idle_timeout.
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
t.Cleanup(func() { cc.Close() })

s := newTestSubscriber()
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)

backend := stubserver.StartTestService(t, nil)
t.Cleanup(backend.Stop)

wantStates := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Shutdown,
}

doneCh := make(chan struct{})
go func() {
defer close(doneCh)
for _, wantState := range wantStates {
select {
case gotState := <-s.onMsgCh:
if gotState != wantState {
t.Errorf("Received unexpected state: %q; want: %q", gotState, wantState)
}
case <-ctx.Done():
t.Error("Timeout when expecting the onMessage() callback to be invoked")
}
if t.Failed() {
break
}
}
}()

// Verify that the ClientConn moves to READY.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})

// Verify that the ClientConn moves to IDLE as there is no activity.
awaitState(ctx, t, cc, connectivity.Idle)

cc.Close()
awaitState(ctx, t, cc, connectivity.Shutdown)

<-doneCh
}

0 comments on commit 2059c6e

Please sign in to comment.