Skip to content

Commit

Permalink
Merge pull request #97
Browse files Browse the repository at this point in the history
* revendor with latest rest lib

* simplify with passThroughHandler

* add deps for throttling
  • Loading branch information
umputun authored Jul 3, 2021
1 parent a9c7db2 commit 7103968
Show file tree
Hide file tree
Showing 32 changed files with 2,437 additions and 63 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ _see also [examples/metrics](https://github.com/umputun/reproxy/tree/master/exam

Reproxy returns 502 (Bad Gateway) error in case if request doesn't match to any provided routes and assets. In case if some unexpected, internal error happened it returns 500. By default reproxy renders the simplest text version of the error - "Server error". Setting `--error.enabled` turns on the default html error message and with `--error.template` user may set any custom html template file for the error rendering. The template has two vars: `{{.ErrCode}}` and `{{.ErrMessage}}`. For example this template `oh my! {{.ErrCode}} - {{.ErrMessage}}` will be rendered to `oh my! 502 - Bad Gateway`

## Throttling

Reproxy allows to define system level max req/sec value for the overall system activity as well as per user. 0 values (default) treated as unlimited.

User activity limited for both matched and unmatched routes. All unmatched routes considered as a "single destination group" and get a common limiter which is `rate*3`. It means if 10 (req/sec) defined with `--throttle.user=10` the end user will be able to perform up to 30 request pers second for either static assets or unmatched routes. For matched routes this limiter maintained per destination (route), i.e. request proxied to s1.example.com/api will allow 10 r/s and the request proxied to s2.example.com will allow another 10 r/s.

## Plugins support

The core functionality of reproxy can be extended with external plugins. Each plugin is an independent process/container implementing [rpc server](https://golang.org/pkg/net/rpc/). Plugins registered with reproxy conductor and added to the chain of the middlewares. Each plugin receives request with the original url, headers and all matching route info and responds with the headers and the status code. Any status code >= 400 treated as an error response and terminates flow immediately with the proxy error. There are two types of headers plugins can set:
Expand Down Expand Up @@ -349,6 +355,10 @@ health-check:
--health-check.enabled enable automatic health-check [$HEALTH_CHECK_ENABLED]
--health-check.interval= automatic health-check interval (default: 300s) [$HEALTH_CHECK_INTERVAL]

throttle:
--throttle.system= throttle overall activity' (default: 0) [$THROTTLE_SYSTEM]
--throttle.user= limit req/sec per user and per proxy destination (default: 0) [$THROTTLE_USER]


Help Options:
-h, --help Show this help message
Expand Down
7 changes: 7 additions & 0 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ var opts struct {
Interval time.Duration `long:"interval" env:"INTERVAL" default:"300s" description:"automatic health-check interval"`
} `group:"health-check" namespace:"health-check" env-namespace:"HEALTH_CHECK"`

Throttle struct {
System int `long:"system" env:"SYSTEM" default:"0" description:"throttle overall activity'"`
User int `long:"user" env:"USER" default:"0" description:"limit req/sec per user and per proxy destination"`
} `group:"throttle" namespace:"throttle" env-namespace:"THROTTLE"`

Plugin struct {
Enabled bool `long:"enabled" env:"ENABLED" description:"enable plugin support"`
Listen string `long:"listen" env:"LISTEN" default:"127.0.0.1:8081" description:"registration listen on host:port"`
Expand Down Expand Up @@ -246,6 +251,8 @@ func run() error {
Metrics: makeMetrics(ctx, svc),
Reporter: errReporter,
PluginConductor: makePluginConductor(ctx),
ThrottleSystem: opts.Throttle.System * 3,
ThottleUser: opts.Throttle.User,
}

err = px.Run(ctx)
Expand Down
115 changes: 79 additions & 36 deletions app/proxy/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import (
"net/http"
"strings"

"github.com/didip/tollbooth/v6"
"github.com/didip/tollbooth/v6/libstring"
log "github.com/go-pkgz/lgr"
R "github.com/go-pkgz/rest"
"github.com/gorilla/handlers"

"github.com/umputun/reproxy/app/discovery"
)

func headersHandler(headers []string) func(next http.Handler) http.Handler {
Expand All @@ -32,11 +36,7 @@ func headersHandler(headers []string) func(next http.Handler) http.Handler {

func maxReqSizeHandler(maxSize int64) func(next http.Handler) http.Handler {
if maxSize <= 0 {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
return passThroughHandler
}

log.Printf("[DEBUG] request size limited to %d", maxSize)
Expand Down Expand Up @@ -65,49 +65,92 @@ func accessLogHandler(wr io.Writer) func(next http.Handler) http.Handler {

func stdoutLogHandler(enable bool, lh func(next http.Handler) http.Handler) func(next http.Handler) http.Handler {

if enable {
log.Printf("[DEBUG] stdout logging enabled")
return func(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
// don't log to stdout GET ~/(.*)/ping$ requests
if r.Method == "GET" && strings.HasSuffix(r.URL.Path, "/ping") {
next.ServeHTTP(w, r)
return
}
lh(next).ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
if !enable {
return passThroughHandler
}

log.Printf("[DEBUG] stdout logging enabled")
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
fn := func(w http.ResponseWriter, r *http.Request) {
// don't log to stdout GET ~/(.*)/ping$ requests
if r.Method == "GET" && strings.HasSuffix(r.URL.Path, "/ping") {
next.ServeHTTP(w, r)
return
}
lh(next).ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
}

func gzipHandler(enabled bool) func(next http.Handler) http.Handler {
if enabled {
log.Printf("[DEBUG] gzip enabled")
return handlers.CompressHandler
if !enabled {
return passThroughHandler
}

return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
log.Printf("[DEBUG] gzip enabled")
return handlers.CompressHandler
}

func signatureHandler(enabled bool, version string) func(next http.Handler) http.Handler {
if enabled {
log.Printf("[DEBUG] signature headers enabled")
return R.AppInfo("reproxy", "umputun", version)
if !enabled {
return passThroughHandler
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
log.Printf("[DEBUG] signature headers enabled")
return R.AppInfo("reproxy", "umputun", version)
}

// limiterSystemHandler throttles overall activity of reproxy server, 0 means disabled
func limiterSystemHandler(reqSec int) func(next http.Handler) http.Handler {
if reqSec <= 0 {
return passThroughHandler
}
return func(h http.Handler) http.Handler {
lmt := tollbooth.NewLimiter(float64(reqSec), nil)
fn := func(w http.ResponseWriter, r *http.Request) {
if httpError := tollbooth.LimitByKeys(lmt, []string{"system"}); httpError != nil {
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
return
}
h.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
}

// limiterUserHandler throttles per user activity. In case if match found the limit is per destination
// otherwise global (per user in any case). 0 means disabled
func limiterUserHandler(reqSec int) func(next http.Handler) http.Handler {
if reqSec <= 0 {
return passThroughHandler
}

return func(h http.Handler) http.Handler {
lmt := tollbooth.NewLimiter(float64(reqSec), nil)
fn := func(w http.ResponseWriter, r *http.Request) {
keys := []string{libstring.RemoteIP(lmt.GetIPLookups(), lmt.GetForwardedForIndexFromBehind(), r)}

// add dst proxy if matched
if r.Context().Value(ctxMatch) != nil { // route match detected by matchHandler
match := r.Context().Value(ctxMatch).(discovery.MatchedRoute)
matchType := r.Context().Value(ctxMatchType).(discovery.MatchType)
if matchType == discovery.MTProxy {
keys = append(keys, match.Mapper.Dst)
}
}

if httpError := tollbooth.LimitByKeys(lmt, keys); httpError != nil {
http.Error(w, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
return
}
h.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
}

func passThroughHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
}
97 changes: 97 additions & 0 deletions app/proxy/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package proxy

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/umputun/reproxy/app/discovery"
)

func Test_headersHandler(t *testing.T) {
Expand Down Expand Up @@ -83,3 +89,94 @@ func Test_signatureHandler(t *testing.T) {
assert.Equal(t, "", wr.Result().Header.Get("App-Version"), wr.Result().Header)
}
}

func Test_limiterSystemHandler(t *testing.T) {

var passed int32
handler := limiterSystemHandler(10)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&passed, 1)
}))

ts := httptest.NewServer(handler)
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
req, err := http.NewRequest("GET", ts.URL, nil)
require.NoError(t, err)
client := http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
resp.Body.Close()
}()
}
wg.Wait()
assert.Equal(t, int32(10), atomic.LoadInt32(&passed))
}

func Test_limiterClientHandlerNoMatches(t *testing.T) {

var passed int32
handler := limiterUserHandler(10)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&passed, 1)
}))

ts := httptest.NewServer(handler)
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
req, err := http.NewRequest("GET", ts.URL, nil)
require.NoError(t, err)
client := http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
resp.Body.Close()
}()
}
wg.Wait()
assert.Equal(t, int32(10), atomic.LoadInt32(&passed))
}

func Test_limiterClientHandlerWithMatches(t *testing.T) {
var passed int32
handler := limiterUserHandler(10)(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&passed, 1)
}))

wrapWithContext := func(next http.Handler) http.Handler {
var id int32
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n := int(atomic.AddInt32(&id, 1))
m := discovery.MatchedRoute{Mapper: discovery.URLMapper{Dst: strconv.Itoa(n % 2)}}
ctx := context.WithValue(context.Background(), ctxMatchType, discovery.MTProxy)
ctx = context.WithValue(ctx, ctxMatch, m)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

ts := httptest.NewServer(wrapWithContext(handler))

var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func(id int) {
defer wg.Done()
req, err := http.NewRequest("POST", ts.URL, bytes.NewBufferString("123456"))
require.NoError(t, err)
m := discovery.MatchedRoute{Mapper: discovery.URLMapper{Dst: strconv.Itoa(id % 2)}}
ctx := context.WithValue(context.Background(), ctxMatchType, discovery.MTProxy)
ctx = context.WithValue(ctx, ctxMatch, m)
req = req.WithContext(ctx)

client := http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
resp.Body.Close()
}(i)
}
wg.Wait()
assert.Equal(t, int32(20), atomic.LoadInt32(&passed))
}
51 changes: 24 additions & 27 deletions app/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Http struct { // nolint golint
PluginConductor MiddlewareProvider
Reporter Reporter
LBSelector func(len int) int

ThrottleSystem int
ThottleUser int
}

// Matcher source info (server and route) to the destination url
Expand Down Expand Up @@ -107,18 +110,20 @@ func (h *Http) Run(ctx context.Context) error {
}()

handler := R.Wrap(h.proxyHandler(),
R.Recoverer(log.Default()),
signatureHandler(h.Signature, h.Version),
h.pingHandler,
h.healthMiddleware,
h.matchHandler,
h.mgmtHandler(),
h.pluginHandler(),
headersHandler(h.ProxyHeaders),
accessLogHandler(h.AccessLog),
R.Recoverer(log.Default()), // recover on errors
signatureHandler(h.Signature, h.Version), // send app signature
h.pingHandler, // respond to /ping
h.healthMiddleware, // respond to /health
h.matchHandler, // set matched routes to context
limiterSystemHandler(h.ThrottleSystem), // limit total requests/sec
limiterUserHandler(h.ThottleUser), // req/seq per user/route match
h.mgmtHandler(), // handles /metrics and /routes for prometheus
h.pluginHandler(), // prc to external plugins
headersHandler(h.ProxyHeaders), // set response headers
accessLogHandler(h.AccessLog), // apache-format log file
stdoutLogHandler(h.StdOutEnabled, logger.New(logger.Log(log.Default()), logger.Prefix("[INFO]")).Handler),
maxReqSizeHandler(h.MaxBodySize),
gzipHandler(h.GzEnabled),
maxReqSizeHandler(h.MaxBodySize), // limit request max size
gzipHandler(h.GzEnabled), // gzip response
)

if len(h.SSLConfig.FQDNs) == 0 && h.SSLConfig.SSLMode == SSLAuto {
Expand Down Expand Up @@ -347,27 +352,19 @@ func (h *Http) toHTTP(address string, httpPort int) string {
}

func (h *Http) pluginHandler() func(next http.Handler) http.Handler {
if h.PluginConductor != nil {
log.Printf("[INFO] plugin support enabled")
return h.PluginConductor.Middleware
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
if h.PluginConductor == nil {
return passThroughHandler
}
log.Printf("[INFO] plugin support enabled")
return h.PluginConductor.Middleware
}

func (h *Http) mgmtHandler() func(next http.Handler) http.Handler {
if h.Metrics != nil {
log.Printf("[DEBUG] metrics enabled")
return h.Metrics.Middleware
}
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
})
if h.Metrics == nil {
return passThroughHandler
}
log.Printf("[DEBUG] metrics enabled")
return h.Metrics.Middleware
}

func (h *Http) makeHTTPServer(addr string, router http.Handler) *http.Server {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/umputun/reproxy
go 1.16

require (
github.com/didip/tollbooth/v6 v6.1.0
github.com/go-pkgz/lgr v0.10.4
github.com/go-pkgz/repeater v1.1.3
github.com/go-pkgz/rest v1.11.0
Expand Down
Loading

0 comments on commit 7103968

Please sign in to comment.