diff --git a/Makefile b/Makefile
index ef1c06ae54cb..3cdbbbd94931 100644
--- a/Makefile
+++ b/Makefile
@@ -1785,14 +1785,10 @@ fuzz: ## Run fuzz tests.
 fuzz: bin/fuzz
 	bin/fuzz $(TESTFLAGS) -tests $(TESTS) -timeout $(TESTTIMEOUT) $(PKG)
 
-# Short hand to re-generate all bazel BUILD files.
-#
-# Even with --symlink_prefix, some sub-command somewhere hardcodes the
-# creation of a "bazel-out" symlink. This bazel-out symlink can only
-# be blocked by the existence of a file before the bazel command is
-# invoked. For now, this is left as an exercise for the user.
-#
-bazel-generate: ## Generate all bazel BUILD files.
+# Short hand to re-generate all bazel BUILD files. (Does the same thing as
+# `./dev generate bazel`.)
+.PHONY: bazel-generate
+bazel-generate:
 	@echo 'Generating DEPS.bzl and BUILD files using gazelle'
 	./build/bazelutil/bazel-generate.sh
 
diff --git a/WORKSPACE b/WORKSPACE
index 81f06d954a40..8e6623aefef9 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -34,7 +34,7 @@ git_repository(
 
 # Load up cockroachdb's go dependencies (the ones listed under go.mod). The
 # `DEPS.bzl` file is kept up to date using the `update-repos` Gazelle command
-# (see `make bazel-generate`).
+# (see `build/bazelutil/bazel-generate.sh`).
 #
 # gazelle:repository_macro DEPS.bzl%go_deps
 load("//:DEPS.bzl", "go_deps")
diff --git a/build/README.md b/build/README.md
index 07aae554a0f2..e5bc8047b828 100644
--- a/build/README.md
+++ b/build/README.md
@@ -160,7 +160,7 @@ is missing, ensure it is used in code. This can be a blank dependency, e.g.
 `import _ "golang.org/api/compute/v1"`. These changes must then be committed in
 the submodule directory (see [Working with Submodules](#working-with-submodules)).
 
-Finally, run `make bazel-generate` to regenerate `DEPS.bzl` with the updated Go dependency information.
+Finally, run `./dev generate bazel` to regenerate `DEPS.bzl` with the updated Go dependency information.
 
 Programs can then be run using `go build ...` or `go test ...`.
 
diff --git a/build/bazelutil/bazel-generate.sh b/build/bazelutil/bazel-generate.sh
index 7573ebcb2d90..06fc3be84cd2 100755
--- a/build/bazelutil/bazel-generate.sh
+++ b/build/bazelutil/bazel-generate.sh
@@ -2,6 +2,11 @@
 
 set -exuo pipefail
 
+# Even with --symlink_prefix, some sub-command somewhere hardcodes the
+# creation of a "bazel-out" symlink. This bazel-out symlink can only
+# be blocked by the existence of a file before the bazel command is
+# invoked. For now, this is left as an exercise for the user.
+
 bazel run //:gazelle -- update-repos -from_file=go.mod -build_file_proto_mode=disable_global -to_macro=DEPS.bzl%go_deps -prune=true
 bazel run //pkg/cmd/generate-test-suites --run_under="cd $PWD && " > pkg/BUILD.bazel
 bazel run //:gazelle
diff --git a/build/teamcity-check-genfiles.sh b/build/teamcity-check-genfiles.sh
index bc6f0b6755e4..94aee64134bf 100755
--- a/build/teamcity-check-genfiles.sh
+++ b/build/teamcity-check-genfiles.sh
@@ -30,7 +30,7 @@ rm artifacts/buildshort.log
 TEAMCITY_BAZEL_SUPPORT_GENERATE=1  # See teamcity-bazel-support.sh.
 run run_bazel build/bazelutil/bazel-generate.sh &> artifacts/buildshort.log || (cat artifacts/buildshort.log && false)
 rm artifacts/buildshort.log
-check_clean "Run \`make bazel-generate\` to automatically regenerate these."
+check_clean "Run \`./dev generate bazel\` to automatically regenerate these."
 run build/builder.sh make generate &> artifacts/generate.log || (cat artifacts/generate.log && false)
 rm artifacts/generate.log
 check_clean "Run \`make generate\` to automatically regenerate these."
diff --git a/pkg/ccl/serverccl/tenant_grpc_test.go b/pkg/ccl/serverccl/tenant_grpc_test.go
index aa8e66aa9f05..c65afd779dc0 100644
--- a/pkg/ccl/serverccl/tenant_grpc_test.go
+++ b/pkg/ccl/serverccl/tenant_grpc_test.go
@@ -13,6 +13,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -38,6 +39,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
+	httpClient := httputil.NewClientWithTimeout(15 * time.Second)
 
 	serverParams, _ := tests.CreateTestServerParams()
 	testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
@@ -77,7 +79,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	})
 
 	t.Run("gRPC Gateway is running", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
+		resp, err := httpClient.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		defer resp.Body.Close()
@@ -98,7 +100,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	defer connTenant2.Close()
 
 	t.Run("statements endpoint fans out request to multiple pods", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
+		resp, err := httpClient.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		defer resp.Body.Close()
@@ -115,7 +117,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	defer connTenant3.Close()
 
 	t.Run("fanout of statements endpoint is segregated by tenant", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
+		resp, err := httpClient.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		defer resp.Body.Close()
@@ -160,7 +162,7 @@ func TestTenantGRPCServices(t *testing.T) {
 	})
 
 	t.Run("sessions endpoint is available", func(t *testing.T) {
-		resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/sessions")
+		resp, err := httpClient.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/sessions")
 		defer http.DefaultClient.CloseIdleConnections()
 		require.NoError(t, err)
 		require.Equal(t, 200, resp.StatusCode)
diff --git a/pkg/cmd/dev/generate.go b/pkg/cmd/dev/generate.go
index df7623a9bec2..50dc7f8ca522 100644
--- a/pkg/cmd/dev/generate.go
+++ b/pkg/cmd/dev/generate.go
@@ -80,8 +80,7 @@ func (d *dev) generateBazel(cmd *cobra.Command) error {
 	if err != nil {
 		return err
 	}
-	_, err = d.exec.CommandContext(ctx, filepath.Join(workspace, "build", "bazelutil", "bazel-generate.sh"))
-	return err
+	return d.exec.CommandContextInheritingStdStreams(ctx, filepath.Join(workspace, "build", "bazelutil", "bazel-generate.sh"))
 }
 
 func (d *dev) generateDocs(cmd *cobra.Command) error {
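The generate.go change swaps the captured-output helper for one that hands bazel-generate.sh the caller's std streams, so `dev generate bazel` shows gazelle's progress as it runs instead of swallowing it. Below is a minimal sketch of that distinction using os/exec directly; it is not the dev tool's Exec wrapper, and CommandContextInheritingStdStreams' actual implementation is not part of this patch.

package main

import (
	"context"
	"os"
	"os/exec"
)

// runCaptured runs a command and returns its combined output, the way a
// silent/"internal" invocation would.
func runCaptured(ctx context.Context, name string, args ...string) ([]byte, error) {
	return exec.CommandContext(ctx, name, args...).CombinedOutput()
}

// runInheriting attaches the parent's std streams, so long-running output
// (e.g. gazelle's logging) is visible while the command executes.
func runInheriting(ctx context.Context, name string, args ...string) error {
	cmd := exec.CommandContext(ctx, name, args...)
	cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
	return cmd.Run()
}

func main() {
	ctx := context.Background()
	if err := runInheriting(ctx, "echo", "streamed"); err != nil {
		panic(err)
	}
	if out, err := runCaptured(ctx, "echo", "captured"); err == nil {
		os.Stdout.Write(out)
	}
}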
diff --git a/pkg/cmd/dev/io/exec/exec.go b/pkg/cmd/dev/io/exec/exec.go
index eabf2b8206dd..ea44aa082fc9 100644
--- a/pkg/cmd/dev/io/exec/exec.go
+++ b/pkg/cmd/dev/io/exec/exec.go
@@ -80,12 +80,6 @@ func WithRecording(r *recording.Recording) func(e *Exec) {
 	}
 }
 
-// CommandContext wraps around exec.CommandContext, executing the named program
-// with the given arguments.
-func (e *Exec) CommandContext(ctx context.Context, name string, args ...string) ([]byte, error) {
-	return e.commandContextImpl(ctx, nil, false, name, args...)
-}
-
 // CommandContextSilent is like CommandContext, but does not take over
 // stdout/stderr. It's to be used for "internal" operations.
 func (e *Exec) CommandContextSilent(
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index fcc2d3c1abd6..d4dea88783ee 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -382,6 +382,7 @@ type tokenGranter struct {
 	requester            requester
 	availableBurstTokens int
 	maxBurstTokens       int
+	skipTokenEnforcement bool
 	// Optional. Practically, both uses of tokenGranter, for SQLKVResponseWork
 	// and SQLSQLResponseWork have a non-nil value. We don't expect to use
 	// memory overload indicators here since memory accounting and disk spilling
@@ -396,8 +397,9 @@ func (tg *tokenGranter) getPairedRequester() requester {
 	return tg.requester
 }
 
-func (tg *tokenGranter) refillBurstTokens() {
+func (tg *tokenGranter) refillBurstTokens(skipTokenEnforcement bool) {
 	tg.availableBurstTokens = tg.maxBurstTokens
+	tg.skipTokenEnforcement = skipTokenEnforcement
 }
 
 func (tg *tokenGranter) grantKind() grantKind {
@@ -412,7 +414,7 @@ func (tg *tokenGranter) tryGetLocked() grantResult {
 	if tg.cpuOverload != nil && tg.cpuOverload.isOverloaded() {
 		return grantFailDueToSharedResource
 	}
-	if tg.availableBurstTokens > 0 {
+	if tg.availableBurstTokens > 0 || tg.skipTokenEnforcement {
 		tg.availableBurstTokens--
 		return grantSuccess
 	}
@@ -446,10 +448,12 @@ func (tg *tokenGranter) continueGrantChain(grantChainID grantChainID) {
 // KVWork, that are limited by slots (CPU bound work) and/or tokens (IO
 // bound work).
 type kvGranter struct {
-	coord      *GrantCoordinator
-	requester  requester
-	usedSlots  int
-	totalSlots int
+	coord               *GrantCoordinator
+	requester           requester
+	usedSlots           int
+	totalSlots          int
+	skipSlotEnforcement bool
+
 	ioTokensEnabled bool
 	// There is no rate limiting in granting these tokens. That is, they are all
 	// burst tokens.
@@ -478,7 +482,7 @@ func (sg *kvGranter) tryGet() bool {
 }
 
 func (sg *kvGranter) tryGetLocked() grantResult {
-	if sg.usedSlots < sg.totalSlots {
+	if sg.usedSlots < sg.totalSlots || sg.skipSlotEnforcement {
 		if !sg.ioTokensEnabled || sg.availableIOTokens > 0 {
 			sg.usedSlots++
 			if sg.usedSlotsMetric != nil {
@@ -555,8 +559,11 @@ func (sg *kvGranter) setAvailableIOTokensLocked(tokens int64) {
 // StoreGrantCoordinators) for KVWork that uses that store. See the
 // NewGrantCoordinators and NewGrantCoordinatorSQL functions.
 type GrantCoordinator struct {
-	settings *cluster.Settings
+	settings                *cluster.Settings
+	lastCPULoadSamplePeriod time.Duration
 
+	// mu is ordered before any mutex acquired in a requester implementation.
+	// TODO(sumeer): move everything covered by mu into a nested struct.
 	mu syncutil.Mutex
 	// NB: Some granters can be nil.
 	granters [numWorkKinds]granterWithLockedCalls
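Taken together, these hunks turn the token and slot limits into soft limits: when enforcement is skipped, tryGetLocked still decrements the counters but grants anyway, which is why the counts can go negative (the testdata later in this patch shows "avail: -1" for exactly this reason). A self-contained sketch of that bypass, using a simplified stand-in type rather than the real tokenGranter/kvGranter:

package main

import "fmt"

// burstTokenBucket is a simplified stand-in for tokenGranter: a bucket of
// burst tokens whose limit can be temporarily ignored.
type burstTokenBucket struct {
	available       int
	skipEnforcement bool
}

// tryGet grants when tokens remain, or unconditionally when enforcement is
// skipped; in the latter case the count goes negative, mirroring the
// "avail: -1" lines in the testdata.
func (b *burstTokenBucket) tryGet() bool {
	if b.available > 0 || b.skipEnforcement {
		b.available--
		return true
	}
	return false
}

func main() {
	b := burstTokenBucket{available: 1, skipEnforcement: true}
	fmt.Println(b.tryGet(), b.tryGet(), b.available) // true true -1
}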
@@ -842,20 +849,38 @@ func (coord *GrantCoordinator) GetWorkQueue(workKind WorkKind) *WorkQueue {
 	return coord.queues[workKind].(*WorkQueue)
 }
 
-// CPULoad implements CPULoadListener and is called every 1ms. The same
-// frequency is used for refilling the burst tokens since synchronizing the
-// two means that the refilled burst can take into account the latest
-// schedulers stats (indirectly, via the implementation of
-// cpuOverloadIndicator).
-// TODO(sumeer): after experimentation, possibly generalize the 1ms ticks used
-// for CPULoad.
-func (coord *GrantCoordinator) CPULoad(runnable int, procs int) {
+// CPULoad implements CPULoadListener and is called periodically (see
+// CPULoadListener for details). The same frequency is used for refilling the
+// burst tokens since synchronizing the two means that the refilled burst can
+// take into account the latest scheduler stats (indirectly, via the
+// implementation of cpuOverloadIndicator).
+func (coord *GrantCoordinator) CPULoad(runnable int, procs int, samplePeriod time.Duration) {
+	if coord.lastCPULoadSamplePeriod != 0 && coord.lastCPULoadSamplePeriod != samplePeriod &&
+		KVAdmissionControlEnabled.Get(&coord.settings.SV) {
+		log.Infof(context.Background(), "CPULoad switching to period %s", samplePeriod.String())
+	}
+	coord.lastCPULoadSamplePeriod = samplePeriod
+
 	coord.mu.Lock()
 	defer coord.mu.Unlock()
 	coord.numProcs = procs
-	coord.cpuLoadListener.CPULoad(runnable, procs)
-	coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens()
-	coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens()
+	coord.cpuLoadListener.CPULoad(runnable, procs, samplePeriod)
+
+	// Slot adjustment and token refilling require 1ms periods to work well. If
+	// the CPULoad ticks are less frequent, there is no guarantee that the
+	// tokens or slots will be sufficient to service requests. This is
+	// particularly the case for slots, which we adjust dynamically: high
+	// contention can suddenly result in high slot utilization even while cpu
+	// utilization stays low. We don't want to artificially bottleneck request
+	// processing when we are in this slow-CPULoad-ticks regime, since we can't
+	// adjust slots or refill tokens fast enough. So we explicitly tell the
+	// granters to not do token or slot enforcement.
+	skipEnforcement := samplePeriod > time.Millisecond
+	coord.granters[SQLKVResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
+	coord.granters[SQLSQLResponseWork].(*tokenGranter).refillBurstTokens(skipEnforcement)
+	if coord.granters[KVWork] != nil {
+		coord.granters[KVWork].(*kvGranter).skipSlotEnforcement = skipEnforcement
+	}
 	if coord.grantChainActive && !coord.tryTerminateGrantChain() {
 		return
 	}
@@ -1264,12 +1289,11 @@ type cpuOverloadIndicator interface {
 }
 
 // CPULoadListener listens to the latest CPU load information. Currently we
-// expect this to be called every 1ms.
-// TODO(sumeer): experiment with more smoothing. It is possible that rapid
-// slot fluctuation may be resulting in under-utilization at a time scale that
-// is not observable at our metrics frequency.
+// expect this to be called every 1ms, unless the cpu is extremely
+// underloaded. If the samplePeriod is > 1ms, admission control enforcement
+// for CPU is disabled.
 type CPULoadListener interface {
-	CPULoad(runnable int, procs int)
+	CPULoad(runnable int, procs int, samplePeriod time.Duration)
 }
 
 // kvSlotAdjuster is an implementer of CPULoadListener and
@@ -1293,8 +1317,9 @@ type kvSlotAdjuster struct {
 var _ cpuOverloadIndicator = &kvSlotAdjuster{}
 var _ CPULoadListener = &kvSlotAdjuster{}
 
-func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
+func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int, _ time.Duration) {
 	threshold := int(KVSlotAdjusterOverloadThreshold.Get(&kvsa.settings.SV))
+
 	// Simple heuristic, which worked ok in experiments. More sophisticated ones
 	// could be devised.
 	if runnable >= threshold*procs {
@@ -1332,7 +1357,7 @@ func (kvsa *kvSlotAdjuster) CPULoad(runnable int, procs int) {
 }
 
 func (kvsa *kvSlotAdjuster) isOverloaded() bool {
-	return kvsa.granter.usedSlots >= kvsa.granter.totalSlots
+	return kvsa.granter.usedSlots >= kvsa.granter.totalSlots && !kvsa.granter.skipSlotEnforcement
 }
 
 // sqlNodeCPUOverloadIndicator is the implementation of cpuOverloadIndicator
@@ -1559,7 +1584,9 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
 var _ cpuOverloadIndicator = &sqlNodeCPUOverloadIndicator{}
 var _ CPULoadListener = &sqlNodeCPUOverloadIndicator{}
 
-func (sn *sqlNodeCPUOverloadIndicator) CPULoad(runnable int, procs int) {
+func (sn *sqlNodeCPUOverloadIndicator) CPULoad(
+	runnable int, procs int, samplePeriod time.Duration,
+) {
 }
 
 func (sn *sqlNodeCPUOverloadIndicator) isOverloaded() bool {
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index b6c694385b7e..789bfbcd029c 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -16,6 +16,7 @@ import (
 	"sort"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -83,7 +84,7 @@ func (tr *testRequester) getAdmittedCount() uint64 {
 // return-grant work=<kind>
 // took-without-permission work=<kind>
 // continue-grant-chain work=<kind>
-// cpu-load runnable=<int> procs=<int>
+// cpu-load runnable=<int> procs=<int> [infrequent=<bool>]
 // set-io-tokens tokens=<int>
 func TestGranterBasic(t *testing.T) {
 	defer leaktest.AfterTest(t)()
@@ -159,7 +160,15 @@ func TestGranterBasic(t *testing.T) {
 			var runnable, procs int
 			d.ScanArgs(t, "runnable", &runnable)
 			d.ScanArgs(t, "procs", &procs)
-			coord.CPULoad(runnable, procs)
+			infrequent := false
+			if d.HasArg("infrequent") {
+				d.ScanArgs(t, "infrequent", &infrequent)
+			}
+			samplePeriod := time.Millisecond
+			if infrequent {
+				samplePeriod = 250 * time.Millisecond
+			}
+			coord.CPULoad(runnable, procs, samplePeriod)
 			return flushAndReset()
 
 		case "set-io-tokens":
diff --git a/pkg/util/admission/testdata/granter b/pkg/util/admission/testdata/granter
index 6edb98551d2a..c68edae0ceb8 100644
--- a/pkg/util/admission/testdata/granter
+++ b/pkg/util/admission/testdata/granter
@@ -366,3 +366,83 @@ kv: granted in chain 0, and returning true
 GrantCoordinator:
 (chain: id: 6 active: false index: 0) kv: used: 3, total: 3 io-avail: 0 sql-kv-response: avail: 0
 sql-sql-response: avail: 1 sql-leaf-start: used: 2, total: 2 sql-root-start: used: 1, total: 1
+
+#####################################################################
+# Test skipping of enforcements when CPULoad has high sampling period.
+init-grant-coordinator min-cpu=1 max-cpu=3 sql-kv-tokens=1 sql-sql-tokens=1 sql-leaf=2 sql-root=2
+----
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 0, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# No more slots after this slot is granted.
+try-get work=kv
+----
+kv: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# Since no more KV slots, cannot grant token to sql-kv-response.
+try-get work=sql-kv-response
+----
+sql-kv-response: tryGet returned false
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# Since no more KV slots, cannot grant token to sql-sql-response.
+try-get work=sql-sql-response
+----
+sql-sql-response: tryGet returned false
+GrantCoordinator:
+(chain: id: 1 active: false index: 0) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# CPULoad shows overload, so cannot increase KV slots, but since it is
+# infrequent, slot and token enforcement is disabled.
+cpu-load runnable=20 procs=1 infrequent=true
+----
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: 1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-kv-response can get a token.
+try-get work=sql-kv-response
+----
+sql-kv-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: 0
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-kv-response can get another token, even though tokens are exhausted.
+try-get work=sql-kv-response
+----
+sql-kv-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: 1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-sql-response can get a token.
+try-get work=sql-sql-response
+----
+sql-sql-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: 0 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# sql-sql-response can get another token, even though tokens are exhausted.
+try-get work=sql-sql-response
+----
+sql-sql-response: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 1, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: -1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
+
+# KV can get another slot even though slots are exhausted.
+try-get work=kv
+----
+kv: tryGet returned true
+GrantCoordinator:
+(chain: id: 1 active: false index: 5) kv: used: 2, total: 1 sql-kv-response: avail: -1
+sql-sql-response: avail: -1 sql-leaf-start: used: 0, total: 2 sql-root-start: used: 0, total: 2
diff --git a/pkg/util/goschedstats/BUILD.bazel b/pkg/util/goschedstats/BUILD.bazel
index 24c4ed5d81e0..32f01d9d7672 100644
--- a/pkg/util/goschedstats/BUILD.bazel
+++ b/pkg/util/goschedstats/BUILD.bazel
@@ -19,5 +19,9 @@ go_test(
     name = "goschedstats_test",
     srcs = ["runnable_test.go"],
     embed = [":goschedstats"],
-    deps = ["//pkg/testutils"],
+    deps = [
+        "//pkg/testutils",
+        "//pkg/util/timeutil",
+        "@com_github_stretchr_testify//require",
+    ],
 )
diff --git a/pkg/util/goschedstats/runnable.go b/pkg/util/goschedstats/runnable.go
index 19b463184512..9baa03f317eb 100644
--- a/pkg/util/goschedstats/runnable.go
+++ b/pkg/util/goschedstats/runnable.go
@@ -54,8 +54,22 @@ func RecentNormalizedRunnableGoroutines() float64 {
 // will have to add a new version of that file.
 var _ = numRunnableGoroutines
 
-// We sample the number of runnable goroutines once per samplePeriod.
-const samplePeriod = time.Millisecond
+// We sample the number of runnable goroutines once per samplePeriodShort or
+// samplePeriodLong (if the system is underloaded). Using samplePeriodLong can
+// cause sluggish response to a load spike, from the perspective of
+// RunnableCountCallback implementers (admission control), so it is not ideal.
+// We support this behavior only because we have observed 5-10% of cpu
+// utilization on CockroachDB nodes that are doing no other work, even though
+// 1ms polling (samplePeriodShort) is extremely cheap. The cause may be a poor
+// interaction with processor idle state
+// https://github.com/golang/go/issues/30740#issuecomment-471634471. See
+// #66881.
+const samplePeriodShort = time.Millisecond
+const samplePeriodLong = 250 * time.Millisecond
+
+// The system is underloaded if the number of runnable goroutines per proc
+// is below this threshold.
+const underloadedRunnablePerProcThreshold = 1 * toFixedPoint
 
 // We "report" the average value every reportingPeriod.
 // Note: if this is changed from 1s, CumulativeNormalizedRunnableGoroutines()
@@ -78,8 +92,8 @@ var total uint64
 var ewma uint64
 
 // RunnableCountCallback is provided the current value of runnable goroutines,
-// and GOMAXPROCS.
-type RunnableCountCallback func(numRunnable int, numProcs int)
+// GOMAXPROCS, and the current sampling period.
+type RunnableCountCallback func(numRunnable int, numProcs int, samplePeriod time.Duration)
 
 type callbackWithID struct {
 	RunnableCountCallback
@@ -95,7 +109,8 @@ var callbackInfo struct {
 }
 
 // RegisterRunnableCountCallback registers a callback to be run with the
-// runnable and procs info every 1ms. This is exclusively for use by admission
+// runnable and procs info, every 1ms, unless cpu load is very low (see the
+// commentary for samplePeriodShort). This is exclusively for use by admission
 // control that wants to react extremely quickly to cpu changes. Past
 // experience in other systems (not CockroachDB) motivated not consuming a
 // smoothed signal for admission control. The CockroachDB setting may possibly
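The switch between samplePeriodShort and samplePeriodLong hinges on underloadedRunnablePerProcThreshold, i.e. fewer than one runnable goroutine per proc on average. A standalone worked sketch of that check follows; the toFixedPoint value below is illustrative, since runnable.go defines its own fixed-point scale elsewhere in the file.

package main

import "fmt"

// toFixedPoint here is only an illustrative stand-in for the scale factor
// defined in runnable.go.
const toFixedPoint = 65536

const underloadedRunnablePerProcThreshold = 1 * toFixedPoint

// underloaded mirrors the check in getStatsOnTick: both the mean over the
// last reporting period and the EWMA (runnable goroutines per proc, in fixed
// point) must stay below one runnable goroutine per proc.
func underloaded(avgValue, ewma uint64) bool {
	return avgValue < underloadedRunnablePerProcThreshold &&
		ewma < underloadedRunnablePerProcThreshold
}

func main() {
	// 3 runnable goroutines on 8 procs: 3/8 of a runnable goroutine per proc.
	sample := uint64(3) * toFixedPoint / 8
	fmt.Println(underloaded(sample, sample)) // true: switch to the 250ms period
	// 20 runnable goroutines on 8 procs: well above one per proc.
	spike := uint64(20) * toFixedPoint / 8
	fmt.Println(underloaded(spike, sample)) // false: stay at (or return to) 1ms
}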
@@ -140,48 +155,90 @@ func UnregisterRunnableCountCallback(id int64) {
 
 func init() {
 	go func() {
-		lastTime := timeutil.Now()
-		// sum accumulates the sum of the number of runnable goroutines per CPU,
-		// multiplied by toFixedPoint, for all samples since the last reporting.
-		var sum uint64
-		var numSamples int
-
-		ticker := time.NewTicker(samplePeriod)
-		// We keep local versions of "total" and "ewma" and we just Store the
-		// updated values to the globals.
-		var localTotal, localEWMA uint64
+		sst := schedStatsTicker{
+			lastTime:              timeutil.Now(),
+			curPeriod:             samplePeriodShort,
+			numRunnableGoroutines: numRunnableGoroutines,
+		}
+		ticker := time.NewTicker(sst.curPeriod)
 		for {
 			t := <-ticker.C
-			if t.Sub(lastTime) > reportingPeriod {
-				if numSamples > 0 {
-					// We want the average value over the reporting period, so we divide
-					// by numSamples.
-					newValue := sum / uint64(numSamples)
-					localTotal += newValue
-					atomic.StoreUint64(&total, localTotal)
-
-					// ewma(t) = c * value(t) + (1 - c) * ewma(t-1)
-					// We use c = 0.5.
-					localEWMA = (newValue + localEWMA) / 2
-					atomic.StoreUint64(&ewma, localEWMA)
-				}
-				lastTime = t
-				sum = 0
-				numSamples = 0
-			}
-			runnable, numProcs := numRunnableGoroutines()
 			callbackInfo.mu.Lock()
 			cbs := callbackInfo.cbs
 			callbackInfo.mu.Unlock()
-			for i := range cbs {
-				cbs[i].RunnableCountCallback(runnable, numProcs)
-			}
-			// The value of the sample is the ratio of runnable to numProcs (scaled
-			// for fixed-point arithmetic).
-			sum += uint64(runnable) * toFixedPoint / uint64(numProcs)
-			numSamples++
+			sst.getStatsOnTick(t, cbs, ticker)
 		}
 	}()
 }
+
+// timeTickerInterface abstracts time.Ticker for testing.
+type timeTickerInterface interface {
+	Reset(d time.Duration)
+}
+
+// schedStatsTicker contains the local state maintained across stats collection
+// ticks.
+type schedStatsTicker struct {
+	lastTime              time.Time
+	curPeriod             time.Duration
+	numRunnableGoroutines func() (numRunnable int, numProcs int)
+	// sum accumulates the sum of the number of runnable goroutines per CPU,
+	// multiplied by toFixedPoint, for all samples since the last reporting.
+	sum uint64
+	// numSamples is the number of samples since the last reporting.
+	numSamples int
+	// We keep local versions of "total" and "ewma" and we just Store the
+	// updated values to the globals.
+	localTotal, localEWMA uint64
+}
+
+// getStatsOnTick gets scheduler stats as the ticker has ticked.
+func (s *schedStatsTicker) getStatsOnTick(
+	t time.Time, cbs []callbackWithID, ticker timeTickerInterface,
+) {
+	if t.Sub(s.lastTime) > reportingPeriod {
+		var avgValue uint64
+		if s.numSamples > 0 {
+			// We want the average value over the reporting period, so we divide
+			// by numSamples.
+			avgValue = s.sum / uint64(s.numSamples)
+			s.localTotal += avgValue
+			atomic.StoreUint64(&total, s.localTotal)
+
+			// ewma(t) = c * value(t) + (1 - c) * ewma(t-1)
+			// We use c = 0.5.
+			s.localEWMA = (avgValue + s.localEWMA) / 2
+			atomic.StoreUint64(&ewma, s.localEWMA)
+		}
+		nextPeriod := samplePeriodShort
+		// Both the mean over the last 1s, and the exponentially weighted average
+		// must be low for the system to be considered underloaded.
+		if avgValue < underloadedRunnablePerProcThreshold &&
+			s.localEWMA < underloadedRunnablePerProcThreshold {
+			// Underloaded, so switch to longer sampling period.
+			nextPeriod = samplePeriodLong
+		}
+		// We switch the sample period only at reportingPeriod boundaries
+		// since it ensures that all samples contributing to a reporting
+		// period were at equal intervals (this is desirable since we average
+		// them). It also naturally reduces the frequency at which we reset a
+		// ticker.
+		if nextPeriod != s.curPeriod {
+			ticker.Reset(nextPeriod)
+			s.curPeriod = nextPeriod
+		}
+		s.lastTime = t
+		s.sum = 0
+		s.numSamples = 0
+	}
+	runnable, numProcs := s.numRunnableGoroutines()
+	for i := range cbs {
+		cbs[i].RunnableCountCallback(runnable, numProcs, s.curPeriod)
+	}
+	// The value of the sample is the ratio of runnable to numProcs (scaled
+	// for fixed-point arithmetic).
+	s.sum += uint64(runnable) * toFixedPoint / uint64(numProcs)
+	s.numSamples++
+}
+
 var _ = RecentNormalizedRunnableGoroutines
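From a listener's point of view, the only new input is the sampling period delivered with each sample. A small sketch, not the actual admission-control wiring, of how a RunnableCountCallback-shaped consumer might use it, mirroring the skipEnforcement decision in GrantCoordinator.CPULoad above:

package main

import (
	"fmt"
	"time"
)

// reactToScheduler has the shape of goschedstats.RunnableCountCallback after
// this change: the sampling period now arrives with every sample.
func reactToScheduler(numRunnable, numProcs int, samplePeriod time.Duration) {
	// Enforcement only makes sense when samples arrive every 1ms; at the slow,
	// underloaded period the listener relaxes slot/token limits rather than
	// throttling on stale counts.
	skipEnforcement := samplePeriod > time.Millisecond
	fmt.Printf("runnable=%d procs=%d skipEnforcement=%t\n",
		numRunnable, numProcs, skipEnforcement)
}

func main() {
	reactToScheduler(20, 8, time.Millisecond)    // loaded: enforce
	reactToScheduler(0, 8, 250*time.Millisecond) // underloaded: skip enforcement
}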
diff --git a/pkg/util/goschedstats/runnable_test.go b/pkg/util/goschedstats/runnable_test.go
index b9f32fac80fe..41d72ff6ed5b 100644
--- a/pkg/util/goschedstats/runnable_test.go
+++ b/pkg/util/goschedstats/runnable_test.go
@@ -14,8 +14,11 @@ import (
 	"fmt"
 	"runtime"
 	"testing"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
 )
 
 func TestNumRunnableGoroutines(t *testing.T) {
@@ -39,3 +42,77 @@ func TestNumRunnableGoroutines(t *testing.T) {
 		return nil
 	})
 }
+
+type testTimeTicker struct {
+	numResets         int
+	lastResetDuration time.Duration
+}
+
+func (t *testTimeTicker) Reset(d time.Duration) {
+	t.numResets++
+	t.lastResetDuration = d
+}
+
+func TestSchedStatsTicker(t *testing.T) {
+	runnable := 0
+	numRunnable := func() (numRunnable int, numProcs int) {
+		return runnable, 1
+	}
+	var callbackSamplePeriod time.Duration
+	var numCallbacks int
+	cb := func(numRunnable int, numProcs int, samplePeriod time.Duration) {
+		require.Equal(t, runnable, numRunnable)
+		require.Equal(t, 1, numProcs)
+		callbackSamplePeriod = samplePeriod
+		numCallbacks++
+	}
+	cbs := []callbackWithID{{cb, 0}}
+	now := timeutil.UnixEpoch
+	startTime := now
+	sst := schedStatsTicker{
+		lastTime:              now,
+		curPeriod:             samplePeriodShort,
+		numRunnableGoroutines: numRunnable,
+	}
+	tt := testTimeTicker{}
+	// Tick every 1ms until the reportingPeriod has elapsed.
+	for i := 1; ; i++ {
+		now = now.Add(samplePeriodShort)
+		sst.getStatsOnTick(now, cbs, &tt)
+		if now.Sub(startTime) <= reportingPeriod {
+			// No reset of the time ticker.
+			require.Equal(t, 0, tt.numResets)
+			// Each tick causes a callback.
+			require.Equal(t, i, numCallbacks)
+			require.Equal(t, samplePeriodShort, callbackSamplePeriod)
+		} else {
+			break
+		}
+	}
+	// Since underloaded, the time ticker is reset to samplePeriodLong, and this
+	// period is provided to the latest callback.
+	require.Equal(t, 1, tt.numResets)
+	require.Equal(t, samplePeriodLong, tt.lastResetDuration)
+	require.Equal(t, samplePeriodLong, callbackSamplePeriod)
+	// Increase load so no longer underloaded.
+	runnable = 2
+	startTime = now
+	tt.numResets = 0
+	for i := 1; ; i++ {
+		now = now.Add(samplePeriodLong)
+		sst.getStatsOnTick(now, cbs, &tt)
+		if now.Sub(startTime) <= reportingPeriod {
+			// No reset of the time ticker.
+			require.Equal(t, 0, tt.numResets)
+			// Each tick causes a callback.
+			require.Equal(t, samplePeriodLong, callbackSamplePeriod)
+		} else {
+			break
+		}
+	}
+	// No longer underloaded, so the time ticker is reset to samplePeriodShort,
+	// and this period is provided to the latest callback.
+	require.Equal(t, 1, tt.numResets)
+	require.Equal(t, samplePeriodShort, tt.lastResetDuration)
+	require.Equal(t, samplePeriodShort, callbackSamplePeriod)
+}
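Both the reporting-period mean and the EWMA must stay below the threshold to remain in the slow sampling regime, so a sustained load spike flips back to 1ms sampling within one reporting period, which is exactly what the test above exercises; the c = 0.5 EWMA mainly adds hysteresis against brief dips. A standalone illustration of how quickly that smoothing moves:

package main

import "fmt"

func main() {
	// ewma(t) = 0.5*value(t) + 0.5*ewma(t-1), the smoothing used in
	// getStatsOnTick, starting from an idle (zero) state.
	ewma := 0.0
	for t, value := range []float64{2, 2, 2} { // load jumps to 2 runnable per proc
		ewma = (value + ewma) / 2
		fmt.Printf("reporting period %d: ewma=%.2f\n", t+1, ewma) // 1.00, 1.50, 1.75
	}
}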