-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unique limiters for each API listener #1904
Changes from 2 commits
08fc24a
716e8d3
2e626b8
676a12b
bfb16e6
b6b7ca2
ec7e9bf
71193cf
8b78ecc
bd1475e
d1dde1f
4c3efa2
0be91b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,6 @@ import ( | |
"github.com/elastic/fleet-server/v7/internal/pkg/config" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/dl" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/es" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/limit" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/logger" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/model" | ||
"github.com/elastic/fleet-server/v7/internal/pkg/policy" | ||
|
@@ -42,27 +41,21 @@ func (e *HTTPError) Error() string { | |
|
||
type AckT struct { | ||
cfg *config.Server | ||
limit *limit.Limiter | ||
bulk bulk.Bulk | ||
cache cache.Cache | ||
} | ||
|
||
func NewAckT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *AckT { | ||
log.Info(). | ||
Interface("limits", cfg.Limits.AckLimit). | ||
Msg("Setting config ack_limits") | ||
|
||
return &AckT{ | ||
cfg: cfg, | ||
bulk: bulker, | ||
cache: cache, | ||
limit: limit.NewLimiter(&cfg.Limits.AckLimit), | ||
} | ||
} | ||
|
||
func (rt Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { | ||
//nolint:dupl // function body calls different internal hander then handleCheckin | ||
michel-laterman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (rt *Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { | ||
start := time.Now() | ||
|
||
id := ps.ByName("id") | ||
|
||
reqID := r.Header.Get(logger.HeaderRequestID) | ||
|
@@ -91,12 +84,6 @@ func (rt Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httproute | |
} | ||
|
||
func (ack *AckT) handleAcks(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, id string) error { | ||
limitF, err := ack.limit.Acquire() | ||
if err != nil { | ||
return err | ||
} | ||
defer limitF() | ||
|
||
agent, err := authAgent(r, &id, ack.bulk, ack.cache) | ||
if err != nil { | ||
return err | ||
|
@@ -107,10 +94,6 @@ func (ack *AckT) handleAcks(zlog *zerolog.Logger, w http.ResponseWriter, r *http | |
return ctx.Str(LogAccessAPIKeyID, agent.AccessAPIKeyID) | ||
}) | ||
|
||
// Metrics; serenity now. | ||
dfunc := cntAcks.IncStart() | ||
defer dfunc() | ||
Comment on lines
-110
to
-112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In order to not change the structure of the stats we collect, and still track rate-limit errors per route. I've moved the incrementer/decrement to the rate limiter wrapper, this means that the total calls (and active calls) will always be tracked, even in the case where authentication fails. |
||
|
||
return ack.processRequest(*zlog, w, r, agent) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rate limiting responses are done by the wrapping limiter