From d3a4187f7e99a0c156912b6831686e89fc8b6a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Mon, 15 Nov 2021 14:08:56 +0800 Subject: [PATCH] *: Make PD placement api as an interface and add some mock codes (#29696) * *: Make PD placement api as an interface and add some mock codes * modify * add comments Co-authored-by: Arenatlx <314806019@qq.com> --- domain/infosync/info.go | 68 +++------------ domain/infosync/placement_manager.go | 122 +++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 55 deletions(-) create mode 100644 domain/infosync/placement_manager.go diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 57f7e336a4ccf..b159100f0f92e 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -15,7 +15,6 @@ package infosync import ( - "bytes" "context" "encoding/hex" "encoding/json" @@ -99,6 +98,7 @@ type InfoSyncer struct { prometheusAddr string modifyTime time.Time labelRuleManager LabelRuleManager + placementManager PlacementManager } // ServerInfo is server static information. @@ -179,8 +179,10 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() } if etcdCli != nil { is.labelRuleManager = initLabelRuleManager(etcdCli.Endpoints()) + is.placementManager = initPlacementManager(etcdCli.Endpoints()) } else { is.labelRuleManager = initLabelRuleManager([]string{}) + is.placementManager = initPlacementManager([]string{}) } setGlobalInfoSyncer(is) return is, nil @@ -215,6 +217,13 @@ func initLabelRuleManager(addrs []string) LabelRuleManager { return &PDLabelManager{addrs: addrs} } +func initPlacementManager(addrs []string) PlacementManager { + if len(addrs) == 0 { + return &mockPlacementManager{} + } + return &PDPlacementManager{addrs: addrs} +} + // GetServerInfo gets self server static information. func GetServerInfo() (*ServerInfo, error) { is, err := getGlobalInfoSyncer() @@ -401,22 +410,7 @@ func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) { return nil, err } - bundles := []*placement.Bundle{} - if is.etcdCli == nil { - return bundles, nil - } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return nil, errors.Errorf("pd unavailable") - } - - res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil) - if err == nil && res != nil { - err = json.Unmarshal(res, &bundles) - } - return bundles, err + return is.placementManager.GetAllRuleBundles(ctx) } // GetRuleBundle is used to get one specific rule bundle from PD. @@ -426,53 +420,17 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) return nil, err } - bundle := &placement.Bundle{ID: name} - - if is.etcdCli == nil { - return bundle, nil - } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return nil, errors.Errorf("pd unavailable") - } - - res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil) - if err == nil && res != nil { - err = json.Unmarshal(res, bundle) - } - return bundle, err + return is.placementManager.GetRuleBundle(ctx, name) } // PutRuleBundles is used to post specific rule bundles to PD. func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error { - if len(bundles) == 0 { - return nil - } - is, err := getGlobalInfoSyncer() if err != nil { return err } - if is.etcdCli == nil { - return nil - } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return errors.Errorf("pd unavailable") - } - - b, err := json.Marshal(bundles) - if err != nil { - return err - } - - _, err = doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b)) - return err + return is.placementManager.PutRuleBundles(ctx, bundles) } func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { diff --git a/domain/infosync/placement_manager.go b/domain/infosync/placement_manager.go new file mode 100644 index 0000000000000..7c4db7dcd61e3 --- /dev/null +++ b/domain/infosync/placement_manager.go @@ -0,0 +1,122 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infosync + +import ( + "bytes" + "context" + "encoding/json" + "path" + "sync" + + "github.com/pingcap/tidb/ddl/placement" + "github.com/pingcap/tidb/util/pdapi" +) + +// PlacementManager manages placement settings +type PlacementManager interface { + // GetRuleBundle is used to get one specific rule bundle from PD. + GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) + // GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema. + GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) + // PutRuleBundles is used to post specific rule bundles to PD. + PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error +} + +// PDPlacementManager manages placement with pd +type PDPlacementManager struct { + addrs []string +} + +// GetRuleBundle is used to get one specific rule bundle from PD. +func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) { + bundle := &placement.Bundle{ID: name} + res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil) + if err == nil && res != nil { + err = json.Unmarshal(res, bundle) + } + return bundle, err +} + +// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema. +func (m *PDPlacementManager) GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) { + var bundles []*placement.Bundle + res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil) + if err == nil && res != nil { + err = json.Unmarshal(res, &bundles) + } + return bundles, err +} + +// PutRuleBundles is used to post specific rule bundles to PD. +func (m *PDPlacementManager) PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error { + if len(bundles) == 0 { + return nil + } + + b, err := json.Marshal(bundles) + if err != nil { + return err + } + + _, err = doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b)) + return err +} + +type mockPlacementManager struct { + sync.Mutex + bundles map[string]*placement.Bundle +} + +func (m *mockPlacementManager) GetRuleBundle(_ context.Context, name string) (*placement.Bundle, error) { + m.Lock() + defer m.Unlock() + + if bundle, ok := m.bundles[name]; ok { + return bundle, nil + } + + return &placement.Bundle{ID: name}, nil +} + +func (m *mockPlacementManager) GetAllRuleBundles(_ context.Context) ([]*placement.Bundle, error) { + m.Lock() + defer m.Unlock() + + bundles := make([]*placement.Bundle, 0, len(m.bundles)) + for _, bundle := range m.bundles { + bundles = append(bundles, bundle) + } + return bundles, nil +} + +func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*placement.Bundle) error { + m.Lock() + defer m.Unlock() + + if m.bundles == nil { + m.bundles = make(map[string]*placement.Bundle) + } + + for _, bundle := range bundles { + if bundle.IsEmpty() { + delete(m.bundles, bundle.ID) + } else { + m.bundles[bundle.ID] = bundle + } + } + + return nil +}