Skip to content

Commit

Permalink
Add AsyncSend to TxMgr
Browse files Browse the repository at this point in the history
Adds an AsyncSend method to TxMgr. I'd like to use this for `op-deployer`, which needs to send multiple transactions in parallel but with predictable nonces. `AsyncSend` returns a channel that resolves with the result of each send, but synchronously increases the nonce and prepares the first send prior to returning.
  • Loading branch information
mslipper committed Sep 11, 2024
1 parent 627f7af commit dafbe65
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 13 deletions.
70 changes: 59 additions & 11 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ var (
ErrClosed = errors.New("transaction manager is closed")
)

type SendResponse struct {
Receipt *types.Receipt
Nonce uint64
Err error
}

// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
Expand All @@ -63,6 +69,14 @@ type TxManager interface {
// mempool and is in need of replacement or cancellation.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)

// SendAsync is used to create & send a transaction asynchronously. It has similar internal
// semantics to Send, however it returns a channel that will receive the result of the
// send operation once it completes. Transactions crafted synchronously - that is, nonce
// management and gas estimation happen prior to the method returning. This allows callers
// that rely on predictable nonces to send multiple transactions in parallel while preserving
// the order of nonce increments.
SendAsync(ctx context.Context, candidate TxCandidate) <-chan SendResponse

// From returns the sending address associated with the instance of the transaction manager.
// It is static for a single instance of a TxManager.
From() common.Address
Expand Down Expand Up @@ -218,28 +232,62 @@ type TxCandidate struct {
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
res := <-m.SendAsync(ctx, candidate)
return res.Receipt, res.Err
}

func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate) <-chan SendResponse {
ch := make(chan SendResponse, 1)
// refuse new requests if the tx manager is closed
if m.closed.Load() {
return nil, ErrClosed
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return ch
}

m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return ch
}
return receipt, err

go func() {
receipt, err := m.sendTx(ctx, tx)
cancel()
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
Receipt: receipt,
Nonce: tx.Nonce(),
Err: err,
}
}()

return ch
}

// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
defer cancel()
}
// prepare prepares the transaction for sending.
func (m *SimpleTxManager) prepare(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) {
if m.closed.Load() {
return nil, ErrClosed
Expand All @@ -253,7 +301,7 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.sendTx(ctx, tx)
return tx, nil
}

// craftTx creates the signed transaction
Expand Down
4 changes: 2 additions & 2 deletions op-service/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,12 @@ func TestTxMgrTxSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

receipt, err := h.mgr.send(ctx, txCandidate)
tx, err := h.mgr.Send(ctx, txCandidate)
require.ErrorIs(t, err, context.DeadlineExceeded)
// Because network timeout is much shorter than send timeout, we should see multiple send attempts
// before the overall send fails.
require.Greater(t, sendCount, 1)
require.Nil(t, receipt)
require.Nil(t, tx)
}

// TestAlreadyReserved tests that AlreadyReserved error results in immediate abort of transaction
Expand Down

0 comments on commit dafbe65

Please sign in to comment.