Skip to content

Commit

Permalink
chore: use nop-header extractir
Browse files Browse the repository at this point in the history
  • Loading branch information
Vihas Splunk committed Aug 8, 2023
1 parent 2c283ee commit d7e367f
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 79 deletions.
1 change: 1 addition & 0 deletions receiver/kafkareceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (

require (
github.com/aws/aws-sdk-go v1.44.299 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions receiver/kafkareceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion receiver/kafkareceiver/header_extraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ func getAttribute(key string) string {
return fmt.Sprintf("kafka.header.%s", key)
}

type HeaderExtractor interface {
extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage)
extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage)
extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage)
}

type headerExtractor struct {
logger *zap.Logger
headers []string
Expand All @@ -40,7 +46,7 @@ func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.Co
for _, header := range he.headers {
value, ok := getHeaderValue(message.Headers, header)
if !ok {
he.logger.Debug("Header key not found in the logger: ", zap.String("key", header))
he.logger.Debug("Header key not found in the log: ", zap.String("key", header))
continue
}
for i := 0; i < logs.ResourceLogs().Len(); i++ {
Expand Down Expand Up @@ -75,3 +81,14 @@ func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool
// no header found matching the key, report to the user
return "", false
}

type nopHeaderExtractor struct{}

func (he *nopHeaderExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) {
}

func (he *nopHeaderExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) {
}

func (he *nopHeaderExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) {
}
1 change: 0 additions & 1 deletion receiver/kafkareceiver/header_extraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ func TestHeaderExtractionMetrics(t *testing.T) {
},
Value: bts,
}
// groupClaim.messageChan <- &sarama.ConsumerMessage{}
cancelFunc()
wg.Wait()

Expand Down
21 changes: 9 additions & 12 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
obsrecv: obsrecv,
autocommitEnabled: c.autocommitEnabled,
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
}
if c.headerExtraction {
consumerGroup.headerExtractor = &headerExtractor{
Expand Down Expand Up @@ -245,6 +246,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
obsrecv: obsrecv,
autocommitEnabled: c.autocommitEnabled,
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
}
if c.headerExtraction {
metricsConsumerGroup.headerExtractor = &headerExtractor{
Expand Down Expand Up @@ -374,6 +376,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
obsrecv: obsrecv,
autocommitEnabled: c.autocommitEnabled,
messageMarking: c.messageMarking,
headerExtractor: &nopHeaderExtractor{},
}
if c.headerExtraction {
logsConsumerGroup.headerExtractor = &headerExtractor{
Expand Down Expand Up @@ -424,7 +427,7 @@ type tracesConsumerGroupHandler struct {

autocommitEnabled bool
messageMarking MessageMarking
headerExtractor *headerExtractor
headerExtractor HeaderExtractor
}

type metricsConsumerGroupHandler struct {
Expand All @@ -440,7 +443,7 @@ type metricsConsumerGroupHandler struct {

autocommitEnabled bool
messageMarking MessageMarking
headerExtractor *headerExtractor
headerExtractor HeaderExtractor
}

type logsConsumerGroupHandler struct {
Expand All @@ -456,7 +459,7 @@ type logsConsumerGroupHandler struct {

autocommitEnabled bool
messageMarking MessageMarking
headerExtractor *headerExtractor
headerExtractor HeaderExtractor
}

var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil)
Expand Down Expand Up @@ -513,9 +516,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
return err
}

if c.headerExtractor != nil {
c.headerExtractor.extractHeadersTraces(traces, message)
}
c.headerExtractor.extractHeadersTraces(traces, message)
spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
Expand Down Expand Up @@ -590,9 +591,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
}
return err
}
if c.headerExtractor != nil {
c.headerExtractor.extractHeadersMetrics(metrics, message)
}
c.headerExtractor.extractHeadersMetrics(metrics, message)

dataPointCount := metrics.DataPointCount()
err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics)
Expand Down Expand Up @@ -673,9 +672,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}
return err
}
if c.headerExtractor != nil {
c.headerExtractor.extractHeadersLogs(logs, message)
}
c.headerExtractor.extractHeadersLogs(logs, message)
err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
Expand Down
143 changes: 78 additions & 65 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,12 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := tracesConsumerGroupHandler{
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

testSession := testConsumerGroupSession{ctx: context.Background()}
Expand Down Expand Up @@ -188,11 +189,12 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := tracesConsumerGroupHandler{
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down Expand Up @@ -234,11 +236,12 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := tracesConsumerGroupHandler{
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand All @@ -261,11 +264,12 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := tracesConsumerGroupHandler{
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -396,11 +400,12 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := metricsConsumerGroupHandler{
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

testSession := testConsumerGroupSession{ctx: context.Background()}
Expand Down Expand Up @@ -445,11 +450,12 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := metricsConsumerGroupHandler{
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down Expand Up @@ -490,11 +496,12 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := metricsConsumerGroupHandler{
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand All @@ -517,11 +524,12 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := metricsConsumerGroupHandler{
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -651,11 +659,12 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

testSession := testConsumerGroupSession{ctx: context.Background()}
Expand Down Expand Up @@ -700,11 +709,12 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down Expand Up @@ -745,11 +755,12 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewNop(),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand All @@ -772,11 +783,12 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()})
require.NoError(t, err)
c := logsConsumerGroupHandler{
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: consumertest.NewErr(consumerError),
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -842,11 +854,12 @@ func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) {
require.NoError(t, err)
sink := &consumertest.LogsSink{}
c := logsConsumerGroupHandler{
unmarshaler: unmarshaler,
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: sink,
obsrecv: obsrecv,
unmarshaler: unmarshaler,
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: sink,
obsrecv: obsrecv,
headerExtractor: &nopHeaderExtractor{},
}

wg := sync.WaitGroup{}
Expand Down

0 comments on commit d7e367f

Please sign in to comment.