Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide an internal-only API to report connectivity state changes on the ClientConn #6036

Closed
wants to merge 17 commits into from
22 changes: 18 additions & 4 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/connectivitystate"
"google.golang.org/grpc/internal/grpcsync"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
Expand Down Expand Up @@ -406,10 +408,11 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
easwars marked this conversation as resolved.
Show resolved Hide resolved
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
connectivityStateTracker *connectivitystate.Tracker
}

// updateState updates the connectivity.State of ClientConn.
Expand All @@ -425,6 +428,9 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
if csm.connectivityStateTracker != nil {
csm.connectivityStateTracker.SetState(state)
}
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
Expand Down Expand Up @@ -598,6 +604,14 @@ func init() {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
emptyServiceConfig = cfg.Config.(*ServiceConfig)

internal.AddConnectivityStateWatcher = func(cc *ClientConn, w connectivitystate.Watcher) func() {
easwars marked this conversation as resolved.
Show resolved Hide resolved
if cc.csMgr.connectivityStateTracker == nil {
cc.csMgr.connectivityStateTracker = connectivitystate.NewTracker(connectivity.Idle)
}
cancelFunc := cc.csMgr.connectivityStateTracker.AddWatcher(w)
return cancelFunc
}
}

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,32 @@
*
*/

package xdsclient
// Package callbackserializer defines CallbackSerializer to provide a mechanism
// to schedule callbacks in a synchronized manner.
easwars marked this conversation as resolved.
Show resolved Hide resolved
package callbackserializer

import (
"context"

"google.golang.org/grpc/internal/buffer"
)

