Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Add browser monitor timeout #32434

Merged
merged 14 commits into from
Jul 27, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Send targeted error message for unexpected synthetics exits. {pull}31936[31936]
- Reduced memory usage slightly for browser monitors. {pull}32317[32317]
- Automatically kill zombie-ish node processes. {pull}32393[32393]
- Added timeout for browser monitors. {pull}32434[32434]

*Metricbeat*

Expand Down
9 changes: 9 additions & 0 deletions heartbeat/ecserr/ecserr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ecserr

import (
"fmt"
"time"
)

// ECSErr represents an error per the ECS specification
Expand Down Expand Up @@ -72,3 +73,11 @@ func NewBadCmdStatusErr(exitCode int, cmd string) *ECSErr {
fmt.Sprintf("command '%s' exited unexpectedly with code: %d", cmd, exitCode),
)
}

// NewCmdTimeoutStatusErr builds an ECSErr of type ETYPE_IO with code
// "CMD_TIMEOUT" whose message states that cmd did not exit before the given
// (extended) timeout.
func NewCmdTimeoutStatusErr(timeout time.Duration, cmd string) *ECSErr {
	msg := fmt.Sprintf("command '%s' did not exit before extended timeout: %s", cmd, timeout.String())
	return NewECSErr(ETYPE_IO, "CMD_TIMEOUT", msg)
}
18 changes: 0 additions & 18 deletions x-pack/heartbeat/monitors.d/browser-inline.yml

This file was deleted.

11 changes: 0 additions & 11 deletions x-pack/heartbeat/monitors.d/todos.yml.disabled

This file was deleted.

3 changes: 3 additions & 0 deletions x-pack/heartbeat/monitors/browser/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package browser

import (
"fmt"
"time"

"github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/source"
"github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/synthexec"
Expand All @@ -16,6 +17,7 @@ func DefaultConfig() *Config {
return &Config{
Sandbox: false,
Screenshots: "on",
Timeout: 15 * time.Minute,
}
}

Expand All @@ -35,6 +37,7 @@ type Config struct {
PlaywrightOpts map[string]interface{} `config:"playwright_options"`
FilterJourneys synthexec.FilterJourneyConfig `config:"filter_journeys"`
IgnoreHTTPSErrors bool `config:"ignore_https_errors"`
Timeout time.Duration `config:"timeout"`
}

var ErrNameRequired = fmt.Errorf("config 'name' must be specified for this monitor")
Expand Down
7 changes: 5 additions & 2 deletions x-pack/heartbeat/monitors/browser/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
Expand Down Expand Up @@ -125,16 +126,18 @@ func (p *Project) extraArgs() []string {
func (p *Project) jobs() []jobs.Job {
var j jobs.Job
isScript := p.projectCfg.Source.Inline != nil
ctx := context.WithValue(context.Background(), synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second)

if isScript {
src := p.projectCfg.Source.Inline.Script
j = synthexec.InlineJourneyJob(context.TODO(), src, p.Params(), p.StdFields(), p.extraArgs()...)
j = synthexec.InlineJourneyJob(ctx, src, p.Params(), p.StdFields(), p.extraArgs()...)
} else {
j = func(event *beat.Event) ([]jobs.Job, error) {
err := p.Fetch()
if err != nil {
return nil, fmt.Errorf("could not fetch for project job: %w", err)
}
sj, err := synthexec.ProjectJob(context.TODO(), p.Workdir(), p.Params(), p.FilterJourneys(), p.StdFields(), p.extraArgs()...)
sj, err := synthexec.ProjectJob(ctx, p.Workdir(), p.Params(), p.FilterJourneys(), p.StdFields(), p.extraArgs()...)
if err != nil {
return nil, err
}
Expand Down
22 changes: 22 additions & 0 deletions x-pack/heartbeat/monitors/browser/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

func TestValidLocal(t *testing.T) {
timeout := 30
_, filename, _, _ := runtime.Caller(0)
path := path.Join(filepath.Dir(filename), "source/fixtures/todos")
testParams := map[string]interface{}{
Expand All @@ -40,6 +41,7 @@ func TestValidLocal(t *testing.T) {
"path": path,
},
},
"timeout": timeout,
})
s, e := NewProject(cfg)
require.NoError(t, e)
Expand All @@ -57,6 +59,7 @@ func TestValidLocal(t *testing.T) {
}

func TestValidInline(t *testing.T) {
timeout := 30
script := "a script"
testParams := map[string]interface{}{
"key1": "value1",
Expand All @@ -71,6 +74,7 @@ func TestValidInline(t *testing.T) {
"script": script,
},
},
"timeout": timeout,
})
s, e := NewProject(cfg)
require.NoError(t, e)
Expand Down Expand Up @@ -211,3 +215,21 @@ func TestExtraArgs(t *testing.T) {
})
}
}

