Skip to content

Commit

Permalink
Unique limiters for each API listener (#1904) (#1936)
Browse files Browse the repository at this point in the history
* Unique limiters for each API listener

Refactor the limit.Limiter so it can wrap the separate API httprouter
endpoints. Limiter.WrapX() calls take the handler and stats incrementer
for metrics/error counting. api.Run() replaced with Router.Run(), which
will generate an httprouter for each listener in order to be able to
associate the httprouter with a unique Limiter.

* Add listener address labeled logs to limiter

* Review feedback

* Apply suggestions from code review

Co-authored-by: Anderson Queiroz <me@andersonq.me>

* review feedback

* fix import

* Fix test

Co-authored-by: Anderson Queiroz <me@andersonq.me>
(cherry picked from commit c99ccd8)

Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com>
  • Loading branch information
mergify[bot] and michel-laterman authored Sep 29, 2022
1 parent 4203ce1 commit 4f8282c
Show file tree
Hide file tree
Showing 16 changed files with 458 additions and 292 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Add error detail to catch-all HTTP error response. {pull}1854[1854]
- Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896]
- LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912]
- Use seperate rate limiters for internal and external API listeners. {issue}1859[1859] {pull}1904[1904]

==== New Features

Expand Down
4 changes: 2 additions & 2 deletions cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,10 +907,10 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
ack := api.NewAckT(&cfg.Inputs[0].Server, bulker, f.cache)
st := api.NewStatusT(&cfg.Inputs[0].Server, bulker, f.cache)

router := api.NewRouter(ctx, bulker, ct, et, at, ack, st, sm, tracer, f.bi)
router := api.NewRouter(&cfg.Inputs[0].Server, bulker, ct, et, at, ack, st, sm, tracer, f.bi)

g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
return api.Run(ctx, router, &cfg.Inputs[0].Server)
return router.Run(ctx)
}))

return err
Expand Down
20 changes: 0 additions & 20 deletions internal/pkg/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"

