Skip to content

Commit

Permalink
[kafka exporter] Allow for additional marshalers to be added. (#16514)
Browse files Browse the repository at this point in the history
Adding in work for allowing custom metrics and logs marshalers
  • Loading branch information
MovieStoreGuy authored Nov 30, 2022
1 parent 870f399 commit 4a496a4
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 8 deletions.
16 changes: 16 additions & 0 deletions .chloggen/msg_allow-custom-marshalers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allows for custom marshalers to be added in future releases

# One or more tracking issues related to the change
issues: [14514]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
18 changes: 18 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ func WithTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption {
}
}

// WithMetricsMarshalers adds additional metric marshalers to the exporter factory.
func WithMetricsMarshalers(metricMarshalers ...MetricsMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range metricMarshalers {
factory.metricsMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// WithLogMarshalers adds additional log marshalers to the exporter factory.
func WithLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption {
return func(factory *kafkaExporterFactory) {
for _, marshaler := range logsMarshalers {
factory.logsMarshalers[marshaler.Encoding()] = marshaler
}
}
}

// NewFactory creates Kafka exporter factory.
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
Expand Down
62 changes: 54 additions & 8 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ func TestCreateMetricExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
err error
name string
conf *Config
marshalers []MetricsMarshaler
err error
}{
{
name: "valid config (no validating broker)",
Expand All @@ -97,14 +98,36 @@ func TestCreateMetricExporter(t *testing.T) {
}),
err: &net.DNSError{},
},
{
name: "default_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
}),
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []MetricsMarshaler{
newMockMarshaler[pmetric.Metrics]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory()
f := NewFactory(WithMetricsMarshalers(tc.marshalers...))
exporter, err := f.CreateMetricsExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
Expand All @@ -125,9 +148,10 @@ func TestCreateLogExporter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
conf *Config
err error
name string
conf *Config
marshalers []LogsMarshaler
err error
}{
{
name: "valid config (no validating broker)",
Expand All @@ -148,14 +172,36 @@ func TestCreateLogExporter(t *testing.T) {
}),
err: &net.DNSError{},
},
{
name: "default_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = defaultEncoding
}),
marshalers: nil,
err: nil,
},
{
name: "custom_encoding",
conf: applyConfigOption(func(conf *Config) {
// Disabling broker check to ensure encoding work
conf.Metadata.Full = false
conf.Encoding = "custom"
}),
marshalers: []LogsMarshaler{
newMockMarshaler[plog.Logs]("custom"),
},
err: nil,
},
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

f := NewFactory()
f := NewFactory(WithLogsMarshalers(tc.marshalers...))
exporter, err := f.CreateLogsExporter(
context.Background(),
componenttest.NewNopExporterCreateSettings(),
Expand Down

0 comments on commit 4a496a4

Please sign in to comment.