Skip to content

Commit

Permalink
Add SendAsync to TxMgr (#11843)
Browse files Browse the repository at this point in the history
* Add SendAsync to TxMgr

Adds a SendAsync method to TxMgr. I'd like to use this for `op-deployer`, which needs to send multiple transactions in parallel but with predictable nonces. `SendAsync` returns a channel that resolves with the result of each send, but synchronously increases the nonce and prepares the first send prior to returning.

* review updates + tests
  • Loading branch information
mslipper authored Sep 12, 2024
1 parent d887cfa commit 97aa08a
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 163 deletions.
4 changes: 4 additions & 0 deletions op-challenger/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ
return <-ch, nil
}

func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
}

func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
s.m.Lock()
defer s.m.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions op-e2e/actions/l2_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt,
panic("unimplemented")
}

func (f fakeTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
}

func (f fakeTxMgr) Close() {
}

Expand Down
5 changes: 5 additions & 0 deletions op-service/txmgr/mocks/TxManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 83 additions & 10 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ var (
ErrClosed = errors.New("transaction manager is closed")
)

type SendResponse struct {
Receipt *types.Receipt
Nonce uint64
Err error
}

// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
Expand All @@ -63,6 +69,14 @@ type TxManager interface {
// mempool and is in need of replacement or cancellation.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)

// SendAsync is used to create & send a transaction asynchronously. It has similar internal
// semantics to Send, however it returns a channel that will receive the result of the
// send operation once it completes. Transactions crafted synchronously - that is, nonce
// management and gas estimation happen prior to the method returning. This allows callers
// that rely on predictable nonces to send multiple transactions in parallel while preserving
// the order of nonce increments.
SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse)

// From returns the sending address associated with the instance of the transaction manager.
// It is static for a single instance of a TxManager.
From() common.Address
Expand Down Expand Up @@ -222,24 +236,83 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if m.closed.Load() {
return nil, ErrClosed
}

m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)
defer m.metr.RecordPendingTx(m.pending.Add(-1))

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}
defer cancel()

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
return nil, err
}
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
return nil, err
}
return receipt, err
}

// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
if cap(ch) == 0 {
panic("SendAsync: channel must be buffered")
}

// refuse new requests if the tx manager is closed
if m.closed.Load() {
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return
}

m.metr.RecordPendingTx(m.pending.Add(1))

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
defer cancel()
}

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
m.metr.RecordPendingTx(m.pending.Add(-1))
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return
}

go func() {
defer m.metr.RecordPendingTx(m.pending.Add(-1))
defer cancel()
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
Receipt: receipt,
Nonce: tx.Nonce(),
Err: err,
}
}()
}

// prepare prepares the transaction for sending.
func (m *SimpleTxManager) prepare(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) {
if m.closed.Load() {
return nil, ErrClosed
Expand All @@ -253,7 +326,7 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.sendTx(ctx, tx)
return tx, nil
}

// craftTx creates the signed transaction
Expand Down
Loading

0 comments on commit 97aa08a

Please sign in to comment.