Skip to content

Commit d656f4e

Browse files
author
Davit Yeghshatyan
committed
Add options and main
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
1 parent 8f27e9a commit d656f4e

File tree

3 files changed

+320
-0
lines changed

3 files changed

+320
-0
lines changed

cmd/ingester/app/options.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package app
16+
17+
import (
18+
"flag"
19+
"fmt"
20+
"strconv"
21+
"strings"
22+
23+
"github.com/spf13/viper"
24+
25+
"github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
26+
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
27+
)
28+
29+
const (
30+
configPrefix = "ingester"
31+
suffixBrokers = ".brokers"
32+
suffixTopic = ".topic"
33+
suffixGroupID = ".group-id"
34+
suffixParallelism = ".parallelism"
35+
suffixEncoding = ".encoding"
36+
37+
encodingJSON = "json"
38+
encodingProto = "protobuf"
39+
40+
defaultBroker = "127.0.0.1:9092"
41+
defaultTopic = "jaeger-ingester-spans"
42+
defaultGroupID = "jaeger-ingester"
43+
defaultParallelism = 1000
44+
defaultEncoding = encodingProto
45+
)
46+
47+
// Options stores the configuration options for a Kafka consumer
48+
type Options struct {
49+
consumer.Configuration
50+
Parallelism int
51+
Encoding string
52+
}
53+
54+
// AddFlags adds flags for Options
55+
func AddFlags(flagSet *flag.FlagSet) {
56+
flagSet.String(
57+
configPrefix+suffixBrokers,
58+
defaultBroker,
59+
"The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234'")
60+
flagSet.String(
61+
configPrefix+suffixTopic,
62+
defaultTopic,
63+
"The name of the kafka topic to consume from")
64+
flagSet.String(
65+
configPrefix+suffixGroupID,
66+
defaultGroupID,
67+
"The Consumer Group that ingester will be consuming on behalf of")
68+
flagSet.String(
69+
configPrefix+suffixParallelism,
70+
strconv.Itoa(defaultParallelism),
71+
"The number of messages to process in parallel")
72+
flagSet.String(
73+
configPrefix+suffixEncoding,
74+
defaultEncoding,
75+
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, encodingProto, encodingJSON))
76+
}
77+
78+
// InitFromViper initializes Options with properties from viper
79+
func (opt *Options) InitFromViper(v *viper.Viper) {
80+
opt.Brokers = strings.Split(v.GetString(configPrefix+suffixBrokers), ",")
81+
opt.Topic = v.GetString(configPrefix + suffixTopic)
82+
opt.GroupID = v.GetString(configPrefix + suffixGroupID)
83+
opt.Parallelism = v.GetInt(configPrefix + suffixParallelism)
84+
opt.Encoding = v.GetString(configPrefix + suffixEncoding)
85+
}
86+
87+
// UnmarshallerFromType returns the corresponding Unmarshaller for a byte array span encoding.
88+
// Defaults to ProtobufUnmarshaller
89+
func UnmarshallerFromType(encoding string) kafka.Unmarshaller {
90+
switch encoding {
91+
case encodingJSON:
92+
return kafka.NewJSONUnmarshaller()
93+
case encodingProto:
94+
return kafka.NewProtobufUnmarshaller()
95+
default:
96+
return kafka.NewProtobufUnmarshaller()
97+
}
98+
}

cmd/ingester/app/options_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package app
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
22+
"github.com/jaegertracing/jaeger/pkg/config"
23+
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
24+
)
25+
26+
func TestOptionsWithFlags(t *testing.T) {
27+
opts := &Options{}
28+
v, command := config.Viperize(AddFlags)
29+
command.ParseFlags([]string{
30+
"--ingester.topic=topic1",
31+
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
32+
"--ingester.group-id=group1",
33+
"--ingester.parallelism=5",
34+
"--ingester.encoding=json"})
35+
opts.InitFromViper(v)
36+
37+
assert.Equal(t, "topic1", opts.Topic)
38+
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, opts.Brokers)
39+
assert.Equal(t, "group1", opts.GroupID)
40+
assert.Equal(t, 5, opts.Parallelism)
41+
assert.Equal(t, encodingJSON, opts.Encoding)
42+
}
43+
44+
func TestFlagDefaults(t *testing.T) {
45+
opts := &Options{}
46+
v, command := config.Viperize(AddFlags)
47+
command.ParseFlags([]string{})
48+
opts.InitFromViper(v)
49+
50+
assert.Equal(t, defaultTopic, opts.Topic)
51+
assert.Equal(t, []string{defaultBroker}, opts.Brokers)
52+
assert.Equal(t, defaultGroupID, opts.GroupID)
53+
assert.Equal(t, defaultParallelism, opts.Parallelism)
54+
assert.Equal(t, defaultEncoding, opts.Encoding)
55+
}
56+
57+
func TestUnmarshallerFromType(t *testing.T) {
58+
res := UnmarshallerFromType(encodingJSON)
59+
assert.IsType(t, kafka.JSONUnmarshaller{}, res)
60+
res = UnmarshallerFromType(encodingProto)
61+
assert.IsType(t, kafka.ProtobufUnmarshaller{}, res)
62+
res = UnmarshallerFromType("something else")
63+
assert.IsType(t, kafka.ProtobufUnmarshaller{}, res)
64+
res = UnmarshallerFromType("")
65+
assert.IsType(t, kafka.ProtobufUnmarshaller{}, res)
66+
}

