Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lustre2 plugin #86

Merged
merged 6 commits into from
Aug 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/lustre2"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mongodb"
_ "github.com/influxdb/telegraf/plugins/mysql"
Expand Down
236 changes: 236 additions & 0 deletions plugins/lustre2/lustre2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// +build linux

/*
Package lustre2 is the Lustre 2.x telegraf plugin.

Lustre (http://lustre.org/) is an open-source, parallel file system
for HPC environments. It stores statistics about its activity in
/proc, which is where this plugin reads them from.
*/
package lustre2

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"

	"github.com/influxdb/telegraf/plugins"
	common "github.com/influxdb/telegraf/plugins/system/ps/common"
)

// Lustre2 is the plugin's configuration: the /proc globs to read stats from.
//
// Lustre proc files can change between versions, so we want to future-proof
// by letting people choose what to look at.
type Lustre2 struct {
// Ost_procfiles lists globs of object storage target stats files. When it
// is empty, Gather reads the built-in obdfilter and osd-ldiskfs globs.
Ost_procfiles []string
// Mds_procfiles lists globs of metadata server stats files. When it is
// empty, Gather reads the built-in mdt glob.
Mds_procfiles []string
}

// sampleConfig is the example configuration returned by SampleConfig. Both
// settings are shown commented out, with the same globs Gather falls back to
// when they are left unset.
var sampleConfig = `
# An array of /proc globs to search for Lustre stats
# If not specified, the default will work on Lustre 2.5.x
#
# ost_procfiles = ["/proc/fs/lustre/obdfilter/*/stats", "/proc/fs/lustre/osd-ldiskfs/*/stats"]
# mds_procfiles = ["/proc/fs/lustre/mdt/*/md_stats"]`

/* mapping describes how one value is pulled out of a Lustre stats line.

The wanted fields would be a []string if not for the
lines that start with read_bytes/write_bytes and contain
both the byte count and the function call count, so a single
line may need several mappings, each picking a different column.
*/
type mapping struct {
inProc string // What to look for at the start of a line in /proc/fs/lustre/*
field uint32 // Which whitespace-separated column to extract from that line; 0 (unset) is treated as column 1
reportAs string // What measurement name to use; empty means report under inProc
tag string // Additional tag to add for this metric (NOTE(review): not read by the code visible here)
}

// wanted_ost_fields selects the counters reported for object storage
// targets. Gather applies it to both the obdfilter and the osd-ldiskfs stats
// files.
var wanted_ost_fields = []*mapping{
	// The write_bytes/read_bytes lines each feed two metrics: the byte
	// total taken from column 6 and the call count taken from column 1.
	{inProc: "write_bytes", field: 6, reportAs: "write_bytes"},
	{inProc: "write_bytes", field: 1, reportAs: "write_calls"},
	{inProc: "read_bytes", field: 6, reportAs: "read_bytes"},
	{inProc: "read_bytes", field: 1, reportAs: "read_calls"},

	// Cache counters: default column, reported under the name found in /proc.
	{inProc: "cache_hit"},
	{inProc: "cache_miss"},
	{inProc: "cache_access"},
}

// wanted_mds_fields selects the counters reported for metadata servers.
// Every entry uses the default column and is reported under the name found
// in /proc.
var wanted_mds_fields = []*mapping{
	{inProc: "open"},
	{inProc: "close"},
	{inProc: "mknod"},
	{inProc: "link"},
	{inProc: "unlink"},
	{inProc: "mkdir"},
	{inProc: "rmdir"},
	{inProc: "rename"},
	{inProc: "getattr"},
	{inProc: "setattr"},
	{inProc: "getxattr"},
	{inProc: "setxattr"},
	{inProc: "statfs"},
	{inProc: "sync"},
	{inProc: "samedir_rename"},
	{inProc: "crossdir_rename"},
}

