Skip to content

Commit

Permalink
domain: Introduce runaway manager (#44339)
Browse files Browse the repository at this point in the history
ref #43691
  • Loading branch information
Connor1996 authored Jun 16, 2023
1 parent 1b2c272 commit 7a29bec
Show file tree
Hide file tree
Showing 35 changed files with 445 additions and 303 deletions.
16 changes: 8 additions & 8 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4230,8 +4230,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:BDQQDtqhUo3epgVc8HOPYLg9w8zehRaegEX+1bHMyVg=",
version = "v2.0.8-0.20230614032106-0b4b0ca00e74",
sum = "h1:k6GnsFTv7l9UdAiaZYlWLsTVDzOVFJY3lf/FCcDHoQ4=",
version = "v2.0.8-0.20230615161845-b32f340d0609",
)
go_repository(
name = "com_github_tikv_pd",
Expand All @@ -4245,8 +4245,8 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:GZdnq41Gs/Zxz0Ne/cZ/rKual+GD8F3h434OwhZogfA=",
version = "v0.0.0-20230608061541-026ddf08a351",
sum = "h1:a5SATBxu/0Z6qNnz4KXDN91gDA06waaYcHM6dkb6lz4=",
version = "v0.0.0-20230613052906-7158cb319935",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down Expand Up @@ -6041,15 +6041,15 @@ def go_deps():
name = "org_golang_google_genproto_googleapis_api",
build_file_proto_mode = "disable",
importpath = "google.golang.org/genproto/googleapis/api",
sum = "h1:AZX1ra8YbFMSb7+1pI8S9v4rrgRR7jU1FmuFSSjTVcQ=",
version = "v0.0.0-20230526203410-71b5a4ffd15e",
sum = "h1:HiYVD+FGJkTo+9zj1gqz0anapsa1JxjiSrN+BJKyUmE=",
version = "v0.0.0-20230525234020-1aefcd67740a",
)
go_repository(
name = "org_golang_google_genproto_googleapis_rpc",
build_file_proto_mode = "disable",
importpath = "google.golang.org/genproto/googleapis/rpc",
sum = "h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=",
version = "v0.0.0-20230525234030-28d5490b6b19",
sum = "h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc=",
version = "v0.0.0-20230530153820-e85fd2cbaebc",
)

go_repository(
Expand Down
3 changes: 2 additions & 1 deletion build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,8 @@
"br/pkg/lightning/checkpoints/checkpoints.go": "cfg.TikvImporter.Addr is deprecated",
"br/pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated",
"br/pkg/lightning/backend/local/local.go": "grpc Compressor/Decompressor is deprecated",
"br/pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated"
"br/pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated",
"domain/infosync/resource_manager_client.go": "github.com/golang/protobuf deprecated"
},
"only_files": {
"expression/bench_test.go": "expression/bench_test.go",
Expand Down
3 changes: 2 additions & 1 deletion ddl/tests/resourcegroup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ go_test(
srcs = ["resource_group_test.go"],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
"//ddl/resourcegroup",
"//ddl/util/callback",
"//domain",
"//domain/infosync",
"//errno",
"//parser/auth",
"//parser/model",
"//sessionctx",
"//testkit",
Expand Down
26 changes: 26 additions & 0 deletions ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -217,6 +218,31 @@ func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name stri
return g
}

// TestResourceGroupRunaway exercises the QUERY_LIMIT actions of a resource
// group against a query that is slowed down by a failpoint: ACTION=KILL must
// surface the runaway-query error, while ACTION=COOLDOWN and ACTION=DRYRUN
// must let the same query complete.
func TestResourceGroupRunaway(t *testing.T) {
	const (
		// Failpoint armed below with `sleep(500)`; presumably it delays each
		// coprocessor request past the 100ms EXEC_ELAPSED limit — confirm
		// against store/copr.
		slowCoprFailpoint = "github.com/pingcap/tidb/store/copr/sleepCoprRequest"
		// Query pinned to resource group rg1 through an optimizer hint.
		hintedQuery = "select /*+ resource_group(rg1) */ * from t"
	)

	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	root := &auth.UserIdentity{Username: "root", Hostname: "localhost"}
	require.NoError(t, tk.Session().Auth(root, nil, nil, nil))

	// Prepare a one-row table to query.
	tk.MustExec("use test")
	tk.MustExec("create table t(a int)")
	tk.MustExec("insert into t values(1)")

	tk.MustExec("set global tidb_enable_resource_control='on'")
	tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=KILL)")

	require.NoError(t, failpoint.Enable(slowCoprFailpoint, `sleep(500)`))
	defer func() {
		require.NoError(t, failpoint.Disable(slowCoprFailpoint))
	}()

	// ACTION=KILL: the slowed query is expected to be interrupted.
	err := tk.QueryToErr(hintedQuery)
	require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query")

	// The remaining actions are expected to let the query run to completion.
	for _, action := range []string{"COOLDOWN", "DRYRUN"} {
		tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=" + action + ")")
		tk.MustQuery(hintedQuery)
	}
}

