Skip to content

Commit

Permalink
Make Kafka payload encoding configurable (#1584)
Browse files Browse the repository at this point in the history
* Make Kafka payload encoding configurable

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Expose marshalers in factory

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Remove zipkin from exporter readme

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Make custom encodings optional in the factory

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Aug 26, 2020
1 parent b13392b commit 7f13eb6
Show file tree
Hide file tree
Showing 29 changed files with 1,003 additions and 127 deletions.
8 changes: 5 additions & 3 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

Kafka exporter exports traces to Kafka. This exporter uses a synchronous producer
that blocks and does not batch messages, therefore it should be used with batch and queued retry
processors for higher throughput and resiliency.
processors for higher throughput and resiliency. Message payload encoding is configurable.

Message payloads are serialized OTLP `ExportTraceServiceRequest`.

The following settings are required:
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0

The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers
- `topic` (default = otlp_spans): The name of the kafka topic to export to
- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
- `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
- `metadata`
- `full` (default = true): Whether to maintain a full set of metadata.
When disabled, the client does not make the initial request to the broker at startup.
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Config struct {
ProtocolVersion string `mapstructure:"protocol_version"`
// The name of the kafka topic to export to (default "otlp_spans")
Topic string `mapstructure:"topic"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Expand Down
5 changes: 3 additions & 2 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Brokers: []string{"foo:123", "bar:456"},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
Metadata: Metadata{
Full: false,
Retry: MetadataRetry{
Expand Down
38 changes: 31 additions & 7 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
)

const (
typeStr = "kafka"
defaultTopic = "otlp_spans"
defaultBroker = "localhost:9092"
typeStr = "kafka"
defaultTopic = "otlp_spans"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
// default from sarama.NewConfig()
Expand All @@ -35,12 +36,30 @@ const (
defaultMetadataFull = true
)

// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaExporterFactory)

// WithAddMarshallers returns a FactoryOption that registers every entry of
// encodingMarshaller on the factory, keyed by its encoding name. An entry
// whose key is already present in the factory replaces the existing one.
func WithAddMarshallers(encodingMarshaller map[string]Marshaller) FactoryOption {
	register := func(factory *kafkaExporterFactory) {
		for name, m := range encodingMarshaller {
			factory.marshallers[name] = m
		}
	}
	return register
}

// NewFactory creates Kafka exporter factory.
func NewFactory() component.ExporterFactory {
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
marshallers: defaultMarshallers(),
}
for _, o := range options {
o(f)
}
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithTraces(createTraceExporter))
exporterhelper.WithTraces(f.createTraceExporter))
}

