Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial throttling implementation #94

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ var opts struct {

Signature bool `long:"signature" env:"SIGNATURE" description:"enable reproxy signature headers"`
Dbg bool `long:"dbg" env:"DEBUG" description:"debug mode"`

Throttling struct {
Enabled bool `long:"enabled" env:"ENABLED" description:"enable per-proxy server throttling of requests"`
Rate int `long:"rate" env:"RATE" description:"Maximum sustained rate of requests"`
Burst int `long:"burst" env:"BURST" description:"Burst bucket capacity"`
HttpStatusCode int `long:"http-status-code" env:"HTTP_STATUS_CODE" default:"503" description:"Http status code returned for throttled requests"`
PerServerRate map[string]int `long:"per-server-rate" env:"PER_SERVER_RATE" description:"Per virtual server maximum sustained rate of requests"`
PerServerBurst map[string]int `long:"per-server-burst" env:"PER_SERVER_BURST" description:"Per virtual server burst bucket capacity"`
} `group:"throttling" namespace:"throttling" env-namespace:"THROTTLING"`
}

var revision = "unknown"
Expand Down Expand Up @@ -216,6 +225,8 @@ func run() error {
return fmt.Errorf("failed to convert MaxSize: %w", err)
}

throttlingConfig := constructThrottlingConfig()

px := &proxy.Http{
Version: revision,
Matcher: svc,
Expand Down Expand Up @@ -243,9 +254,10 @@ func run() error {
ExpectContinue: opts.Timeouts.ExpectContinue,
ResponseHeader: opts.Timeouts.ResponseHeader,
},
Metrics: makeMetrics(ctx, svc),
Metrics: makeMetrics(ctx, svc, throttlingConfig),
Reporter: errReporter,
PluginConductor: makePluginConductor(ctx),
Throttling: makeThrottling(throttlingConfig),
}

err = px.Run(ctx)
Expand Down Expand Up @@ -321,11 +333,11 @@ func makePluginConductor(ctx context.Context) proxy.MiddlewareProvider {
return conductor
}

func makeMetrics(ctx context.Context, informer mgmt.Informer) proxy.MiddlewareProvider {
func makeMetrics(ctx context.Context, informer mgmt.Informer, throttlingConfig *mgmt.ProxyThrottlingConfig) proxy.MiddlewareProvider {
if !opts.Management.Enabled {
return nil
}
metrics := mgmt.NewMetrics()
metrics := mgmt.NewMetrics(throttlingConfig)
go func() {
mgSrv := mgmt.Server{
Listen: opts.Management.Listen,
Expand All @@ -341,6 +353,41 @@ func makeMetrics(ctx context.Context, informer mgmt.Informer) proxy.MiddlewarePr
return metrics
}

func makeThrottling(throttlingConfig *mgmt.ProxyThrottlingConfig) proxy.MiddlewareProvider {
if !opts.Throttling.Enabled {
return nil
}
throttler := mgmt.NewThrottler(throttlingConfig)
return throttler
}

func constructThrottlingConfig() *mgmt.ProxyThrottlingConfig {
perServerThrottlingConfig := make(map[string]mgmt.ServerThrottlingConfig)
for serverName, serverRate := range opts.Throttling.PerServerRate {
perServerThrottlingConfig[serverName] = mgmt.ServerThrottlingConfig{
Enabled: true,
Rate: serverRate,
}
}
for serverName, serverBurst := range opts.Throttling.PerServerBurst {
if serverThrottlingConfig, exists := perServerThrottlingConfig[serverName]; exists {
serverThrottlingConfig.Burst = serverBurst
} else {
log.Printf("[WARN] throttling burst for the virtual server %v specified without "+
"throttling rate, throttling for the virtual server disabled", serverName)
}
}
return &mgmt.ProxyThrottlingConfig{
HttpStatusCode: opts.Throttling.HttpStatusCode,
ProxyThrottlingConfig: mgmt.ServerThrottlingConfig{
Enabled: opts.Throttling.Enabled,
Rate: opts.Throttling.Rate,
Burst: opts.Throttling.Burst,
},
PerServerThrottlingConfig: perServerThrottlingConfig,
}
}

func makeSSLConfig() (config proxy.SSLConfig, err error) {
switch opts.SSL.Type {
case "none":
Expand Down
35 changes: 27 additions & 8 deletions app/mgmt/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ import (

// Metrics provides registration and middleware for prometheus
type Metrics struct {
totalRequests *prometheus.CounterVec
responseStatus *prometheus.CounterVec
httpDuration *prometheus.HistogramVec
totalRequests *prometheus.CounterVec
responseStatus *prometheus.CounterVec
httpDuration *prometheus.HistogramVec
throttledRequests *prometheus.CounterVec
isThrotllingEnabed bool
throtlingHttpStatusCode int
}

// NewMetrics create metrics object with all counters registered
func NewMetrics() *Metrics {
res := &Metrics{}
func NewMetrics(throttlingConfig *ProxyThrottlingConfig) *Metrics {
res := &Metrics{
isThrotllingEnabed: throttlingConfig.ProxyThrottlingConfig.Enabled,
throtlingHttpStatusCode: throttlingConfig.HttpStatusCode,
}

res.totalRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -45,6 +51,14 @@ func NewMetrics() *Metrics {
Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 3, 5},
}, []string{"path"})

res.throttledRequests = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_throttled",
Help: "Number of throttled requests.",
},
[]string{"server"},
)

prometheus.Unregister(prometheus.NewGoCollector())

if err := prometheus.Register(res.totalRequests); err != nil {
Expand All @@ -56,6 +70,9 @@ func NewMetrics() *Metrics {
if err := prometheus.Register(res.httpDuration); err != nil {
log.Printf("[WARN] can't register prometheus httpDuration, %v", err)
}
if err := prometheus.Register(res.throttledRequests); err != nil {
log.Printf("[WARN] can't register prometheus throttledRequests, %v", err)
}

return res
}
Expand All @@ -70,15 +87,17 @@ func (m *Metrics) Middleware(next http.Handler) http.Handler {
server = strings.Split(r.Host, ":")[0]
}

timer := prometheus.NewTimer(m.httpDuration.WithLabelValues(path))
rw := NewResponseWriter(w)
timer := prometheus.NewTimer(m.httpDuration.WithLabelValues(path))
next.ServeHTTP(rw, r)
timer.ObserveDuration()

statusCode := rw.statusCode
m.responseStatus.WithLabelValues(strconv.Itoa(statusCode)).Inc()
m.totalRequests.WithLabelValues(server).Inc()

timer.ObserveDuration()
if m.isThrotllingEnabed && statusCode == m.throtlingHttpStatusCode {
m.throttledRequests.WithLabelValues(server).Inc()
}
})
}

Expand Down
88 changes: 88 additions & 0 deletions app/mgmt/throttling_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package mgmt

import (
"log"
"net/http"
"strings"

"golang.org/x/time/rate"
)

type ProxyThrottlingConfig struct {
HttpStatusCode int
ProxyThrottlingConfig ServerThrottlingConfig
PerServerThrottlingConfig map[string]ServerThrottlingConfig
}

type ServerThrottlingConfig struct {
Enabled bool
Rate int
Burst int
}

type Throttler struct {
throttlingConfig ProxyThrottlingConfig
proxyRateLimiter *rate.Limiter
perServerRateLimiters map[string]*rate.Limiter
reportThrottlingHandler http.HandlerFunc
}

func NewThrottler(throttlingConfig *ProxyThrottlingConfig) *Throttler {
perServerRateLimiters := make(map[string]*rate.Limiter)
for server, serverThrottlingConfig := range throttlingConfig.PerServerThrottlingConfig {
perServerRateLimiters[server] = createRateLimiter(serverThrottlingConfig)
}
return &Throttler{
throttlingConfig: *throttlingConfig,
proxyRateLimiter: createRateLimiter(throttlingConfig.ProxyThrottlingConfig),
perServerRateLimiters: perServerRateLimiters,
reportThrottlingHandler: createReportThrottlingHandler(throttlingConfig.HttpStatusCode),
}
}

func (t *Throttler) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// stage one: global rate limit
if t.proxyRateLimiter != nil && !t.proxyRateLimiter.Allow() {
t.reportThrottlingHandler.ServeHTTP(w, r)
return
}
// stage two: per server rate limit
server := getServer(r)
serverRateLimiter := t.perServerRateLimiters[server]
if serverRateLimiter != nil && !serverRateLimiter.Allow() {
t.reportThrottlingHandler.ServeHTTP(w, r)
return
}
next.ServeHTTP(w, r)
})
}

