Skip to content

Commit

Permalink
Merge branch 'master' into wg-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Sep 21, 2023
2 parents 7ced8f5 + 62ff67a commit 9a879c7
Show file tree
Hide file tree
Showing 22 changed files with 356 additions and 139 deletions.
40 changes: 23 additions & 17 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,14 @@ func (r *RegionInfo) isRegionRecreated() bool {
return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0)
}

// RegionChanged is a struct that records the changes of the region.
type RegionChanged struct {
IsNew, SaveKV, SaveCache, NeedSync bool
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -697,18 +702,19 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
// Mark IsNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (changed *RegionChanged) {
changed = &RegionChanged{}
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
saveKV, saveCache, isNew = true, true, true
changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
isNew = true
changed.IsNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
Expand All @@ -721,7 +727,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
saveKV, saveCache = true, true
changed.SaveKV, changed.SaveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
Expand All @@ -732,11 +738,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
isNew = true
changed.IsNew = true
} else if log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
Expand All @@ -745,57 +751,57 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return because the log is important for us to investigate,
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
return
}
if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) {
if log.GetLevel() <= zap.DebugLevel {
debug("bucket key changed", zap.Uint64("region-id", region.GetID()))
}
saveKV, saveCache = true, true
changed.SaveCache, changed.SaveKV = true, true
return
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
region.flowRoundDivisor < origin.flowRoundDivisor {
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
}
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
}
saveCache, needSync = true, true
changed.SaveCache, changed.NeedSync = true, true
return
}
if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
changed.SaveCache = true
return
}
if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
saveCache = true
changed.SaveCache = true

Check warning on line 797 in pkg/core/region.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/region.go#L797

Added line #L797 was not covered by tests
return
}
// Do not save to kv, because 1) flashback will be eventually set to
// false, 2) flashback changes almost all regions in a cluster.
// Saving kv may downgrade PD performance when there are many regions.
if region.IsFlashbackChanged(origin) {
saveCache = true
changed.SaveCache = true
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
_, _, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
changed := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, changed.NeedSync)
}
}

Expand Down
39 changes: 23 additions & 16 deletions pkg/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
initialMinSpace = 8 * units.GiB // 2^33=8GB
slowStoreThreshold = 80
awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime
splitStoreWait = time.Minute

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
Expand All @@ -50,22 +51,23 @@ const (
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
recentlySplitRegionsTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
Expand Down Expand Up @@ -539,6 +541,11 @@ func (s *StoreInfo) NeedAwakenStore() bool {
return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval
}

// HasRecentlySplitRegions checks if there are some region are splitted in this store.
func (s *StoreInfo) HasRecentlySplitRegions() bool {
return time.Since(s.recentlySplitRegionsTime) < splitStoreWait
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,10 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetRecentlySplitRegionsTime sets last split time for the store.
func SetRecentlySplitRegionsTime(recentlySplitRegionsTime time.Time) StoreCreateOption {
return func(store *StoreInfo) {
store.recentlySplitRegionsTime = recentlySplitRegionsTime
}
}
7 changes: 6 additions & 1 deletion pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,16 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
}
// Send the consumption to update the metrics.
isBackground := req.GetIsBackground()
isTiFlash := req.GetIsTiflash()
if isBackground && isTiFlash {
return errors.New("background and tiflash cannot be true at the same time")

Check warning on line 196 in pkg/mcs/resourcemanager/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/grpc_service.go#L196

Added line #L196 was not covered by tests
}
s.manager.consumptionDispatcher <- struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
}{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground}
isTiFlash bool
}{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground, isTiFlash}
if isBackground {
continue
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Manager struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
isTiFlash bool
}
// record update time of each resource group
consumptionRecord map[string]time.Time
Expand All @@ -81,6 +82,7 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager {
resourceGroupName string
*rmpb.Consumption
isBackground bool
isTiFlash bool
}, defaultConsumptionChanSize),
consumptionRecord: make(map[string]time.Time),
}
Expand Down Expand Up @@ -361,20 +363,23 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
if consumption == nil {
continue
}
backgroundType := ""
ruLabelType := tidbTypeLabel
if consumptionInfo.isBackground {
backgroundType = backgroundTypeLabel
ruLabelType = backgroundTypeLabel

Check warning on line 368 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L368

Added line #L368 was not covered by tests
}
if consumptionInfo.isTiFlash {
ruLabelType = tiflashTypeLabel

Check warning on line 371 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L371

Added line #L371 was not covered by tests
}

