Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Fix global rates sometimes checked with getLocalRateLimit().
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Oct 11, 2023
1 parent e547a49 commit b2b697d
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
defer metricConcurrentChecks.Dec()

if len(r.Requests) > maxBatchSize {
metricCheckErrorCounter.WithLabelValues("Request too large").Add(1)
metricCheckErrorCounter.WithLabelValues("Request too large").Inc()
return nil, status.Errorf(codes.OutOfRange,
"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
}
Expand All @@ -225,13 +225,13 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
var err error

if len(req.UniqueKey) == 0 {
metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
metricCheckErrorCounter.WithLabelValues("Invalid request").Inc()
resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
continue
}

if len(req.Name) == 0 {
metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
metricCheckErrorCounter.WithLabelValues("Invalid request").Inc()
resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
continue
}
Expand Down Expand Up @@ -260,10 +260,24 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
continue
}

if HasBehavior(req.Behavior, Behavior_GLOBAL) {
resp.Responses[i], err = s.getGlobalRateLimit(ctx, req)
if err != nil {
err = errors.Wrap(err, "Error in getGlobalRateLimit")
span := trace.SpanFromContext(ctx)
span.RecordError(err)
resp.Responses[i] = &RateLimitResp{Error: err.Error()}
}

// Inform the client of the owner key of the key
resp.Responses[i].Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
continue
}

// If our server instance is the owner of this rate limit
if peer.Info().IsOwner {
// Apply our rate limit algorithm to the request
metricGetRateLimitCounter.WithLabelValues("local").Add(1)
metricGetRateLimitCounter.WithLabelValues("local").Inc()
funcTimer1 := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit (local)"))
resp.Responses[i], err = s.getLocalRateLimit(ctx, req)
funcTimer1.ObserveDuration()
Expand All @@ -274,20 +288,6 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
resp.Responses[i] = &RateLimitResp{Error: err.Error()}
}
} else {
if HasBehavior(req.Behavior, Behavior_GLOBAL) {
resp.Responses[i], err = s.getGlobalRateLimit(ctx, req)
if err != nil {
err = errors.Wrap(err, "Error in getGlobalRateLimit")
span := trace.SpanFromContext(ctx)
span.RecordError(err)
resp.Responses[i] = &RateLimitResp{Error: err.Error()}
}

// Inform the client of the owner key of the key
resp.Responses[i].Metadata = map[string]string{"owner": peer.Info().GRPCAddress}
continue
}

// Request must be forwarded to peer that owns the key.
// Launch remote peer request in goroutine.
wg.Add(1)
Expand Down Expand Up @@ -356,7 +356,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
// If we are attempting again, the owner of this rate limit might have changed to us!
if attempts != 0 {
if req.Peer.Info().IsOwner {
metricGetRateLimitCounter.WithLabelValues("local").Add(1)
metricGetRateLimitCounter.WithLabelValues("local").Inc()
resp.Resp, err = s.getLocalRateLimit(ctx, req.Req)
if err != nil {
s.log.WithContext(ctx).
Expand All @@ -371,12 +371,12 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
}

// Make an RPC call to the peer that owns this rate limit
metricGetRateLimitCounter.WithLabelValues("forward").Add(1)
metricGetRateLimitCounter.WithLabelValues("forward").Inc()
r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
if err != nil {
if IsNotReady(err) {
attempts++
metricBatchSendRetries.WithLabelValues(req.Req.Name).Add(1)
metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
req.Peer, err = s.GetPeer(ctx, req.Key)
if err != nil {
errPart := fmt.Sprintf("Error finding peer that owns rate limit '%s'", req.Key)
Expand Down Expand Up @@ -406,7 +406,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
req.WG.Done()

if isDeadlineExceeded(ctx.Err()) {
metricCheckErrorCounter.WithLabelValues("Timeout forwarding to peer").Add(1)
metricCheckErrorCounter.WithLabelValues("Timeout forwarding to peer").Inc()
}
}

Expand Down Expand Up @@ -446,7 +446,7 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
cpy.Behavior = Behavior_NO_BATCHING

// Process the rate limit like we own it
metricGetRateLimitCounter.WithLabelValues("global").Add(1)
metricGetRateLimitCounter.WithLabelValues("global").Inc()
resp, err = s.getLocalRateLimit(ctx, cpy)
if err != nil {
return nil, errors.Wrap(err, "Error in getLocalRateLimit")
Expand Down Expand Up @@ -478,7 +478,7 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) {
if len(r.Requests) > maxBatchSize {
err := fmt.Errorf("'PeerRequest.rate_limits' list too large; max size is '%d'", maxBatchSize)
metricCheckErrorCounter.WithLabelValues("Request too large").Add(1)
metricCheckErrorCounter.WithLabelValues("Request too large").Inc()
return nil, status.Error(codes.OutOfRange, err.Error())
}

Expand Down Expand Up @@ -778,11 +778,11 @@ func SetBehavior(b *Behavior, flag Behavior, set bool) {
func countError(err error, defaultType string) {
for {
if err == nil {
metricCheckErrorCounter.WithLabelValues(defaultType).Add(1)
metricCheckErrorCounter.WithLabelValues(defaultType).Inc()
return
}
if errors.Is(err, context.DeadlineExceeded) {
metricCheckErrorCounter.WithLabelValues("Timeout").Add(1)
metricCheckErrorCounter.WithLabelValues("Timeout").Inc()
return
}

Expand Down

0 comments on commit b2b697d

Please sign in to comment.