Skip to content

Commit

Permalink
Add main and options
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 27, 2018
1 parent 809ffd8 commit 0dec945
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 0 deletions.
98 changes: 98 additions & 0 deletions cmd/ingester/app/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"fmt"
"strconv"
"strings"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
configPrefix = "ingester"
suffixBrokers = ".brokers"
suffixTopic = ".topic"
suffixGroupID = ".group-id"
suffixParallelism = ".parallelism"
suffixEncoding = ".encoding"

encodingJSON = "json"
encodingProto = "protobuf"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-ingester-spans"
defaultGroupID = "jaeger-ingester"
defaultParallelism = 1000
defaultEncoding = encodingProto
)

// Options stores the configuration options for a Kafka consumer
type Options struct {
consumer.Configuration
Parallelism int
Encoding string
}

// AddFlags adds flags for Options
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixBrokers,
defaultBroker,
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
configPrefix+suffixTopic,
defaultTopic,
"The name of the kafka topic to consume from")
flagSet.String(
configPrefix+suffixGroupID,
defaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
configPrefix+suffixParallelism,
strconv.Itoa(defaultParallelism),
"The number of messages to process in parallel")
flagSet.String(
configPrefix+suffixEncoding,
defaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, encodingProto, encodingJSON))
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) {
opt.Brokers = strings.Split(v.GetString(configPrefix+suffixBrokers), ",")
opt.Topic = v.GetString(configPrefix + suffixTopic)
opt.GroupID = v.GetString(configPrefix + suffixGroupID)
opt.Parallelism = v.GetInt(configPrefix + suffixParallelism)
opt.Encoding = v.GetString(configPrefix + suffixEncoding)
}

// UnmarshallerFromType returns the corresponding Unmarshaller for a byte array span encoding.
// Defaults to ProtobufUnmarshaller
func UnmarshallerFromType(encoding string) kafka.Unmarshaller {
switch encoding {
case encodingJSON:
return kafka.NewJSONUnmarshaller()
case encodingProto:
return kafka.NewProtobufUnmarshaller()
default:
return kafka.NewProtobufUnmarshaller()
}
}
66 changes: 66 additions & 0 deletions cmd/ingester/app/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--ingester.topic=topic1",
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
"--ingester.group-id=group1",
"--ingester.parallelism=5",
"--ingester.encoding=json"})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Brokers)
assert.Equal(t, "group1", opts.GroupID)
assert.Equal(t, 5, opts.Parallelism)
assert.Equal(t, encodingJSON, opts.Encoding)
}

func TestFlagDefaults(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
opts.InitFromViper(v)

assert.Equal(t, defaultTopic, opts.Topic)
assert.Equal(t, []string{defaultBroker}, opts.Brokers)
assert.Equal(t, defaultGroupID, opts.GroupID)
assert.Equal(t, defaultParallelism, opts.Parallelism)
assert.Equal(t, defaultEncoding, opts.Encoding)
}

func TestUnmarshallerFromType(t *testing.T) {
res := UnmarshallerFromType(encodingJSON)
assert.IsType(t, &kafka.JSONUnmarshaller{}, res)
res = UnmarshallerFromType(encodingProto)
assert.IsType(t, &kafka.ProtobufUnmarshaller{}, res)
res = UnmarshallerFromType("something else")
assert.IsType(t, &kafka.ProtobufUnmarshaller{}, res)
res = UnmarshallerFromType("")
assert.IsType(t, &kafka.ProtobufUnmarshaller{}, res)
}
160 changes: 160 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/ingester/app"
spanconsumer "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
)

func main() {
var signalsChannel = make(chan os.Signal, 0)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}

v := viper.New()
command := &cobra.Command{
Use: "jaeger-ingester",
Short: "Jaeger ingester consumes from Kafka and writes to storage",
Long: `Jaeger ingester consumes spans from a particular Kafka topic and writes them to all configured storage types.`,
RunE: func(cmd *cobra.Command, args []string) error {
err := flags.TryLoadConfigFile(v)
if err != nil {
return err
}

sFlags := new(flags.SharedFlags).InitFromViper(v)
logger, err := sFlags.NewLogger(zap.NewProductionConfig())
if err != nil {
return err
}
hc, err := sFlags.NewHealthCheck(logger)
if err != nil {
logger.Fatal("Could not start the health check server.", zap.Error(err))
}

mBldr := new(pMetrics.Builder).InitFromViper(v)
baseFactory, err := mBldr.CreateMetricsFactory("jaeger")
if err != nil {
logger.Fatal("Cannot create metrics factory.", zap.Error(err))
}
metricsFactory := baseFactory.Namespace("ingester", nil)

options := app.Options{}
options.InitFromViper(v)

storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
if err != nil {
logger.Fatal("Failed to create span writer", zap.Error(err))
}
unmarshaller := app.UnmarshallerFromType(options.Encoding)
spParams := processor.SpanProcessorParams{
Writer: spanWriter,
Unmarshaller: unmarshaller,
}
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := consumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
logger.Fatal("Failed to create sarama consumer", zap.Error(err))
}

factoryParams := spanconsumer.ProcessorFactoryParams{
Topic: options.Topic,
Parallelism: options.Parallelism,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
}
processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams)
if err != nil {
logger.Fatal("Failed to create processor factory", zap.Error(err))
}

consumerParams := spanconsumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
}
kafkaConsumer, err := spanconsumer.New(consumerParams)
if err != nil {
logger.Fatal("Unable to set up consumer", zap.Error(err))
}
kafkaConsumer.Start()

hc.Ready()
select {
case <-signalsChannel:
err := kafkaConsumer.Close()
if err != nil {
logger.Error("Failed to close span writer", zap.Error(err))
}
logger.Info("Jaeger Ingester is finishing")
}
return nil
},
}

command.AddCommand(version.Command())
command.AddCommand(env.Command())

config.AddFlags(
v,
command,
flags.AddConfigFileFlag,
flags.AddFlags,
storageFactory.AddFlags,
pMetrics.AddFlags,
app.AddFlags,
)

if err := command.Execute(); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
}

0 comments on commit 0dec945

Please sign in to comment.