Skip to content

Commit

Permalink
Add support for multiple line text and perfdata to nagios parser (inf…
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeykhegay authored and Mathieu Lecarme committed Apr 17, 2020
1 parent b9796e2 commit 9bf7b22
Show file tree
Hide file tree
Showing 4 changed files with 706 additions and 181 deletions.
112 changes: 46 additions & 66 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package exec
import (
"bytes"
"fmt"
"log"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/kballard/go-shellquote"
Expand Down Expand Up @@ -61,39 +61,18 @@ func NewExec() *Exec {
}

type Runner interface {
Run(*Exec, string, telegraf.Accumulator) ([]byte, error)
Run(string, time.Duration) ([]byte, []byte, error)
}

type CommandRunner struct{}

func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
nagiosState := 0
if exitCode != nil {
exiterr, ok := exitCode.(*exec.ExitError)
if ok {
status, ok := exiterr.Sys().(syscall.WaitStatus)
if ok {
nagiosState = status.ExitStatus()
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
} else {
return fmt.Errorf("exec: unable to get nagios plugin exit code")
}
}
fields := map[string]interface{}{"state": nagiosState}
acc.AddFields("nagios_state", fields, nil)
return nil
}

func (c CommandRunner) Run(
e *Exec,
command string,
acc telegraf.Accumulator,
) ([]byte, error) {
timeout time.Duration,
) ([]byte, []byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
return nil, nil, fmt.Errorf("exec: unable to parse command, %s", err)
}

cmd := exec.Command(split_cmd[0], split_cmd[1:]...)
Expand All @@ -105,44 +84,35 @@ func (c CommandRunner) Run(
cmd.Stdout = &out
cmd.Stderr = &stderr

if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
default:
var errMessage = ""
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
// Limit the number of bytes.
didTruncate := false
if stderr.Len() > MaxStderrBytes {
stderr.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(stderr.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < stderr.Len()-1 {
didTruncate = true
}
stderr.Truncate(i)
}
if didTruncate {
stderr.WriteString("...")
}
runErr := internal.RunTimeout(cmd, timeout)

errMessage = fmt.Sprintf(": %s", stderr.String())
}
return nil, fmt.Errorf("exec: %s for command '%s'%s", err, command, errMessage)
}
} else {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(nil, acc)
}
out = removeCarriageReturns(out)
if stderr.Len() > 0 {
stderr = removeCarriageReturns(stderr)
stderr = truncate(stderr)
}

out = removeCarriageReturns(out)
return out.Bytes(), nil
return out.Bytes(), stderr.Bytes(), runErr
}

func truncate(buf bytes.Buffer) bytes.Buffer {
// Limit the number of bytes.
didTruncate := false
if buf.Len() > MaxStderrBytes {
buf.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < buf.Len()-1 {
didTruncate = true
}
buf.Truncate(i)
}
if didTruncate {
buf.WriteString("...")
}
return buf
}

// removeCarriageReturns removes all carriage returns from the input if the
Expand Down Expand Up @@ -173,21 +143,31 @@ func removeCarriageReturns(b bytes.Buffer) bytes.Buffer {

func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()
_, isNagios := e.parser.(*nagios.NagiosParser)

out, err := e.runner.Run(e, command, acc)
if err != nil {
out, errbuf, runErr := e.runner.Run(command, e.Timeout.Duration)
if !isNagios && runErr != nil {
err := fmt.Errorf("exec: %s for command '%s': %s", runErr, command, string(errbuf))
acc.AddError(err)
return
}

metrics, err := e.parser.Parse(out)
if err != nil {
acc.AddError(err)
} else {
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
return
}

if isNagios {
metrics, err = nagios.TryAddState(runErr, metrics)
if err != nil {
log.Printf("E! [inputs.exec] failed to add nagios state: %s", err)
}
}

for _, m := range metrics {
acc.AddMetric(m)
}
}

func (e *Exec) SampleConfig() string {
Expand Down
87 changes: 73 additions & 14 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
"runtime"
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"

"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -74,22 +74,21 @@ var crTests = []CarriageReturnTest{
}

type runnerMock struct {
out []byte
err error
out []byte
errout []byte
err error
}

func newRunnerMock(out []byte, err error) Runner {
func newRunnerMock(out []byte, errout []byte, err error) Runner {
return &runnerMock{
out: out,
err: err,
out: out,
errout: errout,
err: err,
}
}

func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
return r.out, nil
func (r runnerMock) Run(command string, _ time.Duration) ([]byte, []byte, error) {
return r.out, r.errout, r.err
}

func TestExec(t *testing.T) {
Expand All @@ -98,7 +97,7 @@ func TestExec(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
runner: newRunnerMock([]byte(validJson), nil, nil),
Commands: []string{"testcommand arg1"},
parser: parser,
}
Expand Down Expand Up @@ -127,7 +126,7 @@ func TestExecMalformed(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
runner: newRunnerMock([]byte(malformedJson), nil, nil),
Commands: []string{"badcommand arg1"},
parser: parser,
}
Expand All @@ -143,7 +142,7 @@ func TestCommandError(t *testing.T) {
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
runner: newRunnerMock(nil, nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
parser: parser,
}
Expand Down Expand Up @@ -201,6 +200,66 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
acc.AssertContainsFields(t, "metric", fields)
}

func TestTruncate(t *testing.T) {
tests := []struct {
name string
bufF func() *bytes.Buffer
expF func() *bytes.Buffer
}{
{
name: "should not truncate",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
return &b
},
},
{
name: "should truncate up to the new line",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world\nand all the people")
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world...")
return &b
},
},
{
name: "should truncate to the MaxStderrBytes",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < 2*MaxStderrBytes; i++ {
b.WriteByte('b')
}
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < MaxStderrBytes; i++ {
b.WriteByte('b')
}
b.WriteString("...")
return &b
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
res := truncate(*tt.bufF())
require.Equal(t, tt.expF().Bytes(), res.Bytes())
})
}
}

func TestRemoveCarriageReturns(t *testing.T) {
if runtime.GOOS == "windows" {
// Test that all carriage returns are removed
Expand Down
Loading

0 comments on commit 9bf7b22

Please sign in to comment.