Skip to content

Commit

Permalink
Use Ingester in kafka integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
  • Loading branch information
Davit Yeghshatyan authored and David committed Jul 31, 2018
1 parent b216377 commit 1bcd314
Showing 1 changed file with 30 additions and 57 deletions.
87 changes: 30 additions & 57 deletions plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
package integration

import (
"bytes"
"os"
"strconv"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -47,14 +46,24 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10)

f := kafka.NewFactory()
v, command := config.Viperize(f.AddFlags)
v, command := config.Viperize(f.AddFlags, app.AddFlags)
command.ParseFlags([]string{
"--kafka.topic",
topic,
"--kafka.brokers",
defaultLocalKafkaBroker,
"--kafka.encoding",
"json",
"--ingester.brokers",
defaultLocalKafkaBroker,
"--ingester.topic",
topic,
"--ingester.group-id",
"kafka-integration-test",
"--ingester.parallelism",
"1000",
"--ingester.encoding",
"json",
})
f.InitFromViper(v)
if err := f.Initialize(metrics.NullFactory, s.logger); err != nil {
Expand All @@ -64,77 +73,41 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
if err != nil {
return err
}
spanReader, err := createSpanReader(topic)

b := builder.Builder{}
b.InitFromViper(v)
traceStore := memory.NewStore()
spanConsumer, err := b.CreateConsumer(logger, metrics.NullFactory, traceStore)
if err != nil {
return err
}
spanConsumer.Start()

s.SpanWriter = spanWriter
s.SpanReader = spanReader
s.SpanReader = &ingester{traceStore}
s.Refresh = func() error { return nil }
s.CleanUp = func() error { return nil }
return nil
}

type spanReader struct {
logger *zap.Logger
topic string
consumer sarama.PartitionConsumer
}

func createSpanReader(topic string) (spanstore.Reader, error) {
logger, _ := testutils.NewLogger()
c, err := sarama.NewConsumer([]string{defaultLocalKafkaBroker}, nil)
if err != nil {
return nil, err
}
pc, err := c.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
return nil, err
}
return &spanReader{
consumer: pc,
topic: topic,
logger: logger,
}, nil
// The ingester consumes spans from kafka and writes them to an in-memory traceStore
type ingester struct {
traceStore *memory.Store
}

func (r *spanReader) GetTrace(traceID model.TraceID) (*model.Trace, error) {
result := &model.Trace{}
var err error
doneCh := make(chan struct{})
go func() {
for {
select {
case msg := <-r.consumer.Messages():
newSpan := model.Span{}
if err = jsonpb.Unmarshal(bytes.NewReader(msg.Value), &newSpan); err != nil {
r.logger.Error("protobuf unmarshaling error", zap.Error(err))
}
if newSpan.TraceID == traceID {
result.Spans = append(result.Spans, &newSpan)
}
case <-doneCh:
return
}
}
}()
time.Sleep(100 * time.Millisecond)
doneCh <- struct{}{}
if err != nil {
return nil, err
}
return result, nil
func (r *ingester) GetTrace(traceID model.TraceID) (*model.Trace, error) {
return r.traceStore.GetTrace(traceID)
}

func (r *spanReader) GetServices() ([]string, error) {
func (r *ingester) GetServices() ([]string, error) {
return nil, nil
}

func (r *spanReader) GetOperations(service string) ([]string, error) {
func (r *ingester) GetOperations(service string) ([]string, error) {
return nil, nil
}

func (r *spanReader) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
func (r *ingester) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
return nil, nil
}

Expand Down

0 comments on commit 1bcd314

Please sign in to comment.