Skip to content

Commit

Permalink
[configgrpc] wrap gRPC client/server options in extensible interface (#…
Browse files Browse the repository at this point in the history
…11069)

#### Description

To allow extending the possible option types provided to
`configgrpc.ClientConfig.ToClientConn` and
`configgrpc.ServerConfig.ToServer` in the future, we want to wrap the
`grpc.DialOption` and `grpc.ServerOption` parameters in more generic
`ToClientConnOption` and `ToServerOption` interfaces.

For compatibility, we start by adding new `ToClientConnWithOptions` and
`ToServerWithOptions` methods, to which the now deprecated
`ToClientConn` and `ToServer` defer. A second PR will be needed to fully
replace the original methods.

#### Link to tracking issue
Fixes #9480

#### Testing
No tests have been added. Feel free to tell me if I should add some.

#### Documentation
No documentation has been added.

---------

Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
jade-guiton-dd and codeboten authored Sep 19, 2024
1 parent 134c956 commit 459b429
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 19 deletions.
28 changes: 28 additions & 0 deletions .chloggen/9480-configgrpc-option-wrapper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'deprecation'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: configgrpc

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Deprecate `ClientConfig.ToClientConn`/`ServerConfig.ToServer` in favor of `ToClientConnWithOptions`/`ToServerWithOptions`"

# One or more tracking issues or pull requests related to the change
issues: [9480]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Users providing a grpc.DialOption/grpc.ServerOption should now wrap them into
a generic option with `WithGrpcDialOption`/`WithGrpcServerOption`.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
117 changes: 106 additions & 11 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,58 @@ func (gcs *ClientConfig) isSchemeHTTPS() bool {
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use grpc.WithBlock() dial option.
func (gcs *ClientConfig) ToClientConn(ctx context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := gcs.toDialOptions(ctx, host, settings)
//
// Deprecated: [v0.110.0] If providing a [grpc.DialOption], use [ClientConfig.ToClientConnWithOptions]
// with [WithGrpcDialOption] instead.
func (gcs *ClientConfig) ToClientConn(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
grpcOpts ...grpc.DialOption,
) (*grpc.ClientConn, error) {
var extraOpts []ToClientConnOption
for _, grpcOpt := range grpcOpts {
extraOpts = append(extraOpts, WithGrpcDialOption(grpcOpt))
}
return gcs.ToClientConnWithOptions(ctx, host, settings, extraOpts...)
}

// ToClientConnOption is a sealed interface wrapping options for [ClientConfig.ToClientConnWithOptions].
type ToClientConnOption interface {
isToClientConnOption()
}

type grpcDialOptionWrapper struct {
opt grpc.DialOption
}

// WithGrpcDialOption wraps a [grpc.DialOption] into a [ToClientConnOption].
func WithGrpcDialOption(opt grpc.DialOption) ToClientConnOption {
return grpcDialOptionWrapper{opt: opt}
}
func (grpcDialOptionWrapper) isToClientConnOption() {}

// ToClientConnWithOptions is the same as [ClientConfig.ToClientConn], but uses the [ToClientConnOption] interface for options.
// This method will eventually replace [ClientConfig.ToClientConn].
func (gcs *ClientConfig) ToClientConnWithOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToClientConnOption,
) (*grpc.ClientConn, error) {
grpcOpts, err := gcs.getGrpcDialOptions(ctx, host, settings, extraOpts)
if err != nil {
return nil, err
}
opts = append(opts, extraOpts...)
return grpc.NewClient(gcs.sanitizedEndpoint(), opts...)
return grpc.NewClient(gcs.sanitizedEndpoint(), grpcOpts...)
}

func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
func (gcs *ClientConfig) getGrpcDialOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToClientConnOption,
) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if gcs.Compression.IsCompressed() {
cp, err := getGRPCCompressionName(gcs.Compression)
Expand Down Expand Up @@ -312,6 +354,12 @@ func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host,
// Enable OpenTelemetry observability plugin.
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...)))

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcDialOptionWrapper); ok {
opts = append(opts, wrapper.opt)
}
}

return opts, nil
}

Expand All @@ -335,17 +383,58 @@ func (gss *ServerConfig) Validate() error {
return nil
}

// ToServer returns a grpc.Server for the configuration
func (gss *ServerConfig) ToServer(_ context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.ServerOption) (*grpc.Server, error) {
opts, err := gss.toServerOption(host, settings)
// ToServer returns a [grpc.Server] for the configuration
//
// Deprecated: [v0.110.0] If providing a [grpc.ServerOption], use [ServerConfig.ToServerWithOptions]
// with [WithGrpcServerOption] instead.
func (gss *ServerConfig) ToServer(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
grpcOpts ...grpc.ServerOption,
) (*grpc.Server, error) {
var extraOpts []ToServerOption
for _, grpcOpt := range grpcOpts {
extraOpts = append(extraOpts, WithGrpcServerOption(grpcOpt))
}
return gss.ToServerWithOptions(ctx, host, settings, extraOpts...)
}

// ToServerOption is a sealed interface wrapping options for [ServerConfig.ToServerWithOptions].
type ToServerOption interface {
isToServerOption()
}

type grpcServerOptionWrapper struct {
opt grpc.ServerOption
}

// WithGrpcServerOption wraps a [grpc.ServerOption] into a [ToServerOption].
func WithGrpcServerOption(opt grpc.ServerOption) ToServerOption {
return grpcServerOptionWrapper{opt: opt}
}
func (grpcServerOptionWrapper) isToServerOption() {}

// ToServerWithOptions is the same as [ServerConfig.ToServer], but uses the [ToServerOption] interface for options.
// This method will eventually replace [ServerConfig.ToServer].
func (gss *ServerConfig) ToServerWithOptions(
_ context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToServerOption,
) (*grpc.Server, error) {
grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts)
if err != nil {
return nil, err
}
opts = append(opts, extraOpts...)
return grpc.NewServer(opts...), nil
return grpc.NewServer(grpcOpts...), nil
}

