Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTEL ingester component #2225

Merged
merged 7 commits into from
May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ crossdock/crossdock-*
run-crossdock.log
cmd/opentelemetry-collector/cmd/collector/opentelemetry-collector-*
cmd/opentelemetry-collector/cmd/agent/opentelemetry-agent-*
cmd/opentelemetry-collector/cmd/ingester/opentelemetry-ingester-*
__pycache__
13 changes: 11 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ build-all-in-one-linux: build-ui
build-all-in-one: elasticsearch-mappings
ifeq ($(GOARCH), s390x)
$(GOBUILD) -tags ui -o ./cmd/all-in-one/all-in-one-$(GOOS)-$(GOARCH) $(BUILD_INFO) ./cmd/all-in-one/main.go
else
else
$(GOBUILD) -tags ui -o ./cmd/all-in-one/all-in-one-$(GOOS) $(BUILD_INFO) ./cmd/all-in-one/main.go
endif

Expand Down Expand Up @@ -272,6 +272,14 @@ else
cd ${OTEL_COLLECTOR_DIR}/cmd/agent && $(GOBUILD) -o ./opentelemetry-agent-$(GOOS) $(BUILD_INFO) main.go
endif

.PHONY: build-otel-ingester
build-otel-ingester:
ifeq ($(GOARCH), s390x)
cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS)-$(GOARCH) $(BUILD_INFO) main.go
else
cd ${OTEL_COLLECTOR_DIR}/cmd/ingester && $(GOBUILD) -o ./opentelemetry-ingester-$(GOOS) $(BUILD_INFO) main.go
endif

.PHONY: build-ingester
build-ingester:
ifeq ($(GOARCH), s390x)
Expand Down Expand Up @@ -300,7 +308,7 @@ build-binaries-s390x:
GOOS=linux GOARCH=s390x $(MAKE) build-platform-binaries

.PHONY: build-platform-binaries
build-platform-binaries: build-agent build-collector build-query build-ingester build-all-in-one build-examples build-tracegen build-otel-collector build-otel-agent
build-platform-binaries: build-agent build-collector build-query build-ingester build-all-in-one build-examples build-tracegen build-otel-collector build-otel-agent build-otel-ingester

.PHONY: build-all-platforms
build-all-platforms: build-binaries-linux build-binaries-windows build-binaries-darwin build-binaries-s390x
Expand All @@ -324,6 +332,7 @@ docker-images-jaeger-backend:
done
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-collector:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/collector/Dockerfile cmd/opentelemetry-collector/cmd/collector
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-agent:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/agent/Dockerfile cmd/opentelemetry-collector/cmd/agent
docker build -t $(DOCKER_NAMESPACE)/jaeger-opentelemetry-ingester:${DOCKER_TAG} -f ${OTEL_COLLECTOR_DIR}/cmd/ingester/Dockerfile cmd/opentelemetry-collector/cmd/ingester

.PHONY: docker-images-tracegen
docker-images-tracegen:
Expand Down
36 changes: 34 additions & 2 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka"
"github.com/jaegertracing/jaeger/ports"
)

Expand Down Expand Up @@ -123,7 +124,9 @@ func createExporters(storageTypes string, factories config.Factories) (configmod
// It enables Jaeger receiver with UDP endpoints and Jaeger exporter.
func AgentConfig(factories config.Factories) *configmodels.Config {
jaegerExporter := factories.Exporters["jaeger"]
exporters := configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()}
exporters := configmodels.Exporters{
"jaeger": jaegerExporter.CreateDefaultConfig(),
}
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config)
processors := configmodels.Processors{}
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
Expand Down Expand Up @@ -164,12 +167,41 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers {
},
},
}
recvs := map[string]configmodels.Receiver{
recvs := configmodels.Receivers{
"jaeger": jaeger,
}
return recvs
}

// IngesterConfig creates default ingester configuration.
// It enables Jaeger kafka receiver and storage backend.
func IngesterConfig(storageType string, factories config.Factories) (*configmodels.Config, error) {
exporters, err := createExporters(storageType, factories)
if err != nil {
return nil, err
}
kafkaReceiver := factories.Receivers[kafkaRec.TypeStr].CreateDefaultConfig().(*kafkaRec.Config)
receivers := configmodels.Receivers{
kafkaReceiver.Name(): kafkaReceiver,
}
hc := factories.Extensions["health_check"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Exporters: exporterNames(exporters),
},
},
},
}, nil
}

func receiverNames(receivers configmodels.Receivers) []string {
var names []string
for _, v := range receivers {
Expand Down
109 changes: 90 additions & 19 deletions cmd/opentelemetry-collector/app/defaults/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package defaults

import (
"fmt"
"sort"
"testing"

Expand All @@ -23,6 +24,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/ports"
)
Expand Down Expand Up @@ -189,25 +192,93 @@ func TestDefaultAgentConfig(t *testing.T) {
},
}
for _, test := range tests {
v, _ := jConfig.Viperize(grpc.AddFlags)
for key, val := range test.config {
v.Set(key, val)
}
factories := Components(v)
cfg := AgentConfig(factories)
t.Run(fmt.Sprintf("%v", test.config), func(t *testing.T) {
v, _ := jConfig.Viperize(grpc.AddFlags)
for key, val := range test.config {
v.Set(key, val)
}
factories := Components(v)
cfg := AgentConfig(factories)
require.NoError(t, config.ValidateConfig(cfg, zap.NewNop()))

assert.Equal(t, test.service, cfg.Service)
assert.Equal(t, 1, len(cfg.Receivers))
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
assert.Equal(t, 1, len(cfg.Exporters))
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"])
processorMap := map[string]bool{}
for _, p := range test.service.Pipelines["traces"].Processors {
processorMap[p] = true
}
if processorMap["resource"] {
assert.Equal(t, len(processorMap), len(cfg.Processors))
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
}
assert.Equal(t, test.service, cfg.Service)
assert.Equal(t, 1, len(cfg.Receivers))
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
assert.Equal(t, 1, len(cfg.Exporters))
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"])
processorMap := map[string]bool{}
for _, p := range test.service.Pipelines["traces"].Processors {
processorMap[p] = true
}
if processorMap["resource"] {
assert.Equal(t, len(processorMap), len(cfg.Processors))
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
}
})
}
}

