Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Migrate MongoDB dbstats Metricset to use ReporterV2 interface #10852

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions metricbeat/module/mongodb/dbstats/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"event": {
"dataset": "mongodb.dbstats",
"duration": 115000,
"module": "mongodb"
},
"metricset": {
"host": "mongodb:27017",
"module": "mongodb",
"name": "dbstats",
"rtt": 115
"name": "dbstats"
},
"mongodb": {
"dbstats": {
"avg_obj_size": {
"bytes": 59
"bytes": 741
},
"collections": 1,
"collections": 2,
"data_size": {
"bytes": 59
"bytes": 1482
},
"db": "admin",
"db": "local",
"file_size": {},
"index_size": {
"bytes": 32768
},
"indexes": 2,
"ns_size_mb": {},
"num_extents": 0,
"objects": 1,
"objects": 2,
"storage_size": {
"bytes": 16384
"bytes": 32768
}
}
},
"service": {
"address": "172.26.0.2:27017",
"type": "mongodb"
}
}
40 changes: 22 additions & 18 deletions metricbeat/module/mongodb/dbstats/dbstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
package dbstats

import (
"errors"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/mongodb"
)

var debugf = logp.MakeDebug("mongodb.dbstats")
var logger = logp.NewLogger("mongodb.dbstats")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
Expand Down Expand Up @@ -56,47 +56,51 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
// events is the list of events collected from each of the databases.
var events []common.MapStr

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
// instantiate direct connections to each of the configured Mongo hosts
mongoSession, err := mongodb.NewDirectSession(m.DialInfo)
if err != nil {
return nil, err
logger.Error(err)
reporter.Error(err)
return
}
defer mongoSession.Close()

// Get the list of databases names, which we'll use to call db.stats() on each
dbNames, err := mongoSession.DatabaseNames()
if err != nil {
logp.Err("Error retrieving database names from Mongo instance")
return events, err
err = errors.Wrap(err, "Error retrieving database names from Mongo instance")
logger.Error(err)
reporter.Error(err)
return
}

// for each database, call db.stats() and append to events
totalEvents := 0
for _, dbName := range dbNames {
db := mongoSession.DB(dbName)

result := common.MapStr{}

err := db.Run("dbStats", &result)
if err != nil {
logp.Err("Failed to retrieve stats for db %s", dbName)
err = errors.Wrapf(err, "Failed to retrieve stats for db %s", dbName)
logger.Error(err)
continue
}
data, _ := schema.Apply(result)
events = append(events, data)
reporter.Event(mb.Event{MetricSetFields: data})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result of reporter.Event() could be checked here to avoid continuing doing queries when the metricset is being stopped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same concerns than here. I really don't have a strong opinion anyways but I think it's good that we reach consensus for this in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we already had some kind of consensus 🙂 but as replied in the other PR I don't see this as such a big deal, so as you prefer.

totalEvents++
}

if len(events) == 0 {
if totalEvents == 0 {
err = errors.New("Failed to retrieve dbStats from any databases")
logp.Err(err.Error())
return events, err
logger.Error(err)
sayden marked this conversation as resolved.
Show resolved Hide resolved
reporter.Error(err)
}

return events, nil
return
}
39 changes: 23 additions & 16 deletions metricbeat/module/mongodb/dbstats/dbstats_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,44 +32,46 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "mongodb")

f := mbtest.NewEventsFetcher(t, getConfig())
events, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

for _, event := range events {
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
metricsetFields := event.MetricSetFields

// Check a few event Fields
db := event["db"].(string)
db := metricsetFields["db"].(string)
assert.NotEqual(t, db, "")

collections := event["collections"].(int64)
collections := metricsetFields["collections"].(int64)
assert.True(t, collections > 0)

objects := event["objects"].(int64)
objects := metricsetFields["objects"].(int64)
assert.True(t, objects > 0)

avgObjSize, err := event.GetValue("avg_obj_size.bytes")
avgObjSize, err := metricsetFields.GetValue("avg_obj_size.bytes")
assert.NoError(t, err)
assert.True(t, avgObjSize.(int64) > 0)

dataSize, err := event.GetValue("data_size.bytes")
dataSize, err := metricsetFields.GetValue("data_size.bytes")
assert.NoError(t, err)
assert.True(t, dataSize.(int64) > 0)

storageSize, err := event.GetValue("storage_size.bytes")
storageSize, err := metricsetFields.GetValue("storage_size.bytes")
assert.NoError(t, err)
assert.True(t, storageSize.(int64) > 0)

numExtents := event["num_extents"].(int64)
numExtents := metricsetFields["num_extents"].(int64)
assert.True(t, numExtents >= 0)

indexes := event["indexes"].(int64)
indexes := metricsetFields["indexes"].(int64)
assert.True(t, indexes >= 0)

indexSize, err := event.GetValue("index_size.bytes")
indexSize, err := metricsetFields.GetValue("index_size.bytes")
assert.NoError(t, err)
assert.True(t, indexSize.(int64) > 0)
}
Expand All @@ -78,9 +80,14 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "mongodb")

f := mbtest.NewEventsFetcher(t, getConfig())
err := mbtest.WriteEvents(f, t)
if err != nil {
f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down