Skip to content

Commit

Permalink
fix race
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Nov 24, 2023
1 parent 17c5f81 commit 01e7080
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 13 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ error = '''
redirect failed
'''

["PD:apiutil:ErrRedirectToNotLeader"]
error = '''
redirect to not leader
'''

["PD:autoscaling:ErrEmptyMetricsResponse"]
error = '''
metrics response from Prometheus is empty
Expand Down
2 changes: 2 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ var (
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
)

// grpcutil errors
Expand Down
7 changes: 1 addition & 6 deletions pkg/utils/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ const (
// ForwardToMicroServiceHeader is used to mark the request is forwarded to micro service.
ForwardToMicroServiceHeader = "Forward-To-Micro-Service"

// ErrRedirectFailed is the error message for redirect failed.
ErrRedirectFailed = "redirect failed"
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = "redirect to not leader"

chunkSize = 4096
)

Expand Down Expand Up @@ -459,7 +454,7 @@ func (p *customReverseProxies) ServeHTTP(w http.ResponseWriter, r *http.Request)
}
return
}
http.Error(w, ErrRedirectFailed, http.StatusInternalServerError)
http.Error(w, errs.ErrRedirect.FastGenByArgs().Error(), http.StatusInternalServerError)

Check warning on line 457 in pkg/utils/apiutil/apiutil.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/apiutil/apiutil.go#L457

Added line #L457 was not covered by tests
}

// copyHeader duplicates the HTTP headers from the source `src` to the destination `dst`.
Expand Down
12 changes: 9 additions & 3 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,21 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
redirectToMicroService, targetAddr := h.matchMicroServiceRedirectRules(r)
allowFollowerHandle := len(r.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
if !h.s.IsClosed() && (allowFollowerHandle || h.s.GetMember().IsLeader()) && !redirectToMicroService {

if h.s.IsClosed() {
http.Error(w, errs.ErrServerNotStarted.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}

if (allowFollowerHandle || h.s.GetMember().IsLeader()) && !redirectToMicroService {
next(w, r)
return
}

// Prevent more than one redirection.
if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
http.Error(w, apiutil.ErrRedirectToNotLeader, http.StatusInternalServerError)
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}

Expand All @@ -189,7 +195,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
var clientUrls []string
if redirectToMicroService {
if len(targetAddr) == 0 {
http.Error(w, apiutil.ErrRedirectFailed, http.StatusInternalServerError)
http.Error(w, errs.ErrRedirect.FastGenByArgs().Error(), http.StatusInternalServerError)

Check warning on line 198 in pkg/utils/apiutil/serverapi/middleware.go

View check run for this annotation

Codecov / codecov/patch

pkg/utils/apiutil/serverapi/middleware.go#L198

Added line #L198 was not covered by tests
return
}
clientUrls = append(clientUrls, targetAddr)
Expand Down
12 changes: 8 additions & 4 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ import (
func Redirector() gin.HandlerFunc {
return func(c *gin.Context) {
svr := c.MustGet(ServerContextKey).(*server.Server)

if svr.IsClosed() {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrServerNotStarted.FastGenByArgs().Error())
return

Check warning on line 36 in server/apiv2/middlewares/redirector.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/middlewares/redirector.go#L35-L36

Added lines #L35 - L36 were not covered by tests
}
allowFollowerHandle := len(c.Request.Header.Get(apiutil.PDAllowFollowerHandleHeader)) > 0
if !svr.IsClosed() && (allowFollowerHandle || svr.GetMember().IsLeader()) {
if allowFollowerHandle || svr.GetMember().IsLeader() {
c.Next()
return
}
Expand All @@ -45,12 +50,11 @@ func Redirector() gin.HandlerFunc {

c.Request.Header.Set(apiutil.PDRedirectorHeader, svr.Name())

leader := svr.GetMember().GetLeader()
if leader == nil {
if svr.GetMember().GetLeader() == nil {
c.AbortWithStatusJSON(http.StatusServiceUnavailable, errs.ErrLeaderNil.FastGenByArgs().Error())
return
}
clientUrls := leader.GetClientUrls()
clientUrls := svr.GetMember().GetLeader().GetClientUrls()
urls := make([]url.URL, 0, len(clientUrls))
for _, item := range clientUrls {
u, err := url.Parse(item)
Expand Down

0 comments on commit 01e7080

Please sign in to comment.