Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IP to Pool txs #1817

Merged
merged 14 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions db/migrations/pool/0003.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +migrate Up
ToniRamirezM marked this conversation as resolved.
Show resolved Hide resolved
ALTER TABLE pool.transaction
DROP COLUMN failed_counter;

ALTER TABLE pool.transaction
ADD COLUMN ip VARCHAR;

-- +migrate Down
ALTER TABLE pool.transaction
ADD COLUMN failed_counter DECIMAL(78, 0) DEFAULT 0;

ALTER TABLE pool.transaction
DROP COLUMN ip;
12 changes: 12 additions & 0 deletions db/migrations/state/0004.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Up
CREATE TABLE state.event
tclemos marked this conversation as resolved.
Show resolved Hide resolved
(
event_type VARCHAR NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
ip VARCHAR,
tx_hash VARCHAR,
payload VARCHAR
);

-- +migrate Down
DROP table state.event;
1 change: 1 addition & 0 deletions jsonrpc/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Request struct {
ID interface{} `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
IP string `json:"ip"`
}

// Response is a jsonrpc success response
Expand Down
8 changes: 4 additions & 4 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,11 +614,11 @@ func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (inte
// SendRawTransaction has two different ways to handle new transactions:
// - for Sequencer nodes it tries to add the tx to the pool
// - for Non-Sequencer nodes it relays the Tx to the Sequencer node
func (e *EthEndpoints) SendRawTransaction(input string) (interface{}, rpcError) {
func (e *EthEndpoints) SendRawTransaction(input, ip string) (interface{}, rpcError) {
tclemos marked this conversation as resolved.
Show resolved Hide resolved
if e.cfg.SequencerNodeURI != "" {
return e.relayTxToSequencerNode(input)
} else {
return e.tryToAddTxToPool(input)
return e.tryToAddTxToPool(input, ip)
}
}

Expand All @@ -637,14 +637,14 @@ func (e *EthEndpoints) relayTxToSequencerNode(input string) (interface{}, rpcErr
return txHash, nil
}

func (e *EthEndpoints) tryToAddTxToPool(input string) (interface{}, rpcError) {
func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, rpcError) {
tx, err := hexToTx(input)
if err != nil {
return rpcErrorResponse(invalidParamsErrorCode, "invalid tx input", err)
}

log.Infof("adding TX to the pool: %v", tx.Hash().Hex())
if err := e.pool.AddTx(context.Background(), *tx); err != nil {
if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil {
return rpcErrorResponse(defaultErrorCode, err.Error(), nil)
}
log.Infof("TX added to the pool: %v", tx.Hash().Hex())
Expand Down
12 changes: 6 additions & 6 deletions jsonrpc/endpoints_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2739,7 +2739,7 @@ func TestSendRawTransactionViaGeth(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(nil).
Once()
},
Expand All @@ -2756,7 +2756,7 @@ func TestSendRawTransactionViaGeth(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down Expand Up @@ -2814,7 +2814,7 @@ func TestSendRawTransactionJSONRPCCall(t *testing.T) {
},
SetupMocks: func(t *testing.T, m *mocks, tc testCase) {
m.Pool.
On("AddTx", context.Background(), mock.IsType(types.Transaction{})).
On("AddTx", context.Background(), mock.IsType(types.Transaction{}), "").
Return(nil).
Once()
},
Expand All @@ -2836,7 +2836,7 @@ func TestSendRawTransactionJSONRPCCall(t *testing.T) {
},
SetupMocks: func(t *testing.T, m *mocks, tc testCase) {
m.Pool.
On("AddTx", context.Background(), mock.IsType(types.Transaction{})).
On("AddTx", context.Background(), mock.IsType(types.Transaction{}), "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down Expand Up @@ -2904,7 +2904,7 @@ func TestSendRawTransactionViaGethForNonSequencerNode(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(nil).
Once()
},
Expand All @@ -2921,7 +2921,7 @@ func TestSendRawTransactionViaGethForNonSequencerNode(t *testing.T) {
})

m.Pool.
On("AddTx", context.Background(), txMatchByHash).
On("AddTx", context.Background(), txMatchByHash, "").
Return(errors.New("failed to add TX to the pool")).
Once()
},
Expand Down
5 changes: 5 additions & 0 deletions jsonrpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (h *Handler) Handle(req handleRequest) Response {
inArgs[i+1] = val.Elem()
}

// And request IP as an input parameter for eth_sendRawTransaction
if req.Method == "eth_sendRawTransaction" {
inputs = append(inputs, req.IP)
}

if fd.numParams() > 0 {
if err := json.Unmarshal(req.Params, &inputs); err != nil {
return NewResponse(req.Request, nil, newRPCError(invalidParamsErrorCode, "Invalid Params"))
Expand Down
2 changes: 1 addition & 1 deletion jsonrpc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

// jsonRPCTxPool contains the methods required to interact with the tx pool.
type jsonRPCTxPool interface {
AddTx(ctx context.Context, tx types.Transaction) error
AddTx(ctx context.Context, tx types.Transaction, ip string) error
GetGasPrice(ctx context.Context) (uint64, error)
GetNonce(ctx context.Context, address common.Address) (uint64, error)
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
Expand Down
10 changes: 5 additions & 5 deletions jsonrpc/mock_pool_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 16 additions & 8 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,13 @@ func (s *Server) handle(w http.ResponseWriter, req *http.Request) {
return
}

ip := req.Header.Get("X-Forwarded-For")

start := time.Now()
if single {
s.handleSingleRequest(w, data)
s.handleSingleRequest(w, data, ip)
} else {
s.handleBatchRequest(w, data)
s.handleBatchRequest(w, data, ip)
}
metrics.RequestDuration(start)
}
Expand All @@ -262,9 +264,9 @@ func (s *Server) isSingleRequest(data []byte) (bool, rpcError) {
return x[0] == '{', nil
}

func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte) {
func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte, ip string) {
defer metrics.RequestHandled(metrics.RequestHandledLabelSingle)
request, err := s.parseRequest(data)
request, err := s.parseRequest(data, ip)
if err != nil {
handleError(w, err)
return
Expand All @@ -285,9 +287,9 @@ func (s *Server) handleSingleRequest(w http.ResponseWriter, data []byte) {
}
}

func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte) {
func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte, ip string) {
defer metrics.RequestHandled(metrics.RequestHandledLabelBatch)
requests, err := s.parseRequests(data)
requests, err := s.parseRequests(data, ip)
if err != nil {
handleError(w, err)
return
Expand All @@ -308,23 +310,29 @@ func (s *Server) handleBatchRequest(w http.ResponseWriter, data []byte) {
}
}

func (s *Server) parseRequest(data []byte) (Request, error) {
func (s *Server) parseRequest(data []byte, ip string) (Request, error) {
var req Request

if err := json.Unmarshal(data, &req); err != nil {
return Request{}, newRPCError(invalidRequestErrorCode, "Invalid json request")
}

req.IP = ip

return req, nil
}

func (s *Server) parseRequests(data []byte) ([]Request, error) {
func (s *Server) parseRequests(data []byte, ip string) ([]Request, error) {
var requests []Request

if err := json.Unmarshal(data, &requests); err != nil {
return nil, newRPCError(invalidRequestErrorCode, "Invalid json request")
}

for _, req := range requests {
req.IP = ip
}

return requests, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type storage interface {
GetTxs(ctx context.Context, filterStatus TxStatus, isClaims bool, minGasPrice, limit uint64) ([]*Transaction, error)
GetTxFromAddressFromByHash(ctx context.Context, hash common.Hash) (common.Address, uint64, error)
GetTxByHash(ctx context.Context, hash common.Hash) (*Transaction, error)
IncrementFailedCounter(ctx context.Context, hashes []string) error
GetTxZkCountersByHash(ctx context.Context, hash common.Hash) (*state.ZKCounters, error)
DeleteTransactionByHash(ctx context.Context, hash common.Hash) error
MarkWIPTxsAsPending(ctx context.Context) error
Expand All @@ -41,4 +40,5 @@ type stateInterface interface {
GetNonce(ctx context.Context, address common.Address, batchNumber uint64, dbTx pgx.Tx) (uint64, error)
GetTransactionByHash(ctx context.Context, transactionHash common.Hash, dbTx pgx.Tx) (*types.Transaction, error)
PreProcessTransaction(ctx context.Context, tx *types.Transaction, dbTx pgx.Tx) (*state.ProcessBatchResponse, error)
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}
Loading