diff --git a/metricbeat/module/munin/node/node.go b/metricbeat/module/munin/node/node.go index 7bc9cde2342..62c15bd77e9 100644 --- a/metricbeat/module/munin/node/node.go +++ b/metricbeat/module/munin/node/node.go @@ -20,8 +20,9 @@ package node import ( "time" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/munin" ) @@ -66,11 +67,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch method implements the data gathering -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { node, err := munin.Connect(m.Host(), m.timeout) if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error in Connect") } defer node.Close() @@ -78,15 +78,17 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { if len(plugins) == 0 { plugins, err = node.List() if err != nil { - r.Error(err) - return + return errors.Wrap(err, "error getting plugin list") } } for _, plugin := range plugins { metrics, err := node.Fetch(plugin, m.sanitize) if err != nil { + msg := errors.Wrap(err, "error fetching metrics") r.Error(err) + m.Logger().Error(msg) + continue } // Even if there was some error, keep sending succesfully collected metrics if any @@ -105,8 +107,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { }, } if !r.Event(event) { - logp.Debug("munin", "Failed to report event, interrupting Fetch") - return + return errors.New("metricset has closed") } } + return nil } diff --git a/metricbeat/module/munin/node/node_integration_test.go b/metricbeat/module/munin/node/node_integration_test.go index 1e8fbbe2fd6..201a0ecac1d 100644 --- a/metricbeat/module/munin/node/node_integration_test.go +++ b/metricbeat/module/munin/node/node_integration_test.go @@ -32,8 +32,8 @@ import ( func TestFetch(t *testing.T) { compose.EnsureUp(t, "munin") - f := mbtest.NewReportingMetricSetV2(t, getConfig()) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) + events, errs := mbtest.ReportingFetchV2Error(f) assert.Empty(t, errs) if !assert.NotEmpty(t, events) { @@ -48,8 +48,8 @@ func TestData(t *testing.T) { compose.EnsureUp(t, "munin") config := getConfig() - f := mbtest.NewReportingMetricSetV2(t, config) - err := mbtest.WriteEventsReporterV2(f, t, ".") + f := mbtest.NewReportingMetricSetV2Error(t, config) + err := mbtest.WriteEventsReporterV2Error(f, t, ".") if err != nil { t.Fatal("write", err) }