diff --git a/errors.toml b/errors.toml index b6123058310..a318fc32492 100644 --- a/errors.toml +++ b/errors.toml @@ -16,6 +16,11 @@ error = ''' redirect failed ''' +["PD:apiutil:ErrRedirectToNotLeader"] +error = ''' +redirect to not leader +''' + ["PD:autoscaling:ErrEmptyMetricsResponse"] error = ''' metrics response from Prometheus is empty diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index b8a882cd187..a4320238374 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -197,6 +197,8 @@ var ( var ( ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect")) ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist")) + // ErrRedirectToNotLeader is the error message for redirect to not leader. + ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader")) ) // grpcutil errors diff --git a/pkg/utils/apiutil/apiutil.go b/pkg/utils/apiutil/apiutil.go index 8928688fed9..164b3c0783d 100644 --- a/pkg/utils/apiutil/apiutil.go +++ b/pkg/utils/apiutil/apiutil.go @@ -61,11 +61,6 @@ const ( // ForwardToMicroServiceHeader is used to mark the request is forwarded to micro service. ForwardToMicroServiceHeader = "Forward-To-Micro-Service" - // ErrRedirectFailed is the error message for redirect failed. - ErrRedirectFailed = "redirect failed" - // ErrRedirectToNotLeader is the error message for redirect to not leader. - ErrRedirectToNotLeader = "redirect to not leader" - chunkSize = 4096 ) @@ -459,7 +454,7 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request) } return } - http.Error(w, ErrRedirectFailed, http.StatusInternalServerError) + http.Error(w, errs.ErrRedirect.FastGenByArgs().Error(), http.StatusInternalServerError) } // copyHeader duplicates the HTTP headers from the source `src` to the destination `dst`. diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 1494dde27b8..26b36a88ca8 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -162,7 +162,13 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { redirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r) allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0 - if !h.s.IsClosed() && (allowFollowerHandle || h.s.GetMember().IsLeader()) && !redirectToMicroService { + + if h.s.IsClosed() { + http.Error(w, errs.ErrServerNotStarted.FastGenByArgs().Error(), http.StatusInternalServerError) + return + } + + if (allowFollowerHandle || h.s.GetMember().IsLeader()) && !redirectToMicroService { next(w, r) return } @@ -170,7 +176,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http // Prevent more than one redirection. if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 { log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect)) - http.Error(w, apiutil.ErrRedirectToNotLeader, http.StatusInternalServerError) + http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError) return } @@ -189,7 +195,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http var clientUrls []string if redirectToMicroService { if len(targetAddr) == 0 { - http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError) + http.Error(w, errs.ErrRedirect.FastGenByArgs().Error(), http.StatusInternalServerError) return } clientUrls = append(clientUrls, targetAddr) diff --git a/scripts/ci-subtask.sh b/scripts/ci-subtask.sh index a2e396088d6..dd29895327b 100755 --- a/scripts/ci-subtask.sh +++ b/scripts/ci-subtask.sh @@ -29,11 +29,22 @@ else weight() { [[ $1 == "github.com/tikv/pd/server/api" ]] && return 30 [[ $1 == "github.com/tikv/pd/pkg/schedule" ]] && return 30 - [[ $1 == "pd/tests/server/api" ]] && return 30 + [[ $1 == "github.com/tikv/pd/pkg/core" ]] && return 30 + [[ $1 == "github.com/tikv/pd/tests/server/api" ]] && return 30 [[ $1 =~ "pd/tests" ]] && return 5 return 1 } + # Create an associative array to store the weight of each task. + declare -A task_weights + for t in ${tasks[@]}; do + weight $t + task_weights[$t]=$? + done + + # Sort tasks by weight in descending order. + tasks=($(printf "%s\n" "${tasks[@]}" | sort -rn)) + scores=($(seq "$1" | xargs -I{} echo 0)) res=() @@ -42,8 +53,7 @@ else for i in ${!scores[@]}; do [[ ${scores[i]} -lt ${scores[$min_i]} ]] && min_i=$i done - weight $t - scores[$min_i]=$((${scores[$min_i]} + $?)) + scores[$min_i]=$((${scores[$min_i]} + ${task_weights[$t]})) [[ $(($min_i + 1)) -eq $2 ]] && res+=($t) done printf "%s " "${res[@]}" diff --git a/server/apiv2/middlewares/redirector.go b/server/apiv2/middlewares/redirector.go index 285f096e823..37c06de1585 100644 --- a/server/apiv2/middlewares/redirector.go +++ b/server/apiv2/middlewares/redirector.go @@ -30,9 +30,13 @@ import ( func Redirector() gin.HandlerFunc { return func(c *gin.Context) { svr := c.MustGet(ServerContextKey).(*server.Server) + + if svr.IsClosed() { + c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrServerNotStarted.FastGenByArgs().Error()) + return + } allowFollowerHandle := len(c.Request.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0 - isLeader := svr.GetMember().IsLeader() - if !svr.IsClosed() && (allowFollowerHandle || isLeader) { + if allowFollowerHandle || svr.GetMember().IsLeader() { c.Next() return } @@ -46,12 +50,11 @@ func Redirector() gin.HandlerFunc { c.Request.Header.Set(apiutil.PDRedirectorHeader, svr.Name()) - leader := svr.GetMember().GetLeader() - if leader == nil { + if svr.GetMember().GetLeader() == nil { c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error()) return } - clientUrls := leader.GetClientUrls() + clientUrls := svr.GetMember().GetLeader().GetClientUrls() urls := make([]url.URL, 0, len(clientUrls)) for _, item := range clientUrls { u, err := url.Parse(item) diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 1415acc46d1..7dcce498d56 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -301,7 +301,10 @@ func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { return code == http.StatusOK && kg != nil }, testutil.WithWaitFor(time.Second*1)) suite.Equal(utils.DefaultKeyspaceGroupID, kg.ID) - suite.Len(kg.Members, utils.DefaultKeyspaceGroupReplicaCount) + // the allocNodesToAllKeyspaceGroups loop will run every 100ms. + testutil.Eventually(suite.Require(), func() bool { + return len(kg.Members) == utils.DefaultKeyspaceGroupReplicaCount + }) for _, member := range kg.Members { suite.Contains(nodes, member.Address) }