Skip to content

Commit

Permalink
Cross platform support for the 'processes' plugin
Browse files Browse the repository at this point in the history
closes #798
  • Loading branch information
sparrc committed Mar 8, 2016
1 parent dd52b4b commit df052c2
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 34 deletions.
4 changes: 4 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@
[[inputs.mem]]
# no configuration

# Get the number of processes and group them by status
[[inputs.processes]]
# no configuration

# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
Expand Down
209 changes: 180 additions & 29 deletions plugins/inputs/system/processes.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,212 @@
// +build !windows

package system

import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"runtime"
"strconv"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/shirou/gopsutil/process"
)

type Processes struct {
execPS func() ([]byte, error)
readProcFile func(statFile string) ([]byte, error)

forcePS bool
forceProc bool
}

func (_ *Processes) Description() string {
return "Get the number of processes and group them by status (Linux only)"
func (p *Processes) Description() string {
return "Get the number of processes and group them by status"
}

func (_ *Processes) SampleConfig() string { return "" }
func (p *Processes) SampleConfig() string { return "" }

func (s *Processes) Gather(acc telegraf.Accumulator) error {
pids, err := process.Pids()
if err != nil {
return fmt.Errorf("error getting pids list: %s", err)
func (p *Processes) Gather(acc telegraf.Accumulator) error {
// Get an empty map of metric fields
fields := getEmptyFields()

// Decide if we will use 'ps' to get stats (use procfs otherwise)
usePS := true
if runtime.GOOS == "linux" {
usePS = false
}
if p.forcePS {
usePS = true
} else if p.forceProc {
usePS = false
}

// Gather stats from 'ps' or procfs
if usePS {
if err := p.gatherFromPS(fields); err != nil {
return err
}
} else {
if err := p.gatherFromProc(fields); err != nil {
return err
}
}
// TODO handle other OS (Windows/BSD/Solaris/OSX)

acc.AddFields("processes", fields, nil)
return nil
}

// Gets empty fields of metrics based on the OS
func getEmptyFields() map[string]interface{} {
fields := map[string]interface{}{
"paging": uint64(0),
"blocked": uint64(0),
"zombie": uint64(0),
"stopped": uint64(0),
"running": uint64(0),
"sleeping": uint64(0),
}
for _, pid := range pids {
process, err := process.NewProcess(pid)
if err != nil {
log.Printf("Can not get process %d status: %s", pid, err)
"blocked": int64(0),
"zombie": int64(0),
"stopped": int64(0),
"running": int64(0),
"sleeping": int64(0),
"total": int64(0),
}
switch runtime.GOOS {
case "freebsd":
fields["idle"] = int64(0)
fields["wait"] = int64(0)
case "darwin":
fields["idle"] = int64(0)
case "openbsd":
fields["idle"] = int64(0)
case "linux":
fields["paging"] = int64(0)
fields["total_threads"] = int64(0)
}
return fields
}

// exec `ps` to get all process states
func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
out, err := p.execPS()
if err != nil {
return err
}

for _, status := range bytes.Fields(out) {
switch status[0] {
case 'W':
fields["wait"] = fields["wait"].(int64) + int64(1)
case 'U', 'D':
// Also known as uninterruptible sleep or disk sleep
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombie"] = fields["zombie"].(int64) + int64(1)
case 'T':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'I':
fields["idle"] = fields["idle"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] from ps",
string(status[0]))
}
fields["total"] = fields["total"].(int64) + int64(1)
}
return nil
}

// get process states from /proc/(pid)/stat files
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
files, err := ioutil.ReadDir("/proc")
if err != nil {
return err
}

for _, file := range files {
if !file.IsDir() {
continue
}
status, err := process.Status()

statFile := path.Join("/proc", file.Name(), "stat")
data, err := p.readProcFile(statFile)
if err != nil {
log.Printf("Can not get process %d status: %s\n", pid, err)
return err
}
if data == nil {
continue
}
_, exists := fields[status]
if !exists {
log.Printf("Status '%s' for process with pid: %d\n", status, pid)

stats := bytes.Fields(data)
if len(stats) < 3 {
return fmt.Errorf("Something is terribly wrong with %s", statFile)
}
switch stats[2][0] {
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'D':
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombies"] = fields["zombies"].(int64) + int64(1)
case 'T', 't':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'W':
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
string(stats[2][0]), statFile)
}
fields["total"] = fields["total"].(int64) + int64(1)

threads, err := strconv.Atoi(string(stats[19]))
if err != nil {
log.Printf("processes: Error parsing thread count: %s", err)
continue
}
fields[status] = fields[status].(uint64) + uint64(1)
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)
}

acc.AddFields("processes", fields, nil)
return nil
}

