Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: unify the usage of independent service #8508

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
for _, rule := range h.microserviceRedirectRules {
// Now we only support checking the scheduling service whether it is independent
if rule.targetServiceName == mcsutils.SchedulingServiceName {
if !h.s.IsServiceIndependent(mcsutils.SchedulingServiceName) {
if !h.s.GetRaftCluster().IsServiceIndependent(mcsutils.SchedulingServiceName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to unify mcsutils.SchedulingServiceName and utils.SchedulingServiceName as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#8476 will change it to constant.xxx

continue
}
}
Expand Down
36 changes: 22 additions & 14 deletions server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@
return
}
rc.RemoveRegionIfExist(regionID)
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
msg := "The region is removed from server cache."
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.deleteRegionCacheInSchedulingServer(regionID)
if err != nil {
msg = buildMsg(err)

Check warning on line 68 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L68

Added line #L68 was not covered by tests
}
}
msg := "The region is removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))
h.rd.JSON(w, http.StatusOK, msg)
}

// @Tags admin
Expand Down Expand Up @@ -101,11 +104,15 @@
}
// Remove region from cache.
rc.RemoveRegionIfExist(regionID)
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
msg := "The region is removed from server cache and region meta storage."
if rc.IsServiceIndependent(utils.SchedulingServiceName) {

Check warning on line 108 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L107-L108

Added lines #L107 - L108 were not covered by tests
err = h.deleteRegionCacheInSchedulingServer(regionID)
if err != nil {
msg = buildMsg(err)

Check warning on line 111 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}
}
msg := "The region is removed from server cache and region meta storage."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))

h.rd.JSON(w, http.StatusOK, msg)

Check warning on line 115 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L115

Added line #L115 was not covered by tests
}

// @Tags admin
Expand All @@ -117,11 +124,15 @@
var err error
rc := getCluster(r)
rc.ResetRegionCache()
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) {
msg := "All regions are removed from server cache."
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
err = h.deleteRegionCacheInSchedulingServer()
if err != nil {
msg = buildMsg(err)

Check warning on line 131 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L131

Added line #L131 was not covered by tests
}
}
msg := "All regions are removed from server cache."
h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err))

h.rd.JSON(w, http.StatusOK, msg)
}

// Intentionally no swagger mark as it is supposed to be only used in
Expand Down Expand Up @@ -240,9 +251,6 @@
return nil
}

func (h *adminHandler) buildMsg(msg string, err error) string {
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && err != nil {
return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
}
return msg
func buildMsg(err error) string {
return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 255 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L254-L255

Added lines #L254 - L255 were not covered by tests
}
6 changes: 3 additions & 3 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler {
// @Router /config [get]
func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
cfg := h.svr.GetConfig()
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) &&
if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
schedulingServerConfig, err := h.getSchedulingServerConfig()
if err != nil {
Expand Down Expand Up @@ -336,7 +336,7 @@ func getConfigMap(cfg map[string]any, key []string, value any) map[string]any {
// @Success 200 {object} sc.ScheduleConfig
// @Router /config/schedule [get]
func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) &&
if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
cfg, err := h.getSchedulingServerConfig()
if err != nil {
Expand Down Expand Up @@ -410,7 +410,7 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request)
// @Success 200 {object} sc.ReplicationConfig
// @Router /config/replicate [get]
func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsServiceIndependent(utils.SchedulingServiceName) &&
if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) &&
r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" {
cfg, err := h.getSchedulingServerConfig()
if err != nil {
Expand Down
28 changes: 22 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,18 +375,18 @@
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
c.UnsetServiceIndependent(mcsutils.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.independentServices.Store(mcsutils.SchedulingServiceName, true)
c.SetServiceIndependent(mcsutils.SchedulingServiceName)
}
}
} else {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
c.UnsetServiceIndependent(mcsutils.SchedulingServiceName)
}
}

Expand Down Expand Up @@ -2439,9 +2439,25 @@

