Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: switch more transport tests to e2e style (2/N) #7693

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type authorityArgs struct {
serializer *grpcsync.CallbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
logger *grpclog.PrefixLogger
}

Expand All @@ -123,6 +124,7 @@ func newAuthority(args authorityArgs) (*authority, error) {
OnRecvHandler: ret.handleResourceUpdate,
OnErrorHandler: ret.newConnectionError,
OnSendHandler: ret.transportOnSendHandler,
Backoff: args.backoff,
Logger: args.logger,
NodeProto: args.bootstrapCfg.Node(),
})
Expand Down
16 changes: 13 additions & 3 deletions xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
Expand Down Expand Up @@ -53,16 +54,17 @@ const NameForServer = "#server"
// only when all references are released, and it is safe for the caller to
// invoke this close function multiple times.
func New(name string) (XDSClient, func(), error) {
return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
return newRefCounted(name, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout, backoff.DefaultExponential.Backoff)
}

// newClientImpl returns a new xdsClient with the given config.
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) {
func newClientImpl(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration, streamBackoff func(int) time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
backoff: streamBackoff,
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
Expand Down Expand Up @@ -90,6 +92,11 @@ type OptionsForTesting struct {
// AuthorityIdleTimeout is the timeout before idle authorities are deleted.
// If unspecified, uses the default value used in non-test code.
AuthorityIdleTimeout time.Duration

// StreamBackoffAfterFailure is the backoff function used to determine the
// backoff duration after stream failures. If unspecified, uses the default
// value used in non-test code.
StreamBackoffAfterFailure func(int) time.Duration
}

// NewForTesting returns an xDS client configured with the provided options.
Expand All @@ -111,11 +118,14 @@ func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) {
if opts.AuthorityIdleTimeout == 0 {
opts.AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout
}
if opts.StreamBackoffAfterFailure == nil {
opts.StreamBackoffAfterFailure = defaultStreamBackoffFunc
}

if err := bootstrap.SetFallbackBootstrapConfig(opts.Contents); err != nil {
return nil, nil, err
}
client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout)
client, cancel, err := newRefCounted(opts.Name, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout, opts.StreamBackoffAfterFailure)
return client, func() { bootstrap.UnsetFallbackBootstrapConfigForTesting(); cancel() }, err
}

Expand Down
7 changes: 5 additions & 2 deletions xds/internal/xdsclient/client_refcounted.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
)
Expand All @@ -37,6 +38,8 @@ var (
// overridden in tests to give them visibility into certain events.
xdsClientImplCreateHook = func(string) {}
xdsClientImplCloseHook = func(string) {}

defaultStreamBackoffFunc = backoff.DefaultExponential.Backoff
)

func clientRefCountedClose(name string) {
Expand All @@ -60,7 +63,7 @@ func clientRefCountedClose(name string) {
// newRefCounted creates a new reference counted xDS client implementation for
// name, if one does not exist already. If an xDS client for the given name
// exists, it gets a reference to it and returns it.
func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration) (XDSClient, func(), error) {
func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Duration, streamBackoff func(int) time.Duration) (XDSClient, func(), error) {
clientsMu.Lock()
defer clientsMu.Unlock()

Expand All @@ -74,7 +77,7 @@ func newRefCounted(name string, watchExpiryTimeout, idleAuthorityTimeout time.Du
if err != nil {
return nil, nil, fmt.Errorf("xds: failed to get xDS bootstrap config: %v", err)
}
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout)
c, err := newClientImpl(config, watchExpiryTimeout, idleAuthorityTimeout, streamBackoff)
if err != nil {
return nil, nil, err
}
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type clientImpl struct {
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
backoff func(int) time.Duration // Backoff for ADS and LRS stream failures.
serializer *grpcsync.CallbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry
Expand Down
1 change: 1 addition & 0 deletions xds/internal/xdsclient/clientimpl_authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *auth
serializer: c.serializer,
resourceTypeGetter: c.resourceTypes.get,
watchExpiryTimeout: c.watchExpiryTimeout,
backoff: c.backoff,
logger: grpclog.NewPrefixLogger(logger, authorityPrefix(c, config.ServerURI())),
})
if err != nil {
Expand Down
Loading
Loading