diff --git a/metricbeat/module/mongodb/dbstats/_meta/data.json b/metricbeat/module/mongodb/dbstats/_meta/data.json index 671dceeaf4a9..6a4105be4d7d 100644 --- a/metricbeat/module/mongodb/dbstats/_meta/data.json +++ b/metricbeat/module/mongodb/dbstats/_meta/data.json @@ -1,25 +1,27 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", - "beat": { + "agent": { "hostname": "host.example.com", "name": "host.example.com" }, + "event": { + "dataset": "mongodb.dbstats", + "duration": 115000, + "module": "mongodb" + }, "metricset": { - "host": "mongodb:27017", - "module": "mongodb", - "name": "dbstats", - "rtt": 115 + "name": "dbstats" }, "mongodb": { "dbstats": { "avg_obj_size": { - "bytes": 59 + "bytes": 741 }, - "collections": 1, + "collections": 2, "data_size": { - "bytes": 59 + "bytes": 1482 }, - "db": "admin", + "db": "local", "file_size": {}, "index_size": { "bytes": 32768 @@ -27,10 +29,14 @@ "indexes": 2, "ns_size_mb": {}, "num_extents": 0, - "objects": 1, + "objects": 2, "storage_size": { - "bytes": 16384 + "bytes": 32768 } } + }, + "service": { + "address": "172.26.0.2:27017", + "type": "mongodb" } } \ No newline at end of file diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index 626d1b7b1358..4db9779f3c4e 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -18,7 +18,7 @@ package dbstats import ( - "errors" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/metricbeat/module/mongodb" ) -var debugf = logp.MakeDebug("mongodb.dbstats") +var logger = logp.NewLogger("mongodb.dbstats") // init registers the MetricSet with the central registry. // The New method will be called after the setup of the module and before starting to fetch data @@ -56,28 +56,30 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ms}, nil } -// Fetch methods implements the data gathering and data conversion to the right format -// It returns the event which is then forward to the output. In case of an error, a -// descriptive error must be returned. -func (m *MetricSet) Fetch() ([]common.MapStr, error) { - // events is the list of events collected from each of the databases. - var events []common.MapStr - +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) { // instantiate direct connections to each of the configured Mongo hosts mongoSession, err := mongodb.NewDirectSession(m.DialInfo) if err != nil { - return nil, err + logger.Error(err) + reporter.Error(err) + return } defer mongoSession.Close() // Get the list of databases names, which we'll use to call db.stats() on each dbNames, err := mongoSession.DatabaseNames() if err != nil { - logp.Err("Error retrieving database names from Mongo instance") - return events, err + err = errors.Wrap(err, "Error retrieving database names from Mongo instance") + logger.Error(err) + reporter.Error(err) + return } // for each database, call db.stats() and append to events + totalEvents := 0 for _, dbName := range dbNames { db := mongoSession.DB(dbName) @@ -85,18 +87,23 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { err := db.Run("dbStats", &result) if err != nil { - logp.Err("Failed to retrieve stats for db %s", dbName) + err = errors.Wrapf(err, "Failed to retrieve stats for db %s", dbName) + logger.Error(err) continue } data, _ := schema.Apply(result) - events = append(events, data) + if reported := reporter.Event(mb.Event{MetricSetFields: data}); !reported { + logger.Debug("error reporting event") + return + } + totalEvents++ } - if len(events) == 0 { + if totalEvents == 0 { err = errors.New("Failed to retrieve dbStats from any databases") - logp.Err(err.Error()) - return events, err + logger.Error(err) + reporter.Error(err) } - return events, nil + return } diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index 512ef1c2b8ed..9ef22d6a7001 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -32,44 +32,46 @@ import ( func TestFetch(t *testing.T) { compose.EnsureUp(t, "mongodb") - f := mbtest.NewEventsFetcher(t, getConfig()) - events, err := f.Fetch() - if !assert.NoError(t, err) { - t.FailNow() + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } + assert.NotEmpty(t, events) for _, event := range events { t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + metricsetFields := event.MetricSetFields // Check a few event Fields - db := event["db"].(string) + db := metricsetFields["db"].(string) assert.NotEqual(t, db, "") - collections := event["collections"].(int64) + collections := metricsetFields["collections"].(int64) assert.True(t, collections > 0) - objects := event["objects"].(int64) + objects := metricsetFields["objects"].(int64) assert.True(t, objects > 0) - avgObjSize, err := event.GetValue("avg_obj_size.bytes") + avgObjSize, err := metricsetFields.GetValue("avg_obj_size.bytes") assert.NoError(t, err) assert.True(t, avgObjSize.(int64) > 0) - dataSize, err := event.GetValue("data_size.bytes") + dataSize, err := metricsetFields.GetValue("data_size.bytes") assert.NoError(t, err) assert.True(t, dataSize.(int64) > 0) - storageSize, err := event.GetValue("storage_size.bytes") + storageSize, err := metricsetFields.GetValue("storage_size.bytes") assert.NoError(t, err) assert.True(t, storageSize.(int64) > 0) - numExtents := event["num_extents"].(int64) + numExtents := metricsetFields["num_extents"].(int64) assert.True(t, numExtents >= 0) - indexes := event["indexes"].(int64) + indexes := metricsetFields["indexes"].(int64) assert.True(t, indexes >= 0) - indexSize, err := event.GetValue("index_size.bytes") + indexSize, err := metricsetFields.GetValue("index_size.bytes") assert.NoError(t, err) assert.True(t, indexSize.(int64) > 0) } @@ -78,9 +80,14 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { compose.EnsureUp(t, "mongodb") - f := mbtest.NewEventsFetcher(t, getConfig()) - err := mbtest.WriteEvents(f, t) - if err != nil { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) + + if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) } }