Skip to content

Commit

Permalink
Add builder, flags, and main
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 31, 2018
1 parent c12a9e2 commit 012147a
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 0 deletions.
123 changes: 123 additions & 0 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"strings"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
// EncodingJSON indicates spans are encoded as a json byte array
EncodingJSON = "json"
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"

// ConfigPrefix is a prefix fro the ingester flags
ConfigPrefix = "ingester"
// SuffixBrokers is a suffix for the brokers flag
SuffixBrokers = ".brokers"
// SuffixTopic is a suffix for the topic flag
SuffixTopic = ".topic"
// SuffixGroupID is a suffix for the group-id flag
SuffixGroupID = ".group-id"
// SuffixParallelism is a suffix for the parallelism flag
SuffixParallelism = ".parallelism"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
// DefaultTopic is the default kafka topic
DefaultTopic = "jaeger-spans"
// DefaultGroupID is the default consumer Group ID
DefaultGroupID = "jaeger-ingester"
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingJSON
)

// Builder stores the configuration options for the Ingester
type Builder struct {
kafkaConsumer.Configuration
Parallelism int
Encoding string
}

// CreateConsumer creates a new span consumer for the ingester
func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
if b.Encoding == EncodingJSON {
unmarshaller = kafka.NewJSONUnmarshaller()
} else {
unmarshaller = kafka.NewProtobufUnmarshaller()
}

spParams := processor.SpanProcessorParams{
Writer: spanWriter,
Unmarshaller: unmarshaller,
}
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: b.Brokers,
Topic: b.Topic,
GroupID: b.GroupID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
return nil, err
}

factoryParams := consumer.ProcessorFactoryParams{
Topic: b.Topic,
Parallelism: b.Parallelism,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
}
processorFactory, err := consumer.NewProcessorFactory(factoryParams)
if err != nil {
return nil, err
}

consumerParams := consumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
}
return consumer.New(consumerParams)
}

// InitFromViper initializes Builder with properties from viper
func (b *Builder) InitFromViper(v *viper.Viper) {
b.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",")
b.Topic = v.GetString(ConfigPrefix + SuffixTopic)
b.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
b.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
b.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
}
47 changes: 47 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"fmt"
"strconv"

"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
)

// AddFlags adds flags for Builder
func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
builder.ConfigPrefix+builder.SuffixBrokers,
builder.DefaultBroker,
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
flagSet.String(
builder.ConfigPrefix+builder.SuffixTopic,
builder.DefaultTopic,
"The name of the kafka topic to consume from")
flagSet.String(
builder.ConfigPrefix+builder.SuffixGroupID,
builder.DefaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
builder.ConfigPrefix+builder.SuffixParallelism,
strconv.Itoa(builder.DefaultParallelism),
"The number of messages to process in parallel")
flagSet.String(
builder.ConfigPrefix+builder.SuffixEncoding,
builder.DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, builder.EncodingProto, builder.EncodingJSON))
}
55 changes: 55 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
"github.com/jaegertracing/jaeger/pkg/config"
)

func TestOptionsWithFlags(t *testing.T) {
b := &builder.Builder{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--ingester.topic=topic1",
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
"--ingester.group-id=group1",
"--ingester.parallelism=5",
"--ingester.encoding=json"})
b.InitFromViper(v)

assert.Equal(t, "topic1", b.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, b.Brokers)
assert.Equal(t, "group1", b.GroupID)
assert.Equal(t, 5, b.Parallelism)
assert.Equal(t, builder.EncodingJSON, b.Encoding)
}

func TestFlagDefaults(t *testing.T) {
b := &builder.Builder{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
b.InitFromViper(v)

assert.Equal(t, builder.DefaultTopic, b.Topic)
assert.Equal(t, []string{builder.DefaultBroker}, b.Brokers)
assert.Equal(t, builder.DefaultGroupID, b.GroupID)
assert.Equal(t, builder.DefaultParallelism, b.Parallelism)
assert.Equal(t, builder.DefaultEncoding, b.Encoding)
}
130 changes: 130 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"io"
"log"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/env"
"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
"github.com/jaegertracing/jaeger/pkg/config"
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
)

func main() {
var signalsChannel = make(chan os.Signal, 0)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}

v := viper.New()
command := &cobra.Command{
Use: "jaeger-ingester",
Short: "Jaeger ingester consumes from Kafka and writes to storage",
Long: `Jaeger ingester consumes spans from a particular Kafka topic and writes them to all configured storage types.`,
RunE: func(cmd *cobra.Command, args []string) error {
err := flags.TryLoadConfigFile(v)
if err != nil {
return err
}

sFlags := new(flags.SharedFlags).InitFromViper(v)
logger, err := sFlags.NewLogger(zap.NewProductionConfig())
if err != nil {
return err
}
hc, err := sFlags.NewHealthCheck(logger)
if err != nil {
logger.Fatal("Could not start the health check server.", zap.Error(err))
}

mBldr := new(pMetrics.Builder).InitFromViper(v)
baseFactory, err := mBldr.CreateMetricsFactory("jaeger")
if err != nil {
logger.Fatal("Cannot create metrics factory.", zap.Error(err))
}
metricsFactory := baseFactory.Namespace("ingester", nil)

storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
if err != nil {
logger.Fatal("Failed to create span writer", zap.Error(err))
}

b := builder.Builder{}
b.InitFromViper(v)
consumer, err := b.CreateConsumer(logger, metricsFactory, spanWriter)
if err != nil {
logger.Fatal("Unable to create consumer", zap.Error(err))
}
consumer.Start()

hc.Ready()
select {
case <-signalsChannel:
logger.Info("Jaeger Ingester is starting to close")
err := consumer.Close()
if err != nil {
logger.Error("Failed to close consumer", zap.Error(err))
}
if closer, ok := spanWriter.(io.Closer); ok {
err := closer.Close()
if err != nil {
logger.Error("Failed to close span writer", zap.Error(err))
}
}
logger.Info("Jaeger Ingester has finished closing")
}
return nil
},
}

command.AddCommand(version.Command())
command.AddCommand(env.Command())

config.AddFlags(
v,
command,
flags.AddConfigFileFlag,
flags.AddFlags,
storageFactory.AddFlags,
pMetrics.AddFlags,
app.AddFlags,
)

if err := command.Execute(); err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
}

0 comments on commit 012147a

Please sign in to comment.