From a75ae5f1dbe16cf5a17c6a0954c12ebea1de5605 Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 20 Mar 2024 15:55:17 +0800 Subject: [PATCH] *: optimize heartbeat process with async runner Signed-off-by: nolouch --- pkg/cluster/cluster.go | 10 +- pkg/core/region.go | 87 +++++++++++---- pkg/core/region_test.go | 7 +- pkg/core/region_tree.go | 5 + pkg/core/region_tree_test.go | 2 + pkg/mcs/scheduling/server/cluster.go | 86 +++++++++++++-- pkg/ratelimit/runner.go | 11 +- pkg/schedule/config/config.go | 8 ++ pkg/syncer/client.go | 5 +- pkg/utils/ctxutil/context.go | 27 +++++ server/cluster/cluster.go | 155 ++++++++++++++++++--------- server/cluster/cluster_test.go | 101 ++++++++--------- server/cluster/cluster_worker.go | 10 +- tests/cluster.go | 2 + 14 files changed, 378 insertions(+), 138 deletions(-) create mode 100644 pkg/utils/ctxutil/context.go diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 916200bfa3ec..3edefbcaefdc 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -28,6 +28,7 @@ type Cluster interface { GetLabelStats() *statistics.LabelStatistics GetCoordinator() *schedule.Coordinator GetRuleManager() *placement.RuleManager + GetBasicCluster() *core.BasicCluster } // HandleStatsAsync handles the flow asynchronously. @@ -55,8 +56,13 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { } // Collect collects the cluster information. -func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) { +func Collect(c Cluster, region *core.RegionInfo, hasRegionStats bool) { if hasRegionStats { - c.GetRegionStats().Observe(region, stores) + // get region again from root tree. make sure the observed region is the latest. + region = c.GetBasicCluster().GetRegion(region.GetID()) + if region == nil { + return + } + c.GetRegionStats().Observe(region, c.GetBasicCluster().GetRegionStores(region)) } } diff --git a/pkg/core/region.go b/pkg/core/region.go index f7a4ef5f0fd5..3fa1b72261ae 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -16,6 +16,7 @@ package core import ( "bytes" + "context" "encoding/hex" "fmt" "math" @@ -35,6 +36,8 @@ import ( "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/ratelimit" + "github.com/tikv/pd/pkg/utils/ctxutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -711,20 +714,51 @@ func (r *RegionInfo) isRegionRecreated() bool { // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) +type RegionGuideFunc func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { noLog := func(msg string, fields ...zap.Field) {} - debug, info := noLog, noLog + d, i := noLog, noLog + debug, info := d, i if enableLog { - debug = log.Debug - info = log.Info + d = log.Debug + i = log.Info + debug, info = d, i } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + return func(ctx context.Context, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + taskRunner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner) + limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter) + // print log asynchronously + if ok { + debug = func(msg string, fields ...zap.Field) { + taskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "Log", + Limit: limiter, + }, + func(ctx context.Context) { + d(msg, fields...) + }, + ) + } + info = func(msg string, fields ...zap.Field) { + taskRunner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "Log", + Limit: limiter, + }, + func(ctx context.Context) { + i(msg, fields...) + }, + ) + } + } if origin == nil { if log.GetLevel() <= zap.DebugLevel { debug("insert new region", @@ -789,7 +823,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } if !SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) { if log.GetLevel() <= zap.DebugLevel { - debug("down-peers changed", zap.Uint64("region-id", region.GetID())) + debug("down-peers changed", zap.Uint64("region-id", region.GetID()), zap.Reflect("before", origin.GetDownPeers()), zap.Reflect("after", region.GetDownPeers())) } saveCache, needSync = true, true return @@ -912,7 +946,7 @@ func (r *RegionsInfo) CheckAndPutRegion(region *RegionInfo) []*RegionInfo { if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } - err := check(region, origin, ols) + err := check(region, origin, convertItemsToRegions(ols)) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) // return the state region to delete. @@ -933,48 +967,62 @@ func (r *RegionsInfo) PutRegion(region *RegionInfo) []*RegionInfo { } // PreCheckPutRegion checks if the region is valid to put. -func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) (*RegionInfo, []*regionItem, error) { - origin, overlaps := r.GetRelevantRegions(region, trace) +func (r *RegionsInfo) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) { + origin, overlaps := r.GetRelevantRegions(region) err := check(region, origin, overlaps) return origin, overlaps, err } +func convertItemsToRegions(items []*regionItem) []*RegionInfo { + regions := make([]*RegionInfo, 0, len(items)) + for _, item := range items { + regions = append(regions, item.RegionInfo) + } + return regions +} + // AtomicCheckAndPutRegion checks if the region is valid to put, if valid then put. -func (r *RegionsInfo) AtomicCheckAndPutRegion(region *RegionInfo, trace RegionHeartbeatProcessTracer) ([]*RegionInfo, error) { +func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx context.Context, region *RegionInfo) ([]*RegionInfo, error) { + tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(RegionHeartbeatProcessTracer) + if !ok { + tracer = NewNoopHeartbeatProcessTracer() + } r.t.Lock() var ols []*regionItem origin := r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { ols = r.tree.overlaps(®ionItem{RegionInfo: region}) } - trace.OnCheckOverlapsFinished() - err := check(region, origin, ols) + tracer.OnCheckOverlapsFinished() + err := check(region, origin, convertItemsToRegions(ols)) if err != nil { r.t.Unlock() - trace.OnValidateRegionFinished() + tracer.OnValidateRegionFinished() return nil, err } - trace.OnValidateRegionFinished() + tracer.OnValidateRegionFinished() origin, overlaps, rangeChanged := r.setRegionLocked(region, true, ols...) r.t.Unlock() - trace.OnSetRegionFinished() + tracer.OnSetRegionFinished() r.UpdateSubTree(region, origin, overlaps, rangeChanged) - trace.OnUpdateSubTreeFinished() + tracer.OnUpdateSubTreeFinished() return overlaps, nil } // GetRelevantRegions returns the relevant regions for a given region. -func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo, trace RegionHeartbeatProcessTracer) (origin *RegionInfo, overlaps []*regionItem) { +func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) { r.t.RLock() defer r.t.RUnlock() origin = r.getRegionLocked(region.GetID()) if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) { - overlaps = r.tree.overlaps(®ionItem{RegionInfo: region}) + for _, item := range r.tree.overlaps(®ionItem{RegionInfo: region}) { + overlaps = append(overlaps, item.RegionInfo) + } } return } -func check(region, origin *RegionInfo, overlaps []*regionItem) error { +func check(region, origin *RegionInfo, overlaps []*RegionInfo) error { for _, item := range overlaps { // PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation. if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !region.isRegionRecreated() { @@ -1043,7 +1091,6 @@ func (r *RegionsInfo) setRegionLocked(region *RegionInfo, withOverlaps bool, ol item = ®ionItem{RegionInfo: region} r.regions[region.GetID()] = item } - var overlaps []*RegionInfo if rangeChanged { overlaps = r.tree.update(item, withOverlaps, ol...) diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 3c6536a6a773..5bb09eb52b07 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -15,6 +15,7 @@ package core import ( + "context" "crypto/rand" "fmt" "math" @@ -363,7 +364,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...) - _, _, needSync := RegionGuide(regionA, regionB) + _, _, needSync := RegionGuide(context.TODO(), regionA, regionB) re.Equal(testCase.needSync, needSync) } } @@ -459,9 +460,9 @@ func TestSetRegionConcurrence(t *testing.T) { regions := NewRegionsInfo() region := NewTestRegionInfo(1, 1, []byte("a"), []byte("b")) go func() { - regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer()) + regions.AtomicCheckAndPutRegion(context.TODO(), region) }() - regions.AtomicCheckAndPutRegion(region, NewNoopHeartbeatProcessTracer()) + regions.AtomicCheckAndPutRegion(context.TODO(), region) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/core/UpdateSubTree")) } diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 333e1730ec8a..8c928f391eb3 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -35,6 +35,11 @@ func (r *regionItem) GetStartKey() []byte { return r.meta.StartKey } +// GetID returns the ID of the region. +func (r *regionItem) GetID() uint64 { + return r.meta.GetId() +} + // GetEndKey returns the end key of the region. func (r *regionItem) GetEndKey() []byte { return r.meta.EndKey diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index 4e002fb8157b..f4ef6cb67b3b 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -158,6 +158,8 @@ func TestRegionTree(t *testing.T) { updateNewItem(tree, regionA) updateNewItem(tree, regionC) + re.Nil(tree.overlaps(newRegionItem([]byte("b"), []byte("c")))) + re.Equal(regionC, tree.overlaps(newRegionItem([]byte("a"), []byte("cc")))[1].RegionInfo) re.Nil(tree.search([]byte{})) re.Equal(regionA, tree.search([]byte("a"))) re.Nil(tree.search([]byte("b"))) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 1b915b6874d2..43fb236ee22e 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -2,6 +2,7 @@ package server import ( "context" + "runtime" "sync" "sync/atomic" "time" @@ -15,6 +16,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/schedule" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" @@ -29,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/statistics/buckets" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/ctxutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -51,6 +54,9 @@ type Cluster struct { apiServerLeader atomic.Value clusterID uint64 running atomic.Bool + + taskRunner ratelimit.Runner + hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter } const ( @@ -81,6 +87,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, storage: storage, clusterID: clusterID, checkMembershipCh: checkMembershipCh, + + taskRunner: ratelimit.NewAsyncRunner("heartbeat-async-task-runner", time.Minute), + hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -517,6 +526,7 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() + c.taskRunner.Start() c.running.Store(true) } @@ -527,6 +537,7 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) c.coordinator.Stop() + c.taskRunner.Stop() c.cancel() c.wg.Wait() } @@ -543,7 +554,13 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } tracer.Begin() - if err := c.processRegionHeartbeat(region, tracer); err != nil { + ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer) + ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter) + if c.persistConfig.GetScheduleConfig().EnableHeartbeatAsyncRunner { + ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner) + } + + if err := c.processRegionHeartbeat(ctx, region); err != nil { tracer.OnAllStageFinished() return err } @@ -553,26 +570,55 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { } // processRegionHeartbeat updates the region information. -func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { - origin, _, err := c.PreCheckPutRegion(region, tracer) +func (c *Cluster) processRegionHeartbeat(ctx context.Context, region *core.RegionInfo) error { + tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer) + if !ok { + tracer = core.NewNoopHeartbeatProcessTracer() + } + runner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner) + if !ok { + runner = syncRunner + } + limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter) + origin, _, err := c.PreCheckPutRegion(region) tracer.OnPreCheckFinished() if err != nil { return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - cluster.HandleStatsAsync(c, region) + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "HandleStatsAsync", + Limit: limiter, + }, + func(ctx context.Context) { + cluster.HandleStatsAsync(c, region) + }, + ) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin) + _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - c.regionStats.Observe(region, c.GetRegionStores(region)) + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "ObserveRegionStatsAsync", + Limit: limiter, + }, + func(ctx context.Context) { + if c.regionStats.RegionStatsNeedUpdate(region) { + cluster.Collect(c, region, hasRegionStats) + } + }, + ) } return nil } @@ -583,15 +629,35 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.Re // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.AtomicCheckAndPutRegion(region, tracer); err != nil { + + // Async task in next PR. + if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } - - cluster.HandleOverlaps(c, overlaps) + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "HandleOverlaps", + Limit: limiter, + }, + func(ctx context.Context) { + cluster.HandleOverlaps(c, overlaps) + }, + ) } tracer.OnSaveCacheFinished() - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + // handle region stats + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "CollectRegionStatsAsync", + Limit: c.hbConcurrencyLimiter, + }, + func(ctx context.Context) { + cluster.Collect(c, region, hasRegionStats) + }, + ) tracer.OnCollectRegionStatsFinished() return nil } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 661668af3b91..477d03f2e13a 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -29,6 +29,8 @@ const initialCapacity = 100 // Runner is the interface for running tasks. type Runner interface { RunTask(ctx context.Context, opt TaskOpts, f func(context.Context)) error + Start() + Stop() } // Task is a task to be run. @@ -60,9 +62,7 @@ func NewAsyncRunner(name string, maxPendingDuration time.Duration) *AsyncRunner maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), - stopChan: make(chan struct{}), } - s.Start() return s } @@ -75,6 +75,7 @@ type TaskOpts struct { // Start starts the runner. func (s *AsyncRunner) Start() { + s.stopChan = make(chan struct{}) s.wg.Add(1) go func() { defer s.wg.Done() @@ -166,3 +167,9 @@ func (s *SyncRunner) RunTask(ctx context.Context, opt TaskOpts, f func(context.C f(ctx) return nil } + +// Start starts the runner. +func (s *SyncRunner) Start() {} + +// Stop stops the runner. +func (s *SyncRunner) Stop() {} diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index 56038ddcb098..2fca8c192257 100644 --- a/pkg/schedule/config/config.go +++ b/pkg/schedule/config/config.go @@ -52,6 +52,7 @@ const ( defaultEnableJointConsensus = true defaultEnableTiKVSplitRegion = true defaultEnableHeartbeatBreakdownMetrics = true + defaultEnableHeartbeatAsyncRunner = false defaultEnableCrossTableMerge = true defaultEnableDiagnostic = true defaultStrictlyMatchLabel = false @@ -267,6 +268,9 @@ type ScheduleConfig struct { // EnableHeartbeatBreakdownMetrics is the option to enable heartbeat stats metrics. EnableHeartbeatBreakdownMetrics bool `toml:"enable-heartbeat-breakdown-metrics" json:"enable-heartbeat-breakdown-metrics,string"` + // EnableHeartbeatAsyncRunner is the option to enable heartbeat async runner. + EnableHeartbeatAsyncRunner bool `toml:"enable-heartbeat-async-runner" json:"enable-heartbeat-async-runner,string"` + // Schedulers support for loading customized schedulers Schedulers SchedulerConfigs `toml:"schedulers" json:"schedulers-v2"` // json v2 is for the sake of compatible upgrade @@ -382,6 +386,10 @@ func (c *ScheduleConfig) Adjust(meta *configutil.ConfigMetaData, reloading bool) c.EnableHeartbeatBreakdownMetrics = defaultEnableHeartbeatBreakdownMetrics } + if !meta.IsDefined("enable-heartbeat-async-runner") { + c.EnableHeartbeatAsyncRunner = defaultEnableHeartbeatAsyncRunner + } + if !meta.IsDefined("enable-cross-table-merge") { c.EnableCrossTableMerge = defaultEnableCrossTableMerge } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index ffbd71d2f1ea..558423722ffe 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -200,13 +200,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { region = core.NewRegionInfo(r, regionLeader, core.SetSource(core.Sync)) } - tracer := core.NewNoopHeartbeatProcessTracer() - origin, _, err := bc.PreCheckPutRegion(region, tracer) + origin, _, err := bc.PreCheckPutRegion(region) if err != nil { log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err)) continue } - saveKV, _, _ := regionGuide(region, origin) + saveKV, _, _ := regionGuide(ctx, region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/pkg/utils/ctxutil/context.go b/pkg/utils/ctxutil/context.go new file mode 100644 index 000000000000..8a4c73218287 --- /dev/null +++ b/pkg/utils/ctxutil/context.go @@ -0,0 +1,27 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ctxutil + +// CtxKey is a custom type used as a key for values stored in Context. +type CtxKey string + +const ( + // HeartbeatTracerKey is the key for the heartbeat tracer in the context. + HeartbeatTracerKey CtxKey = "h_tracer" + // TaskRunnerKey is the key for the task runner in the context. + TaskRunnerKey CtxKey = "task_runner" + // LimiterKey is the key for the concurrency limiter in the context. + LimiterKey CtxKey = "limiter" +) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b11f1ff55a50..95507d8c4aa9 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -21,6 +21,7 @@ import ( "io" "math" "net/http" + "runtime" "strconv" "strings" "sync" @@ -44,6 +45,7 @@ import ( mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" + "github.com/tikv/pd/pkg/ratelimit" "github.com/tikv/pd/pkg/replication" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/hbstream" @@ -56,6 +58,7 @@ import ( "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/syncer" "github.com/tikv/pd/pkg/unsaferecovery" + "github.com/tikv/pd/pkg/utils/ctxutil" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/netutil" @@ -166,6 +169,9 @@ type RaftCluster struct { keyspaceGroupManager *keyspace.GroupManager independentServices sync.Map hbstreams *hbstream.HeartbeatStreams + + taskRunner ratelimit.Runner + hbConcurrencyLimiter *ratelimit.ConcurrencyLimiter } // Status saves some state information. @@ -182,13 +188,15 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + taskRunner: ratelimit.NewAsyncRunner("heartbeat-async-task-runner", time.Minute), + hbConcurrencyLimiter: ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU() * 2)), } } @@ -291,7 +299,6 @@ func (c *RaftCluster) Start(s Server) error { log.Warn("raft cluster has already been started") return nil } - c.isAPIServiceMode = s.IsAPIServiceMode() err := c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { @@ -347,6 +354,7 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true + c.taskRunner.Start() return nil } @@ -740,6 +748,7 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } + c.taskRunner.Stop() c.Unlock() c.wg.Wait() @@ -988,10 +997,21 @@ func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error { } var regionGuide = core.GenerateRegionGuideFunc(true) +var syncRunner = ratelimit.NewSyncRunner() // processRegionHeartbeat updates the region information. -func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer core.RegionHeartbeatProcessTracer) error { - origin, _, err := c.core.PreCheckPutRegion(region, tracer) +func (c *RaftCluster) processRegionHeartbeat(ctx context.Context, region *core.RegionInfo) error { + tracer, ok := ctx.Value(ctxutil.HeartbeatTracerKey).(core.RegionHeartbeatProcessTracer) + if !ok { + tracer = core.NewNoopHeartbeatProcessTracer() + } + runner, ok := ctx.Value(ctxutil.TaskRunnerKey).(ratelimit.Runner) + if !ok { + runner = syncRunner + } + limiter, _ := ctx.Value(ctxutil.LimiterKey).(*ratelimit.ConcurrencyLimiter) + + origin, _, err := c.core.PreCheckPutRegion(region) tracer.OnPreCheckFinished() if err != nil { return err @@ -1000,13 +1020,22 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer cor region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - cluster.HandleStatsAsync(c, region) + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "HandleStatsAsync", + Limit: limiter, + }, + func(ctx context.Context) { + cluster.HandleStatsAsync(c, region) + }, + ) } tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync := regionGuide(region, origin) + saveKV, saveCache, needSync := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes need to update the region stats as well, @@ -1015,7 +1044,18 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer cor // region stats needs to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - c.regionStats.Observe(region, c.getRegionStoresLocked(region)) + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "ObserveRegionStatsAsync", + Limit: limiter, + }, + func(ctx context.Context) { + if c.regionStats.RegionStatsNeedUpdate(region) { + cluster.Collect(c, region, hasRegionStats) + } + }, + ) } return nil } @@ -1032,43 +1072,72 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo, tracer cor // check its validation again here. // // However, it can't solve the race condition of concurrent heartbeats from the same region. - if overlaps, err = c.core.AtomicCheckAndPutRegion(region, tracer); err != nil { + if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil { tracer.OnSaveCacheFinished() return err } if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - cluster.HandleOverlaps(c, overlaps) + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "HandleOverlaps", + Limit: limiter, + }, + func(ctx context.Context) { + cluster.HandleOverlaps(c, overlaps) + }, + ) } regionUpdateCacheEventCounter.Inc() } tracer.OnSaveCacheFinished() - // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", - // region stats needs to be collected in API mode. - // We need to think of a better way to reduce this part of the cost in the future. - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats) + // handle region stats + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "CollectRegionStatsAsync", + Limit: c.hbConcurrencyLimiter, + }, + func(ctx context.Context) { + // TODO: Due to the accuracy requirements of the API "/regions/check/xxx", + // region stats needs to be collected in API mode. + // We need to think of a better way to reduce this part of the cost in the future. + cluster.Collect(c, region, hasRegionStats) + }, + ) + tracer.OnCollectRegionStatsFinished() if c.storage != nil { - // If there are concurrent heartbeats from the same region, the last write will win even if - // writes to storage in the critical area. So don't use mutex to protect it. - // Not successfully saved to storage is not fatal, it only leads to longer warm-up - // after restart. Here we only log the error then go on updating cache. - for _, item := range overlaps { - if err := c.storage.DeleteRegion(item.GetMeta()); err != nil { - log.Error("failed to delete region from storage", - zap.Uint64("region-id", item.GetID()), - logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())), - errs.ZapError(err)) - } - } if saveKV { - if err := c.storage.SaveRegion(region.GetMeta()); err != nil { - log.Error("failed to save region to storage", - zap.Uint64("region-id", region.GetID()), - logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())), - errs.ZapError(err)) - } - regionUpdateKVEventCounter.Inc() + runner.RunTask( + ctx, + ratelimit.TaskOpts{ + TaskName: "SaveRegionToKV", + Limit: c.hbConcurrencyLimiter, + }, + func(ctx context.Context) { + // If there are concurrent heartbeats from the same region, the last write will win even if + // writes to storage in the critical area. So don't use mutex to protect it. + // Not successfully saved to storage is not fatal, it only leads to longer warm-up + // after restart. Here we only log the error then go on updating cache. + for _, item := range overlaps { + if err := c.storage.DeleteRegion(item.GetMeta()); err != nil { + log.Error("failed to delete region from storage", + zap.Uint64("region-id", item.GetID()), + logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())), + errs.ZapError(err)) + } + } + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { + log.Error("failed to save region to storage", + zap.Uint64("region-id", region.GetID()), + logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())), + errs.ZapError(err)) + } + regionUpdateKVEventCounter.Inc() + }, + ) } } @@ -2069,16 +2138,6 @@ func (c *RaftCluster) resetProgressIndicator() { storesETAGauge.Reset() } -func (c *RaftCluster) getRegionStoresLocked(region *core.RegionInfo) []*core.StoreInfo { - stores := make([]*core.StoreInfo, 0, len(region.GetPeers())) - for _, p := range region.GetPeers() { - if store := c.core.GetStore(p.StoreId); store != nil { - stores = append(stores, store) - } - } - return stores -} - // OnStoreVersionChange changes the version of the cluster when needed. func (c *RaftCluster) OnStoreVersionChange() { c.RLock() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index dc0f79667614..a3a9e5fbb73e 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -54,6 +54,7 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/ctxutil" "github.com/tikv/pd/pkg/utils/operatorutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -631,7 +632,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { region := core.NewRegionInfo(regionMeta, leader, core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: utils.RegionHeartBeatReportInterval}), core.SetWrittenBytes(30000*10), core.SetWrittenKeys(300000*10)) - err = cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + err = cluster.processRegionHeartbeat(context.TODO(), region) re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) @@ -644,7 +645,7 @@ func TestRegionHeartbeatHotStat(t *testing.T) { StoreId: 4, } region = region.Clone(core.WithRemoveStorePeer(2), core.WithAddPeer(newPeer)) - err = cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + err = cluster.processRegionHeartbeat(context.TODO(), region) re.NoError(err) // wait HotStat to update items time.Sleep(time.Second) @@ -681,8 +682,8 @@ func TestBucketHeartbeat(t *testing.T) { re.NoError(cluster.putStoreLocked(store)) } - re.NoError(cluster.processRegionHeartbeat(regions[0], core.NewNoopHeartbeatProcessTracer())) - re.NoError(cluster.processRegionHeartbeat(regions[1], core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), regions[0])) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), regions[1])) re.Nil(cluster.GetRegion(uint64(1)).GetBuckets()) re.NoError(cluster.processReportBuckets(buckets)) re.Equal(buckets, cluster.GetRegion(uint64(1)).GetBuckets()) @@ -701,13 +702,13 @@ func TestBucketHeartbeat(t *testing.T) { // case5: region update should inherit buckets. newRegion := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil)) opt.SetRegionBucketEnabled(true) - re.NoError(cluster.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), newRegion)) re.Len(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys(), 2) // case6: disable region bucket in opt.SetRegionBucketEnabled(false) newRegion2 := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil)) - re.NoError(cluster.processRegionHeartbeat(newRegion2, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), newRegion2)) re.Nil(cluster.GetRegion(uint64(1)).GetBuckets()) re.Empty(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys()) } @@ -733,25 +734,25 @@ func TestRegionHeartbeat(t *testing.T) { for i, region := range regions { // region does not exist. - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is the same, not updated. - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) - re.Error(cluster.processRegionHeartbeat(stale, core.NewNoopHeartbeatProcessTracer())) + re.Error(cluster.processRegionHeartbeat(context.TODO(), stale)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -761,13 +762,13 @@ func TestRegionHeartbeat(t *testing.T) { core.WithIncConfVer(), ) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) - re.Error(cluster.processRegionHeartbeat(stale, core.NewNoopHeartbeatProcessTracer())) + re.Error(cluster.processRegionHeartbeat(context.TODO(), stale)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -779,38 +780,38 @@ func TestRegionHeartbeat(t *testing.T) { }, })) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Add a pending peer. region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetPeers()[rand.Intn(len(region.GetPeers()))]})) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Clear down peers. region = region.Clone(core.WithDownPeers(nil)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Clear pending peers. region = region.Clone(core.WithPendingPeers(nil)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Remove peers. origin = region region = origin.Clone(core.SetPeers(region.GetPeers()[:1])) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) // Add peers. region = origin regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) checkRegionsKV(re, cluster.storage, regions[:i+1]) @@ -820,47 +821,47 @@ func TestRegionHeartbeat(t *testing.T) { core.WithIncConfVer(), ) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change leader. region = region.Clone(core.WithLeader(region.GetPeers()[1])) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change ApproximateSize. region = region.Clone(core.SetApproximateSize(144)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change ApproximateKeys. region = region.Clone(core.SetApproximateKeys(144000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change bytes written. region = region.Clone(core.SetWrittenBytes(24000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Change bytes read. region = region.Clone(core.SetReadBytes(1080000)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) // Flashback region = region.Clone(core.WithFlashback(true, 1)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) region = region.Clone(core.WithFlashback(false, 0)) regions[i] = region - re.NoError(cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region)) checkRegions(re, cluster.core, regions[:i+1]) } @@ -916,8 +917,7 @@ func TestRegionHeartbeat(t *testing.T) { core.WithNewRegionID(10000), core.WithDecVersion(), ) - tracer := core.NewHeartbeatProcessTracer() - re.Error(cluster.processRegionHeartbeat(overlapRegion, tracer)) + re.Error(cluster.processRegionHeartbeat(context.TODO(), overlapRegion)) region := &metapb.Region{} ok, err := storage.LoadRegion(regions[n-1].GetID(), region) re.True(ok) @@ -941,9 +941,11 @@ func TestRegionHeartbeat(t *testing.T) { core.WithStartKey(regions[n-2].GetStartKey()), core.WithNewRegionID(regions[n-1].GetID()+1), ) - tracer = core.NewHeartbeatProcessTracer() + tracer := core.NewHeartbeatProcessTracer() tracer.Begin() - re.NoError(cluster.processRegionHeartbeat(overlapRegion, tracer)) + ctx := context.TODO() + ctx = context.WithValue(ctx, ctxutil.HeartbeatTracerKey, tracer) + re.NoError(cluster.processRegionHeartbeat(ctx, overlapRegion)) tracer.OnAllStageFinished() re.Condition(func() bool { fileds := tracer.LogFields() @@ -977,7 +979,7 @@ func TestRegionFlowChanged(t *testing.T) { regions := []*core.RegionInfo{core.NewTestRegionInfo(1, 1, []byte{}, []byte{})} processRegions := func(regions []*core.RegionInfo) { for _, r := range regions { - cluster.processRegionHeartbeat(r, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(ctx, r) } } regions = core.SplitRegions(regions) @@ -1013,7 +1015,7 @@ func TestRegionSizeChanged(t *testing.T) { core.SetApproximateKeys(curMaxMergeKeys-1), core.SetSource(core.Heartbeat), ) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(context.TODO(), region) regionID := region.GetID() re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test ApproximateSize and ApproximateKeys change. @@ -1023,16 +1025,16 @@ func TestRegionSizeChanged(t *testing.T) { core.SetApproximateKeys(curMaxMergeKeys+1), core.SetSource(core.Heartbeat), ) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(context.TODO(), region) re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) // Test MaxMergeRegionSize and MaxMergeRegionKeys change. cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize + 2)) cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys + 2)) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(context.TODO(), region) re.True(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) cluster.opt.SetMaxMergeRegionSize(uint64(curMaxMergeSize)) cluster.opt.SetMaxMergeRegionKeys(uint64(curMaxMergeKeys)) - cluster.processRegionHeartbeat(region, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(context.TODO(), region) re.False(cluster.regionStats.IsRegionStatsType(regionID, statistics.UndersizedRegion)) } @@ -1095,11 +1097,11 @@ func TestConcurrentRegionHeartbeat(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat", "return(true)")) go func() { defer wg.Done() - cluster.processRegionHeartbeat(source, core.NewNoopHeartbeatProcessTracer()) + cluster.processRegionHeartbeat(context.TODO(), source) }() time.Sleep(100 * time.Millisecond) re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/concurrentRegionHeartbeat")) - re.NoError(cluster.processRegionHeartbeat(target, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), target)) wg.Wait() checkRegion(re, cluster.GetRegionByKey([]byte{}), target) } @@ -1161,7 +1163,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { func heartbeatRegions(re *require.Assertions, cluster *RaftCluster, regions []*core.RegionInfo) { // Heartbeat and check region one by one. for _, r := range regions { - re.NoError(cluster.processRegionHeartbeat(r, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), r)) checkRegion(re, cluster.GetRegion(r.GetID()), r) checkRegion(re, cluster.GetRegionByKey(r.GetStartKey()), r) @@ -1198,7 +1200,7 @@ func TestHeartbeatSplit(t *testing.T) { // 1: [nil, nil) region1 := core.NewRegionInfo(&metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region1)) checkRegion(re, cluster.GetRegionByKey([]byte("foo")), region1) // split 1 to 2: [nil, m) 1: [m, nil), sync 2 first. @@ -1207,12 +1209,12 @@ func TestHeartbeatSplit(t *testing.T) { core.WithIncVersion(), ) region2 := core.NewRegionInfo(&metapb.Region{Id: 2, EndKey: []byte("m"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region2, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region2)) checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2) // [m, nil) is missing before r1's heartbeat. re.Nil(cluster.GetRegionByKey([]byte("z"))) - re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region1)) checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1) // split 1 to 3: [m, q) 1: [q, nil), sync 1 first. @@ -1221,12 +1223,12 @@ func TestHeartbeatSplit(t *testing.T) { core.WithIncVersion(), ) region3 := core.NewRegionInfo(&metapb.Region{Id: 3, StartKey: []byte("m"), EndKey: []byte("q"), RegionEpoch: &metapb.RegionEpoch{Version: 1, ConfVer: 1}}, nil) - re.NoError(cluster.processRegionHeartbeat(region1, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region1)) checkRegion(re, cluster.GetRegionByKey([]byte("z")), region1) checkRegion(re, cluster.GetRegionByKey([]byte("a")), region2) // [m, q) is missing before r3's heartbeat. re.Nil(cluster.GetRegionByKey([]byte("n"))) - re.NoError(cluster.processRegionHeartbeat(region3, core.NewNoopHeartbeatProcessTracer())) + re.NoError(cluster.processRegionHeartbeat(context.TODO(), region3)) checkRegion(re, cluster.GetRegionByKey([]byte("n")), region3) } @@ -1522,11 +1524,11 @@ func TestUpdateStorePendingPeerCount(t *testing.T) { }, } origin := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[:3]}, peers[0], core.WithPendingPeers(peers[1:3])) - re.NoError(tc.processRegionHeartbeat(origin, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(context.TODO(), origin)) time.Sleep(50 * time.Millisecond) checkPendingPeerCount([]int{0, 1, 1, 0}, tc.RaftCluster, re) newRegion := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: peers[1:]}, peers[1], core.WithPendingPeers(peers[3:4])) - re.NoError(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(context.TODO(), newRegion)) time.Sleep(50 * time.Millisecond) checkPendingPeerCount([]int{0, 0, 0, 1}, tc.RaftCluster, re) } @@ -2137,6 +2139,7 @@ func newTestRaftCluster( opt *config.PersistOptions, s storage.Storage, ) *RaftCluster { + opt.GetScheduleConfig().EnableHeartbeatAsyncRunner = false rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s} rc.InitCluster(id, opt, nil, nil) rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt) @@ -2959,12 +2962,12 @@ func TestShouldRun(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) - re.NoError(tc.processRegionHeartbeat(nr, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(context.TODO(), nr)) re.Equal(testCase.ShouldRun, co.ShouldRun()) } nr := &metapb.Region{Id: 6, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) - re.Error(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.Error(tc.processRegionHeartbeat(context.TODO(), newRegion)) re.Equal(7, tc.core.GetClusterNotFromStorageRegionsCnt()) } @@ -3002,12 +3005,12 @@ func TestShouldRunWithNonLeaderRegions(t *testing.T) { for _, testCase := range testCases { r := tc.GetRegion(testCase.regionID) nr := r.Clone(core.WithLeader(r.GetPeers()[0]), core.SetSource(core.Heartbeat)) - re.NoError(tc.processRegionHeartbeat(nr, core.NewNoopHeartbeatProcessTracer())) + re.NoError(tc.processRegionHeartbeat(context.TODO(), nr)) re.Equal(testCase.ShouldRun, co.ShouldRun()) } nr := &metapb.Region{Id: 9, Peers: []*metapb.Peer{}} newRegion := core.NewRegionInfo(nr, nil, core.SetSource(core.Heartbeat)) - re.Error(tc.processRegionHeartbeat(newRegion, core.NewNoopHeartbeatProcessTracer())) + re.Error(tc.processRegionHeartbeat(context.TODO(), newRegion)) re.Equal(9, tc.core.GetClusterNotFromStorageRegionsCnt()) // Now, after server is prepared, there exist some regions with no leader. diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 5ae8fdc0396f..5b8ea0a6ec1a 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -16,6 +16,7 @@ package cluster import ( "bytes" + "context" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" @@ -26,6 +27,7 @@ import ( mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/statistics/buckets" + "github.com/tikv/pd/pkg/utils/ctxutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/pkg/versioninfo" @@ -39,7 +41,13 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } tracer.Begin() - if err := c.processRegionHeartbeat(region, tracer); err != nil { + ctx := context.WithValue(c.ctx, ctxutil.HeartbeatTracerKey, tracer) + ctx = context.WithValue(ctx, ctxutil.LimiterKey, c.hbConcurrencyLimiter) + if c.GetScheduleConfig().EnableHeartbeatAsyncRunner { + ctx = context.WithValue(ctx, ctxutil.TaskRunnerKey, c.taskRunner) + } + + if err := c.processRegionHeartbeat(ctx, region); err != nil { tracer.OnAllStageFinished() return err } diff --git a/tests/cluster.go b/tests/cluster.go index 198b49ce7284..55a7646a6887 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -88,6 +88,8 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err } func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { + // disable the heartbeat async runner in test + cfg.Schedule.EnableHeartbeatAsyncRunner = false err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) if err != nil { return nil, err