Skip to content

Commit c5519e5

Browse files
thmshmmjriguera
authored andcommitted
[receiver/kafkareceiver] Add encoding extensions support (open-telemetry#33888)
**Description:** Add support for encoding extensions in the kafkareceiver <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.--> To be able to use encoding extensions this PR adds extension support and proposes to rename the existing `encoding` configuration property to `format` and reusing the `encoding` property for configuring encoding extensions. Reason is to be consistent with other receivers/exporters like the `fileexporter` that already support extensions. **Link to tracking Issue:** n/a **Testing:** Tested with the existing avro_log_encoding extension as well with receivers internal json encoding. **Documentation:**: Updated README.md within the receiver describing the use of encoding extensions.
1 parent 1c0a193 commit c5519e5

File tree

7 files changed

+333
-122
lines changed

7 files changed

+333
-122
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for encoding extensions in the Kafka receiver.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [33888]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This change adds support for encoding extensions in the Kafka receiver. Loading extensions takes precedence over the internally supported encodings.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: [user]

receiver/kafkareceiver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ The following settings can be optionally configured:
2929
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
3030
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
3131
Only one telemetry type may be used for a given topic.
32-
- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings:
32+
- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings:
3333
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
3434
- `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`.
3535
- `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`.

receiver/kafkareceiver/factory.go

Lines changed: 5 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -54,40 +54,9 @@ var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
5454
// FactoryOption applies changes to kafkaExporterFactory.
5555
type FactoryOption func(factory *kafkaReceiverFactory)
5656

57-
// withTracesUnmarshalers adds Unmarshalers.
58-
func withTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption {
59-
return func(factory *kafkaReceiverFactory) {
60-
for _, unmarshaler := range tracesUnmarshalers {
61-
factory.tracesUnmarshalers[unmarshaler.Encoding()] = unmarshaler
62-
}
63-
}
64-
}
65-
66-
// withMetricsUnmarshalers adds MetricsUnmarshalers.
67-
func withMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption {
68-
return func(factory *kafkaReceiverFactory) {
69-
for _, unmarshaler := range metricsUnmarshalers {
70-
factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
71-
}
72-
}
73-
}
74-
75-
// withLogsUnmarshalers adds LogsUnmarshalers.
76-
func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
77-
return func(factory *kafkaReceiverFactory) {
78-
for _, unmarshaler := range logsUnmarshalers {
79-
factory.logsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
80-
}
81-
}
82-
}
83-
8457
// NewFactory creates Kafka receiver factory.
8558
func NewFactory(options ...FactoryOption) receiver.Factory {
86-
f := &kafkaReceiverFactory{
87-
tracesUnmarshalers: map[string]TracesUnmarshaler{},
88-
metricsUnmarshalers: map[string]MetricsUnmarshaler{},
89-
logsUnmarshalers: map[string]LogsUnmarshaler{},
90-
}
59+
f := &kafkaReceiverFactory{}
9160
for _, o := range options {
9261
o(f)
9362
}
@@ -133,32 +102,20 @@ func createDefaultConfig() component.Config {
133102
}
134103
}
135104

136-
type kafkaReceiverFactory struct {
137-
tracesUnmarshalers map[string]TracesUnmarshaler
138-
metricsUnmarshalers map[string]MetricsUnmarshaler
139-
logsUnmarshalers map[string]LogsUnmarshaler
140-
}
105+
type kafkaReceiverFactory struct{}
141106

