Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide an internal-only API to report connectivity state changes on the ClientConn #6036

Closed
wants to merge 17 commits into from
29 changes: 25 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/connectivitystate"
"google.golang.org/grpc/internal/grpcsync"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -405,11 +407,16 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
easwars marked this conversation as resolved.
Show resolved Hide resolved
//
// TODO: If possible, get rid of the `connectivityStateManager` type, and
// provide this functionality using the `Tracker`, to avoid keeping track of
// the connectivity state at two places.
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
connectivityStateTracker *connectivitystate.Tracker
}

// updateState updates the connectivity.State of ClientConn.
Expand All @@ -425,6 +432,9 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
if csm.connectivityStateTracker != nil {
csm.connectivityStateTracker.SetState(state)
}
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
Expand Down Expand Up @@ -598,6 +608,13 @@ func init() {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
emptyServiceConfig = cfg.Config.(*ServiceConfig)

internal.AddConnectivityStateWatcher = func(cc *ClientConn, w connectivitystate.Watcher) func() {
easwars marked this conversation as resolved.
Show resolved Hide resolved
if cc.csMgr.connectivityStateTracker == nil {
cc.csMgr.connectivityStateTracker = connectivitystate.NewTracker(cc.csMgr.getState())
}
return cc.csMgr.connectivityStateTracker.AddWatcher(w)
}
}

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
Expand Down Expand Up @@ -1055,6 +1072,10 @@ func (cc *ClientConn) Close() error {
rWrapper.close()
}

if cc.csMgr.connectivityStateTracker != nil {
cc.csMgr.connectivityStateTracker.Stop()
}

for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
Expand Down
129 changes: 129 additions & 0 deletions internal/connectivitystate/connectivitystate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package connectivitystate provides functionality to report and track
// connectivity state changes of ClientConns and SubConns.
package connectivitystate

import (
"context"
"sync"

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpcsync"
)

// Watcher wraps the functionality to be implemented by components
// interested in watching connectivity state changes.
type Watcher interface {
// OnStateChange is invoked to report connectivity state changes on the
// entity being watched.
OnStateChange(state connectivity.State)
}

// Tracker provides pubsub-like functionality for connectivity state changes.
//
// The entity whose connectivity state is being tracked publishes updates by
// calling the SetState() method.
//
// Components interested in connectivity state updates of the tracked entity
// subscribe to updates by calling the AddWatcher() method.
type Tracker struct {
cs *grpcsync.CallbackSerializer
cancel context.CancelFunc

// Access to the below fields are guarded by this mutex.
mu sync.Mutex
state connectivity.State
watchers map[Watcher]bool
stopped bool
}

// NewTracker returns a new Tracker instance initialized with the provided
// connectivity state.
func NewTracker(state connectivity.State) *Tracker {
ctx, cancel := context.WithCancel(context.Background())
return &Tracker{
cs: grpcsync.NewCallbackSerializer(ctx),
cancel: cancel,
state: state,
watchers: map[Watcher]bool{},
}
}

// AddWatcher adds the provided watcher to the set of watchers in Tracker.
// The OnStateChange() callback will be invoked asynchronously with the current
// state of the tracked entity to begin with, and subsequently for every state
// change.
//
// Returns a function to remove the provided watcher from the set of watchers.
// The caller of this method is responsible for invoking this function when it
// no longer needs to monitor the connectivity state changes on the channel.
func (t *Tracker) AddWatcher(watcher Watcher) func() {
if t.stopped {
easwars marked this conversation as resolved.
Show resolved Hide resolved
return func() {}
}

t.mu.Lock()
defer t.mu.Unlock()
t.watchers[watcher] = true

t.cs.Schedule(func(context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
watcher.OnStateChange(t.state)
easwars marked this conversation as resolved.
Show resolved Hide resolved
})

return func() {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.watchers, watcher)
}
}

// SetState updates the connectivity state of the entity being tracked, and
// invokes the OnStateChange callback of all registered watchers.
func (t *Tracker) SetState(state connectivity.State) {
if t.stopped {
easwars marked this conversation as resolved.
Show resolved Hide resolved
return
}

t.mu.Lock()
defer t.mu.Unlock()
// Update the cached state
easwars marked this conversation as resolved.
Show resolved Hide resolved
t.state = state
// Invoke callbacks on all registered watchers.
easwars marked this conversation as resolved.
Show resolved Hide resolved
for watcher := range t.watchers {
t.cs.Schedule(func(context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
watcher.OnStateChange(t.state)
Copy link
Contributor Author

@my4-dev my4-dev Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While implementing test cases, I noticed that watcher.OnStateChange could be called with an incorrect argument because of characteristics of closure if t.state would be changed very quickly.

e.g.

  1. SetState is called with connectivity.Idle.
  2. watcher.OnStateChange is scheduled.
  3. SetState is called with connectivity.Connecting immediately after step 2.
  4. watcher.OnStateChange is scheduled.
  5. Scheduled watcher.OnStateChange is called with t.state( = connectivity.Connecting) twice incorrectly.

In order to resolve this problem, I'm going to change t.state to state.
We would need to fix watcher.OnStateChange(t.state) in AddWatcher func as well.

Do you have any other opinions?
If so, I'll adopt yours.

})
}
}

