Skip to content

Commit

Permalink
txmgr: improve code sharing between Send and SendAsync (ethereum-opti…
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianst authored Sep 12, 2024
1 parent 849680b commit 87af6f0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 56 deletions.
61 changes: 15 additions & 46 deletions op-service/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,14 @@ type TxCandidate struct {
//
// NOTE: Send can be called concurrently, the nonce will be managed internally.
func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*types.Receipt, error) {
_, r, err := m.send(ctx, candidate)
return r, err
}

func (m *SimpleTxManager) send(ctx context.Context, candidate TxCandidate) (*types.Transaction, *types.Receipt, error) {
// refuse new requests if the tx manager is closed
if m.closed.Load() {
return nil, ErrClosed
return nil, nil, ErrClosed
}

m.metr.RecordPendingTx(m.pending.Add(1))
Expand All @@ -251,63 +256,27 @@ func (m *SimpleTxManager) Send(ctx context.Context, candidate TxCandidate) (*typ
tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
return nil, err
return tx, nil, err
}
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
return nil, err
return nil, nil, err
}
return receipt, err
return tx, receipt, err
}

func (m *SimpleTxManager) SendAsync(ctx context.Context, candidate TxCandidate, ch chan SendResponse) {
if cap(ch) == 0 {
panic("SendAsync: channel must be buffered")
}

// refuse new requests if the tx manager is closed
if m.closed.Load() {
ch <- SendResponse{
Receipt: nil,
Err: ErrClosed,
}
return
}

m.metr.RecordPendingTx(m.pending.Add(1))

var cancel context.CancelFunc
if m.cfg.TxSendTimeout == 0 {
ctx, cancel = context.WithCancel(ctx)
} else {
ctx, cancel = context.WithTimeout(ctx, m.cfg.TxSendTimeout)
}

tx, err := m.prepare(ctx, candidate)
if err != nil {
m.resetNonce()
cancel()
m.metr.RecordPendingTx(m.pending.Add(-1))
ch <- SendResponse{
Receipt: nil,
Err: err,
}
return
}

go func() {
defer m.metr.RecordPendingTx(m.pending.Add(-1))
defer cancel()
receipt, err := m.sendTx(ctx, tx)
if err != nil {
m.resetNonce()
}
ch <- SendResponse{
tx, receipt, err := m.send(ctx, candidate)
r := SendResponse{
Receipt: receipt,
Nonce: tx.Nonce(),
Err: err,
}
if tx != nil {
r.Nonce = tx.Nonce()
}
ch <- r
}()
}

Expand Down
12 changes: 2 additions & 10 deletions op-service/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ func testSendVariants(t *testing.T, testFn func(t *testing.T, send testSendVaria

t.Run("SendAsync", func(t *testing.T) {
testFn(t, func(ctx context.Context, h *testHarness, tx TxCandidate) (*types.Receipt, error) {
ch := make(chan SendResponse, 1)
// unbuffered is ok, will be written to from a goroutine spawned inside SendAsync
ch := make(chan SendResponse)
h.mgr.SendAsync(ctx, tx, ch)
res := <-ch
return res.Receipt, res.Err
Expand Down Expand Up @@ -1588,12 +1589,3 @@ func TestMakeSidecar(t *testing.T) {
require.Equal(t, hashes[i], eth.KZGToVersionedHash(commit))
}
}

func TestSendAsyncUnbufferedChan(t *testing.T) {
conf := configWithNumConfs(2)
h := newTestHarnessWithConfig(t, conf)

require.Panics(t, func() {
h.mgr.SendAsync(context.Background(), TxCandidate{}, make(chan SendResponse))
})
}

0 comments on commit 87af6f0

Please sign in to comment.