func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
func (gss *ServerConfig) getGrpcServerOptions(
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToServerOption,
) ([]grpc.ServerOption, error) {
switch gss.NetAddr.Transport {
case confignet.TransportTypeTCP, confignet.TransportTypeTCP4, confignet.TransportTypeTCP6, confignet.TransportTypeUDP, confignet.TransportTypeUDP4, confignet.TransportTypeUDP6:
internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint)
Expand Down Expand Up @@ -435,6 +524,12 @@ func (gss *ServerConfig) toServerOption(host component.Host, settings component.

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
opts = append(opts, wrapper.opt)
}
}

return opts, nil
}

Expand Down
51 changes: 45 additions & 6 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,33 @@ func TestDefaultGrpcClientSettings(t *testing.T) {
Insecure: true,
},
}
opts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings())
opts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{})
require.NoError(t, err)
assert.Len(t, opts, 2)
}

func TestGrpcClientExtraOption(t *testing.T) {
tt, err := componenttest.SetupTelemetry(componentID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

gcs := &ClientConfig{
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
}
extraOpt := grpc.WithUserAgent("test-agent")
opts, err := gcs.getGrpcDialOptions(
context.Background(),
componenttest.NewNopHost(),
tt.TelemetrySettings(),
[]ToClientConnOption{WithGrpcDialOption(extraOpt)},
)
require.NoError(t, err)
assert.Len(t, opts, 3)
assert.Equal(t, opts[2], extraOpt)
}

func TestAllGrpcClientSettings(t *testing.T) {
tt, err := componenttest.SetupTelemetry(componentID)
require.NoError(t, err)
Expand Down Expand Up @@ -231,7 +253,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
opts, err := test.settings.toDialOptions(context.Background(), test.host, tt.TelemetrySettings())
opts, err := test.settings.getGrpcDialOptions(context.Background(), test.host, tt.TelemetrySettings(), []ToClientConnOption{})
require.NoError(t, err)
assert.Len(t, opts, 9)
})
Expand All @@ -244,11 +266,28 @@ func TestDefaultGrpcServerSettings(t *testing.T) {
Endpoint: "0.0.0.0:1234",
},
}
opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 3)
}

func TestGrpcServerExtraOption(t *testing.T) {
gss := &ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "0.0.0.0:1234",
},
}
extraOpt := grpc.ConnectionTimeout(1_000_000_000)
opts, err := gss.getGrpcServerOptions(
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
[]ToServerOption{WithGrpcServerOption(extraOpt)},
)
require.NoError(t, err)
assert.Len(t, opts, 4)
assert.Equal(t, opts[3], extraOpt)
}

func TestGrpcServerValidate(t *testing.T) {
tests := []struct {
gss *ServerConfig
Expand Down Expand Up @@ -329,7 +368,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 10)
}
Expand Down Expand Up @@ -488,7 +527,7 @@ func TestUseSecure(t *testing.T) {
TLSSetting: configtls.ClientConfig{},
Keepalive: nil,
}
dialOpts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings())
dialOpts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{})
require.NoError(t, err)
assert.Len(t, dialOpts, 2)
}
Expand Down Expand Up @@ -540,7 +579,7 @@ func TestGRPCServerWarning(t *testing.T) {
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)

opts, err := test.settings.toServerOption(componenttest.NewNopHost(), set)
opts, err := test.settings.getGrpcServerOptions(componenttest.NewNopHost(), set, []ToServerOption{})
require.NoError(t, err)
require.NotNil(t, opts)
_ = grpc.NewServer(opts...)
Expand Down
4 changes: 3 additions & 1 deletion exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand Down Expand Up @@ -58,7 +59,8 @@ func newExporter(cfg component.Config, set exporter.Settings) *baseExporter {
// start actually creates the gRPC connection. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) {
if e.clientConn, err = e.config.ClientConfig.ToClientConn(ctx, host, e.settings, grpc.WithUserAgent(e.userAgent)); err != nil {
agentOpt := configgrpc.WithGrpcDialOption(grpc.WithUserAgent(e.userAgent))
if e.clientConn, err = e.config.ClientConfig.ToClientConnWithOptions(ctx, host, e.settings, agentOpt); err != nil {
return err
}
e.traceExporter = ptraceotlp.NewGRPCClient(e.clientConn)
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error {
}

var err error
if r.serverGRPC, err = r.cfg.GRPC.ToServer(context.Background(), host, r.settings.TelemetrySettings); err != nil {
if r.serverGRPC, err = r.cfg.GRPC.ToServerWithOptions(context.Background(), host, r.settings.TelemetrySettings); err != nil {
return err
}

Expand Down

0 comments on commit 459b429

Please sign in to comment.