// GetLustreProcStats expands fileglob, reads every matching Lustre stats file
// and reports the counters selected by wanted_fields to acc.
//
// Each stats line is split on whitespace; its first column is the counter
// name. For every mapping whose inProc equals that name, the column given by
// the mapping's field (column 1 when field is 0) is parsed as an unsigned
// integer and added under reportAs, or under inProc when reportAs is empty.
// All metrics from one file carry the tag "name", set to the name of the
// directory that contains the file.
//
// Blank lines are skipped. An error is returned, and processing stops, when
// the glob pattern is malformed, a file cannot be read, a matching line has
// too few columns, or the selected column is not an unsigned integer.
func (l *Lustre2) GetLustreProcStats(fileglob string, wanted_fields []*mapping, acc plugins.Accumulator) error {
	files, err := filepath.Glob(fileglob)
	if err != nil {
		return err
	}

	for _, file := range files {
		/* Turn /proc/fs/lustre/obdfilter/<ost_name>/stats and similar
		 * into just the object store target name
		 * Assumption: the target name is always second to last,
		 * which is true in Lustre 2.1->2.5
		 *
		 * Base(Dir(file)) yields that element without indexing into a
		 * split path, so a match without a directory part cannot panic.
		 */
		tags := map[string]string{
			"name": filepath.Base(filepath.Dir(file)),
		}

		lines, err := common.ReadLines(file)
		if err != nil {
			return err
		}

		for _, line := range lines {
			fields := strings.Fields(line)
			// A blank line has no counter name to match against.
			if len(fields) == 0 {
				continue
			}

			for _, wanted := range wanted_fields {
				if fields[0] != wanted.inProc {
					continue
				}

				report_name := wanted.inProc
				if wanted.reportAs != "" {
					report_name = wanted.reportAs
				}

				// if not set, assume field[1]. Shouldn't be field[0], as
				// that's a string
				column := int(wanted.field)
				if column == 0 {
					column = 1
				}
				// Report a truncated line as an error instead of
				// indexing past the end of fields.
				if column >= len(fields) {
					return fmt.Errorf("%s: line %q has %d fields, need field %d for %s",
						file, line, len(fields), column, report_name)
				}

				data, err := strconv.ParseUint(fields[column], 10, 64)
				if err != nil {
					return fmt.Errorf("%s: field %d of line %q: %v", file, column, line, err)
				}
				acc.Add(report_name, data, tags)
			}
		}
	}
	return nil
}

// SampleConfig returns the example configuration for this plugin, i.e. the
// package-level sampleConfig string.
func (l *Lustre2) SampleConfig() string {
return sampleConfig
}

// Description returns a one-line, human-readable summary of what the
// Lustre2 plugin collects.
func (l *Lustre2) Description() string {
return "Read metrics from local Lustre service on OST, MDS"
}

// Gather reads stats from all lustre targets and adds them to acc.
//
// For each of the two target kinds (OST and MDS) the built-in /proc globs are
// read when no globs were configured; configured globs are read afterwards.
// The first error from GetLustreProcStats aborts the run and is returned.
func (l *Lustre2) Gather(acc plugins.Accumulator) error {
	// collect feeds every glob, in order, through GetLustreProcStats and
	// stops at the first failure.
	collect := func(globs []string, wanted []*mapping) error {
		for _, glob := range globs {
			if err := l.GetLustreProcStats(glob, wanted, acc); err != nil {
				return err
			}
		}
		return nil
	}

	if len(l.Ost_procfiles) == 0 {
		defaultOst := []string{
			// read/write bytes are in obdfilter/<ost_name>/stats
			"/proc/fs/lustre/obdfilter/*/stats",
			// cache counters are in osd-ldiskfs/<ost_name>/stats
			"/proc/fs/lustre/osd-ldiskfs/*/stats",
		}
		if err := collect(defaultOst, wanted_ost_fields); err != nil {
			return err
		}
	}

	if len(l.Mds_procfiles) == 0 {
		// Metadata server stats
		defaultMds := []string{"/proc/fs/lustre/mdt/*/md_stats"}
		if err := collect(defaultMds, wanted_mds_fields); err != nil {
			return err
		}
	}

	// Explicitly configured globs; both slices are empty when the
	// defaults above were used, so these calls are then no-ops.
	if err := collect(l.Ost_procfiles, wanted_ost_fields); err != nil {
		return err
	}
	return collect(l.Mds_procfiles, wanted_mds_fields)
}

// init registers the plugin under the name "lustre2"; the registered factory
// returns a Lustre2 with no globs configured.
func init() {
plugins.Add("lustre2", func() plugins.Plugin {
return &Lustre2{}
})
}
144 changes: 144 additions & 0 deletions plugins/lustre2/lustre2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package lustre2

