diff --git a/server/http_handler.go b/server/http_handler.go index 6378d6f4212fb..db8f2428aefdd 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/table" @@ -110,55 +111,30 @@ func writeData(w http.ResponseWriter, data interface{}) { } type tikvHandlerTool struct { - regionCache *tikv.RegionCache - store kvStore + helper.Helper } // newTikvHandlerTool checks and prepares for tikv handler. // It would panic when any error happens. func (s *Server) newTikvHandlerTool() *tikvHandlerTool { - var tikvStore kvStore + var tikvStore tikv.Storage store, ok := s.driver.(*TiDBDriver) if !ok { panic("Invalid KvStore with illegal driver") } - if tikvStore, ok = store.store.(kvStore); !ok { + if tikvStore, ok = store.store.(tikv.Storage); !ok { panic("Invalid KvStore with illegal store") } regionCache := tikvStore.GetRegionCache() return &tikvHandlerTool{ - regionCache: regionCache, - store: tikvStore, - } -} - -func (t *tikvHandlerTool) getMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { - keyLocation, err := t.regionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey) - if err != nil { - return nil, errors.Trace(err) - } - - tikvReq := &tikvrpc.Request{ - Type: tikvrpc.CmdMvccGetByKey, - MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{ - Key: encodedKey, + helper.Helper{ + RegionCache: regionCache, + Store: tikvStore, }, } - kvResp, err := t.store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute) - if err != nil { - logutil.Logger(context.Background()).Info("get MVCC by encoded key failed", - zap.Binary("encodeKey", encodedKey), - zap.Reflect("region", keyLocation.Region), - zap.Binary("startKey", keyLocation.StartKey), - zap.Binary("endKey", keyLocation.EndKey), - zap.Reflect("kvResp", kvResp), - zap.Error(err)) - return nil, errors.Trace(err) - } - return kvResp.MvccGetByKey, nil } type mvccKV struct { @@ -168,14 +144,14 @@ type mvccKV struct { func (t *tikvHandlerTool) getMvccByHandle(tableID, handle int64) (*mvccKV, error) { encodedKey := tablecodec.EncodeRowKeyWithHandle(tableID, handle) - data, err := t.getMvccByEncodedKey(encodedKey) - return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), data}, err + data, err := t.GetMvccByEncodedKey(encodedKey) + return &mvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), Value: data}, err } func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []byte) (*kvrpcpb.MvccGetByStartTsResponse, error) { bo := tikv.NewBackoffer(context.Background(), 5000) for { - curRegion, err := t.regionCache.LocateKey(bo, startKey) + curRegion, err := t.RegionCache.LocateKey(bo, startKey) if err != nil { logutil.Logger(context.Background()).Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS), zap.Binary("startKey", startKey), zap.Error(err)) return nil, errors.Trace(err) @@ -188,7 +164,7 @@ func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []by }, } tikvReq.Context.Priority = kvrpcpb.CommandPri_Low - kvResp, err := t.store.SendReq(bo, tikvReq, curRegion.Region, time.Hour) + kvResp, err := t.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour) if err != nil { logutil.Logger(context.Background()).Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS), @@ -257,7 +233,7 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, if err != nil { return nil, errors.Trace(err) } - data, err := t.getMvccByEncodedKey(encodedKey) + data, err := t.GetMvccByEncodedKey(encodedKey) return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), data}, err } @@ -310,7 +286,7 @@ func (t *tikvHandlerTool) getTable(dbName, tableName string) (*model.TableInfo, } func (t *tikvHandlerTool) schema() (infoschema.InfoSchema, error) { - session, err := session.CreateSession(t.store.(kv.Storage)) + session, err := session.CreateSession(t.Store.(kv.Storage)) if err != nil { return nil, errors.Trace(err) } @@ -322,36 +298,11 @@ func (t *tikvHandlerTool) handleMvccGetByHex(params map[string]string) (interfac if err != nil { return nil, errors.Trace(err) } - return t.getMvccByEncodedKey(encodedKey) -} - -func (t *tikvHandlerTool) getAllHistoryDDL() ([]*model.Job, error) { - s, err := session.CreateSession(t.store.(kv.Storage)) - if err != nil { - return nil, errors.Trace(err) - } - - if s != nil { - defer s.Close() - } - - store := domain.GetDomain(s.(sessionctx.Context)).Store() - txn, err := store.Begin() - - if err != nil { - return nil, errors.Trace(err) - } - txnMeta := meta.NewMeta(txn) - - jobs, err := txnMeta.GetAllHistoryDDLJobs() - if err != nil { - return nil, errors.Trace(err) - } - return jobs, nil + return t.GetMvccByEncodedKey(encodedKey) } -func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]regionMetric, error) { - regionMetrics, err := t.fetchHotRegion(rw) +func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]helper.RegionMetric, error) { + regionMetrics, err := t.FetchHotRegion(rw) if err != nil { return nil, err } @@ -363,34 +314,6 @@ func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]regionMetric, e return tblIdx, nil } -// storeHotRegionInfos records all hog region stores. -// it's the response of PD. -type storeHotRegionInfos struct { - AsPeer map[uint64]*hotRegionsStat `json:"as_peer"` - AsLeader map[uint64]*hotRegionsStat `json:"as_leader"` -} - -// hotRegions records echo store's hot region. -// it's the response of PD. -type hotRegionsStat struct { - RegionsStat []regionStat `json:"statistics"` -} - -// regionStat records each hot region's statistics -// it's the response of PD. -type regionStat struct { - RegionID uint64 `json:"region_id"` - FlowBytes uint64 `json:"flow_bytes"` - HotDegree int `json:"hot_degree"` -} - -// regionMetric presents the final metric output entry. -type regionMetric struct { - FlowBytes uint64 `json:"flow_bytes"` - MaxHotDegree int `json:"max_hot_degree"` - Count int `json:"region_count"` -} - // tblIndex presents the aggregate key that combined with db,table,index type tblIndex struct { DbName string `json:"db_name"` @@ -398,54 +321,15 @@ type tblIndex struct { IndexName string `json:"index_name"` } -func (t *tikvHandlerTool) fetchHotRegion(rw string) (map[uint64]regionMetric, error) { - etcd, ok := t.store.(domain.EtcdBackend) - if !ok { - return nil, errors.New("not implemented") - } - pdHosts := etcd.EtcdAddrs() - if len(pdHosts) == 0 { - return nil, errors.New("pd unavailable") - } - req, err := http.NewRequest("GET", protocol+pdHosts[0]+rw, nil) - if err != nil { - return nil, errors.Trace(err) - } - timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Millisecond) - resp, err := http.DefaultClient.Do(req.WithContext(timeout)) - cancelFunc() - if err != nil { - return nil, errors.Trace(err) - } - defer func() { - err = resp.Body.Close() - if err != nil { - logutil.Logger(context.Background()).Error("close body failed", zap.Error(err)) - } - }() - var regionResp storeHotRegionInfos - err = json.NewDecoder(resp.Body).Decode(®ionResp) - if err != nil { - return nil, errors.Trace(err) - } - metric := make(map[uint64]regionMetric) - for _, hotRegions := range regionResp.AsLeader { - for _, region := range hotRegions.RegionsStat { - metric[region.RegionID] = regionMetric{FlowBytes: region.FlowBytes, MaxHotDegree: region.HotDegree} - } - } - return metric, nil -} - -func (t *tikvHandlerTool) fetchRegionTableIndex(metrics map[uint64]regionMetric) (map[tblIndex]regionMetric, error) { +func (t *tikvHandlerTool) fetchRegionTableIndex(metrics map[uint64]helper.RegionMetric) (map[tblIndex]helper.RegionMetric, error) { schema, err := t.schema() if err != nil { return nil, err } - idxMetrics := make(map[tblIndex]regionMetric) + idxMetrics := make(map[tblIndex]helper.RegionMetric) for regionID, regionMetric := range metrics { - region, err := t.regionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) + region, err := t.RegionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) if err != nil { logutil.Logger(context.Background()).Error("locate region failed", zap.Error(err)) continue @@ -713,7 +597,7 @@ type RegionFrameRange struct { func (t *tikvHandlerTool) getRegionsMeta(regionIDs []uint64) ([]RegionMeta, error) { regions := make([]RegionMeta, len(regionIDs)) for i, regionID := range regionIDs { - meta, leader, err := t.regionCache.PDClient().GetRegionByID(context.TODO(), regionID) + meta, leader, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID) if err != nil { return nil, errors.Trace(err) } @@ -925,6 +809,31 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request writeData(w, jobs) } +func (h ddlHistoryJobHandler) getAllHistoryDDL() ([]*model.Job, error) { + s, err := session.CreateSession(h.Store.(kv.Storage)) + if err != nil { + return nil, errors.Trace(err) + } + + if s != nil { + defer s.Close() + } + + store := domain.GetDomain(s.(sessionctx.Context)).Store() + txn, err := store.Begin() + + if err != nil { + return nil, errors.Trace(err) + } + txnMeta := meta.NewMeta(txn) + + jobs, err := txnMeta.GetAllHistoryDDLJobs() + if err != nil { + return nil, errors.Trace(err) + } + return jobs, nil +} + func (h ddlResignOwnerHandler) resignDDLOwner() error { dom, err := session.GetDomain(h.store) if err != nil { @@ -958,7 +867,7 @@ func (h ddlResignOwnerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques func (h tableHandler) getPDAddr() ([]string, error) { var pdAddrs []string - etcd, ok := h.store.(domain.EtcdBackend) + etcd, ok := h.Store.(domain.EtcdBackend) if !ok { return nil, errors.New("not implemented") } @@ -1069,7 +978,7 @@ func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl tabl tableID := tbl.Meta().ID // for record startKey, endKey := tablecodec.GetTableHandleKeyRange(tableID) - recordRegionIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) + recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) if err != nil { writeError(w, err) return @@ -1087,7 +996,7 @@ func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl tabl indices[i].Name = index.Meta().Name.String() indices[i].ID = indexID startKey, endKey := tablecodec.GetTableIndexKeyRange(tableID, indexID) - rIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) + rIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) if err != nil { writeError(w, err) return @@ -1161,7 +1070,7 @@ func (h tableHandler) handleDiskUsageRequest(schema infoschema.InfoSchema, tbl t type hotRegion struct { tblIndex - regionMetric + helper.RegionMetric } type hotRegions []hotRegion @@ -1187,7 +1096,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { startKey := []byte{'m'} endKey := []byte{'n'} - recordRegionIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) + recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey) if err != nil { writeError(w, err) return @@ -1212,7 +1121,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { writeError(w, err) return } - asSortedEntry := func(metric map[tblIndex]regionMetric) hotRegions { + asSortedEntry := func(metric map[tblIndex]helper.RegionMetric) hotRegions { hs := make(hotRegions, 0, len(metric)) for key, value := range metric { hs = append(hs, hotRegion{key, value}) @@ -1237,7 +1146,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { regionID := uint64(regionIDInt) // locate region - region, err := h.regionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) + region, err := h.RegionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID) if err != nil { writeError(w, err) return @@ -1607,7 +1516,7 @@ type serverInfo struct { // ServeHTTP handles request of ddl server info. func (h serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - do, err := session.GetDomain(h.store.(kv.Storage)) + do, err := session.GetDomain(h.Store.(kv.Storage)) if err != nil { writeError(w, errors.New("create session error")) log.Error(err) @@ -1630,7 +1539,7 @@ type clusterServerInfo struct { // ServeHTTP handles request of all ddl servers info. func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - do, err := session.GetDomain(h.store.(kv.Storage)) + do, err := session.GetDomain(h.Store.(kv.Storage)) if err != nil { writeError(w, errors.New("create session error")) log.Error(err) diff --git a/server/http_handler_test.go b/server/http_handler_test.go index 1b90271a0aa75..826ce61481776 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -600,7 +600,7 @@ func (ts *HTTPHandlerTestSuite) TestAllHistory(c *C) { decoder := json.NewDecoder(resp.Body) var jobs []*model.Job - s, _ := session.CreateSession(ts.server.newTikvHandlerTool().store.(kv.Storage)) + s, _ := session.CreateSession(ts.server.newTikvHandlerTool().Store.(kv.Storage)) defer s.Close() store := domain.GetDomain(s.(sessionctx.Context)).Store() txn, _ := store.Begin() @@ -714,7 +714,7 @@ func (ts *HTTPHandlerTestSuite) TestServerInfo(c *C) { c.Assert(info.Version, Equals, mysql.ServerVersion) c.Assert(info.GitHash, Equals, printer.TiDBGitHash) - store := ts.server.newTikvHandlerTool().store.(kv.Storage) + store := ts.server.newTikvHandlerTool().Store.(kv.Storage) do, err := session.GetDomain(store.(kv.Storage)) c.Assert(err, IsNil) ddl := do.DDL() @@ -737,7 +737,7 @@ func (ts *HTTPHandlerTestSuite) TestAllServerInfo(c *C) { c.Assert(clusterInfo.IsAllServerVersionConsistent, IsTrue) c.Assert(clusterInfo.ServersNum, Equals, 1) - store := ts.server.newTikvHandlerTool().store.(kv.Storage) + store := ts.server.newTikvHandlerTool().Store.(kv.Storage) do, err := session.GetDomain(store.(kv.Storage)) c.Assert(err, IsNil) ddl := do.DDL() diff --git a/server/http_status.go b/server/http_status.go index 0a79ba0a95d70..62bd51c548c4f 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -68,7 +68,7 @@ func (s *Server) startHTTPServer() { router.Handle("/schema/{db}/{table}", schemaHandler{tikvHandlerTool}) router.Handle("/tables/{colID}/{colTp}/{colFlag}/{colLen}", valueHandler{}) router.Handle("/ddl/history", ddlHistoryJobHandler{tikvHandlerTool}).Name("DDL_History") - router.Handle("/ddl/owner/resign", ddlResignOwnerHandler{tikvHandlerTool.store.(kv.Storage)}).Name("DDL_Owner_Resign") + router.Handle("/ddl/owner/resign", ddlResignOwnerHandler{tikvHandlerTool.Store.(kv.Storage)}).Name("DDL_Owner_Resign") // HTTP path for get server info. router.Handle("/info", serverInfoHandler{tikvHandlerTool}).Name("Info") diff --git a/store/helper/helper.go b/store/helper/helper.go new file mode 100644 index 0000000000000..ee1ff6284eff9 --- /dev/null +++ b/store/helper/helper.go @@ -0,0 +1,135 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +const ( + protocol = "http://" +) + +// Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. +type Helper struct { + Store tikv.Storage + RegionCache *tikv.RegionCache +} + +// GetMvccByEncodedKey get the MVCC value by the specific encoded key. +func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) { + keyLocation, err := h.RegionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey) + if err != nil { + return nil, errors.Trace(err) + } + + tikvReq := &tikvrpc.Request{ + Type: tikvrpc.CmdMvccGetByKey, + MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{ + Key: encodedKey, + }, + } + kvResp, err := h.Store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute) + if err != nil { + logutil.Logger(context.Background()).Info("get MVCC by encoded key failed", + zap.Binary("encodeKey", encodedKey), + zap.Reflect("region", keyLocation.Region), + zap.Binary("startKey", keyLocation.StartKey), + zap.Binary("endKey", keyLocation.EndKey), + zap.Reflect("kvResp", kvResp), + zap.Error(err)) + return nil, errors.Trace(err) + } + return kvResp.MvccGetByKey, nil +} + +// StoreHotRegionInfos records all hog region stores. +// it's the response of PD. +type StoreHotRegionInfos struct { + AsPeer map[uint64]*hotRegionsStat `json:"as_peer"` + AsLeader map[uint64]*hotRegionsStat `json:"as_leader"` +} + +// hotRegions records echo store's hot region. +// it's the response of PD. +type hotRegionsStat struct { + RegionsStat []regionStat `json:"statistics"` +} + +// regionStat records each hot region's statistics +// it's the response of PD. +type regionStat struct { + RegionID uint64 `json:"region_id"` + FlowBytes uint64 `json:"flow_bytes"` + HotDegree int `json:"hot_degree"` +} + +// RegionMetric presents the final metric output entry. +type RegionMetric struct { + FlowBytes uint64 `json:"flow_bytes"` + MaxHotDegree int `json:"max_hot_degree"` + Count int `json:"region_count"` +} + +// FetchHotRegion fetches the hot region information from PD's http api. +func (h *Helper) FetchHotRegion(rw string) (map[uint64]RegionMetric, error) { + etcd, ok := h.Store.(domain.EtcdBackend) + if !ok { + return nil, errors.WithStack(errors.New("not implemented")) + } + pdHosts := etcd.EtcdAddrs() + if len(pdHosts) == 0 { + return nil, errors.New("pd unavailable") + } + req, err := http.NewRequest("GET", protocol+pdHosts[0]+rw, nil) + if err != nil { + return nil, errors.Trace(err) + } + timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Millisecond) + resp, err := http.DefaultClient.Do(req.WithContext(timeout)) + cancelFunc() + if err != nil { + return nil, errors.Trace(err) + } + defer func() { + err = resp.Body.Close() + if err != nil { + logutil.Logger(context.Background()).Error("close body failed", zap.Error(err)) + } + }() + var regionResp StoreHotRegionInfos + err = json.NewDecoder(resp.Body).Decode(®ionResp) + if err != nil { + return nil, errors.Trace(err) + } + metric := make(map[uint64]RegionMetric) + for _, hotRegions := range regionResp.AsLeader { + for _, region := range hotRegions.RegionsStat { + metric[region.RegionID] = RegionMetric{FlowBytes: region.FlowBytes, MaxHotDegree: region.HotDegree} + } + } + return metric, nil +} diff --git a/store/helper/helper_test.go b/store/helper/helper_test.go new file mode 100644 index 0000000000000..697a46c5b5b81 --- /dev/null +++ b/store/helper/helper_test.go @@ -0,0 +1,118 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/gorilla/mux" + . "github.com/pingcap/check" + "github.com/pingcap/log" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv" + "go.uber.org/zap" +) + +type HelperTestSuite struct { + store tikv.Storage +} + +var _ = Suite(new(HelperTestSuite)) + +func TestT(t *testing.T) { + CustomVerboseFlag = true + TestingT(t) +} + +type mockStore struct { + tikv.Storage + pdAddrs []string +} + +func (s *mockStore) EtcdAddrs() []string { + return s.pdAddrs +} + +func (s *mockStore) StartGCWorker() error { + panic("not implemented") +} + +func (s *mockStore) TLSConfig() *tls.Config { + panic("not implemented") +} + +func (s *HelperTestSuite) SetUpSuite(c *C) { + go s.mockPDHTTPServer(c) + time.Sleep(100 * time.Millisecond) + mvccStore := mocktikv.MustNewMVCCStore() + mockTikvStore, err := mockstore.NewMockTikvStore(mockstore.WithMVCCStore(mvccStore)) + s.store = &mockStore{ + mockTikvStore.(tikv.Storage), + []string{"127.0.0.1:10090/"}, + } + c.Assert(err, IsNil) +} + +func (s *HelperTestSuite) TestHotRegion(c *C) { + helper := Helper{ + Store: s.store, + RegionCache: s.store.GetRegionCache(), + } + regionMetric, err := helper.FetchHotRegion("/pd/api/v1/hotspot/regions/read") + c.Assert(err, IsNil, Commentf("err: %+v", err)) + c.Assert(fmt.Sprintf("%v", regionMetric), Equals, "map[1:{100 1 0}]") +} + +func (s *HelperTestSuite) mockPDHTTPServer(c *C) { + router := mux.NewRouter() + router.HandleFunc("/pd/api/v1/hotspot/regions/read", s.mockHotRegionResponse) + serverMux := http.NewServeMux() + serverMux.Handle("/", router) + server := &http.Server{Addr: "127.0.0.1:10090", Handler: serverMux} + err := server.ListenAndServe() + c.Assert(err, IsNil) +} + +func (s *HelperTestSuite) mockHotRegionResponse(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + regionsStat := hotRegionsStat{ + []regionStat{ + { + FlowBytes: 100, + RegionID: 1, + HotDegree: 1, + }, + }, + } + resp := StoreHotRegionInfos{ + AsLeader: make(map[uint64]*hotRegionsStat), + } + resp.AsLeader[0] = ®ionsStat + data, err := json.MarshalIndent(resp, "", " ") + if err != nil { + log.Panic("json marshal failed", zap.Error(err)) + } + _, err = w.Write(data) + if err != nil { + log.Panic("write http response failed", zap.Error(err)) + } + +}