Skip to content

Commit

Permalink
add MongoDB plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jipperinbham committed Jul 7, 2015
1 parent e9ad786 commit f1893d8
Show file tree
Hide file tree
Showing 9 changed files with 1,041 additions and 7 deletions.
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mongodb"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/redis"
Expand Down
110 changes: 110 additions & 0 deletions plugins/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package mongodb

import (
"fmt"
"net/url"
"sync"
"time"

"github.com/influxdb/telegraf/plugins"
"gopkg.in/mgo.v2"
)

type MongoDB struct {
Servers []string
mongos map[string]*Server
}

var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:auth_key@10.10.3.30:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]`

func (m *MongoDB) SampleConfig() string {
return sampleConfig
}

func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}

var localhost = &url.URL{Host: "127.0.0.1:27017"}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}

var wg sync.WaitGroup

var outerr error

for _, serv := range m.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
}
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = m.gatherServer(m.getMongoServer(u), acc)
}(serv)
}

wg.Wait()

return outerr
}

func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Url: url,
}
}
return m.mongos[url.Host]
}

func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
}
return server.gatherData(acc)
}

func init() {
plugins.Add("mongodb", func() plugins.Plugin {
return &MongoDB{
mongos: make(map[string]*Server),
}
})
}
100 changes: 100 additions & 0 deletions plugins/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package mongodb

import (
"fmt"
"reflect"
"strconv"

"github.com/influxdb/telegraf/plugins"
)

type MongodbData struct {
StatLine *StatLine
Tags map[string]string
}

func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
if statLine.NodeType != "" && statLine.NodeType != "UNK" {
tags["state"] = statLine.NodeType
}
return &MongodbData{
StatLine: statLine,
Tags: tags,
}
}

var DefaultStats = map[string]string{
"inserts_per_sec": "Insert",
"queries_per_sec": "Query",
"updates_per_sec": "Update",
"deletes_per_sec": "Delete",
"getmores_per_sec": "GetMore",
"commands_per_sec": "Command",
"flushes_per_sec": "Flushes",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"net_in_bytes": "NetIn",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
}

var DefaultReplStats = map[string]string{
"repl_inserts_per_sec": "InsertR",
"repl_queries_per_sec": "QueryR",
"repl_updates_per_sec": "UpdateR",
"repl_deletes_per_sec": "DeleteR",
"repl_getmores_per_sec": "GetMoreR",
"repl_commands_per_sec": "CommandR",
"member_status": "NodeType",
}

var MmapStats = map[string]string{
"mapped_megabytes": "Mapped",
"non-mapped_megabytes": "NonMapped",
"page_faults_per_sec": "Faults",
}

var WiredTigerStats = map[string]string{
"percent_cache_dirty": "CacheDirtyPercent",
"percent_cache_used": "CacheUsedPercent",
}

func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(acc, statLine, DefaultStats)
if d.StatLine.NodeType != "" {
d.addStat(acc, statLine, DefaultReplStats)
}
if d.StatLine.StorageEngine == "mmapv1" {
d.addStat(acc, statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(acc, key, floatVal)
}
}
}

func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(acc, key, val)
}
}

func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
acc.AddValuesWithTime(
key,
map[string]interface{}{
"value": val,
},
d.Tags,
d.StatLine.Time,
)
}
111 changes: 111 additions & 0 deletions plugins/mongodb/mongodb_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package mongodb

import (
"testing"
"time"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var tags = make(map[string]string)

func TestAddNonReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
Update: 0,
Delete: 0,
GetMore: 0,
Command: 0,
Flushes: 0,
Virtual: 0,
Resident: 0,
QueuedReaders: 0,
QueuedWriters: 0,
ActiveReaders: 0,
ActiveWriters: 0,
NetIn: 0,
NetOut: 0,
NumConnections: 0,
},
tags,
)
var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for key, _ := range DefaultStats {
assert.True(t, acc.HasIntValue(key))
}
}

func TestAddReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "mmapv1",
Mapped: 0,
NonMapped: 0,
Faults: 0,
},
tags,
)

var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for key, _ := range MmapStats {
assert.True(t, acc.HasIntValue(key))
}
}

func TestAddWiredTigerStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "wiredTiger",
CacheDirtyPercent: 0,
CacheUsedPercent: 0,
},
tags,
)

var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for key, _ := range WiredTigerStats {
assert.True(t, acc.HasFloatValue(key))
}
}

func TestStateTag(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
NodeType: "PRI",
},
tags,
)

stats := []string{"inserts_per_sec", "queries_per_sec"}

stateTags := make(map[string]string)
stateTags["state"] = "PRI"

var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for _, key := range stats {
err := acc.ValidateTaggedValue(key, int64(0), stateTags)
require.NoError(t, err)
}
}
50 changes: 50 additions & 0 deletions plugins/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mongodb

import (
"net/url"
"time"

"github.com/influxdb/telegraf/plugins"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)

type Server struct {
Url *url.URL
Session *mgo.Session
lastResult *ServerStatus
}

func (s *Server) getDefaultTags() map[string]string {
tags := make(map[string]string)
tags["host"] = s.Url.Host
return tags
}

func (s *Server) gatherData(acc plugins.Accumulator) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
result := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{{"serverStatus", 1}, {"recordStats", 0}}, result)
if err != nil {
return err
}
defer func() {
s.lastResult = result
}()

result.SampleTime = time.Now()
if s.lastResult != nil && result != nil {
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
durationInSeconds := int64(duration.Seconds())
if durationInSeconds == 0 {
durationInSeconds = 1
}
data := NewMongodbData(
NewStatLine(*s.lastResult, *result, s.Url.Host, true, durationInSeconds),
s.getDefaultTags(),
)
data.AddDefaultStats(acc)
}
return nil
}
Loading

0 comments on commit f1893d8

Please sign in to comment.