Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metricbeat] migrate kafka to reporterV2 with error return #11868

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package consumergroup
import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -76,11 +78,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch consumer group metrics from kafka
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
broker, err := m.Connect()
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in connect")
}
defer broker.Close()

Expand Down Expand Up @@ -111,6 +112,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
}
err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred())
if err != nil {
r.Error(err)
return errors.Wrap(err, "error in fetch")
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func TestData(t *testing.T) {
}
defer c.Close()

ms := mbtest.NewReportingMetricSetV2(t, getConfig())
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig())
for retries := 0; retries < 3; retries++ {
err = mbtest.WriteEventsReporterV2(ms, t, "")
err = mbtest.WriteEventsReporterV2Error(ms, t, "")
if err == nil {
return
}
Expand Down
21 changes: 13 additions & 8 deletions metricbeat/module/kafka/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package partition

import (
"errors"
"fmt"

"github.com/Shopify/sarama"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -74,18 +75,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch partition stats list from kafka
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
broker, err := m.Connect()
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error in connect")
}
defer broker.Close()

topics, err := broker.GetTopicsMetadata(m.topics...)
if err != nil {
r.Error(err)
return
return errors.Wrap(err, "error getting topic metadata")
}

evtBroker := common.MapStr{
Expand Down Expand Up @@ -123,8 +122,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
err = errFailQueryOffset
}

logp.Err("Failed to query kafka partition (%v:%v) offsets: %v",
msg := fmt.Errorf("Failed to query kafka partition (%v:%v) offsets: %v",
topic.Name, partition.ID, err)
m.Logger().Error(msg)
r.Error(msg)
continue
}

Expand Down Expand Up @@ -163,16 +164,20 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
}

// TODO (deprecation): Remove fields from MetricSetFields moved to ModuleFields
r.Event(mb.Event{
sent := r.Event(mb.Event{
ModuleFields: common.MapStr{
"broker": evtBroker,
"topic": evtTopic,
},
MetricSetFields: event,
})
if !sent {
return nil
}
}
}
}
return nil
}

// queryOffsetRange queries the broker for the oldest and the newest offsets in
Expand Down
10 changes: 5 additions & 5 deletions metricbeat/module/kafka/partition/partition_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func TestData(t *testing.T) {

generateKafkaData(t, "metricbeat-generate-data")

ms := mbtest.NewReportingMetricSetV2(t, getConfig(""))
err := mbtest.WriteEventsReporterV2(ms, t, "")
ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(""))
err := mbtest.WriteEventsReporterV2Error(ms, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand All @@ -65,8 +65,8 @@ func TestTopic(t *testing.T) {
// Create initial topic
generateKafkaData(t, testTopic)

f := mbtest.NewReportingMetricSetV2(t, getConfig(testTopic))
dataBefore, err := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig(testTopic))
dataBefore, err := mbtest.ReportingFetchV2Error(f)
if err != nil {
t.Fatal("write", err)
}
Expand All @@ -82,7 +82,7 @@ func TestTopic(t *testing.T) {
generateKafkaData(t, testTopic)
}

dataAfter, err := mbtest.ReportingFetchV2(f)
dataAfter, err := mbtest.ReportingFetchV2Error(f)
if err != nil {
t.Fatal("write", err)
}
Expand Down