From 8b348ddb115c2f64a6eadeb80a33fcc3e9d49e09 Mon Sep 17 00:00:00 2001 From: DarianShawn Date: Wed, 24 Aug 2022 15:33:45 +0800 Subject: [PATCH 1/7] Add debug logs --- jsonrpc/filter_manager.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 02c09bb44b..cfad1b8dda 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -289,6 +289,10 @@ OUT_LOOP: // set timer to remove filter if filterBase != nil { + // This is not ideal: it simply drops the unexpired timer for the GC, + // which might take several minutes to recycle it. + // Even worse, a ws filter has no expiration timestamp, which + // means we might create an immediately fired timer. timeoutCh = time.After(time.Until(filterBase.expiredAt)) } @@ -308,6 +312,8 @@ OUT_LOOP: continue OUT_LOOP } + f.logger.Info("filterBase timeout", "id", filterBase.id, "expired", filterBase.expiredAt) + if !f.Uninstall(filterBase.id) { f.logger.Error("failed to uninstall filter", "id", filterBase.id) } @@ -551,6 +557,8 @@ func (f *FilterManager) addFilter(filter filter) string { f.emitSignalToUpdateCh() } + f.logger.Debug("filter added", "id", base.id, "timeout", base.expiredAt) + return base.id } From 49e3969d35df3200e2326c6cc2038dcdeb4d9d87 Mon Sep 17 00:00:00 2001 From: DarianShawn Date: Wed, 24 Aug 2022 16:40:00 +0800 Subject: [PATCH 2/7] Remove websocket connection when closed --- jsonrpc/dispatcher.go | 6 + jsonrpc/eth_endpoint.go | 8 +- jsonrpc/filter_manager.go | 206 ++++++++++++++++++--------------- jsonrpc/filter_manager_test.go | 84 +++++++++++++- jsonrpc/jsonrpc.go | 24 +++- 5 files changed, 221 insertions(+), 107 deletions(-) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index ffb5fbae5a..69b3ce88f9 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -98,6 +98,8 @@ func (d *Dispatcher) getFnHandler(req Request) (*serviceData, *funcData, Error) type wsConn interface { WriteMessage(messageType int, data []byte) error + GetFilterID() string + SetFilterID(string) } // as per https://www.jsonrpc.org/specification, the `id` in JSON-RPC 2.0 @@ -168,6 +170,10 @@ func (d *Dispatcher) handleUnsubscribe(req Request) (bool, Error) { return d.filterManager.Uninstall(filterID), nil } +func (d *Dispatcher) RemoveFilterByWs(conn wsConn) { + d.filterManager.RemoveFilterByWs(conn) +} + func (d *Dispatcher) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) { var req Request if err := json.Unmarshal(reqBody, &req); err != nil { diff --git a/jsonrpc/eth_endpoint.go b/jsonrpc/eth_endpoint.go index d3abf706b0..378b9166e0 100644 --- a/jsonrpc/eth_endpoint.go +++ b/jsonrpc/eth_endpoint.go @@ -817,16 +817,12 @@ func (e *Eth) GetFilterChanges(id string) (interface{}, error) { // UninstallFilter uninstalls a filter with given ID func (e *Eth) UninstallFilter(id string) (bool, error) { - ok := e.filterManager.Uninstall(id) - - return ok, nil + return e.filterManager.Uninstall(id), nil } // Unsubscribe uninstalls a filter in a websocket func (e *Eth) Unsubscribe(id string) (bool, error) { - ok := e.filterManager.Uninstall(id) - - return ok, nil + return e.filterManager.Uninstall(id), nil } func (e *Eth) getBlockHeader(number BlockNumber) (*types.Header, error) { diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index cfad1b8dda..eb8bf1a00e 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -1,6 +1,7 @@ package jsonrpc import ( + "bytes" "container/heap" "encoding/json" "errors" @@ -23,6 +24,7 @@ var ( 
ErrBlockNotFound = errors.New("block not found") ErrIncorrectBlockRange = errors.New("incorrect range") ErrPendingBlockNumber = errors.New("pending block number is not supported") + ErrNoWSConnection = errors.New("no websocket connection") ) // defaultTimeout is the timeout to remove the filters that don't have a web socket stream @@ -35,8 +37,8 @@ const ( // filter is an interface that BlockFilter and LogFilter implement type filter interface { - // isWS returns the flag indicating the filter has web socket stream - isWS() bool + // hasWSConn returns the flag indicating the filter has web socket stream + hasWSConn() bool // getFilterBase returns filterBase that has common fields getFilterBase() *filterBase @@ -57,7 +59,7 @@ type filterBase struct { heapIndex int // timestamp to be expired - expiredAt time.Time + expiresAt time.Time // websocket connection ws wsConn @@ -77,8 +79,8 @@ func (f *filterBase) getFilterBase() *filterBase { return f } -// isWS returns the flag indicating this filter has websocket connection -func (f *filterBase) isWS() bool { +// hasWSConn returns the flag indicating this filter has websocket connection +func (f *filterBase) hasWSConn() bool { return f.ws != nil } @@ -93,12 +95,19 @@ const ethSubscriptionTemplate = `{ // writeMessageToWs sends given message to websocket stream func (f *filterBase) writeMessageToWs(msg string) error { - res := fmt.Sprintf(ethSubscriptionTemplate, f.id, msg) - if err := f.ws.WriteMessage(websocket.TextMessage, []byte(res)); err != nil { + if !f.hasWSConn() { + return ErrNoWSConnection + } + + var v bytes.Buffer + if _, err := v.WriteString(fmt.Sprintf(ethSubscriptionTemplate, f.id, msg)); err != nil { return err } - return nil + return f.ws.WriteMessage( + websocket.TextMessage, + v.Bytes(), + ) } // blockFilter is a filter to store the updates of block @@ -113,8 +122,9 @@ func (f *blockFilter) takeBlockUpdates() []*types.Header { updates, newHead := f.block.getUpdates() f.Lock() + defer f.Unlock() + f.block = newHead - f.Unlock() return updates } @@ -123,9 +133,10 @@ func (f *blockFilter) takeBlockUpdates() []*types.Header { func (f *blockFilter) getUpdates() (string, error) { headers := f.takeBlockUpdates() - updates := []string{} - for _, header := range headers { - updates = append(updates, header.Hash.String()) + // alloc once and for all + updates := make([]string, len(headers)) + for i, header := range headers { + updates[i] = header.Hash.String() } return fmt.Sprintf("[\"%s\"]", strings.Join(updates, "\",\"")), nil @@ -135,8 +146,9 @@ func (f *blockFilter) getUpdates() (string, error) { func (f *blockFilter) sendUpdates() error { updates := f.takeBlockUpdates() - for _, block := range updates { - raw, err := json.Marshal(block) + // it is block header actually + for _, header := range updates { + raw, err := json.Marshal(header) if err != nil { return err } @@ -226,6 +238,8 @@ type filterManagerStore interface { // FilterManager manages all running filters type FilterManager struct { + sync.RWMutex // provide basic r/w lock + logger hclog.Logger timeout time.Duration @@ -234,7 +248,6 @@ type FilterManager struct { subscription blockchain.Subscription blockStream *blockStream - lock sync.RWMutex filters map[string]filter timeouts timeHeapImpl @@ -248,7 +261,6 @@ func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterMana timeout: defaultTimeout, store: store, blockStream: &blockStream{}, - lock: sync.RWMutex{}, filters: make(map[string]filter), timeouts: timeHeapImpl{}, updateCh: make(chan struct{}), @@ 
-293,7 +305,7 @@ OUT_LOOP: // which might take several minutes to recycle it. // Even worse, a ws filter has no expiration timestamp, which // means we might create an immediately fired timer. - timeoutCh = time.After(time.Until(filterBase.expiredAt)) + timeoutCh = time.After(time.Until(filterBase.expiresAt)) } select { @@ -312,7 +324,7 @@ OUT_LOOP: continue OUT_LOOP } - f.logger.Info("filterBase timeout", "id", filterBase.id, "expired", filterBase.expiredAt) + f.logger.Info("filterBase timeout", "id", filterBase.id, "expiresAt", filterBase.expiresAt) if !f.Uninstall(filterBase.id) { f.logger.Error("failed to uninstall filter", "id", filterBase.id) @@ -340,6 +352,10 @@ func (f *FilterManager) NewBlockFilter(ws wsConn) string { block: f.blockStream.Head(), } + if filter.hasWSConn() { + ws.SetFilterID(filter.id) + } + return f.addFilter(filter) } @@ -355,8 +371,8 @@ func (f *FilterManager) NewLogFilter(logQuery *LogQuery, ws wsConn) string { // Exists checks the filter with given ID exists func (f *FilterManager) Exists(id string) bool { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() _, ok := f.filters[id] @@ -474,18 +490,24 @@ func (f *FilterManager) GetLogs(query *LogQuery) ([]*Log, error) { return f.getLogsFromBlocks(query) } -// GetLogFilterFromID return log filter for given filterID -func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) { - f.lock.RLock() - defer f.lock.RUnlock() +// getFilterByID fetches the filter by the ID +// +// Release the lock as quickly as possible +func (f *FilterManager) getFilterByID(filterID string) filter { + f.RLock() + defer f.RUnlock() - filter, ok := f.filters[filterID] + return f.filters[filterID] +} - if !ok { +// GetLogFilterFromID return log filter for given filterID +func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) { + filterRaw := f.getFilterByID(filterID) + if filterRaw == nil { return nil, ErrFilterDoesNotExists } - logFilter, ok := filter.(*logFilter) + logFilter, ok := filterRaw.(*logFilter) if !ok { return nil, ErrCastingFilterToLogFilter } @@ -495,8 +517,8 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) // GetFilterChanges returns the updates of the filter with given ID in string func (f *FilterManager) GetFilterChanges(id string) (string, error) { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() filter, ok := f.filters[id] @@ -505,7 +527,7 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) { } // we cannot get updates from a ws filter with getFilterChanges - if filter.isWS() { + if filter.hasWSConn() { return "", ErrWSFilterDoesNotSupportGetChanges } @@ -519,13 +541,15 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) { // Uninstall removes the filter with given ID from list func (f *FilterManager) Uninstall(id string) bool { - f.lock.Lock() - defer f.lock.Unlock() + f.Lock() + defer f.Unlock() return f.removeFilterByID(id) } -// removeFilterByID removes the filter with given ID, unsafe against race condition +// removeFilterByID removes the filter with given ID +// +// Not thread safe func (f *FilterManager) removeFilterByID(id string) bool { filter, ok := f.filters[id] if !ok { @@ -541,23 +565,31 @@ func (f *FilterManager) removeFilterByID(id string) bool { return true } +// RemoveFilterByWs removes the filter with given WS [Thread safe] +func (f *FilterManager) RemoveFilterByWs(ws wsConn) { + f.Lock() + defer f.Unlock() + 
f.removeFilterByID(ws.GetFilterID()) +} + // addFilter is an internal method to add given filter to list and heap func (f *FilterManager) addFilter(filter filter) string { - f.lock.Lock() - defer f.lock.Unlock() + f.Lock() + defer f.Unlock() base := filter.getFilterBase() f.filters[base.id] = filter // Set timeout and add to heap if filter doesn't have web socket connection - if !filter.isWS() { - base.expiredAt = time.Now().Add(f.timeout) + if !filter.hasWSConn() { + base.expiresAt = time.Now().Add(f.timeout) f.timeouts.addFilter(base) f.emitSignalToUpdateCh() } - f.logger.Debug("filter added", "id", base.id, "timeout", base.expiredAt) + f.logger.Debug("filter added", "id", base.id, "timeout", base.expiresAt) return base.id } @@ -573,8 +605,8 @@ func (f *FilterManager) emitSignalToUpdateCh() { // nextTimeoutFilter returns the filter that will be expired next // nextTimeoutFilter returns the only filter with timeout func (f *FilterManager) nextTimeoutFilter() *filterBase { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() if len(f.timeouts) == 0 { return nil @@ -586,12 +618,10 @@ func (f *FilterManager) nextTimeoutFilter() *filterBase { return base } -// dispatchEvent is a event handler for new block event +// dispatchEvent is an event handler for new block event func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { // store new event in each filters - if err := f.processEvent(evnt); err != nil { - return err - } + f.processEvent(evnt) // send data to web socket stream if err := f.flushWsFilters(); err != nil { @@ -602,34 +632,25 @@ func (f *FilterManager) dispatchEvent(evnt *blockchain.Event) error { } // processEvent makes each filter append the new data that interests them -func (f *FilterManager) processEvent(evnt *blockchain.Event) error { - f.lock.RLock() - defer f.lock.RUnlock() +func (f *FilterManager) processEvent(evnt *blockchain.Event) { + f.RLock() + defer f.RUnlock() - // first include all the new headers in the blockstream for BlockFilter for _, header := range evnt.NewChain { + // first include all the new headers in the blockstream for BlockFilter f.blockStream.push(header) - } - - // process old chain to include old logs marked removed for LogFilter - for _, header := range evnt.OldChain { - if processErr := f.appendLogsToFilters(header, true); processErr != nil { - f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) - } - } - // process new chain to include new logs for LogFilter - for _, header := range evnt.NewChain { - if processErr := f.appendLogsToFilters(header, false); processErr != nil { + // process new chain to include new logs for LogFilter + if processErr := f.appendLogsToFilters(header); processErr != nil { f.logger.Error(fmt.Sprintf("Unable to process block, %v", processErr)) } } - - return nil } // appendLogsToFilters makes each LogFilters append logs in the header -func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) error { +// +// Would not append any removed logs. 
+func (f *FilterManager) appendLogsToFilters(header *types.Header) error { receipts, err := f.store.GetReceiptsByHash(header.Hash) if err != nil { return err @@ -651,26 +672,25 @@ func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) for indx, receipt := range receipts { // check the logs with the filters for _, log := range receipt.Logs { - if receipt.TxHash == types.ZeroHash { - // Extract tx Hash - receipt.TxHash = block.Transactions[indx].Hash - } - - nn := &Log{ - Address: log.Address, - Topics: log.Topics, - Data: argBytes(log.Data), - BlockNumber: argUint64(header.Number), - BlockHash: header.Hash, - TxHash: receipt.TxHash, - TxIndex: argUint64(indx), - Removed: removed, - } - for _, f := range logFilters { - if f.query.Match(log) { - f.appendLog(nn) + if !f.query.Match(log) { + continue } + + if receipt.TxHash == types.ZeroHash { + // Extract tx Hash + receipt.TxHash = block.Transactions[indx].Hash + } + f.appendLog(&Log{ + Address: log.Address, + Topics: log.Topics, + Data: argBytes(log.Data), + BlockNumber: argUint64(header.Number), + BlockHash: header.Hash, + TxHash: receipt.TxHash, + TxIndex: argUint64(indx), + Removed: false, + }) } } } @@ -683,10 +703,10 @@ func (f *FilterManager) appendLogsToFilters(header *types.Header, removed bool) func (f *FilterManager) flushWsFilters() error { closedFilterIDs := make([]string, 0) - f.lock.RLock() + f.RLock() for id, filter := range f.filters { - if !filter.isWS() { + if !filter.hasWSConn() { continue } @@ -704,17 +724,17 @@ func (f *FilterManager) flushWsFilters() error { } } - f.lock.RUnlock() + f.RUnlock() // remove filters with closed web socket connections from FilterManager if len(closedFilterIDs) > 0 { - f.lock.Lock() + f.Lock() for _, id := range closedFilterIDs { f.removeFilterByID(id) } - f.lock.Unlock() + f.Unlock() f.emitSignalToUpdateCh() f.logger.Info(fmt.Sprintf("Removed %d filters due to closed connections", len(closedFilterIDs))) } @@ -724,10 +744,10 @@ func (f *FilterManager) flushWsFilters() error { // getLogFilters returns logFilters func (f *FilterManager) getLogFilters() []*logFilter { - f.lock.RLock() - defer f.lock.RUnlock() + f.RLock() + defer f.RUnlock() - logFilters := []*logFilter{} + logFilters := make([]*logFilter, 0) for _, f := range f.filters { if logFilter, ok := f.(*logFilter); ok { @@ -757,7 +777,7 @@ func (t *timeHeapImpl) removeFilter(filter *filterBase) bool { func (t timeHeapImpl) Len() int { return len(t) } func (t timeHeapImpl) Less(i, j int) bool { - return t[i].expiredAt.Before(t[j].expiredAt) + return t[i].expiresAt.Before(t[j].expiresAt) } func (t timeHeapImpl) Swap(i, j int) { @@ -819,15 +839,11 @@ type headElem struct { } func (h *headElem) getUpdates() ([]*types.Header, *headElem) { - res := []*types.Header{} + res := make([]*types.Header, 0) cur := h - for { - if cur.next == nil { - break - } - + for cur.next != nil { cur = cur.next res = append(res, cur.header) } diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 5bc43f8bd0..10333c9a71 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -14,9 +14,16 @@ import ( ) func TestFilterLog(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + // filter manager should Close(), but mock one might crash on writing on a closed channel + //nolint:errcheck + defer recover() + defer m.Close() + go m.Run() id := m.NewLogFilter(&LogQuery{ @@ -74,9 +81,16 @@ func TestFilterLog(t *testing.T) { } func 
TestFilterBlock(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + // filter manager should Close(), but mock one might crash on writing on a closed channel + //nolint:errcheck + defer recover() + defer m.Close() + go m.Run() // add block filter @@ -178,6 +192,10 @@ func Test_GetLogsForQuery(t *testing.T) { f := NewFilterManager(hclog.NewNullLogger(), store) + t.Cleanup(func() { + f.Close() // prevent memory leak + }) + testTable := []struct { name string query *LogQuery @@ -256,6 +274,8 @@ func Test_GetLogsForQuery(t *testing.T) { } func Test_GetLogFilterFromID(t *testing.T) { + t.Parallel() // speed it up + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) @@ -281,9 +301,16 @@ func Test_GetLogFilterFromID(t *testing.T) { } func TestFilterTimeout(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + // filter manager should Close(), but mock one might crash on writing on a closed channel + //nolint:errcheck + defer recover() + defer m.Close() + m.timeout = 2 * time.Second go m.Run() @@ -296,7 +323,34 @@ func TestFilterTimeout(t *testing.T) { assert.False(t, m.Exists(id)) } +func TestRemoveFilterByWebsocket(t *testing.T) { + t.Parallel() + + store := newMockStore() + + mock := &mockWsConn{ + msgCh: make(chan []byte, 1), + } + + m := NewFilterManager(hclog.NewNullLogger(), store) + // filter manager should Close(), but mock one might crash on writing on a closed channel + //nolint:errcheck + defer recover() + defer m.Close() + + go m.Run() + + id := m.NewBlockFilter(mock) + + m.RemoveFilterByWs(mock) + + // false because filter was removed + assert.False(t, m.Exists(id)) +} + func TestFilterWebsocket(t *testing.T) { + t.Parallel() + store := newMockStore() mock := &mockWsConn{ @@ -304,6 +358,11 @@ func TestFilterWebsocket(t *testing.T) { } m := NewFilterManager(hclog.NewNullLogger(), store) + // filter manager should Close(), but mock one might crash on writing on a closed channel + //nolint:errcheck + defer recover() + defer m.Close() + go m.Run() id := m.NewBlockFilter(mock) @@ -314,7 +373,16 @@ func TestFilterWebsocket(t *testing.T) { } type mockWsConn struct { - msgCh chan []byte + msgCh chan []byte + filterID string +} + +func (m *mockWsConn) SetFilterID(filterID string) { + m.filterID = filterID +} + +func (m *mockWsConn) GetFilterID() string { + return m.filterID } func (m *mockWsConn) WriteMessage(messageType int, b []byte) error { @@ -324,6 +392,8 @@ func (m *mockWsConn) WriteMessage(messageType int, b []byte) error { } func TestHeadStream(t *testing.T) { + t.Parallel() + b := &blockStream{} b.push(&types.Header{Hash: types.StringToHash("1")}) @@ -347,14 +417,26 @@ func TestHeadStream(t *testing.T) { type MockClosedWSConnection struct{} +func (m *MockClosedWSConnection) SetFilterID(_filterID string) {} + +func (m *MockClosedWSConnection) GetFilterID() string { + return "" +} + func (m *MockClosedWSConnection) WriteMessage(_messageType int, _data []byte) error { return websocket.ErrCloseSent } func TestClosedFilterDeletion(t *testing.T) { + t.Parallel() + store := newMockStore() m := NewFilterManager(hclog.NewNullLogger(), store) + // filter manager should Close(), but mock one might crash on writing on a closed channel + //nolint:errcheck + defer recover() + defer m.Close() go m.Run() diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index c79865a5cd..aebf2e21c4 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -41,6 +41,7 @@ 
type JSONRPC struct { } type dispatcher interface { + RemoveFilterByWs(conn wsConn) HandleWs(reqBody []byte, conn wsConn) ([]byte, error) Handle(reqBody []byte) ([]byte, error) } @@ -143,15 +144,25 @@ var wsUpgrader = websocket.Upgrader{ // wsWrapper is a wrapping object for the web socket connection and logger type wsWrapper struct { - ws *websocket.Conn // the actual WS connection - logger hclog.Logger // module logger - writeLock sync.Mutex // writer lock + sync.Mutex // basic r/w lock + + ws *websocket.Conn // the actual WS connection + logger hclog.Logger // module logger + filterID string // filter ID +} + +func (w *wsWrapper) SetFilterID(filterID string) { + w.filterID = filterID +} + +func (w *wsWrapper) GetFilterID() string { + return w.filterID } // WriteMessage writes out the message to the WS peer func (w *wsWrapper) WriteMessage(messageType int, data []byte) error { - w.writeLock.Lock() - defer w.writeLock.Unlock() + w.Lock() + defer w.Unlock() writeErr := w.ws.WriteMessage(messageType, data) if writeErr != nil { @@ -211,6 +222,9 @@ func (j *JSONRPC) handleWs(w http.ResponseWriter, req *http.Request) { j.logger.Info("Closing WS connection with error") } + // remove websocket connection when closed + j.dispatcher.RemoveFilterByWs(wrapConn) + break } From 5d46b345a174d149b0fa133c29311dae9b192ed7 Mon Sep 17 00:00:00 2001 From: DarianShawn Date: Wed, 24 Aug 2022 18:52:53 +0800 Subject: [PATCH 3/7] Add batch and block limit to jsonrpc and graphql --- command/server/config.go | 43 +++++++------ command/server/params.go | 60 +++++++++-------- command/server/server.go | 15 +++++ graphql/service.go | 8 ++- jsonrpc/default.go | 8 +++ jsonrpc/dispatcher.go | 31 ++++++--- jsonrpc/dispatcher_test.go | 113 ++++++++++++++++++++++++++------- jsonrpc/filter_manager.go | 32 ++++++---- jsonrpc/filter_manager_test.go | 26 +++++--- jsonrpc/jsonrpc.go | 17 ++++- jsonrpc/web3_endpoint_test.go | 4 +- server/config.go | 3 + server/server.go | 3 + 13 files changed, 263 insertions(+), 100 deletions(-) create mode 100644 jsonrpc/default.go diff --git a/command/server/config.go b/command/server/config.go index 0fa68a6bcf..df3061f28f 100644 --- a/command/server/config.go +++ b/command/server/config.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/dogechain-lab/dogechain/command" + "github.com/dogechain-lab/dogechain/jsonrpc" "github.com/dogechain-lab/dogechain/network" "github.com/hashicorp/hcl" @@ -14,23 +15,25 @@ import ( // Config defines the server configuration params type Config struct { - GenesisPath string `json:"chain_config"` - SecretsConfigPath string `json:"secrets_config"` - DataDir string `json:"data_dir"` - BlockGasTarget string `json:"block_gas_target"` - GRPCAddr string `json:"grpc_addr"` - JSONRPCAddr string `json:"jsonrpc_addr"` - Telemetry *Telemetry `json:"telemetry"` - Network *Network `json:"network"` - ShouldSeal bool `json:"seal"` - TxPool *TxPool `json:"tx_pool"` - LogLevel string `json:"log_level"` - RestoreFile string `json:"restore_file"` - BlockTime uint64 `json:"block_time_s"` - Headers *Headers `json:"headers"` - LogFilePath string `json:"log_to"` - EnableGraphQL bool `json:"enable_graphql"` - GraphQLAddr string `json:"graphql_addr"` + GenesisPath string `json:"chain_config"` + SecretsConfigPath string `json:"secrets_config"` + DataDir string `json:"data_dir"` + BlockGasTarget string `json:"block_gas_target"` + GRPCAddr string `json:"grpc_addr"` + JSONRPCAddr string `json:"jsonrpc_addr"` + Telemetry *Telemetry `json:"telemetry"` + Network *Network `json:"network"` + 
ShouldSeal bool `json:"seal"` + TxPool *TxPool `json:"tx_pool"` + LogLevel string `json:"log_level"` + RestoreFile string `json:"restore_file"` + BlockTime uint64 `json:"block_time_s"` + Headers *Headers `json:"headers"` + LogFilePath string `json:"log_to"` + EnableGraphQL bool `json:"enable_graphql"` + GraphQLAddr string `json:"graphql_addr"` + JSONRPCBatchRequestLimit uint64 `json:"json_rpc_batch_request_limit" yaml:"json_rpc_batch_request_limit"` + JSONRPCBlockRangeLimit uint64 `json:"json_rpc_block_range_limit" yaml:"json_rpc_block_range_limit"` } // Telemetry holds the config details for metric services. @@ -95,8 +98,10 @@ func DefaultConfig() *Config { Headers: &Headers{ AccessControlAllowOrigins: []string{"*"}, }, - LogFilePath: "", - EnableGraphQL: false, + LogFilePath: "", + EnableGraphQL: false, + JSONRPCBatchRequestLimit: jsonrpc.DefaultJSONRPCBatchRequestLimit, + JSONRPCBlockRangeLimit: jsonrpc.DefaultJSONRPCBlockRangeLimit, } } diff --git a/command/server/params.go b/command/server/params.go index e561ab69a2..aa9963fb9d 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -14,32 +14,34 @@ import ( ) const ( - configFlag = "config" - genesisPathFlag = "chain" - dataDirFlag = "data-dir" - libp2pAddressFlag = "libp2p" - prometheusAddressFlag = "prometheus" - natFlag = "nat" - dnsFlag = "dns" - sealFlag = "seal" - maxPeersFlag = "max-peers" - maxInboundPeersFlag = "max-inbound-peers" - maxOutboundPeersFlag = "max-outbound-peers" - priceLimitFlag = "price-limit" - maxSlotsFlag = "max-slots" - maxAccountDemotionsFlag = "max-account-demotions" - pruneTickSecondsFlag = "prune-tick-seconds" - promoteOutdateSecondsFlag = "promote-outdate-seconds" - blockGasTargetFlag = "block-gas-target" - secretsConfigFlag = "secrets-config" - restoreFlag = "restore" - blockTimeFlag = "block-time" - devIntervalFlag = "dev-interval" - devFlag = "dev" - corsOriginFlag = "access-control-allow-origins" - daemonFlag = "daemon" - logFileLocationFlag = "log-to" - enableGraphQLFlag = "enable-graphql" + configFlag = "config" + genesisPathFlag = "chain" + dataDirFlag = "data-dir" + libp2pAddressFlag = "libp2p" + prometheusAddressFlag = "prometheus" + natFlag = "nat" + dnsFlag = "dns" + sealFlag = "seal" + maxPeersFlag = "max-peers" + maxInboundPeersFlag = "max-inbound-peers" + maxOutboundPeersFlag = "max-outbound-peers" + priceLimitFlag = "price-limit" + maxSlotsFlag = "max-slots" + maxAccountDemotionsFlag = "max-account-demotions" + pruneTickSecondsFlag = "prune-tick-seconds" + promoteOutdateSecondsFlag = "promote-outdate-seconds" + blockGasTargetFlag = "block-gas-target" + secretsConfigFlag = "secrets-config" + restoreFlag = "restore" + blockTimeFlag = "block-time" + devIntervalFlag = "dev-interval" + devFlag = "dev" + corsOriginFlag = "access-control-allow-origins" + daemonFlag = "daemon" + logFileLocationFlag = "log-to" + enableGraphQLFlag = "enable-graphql" + jsonRPCBatchRequestLimitFlag = "json-rpc-batch-request-limit" + jsonRPCBlockRangeLimitFlag = "json-rpc-block-range-limit" ) const ( @@ -85,6 +87,9 @@ type serverParams struct { secretsConfig *secrets.SecretsManagerConfig logFileLocation string + + jsonRPCBatchLengthLimit uint64 + jsonRPCBlockRangeLimit uint64 } func (p *serverParams) validateFlags() error { @@ -162,11 +167,14 @@ func (p *serverParams) generateConfig() *server.Config { JSONRPC: &server.JSONRPC{ JSONRPCAddr: p.jsonRPCAddress, AccessControlAllowOrigin: p.corsAllowedOrigins, + BatchLengthLimit: p.jsonRPCBatchLengthLimit, + BlockRangeLimit: p.jsonRPCBlockRangeLimit, }, 
EnableGraphQL: p.rawConfig.EnableGraphQL, GraphQL: &server.GraphQL{ GraphQLAddr: p.graphqlAddress, AccessControlAllowOrigin: p.corsAllowedOrigins, + BlockRangeLimit: p.jsonRPCBlockRangeLimit, }, GRPCAddr: p.grpcAddress, LibP2PAddr: p.libp2pAddress, diff --git a/command/server/server.go b/command/server/server.go index 29971f1196..b6e0fb1932 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -234,6 +234,21 @@ func setFlags(cmd *cobra.Command) { "the flag indicating that node enable graphql service", ) + cmd.Flags().Uint64Var( + ¶ms.jsonRPCBatchLengthLimit, + jsonRPCBatchRequestLimitFlag, + defaultConfig.JSONRPCBatchRequestLimit, + "the max length to be considered when handling json-rpc batch requests", + ) + + cmd.Flags().Uint64Var( + ¶ms.jsonRPCBlockRangeLimit, + jsonRPCBlockRangeLimitFlag, + defaultConfig.JSONRPCBlockRangeLimit, + "the max block range to be considered when executing json-rpc requests "+ + "that consider fromBlock/toBlock values (e.g. eth_getLogs)", + ) + setDevFlags(cmd) } diff --git a/graphql/service.go b/graphql/service.go index 211a122351..1dde0dba78 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -26,6 +26,7 @@ type Config struct { Forks chain.Forks ChainID uint64 AccessControlAllowOrigin []string + BlockRangeLimit uint64 } // GraphQLStore defines all the methods required @@ -38,10 +39,15 @@ type GraphQLStore interface { // NewJSONRPC returns the JSONRPC http server func NewGraphQLService(logger hclog.Logger, config *Config) (*GraphQLService, error) { + var blockRangeLimit = config.BlockRangeLimit + if blockRangeLimit == 0 { + blockRangeLimit = rpc.DefaultJSONRPCBlockRangeLimit + } + q := Resolver{ backend: config.Store, chainID: config.ChainID, - filterManager: rpc.NewFilterManager(hclog.NewNullLogger(), config.Store), + filterManager: rpc.NewFilterManager(hclog.NewNullLogger(), config.Store, blockRangeLimit), } s, err := graphql.ParseSchema(schema, &q) diff --git a/jsonrpc/default.go b/jsonrpc/default.go new file mode 100644 index 0000000000..e0b9c37584 --- /dev/null +++ b/jsonrpc/default.go @@ -0,0 +1,8 @@ +package jsonrpc + +const ( + // maximum length allowed for json_rpc batch requests + DefaultJSONRPCBatchRequestLimit uint64 = 1 + // maximum block range allowed for json_rpc requests with fromBlock/toBlock values (e.g. 
eth_getLogs) + DefaultJSONRPCBlockRangeLimit uint64 = 100 +) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index 69b3ce88f9..6a6158fef8 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -39,21 +39,29 @@ type endpoints struct { // Dispatcher handles all json rpc requests by delegating // the execution flow to the corresponding service type Dispatcher struct { - logger hclog.Logger - serviceMap map[string]*serviceData - filterManager *FilterManager - endpoints endpoints - chainID uint64 + logger hclog.Logger + serviceMap map[string]*serviceData + filterManager *FilterManager + endpoints endpoints + chainID uint64 + jsonRPCBatchLengthLimit uint64 } -func newDispatcher(logger hclog.Logger, store JSONRPCStore, chainID uint64) *Dispatcher { +func newDispatcher( + logger hclog.Logger, + store JSONRPCStore, + chainID uint64, + jsonRPCBatchLengthLimit uint64, + blockRangeLimit uint64, +) *Dispatcher { d := &Dispatcher{ - logger: logger.Named("dispatcher"), - chainID: chainID, + logger: logger.Named("dispatcher"), + chainID: chainID, + jsonRPCBatchLengthLimit: jsonRPCBatchLengthLimit, } if store != nil { - d.filterManager = NewFilterManager(logger, store) + d.filterManager = NewFilterManager(logger, store, blockRangeLimit) go d.filterManager.Run() } @@ -252,6 +260,11 @@ func (d *Dispatcher) Handle(reqBody []byte) ([]byte, error) { return NewRPCResponse(nil, "2.0", nil, NewInvalidRequestError("Invalid json request")).Bytes() } + // avoid handling long batch requests + if len(requests) > int(d.jsonRPCBatchLengthLimit) { + return NewRPCResponse(nil, "2.0", nil, NewInvalidRequestError("Batch request length too long")).Bytes() + } + responses := make([]Response, 0) for _, req := range requests { diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go index 0ca8e57b89..c6074fa348 100644 --- a/jsonrpc/dispatcher_test.go +++ b/jsonrpc/dispatcher_test.go @@ -58,7 +58,7 @@ func expectBatchJSONResult(data []byte, v interface{}) error { func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { t.Run("clients should be able to receive \"newHeads\" event thru eth_subscribe", func(t *testing.T) { store := newMockStore() - dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0) + dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 20, 1000) mockConnection := &mockWsConn{ msgCh: make(chan []byte, 1), @@ -92,7 +92,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) { store := newMockStore() - dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0) + dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 20, 1000) mockConnection := &mockWsConn{ msgCh: make(chan []byte, 1), @@ -196,7 +196,7 @@ func (m *mockService) Filter(f LogQuery) (interface{}, error) { func TestDispatcherFuncDecode(t *testing.T) { srv := &mockService{msgCh: make(chan interface{}, 10)} - dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0) + dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000) dispatcher.registerService("mock", srv) handleReq := func(typ string, msg string) interface{} { @@ -261,25 +261,94 @@ func TestDispatcherFuncDecode(t *testing.T) { } func TestDispatcherBatchRequest(t *testing.T) { - dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0) - - // test with leading whitespace (" \t\n\n\r") - leftBytes := []byte{0x20, 0x20, 0x09, 0x0A, 0x0A, 0x0D} - resp, err := 
dispatcher.Handle(append(leftBytes, []byte(`[ - {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]}, - {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]}, - {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]}, - {"id":4,"jsonrpc":"2.0","method": "web3_sha3","params": ["0x68656c6c6f20776f726c64"]} -]`)...)) - assert.NoError(t, err) - - var res []SuccessResponse - - assert.NoError(t, expectBatchJSONResult(resp, &res)) - assert.Len(t, res, 4) + handle := func(dispatcher *Dispatcher, reqBody []byte) []byte { + res, _ := dispatcher.Handle(reqBody) + return res + } - jsonerr := &ObjectError{Code: -32602, Message: "Invalid Params"} + cases := []struct { + name string + desc string + dispatcher *Dispatcher + reqBody []byte + err *ObjectError + batchResponse []*SuccessResponse + }{ + { + "leading-whitespace", + "test with leading whitespace (\" \\t\\n\\n\\r\\)", + newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000), + append([]byte{0x20, 0x20, 0x09, 0x0A, 0x0A, 0x0D}, []byte(`[ + {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]}, + {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]}, + {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]}, + {"id":4,"jsonrpc":"2.0","method": "web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...), + nil, + []*SuccessResponse{ + {Error: &ObjectError{Code: -32602, Message: "Invalid Params"}}, + {Error: nil}, + {Error: nil}, + {Error: nil}}, + }, + { + "valid-batch-req", + "test with batch req length within batchRequestLengthLimit", + newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 10, 1000), + []byte(`[ + {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`), + nil, + []*SuccessResponse{ + {Error: nil}, + {Error: nil}, + {Error: nil}, + {Error: nil}, + {Error: nil}, + {Error: nil}}, + }, + { + "invalid-batch-req", + "test with batch req length exceeding batchRequestLengthLimit", + newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 3, 1000), + []byte(`[ + {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}, + {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`), + &ObjectError{Code: -32600, Message: "Batch request length too long"}, + nil, + }, + } - assert.Equal(t, res[0].Error, jsonerr) - assert.Nil(t, res[3].Error) + for _, c := range cases { + res := handle(c.dispatcher, c.reqBody) + + if c.err != nil { + var resp ErrorResponse + + assert.NoError(t, expectBatchJSONResult(res, &resp)) + assert.Equal(t, resp.Error, c.err) + } else { + var batchResp []SuccessResponse + assert.NoError(t, expectBatchJSONResult(res, &batchResp)) + + 
if c.name == "leading-whitespace" { + assert.Len(t, batchResp, 4) + for index, resp := range batchResp { + assert.Equal(t, resp.Error, c.batchResponse[index].Error) + } + } else if c.name == "valid-batch-req" { + assert.Len(t, batchResp, 6) + for index, resp := range batchResp { + assert.Equal(t, resp.Error, c.batchResponse[index].Error) + } + } + } + } } diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index eb8bf1a00e..b65647831e 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -23,6 +23,7 @@ var ( ErrCastingFilterToLogFilter = errors.New("casting filter object to logFilter error") ErrBlockNotFound = errors.New("block not found") ErrIncorrectBlockRange = errors.New("incorrect range") + ErrBlockRangeTooHigh = errors.New("block range too high") ErrPendingBlockNumber = errors.New("pending block number is not supported") ErrNoWSConnection = errors.New("no websocket connection") ) @@ -244,9 +245,10 @@ type FilterManager struct { timeout time.Duration - store filterManagerStore - subscription blockchain.Subscription - blockStream *blockStream + store filterManagerStore + subscription blockchain.Subscription + blockStream *blockStream + blockRangeLimit uint64 filters map[string]filter timeouts timeHeapImpl @@ -255,16 +257,17 @@ type FilterManager struct { closeCh chan struct{} } -func NewFilterManager(logger hclog.Logger, store filterManagerStore) *FilterManager { +func NewFilterManager(logger hclog.Logger, store filterManagerStore, blockRangeLimit uint64) *FilterManager { m := &FilterManager{ - logger: logger.Named("filter"), - timeout: defaultTimeout, - store: store, - blockStream: &blockStream{}, - filters: make(map[string]filter), - timeouts: timeHeapImpl{}, - updateCh: make(chan struct{}), - closeCh: make(chan struct{}), + logger: logger.Named("filter"), + timeout: defaultTimeout, + store: store, + blockStream: &blockStream{}, + blockRangeLimit: blockRangeLimit, + filters: make(map[string]filter), + timeouts: timeHeapImpl{}, + updateCh: make(chan struct{}), + closeCh: make(chan struct{}), } // start blockstream with the current header @@ -439,6 +442,11 @@ func (f *FilterManager) getLogsFromBlocks(query *LogQuery) ([]*Log, error) { return nil, ErrIncorrectBlockRange } + // avoid handling large block ranges + if to-from > f.blockRangeLimit { + return nil, ErrBlockRangeTooHigh + } + // If from equals genesis block // skip it if from == 0 { diff --git a/jsonrpc/filter_manager_test.go b/jsonrpc/filter_manager_test.go index 10333c9a71..3b380764e5 100644 --- a/jsonrpc/filter_manager_test.go +++ b/jsonrpc/filter_manager_test.go @@ -18,7 +18,7 @@ func TestFilterLog(t *testing.T) { store := newMockStore() - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() @@ -85,7 +85,7 @@ func TestFilterBlock(t *testing.T) { store := newMockStore() - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() @@ -190,7 +190,7 @@ func Test_GetLogsForQuery(t *testing.T) { store.appendBlocksToStore(blocks) - f := NewFilterManager(hclog.NewNullLogger(), store) + f := NewFilterManager(hclog.NewNullLogger(), store, 1000) t.Cleanup(func() { f.Close() // prevent memory leak @@ -251,6 +251,16 @@ func Test_GetLogsForQuery(t 
*testing.T) { 0, ErrIncorrectBlockRange, }, + { + "Block range too high", + &LogQuery{ + FromBlock: 10, + ToBlock: 1012, + Topics: topics, + }, + 0, + ErrBlockRangeTooHigh, + }, } for _, testCase := range testTable { @@ -278,7 +288,7 @@ func Test_GetLogFilterFromID(t *testing.T) { store := newMockStore() - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() @@ -305,7 +315,7 @@ func TestFilterTimeout(t *testing.T) { store := newMockStore() - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() @@ -332,7 +342,7 @@ func TestRemoveFilterByWebsocket(t *testing.T) { msgCh: make(chan []byte, 1), } - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() @@ -357,7 +367,7 @@ func TestFilterWebsocket(t *testing.T) { msgCh: make(chan []byte, 1), } - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() @@ -432,7 +442,7 @@ func TestClosedFilterDeletion(t *testing.T) { store := newMockStore() - m := NewFilterManager(hclog.NewNullLogger(), store) + m := NewFilterManager(hclog.NewNullLogger(), store, 1000) // filter manager should Close(), but mock one might crash on writing on a closed channel //nolint:errcheck defer recover() diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index aebf2e21c4..e3e110677d 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -60,14 +60,29 @@ type Config struct { Addr *net.TCPAddr ChainID uint64 AccessControlAllowOrigin []string + BatchLengthLimit uint64 + BlockRangeLimit uint64 } // NewJSONRPC returns the JSONRPC http server func NewJSONRPC(logger hclog.Logger, config *Config) (*JSONRPC, error) { + var ( + batchLengthLimit = config.BatchLengthLimit + blockRangeLimit = config.BlockRangeLimit + ) + + if batchLengthLimit == 0 { + batchLengthLimit = DefaultJSONRPCBatchRequestLimit + } + + if blockRangeLimit == 0 { + blockRangeLimit = DefaultJSONRPCBlockRangeLimit + } + srv := &JSONRPC{ logger: logger.Named("jsonrpc"), config: config, - dispatcher: newDispatcher(logger, config.Store, config.ChainID), + dispatcher: newDispatcher(logger, config.Store, config.ChainID, batchLengthLimit, blockRangeLimit), } // start http server diff --git a/jsonrpc/web3_endpoint_test.go b/jsonrpc/web3_endpoint_test.go index cc9de34d8c..1e6cff0095 100644 --- a/jsonrpc/web3_endpoint_test.go +++ b/jsonrpc/web3_endpoint_test.go @@ -10,7 +10,7 @@ import ( ) func TestWeb3EndpointSha3(t *testing.T) { - dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0) + dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000) resp, err := dispatcher.Handle([]byte(`{ "method": "web3_sha3", @@ -25,7 +25,7 @@ func TestWeb3EndpointSha3(t *testing.T) { } func TestWeb3EndpointClientVersion(t *testing.T) { - dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0) + dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000) 
resp, err := dispatcher.Handle([]byte(`{ "method": "web3_clientVersion", diff --git a/server/config.go b/server/config.go index d709fd0d28..dc3f6d6065 100644 --- a/server/config.go +++ b/server/config.go @@ -56,9 +56,12 @@ type Telemetry struct { type JSONRPC struct { JSONRPCAddr *net.TCPAddr AccessControlAllowOrigin []string + BatchLengthLimit uint64 + BlockRangeLimit uint64 } type GraphQL struct { GraphQLAddr *net.TCPAddr AccessControlAllowOrigin []string + BlockRangeLimit uint64 } diff --git a/server/server.go b/server/server.go index 0eacf65217..16d270244c 100644 --- a/server/server.go +++ b/server/server.go @@ -575,6 +575,8 @@ func (s *Server) setupJSONRPC() error { Addr: s.config.JSONRPC.JSONRPCAddr, ChainID: uint64(s.config.Chain.Params.ChainID), AccessControlAllowOrigin: s.config.JSONRPC.AccessControlAllowOrigin, + BatchLengthLimit: s.config.JSONRPC.BatchLengthLimit, + BlockRangeLimit: s.config.JSONRPC.BlockRangeLimit, } srv, err := jsonrpc.NewJSONRPC(s.logger, conf) @@ -608,6 +610,7 @@ func (s *Server) setupGraphQL() error { Addr: s.config.GraphQL.GraphQLAddr, ChainID: uint64(s.config.Chain.Params.ChainID), AccessControlAllowOrigin: s.config.GraphQL.AccessControlAllowOrigin, + BlockRangeLimit: s.config.GraphQL.BlockRangeLimit, } srv, err := graphql.NewGraphQLService(s.logger, conf) From 9831b66e71dfae37d4cf4ac544db25dd5ef38d34 Mon Sep 17 00:00:00 2001 From: DarianShawn Date: Wed, 24 Aug 2022 19:24:16 +0800 Subject: [PATCH 4/7] Refresh jsonrpc filter when it is reachable --- jsonrpc/filter_manager.go | 46 +++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index b65647831e..57f3fce13e 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -523,28 +523,56 @@ func (f *FilterManager) GetLogFilterFromID(filterID string) (*logFilter, error) return logFilter, nil } -// GetFilterChanges returns the updates of the filter with given ID in string +// refreshFilterTimeout updates the timeout for a filter to the current time +func (f *FilterManager) refreshFilterTimeout(filter *filterBase) { + f.timeouts.removeFilter(filter) + f.addFilterTimeout(filter) +} + +// addFilterTimeout set timeout and add to heap +func (f *FilterManager) addFilterTimeout(filter *filterBase) { + filter.expiresAt = time.Now().Add(f.timeout) + f.timeouts.addFilter(filter) + f.emitSignalToUpdateCh() +} + +// GetFilterChanges returns the updates of the filter with given ID in string, +// and refreshes the timeout on the filter func (f *FilterManager) GetFilterChanges(id string) (string, error) { + filter, res, err := f.getFilterAndChanges(id) + + if err == nil && !filter.hasWSConn() { + // Refresh the timeout on this filter + f.Lock() + f.refreshFilterTimeout(filter.getFilterBase()) + f.Unlock() + } + + return res, err +} + +// getFilterAndChanges returns the updates of the filter with given ID in string +func (f *FilterManager) getFilterAndChanges(id string) (filter, string, error) { f.RLock() defer f.RUnlock() filter, ok := f.filters[id] if !ok { - return "", ErrFilterDoesNotExists + return nil, "", ErrFilterDoesNotExists } // we cannot get updates from a ws filter with getFilterChanges if filter.hasWSConn() { - return "", ErrWSFilterDoesNotSupportGetChanges + return nil, "", ErrWSFilterDoesNotSupportGetChanges } res, err := filter.getUpdates() if err != nil { - return "", err + return nil, "", err } - return res, nil + return filter, res, nil } // Uninstall removes the filter with given ID from 
list @@ -592,9 +620,7 @@ func (f *FilterManager) addFilter(filter filter) string { // Set timeout and add to heap if filter doesn't have web socket connection if !filter.hasWSConn() { - base.expiresAt = time.Now().Add(f.timeout) - f.timeouts.addFilter(base) - f.emitSignalToUpdateCh() + f.addFilterTimeout(base) } f.logger.Debug("filter added", "id", base.id, "timeout", base.expiresAt) @@ -737,13 +763,11 @@ func (f *FilterManager) flushWsFilters() error { // remove filters with closed web socket connections from FilterManager if len(closedFilterIDs) > 0 { f.Lock() - for _, id := range closedFilterIDs { f.removeFilterByID(id) } - f.Unlock() - f.emitSignalToUpdateCh() + f.logger.Info(fmt.Sprintf("Removed %d filters due to closed connections", len(closedFilterIDs))) } From 1b3c7602cdcaa477f0336f84fdc085cdab656728 Mon Sep 17 00:00:00 2001 From: DarianShawn Date: Wed, 24 Aug 2022 19:29:11 +0800 Subject: [PATCH 5/7] Fix lint error --- jsonrpc/dispatcher_test.go | 4 +++- jsonrpc/filter_manager.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go index c6074fa348..6b7cc83e67 100644 --- a/jsonrpc/dispatcher_test.go +++ b/jsonrpc/dispatcher_test.go @@ -263,9 +263,11 @@ func TestDispatcherFuncDecode(t *testing.T) { func TestDispatcherBatchRequest(t *testing.T) { handle := func(dispatcher *Dispatcher, reqBody []byte) []byte { res, _ := dispatcher.Handle(reqBody) + return res } + //nolint:lll cases := []struct { name string desc string @@ -282,7 +284,7 @@ func TestDispatcherBatchRequest(t *testing.T) { {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]}, {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]}, {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]}, - {"id":4,"jsonrpc":"2.0","method": "web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...), + {"id":4,"jsonrpc":"2.0","method":"web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...), nil, []*SuccessResponse{ {Error: &ObjectError{Code: -32602, Message: "Invalid Params"}}, diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index 57f3fce13e..e21d36d07f 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -715,6 +715,7 @@ func (f *FilterManager) appendLogsToFilters(header *types.Header) error { // Extract tx Hash receipt.TxHash = block.Transactions[indx].Hash } + f.appendLog(&Log{ Address: log.Address, Topics: log.Topics, From 206c067a85a61fc8ceb12c0ff262455aaf595762 Mon Sep 17 00:00:00 2001 From: DarianShawn Date: Wed, 24 Aug 2022 23:09:54 +0800 Subject: [PATCH 6/7] Disable rpc batch and block range limit when set to 0 --- command/server/params.go | 9 ++--- command/server/server.go | 4 +- graphql/service.go | 7 +--- jsonrpc/default.go | 5 ++- jsonrpc/dispatcher.go | 5 ++- jsonrpc/dispatcher_test.go | 81 ++++++++++++++++++++++++++++---------- jsonrpc/filter_manager.go | 16 ++++---- jsonrpc/jsonrpc.go | 20 ++-------- 8 files changed, 84 insertions(+), 63 deletions(-) diff --git a/command/server/params.go b/command/server/params.go index aa9963fb9d..73eed21efa 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -87,9 +87,6 @@ type serverParams struct { secretsConfig *secrets.SecretsManagerConfig logFileLocation string - - jsonRPCBatchLengthLimit uint64 - jsonRPCBlockRangeLimit uint64 } func (p *serverParams) validateFlags() error { @@ -167,14 +164,14 @@ func (p *serverParams) generateConfig() *server.Config { JSONRPC: &server.JSONRPC{ 
JSONRPCAddr: p.jsonRPCAddress, AccessControlAllowOrigin: p.corsAllowedOrigins, - BatchLengthLimit: p.jsonRPCBatchLengthLimit, - BlockRangeLimit: p.jsonRPCBlockRangeLimit, + BatchLengthLimit: p.rawConfig.JSONRPCBatchRequestLimit, + BlockRangeLimit: p.rawConfig.JSONRPCBlockRangeLimit, }, EnableGraphQL: p.rawConfig.EnableGraphQL, GraphQL: &server.GraphQL{ GraphQLAddr: p.graphqlAddress, AccessControlAllowOrigin: p.corsAllowedOrigins, - BlockRangeLimit: p.jsonRPCBlockRangeLimit, + BlockRangeLimit: p.rawConfig.JSONRPCBlockRangeLimit, }, GRPCAddr: p.grpcAddress, LibP2PAddr: p.libp2pAddress, diff --git a/command/server/server.go b/command/server/server.go index b6e0fb1932..777256f348 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -235,14 +235,14 @@ func setFlags(cmd *cobra.Command) { ) cmd.Flags().Uint64Var( - ¶ms.jsonRPCBatchLengthLimit, + ¶ms.rawConfig.JSONRPCBatchRequestLimit, jsonRPCBatchRequestLimitFlag, defaultConfig.JSONRPCBatchRequestLimit, "the max length to be considered when handling json-rpc batch requests", ) cmd.Flags().Uint64Var( - ¶ms.jsonRPCBlockRangeLimit, + ¶ms.rawConfig.JSONRPCBlockRangeLimit, jsonRPCBlockRangeLimitFlag, defaultConfig.JSONRPCBlockRangeLimit, "the max block range to be considered when executing json-rpc requests "+ diff --git a/graphql/service.go b/graphql/service.go index 1dde0dba78..72652b92e6 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -39,15 +39,10 @@ type GraphQLStore interface { // NewJSONRPC returns the JSONRPC http server func NewGraphQLService(logger hclog.Logger, config *Config) (*GraphQLService, error) { - var blockRangeLimit = config.BlockRangeLimit - if blockRangeLimit == 0 { - blockRangeLimit = rpc.DefaultJSONRPCBlockRangeLimit - } - q := Resolver{ backend: config.Store, chainID: config.ChainID, - filterManager: rpc.NewFilterManager(hclog.NewNullLogger(), config.Store, blockRangeLimit), + filterManager: rpc.NewFilterManager(hclog.NewNullLogger(), config.Store, config.BlockRangeLimit), } s, err := graphql.ParseSchema(schema, &q) diff --git a/jsonrpc/default.go b/jsonrpc/default.go index e0b9c37584..03006b63e1 100644 --- a/jsonrpc/default.go +++ b/jsonrpc/default.go @@ -1,8 +1,9 @@ package jsonrpc const ( - // maximum length allowed for json_rpc batch requests + // DefaultJSONRPCBatchRequestLimit maximum length allowed for json_rpc batch requests DefaultJSONRPCBatchRequestLimit uint64 = 1 - // maximum block range allowed for json_rpc requests with fromBlock/toBlock values (e.g. eth_getLogs) + // DefaultJSONRPCBlockRangeLimit maximum block range allowed for json_rpc + // requests with fromBlock/toBlock values (e.g. 
eth_getLogs) DefaultJSONRPCBlockRangeLimit uint64 = 100 ) diff --git a/jsonrpc/dispatcher.go b/jsonrpc/dispatcher.go index 6a6158fef8..b3ab778b35 100644 --- a/jsonrpc/dispatcher.go +++ b/jsonrpc/dispatcher.go @@ -260,8 +260,9 @@ func (d *Dispatcher) Handle(reqBody []byte) ([]byte, error) { return NewRPCResponse(nil, "2.0", nil, NewInvalidRequestError("Invalid json request")).Bytes() } - // avoid handling long batch requests - if len(requests) > int(d.jsonRPCBatchLengthLimit) { + // if not disabled, avoid handling long batch requests + if d.jsonRPCBatchLengthLimit > 0 && + len(requests) > int(d.jsonRPCBatchLengthLimit) { return NewRPCResponse(nil, "2.0", nil, NewInvalidRequestError("Batch request length too long")).Bytes() } diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go index 6b7cc83e67..f5f0030656 100644 --- a/jsonrpc/dispatcher_test.go +++ b/jsonrpc/dispatcher_test.go @@ -58,7 +58,7 @@ func expectBatchJSONResult(data []byte, v interface{}) error { func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { t.Run("clients should be able to receive \"newHeads\" event thru eth_subscribe", func(t *testing.T) { store := newMockStore() - dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 20, 1000) + dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 0) mockConnection := &mockWsConn{ msgCh: make(chan []byte, 1), @@ -92,7 +92,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) { func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) { store := newMockStore() - dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 20, 1000) + dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 0) mockConnection := &mockWsConn{ msgCh: make(chan []byte, 1), @@ -196,7 +196,7 @@ func (m *mockService) Filter(f LogQuery) (interface{}, error) { func TestDispatcherFuncDecode(t *testing.T) { srv := &mockService{msgCh: make(chan interface{}, 10)} - dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000) + dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 0, 0) dispatcher.registerService("mock", srv) handleReq := func(typ string, msg string) interface{} { @@ -279,12 +279,12 @@ func TestDispatcherBatchRequest(t *testing.T) { { "leading-whitespace", "test with leading whitespace (\" \\t\\n\\n\\r\\)", - newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000), + newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 0, 0), append([]byte{0x20, 0x20, 0x09, 0x0A, 0x0A, 0x0D}, []byte(`[ - {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]}, - {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]}, - {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]}, - {"id":4,"jsonrpc":"2.0","method":"web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...), + {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]}, + {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]}, + {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]}, + {"id":4,"jsonrpc":"2.0","method":"web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...), nil, []*SuccessResponse{ {Error: &ObjectError{Code: -32602, Message: "Invalid Params"}}, @@ -295,14 +295,14 @@ func TestDispatcherBatchRequest(t *testing.T) { { "valid-batch-req", "test with batch req length within batchRequestLengthLimit", - newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 10, 1000), + 
diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go
index 6b7cc83e67..f5f0030656 100644
--- a/jsonrpc/dispatcher_test.go
+++ b/jsonrpc/dispatcher_test.go
@@ -58,7 +58,7 @@ func expectBatchJSONResult(data []byte, v interface{}) error {
 func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {
     t.Run("clients should be able to receive \"newHeads\" event thru eth_subscribe", func(t *testing.T) {
         store := newMockStore()
-        dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 20, 1000)
+        dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 0)

         mockConnection := &mockWsConn{
             msgCh: make(chan []byte, 1),
@@ -92,7 +92,7 @@ func TestDispatcher_HandleWebsocketConnection_EthSubscribe(t *testing.T) {

 func TestDispatcher_WebsocketConnection_RequestFormats(t *testing.T) {
     store := newMockStore()
-    dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 20, 1000)
+    dispatcher := newDispatcher(hclog.NewNullLogger(), store, 0, 0, 0)

     mockConnection := &mockWsConn{
         msgCh: make(chan []byte, 1),
@@ -196,7 +196,7 @@ func (m *mockService) Filter(f LogQuery) (interface{}, error) {

 func TestDispatcherFuncDecode(t *testing.T) {
     srv := &mockService{msgCh: make(chan interface{}, 10)}
-    dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000)
+    dispatcher := newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 0, 0)
     dispatcher.registerService("mock", srv)

     handleReq := func(typ string, msg string) interface{} {
@@ -279,12 +279,12 @@ func TestDispatcherBatchRequest(t *testing.T) {
         {
             "leading-whitespace",
             "test with leading whitespace (\" \\t\\n\\n\\r\\)",
-            newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 20, 1000),
+            newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 0, 0),
             append([]byte{0x20, 0x20, 0x09, 0x0A, 0x0A, 0x0D}, []byte(`[
-    {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]},
-    {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]},
-    {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]},
-    {"id":4,"jsonrpc":"2.0","method":"web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...),
+                {"id":1,"jsonrpc":"2.0","method":"eth_getBalance","params":["0x1", true]},
+                {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x2", true]},
+                {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x3", true]},
+                {"id":4,"jsonrpc":"2.0","method":"web3_sha3","params": ["0x68656c6c6f20776f726c64"]}]`)...),
             nil,
             []*SuccessResponse{
                 {Error: &ObjectError{Code: -32602, Message: "Invalid Params"}},
@@ -295,14 +295,14 @@
         {
             "valid-batch-req",
             "test with batch req length within batchRequestLengthLimit",
-            newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 10, 1000),
+            newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 0, 0),
             []byte(`[
-    {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`),
+                {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`),
             nil,
             []*SuccessResponse{
                 {Error: nil},
@@ -317,15 +317,49 @@
             "test with batch req length exceeding batchRequestLengthLimit",
             newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 3, 1000),
             []byte(`[
-    {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
-    {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`),
+                {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`),
             &ObjectError{Code: -32600, Message: "Batch request length too long"},
             nil,
         },
+        {
+            "no-limits",
+            "test when limits are not set",
+            newDispatcher(hclog.NewNullLogger(), newMockStore(), 0, 0, 0),
+            []byte(`[
+                {"id":1,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":2,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":3,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":4,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":5,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":6,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":7,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":8,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":9,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":10,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":11,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]},
+                {"id":12,"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["latest", true]}]`,
+            ),
+            nil,
+            []*SuccessResponse{
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+                {Error: nil},
+            },
+        },
     }

     for _, c := range cases {
@@ -350,6 +384,11 @@ func TestDispatcherBatchRequest(t *testing.T) {
             for index, resp := range batchResp {
                 assert.Equal(t, resp.Error, c.batchResponse[index].Error)
             }
+        } else if c.name == "no-limits" {
+            assert.Len(t, batchResp, 12)
+            for index, resp := range batchResp {
+                assert.Equal(t, resp.Error, c.batchResponse[index].Error)
+            }
         }
     }
 }
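
The filter_manager.go hunk below reorders the guards in getLogsFromBlocks: the genesis-block bump now runs first, and the range cap is skipped when blockRangeLimit is zero. A minimal sketch of the resulting order, assuming from and to are already resolved block numbers:

	// the genesis block carries no logs, so start from block 1
	if from == 0 {
		from = 1
	}

	// an inverted range is rejected before any limit is applied
	if to < from {
		return nil, ErrIncorrectBlockRange
	}

	// a zero blockRangeLimit disables the range cap
	if f.blockRangeLimit > 0 && to-from > f.blockRangeLimit {
		return nil, ErrBlockRangeTooHigh
	}

One side effect of this ordering appears to be that a query covering only the genesis block (from == to == 0) now fails with ErrIncorrectBlockRange instead of returning an empty result, since from is bumped before the to < from comparison.
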
diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go
index e21d36d07f..e50767f2a7 100644
--- a/jsonrpc/filter_manager.go
+++ b/jsonrpc/filter_manager.go
@@ -438,21 +438,21 @@ func (f *FilterManager) getLogsFromBlocks(query *LogQuery) ([]*Log, error) {
 		return nil, err
 	}

+	// If from equals genesis block
+	// skip it
+	if from == 0 {
+		from = 1
+	}
+
 	if to < from {
 		return nil, ErrIncorrectBlockRange
 	}

-	// avoid handling large block ranges
-	if to-from > f.blockRangeLimit {
+	// if not disabled, avoid handling large block ranges
+	if f.blockRangeLimit > 0 && to-from > f.blockRangeLimit {
 		return nil, ErrBlockRangeTooHigh
 	}

-	// If from equals genesis block
-	// skip it
-	if from == 0 {
-		from = 1
-	}
-
 	logs := make([]*Log, 0)

 	for i := from; i <= to; i++ {
diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go
index e3e110677d..d1c8053680 100644
--- a/jsonrpc/jsonrpc.go
+++ b/jsonrpc/jsonrpc.go
@@ -66,23 +66,11 @@

 // NewJSONRPC returns the JSONRPC http server
 func NewJSONRPC(logger hclog.Logger, config *Config) (*JSONRPC, error) {
-	var (
-		batchLengthLimit = config.BatchLengthLimit
-		blockRangeLimit  = config.BlockRangeLimit
-	)
-
-	if batchLengthLimit == 0 {
-		batchLengthLimit = DefaultJSONRPCBatchRequestLimit
-	}
-
-	if blockRangeLimit == 0 {
-		blockRangeLimit = DefaultJSONRPCBlockRangeLimit
-	}
-
 	srv := &JSONRPC{
-		logger:     logger.Named("jsonrpc"),
-		config:     config,
-		dispatcher: newDispatcher(logger, config.Store, config.ChainID, batchLengthLimit, blockRangeLimit),
+		logger: logger.Named("jsonrpc"),
+		config: config,
+		dispatcher: newDispatcher(logger, config.Store, config.ChainID,
+			config.BatchLengthLimit, config.BlockRangeLimit),
 	}

 	// start http server

From 6537862255b3f60697531e77ef04fc2d98133417 Mon Sep 17 00:00:00 2001
From: DarianShawn
Date: Wed, 24 Aug 2022 23:18:22 +0800
Subject: [PATCH 7/7] Fix lll nolint warning

---
 jsonrpc/dispatcher_test.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/jsonrpc/dispatcher_test.go b/jsonrpc/dispatcher_test.go
index f5f0030656..24cd59dba7 100644
--- a/jsonrpc/dispatcher_test.go
+++ b/jsonrpc/dispatcher_test.go
@@ -267,7 +267,6 @@ func TestDispatcherBatchRequest(t *testing.T) {
 		return res
 	}

-	//nolint:lll
 	cases := []struct {
 		name string
 		desc string