func TestResourceGroupHint(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.StoreBatchSize = sv.StoreBatchSize
builder.Request.ResourceGroupName = sv.ResourceGroupName
builder.Request.StoreBusyThreshold = sv.LoadBasedReplicaReadThreshold
builder.Request.RunawayChecker = sv.StmtCtx.RunawayChecker
return builder
}

Expand Down
2 changes: 2 additions & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//domain/globalconfigsync",
"//domain/infosync",
"//domain/metrics",
"//domain/resourcegroup",
"//errno",
"//infoschema",
"//infoschema/perfschema",
Expand Down Expand Up @@ -87,6 +88,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/transaction",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_google_grpc//:grpc",
Expand Down
62 changes: 51 additions & 11 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/globalconfigsync"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/domain/resourcegroup"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/infoschema/perfschema"
Expand Down Expand Up @@ -86,6 +87,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/client/resource_group/controller"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
atomicutil "go.uber.org/atomic"
Expand Down Expand Up @@ -147,17 +149,19 @@ type Domain struct {
memoryUsageAlarmHandle *memoryusagealarm.Handle
serverMemoryLimitHandle *servermemorylimit.Handle
// TODO: use Run for each process in future pr
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
extractTaskHandle *ExtractHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager atomic.Pointer[ttlworker.JobManager]
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
extractTaskHandle *ExtractHandle
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager atomic.Pointer[ttlworker.JobManager]
runawayManager *resourcegroup.RunawayManager
resourceGroupsController *rmclient.ResourceGroupsController

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -1179,6 +1183,10 @@ func (do *Domain) Init(
if err != nil {
return err
}
err = do.initResourceGroupsController(ctx, pdCli)
if err != nil {
return err
}
do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli)
err = do.ddl.SchemaSyncer().Init(ctx)
if err != nil {
Expand Down Expand Up @@ -1235,6 +1243,23 @@ func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

// initResourceGroupsController creates the resource groups controller from
// the given PD client, starts it, and stores both the controller and a
// runaway manager built on top of it in the Domain.
//
// When pdClient is nil it only logs a warning and returns nil, leaving
// do.runawayManager and do.resourceGroupsController unset. Any error from
// constructing the controller is returned unchanged.
func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client) error {
	if pdClient == nil {
		logutil.BgLogger().Warn("cannot set up resource controller, not using tikv storage")
		// return nil as unistore doesn't support it
		return nil
	}

	control, err := rmclient.NewResourceGroupController(
		ctx,
		do.ServerID(),
		pdClient,
		nil,
		rmclient.WithMaxWaitDuration(resourcegroup.MaxWaitDuration),
	)
	if err != nil {
		return err
	}
	control.Start(ctx)

	do.runawayManager = resourcegroup.NewRunawayManager(control)
	do.resourceGroupsController = control
	return nil
}

func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
cfg := config.GetGlobalConfig()
if pdClient == nil || do.etcdClient == nil {
Expand Down Expand Up @@ -1917,6 +1942,21 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work
}
}

// RunawayManager returns the runaway manager held by the Domain.
// The field is assigned in initResourceGroupsController, which returns early
// without setting it when no PD client is available, so the result may be nil.
// NOTE(review): confirm no other code path initializes this field.
func (do *Domain) RunawayManager() *resourcegroup.RunawayManager {
return do.runawayManager
}

