gomaxprocs: Use a ticker (30s) to set gomaxprocs #8278

Merged
merged 9 commits into from
Jun 8, 2022
72 changes: 60 additions & 12 deletions beater/beater.go
@@ -35,6 +35,8 @@ import (
"go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/beat"
@@ -96,22 +98,11 @@ func NewCreator(args CreatorParams) beat.Creator {
libbeatMonitoringRegistry: monitoring.Default.GetRegistry("libbeat"),
}

// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
// set. This is necessary since the Go runtime will default to the number
// of CPUs available in the machine it's running in, however, when running
// in a container or in a cgroup with resource limits, the disparity can be
// extreme.
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
_, err := maxprocs.Set(maxprocs.Logger(logger.Infof))
if err != nil {
logger.Errorf("failed to set GOMAXPROCS: %v", err)
}
var elasticsearchOutputConfig *agentconfig.C
if hasElasticsearchOutput(b) {
elasticsearchOutputConfig = b.Config.Output.Config()
}
var err error
bt.config, err = config.NewConfig(bt.rawConfig, elasticsearchOutputConfig)
if err != nil {
return nil, err
@@ -164,6 +155,23 @@ func (bt *beater) Run(b *beat.Beat) error {
}

func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *beat.Beat) error {
// Use `maxprocs` to change the GOMAXPROCS respecting any CFS quotas, if
// set. This is necessary since the Go runtime will default to the number
// of CPUs available in the machine it's running in, however, when running
// in a container or in a cgroup with resource limits, the disparity can be
// extreme.
// Having a significantly greater GOMAXPROCS set than the granted CFS quota
// results in a significant amount of time spent "throttling", essentially
// pausing the running OS threads for the throttled period.
// Since the quotas may be updated without restarting the process,
// GOMAXPROCS is adjusted every 30s.
maxprocsLogger := bt.logger.WithOptions(
zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return &zapDiffCore{Core: c}
}),
)
go adjustMaxProcs(ctx, 30*time.Second, maxprocsLogger)

tracer, tracerServer, err := initTracing(b, bt.config, bt.logger)
if err != nil {
return err
@@ -1026,3 +1034,43 @@ func queryClusterUUID(ctx context.Context, esClient elasticsearch.Client) error
s.Set(response.ClusterUUID)
return nil
}

func adjustMaxProcs(ctx context.Context, d time.Duration, logger *logp.Logger) error {
setMaxProcs := func() {
if _, err := maxprocs.Set(maxprocs.Logger(logger.Infof)); err != nil {
logger.Errorf("failed to set GOMAXPROCS: %v", err)
}
}
// Set GOMAXPROCS immediately, then adjust it on every tick.
setMaxProcs()
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
setMaxProcs()
}
}
}
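
For reference, a minimal standalone sketch of the same periodic-refresh pattern, using only the public automaxprocs API; the main function, logger wiring, and 30s interval here are illustrative, not part of this PR:

package main

import (
    "context"
    "log"
    "time"

    "go.uber.org/automaxprocs/maxprocs"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for {
        // maxprocs.Set reads the CFS quota (if any) and adjusts
        // GOMAXPROCS accordingly; calling it repeatedly picks up
        // quota changes made while the process is running.
        if _, err := maxprocs.Set(maxprocs.Logger(log.Printf)); err != nil {
            log.Printf("failed to set GOMAXPROCS: %v", err)
        }
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }
    }
}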

type zapDiffCore struct {
mu sync.Mutex
zapcore.Core
lastMessage string
}

// Check records the logged entry.Message and discards the log entry when it
// matches the previously logged entry.Message.
func (c *zapDiffCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
c.mu.Lock()
defer c.mu.Unlock()

// Discard the entry if the logged message matches the previous log.
if e.Message == c.lastMessage && c.lastMessage != "" {
return nil
}
c.lastMessage = e.Message
return c.Core.Check(e, ce)
}
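
A minimal sketch of how such a deduplicating core can be wired into an arbitrary zap logger, mirroring the WithOptions call in run() above; the base logger is illustrative, and the zap and zapcore imports shown earlier in the diff are assumed:

base, _ := zap.NewProduction()
deduped := base.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
    return &zapDiffCore{Core: c}
}))
deduped.Info("maxprocs: Honoring GOMAXPROCS") // logged
deduped.Info("maxprocs: Honoring GOMAXPROCS") // suppressed: same message as the previous entry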
56 changes: 56 additions & 0 deletions beater/beater_test.go
@@ -419,6 +419,62 @@ func TestQueryClusterUUIDRegistriesDoNotExist(t *testing.T) {
assert.Equal(t, clusterUUID, fs.Strings["cluster_uuid"])
}

func TestAdjustMaxProcsTickerRefresh(t *testing.T) {
// This test asserts that adjustMaxProcs sets GOMAXPROCS multiple times,
// respecting the time.Duration that is passed to the function.
for _, maxP := range []int{2, 4, 8} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
observedLogs := testAdjustMaxProcs(t, maxP, false)
assert.GreaterOrEqual(t, observedLogs.Len(), 10)
})
}
}

func TestAdjustMaxProcsTickerRefreshDiffLogger(t *testing.T) {
// This test asserts that duplicate messages are not logged more than once.
for _, maxP := range []int{2, 4, 8} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", maxP), func(t *testing.T) {
observedLogs := testAdjustMaxProcs(t, maxP, true)
// Assert that only 1 message has been logged.
assert.Equal(t, 1, observedLogs.Len())
})
}
}

func testAdjustMaxProcs(t *testing.T, maxP int, diffCore bool) *observer.ObservedLogs {
t.Setenv("GOMAXPROCS", fmt.Sprint(maxP))
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

core, observedLogs := observer.New(zapcore.DebugLevel)
logger := logp.NewLogger("", zap.WrapCore(func(in zapcore.Core) zapcore.Core {
c := zapcore.NewTee(in, core)
if diffCore {
return &zapDiffCore{Core: c}
}
return c
}))

// Adjust maxprocs every 1ms.
refreshDuration := time.Millisecond
go adjustMaxProcs(ctx, refreshDuration, logger)

filterMsg := fmt.Sprintf(`maxprocs: Honoring GOMAXPROCS="%d"`, maxP)
for {
select {
// Wait for 50ms so adjustMaxProcs has had time to run a few times.
case <-time.After(50 * refreshDuration):
logs := observedLogs.FilterMessageSnippet(filterMsg)
if logs.Len() >= 1 {
return logs
}
case <-ctx.Done():
t.Error(ctx.Err())
return nil
}
}
}
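
These tests capture log output with zap's observer package; a minimal sketch of that technique on its own, assuming imports of go.uber.org/zap, go.uber.org/zap/zapcore, and go.uber.org/zap/zaptest/observer (the logged message is illustrative):

core, observed := observer.New(zapcore.InfoLevel)
logger := zap.New(core)
logger.Info("maxprocs: Honoring GOMAXPROCS")
// FilterMessageSnippet returns only the captured entries containing the snippet.
fmt.Println(observed.FilterMessageSnippet("Honoring").Len()) // prints 1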

type mockClusterUUIDClient struct {
ClusterUUID string `json:"cluster_uuid"`
}
37 changes: 37 additions & 0 deletions beater/server_test.go
@@ -1000,6 +1000,43 @@ func TestServerPProf(t *testing.T) {
}
}

func TestServerGoMaxProcsLogMessage(t *testing.T) {
// Assert that the automaxprocs library is called, using the log
// message that it prints as evidence.
for _, n := range []int{1, 2, 4} {
t.Run(fmt.Sprintf("%d_GOMAXPROCS", n), func(t *testing.T) {
t.Setenv("GOMAXPROCS", fmt.Sprint(n))

beat, cfg := newBeat(t, nil, nil, nil)
apm, err := newTestBeater(t, beat, cfg, nil)
require.NoError(t, err)
apm.start()
defer apm.Stop()

timeout := time.NewTimer(time.Second)
defer timeout.Stop()
for {
select {
case <-timeout.C:
t.Error("timed out waiting for log message, total logs observed:", apm.logs.Len())
for _, log := range apm.logs.All() {
t.Log(log.LoggerName, log.Message)
}
return
case <-time.After(time.Millisecond):
logs := apm.logs.FilterMessageSnippet(fmt.Sprintf(
`maxprocs: Honoring GOMAXPROCS="%d" as set in environment`, n,
))
if logs.Len() > 0 {
assert.Len(t, logs.All(), 1, "couldn't find gomaxprocs log message")
return
}
}
}
})
}
}

type dummyOutputClient struct {
}
