Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IP to Pool txs #1817

Merged
merged 14 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions db/migrations/pool/0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +migrate Up
ToniRamirezM marked this conversation as resolved.
Show resolved Hide resolved
ALTER TABLE pool.transaction
DROP COLUMN failed_counter;

ALTER TABLE pool.transaction
ADD COLUMN ip VARCHAR;

-- +migrate Down
ALTER TABLE pool.transaction
ADD COLUMN failed_counter DECIMAL(78, 0) DEFAULT 0;

ALTER TABLE pool.transaction
DROP COLUMN ip;
12 changes: 12 additions & 0 deletions db/migrations/state/0004.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Up
CREATE TABLE state.event
tclemos marked this conversation as resolved.
Show resolved Hide resolved
(
event_type VARCHAR NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
ip VARCHAR,
tx_hash VARCHAR,
payload VARCHAR
);

-- +migrate Down
DROP table state.event;
10 changes: 6 additions & 4 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"net/http"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/log"
Expand Down Expand Up @@ -614,11 +615,12 @@ func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (inte
// SendRawTransaction has two different ways to handle new transactions:
// - for Sequencer nodes it tries to add the tx to the pool
// - for Non-Sequencer nodes it relays the Tx to the Sequencer node
func (e *EthEndpoints) SendRawTransaction(input string) (interface{}, rpcError) {
func (e *EthEndpoints) SendRawTransaction(httpRequest *http.Request, input string) (interface{}, rpcError) {
if e.cfg.SequencerNodeURI != "" {
return e.relayTxToSequencerNode(input)
} else {
return e.tryToAddTxToPool(input)
ip := httpRequest.Header.Get("X-Forwarded-For")
return e.tryToAddTxToPool(input, ip)
}
}

Expand All @@ -637,14 +639,14 @@ func (e *EthEndpoints) relayTxToSequencerNode(input string) (interface{}, rpcErr
return txHash, nil
}

func (e *EthEndpoints) tryToAddTxToPool(input string) (interface{}, rpcError) {
func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, rpcError) {
tx, err := hexToTx(input)
if err != nil {
return rpcErrorResponse(invalidParamsErrorCode, "invalid tx input", err)
}

log.Infof("adding TX to the pool: %v", tx.Hash().Hex())
if err := e.pool.AddTx(context.Background(), *tx); err != nil {
if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil {
return rpcErrorResponse(defaultErrorCode, err.Error(), nil)
}
log.Infof("TX added to the pool: %v", tx.Hash().Hex())
Expand Down
12 changes: 6 additions & 6 deletions jsonrpc/endpoints_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2739,7 +2739,7 @@ func TestSendRawTransactionViaGeth(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(nil).
Once()
},
Expand All @@ -2756,7 +2756,7 @@ func TestSendRawTransactionViaGeth(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down Expand Up @@ -2814,7 +2814,7 @@ func TestSendRawTransactionJSONRPCCall(t *testing.T) {
},
SetupMocks: func(t *testing.T, m *mocks, tc testCase) {
m.Pool.
On("AddTx", context.Background(), mock.IsType(types.Transaction{})).
On("AddTx", context.Background(), mock.IsType(types.Transaction{}), "").
Return(nil).
Once()
},
Expand All @@ -2836,7 +2836,7 @@ func TestSendRawTransactionJSONRPCCall(t *testing.T) {
},
SetupMocks: func(t *testing.T, m *mocks, tc testCase) {
m.Pool.
On("AddTx", context.Background(), mock.IsType(types.Transaction{})).
On("AddTx", context.Background(), mock.IsType(types.Transaction{}), "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down Expand Up @@ -2904,7 +2904,7 @@ func TestSendRawTransactionViaGethForNonSequencerNode(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(nil).
Once()
},
Expand All @@ -2921,7 +2921,7 @@ func TestSendRawTransactionViaGethForNonSequencerNode(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down
11 changes: 10 additions & 1 deletion jsonrpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jsonrpc
import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"strings"
"sync"
Expand Down Expand Up @@ -34,7 +35,8 @@ func (f *funcData) numParams() int {

type handleRequest struct {
Request
wsConn *websocket.Conn
wsConn *websocket.Conn
HttpRequest *http.Request
}

// Handler manage services to handle jsonrpc requests
Expand Down Expand Up @@ -101,12 +103,19 @@ func (h *Handler) Handle(req handleRequest) Response {
requestHasWebSocketConn := req.wsConn != nil
funcHasMoreThanOneInputParams := len(fd.reqt) > 1
firstFuncParamIsWebSocketConn := false
firstFuncParamIsHttpRequest := false
if funcHasMoreThanOneInputParams {
firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&websocket.Conn{}))
firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{}))
}
if requestHasWebSocketConn && firstFuncParamIsWebSocketConn {
inArgs[1] = reflect.ValueOf(req.wsConn)
inArgsOffset++
} else if firstFuncParamIsHttpRequest {
// If in the future one endponit needs to have both a websocket connection and an http request
// we will need to modify this code to properly handle it
inArgs[1] = reflect.ValueOf(req.HttpRequest)
inArgsOffset++
}

// check params passed by request match function params
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// jsonRPCTxPool contains the methods required to interact with the tx pool.
type jsonRPCTxPool interface {
AddTx(ctx context.Context, tx types.Transaction) error
AddTx(ctx context.Context, tx types.Transaction, ip string) error
GetGasPrice(ctx context.Context) (uint64, error)
GetNonce(ctx context.Context, address common.Address) (uint64, error)
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
Expand Down
10 changes: 5 additions & 5 deletions jsonrpc/mock_pool_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func (s *Server) handle(w http.ResponseWriter, req *http.Request) {

start := time.Now()
if single {
s.handleSingleRequest(w, data)
s.handleSingleRequest(req, w, data)
} else {
s.handleBatchRequest(w, data)
s.handleBatchRequest(req, w, data)
}
metrics.RequestDuration(start)
}
Expand All @@ -262,14 +262,14 @@ func (s *Server) isSingleRequest(data []byte) (bool, rpcError) {
return x[0] == '{', nil
}

func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte) {
func (s *Server) handleSingleRequest(httpRequest *http.Request, w http.ResponseWriter, data []byte) {
defer metrics.RequestHandled(metrics.RequestHandledLabelSingle)
request, err := s.parseRequest(data)
if err != nil {
handleError(w, err)
return
}
req := handleRequest{Request: request}
req := handleRequest{Request: request, HttpRequest: httpRequest}
response := s.handler.Handle(req)

respBytes, err := json.Marshal(response)
Expand All @@ -285,7 +285,7 @@ func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte) {
}
}

func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte) {
func (s *Server) handleBatchRequest(httpRequest *http.Request, w http.ResponseWriter, data []byte) {
defer metrics.RequestHandled(metrics.RequestHandledLabelBatch)
requests, err := s.parseRequests(data)
if err != nil {
Expand All @@ -296,7 +296,7 @@ func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte) {
responses := make([]Response, 0, len(requests))

for _, request := range requests {
req := handleRequest{Request: request}
req := handleRequest{Request: request, HttpRequest: httpRequest}
response := s.handler.Handle(req)
responses = append(responses, response)
}
Expand Down
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type storage interface {
GetTxs(ctx context.Context, filterStatus TxStatus, isClaims bool, minGasPrice, limit uint64) ([]*Transaction, error)
GetTxFromAddressFromByHash(ctx context.Context, hash common.Hash) (common.Address, uint64, error)
GetTxByHash(ctx context.Context, hash common.Hash) (*Transaction, error)
IncrementFailedCounter(ctx context.Context, hashes []string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
Expand All @@ -41,4 +40,5 @@ type stateInterface interface {
GetNonce(ctx context.Context, address common.Address, batchNumber uint64, dbTx pgx.Tx) (uint64, error)
GetTransactionByHash(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (*types.Transaction, error)
PreProcessTransaction(ctx context.Context, tx *types.Transaction, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}
Loading