Commit
Merge branch 'master' into wg-lock
CabinfeverB committed Sep 22, 2023
2 parents 9a879c7 + a21fd58 commit 4d3d534
Showing 98 changed files with 841 additions and 858 deletions.
6 changes: 5 additions & 1 deletion Makefile
@@ -42,6 +42,10 @@ ifeq ("$(WITH_RACE)", "1")
BUILD_CGO_ENABLED := 1
endif

ifeq ($(PLUGIN), 1)
BUILD_TAGS += with_plugin
endif

LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDBuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "$(PD_PKG)/pkg/versioninfo.PDGitHash=$(shell git rev-parse HEAD)"
@@ -286,4 +290,4 @@ clean-build:
rm -rf $(BUILD_BIN_PATH)
rm -rf $(GO_TOOLS_BIN_PATH)

.PHONY: clean clean-test clean-build
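
Note: the new PLUGIN switch only appends a Go build tag, so plugin-only code compiles solely when the tag is present. A minimal sketch of a file gated on the with_plugin tag — the file name, package, and function here are hypothetical illustrations, not part of this commit:

//go:build with_plugin

// plugin_loader.go: compiled only when `make PLUGIN=1` adds
// `-tags with_plugin` to the Go build flags.
package server

// loadPlugins is a hypothetical entry point that exists only in
// plugin-enabled builds; plain builds would provide a no-op variant
// in a file tagged `//go:build !with_plugin`.
func loadPlugins(paths []string) error {
	for _, p := range paths {
		_ = p // a real loader would call plugin.Open(p) here
	}
	return nil
}
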
25 changes: 18 additions & 7 deletions client/resource_group/controller/controller.go
@@ -234,7 +234,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when receiving a Delete event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
@@ -260,7 +261,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
// Use WithPrevKV() to get the previous key-value pair when receiving a Delete event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
@@ -319,18 +321,27 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
for _, item := range resp {
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
switch item.Type {
case meta_storagepb.Event_PUT:
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
}
if item, ok := c.groupsController.Load(group.Name); ok {
gc := item.(*groupCostController)
gc.modifyMeta(group)
}
case meta_storagepb.Event_DELETE:
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
if item.PrevKv != nil {
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
continue
}
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
resourceGroupStatusGauge.DeleteLabelValues(group.Name)
}
} else {
// If the prev-kv has been compacted, a delete event must already have been
// handled before this one, so this event is a duplicate and can safely be ignored.
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
}
}
}
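
The delete branch above only works because the watch channel was opened with WithPrevKV(): a Delete event carries an empty current value, so the deleted group's name must be recovered from PrevKv, and a nil PrevKv signals compaction (a duplicate event). A self-contained sketch of that pattern, with simplified stand-ins for the meta_storagepb types:

package main

import "fmt"

type EventType int

const (
	EventPut EventType = iota
	EventDelete
)

type KeyValue struct{ Key, Value []byte }

type Event struct {
	Type   EventType
	Kv     *KeyValue
	PrevKv *KeyValue // set only if the watch used WithPrevKV() and no compaction occurred
}

// handleEvents mirrors the controller loop: PUT decodes the current value,
// DELETE decodes the previous value, and a nil PrevKv is skipped as a
// duplicate caused by compaction. Real code unmarshals the protobuf value
// instead of using the raw key.
func handleEvents(events []Event, apply, drop func(name string)) {
	for _, ev := range events {
		switch ev.Type {
		case EventPut:
			apply(string(ev.Kv.Key))
		case EventDelete:
			if ev.PrevKv == nil {
				continue // compacted: the original delete was already observed
			}
			drop(string(ev.PrevKv.Key))
		}
	}
}

func main() {
	events := []Event{
		{Type: EventPut, Kv: &KeyValue{Key: []byte("rg1")}},
		{Type: EventDelete, PrevKv: &KeyValue{Key: []byte("rg1")}},
		{Type: EventDelete}, // duplicate after compaction: ignored
	}
	handleEvents(events,
		func(n string) { fmt.Println("apply", n) },
		func(n string) { fmt.Println("drop", n) })
}
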
47 changes: 0 additions & 47 deletions client/resource_manager_client.go
@@ -55,7 +55,6 @@ type ResourceManagerClient interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
Watch(ctx context.Context, key []byte, opts ...OpOption) (chan []*meta_storagepb.Event, error)
}
@@ -188,52 +187,6 @@ func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup,
return groups, resp.Header.Revision, nil
}

// WatchResourceGroup [just for TEST] watches resource group changes.
// It returns a stream of slices of resource groups.
// The first message in the stream contains all current resource groups, and
// each subsequent message contains the new PUT/DELETE events for all resource groups.
func (c *client) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
configChan, err := c.Watch(ctx, GroupSettingsPathPrefixBytes, WithRev(revision), WithPrefix())
if err != nil {
return nil, err
}
resourceGroupWatcherChan := make(chan []*rmpb.ResourceGroup)
go func() {
defer func() {
close(resourceGroupWatcherChan)
if r := recover(); r != nil {
log.Error("[pd] panic in ResourceManagerClient `WatchResourceGroups`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
case res, ok := <-configChan:
if !ok {
return
}
groups := make([]*rmpb.ResourceGroup, 0, len(res))
for _, item := range res {
switch item.Type {
case meta_storagepb.Event_PUT:
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
return
}
groups = append(groups, group)
case meta_storagepb.Event_DELETE:
continue
}
}
resourceGroupWatcherChan <- groups
}
}
}()
return resourceGroupWatcherChan, err
}

func (c *client) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
req := &tokenRequest{
done: make(chan error, 1),
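
With WatchResourceGroup removed from the interface, a caller that still wants a typed stream can rebuild it on the generic Watch API. A sketch under the same imports as this file — watchGroups and handleGroup are hypothetical, and deletes are skipped here because recovering the deleted group requires WithPrevKV(), as in the controller change above:

func watchGroups(ctx context.Context, cli ResourceManagerClient, revision int64) error {
	ch, err := cli.Watch(ctx, GroupSettingsPathPrefixBytes, WithRev(revision), WithPrefix())
	if err != nil {
		return err
	}
	for events := range ch {
		for _, item := range events {
			if item.Type != meta_storagepb.Event_PUT {
				continue
			}
			group := &rmpb.ResourceGroup{}
			if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
				continue
			}
			handleGroup(group) // handleGroup is a caller-supplied callback
		}
	}
	return ctx.Err()
}
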
30 changes: 15 additions & 15 deletions errors.toml
@@ -531,21 +531,6 @@ error = '''
plugin is not found: %s
'''

["PD:operator:ErrRegionAbnormalPeer"]
error = '''
region %v has abnormal peer
'''

["PD:operator:ErrRegionNotAdjacent"]
error = '''
two regions are not adjacent
'''

["PD:operator:ErrRegionNotFound"]
error = '''
region %v not found
'''

["PD:os:ErrOSOpen"]
error = '''
open error
@@ -616,6 +601,21 @@ error = '''
failed to unmarshal proto
'''

["PD:region:ErrRegionAbnormalPeer"]
error = '''
region %v has abnormal peer
'''

["PD:region:ErrRegionNotAdjacent"]
error = '''
two regions are not adjacent
'''

["PD:region:ErrRegionNotFound"]
error = '''
region %v not found
'''

["PD:region:ErrRegionRuleContent"]
error = '''
invalid region rule content, %s
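
The three region error codes above are unchanged in wording; they only move from the PD:operator namespace to PD:region. Since errors.toml is generated from the error definitions, this presumably mirrors a relocation of the corresponding variables in pkg/errs. A sketch of how such a code is declared with github.com/pingcap/errors (the real declaration in pkg/errs may differ):

// ErrRegionNotFound carries the RFC code under the PD:region namespace.
var ErrRegionNotFound = errors.Normalize(
	"region %v not found",
	errors.RFCCodeText("PD:region:ErrRegionNotFound"),
)
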
4 changes: 2 additions & 2 deletions pkg/autoscaling/prometheus_test.go
@@ -155,7 +155,7 @@ func makeJSONResponse(promResp *response) (*http.Response, []byte, error) {

response := &http.Response{
Status: "200 OK",
StatusCode: 200,
StatusCode: http.StatusOK,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
@@ -246,7 +246,7 @@ func (c *errorHTTPStatusClient) Do(_ context.Context, req *http.Request) (r *htt

r, body, err = makeJSONResponse(promResp)

r.StatusCode = 500
r.StatusCode = http.StatusInternalServerError
r.Status = "500 Internal Server Error"

return
40 changes: 17 additions & 23 deletions pkg/core/region.go
@@ -682,14 +682,9 @@ func (r *RegionInfo) isRegionRecreated() bool {
return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0)
}

// RegionChanged is a struct that records the changes of the region.
type RegionChanged struct {
IsNew, SaveKV, SaveCache, NeedSync bool
}

// RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
// and new region information.
type RegionGuideFunc func(region, origin *RegionInfo) *RegionChanged
type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)

// GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
// nil means do not print the log.
Expand All @@ -702,19 +697,18 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
}
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark IsNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (changed *RegionChanged) {
changed = &RegionChanged{}
// Mark isNew if the region in cache does not have leader.
return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
if origin == nil {
if log.GetLevel() <= zap.DebugLevel {
debug("insert new region",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
}
changed.SaveKV, changed.SaveCache, changed.IsNew = true, true, true
saveKV, saveCache, isNew = true, true, true
} else {
if !origin.IsFromHeartbeat() {
changed.IsNew = true
isNew = true
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
@@ -727,7 +721,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-version", r.GetVersion()),
)
}
changed.SaveKV, changed.SaveCache = true, true
saveKV, saveCache = true, true
}
if r.GetConfVer() > o.GetConfVer() {
if log.GetLevel() <= zap.InfoLevel {
@@ -738,11 +732,11 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
zap.Uint64("new-confver", r.GetConfVer()),
)
}
changed.SaveCache, changed.SaveKV = true, true
saveKV, saveCache = true, true
}
if region.GetLeader().GetId() != origin.GetLeader().GetId() {
if origin.GetLeader().GetId() == 0 {
changed.IsNew = true
isNew = true
} else if log.GetLevel() <= zap.InfoLevel {
info("leader changed",
zap.Uint64("region-id", region.GetID()),
@@ -751,57 +745,57 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
)
}
// We check it first and do not return, because the log is important for us to investigate.
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
changed.SaveCache, changed.SaveKV = true, true
saveKV, saveCache = true, true
return
}
if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) {
if log.GetLevel() <= zap.DebugLevel {
debug("bucket key changed", zap.Uint64("region-id", region.GetID()))
}
changed.SaveCache, changed.SaveKV = true, true
saveKV, saveCache = true, true
return
}
// Once flow has changed, will update the cache.
// Because keys and bytes are strongly related, only bytes are judged.
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() ||
region.flowRoundDivisor < origin.flowRoundDivisor {
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
return
}
if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
}
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
return
}
if !SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
if log.GetLevel() <= zap.DebugLevel {
debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
}
changed.SaveCache, changed.NeedSync = true, true
saveCache, needSync = true, true
return
}
if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
changed.SaveCache = true
saveCache = true
return
}
if region.GetReplicationStatus().GetState() != replication_modepb.RegionReplicationState_UNKNOWN &&
(region.GetReplicationStatus().GetState() != origin.GetReplicationStatus().GetState() ||
region.GetReplicationStatus().GetStateId() != origin.GetReplicationStatus().GetStateId()) {
changed.SaveCache = true
saveCache = true
return
}
// Do not save to kv, because 1) flashback will be eventually set to
// false, 2) flashback changes almost all regions in a cluster.
// Saving kv may downgrade PD performance when there are many regions.
if region.IsFlashbackChanged(origin) {
changed.SaveCache = true
saveCache = true
return
}
}
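
RegionGuideFunc now reports its verdict as four booleans instead of a *RegionChanged struct, so call sites destructure the tuple directly, as the test below also shows. A minimal sketch of the consuming pattern — saveRegion, updateCache, and syncRegion are hypothetical stand-ins for the caller's real handlers:

isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if saveKV {
	saveRegion(region) // persist updated region meta to storage
}
if saveCache {
	updateCache(region) // refresh the in-memory region tree
}
if needSync {
	syncRegion(region) // forward the change to region syncer followers
}
_ = isNew // isNew marks a region whose cached predecessor had no leader
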
4 changes: 2 additions & 2 deletions pkg/core/region_test.go
@@ -333,8 +333,8 @@ func TestNeedSync(t *testing.T) {
for _, testCase := range testCases {
regionA := region.Clone(testCase.optionsA...)
regionB := region.Clone(testCase.optionsB...)
changed := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, changed.NeedSync)
_, _, _, needSync := RegionGuide(regionA, regionB)
re.Equal(testCase.needSync, needSync)
}
}

39 changes: 16 additions & 23 deletions pkg/core/store.go
@@ -36,7 +36,6 @@ const (
initialMinSpace = 8 * units.GiB // 2^33=8GB
slowStoreThreshold = 80
awakenStoreInterval = 10 * time.Minute // 2 * slowScoreRecoveryTime
splitStoreWait = time.Minute

// EngineKey is the label key used to indicate engine.
EngineKey = "engine"
@@ -51,23 +50,22 @@
type StoreInfo struct {
meta *metapb.Store
*storeStats
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
recentlySplitRegionsTime time.Time
pauseLeaderTransfer bool // not allow to be used as source or target of transfer leader
slowStoreEvicted bool // this store has been evicted as a slow store, should not transfer leader to it
slowTrendEvicted bool // this store has been evicted as a slow store by trend, should not transfer leader to it
leaderCount int
regionCount int
learnerCount int
witnessCount int
leaderSize int64
regionSize int64
pendingPeerCount int
lastPersistTime time.Time
leaderWeight float64
regionWeight float64
limiter storelimit.StoreLimit
minResolvedTS uint64
lastAwakenTime time.Time
}

// NewStoreInfo creates StoreInfo with meta data.
@@ -541,11 +539,6 @@ func (s *StoreInfo) NeedAwakenStore() bool {
return s.GetLastHeartbeatTS().Sub(s.lastAwakenTime) > awakenStoreInterval
}

// HasRecentlySplitRegions checks whether any regions have recently been split in this store.
func (s *StoreInfo) HasRecentlySplitRegions() bool {
return time.Since(s.recentlySplitRegionsTime) < splitStoreWait
}

var (
// If a store's last heartbeat is storeDisconnectDuration ago, the store will
// be marked as disconnected state. The value should be greater than tikv's