Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gomaxprocs: Use a ticker (30s) to set gomaxprocs #8278

Merged
merged 9 commits into from
Jun 8, 2022
58 changes: 46 additions & 12 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,11 @@ func NewCreator(args CreatorParams) beat.Creator {
libbeatMonitoringRegistry: monitoring.Default.GetRegistry("libbeat"),
}

// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
// set. This is necessary since the Go runtime will default to the number
// of CPUs available in the machine it's running in, however, when running
// in a container or in a cgroup with resource limits, the disparity can be
// extreme.
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
_, err := maxprocs.Set(maxprocs.Logger(logger.Infof))
if err != nil {
logger.Errorf("failed to set GOMAXPROCS: %v", err)
}
var elasticsearchOutputConfig *agentconfig.C
if hasElasticsearchOutput(b) {
elasticsearchOutputConfig = b.Config.Output.Config()
}
var err error
bt.config, err = config.NewConfig(bt.rawConfig, elasticsearchOutputConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -164,6 +153,18 @@ func (bt *beater) Run(b *beat.Beat) error {
}

func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error {
// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
// set. This is necessary since the Go runtime will default to the number
// of CPUs available in the machine it's running in, however, when running
// in a container or in a cgroup with resource limits, the disparity can be
// extreme.
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
// Since the quotas may be updated without restarting the process, the
// GOMAXPROCS are adjusted every 30s.
go adjustMaxProcs(ctx, 30*time.Second, diffInfof(bt.logger), bt.logger.Errorf)

tracer, tracerServer, err := initTracing(b, bt.config, bt.logger)
if err != nil {
return err
Expand Down Expand Up @@ -1026,3 +1027,36 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error
s.Set(response.ClusterUUID)
return nil
}

// logf is a printf-style logging callback: it receives a format string
// followed by the arguments to be formatted into it.
type logf func(format string, args ...interface{})

// adjustMaxProcs calls maxprocs.Set once immediately and then again every d
// until ctx is done, at which point it returns ctx.Err().
//
// infof is handed to maxprocs as its logger; errorf is used to report a
// failure of maxprocs.Set. A failed attempt does not stop the loop, the next
// tick simply tries again.
func adjustMaxProcs(ctx context.Context, d time.Duration, infof, errorf logf) error {
	apply := func() {
		_, err := maxprocs.Set(maxprocs.Logger(infof))
		if err != nil {
			errorf("failed to set GOMAXPROCS: %v", err)
		}
	}

	// Apply once up front so the first adjustment does not wait a full period.
	apply()

	refresh := time.NewTicker(d)
	defer refresh.Stop()
	for {
		select {
		case <-refresh.C:
			apply()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// diffInfof returns a logf that formats each message and forwards it to
// logger.Info only when it differs from the message formatted by the
// previous call, so an unchanged message repeated back-to-back is logged once.
func diffInfof(logger *logp.Logger) logf {
	var previous string
	return func(format string, args ...interface{}) {
		current := fmt.Sprintf(format, args...)
		if current == previous {
			// Same text as the last call: nothing new to report.
			return
		}
		logger.Info(current)
		previous = current
	}
}
57 changes: 57 additions & 0 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,63 @@ func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) {
assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

// TestAdjustMaxProcsTickerRefresh checks that adjustMaxProcs keeps re-applying
// GOMAXPROCS at the interval it is given: with a plain (non-deduplicating)
// logger, at least 10 matching log entries are expected per GOMAXPROCS value.
func TestAdjustMaxProcsTickerRefresh(t *testing.T) {
	values := []int{2, 4, 8}
	for _, maxP := range values {
		name := fmt.Sprintf("%d_GOMAXPROCS", maxP)
		t.Run(name, func(t *testing.T) {
			entries := testAdjustMaxProcs(t, maxP, false)
			assert.GreaterOrEqual(t, entries.Len(), 10)
		})
	}
}

// TestAdjustMaxProcsTickerRefreshDiffLogger checks that, when the info logger
// is wrapped with diffInfof, the repeated GOMAXPROCS message is logged only
// once even though adjustMaxProcs runs many times.
func TestAdjustMaxProcsTickerRefreshDiffLogger(t *testing.T) {
	for _, maxP := range []int{2, 4, 8} {
		t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
			observedLogs := testAdjustMaxProcs(t, maxP, true)
			// Assert that only 1 message has been logged. The expected value
			// goes first so a failure reports expected/actual the right way
			// round.
			assert.Equal(t, 1, observedLogs.Len())
		})
	}
}

// testAdjustMaxProcs sets the GOMAXPROCS environment variable to maxP, runs
// adjustMaxProcs in the background with a 1ms refresh interval, and returns
// the observed log entries that contain the `maxprocs: Honoring GOMAXPROCS`
// message for maxP.
//
// When diffCore is true the info logger is wrapped with diffInfof; otherwise
// logger.Infof is used directly.
//
// If no matching entry is observed before the one second timeout the test is
// failed immediately with t.Fatal, so callers never receive a nil result.
// It must therefore be called from the goroutine running the test.
func testAdjustMaxProcs(t *testing.T, maxP int, diffCore bool) *observer.ObservedLogs {
	t.Helper()
	t.Setenv("GOMAXPROCS", fmt.Sprint(maxP))
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Tee the logger output into an observer so the emitted entries can be
	// inspected below.
	core, observedLogs := observer.New(zapcore.DebugLevel)
	logger := logp.NewLogger("", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
		return zapcore.NewTee(in, core)
	}))

	// Adjust maxprocs every 1ms.
	refreshDuration := time.Millisecond
	logFunc := logger.Infof
	if diffCore {
		logFunc = diffInfof(logger)
	}

	go adjustMaxProcs(ctx, refreshDuration, logFunc, logger.Errorf)

	filterMsg := fmt.Sprintf(`maxprocs: Honoring GOMAXPROCS="%d"`, maxP)
	for {
		select {
		// Wait for 50ms so adjustmaxprocs has had time to run a few times.
		case <-time.After(50 * refreshDuration):
			logs := observedLogs.FilterMessageSnippet(filterMsg)
			if logs.Len() >= 1 {
				return logs
			}
		case <-ctx.Done():
			// Stop the test here rather than returning nil: callers invoke
			// methods on the result, which would panic on a nil pointer.
			t.Fatal(ctx.Err())
		}
	}
}

type mockClusterUUIDClient struct {
ClusterUUID string `json:"cluster_uuid"`
}
Expand Down
37 changes: 37 additions & 0 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,43 @@ func TestServerPProf(t *testing.T) {
}
}

