From 012147a3f19dcc2a27aea46255d09168bc90d010 Mon Sep 17 00:00:00 2001 From: Davit Yeghshatyan Date: Mon, 30 Jul 2018 18:51:46 -0400 Subject: [PATCH] Add builder, flags, and main Signed-off-by: Davit Yeghshatyan --- cmd/ingester/app/builder/builder.go | 123 ++++++++++++++++++++++++++ cmd/ingester/app/flags.go | 47 ++++++++++ cmd/ingester/app/flags_test.go | 55 ++++++++++++ cmd/ingester/main.go | 130 ++++++++++++++++++++++++++++ 4 files changed, 355 insertions(+) create mode 100644 cmd/ingester/app/builder/builder.go create mode 100644 cmd/ingester/app/flags.go create mode 100644 cmd/ingester/app/flags_test.go create mode 100644 cmd/ingester/main.go diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go new file mode 100644 index 00000000000..247f3e6381a --- /dev/null +++ b/cmd/ingester/app/builder/builder.go @@ -0,0 +1,123 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "strings" + + "github.com/spf13/viper" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer" + "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" + kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer" + "github.com/jaegertracing/jaeger/plugin/storage/kafka" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +const ( + // EncodingJSON indicates spans are encoded as a json byte array + EncodingJSON = "json" + // EncodingProto indicates spans are encoded as a protobuf byte array + EncodingProto = "protobuf" + + // ConfigPrefix is a prefix fro the ingester flags + ConfigPrefix = "ingester" + // SuffixBrokers is a suffix for the brokers flag + SuffixBrokers = ".brokers" + // SuffixTopic is a suffix for the topic flag + SuffixTopic = ".topic" + // SuffixGroupID is a suffix for the group-id flag + SuffixGroupID = ".group-id" + // SuffixParallelism is a suffix for the parallelism flag + SuffixParallelism = ".parallelism" + // SuffixEncoding is a suffix for the encoding flag + SuffixEncoding = ".encoding" + + // DefaultBroker is the default kafka broker + DefaultBroker = "127.0.0.1:9092" + // DefaultTopic is the default kafka topic + DefaultTopic = "jaeger-spans" + // DefaultGroupID is the default consumer Group ID + DefaultGroupID = "jaeger-ingester" + // DefaultParallelism is the default parallelism for the span processor + DefaultParallelism = 1000 + // DefaultEncoding is the default span encoding + DefaultEncoding = EncodingJSON +) + +// Builder stores the configuration options for the Ingester +type Builder struct { + kafkaConsumer.Configuration + Parallelism int + Encoding string +} + +// CreateConsumer creates a new span consumer for the ingester +func (b *Builder) CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer) (*consumer.Consumer, error) { + var unmarshaller kafka.Unmarshaller + if b.Encoding == EncodingJSON { + unmarshaller = kafka.NewJSONUnmarshaller() + } else { + unmarshaller = kafka.NewProtobufUnmarshaller() + } + + spParams := processor.SpanProcessorParams{ + Writer: spanWriter, + Unmarshaller: unmarshaller, + } + spanProcessor := processor.NewSpanProcessor(spParams) + + consumerConfig := kafkaConsumer.Configuration{ + Brokers: b.Brokers, + Topic: b.Topic, + GroupID: b.GroupID, + } + saramaConsumer, err := consumerConfig.NewConsumer() + if err != nil { + return nil, err + } + + factoryParams := consumer.ProcessorFactoryParams{ + Topic: b.Topic, + Parallelism: b.Parallelism, + SaramaConsumer: saramaConsumer, + BaseProcessor: spanProcessor, + Logger: logger, + Factory: metricsFactory, + } + processorFactory, err := consumer.NewProcessorFactory(factoryParams) + if err != nil { + return nil, err + } + + consumerParams := consumer.Params{ + InternalConsumer: saramaConsumer, + ProcessorFactory: *processorFactory, + Factory: metricsFactory, + Logger: logger, + } + return consumer.New(consumerParams) +} + +// InitFromViper initializes Builder with properties from viper +func (b *Builder) InitFromViper(v *viper.Viper) { + b.Brokers = strings.Split(v.GetString(ConfigPrefix+SuffixBrokers), ",") + b.Topic = v.GetString(ConfigPrefix + SuffixTopic) + b.GroupID = v.GetString(ConfigPrefix + SuffixGroupID) + b.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) + b.Encoding = v.GetString(ConfigPrefix + SuffixEncoding) +} diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go new file mode 100644 index 00000000000..30e646764a7 --- /dev/null +++ b/cmd/ingester/app/flags.go @@ -0,0 +1,47 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "flag" + "fmt" + "strconv" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" +) + +// AddFlags adds flags for Builder +func AddFlags(flagSet *flag.FlagSet) { + flagSet.String( + builder.ConfigPrefix+builder.SuffixBrokers, + builder.DefaultBroker, + "The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'") + flagSet.String( + builder.ConfigPrefix+builder.SuffixTopic, + builder.DefaultTopic, + "The name of the kafka topic to consume from") + flagSet.String( + builder.ConfigPrefix+builder.SuffixGroupID, + builder.DefaultGroupID, + "The Consumer Group that ingester will be consuming on behalf of") + flagSet.String( + builder.ConfigPrefix+builder.SuffixParallelism, + strconv.Itoa(builder.DefaultParallelism), + "The number of messages to process in parallel") + flagSet.String( + builder.ConfigPrefix+builder.SuffixEncoding, + builder.DefaultEncoding, + fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, builder.EncodingProto, builder.EncodingJSON)) +} diff --git a/cmd/ingester/app/flags_test.go b/cmd/ingester/app/flags_test.go new file mode 100644 index 00000000000..a375cdaa9a5 --- /dev/null +++ b/cmd/ingester/app/flags_test.go @@ -0,0 +1,55 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" + "github.com/jaegertracing/jaeger/pkg/config" +) + +func TestOptionsWithFlags(t *testing.T) { + b := &builder.Builder{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--ingester.topic=topic1", + "--ingester.brokers=127.0.0.1:9092,0.0.0:1234", + "--ingester.group-id=group1", + "--ingester.parallelism=5", + "--ingester.encoding=json"}) + b.InitFromViper(v) + + assert.Equal(t, "topic1", b.Topic) + assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, b.Brokers) + assert.Equal(t, "group1", b.GroupID) + assert.Equal(t, 5, b.Parallelism) + assert.Equal(t, builder.EncodingJSON, b.Encoding) +} + +func TestFlagDefaults(t *testing.T) { + b := &builder.Builder{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{}) + b.InitFromViper(v) + + assert.Equal(t, builder.DefaultTopic, b.Topic) + assert.Equal(t, []string{builder.DefaultBroker}, b.Brokers) + assert.Equal(t, builder.DefaultGroupID, b.GroupID) + assert.Equal(t, builder.DefaultParallelism, b.Parallelism) + assert.Equal(t, builder.DefaultEncoding, b.Encoding) +} diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go new file mode 100644 index 00000000000..d30af4bdda8 --- /dev/null +++ b/cmd/ingester/main.go @@ -0,0 +1,130 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "io" + "log" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/env" + "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/cmd/ingester/app" + "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" + "github.com/jaegertracing/jaeger/pkg/config" + pMetrics "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/version" + "github.com/jaegertracing/jaeger/plugin/storage" +) + +func main() { + var signalsChannel = make(chan os.Signal, 0) + signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + + storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr)) + if err != nil { + log.Fatalf("Cannot initialize storage factory: %v", err) + } + + v := viper.New() + command := &cobra.Command{ + Use: "jaeger-ingester", + Short: "Jaeger ingester consumes from Kafka and writes to storage", + Long: `Jaeger ingester consumes spans from a particular Kafka topic and writes them to all configured storage types.`, + RunE: func(cmd *cobra.Command, args []string) error { + err := flags.TryLoadConfigFile(v) + if err != nil { + return err + } + + sFlags := new(flags.SharedFlags).InitFromViper(v) + logger, err := sFlags.NewLogger(zap.NewProductionConfig()) + if err != nil { + return err + } + hc, err := sFlags.NewHealthCheck(logger) + if err != nil { + logger.Fatal("Could not start the health check server.", zap.Error(err)) + } + + mBldr := new(pMetrics.Builder).InitFromViper(v) + baseFactory, err := mBldr.CreateMetricsFactory("jaeger") + if err != nil { + logger.Fatal("Cannot create metrics factory.", zap.Error(err)) + } + metricsFactory := baseFactory.Namespace("ingester", nil) + + storageFactory.InitFromViper(v) + if err := storageFactory.Initialize(baseFactory, logger); err != nil { + logger.Fatal("Failed to init storage factory", zap.Error(err)) + } + spanWriter, err := storageFactory.CreateSpanWriter() + if err != nil { + logger.Fatal("Failed to create span writer", zap.Error(err)) + } + + b := builder.Builder{} + b.InitFromViper(v) + consumer, err := b.CreateConsumer(logger, metricsFactory, spanWriter) + if err != nil { + logger.Fatal("Unable to create consumer", zap.Error(err)) + } + consumer.Start() + + hc.Ready() + select { + case <-signalsChannel: + logger.Info("Jaeger Ingester is starting to close") + err := consumer.Close() + if err != nil { + logger.Error("Failed to close consumer", zap.Error(err)) + } + if closer, ok := spanWriter.(io.Closer); ok { + err := closer.Close() + if err != nil { + logger.Error("Failed to close span writer", zap.Error(err)) + } + } + logger.Info("Jaeger Ingester has finished closing") + } + return nil + }, + } + + command.AddCommand(version.Command()) + command.AddCommand(env.Command()) + + config.AddFlags( + v, + command, + flags.AddConfigFileFlag, + flags.AddFlags, + storageFactory.AddFlags, + pMetrics.AddFlags, + app.AddFlags, + ) + + if err := command.Execute(); err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } +}