diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index efc55c85fa5..8a78cda8318 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -15,22 +15,21 @@ package integration import ( - "bytes" "os" "strconv" "testing" "time" - "github.com/Shopify/sarama" - "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" - "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/ingester/app" + "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/kafka" + "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -47,7 +46,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error { topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10) f := kafka.NewFactory() - v, command := config.Viperize(f.AddFlags) + v, command := config.Viperize(f.AddFlags, app.AddFlags) command.ParseFlags([]string{ "--kafka.topic", topic, @@ -55,6 +54,16 @@ func (s *KafkaIntegrationTestSuite) initialize() error { defaultLocalKafkaBroker, "--kafka.encoding", "json", + "--ingester.brokers", + defaultLocalKafkaBroker, + "--ingester.topic", + topic, + "--ingester.group-id", + "kafka-integration-test", + "--ingester.parallelism", + "1000", + "--ingester.encoding", + "json", }) f.InitFromViper(v) if err := f.Initialize(metrics.NullFactory, s.logger); err != nil { @@ -64,77 +73,41 @@ func (s *KafkaIntegrationTestSuite) initialize() error { if err != nil { return err } - spanReader, err := createSpanReader(topic) + + options := app.Options{} + options.InitFromViper(v) + traceStore := memory.NewStore() + spanConsumer, err := builder.CreateConsumer(logger, metrics.NullFactory, traceStore, options) if err != nil { return err } + spanConsumer.Start() + s.SpanWriter = spanWriter - s.SpanReader = spanReader + s.SpanReader = &ingester{traceStore} s.Refresh = func() error { return nil } s.CleanUp = func() error { return nil } return nil } -type spanReader struct { - logger *zap.Logger - topic string - consumer sarama.PartitionConsumer -} - -func createSpanReader(topic string) (spanstore.Reader, error) { - logger, _ := testutils.NewLogger() - c, err := sarama.NewConsumer([]string{defaultLocalKafkaBroker}, nil) - if err != nil { - return nil, err - } - pc, err := c.ConsumePartition(topic, 0, sarama.OffsetOldest) - if err != nil { - return nil, err - } - return &spanReader{ - consumer: pc, - topic: topic, - logger: logger, - }, nil +// The ingester consumes spans from kafka and writes them to an in-memory traceStore +type ingester struct { + traceStore *memory.Store } -func (r *spanReader) GetTrace(traceID model.TraceID) (*model.Trace, error) { - result := &model.Trace{} - var err error - doneCh := make(chan struct{}) - go func() { - for { - select { - case msg := <-r.consumer.Messages(): - newSpan := model.Span{} - if err = jsonpb.Unmarshal(bytes.NewReader(msg.Value), &newSpan); err != nil { - r.logger.Error("protobuf unmarshaling error", zap.Error(err)) - } - if newSpan.TraceID == traceID { - result.Spans = append(result.Spans, &newSpan) - } - case <-doneCh: - return - } - } - }() - time.Sleep(100 * time.Millisecond) - doneCh <- struct{}{} - if err != nil { - return nil, err - } - return result, nil +func (r *ingester) GetTrace(traceID model.TraceID) (*model.Trace, error) { + return r.traceStore.GetTrace(traceID) } -func (r *spanReader) GetServices() ([]string, error) { +func (r *ingester) GetServices() ([]string, error) { return nil, nil } -func (r *spanReader) GetOperations(service string) ([]string, error) { +func (r *ingester) GetOperations(service string) ([]string, error) { return nil, nil } -func (r *spanReader) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (r *ingester) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { return nil, nil }