diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index bbdc0e36dc1..0a4516935c8 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -21,8 +21,10 @@ import ( "os/signal" "syscall" + "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/env" @@ -35,6 +37,7 @@ import ( pMetrics "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/version" "github.com/jaegertracing/jaeger/plugin/storage" + "github.com/jaegertracing/jaeger/storage/spanstore" ) func main() { @@ -85,43 +88,7 @@ func main() { if err != nil { logger.Fatal("Failed to create span writer", zap.Error(err)) } - unmarshaller := app.UnmarshallerFromType(options.Encoding) - spParams := processor.SpanProcessorParams{ - Writer: spanWriter, - Unmarshaller: unmarshaller, - } - spanProcessor := processor.NewSpanProcessor(spParams) - - consumerConfig := consumer.Configuration{ - Brokers: options.Brokers, - Topic: options.Topic, - GroupID: options.GroupID, - } - saramaConsumer, err := consumerConfig.NewConsumer() - if err != nil { - logger.Fatal("Failed to create sarama consumer", zap.Error(err)) - } - - factoryParams := spanconsumer.ProcessorFactoryParams{ - Topic: options.Topic, - Parallelism: options.Parallelism, - SaramaConsumer: saramaConsumer, - BaseProcessor: spanProcessor, - Logger: logger, - Factory: metricsFactory, - } - processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams) - if err != nil { - logger.Fatal("Failed to create processor factory", zap.Error(err)) - } - - consumerParams := spanconsumer.Params{ - InternalConsumer: saramaConsumer, - ProcessorFactory: *processorFactory, - Factory: metricsFactory, - Logger: logger, - } - kafkaConsumer, err := spanconsumer.New(consumerParams) + kafkaConsumer, err := newConsumer(options, spanWriter, logger, metricsFactory) if err != nil { logger.Fatal("Unable to set up consumer", zap.Error(err)) } @@ -158,3 +125,44 @@ func main() { os.Exit(1) } } + +func newConsumer(options app.Options, spanWriter spanstore.Writer, logger *zap.Logger, metricsFactory metrics.Factory) (*spanconsumer.Consumer, error) { + unmarshaller := app.UnmarshallerFromType(options.Encoding) + spParams := processor.SpanProcessorParams{ + Writer: spanWriter, + Unmarshaller: unmarshaller, + } + spanProcessor := processor.NewSpanProcessor(spParams) + + consumerConfig := consumer.Configuration{ + Brokers: options.Brokers, + Topic: options.Topic, + GroupID: options.GroupID, + } + saramaConsumer, err := consumerConfig.NewConsumer() + if err != nil { + return nil, errors.Wrap(err, "Failed to create sarama consumer") + } + + factoryParams := spanconsumer.ProcessorFactoryParams{ + Topic: options.Topic, + Parallelism: options.Parallelism, + SaramaConsumer: saramaConsumer, + BaseProcessor: spanProcessor, + Logger: logger, + Factory: metricsFactory, + } + processorFactory, err := spanconsumer.NewProcessorFactory(factoryParams) + if err != nil { + return nil, errors.Wrap(err, "Failed to create processor factory") + } + + consumerParams := spanconsumer.Params{ + InternalConsumer: saramaConsumer, + ProcessorFactory: *processorFactory, + Factory: metricsFactory, + Logger: logger, + } + + return spanconsumer.New(consumerParams) +}