Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: use the latest independent resource group manager client #41162

Merged
merged 6 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4085,8 +4085,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260=",
version = "v2.0.6-0.20230207040004-9b3ecc1dcaa9",
sum = "h1:1/ow7ZUnsU5CcxHF1cFAKdD+5b58tMbaeb8qAoli1m4=",
version = "v2.0.6-0.20230207090754-29dfcc272912",
)
go_repository(
name = "com_github_tikv_pd",
Expand Down
12 changes: 6 additions & 6 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -331,7 +331,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -384,7 +384,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -448,7 +448,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -508,7 +508,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -534,7 +534,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error
AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error
AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResourceGroupStmt) error
DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGroupStmt) error
FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7613,8 +7613,8 @@ func checkIgnorePlacementDDL(ctx sessionctx.Context) bool {
return false
}

// CreateResourceGroup implements the DDL interface, creates a resource group.
func (d *ddl) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) {
// AddResourceGroup implements the DDL interface, creates a resource group.
func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) {
groupInfo := &model.ResourceGroupInfo{ResourceGroupSettings: &model.ResourceGroupSettings{}}
groupName := stmt.ResourceGroupName
groupInfo.Name = groupName
Expand Down
2 changes: 1 addition & 1 deletion ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions ddl/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func onCreateResourceGroup(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.StateNone:
// none -> public
groupInfo.State = model.StatePublic
err := t.CreateResourceGroup(groupInfo)
err := t.AddResourceGroup(groupInfo)
if err != nil {
return ver, errors.Trace(err)
}
err = infosync.CreateResourceGroup(context.TODO(), protoGroup)
err = infosync.AddResourceGroup(context.TODO(), protoGroup)
if err != nil {
logutil.BgLogger().Warn("create resource group failed", zap.Error(err))
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestResourceGroupBasic(t *testing.T) {
g = testResourceGroupNameFromIS(t, tk.Session(), "y")
re.Nil(g)
tk.MustContainErrMsg("create resource group x RU_PER_SEC=1000, CPU='8000m';", resourcegroup.ErrInvalidResourceGroupDuplicatedMode.Error())
groups, err := infosync.GetAllResourceGroups(context.TODO())
groups, err := infosync.ListResourceGroups(context.TODO())
require.Equal(t, 0, len(groups))
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,9 @@ func (d Checker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPla
panic("implement me")
}

// CreateResourceGroup implements the DDL interface.
// AddResourceGroup implements the DDL interface.
// ResourceGroup do not affect the transaction.
func (d Checker) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error {
func (d Checker) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions ddl/schematracker/dm_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,8 +1173,8 @@ func (SchemaTracker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.Alte
return nil
}

// CreateResourceGroup implements the DDL interface, it's no-op in DM's case.
func (SchemaTracker) CreateResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
// AddResourceGroup implements the DDL interface, it's no-op in DM's case.
func (SchemaTracker) AddResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
25 changes: 14 additions & 11 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,18 +1047,13 @@ func (do *Domain) Init(
}

// step 1: prepare the info/schema syncer which domain reload needed.
pdCli := do.GetPDClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard)
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, skipRegisterToDashboard)
if err != nil {
return err
}

var pdClient pd.Client
if store, ok := do.store.(kv.StorageWithPD); ok {
pdClient = store.GetPDClient()
}
do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdClient)

do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli)
err = do.ddl.SchemaSyncer().Init(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -1089,12 +1084,12 @@ func (do *Domain) Init(
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
if pdClient != nil {
if pdCli != nil {
do.wg.Run(func() {
do.closestReplicaReadCheckLoop(ctx, pdClient)
do.closestReplicaReadCheckLoop(ctx, pdCli)
}, "closestReplicaReadCheckLoop")
}
err = do.initLogBackup(ctx, pdClient)
err = do.initLogBackup(ctx, pdCli)
if err != nil {
return err
}
Expand Down Expand Up @@ -1339,6 +1334,14 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
}

// GetPDClient returns the PD client.
func (do *Domain) GetPDClient() pd.Client {
if store, ok := do.store.(kv.StorageWithPD); ok {
return store.GetPDClient()
}
return nil
}

// LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it
// should be called only once in BootstrapSession.
func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
Expand Down
1 change: 0 additions & 1 deletion domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
38 changes: 24 additions & 14 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type InfoSyncer struct {
placementManager PlacementManager
scheduleManager ScheduleManager
tiflashReplicaManager TiFlashReplicaManager
resourceGroupManager ResourceGroupManager
resourceGroupManager pd.ResourceManagerClient
}

// ServerInfo is server static information.
Expand Down Expand Up @@ -186,7 +186,14 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
}

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
func GlobalInfoSyncerInit(
ctx context.Context,
id string,
serverIDGetter func() uint64,
etcdCli, unprefixedEtcdCli *clientv3.Client,
pdCli pd.Client,
skipRegisterToDashBoard bool,
) (*InfoSyncer, error) {
is := &InfoSyncer{
etcdCli: etcdCli,
unprefixedEtcdCli: unprefixedEtcdCli,
Expand All @@ -201,8 +208,8 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.resourceGroupManager = initResourceGroupManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli)
is.resourceGroupManager = initResourceGroupManager(pdCli)
setGlobalInfoSyncer(is)
return is, nil
}
Expand Down Expand Up @@ -247,11 +254,11 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
return &PDPlacementManager{etcdCli: etcdCli}
}

func initResourceGroupManager(etcdCli *clientv3.Client) ResourceGroupManager {
if etcdCli == nil {
func initResourceGroupManager(pdCli pd.Client) pd.ResourceManagerClient {
if pdCli == nil {
return &mockResourceGroupManager{groups: make(map[string]*rmpb.ResourceGroup)}
}
return NewResourceManager(etcdCli)
return pdCli
}

func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager {
Expand Down Expand Up @@ -590,23 +597,24 @@ func GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, er
return is.resourceGroupManager.GetResourceGroup(ctx, name)
}

// GetAllResourceGroups is used to get all resource groups from resource manager.
func GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
// ListResourceGroups is used to get all resource groups from resource manager.
func ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
}

return is.resourceGroupManager.GetAllResourceGroups(ctx)
return is.resourceGroupManager.ListResourceGroups(ctx)
}

// CreateResourceGroup is used to create one specific resource group to resource manager.
func CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
// AddResourceGroup is used to create one specific resource group to resource manager.
func AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
return is.resourceGroupManager.CreateResourceGroup(ctx, group)
_, err = is.resourceGroupManager.AddResourceGroup(ctx, group)
return err
}

// ModifyResourceGroup is used to modify one specific resource group to resource manager.
Expand All @@ -615,7 +623,8 @@ func ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
if err != nil {
return err
}
return is.resourceGroupManager.ModifyResourceGroup(ctx, group)
_, err = is.resourceGroupManager.ModifyResourceGroup(ctx, group)
return err
}

// DeleteResourceGroup is used to delete one specific resource group from resource manager.
Expand All @@ -624,7 +633,8 @@ func DeleteResourceGroup(ctx context.Context, name string) error {
if err != nil {
return err
}
return is.resourceGroupManager.DeleteResourceGroup(ctx, name)
_, err = is.resourceGroupManager.DeleteResourceGroup(ctx, name)
return err
}

// PutRuleBundlesWithDefaultRetry will retry for default times
Expand Down
6 changes: 3 additions & 3 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, false)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
Loading