// ResourceGroupsController returns the resource groups controller held by the
// Domain. It is whatever initResourceGroupsController or
// SetResourceGroupsController last stored, and may be nil if neither has
// stored a controller (for example when no PD client is available).
func (do *Domain) ResourceGroupsController() *rmclient.ResourceGroupsController {
return do.resourceGroupsController
}

// SetResourceGroupsController is only used in test.
// It replaces the Domain's resource groups controller with the given one and
// leaves the runaway manager untouched.
func (do *Domain) SetResourceGroupsController(controller *rmclient.ResourceGroupsController) {
do.resourceGroupsController = controller
}

// SetupHistoricalStatsWorker setups worker
func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) {
do.historicalStatsWorker = &HistoricalStatsWorker{
Expand Down
3 changes: 2 additions & 1 deletion domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_library(
"mock_info.go",
"placement_manager.go",
"region.go",
"resource_group_manager.go",
"resource_manager_client.go",
"schedule_manager.go",
"tiflash_manager.go",
],
Expand Down Expand Up @@ -39,6 +39,7 @@ go_library(
"//util/pdapi",
"//util/syncutil",
"//util/versioninfo",
"@com_github_golang_protobuf//proto",
"@com_github_gorilla_mux//:mux",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
18 changes: 9 additions & 9 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type InfoSyncer struct {
placementManager PlacementManager
scheduleManager ScheduleManager
tiflashReplicaManager TiFlashReplicaManager
resourceGroupManager pd.ResourceManagerClient
resourceManagerClient pd.ResourceManagerClient
}

// ServerInfo is server static information.
Expand Down Expand Up @@ -211,7 +211,7 @@ func GlobalInfoSyncerInit(
is.placementManager = initPlacementManager(etcdCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec)
is.resourceGroupManager = initResourceGroupManager(pdCli)
is.resourceManagerClient = initResourceManagerClient(pdCli)
setGlobalInfoSyncer(is)
return is, nil
}
Expand Down Expand Up @@ -256,10 +256,10 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
return &PDPlacementManager{etcdCli: etcdCli}
}

func initResourceGroupManager(pdCli pd.Client) (cli pd.ResourceManagerClient) {
func initResourceManagerClient(pdCli pd.Client) (cli pd.ResourceManagerClient) {
cli = pdCli
if pdCli == nil {
cli = NewMockResourceGroupManager()
cli = NewMockResourceManagerClient()
}
failpoint.Inject("managerAlreadyCreateSomeGroups", func(val failpoint.Value) {
if val.(bool) {
Expand Down Expand Up @@ -589,7 +589,7 @@ func GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, er
return nil, err
}

return is.resourceGroupManager.GetResourceGroup(ctx, name)
return is.resourceManagerClient.GetResourceGroup(ctx, name)
}

// ListResourceGroups is used to get all resource groups from resource manager.
Expand All @@ -599,7 +599,7 @@ func ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
return nil, err
}

return is.resourceGroupManager.ListResourceGroups(ctx)
return is.resourceManagerClient.ListResourceGroups(ctx)
}

// AddResourceGroup is used to create one specific resource group to resource manager.
Expand All @@ -608,7 +608,7 @@ func AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
if err != nil {
return err
}
_, err = is.resourceGroupManager.AddResourceGroup(ctx, group)
_, err = is.resourceManagerClient.AddResourceGroup(ctx, group)
return err
}

Expand All @@ -618,7 +618,7 @@ func ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
if err != nil {
return err
}
_, err = is.resourceGroupManager.ModifyResourceGroup(ctx, group)
_, err = is.resourceManagerClient.ModifyResourceGroup(ctx, group)
return err
}

Expand All @@ -628,7 +628,7 @@ func DeleteResourceGroup(ctx context.Context, name string) error {
if err != nil {
return err
}
_, err = is.resourceGroupManager.DeleteResourceGroup(ctx, name)
_, err = is.resourceManagerClient.DeleteResourceGroup(ctx, name)
return err
}

Expand Down
Loading

0 comments on commit 7a29bec

Please sign in to comment.