Skip to content

Commit

Permalink
dynamic enable scheduling jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Nov 7, 2023
1 parent 0f0fa4b commit 65f4a45
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/server"
Expand Down Expand Up @@ -106,7 +107,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, m
}

func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsAPIServiceMode() {
if !h.s.IsServiceEnabled(utils.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down
27 changes: 21 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/tikv/pd/pkg/gctuner"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/progress"
Expand Down Expand Up @@ -349,9 +350,25 @@ func (c *RaftCluster) runServiceCheckJob() {

checkFn := func() {
if c.isAPIServiceMode {
once.Do(c.initSchedulers)
c.enabledServices.Store(mcsutils.SchedulingServiceName, true)
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if err != nil {
return
}
if len(servers) != 0 {
if c.schedulingController.running.Load() {
c.stopSchedulingJobs()
}
once.Do(c.initSchedulers)
c.enabledServices.Store(mcsutils.SchedulingServiceName, true)
} else {
if !c.schedulingController.running.Load() {
c.coordinator = schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams())
c.startSchedulingJobs()
}
c.enabledServices.Delete(mcsutils.SchedulingServiceName)
}
} else if !c.schedulingController.running.Load() {
c.coordinator = schedule.NewCoordinator(c.ctx, c, c.GetHeartbeatStreams())
c.startSchedulingJobs()
c.enabledServices.Delete(mcsutils.SchedulingServiceName)
}
Expand Down Expand Up @@ -996,7 +1013,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) && !saveKV && !saveCache && !isNew {
if !saveKV && !saveCache && !isNew {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
Expand Down Expand Up @@ -1027,9 +1044,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionUpdateCacheEventCounter.Inc()
}

if !c.IsServiceEnabled(mcsutils.SchedulingServiceName) {
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
}
cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())

if c.storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down
17 changes: 10 additions & 7 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (
)

type schedulingController struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
parentCtx context.Context
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
*core.BasicCluster
opt *config.PersistOptions
coordinator *schedule.Coordinator
Expand All @@ -51,14 +52,15 @@ type schedulingController struct {
running atomic.Bool
}

func newSchedulingController(ctx context.Context) *schedulingController {
ctx, cancel := context.WithCancel(ctx)
func newSchedulingController(parentCtx context.Context) *schedulingController {
ctx, cancel := context.WithCancel(parentCtx)
return &schedulingController{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
labelStats: statistics.NewLabelStatistics(),
hotStat: statistics.NewHotStat(ctx),
slowStat: statistics.NewSlowStat(ctx),
hotStat: statistics.NewHotStat(parentCtx),
slowStat: statistics.NewSlowStat(parentCtx),
}
}

Expand All @@ -78,6 +80,7 @@ func (sc *schedulingController) stopSchedulingJobs() {
}

func (sc *schedulingController) startSchedulingJobs() {
sc.ctx, sc.cancel = context.WithCancel(sc.parentCtx)
sc.wg.Add(3)
go sc.runCoordinator()
go sc.runStatsBackgroundJobs()
Expand Down
4 changes: 2 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
if _, err := cli.StoreHeartbeat(ctx, req); err != nil {
log.Info("forward store heartbeat failed", zap.Error(err))
// reset to let it be updated in the next request
// s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{})
s.schedulingClient.CompareAndSwap(forwardCli, &schedulingClient{})
}
}
}
Expand Down Expand Up @@ -1184,7 +1184,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()

if s.IsAPIServiceMode() {
if s.IsServiceEnabled(utils.SchedulingServiceName) {
forwardedSchedulingHost, ok := s.GetServicePrimaryAddr(stream.Context(), utils.SchedulingServiceName)
if !ok || len(forwardedSchedulingHost) == 0 {
log.Error("failed to find scheduling service primary address")
Expand Down

0 comments on commit 65f4a45

Please sign in to comment.