// callbackSerializer provides a mechanism to schedule callbacks in a
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// Schedule() method.
//
// This type is safe for concurrent access.
type callbackSerializer struct {
type CallbackSerializer struct {
callbacks *buffer.Unbounded
}

// newCallbackSerializer returns a new callbackSerializer instance. The provided
// New returns a new callbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the callbackSerializer. It is guaranteed that no
// callbacks will be executed once this context is canceled.
func newCallbackSerializer(ctx context.Context) *callbackSerializer {
t := &callbackSerializer{callbacks: buffer.NewUnbounded()}
func New(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{callbacks: buffer.NewUnbounded()}
go t.run(ctx)
return t
}
Expand All @@ -48,11 +50,11 @@ func newCallbackSerializer(ctx context.Context) *callbackSerializer {
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (t *callbackSerializer) Schedule(f func(ctx context.Context)) {
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) {
t.callbacks.Put(f)
}

func (t *callbackSerializer) run(ctx context.Context) {
func (t *CallbackSerializer) run(ctx context.Context) {
for ctx.Err() == nil {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package xdsclient
package callbackserializer

import (
"context"
Expand All @@ -26,13 +26,28 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/grpctest"
)

const (
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestCallbackSerializer_Schedule_FIFO verifies that callbacks are executed in
// the same order in which they were scheduled.
func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := New(ctx)
defer cancel()

// We have two channels, one to record the order of scheduling, and the
Expand Down Expand Up @@ -100,7 +115,7 @@ func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
// scheduled callbacks get executed.
func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := New(ctx)
defer cancel()

// Schedule callbacks concurrently by calling Schedule() from goroutines.
Expand Down Expand Up @@ -136,7 +151,7 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
// are not executed once Close() returns.
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := New(ctx)

// Schedule a callback which blocks until the context passed to it is
// canceled. It also closes a couple of channels to signal that it started
Expand Down
93 changes: 93 additions & 0 deletions internal/connectivitystate/connectivitystate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package connectivitystate provides functionality to report and track
// connectivity state changes of ClientConns and SubConns.
package connectivitystate

import (
"context"
"sync"

"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/callbackserializer"
)

// Watcher defines the functions which is executed as soon as a connectivity state changes
// of Tracker is reported.
easwars marked this conversation as resolved.
Show resolved Hide resolved
type Watcher interface {
// OnStateChange is invoked when connectivity state changes on ClientConn is reported.
easwars marked this conversation as resolved.
Show resolved Hide resolved
OnStateChange(state connectivity.State)
}

// Tracker manages watchers and their status. It holds a previous connecitivity state of
// ClientConns and SubConns.
easwars marked this conversation as resolved.
Show resolved Hide resolved
type Tracker struct {
mu sync.Mutex
state connectivity.State
watchers map[Watcher]bool
cs *callbackserializer.CallbackSerializer
}
easwars marked this conversation as resolved.
Show resolved Hide resolved

// NewTracker returns a new Tracker instance.
easwars marked this conversation as resolved.
Show resolved Hide resolved
func NewTracker(state connectivity.State) *Tracker {
ctx := context.Background()
easwars marked this conversation as resolved.
Show resolved Hide resolved
return &Tracker{
state: state,
watchers: map[Watcher]bool{},
cs: callbackserializer.New(ctx),
}
}

// AddWatcher adds a provided watcher to the set of watchers in Tracker.
// Schedules a callback on the provided watcher with current state.
// Returns a function to remove the provided watcher from the set of watchers. The caller
// of this method is responsible for invoking this function when it no longer needs to
// monitor the connectivity state changes on the channel.
easwars marked this conversation as resolved.
Show resolved Hide resolved
func (t *Tracker) AddWatcher(watcher Watcher) func() {
t.mu.Lock()
defer t.mu.Unlock()
t.watchers[watcher] = true

t.cs.Schedule(func(_ context.Context) {
easwars marked this conversation as resolved.
Show resolved Hide resolved
watcher.OnStateChange(t.state)
easwars marked this conversation as resolved.
Show resolved Hide resolved
})
easwars marked this conversation as resolved.
Show resolved Hide resolved

return func() {
t.mu.Lock()
defer t.mu.Unlock()
t.watchers[watcher] = false
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
}

// SetState is called to publish the connectivity.State changes on ClientConn
// to watchers.
easwars marked this conversation as resolved.
Show resolved Hide resolved
func (t *Tracker) SetState(state connectivity.State) {
t.mu.Lock()
defer t.mu.Unlock()
// Update the cached state
easwars marked this conversation as resolved.
Show resolved Hide resolved
t.state = state
// Invoke callbacks on all registered watchers.
easwars marked this conversation as resolved.
Show resolved Hide resolved
for watcher, isEffective := range t.watchers {
if isEffective {
t.cs.Schedule(func(_ context.Context) {
watcher.OnStateChange(t.state)
})
}
easwars marked this conversation as resolved.
Show resolved Hide resolved
}
}
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ var (
// server.
BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption

// AddConnectivityStateWatcher adds a connectivitystate.Watcher to a provided grpc.ClientConn
AddConnectivityStateWatcher interface{} // func(*grpc.ClientConn, connectivitystate.Watcher)

// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
// the provided xds bootstrap config instead of the global configuration from
// the supported environment variables. The resolver.Builder is meant to be
Expand Down
17 changes: 9 additions & 8 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"google.golang.org/grpc/internal/callbackserializer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
Expand Down Expand Up @@ -61,13 +62,13 @@ type resourceState struct {
// share the same authority instance amongst these entries, and the reference
// counting is taken care of by the `clientImpl` type.
type authority struct {
serverCfg *bootstrap.ServerConfig // Server config for this authority
bootstrapCfg *bootstrap.Config // Full bootstrap configuration
refCount int // Reference count of watches referring to this authority
serializer *callbackSerializer // Callback serializer for invoking watch callbacks
resourceTypeGetter func(string) xdsresource.Type // ResourceType registry lookup
transport *transport.Transport // Underlying xDS transport to the management server
watchExpiryTimeout time.Duration // Resource watch expiry timeout
serverCfg *bootstrap.ServerConfig // Server config for this authority
bootstrapCfg *bootstrap.Config // Full bootstrap configuration
refCount int // Reference count of watches referring to this authority
serializer *callbackserializer.CallbackSerializer // Callback serializer for invoking watch callbacks
resourceTypeGetter func(string) xdsresource.Type // ResourceType registry lookup
transport *transport.Transport // Underlying xDS transport to the management server
watchExpiryTimeout time.Duration // Resource watch expiry timeout
logger *grpclog.PrefixLogger

// A two level map containing the state of all the resources being watched.
Expand Down Expand Up @@ -98,7 +99,7 @@ type authorityArgs struct {
// the second case.
serverCfg *bootstrap.ServerConfig
bootstrapCfg *bootstrap.Config
serializer *callbackSerializer
serializer *callbackserializer.CallbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
logger *grpclog.PrefixLogger
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/callbackserializer"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)
Expand Down Expand Up @@ -69,7 +70,7 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
serializer: newCallbackSerializer(ctx),
serializer: callbackserializer.New(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
authorities: make(map[string]*authority),
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/callbackserializer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
Expand All @@ -37,7 +38,7 @@ type clientImpl struct {
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
serializer *callbackSerializer
serializer *callbackserializer.CallbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry

Expand Down