cmd/ingester/main.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright (c) 2018 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"fmt"
19+
"log"
20+
"os"
21+
"os/signal"
22+
"syscall"
23+
24+
"github.com/spf13/cobra"
25+
"github.com/spf13/viper"
26+
"go.uber.org/zap"
27+
28+
"github.com/jaegertracing/jaeger/cmd/env"
29+
"github.com/jaegertracing/jaeger/cmd/flags"
30+
"github.com/jaegertracing/jaeger/cmd/ingester/app"
31+
spanconsumer "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
32+
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
33+
"github.com/jaegertracing/jaeger/pkg/config"
34+
cConfig "github.com/jaegertracing/jaeger/pkg/kafka/config/consumer"
35+
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
36+
"github.com/jaegertracing/jaeger/pkg/version"
37+
"github.com/jaegertracing/jaeger/plugin/storage"
38+
)
39+
40+
func main() {
41+
var signalsChannel = make(chan os.Signal, 0)
42+
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
43+
44+
storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
45+
if err != nil {
46+
log.Fatalf("Cannot initialize storage factory: %v", err)
47+
}
48+
49+
v := viper.New()
50+
command := &cobra.Command{
51+
Use: "jaeger-ingester",
52+
Short: "Jaeger ingester consumes from Kafka and writes to storage",
53+
Long: `Jaeger ingester consumes spans from a particular Kafka topic and writes them to all configured storage types.`,
54+
RunE: func(cmd *cobra.Command, args []string) error {
55+
err := flags.TryLoadConfigFile(v)
56+
if err != nil {
57+
return err
58+
}
59+
60+
sFlags := new(flags.SharedFlags).InitFromViper(v)
61+
logger, err := sFlags.NewLogger(zap.NewProductionConfig())
62+
if err != nil {
63+
return err
64+
}
65+
hc, err := sFlags.NewHealthCheck(logger)
66+
if err != nil {
67+
logger.Fatal("Could not start the health check server.", zap.Error(err))
68+
}
69+
70+
mBldr := new(pMetrics.Builder).InitFromViper(v)
71+
baseFactory, err := mBldr.CreateMetricsFactory("jaeger")
72+
if err != nil {
73+
logger.Fatal("Cannot create metrics factory.", zap.Error(err))
74+
}
75+
metricsFactory := baseFactory.Namespace("ingester", nil)
76+
77+
options := app.Options{}
78+
options.InitFromViper(v)
79+
80+
storageFactory.InitFromViper(v)
81+
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
82+
logger.Fatal("Failed to init storage factory", zap.Error(err))
83+
}
84+
spanWriter, err := storageFactory.CreateSpanWriter()
85+
if err != nil {
86+
logger.Fatal("Failed to create span writer", zap.Error(err))
87+
}
88+
unmarshaller := app.UnmarshallerFromType(options.Encoding)
89+
spanProcessor := processor.NewSpanProcessor(spanWriter, unmarshaller)
90+
91+
consumerConfig := cConfig.Configuration{
92+
Brokers: options.Brokers,
93+
Topic: options.Topic,
94+
GroupID: options.GroupID,
95+
}
96+
saramaConsumer, err := consumerConfig.NewConsumer()
97+
if err != nil {
98+
logger.Fatal("Failed to create sarama consumer", zap.Error(err))
99+
}
100+
101+
factoryParams := spanconsumer.FactoryParams{
102+
Topic: options.Topic,
103+
Parallelism: options.Parallelism,
104+
SaramaConsumer: saramaConsumer,
105+
BaseProcessor: spanProcessor,
106+
Logger: logger,
107+
Factory: metricsFactory,
108+
}
109+
processorFactory, err := spanconsumer.NewFactory(factoryParams)
110+
if err != nil {
111+
logger.Fatal("Failed to create processor factory", zap.Error(err))
112+
}
113+
114+
consumerParams := spanconsumer.Params{
115+
SaramaConsumer: saramaConsumer,
116+
ProcessorFactory: processorFactory,
117+
Factory: metricsFactory,
118+
Logger: logger,
119+
}
120+
consumer, err := spanconsumer.New(consumerParams)
121+
if err != nil {
122+
logger.Fatal("Unable to set up consumer", zap.Error(err))
123+
}
124+
consumer.Start()
125+
126+
hc.Ready()
127+
select {
128+
case <-signalsChannel:
129+
err := consumer.Close()
130+
if err != nil {
131+
logger.Error("Failed to close span writer", zap.Error(err))
132+
}
133+
logger.Info("Jaeger Ingester is finishing")
134+
}
135+
return nil
136+
},
137+
}
138+
139+
command.AddCommand(version.Command())
140+
command.AddCommand(env.Command())
141+
142+
config.AddFlags(
143+
v,
144+
command,
145+
flags.AddConfigFileFlag,
146+
flags.AddFlags,
147+
storageFactory.AddFlags,
148+
pMetrics.AddFlags,
149+
app.AddFlags,
150+
)
151+
152+
if err := command.Execute(); err != nil {
153+
fmt.Println(err.Error())
154+
os.Exit(1)
155+
}
156+
}

0 commit comments

Comments
 (0)