// IsServiceIndependent returns whether the service is independent.
func (c *RaftCluster) IsServiceIndependent(name string) bool {
independent, exist := c.independentServices.Load(name)
if !exist {
if c == nil {
return false
}
return independent.(bool)
_, exist := c.independentServices.Load(name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need not to check the result?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no

return exist
}

// SetServiceIndependent sets the service to be independent.
func (c *RaftCluster) SetServiceIndependent(name string) {
if c == nil {
return

Check warning on line 2452 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L2452

Added line #L2452 was not covered by tests
}
c.independentServices.Store(name, struct{}{})
}

// UnsetServiceIndependent unsets the service to be independent.
func (c *RaftCluster) UnsetServiceIndependent(name string) {
if c == nil {
return

Check warning on line 2460 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L2460

Added line #L2460 was not covered by tests
}
c.independentServices.Delete(name)
}
55 changes: 30 additions & 25 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@

s.handleDamagedStore(request.GetStats())
storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
if s.IsServiceIndependent(utils.SchedulingServiceName) {
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, _ := s.updateSchedulingClient(ctx)
cli := forwardCli.getClient()
if cli != nil {
Expand Down Expand Up @@ -1307,7 +1307,7 @@
regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()

if s.IsServiceIndependent(utils.SchedulingServiceName) {
if rc.IsServiceIndependent(utils.SchedulingServiceName) {
if forwardErrCh != nil {
select {
case err, ok := <-forwardErrCh:
Expand Down Expand Up @@ -1786,7 +1786,13 @@
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 1792 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1790-L1792

Added lines #L1790 - L1792 were not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {

Check warning on line 1795 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L1795

Added line #L1795 was not covered by tests
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.AskBatchSplitResponse{
Expand Down Expand Up @@ -1821,11 +1827,6 @@
return rsp.(*pdpb.AskBatchSplitResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

if !versioninfo.IsFeatureSupported(rc.GetOpts().GetClusterVersion(), versioninfo.BatchSplit) {
return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil
}
Expand Down Expand Up @@ -2015,7 +2016,13 @@
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.ScatterRegionResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 2022 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2022

Added line #L2022 was not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.ScatterRegionResponse{
Expand Down Expand Up @@ -2067,11 +2074,6 @@
return rsp.(*pdpb.ScatterRegionResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.ScatterRegionResponse{Header: s.notBootstrappedHeader()}, nil
}

if len(request.GetRegionsId()) > 0 {
percentage, err := scatterRegions(rc, request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
if err != nil {
Expand Down Expand Up @@ -2292,7 +2294,13 @@
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 2300 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2300

Added line #L2300 was not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.GetOperatorResponse{
Expand Down Expand Up @@ -2327,11 +2335,6 @@
return rsp.(*pdpb.GetOperatorResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil
}

opController := rc.GetOperatorController()
requestID := request.GetRegionId()
r := opController.GetOperatorStatus(requestID)
Expand Down Expand Up @@ -2611,7 +2614,13 @@
}, nil
}
}
if s.IsServiceIndependent(utils.SchedulingServiceName) {

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil

Check warning on line 2620 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2618-L2620

Added lines #L2618 - L2620 were not covered by tests
}

if rc.IsServiceIndependent(utils.SchedulingServiceName) {

Check warning on line 2623 in server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

server/grpc_service.go#L2623

Added line #L2623 was not covered by tests
forwardCli, err := s.updateSchedulingClient(ctx)
if err != nil {
return &pdpb.SplitRegionsResponse{
Expand Down Expand Up @@ -2648,10 +2657,6 @@
return rsp.(*pdpb.SplitRegionsResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
finishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
return &pdpb.SplitRegionsResponse{
Header: s.header(),
Expand Down
9 changes: 0 additions & 9 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,15 +1465,6 @@ func (s *Server) GetRegions() []*core.RegionInfo {
return nil
}

// IsServiceIndependent returns if the service is enabled
func (s *Server) IsServiceIndependent(name string) bool {
rc := s.GetRaftCluster()
if rc != nil {
return rc.IsServiceIndependent(name)
}
return false
}

// GetServiceLabels returns ApiAccessPaths by given service label
// TODO: this function will be used for updating api rate limit config
func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath {
Expand Down