Skip to content

Commit

Permalink
[Heartbeat] Limit jobs by type via Env Vars (#34307)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc authored Feb 5, 2023
1 parent 23c2c42 commit ad651a7
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Heartbeat*
- Users can now configure max scheduler job limits per monitor type via env var. {pull}34307[34307]

- Remove host and port matching restrictions on hint-generated monitors. {pull}34376[34376]

Expand Down
18 changes: 15 additions & 3 deletions heartbeat/autodiscover/builder/hints/monitors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package hints

import (
"sort"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestGenerateHints(t *testing.T) {
result: mapstr.M{
"type": "http",
"schedule": "@every 5s",
"hosts": []interface{}{"1.2.3.4:8888", "1.2.3.4:9090"},
"hosts": []string{"1.2.3.4:8888", "1.2.3.4:9090"},
},
},
{
Expand Down Expand Up @@ -137,7 +138,7 @@ func TestGenerateHints(t *testing.T) {
len: 1,
result: mapstr.M{
"type": "http",
"hosts": []interface{}{"1.2.3.4:9090"},
"hosts": []string{"1.2.3.4:9090"},
"schedule": "@every 5s",
"processors": []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -170,7 +171,7 @@ func TestGenerateHints(t *testing.T) {
result: mapstr.M{
"type": "http",
"schedule": "@every 5s",
"hosts": []interface{}{"1.2.3.4:8888", "1.2.3.4:9090"},
"hosts": []string{"1.2.3.4:8888", "1.2.3.4:9090"},
},
},
{
Expand Down Expand Up @@ -209,6 +210,17 @@ func TestGenerateHints(t *testing.T) {
err := cfgs[0].Unpack(&config)
assert.Nil(t, err, test.message)

// Autodiscover can return configs with different sort orders here, which is irrelevant
// To make tests pass consistently we sort the host list
hostStrs := []string{}
if hostsSlice, ok := config["hosts"].([]interface{}); ok && len(hostsSlice) > 0 {
for _, hi := range hostsSlice {
hostStrs = append(hostStrs, hi.(string))
}
sort.Strings(hostStrs)
config["hosts"] = hostStrs
}

assert.Equal(t, test.result, config, test.message)
}

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Heartbeat struct {
done chan struct{}
stopOnce sync.Once
// config is used for iterating over elements of the config.
config config.Config
config *config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
monitorFactory *monitors.RunnerFactory
Expand All @@ -59,7 +59,7 @@ type Heartbeat struct {

// New creates a new heartbeat.
func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
parsedConfig := config.DefaultConfig
parsedConfig := config.DefaultConfig()
if err := rawConfig.Unpack(&parsedConfig); err != nil {
return nil, fmt.Errorf("error reading config file: %w", err)
}
Expand Down
36 changes: 30 additions & 6 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@
package config

import (
"fmt"
"os"
"strconv"
"strings"

"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/processors/util"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type LocationWithID struct {
Expand Down Expand Up @@ -53,10 +59,28 @@ type Scheduler struct {
}

// DefaultConfig is the canonical instantiation of Config.
var DefaultConfig = Config{
Jobs: map[string]*JobLimit{
"browser": {
Limit: 2,
},
},
func DefaultConfig() *Config {
limits := map[string]*JobLimit{
"browser": {Limit: 2},
}

// Read the env key SYNTHETICS_LIMIT_{TYPE} for each type of monitor to set scaling limits
// hard coded list of types to avoid cycles in current plugin system.
// TODO: refactor plugin system to DRY this up
for _, t := range []string{"http", "tcp", "icmp", "browser"} {
envKey := fmt.Sprintf("SYNTHETICS_LIMIT_%s", strings.ToUpper(t))
if limitStr := os.Getenv(envKey); limitStr != "" {
tLimitVal, err := strconv.ParseInt(limitStr, 10, 64)
if err != nil {
logp.L().Warnf("Could not parse job limit env var %s with value '%s' as integer", envKey, limitStr)
continue
}

limits[t] = &JobLimit{Limit: tLimitVal}
}
}

return &Config{
Jobs: limits,
}
}
82 changes: 79 additions & 3 deletions heartbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,83 @@
// specific language governing permissions and limitations
// under the License.

//go:build !integration
// +build !integration

package config

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDefaults(t *testing.T) {
cases := []struct {
Name string
EnvKey string
EnvVal string
LimitType string
LimitVal int64
}{
{
"Browser monitor override",
"SYNTHETICS_LIMIT_BROWSER",
"123",
"browser",
123,
},
{
"Browser default is 2 when other monitor is overridden",
"SYNTHETICS_LIMIT_HTTP",
"123",
"browser",
2,
},
{
"Browser default is 2 when nothing is overridden",
"FOO",
"bar",
"browser",
2,
},
{
"Browser default is 2 when bad value passed",
"SYNTHETICS_LIMIT_BROWSER",
"bar",
"browser",
2,
},
{
"HTTP monitor override",
"SYNTHETICS_LIMIT_HTTP",
"456",
"http",
456,
},
{
"TCP monitor override",
"SYNTHETICS_LIMIT_TCP",
"789",
"tcp",
789,
},
{
"ICMP monitor override",
"SYNTHETICS_LIMIT_ICMP",
"911",
"icmp",
911,
},
}

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
os.Setenv(c.EnvKey, c.EnvVal)
defer os.Unsetenv(c.EnvKey)

dc := DefaultConfig()
require.NotNil(t, dc.Jobs[c.LimitType])
assert.Equal(t, dc.Jobs[c.LimitType].Limit, c.LimitVal)
})
}
}
2 changes: 2 additions & 0 deletions heartbeat/docs/heartbeat-scheduler.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ heartbeat.jobs:

In the example, at any given time {beatname_uc} guarantees that only 10
concurrent `http` tasks and only 5 concurrent `browser` tasks will be active.

These limits can also be set via the environment variables `SYNTHETICS_LIMIT_{TYPE}`, where `{TYPE}` is one of `BROWSER`, `HTTP`, `TCP`, and `ICMP`.
36 changes: 18 additions & 18 deletions heartbeat/monitors/active/dialchain/dialers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ import (
//
// The dialer will update the active events with:
//
// {
// "tcp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
// {
// "tcp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
func TCPDialer(to time.Duration) NetDialer {
return CreateNetDialer(to)
}
Expand All @@ -56,12 +56,12 @@ func TCPDialer(to time.Duration) NetDialer {
//
// The dialer will update the active events with:
//
// {
// "udp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
// {
// "udp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
func UDPDialer(to time.Duration) NetDialer {
return CreateNetDialer(to)
}
Expand Down Expand Up @@ -106,13 +106,13 @@ func CreateNetDialer(timeout time.Duration) NetDialer {
}

end := time.Now()
eventext.MergeEventFields(event, mapstr.M{
namespace: mapstr.M{
"rtt": mapstr.M{
"connect": look.RTT(end.Sub(start)),
},
ef := mapstr.M{}
ef[namespace] = mapstr.M{
"rtt": mapstr.M{
"connect": look.RTT(end.Sub(start)),
},
})
}
eventext.MergeEventFields(event, ef)

return conn, nil
}), nil
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
//
// The layer will update the active event with:
//
// {
// "socks5": {
// "rtt": { "connect": { "us": ... }}
// }
// }
// {
// "socks5": {
// "rtt": { "connect": { "us": ... }}
// }
// }
func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/monitors/active/http/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ func parseBody(b interface{}) (positiveMatch, negativeMatch []match.Matcher, err
return positiveMatch, negativeMatch, errBodyIllegalBody
}

/* checkBody accepts 2 check types:
/*
checkBody accepts 2 check types:
1. positive
2. negative
So, there are 4 kinds of scenarios:
Expand Down
12 changes: 6 additions & 6 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func respondingHTTPBodyChecks(body string) validator.Validator {
func respondingHTTPHeaderChecks() validator.Validator {
return lookslike.MustCompile(map[string]interface{}{
"http.response.headers": map[string]interface{}{
"Date": isdef.IsString,
"Date": isdef.Optional(isdef.IsString),
"Content-Length": isdef.Optional(isdef.IsString),
"Content-Type": isdef.Optional(isdef.IsString),
"Location": isdef.Optional(isdef.IsString),
Expand Down Expand Up @@ -261,12 +261,12 @@ func TestUpStatuses(t *testing.T) {

testslike.Test(
t,
lookslike.Strict(lookslike.Compose(
lookslike.Compose(
hbtest.BaseChecks("127.0.0.1", "up", "http"),
hbtest.RespondingTCPChecks(),
hbtest.SummaryChecks(1, 0),
respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", status),
)),
),
event.Fields,
)
})
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestJsonBody(t *testing.T) {
if tc.expression != "" {
jsonCheck["expression"] = tc.expression
}
if tc.condition != nil {
if len(tc.condition) > 0 {
jsonCheck["condition"] = tc.condition
}

Expand Down Expand Up @@ -687,7 +687,7 @@ func TestRedirect(t *testing.T) {

testslike.Test(
t,
lookslike.Strict(lookslike.Compose(
lookslike.Compose(
hbtest.BaseChecks("", "up", "http"),
hbtest.SummaryChecks(1, 0),
minimalRespondingHTTPChecks(testURL, "text/plain; charset=utf-8", 200),
Expand All @@ -701,7 +701,7 @@ func TestRedirect(t *testing.T) {
server.URL + redirectingPaths["/redirect_two"],
},
}),
)),
),
event.Fields,
)
}
Expand Down

0 comments on commit ad651a7

Please sign in to comment.