diff --git a/client/http/interface.go b/client/http/interface.go index 11c24beaefd..3684e19b1f5 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -49,6 +49,7 @@ type Client interface { GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) GetStore(context.Context, uint64) (*StoreInfo, error) + DeleteStore(context.Context, uint64) error SetStoreLabels(context.Context, int64, map[string]string) error GetHealthStatus(context.Context) ([]Health, error) /* Config-related interfaces */ @@ -440,6 +441,14 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, erro return &store, nil } +// DeleteStore deletes the store by ID. +func (c *client) DeleteStore(ctx context.Context, storeID uint64) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteStoreName). + WithURI(StoreByID(storeID)). + WithMethod(http.MethodDelete)) +} + // GetClusterVersion gets the cluster version. func (c *client) GetClusterVersion(ctx context.Context) (string, error) { var version string diff --git a/client/http/request_info.go b/client/http/request_info.go index 3fb91c6ca97..40bd0368250 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -39,6 +39,7 @@ const ( getRegionStatusByKeyRangeName = "GetRegionStatusByKeyRange" getStoresName = "GetStores" getStoreName = "GetStore" + deleteStoreName = "DeleteStore" setStoreLabelsName = "SetStoreLabels" getHealthStatusName = "GetHealthStatus" getConfigName = "GetConfig" diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 9d7e0985940..f4a48dcd63e 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" @@ -80,6 +81,15 @@ func (suite *httpClientTestSuite) SetupSuite() { leaderServer := cluster.GetLeaderServer() err = leaderServer.BootstrapCluster() + // Add 2 more stores to the cluster. + for i := 2; i <= 4; i++ { + tests.MustPutStore(re, cluster, &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }) + } re.NoError(err) for _, region := range []*core.RegionInfo{ core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), @@ -165,29 +175,29 @@ func (suite *httpClientTestSuite) TestMeta() { re.Empty(regionStats.StoreLeaderCount) hotReadRegions, err := client.GetHotReadRegions(ctx) re.NoError(err) - re.Len(hotReadRegions.AsPeer, 1) - re.Len(hotReadRegions.AsLeader, 1) + re.Len(hotReadRegions.AsPeer, 4) + re.Len(hotReadRegions.AsLeader, 4) hotWriteRegions, err := client.GetHotWriteRegions(ctx) re.NoError(err) - re.Len(hotWriteRegions.AsPeer, 1) - re.Len(hotWriteRegions.AsLeader, 1) + re.Len(hotWriteRegions.AsPeer, 4) + re.Len(hotWriteRegions.AsLeader, 4) historyHorRegions, err := client.GetHistoryHotRegions(ctx, &pd.HistoryHotRegionsRequest{ StartTime: 0, EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond), }) re.NoError(err) re.Empty(historyHorRegions.HistoryHotRegion) - store, err := client.GetStores(ctx) + stores, err := client.GetStores(ctx) re.NoError(err) - re.Equal(1, store.Count) - re.Len(store.Stores, 1) - storeID := uint64(store.Stores[0].Store.ID) // TODO: why type is different? + re.Equal(4, stores.Count) + re.Len(stores.Stores, 4) + storeID := uint64(stores.Stores[0].Store.ID) // TODO: why type is different? store2, err := client.GetStore(ctx, storeID) re.NoError(err) re.EqualValues(storeID, store2.Store.ID) version, err := client.GetClusterVersion(ctx) re.NoError(err) - re.Equal("0.0.0", version) + re.Equal("1.0.0", version) rgs, _ := client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a"), []byte("a1")), 100) re.Equal(int64(0), rgs.Count) rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), 100) @@ -196,6 +206,12 @@ func (suite *httpClientTestSuite) TestMeta() { re.Equal(int64(1), rgs.Count) rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte(""), []byte("")), 100) re.Equal(int64(2), rgs.Count) + // store 2 origin status:offline + err = client.DeleteStore(ctx, 2) + re.NoError(err) + store2, err = client.GetStore(ctx, 2) + re.NoError(err) + re.Equal(int64(metapb.StoreState_Offline), store2.Store.State) } func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 45b3ecd75c9..e3dc43ca122 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -25,6 +25,7 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/log" flag "github.com/spf13/pflag" + pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/logutil" @@ -92,6 +93,7 @@ func main() { func run(simCase string, simConfig *sc.SimConfig) { if *pdAddr != "" { + simulator.PDHTTPClient = pdHttp.NewClient("pd-simulator", []string{*pdAddr}) simStart(*pdAddr, *statusAddress, simCase, simConfig) } else { local, clean := NewSingleServer(context.Background(), simConfig) @@ -105,6 +107,7 @@ func run(simCase string, simConfig *sc.SimConfig) { } time.Sleep(100 * time.Millisecond) } + simulator.PDHTTPClient = pdHttp.NewClient("pd-simulator", []string{local.GetAddr()}) simStart(local.GetAddr(), "", simCase, simConfig, clean) } } @@ -183,6 +186,9 @@ EXIT: analysis.GetTransferCounter().PrintResult() } + if simulator.PDHTTPClient != nil { + simulator.PDHTTPClient.Close() + } if simResult != "OK" { os.Exit(1) } diff --git a/tools/pd-simulator/simulator/cases/cases.go b/tools/pd-simulator/simulator/cases/cases.go index 00b5404669f..c4e2f999978 100644 --- a/tools/pd-simulator/simulator/cases/cases.go +++ b/tools/pd-simulator/simulator/cases/cases.go @@ -16,8 +16,8 @@ package cases import ( "github.com/pingcap/kvproto/pkg/metapb" + pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/info" @@ -57,7 +57,7 @@ type Case struct { TableNumber int Checker CheckerFunc // To check the schedule is finished. - Rules []*placement.Rule + Rules []*pdHttp.Rule Labels typeutil.StringSlice } diff --git a/tools/pd-simulator/simulator/cases/diagnose_rule.go b/tools/pd-simulator/simulator/cases/diagnose_rule.go index 5d34e051071..2cd11b9624a 100644 --- a/tools/pd-simulator/simulator/cases/diagnose_rule.go +++ b/tools/pd-simulator/simulator/cases/diagnose_rule.go @@ -19,6 +19,7 @@ import ( "github.com/docker/go-units" "github.com/pingcap/kvproto/pkg/metapb" + pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" @@ -30,15 +31,15 @@ import ( func newRule1(_ *sc.SimConfig) *Case { var simCase Case - simCase.Rules = make([]*placement.Rule, 0) - simCase.Rules = append(simCase.Rules, &placement.Rule{ + simCase.Rules = make([]*pdHttp.Rule, 0) + simCase.Rules = append(simCase.Rules, &pdHttp.Rule{ GroupID: "test1", ID: "test1", StartKeyHex: "", EndKeyHex: "", - Role: placement.Learner, + Role: pdHttp.Learner, Count: 1, - LabelConstraints: []placement.LabelConstraint{ + LabelConstraints: []pdHttp.LabelConstraint{ { Key: "region", Op: "in", @@ -46,14 +47,14 @@ func newRule1(_ *sc.SimConfig) *Case { }, }, LocationLabels: []string{"host"}, - }, &placement.Rule{ + }, &pdHttp.Rule{ GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, StartKeyHex: "", EndKeyHex: "", - Role: placement.Voter, + Role: pdHttp.Voter, Count: 5, - LabelConstraints: []placement.LabelConstraint{ + LabelConstraints: []pdHttp.LabelConstraint{ { Key: "region", Op: "in", @@ -130,16 +131,16 @@ func newRule1(_ *sc.SimConfig) *Case { func newRule2(_ *sc.SimConfig) *Case { var simCase Case - simCase.Rules = make([]*placement.Rule, 0) + simCase.Rules = make([]*pdHttp.Rule, 0) simCase.Rules = append(simCase.Rules, - &placement.Rule{ + &pdHttp.Rule{ GroupID: "test1", ID: "test1", StartKeyHex: "", EndKeyHex: "", - Role: placement.Leader, + Role: pdHttp.Leader, Count: 1, - LabelConstraints: []placement.LabelConstraint{ + LabelConstraints: []pdHttp.LabelConstraint{ { Key: "region", Op: "in", diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 50ed57995df..113eadab5e0 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -15,11 +15,7 @@ package simulator import ( - "bytes" "context" - "encoding/json" - "fmt" - "net/http" "strings" "sync" "time" @@ -27,8 +23,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + pdHttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/pkg/core" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/typeutil" sc "github.com/tikv/pd/tools/pd-simulator/simulator/config" "github.com/tikv/pd/tools/pd-simulator/simulator/simutil" @@ -54,12 +50,12 @@ type Client interface { const ( pdTimeout = time.Second maxInitClusterRetries = 100 - httpPrefix = "pd/api/v1" ) var ( // errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses. errFailInitClusterID = errors.New("[pd] failed to get cluster id") + PDHTTPClient pdHttp.Client ) type client struct { @@ -67,7 +63,6 @@ type client struct { tag string clusterID uint64 clientConn *grpc.ClientConn - httpClient *http.Client reportRegionHeartbeatCh chan *core.RegionInfo receiveRegionHeartbeatCh chan *pdpb.RegionHeartbeatResponse @@ -88,7 +83,6 @@ func NewClient(pdAddr string, tag string) (Client, <-chan *pdpb.RegionHeartbeatR ctx: ctx, cancel: cancel, tag: tag, - httpClient: &http.Client{}, } cc, err := c.createConn() if err != nil { @@ -319,46 +313,27 @@ func (c *client) PutStore(ctx context.Context, store *metapb.Store) error { func (c *client) PutPDConfig(config *sc.PDConfig) error { if len(config.PlacementRules) > 0 { - path := fmt.Sprintf("%s/%s/config/rules/batch", c.url, httpPrefix) - ruleOps := make([]*placement.RuleOp, 0) + ruleOps := make([]*pdHttp.RuleOp, 0) for _, rule := range config.PlacementRules { - ruleOps = append(ruleOps, &placement.RuleOp{ + ruleOps = append(ruleOps, &pdHttp.RuleOp{ Rule: rule, - Action: placement.RuleOpAdd, + Action: pdHttp.RuleOpAdd, }) } - content, _ := json.Marshal(ruleOps) - req, err := http.NewRequest(http.MethodPost, path, bytes.NewBuffer(content)) - req.Header.Add("Content-Type", "application/json") + err := PDHTTPClient.SetPlacementRuleInBatch(c.ctx, ruleOps) if err != nil { return err } - res, err := c.httpClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - simutil.Logger.Info("add placement rule success", zap.String("rules", string(content))) + simutil.Logger.Info("add placement rule success", zap.Any("rules", config.PlacementRules)) } if len(config.LocationLabels) > 0 { - path := fmt.Sprintf("%s/%s/config", c.url, httpPrefix) data := make(map[string]any) data["location-labels"] = config.LocationLabels - content, err := json.Marshal(data) - if err != nil { - return err - } - req, err := http.NewRequest(http.MethodPost, path, bytes.NewBuffer(content)) - req.Header.Add("Content-Type", "application/json") - if err != nil { - return err - } - res, err := c.httpClient.Do(req) + err := PDHTTPClient.SetConfig(c.ctx, data) if err != nil { return err } - defer res.Body.Close() - simutil.Logger.Info("add location labels success", zap.String("labels", string(content))) + simutil.Logger.Info("add location labels success", zap.Any("labels", config.LocationLabels)) } return nil } diff --git a/tools/pd-simulator/simulator/config/config.go b/tools/pd-simulator/simulator/config/config.go index 01bf8199ab4..6598cf35c0f 100644 --- a/tools/pd-simulator/simulator/config/config.go +++ b/tools/pd-simulator/simulator/config/config.go @@ -21,8 +21,8 @@ import ( "github.com/BurntSushi/toml" "github.com/docker/go-units" + pdHttp "github.com/tikv/pd/client/http" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/utils/configutil" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/typeutil" @@ -133,6 +133,6 @@ func (sc *SimConfig) Speed() uint64 { // PDConfig saves some config which may be changed in PD. type PDConfig struct { - PlacementRules []*placement.Rule + PlacementRules []*pdHttp.Rule LocationLabels typeutil.StringSlice } diff --git a/tools/pd-simulator/simulator/event.go b/tools/pd-simulator/simulator/event.go index 8be8f89d759..20c75b58384 100644 --- a/tools/pd-simulator/simulator/event.go +++ b/tools/pd-simulator/simulator/event.go @@ -216,6 +216,12 @@ func (*DownNode) Run(raft *RaftEngine, _ int64) bool { return false } delete(raft.conn.Nodes, node.Id) + // delete store + err := PDHTTPClient.DeleteStore(context.Background(), node.Id) + if err != nil { + simutil.Logger.Error("put store failed", zap.Uint64("node-id", node.Id), zap.Error(err)) + return false + } node.Stop() regions := raft.GetRegions() diff --git a/tools/pd-simulator/simulator/node.go b/tools/pd-simulator/simulator/node.go index 883b5d4474b..c51cdfd8a38 100644 --- a/tools/pd-simulator/simulator/node.go +++ b/tools/pd-simulator/simulator/node.go @@ -72,6 +72,7 @@ func NewNode(s *cases.Store, pdAddr string, config *sc.SimConfig) (*Node, error) StoreId: s.ID, Capacity: uint64(config.RaftStore.Capacity), StartTime: uint32(time.Now().Unix()), + Available: uint64(config.RaftStore.Capacity), }, } tag := fmt.Sprintf("store %d", s.ID)