Skip to content

Commit

Permalink
Add SendAsync to TxMgr
Browse files Browse the repository at this point in the history
Adds an SendAsync method to TxMgr. I'd like to use this for `op-deployer`, which needs to send multiple transactions in parallel but with predictable nonces. `SendAsync` returns a channel that resolves with the result of each send, but synchronously increases the nonce and prepares the first send prior to returning.
  • Loading branch information
mslipper committed Sep 11, 2024
1 parent 627f7af commit d9dacff
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
4 changes: 4 additions & 0 deletions op-challenger/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ
return <-ch, nil
}

func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
}

func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
s.m.Lock()
defer s.m.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions op-e2e/actions/l2_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt,
panic("unimplemented")
}

func (f fakeTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
}

func (f fakeTxMgr) Close() {
}

Expand Down
4 changes: 4 additions & 0 deletions op-service/txmgr/mocks/TxManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 60 additions & 11 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ var (
ErrClosed = errors.New("transaction manager is closed")
)

type SendResponse struct {
Receipt *types.Receipt
Nonce uint64
Err error
}

// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
Expand All @@ -63,6 +69,14 @@ type TxManager interface {
// mempool and is in need of replacement or cancellation.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)

// SendAsync is used to create & send a transaction asynchronously. It has similar internal
// semantics to Send, however it returns a channel that will receive the result of the
// send operation once it completes. Transactions crafted synchronously - that is, nonce
// management and gas estimation happen prior to the method returning. This allows callers
// that rely on predictable nonces to send multiple transactions in parallel while preserving
// the order of nonce increments.
SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse)

// From returns the sending address associated with the instance of the transaction manager.
// It is static for a single instance of a TxManager.
From() common.Address
Expand Down Expand Up @@ -218,28 +232,63 @@ type TxCandidate struct {
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
ch := make(chan SendResponse, 1)
m.SendAsync(ctx, candidate, ch)
res := <-ch
return res.Receipt, res.Err
}

func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
// refuse new requests if the tx manager is closed
if m.closed.Load() {
return nil, ErrClosed
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return
}

m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return
}
return receipt, err

go func() {
receipt, err := m.sendTx(ctx, tx)
cancel()
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
Receipt: receipt,
Nonce: tx.Nonce(),
Err: err,
}
}()

return
}

// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
defer cancel()
}
// prepare prepares the transaction for sending.
func (m *SimpleTxManager) prepare(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) {
if m.closed.Load() {
return nil, ErrClosed
Expand All @@ -253,7 +302,7 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.sendTx(ctx, tx)
return tx, nil
}

// craftTx creates the signed transaction
Expand Down
2 changes: 1 addition & 1 deletion op-service/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestTxMgrTxSendTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

receipt, err := h.mgr.send(ctx, txCandidate)
receipt, err := h.mgr.Send(ctx, txCandidate)
require.ErrorIs(t, err, context.DeadlineExceeded)
// Because network timeout is much shorter than send timeout, we should see multiple send attempts
// before the overall send fails.
Expand Down

0 comments on commit d9dacff

Please sign in to comment.