func TestDefaultIngesterConfig(t *testing.T) {
tests := []struct {
storageType string
service configmodels.Service
err string
}{
{
storageType: "elasticsearch",
service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkaRec.TypeStr},
Exporters: []string{elasticsearch.TypeStr},
},
},
},
},
{
storageType: "elasticsearch,cassandra",
service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkaRec.TypeStr},
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
},
},
},
},
{
storageType: "floppy",
err: "unknown storage type: floppy",
},
}
for _, test := range tests {
t.Run(test.storageType, func(t *testing.T) {
factories := Components(viper.New())
cfg, err := IngesterConfig(test.storageType, factories)
if test.err != "" {
require.Nil(t, cfg)
assert.EqualError(t, err, test.err)
return
}
require.NoError(t, err)
require.NoError(t, config.ValidateConfig(cfg, zap.NewNop()))

sort.Strings(cfg.Service.Pipelines["traces"].Exporters)
assert.Equal(t, test.service, cfg.Service)
assert.Equal(t, 1, len(cfg.Receivers))
assert.IsType(t, &kafkaRec.Config{}, cfg.Receivers[kafkaRec.TypeStr])

assert.Equal(t, len(test.service.Pipelines["traces"].Exporters), len(cfg.Exporters))
types := []string{}
for _, v := range cfg.Exporters {
types = append(types, string(v.Type()))
}
sort.Strings(types)
assert.Equal(t, test.service.Pipelines["traces"].Exporters, types)
})
}
}
4 changes: 4 additions & 0 deletions cmd/opentelemetry-collector/app/receiver/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (f Factory) CreateDefaultConfig() configmodels.Receiver {
opts := f.OptionsFactory()
return &Config{
Options: *opts,
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

Expand Down
29 changes: 29 additions & 0 deletions cmd/opentelemetry-collector/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ package app

import (
"flag"
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/open-telemetry/opentelemetry-collector/service/builder"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
)

// GetOTELConfigFile returns name of OTEL config file.
Expand All @@ -31,3 +37,26 @@ func GetOTELConfigFile() string {
f.Parse(os.Args[1:])
return builder.GetConfigFile()
}

// StorageFlags return a function that adds storage flags.
// storage parameter can contain a comma separated list of supported Jaeger storage backends.
func StorageFlags(storage string) (func(*flag.FlagSet), error) {
var flagFn []func(*flag.FlagSet)
for _, s := range strings.Split(storage, ",") {
switch s {
case "cassandra":
flagFn = append(flagFn, cassandra.DefaultOptions().AddFlags)
case "elasticsearch":
flagFn = append(flagFn, elasticsearch.DefaultOptions().AddFlags)
case "kafka":
flagFn = append(flagFn, kafka.DefaultOptions().AddFlags)
default:
return nil, fmt.Errorf("unknown storage type: %s", s)
}
}
return func(flagSet *flag.FlagSet) {
for _, f := range flagFn {
f(flagSet)
}
}, nil
}
30 changes: 1 addition & 29 deletions cmd/opentelemetry-collector/cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"strings"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
Expand All @@ -31,9 +29,6 @@ import (
jflags "github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
Expand Down Expand Up @@ -92,7 +87,7 @@ func main() {

// Add Jaeger specific flags to service command
// this passes flag values to viper.
storageFlags, err := storageFlags(storageType)
storageFlags, err := app.StorageFlags(storageType)
if err != nil {
handleErr(err)
}
Expand All @@ -117,26 +112,3 @@ func main() {
err = svc.Start()
handleErr(err)
}

// storageFlags return a function that will add storage flags.
// storage parameter can contain a comma separated list of supported Jaeger storage backends.
func storageFlags(storage string) (func(*flag.FlagSet), error) {
var flagFn []func(*flag.FlagSet)
for _, s := range strings.Split(storage, ",") {
switch s {
case "cassandra":
flagFn = append(flagFn, cassandra.DefaultOptions().AddFlags)
case "elasticsearch":
flagFn = append(flagFn, elasticsearch.DefaultOptions().AddFlags)
case "kafka":
flagFn = append(flagFn, kafka.DefaultOptions().AddFlags)
default:
return nil, fmt.Errorf("unknown storage type: %s", s)
}
}
return func(flagSet *flag.FlagSet) {
for _, f := range flagFn {
f(flagSet)
}
}, nil
}
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/cmd/ingester/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM alpine:latest as certs
RUN apk add --update --no-cache ca-certificates

FROM scratch
COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt

COPY opentelemetry-ingester-linux /go/bin/
ENTRYPOINT ["/go/bin/opentelemetry-ingester-linux"]
Loading