Skip to content

Commit e22c22c

Browse files
author
Davit Yeghshatyan
committed
Extract consumer setup
Done to avoid redundant code in the integration test Signed-off-by: Davit Yeghshatyan <davo@uber.com>
1 parent 30117f3 commit e22c22c

File tree

1 file changed

+45
-37
lines changed

1 file changed

+45
-37
lines changed

cmd/ingester/main.go

+45-37
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"os/signal"
2222
"syscall"
2323

24+
"github.com/pkg/errors"
2425
"github.com/spf13/cobra"
2526
"github.com/spf13/viper"
27+
"github.com/uber/jaeger-lib/metrics"
2628
"go.uber.org/zap"
2729

2830
"github.com/jaegertracing/jaeger/cmd/env"
@@ -35,6 +37,7 @@ import (
3537
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
3638
"github.com/jaegertracing/jaeger/pkg/version"
3739
"github.com/jaegertracing/jaeger/plugin/storage"
40+
"github.com/jaegertracing/jaeger/storage/spanstore"
3841
)
3942

4043
func main() {
@@ -85,43 +88,7 @@ func main() {
8588
if err != nil {
8689
logger.Fatal("Failed to create span writer", zap.Error(err))
8790
}
88-
unmarshaller := app.UnmarshallerFromType(options.Encoding)
89-
spParams := processor.SpanProcessorParams{
90-
Writer: spanWriter,
91-
Unmarshaller: unmarshaller,
92-
}
93-
spanProcessor := processor.NewSpanProcessor(spParams)
94-
95-
consumerConfig := consumer.Configuration{
96-
Brokers: options.Brokers,
97-
Topic: options.Topic,
98-
GroupID: options.GroupID,
99-
}
100-
saramaConsumer, err := consumerConfig.NewConsumer()
101-
if err != nil {
102-
logger.Fatal("Failed to create sarama consumer", zap.Error(err))
103-
}
104-
105-
factoryParams := spanconsumer.ProcessorFactoryParams{
106-
Topic: options.Topic,
107-
Parallelism: options.Parallelism,
108-
SaramaConsumer: saramaConsumer,
109-
BaseProcessor: spanProcessor,
110-
Logger: logger,
111-
Factory: metricsFactory,
112-
}
113-
processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams)
114-
if err != nil {
115-
logger.Fatal("Failed to create processor factory", zap.Error(err))
116-
}
117-
118-
consumerParams := spanconsumer.Params{
119-
InternalConsumer: saramaConsumer,
120-
ProcessorFactory: *processorFactory,
121-
Factory: metricsFactory,
122-
Logger: logger,
123-
}
124-
kafkaConsumer, err := spanconsumer.New(consumerParams)
91+
kafkaConsumer, err := newConsumer(options, spanWriter, logger, metricsFactory)
12592
if err != nil {
12693
logger.Fatal("Unable to set up consumer", zap.Error(err))
12794
}
@@ -158,3 +125,44 @@ func main() {
158125
os.Exit(1)
159126
}
160127
}
128+
129+
func newConsumer(options app.Options, spanWriter spanstore.Writer, logger *zap.Logger, metricsFactory metrics.Factory) (*spanconsumer.Consumer, error) {
130+
unmarshaller := app.UnmarshallerFromType(options.Encoding)
131+
spParams := processor.SpanProcessorParams{
132+
Writer: spanWriter,
133+
Unmarshaller: unmarshaller,
134+
}
135+
spanProcessor := processor.NewSpanProcessor(spParams)
136+
137+
consumerConfig := consumer.Configuration{
138+
Brokers: options.Brokers,
139+
Topic: options.Topic,
140+
GroupID: options.GroupID,
141+
}
142+
saramaConsumer, err := consumerConfig.NewConsumer()
143+
if err != nil {
144+
return nil, errors.Wrap(err, "Failed to create sarama consumer")
145+
}
146+
147+
factoryParams := spanconsumer.ProcessorFactoryParams{
148+
Topic: options.Topic,
149+
Parallelism: options.Parallelism,
150+
SaramaConsumer: saramaConsumer,
151+
BaseProcessor: spanProcessor,
152+
Logger: logger,
153+
Factory: metricsFactory,
154+
}
155+
processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams)
156+
if err != nil {
157+
return nil, errors.Wrap(err, "Failed to create processor factory")
158+
}
159+
160+
consumerParams := spanconsumer.Params{
161+
InternalConsumer: saramaConsumer,
162+
ProcessorFactory: *processorFactory,
163+
Factory: metricsFactory,
164+
Logger: logger,
165+
}
166+
167+
return spanconsumer.New(consumerParams)
168+
}

0 commit comments

Comments
 (0)