Skip to content

Commit

Permalink
Use the latest independent resource group manager client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Feb 7, 2023
1 parent 52e5a7b commit 6af6899
Show file tree
Hide file tree
Showing 22 changed files with 97 additions and 150 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4085,8 +4085,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260=",
version = "v2.0.6-0.20230207040004-9b3ecc1dcaa9",
sum = "h1:1/ow7ZUnsU5CcxHF1cFAKdD+5b58tMbaeb8qAoli1m4=",
version = "v2.0.6-0.20230207090754-29dfcc272912",
)
go_repository(
name = "com_github_tikv_pd",
Expand Down
12 changes: 6 additions & 6 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -331,7 +331,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -384,7 +384,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -448,7 +448,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -508,7 +508,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -534,7 +534,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error
AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error
AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResourceGroupStmt) error
DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGroupStmt) error
FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7613,8 +7613,8 @@ func checkIgnorePlacementDDL(ctx sessionctx.Context) bool {
return false
}

// CreateResourceGroup implements the DDL interface, creates a resource group.
func (d *ddl) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) {
// AddResourceGroup implements the DDL interface, creates a resource group.
func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) {
groupInfo := &model.ResourceGroupInfo{ResourceGroupSettings: &model.ResourceGroupSettings{}}
groupName := stmt.ResourceGroupName
groupInfo.Name = groupName
Expand Down
2 changes: 1 addition & 1 deletion ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions ddl/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func onCreateResourceGroup(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
case model.StateNone:
// none -> public
groupInfo.State = model.StatePublic
err := t.CreateResourceGroup(groupInfo)
err := t.AddResourceGroup(groupInfo)
if err != nil {
return ver, errors.Trace(err)
}
err = infosync.CreateResourceGroup(context.TODO(), protoGroup)
err = infosync.AddResourceGroup(context.TODO(), protoGroup)
if err != nil {
logutil.BgLogger().Warn("create resource group failed", zap.Error(err))
return ver, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestResourceGroupBasic(t *testing.T) {
g = testResourceGroupNameFromIS(t, tk.Session(), "y")
re.Nil(g)
tk.MustContainErrMsg("create resource group x RU_PER_SEC=1000, CPU='8000m';", resourcegroup.ErrInvalidResourceGroupDuplicatedMode.Error())
groups, err := infosync.GetAllResourceGroups(context.TODO())
groups, err := infosync.ListResourceGroups(context.TODO())
require.Equal(t, 0, len(groups))
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,9 @@ func (d Checker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPla
panic("implement me")
}

// CreateResourceGroup implements the DDL interface.
// AddResourceGroup implements the DDL interface.
// ResourceGroup do not affect the transaction.
func (d Checker) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error {
func (d Checker) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions ddl/schematracker/dm_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,8 +1173,8 @@ func (SchemaTracker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.Alte
return nil
}

// CreateResourceGroup implements the DDL interface, it's no-op in DM's case.
func (SchemaTracker) CreateResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
// AddResourceGroup implements the DDL interface, it's no-op in DM's case.
func (SchemaTracker) AddResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
25 changes: 14 additions & 11 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,18 +1047,13 @@ func (do *Domain) Init(
}

// step 1: prepare the info/schema syncer which domain reload needed.
pdCli := do.GetPDClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard)
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, skipRegisterToDashboard)
if err != nil {
return err
}

var pdClient pd.Client
if store, ok := do.store.(kv.StorageWithPD); ok {
pdClient = store.GetPDClient()
}
do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdClient)

do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli)
err = do.ddl.SchemaSyncer().Init(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -1089,12 +1084,12 @@ func (do *Domain) Init(
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
if pdClient != nil {
if pdCli != nil {
do.wg.Run(func() {
do.closestReplicaReadCheckLoop(ctx, pdClient)
do.closestReplicaReadCheckLoop(ctx, pdCli)
}, "closestReplicaReadCheckLoop")
}
err = do.initLogBackup(ctx, pdClient)
err = do.initLogBackup(ctx, pdCli)
if err != nil {
return err
}
Expand Down Expand Up @@ -1339,6 +1334,14 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
}

// GetPDClient returns the PD client.
func (do *Domain) GetPDClient() pd.Client {
if store, ok := do.store.(kv.StorageWithPD); ok {
return store.GetPDClient()
}
return nil
}

// LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it
// should be called only once in BootstrapSession.
func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
Expand Down
1 change: 0 additions & 1 deletion domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
38 changes: 24 additions & 14 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type InfoSyncer struct {
placementManager PlacementManager
scheduleManager ScheduleManager
tiflashReplicaManager TiFlashReplicaManager
resourceGroupManager ResourceGroupManager
resourceGroupManager pd.ResourceManagerClient
}

// ServerInfo is server static information.
Expand Down Expand Up @@ -186,7 +186,14 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
}

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
func GlobalInfoSyncerInit(
ctx context.Context,
id string,
serverIDGetter func() uint64,
etcdCli, unprefixedEtcdCli *clientv3.Client,
pdCli pd.Client,
skipRegisterToDashBoard bool,
) (*InfoSyncer, error) {
is := &InfoSyncer{
etcdCli: etcdCli,
unprefixedEtcdCli: unprefixedEtcdCli,
Expand All @@ -201,8 +208,8 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
is.scheduleManager = initScheduleManager(etcdCli)
is.resourceGroupManager = initResourceGroupManager(etcdCli)
is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli)
is.resourceGroupManager = initResourceGroupManager(pdCli)
setGlobalInfoSyncer(is)
return is, nil
}
Expand Down Expand Up @@ -247,11 +254,11 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
return &PDPlacementManager{etcdCli: etcdCli}
}

func initResourceGroupManager(etcdCli *clientv3.Client) ResourceGroupManager {
if etcdCli == nil {
func initResourceGroupManager(pdCli pd.Client) pd.ResourceManagerClient {
if pdCli == nil {
return &mockResourceGroupManager{groups: make(map[string]*rmpb.ResourceGroup)}
}
return NewResourceManager(etcdCli)
return pdCli
}

func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager {
Expand Down Expand Up @@ -590,23 +597,24 @@ func GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, er
return is.resourceGroupManager.GetResourceGroup(ctx, name)
}

// GetAllResourceGroups is used to get all resource groups from resource manager.
func GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
// ListResourceGroups is used to get all resource groups from resource manager.
func ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
}

return is.resourceGroupManager.GetAllResourceGroups(ctx)
return is.resourceGroupManager.ListResourceGroups(ctx)
}

// CreateResourceGroup is used to create one specific resource group to resource manager.
func CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
// AddResourceGroup is used to create one specific resource group to resource manager.
func AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
is, err := getGlobalInfoSyncer()
if err != nil {
return err
}
return is.resourceGroupManager.CreateResourceGroup(ctx, group)
_, err = is.resourceGroupManager.AddResourceGroup(ctx, group)
return err
}

// ModifyResourceGroup is used to modify one specific resource group to resource manager.
Expand All @@ -615,7 +623,8 @@ func ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
if err != nil {
return err
}
return is.resourceGroupManager.ModifyResourceGroup(ctx, group)
_, err = is.resourceGroupManager.ModifyResourceGroup(ctx, group)
return err
}

// DeleteResourceGroup is used to delete one specific resource group from resource manager.
Expand All @@ -624,7 +633,8 @@ func DeleteResourceGroup(ctx context.Context, name string) error {
if err != nil {
return err
}
return is.resourceGroupManager.DeleteResourceGroup(ctx, name)
_, err = is.resourceGroupManager.DeleteResourceGroup(ctx, name)
return err
}

// PutRuleBundlesWithDefaultRetry will retry for default times
Expand Down
6 changes: 3 additions & 3 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, false)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
Loading

0 comments on commit 6af6899

Please sign in to comment.