From 460c6111b7842ea6d616afe02adb0e3d693437e8 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 12 May 2020 17:32:43 +0200 Subject: [PATCH] Configure Jaeger receiver and exporter by flags Signed-off-by: Pavol Loffay --- cmd/collector/app/builder_flags.go | 16 ++--- .../app/defaults/default_config.go | 31 ++++++---- .../jaegerexporter/jaeger_exporter.go | 8 ++- .../jaegerreceiver/jaeger_receiver.go | 61 ++++++++++++++++++- cmd/opentelemetry-collector/cmd/agent/main.go | 1 + .../cmd/collector/main.go | 1 + cmd/opentelemetry-collector/go.mod | 2 +- cmd/opentelemetry-collector/go.sum | 4 ++ 8 files changed, 99 insertions(+), 25 deletions(-) diff --git a/cmd/collector/app/builder_flags.go b/cmd/collector/app/builder_flags.go index 06f2768e67e..c62b88c511e 100644 --- a/cmd/collector/app/builder_flags.go +++ b/cmd/collector/app/builder_flags.go @@ -32,8 +32,8 @@ const ( collectorNumWorkers = "collector.num-workers" collectorHTTPPort = "collector.http-port" collectorGRPCPort = "collector.grpc-port" - collectorHTTPHostPort = "collector.http-server.host-port" - collectorGRPCHostPort = "collector.grpc-server.host-port" + CollectorHTTPHostPort = "collector.http-server.host-port" + CollectorGRPCHostPort = "collector.grpc-server.host-port" collectorZipkinHTTPPort = "collector.zipkin.http-port" collectorZipkinHTTPHostPort = "collector.zipkin.host-port" collectorTags = "collector.tags" @@ -79,11 +79,11 @@ type CollectorOptions struct { func AddFlags(flags *flag.FlagSet) { flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector") flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue") - flags.Int(collectorHTTPPort, 0, collectorHTTPPortWarning+" see --"+collectorHTTPHostPort) - flags.Int(collectorGRPCPort, 0, collectorGRPCPortWarning+" see --"+collectorGRPCHostPort) + flags.Int(collectorHTTPPort, 0, collectorHTTPPortWarning+" see --"+CollectorHTTPHostPort) + flags.Int(collectorGRPCPort, 0, collectorGRPCPortWarning+" see --"+CollectorGRPCHostPort) flags.Int(collectorZipkinHTTPPort, 0, collectorZipkinHTTPPortWarning+" see --"+collectorZipkinHTTPHostPort) - flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's HTTP server") - flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's GRPC server") + flags.String(CollectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's HTTP server") + flags.String(CollectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's GRPC server") flags.String(collectorZipkinHTTPHostPort, ports.PortToHostPort(0), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's Zipkin server") flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.") flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}") @@ -97,8 +97,8 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions { cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes cOpts.QueueSize = v.GetInt(collectorQueueSize) cOpts.NumWorkers = v.GetInt(collectorNumWorkers) - cOpts.CollectorHTTPHostPort = getAddressFromCLIOptions(v.GetInt(collectorHTTPPort), v.GetString(collectorHTTPHostPort)) - cOpts.CollectorGRPCHostPort = getAddressFromCLIOptions(v.GetInt(collectorGRPCPort), v.GetString(collectorGRPCHostPort)) + cOpts.CollectorHTTPHostPort = getAddressFromCLIOptions(v.GetInt(collectorHTTPPort), v.GetString(CollectorHTTPHostPort)) + cOpts.CollectorGRPCHostPort = getAddressFromCLIOptions(v.GetInt(collectorGRPCPort), v.GetString(CollectorGRPCHostPort)) cOpts.CollectorZipkinHTTPHostPort = getAddressFromCLIOptions(v.GetInt(collectorZipkinHTTPPort), v.GetString(collectorZipkinHTTPHostPort)) cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags)) cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins) diff --git a/cmd/opentelemetry-collector/app/defaults/default_config.go b/cmd/opentelemetry-collector/app/defaults/default_config.go index 4092f75244e..6ede8202b40 100644 --- a/cmd/opentelemetry-collector/app/defaults/default_config.go +++ b/cmd/opentelemetry-collector/app/defaults/default_config.go @@ -76,19 +76,24 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config func createCollectorReceivers(zipkinHostPort string, factories config.Factories) configmodels.Receivers { jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config) - // TODO load and serve sampling strategies - // TODO bind sampling strategies file - jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{ - "grpc": { + if jaeger.Protocols == nil { + jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{} + } + // The CreateDefaultConfig is enabling protocols from flags + // we do not want to override it here + if _, ok := jaeger.Protocols["grpc"]; !ok { + jaeger.Protocols["grpc"] = &receiver.SecureReceiverSettings{ ReceiverSettings: configmodels.ReceiverSettings{ Endpoint: gRPCEndpoint, }, - }, - "thrift_http": { + } + } + if _, ok := jaeger.Protocols["thrift_http"]; !ok { + jaeger.Protocols["thrift_http"] = &receiver.SecureReceiverSettings{ ReceiverSettings: configmodels.ReceiverSettings{ Endpoint: httpThriftBinaryEndpoint, }, - }, + } } recvs := map[string]configmodels.Receiver{ "jaeger": jaeger, @@ -159,17 +164,19 @@ func AgentConfig(factories config.Factories) *configmodels.Config { func createAgentReceivers(factories config.Factories) configmodels.Receivers { jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config) - jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{ - "thrift_compact": { + if _, ok := jaeger.Protocols["thrift_compact"]; !ok { + jaeger.Protocols["thrift_compact"] = &receiver.SecureReceiverSettings{ ReceiverSettings: configmodels.ReceiverSettings{ Endpoint: udpThriftCompactEndpoint, }, - }, - "thrift_binary": { + } + } + if _, ok := jaeger.Protocols["thrift_binary"]; !ok { + jaeger.Protocols["thrift_binary"] = &receiver.SecureReceiverSettings{ ReceiverSettings: configmodels.ReceiverSettings{ Endpoint: udpThriftBinaryEndpoint, }, - }, + } } recvs := configmodels.Receivers{ "jaeger": jaeger, diff --git a/cmd/opentelemetry-collector/app/exporter/jaegerexporter/jaeger_exporter.go b/cmd/opentelemetry-collector/app/exporter/jaegerexporter/jaeger_exporter.go index e7a64e76c26..2e84db08535 100644 --- a/cmd/opentelemetry-collector/app/exporter/jaegerexporter/jaeger_exporter.go +++ b/cmd/opentelemetry-collector/app/exporter/jaegerexporter/jaeger_exporter.go @@ -50,9 +50,11 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter { if len(repCfg.CollectorHostPorts) > 0 { cfg.Endpoint = repCfg.CollectorHostPorts[0] } - cfg.GRPCSettings.UseSecure = repCfg.TLS.Enabled - cfg.GRPCSettings.CertPemFile = repCfg.TLS.CertPath - cfg.GRPCSettings.ServerNameOverride = repCfg.TLS.ServerName + cfg.GRPCSettings.TLSConfig.UseSecure = repCfg.TLS.Enabled + cfg.GRPCSettings.TLSConfig.CaCert = repCfg.TLS.CAPath + cfg.GRPCSettings.TLSConfig.ClientCert = repCfg.TLS.CertPath + cfg.GRPCSettings.TLSConfig.ClientKey = repCfg.TLS.KeyPath + cfg.GRPCSettings.TLSConfig.ServerNameOverride = repCfg.TLS.ServerName return cfg } diff --git a/cmd/opentelemetry-collector/app/receiver/jaegerreceiver/jaeger_receiver.go b/cmd/opentelemetry-collector/app/receiver/jaegerreceiver/jaeger_receiver.go index f90f4ca8c12..da1a3bd931e 100644 --- a/cmd/opentelemetry-collector/app/receiver/jaegerreceiver/jaeger_receiver.go +++ b/cmd/opentelemetry-collector/app/receiver/jaegerreceiver/jaeger_receiver.go @@ -20,13 +20,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector/component" "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" "github.com/spf13/viper" + agentApp "github.com/jaegertracing/jaeger/cmd/agent/app" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" + collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static" ) +const ( + thriftBinaryHostPort = "processor.jaeger-binary.server-host-port" + thriftCompactHostPort = "processor.jaeger-compact.server-host-port" +) + // Factory wraps jaegerreceiver.Factory and makes the default config configurable via viper. // For instance this enables using flags as default values in the config object. type Factory struct { @@ -48,9 +56,55 @@ func (f *Factory) Type() configmodels.Type { func (f *Factory) CreateDefaultConfig() configmodels.Receiver { cfg := f.Wrapped.CreateDefaultConfig().(*jaegerreceiver.Config) cfg.RemoteSampling = createDefaultSamplingConfig(f.Viper) + configureAgent(f.Viper, cfg) + configureCollector(f.Viper, cfg) return cfg } +func configureAgent(v *viper.Viper, cfg *jaegerreceiver.Config) { + aOpts := agentApp.Builder{} + aOpts.InitFromViper(v) + if v.IsSet(thriftBinaryHostPort) { + cfg.Protocols["thrift_binary"] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: v.GetString(thriftBinaryHostPort), + }, + } + } + if v.IsSet(thriftCompactHostPort) { + cfg.Protocols["thrift_compact"] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: v.GetString(thriftCompactHostPort), + }, + } + } +} + +func configureCollector(v *viper.Viper, cfg *jaegerreceiver.Config) { + cOpts := collectorApp.CollectorOptions{} + cOpts.InitFromViper(v) + if v.IsSet(collectorApp.CollectorGRPCHostPort) { + cfg.Protocols["grpc"] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: cOpts.CollectorGRPCHostPort, + }, + } + if cOpts.TLS.ClientCAPath != "" && cOpts.TLS.KeyPath != "" { + cfg.Protocols["grpc"].TLSCredentials = &receiver.TLSCredentials{ + KeyFile: cOpts.TLS.KeyPath, + CertFile: cOpts.TLS.CertPath, + } + } + } + if v.IsSet(collectorApp.CollectorHTTPHostPort) { + cfg.Protocols["thrift_http"] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: cOpts.CollectorHTTPHostPort, + }, + } + } +} + func createDefaultSamplingConfig(v *viper.Viper) *jaegerreceiver.RemoteSamplingConfig { var samplingConf *jaegerreceiver.RemoteSamplingConfig strategyFile := v.GetString(static.SamplingStrategiesFile) @@ -68,7 +122,12 @@ func createDefaultSamplingConfig(v *viper.Viper) *jaegerreceiver.RemoteSamplingC if samplingConf == nil { samplingConf = &jaegerreceiver.RemoteSamplingConfig{} } - samplingConf.FetchEndpoint = repCfg.CollectorHostPorts[0] + samplingConf.GRPCSettings.Endpoint = repCfg.CollectorHostPorts[0] + samplingConf.GRPCSettings.TLSConfig.UseSecure = repCfg.TLS.Enabled + samplingConf.GRPCSettings.TLSConfig.CaCert = repCfg.TLS.CAPath + samplingConf.GRPCSettings.TLSConfig.ClientCert = repCfg.TLS.CertPath + samplingConf.GRPCSettings.TLSConfig.ClientKey = repCfg.TLS.KeyPath + samplingConf.GRPCSettings.TLSConfig.ServerNameOverride = repCfg.TLS.ServerName } return samplingConf } diff --git a/cmd/opentelemetry-collector/cmd/agent/main.go b/cmd/opentelemetry-collector/cmd/agent/main.go index 508627d12c5..c1963e67886 100644 --- a/cmd/opentelemetry-collector/cmd/agent/main.go +++ b/cmd/opentelemetry-collector/cmd/agent/main.go @@ -75,6 +75,7 @@ func main() { handleErr(err) cmd := svc.Command() + // TODO add collector HTTP thrift and gRPC server host port flags jconfig.AddFlags(v, cmd, jflags.AddConfigFileFlag, diff --git a/cmd/opentelemetry-collector/cmd/collector/main.go b/cmd/opentelemetry-collector/cmd/collector/main.go index 5282fa48b18..cba02eb55a9 100644 --- a/cmd/opentelemetry-collector/cmd/collector/main.go +++ b/cmd/opentelemetry-collector/cmd/collector/main.go @@ -94,6 +94,7 @@ func main() { handleErr(err) } cmd := svc.Command() + // TODO add agent UDP processors flags jConfig.AddFlags(v, cmd, collectorApp.AddFlags, diff --git a/cmd/opentelemetry-collector/go.mod b/cmd/opentelemetry-collector/go.mod index 32687c0779f..d6bd9bd5c03 100644 --- a/cmd/opentelemetry-collector/go.mod +++ b/cmd/opentelemetry-collector/go.mod @@ -10,7 +10,7 @@ require ( github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f github.com/imdario/mergo v0.3.9 github.com/jaegertracing/jaeger v1.17.0 - github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200505202444-021607d68586 + github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200512031848-f588c89b4778 github.com/open-telemetry/opentelemetry-proto v0.3.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.6.2 diff --git a/cmd/opentelemetry-collector/go.sum b/cmd/opentelemetry-collector/go.sum index 9b24761d417..d5a25b2aba9 100644 --- a/cmd/opentelemetry-collector/go.sum +++ b/cmd/opentelemetry-collector/go.sum @@ -954,6 +954,8 @@ github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200505202444-021607d68586 h1:HD0/Opa/WiWaG+B6LP+5XT9lYqpErWd6DjH3ah9mQrU= github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200505202444-021607d68586/go.mod h1:ht/uOm+HLoXWjhq6seX/BoTwlR0dtCKFe2Y23YRk2a4= +github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200512031848-f588c89b4778 h1:wcE5bWypMq95vlZGikUklYB6VPyg3pke/32nei5/Sww= +github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200512031848-f588c89b4778/go.mod h1:+q6GyqO1FHatlq93uplydIVjRhBrv04oQ23AbOqcn6E= github.com/open-telemetry/opentelemetry-proto v0.3.0 h1:+ASAtcayvoELyCF40+rdCMlBOhZIn5TPDez85zSYc30= github.com/open-telemetry/opentelemetry-proto v0.3.0/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -968,6 +970,8 @@ github.com/openzipkin/zipkin-go v0.2.1 h1:noL5/5Uf1HpVl3wNsfkZhIKbSWCVi5jgqkONNx github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/orijtech/prometheus-go-metrics-exporter v0.0.4 h1:AAHKuTu2lX4eMPKV+CRcTug9KdMZ/Ckez7KK+ddEQfM= github.com/orijtech/prometheus-go-metrics-exporter v0.0.4/go.mod h1:BiTx/ugZex8LheBk3j53tktWaRdFjV5FCfT2o0P7msE= +github.com/orijtech/prometheus-go-metrics-exporter v0.0.5 h1:76JFgRIgNDA3pW1fUhmqinU2u5ndHv1gvapDfGG+7/c= +github.com/orijtech/prometheus-go-metrics-exporter v0.0.5/go.mod h1:BiTx/ugZex8LheBk3j53tktWaRdFjV5FCfT2o0P7msE= github.com/ory/dockertest v3.3.5+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs= github.com/ory/dockertest/v3 v3.5.4/go.mod h1:J8ZUbNB2FOhm1cFZW9xBpDsODqsSWcyYgtJYVPcnF70= github.com/ory/fosite v0.29.0/go.mod h1:0atSZmXO7CAcs6NPMI/Qtot8tmZYj04Nddoold4S2h0=