diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index b45aae8cca20..045276ed9d5b 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -20,6 +20,8 @@ package consumergroup import ( "fmt" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" @@ -76,11 +78,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch consumer group metrics from kafka -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { broker, err := m.Connect() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in connect") } defer broker.Close() @@ -111,6 +112,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred()) if err != nil { - r.Error(err) + return errors.Wrap(err, "error in fetch") } + + return nil } diff --git a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go index df67af8da077..43be85287cbd 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go @@ -47,9 +47,9 @@ func TestData(t *testing.T) { } defer c.Close() - ms := mbtest.NewReportingMetricSetV2(t, getConfig()) + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig()) for retries := 0; retries < 3; retries++ { - err = mbtest.WriteEventsReporterV2(ms, t, "") + err = mbtest.WriteEventsReporterV2Error(ms, t, "") if err == nil { return } diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 2e002e2ebcfd..2e939c3cde55 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -18,11 +18,12 @@ package partition import ( - "errors" "fmt" "github.com/Shopify/sarama" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" @@ -74,18 +75,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch partition stats list from kafka -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { broker, err := m.Connect() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in connect") } defer broker.Close() topics, err := broker.GetTopicsMetadata(m.topics...) if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error getting topic metadata") } evtBroker := common.MapStr{ @@ -123,8 +122,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { err = errFailQueryOffset } - logp.Err("Failed to query kafka partition (%v:%v) offsets: %v", + msg := fmt.Errorf("Failed to query kafka partition (%v:%v) offsets: %v", topic.Name, partition.ID, err) + m.Logger().Error(msg) + r.Error(msg) continue } @@ -163,16 +164,20 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } // TODO (deprecation): Remove fields from MetricSetFields moved to ModuleFields - r.Event(mb.Event{ + sent := r.Event(mb.Event{ ModuleFields: common.MapStr{ "broker": evtBroker, "topic": evtTopic, }, MetricSetFields: event, }) + if !sent { + return nil + } } } } + return nil } // queryOffsetRange queries the broker for the oldest and the newest offsets in diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go index dc696c26b1d2..54b7e5b4ed60 100644 --- a/metricbeat/module/kafka/partition/partition_integration_test.go +++ b/metricbeat/module/kafka/partition/partition_integration_test.go @@ -46,8 +46,8 @@ func TestData(t *testing.T) { generateKafkaData(t, "metricbeat-generate-data") - ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) - err := mbtest.WriteEventsReporterV2(ms, t, "") + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig("")) + err := mbtest.WriteEventsReporterV2Error(ms, t, "") if err != nil { t.Fatal("write", err) } @@ -65,8 +65,8 @@ func TestTopic(t *testing.T) { // Create initial topic generateKafkaData(t, testTopic) - f := mbtest.NewReportingMetricSetV2(t, getConfig(testTopic)) - dataBefore, err := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(testTopic)) + dataBefore, err := mbtest.ReportingFetchV2Error(f) if err != nil { t.Fatal("write", err) } @@ -82,7 +82,7 @@ func TestTopic(t *testing.T) { generateKafkaData(t, testTopic) } - dataAfter, err := mbtest.ReportingFetchV2(f) + dataAfter, err := mbtest.ReportingFetchV2Error(f) if err != nil { t.Fatal("write", err) }