var (
name = consumptionInfo.resourceGroupName
rruMetrics = readRequestUnitCost.WithLabelValues(name, backgroundType)
wruMetrics = writeRequestUnitCost.WithLabelValues(name, backgroundType)
rruMetrics = readRequestUnitCost.WithLabelValues(name, ruLabelType)
wruMetrics = writeRequestUnitCost.WithLabelValues(name, ruLabelType)
sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name)
readByteMetrics = readByteCost.WithLabelValues(name, backgroundType)
writeByteMetrics = writeByteCost.WithLabelValues(name, backgroundType)
kvCPUMetrics = kvCPUCost.WithLabelValues(name, backgroundType)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, backgroundType)
readByteMetrics = readByteCost.WithLabelValues(name, ruLabelType)
writeByteMetrics = writeByteCost.WithLabelValues(name, ruLabelType)
kvCPUMetrics = kvCPUCost.WithLabelValues(name, ruLabelType)
sqlCPUMetrics = sqlCPUCost.WithLabelValues(name, ruLabelType)
readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel)
writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel)
)
Expand Down
2 changes: 2 additions & 0 deletions pkg/mcs/resourcemanager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
readTypeLabel = "read"
writeTypeLabel = "write"
backgroundTypeLabel = "background"
tiflashTypeLabel = "tiflash"
tidbTypeLabel = "tidb"
)

var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
if !saveCache && !isNew {
changed := core.GenerateRegionGuideFunc(true)(region, origin)
if !changed.SaveCache && !changed.IsNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand All @@ -444,7 +444,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
}

var overlaps []*core.RegionInfo
if saveCache {
if changed.SaveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
Expand All @@ -456,7 +456,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
cluster.HandleOverlaps(c, overlaps)
}

cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, changed.IsNew, c.IsPrepared())
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ var (
mergeCheckerPausedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "paused")
mergeCheckerRecentlySplitCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-split")
mergeCheckerRecentlyStartCounter = checkerCounter.WithLabelValues(mergeCheckerName, "recently-start")
mergeCheckerSkipUninitRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "skip-uninit-region")
mergeCheckerNoLeaderCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-leader")
mergeCheckerNoNeedCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-need")
mergeCheckerSpecialPeerCounter = checkerCounter.WithLabelValues(mergeCheckerName, "special-peer")
mergeCheckerUnhealthyRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "unhealthy-region")
mergeCheckerAbnormalReplicaCounter = checkerCounter.WithLabelValues(mergeCheckerName, "abnormal-replica")
mergeCheckerHotRegionCounter = checkerCounter.WithLabelValues(mergeCheckerName, "hot-region")
mergeCheckerNoTargetCounter = checkerCounter.WithLabelValues(mergeCheckerName, "no-target")
Expand Down Expand Up @@ -129,7 +129,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {

// when pd just started, it will load region meta from region storage,
if region.GetLeader() == nil {
mergeCheckerSkipUninitRegionCounter.Inc()
mergeCheckerNoLeaderCounter.Inc()
return nil
}

Expand All @@ -141,7 +141,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {

// skip region has down peers or pending peers
if !filter.IsRegionHealthy(region) {
mergeCheckerSpecialPeerCounter.Inc()
mergeCheckerUnhealthyRegionCounter.Inc()

Check warning on line 144 in pkg/schedule/checker/merge_checker.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/checker/merge_checker.go#L144

Added line #L144 was not covered by tests
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/schedule/filter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ const (
storeStateTooManyPendingPeer
storeStateRejectLeader
storeStateSlowTrend
storeStateRecentlySplitRegions

filtersLen
)
Expand Down Expand Up @@ -156,6 +157,7 @@ var filters = [filtersLen]string{
"store-state-too-many-pending-peers-filter",
"store-state-reject-leader-filter",
"store-state-slow-trend-filter",
"store-state-recently-split-regions-filter",
}

// String implements fmt.Stringer interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/filter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestString(t *testing.T) {
expected string
}{
{int(storeStateTombstone), "store-state-tombstone-filter"},
{int(filtersLen - 1), "store-state-slow-trend-filter"},
{int(filtersLen - 1), "store-state-recently-split-regions-filter"},
{int(filtersLen), "unknown"},
}

Expand Down
Loading

0 comments on commit 9a879c7

Please sign in to comment.