Skip to content

Commit

Permalink
Cross platform support for the 'processes' plugin
Browse files Browse the repository at this point in the history
closes #798
  • Loading branch information
sparrc committed Mar 9, 2016
1 parent 5ffa2a3 commit 2f45b8b
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [#797](https://github.com/influxdata/telegraf/issues/797): Provide option for persistent MQTT consumer client sessions.
- [#799](https://github.com/influxdata/telegraf/pull/799): Add number of threads for procstat input plugin. Thanks @titilambert!
- [#776](https://github.com/influxdata/telegraf/pull/776): Add Zookeeper chroot option to kafka_consumer. Thanks @prune998!
- [#811](https://github.com/influxdata/telegraf/pull/811): Add processes plugin for classifying total procs on system. Thanks @titilambert!

### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
Expand All @@ -24,6 +25,7 @@
- [#773](https://github.com/influxdata/telegraf/issues/773): Fix duplicate measurements in snmp plugin. Thanks @titilambert!
- [#708](https://github.com/influxdata/telegraf/issues/708): packaging: build ARM package
- [#713](https://github.com/influxdata/telegraf/issues/713): packaging: insecure permissions error on log directory
- [#816](https://github.com/influxdata/telegraf/issues/816): Fix phpfpm panic if fcgi endpoint unreachable.

## v0.10.4.1

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,13 @@ Currently implemented sources:
* disk
* diskio
* swap
* processes

Telegraf can also collect metrics via the following service plugins:

* statsd
* udp listener
* udp_listener
* tcp_listener
* mqtt_consumer
* kafka_consumer
* nats_consumer
Expand Down
4 changes: 4 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@
[[inputs.mem]]
# no configuration

# Get the number of processes and group them by status
[[inputs.processes]]
# no configuration

# Read metrics about swap memory usage
[[inputs.swap]]
# no configuration
Expand Down
58 changes: 58 additions & 0 deletions plugins/inputs/system/PROCESSES_README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Processes Input Plugin

This plugin gathers info about the total number of processes and groups
them by status (zombie, sleeping, running, etc.)

On linux this plugin requires access to procfs (/proc), on other OSes
it requires access to execute `ps`.

### Configuration:

```toml
# Get the number of processes and group them by status
[[inputs.processes]]
# no configuration
```

### Measurements & Fields:

- processes
- blocked (aka disk sleep or uninterruptible sleep)
- running
- sleeping
- stopped
- total
- zombie
- wait (freebsd only)
- idle (bsd only)
- paging (linux only)
- total_threads (linux only)

### Process State Mappings

Different OSes use slightly different State codes for their processes, these
state codes are documented in `man ps`, and I will give a mapping of what major
OS state codes correspond to in telegraf metrics:

```
Linux FreeBSD Darwin meaning
R R R running
S S S sleeping
Z Z Z zombie
T T T stopped
none I I idle (sleeping for longer than about 20 seconds)
D D,L U blocked (waiting in uninterruptible sleep, or locked)
W W none paging (linux kernel < 2.6 only), wait (freebsd)
```

### Tags:

None

### Example Output:

```
$ telegraf -config ~/ws/telegraf.conf -input-filter processes -test
* Plugin: processes, Collection 1
> processes blocked=8i,running=1i,sleeping=265i,stopped=0i,total=274i,zombie=0i,paging=0i,total_threads=687i 1457478636980905042
```
213 changes: 184 additions & 29 deletions plugins/inputs/system/processes.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,216 @@
// +build !windows

package system

import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"runtime"
"strconv"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/shirou/gopsutil/process"
)

type Processes struct {
execPS func() ([]byte, error)
readProcFile func(statFile string) ([]byte, error)

forcePS bool
forceProc bool
}

func (_ *Processes) Description() string {
return "Get the number of processes and group them by status (Linux only)"
func (p *Processes) Description() string {
return "Get the number of processes and group them by status"
}

func (_ *Processes) SampleConfig() string { return "" }
func (p *Processes) SampleConfig() string { return "" }

func (s *Processes) Gather(acc telegraf.Accumulator) error {
pids, err := process.Pids()
if err != nil {
return fmt.Errorf("error getting pids list: %s", err)
func (p *Processes) Gather(acc telegraf.Accumulator) error {
// Get an empty map of metric fields
fields := getEmptyFields()

// Decide if we will use 'ps' to get stats (use procfs otherwise)
usePS := true
if runtime.GOOS == "linux" {
usePS = false
}
// TODO handle other OS (Windows/BSD/Solaris/OSX)
if p.forcePS {
usePS = true
} else if p.forceProc {
usePS = false
}

// Gather stats from 'ps' or procfs
if usePS {
if err := p.gatherFromPS(fields); err != nil {
return err
}
} else {
if err := p.gatherFromProc(fields); err != nil {
return err
}
}

acc.AddFields("processes", fields, nil)
return nil
}

// Gets empty fields of metrics based on the OS
func getEmptyFields() map[string]interface{} {
fields := map[string]interface{}{
"paging": uint64(0),
"blocked": uint64(0),
"zombie": uint64(0),
"stopped": uint64(0),
"running": uint64(0),
"sleeping": uint64(0),
}
for _, pid := range pids {
process, err := process.NewProcess(pid)
if err != nil {
log.Printf("Can not get process %d status: %s", pid, err)
"blocked": int64(0),
"zombie": int64(0),
"stopped": int64(0),
"running": int64(0),
"sleeping": int64(0),
"total": int64(0),
}
switch runtime.GOOS {
case "freebsd":
fields["idle"] = int64(0)
fields["wait"] = int64(0)
case "darwin":
fields["idle"] = int64(0)
case "openbsd":
fields["idle"] = int64(0)
case "linux":
fields["paging"] = int64(0)
fields["total_threads"] = int64(0)
}
return fields
}

// exec `ps` to get all process states
func (p *Processes) gatherFromPS(fields map[string]interface{}) error {
out, err := p.execPS()
if err != nil {
return err
}

for i, status := range bytes.Fields(out) {
if i == 0 && string(status) == "STAT" {
// This is a header, skip it
continue
}
switch status[0] {
case 'W':
fields["wait"] = fields["wait"].(int64) + int64(1)
case 'U', 'D', 'L':
// Also known as uninterruptible sleep or disk sleep
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombie"] = fields["zombie"].(int64) + int64(1)
case 'T':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'I':
fields["idle"] = fields["idle"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] from ps",
string(status[0]))
}
fields["total"] = fields["total"].(int64) + int64(1)
}
return nil
}

// get process states from /proc/(pid)/stat files
func (p *Processes) gatherFromProc(fields map[string]interface{}) error {
files, err := ioutil.ReadDir("/proc")
if err != nil {
return err
}

for _, file := range files {
if !file.IsDir() {
continue
}
status, err := process.Status()

statFile := path.Join("/proc", file.Name(), "stat")
data, err := p.readProcFile(statFile)
if err != nil {
log.Printf("Can not get process %d status: %s\n", pid, err)
return err
}
if data == nil {
continue
}
_, exists := fields[status]
if !exists {
log.Printf("Status '%s' for process with pid: %d\n", status, pid)

stats := bytes.Fields(data)
if len(stats) < 3 {
return fmt.Errorf("Something is terribly wrong with %s", statFile)
}
switch stats[2][0] {
case 'R':
fields["running"] = fields["running"].(int64) + int64(1)
case 'S':
fields["sleeping"] = fields["sleeping"].(int64) + int64(1)
case 'D':
fields["blocked"] = fields["blocked"].(int64) + int64(1)
case 'Z':
fields["zombies"] = fields["zombies"].(int64) + int64(1)
case 'T', 't':
fields["stopped"] = fields["stopped"].(int64) + int64(1)
case 'W':
fields["paging"] = fields["paging"].(int64) + int64(1)
default:
log.Printf("processes: Unknown state [ %s ] in file %s",
string(stats[2][0]), statFile)
}
fields["total"] = fields["total"].(int64) + int64(1)

threads, err := strconv.Atoi(string(stats[19]))
if err != nil {
log.Printf("processes: Error parsing thread count: %s", err)
continue
}
fields[status] = fields[status].(uint64) + uint64(1)
fields["total_threads"] = fields["total_threads"].(int64) + int64(threads)
}

acc.AddFields("processes", fields, nil)
return nil
}

func readProcFile(statFile string) ([]byte, error) {
if _, err := os.Stat(statFile); os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}

data, err := ioutil.ReadFile(statFile)
if err != nil {
return nil, err
}

return data, nil
}

func execPS() ([]byte, error) {
bin, err := exec.LookPath("ps")
if err != nil {
return nil, err
}

out, err := exec.Command(bin, "axo", "state").Output()
if err != nil {
return nil, err
}

return out, err
}

func init() {
inputs.Add("processes", func() telegraf.Input {
return &Processes{}
return &Processes{
execPS: execPS,
readProcFile: readProcFile,
}
})
}
Loading

0 comments on commit 2f45b8b

Please sign in to comment.