import (
"io/ioutil"
"os"
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Set config file variables to point to fake directory structure instead of /proc?

// obdfilterProcContents is the fixture written to the fake
// obdfilter/<target>/stats file; it supplies the read_bytes and write_bytes
// lines the test expects byte and call counts from.
const obdfilterProcContents = `snapshot_time 1438693064.430544 secs.usecs
read_bytes 203238095 samples [bytes] 4096 1048576 78026117632000
write_bytes 71893382 samples [bytes] 1 1048576 15201500833981
get_info 1182008495 samples [reqs]
set_info_async 2 samples [reqs]
connect 1117 samples [reqs]
reconnect 1160 samples [reqs]
disconnect 1084 samples [reqs]
statfs 3575885 samples [reqs]
create 698 samples [reqs]
destroy 3190060 samples [reqs]
setattr 605647 samples [reqs]
punch 805187 samples [reqs]
sync 6608753 samples [reqs]
preprw 275131477 samples [reqs]
commitrw 275131477 samples [reqs]
quotactl 229231 samples [reqs]
ping 78020757 samples [reqs]
`

// osdldiskfsProcContents is the fixture written to the fake
// osd-ldiskfs/<target>/stats file; it supplies the cache_access, cache_hit
// and cache_miss lines the test checks.
const osdldiskfsProcContents = `snapshot_time 1438693135.640551 secs.usecs
get_page 275132812 samples [usec] 0 3147 1320420955 22041662259
cache_access 19047063027 samples [pages] 1 1 19047063027
cache_hit 7393729777 samples [pages] 1 1 7393729777
cache_miss 11653333250 samples [pages] 1 1 11653333250
`

// mdtProcContents is the fixture written to the fake mdt/<target>/md_stats
// file, with one line per metadata operation counter.
const mdtProcContents = `snapshot_time 1438693238.20113 secs.usecs
open 1024577037 samples [reqs]
close 873243496 samples [reqs]
mknod 349042 samples [reqs]
link 445 samples [reqs]
unlink 3549417 samples [reqs]
mkdir 705499 samples [reqs]
rmdir 227434 samples [reqs]
rename 629196 samples [reqs]
getattr 1503663097 samples [reqs]
setattr 1898364 samples [reqs]
getxattr 6145349681 samples [reqs]
setxattr 83969 samples [reqs]
statfs 2916320 samples [reqs]
sync 434081 samples [reqs]
samedir_rename 259625 samples [reqs]
crossdir_rename 369571 samples [reqs]
`

// metrics pairs a measurement name with the value the test expects the
// plugin to have reported for it.
type metrics struct {
name string // measurement name as passed to the accumulator
value uint64 // expected value for that measurement
}

// TestLustre2GeneratesMetrics builds a fake /proc/fs/lustre tree from the
// fixture strings, points a Lustre2 plugin at it through its glob settings,
// runs Gather and checks the reported OST byte, call and cache counters.
//
// NOTE(review): the MDS fixture is gathered but none of its counters are
// asserted here.
func TestLustre2GeneratesMetrics(t *testing.T) {
	// A uniquely named directory keeps concurrent runs (or other users of
	// the shared temp dir) from stepping on each other's fixture tree.
	root, err := ioutil.TempDir("", "telegraf-lustre2")
	require.NoError(t, err)
	// Deferred so the tree is removed even when a require below aborts
	// the test early.
	defer func() {
		assert.NoError(t, os.RemoveAll(root))
	}()

	tempdir := root + "/proc/fs/lustre"
	ost_name := "OST0001"

	mdtdir := tempdir + "/mdt"
	err = os.MkdirAll(mdtdir+"/"+ost_name, 0755)
	require.NoError(t, err)

	osddir := tempdir + "/osd-ldiskfs"
	err = os.MkdirAll(osddir+"/"+ost_name, 0755)
	require.NoError(t, err)

	obddir := tempdir + "/obdfilter"
	err = os.MkdirAll(obddir+"/"+ost_name, 0755)
	require.NoError(t, err)

	err = ioutil.WriteFile(mdtdir+"/"+ost_name+"/md_stats", []byte(mdtProcContents), 0644)
	require.NoError(t, err)

	err = ioutil.WriteFile(osddir+"/"+ost_name+"/stats", []byte(osdldiskfsProcContents), 0644)
	require.NoError(t, err)

	err = ioutil.WriteFile(obddir+"/"+ost_name+"/stats", []byte(obdfilterProcContents), 0644)
	require.NoError(t, err)

	m := &Lustre2{
		Ost_procfiles: []string{obddir + "/*/stats", osddir + "/*/stats"},
		Mds_procfiles: []string{mdtdir + "/*/md_stats"},
	}

	var acc testutil.Accumulator

	err = m.Gather(&acc)
	require.NoError(t, err)

	// Every metric is tagged with the directory that holds its stats file.
	tags := map[string]string{
		"name": ost_name,
	}

	intMetrics := []*metrics{
		{
			name:  "write_bytes",
			value: 15201500833981,
		},
		{
			name:  "read_bytes",
			value: 78026117632000,
		},
		{
			name:  "write_calls",
			value: 71893382,
		},
		{
			name:  "read_calls",
			value: 203238095,
		},
		{
			name:  "cache_hit",
			value: 7393729777,
		},
		{
			name:  "cache_access",
			value: 19047063027,
		},
		{
			name:  "cache_miss",
			value: 11653333250,
		},
	}

	for _, metric := range intMetrics {
		assert.True(t, acc.HasUIntValue(metric.name), metric.name)
		assert.True(t, acc.CheckTaggedValue(metric.name, metric.value, tags))
	}
}