Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
Sync changes with master

Signed-off-by: Deepak <sah.sslpu@gmail.com>
  • Loading branch information
goku321 committed Sep 29, 2020
2 parents 932f6e0 + 55d1913 commit 72bd90f
Show file tree
Hide file tree
Showing 31 changed files with 1,306 additions and 851 deletions.
23 changes: 14 additions & 9 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ type Options struct {

// AddFlags adds flags for Builder
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
"The number of messages to process in parallel")
flagSet.Duration(
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
AddOTELFlags(flagSet)
}

// AddOTELFlags adds only OTEL flags
func AddOTELFlags(flagSet *flag.FlagSet) {
// Authentication flags
flagSet.String(
KafkaConsumerConfigPrefix+SuffixBrokers,
DefaultBroker,
Expand All @@ -101,15 +115,6 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
"The number of messages to process in parallel")
flagSet.Duration(
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
// Authentication flags
auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
}

Expand Down
27 changes: 12 additions & 15 deletions cmd/opentelemetry/app/defaultcomponents/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"github.com/spf13/viper"
"go.opentelemetry.io/collector/component"
otelJaegerExporter "go.opentelemetry.io/collector/exporter/jaegerexporter"
otelKafkaExporter "go.opentelemetry.io/collector/exporter/kafkaexporter"
otelResourceProcessor "go.opentelemetry.io/collector/processor/resourceprocessor"
otelJaegerReceiver "go.opentelemetry.io/collector/receiver/jaegerreceiver"
otelKafkaReceiver "go.opentelemetry.io/collector/receiver/kafkareceiver"
otelZipkinReceiver "go.opentelemetry.io/collector/receiver/zipkinreceiver"
"go.opentelemetry.io/collector/service/defaultcomponents"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/badgerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/cassandraexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter"
Expand All @@ -42,7 +43,6 @@ import (
cassandraStorage "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
esStorage "github.com/jaegertracing/jaeger/plugin/storage/es"
grpcStorage "github.com/jaegertracing/jaeger/plugin/storage/grpc"
kafkaStorage "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

// Components creates default and Jaeger factories
Expand All @@ -51,11 +51,6 @@ func Components(v *viper.Viper) component.Factories {
// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
// OTEL collector creates default configurations for all factories to verify they can be created.
addDefaultValuesToViper(v)
kafkaExp := &kafkaexporter.Factory{OptionsFactory: func() *kafkaStorage.Options {
opts := kafkaexporter.DefaultOptions()
opts.InitFromViper(v)
return opts
}}
cassandraExp := &cassandraexporter.Factory{OptionsFactory: func() *cassandraStorage.Options {
opts := cassandraexporter.DefaultOptions()
opts.InitFromViper(v)
Expand All @@ -77,21 +72,22 @@ func Components(v *viper.Viper) component.Factories {
opts.InitFromViper(v)
return opts
})
kafkaRec := &kafkareceiver.Factory{OptionsFactory: func() *ingesterApp.Options {
opts := kafkareceiver.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaultcomponents.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
factories.Exporters[esExp.Type()] = esExp
factories.Exporters[grpcExp.Type()] = grpcExp
factories.Exporters[memoryExp.Type()] = memoryExp
factories.Exporters[badgerExp.Type()] = badgerExp
factories.Receivers[kafkaRec.Type()] = kafkaRec

factories.Receivers[kafkareceiver.TypeStr] = &kafkareceiver.Factory{
Wrapped: otelKafkaReceiver.NewFactory(),
Viper: v,
}
factories.Exporters[kafkaexporter.TypeStr] = &kafkaexporter.Factory{
Wrapped: otelKafkaExporter.NewFactory(),
Viper: v,
}
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
Wrapped: otelJaegerReceiver.NewFactory(),
Viper: v,
Expand All @@ -115,7 +111,8 @@ func Components(v *viper.Viper) component.Factories {
// addDefaultValuesToViper adds Jaeger storage flags to viper to make the default values available.
func addDefaultValuesToViper(v *viper.Viper) {
flagSet := &flag.FlagSet{}
kafkaexporter.DefaultOptions().AddFlags(flagSet)
kafkareceiver.AddFlags(flagSet)
kafkaexporter.AddFlags(flagSet)
elasticsearchexporter.DefaultOptions().AddFlags(flagSet)
cassandraexporter.DefaultOptions().AddFlags(flagSet)
pflagSet := &pflag.FlagSet{}
Expand Down
4 changes: 0 additions & 4 deletions cmd/opentelemetry/app/defaultcomponents/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(
kafkaexporter.DefaultOptions().AddFlags,
cassandraexporter.DefaultOptions().AddFlags,
elasticsearchexporter.DefaultOptions().AddFlags,
)
Expand All @@ -50,9 +49,6 @@ func TestComponents(t *testing.T) {
assert.IsType(t, &kafkareceiver.Factory{}, factories.Receivers[kafkareceiver.TypeStr])
assert.IsType(t, &zipkinreceiver.Factory{}, factories.Receivers["zipkin"])

kafkaFactory := factories.Exporters[kafkaexporter.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafkaexporter.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)
cassandraFactory := factories.Exporters[cassandraexporter.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandraexporter.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
Expand Down
4 changes: 2 additions & 2 deletions cmd/opentelemetry/app/defaultconfig/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func createProcessors(factories component.Factories) (configmodels.Processors, [

func createReceivers(component ComponentType, factories component.Factories) configmodels.Receivers {
if component == Ingester {
kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig().(*kafkareceiver.Config)
kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig()
return configmodels.Receivers{
kafkaReceiver.Name(): kafkaReceiver,
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func createExporters(component ComponentType, storageTypes string, factories com
exporters[elasticsearchexporter.TypeStr] = es
case "kafka":
kaf := factories.Exporters[kafkaexporter.TypeStr].CreateDefaultConfig()
exporters[kafkaexporter.TypeStr] = kaf
exporters["kafka"] = kaf
case "grpc-plugin":
grpcEx := factories.Exporters[grpcpluginexporter.TypeStr].CreateDefaultConfig()
exporters[grpcpluginexporter.TypeStr] = grpcEx
Expand Down
27 changes: 0 additions & 27 deletions cmd/opentelemetry/app/exporter/kafkaexporter/config.go

This file was deleted.

94 changes: 0 additions & 94 deletions cmd/opentelemetry/app/exporter/kafkaexporter/config_test.go

This file was deleted.

34 changes: 0 additions & 34 deletions cmd/opentelemetry/app/exporter/kafkaexporter/exporter.go

This file was deleted.

Loading

0 comments on commit 72bd90f

Please sign in to comment.