Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: Add perTargetDialOption type and global list #7234

Merged
merged 5 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
for _, opt := range opts {
opt.apply(&cc.dopts)
}

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind converting this and cc.determineAuthority so they are not cc methods? Otherwise they might forget they're not supposed to be using cc.channelz.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads and writes a lot of stuff from cc. Unless you want me to pass in the target, dial options, and return a bunch of things to NewClient to write to cc's stuff. For the sake of readability I think it's better to keep it on cc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to all be in the function body inline and Easwar rewrote it (which I reviewed) to be on cc to factor out functionality into readable helpers and keep this function body clean.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess let's rename these to initParsedTargetAndResolverBuilder and initAuthority and then at least it's obvious they're being called before the cc is fully initialized.

return nil, err
}

for _, opt := range globalPerTargetDialOptions {
opt.DialOption(cc.parsedTarget.URL).apply(&cc.dopts)
}

chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)

Expand All @@ -168,25 +178,14 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
}
cc.mkp = cc.dopts.copts.KeepaliveParams

// Register ClientConn with channelz.
cc.channelzRegistration(target)

// TODO: Ideally it should be impossible to error from this function after
// channelz registration. This will require removing some channelz logs
// from the following functions that can error. Errors can be returned to
// the user, and successful logs can be emitted here, after the checks have
// passed and channelz is subsequently registered.

// Determine the resolver to use.
if err := cc.parseTargetAndFindResolver(); err != nil {
channelz.RemoveEntry(cc.channelz.ID)
return nil, err
}
if err = cc.determineAuthority(); err != nil {
channelz.RemoveEntry(cc.channelz.ID)
return nil, err
}

// Register ClientConn with channelz.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something that says "Note that this is only done after channel creation cannot fail", so that nobody adds error cases after this. Maybe also move immediately before the return to make it doubly sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment. I don't think I can move it right before return because the component creation reads the channelz pointer of the channel to log in future, and if I move to right before return would read off a nil pointer and not log anything.

cc.channelzRegistration(target)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

