diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index a34a95d0ea2..7f31b472945 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -149,7 +149,7 @@ by default uses only in-memory database.`, logger.Fatal("Failed to configure query service", zap.Error(err)) } - tm := tenancy.NewManager(&cOpts.GRPC.Tenancy) + tm := tenancy.NewManager(&cOpts.Tenancy) // collector c := collectorApp.New(&collectorApp.CollectorParams{ diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 44b1ef47803..c2c5b4a9272 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -6,7 +6,6 @@ package app import ( "context" "fmt" - "io" "net/http" "time" @@ -47,13 +46,10 @@ type Collector struct { tenancyMgr *tenancy.Manager // state, read only - hServer *http.Server - grpcServer *grpc.Server - otlpReceiver receiver.Traces - zipkinReceiver receiver.Traces - tlsGRPCCertWatcherCloser io.Closer - tlsHTTPCertWatcherCloser io.Closer - tlsZipkinCertWatcherCloser io.Closer + hServer *http.Server + grpcServer *grpc.Server + otlpReceiver receiver.Traces + zipkinReceiver receiver.Traces } // CollectorParams to construct a new Jaeger Collector. @@ -101,26 +97,19 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...) c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) - grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ - HostPort: options.GRPC.HostPort, - Handler: c.spanHandlers.GRPCHandler, - TLSConfig: options.GRPC.TLS, - SamplingProvider: c.samplingProvider, - Logger: c.logger, - MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength, - MaxConnectionAge: options.GRPC.MaxConnectionAge, - MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace, + Handler: c.spanHandlers.GRPCHandler, + SamplingProvider: c.samplingProvider, + Logger: c.logger, + ServerConfig: options.GRPC, }) if err != nil { return fmt.Errorf("could not start gRPC server: %w", err) } c.grpcServer = grpcServer - httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{ - HostPort: options.HTTP.HostPort, + ServerConfig: options.HTTP, Handler: c.spanHandlers.JaegerBatchesHandler, - TLSConfig: options.HTTP.TLS, HealthCheck: c.hCheck, MetricsFactory: c.metricsFactory, SamplingProvider: c.samplingProvider, @@ -131,11 +120,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { } c.hServer = httpServer - c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS - c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS - c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS - - if options.Zipkin.HTTPHostPort == "" { + if options.Zipkin.Endpoint == "" { c.logger.Info("Not listening for Zipkin HTTP traffic, port not configured") } else { zipkinReceiver, err := handler.StartZipkinReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr) @@ -209,17 +194,6 @@ func (c *Collector) Close() error { } } - // watchers actually never return errors from Close - if c.tlsGRPCCertWatcherCloser != nil { - _ = c.tlsGRPCCertWatcherCloser.Close() - } - if c.tlsHTTPCertWatcherCloser != nil { - _ = c.tlsHTTPCertWatcherCloser.Close() - } - if c.tlsZipkinCertWatcherCloser != nil { - _ = c.tlsZipkinCertWatcherCloser.Close() - } - return nil } diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 35c0576d630..4211f88cbbe 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -13,6 +13,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/flags" @@ -27,13 +31,54 @@ import ( var _ (io.Closer) = (*Collector)(nil) func optionsForEphemeralPorts() *flags.CollectorOptions { - collectorOpts := &flags.CollectorOptions{} - collectorOpts.GRPC.HostPort = ":0" - collectorOpts.HTTP.HostPort = ":0" - collectorOpts.OTLP.Enabled = true - collectorOpts.OTLP.GRPC.HostPort = ":0" - collectorOpts.OTLP.HTTP.HostPort = ":0" - collectorOpts.Zipkin.HTTPHostPort = ":0" + collectorOpts := &flags.CollectorOptions{ + HTTP: confighttp.ServerConfig{ + Endpoint: ":0", + TLSSetting: &configtls.ServerConfig{}, + }, + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ":0", + Transport: confignet.TransportTypeTCP, + }, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 10, + }, + }, + }, + OTLP: struct { + Enabled bool + GRPC configgrpc.ServerConfig + HTTP confighttp.ServerConfig + }{ + Enabled: true, + HTTP: confighttp.ServerConfig{ + Endpoint: ":0", + TLSSetting: &configtls.ServerConfig{}, + }, + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ":0", + Transport: confignet.TransportTypeTCP, + }, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 10, + }, + }, + }, + }, + Zipkin: struct { + confighttp.ServerConfig + KeepAlive bool + }{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: ":0", + }, + }, + Tenancy: tenancy.Options{}, + } return collectorOpts } @@ -112,23 +157,23 @@ func TestCollector_StartErrors(t *testing.T) { var options *flags.CollectorOptions options = optionsForEphemeralPorts() - options.GRPC.HostPort = ":-1" + options.GRPC.NetAddr.Endpoint = ":-1" run("gRPC", options, "could not start gRPC server") options = optionsForEphemeralPorts() - options.HTTP.HostPort = ":-1" + options.HTTP.Endpoint = ":-1" run("HTTP", options, "could not start HTTP server") options = optionsForEphemeralPorts() - options.Zipkin.HTTPHostPort = ":-1" + options.Zipkin.Endpoint = ":-1" run("Zipkin", options, "could not start Zipkin receiver") options = optionsForEphemeralPorts() - options.OTLP.GRPC.HostPort = ":-1" + options.OTLP.GRPC.NetAddr.Endpoint = ":-1" run("OTLP/GRPC", options, "could not start OTLP receiver") options = optionsForEphemeralPorts() - options.OTLP.HTTP.HostPort = ":-1" + options.OTLP.HTTP.Endpoint = ":-1" run("OTLP/HTTP", options, "could not start OTLP receiver") } diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go index 5c68169c881..2cef1e5f585 100644 --- a/cmd/collector/app/flags/flags.go +++ b/cmd/collector/app/flags/flags.go @@ -10,6 +10,8 @@ import ( "time" "github.com/spf13/viper" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/internal/flags" @@ -63,6 +65,9 @@ var httpServerFlagsCfg = serverFlagsConfig{ tls: tlscfg.ServerFlagsConfig{ Prefix: "collector.http", }, + corsCfg: corscfg.Flags{ + Prefix: "collector.otlp.http", + }, } var otlpServerFlagsCfg = struct { @@ -82,19 +87,20 @@ var otlpServerFlagsCfg = struct { Prefix: "collector.otlp.http", EnableCertReloadInterval: true, }, + corsCfg: corscfg.Flags{ + Prefix: "collector.otlp.http", + }, }, } -var tlsZipkinFlagsConfig = tlscfg.ServerFlagsConfig{ - Prefix: "collector.zipkin", -} - -var corsZipkinFlags = corscfg.Flags{ - Prefix: "collector.zipkin", -} - -var corsOTLPFlags = corscfg.Flags{ - Prefix: "collector.otlp.http", +var zipkinServerFlagsCfg = serverFlagsConfig{ + prefix: "collector.zipkin", + tls: tlscfg.ServerFlagsConfig{ + Prefix: "collector.zipkin", + }, + corsCfg: corscfg.Flags{ + Prefix: "collector.zipkin", + }, } // CollectorOptions holds configuration for collector @@ -106,69 +112,32 @@ type CollectorOptions struct { // NumWorkers is the number of internal workers in a collector NumWorkers int // HTTP section defines options for HTTP server - HTTP HTTPOptions + HTTP confighttp.ServerConfig // GRPC section defines options for gRPC server - GRPC GRPCOptions + GRPC configgrpc.ServerConfig // OTLP section defines options for servers accepting OpenTelemetry OTLP format OTLP struct { Enabled bool - GRPC GRPCOptions - HTTP HTTPOptions + GRPC configgrpc.ServerConfig + HTTP confighttp.ServerConfig } // Zipkin section defines options for Zipkin HTTP server Zipkin struct { - // HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests - HTTPHostPort string - // TLS configures secure transport for Zipkin endpoint to collect spans - TLS tlscfg.Options - // CORS allows CORS requests , sets the values for Allowed Headers and Allowed Origins. - CORS corscfg.Options - // KeepAlive configures allow Keep-Alive for Zipkin HTTP server + confighttp.ServerConfig KeepAlive bool } // CollectorTags is the string representing collector tags to append to each and every span CollectorTags map[string]string // SpanSizeMetricsEnabled determines whether to enable metrics based on processed span size SpanSizeMetricsEnabled bool -} -type serverFlagsConfig struct { - prefix string - tls tlscfg.ServerFlagsConfig -} - -// HTTPOptions defines options for an HTTP server -type HTTPOptions struct { - // HostPort is the host:port address that the server listens on - HostPort string - // TLS configures secure transport for HTTP endpoint - TLS tlscfg.Options - // ReadTimeout sets the respective parameter of http.Server - ReadTimeout time.Duration - // ReadHeaderTimeout sets the respective parameter of http.Server - ReadHeaderTimeout time.Duration - // IdleTimeout sets the respective parameter of http.Server - IdleTimeout time.Duration - // CORS allows CORS requests , sets the values for Allowed Headers and Allowed Origins. - CORS corscfg.Options + Tenancy tenancy.Options } -// GRPCOptions defines options for a gRPC server -type GRPCOptions struct { - // HostPort is the host:port address that the collector service listens in on for gRPC requests - HostPort string - // TLS configures secure transport for gRPC endpoint to collect spans - TLS tlscfg.Options - // MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. - MaxReceiveMessageLength int - // MaxConnectionAge is a duration for the maximum amount of time a connection may exist. - // See gRPC's keepalive.ServerParameters#MaxConnectionAge. - MaxConnectionAge time.Duration - // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. - // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. - MaxConnectionAgeGrace time.Duration - // Tenancy configures tenancy for endpoints that collect spans - Tenancy tenancy.Options +type serverFlagsConfig struct { + prefix string + tls tlscfg.ServerFlagsConfig + corsCfg corscfg.Flags } // AddFlags adds flags for CollectorOptions @@ -184,13 +153,13 @@ func AddFlags(flagSet *flag.FlagSet) { flagSet.Bool(flagCollectorOTLPEnabled, true, "Enables OpenTelemetry OTLP receiver on dedicated HTTP and gRPC ports") addHTTPFlags(flagSet, otlpServerFlagsCfg.HTTP, ":4318") - corsOTLPFlags.AddFlags(flagSet) + otlpServerFlagsCfg.HTTP.corsCfg.AddFlags(flagSet) addGRPCFlags(flagSet, otlpServerFlagsCfg.GRPC, ":4317") flagSet.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)") flagSet.Bool(flagZipkinKeepAliveEnabled, true, "KeepAlive configures allow Keep-Alive for Zipkin HTTP server (enabled by default)") - tlsZipkinFlagsConfig.AddFlags(flagSet) - corsZipkinFlags.AddFlags(flagSet) + zipkinServerFlagsCfg.tls.AddFlags(flagSet) + zipkinServerFlagsCfg.corsCfg.AddFlags(flagSet) tenancy.AddFlags(flagSet) } @@ -223,67 +192,70 @@ func addGRPCFlags(flagSet *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort cfg.tls.AddFlags(flagSet) } -func (opts *HTTPOptions) initFromViper(v *viper.Viper, _ *zap.Logger, cfg serverFlagsConfig) error { - opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) - opts.IdleTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPIdleTimeout) - opts.ReadTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadTimeout) - opts.ReadHeaderTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadHeaderTimeout) +func initHTTPFromViper(v *viper.Viper, opts *confighttp.ServerConfig, cfg serverFlagsConfig) error { tlsOpts, err := cfg.tls.InitFromViper(v) if err != nil { return fmt.Errorf("failed to parse HTTP TLS options: %w", err) } - opts.TLS = tlsOpts + opts.TLSSetting = tlsOpts.ToOtelServerConfig() + opts.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) + opts.IdleTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPIdleTimeout) + opts.ReadTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadTimeout) + opts.ReadHeaderTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadHeaderTimeout) + opts.CORS = cfg.corsCfg.InitFromViper(v) + return nil } -func (opts *GRPCOptions) initFromViper(v *viper.Viper, _ *zap.Logger, cfg serverFlagsConfig) error { - opts.HostPort = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) - opts.MaxReceiveMessageLength = v.GetInt(cfg.prefix + "." + flagSuffixGRPCMaxReceiveMessageLength) - opts.MaxConnectionAge = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge) - opts.MaxConnectionAgeGrace = v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace) +func initGRPCFromViper(v *viper.Viper, opts *configgrpc.ServerConfig, cfg serverFlagsConfig) error { tlsOpts, err := cfg.tls.InitFromViper(v) if err != nil { - return fmt.Errorf("failed to parse gRPC TLS options: %w", err) + return fmt.Errorf("failed to parse GRPC TLS options: %w", err) + } + opts.TLSSetting = tlsOpts.ToOtelServerConfig() + opts.NetAddr.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort)) + opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix+"."+flagSuffixGRPCMaxReceiveMessageLength) / (1024 * 1024) + opts.Keepalive = &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionAge: v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge), + MaxConnectionAgeGrace: v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAgeGrace), + }, } - opts.TLS = tlsOpts - opts.Tenancy = tenancy.InitFromViper(v) return nil } // InitFromViper initializes CollectorOptions with properties from viper -func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*CollectorOptions, error) { +func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, _ *zap.Logger) (*CollectorOptions, error) { cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(flagCollectorTags)) cOpts.NumWorkers = v.GetInt(flagNumWorkers) cOpts.QueueSize = v.GetUint(flagQueueSize) cOpts.DynQueueSizeMemory = v.GetUint(flagDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes cOpts.SpanSizeMetricsEnabled = v.GetBool(flagSpanSizeMetricsEnabled) + cOpts.Tenancy = tenancy.InitFromViper(v) - if err := cOpts.HTTP.initFromViper(v, logger, httpServerFlagsCfg); err != nil { + if err := initHTTPFromViper(v, &cOpts.HTTP, httpServerFlagsCfg); err != nil { return cOpts, fmt.Errorf("failed to parse HTTP server options: %w", err) } - if err := cOpts.GRPC.initFromViper(v, logger, grpcServerFlagsCfg); err != nil { + if err := initGRPCFromViper(v, &cOpts.GRPC, grpcServerFlagsCfg); err != nil { return cOpts, fmt.Errorf("failed to parse gRPC server options: %w", err) } cOpts.OTLP.Enabled = v.GetBool(flagCollectorOTLPEnabled) - if err := cOpts.OTLP.HTTP.initFromViper(v, logger, otlpServerFlagsCfg.HTTP); err != nil { + + if err := initHTTPFromViper(v, &cOpts.OTLP.HTTP, otlpServerFlagsCfg.HTTP); err != nil { return cOpts, fmt.Errorf("failed to parse OTLP/HTTP server options: %w", err) } - cOpts.OTLP.HTTP.CORS = corsOTLPFlags.InitFromViper(v) - if err := cOpts.OTLP.GRPC.initFromViper(v, logger, otlpServerFlagsCfg.GRPC); err != nil { + if err := initGRPCFromViper(v, &cOpts.OTLP.GRPC, otlpServerFlagsCfg.GRPC); err != nil { return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err) } cOpts.Zipkin.KeepAlive = v.GetBool(flagZipkinKeepAliveEnabled) - cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort)) - tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v) - if err != nil { - return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err) + + if err := initHTTPFromViper(v, &cOpts.Zipkin.ServerConfig, zipkinServerFlagsCfg); err != nil { + return cOpts, fmt.Errorf("failed to parse Zipkin server options: %w", err) } - cOpts.Zipkin.TLS = tlsZipkin - cOpts.Zipkin.CORS = corsZipkinFlags.InitFromViper(v) return cOpts, nil } diff --git a/cmd/collector/app/flags/flags_test.go b/cmd/collector/app/flags/flags_test.go index 9d261465c09..39727d1bce8 100644 --- a/cmd/collector/app/flags/flags_test.go +++ b/cmd/collector/app/flags/flags_test.go @@ -26,9 +26,9 @@ func TestCollectorOptionsWithFlags_CheckHostPort(t *testing.T) { _, err := c.InitFromViper(v, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, ":5678", c.HTTP.HostPort) - assert.Equal(t, ":1234", c.GRPC.HostPort) - assert.Equal(t, ":3456", c.Zipkin.HTTPHostPort) + assert.Equal(t, ":5678", c.HTTP.Endpoint) + assert.Equal(t, ":1234", c.GRPC.NetAddr.Endpoint) + assert.Equal(t, ":3456", c.Zipkin.Endpoint) } func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { @@ -42,9 +42,9 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { _, err := c.InitFromViper(v, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, ":5678", c.HTTP.HostPort) - assert.Equal(t, "127.0.0.1:1234", c.GRPC.HostPort) - assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort) + assert.Equal(t, ":5678", c.HTTP.Endpoint) + assert.Equal(t, "127.0.0.1:1234", c.GRPC.NetAddr.Endpoint) + assert.Equal(t, "0.0.0.0:3456", c.Zipkin.Endpoint) } func TestCollectorOptionsWithFailedTLSFlags(t *testing.T) { @@ -107,7 +107,7 @@ func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) { _, err := c.InitFromViper(v, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, 8388608, c.GRPC.MaxReceiveMessageLength) + assert.Equal(t, 8, c.GRPC.MaxRecvMsgSizeMiB) } func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) { @@ -123,8 +123,8 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) { _, err := c.InitFromViper(v, zap.NewNop()) require.NoError(t, err) - assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge) - assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace) + assert.Equal(t, 5*time.Minute, c.GRPC.Keepalive.ServerParameters.MaxConnectionAge) + assert.Equal(t, time.Minute, c.GRPC.Keepalive.ServerParameters.MaxConnectionAgeGrace) assert.Equal(t, 5*time.Minute, c.HTTP.IdleTimeout) assert.Equal(t, 6*time.Minute, c.HTTP.ReadTimeout) assert.Equal(t, 5*time.Second, c.HTTP.ReadHeaderTimeout) @@ -136,7 +136,7 @@ func TestCollectorOptionsWithFlags_CheckNoTenancy(t *testing.T) { command.ParseFlags([]string{}) c.InitFromViper(v, zap.NewNop()) - assert.False(t, c.GRPC.Tenancy.Enabled) + assert.False(t, c.Tenancy.Enabled) } func TestCollectorOptionsWithFlags_CheckSimpleTenancy(t *testing.T) { @@ -147,8 +147,8 @@ func TestCollectorOptionsWithFlags_CheckSimpleTenancy(t *testing.T) { }) c.InitFromViper(v, zap.NewNop()) - assert.True(t, c.GRPC.Tenancy.Enabled) - assert.Equal(t, "x-tenant", c.GRPC.Tenancy.Header) + assert.True(t, c.Tenancy.Enabled) + assert.Equal(t, "x-tenant", c.Tenancy.Header) } func TestCollectorOptionsWithFlags_CheckFullTenancy(t *testing.T) { @@ -161,9 +161,9 @@ func TestCollectorOptionsWithFlags_CheckFullTenancy(t *testing.T) { }) c.InitFromViper(v, zap.NewNop()) - assert.True(t, c.GRPC.Tenancy.Enabled) - assert.Equal(t, "custom-tenant-header", c.GRPC.Tenancy.Header) - assert.Equal(t, []string{"acme", "hardware-store"}, c.GRPC.Tenancy.Tenants) + assert.True(t, c.Tenancy.Enabled) + assert.Equal(t, "custom-tenant-header", c.Tenancy.Header) + assert.Equal(t, []string{"acme", "hardware-store"}, c.Tenancy.Tenants) } func TestCollectorOptionsWithFlags_CheckZipkinKeepAlive(t *testing.T) { diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index fb54d931bf8..1bb2b2a96d9 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -10,10 +10,8 @@ import ( otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/pdata/ptrace" @@ -27,7 +25,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/tenancy" ) @@ -62,8 +59,9 @@ func startOTLPReceiver( cfg component.Config, nextConsumer consumer.Traces) (receiver.Traces, error), ) (receiver.Traces, error) { otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) - applyGRPCSettings(otlpReceiverConfig.GRPC, &options.OTLP.GRPC) - applyHTTPSettings(otlpReceiverConfig.HTTP.ServerConfig, &options.OTLP.HTTP) + otlpReceiverConfig.GRPC = &options.OTLP.GRPC + otlpReceiverConfig.GRPC.NetAddr.Transport = confignet.TransportTypeTCP + otlpReceiverConfig.HTTP.ServerConfig = &options.OTLP.HTTP statusReporter := func(ev *componentstatus.Event) { // TODO this could be wired into changing healthcheck.HealthCheck logger.Info("OTLP receiver status change", zap.Stringer("status", ev.Status())) @@ -101,54 +99,6 @@ func startOTLPReceiver( return otlpReceiver, nil } -func applyGRPCSettings(cfg *configgrpc.ServerConfig, opts *flags.GRPCOptions) { - if opts.HostPort != "" { - cfg.NetAddr.Endpoint = opts.HostPort - } - if opts.TLS.Enabled { - cfg.TLSSetting = applyTLSSettings(&opts.TLS) - } - if opts.MaxReceiveMessageLength > 0 { - cfg.MaxRecvMsgSizeMiB = int(opts.MaxReceiveMessageLength / (1024 * 1024)) - } - if opts.MaxConnectionAge != 0 || opts.MaxConnectionAgeGrace != 0 { - cfg.Keepalive = &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionAge: opts.MaxConnectionAge, - MaxConnectionAgeGrace: opts.MaxConnectionAgeGrace, - }, - } - } -} - -func applyHTTPSettings(cfg *confighttp.ServerConfig, opts *flags.HTTPOptions) { - if opts.HostPort != "" { - cfg.Endpoint = opts.HostPort - } - if opts.TLS.Enabled { - cfg.TLSSetting = applyTLSSettings(&opts.TLS) - } - - cfg.CORS = &confighttp.CORSConfig{ - AllowedOrigins: opts.CORS.AllowedOrigins, - AllowedHeaders: opts.CORS.AllowedHeaders, - } -} - -func applyTLSSettings(opts *tlscfg.Options) *configtls.ServerConfig { - return &configtls.ServerConfig{ - Config: configtls.Config{ - CAFile: opts.CAPath, - CertFile: opts.CertPath, - KeyFile: opts.KeyPath, - MinVersion: opts.MinVersion, - MaxVersion: opts.MaxVersion, - ReloadInterval: opts.ReloadInterval, - }, - ClientCAFile: opts.ClientCAPath, - } -} - func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) *consumerDelegate { return &consumerDelegate{ batchConsumer: newBatchConsumer(logger, diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go index 1f18c3d07bb..c0f80de60f2 100644 --- a/cmd/collector/app/handler/otlp_receiver_test.go +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -7,11 +7,13 @@ import ( "context" "errors" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" @@ -19,19 +21,28 @@ import ( "go.opentelemetry.io/collector/receiver/otlpreceiver" "github.com/jaegertracing/jaeger/cmd/collector/app/flags" - "github.com/jaegertracing/jaeger/pkg/config/corscfg" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" ) func optionsWithPorts(port string) *flags.CollectorOptions { - opts := &flags.CollectorOptions{} - opts.OTLP.GRPC = flags.GRPCOptions{ - HostPort: port, - } - opts.OTLP.HTTP = flags.HTTPOptions{ - HostPort: port, + opts := &flags.CollectorOptions{ + OTLP: struct { + Enabled bool + GRPC configgrpc.ServerConfig + HTTP confighttp.ServerConfig + }{ + Enabled: true, + HTTP: confighttp.ServerConfig{ + Endpoint: port, + }, + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: port, + Transport: confignet.TransportTypeTCP, + }, + }, + }, } return opts } @@ -128,80 +139,3 @@ func TestOtelHost(t *testing.T) { assert.Nil(t, host.GetExtensions()) assert.Nil(t, host.GetExporters()) } - -func TestApplyOTLPGRPCServerSettings(t *testing.T) { - otlpFactory := otlpreceiver.NewFactory() - otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) - - grpcOpts := &flags.GRPCOptions{ - HostPort: ":54321", - MaxReceiveMessageLength: 42 * 1024 * 1024, - MaxConnectionAge: 33 * time.Second, - MaxConnectionAgeGrace: 37 * time.Second, - TLS: tlscfg.Options{ - Enabled: true, - CAPath: "ca", - CertPath: "cert", - KeyPath: "key", - ClientCAPath: "clientca", - MinVersion: "1.1", - MaxVersion: "1.3", - ReloadInterval: 24 * time.Hour, - }, - } - applyGRPCSettings(otlpReceiverConfig.GRPC, grpcOpts) - out := otlpReceiverConfig.GRPC - assert.Equal(t, ":54321", out.NetAddr.Endpoint) - assert.EqualValues(t, 42, out.MaxRecvMsgSizeMiB) - require.NotNil(t, out.Keepalive) - require.NotNil(t, out.Keepalive.ServerParameters) - assert.Equal(t, 33*time.Second, out.Keepalive.ServerParameters.MaxConnectionAge) - assert.Equal(t, 37*time.Second, out.Keepalive.ServerParameters.MaxConnectionAgeGrace) - require.NotNil(t, out.TLSSetting) - assert.Equal(t, "ca", out.TLSSetting.CAFile) - assert.Equal(t, "cert", out.TLSSetting.CertFile) - assert.Equal(t, "key", out.TLSSetting.KeyFile) - assert.Equal(t, "clientca", out.TLSSetting.ClientCAFile) - assert.Equal(t, "1.1", out.TLSSetting.MinVersion) - assert.Equal(t, "1.3", out.TLSSetting.MaxVersion) - assert.Equal(t, 24*time.Hour, out.TLSSetting.ReloadInterval) -} - -func TestApplyOTLPHTTPServerSettings(t *testing.T) { - otlpFactory := otlpreceiver.NewFactory() - otlpReceiverConfig := otlpFactory.CreateDefaultConfig().(*otlpreceiver.Config) - - httpOpts := &flags.HTTPOptions{ - HostPort: ":12345", - TLS: tlscfg.Options{ - Enabled: true, - CAPath: "ca", - CertPath: "cert", - KeyPath: "key", - ClientCAPath: "clientca", - MinVersion: "1.1", - MaxVersion: "1.3", - ReloadInterval: 24 * time.Hour, - }, - CORS: corscfg.Options{ - AllowedOrigins: []string{"http://example.domain.com", "http://*.domain.com"}, - AllowedHeaders: []string{"Content-Type", "Accept", "X-Requested-With"}, - }, - } - - applyHTTPSettings(otlpReceiverConfig.HTTP.ServerConfig, httpOpts) - - out := otlpReceiverConfig.HTTP - - assert.Equal(t, ":12345", out.Endpoint) - require.NotNil(t, out.TLSSetting) - assert.Equal(t, "ca", out.TLSSetting.CAFile) - assert.Equal(t, "cert", out.TLSSetting.CertFile) - assert.Equal(t, "key", out.TLSSetting.KeyFile) - assert.Equal(t, "clientca", out.TLSSetting.ClientCAFile) - assert.Equal(t, "1.1", out.TLSSetting.MinVersion) - assert.Equal(t, "1.3", out.TLSSetting.MaxVersion) - assert.Equal(t, 24*time.Hour, out.TLSSetting.ReloadInterval) - assert.Equal(t, []string{"Content-Type", "Accept", "X-Requested-With"}, out.CORS.AllowedHeaders) - assert.Equal(t, []string{"http://example.domain.com", "http://*.domain.com"}, out.CORS.AllowedOrigins) -} diff --git a/cmd/collector/app/handler/zipkin_receiver.go b/cmd/collector/app/handler/zipkin_receiver.go index 983ee4bd2a5..b7fc18619e3 100644 --- a/cmd/collector/app/handler/zipkin_receiver.go +++ b/cmd/collector/app/handler/zipkin_receiver.go @@ -56,12 +56,7 @@ func startZipkinReceiver( cfg component.Config, nextConsumer consumer.Traces) (receiver.Traces, error), ) (receiver.Traces, error) { receiverConfig := zipkinFactory.CreateDefaultConfig().(*zipkinreceiver.Config) - applyHTTPSettings(&receiverConfig.ServerConfig, &flags.HTTPOptions{ - HostPort: options.Zipkin.HTTPHostPort, - TLS: options.Zipkin.TLS, - CORS: options.HTTP.CORS, - // TODO keepAlive not supported? - }) + receiverConfig.ServerConfig = options.Zipkin.ServerConfig receiverSettings := receiver.Settings{ TelemetrySettings: component.TelemetrySettings{ Logger: logger, diff --git a/cmd/collector/app/handler/zipkin_receiver_test.go b/cmd/collector/app/handler/zipkin_receiver_test.go index 584dbc56cda..d6ad98c4319 100644 --- a/cmd/collector/app/handler/zipkin_receiver_test.go +++ b/cmd/collector/app/handler/zipkin_receiver_test.go @@ -37,7 +37,7 @@ func TestZipkinReceiver(t *testing.T) { tm := &tenancy.Manager{} opts := &flags.CollectorOptions{} - opts.Zipkin.HTTPHostPort = ":11911" + opts.Zipkin.Endpoint = ":11911" rec, err := StartZipkinReceiver(opts, logger, spanProcessor, tm) require.NoError(t, err) @@ -138,7 +138,7 @@ func TestStartZipkinReceiver_Error(t *testing.T) { tm := &tenancy.Manager{} opts := &flags.CollectorOptions{} - opts.Zipkin.HTTPHostPort = ":-1" + opts.Zipkin.Endpoint = ":-1" _, err := StartZipkinReceiver(opts, logger, spanProcessor, tm) require.ErrorContains(t, err, "could not start Zipkin receiver") diff --git a/cmd/collector/app/handler/zipkin_receiver_tls_test.go b/cmd/collector/app/handler/zipkin_receiver_tls_test.go index 060327360a6..108a1023150 100644 --- a/cmd/collector/app/handler/zipkin_receiver_tls_test.go +++ b/cmd/collector/app/handler/zipkin_receiver_tls_test.go @@ -13,10 +13,9 @@ import ( "time" "github.com/stretchr/testify/require" - "go.uber.org/zap" + "go.opentelemetry.io/collector/config/configtls" "github.com/jaegertracing/jaeger/cmd/collector/app/flags" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/ports" @@ -26,21 +25,21 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { const testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata" testCases := []struct { name string - serverTLS tlscfg.Options - clientTLS tlscfg.Options + serverTLS configtls.ServerConfig + clientTLS configtls.ClientConfig expectTLSClientErr bool expectZipkinClientErr bool expectServerFail bool }{ { name: "should fail with TLS client to untrusted TLS server", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", + serverTLS: configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, + clientTLS: configtls.ClientConfig{ ServerName: "example.com", }, expectTLSClientErr: true, @@ -49,14 +48,16 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { }, { name: "should fail with TLS client to trusted TLS server with incorrect hostname", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", + serverTLS: configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, - CAPath: testCertKeyLocation + "/example-CA-cert.pem", + clientTLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: testCertKeyLocation + "/example-CA-cert.pem", + }, ServerName: "nonEmpty", }, expectTLSClientErr: true, @@ -65,14 +66,16 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { }, { name: "should pass with TLS client to trusted TLS server with correct hostname", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", + serverTLS: configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, - CAPath: testCertKeyLocation + "/example-CA-cert.pem", + clientTLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: testCertKeyLocation + "/example-CA-cert.pem", + }, ServerName: "example.com", }, expectTLSClientErr: false, @@ -81,74 +84,82 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { }, { name: "should fail with TLS client without cert to trusted TLS server requiring cert", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", - ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", + serverTLS: configtls.ServerConfig{ + ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem", + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, - CAPath: testCertKeyLocation + "/example-CA-cert.pem", + clientTLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: testCertKeyLocation + "/example-CA-cert.pem", + }, ServerName: "example.com", }, expectTLSClientErr: false, - expectServerFail: false, expectZipkinClientErr: true, + expectServerFail: false, }, { name: "should pass with TLS client with cert to trusted TLS server requiring cert", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", - ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", + serverTLS: configtls.ServerConfig{ + ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem", + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, - CAPath: testCertKeyLocation + "/example-CA-cert.pem", + clientTLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: testCertKeyLocation + "/example-CA-cert.pem", + CertFile: testCertKeyLocation + "/example-client-cert.pem", + KeyFile: testCertKeyLocation + "/example-client-key.pem", + }, ServerName: "example.com", - CertPath: testCertKeyLocation + "/example-client-cert.pem", - KeyPath: testCertKeyLocation + "/example-client-key.pem", }, expectTLSClientErr: false, - expectServerFail: false, expectZipkinClientErr: false, + expectServerFail: false, }, { - name: "should fail with TLS client without cert to trusted TLS server requiring cert from a different CA", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", - ClientCAPath: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA + name: "should fail with TLS client without cert to trusted TLS server requiring cert from different CA", + serverTLS: configtls.ServerConfig{ + ClientCAFile: testCertKeyLocation + "/wrong-CA-cert.pem", + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, - CAPath: testCertKeyLocation + "/example-CA-cert.pem", + clientTLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: testCertKeyLocation + "/example-CA-cert.pem", + CertFile: testCertKeyLocation + "/example-client-cert.pem", + KeyFile: testCertKeyLocation + "/example-client-key.pem", + }, ServerName: "example.com", - CertPath: testCertKeyLocation + "/example-client-cert.pem", - KeyPath: testCertKeyLocation + "/example-client-key.pem", }, expectTLSClientErr: false, - expectServerFail: false, expectZipkinClientErr: true, + expectServerFail: false, }, { name: "should fail with TLS client with cert to trusted TLS server with incorrect TLS min", - serverTLS: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", - ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", - MinVersion: "1.5", + serverTLS: configtls.ServerConfig{ + ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem", + Config: configtls.Config{ + CertFile: testCertKeyLocation + "/example-server-cert.pem", + KeyFile: testCertKeyLocation + "/example-server-key.pem", + MinVersion: "1.5", + }, }, - clientTLS: tlscfg.Options{ - Enabled: true, - CAPath: testCertKeyLocation + "/example-CA-cert.pem", + clientTLS: configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: testCertKeyLocation + "/example-CA-cert.pem", + CertFile: testCertKeyLocation + "/example-client-cert.pem", + KeyFile: testCertKeyLocation + "/example-client-key.pem", + }, ServerName: "example.com", - CertPath: testCertKeyLocation + "/example-client-cert.pem", - KeyPath: testCertKeyLocation + "/example-client-key.pem", }, expectTLSClientErr: true, expectServerFail: true, @@ -163,9 +174,8 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { tm := &tenancy.Manager{} opts := &flags.CollectorOptions{} - opts.Zipkin.HTTPHostPort = ports.PortToHostPort(ports.CollectorZipkin) - opts.Zipkin.TLS = test.serverTLS - defer test.serverTLS.Close() + opts.Zipkin.Endpoint = ports.PortToHostPort(ports.CollectorZipkin) + opts.Zipkin.TLSSetting = &test.serverTLS server, err := StartZipkinReceiver(opts, logger, spanProcessor, tm) if test.expectServerFail { @@ -177,8 +187,7 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { require.NoError(t, server.Shutdown(context.Background())) }() - clientTLSCfg, err0 := test.clientTLS.Config(zap.NewNop()) - defer test.clientTLS.Close() + clientTLSCfg, err0 := test.clientTLS.LoadTLSConfig(context.Background()) require.NoError(t, err0) dialer := &net.Dialer{Timeout: 2 * time.Second} conn, clientError := tls.DialWithDialer(dialer, "tcp", fmt.Sprintf("localhost:%d", ports.CollectorZipkin), clientTLSCfg) @@ -197,7 +206,6 @@ func TestSpanCollectorZipkinTLS(t *testing.T) { } response, requestError := client.Post(fmt.Sprintf("https://localhost:%d", ports.CollectorZipkin), "", nil) - if test.expectZipkinClientErr { require.Error(t, requestError) } else { diff --git a/cmd/collector/app/server/grpc.go b/cmd/collector/app/server/grpc.go index a740fe5b8fe..dbdedce6f26 100644 --- a/cmd/collector/app/server/grpc.go +++ b/cmd/collector/app/server/grpc.go @@ -4,36 +4,32 @@ package server import ( + "context" "fmt" "net" - "time" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confignet" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) // GRPCServerParams to construct a new Jaeger Collector gRPC Server type GRPCServerParams struct { - TLSConfig tlscfg.Options - HostPort string - Handler *handler.GRPCHandler - SamplingProvider samplingstrategy.Provider - Logger *zap.Logger - OnError func(error) - MaxReceiveMessageLength int - MaxConnectionAge time.Duration - MaxConnectionAgeGrace time.Duration + configgrpc.ServerConfig + Handler *handler.GRPCHandler + SamplingProvider samplingstrategy.Provider + Logger *zap.Logger + OnError func(error) // Set by the server to indicate the actual host:port of the server. HostPortActual string @@ -42,31 +38,17 @@ type GRPCServerParams struct { // StartGRPCServer based on the given parameters func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) { var server *grpc.Server - var grpcOpts []grpc.ServerOption - - if params.MaxReceiveMessageLength > 0 { - grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxReceiveMessageLength)) - } - grpcOpts = append(grpcOpts, grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionAge: params.MaxConnectionAge, - MaxConnectionAgeGrace: params.MaxConnectionAgeGrace, - })) - - if params.TLSConfig.Enabled { - // user requested a server with TLS, setup creds - tlsCfg, err := params.TLSConfig.Config(params.Logger) - if err != nil { - return nil, err - } - - creds := credentials.NewTLS(tlsCfg) - grpcOpts = append(grpcOpts, grpc.Creds(creds)) + var grpcOpts []configgrpc.ToServerOption + params.NetAddr.Transport = confignet.TransportTypeTCP + server, err := params.ToServer(context.Background(), nil, + telemetry.NoopSettings().ToOtelComponent(), + grpcOpts...) + if err != nil { + return nil, err } - - server = grpc.NewServer(grpcOpts...) reflection.Register(server) - listener, err := net.Listen("tcp", params.HostPort) + listener, err := params.NetAddr.Listen(context.Background()) if err != nil { return nil, fmt.Errorf("failed to listen on gRPC port: %w", err) } diff --git a/cmd/collector/app/server/grpc_test.go b/cmd/collector/app/server/grpc_test.go index 19594ebcfa7..3ac6351d628 100644 --- a/cmd/collector/app/server/grpc_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confignet" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -28,7 +30,12 @@ import ( func TestFailToListen(t *testing.T) { logger, _ := zap.NewDevelopment() server, err := StartGRPCServer(&GRPCServerParams{ - HostPort: ":-1", + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ":-1", + Transport: confignet.TransportTypeTCP, + }, + }, Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}), SamplingProvider: &mockSamplingProvider{}, Logger: logger, @@ -63,10 +70,15 @@ func TestFailServe(t *testing.T) { func TestSpanCollector(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}), - SamplingProvider: &mockSamplingProvider{}, - Logger: logger, - MaxReceiveMessageLength: 1024 * 1024, + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}), + SamplingProvider: &mockSamplingProvider{}, + Logger: logger, + ServerConfig: configgrpc.ServerConfig{ + MaxRecvMsgSizeMiB: 2, + NetAddr: confignet.AddrConfig{ + Transport: confignet.TransportTypeTCP, + }, + }, } server, err := StartGRPCServer(params) @@ -87,21 +99,26 @@ func TestSpanCollector(t *testing.T) { func TestCollectorStartWithTLS(t *testing.T) { logger, _ := zap.NewDevelopment() + opts := tlscfg.Options{ + Enabled: true, + CertPath: testCertKeyLocation + "/example-server-cert.pem", + KeyPath: testCertKeyLocation + "/example-server-key.pem", + ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", + } params := &GRPCServerParams{ Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}), SamplingProvider: &mockSamplingProvider{}, Logger: logger, - TLSConfig: tlscfg.Options{ - Enabled: true, - CertPath: testCertKeyLocation + "/example-server-cert.pem", - KeyPath: testCertKeyLocation + "/example-server-key.pem", - ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem", + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Transport: confignet.TransportTypeTCP, + }, + TLSSetting: opts.ToOtelServerConfig(), }, } server, err := StartGRPCServer(params) require.NoError(t, err) defer server.Stop() - defer params.TLSConfig.Close() } func TestCollectorReflection(t *testing.T) { @@ -110,6 +127,11 @@ func TestCollectorReflection(t *testing.T) { Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}), SamplingProvider: &mockSamplingProvider{}, Logger: logger, + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Transport: confignet.TransportTypeTCP, + }, + }, } server, err := StartGRPCServer(params) diff --git a/cmd/collector/app/server/http.go b/cmd/collector/app/server/http.go index c519e6df495..15385bb9320 100644 --- a/cmd/collector/app/server/http.go +++ b/cmd/collector/app/server/http.go @@ -4,63 +4,47 @@ package server import ( + "context" "net" "net/http" - "time" "github.com/gorilla/mux" + "go.opentelemetry.io/collector/config/confighttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" - "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/httpmetrics" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/pkg/telemetry" ) // HTTPServerParams to construct a new Jaeger Collector HTTP Server type HTTPServerParams struct { - TLSConfig tlscfg.Options - HostPort string + confighttp.ServerConfig Handler handler.JaegerBatchesHandler SamplingProvider samplingstrategy.Provider MetricsFactory metrics.Factory HealthCheck *healthcheck.HealthCheck Logger *zap.Logger - - // ReadTimeout sets the respective parameter of http.Server - ReadTimeout time.Duration - // ReadHeaderTimeout sets the respective parameter of http.Server - ReadHeaderTimeout time.Duration - // IdleTimeout sets the respective parameter of http.Server - IdleTimeout time.Duration } // StartHTTPServer based on the given parameters func StartHTTPServer(params *HTTPServerParams) (*http.Server, error) { - params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http host-port", params.HostPort)) - - errorLog, _ := zap.NewStdLogAt(params.Logger, zapcore.ErrorLevel) - server := &http.Server{ - Addr: params.HostPort, - ReadTimeout: params.ReadTimeout, - ReadHeaderTimeout: params.ReadHeaderTimeout, - IdleTimeout: params.IdleTimeout, - ErrorLog: errorLog, - } - if params.TLSConfig.Enabled { - tlsCfg, err := params.TLSConfig.Config(params.Logger) // This checks if the certificates are correctly provided - if err != nil { - return nil, err - } - server.TLSConfig = tlsCfg + params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("host-port", params.Endpoint)) + listener, err := params.ToListener(context.Background()) + if err != nil { + return nil, err } + errorLog, _ := zap.NewStdLogAt(params.Logger, zapcore.ErrorLevel) + server, err := params.ToServer(context.Background(), nil, telemetry.NoopSettings().ToOtelComponent(), + nil) - listener, err := net.Listen("tcp", params.HostPort) + server.ErrorLog = errorLog if err != nil { return nil, err } @@ -89,12 +73,7 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true) server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory, params.Logger) go func() { - var err error - if params.TLSConfig.Enabled { - err = server.ServeTLS(listener, "", "") - } else { - err = server.Serve(listener) - } + err := server.Serve(listener) if err != nil { if err != http.ErrServerClosed { params.Logger.Error("Could not start HTTP collector", zap.Error(err)) diff --git a/cmd/collector/app/server/http_test.go b/cmd/collector/app/server/http_test.go index 46184bd8b89..24c00b94e54 100644 --- a/cmd/collector/app/server/http_test.go +++ b/cmd/collector/app/server/http_test.go @@ -4,17 +4,20 @@ package server import ( + "context" "crypto/tls" "fmt" "net" "net/http" - "net/http/httptest" "strconv" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configtls" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" @@ -30,8 +33,10 @@ var testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata" func TestFailToListenHTTP(t *testing.T) { logger, _ := zap.NewDevelopment() server, err := StartHTTPServer(&HTTPServerParams{ - HostPort: ":-1", - Logger: logger, + ServerConfig: confighttp.ServerConfig{ + Endpoint: ":-1", + }, + Logger: logger, }) assert.Nil(t, server) require.EqualError(t, err, "listen tcp: address -1: invalid port") @@ -39,22 +44,23 @@ func TestFailToListenHTTP(t *testing.T) { func TestCreateTLSHTTPServerError(t *testing.T) { logger, _ := zap.NewDevelopment() - tlsCfg := tlscfg.Options{ - Enabled: true, - CertPath: "invalid/path", - KeyPath: "invalid/path", - ClientCAPath: "invalid/path", - } params := &HTTPServerParams{ - HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP), + ServerConfig: confighttp.ServerConfig{ + Endpoint: ":0", + TLSSetting: &configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: "invalid/path", + KeyFile: "invalid/path", + CAFile: "invalid/path", + }, + }, + }, HealthCheck: healthcheck.New(), Logger: logger, - TLSConfig: tlsCfg, } _, err := StartHTTPServer(params) require.Error(t, err) - defer params.TLSConfig.Close() } func TestSpanCollectorHTTP(t *testing.T) { @@ -69,11 +75,15 @@ func TestSpanCollectorHTTP(t *testing.T) { Logger: logger, } - server := httptest.NewServer(nil) - - serveHTTP(server.Config, server.Listener, params) + server, _ := params.ToServer(context.Background(), nil, component.TelemetrySettings{}, nil) + listener, _ := params.ToListener(context.Background()) - response, err := http.Post(server.URL, "", nil) + serveHTTP(server, listener, params) + addr := listener.Addr().String() + host, port, err := net.SplitHostPort(addr) + require.NoError(t, err) + url := fmt.Sprintf("http://%s:%s", host, port) + response, err := http.Post(url, "", nil) require.NoError(t, err) assert.NotNil(t, response) defer response.Body.Close() @@ -188,15 +198,16 @@ func TestSpanCollectorHTTPS(t *testing.T) { mFact := metricstest.NewFactory(time.Hour) defer mFact.Backend.Stop() params := &HTTPServerParams{ - HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP), + ServerConfig: confighttp.ServerConfig{ + Endpoint: fmt.Sprintf(":%d", ports.CollectorHTTP), + TLSSetting: test.TLS.ToOtelServerConfig(), + }, Handler: handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}), SamplingProvider: &mockSamplingProvider{}, MetricsFactory: mFact, HealthCheck: healthcheck.New(), Logger: logger, - TLSConfig: test.TLS, } - defer params.TLSConfig.Close() server, err := StartHTTPServer(params) require.NoError(t, err) @@ -204,8 +215,7 @@ func TestSpanCollectorHTTPS(t *testing.T) { require.NoError(t, server.Close()) }() - clientTLSCfg, err0 := test.clientTLS.Config(logger) - defer test.clientTLS.Close() + clientTLSCfg, err0 := test.clientTLS.ToOtelClientConfig().LoadTLSConfig(context.Background()) require.NoError(t, err0) dialer := &net.Dialer{Timeout: 2 * time.Second} conn, clientError := tls.DialWithDialer(dialer, "tcp", "localhost:"+strconv.Itoa(ports.CollectorHTTP), clientTLSCfg) @@ -250,15 +260,17 @@ func TestStartHTTPServerParams(t *testing.T) { mFact := metricstest.NewFactory(time.Hour) defer mFact.Stop() params := &HTTPServerParams{ - HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP), - Handler: handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}), - SamplingProvider: &mockSamplingProvider{}, - MetricsFactory: mFact, - HealthCheck: healthcheck.New(), - Logger: logger, - IdleTimeout: 5 * time.Minute, - ReadTimeout: 6 * time.Minute, - ReadHeaderTimeout: 7 * time.Second, + ServerConfig: confighttp.ServerConfig{ + Endpoint: fmt.Sprintf(":%d", ports.CollectorHTTP), + IdleTimeout: 5 * time.Minute, + ReadTimeout: 6 * time.Minute, + ReadHeaderTimeout: 7 * time.Second, + }, + Handler: handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}), + SamplingProvider: &mockSamplingProvider{}, + MetricsFactory: mFact, + HealthCheck: healthcheck.New(), + Logger: logger, } server, err := StartHTTPServer(params) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 52528ada6fb..29303cecdbb 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -95,7 +95,7 @@ func main() { if err != nil { logger.Fatal("Failed to initialize collector", zap.Error(err)) } - tm := tenancy.NewManager(&collectorOpts.GRPC.Tenancy) + tm := tenancy.NewManager(&collectorOpts.Tenancy) collector := app.New(&app.CollectorParams{ ServiceName: serviceName, diff --git a/cmd/internal/flags/admin.go b/cmd/internal/flags/admin.go index e868eff7f3b..6078fb97c3b 100644 --- a/cmd/internal/flags/admin.go +++ b/cmd/internal/flags/admin.go @@ -5,23 +5,23 @@ package flags import ( "context" - "crypto/tls" "errors" "flag" "fmt" - "io" "net" "net/http" "net/http/pprof" - "time" + "sync" "github.com/spf13/viper" + "go.opentelemetry.io/collector/config/confighttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/pkg/version" ) @@ -35,22 +35,23 @@ var tlsAdminHTTPFlagsConfig = tlscfg.ServerFlagsConfig{ // AdminServer runs an HTTP server with admin endpoints, such as healthcheck at /, /metrics, etc. type AdminServer struct { - logger *zap.Logger - adminHostPort string - hc *healthcheck.HealthCheck - mux *http.ServeMux - server *http.Server - tlsCfg *tls.Config - tlsCertWatcherCloser io.Closer + logger *zap.Logger + hc *healthcheck.HealthCheck + mux *http.ServeMux + server *http.Server + serverCfg confighttp.ServerConfig + stopped sync.WaitGroup } // NewAdminServer creates a new admin server. func NewAdminServer(hostPort string) *AdminServer { return &AdminServer{ - adminHostPort: hostPort, - logger: zap.NewNop(), - hc: healthcheck.New(), - mux: http.NewServeMux(), + logger: zap.NewNop(), + hc: healthcheck.New(), + mux: http.NewServeMux(), + serverCfg: confighttp.ServerConfig{ + Endpoint: hostPort, + }, } } @@ -67,7 +68,7 @@ func (s *AdminServer) setLogger(logger *zap.Logger) { // AddFlags registers CLI flags. func (s *AdminServer) AddFlags(flagSet *flag.FlagSet) { - flagSet.String(adminHTTPHostPort, s.adminHostPort, fmt.Sprintf("The host:port (e.g. 127.0.0.1%s or %s) for the admin server, including health check, /metrics, etc.", s.adminHostPort, s.adminHostPort)) + flagSet.String(adminHTTPHostPort, s.serverCfg.Endpoint, fmt.Sprintf("The host:port (e.g. 127.0.0.1%s or %s) for the admin server, including health check, /metrics, etc.", s.serverCfg.Endpoint, s.serverCfg.Endpoint)) tlsAdminHTTPFlagsConfig.AddFlags(flagSet) } @@ -75,22 +76,13 @@ func (s *AdminServer) AddFlags(flagSet *flag.FlagSet) { func (s *AdminServer) initFromViper(v *viper.Viper, logger *zap.Logger) error { s.setLogger(logger) - s.adminHostPort = v.GetString(adminHTTPHostPort) - var tlsAdminHTTP tlscfg.Options tlsAdminHTTP, err := tlsAdminHTTPFlagsConfig.InitFromViper(v) if err != nil { return fmt.Errorf("failed to parse admin server TLS options: %w", err) } - if tlsAdminHTTP.Enabled { - tlsCfg, err := tlsAdminHTTP.Config(s.logger) // This checks if the certificates are correctly provided - if err != nil { - return err - } - s.tlsCfg = tlsCfg - s.tlsCertWatcherCloser = &tlsAdminHTTP - } else { - s.tlsCertWatcherCloser = io.NopCloser(nil) - } + + s.serverCfg.Endpoint = v.GetString(adminHTTPHostPort) + s.serverCfg.TLSSetting = tlsAdminHTTP.ToOtelServerConfig() return nil } @@ -101,48 +93,52 @@ func (s *AdminServer) Handle(path string, handler http.Handler) { // Serve starts HTTP server. func (s *AdminServer) Serve() error { - l, err := net.Listen("tcp", s.adminHostPort) + l, err := s.serverCfg.ToListener(context.Background()) if err != nil { s.logger.Error("Admin server failed to listen", zap.Error(err)) return err } - s.serveWithListener(l) - s.logger.Info( - "Admin server started", - zap.String("http.host-port", l.Addr().String()), - zap.Stringer("health-status", s.hc.Get())) - return nil + return s.serveWithListener(l) } -func (s *AdminServer) serveWithListener(l net.Listener) { +func (s *AdminServer) serveWithListener(l net.Listener) (err error) { s.logger.Info("Mounting health check on admin server", zap.String("route", "/")) s.mux.Handle("/", s.hc.Handler()) version.RegisterHandler(s.mux, s.logger) s.registerPprofHandlers() recoveryHandler := recoveryhandler.NewRecoveryHandler(s.logger, true) - errorLog, _ := zap.NewStdLogAt(s.logger, zapcore.ErrorLevel) - s.server = &http.Server{ - Handler: recoveryHandler(s.mux), - ErrorLog: errorLog, - ReadHeaderTimeout: 2 * time.Second, - } - if s.tlsCfg != nil { - s.server.TLSConfig = s.tlsCfg + s.server, err = s.serverCfg.ToServer( + context.Background(), + nil, // host + telemetry.NoopSettings().ToOtelComponent(), + recoveryHandler(s.mux), + ) + if err != nil { + return fmt.Errorf("failed to create admin server: %w", err) } - s.logger.Info("Starting admin HTTP server", zap.String("http-addr", s.adminHostPort)) + errorLog, _ := zap.NewStdLogAt(s.logger, zapcore.ErrorLevel) + s.server.ErrorLog = errorLog + + s.logger.Info("Starting admin HTTP server") + var wg sync.WaitGroup + wg.Add(1) + s.stopped.Add(1) go func() { - var err error - if s.tlsCfg != nil { - err = s.server.ServeTLS(l, "", "") - } else { - err = s.server.Serve(l) - } + wg.Done() + defer s.stopped.Done() + err := s.server.Serve(l) if err != nil && !errors.Is(err, http.ErrServerClosed) { s.logger.Error("failed to serve", zap.Error(err)) s.hc.Set(healthcheck.Broken) } }() + wg.Wait() // wait for the server to start listening + s.logger.Info( + "Admin server started", + zap.String("http.host-port", l.Addr().String()), + zap.Stringer("health-status", s.hc.Get())) + return nil } func (s *AdminServer) registerPprofHandlers() { @@ -159,8 +155,7 @@ func (s *AdminServer) registerPprofHandlers() { // Close stops the HTTP server func (s *AdminServer) Close() error { - return errors.Join( - s.tlsCertWatcherCloser.Close(), - s.server.Shutdown(context.Background()), - ) + err := s.server.Shutdown(context.Background()) + s.stopped.Wait() + return err } diff --git a/cmd/internal/flags/admin_test.go b/cmd/internal/flags/admin_test.go index 74eaaf525fe..8d83e46d99c 100644 --- a/cmd/internal/flags/admin_test.go +++ b/cmd/internal/flags/admin_test.go @@ -4,6 +4,7 @@ package flags import ( + "context" "crypto/tls" "fmt" "net" @@ -69,7 +70,7 @@ func TestAdminFailToServe(t *testing.T) { require.NoError(t, adminServer.initFromViper(v, logger)) adminServer.serveWithListener(l) - defer adminServer.Close() + t.Cleanup(func() { assert.NoError(t, adminServer.Close()) }) waitForEqual(t, healthcheck.Broken, func() any { return adminServer.HC().Get() }) @@ -133,9 +134,8 @@ func TestAdminServerTLS(t *testing.T) { adminServer.Serve() defer adminServer.Close() - clientTLSCfg, err0 := test.clientTLS.Config(zap.NewNop()) + clientTLSCfg, err0 := test.clientTLS.ToOtelClientConfig().LoadTLSConfig(context.Background()) require.NoError(t, err0) - defer test.clientTLS.Close() dialer := &net.Dialer{Timeout: 2 * time.Second} conn, clientError := tls.DialWithDialer(dialer, "tcp", fmt.Sprintf("localhost:%d", ports.CollectorAdminHTTP), clientTLSCfg) require.NoError(t, clientError) diff --git a/cmd/internal/flags/service_test.go b/cmd/internal/flags/service_test.go index f50a2bbe97f..4157b1c7c6c 100644 --- a/cmd/internal/flags/service_test.go +++ b/cmd/internal/flags/service_test.go @@ -50,7 +50,7 @@ func TestStartErrors(t *testing.T) { { name: "bad admin TLS", flags: []string{"--admin.http.tls.enabled=true", "--admin.http.tls.cert=invalid-cert"}, - expErr: "cannot initialize admin server", + expErr: "cannot start the admin server: failed to load TLS config", }, { name: "bad host:port", diff --git a/cmd/query/app/querysvc/adjuster/adjuster.go b/cmd/query/app/querysvc/adjuster/adjuster.go new file mode 100644 index 00000000000..748eee10b07 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/adjuster.go @@ -0,0 +1,60 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Adjuster defines an interface for modifying a trace object. +// It returns the adjusted trace object, which is also updated in place. +// If the adjuster encounters an issue that prevents it from applying +// modifications, it should return the original trace object along with an error. +type Adjuster interface { + Adjust(ptrace.Traces) (ptrace.Traces, error) +} + +// Func is a type alias that wraps a function and makes an Adjuster from it. +type Func func(trace ptrace.Traces) (ptrace.Traces, error) + +// Adjust implements Adjuster interface for the Func alias. +func (f Func) Adjust(trace ptrace.Traces) (ptrace.Traces, error) { + return f(trace) +} + +// Sequence creates an adjuster that combines a series of adjusters +// applied in order. Errors from each step are accumulated and returned +// in the end as a single wrapper error. Errors do not interrupt the +// sequence of adapters. +func Sequence(adjusters ...Adjuster) Adjuster { + return sequence{adjusters: adjusters} +} + +// FailFastSequence is similar to Sequence() but returns immediately +// if any adjuster returns an error. +func FailFastSequence(adjusters ...Adjuster) Adjuster { + return sequence{adjusters: adjusters, failFast: true} +} + +type sequence struct { + adjusters []Adjuster + failFast bool +} + +func (c sequence) Adjust(trace ptrace.Traces) (ptrace.Traces, error) { + var errs []error + for _, adjuster := range c.adjusters { + var err error + trace, err = adjuster.Adjust(trace) + if err != nil { + if c.failFast { + return trace, err + } + errs = append(errs, err) + } + } + return trace, errors.Join(errs...) +} diff --git a/cmd/query/app/querysvc/adjuster/adjuster_test.go b/cmd/query/app/querysvc/adjuster/adjuster_test.go new file mode 100644 index 00000000000..96cd4336988 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/adjuster_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster_test + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster" +) + +func TestSequences(t *testing.T) { + // mock adjuster that increments last byte of span ID + adj := adjuster.Func(func(trace ptrace.Traces) (ptrace.Traces, error) { + span := trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + spanId := span.SpanID() + spanId[7]++ + span.SetSpanID(spanId) + return trace, nil + }) + + adjErr := errors.New("mock adjuster error") + failingAdj := adjuster.Func(func(trace ptrace.Traces) (ptrace.Traces, error) { + return trace, adjErr + }) + + tests := []struct { + name string + adjuster adjuster.Adjuster + err string + lastSpanID pcommon.SpanID + }{ + { + name: "normal sequence", + adjuster: adjuster.Sequence(adj, failingAdj, adj, failingAdj), + err: fmt.Sprintf("%s\n%s", adjErr, adjErr), + lastSpanID: [8]byte{0, 0, 0, 0, 0, 0, 0, 2}, + }, + { + name: "fail fast sequence", + adjuster: adjuster.FailFastSequence(adj, failingAdj, adj, failingAdj), + err: adjErr.Error(), + lastSpanID: [8]byte{0, 0, 0, 0, 0, 0, 0, 1}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + trace := ptrace.NewTraces() + span := trace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetSpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 0}) + + adjTrace, err := test.adjuster.Adjust(trace) + adjTraceSpan := adjTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + + assert.Equal(t, span, adjTraceSpan) + assert.EqualValues(t, test.lastSpanID, span.SpanID()) + require.EqualError(t, err, test.err) + }) + } +} diff --git a/cmd/query/app/querysvc/adjuster/package_test.go b/cmd/query/app/querysvc/adjuster/package_test.go new file mode 100644 index 00000000000..d5e9585aa5c --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/go.mod b/go.mod index 003fc057822..656614e68cd 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/gorilla/handlers v1.5.2 github.com/gorilla/mux v1.8.1 - github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.2.0 github.com/kr/pretty v0.3.1 github.com/olivere/elastic v6.2.37+incompatible github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.115.0 @@ -262,7 +262,7 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.30.0 // indirect + golang.org/x/crypto v0.31.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/text v0.21.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect diff --git a/go.sum b/go.sum index 308f93da133..332af178994 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248= github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= -github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= -github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.2.0 h1:kQ0NI7W1B3HwiN5gAYtY+XFItDPbLBwYRxAqbFTyDes= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.2.0/go.mod h1:zrT2dxOAjNFPRGjTUe2Xmb4q4YdUwVvQFV6xiCSf+z0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= @@ -786,8 +786,8 @@ golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= -golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/pkg/config/corscfg/flags.go b/pkg/config/corscfg/flags.go index c58b6b44bc4..892e177d7f7 100644 --- a/pkg/config/corscfg/flags.go +++ b/pkg/config/corscfg/flags.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/spf13/viper" + "go.opentelemetry.io/collector/config/confighttp" ) const ( @@ -25,8 +26,8 @@ func (c Flags) AddFlags(flags *flag.FlagSet) { flags.String(c.Prefix+corsAllowedOrigins, "", "Comma-separated CORS allowed origins. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Origin") } -func (c Flags) InitFromViper(v *viper.Viper) Options { - var p Options +func (c Flags) InitFromViper(v *viper.Viper) *confighttp.CORSConfig { + var p confighttp.CORSConfig allowedHeaders := v.GetString(c.Prefix + corsAllowedHeaders) allowedOrigins := v.GetString(c.Prefix + corsAllowedOrigins) @@ -34,5 +35,5 @@ func (c Flags) InitFromViper(v *viper.Viper) Options { p.AllowedOrigins = strings.Split(strings.ReplaceAll(allowedOrigins, " ", ""), ",") p.AllowedHeaders = strings.Split(strings.ReplaceAll(allowedHeaders, " ", ""), ",") - return p + return &p } diff --git a/pkg/config/corscfg/flags_test.go b/pkg/config/corscfg/flags_test.go index 4ddb292e1fe..2ca108b6c2b 100644 --- a/pkg/config/corscfg/flags_test.go +++ b/pkg/config/corscfg/flags_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/confighttp" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" @@ -31,10 +32,10 @@ func TestCORSFlags(t *testing.T) { corsOpts := flagCfg.InitFromViper(v) fmt.Println(corsOpts) - assert.Equal(t, Options{ + assert.Equal(t, confighttp.CORSConfig{ AllowedHeaders: []string{"Content-Type", "Accept", "X-Requested-With"}, AllowedOrigins: []string{"http://example.domain.com", "http://*.domain.com"}, - }, corsOpts) + }, *corsOpts) }) } diff --git a/pkg/config/corscfg/options.go b/pkg/config/corscfg/options.go deleted file mode 100644 index 9dd1f7ffd5a..00000000000 --- a/pkg/config/corscfg/options.go +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright (c) 2023 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package corscfg - -type Options struct { - AllowedOrigins []string - AllowedHeaders []string -} diff --git a/pkg/telemetry/settings.go b/pkg/telemetry/settings.go index 1f76b38e107..5b58fd7583d 100644 --- a/pkg/telemetry/settings.go +++ b/pkg/telemetry/settings.go @@ -70,3 +70,11 @@ func FromOtelComponent(telset component.TelemetrySettings, host component.Host) Host: host, } } + +func (s Settings) ToOtelComponent() component.TelemetrySettings { + return component.TelemetrySettings{ + Logger: s.Logger, + MeterProvider: s.MeterProvider, + TracerProvider: s.TracerProvider, + } +}