"github.com/pkg/errors"
Expand Down Expand Up @@ -43,7 +42,6 @@ type HTTPErrResp struct {

// NewHTTPErrResp creates an ErrResp from a go error
func NewHTTPErrResp(err error) HTTPErrResp {

errTable := []struct {
target error
meta HTTPErrResp
Expand All @@ -57,24 +55,6 @@ func NewHTTPErrResp(err error) HTTPErrResp {
zerolog.WarnLevel,
},
},
{
limit.ErrRateLimit,
HTTPErrResp{
http.StatusTooManyRequests,
"RateLimit",
"exceeded the rate limit",
zerolog.DebugLevel,
},
},
{
limit.ErrMaxLimit,
HTTPErrResp{
http.StatusTooManyRequests,
"MaxLimit",
"exceeded the max limit",
zerolog.DebugLevel,
},
},
{
ErrAPIKeyNotEnabled,
HTTPErrResp{
Expand Down
21 changes: 2 additions & 19 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
Expand All @@ -42,27 +41,21 @@ func (e *HTTPError) Error() string {

type AckT struct {
cfg *config.Server
limit *limit.Limiter
bulk bulk.Bulk
cache cache.Cache
}

func NewAckT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *AckT {
log.Info().
Interface("limits", cfg.Limits.AckLimit).
Msg("Setting config ack_limits")

return &AckT{
cfg: cfg,
bulk: bulker,
cache: cache,
limit: limit.NewLimiter(&cfg.Limits.AckLimit),
}
}

func (rt Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
//nolint:dupl // function body calls different internal handler then handleCheckin
func (rt *Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

id := ps.ByName("id")

reqID := r.Header.Get(logger.HeaderRequestID)
Expand Down Expand Up @@ -91,12 +84,6 @@ func (rt Router) handleAcks(w http.ResponseWriter, r *http.Request, ps httproute
}

func (ack *AckT) handleAcks(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, id string) error {
limitF, err := ack.limit.Acquire()
if err != nil {
return err
}
defer limitF()

agent, err := authAgent(r, &id, ack.bulk, ack.cache)
if err != nil {
return err
Expand All @@ -107,10 +94,6 @@ func (ack *AckT) handleAcks(zlog *zerolog.Logger, w http.ResponseWriter, r *http
return ctx.Str(LogAccessAPIKeyID, agent.AccessAPIKeyID)
})

// Metrics; serenity now.
dfunc := cntAcks.IncStart()
defer dfunc()

return ack.processRequest(*zlog, w, r, agent)
}

Expand Down
20 changes: 1 addition & 19 deletions internal/pkg/api/handleArtifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/throttle"
Expand All @@ -46,24 +45,17 @@ type ArtifactT struct {
bulker bulk.Bulk
cache cache.Cache
esThrottle *throttle.Throttle
limit *limit.Limiter
}

func NewArtifactT(cfg *config.Server, bulker bulk.Bulk, cache cache.Cache) *ArtifactT {
log.Info().
Interface("limits", cfg.Limits.ArtifactLimit).
Int("maxParallel", defaultMaxParallel).
Msg("Artifact install limits")

return &ArtifactT{
bulker: bulker,
cache: cache,
limit: limit.NewLimiter(&cfg.Limits.ArtifactLimit),
esThrottle: throttle.NewThrottle(defaultMaxParallel),
}
}

func (rt Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func (rt *Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

var (
Expand Down Expand Up @@ -112,12 +104,6 @@ func (rt Router) handleArtifacts(w http.ResponseWriter, r *http.Request, ps http
}

func (at ArtifactT) handleArtifacts(zlog *zerolog.Logger, r *http.Request, id, sha2 string) (io.Reader, error) {
limitF, err := at.limit.Acquire()
if err != nil {
return nil, err
}
defer limitF()

// Authenticate the APIKey; retrieve agent record.
// Note: This is going to be a bit slow even if we hit the cache on the api key.
// In order to validate that the agent still has that api key, we fetch the agent record from elastic.
Expand All @@ -131,10 +117,6 @@ func (at ArtifactT) handleArtifacts(zlog *zerolog.Logger, r *http.Request, id, s
return ctx.Str(LogAccessAPIKeyID, agent.AccessAPIKeyID)
})

// Metrics; serenity now.
dfunc := cntArtifacts.IncStart()
defer dfunc()

return at.processRequest(r.Context(), *zlog, agent, id, sha2)
}

Expand Down
32 changes: 2 additions & 30 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
Expand All @@ -48,7 +47,8 @@ const (
kEncodingGzip = "gzip"
)

func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
//nolint:dupl // function body calls different internal hander then handleAck
func (rt *Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

id := ps.ByName("id")
Expand All @@ -65,12 +65,6 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
cntCheckin.IncError(err)
resp := NewHTTPErrResp(err)

// Log this as warn for visibility that limit has been reached.
// This allows customers to tune the configuration on detection of threshold.
if errors.Is(err, limit.ErrMaxLimit) {
resp.Level = zerolog.WarnLevel
}

zlog.WithLevel(resp.Level).
Err(err).
Int(ECSHTTPResponseCode, resp.StatusCode).
Expand All @@ -93,7 +87,6 @@ type CheckinT struct {
ad *action.Dispatcher
tr *action.TokenResolver
bulker bulk.Bulk
limit *limit.Limiter
}

func NewCheckinT(
Expand All @@ -107,14 +100,6 @@ func NewCheckinT(
tr *action.TokenResolver,
bulker bulk.Bulk,
) *CheckinT {

log.Info().
Interface("limits", cfg.Limits.CheckinLimit).
Dur("long_poll_timeout", cfg.Timeouts.CheckinLongPoll).
Dur("long_poll_timestamp", cfg.Timeouts.CheckinTimestamp).
Dur("long_poll_jitter", cfg.Timeouts.CheckinJitter).
Msg("Checkin install limits")

ct := &CheckinT{
verCon: verCon,
cfg: cfg,
Expand All @@ -124,23 +109,15 @@ func NewCheckinT(
gcp: gcp,
ad: ad,
tr: tr,
limit: limit.NewLimiter(&cfg.Limits.CheckinLimit),
bulker: bulker,
}

return ct
}

func (ct *CheckinT) handleCheckin(zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request, id string) error {

start := time.Now()

limitF, err := ct.limit.Acquire()
if err != nil {
return err
}
defer limitF()

agent, err := authAgent(r, &id, ct.bulker, ct.cache)
if err != nil {
return err
Expand All @@ -158,11 +135,6 @@ func (ct *CheckinT) handleCheckin(zlog *zerolog.Logger, w http.ResponseWriter, r

// Safely check if the agent version is different, return empty string otherwise
newVer := agent.CheckDifferentVersion(ver)

// Metrics; serenity now.
dfunc := cntCheckin.IncStart()
defer dfunc()

return ct.processRequest(*zlog, w, r, start, agent, newVer)
}

Expand Down
20 changes: 1 addition & 19 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/limit"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/rollback"
Expand Down Expand Up @@ -49,25 +48,19 @@ type EnrollerT struct {
cfg *config.Server
bulker bulk.Bulk
cache cache.Cache
limit *limit.Limiter
}

func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {
log.Info().
Interface("limits", cfg.Limits.EnrollLimit).
Msg("Setting config enroll_limit")

return &EnrollerT{
verCon: verCon,
cfg: cfg,
limit: limit.NewLimiter(&cfg.Limits.EnrollLimit),
bulker: bulker,
cache: c,
}, nil

}

func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func (rt *Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
start := time.Now()

// Work around wonky router rule
Expand Down Expand Up @@ -129,13 +122,6 @@ func (rt Router) handleEnroll(w http.ResponseWriter, r *http.Request, ps httprou
}

func (et *EnrollerT) handleEnroll(rb *rollback.Rollback, zlog *zerolog.Logger, w http.ResponseWriter, r *http.Request) (*EnrollResponse, error) {

limitF, err := et.limit.Acquire()
if err != nil {
return nil, err
}
defer limitF()

key, err := authAPIKey(r, et.bulker, et.cache)
if err != nil {
return nil, err
Expand All @@ -151,10 +137,6 @@ func (et *EnrollerT) handleEnroll(rb *rollback.Rollback, zlog *zerolog.Logger, w
return nil, err
}

// Metrics; serenity now.
dfunc := cntEnroll.IncStart()
defer dfunc()

return et.processRequest(rb, *zlog, w, r, key.ID, ver)
}

Expand Down
Loading

0 comments on commit 4f8282c

Please sign in to comment.