func readProcFile(statFile string) ([]byte, error) {
if _, err := os.Stat(statFile); os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}

data, err := ioutil.ReadFile(statFile)
if err != nil {
return nil, err
}

return data, nil
}

func execPS() ([]byte, error) {
bin, err := exec.LookPath("ps")
if err != nil {
return nil, err
}

out, err := exec.Command(bin, "axo", "state").Output()
if err != nil {
return nil, err
}

return out, err
}

func init() {
inputs.Add("processes", func() telegraf.Input {
return &Processes{}
return &Processes{
execPS: execPS,
readProcFile: readProcFile,
}
})
}
128 changes: 124 additions & 4 deletions plugins/inputs/system/processes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package system

import (
"fmt"
"runtime"
"testing"

"github.com/influxdata/telegraf/testutil"
Expand All @@ -9,13 +11,131 @@ import (
)

func TestProcesses(t *testing.T) {
processes := &Processes{}
processes := &Processes{
execPS: execPS,
readProcFile: readProcFile,
}
var acc testutil.Accumulator

err := processes.Gather(&acc)
require.NoError(t, err)

assert.True(t, acc.HasUIntField("processes", "running"))
assert.True(t, acc.HasUIntField("processes", "sleeping"))
assert.True(t, acc.HasUIntField("processes", "stopped"))
assert.True(t, acc.HasIntField("processes", "running"))
assert.True(t, acc.HasIntField("processes", "sleeping"))
assert.True(t, acc.HasIntField("processes", "stopped"))
assert.True(t, acc.HasIntField("processes", "total"))
}

func TestFromPS(t *testing.T) {
processes := &Processes{
execPS: testExecPS,
forcePS: true,
}

var acc testutil.Accumulator
err := processes.Gather(&acc)
require.NoError(t, err)

fields := getEmptyFields()
fields["blocked"] = int64(1)
fields["running"] = int64(4)
fields["sleeping"] = int64(34)
fields["total"] = int64(39)

acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{})
}

func TestFromPSError(t *testing.T) {
processes := &Processes{
execPS: testExecPSError,
forcePS: true,
}

var acc testutil.Accumulator
err := processes.Gather(&acc)
require.Error(t, err)
}

func TestFromProcFiles(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("This test only runs on linux")
}
tester := tester{}
processes := &Processes{
readProcFile: tester.testProcFile,
forceProc: true,
}

var acc testutil.Accumulator
err := processes.Gather(&acc)
require.NoError(t, err)

fields := getEmptyFields()
fields["sleeping"] = tester.calls
fields["total_threads"] = tester.calls
fields["total"] = tester.calls

acc.AssertContainsTaggedFields(t, "processes", fields, map[string]string{})
}

func testExecPS() ([]byte, error) {
return []byte(testPSOut), nil
}

// struct for counting calls to testProcFile
type tester struct {
calls int
}

func (t *tester) testProcFile(_ string) ([]byte, error) {
return []byte(fmt.Sprintf(testProcStat, "S", "1")), nil
}

func testExecPSError() ([]byte, error) {
return []byte(testPSOut), fmt.Errorf("ERROR!")
}

const testPSOut = `
S
S
S
S
R
R
S
S
Ss
Ss
S
SNs
Ss
Ss
S
R+
S
U
S
S
S
S
Ss
S+
Ss
S
S+
S+
Ss
S+
Ss
S
R+
Ss
S
S+
S+
Ss
S+
`

const testProcStat = `10 (rcuob/0) %s 2 0 0 0 -1 2129984 0 0 0 0 0 0 0 0 20 0 %s 0 11 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0 0 0
`
Loading

0 comments on commit df052c2

Please sign in to comment.