Skip to content

Commit

Permalink
Merge PR: rpc api rate limit (#3273)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilovers authored Dec 13, 2023
1 parent f1ce92c commit 345ad1d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 11 deletions.
2 changes: 1 addition & 1 deletion app/rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GetAPIs(clientCtx context.CLIContext, log log.Logger, keys ...ethsecp256k1.
rateLimiters := getRateLimiter()
disableAPI := getDisableAPI()
ethBackend = backend.New(clientCtx, log, rateLimiters, disableAPI)
ethAPI := eth.NewAPI(clientCtx, log, ethBackend, nonceLock, keys...)
ethAPI := eth.NewAPI(rateLimiters, clientCtx, log, ethBackend, nonceLock, keys...)
if evmtypes.GetEnableBloomFilter() {
ethBackend.StartBloomHandlers(evmtypes.BloomBitsBlocks, evmtypes.GetIndexer().GetDB())
}
Expand Down
56 changes: 52 additions & 4 deletions app/rpc/namespaces/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -91,10 +93,18 @@ type PublicEthereumAPI struct {
systemContract []byte
e2cWasmCodeLimit uint64
e2cWasmMsgHelperAddr string
rateLimiters map[string]*rate.Limiter
}

func (api *PublicEthereumAPI) GetRateLimiter(apiName string) *rate.Limiter {
if api.rateLimiters == nil {
return nil
}
return api.rateLimiters[apiName]
}

// NewAPI creates an instance of the public ETH Web3 API.
func NewAPI(
func NewAPI(rateLimiters map[string]*rate.Limiter,
clientCtx clientcontext.CLIContext, log log.Logger, backend backend.Backend, nonceLock *rpctypes.AddrLocker,
keys ...ethsecp256k1.PrivKey,
) *PublicEthereumAPI {
Expand All @@ -118,6 +128,7 @@ func NewAPI(
fastQueryThreshold: viper.GetUint64(FlagFastQueryThreshold),
systemContract: getSystemContractAddr(clientCtx),
e2cWasmMsgHelperAddr: viper.GetString(FlagE2cWasmMsgHelperAddr),
rateLimiters: rateLimiters,
}
api.evmFactory = simulation.NewEvmFactory(clientCtx.ChainID, api.wrappedBackend)
module := evm.AppModuleBasic{}
Expand Down Expand Up @@ -507,6 +518,10 @@ func (api *PublicEthereumAPI) getStorageAt(address common.Address, key []byte, b
func (api *PublicEthereumAPI) GetStorageAt(address common.Address, key string, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getStorageAt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "key", key, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_getStorageAt")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand All @@ -523,7 +538,10 @@ func (api *PublicEthereumAPI) GetStorageAtInternal(address common.Address, key [
func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (*hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNrOrHash)

rateLimiter := api.GetRateLimiter("eth_getTransactionCount")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -554,6 +572,10 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN
func (api *PublicEthereumAPI) GetBlockTransactionCountByHash(hash common.Hash) *hexutil.Uint {
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rateLimiter := api.GetRateLimiter("eth_getBlockTransactionCountByHash")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil
}
res, _, err := api.clientCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
return nil
Expand Down Expand Up @@ -637,6 +659,10 @@ func (api *PublicEthereumAPI) GetUncleCountByBlockNumber(_ rpctypes.BlockNumber)
func (api *PublicEthereumAPI) GetCode(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getCode", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_getCode")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNumber, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -684,6 +710,10 @@ func (api *PublicEthereumAPI) GetCodeByHash(hash common.Hash) (hexutil.Bytes, er
// GetTransactionLogs returns the logs given a transaction hash.
func (api *PublicEthereumAPI) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) {
api.logger.Debug("eth_getTransactionLogs", "hash", txHash)
rateLimiter := api.GetRateLimiter("eth_getTransactionLogs")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
return api.backend.GetTransactionLogs(txHash)
}

Expand Down Expand Up @@ -865,6 +895,10 @@ func (api *PublicEthereumAPI) addCallCache(key common.Hash, data []byte) {
func (api *PublicEthereumAPI) Call(args rpctypes.CallArgs, blockNrOrHash rpctypes.BlockNumberOrHash, overrides *evmtypes.StateOverrides) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_call", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_call")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
if overrides != nil {
if err := overrides.Check(); err != nil {
return nil, err
Expand Down Expand Up @@ -1115,7 +1149,10 @@ func (api *PublicEthereumAPI) simDoCall(args rpctypes.CallArgs, cap uint64) (uin
func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_estimateGas", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args)

rateLimiter := api.GetRateLimiter("eth_estimateGas")
if rateLimiter != nil && !rateLimiter.Allow() {
return 0, rpctypes.ErrServerBusy
}
params, err := api.getEvmParams()
if err != nil {
return 0, TransformDataError(err, "eth_estimateGas")
Expand Down Expand Up @@ -1159,6 +1196,10 @@ func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint6
func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (*watcher.Block, error) {
monitor := monitor.GetMonitor("eth_getBlockByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash, "full", fullTx)
rateLimiter := api.GetRateLimiter("eth_getBlockByHash")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockRes, err := api.backend.GetBlockByHash(hash, fullTx)
if err != nil {
return nil, TransformDataError(err, RPCEthGetBlockByHash)
Expand Down Expand Up @@ -1218,7 +1259,10 @@ func (api *PublicEthereumAPI) getBlockByNumber(blockNum rpctypes.BlockNumber, fu
func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fullTx bool) (*watcher.Block, error) {
monitor := monitor.GetMonitor("eth_getBlockByNumber", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("number", blockNum, "full", fullTx)

rateLimiter := api.GetRateLimiter("eth_getBlockByNumber")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockRes, err := api.getBlockByNumber(blockNum, fullTx)
return blockRes, err
}
Expand Down Expand Up @@ -1329,6 +1373,10 @@ func (api *PublicEthereumAPI) getTransactionByBlockAndIndex(block *tmtypes.Block
func (api *PublicEthereumAPI) GetTransactionReceipt(hash common.Hash) (*watcher.TransactionReceipt, error) {
monitor := monitor.GetMonitor("eth_getTransactionReceipt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rateLimiter := api.GetRateLimiter("eth_getTransactionReceipt")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
res, e := api.wrappedBackend.GetTransactionReceipt(hash)
// do not use watchdb when it`s a evm2cm tx
if e == nil && !api.isEvm2CmTx(res.To) {
Expand Down
11 changes: 5 additions & 6 deletions app/rpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

var (
ErrServerBusy = errors.New("server is too busy")
ErrMethodNotAllowed = errors.New("the method is not allowed")
NameSpace = "filters"
)
Expand Down Expand Up @@ -138,7 +137,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
}
rateLimiter := api.backend.GetRateLimiter("eth_newPendingTransactionFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", ErrServerBusy.Error()))
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", rpctypes.ErrServerBusy.Error()))
}
pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs()
if err != nil {
Expand Down Expand Up @@ -235,7 +234,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
}
rateLimiter := api.backend.GetRateLimiter("eth_newBlockFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(fmt.Sprintf("error creating block filter: %s", ErrServerBusy.Error()))
return rpc.ID(fmt.Sprintf("error creating block filter: %s", rpctypes.ErrServerBusy.Error()))
}
headerSub, cancelSubs, err := api.events.SubscribeNewHeads()
if err != nil {
Expand Down Expand Up @@ -402,7 +401,7 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID,
}
rateLimiter := api.backend.GetRateLimiter("eth_newFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(""), ErrServerBusy
return rpc.ID(""), rpctypes.ErrServerBusy
}
var (
filterID = rpc.ID("")
Expand Down Expand Up @@ -468,7 +467,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, criteria filters.Filter
}
rateLimiter := api.backend.GetRateLimiter("eth_getLogs")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, ErrServerBusy
return nil, rpctypes.ErrServerBusy
}
var filter *Filter
if criteria.BlockHash != nil {
Expand Down Expand Up @@ -574,7 +573,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
}
rateLimiter := api.backend.GetRateLimiter("eth_getFilterChanges")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, ErrServerBusy
return nil, rpctypes.ErrServerBusy
}
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions app/rpc/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

var (
ErrServerBusy = errors.New("server is too busy, please try again later")
// static gas limit for all blocks
defaultGasLimit = hexutil.Uint64(int64(^uint32(0)))
defaultGasUsed = hexutil.Uint64(0)
Expand Down

0 comments on commit 345ad1d

Please sign in to comment.