From 65f4a45f12c944a98850f91ee74305b6de2a98cb Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 6 Nov 2023 18:12:26 +0800 Subject: [PATCH] dynamic enable scheduling jobs Signed-off-by: Ryan Leung --- pkg/utils/apiutil/serverapi/middleware.go | 3 ++- server/cluster/cluster.go | 27 ++++++++++++++++++----- server/cluster/scheduling_controller.go | 17 ++++++++------ server/grpc_service.go | 4 ++-- 4 files changed, 35 insertions(+), 16 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 19438ad0f913..e269c4236a28 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" @@ -106,7 +107,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, m } func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsAPIServiceMode() { + if !h.s.IsServiceEnabled(utils.SchedulingServiceName) { return false, "" } if len(h.microserviceRedirectRules) == 0 { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c0db3e1b4cf7..9817fec87eeb 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -41,6 +41,7 @@ import ( "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/memory" "github.com/tikv/pd/pkg/progress" @@ -349,9 +350,25 @@ func (c *RaftCluster) runServiceCheckJob() { checkFn := func() { if c.isAPIServiceMode { - once.Do(c.initSchedulers) - c.enabledServices.Store(mcsutils.SchedulingServiceName, true) + servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName) + if err != nil { + return + } + if len(servers) != 0 { + if c.schedulingController.running.Load() { + c.stopSchedulingJobs() + } + once.Do(c.initSchedulers) + c.enabledServices.Store(mcsutils.SchedulingServiceName, true) + } else { + if !c.schedulingController.running.Load() { + c.coordinator = schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams()) + c.startSchedulingJobs() + } + c.enabledServices.Delete(mcsutils.SchedulingServiceName) + } } else if !c.schedulingController.running.Load() { + c.coordinator = schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams()) c.startSchedulingJobs() c.enabledServices.Delete(mcsutils.SchedulingServiceName) } @@ -996,7 +1013,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. isNew, saveKV, saveCache, needSync := regionGuide(region, origin) - if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew { + if !saveKV && !saveCache && !isNew { // Due to some config changes need to update the region stats as well, // so we do some extra checks here. if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { @@ -1027,9 +1044,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionUpdateCacheEventCounter.Inc() } - if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) { - cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) - } + cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared()) if c.storage != nil { // If there are concurrent heartbeats from the same region, the last write will win even if diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index a6d74cd59e2c..135fc5467a14 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -38,9 +38,10 @@ import ( ) type schedulingController struct { - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup *core.BasicCluster opt *config.PersistOptions coordinator *schedule.Coordinator @@ -51,14 +52,15 @@ type schedulingController struct { running atomic.Bool } -func newSchedulingController(ctx context.Context) *schedulingController { - ctx, cancel := context.WithCancel(ctx) +func newSchedulingController(parentCtx context.Context) *schedulingController { + ctx, cancel := context.WithCancel(parentCtx) return &schedulingController{ + parentCtx: parentCtx, ctx: ctx, cancel: cancel, labelStats: statistics.NewLabelStatistics(), - hotStat: statistics.NewHotStat(ctx), - slowStat: statistics.NewSlowStat(ctx), + hotStat: statistics.NewHotStat(parentCtx), + slowStat: statistics.NewSlowStat(parentCtx), } } @@ -78,6 +80,7 @@ func (sc *schedulingController) stopSchedulingJobs() { } func (sc *schedulingController) startSchedulingJobs() { + sc.ctx, sc.cancel = context.WithCancel(sc.parentCtx) sc.wg.Add(3) go sc.runCoordinator() go sc.runStatsBackgroundJobs() diff --git a/server/grpc_service.go b/server/grpc_service.go index 4aa19de0a49d..6fe5c69d1b95 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -866,7 +866,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear if _, err := cli.StoreHeartbeat(ctx, req); err != nil { log.Info("forward store heartbeat failed", zap.Error(err)) // reset to let it be updated in the next request - // s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{}) + s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{}) } } } @@ -1184,7 +1184,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() - if s.IsAPIServiceMode() { + if s.IsServiceEnabled(utils.SchedulingServiceName) { forwardedSchedulingHost, ok := s.GetServicePrimaryAddr(stream.Context(), utils.SchedulingServiceName) if !ok || len(forwardedSchedulingHost) == 0 { log.Error("failed to find scheduling service primary address")