Skip to content

Commit

Permalink
*: Make PD placement api as an interface and add some mock codes (#29696
Browse files Browse the repository at this point in the history
)

* *: Make PD placement api as an interface and add some mock codes

* modify

* add comments

Co-authored-by: Arenatlx <314806019@qq.com>
  • Loading branch information
lcwangchao and AilinKid authored Nov 15, 2021
1 parent 64681dd commit d3a4187
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 55 deletions.
68 changes: 13 additions & 55 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package infosync

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -99,6 +98,7 @@ type InfoSyncer struct {
prometheusAddr string
modifyTime time.Time
labelRuleManager LabelRuleManager
placementManager PlacementManager
}

// ServerInfo is server static information.
Expand Down Expand Up @@ -179,8 +179,10 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
}
if etcdCli != nil {
is.labelRuleManager = initLabelRuleManager(etcdCli.Endpoints())
is.placementManager = initPlacementManager(etcdCli.Endpoints())
} else {
is.labelRuleManager = initLabelRuleManager([]string{})
is.placementManager = initPlacementManager([]string{})
}
setGlobalInfoSyncer(is)
return is, nil
Expand Down Expand Up @@ -215,6 +217,13 @@ func initLabelRuleManager(addrs []string) LabelRuleManager {
return &PDLabelManager{addrs: addrs}
}

func initPlacementManager(addrs []string) PlacementManager {
if len(addrs) == 0 {
return &mockPlacementManager{}
}
return &PDPlacementManager{addrs: addrs}
}

// GetServerInfo gets self server static information.
func GetServerInfo() (*ServerInfo, error) {
is, err := getGlobalInfoSyncer()
Expand Down Expand Up @@ -401,22 +410,7 @@ func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
return nil, err
}

bundles := []*placement.Bundle{}
if is.etcdCli == nil {
return bundles, nil
}

addrs := is.etcdCli.Endpoints()

if len(addrs) == 0 {
return nil, errors.Errorf("pd unavailable")
}

res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, &bundles)
}
return bundles, err
return is.placementManager.GetAllRuleBundles(ctx)
}

// GetRuleBundle is used to get one specific rule bundle from PD.
Expand All @@ -426,53 +420,17 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)
return nil, err
}

bundle := &placement.Bundle{ID: name}

if is.etcdCli == nil {
return bundle, nil
}

addrs := is.etcdCli.Endpoints()

if len(addrs) == 0 {
return nil, errors.Errorf("pd unavailable")
}

res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, bundle)
}
return bundle, err
return is.placementManager.GetRuleBundle(ctx, name)
}

// PutRuleBundles is used to post specific rule bundles to PD.
func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
if len(bundles) == 0 {
return nil
}

is, err := getGlobalInfoSyncer()
if err != nil {
return err
}

if is.etcdCli == nil {
return nil
}

addrs := is.etcdCli.Endpoints()

if len(addrs) == 0 {
return errors.Errorf("pd unavailable")
}

b, err := json.Marshal(bundles)
if err != nil {
return err
}

_, err = doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b))
return err
return is.placementManager.PutRuleBundles(ctx, bundles)
}

func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
Expand Down
122 changes: 122 additions & 0 deletions domain/infosync/placement_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infosync

import (
"bytes"
"context"
"encoding/json"
"path"
"sync"

"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/util/pdapi"
)

// PlacementManager manages placement settings
type PlacementManager interface {
// GetRuleBundle is used to get one specific rule bundle from PD.
GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)
// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema.
GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error)
// PutRuleBundles is used to post specific rule bundles to PD.
PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error
}

// PDPlacementManager manages placement with pd
type PDPlacementManager struct {
addrs []string
}

// GetRuleBundle is used to get one specific rule bundle from PD.
func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) {
bundle := &placement.Bundle{ID: name}
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, bundle)
}
return bundle, err
}

// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema.
func (m *PDPlacementManager) GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, &bundles)
}
return bundles, err
}

// PutRuleBundles is used to post specific rule bundles to PD.
func (m *PDPlacementManager) PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
if len(bundles) == 0 {
return nil
}

b, err := json.Marshal(bundles)
if err != nil {
return err
}

_, err = doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b))
return err
}

type mockPlacementManager struct {
sync.Mutex
bundles map[string]*placement.Bundle
}

func (m *mockPlacementManager) GetRuleBundle(_ context.Context, name string) (*placement.Bundle, error) {
m.Lock()
defer m.Unlock()

if bundle, ok := m.bundles[name]; ok {
return bundle, nil
}

return &placement.Bundle{ID: name}, nil
}

func (m *mockPlacementManager) GetAllRuleBundles(_ context.Context) ([]*placement.Bundle, error) {
m.Lock()
defer m.Unlock()

bundles := make([]*placement.Bundle, 0, len(m.bundles))
for _, bundle := range m.bundles {
bundles = append(bundles, bundle)
}
return bundles, nil
}

func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*placement.Bundle) error {
m.Lock()
defer m.Unlock()

if m.bundles == nil {
m.bundles = make(map[string]*placement.Bundle)
}

for _, bundle := range bundles {
if bundle.IsEmpty() {
delete(m.bundles, bundle.ID)
} else {
m.bundles[bundle.ID] = bundle
}
}

return nil
}

0 comments on commit d3a4187

Please sign in to comment.