Expand Down Expand Up @@ -1681,14 +1680,14 @@ func (cc *ClientConn) connectionError() error {
//
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
func (cc *ClientConn) parseTargetAndFindResolver() error {
channelz.Infof(logger, cc.channelz, "original dial target is: %q", cc.target)
logger.Infof("original dial target is: %q", cc.target)
dfawley marked this conversation as resolved.
Show resolved Hide resolved

var rb resolver.Builder
parsedTarget, err := parseTarget(cc.target)
if err != nil {
channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", cc.target, err)
logger.Infof("dial target %q parse failed: %v", cc.target, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • This doesn't seem worth keeping. We're going to try some more stuff that might make it valid, and something as simple as hostname:port will trigger this.

Copy link
Contributor Author

@zasweq zasweq May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted (and switched conditional to err == nil).

} else {
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", parsedTarget)
logger.Infof("parsed dial target is: %#v", parsedTarget)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Move to NewClient please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
Expand All @@ -1707,15 +1706,15 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {
defScheme = resolver.GetDefaultScheme()
}

channelz.Infof(logger, cc.channelz, "fallback to scheme %q", defScheme)
logger.Infof("fallback to scheme %q", defScheme)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • This can be deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, yeah because this now gets logged.

canonicalTarget := defScheme + ":///" + cc.target

parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", canonicalTarget, err)
logger.Infof("dial target %q parse failed: %v", canonicalTarget, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unnecessary as we will return this error to the user.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, deleted.

return err
}
channelz.Infof(logger, cc.channelz, "parsed dial target is: %+v", parsedTarget)
logger.Infof("parsed dial target is: %+v", parsedTarget)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Delete since we already do this in NewClient per the above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb == nil {
return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
Expand Down Expand Up @@ -1838,6 +1837,5 @@ func (cc *ClientConn) determineAuthority() error {
} else {
cc.authority = encodeAuthority(endpoint)
}
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

return nil
}
29 changes: 29 additions & 0 deletions default_dial_option_server_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package grpc

import (
"fmt"
"net/url"
"strings"
"testing"

Expand Down Expand Up @@ -74,6 +75,34 @@ func (s) TestDisableGlobalOptions(t *testing.T) {
internal.ClearGlobalDialOptions()
}

type testPerTargetDialOption struct{}

func (do *testPerTargetDialOption) DialOption(parsedTarget url.URL) DialOption {
if parsedTarget.Scheme == "passthrough" {
return WithTransportCredentials(insecure.NewCredentials()) // credentials provided, should pass NewClient.
}
return EmptyDialOption{} // no credentials, should fail NewClient
}

// TestGlobalPerTargetDialOption configures a global per target dial option that
// produces transport credentials for channels using "passthrough" scheme.
// Channels that use the passthrough scheme should this be successfully created
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"This" makes no sense. 😆

(Delete "this")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops haha, deleted.

// due to picking up transport credentials, whereas other channels should fail
// at creation due to not having transport credentials.
func (s) TestGlobalPerTargetDialOption(t *testing.T) {
internal.AddGlobalPerTargetDialOptions.(func(opt any))(&testPerTargetDialOption{})
noTSecStr := "no transport security set"
if _, err := NewClient("dns:///fake"); !strings.Contains(fmt.Sprint(err), noTSecStr) {
t.Fatalf("Dialing received unexpected error: %v, want error containing \"%v\"", err, noTSecStr)
}
cc, err := NewClient("passthrough:///nice")
if err != nil {
t.Fatalf("Dialing with insecure credentials failed: %v", err)
}
defer cc.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: just cc.Close()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, changed.

internal.ClearGlobalPerTargetDialOptions()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defer at the start, otherwise if this fails it impacts other tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed, and also changed the other tests in this file I wrote for gcp observability :).

}

func (s) TestAddGlobalServerOptions(t *testing.T) {
const maxRecvSize = 998765
// Set and check the ServerOptions
Expand Down
20 changes: 20 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpc
import (
"context"
"net"
"net/url"
"time"

"google.golang.org/grpc/backoff"
Expand All @@ -43,6 +44,14 @@ func init() {
internal.ClearGlobalDialOptions = func() {
globalDialOptions = nil
}
internal.AddGlobalPerTargetDialOptions = func(opt any) {
if ptdo, ok := opt.(perTargetDialOption); ok {
globalPerTargetDialOptions = append(globalPerTargetDialOptions, ptdo)
}
}
internal.ClearGlobalPerTargetDialOptions = func() {
globalPerTargetDialOptions = nil
}
internal.WithBinaryLogger = withBinaryLogger
internal.JoinDialOptions = newJoinDialOption
internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
Expand Down Expand Up @@ -89,6 +98,17 @@ type DialOption interface {

var globalDialOptions []DialOption

// perTargetDialOption takes a parsed target and returns a dial option to apply.
//
// This gets called after NewClient() parses the target, and allows per target
// configuration set through a returned DialOption.
type perTargetDialOption interface {
// DialOption returns a Dial Option to apply.
DialOption(parsedTarget url.URL) DialOption
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bikeshed: DialOptionForTarget?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done.

}

var globalPerTargetDialOptions []perTargetDialOption

// EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options.
//
Expand Down
20 changes: 16 additions & 4 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ var (
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
ClearGlobalDialOptions func()

// AddGlobalPerTargetDialOptions adds a PerTargetDialOption that will be
// configured for newly created ClientConns.
AddGlobalPerTargetDialOptions any // func (opt any)
// ClearGlobalPerTargetDialOptions clears the slice of global late apply
// dial options.
ClearGlobalPerTargetDialOptions func()

// JoinDialOptions combines the dial options passed as arguments into a
// single dial option.
JoinDialOptions any // func(...grpc.DialOption) grpc.DialOption
Expand All @@ -126,7 +134,8 @@ var (
// deleted or changed.
BinaryLogger any // func(binarylog.Logger) grpc.ServerOption

// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a provided grpc.ClientConn
// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a
// provided grpc.ClientConn.
SubscribeToConnectivityStateChanges any // func(*grpc.ClientConn, grpcsync.Subscriber)

// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
Expand Down Expand Up @@ -195,14 +204,17 @@ var (
// resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error

// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD.
// FromOutgoingContextRaw returns the un-merged, intermediary contents of
// metadata.rawMD.
FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool)

// UserSetDefaultScheme is set to true if the user has overridden the default resolver scheme.
// UserSetDefaultScheme is set to true if the user has overridden the
// default resolver scheme.
UserSetDefaultScheme bool = false
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
// HealthChecker defines the signature of the client-side LB channel health
// checking function.
//
// The implementation is expected to create a health checking RPC stream by
// calling newStream(), watch for the health status of serviceName, and report
Expand Down
Loading