Skip to content

Commit

Permalink
close tikv#4490
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
  • Loading branch information
CabinfeverB committed Dec 28, 2021
2 parents 990c934 + dfdf0e2 commit 2f07409
Show file tree
Hide file tree
Showing 20 changed files with 305 additions and 264 deletions.
6 changes: 3 additions & 3 deletions pkg/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ func GetComponentNameOnHTTP(r *http.Request) string {
// ComponentSignatureRoundTripper is used to add component signature in HTTP header
type ComponentSignatureRoundTripper struct {
proxied http.RoundTripper
component *string
component string
}

// NewComponentSignatureRoundTripper returns a new ComponentSignatureRoundTripper.
func NewComponentSignatureRoundTripper(roundTripper http.RoundTripper, componentName *string) *ComponentSignatureRoundTripper {
func NewComponentSignatureRoundTripper(roundTripper http.RoundTripper, componentName string) *ComponentSignatureRoundTripper {
return &ComponentSignatureRoundTripper{
proxied: roundTripper,
component: componentName,
Expand All @@ -161,7 +161,7 @@ func NewComponentSignatureRoundTripper(roundTripper http.RoundTripper, component

// RoundTrip is used to implement RoundTripper
func (rt *ComponentSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) {
req.Header.Add(componentSignatureKey, *rt.component)
req.Header.Add(componentSignatureKey, rt.component)
// Send the request, get the response and the error
resp, err = rt.proxied.RoundTrip(req)
return
Expand Down
15 changes: 9 additions & 6 deletions pkg/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import (
"io"
"net/http"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/urfave/negroni"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -113,11 +111,16 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
http.Error(w, "no leader", http.StatusServiceUnavailable)
return
}
clientUrls := leader.GetClientUrls()
urls := make([]url.URL, 0, len(clientUrls))
for _, item := range clientUrls {
u, err := url.Parse(item)
if err != nil {
http.Error(w, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error(), http.StatusInternalServerError)
return
}

urls, err := config.ParseUrls(strings.Join(leader.GetClientUrls(), ","))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
urls = append(urls, *u)
}
client := h.s.GetHTTPClient()
NewCustomReverseProxies(client, urls).ServeHTTP(w, r)
Expand Down
10 changes: 9 additions & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {

// HotRegionsFromStore picks hot regions in specify store.
func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo {
stats := mc.HotCache.HotRegionsFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold())
regions := make([]*core.RegionInfo, 0, len(stats))
for _, stat := range stats {
region := mc.GetRegion(stat.RegionID)
Expand All @@ -141,6 +141,14 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*
return regions
}

// hotRegionsFromStore picks hot region in specify store.
func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat {
if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
return stats
}
return nil
}

// AllocPeer allocs a new peer on a store.
func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.AllocID()
Expand Down
1 change: 1 addition & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ func (s *testGetRegionRangeHolesSuite) TestRegionRangeHoles(c *C) {
{"", core.HexRegionKeyStr(r1.GetStartKey())},
{core.HexRegionKeyStr(r1.GetEndKey()), core.HexRegionKeyStr(r3.GetStartKey())},
{core.HexRegionKeyStr(r4.GetEndKey()), core.HexRegionKeyStr(r6.GetStartKey())},
{core.HexRegionKeyStr(r6.GetEndKey()), ""},
})
}

Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.limiter.Collect(newStore.GetStoreStats())
}

regionIDs := make(map[uint64]struct{}, len(stats.GetPeerStats()))
regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
for _, peerStat := range stats.GetPeerStats() {
regionID := peerStat.GetRegionId()
regionIDs[regionID] = struct{}{}
region := c.GetRegion(regionID)
regions[regionID] = region
if region == nil {
log.Warn("discard hot peer stat for unknown region",
zap.Uint64("region-id", regionID),
Expand All @@ -624,7 +624,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval))
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
return nil
}

Expand Down
46 changes: 14 additions & 32 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type Config struct {
// an election, thus minimizing disruptions.
PreVote bool `toml:"enable-prevote"`

MaxRequestBytes uint `toml:"max-request-bytes" json:"max-request-bytes"`

Security SecurityConfig `toml:"security" json:"security"`

LabelProperty LabelPropertyConfig `toml:"label-property" json:"label-property"`
Expand All @@ -149,12 +151,9 @@ type Config struct {
// For all warnings during parsing.
WarningMsgs []string

// Only test can change them.
nextRetryDelay time.Duration
DisableStrictReconfigCheck bool

HeartbeatStreamBindInterval typeutil.Duration

LeaderPriorityCheckInterval typeutil.Duration

logger *zap.Logger
Expand Down Expand Up @@ -201,10 +200,10 @@ func NewConfig() *Config {

const (
defaultLeaderLease = int64(3)
defaultNextRetryDelay = time.Second
defaultCompactionMode = "periodic"
defaultAutoCompactionRetention = "1h"
defaultQuotaBackendBytes = typeutil.ByteSize(8 * 1024 * 1024 * 1024) // 8GB
defaultMaxRequestBytes = uint(1.5 * 1024 * 1024) // 1.5MB

defaultName = "pd"
defaultClientUrls = "http://127.0.0.1:2379"
Expand Down Expand Up @@ -550,15 +549,14 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Labels = make(map[string]string)
}

if c.nextRetryDelay == 0 {
c.nextRetryDelay = defaultNextRetryDelay
}

adjustString(&c.AutoCompactionMode, defaultCompactionMode)
adjustString(&c.AutoCompactionRetention, defaultAutoCompactionRetention)
if !configMetaData.IsDefined("quota-backend-bytes") {
c.QuotaBackendBytes = defaultQuotaBackendBytes
}
if !configMetaData.IsDefined("max-request-bytes") {
c.MaxRequestBytes = defaultMaxRequestBytes
}
adjustDuration(&c.TickInterval, defaultTickInterval)
adjustDuration(&c.ElectionInterval, defaultElectionInterval)

Expand Down Expand Up @@ -788,7 +786,7 @@ const (
defaultEnableJointConsensus = true
defaultEnableCrossTableMerge = true
defaultHotRegionsWriteInterval = 10 * time.Minute
defaultHotRegionsResevervedDays = 0
defaultHotRegionsReservedDays = 0
)

func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
Expand Down Expand Up @@ -875,7 +873,7 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
}

if !meta.IsDefined("hot-regions-reserved-days") {
adjustInt64(&c.HotRegionsReservedDays, defaultHotRegionsResevervedDays)
adjustInt64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays)
}

return c.Validate()
Expand Down Expand Up @@ -928,7 +926,7 @@ func (c *ScheduleConfig) MigrateDeprecatedFlags() {
// Validate is used to validate if some scheduling configurations are right.
func (c *ScheduleConfig) Validate() error {
if c.TolerantSizeRatio < 0 {
return errors.New("tolerant-size-ratio should be nonnegative")
return errors.New("tolerant-size-ratio should be non-negative")
}
if c.LowSpaceRatio < 0 || c.LowSpaceRatio > 1 {
return errors.New("low-space-ratio should between 0 and 1")
Expand Down Expand Up @@ -1201,23 +1199,6 @@ func (c LabelPropertyConfig) Clone() LabelPropertyConfig {
return m
}

// ParseUrls parse a string into multiple urls.
// Export for api.
func ParseUrls(s string) ([]url.URL, error) {
items := strings.Split(s, ",")
urls := make([]url.URL, 0, len(items))
for _, item := range items {
u, err := url.Parse(item)
if err != nil {
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}

urls = append(urls, *u)
}

return urls, nil
}

// SetupLogger setup the logger.
func (c *Config) SetupLogger() error {
lg, p, err := log.InitLogger(&c.Log, zap.AddStacktrace(zapcore.FatalLevel))
Expand Down Expand Up @@ -1262,6 +1243,7 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) {
cfg.AutoCompactionMode = c.AutoCompactionMode
cfg.AutoCompactionRetention = c.AutoCompactionRetention
cfg.QuotaBackendBytes = int64(c.QuotaBackendBytes)
cfg.MaxRequestBytes = c.MaxRequestBytes

allowedCN, serr := c.Security.GetOneAllowedCN()
if serr != nil {
Expand All @@ -1284,22 +1266,22 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) {
cfg.Logger = "zap"
var err error

cfg.LPUrls, err = ParseUrls(c.PeerUrls)
cfg.LPUrls, err = parseUrls(c.PeerUrls)
if err != nil {
return nil, err
}

cfg.APUrls, err = ParseUrls(c.AdvertisePeerUrls)
cfg.APUrls, err = parseUrls(c.AdvertisePeerUrls)
if err != nil {
return nil, err
}

cfg.LCUrls, err = ParseUrls(c.ClientUrls)
cfg.LCUrls, err = parseUrls(c.ClientUrls)
if err != nil {
return nil, err
}

cfg.ACUrls, err = ParseUrls(c.AdvertiseClientUrls)
cfg.ACUrls, err = parseUrls(c.AdvertiseClientUrls)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,15 @@ func (s *testConfigSuite) TestValidation(c *C) {
c.Assert(cfg.Schedule.Validate(), NotNil)
// check quota
c.Assert(cfg.QuotaBackendBytes, Equals, defaultQuotaBackendBytes)
// check request bytes
c.Assert(cfg.MaxRequestBytes, Equals, defaultMaxRequestBytes)
}

func (s *testConfigSuite) TestAdjust(c *C) {
cfgData := `
name = ""
lease = 0
max-request-bytes = 20000000
[pd-server]
metric-storage = "http://127.0.0.1:9090"
Expand All @@ -184,6 +187,7 @@ leader-schedule-limit = 0
c.Assert(err, IsNil)
c.Assert(cfg.Name, Equals, fmt.Sprintf("%s-%s", defaultName, host))
c.Assert(cfg.LeaderLease, Equals, defaultLeaderLease)
c.Assert(cfg.MaxRequestBytes, Equals, uint(20000000))
// When defined, use values from config file.
c.Assert(cfg.Schedule.MaxMergeRegionSize, Equals, uint64(0))
c.Assert(cfg.Schedule.EnableOneWayMerge, IsTrue)
Expand Down
18 changes: 18 additions & 0 deletions server/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package config
import (
"net/url"
"regexp"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/errs"
)

const (
Expand Down Expand Up @@ -87,3 +89,19 @@ func NewTestOptions() *PersistOptions {
c.Adjust(nil, false)
return NewPersistOptions(c)
}

// parseUrls parse a string into multiple urls.
func parseUrls(s string) ([]url.URL, error) {
items := strings.Split(s, ",")
urls := make([]url.URL, 0, len(items))
for _, item := range items {
u, err := url.Parse(item)
if err != nil {
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}

urls = append(urls, *u)
}

return urls, nil
}
4 changes: 4 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,10 @@ func (r *RegionsInfo) GetRangeHoles() [][]string {
lastEndKey = region.GetEndKey()
return true
})
// If the last end key is not empty, it means there is a range hole at the end.
if len(lastEndKey) > 0 {
rangeHoles = append(rangeHoles, []string{HexRegionKeyStr(lastEndKey), ""})
}
return rangeHoles
}

Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR
}
}
stat := core.HistoryHotRegion{
// store in ms.
// store in ms.
UpdateTime: hotPeerStat.LastUpdateTime.UnixNano() / int64(time.Millisecond),
RegionID: hotPeerStat.RegionID,
StoreID: hotPeerStat.StoreID,
Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
c.Check(len(items), Greater, 0)
for _, item := range items {
if item.StoreID == 3 {
c.Check(item.IsNeedDelete(), IsTrue)
c.Check(item.GetActionType(), Equals, statistics.Remove)
continue
}
c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+2)
Expand Down Expand Up @@ -1586,9 +1586,9 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
for _, item := range items {
if item.StoreID < 4 {
c.Check(item.IsNeedDelete(), IsTrue)
c.Check(item.GetActionType(), Equals, statistics.Remove)
} else {
c.Check(item.IsNeedDelete(), IsFalse)
c.Check(item.GetActionType(), Equals, statistics.Update)
}
}
}
Expand Down
Loading

0 comments on commit 2f07409

Please sign in to comment.