From 7a29bec6d8a03fd0bc339cb25803f558aa03b34e Mon Sep 17 00:00:00 2001 From: Connor Date: Thu, 15 Jun 2023 20:45:10 -0700 Subject: [PATCH] domain: Introduce runaway manager (#44339) ref pingcap/tidb#43691 --- DEPS.bzl | 16 +- build/nogo_config.json | 3 +- ddl/tests/resourcegroup/BUILD.bazel | 3 +- .../resourcegroup/resource_group_test.go | 26 ++++ distsql/request_builder.go | 1 + domain/BUILD.bazel | 2 + domain/domain.go | 62 ++++++-- domain/infosync/BUILD.bazel | 3 +- domain/infosync/info.go | 18 +-- domain/infosync/resource_group_manager.go | 115 -------------- domain/infosync/resource_manager_client.go | 147 ++++++++++++++++++ domain/resourcegroup/BUILD.bazel | 9 ++ domain/resourcegroup/runaway.go | 118 +++++++++++++- errno/errcode.go | 1 + errno/errname.go | 1 + errors.toml | 5 + executor/BUILD.bazel | 1 - executor/adapter.go | 17 +- executor/calibrate_resource.go | 16 +- executor/calibrate_resource_test.go | 16 +- executor/executor.go | 2 + go.mod | 4 +- go.sum | 8 +- kv/BUILD.bazel | 1 + kv/kv.go | 3 + sessionctx/stmtctx/BUILD.bazel | 1 + sessionctx/stmtctx/stmtctx.go | 2 + store/copr/BUILD.bazel | 1 + store/copr/coprocessor.go | 16 ++ store/driver/BUILD.bazel | 2 - store/driver/tikv_driver.go | 22 --- store/mockstore/unistore/BUILD.bazel | 2 +- store/mockstore/unistore/pd.go | 98 ++---------- tidb-server/main.go | 5 +- util/dbterror/exeerrors/errors.go | 1 + 35 files changed, 445 insertions(+), 303 deletions(-) delete mode 100644 domain/infosync/resource_group_manager.go create mode 100644 domain/infosync/resource_manager_client.go diff --git a/DEPS.bzl b/DEPS.bzl index 0e50825601ae7..435cc1e06a6df 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4230,8 +4230,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:BDQQDtqhUo3epgVc8HOPYLg9w8zehRaegEX+1bHMyVg=", - version = "v2.0.8-0.20230614032106-0b4b0ca00e74", + sum = "h1:k6GnsFTv7l9UdAiaZYlWLsTVDzOVFJY3lf/FCcDHoQ4=", + version = "v2.0.8-0.20230615161845-b32f340d0609", ) go_repository( name = "com_github_tikv_pd", @@ -4245,8 +4245,8 @@ def go_deps(): name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sum = "h1:GZdnq41Gs/Zxz0Ne/cZ/rKual+GD8F3h434OwhZogfA=", - version = "v0.0.0-20230608061541-026ddf08a351", + sum = "h1:a5SATBxu/0Z6qNnz4KXDN91gDA06waaYcHM6dkb6lz4=", + version = "v0.0.0-20230613052906-7158cb319935", ) go_repository( name = "com_github_timakin_bodyclose", @@ -6041,15 +6041,15 @@ def go_deps(): name = "org_golang_google_genproto_googleapis_api", build_file_proto_mode = "disable", importpath = "google.golang.org/genproto/googleapis/api", - sum = "h1:AZX1ra8YbFMSb7+1pI8S9v4rrgRR7jU1FmuFSSjTVcQ=", - version = "v0.0.0-20230526203410-71b5a4ffd15e", + sum = "h1:HiYVD+FGJkTo+9zj1gqz0anapsa1JxjiSrN+BJKyUmE=", + version = "v0.0.0-20230525234020-1aefcd67740a", ) go_repository( name = "org_golang_google_genproto_googleapis_rpc", build_file_proto_mode = "disable", importpath = "google.golang.org/genproto/googleapis/rpc", - sum = "h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=", - version = "v0.0.0-20230525234030-28d5490b6b19", + sum = "h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc=", + version = "v0.0.0-20230530153820-e85fd2cbaebc", ) go_repository( diff --git a/build/nogo_config.json b/build/nogo_config.json index 24fbbc49979d1..fb2db9a5ca40a 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -933,7 +933,8 @@ "br/pkg/lightning/checkpoints/checkpoints.go": "cfg.TikvImporter.Addr is deprecated", "br/pkg/lightning/checkpoints/glue_checkpoint.go": "cfg.TikvImporter.Addr is deprecated", "br/pkg/lightning/backend/local/local.go": "grpc Compressor/Decompressor is deprecated", - "br/pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated" + "br/pkg/lightning/backend/local/compress.go": "grpc Compressor/Decompressor is deprecated", + "domain/infosync/resource_manager_client.go": "github.com/golang/protobuf deprecated" }, "only_files": { "expression/bench_test.go": "expression/bench_test.go", diff --git a/ddl/tests/resourcegroup/BUILD.bazel b/ddl/tests/resourcegroup/BUILD.bazel index fefe8fcc06e53..c34f56c33a041 100644 --- a/ddl/tests/resourcegroup/BUILD.bazel +++ b/ddl/tests/resourcegroup/BUILD.bazel @@ -6,13 +6,14 @@ go_test( srcs = ["resource_group_test.go"], flaky = True, race = "on", - shard_count = 5, + shard_count = 6, deps = [ "//ddl/resourcegroup", "//ddl/util/callback", "//domain", "//domain/infosync", "//errno", + "//parser/auth", "//parser/model", "//sessionctx", "//testkit", diff --git a/ddl/tests/resourcegroup/resource_group_test.go b/ddl/tests/resourcegroup/resource_group_test.go index dfb30b6092faa..3e3c7028acad3 100644 --- a/ddl/tests/resourcegroup/resource_group_test.go +++ b/ddl/tests/resourcegroup/resource_group_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/domain/infosync" mysql "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/testkit" @@ -217,6 +218,31 @@ func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name stri return g } +func TestResourceGroupRunaway(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil)) + + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values(1)") + + tk.MustExec("set global tidb_enable_resource_control='on'") + tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=KILL)") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/sleepCoprRequest", `sleep(500)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/sleepCoprRequest")) + }() + err := tk.QueryToErr("select /*+ resource_group(rg1) */ * from t") + require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query") + + tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=COOLDOWN)") + tk.MustQuery("select /*+ resource_group(rg1) */ * from t") + + tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=DRYRUN)") + tk.MustQuery("select /*+ resource_group(rg1) */ * from t") +} + func TestResourceGroupHint(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 2d515c4f8853d..a86df57c0763e 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -309,6 +309,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req builder.StoreBatchSize = sv.StoreBatchSize builder.Request.ResourceGroupName = sv.ResourceGroupName builder.Request.StoreBusyThreshold = sv.LoadBasedReplicaReadThreshold + builder.Request.RunawayChecker = sv.StmtCtx.RunawayChecker return builder } diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index cb2e31915513d..466e68b794222 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//domain/globalconfigsync", "//domain/infosync", "//domain/metrics", + "//domain/resourcegroup", "//errno", "//infoschema", "//infoschema/perfschema", @@ -87,6 +88,7 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//resource_group/controller", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", "@org_golang_google_grpc//:grpc", diff --git a/domain/domain.go b/domain/domain.go index 1ad87f10bb315..2aaa57f9b5fa9 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/globalconfigsync" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/domain/resourcegroup" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/infoschema/perfschema" @@ -86,6 +87,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/transaction" pd "github.com/tikv/pd/client" + rmclient "github.com/tikv/pd/client/resource_group/controller" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" atomicutil "go.uber.org/atomic" @@ -147,17 +149,19 @@ type Domain struct { memoryUsageAlarmHandle *memoryusagealarm.Handle serverMemoryLimitHandle *servermemorylimit.Handle // TODO: use Run for each process in future pr - wg *util.WaitGroupEnhancedWrapper - statsUpdating atomicutil.Int32 - cancel context.CancelFunc - indexUsageSyncLease time.Duration - dumpFileGcChecker *dumpFileGcChecker - planReplayerHandle *planReplayerHandle - extractTaskHandle *ExtractHandle - expiredTimeStamp4PC types.Time - logBackupAdvancer *daemon.OwnerDaemon - historicalStatsWorker *HistoricalStatsWorker - ttlJobManager atomic.Pointer[ttlworker.JobManager] + wg *util.WaitGroupEnhancedWrapper + statsUpdating atomicutil.Int32 + cancel context.CancelFunc + indexUsageSyncLease time.Duration + dumpFileGcChecker *dumpFileGcChecker + planReplayerHandle *planReplayerHandle + extractTaskHandle *ExtractHandle + expiredTimeStamp4PC types.Time + logBackupAdvancer *daemon.OwnerDaemon + historicalStatsWorker *HistoricalStatsWorker + ttlJobManager atomic.Pointer[ttlworker.JobManager] + runawayManager *resourcegroup.RunawayManager + resourceGroupsController *rmclient.ResourceGroupsController serverID uint64 serverIDSession *concurrency.Session @@ -1179,6 +1183,10 @@ func (do *Domain) Init( if err != nil { return err } + err = do.initResourceGroupsController(ctx, pdCli) + if err != nil { + return err + } do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli) err = do.ddl.SchemaSyncer().Init(ctx) if err != nil { @@ -1235,6 +1243,23 @@ func (do *Domain) SetOnClose(onClose func()) { do.onClose = onClose } +func (do *Domain) initResourceGroupsController(ctx context.Context, pdClient pd.Client) error { + if pdClient == nil { + logutil.BgLogger().Warn("cannot setup up resource controller, not using tikv storage") + // return nil as unistore doesn't support it + return nil + } + + control, err := rmclient.NewResourceGroupController(ctx, do.ServerID(), pdClient, nil, rmclient.WithMaxWaitDuration(resourcegroup.MaxWaitDuration)) + if err != nil { + return err + } + control.Start(ctx) + do.runawayManager = resourcegroup.NewRunawayManager(control) + do.resourceGroupsController = control + return nil +} + func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error { cfg := config.GetGlobalConfig() if pdClient == nil || do.etcdClient == nil { @@ -1917,6 +1942,21 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx sessionctx.Context, work } } +// RunawayManager returns the runaway manager. +func (do *Domain) RunawayManager() *resourcegroup.RunawayManager { + return do.runawayManager +} + +// ResourceGroupsController returns the resource groups controller. +func (do *Domain) ResourceGroupsController() *rmclient.ResourceGroupsController { + return do.resourceGroupsController +} + +// SetResourceGroupsController is only used in test. +func (do *Domain) SetResourceGroupsController(controller *rmclient.ResourceGroupsController) { + do.resourceGroupsController = controller +} + // SetupHistoricalStatsWorker setups worker func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) { do.historicalStatsWorker = &HistoricalStatsWorker{ diff --git a/domain/infosync/BUILD.bazel b/domain/infosync/BUILD.bazel index c5781932fe650..2eed57e867ac8 100644 --- a/domain/infosync/BUILD.bazel +++ b/domain/infosync/BUILD.bazel @@ -9,7 +9,7 @@ go_library( "mock_info.go", "placement_manager.go", "region.go", - "resource_group_manager.go", + "resource_manager_client.go", "schedule_manager.go", "tiflash_manager.go", ], @@ -39,6 +39,7 @@ go_library( "//util/pdapi", "//util/syncutil", "//util/versioninfo", + "@com_github_golang_protobuf//proto", "@com_github_gorilla_mux//:mux", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 33103da90af8a..4eb1fc7e8b1c0 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -119,7 +119,7 @@ type InfoSyncer struct { placementManager PlacementManager scheduleManager ScheduleManager tiflashReplicaManager TiFlashReplicaManager - resourceGroupManager pd.ResourceManagerClient + resourceManagerClient pd.ResourceManagerClient } // ServerInfo is server static information. @@ -211,7 +211,7 @@ func GlobalInfoSyncerInit( is.placementManager = initPlacementManager(etcdCli) is.scheduleManager = initScheduleManager(etcdCli) is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec) - is.resourceGroupManager = initResourceGroupManager(pdCli) + is.resourceManagerClient = initResourceManagerClient(pdCli) setGlobalInfoSyncer(is) return is, nil } @@ -256,10 +256,10 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager { return &PDPlacementManager{etcdCli: etcdCli} } -func initResourceGroupManager(pdCli pd.Client) (cli pd.ResourceManagerClient) { +func initResourceManagerClient(pdCli pd.Client) (cli pd.ResourceManagerClient) { cli = pdCli if pdCli == nil { - cli = NewMockResourceGroupManager() + cli = NewMockResourceManagerClient() } failpoint.Inject("managerAlreadyCreateSomeGroups", func(val failpoint.Value) { if val.(bool) { @@ -589,7 +589,7 @@ func GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, er return nil, err } - return is.resourceGroupManager.GetResourceGroup(ctx, name) + return is.resourceManagerClient.GetResourceGroup(ctx, name) } // ListResourceGroups is used to get all resource groups from resource manager. @@ -599,7 +599,7 @@ func ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { return nil, err } - return is.resourceGroupManager.ListResourceGroups(ctx) + return is.resourceManagerClient.ListResourceGroups(ctx) } // AddResourceGroup is used to create one specific resource group to resource manager. @@ -608,7 +608,7 @@ func AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { if err != nil { return err } - _, err = is.resourceGroupManager.AddResourceGroup(ctx, group) + _, err = is.resourceManagerClient.AddResourceGroup(ctx, group) return err } @@ -618,7 +618,7 @@ func ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { if err != nil { return err } - _, err = is.resourceGroupManager.ModifyResourceGroup(ctx, group) + _, err = is.resourceManagerClient.ModifyResourceGroup(ctx, group) return err } @@ -628,7 +628,7 @@ func DeleteResourceGroup(ctx context.Context, name string) error { if err != nil { return err } - _, err = is.resourceGroupManager.DeleteResourceGroup(ctx, name) + _, err = is.resourceManagerClient.DeleteResourceGroup(ctx, name) return err } diff --git a/domain/infosync/resource_group_manager.go b/domain/infosync/resource_group_manager.go deleted file mode 100644 index a508220ebf571..0000000000000 --- a/domain/infosync/resource_group_manager.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package infosync - -import ( - "context" - "fmt" - "math" - "sync" - - "github.com/pingcap/kvproto/pkg/meta_storagepb" - rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/domain/resourcegroup" - pd "github.com/tikv/pd/client" -) - -type mockResourceGroupManager struct { - sync.RWMutex - groups map[string]*rmpb.ResourceGroup -} - -// NewMockResourceGroupManager return a mock ResourceManagerClient for test usage. -func NewMockResourceGroupManager() pd.ResourceManagerClient { - mockMgr := &mockResourceGroupManager{ - groups: make(map[string]*rmpb.ResourceGroup), - } - mockMgr.groups[resourcegroup.DefaultResourceGroupName] = &rmpb.ResourceGroup{ - Name: resourcegroup.DefaultResourceGroupName, - Mode: rmpb.GroupMode_RUMode, - RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: math.MaxInt32, - BurstLimit: -1, - }, - }, - }, - Priority: 8, - } - return mockMgr -} - -var _ pd.ResourceManagerClient = (*mockResourceGroupManager)(nil) - -func (m *mockResourceGroupManager) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { - m.RLock() - defer m.RUnlock() - groups := make([]*rmpb.ResourceGroup, 0, len(m.groups)) - for _, group := range m.groups { - groups = append(groups, group) - } - return groups, nil -} - -func (m *mockResourceGroupManager) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) { - m.RLock() - defer m.RUnlock() - group, ok := m.groups[name] - if !ok { - return nil, fmt.Errorf("the group %s does not exist", name) - } - return group, nil -} - -func (m *mockResourceGroupManager) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { - m.Lock() - defer m.Unlock() - if _, ok := m.groups[group.Name]; ok { - return "", fmt.Errorf("the group %s already exists", group.Name) - } - m.groups[group.Name] = group - return "Success!", nil -} - -func (m *mockResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { - m.Lock() - defer m.Unlock() - m.groups[group.Name] = group - return "Success!", nil -} - -func (m *mockResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) (string, error) { - m.Lock() - defer m.Unlock() - delete(m.groups, name) - return "Success!", nil -} - -func (m *mockResourceGroupManager) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { - return nil, nil -} - -func (m *mockResourceGroupManager) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { - return nil, nil -} - -func (m *mockResourceGroupManager) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { - return nil, nil -} - -func (m *mockResourceGroupManager) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { - return nil, 0, nil -} diff --git a/domain/infosync/resource_manager_client.go b/domain/infosync/resource_manager_client.go new file mode 100644 index 0000000000000..3233889b6a858 --- /dev/null +++ b/domain/infosync/resource_manager_client.go @@ -0,0 +1,147 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infosync + +import ( + "context" + "fmt" + "math" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/pingcap/kvproto/pkg/meta_storagepb" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/tidb/domain/resourcegroup" + pd "github.com/tikv/pd/client" +) + +type mockResourceManagerClient struct { + sync.RWMutex + groups map[string]*rmpb.ResourceGroup + eventCh chan []*meta_storagepb.Event +} + +// NewMockResourceManagerClient return a mock ResourceManagerClient for test usage. +func NewMockResourceManagerClient() pd.ResourceManagerClient { + mockMgr := &mockResourceManagerClient{ + groups: make(map[string]*rmpb.ResourceGroup), + eventCh: make(chan []*meta_storagepb.Event, 100), + } + mockMgr.groups[resourcegroup.DefaultResourceGroupName] = &rmpb.ResourceGroup{ + Name: resourcegroup.DefaultResourceGroupName, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: math.MaxInt32, + BurstLimit: -1, + }, + }, + }, + Priority: 8, + } + return mockMgr +} + +var _ pd.ResourceManagerClient = (*mockResourceManagerClient)(nil) + +func (m *mockResourceManagerClient) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { + m.RLock() + defer m.RUnlock() + groups := make([]*rmpb.ResourceGroup, 0, len(m.groups)) + for _, group := range m.groups { + groups = append(groups, group) + } + return groups, nil +} + +func (m *mockResourceManagerClient) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) { + m.RLock() + defer m.RUnlock() + group, ok := m.groups[name] + if !ok { + return nil, fmt.Errorf("the group %s does not exist", name) + } + return group, nil +} + +func (m *mockResourceManagerClient) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { + m.Lock() + defer m.Unlock() + if _, ok := m.groups[group.Name]; ok { + return "", fmt.Errorf("the group %s already exists", group.Name) + } + m.groups[group.Name] = group + value, err := proto.Marshal(group) + if err != nil { + return "", err + } + m.eventCh <- []*meta_storagepb.Event{{ + Type: meta_storagepb.Event_PUT, + Kv: &meta_storagepb.KeyValue{ + Value: value, + }}} + + return "Success!", nil +} + +func (m *mockResourceManagerClient) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { + m.Lock() + defer m.Unlock() + m.groups[group.Name] = group + value, err := proto.Marshal(group) + if err != nil { + return "", err + } + m.eventCh <- []*meta_storagepb.Event{{ + Type: meta_storagepb.Event_PUT, + Kv: &meta_storagepb.KeyValue{ + Value: value, + }}} + return "Success!", nil +} + +func (m *mockResourceManagerClient) DeleteResourceGroup(ctx context.Context, name string) (string, error) { + m.Lock() + defer m.Unlock() + group := m.groups[name] + delete(m.groups, name) + value, err := proto.Marshal(group) + if err != nil { + return "", err + } + m.eventCh <- []*meta_storagepb.Event{{ + Type: meta_storagepb.Event_DELETE, + Kv: &meta_storagepb.KeyValue{ + Value: value, + }}} + return "Success!", nil +} + +func (m *mockResourceManagerClient) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { + return nil, nil +} + +func (m *mockResourceManagerClient) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { + return nil, nil +} + +func (m *mockResourceManagerClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { + return nil, 0, nil +} + +func (m *mockResourceManagerClient) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { + return m.eventCh, nil +} diff --git a/domain/resourcegroup/BUILD.bazel b/domain/resourcegroup/BUILD.bazel index 58769da4b03d9..a74a601e4262b 100644 --- a/domain/resourcegroup/BUILD.bazel +++ b/domain/resourcegroup/BUILD.bazel @@ -5,4 +5,13 @@ go_library( srcs = ["runaway.go"], importpath = "github.com/pingcap/tidb/domain/resourcegroup", visibility = ["//visibility:public"], + deps = [ + "//util/dbterror/exeerrors", + "//util/logutil", + "@com_github_pingcap_kvproto//pkg/resource_manager", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_pd_client//resource_group/controller", + "@org_uber_go_zap//:zap", + ], ) diff --git a/domain/resourcegroup/runaway.go b/domain/resourcegroup/runaway.go index 72846c52dbf47..97cdeb30f22ac 100644 --- a/domain/resourcegroup/runaway.go +++ b/domain/resourcegroup/runaway.go @@ -14,5 +14,119 @@ package resourcegroup -// DefaultResourceGroupName is the name of the default resource group. -const DefaultResourceGroupName = "default" +import ( + "sync/atomic" + "time" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/tidb/util/dbterror/exeerrors" + "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + rmclient "github.com/tikv/pd/client/resource_group/controller" + "go.uber.org/zap" +) + +const ( + // DefaultResourceGroupName is the default resource group name. + DefaultResourceGroupName = "default" + // MaxWaitDuration is the max duration to wait for acquiring token buckets. + MaxWaitDuration = time.Second * 30 +) + +// RunawayManager is used to detect and record runaway queries. +type RunawayManager struct { + resourceGroupCtl *rmclient.ResourceGroupsController + // TODO: add watch records +} + +// NewRunawayManager creates a new RunawayManager. +func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController) *RunawayManager { + return &RunawayManager{ + resourceGroupCtl: resourceGroupCtl, + } +} + +// DeriveChecker derives a RunawayChecker from the given resource group +func (rm *RunawayManager) DeriveChecker(resourceGroupName string, originalSQL string, planDigest string) *RunawayChecker { + group, err := rm.resourceGroupCtl.GetResourceGroup(resourceGroupName) + if err != nil || group == nil { + logutil.BgLogger().Warn("cannot setup up runaway checker", zap.Error(err)) + return nil + } + if group.RunawaySettings == nil { + return nil + } + return newRunawayChecker(rm, group.RunawaySettings, originalSQL, planDigest) +} + +// MarkRunaway marks the query as runaway. +func (rm *RunawayManager) MarkRunaway(originalSQL string, planDigest string) { + // TODO: insert into watch records +} + +// RunawayChecker is used to check if the query is runaway. +type RunawayChecker struct { + manager *RunawayManager + originalSQL string + planDigest string + + deadline time.Time + action rmpb.RunawayAction + + marked atomic.Bool +} + +func newRunawayChecker(manager *RunawayManager, setting *rmpb.RunawaySettings, originalSQL string, planDigest string) *RunawayChecker { + return &RunawayChecker{ + manager: manager, + originalSQL: originalSQL, + planDigest: planDigest, + deadline: time.Now().Add(time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond), + action: setting.Action, + marked: atomic.Bool{}, + } +} + +// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request. +func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error { + marked := r.marked.Load() + if !marked { + until := time.Until(r.deadline) + if until > 0 { + if r.action == rmpb.RunawayAction_Kill { + // if the execution time is close to the threshold, set a timeout + if until < tikv.ReadTimeoutMedium { + req.Context.MaxExecutionDurationMs = uint64(until.Milliseconds()) + } + } + return nil + } + // execution time exceeds the threshold, mark the query as runaway + if r.marked.CompareAndSwap(false, true) { + r.manager.MarkRunaway(r.originalSQL, r.planDigest) + } + } + switch r.action { + case rmpb.RunawayAction_Kill: + return exeerrors.ErrResourceGroupQueryRunaway + case rmpb.RunawayAction_CoolDown: + req.ResourceControlContext.OverridePriority = 1 // set priority to lowest + return nil + case rmpb.RunawayAction_DryRun: + return nil + default: + return nil + } +} + +// AfterCopRequest checks runaway after receiving coprocessor response. +func (r *RunawayChecker) AfterCopRequest() { + // Do not perform action here as it may be the last cop request and just let it finish. If it's not the last cop request, action would be performed in `BeforeCopRequest` when handling the next cop request. + // Here only marks the query as runaway + if !r.marked.Load() && r.deadline.Before(time.Now()) { + if r.marked.CompareAndSwap(false, true) { + r.manager.MarkRunaway(r.originalSQL, r.planDigest) + } + } +} diff --git a/errno/errcode.go b/errno/errcode.go index f0b9141feb188..d65af4703d13e 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1138,6 +1138,7 @@ const ( ErrResourceGroupSupportDisabled = 8250 ErrResourceGroupConfigUnavailable = 8251 ErrResourceGroupThrottled = 8252 + ErrResourceGroupQueryRunaway = 8253 // TiKV/PD/TiFlash errors. ErrPDServerTimeout = 9001 diff --git a/errno/errname.go b/errno/errname.go index 0d60f71e0c4fb..8229948d57d6b 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1138,6 +1138,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil), ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil), ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil), + ErrResourceGroupQueryRunaway: mysql.Message("Query execution was interrupted, identified as runaway query", nil), // TiKV/PD errors. ErrPDServerTimeout: mysql.Message("PD server timeout: %s", nil), diff --git a/errors.toml b/errors.toml index df676b7085639..7342abebeccb3 100644 --- a/errors.toml +++ b/errors.toml @@ -1851,6 +1851,11 @@ error = ''' Failed to split region ranges: %s ''' +["executor:8253"] +error = ''' +Query execution was interrupted, identified as runaway query +''' + ["expression:1139"] error = ''' Got error '%-.64s' from regexp diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index c522c41b98021..8947043060bbd 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -256,7 +256,6 @@ go_library( "@com_github_tikv_client_go_v2//txnkv/txnsnapshot", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//resource_group/controller", "@com_github_twmb_murmur3//:murmur3", "@com_sourcegraph_sourcegraph_appdash//:appdash", "@com_sourcegraph_sourcegraph_appdash//opentracing", diff --git a/executor/adapter.go b/executor/adapter.go index 4a511c966a0fb..c1dd71058a742 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -526,6 +526,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { } // ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`. ctx = a.observeStmtBeginForTopSQL(ctx) + if variable.EnableResourceControl.Load() && domain.GetDomain(sctx).RunawayManager() != nil { + stmtCtx := sctx.GetSessionVars().StmtCtx + _, planDigest := GetPlanDigest(stmtCtx) + stmtCtx.RunawayChecker = domain.GetDomain(sctx).RunawayManager().DeriveChecker(sctx.GetSessionVars().ResourceGroupName, stmtCtx.OriginalSQL, planDigest.String()) + } breakpoint.Inject(a.Ctx, sessiontxn.BreakPointBeforeExecutorFirstRun) if err = a.openExecutor(ctx, e); err != nil { @@ -1529,7 +1534,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) { copTaskInfo := stmtCtx.CopTasksDetails() memMax := sessVars.MemTracker.MaxConsumed() diskMax := sessVars.DiskTracker.MaxConsumed() - _, planDigest := getPlanDigest(stmtCtx) + _, planDigest := GetPlanDigest(stmtCtx) binaryPlan := "" if variable.GenerateBinaryPlan.Load() { @@ -1727,8 +1732,8 @@ func getPlanTree(stmtCtx *stmtctx.StatementContext) string { return variable.SlowLogPlanPrefix + planTree + variable.SlowLogPlanSuffix } -// getPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. -func getPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) { +// GetPlanDigest will try to get the select plan tree if the plan is select or the select plan of delete/update/insert statement. +func GetPlanDigest(stmtCtx *stmtctx.StatementContext) (string, *parser.Digest) { normalized, planDigest := stmtCtx.GetPlanDigest() if len(normalized) > 0 && planDigest != nil { return normalized, planDigest @@ -1831,11 +1836,11 @@ func (a *ExecStmt) SummaryStmt(succ bool) { var planDigestGen func() string if a.Plan.TP() == plancodec.TypePointGet { planDigestGen = func() string { - _, planDigest := getPlanDigest(stmtCtx) + _, planDigest := GetPlanDigest(stmtCtx) return planDigest.String() } } else { - _, tmp := getPlanDigest(stmtCtx) + _, tmp := GetPlanDigest(stmtCtx) planDigest = tmp.String() } @@ -1938,7 +1943,7 @@ func (a *ExecStmt) observeStmtBeginForTopSQL(ctx context.Context) context.Contex vars := a.Ctx.GetSessionVars() sc := vars.StmtCtx normalizedSQL, sqlDigest := sc.SQLDigest() - normalizedPlan, planDigest := getPlanDigest(sc) + normalizedPlan, planDigest := GetPlanDigest(sc) var sqlDigestByte, planDigestByte []byte if sqlDigest != nil { sqlDigestByte = sqlDigest.Bytes() diff --git a/executor/calibrate_resource.go b/executor/calibrate_resource.go index f82c349a1ba59..55ee8a1700b98 100644 --- a/executor/calibrate_resource.go +++ b/executor/calibrate_resource.go @@ -23,6 +23,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/errors" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" @@ -34,7 +35,6 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/oracle" - rmclient "github.com/tikv/pd/client/resource_group/controller" ) var ( @@ -76,21 +76,8 @@ var ( writeReqCount: 3550, }, } - - // resourceGroupCtl is the ResourceGroupController in pd client - resourceGroupCtl *rmclient.ResourceGroupsController ) -// SetResourceGroupController set a inited ResourceGroupsController for calibrate usage. -func SetResourceGroupController(rc *rmclient.ResourceGroupsController) { - resourceGroupCtl = rc -} - -// GetResourceGroupController returns the ResourceGroupsController. -func GetResourceGroupController() *rmclient.ResourceGroupsController { - return resourceGroupCtl -} - // the resource cost rate of a specified workload per 1 tikv cpu. type baseResourceCost struct { // represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu @@ -279,6 +266,7 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk. if !variable.EnableResourceControl.Load() { return infoschema.ErrResourceGroupSupportDisabled } + resourceGroupCtl := domain.GetDomain(e.ctx).ResourceGroupsController() // first fetch the ru settings config. if resourceGroupCtl == nil { return errors.New("resource group controller is not initialized") diff --git a/executor/calibrate_resource_test.go b/executor/calibrate_resource_test.go index f4850f9d056c8..71d31afe1c19a 100644 --- a/executor/calibrate_resource_test.go +++ b/executor/calibrate_resource_test.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" @@ -44,16 +44,10 @@ func TestCalibrateResource(t *testing.T) { tk.MustExec("SET GLOBAL tidb_enable_resource_control='ON';") - // resource group controller is not inited. - rs, err = tk.Exec("CALIBRATE RESOURCE") - require.NoError(t, err) - require.NotNil(t, rs) - err = rs.Next(context.Background(), rs.NewChunk(nil)) - require.ErrorContains(t, err, "resource group controller is not initialized") - - oldResourceCtl := executor.GetResourceGroupController() + do := domain.GetDomain(tk.Session()) + oldResourceCtl := do.ResourceGroupsController() defer func() { - executor.SetResourceGroupController(oldResourceCtl) + do.SetResourceGroupsController(oldResourceCtl) }() mockPrivider := &mockResourceGroupProvider{ @@ -69,7 +63,7 @@ func TestCalibrateResource(t *testing.T) { } resourceCtl, err := rmclient.NewResourceGroupController(context.Background(), 1, mockPrivider, nil) require.NoError(t, err) - executor.SetResourceGroupController(resourceCtl) + do.SetResourceGroupsController(resourceCtl) // empty metrics error rs, err = tk.Exec("CALIBRATE RESOURCE") diff --git a/executor/executor.go b/executor/executor.go index c399743b5f1b8..1246c943f90ca 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -2169,6 +2169,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.InExplainStmt = true sc.InVerboseExplain = strings.ToLower(explainForStmt.Format) == types.ExplainFormatVerbose } + // TODO: Many same bool variables here. // We should set only two variables ( // IgnoreErr and StrictSQLMode) to avoid setting the same bool variables and @@ -2292,6 +2293,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { vars.DurationWaitTS = 0 vars.CurrInsertBatchExtraCols = nil vars.CurrInsertValues = chunk.Row{} + return } diff --git a/go.mod b/go.mod index a54097cc606c9..fb93f215d155d 100644 --- a/go.mod +++ b/go.mod @@ -96,8 +96,8 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20230614032106-0b4b0ca00e74 - github.com/tikv/pd/client v0.0.0-20230608061541-026ddf08a351 + github.com/tikv/client-go/v2 v2.0.8-0.20230615161845-b32f340d0609 + github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935 github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 0095bc3e978a6..b6676786e2df7 100644 --- a/go.sum +++ b/go.sum @@ -972,10 +972,10 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.8-0.20230614032106-0b4b0ca00e74 h1:BDQQDtqhUo3epgVc8HOPYLg9w8zehRaegEX+1bHMyVg= -github.com/tikv/client-go/v2 v2.0.8-0.20230614032106-0b4b0ca00e74/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88= -github.com/tikv/pd/client v0.0.0-20230608061541-026ddf08a351 h1:GZdnq41Gs/Zxz0Ne/cZ/rKual+GD8F3h434OwhZogfA= -github.com/tikv/pd/client v0.0.0-20230608061541-026ddf08a351/go.mod h1:YmNkj9UT8IjwFov9k3oquH0UgIUHniUaQT3jXKgZYbM= +github.com/tikv/client-go/v2 v2.0.8-0.20230615161845-b32f340d0609 h1:k6GnsFTv7l9UdAiaZYlWLsTVDzOVFJY3lf/FCcDHoQ4= +github.com/tikv/client-go/v2 v2.0.8-0.20230615161845-b32f340d0609/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88= +github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935 h1:a5SATBxu/0Z6qNnz4KXDN91gDA06waaYcHM6dkb6lz4= +github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935/go.mod h1:YmNkj9UT8IjwFov9k3oquH0UgIUHniUaQT3jXKgZYbM= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/kv/BUILD.bazel b/kv/BUILD.bazel index a1817ba4083e1..2fa197b037f53 100644 --- a/kv/BUILD.bazel +++ b/kv/BUILD.bazel @@ -23,6 +23,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//config", + "//domain/resourcegroup", "//errno", "//parser/model", "//parser/mysql", diff --git a/kv/kv.go b/kv/kv.go index ec74e7ef02ba9..7b320d97df638 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/domain/resourcegroup" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/trxevents" @@ -579,6 +580,8 @@ type Request struct { // StoreBusyThreshold is the threshold for the store to return ServerIsBusy StoreBusyThreshold time.Duration + RunawayChecker *resourcegroup.RunawayChecker + // ConnID stores the session connection id. ConnID uint64 } diff --git a/sessionctx/stmtctx/BUILD.bazel b/sessionctx/stmtctx/BUILD.bazel index d02edd86138a1..b0b4239eab7ea 100644 --- a/sessionctx/stmtctx/BUILD.bazel +++ b/sessionctx/stmtctx/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/sessionctx/stmtctx", visibility = ["//visibility:public"], deps = [ + "//domain/resourcegroup", "//errno", "//parser", "//parser/ast", diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index f248e80a66359..4d1e2773b4b68 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -27,6 +27,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/domain/resourcegroup" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" @@ -245,6 +246,7 @@ type StatementContext struct { NotFillCache bool MemTracker *memory.Tracker DiskTracker *disk.Tracker + RunawayChecker *resourcegroup.RunawayChecker IsTiFlash atomic2.Bool RuntimeStatsColl *execdetails.RuntimeStatsColl TableIDs []int64 diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index aa137de8c9df3..56c271066a478 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -18,6 +18,7 @@ go_library( deps = [ "//config", "//domain/infosync", + "//domain/resourcegroup", "//errno", "//kv", "//metrics", diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 99d50bbbdb9ea..0fe457849e85d 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/domain/resourcegroup" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" tidbmetrics "github.com/pingcap/tidb/metrics" @@ -183,6 +184,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars replicaReadSeed: c.replicaReadSeed, rpcCancel: tikv.NewRPCanceller(), buildTaskElapsed: *buildOpt.elapsed, + runawayChecker: req.RunawayChecker, } it.tasks = tasks if it.concurrency > len(tasks) { @@ -683,6 +685,8 @@ type copIterator struct { buildTaskElapsed time.Duration storeBatchedNum atomic.Uint64 storeBatchedFallbackNum atomic.Uint64 + + runawayChecker *resourcegroup.RunawayChecker } // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan. @@ -1165,6 +1169,13 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch if worker.req.ResourceGroupTagger != nil { worker.req.ResourceGroupTagger(req) } + failpoint.Inject("sleepCoprRequest", nil) + + if worker.req.RunawayChecker != nil { + if err := worker.req.RunawayChecker.BeforeCopRequest(req); err != nil { + return nil, err + } + } req.StoreTp = getEndPointType(task.storeType) startTime := time.Now() if worker.kvclient.Stats == nil { @@ -1200,12 +1211,17 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch // Set task.storeAddr field so its task.String() method have the store address information. task.storeAddr = storeAddr + costTime := time.Since(startTime) copResp := resp.Resp.(*coprocessor.Response) if costTime > minLogCopTaskTime { worker.logTimeCopTask(costTime, task, bo, copResp) } + if worker.req.RunawayChecker != nil { + worker.req.RunawayChecker.AfterCopRequest() + } + storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) isInternal := util.IsRequestSourceInternal(&task.requestSource) scope := metrics.LblGeneral diff --git a/store/driver/BUILD.bazel b/store/driver/BUILD.bazel index 0252bf21a1d23..497874e57b3a8 100644 --- a/store/driver/BUILD.bazel +++ b/store/driver/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/pingcap/tidb/store/driver", visibility = ["//visibility:public"], deps = [ - "//executor", "//executor/importer", "//kv", "//sessionctx/variable", @@ -23,7 +22,6 @@ go_library( "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", - "@com_github_tikv_pd_client//resource_group/controller", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//keepalive", "@org_uber_go_zap//:zap", diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index bcd70ecb811c7..1834325b126a5 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/variable" @@ -41,7 +40,6 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" - rmclient "github.com/tikv/pd/client/resource_group/controller" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -96,26 +94,6 @@ func WithPDClientConfig(client config.PDClient) Option { } } -// TrySetupGlobalResourceController tries to setup global resource controller. -func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv.Storage) error { - var ( - store *tikvStore - ok bool - ) - if store, ok = s.(*tikvStore); !ok { - return errors.New("cannot setup up resource controller, should use tikv storage") - } - - control, err := rmclient.NewResourceGroupController(ctx, serverID, store.GetPDClient(), nil, rmclient.WithMaxWaitDuration(time.Second*30)) - if err != nil { - return err - } - executor.SetResourceGroupController(control) - tikv.SetResourceControlInterceptor(control) - control.Start(ctx) - return nil -} - func getKVStore(path string, tls config.Security) (kv.Storage, error) { return TiKVDriver{}.OpenWithOptions(path, WithSecurity(tls)) } diff --git a/store/mockstore/unistore/BUILD.bazel b/store/mockstore/unistore/BUILD.bazel index cade081fe2ccf..53120c16d32fc 100644 --- a/store/mockstore/unistore/BUILD.bazel +++ b/store/mockstore/unistore/BUILD.bazel @@ -13,7 +13,7 @@ go_library( importpath = "github.com/pingcap/tidb/store/mockstore/unistore", visibility = ["//visibility:public"], deps = [ - "//domain/resourcegroup", + "//domain/infosync", "//parser/terror", "//store/mockstore/unistore/config", "//store/mockstore/unistore/lockstore", diff --git a/store/mockstore/unistore/pd.go b/store/mockstore/unistore/pd.go index d10bb80adae74..a72f69f356e64 100644 --- a/store/mockstore/unistore/pd.go +++ b/store/mockstore/unistore/pd.go @@ -17,7 +17,6 @@ package unistore import ( "context" "errors" - "fmt" "math" "sync" "sync/atomic" @@ -26,7 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/meta_storagepb" "github.com/pingcap/kvproto/pkg/pdpb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/domain/resourcegroup" + "github.com/pingcap/tidb/domain/infosync" us "github.com/pingcap/tidb/store/mockstore/unistore/tikv" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -36,45 +35,20 @@ var _ pd.Client = new(pdClient) type pdClient struct { *us.MockPD + pd.ResourceManagerClient - serviceSafePoints map[string]uint64 - gcSafePointMu sync.Mutex - globalConfig map[string]string - externalTimestamp atomic.Uint64 - resourceGroupManager *resourceGroupManager -} - -type resourceGroupManager struct { - sync.RWMutex - groups map[string]*rmpb.ResourceGroup -} - -func newResourceGroupManager() *resourceGroupManager { - mgr := &resourceGroupManager{ - groups: make(map[string]*rmpb.ResourceGroup), - } - mgr.groups[resourcegroup.DefaultResourceGroupName] = &rmpb.ResourceGroup{ - Name: resourcegroup.DefaultResourceGroupName, - Mode: rmpb.GroupMode_RUMode, - RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: math.MaxInt32, - BurstLimit: -1, - }, - }, - }, - Priority: 8, - } - return mgr + serviceSafePoints map[string]uint64 + gcSafePointMu sync.Mutex + globalConfig map[string]string + externalTimestamp atomic.Uint64 } func newPDClient(pd *us.MockPD) *pdClient { return &pdClient{ - MockPD: pd, - serviceSafePoints: make(map[string]uint64), - globalConfig: make(map[string]string), - resourceGroupManager: newResourceGroupManager(), + MockPD: pd, + ResourceManagerClient: infosync.NewMockResourceManagerClient(), + serviceSafePoints: make(map[string]uint64), + globalConfig: make(map[string]string), } } @@ -216,54 +190,6 @@ func (c *pdClient) UpdateKeyspaceState(ctx context.Context, id uint32, state key return nil, nil } -func (c *pdClient) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { - c.resourceGroupManager.RLock() - defer c.resourceGroupManager.RUnlock() - groups := make([]*rmpb.ResourceGroup, 0, len(c.resourceGroupManager.groups)) - for _, group := range c.resourceGroupManager.groups { - groups = append(groups, group) - } - return groups, nil -} - -func (c *pdClient) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) { - c.resourceGroupManager.RLock() - defer c.resourceGroupManager.RUnlock() - group, ok := c.resourceGroupManager.groups[name] - if !ok { - return nil, nil - } - return group, nil -} - -func (c *pdClient) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { - c.resourceGroupManager.Lock() - defer c.resourceGroupManager.Unlock() - if _, ok := c.resourceGroupManager.groups[group.Name]; ok { - return "", fmt.Errorf("the group %s already exists", group.Name) - } - c.resourceGroupManager.groups[group.Name] = group - return "Success!", nil -} - -func (c *pdClient) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { - c.resourceGroupManager.Lock() - defer c.resourceGroupManager.Unlock() - c.resourceGroupManager.groups[group.Name] = group - return "Success!", nil -} - -func (c *pdClient) DeleteResourceGroup(ctx context.Context, name string) (string, error) { - c.resourceGroupManager.Lock() - defer c.resourceGroupManager.Unlock() - delete(c.resourceGroupManager.groups, name) - return "Success!", nil -} - -func (c *pdClient) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { - return nil, nil -} - func (c *pdClient) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { return nil, nil } @@ -312,10 +238,6 @@ func (c *pdClient) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation return nil } -func (c *pdClient) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) { - return nil, nil -} - func (c *pdClient) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) { return nil, nil } diff --git a/tidb-server/main.go b/tidb-server/main.go index c8bf1e6b80edc..cb5e30e62e28f 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -238,10 +238,7 @@ func main() { resourcemanager.InstanceResourceManager.Start() storage, dom := createStoreAndDomain(keyspaceName) svr := createServer(storage, dom) - err = driver.TrySetupGlobalResourceController(context.Background(), dom.ServerID(), storage) - if err != nil { - logutil.BgLogger().Warn("failed to setup global resource controller", zap.Error(err)) - } + tikv.SetResourceControlInterceptor(dom.ResourceGroupsController()) // Register error API is not thread-safe, the caller MUST NOT register errors after initialization. // To prevent misuse, set a flag to indicate that register new error will panic immediately. diff --git a/util/dbterror/exeerrors/errors.go b/util/dbterror/exeerrors/errors.go index 09665abb3b7af..fb6d581303c4d 100644 --- a/util/dbterror/exeerrors/errors.go +++ b/util/dbterror/exeerrors/errors.go @@ -51,6 +51,7 @@ var ( ErrDeadlock = dbterror.ClassExecutor.NewStd(mysql.ErrLockDeadlock) ErrQueryInterrupted = dbterror.ClassExecutor.NewStd(mysql.ErrQueryInterrupted) ErrMaxExecTimeExceeded = dbterror.ClassExecutor.NewStd(mysql.ErrMaxExecTimeExceeded) + ErrResourceGroupQueryRunaway = dbterror.ClassExecutor.NewStd(mysql.ErrResourceGroupQueryRunaway) ErrDynamicPrivilegeNotRegistered = dbterror.ClassExecutor.NewStd(mysql.ErrDynamicPrivilegeNotRegistered) ErrIllegalPrivilegeLevel = dbterror.ClassExecutor.NewStd(mysql.ErrIllegalPrivilegeLevel) ErrInvalidSplitRegionRanges = dbterror.ClassExecutor.NewStd(mysql.ErrInvalidSplitRegionRanges)