Skip to content

Commit

Permalink
Extract consumer setup
Browse files Browse the repository at this point in the history
Done to avoid redundant code in the integration test

Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan committed Jul 27, 2018
1 parent 0dec945 commit 60fc837
Showing 1 changed file with 45 additions and 37 deletions.
82 changes: 45 additions & 37 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"os/signal"
"syscall"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/env"
Expand All @@ -35,6 +37,7 @@ import (
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

func main() {
Expand Down Expand Up @@ -85,43 +88,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create span writer", zap.Error(err))
}
unmarshaller := app.UnmarshallerFromType(options.Encoding)
spParams := processor.SpanProcessorParams{
Writer: spanWriter,
Unmarshaller: unmarshaller,
}
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := consumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
logger.Fatal("Failed to create sarama consumer", zap.Error(err))
}

factoryParams := spanconsumer.ProcessorFactoryParams{
Topic: options.Topic,
Parallelism: options.Parallelism,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
}
processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams)
if err != nil {
logger.Fatal("Failed to create processor factory", zap.Error(err))
}

consumerParams := spanconsumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
}
kafkaConsumer, err := spanconsumer.New(consumerParams)
kafkaConsumer, err := newConsumer(options, spanWriter, logger, metricsFactory)
if err != nil {
logger.Fatal("Unable to set up consumer", zap.Error(err))
}
Expand Down Expand Up @@ -158,3 +125,44 @@ func main() {
os.Exit(1)
}
}

func newConsumer(options app.Options, spanWriter spanstore.Writer, logger *zap.Logger, metricsFactory metrics.Factory) (*spanconsumer.Consumer, error) {
unmarshaller := app.UnmarshallerFromType(options.Encoding)
spParams := processor.SpanProcessorParams{
Writer: spanWriter,
Unmarshaller: unmarshaller,
}
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := consumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
return nil, errors.Wrap(err, "Failed to create sarama consumer")
}

factoryParams := spanconsumer.ProcessorFactoryParams{
Topic: options.Topic,
Parallelism: options.Parallelism,
SaramaConsumer: saramaConsumer,
BaseProcessor: spanProcessor,
Logger: logger,
Factory: metricsFactory,
}
processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams)
if err != nil {
return nil, errors.Wrap(err, "Failed to create processor factory")
}

consumerParams := spanconsumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
Factory: metricsFactory,
Logger: logger,
}

return spanconsumer.New(consumerParams)
}

0 comments on commit 60fc837

Please sign in to comment.