Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zipkin receiver to OTEL collector #2181

Merged
merged 4 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,52 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/ports"
)

// Config creates default configuration.
// It enables default Jaeger receivers, processors and exporters.
func Config(storageType string, factories config.Factories) (*configmodels.Config, error) {
func Config(storageType string, zipkinHostPort string, factories config.Factories) (*configmodels.Config, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to use an interface so can just add extra flag fields if/when supported?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interface perhaps no but options/params struct could be used. I would wait and see until the number of parameters grows and then refactor. Three params still look good to me.

exporters, err := createExporters(storageType, factories)
if err != nil {
return nil, err
}
types := []string{}
expTypes := []string{}
for _, v := range exporters {
types = append(types, v.Type())
expTypes = append(expTypes, v.Type())
}
receivers := createReceivers(zipkinHostPort, factories)
recTypes := []string{}
for _, v := range receivers {
recTypes = append(recTypes, v.Type())
}
return &configmodels.Config{
Receivers: createReceivers(factories),
Receivers: receivers,
Exporters: exporters,
Processors: createProcessors(factories),
Service: configmodels.Service{
Pipelines: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: types,
Receivers: recTypes,
Exporters: expTypes,
Processors: []string{"batch"},
},
},
},
}, nil
}

func createReceivers(factories config.Factories) configmodels.Receivers {
rec := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
func createReceivers(zipkinHostPort string, factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
// TODO load and serve sampling strategies
// TODO bind sampling strategies file
rec.Protocols = map[string]*receiver.SecureReceiverSettings{
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{
"grpc": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "localhost:14250",
Expand All @@ -83,9 +90,15 @@ func createReceivers(factories config.Factories) configmodels.Receivers {
},
},
}
return map[string]configmodels.Receiver{
"jaeger": rec,
recvs := map[string]configmodels.Receiver{
"jaeger": jaeger,
}
if zipkinHostPort != ports.PortToHostPort(0) {
zipkin := factories.Receivers["zipkin"].CreateDefaultConfig().(*zipkinreceiver.Config)
zipkin.Endpoint = zipkinHostPort
recvs["zipkin"] = zipkin
}
return recvs
}

func createExporters(storageTypes string, factories config.Factories) (configmodels.Exporters, error) {
Expand Down
48 changes: 34 additions & 14 deletions cmd/opentelemetry-collector/app/defaults/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/ports"
)

func TestDefaultConfig(t *testing.T) {
factories := Components(viper.New())
disabledHostPort := ports.PortToHostPort(0)
tests := []struct {
storageType string
exporterTypes []string
pipeline map[string]*configmodels.Pipeline
err string
storageType string
zipkinHostPort string
exporterTypes []string
pipeline map[string]*configmodels.Pipeline
err string
}{
{
storageType: "elasticsearch",
exporterTypes: []string{elasticsearch.TypeStr},
storageType: "elasticsearch",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{elasticsearch.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -51,8 +55,9 @@ func TestDefaultConfig(t *testing.T) {
},
},
{
storageType: "cassandra",
exporterTypes: []string{cassandra.TypeStr},
storageType: "cassandra",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{cassandra.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -63,8 +68,9 @@ func TestDefaultConfig(t *testing.T) {
},
},
{
storageType: "kafka",
exporterTypes: []string{kafka.TypeStr},
storageType: "kafka",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{kafka.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -75,8 +81,9 @@ func TestDefaultConfig(t *testing.T) {
},
},
{
storageType: "cassandra,elasticsearch",
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
storageType: "cassandra,elasticsearch",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Expand All @@ -86,14 +93,27 @@ func TestDefaultConfig(t *testing.T) {
},
},
},
{
storageType: "cassandra",
zipkinHostPort: ":9411",
exporterTypes: []string{cassandra.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Exporters: []string{cassandra.TypeStr},
Processors: []string{"batch"},
},
},
},
{
storageType: "floppy",
err: "unknown storage type: floppy",
},
}
for _, test := range tests {
t.Run(test.storageType, func(t *testing.T) {
cfg, err := Config(test.storageType, factories)
cfg, err := Config(test.storageType, test.zipkinHostPort, factories)
if test.err != "" {
require.Nil(t, cfg)
assert.EqualError(t, err, test.err)
Expand All @@ -102,7 +122,7 @@ func TestDefaultConfig(t *testing.T) {
require.NoError(t, err)
require.NoError(t, config.ValidateConfig(cfg, zap.NewNop()))

assert.Equal(t, 1, len(cfg.Receivers))
assert.Equal(t, len(test.pipeline["traces"].Receivers), len(cfg.Receivers))
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name())
assert.Equal(t, 1, len(cfg.Processors))
assert.Equal(t, "batch", cfg.Processors["batch"].Name())
Expand Down
18 changes: 18 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package defaults

import (
"flag"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
Expand All @@ -29,6 +32,10 @@ import (

// Components creates default and Jaeger factories
func Components(v *viper.Viper) config.Factories {
// Add flags to viper to make the default values available.
// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
// OTEL collector creates default configurations for all factories to verify they can be created.
addDefaultValuesToViper(v)
kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options {
opts := kafka.DefaultOptions()
opts.InitFromViper(v)
Expand All @@ -51,3 +58,14 @@ func Components(v *viper.Viper) config.Factories {
factories.Exporters[esExp.Type()] = esExp
return factories
}

// addDefaultValuesToViper adds Jaeger storage flags to viper to make the default values available.
func addDefaultValuesToViper(v *viper.Viper) {
flagSet := &flag.FlagSet{}
kafka.DefaultOptions().AddFlags(flagSet)
elasticsearch.DefaultOptions().AddFlags(flagSet)
cassandra.DefaultOptions().AddFlags(flagSet)
pflagSet := &pflag.FlagSet{}
pflagSet.AddGoFlagSet(flagSet)
v.BindPFlags(pflagSet)
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporter, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporter, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
2 changes: 1 addition & 1 deletion cmd/opentelemetry-collector/app/exporter/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.TraceExporterOld, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
6 changes: 5 additions & 1 deletion cmd/opentelemetry-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/service/builder"
"github.com/spf13/viper"

collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
jflags "github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
Expand Down Expand Up @@ -62,7 +63,9 @@ func main() {
if getOTELConfigFile() == "" {
log.Println("Config file not provided, installing default Jaeger components")
cfgFactory = func(*viper.Viper, config.Factories) (*configmodels.Config, error) {
return defaults.Config(storageType, cmpts)
collectorOpts := &collectorApp.CollectorOptions{}
collectorOpts.InitFromViper(v)
return defaults.Config(storageType, collectorOpts.CollectorZipkinHTTPHostPort, cmpts)
}
}

Expand All @@ -82,6 +85,7 @@ func main() {
cmd := svc.Command()
jconfig.AddFlags(v,
cmd,
collectorApp.AddFlags,
jflags.AddConfigFileFlag,
storageFlags,
)
Expand Down