func getServer(r *http.Request) string {
server := r.URL.Hostname()
if server == "" {
server = strings.Split(r.Host, ":")[0] // drop port
}
return server
}

func createReportThrottlingHandler(httpStatusCode int) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "text/plain")
w.WriteHeader(httpStatusCode)
_, err := w.Write([]byte("Request rate limit exceeded, please retry later"))
if err != nil {
log.Printf("[WARN] can't write throttle request output content, %v", err)
}
})
}

func createRateLimiter(throttlingConfig ServerThrottlingConfig) *rate.Limiter {
if !throttlingConfig.Enabled {
return nil
}
return rate.NewLimiter(
rate.Limit(throttlingConfig.Rate),
throttlingConfig.Burst,
)
}
30 changes: 20 additions & 10 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Http struct { // nolint golint
PluginConductor MiddlewareProvider
Reporter Reporter
LBSelector func(len int) int
Throttling MiddlewareProvider
}

// Matcher source info (server and route) to the destination url
Expand Down Expand Up @@ -113,6 +114,7 @@ func (h *Http) Run(ctx context.Context) error {
h.healthMiddleware,
h.matchHandler,
h.mgmtHandler(),
h.throttlingHandler(),
h.pluginHandler(),
headersHandler(h.ProxyHeaders),
accessLogHandler(h.AccessLog),
Expand Down Expand Up @@ -351,23 +353,15 @@ func (h *Http) pluginHandler() func(next http.Handler) http.Handler {
log.Printf("[INFO] plugin support enabled")
return h.PluginConductor.Middleware
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
return passThroughHandler()
}

func (h *Http) mgmtHandler() func(next http.Handler) http.Handler {
if h.Metrics != nil {
log.Printf("[DEBUG] metrics enabled")
return h.Metrics.Middleware
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
return passThroughHandler()
}

func (h *Http) makeHTTPServer(addr string, router http.Handler) *http.Server {
Expand Down Expand Up @@ -398,3 +392,19 @@ func (h *Http) setXRealIP(r *http.Request) {
}
r.Header.Add("X-Real-IP", ip)
}

func passThroughHandler() func(next http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
}

func (h *Http) throttlingHandler() func(next http.Handler) http.Handler {
if h.Throttling != nil {
log.Printf("[DEBUG] throttling enabled")
return h.Throttling.Middleware
}
return passThroughHandler()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/umputun/go-flags v1.5.1
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6 h1:Vv0JUPWTyeqUq42B2WJ1FeIDjjvGKoA2Ss+Ts0lAVbs=
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
3 changes: 3 additions & 0 deletions vendor/golang.org/x/time/AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/golang.org/x/time/CONTRIBUTORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions vendor/golang.org/x/time/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/golang.org/x/time/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading