Skip to content

Commit

Permalink
server: move out some code from http_handler.go (#10071)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Apr 9, 2019
1 parent 1c21151 commit 7acbe52
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 149 deletions.
199 changes: 54 additions & 145 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -110,55 +111,30 @@ func writeData(w http.ResponseWriter, data interface{}) {
}

type tikvHandlerTool struct {
regionCache *tikv.RegionCache
store kvStore
helper.Helper
}

// newTikvHandlerTool checks and prepares for tikv handler.
// It would panic when any error happens.
func (s *Server) newTikvHandlerTool() *tikvHandlerTool {
var tikvStore kvStore
var tikvStore tikv.Storage
store, ok := s.driver.(*TiDBDriver)
if !ok {
panic("Invalid KvStore with illegal driver")
}

if tikvStore, ok = store.store.(kvStore); !ok {
if tikvStore, ok = store.store.(tikv.Storage); !ok {
panic("Invalid KvStore with illegal store")
}

regionCache := tikvStore.GetRegionCache()

return &tikvHandlerTool{
regionCache: regionCache,
store: tikvStore,
}
}

func (t *tikvHandlerTool) getMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyResponse, error) {
keyLocation, err := t.regionCache.LocateKey(tikv.NewBackoffer(context.Background(), 500), encodedKey)
if err != nil {
return nil, errors.Trace(err)
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByKey,
MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{
Key: encodedKey,
helper.Helper{
RegionCache: regionCache,
Store: tikvStore,
},
}
kvResp, err := t.store.SendReq(tikv.NewBackoffer(context.Background(), 500), tikvReq, keyLocation.Region, time.Minute)
if err != nil {
logutil.Logger(context.Background()).Info("get MVCC by encoded key failed",
zap.Binary("encodeKey", encodedKey),
zap.Reflect("region", keyLocation.Region),
zap.Binary("startKey", keyLocation.StartKey),
zap.Binary("endKey", keyLocation.EndKey),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}
return kvResp.MvccGetByKey, nil
}

type mvccKV struct {
Expand All @@ -168,14 +144,14 @@ type mvccKV struct {

func (t *tikvHandlerTool) getMvccByHandle(tableID, handle int64) (*mvccKV, error) {
encodedKey := tablecodec.EncodeRowKeyWithHandle(tableID, handle)
data, err := t.getMvccByEncodedKey(encodedKey)
return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), data}, err
data, err := t.GetMvccByEncodedKey(encodedKey)
return &mvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), Value: data}, err
}

func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []byte) (*kvrpcpb.MvccGetByStartTsResponse, error) {
bo := tikv.NewBackoffer(context.Background(), 5000)
for {
curRegion, err := t.regionCache.LocateKey(bo, startKey)
curRegion, err := t.RegionCache.LocateKey(bo, startKey)
if err != nil {
logutil.Logger(context.Background()).Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS), zap.Binary("startKey", startKey), zap.Error(err))
return nil, errors.Trace(err)
Expand All @@ -188,7 +164,7 @@ func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []by
},
}
tikvReq.Context.Priority = kvrpcpb.CommandPri_Low
kvResp, err := t.store.SendReq(bo, tikvReq, curRegion.Region, time.Hour)
kvResp, err := t.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour)
if err != nil {
logutil.Logger(context.Background()).Error("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
Expand Down Expand Up @@ -257,7 +233,7 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values,
if err != nil {
return nil, errors.Trace(err)
}
data, err := t.getMvccByEncodedKey(encodedKey)
data, err := t.GetMvccByEncodedKey(encodedKey)
return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), data}, err
}

Expand Down Expand Up @@ -310,7 +286,7 @@ func (t *tikvHandlerTool) getTable(dbName, tableName string) (*model.TableInfo,
}

