From 7cd04898a161d325c6141d87dfde3ff334fbfd7f Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu, 9 Jun 2022 08:15:31 +0800
Subject: [PATCH] gomaxprocs: Use a ticker (30s) to set gomaxprocs (#8278) (#8343)

Adds a new goroutine which tries to refresh the gomaxprocs setting every
`30s`. This accounts for cases where the CFS quotas may be refreshed
without the APM Server process being restarted (like in ESS / ECE).

Signed-off-by: Marc Lopez Rubio
(cherry picked from commit c61dd3a21c98679f56d82b26a6f5f23dac9d3e29)

Co-authored-by: Marc Lopez Rubio
---
 beater/beater.go        | 58 ++++++++++++++++++++++++++++++++---------
 beater/beater_test.go   | 57 ++++++++++++++++++++++++++++++++++++++++
 beater/server_test.go   | 37 ++++++++++++++++++++++++++
 changelogs/8.3.asciidoc |  4 +++
 4 files changed, 144 insertions(+), 12 deletions(-)

diff --git a/beater/beater.go b/beater/beater.go
index 692f22bd121..98876e0eada 100644
--- a/beater/beater.go
+++ b/beater/beater.go
@@ -96,22 +96,11 @@ func NewCreator(args CreatorParams) beat.Creator {
 		libbeatMonitoringRegistry: monitoring.Default.GetRegistry("libbeat"),
 	}
 
-	// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
-	// set. This is necessary since the Go runtime will default to the number
-	// of CPUs available in the machine it's running in, however, when running
-	// in a container or in a cgroup with resource limits, the disparity can be
-	// extreme.
-	// Having a significantly greater GOMAXPROCS set than the granted CFS quota
-	// results in a significant amount of time spent "throttling", essentially
-	// pausing the the running OS threads for the throttled period.
-	_, err := maxprocs.Set(maxprocs.Logger(logger.Infof))
-	if err != nil {
-		logger.Errorf("failed to set GOMAXPROCS: %v", err)
-	}
 	var elasticsearchOutputConfig *agentconfig.C
 	if hasElasticsearchOutput(b) {
 		elasticsearchOutputConfig = b.Config.Output.Config()
 	}
+	var err error
 	bt.config, err = config.NewConfig(bt.rawConfig, elasticsearchOutputConfig)
 	if err != nil {
 		return nil, err
@@ -164,6 +153,18 @@ func (bt *beater) Run(b *beat.Beat) error {
 }
 
 func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error {
+	// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
+	// set. This is necessary since the Go runtime will default to the number
+	// of CPUs available in the machine it's running in; however, when running
+	// in a container or in a cgroup with resource limits, the disparity can be
+	// extreme.
+	// Having a significantly greater GOMAXPROCS set than the granted CFS quota
+	// results in a significant amount of time spent "throttling", essentially
+	// pausing the running OS threads for the throttled period.
+	// Since the quotas may be updated without restarting the process, the
+	// GOMAXPROCS are adjusted every 30s.
+	go adjustMaxProcs(ctx, 30*time.Second, diffInfof(bt.logger), bt.logger.Errorf)
+
 	tracer, tracerServer, err := initTracing(b, bt.config, bt.logger)
 	if err != nil {
 		return err
@@ -1026,3 +1027,36 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error
 	s.Set(response.ClusterUUID)
 	return nil
 }
+
+type logf func(string, ...interface{})
+
+func adjustMaxProcs(ctx context.Context, d time.Duration, infof, errorf logf) error {
+	setMaxProcs := func() {
+		if _, err := maxprocs.Set(maxprocs.Logger(infof)); err != nil {
+			errorf("failed to set GOMAXPROCS: %v", err)
+		}
+	}
+	// Set the gomaxprocs immediately.
+	setMaxProcs()
+	ticker := time.NewTicker(d)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+			setMaxProcs()
+		}
+	}
+}
+
+func diffInfof(logger *logp.Logger) logf {
+	var last string
+	return func(format string, args ...interface{}) {
+		msg := fmt.Sprintf(format, args...)
+		if msg != last {
+			logger.Info(msg)
+			last = msg
+		}
+	}
+}
diff --git a/beater/beater_test.go b/beater/beater_test.go
index ec51d17ab24..3f248c3985e 100644
--- a/beater/beater_test.go
+++ b/beater/beater_test.go
@@ -419,6 +419,63 @@ func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) {
 	assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
 }
 
+func TestAdjustMaxProcsTickerRefresh(t *testing.T) {
+	// This test asserts that GOMAXPROCS is adjusted multiple times,
+	// respecting the time.Duration that is passed to the function.
+	for _, maxP := range []int{2, 4, 8} {
+		t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
+			observedLogs := testAdjustMaxProcs(t, maxP, false)
+			assert.GreaterOrEqual(t, observedLogs.Len(), 10)
+		})
+	}
+}
+
+func TestAdjustMaxProcsTickerRefreshDiffLogger(t *testing.T) {
+	// This test asserts that the log messages aren't logged more than once.
+	for _, maxP := range []int{2, 4, 8} {
+		t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
+			observedLogs := testAdjustMaxProcs(t, maxP, true)
+			// Assert that only 1 message has been logged.
+			assert.Equal(t, observedLogs.Len(), 1)
+		})
+	}
+}
+
+func testAdjustMaxProcs(t *testing.T, maxP int, diffCore bool) *observer.ObservedLogs {
+	t.Setenv("GOMAXPROCS", fmt.Sprint(maxP))
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	core, observedLogs := observer.New(zapcore.DebugLevel)
+	logger := logp.NewLogger("", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
+		return zapcore.NewTee(in, core)
+	}))
+
+	// Adjust maxprocs every 1ms.
+	refreshDuration := time.Millisecond
+	logFunc := logger.Infof
+	if diffCore {
+		logFunc = diffInfof(logger)
+	}
+
+	go adjustMaxProcs(ctx, refreshDuration, logFunc, logger.Errorf)
+
+	filterMsg := fmt.Sprintf(`maxprocs: Honoring GOMAXPROCS="%d"`, maxP)
+	for {
+		select {
+		// Wait for 50ms so adjustMaxProcs has had time to run a few times.
+		case <-time.After(50 * refreshDuration):
+			logs := observedLogs.FilterMessageSnippet(filterMsg)
+			if logs.Len() >= 1 {
+				return logs
+			}
+		case <-ctx.Done():
+			t.Error(ctx.Err())
+			return nil
+		}
+	}
+}
+
 type mockClusterUUIDClient struct {
 	ClusterUUID string `json:"cluster_uuid"`
 }
diff --git a/beater/server_test.go b/beater/server_test.go
index e6f5469fbb5..c85ea4c227a 100644
--- a/beater/server_test.go
+++ b/beater/server_test.go
@@ -1001,6 +1001,43 @@ func TestServerPProf(t *testing.T) {
 	}
 }
 
+func TestServerGoMaxProcsLogMessage(t *testing.T) {
+	// Assert that the gomaxprocs library is called, and use the
+	// log message that is printed as a result.
+	for _, n := range []int{1, 2, 4} {
+		t.Run(fmt.Sprintf("%d_GOMAXPROCS", n), func(t *testing.T) {
+			t.Setenv("GOMAXPROCS", fmt.Sprint(n))
+
+			beat, cfg := newBeat(t, nil, nil, nil)
+			apm, err := newTestBeater(t, beat, cfg, nil)
+			require.NoError(t, err)
+			apm.start()
+			defer apm.Stop()
+
+			timeout := time.NewTimer(time.Second)
+			defer timeout.Stop()
+			for {
+				select {
+				case <-timeout.C:
+					t.Error("timed out waiting for log message, total logs observed:", apm.logs.Len())
+					for _, log := range apm.logs.All() {
+						t.Log(log.LoggerName, log.Message)
+					}
+					return
+				case <-time.After(time.Millisecond):
+					logs := apm.logs.FilterMessageSnippet(fmt.Sprintf(
+						`maxprocs: Honoring GOMAXPROCS="%d" as set in environment`, n,
+					))
+					if logs.Len() > 0 {
+						assert.Len(t, logs.All(), 1, "expected a single gomaxprocs log message")
+						return
+					}
+				}
+			}
+		})
+	}
+}
+
 type dummyOutputClient struct {
 }
 
diff --git a/changelogs/8.3.asciidoc b/changelogs/8.3.asciidoc
index 8315c8f9c93..6898881d9a5 100644
--- a/changelogs/8.3.asciidoc
+++ b/changelogs/8.3.asciidoc
@@ -37,3 +37,7 @@ https://github.com/elastic/apm-server/compare/8.2.2\...8.3.0[View commits]
 - System, process, and well-defined runtime metrics are now sent to the shared `metrics-apm.internal-` data stream {pull}7882[7882]
 - Number of parallel bulk requests are now configurable via `output.elasticsearch.max_requests` {pull}8055[8055]
 - OTLP/HTTP protocol is now supported {pull}8156[8156]
+
+[float]
+==== Performance improvements
+- Limit GOMAXPROCS when CPU limits have been set within a cgroup, yielding significant performance improvements, particularly when the APM Server receives high traffic from a large number of APM agents {pull}8175[8175] {pull}8278[8278]
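For context, the refresh loop the patch adds boils down to the standalone sketch below. It assumes the `maxprocs` package referenced in the diff is go.uber.org/automaxprocs/maxprocs (whose Set and Logger helpers match the calls above) and swaps the beat logger for the standard library logger; the 30-second interval mirrors the one wired up in beater.run, while the function and variable names here are illustrative rather than taken from apm-server.

package main

import (
	"context"
	"log"
	"time"

	"go.uber.org/automaxprocs/maxprocs"
)

// refreshMaxProcs mirrors adjustMaxProcs above: apply the CFS-quota-aware
// GOMAXPROCS value once, then re-apply it on every tick until ctx is done.
func refreshMaxProcs(ctx context.Context, d time.Duration) error {
	set := func() {
		// maxprocs.Set inspects the cgroup CPU quota (if any) and adjusts
		// runtime.GOMAXPROCS to match, logging what it decided.
		if _, err := maxprocs.Set(maxprocs.Logger(log.Printf)); err != nil {
			log.Printf("failed to set GOMAXPROCS: %v", err)
		}
	}
	set() // apply immediately, then on every tick
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			set()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go refreshMaxProcs(ctx, 30*time.Second)

	// Block forever; a real server would run its main loop here while
	// GOMAXPROCS keeps tracking cgroup quota changes in the background.
	select {}
}

In the patch itself, diffInfof additionally deduplicates consecutive identical maxprocs log lines, so the periodic refresh only produces output when the computed GOMAXPROCS value actually changes.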