From 7f13eb66c33341f840f83f43a5fc2bf2f9c8628c Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 26 Aug 2020 20:00:16 +0200 Subject: [PATCH] Make Kafka payload encoding configurable (#1584) * Make Kafka payload encoding configurable Signed-off-by: Pavol Loffay * Expose marshalers in factory Signed-off-by: Pavol Loffay * Remove zipkin from exporter readme Signed-off-by: Pavol Loffay * Make custom encodings optional in the factory Signed-off-by: Pavol Loffay --- exporter/kafkaexporter/README.md | 8 +- exporter/kafkaexporter/config.go | 2 + exporter/kafkaexporter/config_test.go | 5 +- exporter/kafkaexporter/factory.go | 38 +++++-- exporter/kafkaexporter/factory_test.go | 43 +++++++- exporter/kafkaexporter/jaeger_marshaller.go | 98 +++++++++++++++++++ .../kafkaexporter/jaeger_marshaller_test.go | 95 ++++++++++++++++++ exporter/kafkaexporter/kafka_exporter.go | 33 +++++-- exporter/kafkaexporter/kafka_exporter_test.go | 25 +++-- exporter/kafkaexporter/marshaller.go | 29 ++++-- exporter/kafkaexporter/marshaller_test.go | 38 +++---- exporter/kafkaexporter/otlp_marshaller.go | 40 ++++++++ .../kafkaexporter/otlp_marshaller_test.go | 44 +++++++++ go.sum | 1 - receiver/kafkareceiver/README.md | 10 +- receiver/kafkareceiver/config.go | 2 + receiver/kafkareceiver/config_test.go | 1 + receiver/kafkareceiver/factory.go | 32 +++++- receiver/kafkareceiver/factory_test.go | 43 +++++++- receiver/kafkareceiver/jaeger_unmarshaller.go | 69 +++++++++++++ .../kafkareceiver/jaeger_unmarshaller_test.go | 87 ++++++++++++++++ receiver/kafkareceiver/kafka_receiver.go | 18 +++- receiver/kafkareceiver/kafka_receiver_test.go | 19 +++- receiver/kafkareceiver/otlp_unmarshaller.go | 38 +++++++ .../kafkareceiver/otlp_unmarshaller_test.go | 50 ++++++++++ receiver/kafkareceiver/unmarshaller.go | 35 ++++--- receiver/kafkareceiver/unmarshaller_test.go | 41 +++----- receiver/kafkareceiver/zipkin_unmarshaller.go | 88 +++++++++++++++++ .../kafkareceiver/zipkin_unmarshaller_test.go | 98 +++++++++++++++++++ 29 files changed, 1003 insertions(+), 127 deletions(-) create mode 100644 exporter/kafkaexporter/jaeger_marshaller.go create mode 100644 exporter/kafkaexporter/jaeger_marshaller_test.go create mode 100644 exporter/kafkaexporter/otlp_marshaller.go create mode 100644 exporter/kafkaexporter/otlp_marshaller_test.go create mode 100644 receiver/kafkareceiver/jaeger_unmarshaller.go create mode 100644 receiver/kafkareceiver/jaeger_unmarshaller_test.go create mode 100644 receiver/kafkareceiver/otlp_unmarshaller.go create mode 100644 receiver/kafkareceiver/otlp_unmarshaller_test.go create mode 100644 receiver/kafkareceiver/zipkin_unmarshaller.go create mode 100644 receiver/kafkareceiver/zipkin_unmarshaller_test.go diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 38fe2d8d4c1..8eadfc9952a 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -2,16 +2,18 @@ Kafka exporter exports traces to Kafka. This exporter uses a synchronous producer that blocks and does not batch messages, therefore it should be used with batch and queued retry -processors for higher throughput and resiliency. +processors for higher throughput and resiliency. Message payload encoding is configurable. -Message payloads are serialized OTLP `ExportTraceServiceRequest`. - The following settings are required: - `protocol_version` (no default): Kafka protocol version e.g. 2.0.0 The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers - `topic` (default = otlp_spans): The name of the kafka topic to export to +- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings: + - `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`. + - `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`. + - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`. - `metadata` - `full` (default = true): Whether to maintain a full set of metadata. When disabled the client does not make the initial request to broker at the startup. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 18fea8a3a52..1b3124d6102 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -34,6 +34,8 @@ type Config struct { ProtocolVersion string `mapstructure:"protocol_version"` // The name of the kafka topic to export to (default "otlp_spans") Topic string `mapstructure:"topic"` + // Encoding of the messages (default "otlp_proto") + Encoding string `mapstructure:"encoding"` // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index a913e9dc6c0..aee27033d45 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -58,8 +58,9 @@ func TestLoadConfig(t *testing.T) { NumConsumers: 2, QueueSize: 10, }, - Topic: "spans", - Brokers: []string{"foo:123", "bar:456"}, + Topic: "spans", + Encoding: "otlp_proto", + Brokers: []string{"foo:123", "bar:456"}, Metadata: Metadata{ Full: false, Retry: MetadataRetry{ diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 5f7569cb728..788126bf987 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -24,9 +24,10 @@ import ( ) const ( - typeStr = "kafka" - defaultTopic = "otlp_spans" - defaultBroker = "localhost:9092" + typeStr = "kafka" + defaultTopic = "otlp_spans" + defaultEncoding = "otlp_proto" + defaultBroker = "localhost:9092" // default from sarama.NewConfig() defaultMetadataRetryMax = 3 // default from sarama.NewConfig() @@ -35,12 +36,30 @@ const ( defaultMetadataFull = true ) +// FactoryOption applies changes to kafkaExporterFactory. +type FactoryOption func(factory *kafkaExporterFactory) + +// WithAddMarshallers adds marshallers. +func WithAddMarshallers(encodingMarshaller map[string]Marshaller) FactoryOption { + return func(factory *kafkaExporterFactory) { + for encoding, marshaller := range encodingMarshaller { + factory.marshallers[encoding] = marshaller + } + } +} + // NewFactory creates Kafka exporter factory. -func NewFactory() component.ExporterFactory { +func NewFactory(options ...FactoryOption) component.ExporterFactory { + f := &kafkaExporterFactory{ + marshallers: defaultMarshallers(), + } + for _, o := range options { + o(f) + } return exporterhelper.NewFactory( typeStr, createDefaultConfig, - exporterhelper.WithTraces(createTraceExporter)) + exporterhelper.WithTraces(f.createTraceExporter)) } func createDefaultConfig() configmodels.Exporter { @@ -57,6 +76,7 @@ func createDefaultConfig() configmodels.Exporter { QueueSettings: qs, Brokers: []string{defaultBroker}, Topic: defaultTopic, + Encoding: defaultEncoding, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ @@ -67,13 +87,17 @@ func createDefaultConfig() configmodels.Exporter { } } -func createTraceExporter( +type kafkaExporterFactory struct { + marshallers map[string]Marshaller +} + +func (f *kafkaExporterFactory) createTraceExporter( _ context.Context, params component.ExporterCreateParams, cfg configmodels.Exporter, ) (component.TraceExporter, error) { oCfg := cfg.(*Config) - exp, err := newExporter(*oCfg, params) + exp, err := newExporter(*oCfg, params, f.marshallers) if err != nil { return nil, err } diff --git a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index 00e7200a30d..9255dfb3eb6 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/consumer/pdata" ) func TestCreateDefaultConfig(t *testing.T) { @@ -39,7 +40,8 @@ func TestCreateTracesExporter(t *testing.T) { cfg.ProtocolVersion = "2.0.0" // this disables contacting the broker so we can successfully create the exporter cfg.Metadata.Full = false - r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + f := kafkaExporterFactory{marshallers: defaultMarshallers()} + r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) require.NoError(t, err) assert.NotNil(t, r) } @@ -48,8 +50,43 @@ func TestCreateTracesExporter_err(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Brokers = []string{"invalid:9092"} cfg.ProtocolVersion = "2.0.0" - // we get the error because the exporter - r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + f := kafkaExporterFactory{marshallers: defaultMarshallers()} + r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + // no available broker require.Error(t, err) assert.Nil(t, r) } + +func TestWithMarshallers(t *testing.T) { + cm := &customMarshaller{} + f := NewFactory(WithAddMarshallers(map[string]Marshaller{cm.Encoding(): cm})) + cfg := createDefaultConfig().(*Config) + // disable contacting broker + cfg.Metadata.Full = false + + t.Run("custom_encoding", func(t *testing.T) { + cfg.Encoding = cm.Encoding() + exporter, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.NoError(t, err) + require.NotNil(t, exporter) + }) + t.Run("default_encoding", func(t *testing.T) { + cfg.Encoding = new(otlpProtoMarshaller).Encoding() + exporter, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg) + require.NoError(t, err) + assert.NotNil(t, exporter) + }) +} + +type customMarshaller struct { +} + +var _ Marshaller = (*customMarshaller)(nil) + +func (c customMarshaller) Marshal(traces pdata.Traces) ([]Message, error) { + panic("implement me") +} + +func (c customMarshaller) Encoding() string { + return "custom" +} diff --git a/exporter/kafkaexporter/jaeger_marshaller.go b/exporter/kafkaexporter/jaeger_marshaller.go new file mode 100644 index 00000000000..7b9b2236d92 --- /dev/null +++ b/exporter/kafkaexporter/jaeger_marshaller.go @@ -0,0 +1,98 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkaexporter + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + jaegerproto "github.com/jaegertracing/jaeger/model" + + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/pdata" + jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" +) + +type jaegerMarshaller struct { + marshaller jaegerSpanMarshaller +} + +var _ Marshaller = (*jaegerMarshaller)(nil) + +func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) { + batches, err := jaegertranslator.InternalTracesToJaegerProto(traces) + if err != nil { + return nil, err + } + var messages []Message + var errs []error + for _, batch := range batches { + for _, span := range batch.Spans { + span.Process = batch.Process + bts, err := j.marshaller.marshall(span) + // continue to process spans that can be serialized + if err != nil { + errs = append(errs, err) + continue + } + messages = append(messages, Message{Value: bts}) + } + } + return messages, componenterror.CombineErrors(errs) +} + +func (j jaegerMarshaller) Encoding() string { + return j.marshaller.encoding() +} + +type jaegerSpanMarshaller interface { + marshall(span *jaegerproto.Span) ([]byte, error) + encoding() string +} + +type jaegerProtoSpanMarshaller struct { +} + +var _ jaegerSpanMarshaller = (*jaegerProtoSpanMarshaller)(nil) + +func (p jaegerProtoSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) { + return span.Marshal() +} + +func (p jaegerProtoSpanMarshaller) encoding() string { + return "jaeger_proto" +} + +type jaegerJSONSpanMarshaller struct { + pbMarshaller *jsonpb.Marshaler +} + +var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil) + +func newJaegerJSONMarshaller() *jaegerJSONSpanMarshaller { + return &jaegerJSONSpanMarshaller{ + pbMarshaller: &jsonpb.Marshaler{}, + } +} + +func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) { + out := new(bytes.Buffer) + err := p.pbMarshaller.Marshal(out, span) + return out.Bytes(), err +} + +func (p jaegerJSONSpanMarshaller) encoding() string { + return "jaeger_json" +} diff --git a/exporter/kafkaexporter/jaeger_marshaller_test.go b/exporter/kafkaexporter/jaeger_marshaller_test.go new file mode 100644 index 00000000000..be698698b4d --- /dev/null +++ b/exporter/kafkaexporter/jaeger_marshaller_test.go @@ -0,0 +1,95 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkaexporter + +import ( + "bytes" + "testing" + + "github.com/gogo/protobuf/jsonpb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" + jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" +) + +func TestJaegerMarshaller(t *testing.T) { + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo") + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetStartTime(pdata.TimestampUnixNano(10)) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetEndTime(pdata.TimestampUnixNano(20)) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8})) + batches, err := jaegertranslator.InternalTracesToJaegerProto(td) + require.NoError(t, err) + + batches[0].Spans[0].Process = batches[0].Process + jaegerProtoBytes, err := batches[0].Spans[0].Marshal() + require.NoError(t, err) + require.NotNil(t, jaegerProtoBytes) + + jsonMarshaller := &jsonpb.Marshaler{} + jsonByteBuffer := new(bytes.Buffer) + require.NoError(t, jsonMarshaller.Marshal(jsonByteBuffer, batches[0].Spans[0])) + + tests := []struct { + unmarshaller Marshaller + encoding string + messages []Message + }{ + { + unmarshaller: jaegerMarshaller{ + marshaller: jaegerProtoSpanMarshaller{}, + }, + encoding: "jaeger_proto", + messages: []Message{{Value: jaegerProtoBytes}}, + }, + { + unmarshaller: jaegerMarshaller{ + marshaller: jaegerJSONSpanMarshaller{ + pbMarshaller: &jsonpb.Marshaler{}, + }, + }, + encoding: "jaeger_json", + messages: []Message{{Value: jsonByteBuffer.Bytes()}}, + }, + } + for _, test := range tests { + t.Run(test.encoding, func(t *testing.T) { + messages, err := test.unmarshaller.Marshal(td) + require.NoError(t, err) + assert.Equal(t, test.messages, messages) + assert.Equal(t, test.encoding, test.unmarshaller.Encoding()) + }) + } +} + +func TestJaegerMarshaller_error_covert_traceID(t *testing.T) { + marshaller := jaegerMarshaller{ + marshaller: jaegerProtoSpanMarshaller{}, + } + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + // fails in zero traceID + messages, err := marshaller.Marshal(td) + require.Error(t, err) + assert.Nil(t, messages) +} diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 8d3b7ffe420..ce392fcfb06 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -16,6 +16,7 @@ package kafkaexporter import ( "context" + "fmt" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -25,16 +26,23 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) +var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") + // kafkaProducer uses sarama to produce messages to Kafka. type kafkaProducer struct { producer sarama.SyncProducer topic string - marshaller marshaller + marshaller Marshaller logger *zap.Logger } // newExporter creates Kafka exporter. -func newExporter(config Config, params component.ExporterCreateParams) (*kafkaProducer, error) { +func newExporter(config Config, params component.ExporterCreateParams, marshallers map[string]Marshaller) (*kafkaProducer, error) { + marshaller := marshallers[config.Encoding] + if marshaller == nil { + return nil, errUnrecognizedEncoding + } + c := sarama.NewConfig() // These setting are required by the sarama.SyncProducer implementation. c.Producer.Return.Successes = true @@ -60,21 +68,17 @@ func newExporter(config Config, params component.ExporterCreateParams) (*kafkaPr return &kafkaProducer{ producer: producer, topic: config.Topic, - marshaller: &protoMarshaller{}, + marshaller: marshaller, logger: params.Logger, }, nil } func (e *kafkaProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int, error) { - bts, err := e.marshaller.Marshal(td) + messages, err := e.marshaller.Marshal(td) if err != nil { return td.SpanCount(), consumererror.Permanent(err) } - m := &sarama.ProducerMessage{ - Topic: e.topic, - Value: sarama.ByteEncoder(bts), - } - _, _, err = e.producer.SendMessage(m) + err = e.producer.SendMessages(producerMessages(messages, e.topic)) if err != nil { return td.SpanCount(), err } @@ -84,3 +88,14 @@ func (e *kafkaProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int func (e *kafkaProducer) Close(context.Context) error { return e.producer.Close() } + +func producerMessages(messages []Message, topic string) []*sarama.ProducerMessage { + producerMessages := make([]*sarama.ProducerMessage, len(messages)) + for i := range messages { + producerMessages[i] = &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(messages[i].Value), + } + } + return producerMessages +} diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index 41ba397c569..3b27c98dbf1 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -30,13 +30,20 @@ import ( "go.opentelemetry.io/collector/internal/data/testdata" ) -func TestNewExporter_wrong_version(t *testing.T) { - c := Config{ProtocolVersion: "0.0.0"} - exp, err := newExporter(c, component.ExporterCreateParams{}) +func TestNewExporter_err_version(t *testing.T) { + c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} + exp, err := newExporter(c, component.ExporterCreateParams{}, defaultMarshallers()) assert.Error(t, err) assert.Nil(t, exp) } +func TestNewExporter_err_encoding(t *testing.T) { + c := Config{Encoding: "foo"} + exp, err := newExporter(c, component.ExporterCreateParams{}, map[string]Marshaller{}) + assert.EqualError(t, err, errUnrecognizedEncoding.Error()) + assert.Nil(t, exp) +} + func TestTraceDataPusher(t *testing.T) { c := sarama.NewConfig() producer := mocks.NewSyncProducer(t, c) @@ -44,7 +51,7 @@ func TestTraceDataPusher(t *testing.T) { p := kafkaProducer{ producer: producer, - marshaller: &protoMarshaller{}, + marshaller: &otlpProtoMarshaller{}, } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -62,7 +69,7 @@ func TestTraceDataPusher_err(t *testing.T) { p := kafkaProducer{ producer: producer, - marshaller: &protoMarshaller{}, + marshaller: &otlpProtoMarshaller{}, logger: zap.NewNop(), } t.Cleanup(func() { @@ -91,8 +98,12 @@ type errorMarshaller struct { err error } -var _ marshaller = (*errorMarshaller)(nil) +var _ Marshaller = (*errorMarshaller)(nil) -func (e errorMarshaller) Marshal(pdata.Traces) ([]byte, error) { +func (e errorMarshaller) Marshal(traces pdata.Traces) ([]Message, error) { return nil, e.err } + +func (e errorMarshaller) Encoding() string { + panic("implement me") +} diff --git a/exporter/kafkaexporter/marshaller.go b/exporter/kafkaexporter/marshaller.go index 52e989b99ff..a80f4bbd8c8 100644 --- a/exporter/kafkaexporter/marshaller.go +++ b/exporter/kafkaexporter/marshaller.go @@ -16,21 +16,30 @@ package kafkaexporter import ( "go.opentelemetry.io/collector/consumer/pdata" - otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" ) -// marshaller encodes traces into a byte array to be sent to Kafka. -type marshaller interface { - // Marshal serializes spans into byte array - Marshal(traces pdata.Traces) ([]byte, error) +// Marshaller marshals traces into Message array. +type Marshaller interface { + // Marshal serializes spans into Messages + Marshal(traces pdata.Traces) ([]Message, error) + + // Encoding returns encoding name + Encoding() string } -type protoMarshaller struct { +// Message encapsulates Kafka's message payload. +type Message struct { + Value []byte } -func (m *protoMarshaller) Marshal(traces pdata.Traces) ([]byte, error) { - request := otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(traces), +// defaultMarshallers returns map of supported encodings with Marshaller. +func defaultMarshallers() map[string]Marshaller { + otlp := &otlpProtoMarshaller{} + jaegerProto := jaegerMarshaller{marshaller: jaegerProtoSpanMarshaller{}} + jaegerJSON := jaegerMarshaller{marshaller: newJaegerJSONMarshaller()} + return map[string]Marshaller{ + otlp.Encoding(): otlp, + jaegerProto.Encoding(): jaegerProto, + jaegerJSON.Encoding(): jaegerJSON, } - return request.Marshal() } diff --git a/exporter/kafkaexporter/marshaller_test.go b/exporter/kafkaexporter/marshaller_test.go index b2ae8ffaf0a..d31c30fea40 100644 --- a/exporter/kafkaexporter/marshaller_test.go +++ b/exporter/kafkaexporter/marshaller_test.go @@ -19,31 +19,21 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/consumer/pdata" - otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" ) -func TestMarshall(t *testing.T) { - td := pdata.NewTraces() - td.ResourceSpans().Resize(1) - td.ResourceSpans().At(0).Resource().InitEmpty() - td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar") - m := protoMarshaller{} - bts, err := m.Marshal(td) - require.NoError(t, err) - - request := &otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(td), +func TestDefaultMarshallers(t *testing.T) { + expectedEncodings := []string{ + "otlp_proto", + "jaeger_proto", + "jaeger_json", + } + marshallers := defaultMarshallers() + assert.Equal(t, len(expectedEncodings), len(marshallers)) + for _, e := range expectedEncodings { + t.Run(e, func(t *testing.T) { + m, ok := marshallers[e] + require.True(t, ok) + assert.NotNil(t, m) + }) } - expected, err := request.Marshal() - require.NoError(t, err) - assert.Equal(t, expected, bts) -} - -func TestMarshall_empty(t *testing.T) { - m := protoMarshaller{} - bts, err := m.Marshal(pdata.NewTraces()) - require.NoError(t, err) - assert.NotNil(t, bts) } diff --git a/exporter/kafkaexporter/otlp_marshaller.go b/exporter/kafkaexporter/otlp_marshaller.go new file mode 100644 index 00000000000..1ebed86bbab --- /dev/null +++ b/exporter/kafkaexporter/otlp_marshaller.go @@ -0,0 +1,40 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkaexporter + +import ( + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" +) + +type otlpProtoMarshaller struct { +} + +var _ Marshaller = (*otlpProtoMarshaller)(nil) + +func (m *otlpProtoMarshaller) Encoding() string { + return defaultEncoding +} + +func (m *otlpProtoMarshaller) Marshal(traces pdata.Traces) ([]Message, error) { + request := otlptrace.ExportTraceServiceRequest{ + ResourceSpans: pdata.TracesToOtlp(traces), + } + bts, err := request.Marshal() + if err != nil { + return nil, err + } + return []Message{{Value: bts}}, nil +} diff --git a/exporter/kafkaexporter/otlp_marshaller_test.go b/exporter/kafkaexporter/otlp_marshaller_test.go new file mode 100644 index 00000000000..be0634ef47c --- /dev/null +++ b/exporter/kafkaexporter/otlp_marshaller_test.go @@ -0,0 +1,44 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkaexporter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" +) + +func TestOTLPMarshaller(t *testing.T) { + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + td.ResourceSpans().At(0).Resource().InitEmpty() + td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar") + request := &otlptrace.ExportTraceServiceRequest{ + ResourceSpans: pdata.TracesToOtlp(td), + } + expected, err := request.Marshal() + require.NoError(t, err) + require.NotNil(t, expected) + + m := otlpProtoMarshaller{} + assert.Equal(t, "otlp_proto", m.Encoding()) + messages, err := m.Marshal(td) + require.NoError(t, err) + assert.Equal(t, []Message{{Value: expected}}, messages) +} diff --git a/go.sum b/go.sum index 74a423adc48..2cf4a6105c2 100644 --- a/go.sum +++ b/go.sum @@ -519,7 +519,6 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.13.0/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= -github.com/grpc-ecosystem/grpc-gateway v1.14.6 h1:8ERzHx8aj1Sc47mu9n/AksaKCSWrMchFtkdrS4BIj5o= github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4GkDEdKCRJduHpTxT3/jcw= github.com/grpc-ecosystem/grpc-gateway v1.14.7 h1:Nk5kuHrnWUTf/0GL1a/vchH/om9Ap2/HnVna+jYZgTY= github.com/grpc-ecosystem/grpc-gateway v1.14.7/go.mod h1:oYZKL012gGh6LMyg/xA7Q2yq6j8bu0wa+9w14EEthWU= diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 7a5771caff8..9ca7e82dc71 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -1,8 +1,6 @@ # Kafka Receiver -Kafka receiver receives traces from Kafka. - -Message payloads are deserialized into OTLP `ExportTraceServiceRequest`. +Kafka receiver receives traces from Kafka. Message payload encoding is configurable. The following settings are required: - `protocol_version` (no default): Kafka protocol version e.g. 2.0.0 @@ -10,6 +8,12 @@ The following settings are required: The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers - `topic` (default = otlp_spans): The name of the kafka topic to export to +- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings: + - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`. + - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. + - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. + - `zipkin_thrift`: the payload is deserialized into Zipkin Thrift spans. + - `zipkin_json`: the payload is deserialized into Zipkin V2 JSON spans. - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from - `client_id` (default = otel-collector): The consumer client ID that receiver will use - `metadata` diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 10512fb6fcf..aa274b10fe4 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -28,6 +28,8 @@ type Config struct { ProtocolVersion string `mapstructure:"protocol_version"` // The name of the kafka topic to consume from (default "otlp_spans") Topic string `mapstructure:"topic"` + // Encoding of the messages (default "otlp_proto") + Encoding string `mapstructure:"encoding"` // The consumer group that receiver will be consuming messages from (default "otel-collector") GroupID string `mapstructure:"group_id"` // The consumer client ID that receiver will use (default "otel-collector") diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index f7679f08cf7..f627c8e393f 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, }, Topic: "spans", + Encoding: "otlp_proto", Brokers: []string{"foo:123", "bar:456"}, ClientID: "otel-collector", GroupID: "otel-collector", diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index c8a28ba0bc6..7232c54c1c5 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -28,6 +28,7 @@ import ( const ( typeStr = "kafka" defaultTopic = "otlp_spans" + defaultEncoding = "otlp_proto" defaultBroker = "localhost:9092" defaultClientID = "otel-collector" defaultGroupID = defaultClientID @@ -40,12 +41,30 @@ const ( defaultMetadataFull = true ) +// FactoryOption applies changes to kafkaExporterFactory. +type FactoryOption func(factory *kafkaReceiverFactory) + +// WithAddUnmarshallers adds marshallers. +func WithAddUnmarshallers(encodingMarshaller map[string]Unmarshaller) FactoryOption { + return func(factory *kafkaReceiverFactory) { + for encoding, unmarshaller := range encodingMarshaller { + factory.unmarshalers[encoding] = unmarshaller + } + } +} + // NewFactory creates Kafka receiver factory. -func NewFactory() component.ReceiverFactory { +func NewFactory(options ...FactoryOption) component.ReceiverFactory { + f := &kafkaReceiverFactory{ + unmarshalers: defaultUnmarshallers(), + } + for _, o := range options { + o(f) + } return receiverhelper.NewFactory( typeStr, createDefaultConfig, - receiverhelper.WithTraces(createTraceReceiver)) + receiverhelper.WithTraces(f.createTraceReceiver)) } func createDefaultConfig() configmodels.Receiver { @@ -55,6 +74,7 @@ func createDefaultConfig() configmodels.Receiver { NameVal: typeStr, }, Topic: defaultTopic, + Encoding: defaultEncoding, Brokers: []string{defaultBroker}, ClientID: defaultClientID, GroupID: defaultGroupID, @@ -68,14 +88,18 @@ func createDefaultConfig() configmodels.Receiver { } } -func createTraceReceiver( +type kafkaReceiverFactory struct { + unmarshalers map[string]Unmarshaller +} + +func (f *kafkaReceiverFactory) createTraceReceiver( _ context.Context, params component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TraceConsumer, ) (component.TraceReceiver, error) { c := cfg.(*Config) - r, err := newReceiver(*c, params, nextConsumer) + r, err := newReceiver(*c, params, f.unmarshalers, nextConsumer) if err != nil { return nil, err } diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 4f81a5172e1..c094815bbb0 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/consumer/pdata" ) func TestCreateDefaultConfig(t *testing.T) { @@ -39,7 +40,9 @@ func TestCreateTraceReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Brokers = []string{"invalid:9092"} cfg.ProtocolVersion = "2.0.0" - r, err := createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) + f := kafkaReceiverFactory{unmarshalers: defaultUnmarshallers()} + r, err := f.createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) + // no available broker require.Error(t, err) assert.Nil(t, r) } @@ -49,7 +52,43 @@ func TestCreateTraceReceiver_error(t *testing.T) { cfg.ProtocolVersion = "2.0.0" // disable contacting broker at startup cfg.Metadata.Full = false - r, err := createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) + f := kafkaReceiverFactory{unmarshalers: defaultUnmarshallers()} + r, err := f.createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) require.NoError(t, err) assert.NotNil(t, r) } + +func TestWithUnmarshallers(t *testing.T) { + cum := &customUnamarshaller{} + f := NewFactory(WithAddUnmarshallers(map[string]Unmarshaller{cum.Encoding(): cum})) + cfg := createDefaultConfig().(*Config) + // disable contacting broker + cfg.Metadata.Full = false + cfg.ProtocolVersion = "2.0.0" + + t.Run("custom_encoding", func(t *testing.T) { + cfg.Encoding = cum.Encoding() + exporter, err := f.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) + require.NoError(t, err) + require.NotNil(t, exporter) + }) + t.Run("default_encoding", func(t *testing.T) { + cfg.Encoding = new(otlpProtoUnmarshaller).Encoding() + exporter, err := f.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) + require.NoError(t, err) + assert.NotNil(t, exporter) + }) +} + +type customUnamarshaller struct { +} + +var _ Unmarshaller = (*customUnamarshaller)(nil) + +func (c customUnamarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { + panic("implement me") +} + +func (c customUnamarshaller) Encoding() string { + return "custom" +} diff --git a/receiver/kafkareceiver/jaeger_unmarshaller.go b/receiver/kafkareceiver/jaeger_unmarshaller.go new file mode 100644 index 00000000000..579f7f6e1d5 --- /dev/null +++ b/receiver/kafkareceiver/jaeger_unmarshaller.go @@ -0,0 +1,69 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + jaegerproto "github.com/jaegertracing/jaeger/model" + + "go.opentelemetry.io/collector/consumer/pdata" + jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" +) + +type jaegerProtoSpanUnmarshaller struct { +} + +var _ Unmarshaller = (*jaegerProtoSpanUnmarshaller)(nil) + +func (j jaegerProtoSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { + span := &jaegerproto.Span{} + err := span.Unmarshal(bytes) + if err != nil { + return pdata.NewTraces(), err + } + return jaegerSpanToTraces(span), nil +} + +func (j jaegerProtoSpanUnmarshaller) Encoding() string { + return "jaeger_proto" +} + +type jaegerJSONSpanUnmarshaller struct { +} + +var _ Unmarshaller = (*jaegerJSONSpanUnmarshaller)(nil) + +func (j jaegerJSONSpanUnmarshaller) Unmarshal(data []byte) (pdata.Traces, error) { + span := &jaegerproto.Span{} + err := jsonpb.Unmarshal(bytes.NewReader(data), span) + if err != nil { + return pdata.NewTraces(), err + } + return jaegerSpanToTraces(span), nil +} + +func (j jaegerJSONSpanUnmarshaller) Encoding() string { + return "jaeger_json" +} + +func jaegerSpanToTraces(span *jaegerproto.Span) pdata.Traces { + batch := jaegerproto.Batch{ + Spans: []*jaegerproto.Span{span}, + Process: span.Process, + } + return jaegertranslator.ProtoBatchToInternalTraces(batch) +} diff --git a/receiver/kafkareceiver/jaeger_unmarshaller_test.go b/receiver/kafkareceiver/jaeger_unmarshaller_test.go new file mode 100644 index 00000000000..d828348778a --- /dev/null +++ b/receiver/kafkareceiver/jaeger_unmarshaller_test.go @@ -0,0 +1,87 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "bytes" + "testing" + + "github.com/gogo/protobuf/jsonpb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" + jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" +) + +func TestUnmarshallJaeger(t *testing.T) { + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo") + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetStartTime(pdata.TimestampUnixNano(10)) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetEndTime(pdata.TimestampUnixNano(20)) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8})) + batches, err := jaegertranslator.InternalTracesToJaegerProto(td) + require.NoError(t, err) + + protoBytes, err := batches[0].Spans[0].Marshal() + require.NoError(t, err) + + jsonMarshaller := &jsonpb.Marshaler{} + jsonBytes := new(bytes.Buffer) + jsonMarshaller.Marshal(jsonBytes, batches[0].Spans[0]) + + tests := []struct { + unmarshaller Unmarshaller + encoding string + bytes []byte + }{ + { + unmarshaller: jaegerProtoSpanUnmarshaller{}, + encoding: "jaeger_proto", + bytes: protoBytes, + }, + { + unmarshaller: jaegerJSONSpanUnmarshaller{}, + encoding: "jaeger_json", + bytes: jsonBytes.Bytes(), + }, + } + for _, test := range tests { + t.Run(test.encoding, func(t *testing.T) { + got, err := test.unmarshaller.Unmarshal(test.bytes) + require.NoError(t, err) + assert.Equal(t, td, got) + assert.Equal(t, test.encoding, test.unmarshaller.Encoding()) + }) + } +} + +func TestUnmarshallJaegerProto_error(t *testing.T) { + p := jaegerProtoSpanUnmarshaller{} + got, err := p.Unmarshal([]byte("+$%")) + assert.Equal(t, pdata.NewTraces(), got) + assert.Error(t, err) +} + +func TestUnmarshallJaegerJSON_error(t *testing.T) { + p := jaegerJSONSpanUnmarshaller{} + got, err := p.Unmarshal([]byte("+$%")) + assert.Equal(t, pdata.NewTraces(), got) + assert.Error(t, err) +} diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index f03fe8895ed..6ea35d7b132 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -16,6 +16,7 @@ package kafkareceiver import ( "context" + "fmt" "github.com/Shopify/sarama" "go.opencensus.io/stats" @@ -31,6 +32,8 @@ const ( transport = "kafka" ) +var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") + // kafkaConsumer uses sarama to consume and handle messages from kafka. type kafkaConsumer struct { name string @@ -38,14 +41,19 @@ type kafkaConsumer struct { nextConsumer consumer.TraceConsumer topics []string cancelConsumeLoop context.CancelFunc - unmarshaller unmarshaller + unmarshaller Unmarshaller logger *zap.Logger } var _ component.Receiver = (*kafkaConsumer)(nil) -func newReceiver(config Config, params component.ReceiverCreateParams, nextConsumer consumer.TraceConsumer) (*kafkaConsumer, error) { +func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.TraceConsumer) (*kafkaConsumer, error) { + unmarshaller := unmarshalers[config.Encoding] + if unmarshaller == nil { + return nil, errUnrecognizedEncoding + } + c := sarama.NewConfig() c.ClientID = config.ClientID c.Metadata.Full = config.Metadata.Full @@ -67,7 +75,7 @@ func newReceiver(config Config, params component.ReceiverCreateParams, nextConsu consumerGroup: client, topics: []string{config.Topic}, nextConsumer: nextConsumer, - unmarshaller: &protoUnmarshaller{}, + unmarshaller: unmarshaller, logger: params.Logger, }, nil } @@ -110,7 +118,7 @@ func (c *kafkaConsumer) Shutdown(context.Context) error { type consumerGroupHandler struct { name string - unmarshaller unmarshaller + unmarshaller Unmarshaller nextConsumer consumer.TraceConsumer ready chan bool @@ -156,7 +164,7 @@ func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, } err = c.nextConsumer.ConsumeTraces(session.Context(), traces) - obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaller.Format(), traces.SpanCount(), err) + obsreport.EndTraceDataReceiveOp(ctx, c.unmarshaller.Encoding(), traces.SpanCount(), err) if err != nil { return err } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 8f946904c89..a1238c841f4 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -37,13 +37,24 @@ import ( func TestNewReceiver_version_err(t *testing.T) { c := Config{ + Encoding: defaultEncoding, ProtocolVersion: "none", } - r, err := newReceiver(c, component.ReceiverCreateParams{}, exportertest.NewNopTraceExporter()) + r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), exportertest.NewNopTraceExporter()) assert.Error(t, err) assert.Nil(t, r) } +func TestNewReceiver_encoding_err(t *testing.T) { + c := Config{ + Encoding: "foo", + } + r, err := newReceiver(c, component.ReceiverCreateParams{}, defaultUnmarshallers(), exportertest.NewNopTraceExporter()) + require.Error(t, err) + assert.Nil(t, r) + assert.EqualError(t, err, errUnrecognizedEncoding.Error()) +} + func TestReceiverStart(t *testing.T) { testClient := testConsumerGroup{once: &sync.Once{}} c := kafkaConsumer{ @@ -100,7 +111,7 @@ func TestConsumerGroupHandler(t *testing.T) { defer view.Unregister(views...) c := consumerGroupHandler{ - unmarshaller: &protoUnmarshaller{}, + unmarshaller: &otlpProtoUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), nextConsumer: exportertest.NewNopTraceExporter(), @@ -144,7 +155,7 @@ func TestConsumerGroupHandler(t *testing.T) { func TestConsumerGroupHandler_error_unmarshall(t *testing.T) { c := consumerGroupHandler{ - unmarshaller: &protoUnmarshaller{}, + unmarshaller: &otlpProtoUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), nextConsumer: exportertest.NewNopTraceExporter(), @@ -170,7 +181,7 @@ func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) { consumerError := fmt.Errorf("failed to consumer") nextConsumer.SetConsumeTraceError(consumerError) c := consumerGroupHandler{ - unmarshaller: &protoUnmarshaller{}, + unmarshaller: &otlpProtoUnmarshaller{}, logger: zap.NewNop(), ready: make(chan bool), nextConsumer: nextConsumer, diff --git a/receiver/kafkareceiver/otlp_unmarshaller.go b/receiver/kafkareceiver/otlp_unmarshaller.go new file mode 100644 index 00000000000..b73978606b2 --- /dev/null +++ b/receiver/kafkareceiver/otlp_unmarshaller.go @@ -0,0 +1,38 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" +) + +type otlpProtoUnmarshaller struct { +} + +var _ Unmarshaller = (*otlpProtoUnmarshaller)(nil) + +func (p *otlpProtoUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { + request := &otlptrace.ExportTraceServiceRequest{} + err := request.Unmarshal(bytes) + if err != nil { + return pdata.NewTraces(), err + } + return pdata.TracesFromOtlp(request.GetResourceSpans()), nil +} + +func (*otlpProtoUnmarshaller) Encoding() string { + return defaultEncoding +} diff --git a/receiver/kafkareceiver/otlp_unmarshaller_test.go b/receiver/kafkareceiver/otlp_unmarshaller_test.go new file mode 100644 index 00000000000..9b81090b9d4 --- /dev/null +++ b/receiver/kafkareceiver/otlp_unmarshaller_test.go @@ -0,0 +1,50 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" + otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" +) + +func TestUnmarshallOTLP(t *testing.T) { + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + td.ResourceSpans().At(0).Resource().InitEmpty() + td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar") + request := &otlptrace.ExportTraceServiceRequest{ + ResourceSpans: pdata.TracesToOtlp(td), + } + expected, err := request.Marshal() + require.NoError(t, err) + + p := otlpProtoUnmarshaller{} + got, err := p.Unmarshal(expected) + require.NoError(t, err) + assert.Equal(t, td, got) + assert.Equal(t, "otlp_proto", p.Encoding()) +} + +func TestUnmarshallOTLP_error(t *testing.T) { + p := otlpProtoUnmarshaller{} + got, err := p.Unmarshal([]byte("+$%")) + assert.Equal(t, pdata.NewTraces(), got) + assert.Error(t, err) +} diff --git a/receiver/kafkareceiver/unmarshaller.go b/receiver/kafkareceiver/unmarshaller.go index 1a4909cea95..14e36eb555a 100644 --- a/receiver/kafkareceiver/unmarshaller.go +++ b/receiver/kafkareceiver/unmarshaller.go @@ -16,30 +16,29 @@ package kafkareceiver import ( "go.opentelemetry.io/collector/consumer/pdata" - otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" ) -// unmarshaller deserializes the message body. -type unmarshaller interface { +// Unmarshaller deserializes the message body. +type Unmarshaller interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (pdata.Traces, error) - // Format of the encoding. - Format() string + // Encoding of the serialized messages. + Encoding() string } -type protoUnmarshaller struct { -} - -func (p *protoUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { - request := &otlptrace.ExportTraceServiceRequest{} - err := request.Unmarshal(bytes) - if err != nil { - return pdata.NewTraces(), err +// defaultUnmarshallers returns map of supported encodings with Unmarshaller. +func defaultUnmarshallers() map[string]Unmarshaller { + otlp := &otlpProtoUnmarshaller{} + jaegerProto := jaegerProtoSpanUnmarshaller{} + jaegerJSON := jaegerJSONSpanUnmarshaller{} + zipkinJSON := zipkinJSONSpanUnmarshaller{} + zipkinThrift := zipkinThriftSpanUnmarshaller{} + return map[string]Unmarshaller{ + otlp.Encoding(): otlp, + jaegerProto.Encoding(): jaegerProto, + jaegerJSON.Encoding(): jaegerJSON, + zipkinJSON.Encoding(): zipkinJSON, + zipkinThrift.Encoding(): zipkinThrift, } - return pdata.TracesFromOtlp(request.GetResourceSpans()), nil -} - -func (*protoUnmarshaller) Format() string { - return "otlp_proto" } diff --git a/receiver/kafkareceiver/unmarshaller_test.go b/receiver/kafkareceiver/unmarshaller_test.go index 07750af6c04..74b1d7df91d 100644 --- a/receiver/kafkareceiver/unmarshaller_test.go +++ b/receiver/kafkareceiver/unmarshaller_test.go @@ -19,32 +19,23 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/consumer/pdata" - otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" ) -func TestUnmarshall(t *testing.T) { - td := pdata.NewTraces() - td.ResourceSpans().Resize(1) - td.ResourceSpans().At(0).Resource().InitEmpty() - td.ResourceSpans().At(0).Resource().Attributes().InsertString("foo", "bar") - request := &otlptrace.ExportTraceServiceRequest{ - ResourceSpans: pdata.TracesToOtlp(td), +func TestDefaultUnMarshaller(t *testing.T) { + expectedEncodings := []string{ + "otlp_proto", + "jaeger_proto", + "jaeger_json", + "zipkin_json", + "zipkin_thrift", + } + marshallers := defaultUnmarshallers() + assert.Equal(t, len(expectedEncodings), len(marshallers)) + for _, e := range expectedEncodings { + t.Run(e, func(t *testing.T) { + m, ok := marshallers[e] + require.True(t, ok) + assert.NotNil(t, m) + }) } - expected, err := request.Marshal() - require.NoError(t, err) - - p := protoUnmarshaller{} - got, err := p.Unmarshal(expected) - require.NoError(t, err) - assert.Equal(t, td, got) - assert.Equal(t, "otlp_proto", p.Format()) -} - -func TestUnmarshall_error(t *testing.T) { - p := protoUnmarshaller{} - got, err := p.Unmarshal([]byte("+$%")) - assert.Equal(t, pdata.NewTraces(), got) - assert.Error(t, err) } diff --git a/receiver/kafkareceiver/zipkin_unmarshaller.go b/receiver/kafkareceiver/zipkin_unmarshaller.go new file mode 100644 index 00000000000..fcd3bfa16c2 --- /dev/null +++ b/receiver/kafkareceiver/zipkin_unmarshaller.go @@ -0,0 +1,88 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "encoding/json" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + zipkinmodel "github.com/openzipkin/zipkin-go/model" + + "go.opentelemetry.io/collector/consumer/pdata" + zipkintranslator "go.opentelemetry.io/collector/translator/trace/zipkin" +) + +type zipkinJSONSpanUnmarshaller struct { +} + +var _ Unmarshaller = (*zipkinJSONSpanUnmarshaller)(nil) + +func (z zipkinJSONSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { + var spans []*zipkinmodel.SpanModel + if err := json.Unmarshal(bytes, &spans); err != nil { + return pdata.NewTraces(), err + } + return zipkintranslator.V2SpansToInternalTraces(spans) +} + +func (z zipkinJSONSpanUnmarshaller) Encoding() string { + return "zipkin_json" +} + +type zipkinThriftSpanUnmarshaller struct { +} + +var _ Unmarshaller = (*zipkinThriftSpanUnmarshaller)(nil) + +func (z zipkinThriftSpanUnmarshaller) Unmarshal(bytes []byte) (pdata.Traces, error) { + spans, err := deserializeZipkinThrift(bytes) + if err != nil { + return pdata.NewTraces(), err + } + return zipkintranslator.V1ThriftBatchToInternalTraces(spans) + +} + +func (z zipkinThriftSpanUnmarshaller) Encoding() string { + return "zipkin_thrift" +} + +// deserializeThrift decodes Thrift bytes to a list of spans. +// This code comes from jaegertracing/jaeger, ideally we should have imported +// it but this was creating many conflicts so brought the code to here. +// https://github.com/jaegertracing/jaeger/blob/6bc0c122bfca8e737a747826ae60a22a306d7019/model/converter/thrift/zipkin/deserialize.go#L36 +func deserializeZipkinThrift(b []byte) ([]*zipkincore.Span, error) { + buffer := thrift.NewTMemoryBuffer() + buffer.Write(b) + + transport := thrift.NewTBinaryProtocolTransport(buffer) + _, size, err := transport.ReadListBegin() // Ignore the returned element type + if err != nil { + return nil, err + } + + // We don't depend on the size returned by ReadListBegin to preallocate the array because it + // sometimes returns a nil error on bad input and provides an unreasonably large int for size + var spans []*zipkincore.Span + for i := 0; i < size; i++ { + zs := &zipkincore.Span{} + if err = zs.Read(transport); err != nil { + return nil, err + } + spans = append(spans, zs) + } + return spans, nil +} diff --git a/receiver/kafkareceiver/zipkin_unmarshaller_test.go b/receiver/kafkareceiver/zipkin_unmarshaller_test.go new file mode 100644 index 00000000000..9706a904e3b --- /dev/null +++ b/receiver/kafkareceiver/zipkin_unmarshaller_test.go @@ -0,0 +1,98 @@ +// Copyright 2020 The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafkareceiver + +import ( + "testing" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + zipkinreporter "github.com/openzipkin/zipkin-go/reporter" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/pdata" + zipkintranslator "go.opentelemetry.io/collector/translator/trace/zipkin" +) + +func TestUnmarshallZipkin(t *testing.T) { + td := pdata.NewTraces() + td.ResourceSpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo") + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetStartTime(pdata.TimestampUnixNano(1597759000)) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().InitEmptyWithCapacity(1) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8})) + spans, err := zipkintranslator.InternalTracesToZipkinSpans(td) + require.NoError(t, err) + + serializer := zipkinreporter.JSONSerializer{} + jsonBytes, err := serializer.Serialize(spans) + require.NoError(t, err) + + tSpan := &zipkincore.Span{Name: "foo"} + thriftTransport := thrift.NewTMemoryBuffer() + protocolTransport := thrift.NewTBinaryProtocolTransport(thriftTransport) + require.NoError(t, protocolTransport.WriteListBegin(thrift.STRUCT, 1)) + err = tSpan.Write(protocolTransport) + require.NoError(t, err) + require.NoError(t, protocolTransport.WriteListEnd()) + + tdThrift, err := zipkintranslator.V1ThriftBatchToInternalTraces([]*zipkincore.Span{tSpan}) + require.NoError(t, err) + tests := []struct { + unmarshaller Unmarshaller + encoding string + bytes []byte + expected pdata.Traces + }{ + { + unmarshaller: zipkinJSONSpanUnmarshaller{}, + encoding: "zipkin_json", + bytes: jsonBytes, + expected: td, + }, + { + unmarshaller: zipkinThriftSpanUnmarshaller{}, + encoding: "zipkin_thrift", + bytes: thriftTransport.Buffer.Bytes(), + expected: tdThrift, + }, + } + for _, test := range tests { + t.Run(test.encoding, func(t *testing.T) { + traces, err := test.unmarshaller.Unmarshal(test.bytes) + require.NoError(t, err) + assert.Equal(t, test.expected, traces) + assert.Equal(t, test.encoding, test.unmarshaller.Encoding()) + }) + } +} + +func TestUnmarshallZipkinThrift_error(t *testing.T) { + p := zipkinThriftSpanUnmarshaller{} + got, err := p.Unmarshal([]byte("+$%")) + assert.Equal(t, pdata.NewTraces(), got) + assert.Error(t, err) +} + +func TestUnmarshallZipkinJSON_error(t *testing.T) { + p := zipkinJSONSpanUnmarshaller{} + got, err := p.Unmarshal([]byte("+$%")) + assert.Equal(t, pdata.NewTraces(), got) + assert.Error(t, err) +}