func createDefaultConfig() configmodels.Exporter {
Expand All @@ -57,6 +76,7 @@ func createDefaultConfig() configmodels.Exporter {
QueueSettings: qs,
Brokers: []string{defaultBroker},
Topic: defaultTopic,
Encoding: defaultEncoding,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand All @@ -67,13 +87,17 @@ func createDefaultConfig() configmodels.Exporter {
}
}

func createTraceExporter(
// kafkaExporterFactory carries the marshallers that are made available to the
// trace exporters it creates.
type kafkaExporterFactory struct {
	// marshallers is keyed by encoding name and is handed to newExporter when
	// a trace exporter is created.
	marshallers map[string]Marshaller
}

func (f *kafkaExporterFactory) createTraceExporter(
_ context.Context,
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
oCfg := cfg.(*Config)
exp, err := newExporter(*oCfg, params)
exp, err := newExporter(*oCfg, params, f.marshallers)
if err != nil {
return nil, err
}
Expand Down
43 changes: 40 additions & 3 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -39,7 +40,8 @@ func TestCreateTracesExporter(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}
Expand All @@ -48,8 +50,43 @@ func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// we get the error because the exporter
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

// TestWithMarshallers verifies that a factory built with WithAddMarshallers
// can create a trace exporter for the added custom encoding as well as for
// the OTLP proto encoding.
func TestWithMarshallers(t *testing.T) {
	custom := &customMarshaller{}
	factory := NewFactory(WithAddMarshallers(map[string]Marshaller{custom.Encoding(): custom}))
	cfg := createDefaultConfig().(*Config)
	// disable contacting broker
	cfg.Metadata.Full = false

	cases := []struct {
		name     string
		encoding string
	}{
		{name: "custom_encoding", encoding: custom.Encoding()},
		{name: "default_encoding", encoding: new(otlpProtoMarshaller).Encoding()},
	}
	for _, tc := range cases {
		// Subtests run sequentially and share cfg; only the encoding changes.
		t.Run(tc.name, func(t *testing.T) {
			cfg.Encoding = tc.encoding
			exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
			require.NoError(t, err)
			require.NotNil(t, exporter)
		})
	}
}

// customMarshaller is a stub Marshaller used to register an additional
// encoding in tests.
type customMarshaller struct{}

var _ Marshaller = (*customMarshaller)(nil)

// Marshal is not implemented; it panics when called.
func (c customMarshaller) Marshal(_ pdata.Traces) ([]Message, error) {
	panic("implement me")
}

// Encoding returns the encoding name of this stub.
func (c customMarshaller) Encoding() string {
	const name = "custom"
	return name
}
98 changes: 98 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
jaegerproto "github.com/jaegertracing/jaeger/model"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

// jaegerMarshaller converts traces to Jaeger spans and serializes each span
// into its own Message using the wrapped span marshaller.
type jaegerMarshaller struct {
	// marshaller serializes a single Jaeger span and names the encoding.
	marshaller jaegerSpanMarshaller
}

var _ Marshaller = (*jaegerMarshaller)(nil)

// Marshal translates traces into Jaeger batches and emits one Message per
// span. The batch Process is copied onto every span before serialization.
//
// If the translation itself fails, nil and the translation error are
// returned. A span that fails to serialize is skipped and its error recorded;
// the messages produced so far are returned together with the result of
// componenterror.CombineErrors over the recorded errors.
func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
	batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
	if err != nil {
		return nil, err
	}

	var (
		out      []Message
		failures []error
	)
	for i := range batches {
		process := batches[i].Process
		for _, s := range batches[i].Spans {
			s.Process = process
			payload, mErr := j.marshaller.marshall(s)
			if mErr == nil {
				out = append(out, Message{Value: payload})
				continue
			}
			// keep going so spans that can be serialized are still exported
			failures = append(failures, mErr)
		}
	}
	return out, componenterror.CombineErrors(failures)
}

// Encoding returns the encoding name of the wrapped span marshaller.
func (j jaegerMarshaller) Encoding() string {
	name := j.marshaller.encoding()
	return name
}

// jaegerSpanMarshaller serializes a single Jaeger span into one payload
// format and names that format.
type jaegerSpanMarshaller interface {
	// marshall returns the serialized bytes of span, or an error.
	marshall(span *jaegerproto.Span) ([]byte, error)
	// encoding returns the name of the encoding produced by marshall.
	encoding() string
}

// jaegerProtoSpanMarshaller serializes spans by calling the span's own
// Marshal method.
type jaegerProtoSpanMarshaller struct{}

var _ jaegerSpanMarshaller = (*jaegerProtoSpanMarshaller)(nil)

// marshall returns the result of span.Marshal unchanged.
func (p jaegerProtoSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
	payload, err := span.Marshal()
	return payload, err
}

// encoding returns the name of this encoding.
func (p jaegerProtoSpanMarshaller) encoding() string {
	const name = "jaeger_proto"
	return name
}

// jaegerJSONSpanMarshaller serializes spans to JSON through a jsonpb
// marshaler.
type jaegerJSONSpanMarshaller struct {
	// pbMarshaller performs the JSON serialization; it must be non-nil.
	pbMarshaller *jsonpb.Marshaler
}

var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil)

// newJaegerJSONMarshaller returns a JSON span marshaller backed by a
// zero-value jsonpb.Marshaler.
func newJaegerJSONMarshaller() *jaegerJSONSpanMarshaller {
	return &jaegerJSONSpanMarshaller{
		pbMarshaller: &jsonpb.Marshaler{},
	}
}

// marshall returns the JSON serialization of span. On failure it returns nil
// bytes and the error, so a partially written document is never handed back
// alongside an error.
func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
	var out bytes.Buffer
	if err := p.pbMarshaller.Marshal(&out, span); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

// encoding returns the name of this encoding.
func (p jaegerJSONSpanMarshaller) encoding() string {
	return "jaeger_json"
}
95 changes: 95 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"bytes"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

// TestJaegerMarshaller builds a single-span trace, serializes the translated
// Jaeger span directly as proto and as JSON, and checks that each Jaeger
// marshaller produces exactly that payload and reports the expected encoding.
func TestJaegerMarshaller(t *testing.T) {
	td := pdata.NewTraces()
	td.ResourceSpans().Resize(1)
	rs := td.ResourceSpans().At(0)
	rs.InstrumentationLibrarySpans().Resize(1)
	ils := rs.InstrumentationLibrarySpans().At(0)
	ils.Spans().Resize(1)
	span := ils.Spans().At(0)
	span.SetName("foo")
	span.SetStartTime(pdata.TimestampUnixNano(10))
	span.SetEndTime(pdata.TimestampUnixNano(20))
	span.SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
	span.SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8}))

	batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
	require.NoError(t, err)

	// Expected payloads: the marshaller attaches the batch Process to the span
	// before serializing, so the reference span gets the same treatment.
	wantSpan := batches[0].Spans[0]
	wantSpan.Process = batches[0].Process
	wantProto, err := wantSpan.Marshal()
	require.NoError(t, err)
	require.NotNil(t, wantProto)

	wantJSON := new(bytes.Buffer)
	require.NoError(t, (&jsonpb.Marshaler{}).Marshal(wantJSON, wantSpan))

	cases := []struct {
		marshaller Marshaller
		encoding   string
		messages   []Message
	}{
		{
			marshaller: jaegerMarshaller{marshaller: jaegerProtoSpanMarshaller{}},
			encoding:   "jaeger_proto",
			messages:   []Message{{Value: wantProto}},
		},
		{
			marshaller: jaegerMarshaller{
				marshaller: jaegerJSONSpanMarshaller{pbMarshaller: &jsonpb.Marshaler{}},
			},
			encoding: "jaeger_json",
			messages: []Message{{Value: wantJSON.Bytes()}},
		},
	}
	for _, tc := range cases {
		t.Run(tc.encoding, func(t *testing.T) {
			got, err := tc.marshaller.Marshal(td)
			require.NoError(t, err)
			assert.Equal(t, tc.messages, got)
			assert.Equal(t, tc.encoding, tc.marshaller.Encoding())
		})
	}
}

func TestJaegerMarshaller_error_covert_traceID(t *testing.T) {
marshaller := jaegerMarshaller{
marshaller: jaegerProtoSpanMarshaller{},
}
td := pdata.NewTraces()
td.ResourceSpans().Resize(1)
td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
// fails in zero traceID
messages, err := marshaller.Marshal(td)
require.Error(t, err)
assert.Nil(t, messages)
}
Loading

0 comments on commit 7f13eb6

Please sign in to comment.