diff --git a/internal/internal.go b/internal/internal.go
index 4b8e1536ff86f..bef977ea3c987 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -2,13 +2,16 @@ package internal
 
 import (
 	"bufio"
+	"bytes"
 	"crypto/rand"
 	"crypto/tls"
 	"crypto/x509"
 	"errors"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"os"
+	"os/exec"
 	"strings"
 	"time"
 	"unicode"
@@ -16,6 +19,12 @@ import (
 
 const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
 
+var (
+	TimeoutErr = errors.New("Command timed out.")
+
+	NotImplementedError = errors.New("not implemented yet")
+)
+
 // Duration just wraps time.Duration
 type Duration struct {
 	Duration time.Duration
@@ -33,8 +42,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
 	return nil
 }
 
-var NotImplementedError = errors.New("not implemented yet")
-
 // ReadLines reads contents from a file and splits them by new lines.
 // A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
 func ReadLines(filename string) ([]string, error) {
@@ -139,3 +146,49 @@ func SnakeCase(in string) string {
 
 	return string(out)
 }
+
+// CombinedOutputTimeout runs the given command with the given timeout and
+// returns the combined output of stdout and stderr.
+// If the command times out, it attempts to kill the process.
+func CombinedOutputTimeout(c *exec.Cmd, timeout time.Duration) ([]byte, error) {
+	var b bytes.Buffer
+	c.Stdout = &b
+	c.Stderr = &b
+	if err := c.Start(); err != nil {
+		return nil, err
+	}
+	err := WaitTimeout(c, timeout)
+	return b.Bytes(), err
+}
+
+// RunTimeout runs the given command with the given timeout.
+// If the command times out, it attempts to kill the process.
+func RunTimeout(c *exec.Cmd, timeout time.Duration) error {
+	if err := c.Start(); err != nil {
+		return err
+	}
+	return WaitTimeout(c, timeout)
+}
+
+// WaitTimeout waits for the given command to finish with a timeout.
+// It assumes the command has already been started.
+// If the command times out, it attempts to kill the process and returns
+// TimeoutErr; otherwise it returns the error from c.Wait().
+func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	// Buffered so the waiter goroutine can always deliver its result and
+	// exit, even when the timeout branch has already returned and nobody
+	// receives from the channel any more.
+	done := make(chan error, 1)
+	go func() { done <- c.Wait() }()
+	select {
+	case err := <-done:
+		timer.Stop()
+		return err
+	case <-timer.C:
+		if err := c.Process.Kill(); err != nil {
+			log.Printf("FATAL error killing process: %s", err)
+		}
+		return TimeoutErr
+	}
+}
diff --git a/internal/internal_test.go b/internal/internal_test.go
index 7ff64e87bb0fe..3d260d529336e 100644
--- a/internal/internal_test.go
+++ b/internal/internal_test.go
@@ -1,6 +1,12 @@
 package internal
 
-import "testing"
+import (
+	"os/exec"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
 
 type SnakeTest struct {
 	input string
@@ -30,3 +36,53 @@ func TestSnakeCase(t *testing.T) {
 		}
 	}
 }
+
+func TestRunTimeout(t *testing.T) {
+	cmd := exec.Command("sleep", "10")
+	start := time.Now()
+	err := RunTimeout(cmd, time.Millisecond*20)
+	elapsed := time.Since(start)
+
+	assert.Equal(t, TimeoutErr, err)
+	// Verify that command gets killed in 20ms, with some breathing room
+	assert.True(t, elapsed < time.Millisecond*75)
+}
+
+func TestCombinedOutputTimeout(t *testing.T) {
+	cmd := exec.Command("sleep", "10")
+	start := time.Now()
+	_, err := CombinedOutputTimeout(cmd, time.Millisecond*20)
+	elapsed := time.Since(start)
+
+	assert.Equal(t, TimeoutErr, err)
+	// Verify that command gets killed in 20ms, with some breathing room
+	assert.True(t, elapsed < time.Millisecond*75)
+}
+
+func TestCombinedOutput(t *testing.T) {
+	cmd := exec.Command("echo", "foo")
+	out, err := CombinedOutputTimeout(cmd, time.Second)
+
+	assert.NoError(t, err)
+	assert.Equal(t, "foo\n", string(out))
+}
+
+// test that CombinedOutputTimeout and exec.Cmd.CombinedOutput return
+// the same output from a failed command.
+func TestCombinedOutputError(t *testing.T) {
+	cmd := exec.Command("sleep", "foo")
+	expected, err := cmd.CombinedOutput()
+
+	cmd2 := exec.Command("sleep", "foo")
+	actual, err := CombinedOutputTimeout(cmd2, time.Second)
+
+	assert.Error(t, err)
+	assert.Equal(t, expected, actual)
+}
+
+func TestRunError(t *testing.T) {
+	cmd := exec.Command("sleep", "foo")
+	err := RunTimeout(cmd, time.Second)
+
+	assert.Error(t, err)
+}
diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go
index d2e09ccd06957..c1b2092e84269 100644
--- a/plugins/inputs/exec/exec.go
+++ b/plugins/inputs/exec/exec.go
@@ -6,10 +6,12 @@ import (
 	"os/exec"
 	"sync"
 	"syscall"
+	"time"
 
 	"github.com/gonuts/go-shellquote"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/plugins/parsers/nagios"
@@ -19,6 +21,9 @@ const sampleConfig = `
   ## Commands array
   commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
 
+  ## Timeout for each command to complete.
+  timeout = "5s"
+
   ## measurement name suffix (for separating different commands)
   name_suffix = "_mycollector"
 
@@ -32,6 +37,7 @@ const sampleConfig = `
 type Exec struct {
 	Commands []string
 	Command  string
+	Timeout  internal.Duration
 
 	parser parsers.Parser
 
@@ -43,7 +49,8 @@ type Exec struct {
 
 func NewExec() *Exec {
 	return &Exec{
-		runner: CommandRunner{},
+		runner:  CommandRunner{},
+		Timeout: internal.Duration{Duration: time.Second * 5},
 	}
 }
 
@@ -73,7 +80,11 @@ func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
 	return nil
 }
 
-func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
+func (c CommandRunner) Run(
+	e *Exec,
+	command string,
+	acc telegraf.Accumulator,
+) ([]byte, error) {
 	split_cmd, err := shellquote.Split(command)
 	if err != nil || len(split_cmd) == 0 {
 		return nil, fmt.Errorf("exec: unable to parse command, %s", err)
@@ -84,7 +95,7 @@ func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([
 	var out bytes.Buffer
 	cmd.Stdout = &out
 
-	if err := cmd.Run(); err != nil {
+	if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
 		switch e.parser.(type) {
 		case *nagios.NagiosParser:
 			AddNagiosState(err, acc)
diff --git a/plugins/inputs/ipmi_sensor/command.go b/plugins/inputs/ipmi_sensor/command.go
index 353c27d36dd65..76374c49487e0 100644
--- a/plugins/inputs/ipmi_sensor/command.go
+++ b/plugins/inputs/ipmi_sensor/command.go
@@ -1,10 +1,12 @@
 package ipmi_sensor
 
 import (
-	"bytes"
 	"fmt"
 	"os/exec"
 	"strings"
+	"time"
+
+	"github.com/influxdata/telegraf/internal"
 )
 
 type CommandRunner struct{}
@@ -18,21 +20,16 @@ func (t CommandRunner) cmd(conn *Connection, args ...string) *exec.Cmd {
 	}
 
 	return exec.Command(path, opts...)
-
 }
 
 func (t CommandRunner) Run(conn *Connection, args ...string) (string, error) {
 	cmd := t.cmd(conn, args...)
-	var stdout bytes.Buffer
-	var stderr bytes.Buffer
-	cmd.Stdout = &stdout
-	cmd.Stderr = &stderr
 
-	err := cmd.Run()
+	output, err := internal.CombinedOutputTimeout(cmd, time.Second*5)
 	if err != nil {
 		return "", fmt.Errorf("run %s %s: %s (%s)",
-			cmd.Path, strings.Join(cmd.Args, " "), stderr.String(), err)
+			cmd.Path, strings.Join(cmd.Args, " "), string(output), err)
 	}
 
-	return stdout.String(), err
+	return string(output), err
 }
diff --git a/plugins/inputs/leofs/leofs.go b/plugins/inputs/leofs/leofs.go
index f4910ad0cdb61..06c71e932ace2 100644
--- a/plugins/inputs/leofs/leofs.go
+++ b/plugins/inputs/leofs/leofs.go
@@ -3,13 +3,16 @@ package leofs
 import (
 	"bufio"
 	"fmt"
-	"github.com/influxdata/telegraf"
-	"github.com/influxdata/telegraf/plugins/inputs"
 	"net/url"
 	"os/exec"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
 const oid = ".1.3.6.1.4.1.35450"
@@ -175,14 +178,20 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
 	return outerr
 }
 
-func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc telegraf.Accumulator) error {
+func (l *LeoFS) gatherServer(
+	endpoint string,
+	serverType ServerType,
+	acc telegraf.Accumulator,
+) error {
 	cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
 	stdout, err := cmd.StdoutPipe()
 	if err != nil {
 		return err
 	}
-	cmd.Start()
-	defer cmd.Wait()
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+	defer internal.WaitTimeout(cmd, time.Second*5)
 	scanner := bufio.NewScanner(stdout)
 	if !scanner.Scan() {
 		return fmt.Errorf("Unable to retrieve the node name")
diff --git a/plugins/inputs/ping/ping.go b/plugins/inputs/ping/ping.go
index 6c26acb8a08ae..ff05f684fa37e 100644
--- a/plugins/inputs/ping/ping.go
+++ b/plugins/inputs/ping/ping.go
@@ -4,20 +4,22 @@ package ping
 
 import (
 	"errors"
 	"os/exec"
 	"runtime"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
 // HostPinger is a function that runs the "ping" function using a list of
 // passed arguments. This can be easily switched with a mocked ping function
 // for unit test purposes (see ping_test.go)
-type HostPinger func(args ...string) (string, error)
+type HostPinger func(timeout float64, args ...string) (string, error)
 
 type Ping struct {
 	// Interval at which to ping (ping -i )
@@ -74,7 +76,7 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
 		go func(u string) {
 			defer wg.Done()
 			args := p.args(u)
-			out, err := p.pingHost(args...)
+			out, err := p.pingHost(p.Timeout, args...)
 			if err != nil {
 				// Combine go err + stderr output
 				errorChannel <- errors.New(
@@ -116,13 +118,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
 	return errors.New(strings.Join(errorStrings, "\n"))
 }
 
-func hostPinger(args ...string) (string, error) {
+func hostPinger(timeout float64, args ...string) (string, error) {
 	bin, err := exec.LookPath("ping")
 	if err != nil {
 		return "", err
 	}
 	c := exec.Command(bin, args...)
-	out, err := c.CombinedOutput()
+	out, err := internal.CombinedOutputTimeout(c,
+		time.Second*time.Duration(timeout+1))
 	return string(out), err
 }
 
diff --git a/plugins/inputs/ping/ping_test.go b/plugins/inputs/ping/ping_test.go
index cd61a4fb2161f..25ecdf2fae777 100644
--- a/plugins/inputs/ping/ping_test.go
+++ b/plugins/inputs/ping/ping_test.go
@@ -124,7 +124,7 @@ func TestArgs(t *testing.T) {
 		"Expected: %s Actual: %s", expected, actual)
 }
 
-func mockHostPinger(args ...string) (string, error) {
+func mockHostPinger(timeout float64, args ...string) (string, error) {
 	return linuxPingOutput, nil
 }
 
@@ -161,7 +161,7 @@ PING www.google.com (216.58.218.164) 56(84) bytes of data.
rtt min/avg/max/mdev = 35.225/44.033/51.806/5.325 ms ` -func mockLossyHostPinger(args ...string) (string, error) { +func mockLossyHostPinger(timeout float64, args ...string) (string, error) { return lossyPingOutput, nil } @@ -192,7 +192,7 @@ Request timeout for icmp_seq 0 2 packets transmitted, 0 packets received, 100.0% packet loss ` -func mockErrorHostPinger(args ...string) (string, error) { +func mockErrorHostPinger(timeout float64, args ...string) (string, error) { return errorPingOutput, errors.New("No packets received") } @@ -215,7 +215,7 @@ func TestBadPingGather(t *testing.T) { acc.AssertContainsTaggedFields(t, "ping", fields, tags) } -func mockFatalHostPinger(args ...string) (string, error) { +func mockFatalHostPinger(timeout float64, args ...string) (string, error) { return fatalPingOutput, errors.New("So very bad") } diff --git a/plugins/inputs/sysstat/sysstat.go b/plugins/inputs/sysstat/sysstat.go index c8c17ac45d5e5..c55516716f558 100644 --- a/plugins/inputs/sysstat/sysstat.go +++ b/plugins/inputs/sysstat/sysstat.go @@ -17,6 +17,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -98,31 +99,34 @@ var sampleConfig = ` # group = true # # - ## Options for the sadf command. The values on the left represent the sadf options and - ## the values on the right their description (wich are used for grouping and prefixing metrics). + ## Options for the sadf command. The values on the left represent the sadf + ## options and the values on the right their description (wich are used for + ## grouping and prefixing metrics). ## - ## Run 'sar -h' or 'man sar' to find out the supported options for your sysstat version. + ## Run 'sar -h' or 'man sar' to find out the supported options for your + ## sysstat version. 
[inputs.sysstat.options] - -C = "cpu" - -B = "paging" - -b = "io" - -d = "disk" # requires DISK activity - "-n ALL" = "network" - "-P ALL" = "per_cpu" - -q = "queue" - -R = "mem" - -r = "mem_util" - -S = "swap_util" - -u = "cpu_util" - -v = "inode" - -W = "swap" - -w = "task" - # -H = "hugepages" # only available for newer linux distributions - # "-I ALL" = "interrupts" # requires INT activity + -C = "cpu" + -B = "paging" + -b = "io" + -d = "disk" # requires DISK activity + "-n ALL" = "network" + "-P ALL" = "per_cpu" + -q = "queue" + -R = "mem" + -r = "mem_util" + -S = "swap_util" + -u = "cpu_util" + -v = "inode" + -W = "swap" + -w = "task" + # -H = "hugepages" # only available for newer linux distributions + # "-I ALL" = "interrupts" # requires INT activity # # - ## Device tags can be used to add additional tags for devices. For example the configuration below - ## adds a tag vg with value rootvg for all metrics with sda devices. + ## Device tags can be used to add additional tags for devices. + ## For example the configuration below adds a tag vg with value rootvg for + ## all metrics with sda devices. # [[inputs.sysstat.device_tags.sda]] # vg = "rootvg" ` @@ -174,24 +178,28 @@ func (s *Sysstat) Gather(acc telegraf.Accumulator) error { return errors.New(strings.Join(errorStrings, "\n")) } -// collect collects sysstat data with the collector utility sadc. It runs the following command: +// collect collects sysstat data with the collector utility sadc. +// It runs the following command: // Sadc -S -S ... 2 tmpFile -// The above command collects system metrics during and saves it in binary form to tmpFile. +// The above command collects system metrics during and +// saves it in binary form to tmpFile. 
func (s *Sysstat) collect() error { options := []string{} for _, act := range s.Activities { options = append(options, "-S", act) } s.tmpFile = path.Join("/tmp", fmt.Sprintf("sysstat-%d", time.Now().Unix())) - collectInterval := s.interval - parseInterval // collectInterval has to be smaller than the telegraf data collection interval + // collectInterval has to be smaller than the telegraf data collection interval + collectInterval := s.interval - parseInterval - if collectInterval < 0 { // If true, interval is not defined yet and Gather is run for the first time. + // If true, interval is not defined yet and Gather is run for the first time. + if collectInterval < 0 { collectInterval = 1 // In that case we only collect for 1 second. } options = append(options, strconv.Itoa(collectInterval), "2", s.tmpFile) cmd := execCommand(s.Sadc, options...) - out, err := cmd.CombinedOutput() + out, err := internal.CombinedOutputTimeout(cmd, time.Second*5) if err != nil { return fmt.Errorf("failed to run command %s: %s", strings.Join(cmd.Args, " "), string(out)) } @@ -279,8 +287,9 @@ func (s *Sysstat) parse(acc telegraf.Accumulator, option string, ts time.Time) e acc.AddFields(measurement, v.fields, v.tags, ts) } } - if err := cmd.Wait(); err != nil { - return fmt.Errorf("command %s failed with %s", strings.Join(cmd.Args, " "), err) + if err := internal.WaitTimeout(cmd, time.Second*5); err != nil { + return fmt.Errorf("command %s failed with %s", + strings.Join(cmd.Args, " "), err) } return nil }