Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into use-resource-contro…
Browse files Browse the repository at this point in the history
…ller

Conflicts:
	domain/globalconfigsync/globalconfig.go
	go.mod
	go.sum
	store/mockstore/unistore/pd.go
  • Loading branch information
nolouch committed Jan 20, 2023
2 parents 2400de3 + 38a7c62 commit 909e9ce
Show file tree
Hide file tree
Showing 38 changed files with 1,945 additions and 283 deletions.
1,431 changes: 1,367 additions & 64 deletions DEPS.bzl

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//parser/opcode",
"//parser/terror",
"//parser/types",
"//privilege",
"//sessionctx",
"//sessionctx/binloginfo",
"//sessionctx/stmtctx",
Expand Down
12 changes: 12 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
field_types "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
Expand Down Expand Up @@ -7668,6 +7669,17 @@ func (d *ddl) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGr
return err
}

// check to see if some user has dependency on the group
checker := privilege.GetPrivilegeManager(ctx)
if checker == nil {
return errors.New("miss privilege checker")
}
user, matched := checker.MatchUserResourceGroupName(groupName.L)
if matched {
err = errors.Errorf("user [%s] depends on the resource group to drop", user)
return err
}

job := &model.Job{
SchemaID: group.ID,
SchemaName: group.Name.L,
Expand Down
4 changes: 4 additions & 0 deletions ddl/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func TestResourceGroupBasic(t *testing.T) {
tk.MustGetErrCode("create user usr_fail resource group nil_group", mysql.ErrResourceGroupNotExists)
tk.MustExec("create user user2")
tk.MustGetErrCode("alter user user2 resource group nil_group", mysql.ErrResourceGroupNotExists)

tk.MustExec("create resource group do_not_delete_rg rru_per_sec=100 wru_per_sec=200")
tk.MustExec("create user usr3 resource group do_not_delete_rg")
tk.MustContainErrMsg("drop resource group do_not_delete_rg", "user [usr3] depends on the resource group to drop")
}

func testResourceGroupNameFromIS(t *testing.T, ctx sessionctx.Context, name string) *model.ResourceGroupInfo {
Expand Down
4 changes: 2 additions & 2 deletions ddl/resourcegroup/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestNewResourceGroupFromOptions(t *testing.T) {
output: &rmpb.ResourceGroup{
Name: groupName,
Mode: rmpb.GroupMode_RawMode,
ResourceSettings: &rmpb.GroupResourceSettings{
RawResourceSettings: &rmpb.GroupRawResourceSettings{
Cpu: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 8000}},
IoRead: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 3000000000}},
IoWrite: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1500000000}},
Expand All @@ -119,7 +119,7 @@ func TestNewResourceGroupFromOptions(t *testing.T) {
output: &rmpb.ResourceGroup{
Name: groupName,
Mode: rmpb.GroupMode_RawMode,
ResourceSettings: &rmpb.GroupResourceSettings{
RawResourceSettings: &rmpb.GroupRawResourceSettings{
Cpu: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 8000}},
IoRead: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 3145728000}},
IoWrite: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 3145728000}},
Expand Down
30 changes: 19 additions & 11 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,16 @@ func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementCon
// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
var keyRanges []kv.KeyRange
keyRanges, builder.FixedRowCountHint = TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := TableHandlesToKVRanges(tid, handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetPartitionsAndHandles sets "KeyRanges" for "kv.Request" by converting ParitionHandles to KeyRanges.
// handles in slice must be kv.PartitionHandle.
func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *RequestBuilder {
keyRanges := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRanges(keyRanges)
keyRanges, hints := PartitionHandlesToKVRanges(handles)
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

Expand Down Expand Up @@ -194,6 +193,12 @@ func (builder *RequestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *RequestBui
return builder
}

// SetKeyRangesWithHints sets "KeyRanges" for "kv.Request" with row count hints.
func (builder *RequestBuilder) SetKeyRangesWithHints(keyRanges []kv.KeyRange, hints []int) *RequestBuilder {
builder.Request.KeyRanges = kv.NewNonParitionedKeyRangesWithHint(keyRanges, hints)
return builder
}

// SetWrappedKeyRanges sets "KeyRanges" for "kv.Request".
func (builder *RequestBuilder) SetWrappedKeyRanges(keyRanges *kv.KeyRanges) *RequestBuilder {
builder.Request.KeyRanges = keyRanges
Expand Down Expand Up @@ -551,7 +556,7 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hint := make([]int, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
Expand All @@ -560,7 +565,7 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
EndKey: tablecodec.EncodeRowKey(tid, kv.Key(commonHandle.Encoded()).Next()),
}
krs = append(krs, ran)
hint = append(hint, 1)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -576,16 +581,17 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) ([]kv.KeyRange, []in
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hint = append(hint, j-i)
hints = append(hints, j-i)
i = j
}
return krs, hint
return krs, hints
}

// PartitionHandlesToKVRanges convert ParitionHandles to kv ranges.
// Handle in slices must be kv.PartitionHandle
func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
func PartitionHandlesToKVRanges(handles []kv.Handle) ([]kv.KeyRange, []int) {
krs := make([]kv.KeyRange, 0, len(handles))
hints := make([]int, 0, len(handles))
i := 0
for i < len(handles) {
ph := handles[i].(kv.PartitionHandle)
Expand All @@ -597,6 +603,7 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
EndKey: tablecodec.EncodeRowKey(pid, append(commonHandle.Encoded(), 0)),
}
krs = append(krs, ran)
hints = append(hints, 1)
i++
continue
}
Expand All @@ -615,9 +622,10 @@ func PartitionHandlesToKVRanges(handles []kv.Handle) []kv.KeyRange {
startKey := tablecodec.EncodeRowKey(pid, low)
endKey := tablecodec.EncodeRowKey(pid, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
hints = append(hints, j-i)
i = j
}
return krs
return krs, hints
}

// IndexRangesToKVRanges converts index ranges to "KeyRange".
Expand Down
26 changes: 13 additions & 13 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual, _ := TableHandlesToKVRanges(1, handles)
actual, hints := TableHandlesToKVRanges(1, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
require.Equal(t, hints, []int{1, 4, 2, 1, 2})
for i := range actual {
require.Equal(t, expect[i].StartKey, actual[i].StartKey)
require.Equal(t, expect[i].EndKey, actual[i].EndKey)
Expand Down Expand Up @@ -378,7 +379,7 @@ func TestRequestBuilder3(t *testing.T) {
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: kv.NewNonParitionedKeyRanges([]kv.KeyRange{
KeyRanges: kv.NewNonParitionedKeyRangesWithHint([]kv.KeyRange{
{
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Expand All @@ -395,17 +396,16 @@ func TestRequestBuilder3(t *testing.T) {
StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x64},
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
FixedRowCountHint: []int{1, 4, 2, 1},
}, []int{1, 4, 2, 1}),
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
3 changes: 2 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/br/pkg/streamhelper"
Expand Down Expand Up @@ -445,7 +446,7 @@ func (do *Domain) InfoSyncer() *infosync.InfoSyncer {

// NotifyGlobalConfigChange notify global config syncer to store the global config into PD.
func (do *Domain) NotifyGlobalConfigChange(name, value string) {
do.globalCfgSyncer.Notify(pd.GlobalConfigItem{Name: name, Value: value})
do.globalCfgSyncer.Notify(pd.GlobalConfigItem{Name: name, Value: value, EventType: pdpb.EventType_PUT})
}

// GetGlobalConfigSyncer exports for testing.
Expand Down
7 changes: 5 additions & 2 deletions domain/globalconfigsync/globalconfig.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 PingCAP, Inc.
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,9 @@ import (
"go.uber.org/zap"
)

// GlobalConfigPath as Etcd prefix
const GlobalConfigPath = "/global/config/"

// GlobalConfigSyncer is used to sync pd global config.
type GlobalConfigSyncer struct {
pd pd.Client
Expand All @@ -41,7 +44,7 @@ func (s *GlobalConfigSyncer) StoreGlobalConfig(ctx context.Context, configPath s
if s.pd == nil {
return nil
}
err := s.pd.StoreGlobalConfig(ctx, configPath, []pd.GlobalConfigItem{item})
err := s.pd.StoreGlobalConfig(ctx, GlobalConfigPath, []pd.GlobalConfigItem{item})
if err != nil {
return err
}
Expand Down
18 changes: 10 additions & 8 deletions domain/globalconfigsync/globalconfig_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 PingCAP, Inc.
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@ package globalconfigsync_test

import (
"context"
"path"
"runtime"
"testing"
"time"
Expand Down Expand Up @@ -58,11 +59,12 @@ func TestGlobalConfigSyncer(t *testing.T) {
syncer.Notify(pd.GlobalConfigItem{Name: "a", Value: "b"})
err = syncer.StoreGlobalConfig(context.Background(), <-syncer.NotifyCh)
require.NoError(t, err)
items, err := client.LoadGlobalConfig(context.Background(), []string{"a"})
items, revision, err := client.LoadGlobalConfig(context.Background(), globalconfigsync.GlobalConfigPath)
require.NoError(t, err)
require.Equal(t, len(items), 1)
require.Equal(t, items[0].Name, "/global/config/a")
require.Equal(t, items[0].Value, "b")
require.Equal(t, 1, len(items))
require.Equal(t, path.Join(globalconfigsync.GlobalConfigPath, "a"), items[0].Name)
require.Equal(t, int64(0), revision)
require.Equal(t, "b", items[0].Value)
}

func TestStoreGlobalConfig(t *testing.T) {
Expand Down Expand Up @@ -95,15 +97,15 @@ func TestStoreGlobalConfig(t *testing.T) {
client :=
store.(kv.StorageWithPD).GetPDClient()
// enable top sql will be translated to enable_resource_metering
items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering", "source_id"})
items, _, err := client.LoadGlobalConfig(context.Background(), globalconfigsync.GlobalConfigPath)
require.NoError(t, err)
if len(items) == 2 && items[0].Value == "" {
continue
}
require.Len(t, items, 2)
require.Equal(t, items[0].Name, "/global/config/enable_resource_metering")
require.Equal(t, items[0].Name, path.Join(globalconfigsync.GlobalConfigPath, "enable_resource_metering"))
require.Equal(t, items[0].Value, "true")
require.Equal(t, items[1].Name, "/global/config/source_id")
require.Equal(t, items[1].Name, path.Join(globalconfigsync.GlobalConfigPath, "source_id"))
require.Equal(t, items[1].Value, "2")
return
}
Expand Down
2 changes: 1 addition & 1 deletion errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrResourceGroupNotExists: mysql.Message("Unknown resource group '%-.192s'", nil),

ErrColumnInChange: mysql.Message("column %s id %d does not exist, this column may have been updated by other DDL ran in parallel", nil),
ErrResourceGroupSupportDisabled: mysql.Message("Resource group feature is disabled", nil),
ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil),

// TiKV/PD errors.
ErrPDServerTimeout: mysql.Message("PD server timeout: %s", nil),
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2538,7 +2538,7 @@ Unknown resource group '%-.192s'

["schema:8250"]
error = '''
Resource group feature is disabled
Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature
'''

["session:8002"]
Expand Down
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4220,13 +4220,13 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, ranges...)
}
} else {
for _, p := range usedPartitionList {
tmp, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
ranges, _ := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, ranges...)
}
}

Expand Down
Loading

0 comments on commit 909e9ce

Please sign in to comment.