Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
fix(pkg): fix a bug in transaction sender (#606)
Browse files Browse the repository at this point in the history
Co-authored-by: David <david@taiko.xyz>
  • Loading branch information
mask-pp and davidtaikocha authored Mar 6, 2024
1 parent 1b03e6a commit 40325bc
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 81 deletions.
6 changes: 3 additions & 3 deletions driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToHigherFork() {
s.Equal(l1Head3.Hash(), l1Head1.Hash())

// Because of evm_revert operation, the nonce of the proposer need to be adjusted.
sender.AdjustNonce(nil)
s.Nil(sender.SetNonce(nil, true))
// Propose ten blocks on another fork
for i := 0; i < 10; i++ {
s.ProposeInvalidTxListBytes(s.p)
Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToLowerFork() {
s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64())
s.Equal(l1Head3.Hash(), l1Head1.Hash())

sender.AdjustNonce(nil)
s.Nil(sender.SetNonce(nil, true))
// Propose one blocks on another fork
s.ProposeInvalidTxListBytes(s.p)

Expand Down Expand Up @@ -283,7 +283,7 @@ func (s *DriverTestSuite) TestCheckL1ReorgToSameHeightFork() {
s.Equal(l1Head3.Number.Uint64(), l1Head1.Number.Uint64())
s.Equal(l1Head3.Hash(), l1Head1.Hash())

sender.AdjustNonce(nil)
s.Nil(sender.SetNonce(nil, true))
// Propose two blocks on another fork
s.ProposeInvalidTxListBytes(s.p)
time.Sleep(3 * time.Second)
Expand Down
44 changes: 25 additions & 19 deletions internal/sender/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,33 @@ func (s *Sender) adjustGas(txData types.TxData) {
}
}

// AdjustNonce adjusts the nonce of the given transaction with the current nonce of the sender.
func (s *Sender) AdjustNonce(txData types.TxData) {
nonce, err := s.client.NonceAt(s.ctx, s.Opts.From, nil)
if err != nil {
log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err)
return
// SetNonce adjusts the nonce of the given transaction with the current nonce of the sender.
func (s *Sender) SetNonce(txData types.TxData, adjust bool) (err error) {
var nonce uint64
if adjust {
s.nonce, err = s.client.NonceAt(s.ctx, s.Opts.From, nil)
if err != nil {
log.Warn("Failed to get the nonce", "from", s.Opts.From, "err", err)
return err
}
}
s.Opts.Nonce = new(big.Int).SetUint64(nonce)

switch tx := txData.(type) {
case *types.DynamicFeeTx:
tx.Nonce = nonce
case *types.BlobTx:
tx.Nonce = nonce
case *types.LegacyTx:
tx.Nonce = nonce
case *types.AccessListTx:
tx.Nonce = nonce
default:
log.Debug("Unsupported transaction type when adjust nonce", "from", s.Opts.From)
nonce = s.nonce

if !utils.IsNil(txData) {
switch tx := txData.(type) {
case *types.DynamicFeeTx:
tx.Nonce = nonce
case *types.BlobTx:
tx.Nonce = nonce
case *types.LegacyTx:
tx.Nonce = nonce
case *types.AccessListTx:
tx.Nonce = nonce
default:
return fmt.Errorf("unsupported transaction type: %v", txData)
}
}
return
}

// updateGasTipGasFee updates the gas tip cap and gas fee cap of the sender with the given chain head info.
Expand Down
95 changes: 70 additions & 25 deletions internal/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type Sender struct {
head *types.Header
client *rpc.EthClient

Opts *bind.TransactOpts
nonce uint64
Opts *bind.TransactOpts

unconfirmedTxs cmap.ConcurrentMap[string, *TxToConfirm]
txToConfirmCh cmap.ConcurrentMap[string, chan *TxToConfirm]
Expand Down Expand Up @@ -110,6 +111,12 @@ func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ec
}
}

// Get the nonce
nonce, err := client.NonceAt(ctx, opts.From, nil)
if err != nil {
return nil, err
}

// Get the chain ID
head, err := client.HeaderByNumber(ctx, nil)
if err != nil {
Expand All @@ -121,13 +128,12 @@ func NewSender(ctx context.Context, cfg *Config, client *rpc.EthClient, priv *ec
Config: cfg,
head: head,
client: client,
nonce: nonce,
Opts: opts,
unconfirmedTxs: cmap.New[*TxToConfirm](),
txToConfirmCh: cmap.New[chan *TxToConfirm](),
stopCh: make(chan struct{}),
}
// Initialize the nonce
sender.AdjustNonce(nil)

// Initialize the gas fee related fields
if err = sender.updateGasTipGasFee(head); err != nil {
Expand Down Expand Up @@ -177,6 +183,10 @@ func (s *Sender) GetUnconfirmedTx(txID string) *types.Transaction {

// SendRawTransaction sends a transaction to the given Ethereum node.
func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value *big.Int, data []byte) (string, error) {
if s.unconfirmedTxs.Count() >= unconfirmedTxsCap {
return "", fmt.Errorf("too many pending transactions")
}

gasLimit := s.GasLimit
if gasLimit == 0 {
var err error
Expand All @@ -192,16 +202,36 @@ func (s *Sender) SendRawTransaction(nonce uint64, target *common.Address, value
return "", err
}
}
return s.SendTransaction(types.NewTx(&types.DynamicFeeTx{
ChainID: s.client.ChainID,
To: target,
Nonce: nonce,
GasFeeCap: s.Opts.GasFeeCap,
GasTipCap: s.Opts.GasTipCap,
Gas: gasLimit,
Value: value,
Data: data,
}))

txID := uuid.New()
txToConfirm := &TxToConfirm{
ID: txID,
originalTx: &types.DynamicFeeTx{
ChainID: s.client.ChainID,
To: target,
Nonce: nonce,
GasFeeCap: s.Opts.GasFeeCap,
GasTipCap: s.Opts.GasTipCap,
Gas: gasLimit,
Value: value,
Data: data,
},
}

if err := s.send(txToConfirm, false); err != nil && !strings.Contains(err.Error(), "replacement transaction") {
log.Error("Failed to send transaction",
"tx_id", txID,
"nonce", txToConfirm.CurrentTx.Nonce(),
"err", err,
)
return "", err
}

// Add the transaction to the unconfirmed transactions
s.unconfirmedTxs.Set(txID, txToConfirm)
s.txToConfirmCh.Set(txID, make(chan *TxToConfirm, 1))

return txID, nil
}

// SendTransaction sends a transaction to the given Ethereum node.
Expand All @@ -222,7 +252,7 @@ func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) {
CurrentTx: tx,
}

if err := s.send(txToConfirm); err != nil && !strings.Contains(err.Error(), "replacement transaction") {
if err := s.send(txToConfirm, true); err != nil && !strings.Contains(err.Error(), "replacement transaction") {
log.Error("Failed to send transaction",
"tx_id", txID,
"nonce", txToConfirm.CurrentTx.Nonce(),
Expand All @@ -240,12 +270,19 @@ func (s *Sender) SendTransaction(tx *types.Transaction) (string, error) {
}

// send is the internal method to send the given transaction.
func (s *Sender) send(tx *TxToConfirm) error {
func (s *Sender) send(tx *TxToConfirm, resetNonce bool) error {
s.mu.Lock()
defer s.mu.Unlock()

originalTx := tx.originalTx

if resetNonce {
// Set the nonce of the transaction.
if err := s.SetNonce(originalTx, false); err != nil {
return err
}
}

for i := 0; i < nonceIncorrectRetrys; i++ {
// Retry when nonce is incorrect
rawTx, err := s.Opts.Signer(s.Opts.From, types.NewTx(originalTx))
Expand All @@ -258,13 +295,21 @@ func (s *Sender) send(tx *TxToConfirm) error {
// Check if the error is nonce too low
if err != nil {
if strings.Contains(err.Error(), "nonce too low") {
s.AdjustNonce(originalTx)
log.Warn("Nonce is incorrect, retry sending the transaction with new nonce",
"tx_id", tx.ID,
"nonce", tx.CurrentTx.Nonce(),
"hash", rawTx.Hash(),
"err", err,
)
if err := s.SetNonce(originalTx, true); err != nil {
log.Error("Failed to set nonce when appear nonce too low",
"tx_id", tx.ID,
"nonce", tx.CurrentTx.Nonce(),
"hash", rawTx.Hash(),
"err", err,
)
} else {
log.Warn("Nonce is incorrect, retry sending the transaction with new nonce",
"tx_id", tx.ID,
"nonce", tx.CurrentTx.Nonce(),
"hash", rawTx.Hash(),
"err", err,
)
}
continue
}
if strings.Contains(err.Error(), "replacement transaction underpriced") {
Expand All @@ -287,7 +332,7 @@ func (s *Sender) send(tx *TxToConfirm) error {
}
break
}
s.Opts.Nonce = new(big.Int).Add(s.Opts.Nonce, common.Big1)
s.nonce++
return nil
}

Expand Down Expand Up @@ -340,7 +385,7 @@ func (s *Sender) resendUnconfirmedTxs() {
s.releaseUnconfirmedTx(id)
continue
}
if err := s.send(unconfirmedTx); err != nil {
if err := s.send(unconfirmedTx, true); err != nil {
log.Warn(
"Failed to resend the transaction",
"tx_id", id,
Expand Down Expand Up @@ -390,7 +435,7 @@ func (s *Sender) checkPendingTransactionsConfirmation() {
}
pendingTx.Receipt = receipt
if receipt.Status != types.ReceiptStatusSuccessful {
pendingTx.Err = fmt.Errorf("transaction reverted, hash: %s", receipt.TxHash)
pendingTx.Err = fmt.Errorf("transaction status is failed, hash: %s", receipt.TxHash)
s.releaseUnconfirmedTx(id)
continue
}
Expand Down
42 changes: 40 additions & 2 deletions internal/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,50 @@ type SenderTestSuite struct {
sender *sender.Sender
}

func (s *SenderTestSuite) TestNormalSender() {
func (s *SenderTestSuite) TestSendTransaction() {
var (
opts = s.sender.Opts
client = s.RPCClient.L1
eg errgroup.Group
)
eg.SetLimit(runtime.NumCPU())
for i := 0; i < 8; i++ {
i := i
eg.Go(func() error {
to := common.BigToAddress(big.NewInt(int64(i)))
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: client.ChainID,
To: &to,
GasFeeCap: opts.GasFeeCap,
GasTipCap: opts.GasTipCap,
Gas: 21000000,
Value: big.NewInt(1),
Data: nil,
})

_, err := s.sender.SendTransaction(tx)
return err
})
}
s.Nil(eg.Wait())

for _, confirmCh := range s.sender.TxToConfirmChannels() {
confirm := <-confirmCh
s.Nil(confirm.Err)
}
}

func (s *SenderTestSuite) TestSendRawTransaction() {
nonce, err := s.RPCClient.L1.NonceAt(context.Background(), s.sender.Opts.From, nil)
s.Nil(err)

var eg errgroup.Group
eg.SetLimit(runtime.NumCPU())
for i := 0; i < 5; i++ {
i := i
eg.Go(func() error {
addr := common.BigToAddress(big.NewInt(int64(i)))
_, err := s.sender.SendRawTransaction(s.sender.Opts.Nonce.Uint64(), &addr, big.NewInt(1), nil)
_, err := s.sender.SendRawTransaction(nonce+uint64(i), &addr, big.NewInt(1), nil)
return err
})
}
Expand Down Expand Up @@ -121,6 +157,7 @@ func (s *SenderTestSuite) TestNonceTooLow() {

func (s *SenderTestSuite) SetupTest() {
s.ClientTestSuite.SetupTest()
s.SetL1Automine(true)

ctx := context.Background()
priv, err := crypto.ToECDSA(common.FromHex(os.Getenv("L1_PROPOSER_PRIVATE_KEY")))
Expand All @@ -137,6 +174,7 @@ func (s *SenderTestSuite) SetupTest() {
}

func (s *SenderTestSuite) TearDownTest() {
s.SetL1Automine(false)
s.sender.Close()
s.ClientTestSuite.TearDownTest()
}
Expand Down
8 changes: 8 additions & 0 deletions internal/testutils/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (s *ClientTestSuite) ProposeInvalidTxListBytes(proposer Proposer) {
invalidTxListBytes := RandomBytes(256)

s.Nil(proposer.ProposeTxList(context.Background(), invalidTxListBytes, 1))
for _, confirmCh := range proposer.GetSender().TxToConfirmChannels() {
confirm := <-confirmCh
s.Nil(confirm.Err)
}
}

func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks(
Expand All @@ -54,6 +58,10 @@ func (s *ClientTestSuite) ProposeAndInsertEmptyBlocks(
s.Nil(err)

s.Nil(proposer.ProposeTxList(context.Background(), encoded, 0))
for _, confirmCh := range proposer.GetSender().TxToConfirmChannels() {
confirm := <-confirmCh
s.Nil(confirm.Err)
}

s.ProposeInvalidTxListBytes(proposer)

Expand Down
Loading

0 comments on commit 40325bc

Please sign in to comment.