// Stop shuts down the Tracker and releases any resources allocated by it.
// It is guaranteed that no Watcher callbacks would be invoked once this
// method returns.
func (t *Tracker) Stop() {
t.mu.Lock()
defer t.mu.Unlock()
t.stopped = true

t.cancel()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@
*
*/

package xdsclient
package grpcsync

import (
"context"

"google.golang.org/grpc/internal/buffer"
)

// callbackSerializer provides a mechanism to schedule callbacks in a
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// Schedule() method.
//
// This type is safe for concurrent access.
type callbackSerializer struct {
type CallbackSerializer struct {
callbacks *buffer.Unbounded
}

// newCallbackSerializer returns a new callbackSerializer instance. The provided
// NewCallbackSerializer returns a new callbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the callbackSerializer. It is guaranteed that no
// callbacks will be executed once this context is canceled.
func newCallbackSerializer(ctx context.Context) *callbackSerializer {
t := &callbackSerializer{callbacks: buffer.NewUnbounded()}
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{callbacks: buffer.NewUnbounded()}
go t.run(ctx)
return t
}
Expand All @@ -48,11 +48,11 @@ func newCallbackSerializer(ctx context.Context) *callbackSerializer {
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (t *callbackSerializer) Schedule(f func(ctx context.Context)) {
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) {
t.callbacks.Put(f)
}

func (t *callbackSerializer) run(ctx context.Context) {
func (t *CallbackSerializer) run(ctx context.Context) {
for ctx.Err() == nil {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package xdsclient
package grpcsync

import (
"context"
Expand All @@ -28,11 +28,17 @@ import (
"github.com/google/go-cmp/cmp"
)

const (
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

// TestCallbackSerializer_Schedule_FIFO verifies that callbacks are executed in
// the same order in which they were scheduled.
func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := NewCallbackSerializer(ctx)
defer cancel()

// We have two channels, one to record the order of scheduling, and the
Expand Down Expand Up @@ -100,7 +106,7 @@ func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
// scheduled callbacks get executed.
func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := NewCallbackSerializer(ctx)
defer cancel()

// Schedule callbacks concurrently by calling Schedule() from goroutines.
Expand Down Expand Up @@ -136,7 +142,7 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
// are not executed once Close() returns.
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := NewCallbackSerializer(ctx)

// Schedule a callback which blocks until the context passed to it is
// canceled. It also closes a couple of channels to signal that it started
Expand Down
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ var (
// server.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption

// AddConnectivityStateWatcher adds a connectivitystate.Watcher to a provided grpc.ClientConn
AddConnectivityStateWatcher interface{} // func(*grpc.ClientConn, connectivitystate.Watcher)

// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
// the supported environment variables. The resolver.Builder is meant to be
Expand Down
5 changes: 3 additions & 2 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
Expand Down Expand Up @@ -65,7 +66,7 @@ type authority struct {
serverCfg *bootstrap.ServerConfig // Server config for this authority
bootstrapCfg *bootstrap.Config // Full bootstrap configuration
refCount int // Reference count of watches referring to this authority
serializer *callbackSerializer // Callback serializer for invoking watch callbacks
serializer *grpcsync.CallbackSerializer // Callback serializer for invoking watch callbacks
resourceTypeGetter func(string) xdsresource.Type // ResourceType registry lookup
transport *transport.Transport // Underlying xDS transport to the management server
watchExpiryTimeout time.Duration // Resource watch expiry timeout
Expand Down Expand Up @@ -99,7 +100,7 @@ type authorityArgs struct {
// the second case.
serverCfg *bootstrap.ServerConfig
bootstrapCfg *bootstrap.Config
serializer *callbackSerializer
serializer *grpcsync.CallbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
logger *grpclog.PrefixLogger
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/xdsclient/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/google/uuid"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
Expand Down Expand Up @@ -69,7 +70,7 @@ func setupTest(ctx context.Context, t *testing.T, opts e2e.ManagementServerOptio
bootstrapCfg: &bootstrap.Config{
NodeProto: &v3corepb.Node{Id: nodeID},
},
serializer: newCallbackSerializer(ctx),
serializer: grpcsync.NewCallbackSerializer(ctx),
resourceTypeGetter: rtRegistry.get,
watchExpiryTimeout: watchExpiryTimeout,
logger: nil,
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
serializer: newCallbackSerializer(ctx),
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
authorities: make(map[string]*authority),
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type clientImpl struct {
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
serializer *callbackSerializer
serializer *grpcsync.CallbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry

Expand Down