diff --git a/.gitignore b/.gitignore index 224e5a58fcb..269e8787513 100644 --- a/.gitignore +++ b/.gitignore @@ -34,4 +34,5 @@ crossdock/crossdock-* run-crossdock.log cmd/opentelemetry-collector/cmd/collector/opentelemetry-collector-* cmd/opentelemetry-collector/cmd/agent/opentelemetry-agent-* +cmd/opentelemetry-collector/cmd/ingester/opentelemetry-ingester-* __pycache__ diff --git a/Makefile b/Makefile index 9a9300b47dc..fb1ad83f24d 100644 --- a/Makefile +++ b/Makefile @@ -228,7 +228,7 @@ build-all-in-one-linux: build-ui build-all-in-one: elasticsearch-mappings ifeq ($(GOARCH), s390x) $(GOBUILD) -tags ui -o ./cmd/all-in-one/all-in-one-$(GOOS)-$(GOARCH) $(BUILD_INFO) ./cmd/all-in-one/main.go -else +else $(GOBUILD) -tags ui -o ./cmd/all-in-one/all-in-one-$(GOOS) $(BUILD_INFO) ./cmd/all-in-one/main.go endif @@ -272,6 +272,14 @@ else cd ${OTEL_COLLECTOR_DIR}/cmd/agent && $(GOBUILD) -o ./opentelemetry-agent-$(GOOS) $(BUILD_INFO) main.go endif +.PHONY: build-otel-ingester +build-otel-ingester: +ifeq ($(GOARCH), s390x) + cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS)-$(GOARCH) $(BUILD_INFO) main.go +else + cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS) $(BUILD_INFO) main.go +endif + .PHONY: build-ingester build-ingester: ifeq ($(GOARCH), s390x) @@ -300,7 +308,7 @@ build-binaries-s390x: GOOS=linux GOARCH=s390x $(MAKE) build-platform-binaries .PHONY: build-platform-binaries -build-platform-binaries: build-agent build-collector build-query build-ingester build-all-in-one build-examples build-tracegen build-otel-collector build-otel-agent +build-platform-binaries: build-agent build-collector build-query build-ingester build-all-in-one build-examples build-tracegen build-otel-collector build-otel-agent build-otel-ingester .PHONY: build-all-platforms build-all-platforms: build-binaries-linux build-binaries-windows build-binaries-darwin build-binaries-s390x @@ -324,6 +332,7 @@ docker-images-jaeger-backend: done docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-collector:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/collector/Dockerfile cmd/opentelemetry-collector/cmd/collector docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-agent:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/agent/Dockerfile cmd/opentelemetry-collector/cmd/agent + docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-ingester:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/ingester/Dockerfile cmd/opentelemetry-collector/cmd/ingester .PHONY: docker-images-tracegen docker-images-tracegen: diff --git a/cmd/opentelemetry-collector/app/defaults/default_config.go b/cmd/opentelemetry-collector/app/defaults/default_config.go index 40dab02dc33..c1b64fb74d5 100644 --- a/cmd/opentelemetry-collector/app/defaults/default_config.go +++ b/cmd/opentelemetry-collector/app/defaults/default_config.go @@ -29,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" + kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka" "github.com/jaegertracing/jaeger/ports" ) @@ -123,7 +124,9 @@ func createExporters(storageTypes string, factories config.Factories) (configmod // It enables Jaeger receiver with UDP endpoints and Jaeger exporter. func AgentConfig(factories config.Factories) *configmodels.Config { jaegerExporter := factories.Exporters["jaeger"] - exporters := configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()} + exporters := configmodels.Exporters{ + "jaeger": jaegerExporter.CreateDefaultConfig(), + } hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config) processors := configmodels.Processors{} resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config) @@ -164,12 +167,41 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers { }, }, } - recvs := map[string]configmodels.Receiver{ + recvs := configmodels.Receivers{ "jaeger": jaeger, } return recvs } +// IngesterConfig creates default ingester configuration. +// It enables Jaeger kafka receiver and storage backend. +func IngesterConfig(storageType string, factories config.Factories) (*configmodels.Config, error) { + exporters, err := createExporters(storageType, factories) + if err != nil { + return nil, err + } + kafkaReceiver := factories.Receivers[kafkaRec.TypeStr].CreateDefaultConfig().(*kafkaRec.Config) + receivers := configmodels.Receivers{ + kafkaReceiver.Name(): kafkaReceiver, + } + hc := factories.Extensions["health_check"].CreateDefaultConfig() + return &configmodels.Config{ + Receivers: receivers, + Exporters: exporters, + Extensions: configmodels.Extensions{"health_check": hc}, + Service: configmodels.Service{ + Extensions: []string{"health_check"}, + Pipelines: configmodels.Pipelines{ + "traces": { + InputType: configmodels.TracesDataType, + Receivers: receiverNames(receivers), + Exporters: exporterNames(exporters), + }, + }, + }, + }, nil +} + func receiverNames(receivers configmodels.Receivers) []string { var names []string for _, v := range receivers { diff --git a/cmd/opentelemetry-collector/app/defaults/default_config_test.go b/cmd/opentelemetry-collector/app/defaults/default_config_test.go index 58d14a7ff3b..dfb8352b28d 100644 --- a/cmd/opentelemetry-collector/app/defaults/default_config_test.go +++ b/cmd/opentelemetry-collector/app/defaults/default_config_test.go @@ -15,6 +15,7 @@ package defaults import ( + "fmt" "sort" "testing" @@ -23,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter" "github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor" "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -31,6 +33,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" + kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka" jConfig "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/ports" ) @@ -189,25 +192,93 @@ func TestDefaultAgentConfig(t *testing.T) { }, } for _, test := range tests { - v, _ := jConfig.Viperize(grpc.AddFlags) - for key, val := range test.config { - v.Set(key, val) - } - factories := Components(v) - cfg := AgentConfig(factories) + t.Run(fmt.Sprintf("%v", test.config), func(t *testing.T) { + v, _ := jConfig.Viperize(grpc.AddFlags) + for key, val := range test.config { + v.Set(key, val) + } + factories := Components(v) + cfg := AgentConfig(factories) + require.NoError(t, config.ValidateConfig(cfg, zap.NewNop())) - assert.Equal(t, test.service, cfg.Service) - assert.Equal(t, 1, len(cfg.Receivers)) - assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"]) - assert.Equal(t, 1, len(cfg.Exporters)) - assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"]) - processorMap := map[string]bool{} - for _, p := range test.service.Pipelines["traces"].Processors { - processorMap[p] = true - } - if processorMap["resource"] { - assert.Equal(t, len(processorMap), len(cfg.Processors)) - assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"]) - } + assert.Equal(t, test.service, cfg.Service) + assert.Equal(t, 1, len(cfg.Receivers)) + assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"]) + assert.Equal(t, 1, len(cfg.Exporters)) + assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"]) + processorMap := map[string]bool{} + for _, p := range test.service.Pipelines["traces"].Processors { + processorMap[p] = true + } + if processorMap["resource"] { + assert.Equal(t, len(processorMap), len(cfg.Processors)) + assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"]) + } + }) + } +} + +func TestDefaultIngesterConfig(t *testing.T) { + tests := []struct { + storageType string + service configmodels.Service + err string + }{ + { + storageType: "elasticsearch", + service: configmodels.Service{ + Extensions: []string{"health_check"}, + Pipelines: configmodels.Pipelines{ + "traces": &configmodels.Pipeline{ + InputType: configmodels.TracesDataType, + Receivers: []string{kafkaRec.TypeStr}, + Exporters: []string{elasticsearch.TypeStr}, + }, + }, + }, + }, + { + storageType: "elasticsearch,cassandra", + service: configmodels.Service{ + Extensions: []string{"health_check"}, + Pipelines: configmodels.Pipelines{ + "traces": &configmodels.Pipeline{ + InputType: configmodels.TracesDataType, + Receivers: []string{kafkaRec.TypeStr}, + Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr}, + }, + }, + }, + }, + { + storageType: "floppy", + err: "unknown storage type: floppy", + }, + } + for _, test := range tests { + t.Run(test.storageType, func(t *testing.T) { + factories := Components(viper.New()) + cfg, err := IngesterConfig(test.storageType, factories) + if test.err != "" { + require.Nil(t, cfg) + assert.EqualError(t, err, test.err) + return + } + require.NoError(t, err) + require.NoError(t, config.ValidateConfig(cfg, zap.NewNop())) + + sort.Strings(cfg.Service.Pipelines["traces"].Exporters) + assert.Equal(t, test.service, cfg.Service) + assert.Equal(t, 1, len(cfg.Receivers)) + assert.IsType(t, &kafkaRec.Config{}, cfg.Receivers[kafkaRec.TypeStr]) + + assert.Equal(t, len(test.service.Pipelines["traces"].Exporters), len(cfg.Exporters)) + types := []string{} + for _, v := range cfg.Exporters { + types = append(types, string(v.Type())) + } + sort.Strings(types) + assert.Equal(t, test.service.Pipelines["traces"].Exporters, types) + }) } } diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/factory.go b/cmd/opentelemetry-collector/app/receiver/kafka/factory.go index ae1695590f2..8a131c59c3c 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/factory.go +++ b/cmd/opentelemetry-collector/app/receiver/kafka/factory.go @@ -55,6 +55,10 @@ func (f Factory) CreateDefaultConfig() configmodels.Receiver { opts := f.OptionsFactory() return &Config{ Options: *opts, + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: TypeStr, + NameVal: TypeStr, + }, } } diff --git a/cmd/opentelemetry-collector/app/util.go b/cmd/opentelemetry-collector/app/util.go index dec2bfada9b..54a6bdcccec 100644 --- a/cmd/opentelemetry-collector/app/util.go +++ b/cmd/opentelemetry-collector/app/util.go @@ -16,10 +16,16 @@ package app import ( "flag" + "fmt" "io/ioutil" "os" + "strings" "github.com/open-telemetry/opentelemetry-collector/service/builder" + + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" ) // GetOTELConfigFile returns name of OTEL config file. @@ -31,3 +37,26 @@ func GetOTELConfigFile() string { f.Parse(os.Args[1:]) return builder.GetConfigFile() } + +// StorageFlags return a function that adds storage flags. +// storage parameter can contain a comma separated list of supported Jaeger storage backends. +func StorageFlags(storage string) (func(*flag.FlagSet), error) { + var flagFn []func(*flag.FlagSet) + for _, s := range strings.Split(storage, ",") { + switch s { + case "cassandra": + flagFn = append(flagFn, cassandra.DefaultOptions().AddFlags) + case "elasticsearch": + flagFn = append(flagFn, elasticsearch.DefaultOptions().AddFlags) + case "kafka": + flagFn = append(flagFn, kafka.DefaultOptions().AddFlags) + default: + return nil, fmt.Errorf("unknown storage type: %s", s) + } + } + return func(flagSet *flag.FlagSet) { + for _, f := range flagFn { + f(flagSet) + } + }, nil +} diff --git a/cmd/opentelemetry-collector/cmd/collector/main.go b/cmd/opentelemetry-collector/cmd/collector/main.go index 5af40582b3b..9339d672d4b 100644 --- a/cmd/opentelemetry-collector/cmd/collector/main.go +++ b/cmd/opentelemetry-collector/cmd/collector/main.go @@ -15,11 +15,9 @@ package main import ( - "flag" "fmt" "log" "os" - "strings" "github.com/open-telemetry/opentelemetry-collector/config" "github.com/open-telemetry/opentelemetry-collector/config/configmodels" @@ -31,9 +29,6 @@ import ( jflags "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults" - "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" - "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" - "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor" jConfig "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static" @@ -92,7 +87,7 @@ func main() { // Add Jaeger specific flags to service command // this passes flag values to viper. - storageFlags, err := storageFlags(storageType) + storageFlags, err := app.StorageFlags(storageType) if err != nil { handleErr(err) } @@ -117,26 +112,3 @@ func main() { err = svc.Start() handleErr(err) } - -// storageFlags return a function that will add storage flags. -// storage parameter can contain a comma separated list of supported Jaeger storage backends. -func storageFlags(storage string) (func(*flag.FlagSet), error) { - var flagFn []func(*flag.FlagSet) - for _, s := range strings.Split(storage, ",") { - switch s { - case "cassandra": - flagFn = append(flagFn, cassandra.DefaultOptions().AddFlags) - case "elasticsearch": - flagFn = append(flagFn, elasticsearch.DefaultOptions().AddFlags) - case "kafka": - flagFn = append(flagFn, kafka.DefaultOptions().AddFlags) - default: - return nil, fmt.Errorf("unknown storage type: %s", s) - } - } - return func(flagSet *flag.FlagSet) { - for _, f := range flagFn { - f(flagSet) - } - }, nil -} diff --git a/cmd/opentelemetry-collector/cmd/ingester/Dockerfile b/cmd/opentelemetry-collector/cmd/ingester/Dockerfile new file mode 100644 index 00000000000..816fa72350d --- /dev/null +++ b/cmd/opentelemetry-collector/cmd/ingester/Dockerfile @@ -0,0 +1,8 @@ +FROM alpine:latest as certs +RUN apk add --update --no-cache ca-certificates + +FROM scratch +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt + +COPY opentelemetry-ingester-linux /go/bin/ +ENTRYPOINT ["/go/bin/opentelemetry-ingester-linux"] diff --git a/cmd/opentelemetry-collector/cmd/ingester/main.go b/cmd/opentelemetry-collector/cmd/ingester/main.go new file mode 100644 index 00000000000..6bd7ba651e0 --- /dev/null +++ b/cmd/opentelemetry-collector/cmd/ingester/main.go @@ -0,0 +1,107 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "os" + + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/service" + "github.com/spf13/viper" + + jflags "github.com/jaegertracing/jaeger/cmd/flags" + ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app" + "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults" + jconfig "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage" +) + +func main() { + handleErr := func(err error) { + if err != nil { + log.Fatalf("Failed to run the service: %v", err) + } + } + + info := service.ApplicationStartInfo{ + ExeName: "jaeger-opentelemetry-ingester", + LongName: "Jaeger OpenTelemetry Ingester", + // TODO + //Version: version.Version, + //GitHash: version.GitHash, + } + + v := viper.New() + + storageType := os.Getenv(storage.SpanStorageTypeEnvVar) + if storageType == "" { + storageType = "cassandra" + } + + cmpts := defaults.Components(v) + cfgFactory := func(otelViper *viper.Viper, f config.Factories) (*configmodels.Config, error) { + cfg, err := defaults.IngesterConfig(storageType, cmpts) + if err != nil { + return nil, err + } + if len(app.GetOTELConfigFile()) > 0 { + otelCfg, err := service.FileLoaderConfigFactory(otelViper, f) + if err != nil { + return nil, err + } + err = defaults.MergeConfigs(cfg, otelCfg) + if err != nil { + return nil, err + } + } + return cfg, nil + } + + svc, err := service.New(service.Parameters{ + ApplicationStartInfo: info, + Factories: cmpts, + ConfigFactory: cfgFactory, + }) + handleErr(err) + + // Add Jaeger specific flags to service command + // this passes flag values to viper. + storageFlags, err := app.StorageFlags(storageType) + if err != nil { + handleErr(err) + } + + cmd := svc.Command() + jconfig.AddFlags(v, + cmd, + jflags.AddConfigFileFlag, + ingesterApp.AddFlags, + storageFlags, + ) + + // parse flags to propagate Jaeger config file flag value to viper + cmd.ParseFlags(os.Args) + err = jflags.TryLoadConfigFile(v) + if err != nil { + handleErr(fmt.Errorf("could not load Jaeger configuration file %w", err)) + } + + err = svc.Start() + handleErr(err) +} diff --git a/scripts/travis/upload-all-docker-images.sh b/scripts/travis/upload-all-docker-images.sh index de0cc9f8b56..8d2e869f4da 100755 --- a/scripts/travis/upload-all-docker-images.sh +++ b/scripts/travis/upload-all-docker-images.sh @@ -20,7 +20,7 @@ else fi export DOCKER_NAMESPACE=jaegertracing -for component in agent cassandra-schema es-index-cleaner es-rollover collector query ingester tracegen opentelemetry-collector opentelemetry-agent +for component in agent cassandra-schema es-index-cleaner es-rollover collector query ingester tracegen opentelemetry-collector opentelemetry-agent opentelemetry-ingester do export REPO="jaegertracing/jaeger-${component}" bash ./scripts/travis/upload-to-docker.sh diff --git a/scripts/travis/upload-to-docker.sh b/scripts/travis/upload-to-docker.sh index 82b8b6fc142..8550e53e951 100755 --- a/scripts/travis/upload-to-docker.sh +++ b/scripts/travis/upload-to-docker.sh @@ -34,7 +34,7 @@ fi # Do not enable echo before the `docker login` command to avoid revealing the password. set -x docker login -u $DOCKER_USER -p $DOCKER_PASS -if [[ "${REPO}" == "jaegertracing/jaeger-opentelemetry-collector" || "${REPO}" == "jaegertracing/jaeger-opentelemetry-agent" ]]; then +if [[ "${REPO}" == "jaegertracing/jaeger-opentelemetry-collector" || "${REPO}" == "jaegertracing/jaeger-opentelemetry-agent" || "${REPO}" == "jaegertracing/jaeger-opentelemetry-ingester" ]]; then # TODO remove once Jaeger OTEL collector is stable docker push $REPO:latest else