diff --git a/core/adapters/eth_tx.go b/core/adapters/eth_tx.go index ab737faebff..cc9c6d9c166 100644 --- a/core/adapters/eth_tx.go +++ b/core/adapters/eth_tx.go @@ -7,7 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/smartcontractkit/chainlink/core/logger" - "github.com/smartcontractkit/chainlink/core/store" + strpkg "github.com/smartcontractkit/chainlink/core/store" "github.com/smartcontractkit/chainlink/core/store/models" "github.com/smartcontractkit/chainlink/core/utils" "gopkg.in/guregu/null.v3" @@ -33,7 +33,7 @@ type EthTx struct { // Perform creates the run result for the transaction if the existing run result // is not currently pending. Then it confirms the transaction was confirmed on // the blockchain. -func (etx *EthTx) Perform(input models.RunResult, store *store.Store) models.RunResult { +func (etx *EthTx) Perform(input models.RunResult, store *strpkg.Store) models.RunResult { if !store.TxManager.Connected() { input.MarkPendingConnection() return input @@ -69,7 +69,7 @@ func getTxData(e *EthTx, input *models.RunResult) ([]byte, error) { func createTxRunResult( e *EthTx, input *models.RunResult, - store *store.Store, + store *strpkg.Store, ) { value, err := getTxData(e, input) if err != nil { @@ -96,10 +96,38 @@ func createTxRunResult( } input.ApplyResult(tx.Hash.String()) - ensureTxRunResult(input, store) + + txAttempt := tx.Attempts[0] + logger.Debugw( + fmt.Sprintf("Tx #0 checking on-chain state"), + "txHash", txAttempt.Hash.String(), + "txID", txAttempt.TxID, + ) + + receipt, state, err := store.TxManager.CheckAttempt(txAttempt, tx.SentAt) + if err != nil { + input.SetError(err) + return + } + + logger.Debugw( + fmt.Sprintf("Tx #0 is %s", state), + "txHash", txAttempt.Hash.String(), + "txID", txAttempt.TxID, + "receiptBlockNumber", receipt.BlockNumber.ToInt(), + "currentBlockNumber", tx.SentAt, + "receiptHash", receipt.Hash.Hex(), + ) + + if state != strpkg.Safe { + input.MarkPendingConfirmations() + return + } + + addReceiptToResult(receipt, input) } -func ensureTxRunResult(input *models.RunResult, str *store.Store) { +func ensureTxRunResult(input *models.RunResult, str *strpkg.Store) { val, err := input.ResultString() if err != nil { input.SetError(err) @@ -112,14 +140,23 @@ func ensureTxRunResult(input *models.RunResult, str *store.Store) { return } - receipt, err := str.TxManager.BumpGasUntilSafe(hash) + receipt, state, err := str.TxManager.BumpGasUntilSafe(hash) if err != nil { + if state == strpkg.Unknown { + input.SetError(err) + return + } + + // We failed to get one of the TxAttempt receipts, so we won't mark this + // run as errored in order to try again logger.Warn("EthTx Adapter Perform Resuming: ", err) } - if receipt == nil { + + if state != strpkg.Safe { input.MarkPendingConfirmations() return } + addReceiptToResult(receipt, input) } diff --git a/core/adapters/eth_tx_test.go b/core/adapters/eth_tx_test.go index 083e296a459..b0e5e42434f 100644 --- a/core/adapters/eth_tx_test.go +++ b/core/adapters/eth_tx_test.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink/core/adapters" "github.com/smartcontractkit/chainlink/core/internal/cltest" "github.com/smartcontractkit/chainlink/core/internal/mocks" + strpkg "github.com/smartcontractkit/chainlink/core/store" "github.com/smartcontractkit/chainlink/core/store/models" "github.com/smartcontractkit/chainlink/core/utils" "github.com/stretchr/testify/assert" @@ -25,7 +26,6 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) { app, cleanup := cltest.NewApplicationWithKey(t) defer cleanup() store := app.Store - config := store.Config address := cltest.NewAddress() fHash := models.HexToFunctionSelector("b3f98adc") @@ -40,7 +40,6 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) { hash := cltest.NewHash() sentAt := uint64(23456) confirmed := sentAt + 1 - safe := confirmed + config.MinOutgoingConfirmations() ethMock.Register("eth_sendRawTransaction", hash, func(_ interface{}, data ...interface{}) error { rlp := data[0].([]interface{})[0].(string) @@ -57,7 +56,6 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) { ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)} ethMock.Register("eth_getTransactionReceipt", receipt) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe)) adapter := adapters.EthTx{ Address: address, @@ -84,7 +82,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) { app, cleanup := cltest.NewApplicationWithKey(t) defer cleanup() store := app.Store - config := store.Config address := cltest.NewAddress() fHash := models.HexToFunctionSelector("b3f98adc") @@ -99,7 +96,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) { hash := cltest.NewHash() sentAt := uint64(23456) confirmed := sentAt + 1 - safe := confirmed + config.MinOutgoingConfirmations() ethMock.Register("eth_sendRawTransaction", hash, func(_ interface{}, data ...interface{}) error { rlp := data[0].([]interface{})[0].(string) @@ -118,7 +114,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) { ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)} ethMock.Register("eth_getTransactionReceipt", receipt) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe)) adapter := adapters.EthTx{ Address: address, @@ -140,13 +135,12 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) { ethMock.EventuallyAllCalled(t) } -func TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix(t *testing.T) { +func TestEthTxAdapter_Perform_SafeWithBytesAndNoDataPrefix(t *testing.T) { t.Parallel() app, cleanup := cltest.NewApplicationWithKey(t) defer cleanup() store := app.Store - config := store.Config address := cltest.NewAddress() fHash := models.HexToFunctionSelector("b3f98adc") @@ -158,9 +152,7 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix(t *testing.T) { require.NoError(t, app.StartAndConnect()) hash := cltest.NewHash() - sentAt := uint64(23456) - confirmed := sentAt + 1 - safe := confirmed + config.MinOutgoingConfirmations() + currentHeight := uint64(23456) ethMock.Register("eth_sendRawTransaction", hash, func(_ interface{}, data ...interface{}) error { rlp := data[0].([]interface{})[0].(string) @@ -175,10 +167,10 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix(t *testing.T) { assert.Equal(t, wantData, hexutil.Encode(tx.Data())) return nil }) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) - receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)} + ethMock.Register("eth_blockNumber", utils.Uint64ToHex(currentHeight)) + safe := currentHeight - store.Config.MinOutgoingConfirmations() + receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(safe)} ethMock.Register("eth_getTransactionReceipt", receipt) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe)) adapter := adapters.EthTx{ Address: address, @@ -422,27 +414,65 @@ func TestEthTxAdapter_Perform_WithErrorInvalidInput(t *testing.T) { assert.Contains(t, output.Error(), "Cannot connect to nodes") } -func TestEthTxAdapter_Perform_PendingConfirmations_WithErrorInTxManager(t *testing.T) { +func TestEthTxAdapter_Perform_PendingConfirmations_WithFatalErrorInTxManager(t *testing.T) { t.Parallel() app, cleanup := cltest.NewApplicationWithKey(t) defer cleanup() store := app.Store - ethMock := app.MockEthClient() - ethMock.Register("eth_getTransactionCount", `0x0100`) + ethMock := app.MockEthClient(cltest.Strict) + ethMock.Register("eth_getTransactionCount", `0x17`) assert.Nil(t, app.Start()) + require.NoError(t, app.WaitForConnection()) + adapter := adapters.EthTx{ Address: cltest.NewAddress(), FunctionSelector: models.HexToFunctionSelector("0xb3f98adc"), } - input := cltest.RunResultWithResult("") + input := cltest.RunResultWithResult(cltest.NewHash().String()) input.Status = models.RunStatusPendingConfirmations - ethMock.RegisterError("eth_blockNumber", "Cannot connect to nodes") + ethMock.RegisterError("eth_blockNumber", "Invalid node id") output := adapter.Perform(input, store) - assert.False(t, output.HasError()) + ethMock.AssertAllCalled() + + assert.Equal(t, models.RunStatusErrored, output.Status) + assert.NotNil(t, output.Error()) +} + +func TestEthTxAdapter_Perform_PendingConfirmations_WithRecoverableErrorInTxManager(t *testing.T) { + t.Parallel() + + app, cleanup := cltest.NewApplicationWithKey(t) + defer cleanup() + + store := app.Store + ethMock := app.MockEthClient(cltest.Strict) + ethMock.Register("eth_getTransactionCount", `0x12`) + assert.Nil(t, app.Start()) + + from := cltest.GetAccountAddress(t, store) + tx := cltest.CreateTx(t, store, from, uint64(14372)) + input := cltest.RunResultWithResult(tx.Attempts[0].Hash.String()) + input.Status = models.RunStatusPendingConfirmations + + ethMock.Register("eth_blockNumber", "0x100") + ethMock.RegisterError("eth_getTransactionReceipt", "Connection reset by peer") + + require.NoError(t, app.WaitForConnection()) + + adapter := adapters.EthTx{ + Address: cltest.NewAddress(), + FunctionSelector: models.HexToFunctionSelector("0xb3f98adc"), + } + output := adapter.Perform(input, store) + + ethMock.AssertAllCalled() + + assert.Equal(t, models.RunStatusPendingConfirmations, output.Status) + assert.NoError(t, output.GetError()) } func TestEthTxAdapter_DeserializationBytesFormat(t *testing.T) { @@ -451,6 +481,9 @@ func TestEthTxAdapter_DeserializationBytesFormat(t *testing.T) { ctrl := gomock.NewController(t) txmMock := mocks.NewMockTxManager(ctrl) store.TxManager = txmMock + + txAttempt := &models.TxAttempt{} + tx := &models.Tx{Attempts: []*models.TxAttempt{txAttempt}} txmMock.EXPECT().Register(gomock.Any()) txmMock.EXPECT().Connected().Return(true).AnyTimes() txmMock.EXPECT().CreateTxWithGas(gomock.Any(), gomock.Any(), hexutil.MustDecode( @@ -458,8 +491,8 @@ func TestEthTxAdapter_DeserializationBytesFormat(t *testing.T) { "0000000000000000000000000000000000000000000000000000000000000020"+ "000000000000000000000000000000000000000000000000000000000000000b"+ "68656c6c6f20776f726c64000000000000000000000000000000000000000000"), - gomock.Any(), gomock.Any()).Return(&models.Tx{}, nil) - txmMock.EXPECT().BumpGasUntilSafe(gomock.Any()) + gomock.Any(), gomock.Any()).Return(tx, nil) + txmMock.EXPECT().CheckAttempt(txAttempt, uint64(0)).Return(&models.TxReceipt{}, strpkg.Unconfirmed, nil) task := models.TaskSpec{} err := json.Unmarshal([]byte(`{"type": "EthTx", "params": {"format": "bytes"}}`), &task) @@ -541,8 +574,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) { app, cleanup := cltest.NewApplicationWithKey(t) defer cleanup() store := app.Store - config := store.Config - ethMock := app.MockEthClient(cltest.Strict) ethMock.Register("eth_getTransactionCount", `0x1`) @@ -558,8 +589,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) { hash := cltest.NewHash() sentAt := uint64(9183) - ethMock.Register("eth_getBalance", "0x100") - ethMock.Register("eth_call", "0x100") var firstTxData []interface{} ethMock.Register("eth_sendRawTransaction", hash, @@ -582,7 +611,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) { // Run the adapter again confirmed := sentAt + 1 - safe := confirmed + config.MinOutgoingConfirmations() ethMock.Register("eth_blockNumber", utils.Uint64ToHex(confirmed)) var secondTxData []interface{} @@ -593,7 +621,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) { }) receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)} ethMock.Register("eth_getTransactionReceipt", receipt) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe)) data = adapter.Perform(input, store) assert.NoError(t, data.GetError()) @@ -647,7 +674,8 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFailAndNonceChange(t firstTxData = data return errors.New("no bueno") }) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) + receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(sentAt)} + ethMock.Register("eth_getTransactionReceipt", receipt) adapter := adapters.EthTx{ Address: address, diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index 209b8577bfe..3f3c92339a6 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -172,6 +172,11 @@ func (mock *EthMock) EventuallyAllCalled(t *testing.T) { g.Eventually(mock.Remaining).Should(gomega.HaveLen(0)) } +// AssertAllCalled immediately checks that all calls have been made +func (mock *EthMock) AssertAllCalled() { + assert.Empty(mock.t, mock.Remaining()) +} + // Call will call given method and set the result func (mock *EthMock) Call(result interface{}, method string, args ...interface{}) error { mock.mutex.Lock() diff --git a/core/internal/features_test.go b/core/internal/features_test.go index ab0fc3e790b..c7326f397e1 100644 --- a/core/internal/features_test.go +++ b/core/internal/features_test.go @@ -78,7 +78,6 @@ func TestIntegration_HttpRequestWithHeaders(t *testing.T) { eth.Context("ethTx.Perform()#1 at block 23456", func(eth *cltest.EthMock) { eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) eth.Register("eth_sendRawTransaction", attempt1Hash) // Initial tx attempt sent - eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) eth.Register("eth_getTransactionReceipt", unconfirmedReceipt) }) j := cltest.CreateHelloWorldJobViaWeb(t, app, mockServer.URL) @@ -133,7 +132,6 @@ func TestIntegration_FeeBump(t *testing.T) { eth.Context("ethTx.Perform()#1 at block 23456", func(eth *cltest.EthMock) { eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) eth.Register("eth_sendRawTransaction", attempt1Hash) // Initial tx attempt sent - eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt)) eth.Register("eth_getTransactionReceipt", unconfirmedReceipt) }) j := cltest.CreateHelloWorldJobViaWeb(t, app, mockServer.URL) @@ -572,22 +570,21 @@ func TestIntegration_NonceManagement_firstRunWithExistingTxs(t *testing.T) { eth := app.MockEthClient() eth.Context("app.Start()", func(eth *cltest.EthMock) { - eth.Register("eth_getTransactionCount", `0x0100`) // activate account nonce + eth.Register("eth_getTransactionCount", `0x100`) // activate account nonce }) require.NoError(t, app.StartAndConnect()) createCompletedJobRun := func(blockNumber uint64, expectedNonce uint64) { hash := common.HexToHash("0xb7862c896a6ba2711bccc0410184e46d793ea83b3e05470f1d359ea276d16bb5") + confirmedBlockNumber := blockNumber - app.Store.Config.MinOutgoingConfirmations() + eth.Context("ethTx.Perform()", func(eth *cltest.EthMock) { eth.Register("eth_blockNumber", utils.Uint64ToHex(blockNumber)) eth.Register("eth_sendRawTransaction", hash) - eth.Register("eth_getTransactionReceipt", models.TxReceipt{ Hash: hash, - BlockNumber: cltest.Int(blockNumber), + BlockNumber: cltest.Int(confirmedBlockNumber), }) - confirmedBlockNumber := blockNumber + app.Store.Config.MinOutgoingConfirmations() - eth.Register("eth_blockNumber", utils.Uint64ToHex(confirmedBlockNumber)) }) jr := cltest.CreateJobRunViaWeb(t, app, j, `{"result":"0x11"}`) @@ -599,8 +596,8 @@ func TestIntegration_NonceManagement_firstRunWithExistingTxs(t *testing.T) { assert.Equal(t, expectedNonce, tx.Nonce) } - createCompletedJobRun(100, uint64(0x0100)) - createCompletedJobRun(200, uint64(0x0101)) + createCompletedJobRun(100, uint64(0x100)) + createCompletedJobRun(200, uint64(0x101)) } func TestIntegration_CreateServiceAgreement(t *testing.T) { diff --git a/core/internal/mocks/tx_manager_mocks.go b/core/internal/mocks/tx_manager_mocks.go index 1769a690ef3..9608a9798d3 100644 --- a/core/internal/mocks/tx_manager_mocks.go +++ b/core/internal/mocks/tx_manager_mocks.go @@ -41,12 +41,13 @@ func (m *MockTxManager) EXPECT() *MockTxManagerMockRecorder { } // BumpGasUntilSafe mocks base method -func (m *MockTxManager) BumpGasUntilSafe(arg0 common.Hash) (*models.TxReceipt, error) { +func (m *MockTxManager) BumpGasUntilSafe(arg0 common.Hash) (*models.TxReceipt, store.AttemptState, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "BumpGasUntilSafe", arg0) ret0, _ := ret[0].(*models.TxReceipt) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(store.AttemptState) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // BumpGasUntilSafe indicates an expected call of BumpGasUntilSafe @@ -55,6 +56,22 @@ func (mr *MockTxManagerMockRecorder) BumpGasUntilSafe(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BumpGasUntilSafe", reflect.TypeOf((*MockTxManager)(nil).BumpGasUntilSafe), arg0) } +// CheckAttempt mocks base method +func (m *MockTxManager) CheckAttempt(arg0 *models.TxAttempt, arg1 uint64) (*models.TxReceipt, store.AttemptState, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckAttempt", arg0, arg1) + ret0, _ := ret[0].(*models.TxReceipt) + ret1, _ := ret[1].(store.AttemptState) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// CheckAttempt indicates an expected call of CheckAttempt +func (mr *MockTxManagerMockRecorder) CheckAttempt(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckAttempt", reflect.TypeOf((*MockTxManager)(nil).CheckAttempt), arg0, arg1) +} + // Connect mocks base method func (m *MockTxManager) Connect(arg0 *models.Head) error { m.ctrl.T.Helper() diff --git a/core/store/tx_manager.go b/core/store/tx_manager.go index cfdcd1a8b02..cde5ddecdfa 100644 --- a/core/store/tx_manager.go +++ b/core/store/tx_manager.go @@ -43,8 +43,9 @@ type TxManager interface { CreateTx(to common.Address, data []byte) (*models.Tx, error) CreateTxWithGas(surrogateID null.String, to common.Address, data []byte, gasPriceWei *big.Int, gasLimit uint64) (*models.Tx, error) CreateTxWithEth(from, to common.Address, value *assets.Eth) (*models.Tx, error) + CheckAttempt(txAttempt *models.TxAttempt, blockHeight uint64) (*models.TxReceipt, AttemptState, error) - BumpGasUntilSafe(hash common.Hash) (*models.TxReceipt, error) + BumpGasUntilSafe(hash common.Hash) (*models.TxReceipt, AttemptState, error) ContractLINKBalance(wr models.WithdrawalRequest) (assets.Link, error) WithdrawLINK(wr models.WithdrawalRequest) (common.Hash, error) @@ -227,7 +228,7 @@ func (txm *EthTxManager) sendInitialTx( gasLimit uint64, value *assets.Eth) (*models.Tx, error) { - blockNumber, err := txm.getBlockNumber() + blockHeight, err := txm.getBlockNumber() if err != nil { return nil, errors.Wrap(err, "TxManager#sendInitialTx getBlockNumber") } @@ -246,11 +247,13 @@ func (txm *EthTxManager) sendInitialTx( return errors.Wrap(err, "TxManager#sendInitialTx newEthTx") } - tx, err = txm.orm.CreateTx(surrogateID, ethTx, &ma.Address, blockNumber) + tx, err = txm.orm.CreateTx(surrogateID, ethTx, &ma.Address, blockHeight) if err != nil { return errors.Wrap(err, "TxManager#sendInitialTx CreateTx") } + logger.Debugw(fmt.Sprintf("Adding Tx attempt #%d", 0), "txID", tx.ID) + _, err = txm.SendRawTx(tx.SignedRawTx) if err != nil { return errors.Wrap(err, "TxManager#sendInitialTx SendRawTx") @@ -278,7 +281,7 @@ func (txm *EthTxManager) retryInitialTx( return errors.Wrap(err, "TxManager#retryInitialTx ReloadNonce") } - blockNumber, err := txm.getBlockNumber() + blockHeight, err := txm.getBlockNumber() if err != nil { return errors.Wrap(err, "TxManager#retryInitialTx getBlockNumber") } @@ -296,7 +299,7 @@ func (txm *EthTxManager) retryInitialTx( return errors.Wrap(err, "TxManager#retryInitialTx newEthTx") } - err = txm.orm.UpdateTx(tx, ethTx, &ma.Address, blockNumber) + err = txm.orm.UpdateTx(tx, ethTx, &ma.Address, blockHeight) if err != nil { return errors.Wrap(err, "TxManager#retryInitialTx UpdateTx") } @@ -353,27 +356,28 @@ func (txm *EthTxManager) GetLINKBalance(address common.Address) (*assets.Link, e return (*assets.Link)(balance), nil } -// BumpGasUntilSafe returns true if the given transaction hash has been -// confirmed on the blockchain. -func (txm *EthTxManager) BumpGasUntilSafe(hash common.Hash) (*models.TxReceipt, error) { - blockNumber, err := txm.getBlockNumber() +// BumpGasUntilSafe process a collection of related TxAttempts, trying to get +// at least one TxAttempt into a safe state, bumping gas if needed +func (txm *EthTxManager) BumpGasUntilSafe(hash common.Hash) (*models.TxReceipt, AttemptState, error) { + blockHeight, err := txm.getBlockNumber() if err != nil { - return nil, errors.Wrap(err, "BumpGasUntilSafe getBlockNumber") + return nil, Unknown, errors.Wrap(err, "BumpGasUntilSafe getBlockNumber") } tx, err := txm.orm.FindTxByAttempt(hash) if err != nil { - return nil, errors.Wrap(err, "BumpGasUntilSafe FindTxByAttempt") + return nil, Unknown, errors.Wrap(err, "BumpGasUntilSafe FindTxByAttempt") } var merr error for attemptIndex := range tx.Attempts { - receipt, state, err := txm.checkAttempt(tx, attemptIndex, blockNumber) - if state == safe || state == confirmed { - return receipt, err // success, so all other attempt errors can be ignored. + receipt, state, err := txm.processAttempt(tx, attemptIndex, blockHeight) + if state == Safe || state == Confirmed { + return receipt, state, err // success, so all other attempt errors can be ignored. } merr = multierr.Append(merr, err) } - return nil, merr + + return nil, Unconfirmed, merr } // ContractLINKBalance returns the balance for the contract associated with this @@ -398,6 +402,16 @@ func (txm *EthTxManager) ContractLINKBalance(wr models.WithdrawalRequest) (asset return *linkBalance, nil } +// GetETHAndLINKBalances attempts to retrieve the ethereum node's perception of +// the latest ETH and LINK balances for the active account on the txm, or an +// error on failure. +func (txm *EthTxManager) GetETHAndLINKBalances(address common.Address) (*big.Int, *assets.Link, error) { + linkBalance, linkErr := txm.GetLINKBalance(address) + ethBalance, ethErr := txm.EthClient.GetWeiBalance(address) + merr := multierr.Append(linkErr, ethErr) + return ethBalance, linkBalance, merr +} + // WithdrawLINK withdraws the given amount of LINK from the contract to the // configured withdrawal address. If wr.ContractAddress is empty (zero address), // funds are withdrawn from configured OracleContractAddress. @@ -429,167 +443,165 @@ func (txm *EthTxManager) WithdrawLINK(wr models.WithdrawalRequest) (common.Hash, return tx.Hash, nil } -func (txm *EthTxManager) createAttempt( - tx *models.Tx, - gasPriceWei *big.Int, - blockNumber uint64, -) (*models.TxAttempt, error) { - ma := txm.getAccount(tx.From) - if ma == nil { - return nil, fmt.Errorf("Unable to locate %v as an available account in EthTxManager. Has TxManager been started or has the address been removed?", tx.From.Hex()) - } - etx := tx.EthTx(gasPriceWei) - etx, err := txm.keyStore.SignTx(ma.Account, etx, txm.config.ChainID()) +// CheckAttempt retrieves a receipt for a TxAttempt, and check if it meets the +// minimum number of confirmations +func (txm *EthTxManager) CheckAttempt(txAttempt *models.TxAttempt, blockHeight uint64) (*models.TxReceipt, AttemptState, error) { + receipt, err := txm.GetTxReceipt(txAttempt.Hash) if err != nil { - return nil, errors.Wrap(err, "createAttempt#SignTx failed") + return nil, Unknown, errors.Wrap(err, "CheckAttempt GetTxReceipt failed") } - logger.Debugw(fmt.Sprintf("Adding Tx attempt #%d", len(tx.Attempts)+1), "txId", tx.ID) - - txAttempt, err := txm.orm.AddTxAttempt(tx, etx, blockNumber) - if err != nil { - return nil, errors.Wrap(err, "createAttempt#AddTxAttempt failed") + if receipt.Unconfirmed() { + return receipt, Unconfirmed, nil } - if _, err = txm.SendRawTx(txAttempt.SignedRawTx); err != nil { - return nil, errors.Wrap(err, "createAttempt#SendRawTx failed") + minimumConfirmations := new(big.Int).SetUint64(txm.config.MinOutgoingConfirmations()) + confirmedAt := new(big.Int).Add(minimumConfirmations, receipt.BlockNumber.ToInt()) + + // 0 based indexing since receipt is 1 conf + confirmedAt.Sub(confirmedAt, big.NewInt(1)) + + if new(big.Int).SetUint64(blockHeight).Cmp(confirmedAt) == -1 { + return receipt, Confirmed, nil } - return txAttempt, nil + return receipt, Safe, nil } -type attemptState int +// AttemptState enumerates the possible states of a transaction attempt as it +// gets accepted and confirmed by the blockchain +type AttemptState int const ( - unconfirmed attemptState = iota - confirmed - safe + // Unknown is returned when the state of a transaction could not be + // determined because of an error + Unknown AttemptState = iota + // Unconfirmed means that a transaction has had no confirmations at all + Unconfirmed + // Confirmed means that a transaftion has had at least one transaction, but + // not enough to satisfy the minimum number of confirmations configuration + // option + Confirmed + // Safe has the required number of confirmations or more + Safe ) -func (txm *EthTxManager) checkAttempt( +// String conforms to the Stringer interface for AttemptState +func (a AttemptState) String() string { + switch a { + case Unconfirmed: + return "unconfirmed" + case Confirmed: + return "confirmed" + case Safe: + return "safe" + default: + return "unknown" + } +} + +// processAttempt checks the state of a transaction attempt on the blockchain +// and decides if it is safe, needs bumping or more confirmations are needed to +// decide +func (txm *EthTxManager) processAttempt( tx *models.Tx, attemptIndex int, - blockNumber uint64, -) (*models.TxReceipt, attemptState, error) { + blockHeight uint64, +) (*models.TxReceipt, AttemptState, error) { txAttempt := tx.Attempts[attemptIndex] - logger.Debugw(fmt.Sprintf("Checking Tx attempt #%d", attemptIndex), "txId", tx.ID) + logger.Debugw( + fmt.Sprintf("Tx #%d checking on-chain state", attemptIndex), + "txHash", txAttempt.Hash.String(), + "txID", txAttempt.TxID, + ) - receipt, err := txm.GetTxReceipt(txAttempt.Hash) + receipt, state, err := txm.CheckAttempt(txAttempt, blockHeight) if err != nil { - return nil, unconfirmed, errors.Wrap(err, "checkAttempt GetTxReceipt") + return nil, Unknown, errors.Wrap(err, "processAttempt CheckAttempt failed") } - if receipt.Unconfirmed() { - return txm.handleUnconfirmed(tx, attemptIndex, blockNumber) + logger.Debugw( + fmt.Sprintf("Tx #%d is %s", attemptIndex, state), + "txHash", txAttempt.Hash.String(), + "txID", txAttempt.TxID, + "receiptBlockNumber", receipt.BlockNumber.ToInt(), + "currentBlockNumber", blockHeight, + "receiptHash", receipt.Hash.Hex(), + ) + + switch state { + case Safe: + return receipt, state, txm.handleSafe(tx, attemptIndex) + + case Confirmed: // nothing to do, need to wait + return receipt, state, nil + + case Unconfirmed: + if isLatestAttempt(tx, txAttempt) && txm.hasTxAttemptMetGasBumpThreshold(tx, attemptIndex, blockHeight) { + err = txm.bumpGas(tx, attemptIndex, blockHeight) + } + + return receipt, state, err } - return txm.handleConfirmed(tx, attemptIndex, receipt, blockNumber) -} -// GetETHAndLINKBalances attempts to retrieve the ethereum node's perception of -// the latest ETH and LINK balances for the active account on the txm, or an -// error on failure. -func (txm *EthTxManager) GetETHAndLINKBalances(address common.Address) (*big.Int, *assets.Link, error) { - linkBalance, linkErr := txm.GetLINKBalance(address) - ethBalance, ethErr := txm.EthClient.GetWeiBalance(address) - merr := multierr.Append(linkErr, ethErr) - return ethBalance, linkBalance, merr + panic("invariant violated, 'Unknown' state returned without error") } -// handleConfirmed checks whether a tx is confirmed, and records and reports it -// as such if so. Its bool return value is true if the tx is confirmed and it -// was successfully recorded as confirmed. -func (txm *EthTxManager) handleConfirmed( +// hasTxAttemptMetGasBumpThreshold returns true if the current block height +// exceeds the configured gas bump threshold, indicating that it is time for a +// new transaction attempt to be created with an increased gas price +func (txm *EthTxManager) hasTxAttemptMetGasBumpThreshold( tx *models.Tx, attemptIndex int, - rcpt *models.TxReceipt, - blockNumber uint64, -) (*models.TxReceipt, attemptState, error) { + blockHeight uint64) bool { + + gasBumpThreshold := txm.config.EthGasBumpThreshold() txAttempt := tx.Attempts[attemptIndex] - minConfs := big.NewInt(int64(txm.config.MinOutgoingConfirmations())) - confirmedAt := big.NewInt(0).Add(minConfs, rcpt.BlockNumber.ToInt()) - confirmedAt.Sub(confirmedAt, big.NewInt(1)) // 0 based indexing since rcpt is 1 conf + return blockHeight >= txAttempt.SentAt+gasBumpThreshold +} - logger.Debugw( - fmt.Sprintf("Tx #%d checking for minimum of %v confirmations", attemptIndex, minConfs), - "txHash", txAttempt.Hash.String(), - "txid", txAttempt.TxID, - "receiptBlockNumber", rcpt.BlockNumber.ToInt(), - "currentBlockNumber", blockNumber, - "receiptHash", rcpt.Hash.Hex(), - ) +// isLatestAttempt returns true only if the attempt is the last +// attempt associated with the transaction, alluding to the fact that +// it has the highest gas price after subsequent bumps. +func isLatestAttempt(tx *models.Tx, txAttempt *models.TxAttempt) bool { + return tx.Hash == txAttempt.Hash +} - if big.NewInt(int64(blockNumber)).Cmp(confirmedAt) == -1 { - return nil, confirmed, nil - } +// handleSafe marks a transaction as safe, no more work needs to be done +func (txm *EthTxManager) handleSafe( + tx *models.Tx, + attemptIndex int) error { + txAttempt := tx.Attempts[attemptIndex] if err := txm.orm.MarkTxSafe(tx, txAttempt); err != nil { - return nil, confirmed, err + return errors.Wrap(err, "handleSafe MarkTxSafe failed") } + minimumConfirmations := txm.config.MinOutgoingConfirmations() ethBalance, linkBalance, balanceErr := txm.GetETHAndLINKBalances(tx.From) + logger.Infow( - fmt.Sprintf("Tx #%d got minimum confirmations (%d)", txAttempt.TxID, minConfs), + fmt.Sprintf("Tx #%d got minimum confirmations (%d)", attemptIndex, minimumConfirmations), "txHash", txAttempt.Hash.String(), - "txid", txAttempt.TxID, + "txID", txAttempt.TxID, "ethBalance", ethBalance, "linkBalance", linkBalance, "err", balanceErr, ) - return rcpt, safe, nil + return nil } -func (txm *EthTxManager) handleUnconfirmed( - tx *models.Tx, - attemptIndex int, - blockNumber uint64, -) (*models.TxReceipt, attemptState, error) { +// bumpGas creates a new transaction attempt with an increased gas cost +func (txm *EthTxManager) bumpGas(tx *models.Tx, attemptIndex int, blockHeight uint64) error { txAttempt := tx.Attempts[attemptIndex] - if !isLatestAttempt(tx, txAttempt) { - return nil, unconfirmed, nil - } - - gasBumpThreshold := txm.config.EthGasBumpThreshold() - logParams := []interface{}{ - "txHash", txAttempt.Hash.String(), - "txId", tx.ID, - "nonce", tx.Nonce, - "gasPrice", txAttempt.GasPrice.String(), - "from", tx.From.Hex(), - "blockNumber", blockNumber, - "sentAt", txAttempt.SentAt, - "gasBumpThreshold", gasBumpThreshold, - } - if blockNumber >= txAttempt.SentAt+gasBumpThreshold { - logger.Debugw( - fmt.Sprintf("Tx #%d unconfirmed, bumping gas", attemptIndex), - logParams..., - ) - return nil, unconfirmed, txm.bumpGas(tx, attemptIndex, blockNumber) - } - logger.Infow( - fmt.Sprintf("Tx #%d unconfirmed, not yet ready to bump gas", attemptIndex), - logParams..., - ) - return nil, unconfirmed, nil -} - -// isLatestAttempt returns true only if the attempt is the last -// attempt associated with the transaction, alluding to the fact that -// it has the highest gas price after subsequent bumps. -func isLatestAttempt(tx *models.Tx, txAttempt *models.TxAttempt) bool { - return tx.Hash == txAttempt.Hash -} - -func (txm *EthTxManager) bumpGas(tx *models.Tx, attemptIndex int, blockNumber uint64) error { - txAttempt := tx.Attempts[attemptIndex] originalGasPrice := txAttempt.GasPrice.ToInt() bumpedGasPrice := new(big.Int).Add(originalGasPrice, txm.config.EthGasBumpWei()) - bumpedTxAttempt, err := txm.createAttempt(tx, bumpedGasPrice, blockNumber) + bumpedTxAttempt, err := txm.createAttempt(tx, bumpedGasPrice, blockHeight) if err != nil { return errors.Wrapf(err, "bumpGas from Tx #%s", txAttempt.Hash.Hex()) } @@ -601,6 +613,36 @@ func (txm *EthTxManager) bumpGas(tx *models.Tx, attemptIndex int, blockNumber ui return nil } +// createAttempt adds a new transaction attempt to a transaction record +func (txm *EthTxManager) createAttempt( + tx *models.Tx, + gasPriceWei *big.Int, + blockHeight uint64, +) (*models.TxAttempt, error) { + ma := txm.getAccount(tx.From) + if ma == nil { + return nil, fmt.Errorf("Unable to locate %v as an available account in EthTxManager. Has TxManager been started or has the address been removed?", tx.From.Hex()) + } + etx := tx.EthTx(gasPriceWei) + etx, err := txm.keyStore.SignTx(ma.Account, etx, txm.config.ChainID()) + if err != nil { + return nil, errors.Wrap(err, "createAttempt#SignTx failed") + } + + logger.Debugw(fmt.Sprintf("Adding Tx attempt #%d", len(tx.Attempts)+1), "txID", tx.ID) + + txAttempt, err := txm.orm.AddTxAttempt(tx, etx, blockHeight) + if err != nil { + return nil, errors.Wrap(err, "createAttempt#AddTxAttempt failed") + } + + if _, err = txm.SendRawTx(txAttempt.SignedRawTx); err != nil { + return nil, errors.Wrap(err, "createAttempt#SendRawTx failed") + } + + return txAttempt, nil +} + // NextActiveAccount uses round robin to select a managed account // from the list of available accounts as defined in Register(...) func (txm *EthTxManager) NextActiveAccount() *ManagedAccount { diff --git a/core/store/tx_manager_test.go b/core/store/tx_manager_test.go index 4fadab46a27..ecef6e879cf 100644 --- a/core/store/tx_manager_test.go +++ b/core/store/tx_manager_test.go @@ -97,8 +97,9 @@ func TestTxManager_CreateTx_RoundRobinSuccess(t *testing.T) { ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt+config.EthGasBumpThreshold())) }) - _, err = manager.BumpGasUntilSafe(createdTx1.Attempts[0].Hash) + _, state, err := manager.BumpGasUntilSafe(createdTx1.Attempts[0].Hash) require.NoError(t, err) + assert.Equal(t, strpkg.Unconfirmed, state) // retrieve new gas bumped second attempt createdTx1, err = store.FindTx(createdTx1.ID) @@ -327,14 +328,13 @@ func TestTxManager_BumpGasUntilSafe_lessThanGasBumpThreshold(t *testing.T) { tx := cltest.CreateTx(t, store, from, sentAt) require.Greater(t, len(tx.Attempts), 0) - a := tx.Attempts[0] - ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{}) ethMock.Register("eth_blockNumber", utils.Uint64ToHex(gasThreshold-1)) - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, state, err := txm.BumpGasUntilSafe(tx.Attempts[0].Hash) assert.NoError(t, err) assert.Nil(t, receipt) + assert.Equal(t, strpkg.Unconfirmed, state) tx, err = store.FindTx(tx.ID) require.NoError(t, err) @@ -363,15 +363,14 @@ func TestTxManager_BumpGasUntilSafe_atGasBumpThreshold(t *testing.T) { tx := cltest.CreateTx(t, store, from, sentAt) require.Greater(t, len(tx.Attempts), 0) - a := tx.Attempts[0] - - ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{}) ethMock.Register("eth_blockNumber", utils.Uint64ToHex(gasThreshold)) + ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{}) ethMock.Register("eth_sendRawTransaction", cltest.NewHash()) - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, state, err := txm.BumpGasUntilSafe(tx.Attempts[0].Hash) assert.NoError(t, err) assert.Nil(t, receipt) + assert.Equal(t, strpkg.Unconfirmed, state) tx, err = store.FindTx(tx.ID) require.NoError(t, err) @@ -400,15 +399,14 @@ func TestTxManager_BumpGasUntilSafe_exceedsGasBumpThreshold(t *testing.T) { tx := cltest.CreateTx(t, store, from, sentAt) require.Greater(t, len(tx.Attempts), 0) - a := tx.Attempts[0] - - ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{}) ethMock.Register("eth_blockNumber", utils.Uint64ToHex(gasThreshold+1)) + ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{}) ethMock.Register("eth_sendRawTransaction", cltest.NewHash()) - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, state, err := txm.BumpGasUntilSafe(tx.Attempts[0].Hash) assert.NoError(t, err) assert.Nil(t, receipt) + assert.Equal(t, strpkg.Unconfirmed, state) tx, err = store.FindTx(tx.ID) require.NoError(t, err) @@ -438,14 +436,13 @@ func TestTxManager_BumpGasUntilSafe_confirmed_lessThanGasThreshold(t *testing.T) tx := cltest.CreateTx(t, store, from, sentAt) require.Greater(t, len(tx.Attempts), 0) - a := tx.Attempts[0] - - ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{Hash: cltest.NewHash(), BlockNumber: cltest.Int(gasThreshold)}) ethMock.Register("eth_blockNumber", utils.Uint64ToHex(gasThreshold+minConfs-1)) + ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{Hash: cltest.NewHash(), BlockNumber: cltest.Int(gasThreshold)}) - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, state, err := txm.BumpGasUntilSafe(tx.Attempts[0].Hash) assert.NoError(t, err) - assert.Nil(t, receipt) + assert.NotNil(t, receipt) + assert.Equal(t, strpkg.Confirmed, state) tx, err = store.FindTx(tx.ID) require.NoError(t, err) @@ -475,16 +472,15 @@ func TestTxManager_BumpGasUntilSafe_confirmed_atGasBumpThreshold(t *testing.T) { tx := cltest.CreateTx(t, store, from, sentAt) require.Greater(t, len(tx.Attempts), 0) - a := tx.Attempts[0] - - ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{Hash: cltest.NewHash(), BlockNumber: cltest.Int(gasThreshold)}) ethMock.Register("eth_blockNumber", utils.Uint64ToHex(gasThreshold+minConfs)) + ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{Hash: cltest.NewHash(), BlockNumber: cltest.Int(gasThreshold)}) ethMock.Register("eth_getBalance", "0x100") ethMock.Register("eth_call", "0x100") - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, state, err := txm.BumpGasUntilSafe(tx.Attempts[0].Hash) assert.NoError(t, err) assert.NotNil(t, receipt) + assert.Equal(t, strpkg.Safe, state) tx, err = store.FindTx(tx.ID) require.NoError(t, err) @@ -514,16 +510,15 @@ func TestTxManager_BumpGasUntilSafe_confirmed_exceedsGasBumpThreshold(t *testing tx := cltest.CreateTx(t, store, from, sentAt) require.Greater(t, len(tx.Attempts), 0) - a := tx.Attempts[0] - - ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{Hash: cltest.NewHash(), BlockNumber: cltest.Int(gasThreshold)}) ethMock.Register("eth_blockNumber", utils.Uint64ToHex(gasThreshold+minConfs+1)) + ethMock.Register("eth_getTransactionReceipt", models.TxReceipt{Hash: cltest.NewHash(), BlockNumber: cltest.Int(gasThreshold)}) ethMock.Register("eth_getBalance", "0x100") ethMock.Register("eth_call", "0x100") - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, state, err := txm.BumpGasUntilSafe(tx.Attempts[0].Hash) assert.NoError(t, err) assert.NotNil(t, receipt) + assert.Equal(t, strpkg.Safe, state) tx, err = store.FindTx(tx.ID) require.NoError(t, err) @@ -606,7 +601,7 @@ func TestTxManager_BumpGasUntilSafe_erroring(t *testing.T) { ethMock.ShouldCall(test.mockSetup).During(func() { ethMock.Register("eth_blockNumber", utils.Uint64ToHex(test.blockHeight)) require.NoError(t, app.StartAndConnect()) - receipt, err := txm.BumpGasUntilSafe(a.Hash) + receipt, _, err := txm.BumpGasUntilSafe(a.Hash) receiptPresent := receipt != nil require.Equal(t, test.wantReceipt, receiptPresent) @@ -616,6 +611,88 @@ func TestTxManager_BumpGasUntilSafe_erroring(t *testing.T) { } } +func TestTxManager_CheckAttempt(t *testing.T) { + t.Parallel() + + app, cleanup := cltest.NewApplicationWithKey(t) + defer cleanup() + + ethMock := app.MockEthClient(cltest.Strict) + ethMock.Register("eth_getTransactionCount", "0x0") + require.NoError(t, app.StartAndConnect()) + + store := app.Store + config := store.Config + txm := store.TxManager + + from := cltest.GetAccountAddress(t, store) + sentAt := uint64(14770) + hash := cltest.NewHash() + gasBumpThreshold := sentAt + config.EthGasBumpThreshold() + + tx := cltest.CreateTx(t, store, from, sentAt) + require.Len(t, tx.Attempts, 1) + + // Initial check, no receipt, no change of the block height + retrievedReceipt := models.TxReceipt{} + ethMock.Register("eth_getTransactionReceipt", retrievedReceipt) + + receipt, state, err := txm.CheckAttempt(tx.Attempts[0], sentAt) + require.NoError(t, err) + assert.Equal(t, strpkg.Unconfirmed, state) + assert.Equal(t, receipt, &retrievedReceipt) + + ethMock.EventuallyAllCalled(t) + + // A receipt is found, but is not yet safe + retrievedReceipt = models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(sentAt)} + ethMock.Register("eth_getTransactionReceipt", retrievedReceipt) + + receipt, state, err = txm.CheckAttempt(tx.Attempts[0], sentAt) + require.NoError(t, err) + assert.Equal(t, strpkg.Confirmed, state) + assert.Equal(t, receipt, &retrievedReceipt) + + ethMock.EventuallyAllCalled(t) + + // A receipt is found, and now is safe + ethMock.Register("eth_getTransactionReceipt", retrievedReceipt) + + receipt, state, err = txm.CheckAttempt(tx.Attempts[0], sentAt+gasBumpThreshold) + require.NoError(t, err) + assert.Equal(t, strpkg.Safe, state) + assert.Equal(t, receipt, &retrievedReceipt) + + ethMock.EventuallyAllCalled(t) +} + +func TestTxManager_CheckAttempt_error(t *testing.T) { + t.Parallel() + + app, cleanup := cltest.NewApplicationWithKey(t) + defer cleanup() + + ethMock := app.MockEthClient(cltest.Strict) + ethMock.Register("eth_getTransactionCount", "0x0") + require.NoError(t, app.StartAndConnect()) + + store := app.Store + txm := store.TxManager + + sentAt := uint64(14770) + + // Initial check, no receipt, no change of the block height + ethMock.RegisterError("eth_getTransactionReceipt", "that aint gonna work chief") + + txAttempt := &models.TxAttempt{} + receipt, state, err := txm.CheckAttempt(txAttempt, sentAt) + require.Error(t, err) + assert.Equal(t, strpkg.Unknown, state) + assert.Nil(t, receipt) + + ethMock.EventuallyAllCalled(t) +} + func TestTxManager_Register(t *testing.T) { t.Parallel() @@ -826,8 +903,7 @@ func TestTxManager_LogsETHAndLINKBalancesAfterSuccessfulTx(t *testing.T) { ethMock.Register("eth_getTransactionCount", utils.Uint64ToHex(nonce)) ethMock.Register("eth_sendRawTransaction", hash) ethMock.Register("eth_getTransactionReceipt", confirmedReceipt) - ethMock.Register("eth_blockNumber", utils.Uint64ToHex( - confirmedHeight)) + ethMock.Register("eth_blockNumber", utils.Uint64ToHex(confirmedHeight)) ethMock.Register("eth_getBalance", mockedEthBalance) ethMock.Register("eth_call", mockedLinkBalance) }) @@ -838,7 +914,7 @@ func TestTxManager_LogsETHAndLINKBalancesAfterSuccessfulTx(t *testing.T) { require.Len(t, confirmedTx.Attempts, 1) initialSuccessfulAttempt := confirmedTx.Attempts[0] - receipt, err := manager.BumpGasUntilSafe(initialSuccessfulAttempt.Hash) + receipt, _, err := manager.BumpGasUntilSafe(initialSuccessfulAttempt.Hash) assert.NoError(t, err) assert.NotNil(t, receipt) @@ -849,7 +925,10 @@ func TestTxManager_LogsETHAndLINKBalancesAfterSuccessfulTx(t *testing.T) { messages = append(messages, log.Message) } - assert.Contains(t, messages, "Tx #0 checking for minimum of 6 confirmations") + assert.Contains(t, messages, "Tx #0 checking on-chain state") + assert.Contains(t, messages, "Tx #0 is safe") + // This message includes the amounts + assert.Contains(t, messages, "Tx #0 got minimum confirmations (6)") } func TestTxManager_CreateTxWithGas(t *testing.T) {