gomaxprocs: Use a ticker (30s) to set gomaxprocs (#8278) (#8343)
Adds a new goroutine that refreshes the GOMAXPROCS setting every
`30s`. This accounts for cases where the CFS quotas may be updated
without the APM Server process being restarted (as in ESS / ECE). A
minimal sketch of the pattern follows the changed-files summary below.

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
(cherry picked from commit c61dd3a)

Co-authored-by: Marc Lopez Rubio <marc5.12@outlook.com>
mergify[bot] and marclop authored Jun 9, 2022
1 parent d920cdd commit 7cd0489
Showing 4 changed files with 144 additions and 12 deletions.
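For readers skimming the diff, the mechanism is small: call maxprocs.Set once at startup, then again on every ticker tick so quota changes are picked up without a restart. Below is a minimal, self-contained sketch of that pattern — not the APM Server code itself — assuming go.uber.org/automaxprocs/maxprocs (the same package the diff uses); refreshMaxProcs and the log.Printf logger are illustrative stand-ins:

package main

import (
	"context"
	"log"
	"time"

	"go.uber.org/automaxprocs/maxprocs"
)

// refreshMaxProcs applies the CFS-quota-aware GOMAXPROCS value once,
// then re-applies it on every tick until ctx is cancelled.
func refreshMaxProcs(ctx context.Context, d time.Duration) error {
	set := func() {
		// maxprocs.Set inspects the cgroup CPU quota and adjusts GOMAXPROCS.
		if _, err := maxprocs.Set(maxprocs.Logger(log.Printf)); err != nil {
			log.Printf("failed to set GOMAXPROCS: %v", err)
		}
	}
	set() // set immediately, then on every tick
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			set()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go refreshMaxProcs(ctx, 30*time.Second)
	// ... start the server here; a real program would block on it ...
	time.Sleep(time.Minute)
}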
58 changes: 46 additions & 12 deletions beater/beater.go
@@ -96,22 +96,11 @@ func NewCreator(args CreatorParams) beat.Creator {
libbeatMonitoringRegistry: monitoring.Default.GetRegistry("libbeat"),
}

// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
// set. This is necessary since the Go runtime will default to the number
// of CPUs available in the machine it's running in, however, when running
// in a container or in a cgroup with resource limits, the disparity can be
// extreme.
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
_, err := maxprocs.Set(maxprocs.Logger(logger.Infof))
if err != nil {
logger.Errorf("failed to set GOMAXPROCS: %v", err)
}
var elasticsearchOutputConfig *agentconfig.C
if hasElasticsearchOutput(b) {
elasticsearchOutputConfig = b.Config.Output.Config()
}
var err error
bt.config, err = config.NewConfig(bt.rawConfig, elasticsearchOutputConfig)
if err != nil {
return nil, err
@@ -164,6 +153,18 @@ func (bt *beater) Run(b *beat.Beat) error {
}

func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error {
// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
// set. This is necessary since the Go runtime will default to the number
// of CPUs available in the machine it's running in, however, when running
// in a container or in a cgroup with resource limits, the disparity can be
// extreme.
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
// Since the quotas may be updated without restarting the process,
// GOMAXPROCS is adjusted every 30s.
go adjustMaxProcs(ctx, 30*time.Second, diffInfof(bt.logger), bt.logger.Errorf)

tracer, tracerServer, err := initTracing(b, bt.config, bt.logger)
if err != nil {
return err
@@ -1026,3 +1027,36 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error
s.Set(response.ClusterUUID)
return nil
}

type logf func(string, ...interface{})

func adjustMaxProcs(ctx context.Context, d time.Duration, infof, errorf logf) error {
setMaxProcs := func() {
if _, err := maxprocs.Set(maxprocs.Logger(infof)); err != nil {
errorf("failed to set GOMAXPROCS: %v", err)
}
}
// set the gomaxprocs immediately.
setMaxProcs()
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
setMaxProcs()
}
}
}

func diffInfof(logger *logp.Logger) logf {
var last string
return func(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if msg != last {
logger.Info(msg)
last = msg
}
}
}
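A note on diffInfof above: maxprocs.Set logs its decision on each invocation, so a 30s ticker would otherwise repeat the same "Honoring GOMAXPROCS" line indefinitely; the wrapper suppresses a message only when it is identical to the immediately preceding one. A hedged usage sketch (values illustrative):

	infof := diffInfof(logger)
	infof(`maxprocs: Honoring GOMAXPROCS="%d"`, 4) // logged
	infof(`maxprocs: Honoring GOMAXPROCS="%d"`, 4) // suppressed: identical to the previous message
	infof(`maxprocs: Honoring GOMAXPROCS="%d"`, 8) // logged: the message changed (e.g. the quota was updated)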
57 changes: 57 additions & 0 deletions beater/beater_test.go
@@ -419,6 +419,63 @@ func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) {
assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

func TestAdjustMaxProcsTickerRefresh(t *testing.T) {
// This test asserts that maxprocs.Set is called multiple times,
// respecting the time.Duration that is passed to the function.
for _, maxP := range []int{2, 4, 8} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
observedLogs := testAdjustMaxProcs(t, maxP, false)
assert.GreaterOrEqual(t, observedLogs.Len(), 10)
})
}
}

func TestAdjustMaxProcsTickerRefreshDiffLogger(t *testing.T) {
// This test asserts that the log messages aren't logged more than once.
for _, maxP := range []int{2, 4, 8} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
observedLogs := testAdjustMaxProcs(t, maxP, true)
// Assert that only 1 message has been logged.
assert.Equal(t, observedLogs.Len(), 1)
})
}
}

func testAdjustMaxProcs(t *testing.T, maxP int, diffCore bool) *observer.ObservedLogs {
t.Setenv("GOMAXPROCS", fmt.Sprint(maxP))
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

core, observedLogs := observer.New(zapcore.DebugLevel)
logger := logp.NewLogger("", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
return zapcore.NewTee(in, core)
}))

// Adjust maxprocs every 1ms.
refreshDuration := time.Millisecond
logFunc := logger.Infof
if diffCore {
logFunc = diffInfof(logger)
}

go adjustMaxProcs(ctx, refreshDuration, logFunc, logger.Errorf)

filterMsg := fmt.Sprintf(`maxprocs: Honoring GOMAXPROCS="%d"`, maxP)
for {
select {
// Wait for 50ms so adjustMaxProcs has had time to run a few times.
case <-time.After(50 * refreshDuration):
logs := observedLogs.FilterMessageSnippet(filterMsg)
if logs.Len() >= 1 {
return logs
}
case <-ctx.Done():
t.Error(ctx.Err())
return nil
}
}
}

type mockClusterUUIDClient struct {
ClusterUUID string `json:"cluster_uuid"`
}
37 changes: 37 additions & 0 deletions beater/server_test.go
@@ -1001,6 +1001,43 @@ func TestServerPProf(t *testing.T) {
}
}

func TestServerGoMaxProcsLogMessage(t *testing.T) {
// Assert that the maxprocs library is called, using the log
// message that it prints as evidence.
for _, n := range []int{1, 2, 4} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", n), func(t *testing.T) {
t.Setenv("GOMAXPROCS", fmt.Sprint(n))

beat, cfg := newBeat(t, nil, nil, nil)
apm, err := newTestBeater(t, beat, cfg, nil)
require.NoError(t, err)
apm.start()
defer apm.Stop()

timeout := time.NewTimer(time.Second)
defer timeout.Stop()
for {
select {
case <-timeout.C:
t.Error("timed out waiting for log message, total logs observed:", apm.logs.Len())
for _, log := range apm.logs.All() {
t.Log(log.LoggerName, log.Message)
}
return
case <-time.After(time.Millisecond):
logs := apm.logs.FilterMessageSnippet(fmt.Sprintf(
`maxprocs: Honoring GOMAXPROCS="%d" as set in environment`, n,
))
if logs.Len() > 0 {
assert.Len(t, logs.All(), 1, "couldn't find gomaxprocs message logs")
return
}
}
}
})
}
}

type dummyOutputClient struct {
}

4 changes: 4 additions & 0 deletions changelogs/8.3.asciidoc
@@ -37,3 +37,7 @@ https://github.com/elastic/apm-server/compare/8.2.2\...8.3.0[View commits]
- System, process, and well-defined runtime metrics are now sent to the shared `metrics-apm.internal-<namespace>` data stream {pull}7882[7882]
- Number of parallel bulk requests are now configurable via `output.elasticsearch.max_requests` {pull}8055[8055]
- OTLP/HTTP protocol is now supported {pull}8156[8156]

[float]
==== Performance improvements
- Limit GOMAXPROCS when CPU limits have been set within a cgroup, yielding significant performance improvements, particularly when the APM Server experiences high traffic from a large number of APM agents {pull}8175[8175] {pull}8278[8278]
