Skip to content

Commit

Permalink
Cross platform support for the 'processes' plugin
Browse files Browse the repository at this point in the history
closes #798
  • Loading branch information
sparrc committed Mar 8, 2016
1 parent dd52b4b commit 7008bc2
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 30 deletions.
158 changes: 132 additions & 26 deletions plugins/inputs/system/processes.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
// +build !windows

package system

import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"runtime"
"strconv"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/shirou/gopsutil/process"
)

type Processes struct {
Expand All @@ -19,41 +28,138 @@ func (_ *Processes) Description() string {
func (_ *Processes) SampleConfig() string { return "" }

func (s *Processes) Gather(acc telegraf.Accumulator) error {
pids, err := process.Pids()
fields := map[string]interface{}{
"blocked": int64(0),
"zombie": int64(0),
"stopped": int64(0),
"running": int64(0),
"sleeping": int64(0),
"total": int64(0),
}

usePS := true
switch runtime.GOOS {
case "freebsd":
fields["idle"] = int64(0)
fields["wait"] = int64(0)
case "darwin":
fields["idle"] = int64(0)
case "openbsd":
fields["idle"] = int64(0)
case "linux":
usePS = false
fields["paging"] = int64(0)
fields["total_threads"] = int64(0)
}

if usePS {
if err := fromPS(fields); err != nil {
return err
}
} else {
if err := fromProc(fields); err != nil {
return err
}
}

acc.AddFields("processes", fields, nil)
return nil
}

// exec `ps` to get all process states
func fromPS(fields map[string]interface{}) error {
bin, err := exec.LookPath("ps")
if err != nil {
return fmt.Errorf("error getting pids list: %s", err)
return err
}
// TODO handle other OS (Windows/BSD/Solaris/OSX)
fields := map[string]interface{}{
"paging": uint64(0),
"blocked": uint64(0),
"zombie": uint64(0),
"stopped": uint64(0),
"running": uint64(0),
"sleeping": uint64(0),
}
for _, pid := range pids {
process, err := process.NewProcess(pid)
if err != nil {
log.Printf("Can not get process %d status: %s", pid, err)
continue

out, err := exec.Command(bin, "axo", "state").Output()
if err != nil {
return err
}

for _, status := range bytes.Fields(out) {
switch status[0] {
case 'W':
fields["wait"] = fields["wait"].(int64) + int64(1)
case 'U', 'D':
// Also known as uninterruptible sleep or disk sleep
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombie"] = fields["zombie"].(int64) + int64(1)
case 'T':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'I':
fields["idle"] = fields["idle"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] from ps",
string(status[0]))
}
status, err := process.Status()
if err != nil {
log.Printf("Can not get process %d status: %s\n", pid, err)
fields["total"] = fields["total"].(int64) + int64(1)
}
return nil
}

// get process states from /proc/(pid)/stat files
func fromProc(fields map[string]interface{}) error {
files, err := ioutil.ReadDir("/proc")
if err != nil {
return err
}

for _, file := range files {
if !file.IsDir() {
continue
}
_, exists := fields[status]
if !exists {
log.Printf("Status '%s' for process with pid: %d\n", status, pid)

statFile := path.Join("/proc", file.Name(), "stat")
if _, err := os.Stat(statFile); os.IsNotExist(err) {
continue
} else if os.IsPermission(err) {
return err
}
fields[status] = fields[status].(uint64) + uint64(1)
}

acc.AddFields("processes", fields, nil)
data, err := ioutil.ReadFile(statFile)
if err != nil {
return err
}

stats := bytes.Fields(data)
if len(stats) < 3 {
return fmt.Errorf("Something is terribly wrong with %s", statFile)
}
switch stats[2][0] {
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'D':
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombies"] = fields["zombies"].(int64) + int64(1)
case 'T', 't':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'W':
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
string(stats[2][0]), statFile)
}
fields["total"] = fields["total"].(int64) + int64(1)

threads, err := strconv.Atoi(string(stats[19]))
if err != nil {
return err
}
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)
}
return nil
}

func init() {
inputs.Add("processes", func() telegraf.Input {
return &Processes{}
Expand Down
6 changes: 3 additions & 3 deletions plugins/inputs/system/processes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestProcesses(t *testing.T) {
err := processes.Gather(&acc)
require.NoError(t, err)

assert.True(t, acc.HasUIntField("processes", "running"))
assert.True(t, acc.HasUIntField("processes", "sleeping"))
assert.True(t, acc.HasUIntField("processes", "stopped"))
assert.True(t, acc.HasIntField("processes", "running"))
assert.True(t, acc.HasIntField("processes", "sleeping"))
assert.True(t, acc.HasIntField("processes", "stopped"))
}
2 changes: 1 addition & 1 deletion scripts/circle-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ telegraf -sample-config > $tmpdir/config.toml
exit_if_fail telegraf -config $tmpdir/config.toml \
-test -input-filter cpu:mem

mv $GOPATH/bin/telegraf $CIRCLE_ARTIFACTS
cat $GOPATH/bin/telegraf | gzip > $CIRCLE_ARTIFACTS/telegraf.gz

eval "git describe --exact-match HEAD"
if [ $? -eq 0 ]; then
Expand Down

0 comments on commit 7008bc2

Please sign in to comment.