Skip to content

Commit

Permalink
Add Kafka OTEL receiver/ingester (jaegertracing#2221)
Browse files Browse the repository at this point in the history
* Add Kafka OTEL receiver/ingester

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Fix review

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Use tls.cert from flags

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed May 6, 2020
1 parent 6cdaea8 commit ef8205d
Show file tree
Hide file tree
Showing 21 changed files with 629 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ crossdock/crossdock-*
run-crossdock.log
cmd/opentelemetry-collector/cmd/collector/opentelemetry-collector-*
cmd/opentelemetry-collector/cmd/agent/opentelemetry-agent-*
cmd/opentelemetry-collector/cmd/ingester/opentelemetry-ingester-*
__pycache__
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ else
cd ${OTEL_COLLECTOR_DIR}/cmd/agent && $(GOBUILD) -o ./opentelemetry-agent-$(GOOS) $(BUILD_INFO) main.go
endif

.PHONY: build-otel-ingester
build-otel-ingester:
ifeq ($(GOARCH), s390x)
cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS)-$(GOARCH) $(BUILD_INFO) main.go
else
cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS) $(BUILD_INFO) main.go
endif

.PHONY: build-ingester
build-ingester:
ifeq ($(GOARCH), s390x)
Expand Down Expand Up @@ -324,6 +332,7 @@ docker-images-jaeger-backend:
done
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-collector:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/collector/Dockerfile cmd/opentelemetry-collector/cmd/collector
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-agent:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/agent/Dockerfile cmd/opentelemetry-collector/cmd/agent
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-ingeser:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/ingester/Dockerfile cmd/opentelemetry-collector/cmd/ingester

.PHONY: docker-images-tracegen
docker-images-tracegen:
Expand Down
8 changes: 4 additions & 4 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ const (

// Options stores the configuration options for the Ingester
type Options struct {
kafkaConsumer.Configuration
Parallelism int
Encoding string
DeadlockInterval time.Duration
kafkaConsumer.Configuration `mapstructure:",squash"`
Parallelism int `mapstructure:"parallelism"`
Encoding string `mapstructure:"encoding"`
DeadlockInterval time.Duration `mapstructure:"deadlock_interval"`
}

// AddFlags adds flags for Builder
Expand Down
35 changes: 35 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,41 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers {
return recvs
}

// IngesterConfig creates default ingester configuration.
// It enables Jaeger kafka receiver and storage backend.
func IngesterConfig(storageType string, factories config.Factories) (*configmodels.Config, error) {
exporters, err := createExporters(storageType, factories)
if err != nil {
return nil, err
}
processors := configmodels.Processors{}
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
receivers := configmodels.Receivers{}
kafkaReceiver := factories.Receivers[kafka.TypeStr].CreateDefaultConfig().(*resourceprocessor.Config)
receivers[kafkaReceiver.Name()] = kafkaReceiver
hc := factories.Extensions["health_check"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Processors: processors,
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Processors: processorNames(processors),
Exporters: exporterNames(exporters),
},
},
},
}, nil
}

