Skip to content

Commit

Permalink
Add zipkin receiver to OTEL collector (#2181)
Browse files Browse the repository at this point in the history
* Add zipkin receiver to OTEL collector

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* fmt

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix tests

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Use host port instead of const

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Apr 16, 2020
1 parent 72f5a6b commit d75eb14
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 29 deletions.
35 changes: 24 additions & 11 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,52 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/ports"
)

// Config creates default configuration.
// It enables default Jaeger receivers, processors and exporters.
func Config(storageType string, factories config.Factories) (*configmodels.Config, error) {
func Config(storageType string, zipkinHostPort string, factories config.Factories) (*configmodels.Config, error) {
exporters, err := createExporters(storageType, factories)
if err != nil {
return nil, err
}
types := []string{}
expTypes := []string{}
for _, v := range exporters {
types = append(types, v.Type())
expTypes = append(expTypes, v.Type())
}
receivers := createReceivers(zipkinHostPort, factories)
recTypes := []string{}
for _, v := range receivers {
recTypes = append(recTypes, v.Type())
}
return &configmodels.Config{
Receivers: createReceivers(factories),
Receivers: receivers,
Exporters: exporters,
Processors: createProcessors(factories),
Service: configmodels.Service{
Pipelines: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: types,
Receivers: recTypes,
Exporters: expTypes,
Processors: []string{"batch"},
},
},
},
}, nil
}

func createReceivers(factories config.Factories) configmodels.Receivers {
rec := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
func createReceivers(zipkinHostPort string, factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
// TODO load and serve sampling strategies
// TODO bind sampling strategies file
rec.Protocols = map[string]*receiver.SecureReceiverSettings{
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{
"grpc": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "localhost:14250",
Expand All @@ -83,9 +90,15 @@ func createReceivers(factories config.Factories) configmodels.Receivers {
},
},
}
return map[string]configmodels.Receiver{
"jaeger": rec,
recvs := map[string]configmodels.Receiver{
"jaeger": jaeger,
}
if zipkinHostPort != ports.PortToHostPort(0) {
zipkin := factories.Receivers["zipkin"].CreateDefaultConfig().(*zipkinreceiver.Config)
zipkin.Endpoint = zipkinHostPort
recvs["zipkin"] = zipkin
}
return recvs
}

func createExporters(storageTypes string, factories config.Factories) (configmodels.Exporters, error) {
Expand Down
48 changes: 34 additions & 14 deletions cmd/opentelemetry-collector/app/defaults/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/ports"
)

func TestDefaultConfig(t *testing.T) {
factories := Components(viper.New())
disabledHostPort := ports.PortToHostPort(0)
tests := []struct {
storageType string
exporterTypes []string
pipeline map[string]*configmodels.Pipeline
err string
storageType string
zipkinHostPort string
exporterTypes []string
pipeline map[string]*configmodels.Pipeline
err string
}{
{
storageType: "elasticsearch",
exporterTypes: []string{elasticsearch.TypeStr},
storageType: "elasticsearch",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{elasticsearch.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -51,8 +55,9 @@ func TestDefaultConfig(t *testing.T) {
},
},
{
storageType: "cassandra",
exporterTypes: []string{cassandra.TypeStr},
storageType: "cassandra",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{cassandra.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -63,8 +68,9 @@ func TestDefaultConfig(t *testing.T) {
},
},
{
storageType: "kafka",
exporterTypes: []string{kafka.TypeStr},
storageType: "kafka",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{kafka.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -75,8 +81,9 @@ func TestDefaultConfig(t *testing.T) {
},
},
{
storageType: "cassandra,elasticsearch",
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
storageType: "cassandra,elasticsearch",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -86,14 +93,27 @@ func TestDefaultConfig(t *testing.T) {
},
},
},
{
storageType: "cassandra",
zipkinHostPort: ":9411",
exporterTypes: []string{cassandra.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Exporters: []string{cassandra.TypeStr},
Processors: []string{"batch"},
},
},
},
{
storageType: "floppy",
err: "unknown storage type: floppy",
},
}
for _, test := range tests {
t.Run(test.storageType, func(t *testing.T) {
cfg, err := Config(test.storageType, factories)
cfg, err := Config(test.storageType, test.zipkinHostPort, factories)
if test.err != "" {
require.Nil(t, cfg)
assert.EqualError(t, err, test.err)
Expand All @@ -102,7 +122,7 @@ func TestDefaultConfig(t *testing.T) {
require.NoError(t, err)
require.NoError(t, config.ValidateConfig(cfg, zap.NewNop()))

assert.Equal(t, 1, len(cfg.Receivers))
assert.Equal(t, len(test.pipeline["traces"].Receivers), len(cfg.Receivers))
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name())
assert.Equal(t, 1, len(cfg.Processors))
assert.Equal(t, "batch", cfg.Processors["batch"].Name())
Expand Down
18 changes: 18 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package defaults

import (
"flag"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
Expand All @@ -29,6 +32,10 @@ import (

// Components creates default and Jaeger factories
func Components(v *viper.Viper) config.Factories {
// Add flags to viper to make the default values available.
// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
// OTEL collector creates default configurations for all factories to verify they can be created.
addDefaultValuesToViper(v)
kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options {
opts := kafka.DefaultOptions()
opts.InitFromViper(v)
Expand All @@ -51,3 +58,14 @@ func Components(v *viper.Viper) config.Factories {
factories.Exporters[esExp.Type()] = esExp
return factories
}

// addDefaultValuesToViper adds Jaeger storage flags to viper to make the default values available.
func addDefaultValuesToViper(v *viper.Viper) {
flagSet := &flag.FlagSet{}
kafka.DefaultOptions().AddFlags(flagSet)
elasticsearch.DefaultOptions().AddFlags(flagSet)
cassandra.DefaultOptions().AddFlags(flagSet)
pflagSet := &pflag.FlagSet{}
pflagSet.AddGoFlagSet(flagSet)
v.BindPFlags(pflagSet)
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporter, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporter, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
2 changes: 1 addition & 1 deletion cmd/opentelemetry-collector/app/exporter/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.TraceExporterOld, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
6 changes: 5 additions & 1 deletion cmd/opentelemetry-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/service/builder"
"github.com/spf13/viper"

collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
jflags "github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
Expand Down Expand Up @@ -62,7 +63,9 @@ func main() {
if getOTELConfigFile() == "" {
log.Println("Config file not provided, installing default Jaeger components")
cfgFactory = func(*viper.Viper, config.Factories) (*configmodels.Config, error) {
return defaults.Config(storageType, cmpts)
collectorOpts := &collectorApp.CollectorOptions{}
collectorOpts.InitFromViper(v)
return defaults.Config(storageType, collectorOpts.CollectorZipkinHTTPHostPort, cmpts)
}
}

Expand All @@ -82,6 +85,7 @@ func main() {
cmd := svc.Command()
jconfig.AddFlags(v,
cmd,
collectorApp.AddFlags,
jflags.AddConfigFileFlag,
storageFlags,
)
Expand Down

0 comments on commit d75eb14

Please sign in to comment.