func (t *tikvHandlerTool) schema() (infoschema.InfoSchema, error) {
session, err := session.CreateSession(t.store.(kv.Storage))
session, err := session.CreateSession(t.Store.(kv.Storage))
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -322,36 +298,11 @@ func (t *tikvHandlerTool) handleMvccGetByHex(params map[string]string) (interfac
if err != nil {
return nil, errors.Trace(err)
}
return t.getMvccByEncodedKey(encodedKey)
}

func (t *tikvHandlerTool) getAllHistoryDDL() ([]*model.Job, error) {
s, err := session.CreateSession(t.store.(kv.Storage))
if err != nil {
return nil, errors.Trace(err)
}

if s != nil {
defer s.Close()
}

store := domain.GetDomain(s.(sessionctx.Context)).Store()
txn, err := store.Begin()

if err != nil {
return nil, errors.Trace(err)
}
txnMeta := meta.NewMeta(txn)

jobs, err := txnMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
return jobs, nil
return t.GetMvccByEncodedKey(encodedKey)
}

func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]regionMetric, error) {
regionMetrics, err := t.fetchHotRegion(rw)
func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]helper.RegionMetric, error) {
regionMetrics, err := t.FetchHotRegion(rw)
if err != nil {
return nil, err
}
Expand All @@ -363,89 +314,22 @@ func (t *tikvHandlerTool) scrapeHotInfo(rw string) (map[tblIndex]regionMetric, e
return tblIdx, nil
}

// storeHotRegionInfos records all hog region stores.
// it's the response of PD.
type storeHotRegionInfos struct {
AsPeer map[uint64]*hotRegionsStat `json:"as_peer"`
AsLeader map[uint64]*hotRegionsStat `json:"as_leader"`
}

// hotRegions records echo store's hot region.
// it's the response of PD.
type hotRegionsStat struct {
RegionsStat []regionStat `json:"statistics"`
}

// regionStat records each hot region's statistics
// it's the response of PD.
type regionStat struct {
RegionID uint64 `json:"region_id"`
FlowBytes uint64 `json:"flow_bytes"`
HotDegree int `json:"hot_degree"`
}

// regionMetric presents the final metric output entry.
type regionMetric struct {
FlowBytes uint64 `json:"flow_bytes"`
MaxHotDegree int `json:"max_hot_degree"`
Count int `json:"region_count"`
}

// tblIndex presents the aggregate key that combined with db,table,index
type tblIndex struct {
DbName string `json:"db_name"`
TableName string `json:"table_name"`
IndexName string `json:"index_name"`
}

func (t *tikvHandlerTool) fetchHotRegion(rw string) (map[uint64]regionMetric, error) {
etcd, ok := t.store.(domain.EtcdBackend)
if !ok {
return nil, errors.New("not implemented")
}
pdHosts := etcd.EtcdAddrs()
if len(pdHosts) == 0 {
return nil, errors.New("pd unavailable")
}
req, err := http.NewRequest("GET", protocol+pdHosts[0]+rw, nil)
if err != nil {
return nil, errors.Trace(err)
}
timeout, cancelFunc := context.WithTimeout(context.Background(), 50*time.Millisecond)
resp, err := http.DefaultClient.Do(req.WithContext(timeout))
cancelFunc()
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
logutil.Logger(context.Background()).Error("close body failed", zap.Error(err))
}
}()
var regionResp storeHotRegionInfos
err = json.NewDecoder(resp.Body).Decode(&regionResp)
if err != nil {
return nil, errors.Trace(err)
}
metric := make(map[uint64]regionMetric)
for _, hotRegions := range regionResp.AsLeader {
for _, region := range hotRegions.RegionsStat {
metric[region.RegionID] = regionMetric{FlowBytes: region.FlowBytes, MaxHotDegree: region.HotDegree}
}
}
return metric, nil
}