// TestServerGoMaxProcsLogMessage asserts that starting the beater invokes the
// gomaxprocs library. The "Honoring GOMAXPROCS" log message is used as the
// signal that it ran, and the test also checks that the message has not been
// logged more than once at the moment it is first observed.
func TestServerGoMaxProcsLogMessage(t *testing.T) {
	for _, n := range []int{1, 2, 4} {
		t.Run(fmt.Sprintf("%d_GOMAXPROCS", n), func(t *testing.T) {
			t.Setenv("GOMAXPROCS", fmt.Sprint(n))

			beat, cfg := newBeat(t, nil, nil, nil)
			apm, err := newTestBeater(t, beat, cfg, nil)
			require.NoError(t, err)
			apm.start()
			defer apm.Stop()

			// The snippet does not change between polls, so build it once.
			snippet := fmt.Sprintf(
				`maxprocs: Honoring GOMAXPROCS="%d" as set in environment`, n,
			)

			timeout := time.NewTimer(time.Second)
			defer timeout.Stop()
			for {
				select {
				case <-timeout.C:
					t.Error("timed out waiting for log message, total logs observed:", apm.logs.Len())
					// Dump everything that was logged to help diagnose the
					// failure.
					for _, entry := range apm.logs.All() {
						t.Log(entry.LoggerName, entry.Message)
					}
					return
				case <-time.After(time.Millisecond):
					// Poll the observed logs every millisecond.
					logs := apm.logs.FilterMessageSnippet(snippet)
					if logs.Len() > 0 {
						// At least one message was found, so this assertion
						// can only fail when the message was duplicated.
						assert.Len(t, logs.All(), 1, "expected exactly one gomaxprocs log message")
						return
					}
				}
			}
		})
	}
}

type dummyOutputClient struct {
}

Expand Down