fix: address issues raised from code review
rvagg committed Aug 9, 2024

1 parent 670f433 commit a7b1db5
Showing 8 changed files with 278 additions and 222 deletions.
18 changes: 12 additions & 6 deletions cmd/lotus-gateway/main.go
@@ -124,12 +124,12 @@ var runCmd = &cli.Command{
&cli.DurationFlag{
Name: "api-max-lookback",
Usage: "maximum duration allowable for tipset lookbacks",
Value: gateway.DefaultLookbackCap,
Value: gateway.DefaultMaxLookbackDuration,
},
&cli.Int64Flag{
Name: "api-wait-lookback-limit",
Usage: "maximum number of blocks to search back through for message inclusion",
Value: int64(gateway.DefaultStateWaitLookbackLimit),
Value: int64(gateway.DefaultMaxMessageLookbackEpochs),
},
&cli.Int64Flag{
Name: "rate-limit",
@@ -154,7 +154,7 @@ var runCmd = &cli.Command{
},
&cli.Int64Flag{
Name: "conn-per-minute",
Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. Use 0 to disable",
Usage: "A hard limit on the number of incoming connections (requests) to accept per remote host per minute. Use 0 to disable",
Value: 0,
},
&cli.IntFlag{
@@ -212,13 +212,19 @@ var runCmd = &cli.Command{
gwapi := gateway.NewNode(
api,
gateway.WithEthSubHandler(subHnd),
gateway.WithLookbackCap(lookbackCap),
gateway.WithStateWaitLookbackLimit(waitLookback),
gateway.WithMaxLookbackDuration(lookbackCap),
gateway.WithMaxMessageLookbackEpochs(waitLookback),
gateway.WithRateLimit(globalRateLimit),
gateway.WithRateLimitTimeout(rateLimitTimeout),
gateway.WithEthMaxFiltersPerConn(maxFiltersPerConn),
)
handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...)
handler, err := gateway.Handler(
gwapi,
api,
gateway.WithPerConnectionAPIRateLimit(perConnectionRateLimit),
gateway.WithPerHostConnectionsPerMinute(perHostConnectionsPerMinute),
gateway.WithJsonrpcServerOptions(serverOptions...),
)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
}
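
For orientation, gateway.Handler now takes functional options in place of the positional rate-limit parameters. A minimal sketch of the new call shape, assuming gwapi (lapi.Gateway), api (lapi.FullNode) and serverOptions are already in hand; the numeric limits are illustrative, not defaults taken from this commit:

	handler, err := gateway.Handler(
		gwapi,
		api,
		gateway.WithPerConnectionAPIRateLimit(200),         // soft per-connection limit on weighted API calls
		gateway.WithPerHostConnectionsPerMinute(60),        // hard per-host limit on new connections per minute
		gateway.WithJsonrpcServerOptions(serverOptions...), // extra jsonrpc.ServerOption values for the RPC server
	)
	if err != nil {
		return xerrors.Errorf("failed to set up gateway HTTP handler: %w", err)
	}

Any option can be omitted and its zero value is kept; with the revised check in handler.go below (|| rather than &&), setting either limit on its own is enough to enable the RateLimitHandler wrapper.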
182 changes: 111 additions & 71 deletions gateway/handler.go
@@ -37,33 +37,62 @@ type ShutdownHandler interface {
Shutdown(ctx context.Context) error
}

var _ ShutdownHandler = &statefulCallHandler{}
var _ ShutdownHandler = &RateLimitHandler{}
var _ ShutdownHandler = (*statefulCallHandler)(nil)
var _ ShutdownHandler = (*RateLimitHandler)(nil)

// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is
// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its
// Shutdown method.
// handlerOptions holds the options for the Handler function.
type handlerOptions struct {
perConnectionAPIRateLimit int
perHostConnectionsPerMinute int
jsonrpcServerOptions []jsonrpc.ServerOption
}

// HandlerOption is a functional option for configuring the Handler.
type HandlerOption func(*handlerOptions)

// WithPerConnectionAPIRateLimit sets the per connection API rate limit.
//
// The handler will limit the number of API calls per minute within a single WebSocket connection
// (where API calls are weighted by their relative expense), and the number of connections per
// minute from a single host.
func WithPerConnectionAPIRateLimit(limit int) HandlerOption {
return func(opts *handlerOptions) {
opts.perConnectionAPIRateLimit = limit
}
}

// WithPerHostConnectionsPerMinute sets the per host connections per minute limit.
//
// Connection limiting is a hard limit that will reject requests with a 429 status code if the limit
// is exceeded. API call limiting is a soft limit that will delay requests if the limit is exceeded.
func Handler(
gwapi lapi.Gateway,
api lapi.FullNode,
perConnectionAPIRateLimit int,
perHostConnectionsPerMinute int,
opts ...jsonrpc.ServerOption,
) (ShutdownHandler, error) {
// Connection limiting is a hard limit that will reject requests with a http.StatusTooManyRequests
// status code if the limit is exceeded. API call limiting is a soft limit that will delay requests
// if the limit is exceeded.
func WithPerHostConnectionsPerMinute(limit int) HandlerOption {
return func(opts *handlerOptions) {
opts.perHostConnectionsPerMinute = limit
}
}

m := mux.NewRouter()
// WithJsonrpcServerOptions sets the JSON-RPC server options.
func WithJsonrpcServerOptions(options ...jsonrpc.ServerOption) HandlerOption {
return func(opts *handlerOptions) {
opts.jsonrpcServerOptions = options
}
}

opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))
// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is
// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its
// Shutdown method.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, options ...HandlerOption) (ShutdownHandler, error) {
opts := &handlerOptions{}
for _, option := range options {
option(opts)
}

m := mux.NewRouter()

rpcopts := append(opts.jsonrpcServerOptions, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))
serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(opts...)
rpcServer := jsonrpc.NewServer(rpcopts...)
rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")

@@ -91,11 +120,11 @@ func Handler(
m.PathPrefix("/").Handler(http.DefaultServeMux)

handler := &statefulCallHandler{m}
if perConnectionAPIRateLimit > 0 && perHostConnectionsPerMinute > 0 {
if opts.perConnectionAPIRateLimit > 0 || opts.perHostConnectionsPerMinute > 0 {
return NewRateLimitHandler(
handler,
perConnectionAPIRateLimit,
perHostConnectionsPerMinute,
opts.perConnectionAPIRateLimit,
opts.perHostConnectionsPerMinute,
connectionLimiterCleanupInterval,
), nil
}
@@ -125,14 +154,15 @@ type hostLimiter struct {
}

type RateLimitHandler struct {
cancelFunc context.CancelFunc
mu sync.Mutex
limiters map[string]*hostLimiter
perConnectionAPILimit rate.Limit
perHostConnectionsPerMinute int
next http.Handler
cleanupInterval time.Duration
expiryDuration time.Duration
cancelFunc context.CancelFunc
limiters map[string]*hostLimiter
limitersLk sync.Mutex
perConnectionAPILimit rate.Limit
perHostConnectionsLimit rate.Limit
perHostConnectionsLimitBurst int
next http.Handler
cleanupInterval time.Duration
expiryDuration time.Duration
}

// NewRateLimitHandler creates a new RateLimitHandler that wraps the
@@ -149,85 +179,95 @@ func NewRateLimitHandler(

ctx, cancel := context.WithCancel(context.Background())
h := &RateLimitHandler{
cancelFunc: cancel,
limiters: make(map[string]*hostLimiter),
perConnectionAPILimit: rate.Inf,
perHostConnectionsPerMinute: perHostConnectionsPerMinute,
next: next,
cleanupInterval: cleanupInterval,
expiryDuration: 5 * cleanupInterval,
cancelFunc: cancel,
limiters: make(map[string]*hostLimiter),
perConnectionAPILimit: rate.Inf,
perHostConnectionsLimit: rate.Inf,
next: next,
cleanupInterval: cleanupInterval,
expiryDuration: 5 * cleanupInterval,
}
if perConnectionAPIRateLimit > 0 {
h.perConnectionAPILimit = rate.Every(time.Second / time.Duration(perConnectionAPIRateLimit))
}
if perHostConnectionsPerMinute > 0 {
h.perHostConnectionsLimit = rate.Every(time.Minute / time.Duration(perHostConnectionsPerMinute))
h.perHostConnectionsLimitBurst = perHostConnectionsPerMinute
}
go h.cleanupExpiredLimiters(ctx)
return h
}

func (h *RateLimitHandler) getLimits(host string) *hostLimiter {
h.mu.Lock()
defer h.mu.Unlock()

entry, exists := h.limiters[host]
if !exists {
var limiter *rate.Limiter
if h.perHostConnectionsPerMinute > 0 {
requestLimit := rate.Every(time.Minute / time.Duration(h.perHostConnectionsPerMinute))
limiter = rate.NewLimiter(requestLimit, h.perHostConnectionsPerMinute)
}
entry = &hostLimiter{
limiter: limiter,
lastAccess: time.Now(),
func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.perHostConnectionsLimit != rate.Inf {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
h.limiters[host] = entry
} else {
entry.lastAccess = time.Now()
}

return entry
}

func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
h.limitersLk.Lock()
entry, exists := h.limiters[host]
if !exists {
entry = &hostLimiter{
limiter: rate.NewLimiter(h.perHostConnectionsLimit, h.perHostConnectionsLimitBurst),
lastAccess: time.Now(),
}
h.limiters[host] = entry
} else {
entry.lastAccess = time.Now()
}
h.limitersLk.Unlock()

limits := h.getLimits(host)
if limits.limiter != nil && !limits.limiter.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
return
if !entry.limiter.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
return
}
}

if h.perConnectionAPILimit != rate.Inf {
// new rate limiter for each connection, to throttle a single WebSockets connection;
// allow for a burst of MaxRateLimitTokens
apiLimiter := rate.NewLimiter(h.perConnectionAPILimit, MaxRateLimitTokens)
r = r.WithContext(context.WithValue(r.Context(), perConnectionAPIRateLimiterKey, apiLimiter))
r = r.WithContext(setPerConnectionAPIRateLimiter(r.Context(), apiLimiter))
}

h.next.ServeHTTP(w, r)
}

// setPerConnectionAPIRateLimiter sets the rate limiter in the context.
func setPerConnectionAPIRateLimiter(ctx context.Context, limiter *rate.Limiter) context.Context {
return context.WithValue(ctx, perConnectionAPIRateLimiterKey, limiter)
}

// getPerConnectionAPIRateLimiter retrieves the rate limiter from the context.
func getPerConnectionAPIRateLimiter(ctx context.Context) (*rate.Limiter, bool) {
limiter, ok := ctx.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter)
return limiter, ok
}

// cleanupExpiredLimiters periodically checks for limiters that have expired and removes them.
func (h *RateLimitHandler) cleanupExpiredLimiters(ctx context.Context) {
if h.cleanupInterval == 0 {
return
}

for {
ticker := time.NewTicker(h.cleanupInterval)
defer ticker.Stop()

for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case <-time.After(h.cleanupInterval):
h.mu.Lock()
case <-ticker.C:
h.limitersLk.Lock()
now := time.Now()
for host, entry := range h.limiters {
if now.Sub(entry.lastAccess) > h.expiryDuration {
delete(h.limiters, host)
}
}
h.mu.Unlock()
h.limitersLk.Unlock()
}
}
}
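
The conversion NewRateLimitHandler performs from flag values to golang.org/x/time/rate limiters is easy to check in isolation. A self-contained sketch (illustrative numbers only) of the per-host setup above, where N connections per minute becomes a refill of one token every time.Minute/N with a burst of N:

	package main

	import (
		"fmt"
		"time"

		"golang.org/x/time/rate"
	)

	func main() {
		// Mirrors the per-host setup in NewRateLimitHandler: N connections per
		// minute -> one token every minute/N, with a burst allowance of N.
		perHostConnectionsPerMinute := 60 // illustrative value
		limit := rate.Every(time.Minute / time.Duration(perHostConnectionsPerMinute))
		limiter := rate.NewLimiter(limit, perHostConnectionsPerMinute)

		admitted := 0
		for limiter.Allow() { // the initial burst is admitted immediately
			admitted++
		}
		fmt.Println("requests admitted before a 429:", admitted) // ~60; the next token arrives roughly a second later
	}

The per-connection API limiter follows the same pattern with rate.Every(time.Second / perConnectionAPIRateLimit) and a burst of MaxRateLimitTokens, and is handed to the RPC layer through the request context via setPerConnectionAPIRateLimiter.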
2 changes: 1 addition & 1 deletion gateway/handler_test.go
@@ -13,7 +13,7 @@ import (
func TestRequestRateLimiterHandler(t *testing.T) {
var callCount int
h := gateway.NewRateLimitHandler(
http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
callCount++
}),
0, // api rate
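
The test above is shown only partially; the following is a standalone sketch, not the repository's test, of driving the per-host limit through RateLimitHandler with net/http/httptest. The constructor arguments follow the shape visible in the call above (next handler, per-connection API rate, per-host connections per minute, cleanup interval); the limits and test name are hypothetical:

	package gateway_test

	import (
		"net/http"
		"net/http/httptest"
		"testing"

		"github.com/filecoin-project/lotus/gateway"
	)

	func TestPerHostConnectionLimitSketch(t *testing.T) {
		h := gateway.NewRateLimitHandler(
			http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}),
			0, // no per-connection API rate limit
			2, // admit a burst of 2 connections per host
			0, // background cleanup disabled for this sketch
		)
		for i := 0; i < 3; i++ {
			req := httptest.NewRequest(http.MethodGet, "/", nil) // RemoteAddr defaults to "192.0.2.1:1234"
			rec := httptest.NewRecorder()
			h.ServeHTTP(rec, req)
			if i < 2 && rec.Code == http.StatusTooManyRequests {
				t.Fatalf("request %d unexpectedly rate limited", i)
			}
			if i == 2 && rec.Code != http.StatusTooManyRequests {
				t.Fatalf("request %d should have been rate limited", i)
			}
		}
	}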
