From c8500f8b2ba7e2daf771b0e360de7a7c53cf1eee Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 18 Apr 2019 12:27:37 -0500 Subject: [PATCH 1/3] migrate kafka to reporterV2 with error return --- .../kafka/consumergroup/consumergroup.go | 10 ++++++---- .../consumergroup_integration_test.go | 4 ++-- .../module/kafka/partition/partition.go | 20 +++++++++++-------- .../partition/partition_integration_test.go | 10 +++++----- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index b45aae8cca20..476abf4abe2a 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/kafka" + "github.com/pkg/errors" ) // init registers the MetricSet with the central registry. @@ -76,11 +77,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch consumer group metrics from kafka -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { broker, err := m.Connect() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in connect") } defer broker.Close() @@ -111,6 +111,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred()) if err != nil { - r.Error(err) + return errors.Wrap(err, "error in fetch") } + + return nil } diff --git a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go index df67af8da077..43be85287cbd 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup_integration_test.go @@ -47,9 +47,9 @@ func TestData(t *testing.T) { } defer c.Close() - ms := mbtest.NewReportingMetricSetV2(t, getConfig()) + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig()) for retries := 0; retries < 3; retries++ { - err = mbtest.WriteEventsReporterV2(ms, t, "") + err = mbtest.WriteEventsReporterV2Error(ms, t, "") if err == nil { return } diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 2e002e2ebcfd..325e6c2acc4e 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -18,7 +18,6 @@ package partition import ( - "errors" "fmt" "github.com/Shopify/sarama" @@ -28,6 +27,7 @@ import ( "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/kafka" + "github.com/pkg/errors" ) // init registers the partition MetricSet with the central registry. @@ -74,18 +74,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch partition stats list from kafka -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { broker, err := m.Connect() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in connect") } defer broker.Close() topics, err := broker.GetTopicsMetadata(m.topics...) if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error getting topic metadata") } evtBroker := common.MapStr{ @@ -123,8 +121,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { err = errFailQueryOffset } - logp.Err("Failed to query kafka partition (%v:%v) offsets: %v", + msg := fmt.Errorf("Failed to query kafka partition (%v:%v) offsets: %v", topic.Name, partition.ID, err) + m.Logger().Error(msg) + r.Error(msg) continue } @@ -163,16 +163,20 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } // TODO (deprecation): Remove fields from MetricSetFields moved to ModuleFields - r.Event(mb.Event{ + sent := r.Event(mb.Event{ ModuleFields: common.MapStr{ "broker": evtBroker, "topic": evtTopic, }, MetricSetFields: event, }) + if !sent { + return errors.New("metricset is closed") + } } } } + return nil } // queryOffsetRange queries the broker for the oldest and the newest offsets in diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go index dc696c26b1d2..54b7e5b4ed60 100644 --- a/metricbeat/module/kafka/partition/partition_integration_test.go +++ b/metricbeat/module/kafka/partition/partition_integration_test.go @@ -46,8 +46,8 @@ func TestData(t *testing.T) { generateKafkaData(t, "metricbeat-generate-data") - ms := mbtest.NewReportingMetricSetV2(t, getConfig("")) - err := mbtest.WriteEventsReporterV2(ms, t, "") + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig("")) + err := mbtest.WriteEventsReporterV2Error(ms, t, "") if err != nil { t.Fatal("write", err) } @@ -65,8 +65,8 @@ func TestTopic(t *testing.T) { // Create initial topic generateKafkaData(t, testTopic) - f := mbtest.NewReportingMetricSetV2(t, getConfig(testTopic)) - dataBefore, err := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(testTopic)) + dataBefore, err := mbtest.ReportingFetchV2Error(f) if err != nil { t.Fatal("write", err) } @@ -82,7 +82,7 @@ func TestTopic(t *testing.T) { generateKafkaData(t, testTopic) } - dataAfter, err := mbtest.ReportingFetchV2(f) + dataAfter, err := mbtest.ReportingFetchV2Error(f) if err != nil { t.Fatal("write", err) } From e286c97808ed1e75cd7933cf0c0407eb4cf34c48 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 18 Apr 2019 13:09:43 -0500 Subject: [PATCH 2/3] make fmt --- metricbeat/module/kafka/consumergroup/consumergroup.go | 3 ++- metricbeat/module/kafka/partition/partition.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index 476abf4abe2a..045276ed9d5b 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -20,11 +20,12 @@ package consumergroup import ( "fmt" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/kafka" - "github.com/pkg/errors" ) // init registers the MetricSet with the central registry. diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index 325e6c2acc4e..a9f5d33779bd 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -22,12 +22,13 @@ import ( "github.com/Shopify/sarama" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/kafka" - "github.com/pkg/errors" ) // init registers the partition MetricSet with the central registry. From 068a8436849b3ac03214811e9eec2f0943bd3e29 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 23 Apr 2019 13:28:35 -0500 Subject: [PATCH 3/3] return nil on closed channel --- metricbeat/module/kafka/partition/partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index a9f5d33779bd..2e939c3cde55 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -172,7 +172,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { MetricSetFields: event, }) if !sent { - return errors.New("metricset is closed") + return nil } } }