From 5931eceb629f6f83cfdc09870f988021e95fdd14 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 9 Aug 2024 15:48:08 +0800 Subject: [PATCH 1/2] unify the usage of independent service Signed-off-by: Ryan Leung --- pkg/utils/apiutil/serverapi/middleware.go | 2 +- server/api/admin.go | 36 ++++++++------- server/api/config.go | 6 +-- server/cluster/cluster.go | 28 +++++++++--- server/grpc_service.go | 55 ++++++++++++----------- server/server.go | 9 ---- 6 files changed, 76 insertions(+), 60 deletions(-) diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 9af8d234b34..3b0d776dfd6 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -129,7 +129,7 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri for _, rule := range h.microserviceRedirectRules { // Now we only support checking the scheduling service whether it is independent if rule.targetServiceName == mcsutils.SchedulingServiceName { - if !h.s.IsServiceIndependent(mcsutils.SchedulingServiceName) { + if !h.s.GetRaftCluster().IsServiceIndependent(mcsutils.SchedulingServiceName) { continue } } diff --git a/server/api/admin.go b/server/api/admin.go index dfaecbec755..7dd041165f2 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -61,11 +61,14 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) return } rc.RemoveRegionIfExist(regionID) - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { + msg := "The region is removed from server cache." + if rc.IsServiceIndependent(utils.SchedulingServiceName) { err = h.deleteRegionCacheInSchedulingServer(regionID) + if err != nil { + msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + } } - msg := "The region is removed from server cache." - h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) + h.rd.JSON(w, http.StatusOK, msg) } // @Tags admin @@ -101,11 +104,15 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques } // Remove region from cache. rc.RemoveRegionIfExist(regionID) - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { + msg := "The region is removed from server cache and region meta storage." + if rc.IsServiceIndependent(utils.SchedulingServiceName) { err = h.deleteRegionCacheInSchedulingServer(regionID) + if err != nil { + msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + } } - msg := "The region is removed from server cache and region meta storage." - h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) + + h.rd.JSON(w, http.StatusOK, msg) } // @Tags admin @@ -117,11 +124,15 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque var err error rc := getCluster(r) rc.ResetRegionCache() - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) { + msg := "All regions are removed from server cache." + if rc.IsServiceIndependent(utils.SchedulingServiceName) { err = h.deleteRegionCacheInSchedulingServer() + if err != nil { + msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + } } - msg := "All regions are removed from server cache." - h.rd.JSON(w, http.StatusOK, h.buildMsg(msg, err)) + + h.rd.JSON(w, http.StatusOK, msg) } // Intentionally no swagger mark as it is supposed to be only used in @@ -239,10 +250,3 @@ func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error { } return nil } - -func (h *adminHandler) buildMsg(msg string, err error) string { - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && err != nil { - return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) - } - return msg -} diff --git a/server/api/config.go b/server/api/config.go index 9f568221d89..88cb17783bc 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -62,7 +62,7 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler { // @Router /config [get] func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) { cfg := h.svr.GetConfig() - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && + if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) && r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" { schedulingServerConfig, err := h.getSchedulingServerConfig() if err != nil { @@ -336,7 +336,7 @@ func getConfigMap(cfg map[string]any, key []string, value any) map[string]any { // @Success 200 {object} sc.ScheduleConfig // @Router /config/schedule [get] func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) { - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && + if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) && r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" { cfg, err := h.getSchedulingServerConfig() if err != nil { @@ -410,7 +410,7 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request) // @Success 200 {object} sc.ReplicationConfig // @Router /config/replicate [get] func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) { - if h.svr.IsServiceIndependent(utils.SchedulingServiceName) && + if h.svr.GetRaftCluster().IsServiceIndependent(utils.SchedulingServiceName) && r.Header.Get(apiutil.XForbiddenForwardToMicroServiceHeader) != "true" { cfg, err := h.getSchedulingServerConfig() if err != nil { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c8013c63e2d..2150fe992ad 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -375,18 +375,18 @@ func (c *RaftCluster) checkServices() { servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName) if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { c.startSchedulingJobs(c, c.hbstreams) - c.independentServices.Delete(mcsutils.SchedulingServiceName) + c.UnsetServiceIndependent(mcsutils.SchedulingServiceName) } else { if c.stopSchedulingJobs() || c.coordinator == nil { c.initCoordinator(c.ctx, c, c.hbstreams) } if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - c.independentServices.Store(mcsutils.SchedulingServiceName, true) + c.SetServiceIndependent(mcsutils.SchedulingServiceName) } } } else { c.startSchedulingJobs(c, c.hbstreams) - c.independentServices.Delete(mcsutils.SchedulingServiceName) + c.UnsetServiceIndependent(mcsutils.SchedulingServiceName) } } @@ -2439,9 +2439,25 @@ func IsClientURL(addr string, etcdClient *clientv3.Client) bool { // IsServiceIndependent returns whether the service is independent. func (c *RaftCluster) IsServiceIndependent(name string) bool { - independent, exist := c.independentServices.Load(name) - if !exist { + if c == nil { return false } - return independent.(bool) + _, exist := c.independentServices.Load(name) + return exist +} + +// SetServiceIndependent sets the service to be independent. +func (c *RaftCluster) SetServiceIndependent(name string) { + if c == nil { + return + } + c.independentServices.Store(name, struct{}{}) +} + +// UnsetServiceIndependent unsets the service to be independent. +func (c *RaftCluster) UnsetServiceIndependent(name string) { + if c == nil { + return + } + c.independentServices.Delete(name) } diff --git a/server/grpc_service.go b/server/grpc_service.go index fee5e0e3355..21126addd24 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -951,7 +951,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear s.handleDamagedStore(request.GetStats()) storeHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) - if s.IsServiceIndependent(utils.SchedulingServiceName) { + if rc.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, _ := s.updateSchedulingClient(ctx) cli := forwardCli.getClient() if cli != nil { @@ -1307,7 +1307,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error regionHeartbeatHandleDuration.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds()) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc() - if s.IsServiceIndependent(utils.SchedulingServiceName) { + if rc.IsServiceIndependent(utils.SchedulingServiceName) { if forwardErrCh != nil { select { case err, ok := <-forwardErrCh: @@ -1786,7 +1786,13 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp }, nil } } - if s.IsServiceIndependent(utils.SchedulingServiceName) { + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil + } + + if rc.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.AskBatchSplitResponse{ @@ -1821,11 +1827,6 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp return rsp.(*pdpb.AskBatchSplitResponse), err } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil - } - if !versioninfo.IsFeatureSupported(rc.GetOpts().GetClusterVersion(), versioninfo.BatchSplit) { return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil } @@ -2015,7 +2016,13 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg }, nil } } - if s.IsServiceIndependent(utils.SchedulingServiceName) { + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.ScatterRegionResponse{Header: s.notBootstrappedHeader()}, nil + } + + if rc.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.ScatterRegionResponse{ @@ -2067,11 +2074,6 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg return rsp.(*pdpb.ScatterRegionResponse), err } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.ScatterRegionResponse{Header: s.notBootstrappedHeader()}, nil - } - if len(request.GetRegionsId()) > 0 { percentage, err := scatterRegions(rc, request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) if err != nil { @@ -2292,7 +2294,13 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR }, nil } } - if s.IsServiceIndependent(utils.SchedulingServiceName) { + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil + } + + if rc.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.GetOperatorResponse{ @@ -2327,11 +2335,6 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR return rsp.(*pdpb.GetOperatorResponse), err } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil - } - opController := rc.GetOperatorController() requestID := request.GetRegionId() r := opController.GetOperatorStatus(requestID) @@ -2611,7 +2614,13 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion }, nil } } - if s.IsServiceIndependent(utils.SchedulingServiceName) { + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } + + if rc.IsServiceIndependent(utils.SchedulingServiceName) { forwardCli, err := s.updateSchedulingClient(ctx) if err != nil { return &pdpb.SplitRegionsResponse{ @@ -2648,10 +2657,6 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion return rsp.(*pdpb.SplitRegionsResponse), err } - rc := s.GetRaftCluster() - if rc == nil { - return &pdpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil - } finishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) return &pdpb.SplitRegionsResponse{ Header: s.header(), diff --git a/server/server.go b/server/server.go index c10d15e90b1..a08d31b2945 100644 --- a/server/server.go +++ b/server/server.go @@ -1465,15 +1465,6 @@ func (s *Server) GetRegions() []*core.RegionInfo { return nil } -// IsServiceIndependent returns if the service is enabled -func (s *Server) IsServiceIndependent(name string) bool { - rc := s.GetRaftCluster() - if rc != nil { - return rc.IsServiceIndependent(name) - } - return false -} - // GetServiceLabels returns ApiAccessPaths by given service label // TODO: this function will be used for updating api rate limit config func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath { From 21ac3a9f9d347904557b52aa45df52db7fdc8377 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 12 Aug 2024 10:39:12 +0800 Subject: [PATCH 2/2] fix Signed-off-by: Ryan Leung --- server/api/admin.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/server/api/admin.go b/server/api/admin.go index 7dd041165f2..15d5d40412a 100644 --- a/server/api/admin.go +++ b/server/api/admin.go @@ -65,7 +65,7 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request) if rc.IsServiceIndependent(utils.SchedulingServiceName) { err = h.deleteRegionCacheInSchedulingServer(regionID) if err != nil { - msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + msg = buildMsg(err) } } h.rd.JSON(w, http.StatusOK, msg) @@ -108,7 +108,7 @@ func (h *adminHandler) DeleteRegionStorage(w http.ResponseWriter, r *http.Reques if rc.IsServiceIndependent(utils.SchedulingServiceName) { err = h.deleteRegionCacheInSchedulingServer(regionID) if err != nil { - msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + msg = buildMsg(err) } } @@ -128,7 +128,7 @@ func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Reque if rc.IsServiceIndependent(utils.SchedulingServiceName) { err = h.deleteRegionCacheInSchedulingServer() if err != nil { - msg = fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) + msg = buildMsg(err) } } @@ -250,3 +250,7 @@ func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error { } return nil } + +func buildMsg(err error) string { + return fmt.Sprintf("This operation was executed in API server but needs to be re-executed on scheduling server due to the following error: %s", err.Error()) +}