func receiverNames(receivers configmodels.Receivers) []string {
var names []string
for _, v := range receivers {
Expand Down
14 changes: 11 additions & 3 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/spf13/pflag"
"github.com/spf13/viper"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka"
Expand All @@ -42,26 +44,32 @@ func Components(v *viper.Viper) config.Factories {
// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
// OTEL collector creates default configurations for all factories to verify they can be created.
addDefaultValuesToViper(v)
kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options {
kafkaExp := &kafka.Factory{OptionsFactory: func() *storageKafka.Options {
opts := kafka.DefaultOptions()
opts.InitFromViper(v)
return opts
}}
cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options {
cassandraExp := &cassandra.Factory{OptionsFactory: func() *storageCassandra.Options {
opts := cassandra.DefaultOptions()
opts.InitFromViper(v)
return opts
}}
esExp := elasticsearch.Factory{OptionsFactory: func() *storageEs.Options {
esExp := &elasticsearch.Factory{OptionsFactory: func() *storageEs.Options {
opts := elasticsearch.DefaultOptions()
opts.InitFromViper(v)
return opts
}}
kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *ingesterApp.Options {
opts := kafkaRec.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaultcomponents.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
factories.Exporters[esExp.Type()] = esExp
factories.Receivers[kafkaRec.Type()] = kafkaRec

jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory)
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
Expand Down
16 changes: 7 additions & 9 deletions cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ package defaults
import (
"testing"

"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

Expand All @@ -36,21 +35,20 @@ func TestComponents(t *testing.T) {
elasticsearch.DefaultOptions().AddFlags,
)
factories := Components(v)
assert.Equal(t, configmodels.Type("jaeger_kafka"), factories.Exporters[kafka.TypeStr].Type())
assert.Equal(t, configmodels.Type("jaeger_cassandra"), factories.Exporters[cassandra.TypeStr].Type())
assert.Equal(t, configmodels.Type("jaeger_elasticsearch"), factories.Exporters[elasticsearch.TypeStr].Type())
assert.IsType(t, &kafka.Factory{}, factories.Exporters[kafka.TypeStr])
assert.IsType(t, &cassandra.Factory{}, factories.Exporters[cassandra.TypeStr])
assert.IsType(t, &elasticsearch.Factory{}, factories.Exporters[elasticsearch.TypeStr])
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
assert.IsType(t, &kafkaRec.Factory{}, factories.Receivers[kafkaRec.TypeStr])

kafkaFactory := factories.Exporters[kafka.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)

cassandraFactory := factories.Exporters[cassandra.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
esFactory := factories.Exporters[elasticsearch.TypeStr]
ec := esFactory.CreateDefaultConfig().(*elasticsearch.Config)
assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers)
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
assert.IsType(t, &resourceprocessor.Factory{}, factories.Processors["resource"])
}
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/receiver/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
)

// Config hold configuration for Jaeger kafka receiver/ingester.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
ingesterApp.Options `mapstructure:",squash"`
}
91 changes: 91 additions & 0 deletions cmd/opentelemetry-collector/app/receiver/kafka/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"path"
"testing"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/ingester/app"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)

func TestDefaultConfig(t *testing.T) {
v, c := jConfig.Viperize(app.AddFlags)
err := c.ParseFlags([]string{""})
require.NoError(t, err)
factory := &Factory{OptionsFactory: func() *app.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
return opts
}}
defaultCfg := factory.CreateDefaultConfig().(*Config)
assert.NoError(t, configcheck.ValidateConfig(defaultCfg))
assert.Equal(t, "jaeger-spans", defaultCfg.Topic)
assert.Equal(t, "protobuf", defaultCfg.Encoding)
assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Brokers)
assert.Equal(t, "none", defaultCfg.Authentication)
assert.Equal(t, "/etc/krb5.conf", defaultCfg.Kerberos.ConfigPath)
assert.Equal(t, "kafka", defaultCfg.Kerberos.ServiceName)
assert.Equal(t, false, defaultCfg.TLS.Enabled)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(app.AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.consumer.topic=jaeger-test", "--kafka.consumer.brokers=host1,host2", "--kafka.consumer.tls.cert=from-flag"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := &Factory{OptionsFactory: func() *app.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
assert.Equal(t, "jaeger-test", opts.Topic)
assert.Equal(t, []string{"host1", "host2"}, opts.Brokers)
return opts
}}

factories.Receivers[TypeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

kafkaCfg := cfg.Receivers[TypeStr].(*Config)
assert.Equal(t, TypeStr, kafkaCfg.Name())
assert.Equal(t, "jaeger-prod", kafkaCfg.Topic)
assert.Equal(t, "emojis", kafkaCfg.Encoding)
assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Options.Brokers)
assert.Equal(t, "tls", kafkaCfg.Options.Authentication)
assert.Equal(t, "user", kafkaCfg.Options.PlainText.UserName)
assert.Equal(t, "123", kafkaCfg.Options.PlainText.Password)
assert.Equal(t, true, kafkaCfg.Options.TLS.Enabled)
assert.Equal(t, "ca.crt", kafkaCfg.Options.TLS.CAPath)
assert.Equal(t, "key.crt", kafkaCfg.Options.TLS.KeyPath)
assert.Equal(t, "from-flag", kafkaCfg.Options.TLS.CertPath)
assert.Equal(t, true, kafkaCfg.Options.TLS.SkipHostVerify)
assert.Equal(t, "jaeger", kafkaCfg.Options.Kerberos.Realm)
assert.Equal(t, "/etc/foo", kafkaCfg.Options.Kerberos.ConfigPath)
assert.Equal(t, "from-jaeger-config", kafkaCfg.Options.Kerberos.Username)
}
91 changes: 91 additions & 0 deletions cmd/opentelemetry-collector/app/receiver/kafka/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"context"
"fmt"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
)

const (
TypeStr = "jaeger_kafka"
)

// OptionsFactory returns initialized ingester app.Options structure.
type OptionsFactory func() *ingesterApp.Options

// DefaultOptions creates Kafka options supported by this receiver.
func DefaultOptions() *ingesterApp.Options {
return &ingesterApp.Options{}
}

type Factory struct {
OptionsFactory OptionsFactory
}

var _ component.ReceiverFactory = (*Factory)(nil)

// Type returns the receiver type.
func (f Factory) Type() configmodels.Type {
return TypeStr
}

// CreateDefaultConfig creates default config.
// This function implements OTEL component.ReceiverFactoryBase interface.
func (f Factory) CreateDefaultConfig() configmodels.Receiver {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
}
}

// CustomUnmarshaler returns custom marshaller.
// This function implements OTEL component.ReceiverFactoryBase interface.
func (f Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return nil
}

// CreateTraceReceiver returns Kafka receiver.
// This function implements OTEL component.ReceiverFactory.
func (f Factory) CreateTraceReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumer,
) (component.TraceReceiver, error) {
kafkaCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
return new(kafkaCfg, nextConsumer, params)
}

// CreateMetricsReceiver returns metrics receiver.
// This function implements OTEL component.ReceiverFactory.
func (f Factory) CreateMetricsReceiver(
_ context.Context,
_ component.ReceiverCreateParams,
_ configmodels.Receiver,
_ consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
Loading

0 comments on commit ef8205d

Please sign in to comment.