142107
func (f *kafkaReceiverFactory) createTracesReceiver(
143108
_ context.Context,
144109
set receiver.Settings,
145110
cfg component.Config,
146111
nextConsumer consumer.Traces,
147112
) (receiver.Traces, error) {
148-
for encoding, unmarshal := range defaultTracesUnmarshalers() {
149-
f.tracesUnmarshalers[encoding] = unmarshal
150-
}
151-
152113
oCfg := *(cfg.(*Config))
153114
if oCfg.Topic == "" {
154115
oCfg.Topic = defaultTracesTopic
155116
}
156-
unmarshaler := f.tracesUnmarshalers[oCfg.Encoding]
157-
if unmarshaler == nil {
158-
return nil, errUnrecognizedEncoding
159-
}
160117

161-
r, err := newTracesReceiver(oCfg, set, unmarshaler, nextConsumer)
118+
r, err := newTracesReceiver(oCfg, set, nextConsumer)
162119
if err != nil {
163120
return nil, err
164121
}
@@ -171,20 +128,12 @@ func (f *kafkaReceiverFactory) createMetricsReceiver(
171128
cfg component.Config,
172129
nextConsumer consumer.Metrics,
173130
) (receiver.Metrics, error) {
174-
for encoding, unmarshal := range defaultMetricsUnmarshalers() {
175-
f.metricsUnmarshalers[encoding] = unmarshal
176-
}
177-
178131
oCfg := *(cfg.(*Config))
179132
if oCfg.Topic == "" {
180133
oCfg.Topic = defaultMetricsTopic
181134
}
182-
unmarshaler := f.metricsUnmarshalers[oCfg.Encoding]
183-
if unmarshaler == nil {
184-
return nil, errUnrecognizedEncoding
185-
}
186135

187-
r, err := newMetricsReceiver(oCfg, set, unmarshaler, nextConsumer)
136+
r, err := newMetricsReceiver(oCfg, set, nextConsumer)
188137
if err != nil {
189138
return nil, err
190139
}
@@ -197,20 +146,12 @@ func (f *kafkaReceiverFactory) createLogsReceiver(
197146
cfg component.Config,
198147
nextConsumer consumer.Logs,
199148
) (receiver.Logs, error) {
200-
for encoding, unmarshaler := range defaultLogsUnmarshalers(set.BuildInfo.Version, set.Logger) {
201-
f.logsUnmarshalers[encoding] = unmarshaler
202-
}
203-
204149
oCfg := *(cfg.(*Config))
205150
if oCfg.Topic == "" {
206151
oCfg.Topic = defaultLogsTopic
207152
}
208-
unmarshaler, err := getLogsUnmarshaler(oCfg.Encoding, f.logsUnmarshalers)
209-
if err != nil {
210-
return nil, err
211-
}
212153

213-
r, err := newLogsReceiver(oCfg, set, unmarshaler, nextConsumer)
154+
r, err := newLogsReceiver(oCfg, set, nextConsumer)
214155
if err != nil {
215156
return nil, err
216157
}

receiver/kafkareceiver/factory_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestCreateTracesReceiver(t *testing.T) {
3737
cfg := createDefaultConfig().(*Config)
3838
cfg.Brokers = []string{"invalid:9092"}
3939
cfg.ProtocolVersion = "2.0.0"
40-
f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()}
40+
f := kafkaReceiverFactory{}
4141
r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
4242
require.NoError(t, err)
4343
// no available broker
@@ -46,7 +46,7 @@ func TestCreateTracesReceiver(t *testing.T) {
4646

4747
func TestWithTracesUnmarshalers(t *testing.T) {
4848
unmarshaler := &customTracesUnmarshaler{}
49-
f := NewFactory(withTracesUnmarshalers(unmarshaler))
49+
f := NewFactory()
5050
cfg := createDefaultConfig().(*Config)
5151
// disable contacting broker
5252
cfg.Metadata.Full = false
@@ -76,7 +76,7 @@ func TestCreateMetricsReceiver(t *testing.T) {
7676
cfg := createDefaultConfig().(*Config)
7777
cfg.Brokers = []string{"invalid:9092"}
7878
cfg.ProtocolVersion = "2.0.0"
79-
f := kafkaReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()}
79+
f := kafkaReceiverFactory{}
8080
r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
8181
require.NoError(t, err)
8282
// no available broker
@@ -85,7 +85,7 @@ func TestCreateMetricsReceiver(t *testing.T) {
8585

8686
func TestWithMetricsUnmarshalers(t *testing.T) {
8787
unmarshaler := &customMetricsUnmarshaler{}
88-
f := NewFactory(withMetricsUnmarshalers(unmarshaler))
88+
f := NewFactory()
8989
cfg := createDefaultConfig().(*Config)
9090
// disable contacting broker
9191
cfg.Metadata.Full = false
@@ -115,7 +115,7 @@ func TestCreateLogsReceiver(t *testing.T) {
115115
cfg := createDefaultConfig().(*Config)
116116
cfg.Brokers = []string{"invalid:9092"}
117117
cfg.ProtocolVersion = "2.0.0"
118-
f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())}
118+
f := kafkaReceiverFactory{}
119119
r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil)
120120
require.NoError(t, err)
121121
// no available broker
@@ -146,7 +146,7 @@ func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) {
146146

147147
func TestWithLogsUnmarshalers(t *testing.T) {
148148
unmarshaler := &customLogsUnmarshaler{}
149-
f := NewFactory(withLogsUnmarshalers(unmarshaler))
149+
f := NewFactory()
150150
cfg := createDefaultConfig().(*Config)
151151
// disable contacting broker
152152
cfg.Metadata.Full = false

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
"go.opentelemetry.io/collector/component"
1515
"go.opentelemetry.io/collector/component/componentstatus"
1616
"go.opentelemetry.io/collector/consumer"
17+
"go.opentelemetry.io/collector/pdata/plog"
18+
"go.opentelemetry.io/collector/pdata/pmetric"
19+
"go.opentelemetry.io/collector/pdata/ptrace"
1720
"go.opentelemetry.io/collector/receiver"
1821
"go.opentelemetry.io/collector/receiver/receiverhelper"
1922
"go.opentelemetry.io/otel/attribute"
@@ -100,11 +103,7 @@ var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
100103
var _ receiver.Metrics = (*kafkaMetricsConsumer)(nil)
101104
var _ receiver.Logs = (*kafkaLogsConsumer)(nil)
102105

103-
func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesUnmarshaler, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
104-
if unmarshaler == nil {
105-
return nil, errUnrecognizedEncoding
106-
}
107-
106+
func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) {
108107
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
109108
if err != nil {
110109
return nil, err
@@ -114,7 +113,6 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU
114113
config: config,
115114
topics: []string{config.Topic},
116115
nextConsumer: nextConsumer,
117-
unmarshaler: unmarshaler,
118116
settings: set,
119117
autocommitEnabled: config.AutoCommit.Enable,
120118
messageMarking: config.MessageMarking,
@@ -170,6 +168,22 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
170168
if err != nil {
171169
return err
172170
}
171+
// extensions take precedence over internal encodings
172+
if unmarshaler, errExt := loadEncodingExtension[ptrace.Unmarshaler](
173+
host,
174+
c.config.Encoding,
175+
); errExt == nil {
176+
c.unmarshaler = &tracesEncodingUnmarshaler{
177+
unmarshaler: *unmarshaler,
178+
encoding: c.config.Encoding,
179+
}
180+
}
181+
if unmarshaler, ok := defaultTracesUnmarshalers()[c.config.Encoding]; c.unmarshaler == nil && ok {
182+
c.unmarshaler = unmarshaler
183+
}
184+
if c.unmarshaler == nil {
185+
return errUnrecognizedEncoding
186+
}
173187
// consumerGroup may be set in tests to inject fake implementation.
174188
if c.consumerGroup == nil {
175189
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
@@ -229,11 +243,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
229243
return c.consumerGroup.Close()
230244
}
231245

232-
func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler MetricsUnmarshaler, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) {
233-
if unmarshaler == nil {
234-
return nil, errUnrecognizedEncoding
235-
}
236-
246+
func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) {
237247
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
238248
if err != nil {
239249
return nil, err
@@ -243,7 +253,6 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric
243253
config: config,
244254
topics: []string{config.Topic},
245255
nextConsumer: nextConsumer,
246-
unmarshaler: unmarshaler,
247256
settings: set,
248257
autocommitEnabled: config.AutoCommit.Enable,
249258
messageMarking: config.MessageMarking,
@@ -267,6 +276,22 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
267276
if err != nil {
268277
return err
269278
}
279+
// extensions take precedence over internal encodings
280+
if unmarshaler, errExt := loadEncodingExtension[pmetric.Unmarshaler](
281+
host,
282+
c.config.Encoding,
283+
); errExt == nil {
284+
c.unmarshaler = &metricsEncodingUnmarshaler{
285+
unmarshaler: *unmarshaler,
286+
encoding: c.config.Encoding,
287+
}
288+
}
289+
if unmarshaler, ok := defaultMetricsUnmarshalers()[c.config.Encoding]; c.unmarshaler == nil && ok {
290+
c.unmarshaler = unmarshaler
291+
}
292+
if c.unmarshaler == nil {
293+
return errUnrecognizedEncoding
294+
}
270295
// consumerGroup may be set in tests to inject fake implementation.
271296
if c.consumerGroup == nil {
272297
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
@@ -326,11 +351,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
326351
return c.consumerGroup.Close()
327352
}
328353

329-
func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
330-
if unmarshaler == nil {
331-
return nil, errUnrecognizedEncoding
332-
}
333-
354+
func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
334355
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
335356
if err != nil {
336357
return nil, err
@@ -340,7 +361,6 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar
340361
config: config,
341362
topics: []string{config.Topic},
342363
nextConsumer: nextConsumer,
343-
unmarshaler: unmarshaler,
344364
settings: set,
345365
autocommitEnabled: config.AutoCommit.Enable,
346366
messageMarking: config.MessageMarking,
@@ -364,6 +384,25 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
364384
if err != nil {
365385
return err
366386
}
387+
// extensions take precedence over internal encodings
388+
if unmarshaler, errExt := loadEncodingExtension[plog.Unmarshaler](
389+
host,
390+
c.config.Encoding,
391+
); errExt == nil {
392+
c.unmarshaler = &logsEncodingUnmarshaler{
393+
unmarshaler: *unmarshaler,
394+
encoding: c.config.Encoding,
395+
}
396+
}
397+
if unmarshaler, errInt := getLogsUnmarshaler(
398+
c.config.Encoding,
399+
defaultLogsUnmarshalers(c.settings.BuildInfo.Version, c.settings.Logger),
400+
); c.unmarshaler == nil && errInt == nil {
401+
c.unmarshaler = unmarshaler
402+
}
403+
if c.unmarshaler == nil {
404+
return errUnrecognizedEncoding
405+
}
367406
// consumerGroup may be set in tests to inject fake implementation.
368407
if c.consumerGroup == nil {
369408
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
@@ -720,3 +759,30 @@ func toSaramaInitialOffset(initialOffset string) (int64, error) {
720759
return 0, errInvalidInitialOffset
721760
}
722761
}
762+
763+
// loadEncodingExtension tries to load an available extension for the given encoding.
764+
func loadEncodingExtension[T any](host component.Host, encoding string) (*T, error) {
765+
extensionID, err := encodingToComponentID(encoding)
766+
if err != nil {
767+
return nil, err
768+
}
769+
encodingExtension, ok := host.GetExtensions()[*extensionID]
770+
if !ok {
771+
return nil, fmt.Errorf("unknown encoding extension %q", encoding)
772+
}
773+
unmarshaler, ok := encodingExtension.(T)
774+
if !ok {
775+
return nil, fmt.Errorf("extension %q is not an unmarshaler", encoding)
776+
}
777+
return &unmarshaler, nil
778+
}
779+
780+
// encodingToComponentID converts an encoding string to a component ID using the given encoding as type.
781+
func encodingToComponentID(encoding string) (*component.ID, error) {
782+
componentType, err := component.NewType(encoding)
783+
if err != nil {
784+
return nil, fmt.Errorf("invalid component type: %w", err)
785+
}
786+
id := component.NewID(componentType)
787+
return &id, nil
788+
}

0 commit comments

Comments
 (0)