Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SendAsync to TxMgr #11843

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions op-challenger/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (s *stubTxMgr) Send(ctx context.Context, candidate txmgr.TxCandidate) (*typ
return <-ch, nil
}

func (s *stubTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
}

func (s *stubTxMgr) recordTx(candidate txmgr.TxCandidate) chan *types.Receipt {
s.m.Lock()
defer s.m.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions op-e2e/actions/l2_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (f fakeTxMgr) Send(_ context.Context, _ txmgr.TxCandidate) (*types.Receipt,
panic("unimplemented")
}

func (f fakeTxMgr) SendAsync(ctx context.Context, candidate txmgr.TxCandidate, ch chan txmgr.SendResponse) {
panic("unimplemented")
}

func (f fakeTxMgr) Close() {
}

Expand Down
5 changes: 5 additions & 0 deletions op-service/txmgr/mocks/TxManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 83 additions & 10 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ var (
ErrClosed = errors.New("transaction manager is closed")
)

type SendResponse struct {
Receipt *types.Receipt
Nonce uint64
Err error
}

// TxManager is an interface that allows callers to reliably publish txs,
// bumping the gas price if needed, and obtain the receipt of the resulting tx.
//
Expand All @@ -63,6 +69,14 @@ type TxManager interface {
// mempool and is in need of replacement or cancellation.
Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error)

// SendAsync is used to create & send a transaction asynchronously. It has similar internal
// semantics to Send, however it returns a channel that will receive the result of the
// send operation once it completes. Transactions crafted synchronously - that is, nonce
// management and gas estimation happen prior to the method returning. This allows callers
// that rely on predictable nonces to send multiple transactions in parallel while preserving
// the order of nonce increments.
SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse)

// From returns the sending address associated with the instance of the transaction manager.
// It is static for a single instance of a TxManager.
From() common.Address
Expand Down Expand Up @@ -222,24 +236,83 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
if m.closed.Load() {
return nil, ErrClosed
}

m.metr.RecordPendingTx(m.pending.Add(1))
defer func() {
m.metr.RecordPendingTx(m.pending.Add(-1))
}()
receipt, err := m.send(ctx, candidate)
defer m.metr.RecordPendingTx(m.pending.Add(-1))

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}
defer cancel()

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
return nil, err
}
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
return nil, err
}
return receipt, err
}

// send performs the actual transaction creation and sending.
func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
if m.cfg.TxSendTimeout != 0 {
var cancel context.CancelFunc
func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
if cap(ch) == 0 {
panic("SendAsync: channel must be buffered")
}

// refuse new requests if the tx manager is closed
if m.closed.Load() {
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return
}

m.metr.RecordPendingTx(m.pending.Add(1))

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
defer cancel()
}

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
m.metr.RecordPendingTx(m.pending.Add(-1))
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return
}

go func() {
mslipper marked this conversation as resolved.
Show resolved Hide resolved
defer m.metr.RecordPendingTx(m.pending.Add(-1))
defer cancel()
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
Receipt: receipt,
Nonce: tx.Nonce(),
Err: err,
}
}()
}

// prepare prepares the transaction for sending.
func (m *SimpleTxManager) prepare(ctx context.Context, candidate TxCandidate) (*types.Transaction, error) {
tx, err := retry.Do(ctx, 30, retry.Fixed(2*time.Second), func() (*types.Transaction, error) {
if m.closed.Load() {
return nil, ErrClosed
Expand All @@ -253,7 +326,7 @@ func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*typ
if err != nil {
return nil, fmt.Errorf("failed to create the tx: %w", err)
}
return m.sendTx(ctx, tx)
return tx, nil
}

// craftTx creates the signed transaction
Expand Down
Loading