Skip to content

Commit

Permalink
Merge branch 'master' into sp_framework
Browse files Browse the repository at this point in the history
  • Loading branch information
CabinfeverB committed Dec 22, 2021
2 parents 810f1f5 + 39fb207 commit 997da8c
Show file tree
Hide file tree
Showing 29 changed files with 714 additions and 314 deletions.
2 changes: 1 addition & 1 deletion conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
## Controls the time interval between write hot regions info into leveldb
# hot-regions-write-interval= "10m"
## The day of hot regions data to be reserved. 0 means close.
# hot-regions-reserved-days= "7"
# hot-regions-reserved-days= 7
## The number of Leader scheduling tasks performed at the same time.
# leader-schedule-limit = 4
## The number of Region scheduling tasks performed at the same time.
Expand Down
5 changes: 5 additions & 0 deletions pkg/movingaverage/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,8 @@ func (aot *AvgOverTime) Clone() *AvgOverTime {
avgInterval: aot.avgInterval,
}
}

// GetIntervalSum returns the sum of interval
func (aot *AvgOverTime) GetIntervalSum() time.Duration {
return aot.intervalSum
}
4 changes: 2 additions & 2 deletions server/api/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (h *schedulerHandler) redirectSchedulerDelete(w http.ResponseWriter, name,
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/{name} [post]
func (h *schedulerHandler) PauseOrResume(w http.ResponseWriter, r *http.Request) {
var input map[string]int
var input map[string]int64
if err := apiutil.ReadJSONRespondError(h.r, w, r.Body, &input); err != nil {
return
}
Expand All @@ -332,7 +332,7 @@ func (h *schedulerHandler) PauseOrResume(w http.ResponseWriter, r *http.Request)
h.r.JSON(w, http.StatusBadRequest, "missing pause time")
return
}
if err := h.PauseOrResumeScheduler(name, int64(t)); err != nil {
if err := h.PauseOrResumeScheduler(name, t); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
57 changes: 7 additions & 50 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,56 +36,10 @@ import (
"github.com/unrolled/render"
)

// MetaStore contains meta information about a store which needed to show.
// MetaStore contains meta information about a store.
type MetaStore struct {
StoreID uint64 `json:"id,omitempty"`
Address string `json:"address,omitempty"`
Labels []*metapb.StoreLabel `json:"labels,omitempty"`
Version string `json:"version,omitempty"`
PeerAddress string `json:"peer_address,omitempty"`
StatusAddress string `json:"status_address,omitempty"`
GitHash string `json:"git_hash,omitempty"`
StartTimestamp int64 `json:"start_timestamp,omitempty"`
DeployPath string `json:"deploy_path,omitempty"`
LastHeartbeat int64 `json:"last_heartbeat,omitempty"`
PhysicallyDestroyed bool `json:"physically_destroyed,omitempty"`
StateName string `json:"state_name"`
}

// NewMetaStore convert metapb.Store to MetaStore without State
func NewMetaStore(store *metapb.Store, stateName string) *MetaStore {
metaStore := &MetaStore{StateName: stateName}
metaStore.StoreID = store.GetId()
metaStore.Address = store.GetAddress()
metaStore.Labels = store.GetLabels()
metaStore.Version = store.GetVersion()
metaStore.PeerAddress = store.GetPeerAddress()
metaStore.StatusAddress = store.GetStatusAddress()
metaStore.GitHash = store.GetGitHash()
metaStore.StartTimestamp = store.GetStartTimestamp()
metaStore.DeployPath = store.GetDeployPath()
metaStore.LastHeartbeat = store.GetLastHeartbeat()
metaStore.PhysicallyDestroyed = store.GetPhysicallyDestroyed()
return metaStore
}

// ConvertToMetapbStore convert to metapb.Store
// For test only.
func (m *MetaStore) ConvertToMetapbStore() *metapb.Store {
metapbStore := &metapb.Store{
Id: m.StoreID,
Address: m.Address,
State: metapb.StoreState(metapb.StoreState_value[m.StateName]),
Labels: m.Labels,
Version: m.Version,
PeerAddress: m.PeerAddress,
StatusAddress: m.StatusAddress,
GitHash: m.GitHash,
StartTimestamp: m.StartTimestamp,
DeployPath: m.DeployPath,
LastHeartbeat: m.LastHeartbeat,
}
return metapbStore
*metapb.Store
StateName string `json:"state_name"`
}

// StoreStatus contains status about a store.
Expand Down Expand Up @@ -123,7 +77,10 @@ const (

func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo {
s := &StoreInfo{
Store: NewMetaStore(store.GetMeta(), store.GetState().String()),
Store: &MetaStore{
Store: store.GetMeta(),
StateName: store.GetState().String(),
},
Status: &StoreStatus{
Capacity: typeutil.ByteSize(store.GetCapacity()),
Available: typeutil.ByteSize(store.GetAvailable()),
Expand Down
63 changes: 8 additions & 55 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/versioninfo"
)

var _ = Suite(&testStoreSuite{})
Expand Down Expand Up @@ -115,8 +114,7 @@ func checkStoresInfo(c *C, ss []*StoreInfo, want []*metapb.Store) {
}
}
for _, s := range ss {
metapbStore := s.Store.ConvertToMetapbStore()
obtained := proto.Clone(metapbStore).(*metapb.Store)
obtained := proto.Clone(s.Store.Store).(*metapb.Store)
expected := proto.Clone(mapWant[obtained.Id]).(*metapb.Store)
// Ignore lastHeartbeat
obtained.LastHeartbeat, expected.LastHeartbeat = 0, 0
Expand Down Expand Up @@ -168,51 +166,6 @@ func (s *testStoreSuite) TestStoreGet(c *C) {
checkStoresInfo(c, []*StoreInfo{info}, s.stores[:1])
}

func (s *testStoreSuite) TestStoreInfoGet(c *C) {
timeStamp := time.Now().Unix()
url := fmt.Sprintf("%s/store/1112", s.urlPrefix)
_, errPut := s.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()},
Store: &metapb.Store{
Id: 1112,
Address: fmt.Sprintf("tikv%d", 1112),
State: 1,
Labels: nil,
Version: versioninfo.MinSupportedVersion(versioninfo.Version5_0).String(),
StatusAddress: fmt.Sprintf("tikv%d", 1112),
GitHash: "45ce5b9584d618bc777877bea77cb94f61b8410",
StartTimestamp: timeStamp,
DeployPath: "/home/test",
LastHeartbeat: timeStamp,
},
})
c.Assert(errPut, IsNil)

info := new(StoreInfo)

err := readJSON(testDialClient, url, info)
c.Assert(err, IsNil)
c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String())
c.Assert(info.Store.StoreID, Equals, uint64(1112))
c.Assert(info.Store.Address, Equals, "tikv1112")
c.Assert(info.Store.Version, Equals, versioninfo.MinSupportedVersion(versioninfo.Version5_0).String())
c.Assert(info.Store.StatusAddress, Equals, fmt.Sprintf("tikv%d", 1112))
c.Assert(info.Store.GitHash, Equals, "45ce5b9584d618bc777877bea77cb94f61b8410")
c.Assert(info.Store.StartTimestamp, Equals, timeStamp)
c.Assert(info.Store.DeployPath, Equals, "/home/test")
c.Assert(info.Store.LastHeartbeat, Equals, timeStamp)

resp, err := testDialClient.Get(url)
c.Assert(err, IsNil)
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
c.Assert(err, IsNil)
str := string(b)
c.Assert(strings.Contains(str, "\"state\""), Equals, false)
s.cleanup()
s.SetUpSuite(c)
}

func (s *testStoreSuite) TestStoreLabel(c *C) {
url := fmt.Sprintf("%s/store/1", s.urlPrefix)
var info StoreInfo
Expand Down Expand Up @@ -309,7 +262,7 @@ func (s *testStoreSuite) TestStoreDelete(c *C) {
err := readJSON(testDialClient, url, store)
c.Assert(err, IsNil)
c.Assert(store.Store.PhysicallyDestroyed, IsFalse)
c.Assert(store.Store.StateName, Equals, metapb.StoreState_Offline.String())
c.Assert(store.Store.State, Equals, metapb.StoreState_Offline)

// up store success because it is offline but not physically destroyed
status := requestStatusBody(c, testDialClient, http.MethodPost, fmt.Sprintf("%s/state?state=Up", url))
Expand All @@ -320,15 +273,15 @@ func (s *testStoreSuite) TestStoreDelete(c *C) {
store = new(StoreInfo)
err = readJSON(testDialClient, url, store)
c.Assert(err, IsNil)
c.Assert(store.Store.StateName, Equals, metapb.StoreState_Up.String())
c.Assert(store.Store.State, Equals, metapb.StoreState_Up)
c.Assert(store.Store.PhysicallyDestroyed, IsFalse)

// offline store with physically destroyed
status = requestStatusBody(c, testDialClient, http.MethodDelete, fmt.Sprintf("%s?force=true", url))
c.Assert(status, Equals, http.StatusOK)
err = readJSON(testDialClient, url, store)
c.Assert(err, IsNil)
c.Assert(store.Store.StateName, Equals, metapb.StoreState_Offline.String())
c.Assert(store.Store.State, Equals, metapb.StoreState_Offline)
c.Assert(store.Store.PhysicallyDestroyed, IsTrue)

// try to up store again failed because it is physically destroyed
Expand All @@ -344,15 +297,15 @@ func (s *testStoreSuite) TestStoreSetState(c *C) {
info := StoreInfo{}
err := readJSON(testDialClient, url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.StateName, Equals, metapb.StoreState_Up.String())
c.Assert(info.Store.State, Equals, metapb.StoreState_Up)

// Set to Offline.
info = StoreInfo{}
err = postJSON(testDialClient, url+"/state?state=Offline", nil)
c.Assert(err, IsNil)
err = readJSON(testDialClient, url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String())
c.Assert(info.Store.State, Equals, metapb.StoreState_Offline)

// store not found
info = StoreInfo{}
Expand All @@ -367,7 +320,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) {
c.Assert(err, NotNil)
err = readJSON(testDialClient, url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.StateName, Equals, metapb.StoreState_Offline.String())
c.Assert(info.Store.State, Equals, metapb.StoreState_Offline)
}

// Set back to Up.
Expand All @@ -376,7 +329,7 @@ func (s *testStoreSuite) TestStoreSetState(c *C) {
c.Assert(err, IsNil)
err = readJSON(testDialClient, url, &info)
c.Assert(err, IsNil)
c.Assert(info.Store.StateName, Equals, metapb.StoreState_Up.String())
c.Assert(info.Store.State, Equals, metapb.StoreState_Up)
}

func (s *testStoreSuite) TestUrlStoreFilter(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions server/api/trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
for _, store := range stores {
info := newStoreInfo(h.svr.GetScheduleConfig(), store)
s := trendStore{
ID: info.Store.StoreID,
Address: info.Store.Address,
ID: info.Store.GetId(),
Address: info.Store.GetAddress(),
StateName: info.Store.StateName,
Capacity: uint64(info.Status.Capacity),
Available: uint64(info.Status.Available),
Expand Down
43 changes: 21 additions & 22 deletions server/core/hot_region_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ type HistoryHotRegions struct {
// HistoryHotRegion wraps hot region info
// it is storage format of hot_region_storage
type HistoryHotRegion struct {
UpdateTime int64 `json:"update_time,omitempty"`
RegionID uint64 `json:"region_id,omitempty"`
PeerID uint64 `json:"peer_id,omitempty"`
StoreID uint64 `json:"store_id,omitempty"`
IsLeader bool `json:"is_leader,omitempty"`
IsLearner bool `json:"is_learner,omitempty"`
HotRegionType string `json:"hot_region_type,omitempty"`
HotDegree int64 `json:"hot_degree,omitempty"`
FlowBytes float64 `json:"flow_bytes,omitempty"`
KeyRate float64 `json:"key_rate,omitempty"`
QueryRate float64 `json:"query_rate,omitempty"`
StartKey []byte `json:"start_key,omitempty"`
EndKey []byte `json:"end_key,omitempty"`
UpdateTime int64 `json:"update_time"`
RegionID uint64 `json:"region_id"`
PeerID uint64 `json:"peer_id"`
StoreID uint64 `json:"store_id"`
IsLeader bool `json:"is_leader"`
IsLearner bool `json:"is_learner"`
HotRegionType string `json:"hot_region_type"`
HotDegree int64 `json:"hot_degree"`
FlowBytes float64 `json:"flow_bytes"`
KeyRate float64 `json:"key_rate"`
QueryRate float64 `json:"query_rate"`
StartKey string `json:"start_key"`
EndKey string `json:"end_key"`
// Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key.
// IV for end_key is calculated from (encryption_meta.iv + len(start_key)).
// The field is only used by PD and should be ignored otherwise.
Expand Down Expand Up @@ -254,16 +254,16 @@ func (h *HotRegionStorage) packHistoryHotRegions(historyHotRegions []HistoryHotR
for i := range historyHotRegions {
region := &metapb.Region{
Id: historyHotRegions[i].RegionID,
StartKey: historyHotRegions[i].StartKey,
EndKey: historyHotRegions[i].EndKey,
StartKey: HexRegionKey([]byte(historyHotRegions[i].StartKey)),
EndKey: HexRegionKey([]byte(historyHotRegions[i].EndKey)),
EncryptionMeta: historyHotRegions[i].EncryptionMeta,
}
region, err := encryption.EncryptRegion(region, h.encryptionKeyManager)
if err != nil {
return err
}
historyHotRegions[i].StartKey = region.StartKey
historyHotRegions[i].EndKey = region.EndKey
historyHotRegions[i].StartKey = String(region.StartKey)
historyHotRegions[i].EndKey = String(region.EndKey)
key := HotRegionStorePath(hotRegionType, historyHotRegions[i].UpdateTime, historyHotRegions[i].RegionID)
h.batchHotInfo[key] = &historyHotRegions[i]
}
Expand Down Expand Up @@ -338,21 +338,20 @@ func (it *HotRegionStorageIterator) Next() (*HistoryHotRegion, error) {
}
region := &metapb.Region{
Id: message.RegionID,
StartKey: message.StartKey,
EndKey: message.EndKey,
StartKey: []byte(message.StartKey),
EndKey: []byte(message.EndKey),
EncryptionMeta: message.EncryptionMeta,
}
if err := encryption.DecryptRegion(region, it.encryptionKeyManager); err != nil {
return nil, err
}
message.StartKey = region.StartKey
message.EndKey = region.EndKey
message.StartKey = String(region.StartKey)
message.EndKey = String(region.EndKey)
message.EncryptionMeta = nil
return &message, nil
}

// HotRegionStorePath generate hot region store key for HotRegionStorage.
// TODO:find a better place to put this function.
func HotRegionStorePath(hotRegionType string, updateTime int64, regionID uint64) string {
return path.Join(
"schedule",
Expand Down
22 changes: 19 additions & 3 deletions server/core/hot_region_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package core

import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
Expand Down Expand Up @@ -63,8 +64,8 @@ func (m *MockPackHotRegionInfo) GenHistoryHotRegions(num int, updateTime time.Ti
FlowBytes: rand.Float64() * 100,
KeyRate: rand.Float64() * 100,
QueryRate: rand.Float64() * 100,
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i)),
StartKey: fmt.Sprintf("%20d", i),
EndKey: fmt.Sprintf("%20d", i),
}
if i%2 == 1 {
m.historyHotWrites = append(m.historyHotWrites, historyHotRegion)
Expand Down Expand Up @@ -103,20 +104,33 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) {
RegionID: 1,
StoreID: 1,
HotRegionType: ReadType.String(),
StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}),
EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}),
},
{
UpdateTime: now.Add(10*time.Second).UnixNano() / int64(time.Millisecond),
RegionID: 2,
StoreID: 1,
HotRegionType: ReadType.String(),
StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}),
EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x15, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}),
},
{
UpdateTime: now.Add(20*time.Second).UnixNano() / int64(time.Millisecond),
RegionID: 3,
StoreID: 1,
HotRegionType: ReadType.String(),
StartKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x83, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}),
EndKey: string([]byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xff, 0x83, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0xff, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0xfa}),
},
}
var copyHotRegionStorages []HistoryHotRegion
data, _ := json.Marshal(hotRegionStorages)
json.Unmarshal(data, &copyHotRegionStorages)
for i, region := range hotRegionStorages {
copyHotRegionStorages[i].StartKey = region.StartKey
copyHotRegionStorages[i].EndKey = region.EndKey
}
packHotRegionInfo.historyHotReads = hotRegionStorages
packHotRegionInfo.historyHotWrites = []HistoryHotRegion{
{
Expand All @@ -133,7 +147,9 @@ func (t *testHotRegionStorage) TestHotRegionWrite(c *C) {
now.Add(40*time.Second).UnixNano()/int64(time.Millisecond))
index := 0
for next, err := iter.Next(); next != nil && err == nil; next, err = iter.Next() {
c.Assert(reflect.DeepEqual(&hotRegionStorages[index], next), IsTrue)
copyHotRegionStorages[index].StartKey = HexRegionKeyStr([]byte(copyHotRegionStorages[index].StartKey))
copyHotRegionStorages[index].EndKey = HexRegionKeyStr([]byte(copyHotRegionStorages[index].EndKey))
c.Assert(reflect.DeepEqual(&copyHotRegionStorages[index], next), IsTrue)
index++
}
c.Assert(err, IsNil)
Expand Down
Loading

0 comments on commit 997da8c

Please sign in to comment.