func (t *tikvHandlerTool) fetchRegionTableIndex(metrics map[uint64]regionMetric) (map[tblIndex]regionMetric, error) {
func (t *tikvHandlerTool) fetchRegionTableIndex(metrics map[uint64]helper.RegionMetric) (map[tblIndex]helper.RegionMetric, error) {
schema, err := t.schema()
if err != nil {
return nil, err
}

idxMetrics := make(map[tblIndex]regionMetric)
idxMetrics := make(map[tblIndex]helper.RegionMetric)
for regionID, regionMetric := range metrics {
region, err := t.regionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID)
region, err := t.RegionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID)
if err != nil {
logutil.Logger(context.Background()).Error("locate region failed", zap.Error(err))
continue
Expand Down Expand Up @@ -713,7 +597,7 @@ type RegionFrameRange struct {
func (t *tikvHandlerTool) getRegionsMeta(regionIDs []uint64) ([]RegionMeta, error) {
regions := make([]RegionMeta, len(regionIDs))
for i, regionID := range regionIDs {
meta, leader, err := t.regionCache.PDClient().GetRegionByID(context.TODO(), regionID)
meta, leader, err := t.RegionCache.PDClient().GetRegionByID(context.TODO(), regionID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -925,6 +809,31 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
writeData(w, jobs)
}

func (h ddlHistoryJobHandler) getAllHistoryDDL() ([]*model.Job, error) {
s, err := session.CreateSession(h.Store.(kv.Storage))
if err != nil {
return nil, errors.Trace(err)
}

if s != nil {
defer s.Close()
}

store := domain.GetDomain(s.(sessionctx.Context)).Store()
txn, err := store.Begin()

if err != nil {
return nil, errors.Trace(err)
}
txnMeta := meta.NewMeta(txn)

jobs, err := txnMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
return jobs, nil
}

func (h ddlResignOwnerHandler) resignDDLOwner() error {
dom, err := session.GetDomain(h.store)
if err != nil {
Expand Down Expand Up @@ -958,7 +867,7 @@ func (h ddlResignOwnerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques

func (h tableHandler) getPDAddr() ([]string, error) {
var pdAddrs []string
etcd, ok := h.store.(domain.EtcdBackend)
etcd, ok := h.Store.(domain.EtcdBackend)
if !ok {
return nil, errors.New("not implemented")
}
Expand Down Expand Up @@ -1069,7 +978,7 @@ func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl tabl
tableID := tbl.Meta().ID
// for record
startKey, endKey := tablecodec.GetTableHandleKeyRange(tableID)
recordRegionIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey)
recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey)
if err != nil {
writeError(w, err)
return
Expand All @@ -1087,7 +996,7 @@ func (h tableHandler) handleRegionRequest(schema infoschema.InfoSchema, tbl tabl
indices[i].Name = index.Meta().Name.String()
indices[i].ID = indexID
startKey, endKey := tablecodec.GetTableIndexKeyRange(tableID, indexID)
rIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey)
rIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey)
if err != nil {
writeError(w, err)
return
Expand Down Expand Up @@ -1161,7 +1070,7 @@ func (h tableHandler) handleDiskUsageRequest(schema infoschema.InfoSchema, tbl t

type hotRegion struct {
tblIndex
regionMetric
helper.RegionMetric
}
type hotRegions []hotRegion

Expand All @@ -1187,7 +1096,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
startKey := []byte{'m'}
endKey := []byte{'n'}

recordRegionIDs, err := h.regionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey)
recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackoffer(context.Background(), 500), startKey, endKey)
if err != nil {
writeError(w, err)
return
Expand All @@ -1212,7 +1121,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
writeError(w, err)
return
}
asSortedEntry := func(metric map[tblIndex]regionMetric) hotRegions {
asSortedEntry := func(metric map[tblIndex]helper.RegionMetric) hotRegions {
hs := make(hotRegions, 0, len(metric))
for key, value := range metric {
hs = append(hs, hotRegion{key, value})
Expand All @@ -1237,7 +1146,7 @@ func (h regionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
regionID := uint64(regionIDInt)

// locate region
region, err := h.regionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID)
region, err := h.RegionCache.LocateRegionByID(tikv.NewBackoffer(context.Background(), 500), regionID)
if err != nil {
writeError(w, err)
return
Expand Down Expand Up @@ -1607,7 +1516,7 @@ type serverInfo struct {

// ServeHTTP handles request of ddl server info.
func (h serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
do, err := session.GetDomain(h.store.(kv.Storage))
do, err := session.GetDomain(h.Store.(kv.Storage))
if err != nil {
writeError(w, errors.New("create session error"))
log.Error(err)
Expand All @@ -1630,7 +1539,7 @@ type clusterServerInfo struct {

// ServeHTTP handles request of all ddl servers info.
func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
do, err := session.GetDomain(h.store.(kv.Storage))
do, err := session.GetDomain(h.Store.(kv.Storage))
if err != nil {
writeError(w, errors.New("create session error"))
log.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func (ts *HTTPHandlerTestSuite) TestAllHistory(c *C) {
decoder := json.NewDecoder(resp.Body)

var jobs []*model.Job
s, _ := session.CreateSession(ts.server.newTikvHandlerTool().store.(kv.Storage))
s, _ := session.CreateSession(ts.server.newTikvHandlerTool().Store.(kv.Storage))
defer s.Close()
store := domain.GetDomain(s.(sessionctx.Context)).Store()
txn, _ := store.Begin()
Expand Down Expand Up @@ -714,7 +714,7 @@ func (ts *HTTPHandlerTestSuite) TestServerInfo(c *C) {
c.Assert(info.Version, Equals, mysql.ServerVersion)
c.Assert(info.GitHash, Equals, printer.TiDBGitHash)

store := ts.server.newTikvHandlerTool().store.(kv.Storage)
store := ts.server.newTikvHandlerTool().Store.(kv.Storage)
do, err := session.GetDomain(store.(kv.Storage))
c.Assert(err, IsNil)
ddl := do.DDL()
Expand All @@ -737,7 +737,7 @@ func (ts *HTTPHandlerTestSuite) TestAllServerInfo(c *C) {
c.Assert(clusterInfo.IsAllServerVersionConsistent, IsTrue)
c.Assert(clusterInfo.ServersNum, Equals, 1)

store := ts.server.newTikvHandlerTool().store.(kv.Storage)
store := ts.server.newTikvHandlerTool().Store.(kv.Storage)
do, err := session.GetDomain(store.(kv.Storage))
c.Assert(err, IsNil)
ddl := do.DDL()
Expand Down
Loading

0 comments on commit 7acbe52

Please sign in to comment.