Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Setup Global Resource Controller #40732

Merged
merged 7 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions domain/domain_sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (do *Domain) initDomainSysVars() {

variable.SetExternalTimestamp = do.setExternalTimestamp
variable.GetExternalTimestamp = do.getExternalTimestamp

setGlobalResourceControlFunc := do.setGlobalResourceControl
variable.SetGlobalResourceControl.Store(&setGlobalResourceControlFunc)
}

// setStatsCacheCapacity sets statsCache cap
Expand Down Expand Up @@ -67,6 +70,15 @@ func (do *Domain) setPDClientDynamicOption(name, sVal string) {
}
}

func (do *Domain) setGlobalResourceControl(enable bool) {
if enable {
variable.EnableGlobalResourceControlFunc()
} else {
variable.DisableGlobalResourceControlFunc()
}
logutil.BgLogger().Info("set resource control", zap.Bool("enable", enable))
}

// updatePDClient is used to set the dynamic option into the PD client.
func (do *Domain) updatePDClient(option pd.DynamicOption, val interface{}) error {
store, ok := do.store.(interface{ GetPDClient() pd.Client })
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ require (
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.5-0.20230120021435-f89383775234
github.com/tikv/pd v1.1.0-beta.0.20230119114149-402c2bfee2f3
github.com/tikv/pd/client v0.0.0-20230119115149-5c518d079b93
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down Expand Up @@ -225,7 +226,6 @@ require (
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd v1.1.0-beta.0.20230119114149-402c2bfee2f3 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
4 changes: 2 additions & 2 deletions resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/tidb/util/cpu"
)

// GlobalResourceManager is a global resource manager
var GlobalResourceManager = NewResourceManger()
// InstanceResourceManager is a local instance resource manager
var InstanceResourceManager = NewResourceManger()

// RandomName is to get a random name for register pool. It is just for test.
func RandomName() string {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,7 @@ var defaultSysVars = []*SysVar{
},
{Scope: ScopeGlobal, Name: TiDBEnableResourceControl, Value: BoolToOnOff(DefTiDBEnableResourceControl), Type: TypeBool, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
EnableResourceControl.Store(TiDBOptOn(s))
(*SetGlobalResourceControl.Load())(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(EnableResourceControl.Load()), nil
Expand Down
19 changes: 18 additions & 1 deletion sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,26 +1115,43 @@ func TestSetJobScheduleWindow(t *testing.T) {
}

func TestTiDBEnableResourceControl(t *testing.T) {
// setup the hooks for test
enable := false
EnableGlobalResourceControlFunc = func() { enable = true }
DisableGlobalResourceControlFunc = func() { enable = false }
setGlobalResourceControlFunc := func(enable bool) {
if enable {
EnableGlobalResourceControlFunc()
} else {
DisableGlobalResourceControlFunc()
}
}
SetGlobalResourceControl.Store(&setGlobalResourceControlFunc)

vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
resourceControlEnabled := GetSysVar(TiDBEnableResourceControl)

// Default true
// Default false
require.Equal(t, resourceControlEnabled.Value, Off)
require.Equal(t, enable, false)

// Set to On
err := mock.SetGlobalSysVar(context.Background(), TiDBEnableResourceControl, On)

require.NoError(t, err)
val, err1 := mock.GetGlobalSysVar(TiDBEnableResourceControl)
require.NoError(t, err1)
require.Equal(t, On, val)
require.Equal(t, enable, true)

// Set to off
err = mock.SetGlobalSysVar(context.Background(), TiDBEnableResourceControl, Off)
require.NoError(t, err)
val, err1 = mock.GetGlobalSysVar(TiDBEnableResourceControl)
require.NoError(t, err1)
require.Equal(t, Off, val)
require.Equal(t, enable, false)
}
10 changes: 10 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,16 @@ var (
SetExternalTimestamp func(ctx context.Context, ts uint64) error
// GetExternalTimestamp is the func registered by staleread to get externaltimestamp from pd
GetExternalTimestamp func(ctx context.Context) (uint64, error)
// SetGlobalResourceControl is the func registered by domain to set cluster resource control.
SetGlobalResourceControl atomic.Pointer[func(bool)]
)

// Hooks functions for Cluster Resource Control.
var (
// EnableGlobalResourceControlFunc is the function registered by tikv_driver to set cluster resource control.
EnableGlobalResourceControlFunc func() = func() {}
// DisableGlobalResourceControlFunc is the function registered by tikv_driver to unset cluster resource control.
DisableGlobalResourceControlFunc func() = func() {}
)

func serverMemoryLimitDefaultValue() string {
Expand Down
2 changes: 2 additions & 0 deletions store/driver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//kv",
"//sessionctx/variable",
"//store/copr",
"//store/driver/error",
"//store/driver/txn",
Expand All @@ -19,6 +20,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd//pkg/mcs/resource_manager/client",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
Expand Down
25 changes: 25 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
derr "github.com/pingcap/tidb/store/driver/error"
txn_driver "github.com/pingcap/tidb/store/driver/txn"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/pkg/mcs/resource_manager/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -53,6 +55,10 @@ var mc storeCache
func init() {
mc.cache = make(map[string]*tikvStore)
rand.Seed(time.Now().UnixNano())

// Setup the Hooks to dynamic control global resource controller.
variable.EnableGlobalResourceControlFunc = tikv.EnableResourceControl
variable.DisableGlobalResourceControlFunc = tikv.DisableResourceControl
}

// Option is a function that changes some config of Driver
Expand Down Expand Up @@ -86,6 +92,25 @@ func WithPDClientConfig(client config.PDClient) Option {
}
}

// TrySetupGlobalResourceController tries to setup global resource controller.
func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv.Storage) error {
var (
store *tikvStore
ok bool
)
if store, ok = s.(*tikvStore); !ok {
return errors.New("cannot setup up resource controller, should use tikv storage")
}

control, err := rmclient.NewResourceGroupController(serverID, store.GetPDClient(), rmclient.DefaultRequestUnitConfig())
if err != nil {
return err
}
tikv.SetResourceControlInterceptor(control)
control.Start(ctx)
return nil
}

// TiKVDriver implements engine TiKV.
type TiKVDriver struct {
keyspaceName string
Expand Down
2 changes: 1 addition & 1 deletion testkit/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma
err := store.Close()
require.NoError(t, err)
view.Stop()
resourcemanager.GlobalResourceManager.Reset()
resourcemanager.InstanceResourceManager.Reset()
})
return dom
}
Expand Down
8 changes: 6 additions & 2 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ func main() {

keyspaceName := config.GetGlobalKeyspaceName()

resourcemanager.GlobalResourceManager.Start()
resourcemanager.InstanceResourceManager.Start()
storage, dom := createStoreAndDomain(keyspaceName)
svr := createServer(storage, dom)
err = driver.TrySetupGlobalResourceController(context.Background(), dom.ServerID(), storage)
if err != nil {
logutil.BgLogger().Warn("failed to setup global resource controller", zap.Error(err))
}

// Register error API is not thread-safe, the caller MUST NOT register errors after initialization.
// To prevent misuse, set a flag to indicate that register new error will panic immediately.
Expand All @@ -233,7 +237,7 @@ func main() {
svr.Close()
cleanup(svr, storage, dom, graceful)
cpuprofile.StopCPUProfiler()
resourcemanager.GlobalResourceManager.Stop()
resourcemanager.InstanceResourceManager.Stop()
close(exited)
})
topsql.SetupTopSQL()
Expand Down
4 changes: 2 additions & 2 deletions util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name stri
result.capacity.Add(size)
result.workers = newWorkerLoopQueue[T, U, C, CT, TF](int(size))
result.cond = sync.NewCond(result.lock)
err := resourcemanager.GlobalResourceManager.Register(result, name, component)
err := resourcemanager.InstanceResourceManager.Register(result, name, component)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait() {

close(p.stopCh)
p.release()
defer resourcemanager.GlobalResourceManager.Unregister(p.Name())
defer resourcemanager.InstanceResourceManager.Unregister(p.Name())
for {
// Wait for all workers to exit and all task to be completed.
if p.Running() == 0 && p.heartbeatDone.Load() && p.waitingTask.Load() == 0 {
Expand Down