// TestEmptyTimeout verifies that a monitor config which omits "timeout"
// produces a project whose Timeout matches the value from DefaultConfig.
func TestEmptyTimeout(t *testing.T) {
	defaults := DefaultConfig()
	cfg := conf.MustNewConfigFrom(mapstr.M{
		"name": "My Name",
		"id":   "myId",
		"source": mapstr.M{
			"inline": mapstr.M{
				"script": "script",
			},
		},
	})
	s, e := NewProject(cfg)

	require.NoError(t, e)
	require.NotNil(t, s)
	// testify's signature is Equal(t, expected, actual); keeping that order
	// makes the "expected"/"actual" labels in a failure message correct.
	require.Equal(t, defaults.Timeout, s.projectCfg.Timeout)
}
23 changes: 21 additions & 2 deletions x-pack/heartbeat/monitors/browser/synthexec/synthexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type FilterJourneyConfig struct {
// where these are unsupported
var platformCmdMutate func(*exec.Cmd) = func(*exec.Cmd) {}

// synthexecTimeoutKey is the dedicated type of the SynthexecTimeout context
// key. A named, unexported type ensures the key can never compare equal to a
// bare struct{}{} (or any other package's key) stored in the same context.
type synthexecTimeoutKey struct{}

// SynthexecTimeout is the context key under which callers store the
// time.Duration that runCmd reads to bound how long the command may run.
var SynthexecTimeout = synthexecTimeoutKey{}

// ProjectJob will run a single journey by name from the given project.
func ProjectJob(ctx context.Context, projectPath string, params mapstr.M, filterJourneys FilterJourneyConfig, fields stdfields.StdMonitorFields, extraArgs ...string) (jobs.Job, error) {
// Run the command in the given projectPath, use '.' as the first arg since the command runs
Expand Down Expand Up @@ -241,9 +243,17 @@ func runCmd(
return nil, err
}

// Kill the process if the context ends
// Get timeout from parent ctx
timeout, _ := ctx.Value(SynthexecTimeout).(time.Duration)
ctx, cancel := context.WithTimeout(ctx, timeout)
go func() {
<-ctx.Done()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed, the kill/warning is a problem since it will happen even during runs that are not broken. I think the simplest way to fix that would be to define an atomic bool that flips when cmd.Wait returns and check it here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than use a switch, which introduces a (small) state dependency between the two routines, I'd rather check the exit status with if !cmd.ProcessState.Exited() { // kill and log error}. wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize that existed, that's perfect

// ProcessState can be null if it hasn't reported back yet
if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
return
}

err := cmd.Process.Kill()
if err != nil {
logp.Warn("could not kill synthetics process: %s", err)
Expand All @@ -258,8 +268,16 @@ func runCmd(

var cmdError *SynthError = nil
if err != nil {
cmdError = ECSErrToSynthError(ecserr.NewBadCmdStatusErr(cmd.ProcessState.ExitCode(), loggableCmd.String()))
// err could be generic or it could have been killed by context timeout, log and check context
// to decide which error to stream
logp.Warn("Error executing command '%s' (%d): %s", loggableCmd.String(), cmd.ProcessState.ExitCode(), err)

if errors.Is(ctx.Err(), context.DeadlineExceeded) {
timeout, _ := ctx.Value(SynthexecTimeout).(time.Duration)
cmdError = ECSErrToSynthError(ecserr.NewCmdTimeoutStatusErr(timeout, loggableCmd.String()))
} else {
cmdError = ECSErrToSynthError(ecserr.NewBadCmdStatusErr(cmd.ProcessState.ExitCode(), loggableCmd.String()))
}
}

mpx.writeSynthEvent(&SynthEvent{
Expand All @@ -270,6 +288,7 @@ func runCmd(

wg.Wait()
mpx.Close()
cancel()
}()

return mpx, nil
Expand Down
23 changes: 19 additions & 4 deletions x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestRunCmd(t *testing.T) {
cmd := exec.Command("go", "run", "./main.go")

stdinStr := "MY_STDIN"
synthEvents := runAndCollect(t, cmd, stdinStr)
synthEvents := runAndCollect(t, cmd, stdinStr, 15*time.Minute)

t.Run("has echo'd stdin to stdout", func(t *testing.T) {
stdoutEvents := eventsWithType(Stdout, synthEvents)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestRunCmd(t *testing.T) {

func TestRunBadExitCodeCmd(t *testing.T) {
cmd := exec.Command("go", "run", "./main.go", "exit")
synthEvents := runAndCollect(t, cmd, "")
synthEvents := runAndCollect(t, cmd, "", 15*time.Minute)

// go run outputs "exit status 123" to stderr so we have two messages
require.Len(t, synthEvents, 2)
Expand All @@ -149,11 +149,26 @@ func TestRunBadExitCodeCmd(t *testing.T) {
})
}

func runAndCollect(t *testing.T, cmd *exec.Cmd, stdinStr string) []*SynthEvent {
func TestRunTimeoutExitCodeCmd(t *testing.T) {
cmd := exec.Command("go", "run", "./main.go")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this test, but are we sure that there isn't a race here? I wonder if on fast systems the command might execute quickly enough to make the test flaky. It might be safer to add a sleep into that go program.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a small sleep to the executable just in case

synthEvents := runAndCollect(t, cmd, "", 0*time.Second)

// go run should not produce any additional stderr output in this case
require.Len(t, synthEvents, 1)

t.Run("has a cmd status event", func(t *testing.T) {
stdoutEvents := eventsWithType(CmdStatus, synthEvents)
require.Len(t, stdoutEvents, 1)
require.Equal(t, synthEvents[0].Error.Code, "CMD_TIMEOUT")
})
}

func runAndCollect(t *testing.T, cmd *exec.Cmd, stdinStr string, cmdTimeout time.Duration) []*SynthEvent {
_, filename, _, _ := runtime.Caller(0)
cmd.Dir = path.Join(filepath.Dir(filename), "testcmd")
ctx := context.WithValue(context.TODO(), SynthexecTimeout, cmdTimeout)

mpx, err := runCmd(context.TODO(), cmd, &stdinStr, nil, FilterJourneyConfig{})
mpx, err := runCmd(ctx, cmd, &stdinStr, nil, FilterJourneyConfig{})
require.NoError(t, err)

var synthEvents []*SynthEvent
Expand Down
2 changes: 2 additions & 0 deletions x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
)

func main() {
// Sleep first so the timeout test has time to fire before this program exits
time.Sleep(time.Millisecond * 500)
// For sending JSON results
pipe := os.NewFile(3, "pipe")

Expand Down