Kafka extract header metadata (#24367)
**Description:** Enable support for extracting headers from Kafka messages
and attaching them to resource attributes. Exporters later in the pipeline
can then make use of this information.

**Link to tracking Issue:**
[21729](#21729)

**Testing:** Added test cases for logs, traces, and metrics.

**Documentation:** Updated the kafkareceiver README with the new `header_extraction` settings and examples.

---------

Co-authored-by: Alex Boten <aboten@lightstep.com>
VihasMakwana and Alex Boten authored Sep 29, 2023
1 parent 9982fbc commit 31df678
Showing 8 changed files with 493 additions and 67 deletions.
20 changes: 20 additions & 0 deletions .chloggen/header-extraction-kafka.yaml
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow users to attach Kafka header metadata to the log/metric/trace records in the pipeline. Introduces a new config param, 'header_extraction', and documents it with examples.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24367]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
42 changes: 41 additions & 1 deletion receiver/kafkareceiver/README.md
@@ -87,11 +87,51 @@ The following settings can be optionally configured:
  - `after`: (default = false) If true, the messages are marked after the pipeline execution
  - `on_error`: (default = false) If false, only the successfully processed messages are marked
    **Note: this can block the entire partition in case message processing returns a permanent error**

- `header_extraction`:
  - `extract_headers` (default = false): Whether to attach Kafka header fields to resource attributes in the OTel pipeline
  - `headers` (default = []): The list of header keys to extract from Kafka records.
  **Note: Matching is exact; regexes are not supported at this time.**
Example:

```yaml
receivers:
kafka:
protocol_version: 2.0.0
```
Example of header extraction:
```yaml
receivers:
kafka:
topic: test
header_extraction:
extract_headers: true
headers: ["header1", "header2"]
```
- If we feed the following Kafka record to the `test` topic with the above config:
```yaml
{
event: Hello,
headers: {
header1: value1,
header2: value2,
}
}
```
we will get a log record in the Collector similar to:
```yaml
{
...
body: Hello,
resource: {
kafka.header.header1: value1,
kafka.header.header2: value2,
},
...
}
```

- Here you can see the Kafka record headers `header1` and `header2` added to the resource attributes.
- Every **matching** Kafka header key is prefixed with `kafka.header.` and attached to the resource attributes.
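
For illustration only (an editorial sketch, not part of this diff): a downstream component sees these as ordinary resource attributes and can read them back through the pdata API.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

// Illustrative sketch: read an extracted Kafka header back out of a log
// payload's resource attributes, assuming the example record above.
func printHeader(logs plog.Logs) {
	attrs := logs.ResourceLogs().At(0).Resource().Attributes()
	if v, ok := attrs.Get("kafka.header.header1"); ok {
		fmt.Println(v.Str()) // prints "value1"
	}
}
```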
8 changes: 8 additions & 0 deletions receiver/kafkareceiver/config.go
@@ -31,6 +31,11 @@ type MessageMarking struct {
	OnError bool `mapstructure:"on_error"`
}

// HeaderExtraction controls whether, and which, Kafka record headers are
// copied onto resource attributes.
type HeaderExtraction struct {
	ExtractHeaders bool     `mapstructure:"extract_headers"`
	Headers        []string `mapstructure:"headers"`
}

// Config defines configuration for Kafka receiver.
type Config struct {
	// The list of kafka brokers (default localhost:9092)
@@ -60,6 +65,9 @@ type Config struct {

	// Controls the way the messages are marked as consumed
	MessageMarking MessageMarking `mapstructure:"message_marking"`

	// Extract headers from kafka records
	HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`
}
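
For illustration (an editorial sketch, not part of the diff): the README's header extraction example maps onto these structs as follows; every other field keeps its default.

```go
// Sketch: the README example expressed as Go config (illustrative only).
func exampleConfig() *Config {
	return &Config{
		HeaderExtraction: HeaderExtraction{
			ExtractHeaders: true,
			Headers:        []string{"header1", "header2"},
		},
	}
}
```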

const (
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/factory.go
@@ -111,6 +111,9 @@ func createDefaultConfig() component.Config {
			After:   false,
			OnError: false,
		},
		HeaderExtraction: HeaderExtraction{
			ExtractHeaders: false,
		},
	}
}

94 changes: 94 additions & 0 deletions receiver/kafkareceiver/header_extraction.go
@@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"

import (
	"fmt"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.uber.org/zap"
)

// getAttribute returns the resource attribute key for a given Kafka header key.
func getAttribute(key string) string {
	return fmt.Sprintf("kafka.header.%s", key)
}

// HeaderExtractor attaches the values of configured Kafka message headers
// to the resource attributes of the telemetry flowing through the receiver.
type HeaderExtractor interface {
	extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage)
	extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage)
	extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage)
}

type headerExtractor struct {
	logger  *zap.Logger
	headers []string
}

func (he *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
	for _, header := range he.headers {
		value, ok := getHeaderValue(message.Headers, header)
		if !ok {
			he.logger.Debug("Header key not found in the trace: ", zap.String("key", header))
			continue
		}
		for i := 0; i < traces.ResourceSpans().Len(); i++ {
			rs := traces.ResourceSpans().At(i)
			rs.Resource().Attributes().PutStr(getAttribute(header), value)
		}
	}
}

func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
	for _, header := range he.headers {
		value, ok := getHeaderValue(message.Headers, header)
		if !ok {
			he.logger.Debug("Header key not found in the log: ", zap.String("key", header))
			continue
		}
		for i := 0; i < logs.ResourceLogs().Len(); i++ {
			rl := logs.ResourceLogs().At(i)
			rl.Resource().Attributes().PutStr(getAttribute(header), value)
		}
	}
}

func (he *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
	for _, header := range he.headers {
		value, ok := getHeaderValue(message.Headers, header)
		if !ok {
			he.logger.Debug("Header key not found in the metric: ", zap.String("key", header))
			continue
		}
		for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
			rm := metrics.ResourceMetrics().At(i)
			rm.Resource().Attributes().PutStr(getAttribute(header), value)
		}
	}
}
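
For illustration (an editorial sketch, not part of the diff), here is the log path end to end; `zap.NewNop` discards the debug output:

```go
// Illustrative usage: one resource, one matching header.
func exampleExtractLogs() plog.Logs {
	he := &headerExtractor{logger: zap.NewNop(), headers: []string{"header1"}}
	logs := plog.NewLogs()
	logs.ResourceLogs().AppendEmpty()
	msg := &sarama.ConsumerMessage{
		Headers: []*sarama.RecordHeader{{Key: []byte("header1"), Value: []byte("value1")}},
	}
	he.extractHeadersLogs(logs, msg)
	// logs now carries the resource attribute "kafka.header.header1" = "value1".
	return logs
}
```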

func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) {
	for _, kafkaHeader := range headers {
		headerKey := string(kafkaHeader.Key)
		if headerKey == header {
			// matching header found
			return string(kafkaHeader.Value), true
		}
	}
	// no header found matching the key, report to the user
	return "", false
}
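
A usage note (editorial, not part of the diff): the lookup is linear over the message's headers and case-sensitive, matching the README's "exact" semantics.

```go
// Illustrative: exact, case-sensitive matching only.
func exampleLookup() (string, bool) {
	headers := []*sarama.RecordHeader{{Key: []byte("header1"), Value: []byte("value1")}}
	// getHeaderValue(headers, "HEADER1") would return ("", false):
	// there is no regex support and no case folding.
	return getHeaderValue(headers, "header1") // "value1", true
}
```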

// nopHeaderExtractor is the no-op implementation used when header
// extraction is disabled in the config.
type nopHeaderExtractor struct{}

func (he *nopHeaderExtractor) extractHeadersTraces(_ ptrace.Traces, _ *sarama.ConsumerMessage) {
}

func (he *nopHeaderExtractor) extractHeadersLogs(_ plog.Logs, _ *sarama.ConsumerMessage) {
}

func (he *nopHeaderExtractor) extractHeadersMetrics(_ pmetric.Metrics, _ *sarama.ConsumerMessage) {
}
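
The consumer wiring is outside this excerpt, but given the config and the two implementations above, selecting an extractor presumably looks something like this (hypothetical sketch; the actual wiring lives elsewhere in the receiver):

```go
// Hypothetical wiring (not in this diff): choose the extractor at startup.
func newHeaderExtractor(logger *zap.Logger, cfg HeaderExtraction) HeaderExtractor {
	if cfg.ExtractHeaders {
		return &headerExtractor{logger: logger, headers: cfg.Headers}
	}
	return &nopHeaderExtractor{}
}
```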
